HDDS-16. Remove Pipeline from Datanode Container Protocol protobuf definition. Contributed by Mukul Kumar Singh.

This commit is contained in:
Xiaoyu Yao 2018-05-10 14:49:58 -07:00
parent 48d0b54849
commit 7369f41020
12 changed files with 85 additions and 68 deletions

View File

@ -24,7 +24,7 @@
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.KeyData;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.KeyValue;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.hdds.client.BlockID;
import java.io.IOException;
@ -85,7 +85,7 @@ public ChunkOutputStream(BlockID blockID, String key,
KeyValue keyValue = KeyValue.newBuilder()
.setKey("TYPE").setValue("KEY").build();
this.containerKeyData = KeyData.newBuilder()
.setBlockID(blockID.getProtobuf())
.setBlockID(blockID.getDatanodeBlockIDProtobuf())
.addMetadata(keyValue);
this.xceiverClientManager = xceiverClientManager;
this.xceiverClient = xceiverClient;

View File

@ -17,6 +17,7 @@
package org.apache.hadoop.hdds.client;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
/**
@ -56,4 +57,15 @@ public static BlockID getFromProtobuf(HddsProtos.BlockID blockID) {
return new BlockID(blockID.getContainerID(),
blockID.getLocalID());
}
public ContainerProtos.DatanodeBlockID getDatanodeBlockIDProtobuf() {
return ContainerProtos.DatanodeBlockID.newBuilder().
setContainerID(containerID).setLocalID(localID).build();
}
public static BlockID getFromProtobuf(ContainerProtos.DatanodeBlockID blockID) {
return new BlockID(blockID.getContainerID(),
blockID.getLocalID());
}
}

View File

@ -50,7 +50,7 @@
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos
.WriteChunkRequestProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.KeyValue;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.hdds.client.BlockID;
import java.io.IOException;
@ -133,7 +133,7 @@ public static ReadChunkResponseProto readChunk(XceiverClientSpi xceiverClient,
ChunkInfo chunk, BlockID blockID, String traceID) throws IOException {
ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto
.newBuilder()
.setBlockID(blockID.getProtobuf())
.setBlockID(blockID.getDatanodeBlockIDProtobuf())
.setChunkData(chunk);
String id = xceiverClient.getPipeline().getLeader().getUuidString();
ContainerCommandRequestProto request = ContainerCommandRequestProto
@ -163,7 +163,7 @@ public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk,
throws IOException {
WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto
.newBuilder()
.setBlockID(blockID.getProtobuf())
.setBlockID(blockID.getDatanodeBlockIDProtobuf())
.setChunkData(chunk)
.setData(data);
String id = xceiverClient.getPipeline().getLeader().getUuidString();
@ -195,7 +195,7 @@ public static void writeSmallFile(XceiverClientSpi client,
throws IOException {
KeyData containerKeyData =
KeyData.newBuilder().setBlockID(blockID.getProtobuf())
KeyData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf())
.build();
PutKeyRequestProto.Builder createKeyRequest =
PutKeyRequestProto.newBuilder()
@ -241,7 +241,6 @@ public static void createContainer(XceiverClientSpi client, long containerID,
ContainerProtos.ContainerData.Builder containerData = ContainerProtos
.ContainerData.newBuilder();
containerData.setContainerID(containerID);
createRequest.setPipeline(client.getPipeline().getProtobufMessage());
createRequest.setContainerData(containerData.build());
String id = client.getPipeline().getLeader().getUuidString();
@ -321,7 +320,6 @@ public static ReadContainerResponseProto readContainer(
ReadContainerRequestProto.Builder readRequest =
ReadContainerRequestProto.newBuilder();
readRequest.setContainerID(containerID);
readRequest.setPipeline(client.getPipeline().getProtobufMessage());
String id = client.getPipeline().getLeader().getUuidString();
ContainerCommandRequestProto.Builder request =
ContainerCommandRequestProto.newBuilder();
@ -348,7 +346,7 @@ public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client,
BlockID blockID, String traceID) throws IOException {
KeyData containerKeyData = KeyData
.newBuilder()
.setBlockID(blockID.getProtobuf())
.setBlockID(blockID.getDatanodeBlockIDProtobuf())
.build();
GetKeyRequestProto.Builder getKey = GetKeyRequestProto

View File

@ -20,7 +20,6 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import java.io.IOException;
import java.util.Map;
@ -111,8 +110,8 @@ public ContainerProtos.ChunkInfo getProtoBufMessage() {
}
for (Map.Entry<String, String> entry : metadata.entrySet()) {
HddsProtos.KeyValue.Builder keyValBuilder =
HddsProtos.KeyValue.newBuilder();
ContainerProtos.KeyValue.Builder keyValBuilder =
ContainerProtos.KeyValue.newBuilder();
builder.addMetadata(keyValBuilder.setKey(entry.getKey())
.setValue(entry.getValue()).build());
}

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.ozone.container.common.helpers;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.client.BlockID;
import java.io.IOException;
@ -76,11 +75,11 @@ public static KeyData getFromProtoBuf(ContainerProtos.KeyData data) throws
public ContainerProtos.KeyData getProtoBufMessage() {
ContainerProtos.KeyData.Builder builder =
ContainerProtos.KeyData.newBuilder();
builder.setBlockID(this.blockID.getProtobuf());
builder.setBlockID(this.blockID.getDatanodeBlockIDProtobuf());
builder.addAllChunks(this.chunks);
for (Map.Entry<String, String> entry : metadata.entrySet()) {
HddsProtos.KeyValue.Builder keyValBuilder =
HddsProtos.KeyValue.newBuilder();
ContainerProtos.KeyValue.Builder keyValBuilder =
ContainerProtos.KeyValue.newBuilder();
builder.addMetadata(keyValBuilder.setKey(entry.getKey())
.setValue(entry.getValue()).build());
}

View File

@ -27,9 +27,7 @@
option java_package = "org.apache.hadoop.hdds.protocol.proto";
option java_outer_classname = "ContainerProtos";
option java_generate_equals_and_hash = true;
package hadoop.hdds;
import "hdfs.proto";
import "hdds.proto";
package hadoop.hdds.datanode;
/**
* Commands that are used to manipulate the state of containers on a datanode.
@ -134,6 +132,28 @@ enum Result {
CLOSED_CONTAINER_RETRY = 27;
}
/**
* Block ID that uniquely identify a block in Datanode.
*/
message DatanodeBlockID {
required int64 containerID = 1;
required int64 localID = 2;
}
message KeyValue {
required string key = 1;
optional string value = 2;
}
/**
* Lifecycle states of a container in Datanode.
*/
enum ContainerLifeCycleState {
OPEN = 1;
CLOSING = 2;
CLOSED = 3;
}
message ContainerCommandRequestProto {
required Type cmdType = 1; // Type of the command
@ -205,7 +225,7 @@ message ContainerData {
optional int64 bytesUsed = 6;
optional int64 size = 7;
optional int64 keyCount = 8;
optional LifeCycleState state = 9 [default = OPEN];
optional ContainerLifeCycleState state = 9 [default = OPEN];
}
message ContainerMeta {
@ -215,26 +235,23 @@ message ContainerMeta {
// Container Messages.
message CreateContainerRequestProto {
required Pipeline pipeline = 1;
required ContainerData containerData = 2;
required ContainerData containerData = 1;
}
message CreateContainerResponseProto {
}
message ReadContainerRequestProto {
required Pipeline pipeline = 1;
required int64 containerID = 2;
required int64 containerID = 1;
}
message ReadContainerResponseProto {
optional ContainerData containerData = 2;
optional ContainerData containerData = 1;
}
message UpdateContainerRequestProto {
required Pipeline pipeline = 1;
required ContainerData containerData = 2;
optional bool forceUpdate = 3 [default = false];
required ContainerData containerData = 1;
optional bool forceUpdate = 2 [default = false];
}
message UpdateContainerResponseProto {
@ -262,12 +279,12 @@ message CloseContainerRequestProto {
}
message CloseContainerResponseProto {
optional string hash = 2;
optional int64 containerID = 3;
optional string hash = 1;
optional int64 containerID = 2;
}
message KeyData {
required BlockID blockID = 1;
required DatanodeBlockID blockID = 1;
optional int64 flags = 2; // for future use.
repeated KeyValue metadata = 3;
repeated ChunkInfo chunks = 4;
@ -291,7 +308,7 @@ message GetKeyResponseProto {
message DeleteKeyRequestProto {
required BlockID blockID = 1;
required DatanodeBlockID blockID = 1;
}
message DeleteKeyResponseProto {
@ -300,7 +317,7 @@ message DeleteKeyResponseProto {
message ListKeyRequestProto {
required int64 containerID = 1;
optional int64 startLocalID = 2;
required uint32 count = 4;
required uint32 count = 3;
}
@ -325,7 +342,7 @@ enum Stage {
}
message WriteChunkRequestProto {
required BlockID blockID = 1;
required DatanodeBlockID blockID = 1;
required ChunkInfo chunkData = 2;
optional bytes data = 3;
optional Stage stage = 4 [default = COMBINED];
@ -335,26 +352,26 @@ message WriteChunkResponseProto {
}
message ReadChunkRequestProto {
required BlockID blockID = 1;
required DatanodeBlockID blockID = 1;
required ChunkInfo chunkData = 2;
}
message ReadChunkResponseProto {
required BlockID blockID = 1;
required DatanodeBlockID blockID = 1;
required ChunkInfo chunkData = 2;
required bytes data = 3;
}
message DeleteChunkRequestProto {
required BlockID blockID = 1;
required ChunkInfo chunkData = 3;
required DatanodeBlockID blockID = 1;
required ChunkInfo chunkData = 2;
}
message DeleteChunkResponseProto {
}
message ListChunkRequestProto {
required BlockID blockID = 1;
required DatanodeBlockID blockID = 1;
required string prevChunkName = 2;
required uint32 count = 3;
}

View File

@ -22,7 +22,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos
.ContainerLifeCycleState;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.util.Time;
@ -48,7 +49,7 @@ public class ContainerData {
private AtomicLong bytesUsed;
private long maxSize;
private long containerID;
private HddsProtos.LifeCycleState state;
private ContainerLifeCycleState state;
/**
* Constructs a ContainerData Object.
@ -63,7 +64,7 @@ public ContainerData(long containerID,
ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT) * OzoneConsts.GB;
this.bytesUsed = new AtomicLong(0L);
this.containerID = containerID;
this.state = HddsProtos.LifeCycleState.OPEN;
this.state = ContainerLifeCycleState.OPEN;
}
/**
@ -133,8 +134,8 @@ public ContainerProtos.ContainerData getProtoBufMessage() {
builder.setState(this.getState());
for (Map.Entry<String, String> entry : metadata.entrySet()) {
HddsProtos.KeyValue.Builder keyValBuilder =
HddsProtos.KeyValue.newBuilder();
ContainerProtos.KeyValue.Builder keyValBuilder =
ContainerProtos.KeyValue.newBuilder();
builder.addMetadata(keyValBuilder.setKey(entry.getKey())
.setValue(entry.getValue()).build());
}
@ -250,11 +251,11 @@ public synchronized long getContainerID() {
return containerID;
}
public synchronized void setState(HddsProtos.LifeCycleState state) {
public synchronized void setState(ContainerLifeCycleState state) {
this.state = state;
}
public synchronized HddsProtos.LifeCycleState getState() {
public synchronized ContainerLifeCycleState getState() {
return this.state;
}
@ -263,7 +264,7 @@ public synchronized HddsProtos.LifeCycleState getState() {
* @return - boolean
*/
public synchronized boolean isOpen() {
return HddsProtos.LifeCycleState.OPEN == state;
return ContainerLifeCycleState.OPEN == state;
}
/**
@ -271,7 +272,7 @@ public synchronized boolean isOpen() {
*/
public synchronized void closeContainer() {
// TODO: closed or closing here
setState(HddsProtos.LifeCycleState.CLOSED);
setState(ContainerLifeCycleState.CLOSED);
// Some thing brain dead for now. name + Time stamp of when we get the close
// container message.

View File

@ -21,7 +21,6 @@
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
@ -393,10 +392,6 @@ private ContainerCommandResponseProto handleCreateContainer(
msg.getCreateContainer().getContainerData(), conf);
Preconditions.checkNotNull(cData, "Container data is null");
Pipeline pipeline = Pipeline.getFromProtoBuf(
msg.getCreateContainer().getPipeline());
Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
this.containerManager.createContainer(cData);
return ContainerUtils.getContainerResponse(msg);
}

View File

@ -27,7 +27,8 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.ContainerData;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos
.ContainerLifeCycleState;
import java.io.IOException;
import java.util.stream.Collectors;
@ -77,7 +78,7 @@ public void execute(CommandLine cmd) throws IOException {
// Print container report info.
logOut("Container id: %s", containerID);
String openStatus =
containerData.getState() == HddsProtos.LifeCycleState.OPEN ? "OPEN" :
containerData.getState() == ContainerLifeCycleState.OPEN ? "OPEN" :
"CLOSED";
logOut("Container State: %s", openStatus);
if (!containerData.getHash().isEmpty()) {

View File

@ -38,7 +38,7 @@ final class OzoneContainerTranslation {
public static KeyData containerKeyDataForRead(BlockID blockID) {
return KeyData
.newBuilder()
.setBlockID(blockID.getProtobuf())
.setBlockID(blockID.getDatanodeBlockIDProtobuf())
.build();
}

View File

@ -28,11 +28,10 @@
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.KeyValue;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
@ -204,7 +203,7 @@ public static ContainerCommandRequestProto getWriteChunkRequest(
Pipeline newPipeline =
new Pipeline(pipeline.getPipelineChannel());
writeRequest.setBlockID(blockID.getProtobuf());
writeRequest.setBlockID(blockID.getDatanodeBlockIDProtobuf());
byte[] data = getData(datalen);
ChunkInfo info = getChunk(blockID.getLocalID(), 0, 0, datalen);
@ -361,7 +360,6 @@ public static ContainerCommandRequestProto getCreateContainerRequest(
.ContainerData.newBuilder();
containerData.setContainerID(containerID);
createRequest.setContainerData(containerData.build());
createRequest.setPipeline(pipeline.getProtobufMessage());
ContainerCommandRequestProto.Builder request =
ContainerCommandRequestProto.newBuilder();
@ -399,7 +397,6 @@ public static ContainerCommandRequestProto getUpdateContainerRequest(
}
Pipeline pipeline =
ContainerTestHelper.createSingleNodePipeline();
updateRequestBuilder.setPipeline(pipeline.getProtobufMessage());
updateRequestBuilder.setContainerData(containerData.build());
ContainerCommandRequestProto.Builder request =
@ -469,7 +466,8 @@ public static ContainerCommandRequestProto getPutKeyRequest(
*/
public static ContainerCommandRequestProto getKeyRequest(
Pipeline pipeline, ContainerProtos.PutKeyRequestProto putKeyRequest) {
HddsProtos.BlockID blockID = putKeyRequest.getKeyData().getBlockID();
ContainerProtos.DatanodeBlockID blockID =
putKeyRequest.getKeyData().getBlockID();
LOG.trace("getKey: blockID={}", blockID);
ContainerProtos.GetKeyRequestProto.Builder getRequest =

View File

@ -32,7 +32,6 @@
import org.apache.hadoop.ozone.container.common.impl.KeyManagerImpl;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineChannel;
import org.apache.hadoop.util.Time;
import org.openjdk.jmh.annotations.Benchmark;
@ -168,8 +167,6 @@ public void cleanup() throws IOException {
private ContainerCommandRequestProto getCreateContainerCommand(long containerID) {
CreateContainerRequestProto.Builder createRequest =
CreateContainerRequestProto.newBuilder();
createRequest.setPipeline(
new Pipeline(pipelineChannel).getProtobufMessage());
createRequest.setContainerData(
ContainerData.newBuilder().setContainerID(
containerID).build());
@ -187,7 +184,7 @@ private ContainerCommandRequestProto getWriteChunkCommand(
BlockID blockID, String chunkName) {
WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto
.newBuilder()
.setBlockID(blockID.getProtobuf())
.setBlockID(blockID.getDatanodeBlockIDProtobuf())
.setChunkData(getChunkInfo(blockID, chunkName))
.setData(data);
@ -204,7 +201,7 @@ private ContainerCommandRequestProto getReadChunkCommand(
BlockID blockID, String chunkName) {
ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto
.newBuilder()
.setBlockID(blockID.getProtobuf())
.setBlockID(blockID.getDatanodeBlockIDProtobuf())
.setChunkData(getChunkInfo(blockID, chunkName));
ContainerCommandRequestProto.Builder request = ContainerCommandRequestProto
.newBuilder();
@ -258,7 +255,7 @@ private ContainerProtos.KeyData getKeyData(
BlockID blockID, String chunkKey) {
ContainerProtos.KeyData.Builder builder = ContainerProtos.KeyData
.newBuilder()
.setBlockID(blockID.getProtobuf())
.setBlockID(blockID.getDatanodeBlockIDProtobuf())
.addChunks(getChunkInfo(blockID, chunkKey));
return builder.build();
}