HDDS-49. Standalone protocol should use grpc in place of netty.

Contributed by Mukul Kumar Singh.
This commit is contained in:
Anu Engineer 2018-05-22 16:51:43 -07:00
parent 3e5f7ea986
commit 5a9140690a
15 changed files with 540 additions and 36 deletions

View File

@ -0,0 +1,217 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceStub;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.util.Time;
import org.apache.ratis.shaded.io.grpc.ManagedChannel;
import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder;
import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* A Client for the storageContainer protocol.
*/
public class XceiverClientGrpc extends XceiverClientSpi {
static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class);
private final Pipeline pipeline;
private final Configuration config;
private XceiverClientProtocolServiceStub asyncStub;
private XceiverClientMetrics metrics;
private ManagedChannel channel;
private final Semaphore semaphore;
/**
* Constructs a client that can communicate with the Container framework on
* data nodes.
*
* @param pipeline - Pipeline that defines the machines.
* @param config -- Ozone Config
*/
public XceiverClientGrpc(Pipeline pipeline, Configuration config) {
super();
Preconditions.checkNotNull(pipeline);
Preconditions.checkNotNull(config);
this.pipeline = pipeline;
this.config = config;
this.semaphore =
new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config));
this.metrics = XceiverClientManager.getXceiverClientMetrics();
}
@Override
public void connect() throws Exception {
DatanodeDetails leader = this.pipeline.getLeader();
// read port from the data node, on failure use default configured
// port.
int port = leader.getContainerPort();
if (port == 0) {
port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
}
LOG.debug("Connecting to server Port : " + leader.getIpAddress());
channel = NettyChannelBuilder.forAddress(leader.getIpAddress(), port)
.usePlaintext(true)
.maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
.build();
asyncStub = XceiverClientProtocolServiceGrpc.newStub(channel);
}
/**
* Returns if the xceiver client connects to a server.
*
* @return True if the connection is alive, false otherwise.
*/
@VisibleForTesting
public boolean isConnected() {
return !channel.isTerminated() && !channel.isShutdown();
}
@Override
public void close() {
channel.shutdownNow();
try {
channel.awaitTermination(60, TimeUnit.MINUTES);
} catch (Exception e) {
LOG.error("Unexpected exception while waiting for channel termination",
e);
}
}
@Override
public Pipeline getPipeline() {
return pipeline;
}
@Override
public ContainerCommandResponseProto sendCommand(
ContainerCommandRequestProto request) throws IOException {
try {
return sendCommandAsync(request).get();
} catch (ExecutionException | InterruptedException e) {
/**
* In case the grpc channel handler throws an exception,
* the exception thrown will be wrapped within {@link ExecutionException}.
* Unwarpping here so that original exception gets passed
* to to the client.
*/
if (e instanceof ExecutionException) {
Throwable cause = e.getCause();
if (cause instanceof IOException) {
throw (IOException) cause;
}
}
throw new IOException(
"Unexpected exception during execution:" + e.getMessage());
}
}
/**
* Sends a given command to server gets a waitable future back.
*
* @param request Request
* @return Response to the command
* @throws IOException
*/
@Override
public CompletableFuture<ContainerCommandResponseProto>
sendCommandAsync(ContainerCommandRequestProto request)
throws IOException, ExecutionException, InterruptedException {
final CompletableFuture<ContainerCommandResponseProto> replyFuture =
new CompletableFuture<>();
semaphore.acquire();
long requestTime = Time.monotonicNowNanos();
metrics.incrPendingContainerOpsMetrics(request.getCmdType());
// create a new grpc stream for each non-async call.
final StreamObserver<ContainerCommandRequestProto> requestObserver =
asyncStub.send(new StreamObserver<ContainerCommandResponseProto>() {
@Override
public void onNext(ContainerCommandResponseProto value) {
replyFuture.complete(value);
metrics.decrPendingContainerOpsMetrics(request.getCmdType());
metrics.addContainerOpsLatency(request.getCmdType(),
Time.monotonicNowNanos() - requestTime);
semaphore.release();
}
@Override
public void onError(Throwable t) {
replyFuture.completeExceptionally(t);
metrics.decrPendingContainerOpsMetrics(request.getCmdType());
metrics.addContainerOpsLatency(request.getCmdType(),
Time.monotonicNowNanos() - requestTime);
semaphore.release();
}
@Override
public void onCompleted() {
if (!replyFuture.isDone()) {
replyFuture.completeExceptionally(
new IOException("Stream completed but no reply for request "
+ request));
}
}
});
requestObserver.onNext(request);
requestObserver.onCompleted();
return replyFuture;
}
/**
* Create a pipeline.
*
* @param pipelineID - Name of the pipeline.
* @param datanodes - Datanodes
*/
@Override
public void createPipeline(String pipelineID, List<DatanodeDetails> datanodes)
throws IOException {
// For stand alone pipeline, there is no notion called setup pipeline.
return;
}
/**
* Returns pipeline Type.
*
* @return - Stand Alone as the type.
*/
@Override
public HddsProtos.ReplicationType getPipelineType() {
return HddsProtos.ReplicationType.STAND_ALONE;
}
}

