HDFS-11865. Ozone: Do not initialize Ratis cluster during datanode startup.

This commit is contained in:
Tsz-Wo Nicholas Sze 2017-05-28 15:19:32 +08:00 committed by Owen O'Malley
parent e3c8f6a24d
commit b71efcf1b0
12 changed files with 271 additions and 93 deletions

View File

@ -22,13 +22,10 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.ratis.RatisHelper;
import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.ratis.client.ClientFactory;
import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil; import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil;
@ -36,10 +33,8 @@
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/** /**
* An abstract implementation of {@link XceiverClientSpi} using Ratis. * An abstract implementation of {@link XceiverClientSpi} using Ratis.
@ -67,24 +62,6 @@ private XceiverClientRatis(Pipeline pipeline, RpcType rpcType) {
this.rpcType = rpcType; this.rpcType = rpcType;
} }
static RaftClient newRaftClient(Pipeline pipeline, RpcType rpcType) {
final List<RaftPeer> peers = pipeline.getMachines().stream()
.map(dn -> dn.getXferAddr())
.map(addr -> new RaftPeer(new RaftPeerId(addr), addr))
.collect(Collectors.toList());
final RaftProperties properties = new RaftProperties();
final ClientFactory factory = ClientFactory.cast(rpcType.newFactory(
properties, null));
return RaftClient.newBuilder()
.setClientRpc(factory.newRaftClientRpc())
.setServers(peers)
.setLeaderId(new RaftPeerId(pipeline.getLeader().getXferAddr()))
.setProperties(properties)
.build();
}
@Override @Override
public Pipeline getPipeline() { public Pipeline getPipeline() {
return pipeline; return pipeline;
@ -92,7 +69,8 @@ public Pipeline getPipeline() {
@Override @Override
public void connect() throws Exception { public void connect() throws Exception {
if (!client.compareAndSet(null, newRaftClient(pipeline, rpcType))) { if (!client.compareAndSet(null,
RatisHelper.newRaftClient(rpcType, getPipeline()))) {
throw new IllegalStateException("Client is already connected."); throw new IllegalStateException("Client is already connected.");
} }
} }

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.ratis.client.ClientFactory;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.RpcType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
/**
* Ratis helper methods.
*/
public interface RatisHelper {
Logger LOG = LoggerFactory.getLogger(RatisHelper.class);
static String toRaftPeerIdString(DatanodeID id) {
return id.getIpAddr() + ":" + id.getContainerPort();
}
static RaftPeerId toRaftPeerId(DatanodeID id) {
return RaftPeerId.valueOf(toRaftPeerIdString(id));
}
static RaftPeer toRaftPeer(String id) {
return new RaftPeer(RaftPeerId.valueOf(id), id);
}
static RaftPeer toRaftPeer(DatanodeID id) {
return toRaftPeer(toRaftPeerIdString(id));
}
static List<RaftPeer> toRaftPeers(Pipeline pipeline) {
return pipeline.getMachines().stream()
.map(RatisHelper::toRaftPeer)
.collect(Collectors.toList());
}
static RaftPeer[] toRaftPeerArray(Pipeline pipeline) {
return toRaftPeers(pipeline).toArray(RaftPeer.EMPTY_PEERS);
}
static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline) {
return newRaftClient(rpcType, toRaftPeerId(pipeline.getLeader()),
toRaftPeers(pipeline));
}
static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader) {
return newRaftClient(rpcType, leader.getId(),
new ArrayList<>(Arrays.asList(leader)));
}
static RaftClient newRaftClient(
RpcType rpcType, RaftPeerId leader, List<RaftPeer> peers) {
LOG.trace("newRaftClient: {}, leader={}, peers={}", rpcType, leader, peers);
final RaftProperties properties = new RaftProperties();
final ClientFactory factory = ClientFactory.cast(rpcType.newFactory(
properties, null));
return RaftClient.newBuilder()
.setClientRpc(factory.newRaftClientRpc())
.setServers(peers)
.setLeaderId(leader)
.setProperties(properties)
.build();
}
}

View File

@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis;
/**
* This package contains classes related to Apache Ratis.
*/

View File

@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.shaded.com.google.protobuf;
/**
* This package contains classes related to the shaded protobuf in Apache Ratis.
*/

View File

