HDFS-12593. Ozone: update Ratis to the latest snapshot. Contributed by Tsz Wo Nicholas Sze.

This commit is contained in:
Mukul Kumar Singh 2017-10-11 17:35:17 +05:30 committed by Owen O'Malley
parent 5ed663b32f
commit c85d3b2d25
5 changed files with 54 additions and 33 deletions

View File

@ -20,12 +20,12 @@ package org.apache.hadoop.scm;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.ratis.RatisHelper;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.ratis.RatisHelper;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeer;
@ -37,11 +37,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
* An abstract implementation of {@link XceiverClientSpi} using Ratis.
@ -75,8 +77,9 @@ public final class XceiverClientRatis extends XceiverClientSpi {
*/
public void createPipeline(String clusterId, List<DatanodeID> datanodes)
throws IOException {
final RaftPeer[] newPeers = datanodes.stream().map(RatisHelper::toRaftPeer)
.toArray(RaftPeer[]::new);
final List<RaftPeer> newPeers = datanodes.stream()
.map(RatisHelper::toRaftPeer)
.collect(Collectors.toList());
LOG.debug("initializing pipeline:{} with nodes:{}", clusterId, newPeers);
reinitialize(datanodes, newPeers);
}
@ -90,7 +93,8 @@ public final class XceiverClientRatis extends XceiverClientSpi {
return OzoneProtos.ReplicationType.RATIS;
}
private void reinitialize(List<DatanodeID> datanodes, RaftPeer[] newPeers)
private void reinitialize(
List<DatanodeID> datanodes, Collection<RaftPeer> newPeers)
throws IOException {
if (datanodes.isEmpty()) {
return;
@ -120,11 +124,11 @@ public final class XceiverClientRatis extends XceiverClientSpi {
* @param newPeers - Raft machines
* @throws IOException - on Failure.
*/
private void reinitialize(DatanodeID datanode, RaftPeer[] newPeers)
private void reinitialize(DatanodeID datanode, Collection<RaftPeer> newPeers)
throws IOException {
final RaftPeer p = RatisHelper.toRaftPeer(datanode);
try (RaftClient client = RatisHelper.newRaftClient(rpcType, p)) {
client.reinitialize(newPeers, p.getId());
client.reinitialize(RatisHelper.newRaftGroup(newPeers), p.getId());
} catch (IOException ioe) {
LOG.error("Failed to reinitialize RaftPeer:{} datanode: {} ",
p, datanode, ioe);

View File

@ -23,15 +23,15 @@ import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.ratis.client.ClientFactory;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.RpcType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.*;
import java.util.stream.Collectors;
/**
@ -41,6 +41,10 @@ public interface RatisHelper {
Logger LOG = LoggerFactory.getLogger(RatisHelper.class);
static String toRaftPeerIdString(DatanodeID id) {
return id.getIpAddr() + "_" + id.getRatisPort();
}
static String toRaftPeerAddressString(DatanodeID id) {
return id.getIpAddr() + ":" + id.getRatisPort();
}
@ -48,44 +52,59 @@ public interface RatisHelper {
return RaftPeerId.valueOf(toRaftPeerIdString(id));
}
static RaftPeer toRaftPeer(String id) {
return new RaftPeer(RaftPeerId.valueOf(id), id);
}
static RaftPeer toRaftPeer(DatanodeID id) {
return toRaftPeer(toRaftPeerIdString(id));
return new RaftPeer(toRaftPeerId(id), toRaftPeerAddressString(id));
}
static List<RaftPeer> toRaftPeers(Pipeline pipeline) {
return pipeline.getMachines().stream()
.map(RatisHelper::toRaftPeer)
return toRaftPeers(pipeline.getMachines());
}
static <E extends DatanodeID> List<RaftPeer> toRaftPeers(List<E> datanodes) {
return datanodes.stream().map(RatisHelper::toRaftPeer)
.collect(Collectors.toList());
}
static RaftPeer[] toRaftPeerArray(Pipeline pipeline) {
return toRaftPeers(pipeline).toArray(RaftPeer.EMPTY_PEERS);
/* TODO: use a dummy id for all groups for the moment.
* It should be changed to a unique id for each group.
*/
RaftGroupId DUMMY_GROUP_ID = RaftGroupId.randomId();
RaftGroup EMPTY_GROUP = new RaftGroup(DUMMY_GROUP_ID,
Collections.emptyList());
static RaftGroup emptyRaftGroup() {
return EMPTY_GROUP;
}
static RaftGroup newRaftGroup(Collection<RaftPeer> peers) {
return peers.isEmpty()? emptyRaftGroup()
: new RaftGroup(DUMMY_GROUP_ID, peers);
}
static RaftGroup newRaftGroup(Pipeline pipeline) {
return newRaftGroup(toRaftPeers(pipeline));
}
static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline) {
return newRaftClient(rpcType, toRaftPeerId(pipeline.getLeader()),
toRaftPeers(pipeline));
newRaftGroup(pipeline));
}
static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader) {
return newRaftClient(rpcType, leader.getId(),
new ArrayList<>(Arrays.asList(leader)));
newRaftGroup(new ArrayList<>(Arrays.asList(leader))));
}
static RaftClient newRaftClient(
RpcType rpcType, RaftPeerId leader, List<RaftPeer> peers) {
LOG.trace("newRaftClient: {}, leader={}, peers={}", rpcType, leader, peers);
RpcType rpcType, RaftPeerId leader, RaftGroup group) {
LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, leader, group);
final RaftProperties properties = new RaftProperties();
final ClientFactory factory = ClientFactory.cast(rpcType.newFactory(
properties, null));
final ClientFactory factory = ClientFactory.cast(rpcType.newFactory(null));
return RaftClient.newBuilder()
.setClientRpc(factory.newRaftClientRpc())
.setServers(peers)
.setRaftGroup(group)
.setLeaderId(leader)
.setProperties(properties)
.build();

View File

@ -80,7 +80,7 @@ public class ContainerStateMachine extends BaseStateMachine {
@Override
public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
final SMLogEntryProto logEntry = trx.getSMLogEntry().get();
final SMLogEntryProto logEntry = trx.getSMLogEntry();
return dispatch(ShadedProtoUtil.asByteString(logEntry.getData()),
response ->
() -> ShadedProtoUtil.asShadedByteString(response.toByteArray())

View File

@ -45,7 +45,6 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.Objects;
/**
@ -64,7 +63,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
this.server = RaftServer.newBuilder()
.setServerId(RatisHelper.toRaftPeerId(id))
.setPeers(Collections.emptyList())
.setGroup(RatisHelper.emptyRaftGroup())
.setProperties(newRaftProperties(rpcType, port, storageDir))
.setStateMachine(new ContainerStateMachine(dispatcher))
.build();
@ -73,7 +72,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
static RaftProperties newRaftProperties(
RpcType rpc, int port, String storageDir) {
final RaftProperties properties = new RaftProperties();
RaftServerConfigKeys.setStorageDir(properties, storageDir);
RaftServerConfigKeys.setStorageDir(properties, new File(storageDir));
RaftConfigKeys.Rpc.setType(properties, rpc);
if (rpc == SupportedRpcType.GRPC) {
GrpcConfigKeys.Server.setPort(properties, port);

View File

@ -129,9 +129,8 @@ public class TestContainerServer {
static void initXceiverServerRatis(
RpcType rpc, DatanodeID id, Pipeline pipeline) throws IOException {
final RaftPeer p = RatisHelper.toRaftPeer(id);
final RaftPeer[] peers = RatisHelper.toRaftPeerArray(pipeline);
final RaftClient client = RatisHelper.newRaftClient(rpc, p);
client.reinitialize(peers, p.getId());
client.reinitialize(RatisHelper.newRaftGroup(pipeline), p.getId());
}