View File

@ -41,8 +41,6 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
.ReplicationType.RATIS;
/**
* XceiverClientManager is responsible for the lifecycle of XceiverClient
@ -62,6 +60,7 @@ public class XceiverClientManager implements Closeable {
private final Configuration conf;
private final Cache<Long, XceiverClientSpi> clientCache;
private final boolean useRatis;
private final boolean useGrpc;
private static XceiverClientMetrics metrics;
/**
@ -79,6 +78,8 @@ public class XceiverClientManager implements Closeable {
this.useRatis = conf.getBoolean(
ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
this.useGrpc = conf.getBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_DEFAULT);
this.conf = conf;
this.clientCache = CacheBuilder.newBuilder()
.expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS)
@ -146,9 +147,19 @@ public class XceiverClientManager implements Closeable {
new Callable<XceiverClientSpi>() {
@Override
public XceiverClientSpi call() throws Exception {
XceiverClientSpi client = pipeline.getType() == RATIS ?
XceiverClientRatis.newXceiverClientRatis(pipeline, conf)
: new XceiverClient(pipeline, conf);
XceiverClientSpi client = null;
switch (pipeline.getType()) {
case RATIS:
client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf);
break;
case STAND_ALONE:
client = useGrpc ? new XceiverClientGrpc(pipeline, conf) :
new XceiverClient(pipeline, conf);
break;
case CHAINED:
default:
throw new IOException ("not implemented" + pipeline.getType());
}
client.connect();
return client;
}

View File

@ -49,13 +49,13 @@ public class XceiverClientMetrics {
this.containerOpsLatency = new MutableRate[numEnumEntries];
for (int i = 0; i < numEnumEntries; i++) {
pendingOpsArray[i] = registry.newCounter(
"numPending" + ContainerProtos.Type.valueOf(i + 1),
"number of pending" + ContainerProtos.Type.valueOf(i + 1) + " ops",
"numPending" + ContainerProtos.Type.forNumber(i + 1),
"number of pending" + ContainerProtos.Type.forNumber(i + 1) + " ops",
(long) 0);
containerOpsLatency[i] = registry.newRate(
ContainerProtos.Type.valueOf(i + 1) + "Latency",
"latency of " + ContainerProtos.Type.valueOf(i + 1)
ContainerProtos.Type.forNumber(i + 1) + "Latency",
"latency of " + ContainerProtos.Type.forNumber(i + 1)
+ " ops");
}
}

View File

@ -18,4 +18,7 @@
<Match>
<Package name="org.apache.hadoop.hdds.protocol.proto"/>
</Match>
<Match>
<Package name="org.apache.hadoop.hdds.protocol.datanode.proto"/>
</Match>
</FindBugsFilter>

View File

@ -61,6 +61,12 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>ratis-grpc</artifactId>
<groupId>org.apache.ratis</groupId>
</dependency>
<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
<version>2.2.0</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.rocksdb</groupId>
@ -108,7 +114,15 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<goals>
<goal>compile</goal>
<goal>test-compile</goal>
<goal>compile-custom</goal>
<goal>test-compile-custom</goal>
</goals>
<configuration>
<pluginId>grpc-java</pluginId>
<pluginArtifact>
io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
</pluginArtifact>
</configuration>
</execution>
</executions>
</plugin>
@ -122,6 +136,9 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<replace token="com.google.protobuf" value="org.apache.ratis.shaded.com.google.protobuf"
dir="target/generated-sources/java/org/apache/hadoop/hdds/protocol/datanode/proto">
</replace>
<replace token="io.grpc" value="org.apache.ratis.shaded.io.grpc"
dir="target/generated-sources/java/org/apache/hadoop/hdds/protocol/datanode/proto">
</replace>
</tasks>
</configuration>
<goals>

View File

@ -49,6 +49,10 @@ public final class ScmConfigKeys {
= "dfs.container.ratis.enabled";
public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT
= false;
public static final String DFS_CONTAINER_GRPC_ENABLED_KEY
= "dfs.container.grpc.enabled";
public static final boolean DFS_CONTAINER_GRPC_ENABLED_DEFAULT
= false;
public static final String DFS_CONTAINER_RATIS_RPC_TYPE_KEY
= "dfs.container.ratis.rpc.type";
public static final String DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT

View File

@ -24,6 +24,7 @@
// This file contains protocol buffers that are used to transfer data
// to and from the datanode.
syntax = "proto2";
option java_package = "org.apache.hadoop.hdds.protocol.datanode.proto";
option java_outer_classname = "ContainerProtos";
option java_generate_equals_and_hash = true;
@ -418,3 +419,9 @@ message CopyContainerResponseProto {
repeated bytes data = 5;
optional int64 checksum = 6;
}
service XceiverClientProtocolService {
// A client-to-datanode RPC to send container commands
rpc send(stream ContainerCommandRequestProto) returns
(stream ContainerCommandResponseProto) {}
}

View File

@ -71,6 +71,15 @@
the replication pipeline supported by ozone.
</description>
</property>
<property>
<name>dfs.container.grpc.enabled</name>
<value>false</value>
<tag>OZONE, MANAGEMENT, PIPELINE, RATIS</tag>
<description>Ozone supports different kinds of replication pipelines
protocols. grpc is one of the replication pipeline protocol supported by
ozone.
</description>
</property>
<property>
<name>dfs.container.ratis.ipc</name>
<value>9858</value>

View File

@ -63,20 +63,20 @@ public class ContainerMetrics {
this.registry = new MetricsRegistry("StorageContainerMetrics");
for (int i = 0; i < numEnumEntries; i++) {
numOpsArray[i] = registry.newCounter(
"num" + ContainerProtos.Type.valueOf(i + 1),
"number of " + ContainerProtos.Type.valueOf(i + 1) + " ops",
"num" + ContainerProtos.Type.forNumber(i + 1),
"number of " + ContainerProtos.Type.forNumber(i + 1) + " ops",
(long) 0);
opsBytesArray[i] = registry.newCounter(
"bytes" + ContainerProtos.Type.valueOf(i + 1),
"bytes used by " + ContainerProtos.Type.valueOf(i + 1) + "op",
"bytes" + ContainerProtos.Type.forNumber(i + 1),
"bytes used by " + ContainerProtos.Type.forNumber(i + 1) + "op",
(long) 0);
opsLatency[i] = registry.newRate(
"latency" + ContainerProtos.Type.valueOf(i + 1),
ContainerProtos.Type.valueOf(i + 1) + " op");
"latency" + ContainerProtos.Type.forNumber(i + 1),
ContainerProtos.Type.forNumber(i + 1) + " op");
for (int j = 0; j < len; j++) {
int interval = intervals[j];
String quantileName = ContainerProtos.Type.valueOf(i + 1) + "Nanos"
String quantileName = ContainerProtos.Type.forNumber(i + 1) + "Nanos"
+ interval + "s";
opsLatQuantiles[i][j] = registry.newQuantiles(quantileName,
"latency of Container ops", "ops", "latency", interval);

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.transport.server;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto
.XceiverClientProtocolServiceGrpc;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Grpc Service for handling Container Commands on datanode.
*/
public class GrpcXceiverService extends
XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceImplBase {
public static final Logger
LOG = LoggerFactory.getLogger(GrpcXceiverService.class);
private final ContainerDispatcher dispatcher;
public GrpcXceiverService(ContainerDispatcher dispatcher) {
this.dispatcher = dispatcher;
}
@Override
public StreamObserver<ContainerCommandRequestProto> send(
StreamObserver<ContainerCommandResponseProto> responseObserver) {
return new StreamObserver<ContainerCommandRequestProto>() {
private final AtomicBoolean isClosed = new AtomicBoolean(false);
@Override
public void onNext(ContainerCommandRequestProto request) {
try {
ContainerCommandResponseProto resp = dispatcher.dispatch(request);
responseObserver.onNext(resp);
} catch (Throwable e) {
if (LOG.isDebugEnabled()) {
LOG.debug("{} got exception when processing"
+ " ContainerCommandRequestProto {}: {}", request, e);
}
responseObserver.onError(e);
}
}
@Override
public void onError(Throwable t) {
// for now we just log a msg
LOG.info("{}: ContainerCommand send on error. Exception: {}", t);
}
@Override
public void onCompleted() {
if (isClosed.compareAndSet(false, true)) {
LOG.info("{}: ContainerCommand send completed");
responseObserver.onCompleted();
}
}
};
}
}

