diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java index 738a588510f..a0ad24eb076 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java @@ -22,13 +22,10 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.ratis.RatisHelper; import org.apache.hadoop.scm.container.common.helpers.Pipeline; -import org.apache.ratis.client.ClientFactory; import org.apache.ratis.client.RaftClient; -import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil; @@ -36,10 +33,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; /** * An abstract implementation of {@link XceiverClientSpi} using Ratis. @@ -67,24 +62,6 @@ public final class XceiverClientRatis implements XceiverClientSpi { this.rpcType = rpcType; } - static RaftClient newRaftClient(Pipeline pipeline, RpcType rpcType) { - final List peers = pipeline.getMachines().stream() - .map(dn -> dn.getXferAddr()) - .map(addr -> new RaftPeer(new RaftPeerId(addr), addr)) - .collect(Collectors.toList()); - - final RaftProperties properties = new RaftProperties(); - final ClientFactory factory = ClientFactory.cast(rpcType.newFactory( - properties, null)); - - return RaftClient.newBuilder() - .setClientRpc(factory.newRaftClientRpc()) - .setServers(peers) - .setLeaderId(new RaftPeerId(pipeline.getLeader().getXferAddr())) - .setProperties(properties) - .build(); - } - @Override public Pipeline getPipeline() { return pipeline; @@ -92,7 +69,8 @@ public final class XceiverClientRatis implements XceiverClientSpi { @Override public void connect() throws Exception { - if (!client.compareAndSet(null, newRaftClient(pipeline, rpcType))) { + if (!client.compareAndSet(null, + RatisHelper.newRaftClient(rpcType, getPipeline()))) { throw new IllegalStateException("Client is already connected."); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java new file mode 100644 index 00000000000..bedd9a8d391 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ratis; + +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.ratis.client.ClientFactory; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.rpc.RpcType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Ratis helper methods. + */ +public interface RatisHelper { + Logger LOG = LoggerFactory.getLogger(RatisHelper.class); + + static String toRaftPeerIdString(DatanodeID id) { + return id.getIpAddr() + ":" + id.getContainerPort(); + } + + static RaftPeerId toRaftPeerId(DatanodeID id) { + return RaftPeerId.valueOf(toRaftPeerIdString(id)); + } + + static RaftPeer toRaftPeer(String id) { + return new RaftPeer(RaftPeerId.valueOf(id), id); + } + + static RaftPeer toRaftPeer(DatanodeID id) { + return toRaftPeer(toRaftPeerIdString(id)); + } + + static List toRaftPeers(Pipeline pipeline) { + return pipeline.getMachines().stream() + .map(RatisHelper::toRaftPeer) + .collect(Collectors.toList()); + } + + static RaftPeer[] toRaftPeerArray(Pipeline pipeline) { + return toRaftPeers(pipeline).toArray(RaftPeer.EMPTY_PEERS); + } + + static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline) { + return newRaftClient(rpcType, toRaftPeerId(pipeline.getLeader()), + toRaftPeers(pipeline)); + } + + static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader) { + return newRaftClient(rpcType, leader.getId(), + new ArrayList<>(Arrays.asList(leader))); + } + + static RaftClient newRaftClient( + RpcType rpcType, RaftPeerId leader, List peers) { + LOG.trace("newRaftClient: {}, leader={}, peers={}", rpcType, leader, peers); + final RaftProperties properties = new RaftProperties(); + final ClientFactory factory = ClientFactory.cast(rpcType.newFactory( + properties, null)); + + return RaftClient.newBuilder() + .setClientRpc(factory.newRaftClientRpc()) + .setServers(peers) + .setLeaderId(leader) + .setProperties(properties) + .build(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/package-info.java new file mode 100644 index 00000000000..c13c20c6060 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis; + +/** + * This package contains classes related to Apache Ratis. + */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/shaded/com/google/protobuf/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/shaded/com/google/protobuf/package-info.java new file mode 100644 index 00000000000..032dd9634b2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/shaded/com/google/protobuf/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.shaded.com.google.protobuf; + +/** + * This package contains classes related to the shaded protobuf in Apache Ratis. + */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index feca6202f4f..74890541b1a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -83,10 +83,9 @@ public final class OzoneConfigKeys { = ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY; public static final String DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT = ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT; - public static final String DFS_CONTAINER_RATIS_CONF = - "dfs.container.ratis.conf"; - public static final String DFS_CONTAINER_RATIS_DATANODE_ADDRESS = - "dfs.container.ratis.datanode.address"; + /** A unique ID to identify a Ratis server. */ + public static final String DFS_CONTAINER_RATIS_SERVER_ID = + "dfs.container.ratis.server.id"; public static final String DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR = "dfs.container.ratis.datanode.storage.dir"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 4c82ac2db25..69f3801540a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -18,9 +18,7 @@ package org.apache.hadoop.ozone.container.common.transport.server.ratis; -import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi; @@ -28,23 +26,25 @@ import org.apache.ratis.RaftConfigKeys; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.netty.NettyConfigKeys; -import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.stream.Collectors; +import java.util.Collections; +import java.util.Objects; /** * Creates a ratis server endpoint that acts as the communication layer for * Ozone containers. */ public final class XceiverServerRatis implements XceiverServerSpi { + static final Logger LOG = LoggerFactory.getLogger(XceiverServerRatis.class); + static RaftProperties newRaftProperties( RpcType rpc, int port, String storageDir) { final RaftProperties properties = new RaftProperties(); @@ -62,37 +62,31 @@ public final class XceiverServerRatis implements XceiverServerSpi { Configuration ozoneConf, ContainerDispatcher dispatcher) throws IOException { final String id = ozoneConf.get( - OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_ADDRESS); - final Collection servers = ozoneConf.getStringCollection( - OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF); + OzoneConfigKeys.DFS_CONTAINER_RATIS_SERVER_ID); + final int port = ozoneConf.getInt( + OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, + OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); final String storageDir = ozoneConf.get( OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR); final String rpcType = ozoneConf.get( OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType); - return new XceiverServerRatis(id, servers, storageDir, dispatcher, rpc); + return new XceiverServerRatis(id, port, storageDir, dispatcher, rpc); } private final int port; private final RaftServer server; private XceiverServerRatis( - String id, Collection servers, String storageDir, + String id, int port, String storageDir, ContainerDispatcher dispatcher, RpcType rpcType) throws IOException { - Preconditions.checkArgument(servers.contains(id), - "%s is not one of %s specified in %s", - id, servers, OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF); - - final List peers = servers.stream() - .map(addr -> new RaftPeer(new RaftPeerId(addr), addr)) - .collect(Collectors.toList()); - - this.port = NetUtils.createSocketAddr(id).getPort(); + Objects.requireNonNull(id, "id == null"); + this.port = port; this.server = RaftServer.newBuilder() - .setServerId(new RaftPeerId(id)) - .setPeers(peers) + .setServerId(RaftPeerId.valueOf(id)) + .setPeers(Collections.emptyList()) .setProperties(newRaftProperties(rpcType, port, storageDir)) .setStateMachine(new ContainerStateMachine(dispatcher)) .build(); @@ -100,6 +94,8 @@ public final class XceiverServerRatis implements XceiverServerSpi { @Override public void start() throws IOException { + LOG.info("Starting {} {} at port {}", getClass().getSimpleName(), + server.getId(), getIPCPort()); server.start(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java new file mode 100644 index 00000000000..8debfe02837 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.transport.server.ratis; + +/** + * This package contains classes for the server implementation + * using Apache Ratis + */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java index 173b9116bc6..5cfcaffa5bb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.ksm.KSMConfigKeys; import org.apache.hadoop.ozone.ksm.KeySpaceManager; import org.apache.hadoop.scm.ScmConfigKeys; @@ -102,12 +103,11 @@ public final class MiniOzoneCluster extends MiniDFSCluster if (!useRatis) { return; } - final String[] ids = dnConf.getStrings( - OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF); - // TODO: use the i-th raft server as the i-th datanode address - // this only work for one Raft cluster - setConf(i, dnConf, OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_ADDRESS, - ids[i]); + final String address = ContainerTestHelper.createLocalAddress(); + setConf(i, dnConf, OzoneConfigKeys.DFS_CONTAINER_RATIS_SERVER_ID, + address); + setConf(i, dnConf, OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, + String.valueOf(NetUtils.createSocketAddr(address).getPort())); setConf(i, dnConf, OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, getInstanceStorageDir(i, -1).getCanonicalPath()); } @@ -206,16 +206,13 @@ public final class MiniOzoneCluster extends MiniDFSCluster */ public void waitOzoneReady() throws TimeoutException, InterruptedException { GenericTestUtils.waitFor(() -> { - if (scm.getNodeCount(SCMNodeManager.NODESTATE.HEALTHY) - >= numDataNodes) { - return true; - } - LOG.info("Waiting for cluster to be ready. Got {} of {} DN Heartbeats.", - scm.getNodeCount(SCMNodeManager.NODESTATE.HEALTHY), - numDataNodes); - - return false; - }, 1000, 5 * 60 * 1000); //wait for 5 mins. + final int healthy = scm.getNodeCount(SCMNodeManager.NODESTATE.HEALTHY); + final boolean isReady = healthy >= numDataNodes; + LOG.info("{}. Got {} of {} DN Heartbeats.", + isReady? "Cluster is ready" : "Waiting for cluster to be ready", + healthy, numDataNodes); + return isReady; + }, 1000, 60 * 1000); //wait for 1 min. } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java index d56fad2bd5f..89664ebd47b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java @@ -26,7 +26,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.stream.Collectors; /** * Helpers for Ratis tests. @@ -34,23 +33,17 @@ import java.util.stream.Collectors; public interface RatisTestHelper { Logger LOG = LoggerFactory.getLogger(RatisTestHelper.class); - static void initRatisConf( - RpcType rpc, Pipeline pipeline, Configuration conf) { + static void initRatisConf(RpcType rpc, Configuration conf) { conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, true); conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, rpc.name()); LOG.info(OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY + " = " + rpc.name()); - final String s = pipeline.getMachines().stream() - .map(dn -> dn.getXferAddr()) - .collect(Collectors.joining(",")); - conf.setStrings(OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF, s); - LOG.info(OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF + " = " + s); } static XceiverClientRatis newXceiverClientRatis( RpcType rpcType, Pipeline pipeline, OzoneConfiguration conf) throws IOException { - initRatisConf(rpcType, pipeline, conf); + initRatisConf(rpcType, conf); return XceiverClientRatis.newXceiverClientRatis(pipeline, conf); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index a1abfebc612..6db7621222d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -80,6 +80,11 @@ public final class ContainerTestHelper { return createPipeline(containerName, 1); } + public static String createLocalAddress() throws IOException { + try(ServerSocket s = new ServerSocket(0)) { + return "127.0.0.1:" + s.getLocalPort(); + } + } public static DatanodeID createDatanodeID() throws IOException { ServerSocket socket = new ServerSocket(0); int port = socket.getLocalPort(); @@ -100,13 +105,26 @@ public final class ContainerTestHelper { public static Pipeline createPipeline(String containerName, int numNodes) throws IOException { Preconditions.checkArgument(numNodes >= 1); - final DatanodeID leader = createDatanodeID(); - Pipeline pipeline = new Pipeline(leader.getDatanodeUuid()); + final List ids = new ArrayList<>(numNodes); + for(int i = 0; i < numNodes; i++) { + ids.add(createDatanodeID()); + } + return createPipeline(containerName, ids); + } + + public static Pipeline createPipeline( + String containerName, Iterable ids) + throws IOException { + Objects.requireNonNull(ids, "ids == null"); + final Iterator i = ids.iterator(); + Preconditions.checkArgument(i.hasNext()); + final DatanodeID leader = i.next(); + final Pipeline pipeline = new Pipeline(leader.getDatanodeUuid()); pipeline.setContainerName(containerName); pipeline.addMember(leader); - for(int i = 1; i < numNodes; i++) { - pipeline.addMember(createDatanodeID()); + for(; i.hasNext();) { + pipeline.addMember(i.next()); } return pipeline; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerRatis.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerRatis.java index 2662909ebad..3adb881da13 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerRatis.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerRatis.java @@ -18,23 +18,31 @@ package org.apache.hadoop.ozone.container.ozoneimpl; +import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.RatisTestHelper; import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.web.utils.OzoneUtils; +import org.apache.ratis.RatisHelper; +import org.apache.hadoop.scm.XceiverClientRatis; import org.apache.hadoop.scm.XceiverClientSpi; import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.util.CheckedBiConsumer; +import org.apache.ratis.util.CollectionUtils; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; + /** * Tests ozone containers with Apache Ratis. */ @@ -78,19 +86,31 @@ public class TestOzoneContainerRatis { throws Exception { LOG.info(testName + "(rpc=" + rpc + ", numNodes=" + numNodes); + // create Ozone clusters final OzoneConfiguration conf = newOzoneConfiguration(); - final String containerName = OzoneUtils.getRequestID(); - final Pipeline pipeline = ContainerTestHelper.createPipeline( - containerName, numNodes); - final XceiverClientSpi client = RatisTestHelper.newXceiverClientRatis( - rpc, pipeline, conf); - + RatisTestHelper.initRatisConf(rpc, conf); final MiniOzoneCluster cluster = new MiniOzoneCluster.Builder(conf) .setHandlerType(OzoneConsts.OZONE_HANDLER_LOCAL) - .numDataNodes(pipeline.getMachines().size()) + .numDataNodes(numNodes) .build(); cluster.waitOzoneReady(); + final String containerName = OzoneUtils.getRequestID(); + final List datanodes = cluster.getDataNodes(); + final Pipeline pipeline = ContainerTestHelper.createPipeline(containerName, + CollectionUtils.as(datanodes, DataNode::getDatanodeId)); + + LOG.info("pipeline=" + pipeline); + // Create Ratis cluster + final RaftPeer[] peers = RatisHelper.toRaftPeerArray(pipeline); + for(RaftPeer p : peers) { + final RaftClient client = RatisHelper.newRaftClient(rpc, p); + client.reinitialize(peers, p.getId()); + } + + LOG.info("reinitialize done"); + final XceiverClientSpi client = XceiverClientRatis.newXceiverClientRatis( + pipeline, conf); try { test.accept(containerName, client); } finally { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java index 5fc6a7cc4e3..ad64caeffd3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java @@ -40,7 +40,11 @@ import org.apache.hadoop.scm.XceiverClientRatis; import org.apache.hadoop.scm.XceiverClientSpi; import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.ratis.RatisHelper; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.rpc.RpcType; +import org.apache.ratis.util.CheckedBiConsumer; import org.junit.Assert; import org.junit.Test; @@ -92,7 +96,8 @@ public class TestContainerServer { (pipeline, conf) -> conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, pipeline.getLeader().getContainerPort()), XceiverClient::new, - (dn, conf) -> new XceiverServer(conf, new TestContainerDispatcher())); + (dn, conf) -> new XceiverServer(conf, new TestContainerDispatcher()), + (dn, p) -> {}); } @FunctionalInterface @@ -115,7 +120,8 @@ public class TestContainerServer { static XceiverServerRatis newXceiverServerRatis( DatanodeID dn, OzoneConfiguration conf) throws IOException { final String id = dn.getXferAddr(); - conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_ADDRESS, id); + conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_SERVER_ID, id); + conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, dn.getContainerPort()); final String dir = TEST_DIR + id.replace(':', '_'); conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir); @@ -123,13 +129,22 @@ public class TestContainerServer { return XceiverServerRatis.newXceiverServerRatis(conf, dispatcher); } + static void initXceiverServerRatis( + RpcType rpc, DatanodeID id, Pipeline pipeline) throws IOException { + final RaftPeer p = RatisHelper.toRaftPeer(id); + final RaftPeer[] peers = RatisHelper.toRaftPeerArray(pipeline); + final RaftClient client = RatisHelper.newRaftClient(rpc, p); + client.reinitialize(peers, p.getId()); + } + + static void runTestClientServerRatis(RpcType rpc, int numNodes) throws Exception { runTestClientServer(numNodes, - (pipeline, conf) -> RatisTestHelper.initRatisConf( - rpc, pipeline, conf), + (pipeline, conf) -> RatisTestHelper.initRatisConf(rpc, conf), XceiverClientRatis::newXceiverClientRatis, - TestContainerServer::newXceiverServerRatis); + TestContainerServer::newXceiverServerRatis, + (dn, p) -> initXceiverServerRatis(rpc, dn, p)); } static void runTestClientServer( @@ -138,7 +153,8 @@ public class TestContainerServer { CheckedBiFunction createClient, CheckedBiFunction createServer) + IOException> createServer, + CheckedBiConsumer initServer) throws Exception { final List servers = new ArrayList<>(); XceiverClientSpi client = null; @@ -153,6 +169,7 @@ public class TestContainerServer { final XceiverServerSpi s = createServer.apply(dn, conf); servers.add(s); s.start(); + initServer.accept(dn, pipeline); } client = createClient.apply(pipeline, conf); diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index a5e7720e884..7d8460a81e5 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -99,7 +99,7 @@ 1.0.0-M33 - 0.1-SNAPSHOT + 0.1.1-alpha-SNAPSHOT 1.8