HDFS-9891. Ozone: Add container transport client. Contributed by Anu Engineer.
This commit is contained in:
parent b31a5d67f1
commit 979dfe4c2e
@@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;

 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;

 /**
  * This class represents the primary identifier for a Datanode.
@@ -49,6 +50,7 @@ public class DatanodeID implements Comparable<DatanodeID> {
   private int infoSecurePort; // info server port
   private int ipcPort; // IPC server port
   private String xferAddr;
+  private int containerPort; // container server port.

   /**
    * UUID identifying a given datanode. For upgraded Datanodes this is the
@@ -274,4 +276,55 @@ public class DatanodeID implements Comparable<DatanodeID> {
   public int compareTo(DatanodeID that) {
     return getXferAddr().compareTo(that.getXferAddr());
   }

+  /**
+   * Returns the container port.
+   * @return Port
+   */
+  public int getContainerPort() {
+    return containerPort;
+  }
+
+  /**
+   * Sets the container port.
+   * @param containerPort - container port.
+   */
+  public void setContainerPort(int containerPort) {
+    this.containerPort = containerPort;
+  }
+
+  /**
+   * Returns a DataNode ID from the protocol buffers.
+   *
+   * @param datanodeIDProto - protoBuf Message
+   * @return DataNodeID
+   */
+  public static DatanodeID getFromProtoBuf(HdfsProtos.DatanodeIDProto
+      datanodeIDProto) {
+    DatanodeID id = new DatanodeID(datanodeIDProto.getDatanodeUuid(),
+        datanodeIDProto.getIpAddr(), datanodeIDProto.getHostName(),
+        datanodeIDProto.getXferPort(), datanodeIDProto.getInfoPort(),
+        datanodeIDProto.getInfoSecurePort(), datanodeIDProto.getIpcPort());
+    id.setContainerPort(datanodeIDProto.getContainerPort());
+    return id;
+  }
+
+  /**
+   * Returns a DataNodeID protobuf message from a datanode ID.
+   * @return HdfsProtos.DatanodeIDProto
+   */
+  public HdfsProtos.DatanodeIDProto getProtoBufMessage() {
+    HdfsProtos.DatanodeIDProto.Builder builder =
+        HdfsProtos.DatanodeIDProto.newBuilder();
+
+    return builder.setDatanodeUuid(this.getDatanodeUuid())
+        .setIpAddr(this.getIpAddr())
+        .setHostName(this.getHostName())
+        .setXferPort(this.getXferPort())
+        .setInfoPort(this.getInfoPort())
+        .setInfoSecurePort(this.getInfoSecurePort())
+        .setIpcPort(this.getIpcPort())
+        .setContainerPort(this.getContainerPort())
+        .build();
+  }
 }
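The two conversion methods make a DatanodeID round-trippable through HdfsProtos.DatanodeIDProto, container port included. A minimal sketch of that round trip, assuming the seven-argument DatanodeID constructor that the test helper later in this change also uses; the address, UUID and port numbers are made up for illustration:

import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;

public class DatanodeIDRoundTripSketch {
  public static void main(String[] args) {
    // Hypothetical values; only the shape of the round trip matters here.
    DatanodeID original = new DatanodeID("127.0.0.1", "localhost",
        "e5b0a9e2-1111-2222-3333-444455556666", 50010, 50075, 50475, 50020);
    original.setContainerPort(50011);

    // DatanodeID -> DatanodeIDProto -> DatanodeID.
    HdfsProtos.DatanodeIDProto proto = original.getProtoBufMessage();
    DatanodeID copy = DatanodeID.getFromProtoBuf(proto);

    // The new containerPort field survives the conversion in both directions.
    System.out.println(copy.getContainerPort()); // 50011
  }
}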
@@ -58,6 +58,7 @@ message DatanodeIDProto {
   required uint32 infoPort = 5;  // datanode http port
   required uint32 ipcPort = 6;   // ipc server port
   optional uint32 infoSecurePort = 7 [default = 0]; // datanode https port
+  optional uint32 containerPort = 8 [default = 0];  // Ozone container protocol
 }

 /**
@@ -41,3 +41,5 @@
     (Anu Engineer via cnauroth)

     HDFS-9873. Ozone: Add container transport server (Anu Engineer via cnauroth)
+
+    HDFS-9891. Ozone: Add container transport client (Anu Engineer via cnauroth)
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.helpers;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * A pipeline represents the group of machines over which a container lives.
+ */
+public class Pipeline {
+  private String containerName;
+  private String leaderID;
+  private Map<String, DatanodeID> datanodes;
+
+  /**
+   * Constructs a new pipeline data structure.
+   *
+   * @param leaderID - First machine in this pipeline.
+   */
+  public Pipeline(String leaderID) {
+    this.leaderID = leaderID;
+    datanodes = new TreeMap<>();
+  }
+
+  /**
+   * Gets a pipeline object from protobuf.
+   *
+   * @param pipeline - ProtoBuf definition for the pipeline.
+   * @return Pipeline Object
+   */
+  public static Pipeline getFromProtoBuf(ContainerProtos.Pipeline pipeline) {
+    Preconditions.checkNotNull(pipeline);
+    Pipeline newPipeline = new Pipeline(pipeline.getLeaderID());
+    for (HdfsProtos.DatanodeIDProto dataID : pipeline.getMembersList()) {
+      newPipeline.addMember(DatanodeID.getFromProtoBuf(dataID));
+    }
+    if (pipeline.hasContainerName()) {
+      newPipeline.containerName = pipeline.getContainerName();
+    }
+    return newPipeline;
+  }
+
+  /**
+   * Adds a member to the pipeline.
+   *
+   * @param dataNodeId - Datanode to be added.
+   */
+  public void addMember(DatanodeID dataNodeId) {
+    datanodes.put(dataNodeId.getDatanodeUuid(), dataNodeId);
+  }
+
+  /**
+   * Returns the first machine in the set of datanodes.
+   *
+   * @return First Machine.
+   */
+  public DatanodeID getLeader() {
+    return datanodes.get(leaderID);
+  }
+
+  /**
+   * Returns all machines that make up this pipeline.
+   *
+   * @return List of Machines.
+   */
+  public List<DatanodeID> getMachines() {
+    return new ArrayList<>(datanodes.values());
+  }
+
+  /**
+   * Returns a Protobuf Pipeline message from this pipeline.
+   *
+   * @return Protobuf message
+   */
+  public ContainerProtos.Pipeline getProtobufMessage() {
+    ContainerProtos.Pipeline.Builder builder =
+        ContainerProtos.Pipeline.newBuilder();
+    for (DatanodeID datanode : datanodes.values()) {
+      builder.addMembers(datanode.getProtoBufMessage());
+    }
+    builder.setLeaderID(leaderID);
+    if (this.containerName != null) {
+      builder.setContainerName(this.containerName);
+    }
+    return builder.build();
+  }
+
+  /**
+   * Returns the containerName if available.
+   *
+   * @return String.
+   */
+  public String getContainerName() {
+    return containerName;
+  }
+
+  /**
+   * Sets the container Name.
+   *
+   * @param containerName - Name of the container.
+   */
+  public void setContainerName(String containerName) {
+    this.containerName = containerName;
+  }
+}
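For orientation, a short sketch of how a Pipeline is meant to be assembled and shipped as a ContainerProtos.Pipeline message; the datanode below is hypothetical and only the methods defined above (plus the same DatanodeID constructor the test helper uses) are assumed:

import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.container.helpers.Pipeline;

public class PipelineSketch {
  public static void main(String[] args) {
    // Hypothetical single-node pipeline; the datanode UUID doubles as the
    // leader ID, which is how getLeader() finds it in the TreeMap.
    DatanodeID dn = new DatanodeID("127.0.0.1", "localhost",
        "11111111-2222-3333-4444-555555555555", 50010, 50075, 50475, 50020);
    dn.setContainerPort(50011);

    Pipeline pipeline = new Pipeline(dn.getDatanodeUuid());
    pipeline.addMember(dn);
    pipeline.setContainerName("exampleContainer");

    // Protobuf round trip: leader and container name are preserved.
    ContainerProtos.Pipeline proto = pipeline.getProtobufMessage();
    Pipeline copy = Pipeline.getFromProtoBuf(proto);
    System.out.println(copy.getLeader().getHostName()); // localhost
    System.out.println(copy.getContainerName());        // exampleContainer
  }
}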
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.transport.client;
+
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.container.helpers.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A Client for the storageContainer protocol.
+ */
+public class XceiverClient {
+  static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class);
+  private final Pipeline pipeline;
+  private final OzoneConfiguration config;
+  private ChannelFuture channelFuture;
+  private Bootstrap b;
+  private EventLoopGroup group;
+
+  /**
+   * Constructs a client that can communicate with the Container framework on
+   * data nodes.
+   * @param pipeline - Pipeline that defines the machines.
+   * @param config - Ozone Config
+   */
+  public XceiverClient(Pipeline pipeline, OzoneConfiguration config) {
+    Preconditions.checkNotNull(pipeline);
+    Preconditions.checkNotNull(config);
+    this.pipeline = pipeline;
+    this.config = config;
+  }
+
+  /**
+   * Connects to the leader in the pipeline.
+   */
+  public void connect() throws Exception {
+    if (channelFuture != null
+        && channelFuture.channel() != null
+        && channelFuture.channel().isActive()) {
+      throw new IOException("This client is already connected to a host.");
+    }
+
+    group = new NioEventLoopGroup();
+    b = new Bootstrap();
+    b.group(group)
+        .channel(NioSocketChannel.class)
+        .handler(new LoggingHandler(LogLevel.INFO))
+        .handler(new XceiverClientInitializer(this.pipeline));
+    DatanodeID leader = this.pipeline.getLeader();
+
+    // Read the port from the data node; on failure use the default
+    // configured port.
+    int port = leader.getContainerPort();
+    if (port == 0) {
+      port = config.getInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT,
+          OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT_DEFAULT);
+    }
+    LOG.debug("Connecting to server Port : " + port);
+    channelFuture = b.connect(leader.getHostName(), port).sync();
+  }
+
+  /**
+   * Closes the client.
+   */
+  public void close() {
+    if (group != null) {
+      group.shutdownGracefully();
+    }
+
+    if (channelFuture != null) {
+      channelFuture.channel().close();
+    }
+  }
+
+  /**
+   * Sends a given command to the server and gets the reply back.
+   * @param request Request
+   * @return Response to the command
+   * @throws IOException
+   */
+  public ContainerProtos.ContainerCommandResponseProto sendCommand(
+      ContainerProtos.ContainerCommandRequestProto request)
+      throws IOException {
+    if ((channelFuture == null) || (!channelFuture.channel().isActive())) {
+      throw new IOException("This channel is not connected.");
+    }
+    XceiverClientHandler handler =
+        channelFuture.channel().pipeline().get(XceiverClientHandler.class);
+
+    return handler.sendCommand(request);
+  }
+}
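The client is deliberately small: construct it with a pipeline and a configuration, connect() to the pipeline leader, sendCommand() synchronously, and close() to release the Netty event loop. A rough sketch of that call pattern; the wrapper class and method name below are illustrative and not part of this change:

import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.container.helpers.Pipeline;
import org.apache.hadoop.ozone.container.transport.client.XceiverClient;

public class XceiverClientUsageSketch {
  // Sends a single command over a freshly connected client and always
  // releases the Netty resources, mirroring the close() contract above.
  public static ContainerProtos.ContainerCommandResponseProto sendOnce(
      Pipeline pipeline,
      ContainerProtos.ContainerCommandRequestProto request,
      OzoneConfiguration conf) throws Exception {
    XceiverClient client = new XceiverClient(pipeline, conf);
    try {
      client.connect();                   // connects to pipeline.getLeader()
      return client.sendCommand(request); // blocks until the reply arrives
    } finally {
      client.close();                     // shuts down the event loop group
    }
  }
}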
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.transport.client;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.helpers.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Netty client handler.
+ */
+public class XceiverClientHandler extends
+    SimpleChannelInboundHandler<ContainerProtos.ContainerCommandResponseProto> {
+
+  static final Logger LOG = LoggerFactory.getLogger(XceiverClientHandler.class);
+  private final BlockingQueue<ContainerProtos.ContainerCommandResponseProto>
+      responses = new LinkedBlockingQueue<>();
+  private final Pipeline pipeline;
+  private volatile Channel channel;
+
+  /**
+   * Constructs a client handler that can communicate with a container server.
+   */
+  public XceiverClientHandler(Pipeline pipeline) {
+    super(false);
+    this.pipeline = pipeline;
+  }
+
+  /**
+   * <strong>Please keep in mind that this method will be renamed to {@code
+   * messageReceived(ChannelHandlerContext, I)} in 5.0.</strong>
+   * <p>
+   * Is called for each message of type {@link ContainerProtos
+   * .ContainerCommandResponseProto}.
+   *
+   * @param ctx the {@link ChannelHandlerContext} which this {@link
+   *            SimpleChannelInboundHandler} belongs to
+   * @param msg the message to handle
+   * @throws Exception is thrown if an error occurred
+   */
+  @Override
+  public void channelRead0(ChannelHandlerContext ctx,
+      ContainerProtos.ContainerCommandResponseProto msg)
+      throws Exception {
+    responses.add(msg);
+  }
+
+  @Override
+  public void channelRegistered(ChannelHandlerContext ctx) {
+    LOG.debug("channelRegistered: Connected to ctx");
+    channel = ctx.channel();
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+    LOG.info("Exception in client " + cause.toString());
+    ctx.close();
+  }
+
+  /**
+   * Since netty is async, we send a work request and then wait until a
+   * response appears in the reply queue. This is a simple sync interface for
+   * clients. We should consider building async interfaces for the client if
+   * this turns out to be a performance bottleneck.
+   *
+   * @param request - request.
+   * @return - response
+   */
+  public ContainerProtos.ContainerCommandResponseProto
+      sendCommand(ContainerProtos.ContainerCommandRequestProto request) {
+
+    ContainerProtos.ContainerCommandResponseProto response;
+    channel.writeAndFlush(request);
+    boolean interrupted = false;
+    for (;;) {
+      try {
+        response = responses.take();
+        break;
+      } catch (InterruptedException ignore) {
+        interrupted = true;
+      }
+    }
+
+    if (interrupted) {
+      Thread.currentThread().interrupt();
+    }
+    return response;
+  }
+}
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.transport.client;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+import org.apache.hadoop.ozone.container.helpers.Pipeline;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+
+/**
+ * Setup the netty pipeline.
+ */
+public class XceiverClientInitializer extends
+    ChannelInitializer<SocketChannel> {
+  private final Pipeline pipeline;
+
+  /**
+   * Constructs an Initializer for the client pipeline.
+   * @param pipeline - Pipeline.
+   */
+  public XceiverClientInitializer(Pipeline pipeline) {
+    this.pipeline = pipeline;
+  }
+
+  /**
+   * This method will be called once when the Channel is registered. After
+   * the method returns this instance will be removed from the
+   * ChannelPipeline of the Channel.
+   *
+   * @param ch Channel which was registered.
+   * @throws Exception is thrown if an error occurs. In that case the
+   *                   Channel will be closed.
+   */
+  @Override
+  protected void initChannel(SocketChannel ch) throws Exception {
+    ChannelPipeline p = ch.pipeline();
+
+    p.addLast(new ProtobufVarint32FrameDecoder());
+    p.addLast(new ProtobufDecoder(ContainerProtos
+        .ContainerCommandResponseProto.getDefaultInstance()));
+
+    p.addLast(new ProtobufVarint32LengthFieldPrepender());
+    p.addLast(new ProtobufEncoder());
+
+    p.addLast(new XceiverClientHandler(this.pipeline));
+  }
+}
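On the wire each command or response is a single frame: a varint32 length prefix written by ProtobufVarint32LengthFieldPrepender, followed by the serialized protobuf from ProtobufEncoder; ProtobufVarint32FrameDecoder and ProtobufDecoder undo those two steps on the inbound side. A small, self-contained sketch of the length-prefix encoding (base-128 varint, low group first, continuation bit set on all but the last byte); it is independent of the classes above and only illustrates the frame layout:

public class Varint32FrameSketch {
  // Encodes an int the way a base-128 varint length prefix is encoded:
  // 7 data bits per byte, least-significant group first, and the high bit
  // set on every byte except the last one.
  static byte[] encodeVarint32(int value) {
    byte[] buf = new byte[5]; // a 32-bit value needs at most 5 bytes
    int i = 0;
    while ((value & ~0x7F) != 0) {
      buf[i++] = (byte) ((value & 0x7F) | 0x80);
      value >>>= 7;
    }
    buf[i++] = (byte) value;
    byte[] out = new byte[i];
    System.arraycopy(buf, 0, out, 0, i);
    return out;
  }

  public static void main(String[] args) {
    // A 300-byte message body gets a two-byte prefix: 0xAC 0x02.
    for (byte b : encodeVarint32(300)) {
      System.out.printf("0x%02X ", b & 0xFF);
    }
    System.out.println();
  }
}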
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container;
+
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.container.helpers.Pipeline;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.UUID;
+
+/**
+ * Helpers for container tests.
+ */
+public class ContainerTestHelper {
+
+  /**
+   * Creates a pipeline with a single node replica.
+   *
+   * @return Pipeline with a single node in it.
+   * @throws IOException
+   */
+  public static Pipeline createSingleNodePipeline() throws IOException {
+    ServerSocket socket = new ServerSocket(0);
+    int port = socket.getLocalPort();
+    DatanodeID datanodeID = new DatanodeID(socket.getInetAddress()
+        .getHostAddress(), socket.getInetAddress().getHostName(),
+        UUID.randomUUID().toString(), port, port, port, port);
+    datanodeID.setContainerPort(port);
+    Pipeline pipeline = new Pipeline(datanodeID.getDatanodeUuid());
+    pipeline.addMember(datanodeID);
+    socket.close();
+    return pipeline;
+  }
+
+  /**
+   * Returns a create container command for test purposes. There are a bunch
+   * of tests where we need to just send a request and get a reply.
+   *
+   * @return ContainerCommandRequestProto.
+   */
+  public static ContainerCommandRequestProto getCreateContainerRequest()
+      throws IOException {
+    ContainerProtos.CreateContainerRequestProto.Builder createRequest =
+        ContainerProtos.CreateContainerRequestProto.newBuilder();
+    ContainerProtos.ContainerData.Builder containerData = ContainerProtos
+        .ContainerData.newBuilder();
+    containerData.setName("testContainer");
+    createRequest.setPipeline(
+        ContainerTestHelper.createSingleNodePipeline().getProtobufMessage());
+    createRequest.setContainerData(containerData.build());
+
+    ContainerCommandRequestProto.Builder request =
+        ContainerCommandRequestProto.newBuilder();
+    request.setCmdType(ContainerProtos.Type.CreateContainer);
+    request.setCreateContainer(createRequest);
+    return request.build();
+  }
+
+  /**
+   * Returns a create container response for test purposes. There are a bunch
+   * of tests where we need to just send a request and get a reply.
+   *
+   * @return ContainerCommandResponseProto.
+   */
+  public static ContainerCommandResponseProto
+      getCreateContainerResponse(ContainerCommandRequestProto request)
+      throws IOException {
+    ContainerProtos.CreateContainerResponseProto.Builder createResponse =
+        ContainerProtos.CreateContainerResponseProto.newBuilder();
+
+    ContainerCommandResponseProto.Builder response =
+        ContainerCommandResponseProto.newBuilder();
+    response.setCmdType(ContainerProtos.Type.CreateContainer);
+    response.setTraceID(request.getTraceID());
+    response.setCreateContainer(createResponse.build());
+    return response.build();
+  }
+}
@@ -19,9 +19,16 @@
 package org.apache.hadoop.ozone.container.transport.server;

 import io.netty.channel.embedded.EmbeddedChannel;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.helpers.Pipeline;
 import org.apache.hadoop.ozone.container.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.transport.client.XceiverClient;
 import org.junit.Assert;
 import org.junit.Test;

@@ -30,17 +37,53 @@ import java.io.IOException;
 public class TestContainerServer {

   @Test
-  public void testPipeline() {
-    EmbeddedChannel channel = new EmbeddedChannel(new XceiverServerHandler(
-        new TestContainerDispatcher()));
-    ContainerCommandRequestProto request = ContainerCommandRequestProto
-        .getDefaultInstance();
-    channel.writeInbound(request);
-    Assert.assertTrue(channel.finish());
-    ContainerCommandResponseProto response = channel.readOutbound();
-    Assert.assertTrue(
-        ContainerCommandResponseProto.getDefaultInstance().equals(response));
-    channel.close();
+  public void testPipeline() throws IOException {
+    EmbeddedChannel channel = null;
+    try {
+      channel = new EmbeddedChannel(new XceiverServerHandler(
+          new TestContainerDispatcher()));
+      ContainerCommandRequestProto request =
+          ContainerTestHelper.getCreateContainerRequest();
+      channel.writeInbound(request);
+      Assert.assertTrue(channel.finish());
+      ContainerCommandResponseProto response = channel.readOutbound();
+      Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
+    } finally {
+      if (channel != null) {
+        channel.close();
+      }
+    }
+  }
+
+  @Test
+  public void testClientServer() throws Exception {
+    XceiverServer server = null;
+    XceiverClient client = null;
+
+    try {
+      Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline();
+      OzoneConfiguration conf = new OzoneConfiguration();
+      conf.setInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT,
+          pipeline.getLeader().getContainerPort());
+
+      server = new XceiverServer(conf, new TestContainerDispatcher());
+      client = new XceiverClient(pipeline, conf);
+
+      server.start();
+      client.connect();
+
+      ContainerCommandRequestProto request =
+          ContainerTestHelper.getCreateContainerRequest();
+      ContainerCommandResponseProto response = client.sendCommand(request);
+      Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
+    } finally {
+      if (client != null) {
+        client.close();
+      }
+      if (server != null) {
+        server.stop();
+      }
+    }
   }

   private class TestContainerDispatcher implements ContainerDispatcher {
@@ -54,7 +97,7 @@ public class TestContainerServer {
     @Override
     public ContainerCommandResponseProto
         dispatch(ContainerCommandRequestProto msg) throws IOException {
-      return ContainerCommandResponseProto.getDefaultInstance();
+      return ContainerTestHelper.getCreateContainerResponse(msg);
     }
   }
 }