HDFS-9873. Ozone: Add container transport server. Contributed by Anu Engineer.
This commit is contained in:
parent
2fff7d58da
commit
b31a5d67f1
|
@ -39,3 +39,5 @@
|
||||||
|
|
||||||
HDFS-9848. Ozone: Add Ozone Client lib for volume handling.
|
HDFS-9848. Ozone: Add Ozone Client lib for volume handling.
|
||||||
(Anu Engineer via cnauroth)
|
(Anu Engineer via cnauroth)
|
||||||
|
|
||||||
|
HDFS-9873. Ozone: Add container transport server (Anu Engineer via cnauroth)
|
||||||
|
|
|
@ -25,6 +25,9 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public final class OzoneConfigKeys {
|
public final class OzoneConfigKeys {
|
||||||
|
public static final String DFS_OZONE_CONTAINER_IPC_PORT =
|
||||||
|
"dfs.ozone.container.ipc";
|
||||||
|
public static final int DFS_OZONE_CONTAINER_IPC_PORT_DEFAULT = 50011;
|
||||||
public static final String DFS_STORAGE_LOCAL_ROOT =
|
public static final String DFS_STORAGE_LOCAL_ROOT =
|
||||||
"dfs.ozone.localstorage.root";
|
"dfs.ozone.localstorage.root";
|
||||||
public static final String DFS_STORAGE_LOCAL_ROOT_DEFAULT = "/tmp/ozone";
|
public static final String DFS_STORAGE_LOCAL_ROOT_DEFAULT = "/tmp/ozone";
|
||||||
|
|
|
@ -0,0 +1,44 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ozone.container.interfaces;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
|
||||||
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dispatcher acts as the bridge between the transport layer and
|
||||||
|
* the actual container layer. This layer is capable of transforming
|
||||||
|
* protobuf objects into corresponding class and issue the function call
|
||||||
|
* into the lower layers.
|
||||||
|
*
|
||||||
|
* The reply from the request is dispatched to the client.
|
||||||
|
*/
|
||||||
|
public interface ContainerDispatcher {
|
||||||
|
/**
|
||||||
|
* Dispatches commands to container layer.
|
||||||
|
* @param msg - Command Request
|
||||||
|
* @return Command Response
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
ContainerCommandResponseProto dispatch(ContainerCommandRequestProto msg)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,92 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ozone.container.transport.server;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import io.netty.bootstrap.ServerBootstrap;
|
||||||
|
import io.netty.channel.Channel;
|
||||||
|
import io.netty.channel.EventLoopGroup;
|
||||||
|
import io.netty.channel.nio.NioEventLoopGroup;
|
||||||
|
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||||
|
import io.netty.handler.logging.LogLevel;
|
||||||
|
import io.netty.handler.logging.LoggingHandler;
|
||||||
|
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||||
|
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||||
|
import org.apache.hadoop.ozone.container.interfaces.ContainerDispatcher;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a netty server endpoint that acts as the communication layer for
|
||||||
|
* Ozone containers.
|
||||||
|
*/
|
||||||
|
public final class XceiverServer {
|
||||||
|
private final int port;
|
||||||
|
private final ContainerDispatcher storageContainer;
|
||||||
|
|
||||||
|
private EventLoopGroup bossGroup;
|
||||||
|
private EventLoopGroup workerGroup;
|
||||||
|
private Channel channel;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs a netty server class.
|
||||||
|
*
|
||||||
|
* @param conf - Configuration
|
||||||
|
*/
|
||||||
|
public XceiverServer(OzoneConfiguration conf,
|
||||||
|
ContainerDispatcher dispatcher) {
|
||||||
|
Preconditions.checkNotNull(conf);
|
||||||
|
this.port = conf.getInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT,
|
||||||
|
OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT_DEFAULT);
|
||||||
|
this.storageContainer = dispatcher;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Starts running the server.
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public void start() throws Exception {
|
||||||
|
bossGroup = new NioEventLoopGroup();
|
||||||
|
workerGroup = new NioEventLoopGroup();
|
||||||
|
channel = new ServerBootstrap()
|
||||||
|
.group(bossGroup, workerGroup)
|
||||||
|
.channel(NioServerSocketChannel.class)
|
||||||
|
.handler(new LoggingHandler(LogLevel.INFO))
|
||||||
|
.childHandler(new XceiverServerInitializer(storageContainer))
|
||||||
|
.bind(port)
|
||||||
|
.syncUninterruptibly()
|
||||||
|
.channel();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stops a running server.
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public void stop() throws Exception {
|
||||||
|
if (bossGroup != null) {
|
||||||
|
bossGroup.shutdownGracefully();
|
||||||
|
}
|
||||||
|
if (workerGroup != null) {
|
||||||
|
workerGroup.shutdownGracefully();
|
||||||
|
}
|
||||||
|
if (channel != null) {
|
||||||
|
channel.close().awaitUninterruptibly();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,80 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ozone.container.transport.server;
|
||||||
|
|
||||||
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
import io.netty.channel.SimpleChannelInboundHandler;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
|
||||||
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
|
||||||
|
import org.apache.hadoop.ozone.container.interfaces.ContainerDispatcher;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Netty server handlers that respond to Network events.
|
||||||
|
*/
|
||||||
|
public class XceiverServerHandler extends
|
||||||
|
SimpleChannelInboundHandler<ContainerCommandRequestProto> {
|
||||||
|
|
||||||
|
static final Logger LOG = LoggerFactory.getLogger(XceiverServerHandler.class);
|
||||||
|
private final ContainerDispatcher dispatcher;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor for server handler.
|
||||||
|
* @param dispatcher - Dispatcher interface
|
||||||
|
*/
|
||||||
|
public XceiverServerHandler(ContainerDispatcher dispatcher) {
|
||||||
|
this.dispatcher = dispatcher;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <strong>Please keep in mind that this method will be renamed to {@code
|
||||||
|
* messageReceived(ChannelHandlerContext, I)} in 5.0.</strong>
|
||||||
|
* <p>
|
||||||
|
* Is called for each message of type {@link ContainerCommandRequestProto}.
|
||||||
|
*
|
||||||
|
* @param ctx the {@link ChannelHandlerContext} which this {@link
|
||||||
|
* SimpleChannelInboundHandler} belongs to
|
||||||
|
* @param msg the message to handle
|
||||||
|
* @throws Exception is thrown if an error occurred
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void channelRead0(ChannelHandlerContext ctx,
|
||||||
|
ContainerCommandRequestProto msg) throws
|
||||||
|
Exception {
|
||||||
|
ContainerCommandResponseProto response = this.dispatcher.dispatch(msg);
|
||||||
|
LOG.debug("Writing the reponse back to client.");
|
||||||
|
ctx.writeAndFlush(response);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)}
|
||||||
|
* Sub-classes may override this method to change behavior.
|
||||||
|
*
|
||||||
|
* @param ctx - Channel Handler Context
|
||||||
|
* @param cause - Exception
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
|
||||||
|
throws Exception {
|
||||||
|
LOG.error("An exception caught in the pipeline : " + cause.toString());
|
||||||
|
super.exceptionCaught(ctx, cause);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,61 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ozone.container.transport.server;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import io.netty.channel.ChannelInitializer;
|
||||||
|
import io.netty.channel.ChannelPipeline;
|
||||||
|
import io.netty.channel.socket.SocketChannel;
|
||||||
|
import io.netty.handler.codec.protobuf.ProtobufDecoder;
|
||||||
|
import io.netty.handler.codec.protobuf.ProtobufEncoder;
|
||||||
|
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
|
||||||
|
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
|
||||||
|
import org.apache.hadoop.ozone.container.interfaces.ContainerDispatcher;
|
||||||
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a channel for the XceiverServer.
|
||||||
|
*/
|
||||||
|
public class XceiverServerInitializer extends ChannelInitializer<SocketChannel>{
|
||||||
|
private final ContainerDispatcher dispatcher;
|
||||||
|
public XceiverServerInitializer(ContainerDispatcher dispatcher) {
|
||||||
|
Preconditions.checkNotNull(dispatcher);
|
||||||
|
this.dispatcher = dispatcher;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method will be called once the Channel is registered. After
|
||||||
|
* the method returns this instance will be removed from the {@link
|
||||||
|
* ChannelPipeline}
|
||||||
|
*
|
||||||
|
* @param ch the which was registered.
|
||||||
|
* @throws Exception is thrown if an error occurs. In that case the channel
|
||||||
|
* will be closed.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected void initChannel(SocketChannel ch) throws Exception {
|
||||||
|
ChannelPipeline pipeline = ch.pipeline();
|
||||||
|
pipeline.addLast(new ProtobufVarint32FrameDecoder());
|
||||||
|
pipeline.addLast(new ProtobufDecoder(ContainerCommandRequestProto
|
||||||
|
.getDefaultInstance()));
|
||||||
|
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
|
||||||
|
pipeline.addLast(new ProtobufEncoder());
|
||||||
|
pipeline.addLast(new XceiverServerHandler(dispatcher));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,60 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ozone.container.transport.server;
|
||||||
|
|
||||||
|
import io.netty.channel.embedded.EmbeddedChannel;
|
||||||
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
|
||||||
|
import org.apache.hadoop.ozone.container.interfaces.ContainerDispatcher;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
public class TestContainerServer {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPipeline() {
|
||||||
|
EmbeddedChannel channel = new EmbeddedChannel(new XceiverServerHandler(
|
||||||
|
new TestContainerDispatcher()));
|
||||||
|
ContainerCommandRequestProto request = ContainerCommandRequestProto
|
||||||
|
.getDefaultInstance();
|
||||||
|
channel.writeInbound(request);
|
||||||
|
Assert.assertTrue(channel.finish());
|
||||||
|
ContainerCommandResponseProto response = channel.readOutbound();
|
||||||
|
Assert.assertTrue(
|
||||||
|
ContainerCommandResponseProto.getDefaultInstance().equals(response));
|
||||||
|
channel.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private class TestContainerDispatcher implements ContainerDispatcher {
|
||||||
|
/**
|
||||||
|
* Dispatches commands to container layer.
|
||||||
|
*
|
||||||
|
* @param msg - Command Request
|
||||||
|
* @return Command Response
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public ContainerCommandResponseProto
|
||||||
|
dispatch(ContainerCommandRequestProto msg) throws IOException {
|
||||||
|
return ContainerCommandResponseProto.getDefaultInstance();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue