From 979dfe4c2ef5775dfbe8dcbcb07d3138a7257707 Mon Sep 17 00:00:00 2001 From: Chris Nauroth Date: Tue, 8 Mar 2016 10:29:27 -0800 Subject: [PATCH] HDFS-9891. Ozone: Add container transport client. Contributed by Anu Engineer. --- .../hadoop/hdfs/protocol/DatanodeID.java | 53 +++++++ .../src/main/proto/hdfs.proto | 1 + .../hadoop-hdfs/CHANGES-HDFS-7240.txt | 2 + .../ozone/container/helpers/Pipeline.java | 132 ++++++++++++++++++ .../transport/client/XceiverClient.java | 122 ++++++++++++++++ .../client/XceiverClientHandler.java | 112 +++++++++++++++ .../client/XceiverClientInitializer.java | 68 +++++++++ .../ozone/container/ContainerTestHelper.java | 103 ++++++++++++++ .../transport/server/TestContainerServer.java | 71 ++++++++-- 9 files changed, 650 insertions(+), 14 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/Pipeline.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClient.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClientHandler.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClientInitializer.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java index 5fd845d9c73..30e946d3ec5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; /** * This class represents the primary identifier for a Datanode. @@ -49,6 +50,7 @@ public class DatanodeID implements Comparable { private int infoSecurePort; // info server port private int ipcPort; // IPC server port private String xferAddr; + private int containerPort; // container server port. /** * UUID identifying a given datanode. For upgraded Datanodes this is the @@ -274,4 +276,55 @@ public class DatanodeID implements Comparable { public int compareTo(DatanodeID that) { return getXferAddr().compareTo(that.getXferAddr()); } + + /** + * Returns the container port. + * @return Port + */ + public int getContainerPort() { + return containerPort; + } + + /** + * Sets the container port. + * @param containerPort - container port. + */ + public void setContainerPort(int containerPort) { + this.containerPort = containerPort; + } + + /** + * Returns a DataNode ID from the protocol buffers. + * + * @param datanodeIDProto - protoBuf Message + * @return DataNodeID + */ + public static DatanodeID getFromProtoBuf(HdfsProtos.DatanodeIDProto + datanodeIDProto) { + DatanodeID id = new DatanodeID(datanodeIDProto.getDatanodeUuid(), + datanodeIDProto.getIpAddr(), datanodeIDProto.getHostName(), + datanodeIDProto.getXferPort(), datanodeIDProto.getInfoPort(), + datanodeIDProto.getInfoSecurePort(), datanodeIDProto.getIpcPort()); + id.setContainerPort(datanodeIDProto.getContainerPort()); + return id; + } + + /** + * Returns a DataNodeID protobuf message from a datanode ID. + * @return HdfsProtos.DatanodeIDProto + */ + public HdfsProtos.DatanodeIDProto getProtoBufMessage() { + HdfsProtos.DatanodeIDProto.Builder builder = + HdfsProtos.DatanodeIDProto.newBuilder(); + + return builder.setDatanodeUuid(this.getDatanodeUuid()) + .setIpAddr(this.getIpcAddr()) + .setHostName(this.getHostName()) + .setXferPort(this.getXferPort()) + .setInfoPort(this.getInfoPort()) + .setInfoSecurePort(this.getInfoSecurePort()) + .setIpcPort(this.getIpcPort()) + .setContainerPort(this.getContainerPort()) + .build(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto index 0db8a3f5084..f9b875cfb5b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto @@ -58,6 +58,7 @@ message DatanodeIDProto { required uint32 infoPort = 5; // datanode http port required uint32 ipcPort = 6; // ipc server port optional uint32 infoSecurePort = 7 [default = 0]; // datanode https port + optional uint32 containerPort = 8 [default = 0]; // Ozone container protocol } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt index 30f28d2de0e..9f4fcf54e49 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt @@ -41,3 +41,5 @@ (Anu Engineer via cnauroth) HDFS-9873. Ozone: Add container transport server (Anu Engineer via cnauroth) + + HDFS-9891. Ozone: Add container transport client (Anu Engineer via cnauroth) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/Pipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/Pipeline.java new file mode 100644 index 00000000000..d1bcc8d939c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/Pipeline.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.helpers; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +/** + * A pipeline represents the group of machines over which a container lives. + */ +public class Pipeline { + private String containerName; + private String leaderID; + private Map datanodes; + + /** + * Constructs a new pipeline data structure. + * + * @param leaderID - First machine in this pipeline. + */ + public Pipeline(String leaderID) { + this.leaderID = leaderID; + datanodes = new TreeMap<>(); + } + + /** + * Gets pipeline object from protobuf. + * + * @param pipeline - ProtoBuf definition for the pipeline. + * @return Pipeline Object + */ + public static Pipeline getFromProtoBuf(ContainerProtos.Pipeline pipeline) { + Preconditions.checkNotNull(pipeline); + Pipeline newPipeline = new Pipeline(pipeline.getLeaderID()); + for (HdfsProtos.DatanodeIDProto dataID : pipeline.getMembersList()) { + newPipeline.addMember(DatanodeID.getFromProtoBuf(dataID)); + } + if (pipeline.hasContainerName()) { + newPipeline.containerName = newPipeline.getContainerName(); + } + return newPipeline; + } + + /** Adds a member to pipeline */ + + /** + * Adds a member to the pipeline. + * + * @param dataNodeId - Datanode to be added. + */ + public void addMember(DatanodeID dataNodeId) { + datanodes.put(dataNodeId.getDatanodeUuid(), dataNodeId); + } + + /** + * Returns the first machine in the set of datanodes. + * + * @return First Machine. + */ + public DatanodeID getLeader() { + return datanodes.get(leaderID); + } + + /** + * Returns all machines that make up this pipeline. + * + * @return List of Machines. + */ + public List getMachines() { + return new ArrayList<>(datanodes.values()); + } + + /** + * Return a Protobuf Pipeline message from pipeline. + * + * @return Protobuf message + */ + public ContainerProtos.Pipeline getProtobufMessage() { + ContainerProtos.Pipeline.Builder builder = + ContainerProtos.Pipeline.newBuilder(); + for (DatanodeID datanode : datanodes.values()) { + builder.addMembers(datanode.getProtoBufMessage()); + } + builder.setLeaderID(leaderID); + if (this.containerName != null) { + builder.setContainerName(this.containerName); + } + return builder.build(); + } + + /** + * Returns containerName if available. + * + * @return String. + */ + public String getContainerName() { + return containerName; + } + + /** + * Sets the container Name. + * + * @param containerName - Name of the container. + */ + public void setContainerName(String containerName) { + this.containerName = containerName; + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClient.java new file mode 100644 index 00000000000..0c2686dbe95 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClient.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.transport.client; + +import com.google.common.base.Preconditions; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.container.helpers.Pipeline; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * A Client for the storageContainer protocol. + */ +public class XceiverClient { + static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class); + private final Pipeline pipeline; + private final OzoneConfiguration config; + private ChannelFuture channelFuture; + private Bootstrap b; + private EventLoopGroup group; + + /** + * Constructs a client that can communicate with the Container framework on + * data nodes. + * @param pipeline - Pipeline that defines the machines. + * @param config -- Ozone Config + */ + public XceiverClient(Pipeline pipeline, OzoneConfiguration config) { + Preconditions.checkNotNull(pipeline); + Preconditions.checkNotNull(config); + this.pipeline = pipeline; + this.config = config; + } + + /** + * Connects to the leader in the pipeline. + */ + public void connect() throws Exception { + if (channelFuture != null + && channelFuture.channel() != null + && channelFuture.channel().isActive()) { + throw new IOException("This client is already connected to a host."); + } + + group = new NioEventLoopGroup(); + b = new Bootstrap(); + b.group(group) + .channel(NioSocketChannel.class) + .handler(new LoggingHandler(LogLevel.INFO)) + .handler(new XceiverClientInitializer(this.pipeline)); + DatanodeID leader = this.pipeline.getLeader(); + + // read port from the data node, on failure use default configured + // port. + int port = leader.getContainerPort(); + if (port == 0) { + port = config.getInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT, + OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT_DEFAULT); + } + LOG.debug("Connecting to server Port : " + port); + channelFuture = b.connect(leader.getHostName(), port).sync(); + } + + /** + * Close the client. + */ + public void close() { + if(group != null) { + group.shutdownGracefully(); + } + + if (channelFuture != null) { + channelFuture.channel().close(); + } + } + + /** + * Sends a given command to server and gets the reply back. + * @param request Request + * @return Response to the command + * @throws IOException + */ + public ContainerProtos.ContainerCommandResponseProto sendCommand( + ContainerProtos.ContainerCommandRequestProto request) + throws IOException { + if((channelFuture == null) || (!channelFuture.channel().isActive())) { + throw new IOException("This channel is not connected."); + } + XceiverClientHandler handler = + channelFuture.channel().pipeline().get(XceiverClientHandler.class); + + return handler.sendCommand(request); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClientHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClientHandler.java new file mode 100644 index 00000000000..25624f47d93 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClientHandler.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.transport.client; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; +import org.apache.hadoop.ozone.container.helpers.Pipeline; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * Netty client handler. + */ +public class XceiverClientHandler extends + SimpleChannelInboundHandler { + + static final Logger LOG = LoggerFactory.getLogger(XceiverClientHandler.class); + private final BlockingQueue + responses = new LinkedBlockingQueue<>(); + private final Pipeline pipeline; + private volatile Channel channel; + + /** + * Constructs a client that can communicate to a container server. + */ + public XceiverClientHandler(Pipeline pipeline) { + super(false); + this.pipeline = pipeline; + } + + /** + * Please keep in mind that this method will be renamed to {@code + * messageReceived(ChannelHandlerContext, I)} in 5.0. + *

+ * Is called for each message of type {@link ContainerProtos + * .ContainerCommandResponseProto}. + * + * @param ctx the {@link ChannelHandlerContext} which this {@link + * SimpleChannelInboundHandler} belongs to + * @param msg the message to handle + * @throws Exception is thrown if an error occurred + */ + @Override + public void channelRead0(ChannelHandlerContext ctx, + ContainerProtos.ContainerCommandResponseProto msg) + throws Exception { + responses.add(msg); + } + + @Override + public void channelRegistered(ChannelHandlerContext ctx) { + LOG.debug("channelRegistered: Connected to ctx"); + channel = ctx.channel(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + LOG.info("Exception in client " + cause.toString()); + ctx.close(); + } + + /** + * Since netty is async, we send a work request and then wait until a response + * appears in the reply queue. This is simple sync interface for clients. we + * should consider building async interfaces for client if this turns out to + * be a performance bottleneck. + * + * @param request - request. + * @return -- response + */ + public ContainerProtos.ContainerCommandResponseProto + sendCommand(ContainerProtos.ContainerCommandRequestProto request) { + + ContainerProtos.ContainerCommandResponseProto response; + channel.writeAndFlush(request); + boolean interrupted = false; + for (; ; ) { + try { + response = responses.take(); + break; + } catch (InterruptedException ignore) { + interrupted = true; + } + } + + if (interrupted) { + Thread.currentThread().interrupt(); + } + return response; + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClientInitializer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClientInitializer.java new file mode 100644 index 00000000000..6951f28913b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClientInitializer.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.transport.client; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.protobuf.ProtobufDecoder; +import io.netty.handler.codec.protobuf.ProtobufEncoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; +import org.apache.hadoop.ozone.container.helpers.Pipeline; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; + +/** + * Setup the netty pipeline. + */ +public class XceiverClientInitializer extends + ChannelInitializer { + private final Pipeline pipeline; + + /** + * Constructs an Initializer for the client pipeline. + * @param pipeline - Pipeline. + */ + public XceiverClientInitializer(Pipeline pipeline) { + this.pipeline = pipeline; + } + + /** + * This method will be called once when the Channel is registered. After + * the method returns this instance will be removed from the + * ChannelPipeline of the Channel. + * + * @param ch Channel which was registered. + * @throws Exception is thrown if an error occurs. In that case the + * Channel will be closed. + */ + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + + p.addLast(new ProtobufVarint32FrameDecoder()); + p.addLast(new ProtobufDecoder(ContainerProtos + .ContainerCommandResponseProto.getDefaultInstance())); + + p.addLast(new ProtobufVarint32LengthFieldPrepender()); + p.addLast(new ProtobufEncoder()); + + p.addLast(new XceiverClientHandler(this.pipeline)); + + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java new file mode 100644 index 00000000000..0622c82c06f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container; + +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos + .ContainerCommandRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos + .ContainerCommandResponseProto; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.container.helpers.Pipeline; + +import java.io.IOException; +import java.net.ServerSocket; +import java.util.UUID; + +/** + * Helpers for container tests. + */ +public class ContainerTestHelper { + + /** + * Create a pipeline with single node replica. + * + * @return Pipeline with single node in it. + * @throws IOException + */ + public static Pipeline createSingleNodePipeline() throws IOException { + ServerSocket socket = new ServerSocket(0); + int port = socket.getLocalPort(); + DatanodeID datanodeID = new DatanodeID(socket.getInetAddress() + .getHostAddress(), socket.getInetAddress().getHostName(), + UUID.randomUUID().toString(), port, port, port, port); + datanodeID.setContainerPort(port); + Pipeline pipeline = new Pipeline(datanodeID.getDatanodeUuid()); + pipeline.addMember(datanodeID); + socket.close(); + return pipeline; + } + + /** + * Returns a create container command for test purposes. There are a bunch of + * tests where we need to just send a request and get a reply. + * + * @return ContainerCommandRequestProto. + */ + public static ContainerCommandRequestProto getCreateContainerRequest() throws + IOException { + ContainerProtos.CreateContainerRequestProto.Builder createRequest = + ContainerProtos.CreateContainerRequestProto + .newBuilder(); + ContainerProtos.ContainerData.Builder containerData = ContainerProtos + .ContainerData.newBuilder(); + containerData.setName("testContainer"); + createRequest.setPipeline( + ContainerTestHelper.createSingleNodePipeline().getProtobufMessage()); + createRequest.setContainerData(containerData.build()); + + ContainerCommandRequestProto.Builder request = + ContainerCommandRequestProto.newBuilder(); + request.setCmdType(ContainerProtos.Type.CreateContainer); + request.setCreateContainer(createRequest); + return request.build(); + } + + /** + * Returns a create container response for test purposes. There are a bunch of + * tests where we need to just send a request and get a reply. + * + * @return ContainerCommandRequestProto. + */ + public static ContainerCommandResponseProto + getCreateContainerResponse(ContainerCommandRequestProto request) throws + IOException { + ContainerProtos.CreateContainerResponseProto.Builder createResponse = + ContainerProtos.CreateContainerResponseProto.newBuilder(); + + ContainerCommandResponseProto.Builder response = + ContainerCommandResponseProto.newBuilder(); + response.setCmdType(ContainerProtos.Type.CreateContainer); + response.setTraceID(request.getTraceID()); + response.setCreateContainer(createResponse.build()); + return response.build(); + } + + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java index 37820eb12af..f546a12e8fa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java @@ -19,9 +19,16 @@ package org.apache.hadoop.ozone.container.transport.server; import io.netty.channel.embedded.EmbeddedChannel; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos + .ContainerCommandRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos + .ContainerCommandResponseProto; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.ozone.container.helpers.Pipeline; import org.apache.hadoop.ozone.container.interfaces.ContainerDispatcher; +import org.apache.hadoop.ozone.container.transport.client.XceiverClient; import org.junit.Assert; import org.junit.Test; @@ -30,17 +37,53 @@ import java.io.IOException; public class TestContainerServer { @Test - public void testPipeline() { - EmbeddedChannel channel = new EmbeddedChannel(new XceiverServerHandler( - new TestContainerDispatcher())); - ContainerCommandRequestProto request = ContainerCommandRequestProto - .getDefaultInstance(); - channel.writeInbound(request); - Assert.assertTrue(channel.finish()); - ContainerCommandResponseProto response = channel.readOutbound(); - Assert.assertTrue( - ContainerCommandResponseProto.getDefaultInstance().equals(response)); - channel.close(); + public void testPipeline() throws IOException { + EmbeddedChannel channel = null; + try { + channel = new EmbeddedChannel(new XceiverServerHandler( + new TestContainerDispatcher())); + ContainerCommandRequestProto request = + ContainerTestHelper.getCreateContainerRequest(); + channel.writeInbound(request); + Assert.assertTrue(channel.finish()); + ContainerCommandResponseProto response = channel.readOutbound(); + Assert.assertTrue(request.getTraceID().equals(response.getTraceID())); + } finally { + if (channel != null) { + channel.close(); + } + } + } + + @Test + public void testClientServer() throws Exception { + XceiverServer server = null; + XceiverClient client = null; + + try { + Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline(); + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT, + pipeline.getLeader().getContainerPort()); + + server = new XceiverServer(conf, new TestContainerDispatcher()); + client = new XceiverClient(pipeline, conf); + + server.start(); + client.connect(); + + ContainerCommandRequestProto request = + ContainerTestHelper.getCreateContainerRequest(); + ContainerCommandResponseProto response = client.sendCommand(request); + Assert.assertTrue(request.getTraceID().equals(response.getTraceID())); + } finally { + if (client != null) { + client.close(); + } + if (server != null) { + server.stop(); + } + } } private class TestContainerDispatcher implements ContainerDispatcher { @@ -54,7 +97,7 @@ public class TestContainerServer { @Override public ContainerCommandResponseProto dispatch(ContainerCommandRequestProto msg) throws IOException { - return ContainerCommandResponseProto.getDefaultInstance(); + return ContainerTestHelper.getCreateContainerResponse(msg); } } }