HDFS-11843. Ozone: XceiverClientRatis should implement XceiverClientSpi.connect().

This commit is contained in:
Tsz-Wo Nicholas Sze 2017-05-19 17:08:40 -07:00 committed by Owen O'Malley
parent 4fb9064523
commit 877e751c84
6 changed files with 251 additions and 128 deletions

View File

@ -37,6 +37,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -56,11 +58,16 @@ public final class XceiverClientRatis implements XceiverClientSpi {
} }
private final Pipeline pipeline; private final Pipeline pipeline;
private final RaftClient client; private final RpcType rpcType;
private final AtomicReference<RaftClient> client = new AtomicReference<>();
/** Constructs a client. */ /** Constructs a client. */
XceiverClientRatis(Pipeline pipeline, RpcType rpcType) { private XceiverClientRatis(Pipeline pipeline, RpcType rpcType) {
this.pipeline = pipeline; this.pipeline = pipeline;
this.rpcType = rpcType;
}
static RaftClient newRaftClient(Pipeline pipeline, RpcType rpcType) {
final List<RaftPeer> peers = pipeline.getMachines().stream() final List<RaftPeer> peers = pipeline.getMachines().stream()
.map(dn -> dn.getXferAddr()) .map(dn -> dn.getXferAddr())
.map(addr -> new RaftPeer(new RaftPeerId(addr), addr)) .map(addr -> new RaftPeer(new RaftPeerId(addr), addr))
@ -70,7 +77,7 @@ public final class XceiverClientRatis implements XceiverClientSpi {
final ClientFactory factory = ClientFactory.cast(rpcType.newFactory( final ClientFactory factory = ClientFactory.cast(rpcType.newFactory(
properties, null)); properties, null));
client = RaftClient.newBuilder() return RaftClient.newBuilder()
.setClientRpc(factory.newRaftClientRpc()) .setClientRpc(factory.newRaftClientRpc())
.setServers(peers) .setServers(peers)
.setLeaderId(new RaftPeerId(pipeline.getLeader().getXferAddr())) .setLeaderId(new RaftPeerId(pipeline.getLeader().getXferAddr()))
@ -85,23 +92,32 @@ public final class XceiverClientRatis implements XceiverClientSpi {
@Override @Override
public void connect() throws Exception { public void connect() throws Exception {
// do nothing. if (!client.compareAndSet(null, newRaftClient(pipeline, rpcType))) {
throw new IllegalStateException("Client is already connected.");
}
} }
@Override @Override
public void close() { public void close() {
try { final RaftClient c = client.getAndSet(null);
client.close(); if (c != null) {
} catch (IOException e) { try {
throw new IllegalStateException(e); c.close();
} catch (IOException e) {
throw new IllegalStateException(e);
}
} }
} }
private RaftClient getClient() {
return Objects.requireNonNull(client.get(), "client is null");
}
@Override @Override
public ContainerCommandResponseProto sendCommand( public ContainerCommandResponseProto sendCommand(
ContainerCommandRequestProto request) throws IOException { ContainerCommandRequestProto request) throws IOException {
LOG.debug("sendCommand {}", request); LOG.debug("sendCommand {}", request);
final RaftClientReply reply = client.send( final RaftClientReply reply = getClient().send(
() -> ShadedProtoUtil.asShadedByteString(request.toByteArray())); () -> ShadedProtoUtil.asShadedByteString(request.toByteArray()));
LOG.debug("reply {}", reply); LOG.debug("reply {}", reply);
Preconditions.checkState(reply.isSuccess()); Preconditions.checkState(reply.isSuccess());

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.scm.XceiverClientRatis;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.ratis.rpc.RpcType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.stream.Collectors;
/**
* Helpers for Ratis tests.
*/
public interface RatisTestHelper {
Logger LOG = LoggerFactory.getLogger(RatisTestHelper.class);
static void initRatisConf(
RpcType rpc, Pipeline pipeline, Configuration conf) {
conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, true);
conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, rpc.name());
LOG.info(OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY
+ " = " + rpc.name());
final String s = pipeline.getMachines().stream()
.map(dn -> dn.getXferAddr())
.collect(Collectors.joining(","));
conf.setStrings(OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF, s);
LOG.info(OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF + " = " + s);
}
static XceiverClientRatis newXceiverClientRatis(
RpcType rpcType, Pipeline pipeline, OzoneConfiguration conf)
throws IOException {
initRatisConf(rpcType, pipeline, conf);
return XceiverClientRatis.newXceiverClientRatis(pipeline, conf);
}
}

View File

@ -21,20 +21,20 @@ package org.apache.hadoop.ozone.container;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import org.apache.commons.codec.binary.Hex; import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
.ContainerCommandRequestProto; .ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
.ContainerCommandResponseProto; .ContainerCommandResponseProto;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.ozone.container.common.helpers.KeyData; import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.ratis.rpc.RpcType; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert; import org.junit.Assert;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -43,12 +43,7 @@ import java.io.IOException;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.security.MessageDigest; import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.util.LinkedList; import java.util.*;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.Map;
import java.util.stream.Collectors;
/** /**
* Helpers for container tests. * Helpers for container tests.
@ -64,6 +59,15 @@ public final class ContainerTestHelper {
private ContainerTestHelper() { private ContainerTestHelper() {
} }
public static void setOzoneLocalStorageRoot(
Class<?> clazz, OzoneConfiguration conf) {
String path = GenericTestUtils.getTempPath(clazz.getSimpleName());
path += conf.getTrimmed(
OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
}
// TODO: mock multi-node pipeline // TODO: mock multi-node pipeline
/** /**
* Create a pipeline with single node replica. * Create a pipeline with single node replica.
@ -107,16 +111,6 @@ public final class ContainerTestHelper {
return pipeline; return pipeline;
} }
public static void initRatisConf(
RpcType rpc, Pipeline pipeline, Configuration conf) {
conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, true);
conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, rpc.name());
conf.setStrings(OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF,
pipeline.getMachines().stream()
.map(dn -> dn.getXferAddr())
.collect(Collectors.joining(",")));
}
/** /**
* Creates a ChunkInfo for testing. * Creates a ChunkInfo for testing.
* *

View File

@ -26,12 +26,8 @@ import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.web.utils.OzoneUtils; import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.scm.XceiverClient; import org.apache.hadoop.scm.XceiverClient;
import org.apache.hadoop.scm.XceiverClientRatis;
import org.apache.hadoop.scm.XceiverClientSpi; import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
@ -40,6 +36,7 @@ import org.junit.rules.Timeout;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
/** /**
* Tests ozone containers. * Tests ozone containers.
*/ */
@ -53,12 +50,7 @@ public class TestOzoneContainer {
@Test @Test
public void testCreateOzoneContainer() throws Exception { public void testCreateOzoneContainer() throws Exception {
String containerName = OzoneUtils.getRequestID(); String containerName = OzoneUtils.getRequestID();
OzoneConfiguration conf = new OzoneConfiguration(); OzoneConfiguration conf = newOzoneConfiguration();
String path = GenericTestUtils
.getTempPath(TestOzoneContainer.class.getSimpleName());
path += conf.getTrimmed(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
OzoneContainer container = null; OzoneContainer container = null;
MiniOzoneCluster cluster = null; MiniOzoneCluster cluster = null;
try { try {
@ -76,12 +68,7 @@ public class TestOzoneContainer {
XceiverClient client = new XceiverClient(pipeline, conf); XceiverClient client = new XceiverClient(pipeline, conf);
client.connect(); client.connect();
ContainerProtos.ContainerCommandRequestProto request = createContainerForTesting(client, containerName);
ContainerTestHelper.getCreateContainerRequest(containerName);
ContainerProtos.ContainerCommandResponseProto response =
client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
} finally { } finally {
if (container != null) { if (container != null) {
container.stop(); container.stop();
@ -92,69 +79,19 @@ public class TestOzoneContainer {
} }
} }
@Test static OzoneConfiguration newOzoneConfiguration() {
public void testOzoneContainerViaDataNodeRatisGrpc() throws Exception {
runTestOzoneContainerViaDataNodeRatis(SupportedRpcType.GRPC, 1);
runTestOzoneContainerViaDataNodeRatis(SupportedRpcType.GRPC, 3);
}
@Test
public void testOzoneContainerViaDataNodeRatisNetty() throws Exception {
runTestOzoneContainerViaDataNodeRatis(SupportedRpcType.NETTY, 1);
runTestOzoneContainerViaDataNodeRatis(SupportedRpcType.NETTY, 3);
}
private static void runTestOzoneContainerViaDataNodeRatis(
RpcType rpc, int numNodes) throws Exception {
ContainerTestHelper.LOG.info("runTestOzoneContainerViaDataNodeRatis(rpc="
+ rpc + ", numNodes=" + numNodes);
final String containerName = OzoneUtils.getRequestID();
final Pipeline pipeline = ContainerTestHelper.createPipeline(
containerName, numNodes);
final OzoneConfiguration conf = initOzoneConfiguration(pipeline);
ContainerTestHelper.initRatisConf(rpc, pipeline, conf);
final MiniOzoneCluster cluster = new MiniOzoneCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_LOCAL)
.numDataNodes(pipeline.getMachines().size())
.build();
cluster.waitOzoneReady();
final XceiverClientSpi client = XceiverClientRatis.newXceiverClientRatis(
pipeline, conf);
try {
runTestOzoneContainerViaDataNode(containerName, client);
} finally {
cluster.shutdown();
}
}
private static OzoneConfiguration initOzoneConfiguration(Pipeline pipeline) {
final OzoneConfiguration conf = new OzoneConfiguration(); final OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, ContainerTestHelper.setOzoneLocalStorageRoot(
pipeline.getLeader().getContainerPort()); TestOzoneContainer.class, conf);
setOzoneLocalStorageRoot(conf);
return conf; return conf;
} }
private static void setOzoneLocalStorageRoot(OzoneConfiguration conf) {
String path = GenericTestUtils
.getTempPath(TestOzoneContainer.class.getSimpleName());
path += conf.getTrimmed(
OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
}
@Test @Test
public void testOzoneContainerViaDataNode() throws Exception { public void testOzoneContainerViaDataNode() throws Exception {
MiniOzoneCluster cluster = null; MiniOzoneCluster cluster = null;
try { try {
String containerName = OzoneUtils.getRequestID(); String containerName = OzoneUtils.getRequestID();
OzoneConfiguration conf = new OzoneConfiguration(); OzoneConfiguration conf = newOzoneConfiguration();
setOzoneLocalStorageRoot(conf);
// Start ozone container Via Datanode create. // Start ozone container Via Datanode create.
@ -178,8 +115,8 @@ public class TestOzoneContainer {
} }
} }
static void runTestOzoneContainerViaDataNode(String containerName, static void runTestOzoneContainerViaDataNode(
XceiverClientSpi client) throws Exception { String containerName, XceiverClientSpi client) throws Exception {
ContainerProtos.ContainerCommandRequestProto ContainerProtos.ContainerCommandRequestProto
request, writeChunkRequest, putKeyRequest, request, writeChunkRequest, putKeyRequest,
updateRequest1, updateRequest2; updateRequest1, updateRequest2;
@ -261,33 +198,44 @@ public class TestOzoneContainer {
public void testBothGetandPutSmallFile() throws Exception { public void testBothGetandPutSmallFile() throws Exception {
MiniOzoneCluster cluster = null; MiniOzoneCluster cluster = null;
XceiverClient client = null; XceiverClient client = null;
ContainerProtos.ContainerCommandResponseProto response;
ContainerProtos.ContainerCommandRequestProto
smallFileRequest, getSmallFileRequest;
try { try {
String keyName = OzoneUtils.getRequestID(); OzoneConfiguration conf = newOzoneConfiguration();
OzoneConfiguration conf = new OzoneConfiguration();
client = createClientForTesting(conf); client = createClientForTesting(conf);
cluster = new MiniOzoneCluster.Builder(conf) cluster = new MiniOzoneCluster.Builder(conf)
.setRandomContainerPort(false) .setRandomContainerPort(false)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build(); .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
String containerName = client.getPipeline().getContainerName();
runTestBothGetandPutSmallFile(containerName, client);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
static void runTestBothGetandPutSmallFile(
String containerName, XceiverClientSpi client) throws Exception {
try {
client.connect(); client.connect();
String containerName = client.getPipeline().getContainerName();
createContainerForTesting(client, containerName); createContainerForTesting(client, containerName);
smallFileRequest = ContainerTestHelper.getWriteSmallFileRequest( String keyName = OzoneUtils.getRequestID();
client.getPipeline(), containerName, keyName, 1024); final ContainerProtos.ContainerCommandRequestProto smallFileRequest
= ContainerTestHelper.getWriteSmallFileRequest(
client.getPipeline(), containerName, keyName, 1024);
response = client.sendCommand(smallFileRequest); ContainerProtos.ContainerCommandResponseProto response
= client.sendCommand(smallFileRequest);
Assert.assertNotNull(response); Assert.assertNotNull(response);
Assert.assertTrue(smallFileRequest.getTraceID() Assert.assertTrue(smallFileRequest.getTraceID()
.equals(response.getTraceID())); .equals(response.getTraceID()));
getSmallFileRequest = final ContainerProtos.ContainerCommandRequestProto getSmallFileRequest
ContainerTestHelper.getReadSmallFileRequest(smallFileRequest = ContainerTestHelper.getReadSmallFileRequest(
.getPutSmallFile().getKey()); smallFileRequest.getPutSmallFile().getKey());
response = client.sendCommand(getSmallFileRequest); response = client.sendCommand(getSmallFileRequest);
Assert.assertArrayEquals( Assert.assertArrayEquals(
smallFileRequest.getPutSmallFile().getData().toByteArray(), smallFileRequest.getPutSmallFile().getData().toByteArray(),
@ -296,9 +244,6 @@ public class TestOzoneContainer {
if (client != null) { if (client != null) {
client.close(); client.close();
} }
if (cluster != null) {
cluster.shutdown();
}
} }
} }
@ -311,7 +256,7 @@ public class TestOzoneContainer {
writeChunkRequest, putKeyRequest, request; writeChunkRequest, putKeyRequest, request;
try { try {
OzoneConfiguration conf = new OzoneConfiguration(); OzoneConfiguration conf = newOzoneConfiguration();
client = createClientForTesting(conf); client = createClientForTesting(conf);
cluster = new MiniOzoneCluster.Builder(conf) cluster = new MiniOzoneCluster.Builder(conf)
@ -399,7 +344,7 @@ public class TestOzoneContainer {
ContainerProtos.ContainerCommandRequestProto request, ContainerProtos.ContainerCommandRequestProto request,
writeChunkRequest, putKeyRequest; writeChunkRequest, putKeyRequest;
try { try {
OzoneConfiguration conf = new OzoneConfiguration(); OzoneConfiguration conf = newOzoneConfiguration();
client = createClientForTesting(conf); client = createClientForTesting(conf);
cluster = new MiniOzoneCluster.Builder(conf) cluster = new MiniOzoneCluster.Builder(conf)
@ -470,18 +415,10 @@ public class TestOzoneContainer {
} }
} }
private XceiverClient createClientForTesting(OzoneConfiguration conf) private static XceiverClient createClientForTesting(OzoneConfiguration conf)
throws Exception { throws Exception {
String containerName = OzoneUtils.getRequestID(); String containerName = OzoneUtils.getRequestID();
String path = GenericTestUtils
.getTempPath(TestOzoneContainer.class.getSimpleName());
path += conf.getTrimmed(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
// Start ozone container Via Datanode create. // Start ozone container Via Datanode create.
Pipeline pipeline = Pipeline pipeline =
ContainerTestHelper.createSingleNodePipeline(containerName); ContainerTestHelper.createSingleNodePipeline(containerName);
conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,

View File

@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.ozoneimpl;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.RatisTestHelper;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.util.CheckedBiConsumer;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests ozone containers with Apache Ratis.
*/
public class TestOzoneContainerRatis {
private static final Logger LOG = LoggerFactory.getLogger(
TestOzoneContainerRatis.class);
static OzoneConfiguration newOzoneConfiguration() {
final OzoneConfiguration conf = new OzoneConfiguration();
ContainerTestHelper.setOzoneLocalStorageRoot(
TestOzoneContainerRatis.class, conf);
return conf;
}
/** Set the timeout for every test. */
@Rule
public Timeout testTimeout = new Timeout(300000);
@Test
public void testOzoneContainerViaDataNodeRatisGrpc() throws Exception {
runTestOzoneContainerViaDataNodeRatis(SupportedRpcType.GRPC, 1);
runTestOzoneContainerViaDataNodeRatis(SupportedRpcType.GRPC, 3);
}
@Test
public void testOzoneContainerViaDataNodeRatisNetty() throws Exception {
runTestOzoneContainerViaDataNodeRatis(SupportedRpcType.NETTY, 1);
runTestOzoneContainerViaDataNodeRatis(SupportedRpcType.NETTY, 3);
}
private static void runTestOzoneContainerViaDataNodeRatis(
RpcType rpc, int numNodes) throws Exception {
runTest("runTestOzoneContainerViaDataNodeRatis", rpc, numNodes,
TestOzoneContainer::runTestOzoneContainerViaDataNode);
}
private static void runTest(
String testName, RpcType rpc, int numNodes,
CheckedBiConsumer<String, XceiverClientSpi, Exception> test)
throws Exception {
LOG.info(testName + "(rpc=" + rpc + ", numNodes=" + numNodes);
final OzoneConfiguration conf = newOzoneConfiguration();
final String containerName = OzoneUtils.getRequestID();
final Pipeline pipeline = ContainerTestHelper.createPipeline(
containerName, numNodes);
final XceiverClientSpi client = RatisTestHelper.newXceiverClientRatis(
rpc, pipeline, conf);
final MiniOzoneCluster cluster = new MiniOzoneCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_LOCAL)
.numDataNodes(pipeline.getMachines().size())
.build();
cluster.waitOzoneReady();
try {
test.accept(containerName, client);
} finally {
cluster.shutdown();
}
}
private static void runTestBothGetandPutSmallFileRatis(
RpcType rpc, int numNodes) throws Exception {
runTest("runTestBothGetandPutSmallFileRatis", rpc, numNodes,
TestOzoneContainer::runTestBothGetandPutSmallFile);
}
@Test
public void testBothGetandPutSmallFileRatisNetty() throws Exception {
runTestBothGetandPutSmallFileRatis(SupportedRpcType.NETTY, 1);
runTestBothGetandPutSmallFileRatis(SupportedRpcType.NETTY, 3);
}
@Test
public void testBothGetandPutSmallFileRatisGrpc() throws Exception {
runTestBothGetandPutSmallFileRatis(SupportedRpcType.GRPC, 1);
runTestBothGetandPutSmallFileRatis(SupportedRpcType.GRPC, 3);
}
}

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerComm
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.RatisTestHelper;
import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.impl.Dispatcher; import org.apache.hadoop.ozone.container.common.impl.Dispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@ -120,7 +121,7 @@ public class TestContainerServer {
static void runTestClientServerRatis(RpcType rpc, int numNodes) static void runTestClientServerRatis(RpcType rpc, int numNodes)
throws Exception { throws Exception {
runTestClientServer(numNodes, runTestClientServer(numNodes,
(pipeline, conf) -> ContainerTestHelper.initRatisConf( (pipeline, conf) -> RatisTestHelper.initRatisConf(
rpc, pipeline, conf), rpc, pipeline, conf),
XceiverClientRatis::newXceiverClientRatis, XceiverClientRatis::newXceiverClientRatis,
TestContainerServer::newXceiverServerRatis); TestContainerServer::newXceiverServerRatis);