HDDS-389. Remove XceiverServer and XceiverClient and related classes. Contributed by chencan.

Nanda kumar, 2018-09-15 00:18:52 +05:30
commit c1df3084ff, parent 446cb8301e
9 changed files with 36 additions and 814 deletions
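
For callers of the removed classes, the updated tests below point at the replacements: XceiverClientGrpc on the client side and XceiverServerGrpc on the server side. A minimal client migration sketch, mirroring the try-with-resources usage in TestMiniOzoneCluster; the helper name and the sendCommand call are assumptions based on the XceiverClientSpi surface, not part of this commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;

public final class XceiverClientMigration {
  // Hypothetical helper: shows the one-line change for client code.
  static ContainerProtos.ContainerCommandResponseProto send(
      Pipeline pipeline, ContainerProtos.ContainerCommandRequestProto request)
      throws Exception {
    Configuration conf = new OzoneConfiguration();
    // Before this commit: new XceiverClient(pipeline, conf)
    try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf)) {
      client.connect();                    // dials the pipeline leader
      return client.sendCommand(request);  // same XceiverClientSpi call as before
    }
  }
}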

@@ -1,209 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.ratis.shaded.io.netty.bootstrap.Bootstrap;
import org.apache.ratis.shaded.io.netty.channel.Channel;
import org.apache.ratis.shaded.io.netty.channel.EventLoopGroup;
import org.apache.ratis.shaded.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.ratis.shaded.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.ratis.shaded.io.netty.handler.logging.LogLevel;
import org.apache.ratis.shaded.io.netty.handler.logging.LoggingHandler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
/**
 * A Client for the storageContainer protocol.
 */
public class XceiverClient extends XceiverClientSpi {
  static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class);
  private final Pipeline pipeline;
  private final Configuration config;
  private Channel channel;
  private Bootstrap b;
  private EventLoopGroup group;
  private final Semaphore semaphore;
  private boolean closed = false;

  /**
   * Constructs a client that can communicate with the Container framework on
   * data nodes.
   *
   * @param pipeline - Pipeline that defines the machines.
   * @param config - Ozone Config
   */
  public XceiverClient(Pipeline pipeline, Configuration config) {
    super();
    Preconditions.checkNotNull(pipeline);
    Preconditions.checkNotNull(config);
    this.pipeline = pipeline;
    this.config = config;
    this.semaphore =
        new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config));
  }

  @Override
  public void connect() throws Exception {
    if (closed) {
      throw new IOException("This channel is not connected.");
    }
    if (channel != null && channel.isActive()) {
      throw new IOException("This client is already connected to a host.");
    }
    group = new NioEventLoopGroup();
    b = new Bootstrap();
    // Note: the second handler() call replaces the first, so only the
    // initializer is installed on the bootstrap.
    b.group(group)
        .channel(NioSocketChannel.class)
        .handler(new LoggingHandler(LogLevel.INFO))
        .handler(new XceiverClientInitializer(this.pipeline, semaphore));
    DatanodeDetails leader = this.pipeline.getLeader();
    // Read the port from the datanode; on failure fall back to the
    // configured default port.
    int port = leader.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
    if (port == 0) {
      port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
          OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
    }
    LOG.debug("Connecting to server port: {}", port);
    channel = b.connect(leader.getHostName(), port).sync().channel();
  }

  public void reconnect() throws IOException {
    try {
      connect();
      if (channel == null || !channel.isActive()) {
        throw new IOException("This channel is not connected.");
      }
    } catch (Exception e) {
      LOG.error("Error while connecting: ", e);
      throw new IOException(e);
    }
  }

  /**
   * Returns true if the Xceiver client is connected to a server.
   *
   * @return True if the connection is alive, false otherwise.
   */
  @VisibleForTesting
  public boolean isConnected() {
    return channel != null && channel.isActive();
  }

  @Override
  public void close() {
    closed = true;
    if (group != null) {
      group.shutdownGracefully().awaitUninterruptibly();
    }
  }

  @Override
  public Pipeline getPipeline() {
    return pipeline;
  }

  @Override
  public ContainerProtos.ContainerCommandResponseProto sendCommand(
      ContainerProtos.ContainerCommandRequestProto request) throws IOException {
    try {
      if ((channel == null) || (!channel.isActive())) {
        reconnect();
      }
      XceiverClientHandler handler =
          channel.pipeline().get(XceiverClientHandler.class);
      return handler.sendCommand(request);
    } catch (ExecutionException | InterruptedException e) {
      /*
       * In case the netty channel handler throws an exception,
       * the exception thrown will be wrapped within {@link ExecutionException}.
       * Unwrapping here so that the original exception gets passed
       * to the client.
       */
      if (e instanceof ExecutionException) {
        Throwable cause = e.getCause();
        if (cause instanceof IOException) {
          throw (IOException) cause;
        }
      }
      throw new IOException(
          "Unexpected exception during execution:" + e.getMessage());
    }
  }

  /**
   * Sends a given command to the server and gets a waitable future back.
   *
   * @param request Request
   * @return Response to the command
   * @throws IOException
   */
  @Override
  public CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
      sendCommandAsync(ContainerProtos.ContainerCommandRequestProto request)
      throws IOException, ExecutionException, InterruptedException {
    if ((channel == null) || (!channel.isActive())) {
      reconnect();
    }
    XceiverClientHandler handler =
        channel.pipeline().get(XceiverClientHandler.class);
    return handler.sendCommandAsync(request);
  }

  /**
   * Create a pipeline.
   */
  @Override
  public void createPipeline() throws IOException {
    // For a stand-alone pipeline, there is no notion of setting up a pipeline.
  }

  public void destroyPipeline() {
    // For a stand-alone pipeline, there is no notion of destroying a pipeline.
  }

  /**
   * Returns the pipeline type.
   *
   * @return - Stand Alone as the type.
   */
  @Override
  public HddsProtos.ReplicationType getPipelineType() {
    return HddsProtos.ReplicationType.STAND_ALONE;
  }
}

@@ -1,202 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm;
import com.google.common.base.Preconditions;
import org.apache.ratis.shaded.io.netty.channel.Channel;
import org.apache.ratis.shaded.io.netty.channel.ChannelHandlerContext;
import org.apache.ratis.shaded.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
/**
 * Netty client handler.
 */
public class XceiverClientHandler extends
    SimpleChannelInboundHandler<ContainerCommandResponseProto> {
  static final Logger LOG = LoggerFactory.getLogger(XceiverClientHandler.class);
  private final ConcurrentMap<String, ResponseFuture> responses =
      new ConcurrentHashMap<>();
  private final Pipeline pipeline;
  private volatile Channel channel;
  private XceiverClientMetrics metrics;
  private final Semaphore semaphore;

  /**
   * Constructs a client that can communicate to a container server.
   */
  public XceiverClientHandler(Pipeline pipeline, Semaphore semaphore) {
    super(false);
    Preconditions.checkNotNull(pipeline);
    this.pipeline = pipeline;
    this.metrics = XceiverClientManager.getXceiverClientMetrics();
    this.semaphore = semaphore;
  }

  /**
   * <strong>Please keep in mind that this method will be renamed to {@code
   * messageReceived(ChannelHandlerContext, I)} in 5.0.</strong>
   * <p>
   * Is called for each message of type {@link ContainerProtos
   * .ContainerCommandResponseProto}.
   *
   * @param ctx the {@link ChannelHandlerContext} which this {@link
   * SimpleChannelInboundHandler} belongs to
   * @param msg the message to handle
   * @throws Exception is thrown if an error occurred
   */
  @Override
  public void channelRead0(ChannelHandlerContext ctx,
      ContainerProtos.ContainerCommandResponseProto msg)
      throws Exception {
    Preconditions.checkNotNull(msg);
    metrics.decrPendingContainerOpsMetrics(msg.getCmdType());
    String key = msg.getTraceID();
    ResponseFuture response = responses.remove(key);
    semaphore.release();
    if (response != null) {
      response.getFuture().complete(msg);
      long requestTime = response.getRequestTime();
      metrics.addContainerOpsLatency(msg.getCmdType(),
          Time.monotonicNowNanos() - requestTime);
    } else {
      LOG.error("A reply was received for a message that was not queued. " +
          "Trace ID: {}", msg.getTraceID());
    }
  }

  @Override
  public void channelRegistered(ChannelHandlerContext ctx) {
    LOG.debug("channelRegistered: Connected to ctx");
    channel = ctx.channel();
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    LOG.info("Exception in client: {}", cause.toString());
    Iterator<String> keyIterator = responses.keySet().iterator();
    while (keyIterator.hasNext()) {
      ResponseFuture response = responses.remove(keyIterator.next());
      if (response != null) {
        response.getFuture().completeExceptionally(cause);
        semaphore.release();
      }
    }
    ctx.close();
  }

  /**
   * Since netty is async, we send a work request and then wait until a
   * response appears in the reply queue. This is a simple sync interface for
   * clients. We should consider building async interfaces for clients if this
   * turns out to be a performance bottleneck.
   *
   * @param request - request.
   * @return -- response
   */
  public ContainerCommandResponseProto sendCommand(
      ContainerProtos.ContainerCommandRequestProto request)
      throws ExecutionException, InterruptedException {
    Future<ContainerCommandResponseProto> future = sendCommandAsync(request);
    return future.get();
  }

  /**
   * sendCommandAsync queues a command to the Netty subsystem and returns a
   * CompletableFuture. This future is marked completed in channelRead0 when
   * the response comes back.
   *
   * @param request - Request to execute
   * @return CompletableFuture
   */
  public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
      ContainerProtos.ContainerCommandRequestProto request)
      throws InterruptedException {
    // Throw an exception if the request doesn't have a trace ID.
    if (StringUtils.isEmpty(request.getTraceID())) {
      throw new IllegalArgumentException("Invalid trace ID");
    }

    // The datanode ID is set in the commands so that we can distinguish
    // commands when the cluster simulator is running.
    if (!request.hasDatanodeUuid()) {
      throw new IllegalArgumentException("Invalid Datanode ID");
    }

    metrics.incrPendingContainerOpsMetrics(request.getCmdType());

    CompletableFuture<ContainerCommandResponseProto> future
        = new CompletableFuture<>();
    ResponseFuture response = new ResponseFuture(future,
        Time.monotonicNowNanos());
    semaphore.acquire();
    ResponseFuture previous = responses.putIfAbsent(
        request.getTraceID(), response);
    if (previous != null) {
      LOG.error("A command with this trace ID already exists. Rejecting this " +
          "command. {}. Previous command: {}", request.getTraceID(),
          previous.toString());
      throw new IllegalStateException("Duplicate trace ID. A command with " +
          "this trace ID is already executing. Please ensure that " +
          "trace IDs are not reused. ID: " + request.getTraceID());
    }

    channel.writeAndFlush(request);
    return response.getFuture();
  }

  /**
   * Class that wraps response future info.
   */
  static class ResponseFuture {
    private final long requestTime;
    private final CompletableFuture<ContainerCommandResponseProto> future;

    ResponseFuture(CompletableFuture<ContainerCommandResponseProto> future,
        long requestTime) {
      this.future = future;
      this.requestTime = requestTime;
    }

    public long getRequestTime() {
      return requestTime;
    }

    public CompletableFuture<ContainerCommandResponseProto> getFuture() {
      return future;
    }
  }
}
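
The deleted handler's core mechanism is worth noting: synchronous calls are multiplexed over one asynchronous channel by keying a pending CompletableFuture on the request's trace ID, with a Semaphore bounding the number of in-flight requests. A transport-free sketch of that bookkeeping, with hypothetical names (RequestCorrelator is illustrative, not a class from this codebase):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;

/** Correlates async responses to requests by ID, with a bounded in-flight count. */
final class RequestCorrelator<R> {
  private final ConcurrentMap<String, CompletableFuture<R>> pending =
      new ConcurrentHashMap<>();
  private final Semaphore inflight;

  RequestCorrelator(int maxOutstanding) {
    this.inflight = new Semaphore(maxOutstanding);
  }

  /** Send path: registers a future before the write goes out. */
  CompletableFuture<R> register(String traceId) throws InterruptedException {
    inflight.acquire();                        // backpressure: block when full
    CompletableFuture<R> future = new CompletableFuture<>();
    if (pending.putIfAbsent(traceId, future) != null) {
      inflight.release();
      throw new IllegalStateException("Duplicate trace ID: " + traceId);
    }
    return future;
  }

  /** Receive path: completes the matching future, if any. */
  boolean complete(String traceId, R response) {
    CompletableFuture<R> future = pending.remove(traceId);
    if (future == null) {
      return false;                            // reply for an unknown request
    }
    inflight.release();
    future.complete(response);
    return true;
  }

  /** Channel failure: fails every outstanding request. */
  void failAll(Throwable cause) {
    pending.keySet().forEach(id -> {
      CompletableFuture<R> future = pending.remove(id);
      if (future != null) {
        inflight.release();
        future.completeExceptionally(cause);
      }
    });
  }
}

Every removal path releases the semaphore exactly once, so the in-flight permit count cannot leak even when the channel dies with requests outstanding.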

@@ -1,74 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm;
import org.apache.ratis.shaded.io.netty.channel.ChannelInitializer;
import org.apache.ratis.shaded.io.netty.channel.ChannelPipeline;
import org.apache.ratis.shaded.io.netty.channel.socket.SocketChannel;
import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder;
import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder;
import org.apache.ratis.shaded.io.netty.handler.codec.protobuf
.ProtobufVarint32FrameDecoder;
import org.apache.ratis.shaded.io.netty.handler.codec.protobuf
.ProtobufVarint32LengthFieldPrepender;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import java.util.concurrent.Semaphore;
/**
 * Sets up the netty pipeline.
 */
public class XceiverClientInitializer extends
    ChannelInitializer<SocketChannel> {
  private final Pipeline pipeline;
  private final Semaphore semaphore;

  /**
   * Constructs an Initializer for the client pipeline.
   *
   * @param pipeline - Pipeline.
   * @param semaphore - Semaphore that bounds outstanding requests.
   */
  public XceiverClientInitializer(Pipeline pipeline, Semaphore semaphore) {
    this.pipeline = pipeline;
    this.semaphore = semaphore;
  }

  /**
   * This method will be called once when the Channel is registered. After
   * the method returns this instance will be removed from the
   * ChannelPipeline of the Channel.
   *
   * @param ch Channel which was registered.
   * @throws Exception is thrown if an error occurs. In that case the
   *                   Channel will be closed.
   */
  @Override
  protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline p = ch.pipeline();
    // Inbound: split length-prefixed frames, then decode protobuf responses.
    p.addLast(new ProtobufVarint32FrameDecoder());
    p.addLast(new ProtobufDecoder(ContainerProtos
        .ContainerCommandResponseProto.getDefaultInstance()));
    // Outbound: prepend the varint32 length, then encode protobuf requests.
    p.addLast(new ProtobufVarint32LengthFieldPrepender());
    p.addLast(new ProtobufEncoder());
    p.addLast(new XceiverClientHandler(this.pipeline, this.semaphore));
  }
}

@@ -1,140 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.transport.server;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.ratis.shaded.io.netty.bootstrap.ServerBootstrap;
import org.apache.ratis.shaded.io.netty.channel.Channel;
import org.apache.ratis.shaded.io.netty.channel.EventLoopGroup;
import org.apache.ratis.shaded.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.ratis.shaded.io.netty.channel.socket.nio
.NioServerSocketChannel;
import org.apache.ratis.shaded.io.netty.handler.logging.LogLevel;
import org.apache.ratis.shaded.io.netty.handler.logging.LoggingHandler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
/**
 * Creates a netty server endpoint that acts as the communication layer for
 * Ozone containers.
 */
public final class XceiverServer implements XceiverServerSpi {
  private static final Logger
      LOG = LoggerFactory.getLogger(XceiverServer.class);
  private int port;
  private final ContainerDispatcher storageContainer;
  private EventLoopGroup bossGroup;
  private EventLoopGroup workerGroup;
  private Channel channel;

  /**
   * Constructs a netty server class.
   *
   * @param datanodeDetails - Datanode details
   * @param conf - Configuration
   * @param dispatcher - Container dispatcher
   */
  public XceiverServer(DatanodeDetails datanodeDetails, Configuration conf,
      ContainerDispatcher dispatcher) {
    Preconditions.checkNotNull(conf);

    this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
        OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
    // Get an available port on the current node and
    // use that as the container port.
    if (conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
        OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT)) {
      try (ServerSocket socket = new ServerSocket()) {
        socket.setReuseAddress(true);
        SocketAddress address = new InetSocketAddress(0);
        socket.bind(address);
        this.port = socket.getLocalPort();
        LOG.info("Found a free port for the server : {}", this.port);
      } catch (IOException e) {
        LOG.error("Unable to find a random free port for the server, "
            + "falling back to the default port {}", this.port, e);
      }
    }
    datanodeDetails.setPort(
        DatanodeDetails.newPort(DatanodeDetails.Port.Name.STANDALONE, port));
    this.storageContainer = dispatcher;
  }

  @Override
  public int getIPCPort() {
    return this.port;
  }

  /**
   * Returns the replication type supported by this end-point.
   *
   * @return enum -- {Stand_Alone, Ratis, Chained}
   */
  @Override
  public HddsProtos.ReplicationType getServerType() {
    return HddsProtos.ReplicationType.STAND_ALONE;
  }

  @Override
  public void start() throws IOException {
    bossGroup = new NioEventLoopGroup();
    workerGroup = new NioEventLoopGroup();
    channel = new ServerBootstrap()
        .group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new XceiverServerInitializer(storageContainer))
        .bind(port)
        .syncUninterruptibly()
        .channel();
  }

  @Override
  public void stop() {
    if (storageContainer != null) {
      storageContainer.shutdown();
    }
    if (bossGroup != null) {
      bossGroup.shutdownGracefully();
    }
    if (workerGroup != null) {
      workerGroup.shutdownGracefully();
    }
    if (channel != null) {
      channel.close().awaitUninterruptibly();
    }
  }

  @Override
  public void submitRequest(ContainerCommandRequestProto request,
      HddsProtos.PipelineID pipelineID) {
    storageContainer.dispatch(request);
  }
}
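
On the server side, the replacement visible in the updated tests takes the same datanode/config/dispatcher arguments plus a gRPC replication service. A hedged construction sketch based on the createReplicationService helper added to the tests below; the helper name here is hypothetical and the exact XceiverServerGrpc constructor contract is assumed from those call sites:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
import org.apache.hadoop.ozone.container.replication.GrpcReplicationService;
import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource;

final class XceiverServerMigration {
  // Hypothetical helper: shows the constructor swap for server code.
  static XceiverServerGrpc startServer(DatanodeDetails datanodeDetails,
      Configuration conf, ContainerDispatcher dispatcher,
      ContainerSet containerSet) throws IOException {
    GrpcReplicationService replication = new GrpcReplicationService(
        new OnDemandContainerReplicationSource(containerSet));
    // Before this commit: new XceiverServer(datanodeDetails, conf, dispatcher)
    XceiverServerGrpc server =
        new XceiverServerGrpc(datanodeDetails, conf, dispatcher, replication);
    server.start();  // binds the STANDALONE port recorded in datanodeDetails
    return server;
  }
}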

@@ -1,82 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.transport.server;
import org.apache.ratis.shaded.io.netty.channel.ChannelHandlerContext;
import org.apache.ratis.shaded.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Netty server handler that responds to network events.
 */
public class XceiverServerHandler extends
    SimpleChannelInboundHandler<ContainerCommandRequestProto> {
  static final Logger LOG = LoggerFactory.getLogger(XceiverServerHandler.class);
  private final ContainerDispatcher dispatcher;

  /**
   * Constructor for the server handler.
   *
   * @param dispatcher - Dispatcher interface
   */
  public XceiverServerHandler(ContainerDispatcher dispatcher) {
    this.dispatcher = dispatcher;
  }

  /**
   * <strong>Please keep in mind that this method will be renamed to {@code
   * messageReceived(ChannelHandlerContext, I)} in 5.0.</strong>
   * <p>
   * Is called for each message of type {@link ContainerCommandRequestProto}.
   *
   * @param ctx the {@link ChannelHandlerContext} which this {@link
   * SimpleChannelInboundHandler} belongs to
   * @param msg the message to handle
   * @throws Exception is thrown if an error occurred
   */
  @Override
  public void channelRead0(ChannelHandlerContext ctx,
      ContainerCommandRequestProto msg) throws Exception {
    ContainerCommandResponseProto response = this.dispatcher.dispatch(msg);
    LOG.debug("Writing the response back to the client.");
    ctx.writeAndFlush(response);
  }

  /**
   * Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)}.
   * Sub-classes may override this method to change behavior.
   *
   * @param ctx - Channel Handler Context
   * @param cause - Exception
   */
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
      throws Exception {
    LOG.error("An exception was caught in the pipeline: {}", cause.toString());
    super.exceptionCaught(ctx, cause);
  }
}

@@ -1,64 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.transport.server;
import com.google.common.base.Preconditions;
import org.apache.ratis.shaded.io.netty.channel.ChannelInitializer;
import org.apache.ratis.shaded.io.netty.channel.ChannelPipeline;
import org.apache.ratis.shaded.io.netty.channel.socket.SocketChannel;
import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder;
import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder;
import org.apache.ratis.shaded.io.netty.handler.codec.protobuf
.ProtobufVarint32FrameDecoder;
import org.apache.ratis.shaded.io.netty.handler.codec.protobuf
.ProtobufVarint32LengthFieldPrepender;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
/**
 * Creates a channel for the XceiverServer.
 */
public class XceiverServerInitializer extends ChannelInitializer<SocketChannel> {
  private final ContainerDispatcher dispatcher;

  public XceiverServerInitializer(ContainerDispatcher dispatcher) {
    Preconditions.checkNotNull(dispatcher);
    this.dispatcher = dispatcher;
  }

  /**
   * This method will be called once the Channel is registered. After
   * the method returns this instance will be removed from the {@link
   * ChannelPipeline}.
   *
   * @param ch the Channel which was registered.
   * @throws Exception is thrown if an error occurs. In that case the channel
   *                   will be closed.
   */
  @Override
  protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    // Inbound: split length-prefixed frames, then decode protobuf requests.
    pipeline.addLast(new ProtobufVarint32FrameDecoder());
    pipeline.addLast(new ProtobufDecoder(ContainerCommandRequestProto
        .getDefaultInstance()));
    // Outbound: prepend the varint32 length, then encode protobuf responses.
    pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
    pipeline.addLast(new ProtobufEncoder());
    pipeline.addLast(new XceiverServerHandler(dispatcher));
  }
}
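
Both initializers install the same length-prefixed protobuf framing, so client requests and server responses share one wire format: a varint32 length followed by the protobuf payload. A small EmbeddedChannel smoke test of just the framing codecs (the class name and payload are illustrative; only the shaded Netty codecs used above are assumed):

import org.apache.ratis.shaded.io.netty.buffer.ByteBuf;
import org.apache.ratis.shaded.io.netty.channel.embedded.EmbeddedChannel;
import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

final class FramingSmokeTest {
  public static void main(String[] args) {
    EmbeddedChannel encode = new EmbeddedChannel(
        new ProtobufVarint32LengthFieldPrepender());
    EmbeddedChannel decode = new EmbeddedChannel(
        new ProtobufVarint32FrameDecoder());

    byte[] payload = "hello".getBytes();
    encode.writeOutbound(encode.alloc().buffer().writeBytes(payload));
    ByteBuf framed = encode.readOutbound();   // 1-byte varint length + 5 bytes

    decode.writeInbound(framed);
    ByteBuf unframed = decode.readInbound();  // length prefix stripped again
    System.out.println(unframed.readableBytes()); // prints 5
  }
}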

@@ -28,7 +28,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine
 import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.TestUtils;
-import org.apache.hadoop.hdds.scm.XceiverClient;
+import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.test.PathUtils;
 import org.apache.hadoop.test.TestGenericTestUtils;
@@ -100,7 +100,7 @@ public class TestMiniOzoneCluster {
     pipeline.addMember(datanodeDetails);
     // Verify client is able to connect to the container
-    try (XceiverClient client = new XceiverClient(pipeline, conf)){
+    try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf)){
       client.connect();
       assertTrue(client.isConnected());
     }

@@ -24,6 +24,7 @@ import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -37,11 +38,12 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
+import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
-import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
 import org.apache.hadoop.hdds.scm.TestUtils;
-import org.apache.hadoop.hdds.scm.XceiverClient;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.ozone.container.replication.GrpcReplicationService;
+import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -54,10 +56,16 @@ import java.util.UUID;
  */
 public class TestContainerMetrics {

+  private GrpcReplicationService createReplicationService(
+      ContainerSet containerSet) {
+    return new GrpcReplicationService(
+        new OnDemandContainerReplicationSource(containerSet));
+  }
+
   @Test
   public void testContainerMetrics() throws Exception {
-    XceiverServer server = null;
-    XceiverClient client = null;
+    XceiverServerGrpc server = null;
+    XceiverClientGrpc client = null;
     long containerID = ContainerTestHelper.getTestContainerID();
     String path = GenericTestUtils.getRandomizedTempPath();
@@ -81,8 +89,9 @@ public class TestContainerMetrics {
           volumeSet, null);
       dispatcher.setScmId(UUID.randomUUID().toString());

-      server = new XceiverServer(datanodeDetails, conf, dispatcher);
-      client = new XceiverClient(pipeline, conf);
+      server = new XceiverServerGrpc(datanodeDetails, conf, dispatcher,
+          createReplicationService(containerSet));
+      client = new XceiverClientGrpc(pipeline, conf);

       server.start();
       client.connect();

@@ -22,6 +22,8 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.replication.GrpcReplicationService;
+import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource;
 import org.apache.ratis.shaded.io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -36,12 +38,11 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.RatisTestHelper;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
-import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerHandler;
+import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
-import org.apache.hadoop.hdds.scm.XceiverClient;
+import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
 import org.apache.hadoop.hdds.scm.XceiverClientRatis;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
@@ -70,43 +71,24 @@ public class TestContainerServer {
   static final String TEST_DIR
       = GenericTestUtils.getTestDir("dfs").getAbsolutePath() + File.separator;

-  @Test
-  public void testPipeline() throws IOException {
-    EmbeddedChannel channel = null;
-    String containerName = OzoneUtils.getRequestID();
-    try {
-      channel = new EmbeddedChannel(new XceiverServerHandler(
-          new TestContainerDispatcher()));
-      ContainerCommandRequestProto request =
-          ContainerTestHelper.getCreateContainerRequest(
-              ContainerTestHelper.getTestContainerID(),
-              ContainerTestHelper.createSingleNodePipeline());
-      channel.writeInbound(request);
-      Assert.assertTrue(channel.finish());
-
-      Object responseObject = channel.readOutbound();
-      Assert.assertTrue(responseObject instanceof
-          ContainerCommandResponseProto);
-      ContainerCommandResponseProto response =
-          (ContainerCommandResponseProto) responseObject;
-      Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
-    } finally {
-      if (channel != null) {
-        channel.close();
-      }
-    }
-  }
+  private GrpcReplicationService createReplicationService(
+      ContainerSet containerSet) {
+    return new GrpcReplicationService(
+        new OnDemandContainerReplicationSource(containerSet));
+  }

   @Test
   public void testClientServer() throws Exception {
     DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
+    ContainerSet containerSet = new ContainerSet();
     runTestClientServer(1,
         (pipeline, conf) -> conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
             pipeline.getLeader()
                 .getPort(DatanodeDetails.Port.Name.STANDALONE).getValue()),
-        XceiverClient::new,
-        (dn, conf) -> new XceiverServer(datanodeDetails, conf,
-            new TestContainerDispatcher()),
+        XceiverClientGrpc::new,
+        (dn, conf) -> new XceiverServerGrpc(datanodeDetails, conf,
+            new TestContainerDispatcher(),
+            createReplicationService(containerSet)),
         (dn, p) -> {});
   }
@@ -193,8 +175,8 @@ public class TestContainerServer {
   @Test
   public void testClientServerWithContainerDispatcher() throws Exception {
-    XceiverServer server = null;
-    XceiverClient client = null;
+    XceiverServerGrpc server = null;
+    XceiverClientGrpc client = null;

     try {
       Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline();
@@ -203,12 +185,14 @@ public class TestContainerServer {
           pipeline.getLeader()
               .getPort(DatanodeDetails.Port.Name.STANDALONE).getValue());

+      ContainerSet containerSet = new ContainerSet();
       HddsDispatcher dispatcher = new HddsDispatcher(
           conf, mock(ContainerSet.class), mock(VolumeSet.class), null);
       dispatcher.init();
       DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
-      server = new XceiverServer(datanodeDetails, conf, dispatcher);
-      client = new XceiverClient(pipeline, conf);
+      server = new XceiverServerGrpc(datanodeDetails, conf, dispatcher,
+          createReplicationService(containerSet));
+      client = new XceiverClientGrpc(pipeline, conf);
       server.start();
       client.connect();