From f88574acdefae2816236bf6180916be96c6a6874 Mon Sep 17 00:00:00 2001
From: Suresh Srinivas
Date: Sun, 5 Feb 2012 01:39:30 +0000
Subject: [PATCH] HDFS-2880. Protobuf changes in DatanodeProtocol to add
 multiple storages. Contributed by Suresh Srinivas.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1240653 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |  3 ++
 ...atanodeProtocolClientSideTranslatorPB.java | 42 +++++++++------
 ...atanodeProtocolServerSideTranslatorPB.java | 12 +++--
 .../src/main/proto/DatanodeProtocol.proto     | 54 +++++++++++++++----
 4 files changed, 82 insertions(+), 29 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 4a1d09e7e09..b807b7c60da 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -45,6 +45,9 @@ Trunk (unreleased changes)
     HDFS-2697. Move RefreshAuthPolicy, RefreshUserMappings, GetUserMappings
     protocol to protocol buffers. (jitendra)
 
+    HDFS-2880. Protobuf changes in DatanodeProtocol to add multiple storages.
+    (suresh)
+
   IMPROVEMENTS
 
   HADOOP-7524 Change RPC to allow multiple protocols including multuple
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index 5d41714495b..32a114f79be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -46,6 +46,9 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ProcessUpgra
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
 import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -169,11 +172,16 @@ public class DatanodeProtocolClientSideTranslatorPB implements
       long capacity, long dfsUsed, long remaining, long blockPoolUsed,
       int xmitsInProgress, int xceiverCount, int failedVolumes)
       throws IOException {
-    HeartbeatRequestProto req = HeartbeatRequestProto.newBuilder()
-        .setRegistration(PBHelper.convert(registration)).setCapacity(capacity)
+    StorageReportProto report = StorageReportProto.newBuilder()
+        .setBlockPoolUsed(blockPoolUsed).setCapacity(capacity)
         .setDfsUsed(dfsUsed).setRemaining(remaining)
-        .setBlockPoolUsed(blockPoolUsed).setXmitsInProgress(xmitsInProgress)
-        .setXceiverCount(xceiverCount).setFailedVolumes(failedVolumes).build();
+        .setStorageID(registration.getStorageID()).build();
+
+    HeartbeatRequestProto req = HeartbeatRequestProto.newBuilder()
+        .setRegistration(PBHelper.convert(registration)).addReports(report)
+        .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
+        .setFailedVolumes(failedVolumes)
+        .build();
     HeartbeatResponseProto resp;
     try {
       resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, req);
@@ -192,15 +200,17 @@ public class DatanodeProtocolClientSideTranslatorPB implements
   @Override
   public DatanodeCommand blockReport(DatanodeRegistration registration,
       String poolId, long[] blocks) throws IOException {
-    BlockReportRequestProto.Builder builder = BlockReportRequestProto
-        .newBuilder().setRegistration(PBHelper.convert(registration))
-        .setBlockPoolId(poolId);
+    StorageBlockReportProto.Builder reportBuilder = StorageBlockReportProto
+        .newBuilder().setStorageID(registration.getStorageID());
+
     if (blocks != null) {
       for (int i = 0; i < blocks.length; i++) {
-        builder.addBlocks(blocks[i]);
+        reportBuilder.addBlocks(blocks[i]);
       }
     }
-    BlockReportRequestProto req = builder.build();
+    BlockReportRequestProto req = BlockReportRequestProto
+        .newBuilder().setRegistration(PBHelper.convert(registration))
+        .setBlockPoolId(poolId).addReports(reportBuilder.build()).build();
     BlockReportResponseProto resp;
     try {
       resp = rpcProxy.blockReport(NULL_CONTROLLER, req);
@@ -211,19 +221,21 @@ public class DatanodeProtocolClientSideTranslatorPB implements
   }
 
   @Override
-  public void blockReceivedAndDeleted(DatanodeRegistration registration,
+  public void blockReceivedAndDeleted(DatanodeRegistration reg,
       String poolId, ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks)
       throws IOException {
-    BlockReceivedAndDeletedRequestProto.Builder builder =
-        BlockReceivedAndDeletedRequestProto.newBuilder()
-        .setRegistration(PBHelper.convert(registration))
-        .setBlockPoolId(poolId);
+    StorageReceivedDeletedBlocksProto.Builder builder =
+        StorageReceivedDeletedBlocksProto.newBuilder()
+        .setStorageID(reg.getStorageID());
     if (receivedAndDeletedBlocks != null) {
       for (int i = 0; i < receivedAndDeletedBlocks.length; i++) {
         builder.addBlocks(PBHelper.convert(receivedAndDeletedBlocks[i]));
       }
     }
-    BlockReceivedAndDeletedRequestProto req = builder.build();
+    BlockReceivedAndDeletedRequestProto req =
+        BlockReceivedAndDeletedRequestProto.newBuilder()
+        .setRegistration(PBHelper.convert(reg))
+        .setBlockPoolId(poolId).addBlocks(builder.build()).build();
     try {
       rpcProxy.blockReceivedAndDeleted(NULL_CONTROLLER, req);
     } catch (ServiceException se) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
index f0526e4bb7a..77b288f6060 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportR
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatRequestProto;
@@ -41,6 +40,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterData
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
@@ -98,9 +98,10 @@ public class DatanodeProtocolServerSideTranslatorPB implements
       HeartbeatRequestProto request) throws ServiceException {
     DatanodeCommand[] cmds = null;
     try {
+      StorageReportProto report = request.getReports(0);
       cmds = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
-          request.getCapacity(), request.getDfsUsed(), request.getRemaining(),
-          request.getBlockPoolUsed(), request.getXmitsInProgress(),
+          report.getCapacity(), report.getDfsUsed(), report.getRemaining(),
+          report.getBlockPoolUsed(), request.getXmitsInProgress(),
           request.getXceiverCount(), request.getFailedVolumes());
     } catch (IOException e) {
       throw new ServiceException(e);
@@ -121,7 +122,7 @@ public class DatanodeProtocolServerSideTranslatorPB implements
   public BlockReportResponseProto blockReport(RpcController controller,
       BlockReportRequestProto request) throws ServiceException {
     DatanodeCommand cmd = null;
-    List<Long> blockIds = request.getBlocksList();
+    List<Long> blockIds = request.getReports(0).getBlocksList();
     long[] blocks = new long[blockIds.size()];
     for (int i = 0; i < blockIds.size(); i++) {
       blocks[i] = blockIds.get(i);
@@ -144,7 +145,8 @@ public class DatanodeProtocolServerSideTranslatorPB implements
   public BlockReceivedAndDeletedResponseProto blockReceivedAndDeleted(
       RpcController controller, BlockReceivedAndDeletedRequestProto request)
       throws ServiceException {
-    List<ReceivedDeletedBlockInfoProto> rdbip = request.getBlocksList();
+    List<ReceivedDeletedBlockInfoProto> rdbip = request.getBlocks(0)
+        .getBlocksList();
     ReceivedDeletedBlockInfo[] info =
         new ReceivedDeletedBlockInfo[rdbip.size()];
     for (int i = 0; i < rdbip.size(); i++) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index dff8d5ab1a3..81ca74b8ee7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -35,6 +35,19 @@ message DatanodeRegistrationProto {
   required ExportedBlockKeysProto keys = 3;   // Block keys
 }
 
+/**
+ * Represents a storage available on the datanode
+ */
+message DatanodeStorageProto {
+  enum StorageState {
+    NORMAL = 0;
+    READ_ONLY = 1;
+  }
+
+  required string storageID = 1;    // Unique identifier for the storage
+  optional StorageState state = 2 [default = NORMAL];
+}
+
 /**
  * Commands sent from namenode to the datanodes
  */
@@ -136,6 +149,7 @@ message UpgradeCommandProto {
  */
 message RegisterDatanodeRequestProto {
   required DatanodeRegistrationProto registration = 1; // Datanode info
+  repeated DatanodeStorageProto storages = 2;          // Storages on the datanode
 }
 
 /**
@@ -159,13 +173,19 @@ message RegisterDatanodeResponseProto {
  */
 message HeartbeatRequestProto {
   required DatanodeRegistrationProto registration = 1; // Datanode info
-  required uint64 capacity = 2;
-  required uint64 dfsUsed = 3;
-  required uint64 remaining = 4;
-  required uint64 blockPoolUsed = 5;
-  required uint32 xmitsInProgress = 6;
-  required uint32 xceiverCount = 7;
-  required uint32 failedVolumes = 8;
+  repeated StorageReportProto reports = 2;
+  optional uint32 xmitsInProgress = 3 [ default = 0 ];
+  optional uint32 xceiverCount = 4 [ default = 0 ];
+  optional uint32 failedVolumes = 5 [ default = 0 ];
+}
+
+message StorageReportProto {
+  required string storageID = 1;
+  optional bool failed = 2 [ default = false ];
+  optional uint64 capacity = 3 [ default = 0 ];
+  optional uint64 dfsUsed = 4 [ default = 0 ];
+  optional uint64 remaining = 5 [ default = 0 ];
+  optional uint64 blockPoolUsed = 6 [ default = 0 ];
 }
 
 /**
@@ -185,7 +205,15 @@ message HeartbeatResponseProto {
 message BlockReportRequestProto {
   required DatanodeRegistrationProto registration = 1;
   required string blockPoolId = 2;
-  repeated uint64 blocks = 3 [packed=true];
+  repeated StorageBlockReportProto reports = 3;
+}
+
+/**
+ * Report of blocks in a storage
+ */
+message StorageBlockReportProto {
+  required string storageID = 1;    // Storage ID
+  repeated uint64 blocks = 2 [packed=true];
 }
 
 /**
@@ -207,6 +235,14 @@ message ReceivedDeletedBlockInfoProto {
   optional string deleteHint = 2;
 }
 
+/**
+ * List of blocks received and deleted for a storage.
+ */
+message StorageReceivedDeletedBlocksProto {
+  required string storageID = 1;
+  repeated ReceivedDeletedBlockInfoProto blocks = 2;
+}
+
 /**
  * registration - datanode registration information
  * blockPoolID - block pool ID of the reported blocks
@@ -215,7 +251,7 @@
 message BlockReceivedAndDeletedRequestProto {
   required DatanodeRegistrationProto registration = 1;
   required string blockPoolId = 2;
-  repeated ReceivedDeletedBlockInfoProto blocks = 3;
+  repeated StorageReceivedDeletedBlocksProto blocks = 3;
 }
 
 /**
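
Note on the new wire format: all three requests now carry their per-storage payload in a repeated field (StorageReportProto, StorageBlockReportProto, StorageReceivedDeletedBlocksProto), and the heartbeat counters move from required fields to optional ones with [ default = 0 ], so a single RPC can describe any number of storages even though the translators in this patch still write a single entry and read only index 0. The sketch below is not part of the patch; it is a minimal illustration of building per-storage messages against the generated DatanodeProtocolProtos classes, with a hypothetical class name and made-up "DS-1"/"DS-2" storage IDs.

// Illustration only, not part of the patch. Assumes the DatanodeProtocolProtos
// classes generated from the .proto above are on the classpath; the class name
// and storage IDs are hypothetical.
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto;

public class MultiStorageReportSketch {
  public static void main(String[] args) {
    // One usage report per storage. Only storageID is required; unset
    // counters fall back to their [ default = 0 ] values.
    StorageReportProto disk1 = StorageReportProto.newBuilder()
        .setStorageID("DS-1")
        .setCapacity(1024L * 1024 * 1024)
        .setDfsUsed(256L * 1024 * 1024)
        .setRemaining(768L * 1024 * 1024)
        .setBlockPoolUsed(256L * 1024 * 1024)
        .build();

    // A second storage on the same datanode, reported as failed.
    StorageReportProto disk2 = StorageReportProto.newBuilder()
        .setStorageID("DS-2")
        .setFailed(true)
        .build();

    // One block list per storage for block reports.
    StorageBlockReportProto blocks1 = StorageBlockReportProto.newBuilder()
        .setStorageID("DS-1")
        .addBlocks(1001L)
        .addBlocks(1002L)
        .build();

    // HeartbeatRequestProto.Builder.addReports(...) and
    // BlockReportRequestProto.Builder.addReports(...) accept any number of
    // these messages; the translators in this patch add exactly one, and the
    // server side reads only getReports(0).
    System.out.println(disk1);
    System.out.println(disk2);
    System.out.println(blocks1);
  }
}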