View File

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.transport.server;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.ratis.shaded.io.grpc.Server;
import org.apache.ratis.shaded.io.grpc.ServerBuilder;
import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
/**
* Creates a Grpc server endpoint that acts as the communication layer for
* Ozone containers.
*/
public final class XceiverServerGrpc implements XceiverServerSpi {
private static final Logger
LOG = LoggerFactory.getLogger(XceiverServerGrpc.class);
private int port;
private Server server;
/**
* Constructs a Grpc server class.
*
* @param conf - Configuration
*/
public XceiverServerGrpc(DatanodeDetails datanodeDetails, Configuration conf,
ContainerDispatcher dispatcher) {
Preconditions.checkNotNull(conf);
this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
// Get an available port on current node and
// use that as the container port
if (conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT)) {
try (ServerSocket socket = new ServerSocket()) {
socket.setReuseAddress(true);
SocketAddress address = new InetSocketAddress(0);
socket.bind(address);
this.port = socket.getLocalPort();
LOG.info("Found a free port for the server : {}", this.port);
} catch (IOException e) {
LOG.error("Unable find a random free port for the server, "
+ "fallback to use default port {}", this.port, e);
}
}
datanodeDetails.setContainerPort(port);
server = ((NettyServerBuilder) ServerBuilder.forPort(port))
.maxMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
.addService(new GrpcXceiverService(dispatcher))
.build();
}
@Override
public int getIPCPort() {
return this.port;
}
/**
* Returns the Replication type supported by this end-point.
*
* @return enum -- {Stand_Alone, Ratis, Grpc, Chained}
*/
@Override
public HddsProtos.ReplicationType getServerType() {
return HddsProtos.ReplicationType.STAND_ALONE;
}
@Override
public void start() throws IOException {
server.start();
}
@Override
public void stop() {
server.shutdown();
}
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.ozoneimpl;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@ -39,6 +40,8 @@ import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
import org.apache.hadoop.ozone.container.common.statemachine.background
.BlockDeletingService;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
import org.apache.hadoop.ozone.container.common.transport.server
.XceiverServerGrpc;
import org.apache.hadoop.ozone.container.common.transport.server
.XceiverServerSpi;
import org.apache.hadoop.ozone.container.common.transport.server.ratis
@ -121,8 +124,14 @@ public class OzoneContainer {
this.dispatcher = new Dispatcher(manager, this.ozoneConfig);
boolean useGrpc = this.ozoneConfig.getBoolean(
ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_DEFAULT);
server = new XceiverServerSpi[]{
new XceiverServer(datanodeDetails, this.ozoneConfig, this.dispatcher),
useGrpc ? new XceiverServerGrpc(datanodeDetails,
this.ozoneConfig, this.dispatcher) :
new XceiverServer(datanodeDetails,
this.ozoneConfig, this.dispatcher),
XceiverServerRatis
.newXceiverServerRatis(datanodeDetails, this.ozoneConfig, dispatcher)
};

View File

@ -220,13 +220,13 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
datanodeService.stop();
datanodeService.join();
// ensure same ports are used across restarts.
Configuration conf = datanodeService.getConf();
Configuration config = datanodeService.getConf();
int currentPort = datanodeService.getDatanodeDetails().getContainerPort();
conf.setInt(DFS_CONTAINER_IPC_PORT, currentPort);
conf.setBoolean(DFS_CONTAINER_IPC_RANDOM_PORT, false);
config.setInt(DFS_CONTAINER_IPC_PORT, currentPort);
config.setBoolean(DFS_CONTAINER_IPC_RANDOM_PORT, false);
int ratisPort = datanodeService.getDatanodeDetails().getRatisPort();
conf.setInt(DFS_CONTAINER_RATIS_IPC_PORT, ratisPort);
conf.setBoolean(DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, false);
config.setInt(DFS_CONTAINER_RATIS_IPC_PORT, ratisPort);
config.setBoolean(DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, false);
datanodeService.start(null);
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.scm;
import com.google.common.cache.Cache;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
@ -30,13 +31,17 @@ import org.apache.hadoop.hdds.scm.protocolPB
.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.junit.Assert;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import static org.apache.hadoop.hdds.scm
.ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_KEY;
@ -44,19 +49,32 @@ import static org.apache.hadoop.hdds.scm
/**
* Test for XceiverClientManager caching and eviction.
*/
@RunWith(Parameterized.class)
public class TestXceiverClientManager {
private static OzoneConfiguration config;
private static MiniOzoneCluster cluster;
private static StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
private static String containerOwner = "OZONE";
private static boolean shouldUseGrpc;
@Parameterized.Parameters
public static Collection<Object[]> withGrpc() {
return Arrays.asList(new Object[][] {{false}, {true}});
}
public TestXceiverClientManager(boolean useGrpc) {
shouldUseGrpc = useGrpc;
}
@Rule
public ExpectedException exception = ExpectedException.none();
@BeforeClass
public static void init() throws Exception {
@Before
public void init() throws Exception {
config = new OzoneConfiguration();
config.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
shouldUseGrpc);
cluster = MiniOzoneCluster.newBuilder(config)
.setNumDatanodes(3)
.build();
@ -65,8 +83,8 @@ public class TestXceiverClientManager {
.getStorageContainerLocationClient();
}
@AfterClass
public static void shutdown() {
@After
public void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
@ -76,6 +94,8 @@ public class TestXceiverClientManager {
@Test
public void testCaching() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
shouldUseGrpc);
XceiverClientManager clientManager = new XceiverClientManager(conf);
ContainerInfo container1 = storageContainerLocationClient
@ -106,6 +126,8 @@ public class TestXceiverClientManager {
public void testFreeByReference() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1);
conf.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
shouldUseGrpc);
XceiverClientManager clientManager = new XceiverClientManager(conf);
Cache<Long, XceiverClientSpi> cache =
clientManager.getClientCache();
@ -140,10 +162,18 @@ public class TestXceiverClientManager {
// After releasing the client, this connection should be closed
// and any container operations should fail
clientManager.releaseClient(client1);
exception.expect(IOException.class);
exception.expectMessage("This channel is not connected.");
ContainerProtocolCalls.createContainer(client1,
container1.getContainerID(), traceID1);
String expectedMessage = shouldUseGrpc ? "Channel shutdown invoked" :
"This channel is not connected.";
try {
ContainerProtocolCalls.createContainer(client1,
container1.getContainerID(), traceID1);
Assert.fail("Create container should throw exception on closed"
+ "client");
} catch (Exception e) {
Assert.assertEquals(e.getClass(), IOException.class);
Assert.assertTrue(e.getMessage().contains(expectedMessage));
}
clientManager.releaseClient(client2);
}
@ -151,6 +181,8 @@ public class TestXceiverClientManager {
public void testFreeByEviction() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1);
conf.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
shouldUseGrpc);
XceiverClientManager clientManager = new XceiverClientManager(conf);
Cache<Long, XceiverClientSpi> cache =
clientManager.getClientCache();
@ -181,10 +213,17 @@ public class TestXceiverClientManager {
// Any container operation should now fail
String traceID2 = "trace" + RandomStringUtils.randomNumeric(4);
exception.expect(IOException.class);
exception.expectMessage("This channel is not connected.");
ContainerProtocolCalls.createContainer(client1,
container1.getContainerID(), traceID2);
String expectedMessage = shouldUseGrpc ? "Channel shutdown invoked" :
"This channel is not connected.";
try {
ContainerProtocolCalls.createContainer(client1,
container1.getContainerID(), traceID2);
Assert.fail("Create container should throw exception on closed"
+ "client");
} catch (Exception e) {
Assert.assertEquals(e.getClass(), IOException.class);
Assert.assertTrue(e.getMessage().contains(expectedMessage));
}
clientManager.releaseClient(client2);
}
}

View File

@ -107,6 +107,7 @@
<!-- Maven protoc compiler -->
<protobuf-maven-plugin.version>0.5.1</protobuf-maven-plugin.version>
<protobuf-compile.version>3.5.0</protobuf-compile.version>
<grpc.version>1.10.0</grpc.version>
<os-maven-plugin.version>1.5.0.Final</os-maven-plugin.version>
<!-- define the Java language version used by the compiler -->