@ -83,10 +83,9 @@ public final class OzoneConfigKeys {
= ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY; = ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY;
public static final String DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT public static final String DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT
= ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT; = ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT;
public static final String DFS_CONTAINER_RATIS_CONF = /** A unique ID to identify a Ratis server. */
"dfs.container.ratis.conf"; public static final String DFS_CONTAINER_RATIS_SERVER_ID =
public static final String DFS_CONTAINER_RATIS_DATANODE_ADDRESS = "dfs.container.ratis.server.id";
"dfs.container.ratis.datanode.address";
public static final String DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR = public static final String DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR =
"dfs.container.ratis.datanode.storage.dir"; "dfs.container.ratis.datanode.storage.dir";

View File

@ -18,9 +18,7 @@
package org.apache.hadoop.ozone.container.common.transport.server.ratis; package org.apache.hadoop.ozone.container.common.transport.server.ratis;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
@ -28,23 +26,25 @@
import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.netty.NettyConfigKeys; import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.RaftServerConfigKeys;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.util.Collection; import java.util.Collections;
import java.util.List; import java.util.Objects;
import java.util.stream.Collectors;
/** /**
* Creates a ratis server endpoint that acts as the communication layer for * Creates a ratis server endpoint that acts as the communication layer for
* Ozone containers. * Ozone containers.
*/ */
public final class XceiverServerRatis implements XceiverServerSpi { public final class XceiverServerRatis implements XceiverServerSpi {
static final Logger LOG = LoggerFactory.getLogger(XceiverServerRatis.class);
static RaftProperties newRaftProperties( static RaftProperties newRaftProperties(
RpcType rpc, int port, String storageDir) { RpcType rpc, int port, String storageDir) {
final RaftProperties properties = new RaftProperties(); final RaftProperties properties = new RaftProperties();
@ -62,37 +62,31 @@ public static XceiverServerRatis newXceiverServerRatis(
Configuration ozoneConf, ContainerDispatcher dispatcher) Configuration ozoneConf, ContainerDispatcher dispatcher)
throws IOException { throws IOException {
final String id = ozoneConf.get( final String id = ozoneConf.get(
OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_ADDRESS); OzoneConfigKeys.DFS_CONTAINER_RATIS_SERVER_ID);
final Collection<String> servers = ozoneConf.getStringCollection( final int port = ozoneConf.getInt(
OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF); OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
final String storageDir = ozoneConf.get( final String storageDir = ozoneConf.get(
OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR); OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR);
final String rpcType = ozoneConf.get( final String rpcType = ozoneConf.get(
OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType); final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType);
return new XceiverServerRatis(id, servers, storageDir, dispatcher, rpc); return new XceiverServerRatis(id, port, storageDir, dispatcher, rpc);
} }
private final int port; private final int port;
private final RaftServer server; private final RaftServer server;
private XceiverServerRatis( private XceiverServerRatis(
String id, Collection<String> servers, String storageDir, String id, int port, String storageDir,
ContainerDispatcher dispatcher, RpcType rpcType) throws IOException { ContainerDispatcher dispatcher, RpcType rpcType) throws IOException {
Preconditions.checkArgument(servers.contains(id), Objects.requireNonNull(id, "id == null");
"%s is not one of %s specified in %s", this.port = port;
id, servers, OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF);
final List<RaftPeer> peers = servers.stream()
.map(addr -> new RaftPeer(new RaftPeerId(addr), addr))
.collect(Collectors.toList());
this.port = NetUtils.createSocketAddr(id).getPort();
this.server = RaftServer.newBuilder() this.server = RaftServer.newBuilder()
.setServerId(new RaftPeerId(id)) .setServerId(RaftPeerId.valueOf(id))
.setPeers(peers) .setPeers(Collections.emptyList())
.setProperties(newRaftProperties(rpcType, port, storageDir)) .setProperties(newRaftProperties(rpcType, port, storageDir))
.setStateMachine(new ContainerStateMachine(dispatcher)) .setStateMachine(new ContainerStateMachine(dispatcher))
.build(); .build();
@ -100,6 +94,8 @@ private XceiverServerRatis(
@Override @Override
public void start() throws IOException { public void start() throws IOException {
LOG.info("Starting {} {} at port {}", getClass().getSimpleName(),
server.getId(), getIPCPort());
server.start(); server.start();
} }

View File

@ -0,0 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.transport.server.ratis;
/**
* This package contains classes for the server implementation
* using Apache Ratis
*/

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.ksm.KSMConfigKeys; import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
import org.apache.hadoop.ozone.ksm.KeySpaceManager; import org.apache.hadoop.ozone.ksm.KeySpaceManager;
import org.apache.hadoop.scm.ScmConfigKeys; import org.apache.hadoop.scm.ScmConfigKeys;
@ -102,12 +103,11 @@ protected void setupDatanodeAddress(
if (!useRatis) { if (!useRatis) {
return; return;
} }
final String[] ids = dnConf.getStrings( final String address = ContainerTestHelper.createLocalAddress();
OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF); setConf(i, dnConf, OzoneConfigKeys.DFS_CONTAINER_RATIS_SERVER_ID,
// TODO: use the i-th raft server as the i-th datanode address address);
// this only work for one Raft cluster setConf(i, dnConf, OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
setConf(i, dnConf, OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_ADDRESS, String.valueOf(NetUtils.createSocketAddr(address).getPort()));
ids[i]);
setConf(i, dnConf, OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, setConf(i, dnConf, OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR,
getInstanceStorageDir(i, -1).getCanonicalPath()); getInstanceStorageDir(i, -1).getCanonicalPath());
} }
@ -206,16 +206,13 @@ public OzoneClient createOzoneClient() throws OzoneException {
*/ */
public void waitOzoneReady() throws TimeoutException, InterruptedException { public void waitOzoneReady() throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(() -> { GenericTestUtils.waitFor(() -> {
if (scm.getNodeCount(SCMNodeManager.NODESTATE.HEALTHY) final int healthy = scm.getNodeCount(SCMNodeManager.NODESTATE.HEALTHY);
>= numDataNodes) { final boolean isReady = healthy >= numDataNodes;
return true; LOG.info("{}. Got {} of {} DN Heartbeats.",
} isReady? "Cluster is ready" : "Waiting for cluster to be ready",
LOG.info("Waiting for cluster to be ready. Got {} of {} DN Heartbeats.", healthy, numDataNodes);
scm.getNodeCount(SCMNodeManager.NODESTATE.HEALTHY), return isReady;
numDataNodes); }, 1000, 60 * 1000); //wait for 1 min.
return false;
}, 1000, 5 * 60 * 1000); //wait for 5 mins.
} }
/** /**

View File

@ -26,7 +26,6 @@
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.util.stream.Collectors;
/** /**
* Helpers for Ratis tests. * Helpers for Ratis tests.
@ -34,23 +33,17 @@
public interface RatisTestHelper { public interface RatisTestHelper {
Logger LOG = LoggerFactory.getLogger(RatisTestHelper.class); Logger LOG = LoggerFactory.getLogger(RatisTestHelper.class);
static void initRatisConf( static void initRatisConf(RpcType rpc, Configuration conf) {
RpcType rpc, Pipeline pipeline, Configuration conf) {
conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, true); conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, true);
conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, rpc.name()); conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, rpc.name());
LOG.info(OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY LOG.info(OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY
+ " = " + rpc.name()); + " = " + rpc.name());
final String s = pipeline.getMachines().stream()
.map(dn -> dn.getXferAddr())
.collect(Collectors.joining(","));
conf.setStrings(OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF, s);
LOG.info(OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF + " = " + s);
} }
static XceiverClientRatis newXceiverClientRatis( static XceiverClientRatis newXceiverClientRatis(
RpcType rpcType, Pipeline pipeline, OzoneConfiguration conf) RpcType rpcType, Pipeline pipeline, OzoneConfiguration conf)
throws IOException { throws IOException {
initRatisConf(rpcType, pipeline, conf); initRatisConf(rpcType, conf);
return XceiverClientRatis.newXceiverClientRatis(pipeline, conf); return XceiverClientRatis.newXceiverClientRatis(pipeline, conf);
} }
} }

View File

@ -80,6 +80,11 @@ public static Pipeline createSingleNodePipeline(String containerName) throws
return createPipeline(containerName, 1); return createPipeline(containerName, 1);
} }
public static String createLocalAddress() throws IOException {
try(ServerSocket s = new ServerSocket(0)) {
return "127.0.0.1:" + s.getLocalPort();
}
}
public static DatanodeID createDatanodeID() throws IOException { public static DatanodeID createDatanodeID() throws IOException {
ServerSocket socket = new ServerSocket(0); ServerSocket socket = new ServerSocket(0);
int port = socket.getLocalPort(); int port = socket.getLocalPort();
@ -100,13 +105,26 @@ public static DatanodeID createDatanodeID() throws IOException {
public static Pipeline createPipeline(String containerName, int numNodes) public static Pipeline createPipeline(String containerName, int numNodes)
throws IOException { throws IOException {
Preconditions.checkArgument(numNodes >= 1); Preconditions.checkArgument(numNodes >= 1);
final DatanodeID leader = createDatanodeID(); final List<DatanodeID> ids = new ArrayList<>(numNodes);
Pipeline pipeline = new Pipeline(leader.getDatanodeUuid()); for(int i = 0; i < numNodes; i++) {
ids.add(createDatanodeID());
}
return createPipeline(containerName, ids);
}
public static Pipeline createPipeline(
String containerName, Iterable<DatanodeID> ids)
throws IOException {
Objects.requireNonNull(ids, "ids == null");
final Iterator<DatanodeID> i = ids.iterator();
Preconditions.checkArgument(i.hasNext());
final DatanodeID leader = i.next();
final Pipeline pipeline = new Pipeline(leader.getDatanodeUuid());
pipeline.setContainerName(containerName); pipeline.setContainerName(containerName);
pipeline.addMember(leader); pipeline.addMember(leader);
for(int i = 1; i < numNodes; i++) { for(; i.hasNext();) {
pipeline.addMember(createDatanodeID()); pipeline.addMember(i.next());
} }
return pipeline; return pipeline;
} }

View File

@ -18,23 +18,31 @@
package org.apache.hadoop.ozone.container.ozoneimpl; package org.apache.hadoop.ozone.container.ozoneimpl;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.RatisTestHelper; import org.apache.hadoop.ozone.RatisTestHelper;
import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.web.utils.OzoneUtils; import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.ratis.RatisHelper;
import org.apache.hadoop.scm.XceiverClientRatis;
import org.apache.hadoop.scm.XceiverClientSpi; import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.util.CheckedBiConsumer; import org.apache.ratis.util.CheckedBiConsumer;
import org.apache.ratis.util.CollectionUtils;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.Timeout; import org.junit.rules.Timeout;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.List;
/** /**
* Tests ozone containers with Apache Ratis. * Tests ozone containers with Apache Ratis.
*/ */
@ -78,19 +86,31 @@ private static void runTest(
throws Exception { throws Exception {
LOG.info(testName + "(rpc=" + rpc + ", numNodes=" + numNodes); LOG.info(testName + "(rpc=" + rpc + ", numNodes=" + numNodes);
// create Ozone clusters
final OzoneConfiguration conf = newOzoneConfiguration(); final OzoneConfiguration conf = newOzoneConfiguration();
final String containerName = OzoneUtils.getRequestID(); RatisTestHelper.initRatisConf(rpc, conf);
final Pipeline pipeline = ContainerTestHelper.createPipeline(
containerName, numNodes);
final XceiverClientSpi client = RatisTestHelper.newXceiverClientRatis(
rpc, pipeline, conf);
final MiniOzoneCluster cluster = new MiniOzoneCluster.Builder(conf) final MiniOzoneCluster cluster = new MiniOzoneCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_LOCAL) .setHandlerType(OzoneConsts.OZONE_HANDLER_LOCAL)
.numDataNodes(pipeline.getMachines().size()) .numDataNodes(numNodes)
.build(); .build();
cluster.waitOzoneReady(); cluster.waitOzoneReady();
final String containerName = OzoneUtils.getRequestID();
final List<DataNode> datanodes = cluster.getDataNodes();
final Pipeline pipeline = ContainerTestHelper.createPipeline(containerName,
CollectionUtils.as(datanodes, DataNode::getDatanodeId));
LOG.info("pipeline=" + pipeline);
// Create Ratis cluster
final RaftPeer[] peers = RatisHelper.toRaftPeerArray(pipeline);
for(RaftPeer p : peers) {
final RaftClient client = RatisHelper.newRaftClient(rpc, p);
client.reinitialize(peers, p.getId());
}
LOG.info("reinitialize done");
final XceiverClientSpi client = XceiverClientRatis.newXceiverClientRatis(
pipeline, conf);
try { try {
test.accept(containerName, client); test.accept(containerName, client);
} finally { } finally {

View File

@ -40,7 +40,11 @@
import org.apache.hadoop.scm.XceiverClientSpi; import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.ratis.RatisHelper;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.util.CheckedBiConsumer;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -87,7 +91,8 @@ public void testClientServer() throws Exception {
(pipeline, conf) -> conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, (pipeline, conf) -> conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
pipeline.getLeader().getContainerPort()), pipeline.getLeader().getContainerPort()),
XceiverClient::new, XceiverClient::new,
(dn, conf) -> new XceiverServer(conf, new TestContainerDispatcher())); (dn, conf) -> new XceiverServer(conf, new TestContainerDispatcher()),
(dn, p) -> {});
} }
@FunctionalInterface @FunctionalInterface
@ -110,7 +115,8 @@ public void testClientServerRatisGrpc() throws Exception {
static XceiverServerRatis newXceiverServerRatis( static XceiverServerRatis newXceiverServerRatis(
DatanodeID dn, OzoneConfiguration conf) throws IOException { DatanodeID dn, OzoneConfiguration conf) throws IOException {
final String id = dn.getXferAddr(); final String id = dn.getXferAddr();
conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_ADDRESS, id); conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_SERVER_ID, id);
conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, dn.getContainerPort());
final String dir = TEST_DIR + id.replace(':', '_'); final String dir = TEST_DIR + id.replace(':', '_');
conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir); conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
@ -118,13 +124,22 @@ static XceiverServerRatis newXceiverServerRatis(
return XceiverServerRatis.newXceiverServerRatis(conf, dispatcher); return XceiverServerRatis.newXceiverServerRatis(conf, dispatcher);
} }
static void initXceiverServerRatis(
RpcType rpc, DatanodeID id, Pipeline pipeline) throws IOException {
final RaftPeer p = RatisHelper.toRaftPeer(id);
final RaftPeer[] peers = RatisHelper.toRaftPeerArray(pipeline);
final RaftClient client = RatisHelper.newRaftClient(rpc, p);
client.reinitialize(peers, p.getId());
}
static void runTestClientServerRatis(RpcType rpc, int numNodes) static void runTestClientServerRatis(RpcType rpc, int numNodes)
throws Exception { throws Exception {
runTestClientServer(numNodes, runTestClientServer(numNodes,
(pipeline, conf) -> RatisTestHelper.initRatisConf( (pipeline, conf) -> RatisTestHelper.initRatisConf(rpc, conf),
rpc, pipeline, conf),
XceiverClientRatis::newXceiverClientRatis, XceiverClientRatis::newXceiverClientRatis,
TestContainerServer::newXceiverServerRatis); TestContainerServer::newXceiverServerRatis,
(dn, p) -> initXceiverServerRatis(rpc, dn, p));
} }
static void runTestClientServer( static void runTestClientServer(
@ -133,7 +148,8 @@ static void runTestClientServer(
CheckedBiFunction<Pipeline, OzoneConfiguration, XceiverClientSpi, CheckedBiFunction<Pipeline, OzoneConfiguration, XceiverClientSpi,
IOException> createClient, IOException> createClient,
CheckedBiFunction<DatanodeID, OzoneConfiguration, XceiverServerSpi, CheckedBiFunction<DatanodeID, OzoneConfiguration, XceiverServerSpi,
IOException> createServer) IOException> createServer,
CheckedBiConsumer<DatanodeID, Pipeline, IOException> initServer)
throws Exception { throws Exception {
final List<XceiverServerSpi> servers = new ArrayList<>(); final List<XceiverServerSpi> servers = new ArrayList<>();
XceiverClientSpi client = null; XceiverClientSpi client = null;
@ -148,6 +164,7 @@ static void runTestClientServer(
final XceiverServerSpi s = createServer.apply(dn, conf); final XceiverServerSpi s = createServer.apply(dn, conf);
servers.add(s); servers.add(s);
s.start(); s.start();
initServer.accept(dn, pipeline);
} }
client = createClient.apply(pipeline, conf); client = createClient.apply(pipeline, conf);