From b31a5d67f11842eefd31d929dc8b346796f4c0d6 Mon Sep 17 00:00:00 2001 From: Chris Nauroth Date: Mon, 7 Mar 2016 21:47:28 -0800 Subject: [PATCH] HDFS-9873. Ozone: Add container transport server. Contributed by Anu Engineer. --- .../hadoop-hdfs/CHANGES-HDFS-7240.txt | 2 + .../apache/hadoop/ozone/OzoneConfigKeys.java | 3 + .../interfaces/ContainerDispatcher.java | 44 +++++++++ .../transport/server/XceiverServer.java | 92 +++++++++++++++++++ .../server/XceiverServerHandler.java | 80 ++++++++++++++++ .../server/XceiverServerInitializer.java | 61 ++++++++++++ .../transport/server/TestContainerServer.java | 60 ++++++++++++ 7 files changed, 342 insertions(+) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/interfaces/ContainerDispatcher.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServer.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServerHandler.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServerInitializer.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt index 95e365854ee..30f28d2de0e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt @@ -39,3 +39,5 @@ HDFS-9848. Ozone: Add Ozone Client lib for volume handling. (Anu Engineer via cnauroth) + + HDFS-9873. Ozone: Add container transport server (Anu Engineer via cnauroth) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 7d9cdd5f67d..ba24866e606 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -25,6 +25,9 @@ import org.apache.hadoop.classification.InterfaceAudience; */ @InterfaceAudience.Private public final class OzoneConfigKeys { + public static final String DFS_OZONE_CONTAINER_IPC_PORT = + "dfs.ozone.container.ipc"; + public static final int DFS_OZONE_CONTAINER_IPC_PORT_DEFAULT = 50011; public static final String DFS_STORAGE_LOCAL_ROOT = "dfs.ozone.localstorage.root"; public static final String DFS_STORAGE_LOCAL_ROOT_DEFAULT = "/tmp/ozone"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/interfaces/ContainerDispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/interfaces/ContainerDispatcher.java new file mode 100644 index 00000000000..f587b2adf14 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/interfaces/ContainerDispatcher.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.interfaces; + +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; + +import java.io.IOException; + +/** + * Dispatcher acts as the bridge between the transport layer and + * the actual container layer. This layer is capable of transforming + * protobuf objects into corresponding class and issue the function call + * into the lower layers. + * + * The reply from the request is dispatched to the client. + */ +public interface ContainerDispatcher { + /** + * Dispatches commands to container layer. + * @param msg - Command Request + * @return Command Response + * @throws IOException + */ + ContainerCommandResponseProto dispatch(ContainerCommandRequestProto msg) + throws IOException; + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServer.java new file mode 100644 index 00000000000..66ffa93f612 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServer.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.transport.server; + +import com.google.common.base.Preconditions; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.container.interfaces.ContainerDispatcher; + +/** + * Creates a netty server endpoint that acts as the communication layer for + * Ozone containers. + */ +public final class XceiverServer { + private final int port; + private final ContainerDispatcher storageContainer; + + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; + private Channel channel; + + /** + * Constructs a netty server class. + * + * @param conf - Configuration + */ + public XceiverServer(OzoneConfiguration conf, + ContainerDispatcher dispatcher) { + Preconditions.checkNotNull(conf); + this.port = conf.getInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT, + OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT_DEFAULT); + this.storageContainer = dispatcher; + } + + /** + * Starts running the server. + * + * @throws Exception + */ + public void start() throws Exception { + bossGroup = new NioEventLoopGroup(); + workerGroup = new NioEventLoopGroup(); + channel = new ServerBootstrap() + .group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new XceiverServerInitializer(storageContainer)) + .bind(port) + .syncUninterruptibly() + .channel(); + } + + /** + * Stops a running server. + * + * @throws Exception + */ + public void stop() throws Exception { + if (bossGroup != null) { + bossGroup.shutdownGracefully(); + } + if (workerGroup != null) { + workerGroup.shutdownGracefully(); + } + if (channel != null) { + channel.close().awaitUninterruptibly(); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServerHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServerHandler.java new file mode 100644 index 00000000000..887ad62765c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServerHandler.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.transport.server; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.slf4j.LoggerFactory; +import org.slf4j.Logger; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; +import org.apache.hadoop.ozone.container.interfaces.ContainerDispatcher; + +/** + * Netty server handlers that respond to Network events. + */ +public class XceiverServerHandler extends + SimpleChannelInboundHandler { + + static final Logger LOG = LoggerFactory.getLogger(XceiverServerHandler.class); + private final ContainerDispatcher dispatcher; + + /** + * Constructor for server handler. + * @param dispatcher - Dispatcher interface + */ + public XceiverServerHandler(ContainerDispatcher dispatcher) { + this.dispatcher = dispatcher; + } + + /** + * Please keep in mind that this method will be renamed to {@code + * messageReceived(ChannelHandlerContext, I)} in 5.0. + *

+ * Is called for each message of type {@link ContainerCommandRequestProto}. + * + * @param ctx the {@link ChannelHandlerContext} which this {@link + * SimpleChannelInboundHandler} belongs to + * @param msg the message to handle + * @throws Exception is thrown if an error occurred + */ + @Override + public void channelRead0(ChannelHandlerContext ctx, + ContainerCommandRequestProto msg) throws + Exception { + ContainerCommandResponseProto response = this.dispatcher.dispatch(msg); + LOG.debug("Writing the reponse back to client."); + ctx.writeAndFlush(response); + + } + + /** + * Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)} + * Sub-classes may override this method to change behavior. + * + * @param ctx - Channel Handler Context + * @param cause - Exception + */ + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) + throws Exception { + LOG.error("An exception caught in the pipeline : " + cause.toString()); + super.exceptionCaught(ctx, cause); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServerInitializer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServerInitializer.java new file mode 100644 index 00000000000..0e578553949 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServerInitializer.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.transport.server; + +import com.google.common.base.Preconditions; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.protobuf.ProtobufDecoder; +import io.netty.handler.codec.protobuf.ProtobufEncoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; +import org.apache.hadoop.ozone.container.interfaces.ContainerDispatcher; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; + +/** + * Creates a channel for the XceiverServer. + */ +public class XceiverServerInitializer extends ChannelInitializer{ + private final ContainerDispatcher dispatcher; + public XceiverServerInitializer(ContainerDispatcher dispatcher) { + Preconditions.checkNotNull(dispatcher); + this.dispatcher = dispatcher; + } + + /** + * This method will be called once the Channel is registered. After + * the method returns this instance will be removed from the {@link + * ChannelPipeline} + * + * @param ch the which was registered. + * @throws Exception is thrown if an error occurs. In that case the channel + * will be closed. + */ + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(new ProtobufVarint32FrameDecoder()); + pipeline.addLast(new ProtobufDecoder(ContainerCommandRequestProto + .getDefaultInstance())); + pipeline.addLast(new ProtobufVarint32LengthFieldPrepender()); + pipeline.addLast(new ProtobufEncoder()); + pipeline.addLast(new XceiverServerHandler(dispatcher)); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java new file mode 100644 index 00000000000..37820eb12af --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.transport.server; + +import io.netty.channel.embedded.EmbeddedChannel; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.ozone.container.interfaces.ContainerDispatcher; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class TestContainerServer { + + @Test + public void testPipeline() { + EmbeddedChannel channel = new EmbeddedChannel(new XceiverServerHandler( + new TestContainerDispatcher())); + ContainerCommandRequestProto request = ContainerCommandRequestProto + .getDefaultInstance(); + channel.writeInbound(request); + Assert.assertTrue(channel.finish()); + ContainerCommandResponseProto response = channel.readOutbound(); + Assert.assertTrue( + ContainerCommandResponseProto.getDefaultInstance().equals(response)); + channel.close(); + } + + private class TestContainerDispatcher implements ContainerDispatcher { + /** + * Dispatches commands to container layer. + * + * @param msg - Command Request + * @return Command Response + * @throws IOException + */ + @Override + public ContainerCommandResponseProto + dispatch(ContainerCommandRequestProto msg) throws IOException { + return ContainerCommandResponseProto.getDefaultInstance(); + } + } +}