HDFS-11580. Ozone: Support asynchronus client API for SCM and containers. Contributed by Anu Engineer.

This commit is contained in:
Anu Engineer 2017-08-01 12:32:11 -07:00 committed by Owen O'Malley
parent 1ad95cf2a9
commit 9cf40547ce
7 changed files with 193 additions and 35 deletions

View File

@ -35,6 +35,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
@ -51,6 +53,7 @@ public class XceiverClient extends XceiverClientSpi {
/**
* Constructs a client that can communicate with the Container framework on
* data nodes.
*
* @param pipeline - Pipeline that defines the machines.
* @param config -- Ozone Config
*/
@ -91,6 +94,7 @@ public class XceiverClient extends XceiverClientSpi {
/**
* Returns if the exceiver client connects to a server.
*
* @return True if the connection is alive, false otherwise.
*/
@VisibleForTesting
@ -100,7 +104,7 @@ public class XceiverClient extends XceiverClientSpi {
@Override
public void close() {
if(group != null) {
if (group != null) {
group.shutdownGracefully(0, 0, TimeUnit.SECONDS);
}
@ -118,12 +122,35 @@ public class XceiverClient extends XceiverClientSpi {
public ContainerProtos.ContainerCommandResponseProto sendCommand(
ContainerProtos.ContainerCommandRequestProto request)
throws IOException {
if((channelFuture == null) || (!channelFuture.channel().isActive())) {
try {
if ((channelFuture == null) || (!channelFuture.channel().isActive())) {
throw new IOException("This channel is not connected.");
}
XceiverClientHandler handler =
channelFuture.channel().pipeline().get(XceiverClientHandler.class);
return handler.sendCommand(request);
return handler.sendCommand(request);
} catch (ExecutionException | InterruptedException e) {
throw new IOException("Unexpected exception during execution", e);
}
}
/**
* Sends a given command to server gets a waitable future back.
*
* @param request Request
* @return Response to the command
* @throws IOException
*/
@Override
public CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
sendCommandAsync(ContainerProtos.ContainerCommandRequestProto request)
throws IOException, ExecutionException, InterruptedException {
if ((channelFuture == null) || (!channelFuture.channel().isActive())) {
throw new IOException("This channel is not connected.");
}
XceiverClientHandler handler =
channelFuture.channel().pipeline().get(XceiverClientHandler.class);
return handler.sendCommandAsync(request);
}
}

View File

@ -17,26 +17,36 @@
*/
package org.apache.hadoop.scm;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* Netty client handler.
*/
public class XceiverClientHandler extends
SimpleChannelInboundHandler<ContainerProtos.ContainerCommandResponseProto> {
SimpleChannelInboundHandler<ContainerCommandResponseProto> {
static final Logger LOG = LoggerFactory.getLogger(XceiverClientHandler.class);
private final BlockingQueue<ContainerProtos.ContainerCommandResponseProto>
responses = new LinkedBlockingQueue<>();
private final ConcurrentMap<String,
CompletableFuture<ContainerCommandResponseProto>> responses =
new ConcurrentHashMap<>();
private final Pipeline pipeline;
private volatile Channel channel;
@ -56,15 +66,24 @@ public class XceiverClientHandler extends
* .ContainerCommandResponseProto}.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link
* SimpleChannelInboundHandler} belongs to
* SimpleChannelInboundHandler} belongs to
* @param msg the message to handle
* @throws Exception is thrown if an error occurred
*/
@Override
public void channelRead0(ChannelHandlerContext ctx,
ContainerProtos.ContainerCommandResponseProto msg)
ContainerProtos.ContainerCommandResponseProto msg)
throws Exception {
responses.add(msg);
Preconditions.checkNotNull(msg);
String key = msg.getTraceID();
CompletableFuture<ContainerCommandResponseProto> future =
responses.remove(key);
if (future != null) {
future.complete(msg);
} else {
LOG.error("A reply received for message that was not queued. trace " +
"ID: {}", msg.getTraceID());
}
}
@Override
@ -88,25 +107,39 @@ public class XceiverClientHandler extends
* @param request - request.
* @return -- response
*/
public ContainerProtos.ContainerCommandResponseProto
sendCommand(ContainerProtos.ContainerCommandRequestProto request) {
ContainerProtos.ContainerCommandResponseProto response;
channel.writeAndFlush(request);
boolean interrupted = false;
for (;;) {
try {
response = responses.take();
break;
} catch (InterruptedException ignore) {
interrupted = true;
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
return response;
public ContainerCommandResponseProto
sendCommand(ContainerProtos.ContainerCommandRequestProto request)
throws ExecutionException, InterruptedException {
Future<ContainerCommandResponseProto> future = sendCommandAsync(request);
return future.get();
}
/**
* SendCommandAsyc queues a command to the Netty Subsystem and returns a
* CompletableFuture. This Future is marked compeleted in the channelRead0
* when the call comes back.
* @param request - Request to execute
* @return CompletableFuture
*/
public CompletableFuture<ContainerCommandResponseProto>
sendCommandAsync(ContainerProtos.ContainerCommandRequestProto request) {
CompletableFuture<ContainerCommandResponseProto> response =
new CompletableFuture<>();
CompletableFuture<ContainerCommandResponseProto> previous =
responses.putIfAbsent(request.getTraceID(), response);
if (previous != null) {
LOG.error("Command with Trace already exists. Ignoring this command. " +
"{}. Previous Command: {}", request.getTraceID(),
previous.toString());
throw new IllegalStateException("Duplicate trace ID. Command with this " +
"trace ID is already executing. Please ensure that " +
"trace IDs are not reused. ID: " + request.getTraceID());
}
channel.writeAndFlush(request);
return response;
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.scm;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.ratis.RatisHelper;
@ -34,6 +35,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
/**
@ -103,4 +106,18 @@ public final class XceiverClientRatis extends XceiverClientSpi {
return ContainerCommandResponseProto.parseFrom(
ShadedProtoUtil.asByteString(reply.getMessage().getContent()));
}
/**
* Sends a given command to server gets a waitable future back.
*
* @param request Request
* @return Response to the command
* @throws IOException
*/
@Override
public CompletableFuture<ContainerCommandResponseProto>
sendCommandAsync(ContainerCommandRequestProto request)
throws IOException, ExecutionException, InterruptedException {
throw new IOException("Not implemented");
}
}

View File

@ -27,6 +27,8 @@ import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -94,4 +96,15 @@ public abstract class XceiverClientSpi implements Closeable {
*/
public abstract ContainerCommandResponseProto sendCommand(
ContainerCommandRequestProto request) throws IOException;
/**
* Sends a given command to server gets a waitable future back.
* @param request Request
* @return Response to the command
* @throws IOException
*/
public abstract CompletableFuture<ContainerCommandResponseProto>
sendCommandAsync(ContainerCommandRequestProto request) throws IOException,
ExecutionException, InterruptedException;
}

View File

@ -59,7 +59,7 @@ import org.apache.hadoop.scm.XceiverClientSpi;
* Implementation of all container protocol calls performed by Container
* clients.
*/
public final class ContainerProtocolCalls {
public final class ContainerProtocolCalls {
/**
* There is no need to instantiate this class.

View File

@ -248,6 +248,7 @@ public final class ContainerTestHelper {
ContainerCommandRequestProto.newBuilder();
request.setCmdType(ContainerProtos.Type.PutSmallFile);
request.setPutSmallFile(smallFileRequest);
request.setTraceID(UUID.randomUUID().toString());
return request.build();
}
@ -517,7 +518,8 @@ public final class ContainerTestHelper {
pipeline.getProtobufMessage()).build();
ContainerProtos.ContainerCommandRequestProto cmd =
ContainerCommandRequestProto.newBuilder().setCmdType(ContainerProtos
.Type.CloseContainer).setCloseContainer(closeReqeuest).build();
.Type.CloseContainer).setCloseContainer(closeReqeuest)
.build();
return cmd;
}
@ -533,7 +535,9 @@ public final class ContainerTestHelper {
ContainerProtos.DeleteContainerRequestProto.newBuilder().setName(
pipeline.getContainerName()).setPipeline(
pipeline.getProtobufMessage()).setForceDelete(forceDelete).build();
return ContainerCommandRequestProto.newBuilder().setCmdType(ContainerProtos
.Type.DeleteContainer).setDeleteContainer(deleteRequest).build();
return ContainerCommandRequestProto.newBuilder()
.setCmdType(ContainerProtos.Type.DeleteContainer)
.setDeleteContainer(deleteRequest)
.build();
}
}

View File

@ -28,14 +28,17 @@ import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.scm.XceiverClient;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
/**
* Tests ozone containers.
@ -226,7 +229,6 @@ public class TestOzoneContainer {
final ContainerProtos.ContainerCommandRequestProto smallFileRequest
= ContainerTestHelper.getWriteSmallFileRequest(
client.getPipeline(), containerName, keyName, 1024);
ContainerProtos.ContainerCommandResponseProto response
= client.sendCommand(smallFileRequest);
Assert.assertNotNull(response);
@ -247,6 +249,8 @@ public class TestOzoneContainer {
}
}
@Test
public void testCloseContainer() throws Exception {
MiniOzoneCluster cluster = null;
@ -415,6 +419,66 @@ public class TestOzoneContainer {
}
}
// Runs a set of commands as Async calls and verifies that calls indeed worked
// as expected.
static void runAsyncTests(
String containerName, XceiverClientSpi client) throws Exception {
try {
client.connect();
createContainerForTesting(client, containerName);
final List<CompletableFuture> computeResults = new LinkedList<>();
int requestCount = 1000;
// Create a bunch of Async calls from this test.
for(int x = 0; x <requestCount; x++) {
String keyName = OzoneUtils.getRequestID();
final ContainerProtos.ContainerCommandRequestProto smallFileRequest
= ContainerTestHelper.getWriteSmallFileRequest(
client.getPipeline(), containerName, keyName, 1024);
CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
response = client.sendCommandAsync(smallFileRequest);
computeResults.add(response);
}
CompletableFuture<Void> combinedFuture =
CompletableFuture.allOf(computeResults.toArray(
new CompletableFuture[computeResults.size()]));
// Wait for all futures to complete.
combinedFuture.get();
// Assert that all futures are indeed done.
for (CompletableFuture future : computeResults) {
Assert.assertTrue(future.isDone());
}
} finally {
if (client != null) {
client.close();
}
}
}
@Test
public void testXcieverClientAsync() throws Exception {
MiniOzoneCluster cluster = null;
XceiverClient client = null;
try {
OzoneConfiguration conf = newOzoneConfiguration();
client = createClientForTesting(conf);
cluster = new MiniOzoneCluster.Builder(conf)
.setRandomContainerPort(false)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
String containerName = client.getPipeline().getContainerName();
runAsyncTests(containerName, client);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
private static XceiverClient createClientForTesting(OzoneConfiguration conf)
throws Exception {
String containerName = OzoneUtils.getRequestID();