diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 66c59fae599..6958dcc45be 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -45,9 +45,12 @@ Trunk (unreleased changes) HDFS-2697. Move RefreshAuthPolicy, RefreshUserMappings, GetUserMappings protocol to protocol buffers. (jitendra) - HDFS-2880. Protobuf chagnes in DatanodeProtocol to add multiple storages. + HDFS-2880. Protobuf changes in DatanodeProtocol to add multiple storages. (suresh) + HDFS-2899. Service protocol changes in DatanodeProtocol to add multiple + storages. (suresh) + IMPROVEMENTS HADOOP-7524 Change RPC to allow multiple protocols including multuple diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java index 264f624e216..7b604564589 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java @@ -48,15 +48,18 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterData import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksRequestProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto; -import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto; import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; +import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; +import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; +import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; @@ -153,13 +156,17 @@ public class DatanodeProtocolClientSideTranslatorPB implements } @Override - public DatanodeRegistration registerDatanode(DatanodeRegistration registration) - throws IOException { - RegisterDatanodeRequestProto req = RegisterDatanodeRequestProto - .newBuilder().setRegistration(PBHelper.convert(registration)).build(); + public DatanodeRegistration registerDatanode(DatanodeRegistration registration, + DatanodeStorage[] storages) throws IOException { + RegisterDatanodeRequestProto.Builder builder = RegisterDatanodeRequestProto + .newBuilder().setRegistration(PBHelper.convert(registration)); + for (DatanodeStorage s : storages) { + builder.addStorages(PBHelper.convert(s)); + } + RegisterDatanodeResponseProto resp; try { - resp = rpcProxy.registerDatanode(NULL_CONTROLLER, req); + resp = rpcProxy.registerDatanode(NULL_CONTROLLER, builder.build()); } catch (ServiceException se) { throw ProtobufHelper.getRemoteException(se); } @@ -168,22 +175,19 @@ public class DatanodeProtocolClientSideTranslatorPB implements @Override public DatanodeCommand[] sendHeartbeat(DatanodeRegistration registration, - long capacity, long dfsUsed, long remaining, long blockPoolUsed, - int xmitsInProgress, int xceiverCount, int failedVolumes) - throws IOException { - StorageReportProto report = StorageReportProto.newBuilder() - .setBlockPoolUsed(blockPoolUsed).setCapacity(capacity) - .setDfsUsed(dfsUsed).setRemaining(remaining) - .setStorageID(registration.getStorageID()).build(); - - HeartbeatRequestProto req = HeartbeatRequestProto.newBuilder() - .setRegistration(PBHelper.convert(registration)).addReports(report) + StorageReport[] reports, int xmitsInProgress, int xceiverCount, + int failedVolumes) throws IOException { + HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder() + .setRegistration(PBHelper.convert(registration)) .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount) - .setFailedVolumes(failedVolumes) - .build(); + .setFailedVolumes(failedVolumes); + for (StorageReport r : reports) { + builder.addReports(PBHelper.convert(r)); + } + HeartbeatResponseProto resp; try { - resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, req); + resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build()); } catch (ServiceException se) { throw ProtobufHelper.getRemoteException(se); } @@ -198,21 +202,23 @@ public class DatanodeProtocolClientSideTranslatorPB implements @Override public DatanodeCommand blockReport(DatanodeRegistration registration, - String poolId, long[] blocks) throws IOException { - StorageBlockReportProto.Builder reportBuilder = StorageBlockReportProto - .newBuilder().setStorageID(registration.getStorageID()); + String poolId, StorageBlockReport[] reports) throws IOException { + BlockReportRequestProto.Builder builder = BlockReportRequestProto + .newBuilder().setRegistration(PBHelper.convert(registration)) + .setBlockPoolId(poolId); - if (blocks != null) { + for (StorageBlockReport r : reports) { + StorageBlockReportProto.Builder reportBuilder = StorageBlockReportProto + .newBuilder().setStorageID(r.getStorageID()); + long[] blocks = r.getBlocks(); for (int i = 0; i < blocks.length; i++) { reportBuilder.addBlocks(blocks[i]); } + builder.addReports(reportBuilder.build()); } - BlockReportRequestProto req = BlockReportRequestProto - .newBuilder().setRegistration(PBHelper.convert(registration)) - .setBlockPoolId(poolId).addReports(reportBuilder.build()).build(); BlockReportResponseProto resp; try { - resp = rpcProxy.blockReport(NULL_CONTROLLER, req); + resp = rpcProxy.blockReport(NULL_CONTROLLER, builder.build()); } catch (ServiceException se) { throw ProtobufHelper.getRemoteException(se); } @@ -220,23 +226,24 @@ public class DatanodeProtocolClientSideTranslatorPB implements } @Override - public void blockReceivedAndDeleted(DatanodeRegistration reg, - String poolId, ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks) + public void blockReceivedAndDeleted(DatanodeRegistration registration, + String poolId, StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks) throws IOException { - StorageReceivedDeletedBlocksProto.Builder builder = - StorageReceivedDeletedBlocksProto.newBuilder() - .setStorageID(reg.getStorageID()); - if (receivedAndDeletedBlocks != null) { - for (int i = 0; i < receivedAndDeletedBlocks.length; i++) { - builder.addBlocks(PBHelper.convert(receivedAndDeletedBlocks[i])); - } - } - BlockReceivedAndDeletedRequestProto req = + BlockReceivedAndDeletedRequestProto.Builder builder = BlockReceivedAndDeletedRequestProto.newBuilder() - .setRegistration(PBHelper.convert(reg)) - .setBlockPoolId(poolId).addBlocks(builder.build()).build(); + .setRegistration(PBHelper.convert(registration)) + .setBlockPoolId(poolId); + for (StorageReceivedDeletedBlocks storageBlock : receivedAndDeletedBlocks) { + StorageReceivedDeletedBlocksProto.Builder repBuilder = + StorageReceivedDeletedBlocksProto.newBuilder(); + repBuilder.setStorageID(storageBlock.getStorageID()); + for (ReceivedDeletedBlockInfo rdBlock : storageBlock.getBlocks()) { + repBuilder.addBlocks(PBHelper.convert(rdBlock)); + } + builder.addBlocks(repBuilder.build()); + } try { - rpcProxy.blockReceivedAndDeleted(NULL_CONTROLLER, req); + rpcProxy.blockReceivedAndDeleted(NULL_CONTROLLER, builder.build()); } catch (ServiceException se) { throw ProtobufHelper.getRemoteException(se); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java index 77b288f6060..2ad7f31e20c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java @@ -40,6 +40,8 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterData import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksRequestProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto; @@ -49,8 +51,12 @@ import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; +import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; +import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; +import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.ipc.RPC; @@ -84,8 +90,12 @@ public class DatanodeProtocolServerSideTranslatorPB implements DatanodeRegistration registration = PBHelper.convert(request .getRegistration()); DatanodeRegistration registrationResp; + DatanodeStorage[] storages = new DatanodeStorage[request.getStoragesCount()]; + for (int i = 0; i < request.getStoragesCount(); i++) { + storages[i] = PBHelper.convert(request.getStorages(i)); + } try { - registrationResp = impl.registerDatanode(registration); + registrationResp = impl.registerDatanode(registration, storages); } catch (IOException e) { throw new ServiceException(e); } @@ -98,11 +108,17 @@ public class DatanodeProtocolServerSideTranslatorPB implements HeartbeatRequestProto request) throws ServiceException { DatanodeCommand[] cmds = null; try { - StorageReportProto report = request.getReports(0); + List list = request.getReportsList(); + StorageReport[] report = new StorageReport[list.size()]; + int i = 0; + for (StorageReportProto p : list) { + report[i++] = new StorageReport(p.getStorageID(), p.getFailed(), + p.getCapacity(), p.getDfsUsed(), p.getRemaining(), + p.getBlockPoolUsed()); + } cmds = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()), - report.getCapacity(), report.getDfsUsed(), report.getRemaining(), - report.getBlockPoolUsed(), request.getXmitsInProgress(), - request.getXceiverCount(), request.getFailedVolumes()); + report, request.getXmitsInProgress(), request.getXceiverCount(), + request.getFailedVolumes()); } catch (IOException e) { throw new ServiceException(e); } @@ -122,14 +138,21 @@ public class DatanodeProtocolServerSideTranslatorPB implements public BlockReportResponseProto blockReport(RpcController controller, BlockReportRequestProto request) throws ServiceException { DatanodeCommand cmd = null; - List blockIds = request.getReports(0).getBlocksList(); - long[] blocks = new long[blockIds.size()]; - for (int i = 0; i < blockIds.size(); i++) { - blocks[i] = blockIds.get(i); + StorageBlockReport[] report = + new StorageBlockReport[request.getReportsCount()]; + + int index = 0; + for (StorageBlockReportProto s : request.getReportsList()) { + List blockIds = s.getBlocksList(); + long[] blocks = new long[blockIds.size()]; + for (int i = 0; i < blockIds.size(); i++) { + blocks[i] = blockIds.get(i); + } + report[index++] = new StorageBlockReport(s.getStorageID(), blocks); } try { cmd = impl.blockReport(PBHelper.convert(request.getRegistration()), - request.getBlockPoolId(), blocks); + request.getBlockPoolId(), report); } catch (IOException e) { throw new ServiceException(e); } @@ -145,12 +168,18 @@ public class DatanodeProtocolServerSideTranslatorPB implements public BlockReceivedAndDeletedResponseProto blockReceivedAndDeleted( RpcController controller, BlockReceivedAndDeletedRequestProto request) throws ServiceException { - List rdbip = request.getBlocks(0) - .getBlocksList(); - ReceivedDeletedBlockInfo[] info = - new ReceivedDeletedBlockInfo[rdbip.size()]; - for (int i = 0; i < rdbip.size(); i++) { - info[i] = PBHelper.convert(rdbip.get(i)); + List sBlocks = request.getBlocksList(); + StorageReceivedDeletedBlocks[] info = + new StorageReceivedDeletedBlocks[sBlocks.size()]; + for (int i = 0; i < sBlocks.size(); i++) { + StorageReceivedDeletedBlocksProto sBlock = sBlocks.get(i); + List list = sBlock.getBlocksList(); + ReceivedDeletedBlockInfo[] rdBlocks = + new ReceivedDeletedBlockInfo[list.size()]; + for (int j = 0; j < list.size(); j++) { + rdBlocks[j] = PBHelper.convert(list.get(j)); + } + info[i] = new StorageReceivedDeletedBlocks(sBlock.getStorageID(), rdBlocks); } try { impl.blockReceivedAndDeleted(PBHelper.convert(request.getRegistration()), diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index c45e4cb1410..b30a077ee68 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; @@ -52,10 +53,13 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommand import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeStorageProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeStorageProto.StorageState; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.FinalizeCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.KeyUpdateCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.UpgradeCommandProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; @@ -108,6 +112,8 @@ import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State; import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand; import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; @@ -118,6 +124,7 @@ import org.apache.hadoop.hdfs.server.protocol.RegisterCommand; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; +import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.Text; @@ -1236,4 +1243,41 @@ public class PBHelper { setSpaceQuota(cs.getSpaceQuota()). build(); } + + public static DatanodeStorageProto convert(DatanodeStorage s) { + return DatanodeStorageProto.newBuilder() + .setState(PBHelper.convert(s.getState())) + .setStorageID(s.getStorageID()).build(); + } + + private static StorageState convert(State state) { + switch(state) { + case READ_ONLY: + return StorageState.READ_ONLY; + case NORMAL: + default: + return StorageState.NORMAL; + } + } + + public static DatanodeStorage convert(DatanodeStorageProto s) { + return new DatanodeStorage(s.getStorageID(), PBHelper.convert(s.getState())); + } + + private static State convert(StorageState state) { + switch(state) { + case READ_ONLY: + return DatanodeStorage.State.READ_ONLY; + case NORMAL: + default: + return DatanodeStorage.State.NORMAL; + } + } + + public static StorageReportProto convert(StorageReport r) { + return StorageReportProto.newBuilder() + .setBlockPoolUsed(r.getBlockPoolUsed()).setCapacity(r.getCapacity()) + .setDfsUsed(r.getDfsUsed()).setRemaining(r.getRemaining()) + .setStorageID(r.getStorageID()).build(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index ab8c8def843..111fbee2852 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -37,7 +37,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; -import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; @@ -47,14 +46,17 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException; import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand; import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; +import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; +import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; +import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.StringUtils; @@ -295,8 +297,10 @@ class BPOfferService implements Runnable { } } if (receivedAndDeletedBlockArray != null) { + StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks( + bpRegistration.getStorageID(), receivedAndDeletedBlockArray) }; bpNamenode.blockReceivedAndDeleted(bpRegistration, getBlockPoolId(), - receivedAndDeletedBlockArray); + report); synchronized (receivedAndDeletedBlockList) { for (int i = 0; i < receivedAndDeletedBlockArray.length; i++) { receivedAndDeletedBlockList.remove(receivedAndDeletedBlockArray[i]); @@ -365,8 +369,9 @@ class BPOfferService implements Runnable { // Send block report long brSendStartTime = now(); - cmd = bpNamenode.blockReport(bpRegistration, getBlockPoolId(), bReport - .getBlockListAsLongs()); + StorageBlockReport[] report = { new StorageBlockReport( + bpRegistration.getStorageID(), bReport.getBlockListAsLongs()) }; + cmd = bpNamenode.blockReport(bpRegistration, getBlockPoolId(), report); // Log the block report processing stats from Datanode perspective long brSendCost = now() - brSendStartTime; @@ -398,11 +403,11 @@ class BPOfferService implements Runnable { DatanodeCommand [] sendHeartBeat() throws IOException { - return bpNamenode.sendHeartbeat(bpRegistration, - dn.data.getCapacity(), - dn.data.getDfsUsed(), - dn.data.getRemaining(), - dn.data.getBlockPoolUsed(getBlockPoolId()), + // reports number of failed volumes + StorageReport[] report = { new StorageReport(bpRegistration.getStorageID(), + false, dn.data.getCapacity(), dn.data.getDfsUsed(), + dn.data.getRemaining(), dn.data.getBlockPoolUsed(getBlockPoolId())) }; + return bpNamenode.sendHeartbeat(bpRegistration, report, dn.xmitsInProgress.get(), dn.getXceiverCount(), dn.data.getNumFailedVolumes()); } @@ -572,7 +577,8 @@ class BPOfferService implements Runnable { while (shouldRun()) { try { // Use returned registration from namenode with updated machine name. - bpRegistration = bpNamenode.registerDatanode(bpRegistration); + bpRegistration = bpNamenode.registerDatanode(bpRegistration, + new DatanodeStorage[0]); break; } catch(SocketTimeoutException e) { // namenode is busy LOG.info("Problem connecting to server: " + nnAddr); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 36e0695c943..92da5bedb9c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -87,6 +87,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand; import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; @@ -94,8 +95,10 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NodeRegistration; -import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; +import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; +import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; +import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.Text; @@ -841,8 +844,8 @@ class NameNodeRpcServer implements NamenodeProtocols { @Override // DatanodeProtocol - public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg) - throws IOException { + public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg, + DatanodeStorage[] storages) throws IOException { verifyVersion(nodeReg.getVersion()); namesystem.registerDatanode(nodeReg); @@ -851,19 +854,20 @@ class NameNodeRpcServer implements NamenodeProtocols { @Override // DatanodeProtocol public DatanodeCommand[] sendHeartbeat(DatanodeRegistration nodeReg, - long capacity, long dfsUsed, long remaining, long blockPoolUsed, - int xmitsInProgress, int xceiverCount, int failedVolumes) - throws IOException { + StorageReport[] report, int xmitsInProgress, int xceiverCount, + int failedVolumes) throws IOException { verifyRequest(nodeReg); - return namesystem.handleHeartbeat(nodeReg, capacity, dfsUsed, remaining, - blockPoolUsed, xceiverCount, xmitsInProgress, failedVolumes); + return namesystem.handleHeartbeat(nodeReg, report[0].getCapacity(), + report[0].getDfsUsed(), report[0].getRemaining(), + report[0].getBlockPoolUsed(), xceiverCount, xmitsInProgress, + failedVolumes); } @Override // DatanodeProtocol public DatanodeCommand blockReport(DatanodeRegistration nodeReg, - String poolId, long[] blocks) throws IOException { + String poolId, StorageBlockReport[] reports) throws IOException { verifyRequest(nodeReg); - BlockListAsLongs blist = new BlockListAsLongs(blocks); + BlockListAsLongs blist = new BlockListAsLongs(reports[0].getBlocks()); if(stateChangeLog.isDebugEnabled()) { stateChangeLog.debug("*BLOCK* NameNode.blockReport: " + "from " + nodeReg.getName() + " " + blist.getNumberOfBlocks() @@ -878,7 +882,7 @@ class NameNodeRpcServer implements NamenodeProtocols { @Override // DatanodeProtocol public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId, - ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks) throws IOException { + StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks) throws IOException { verifyRequest(nodeReg); if(stateChangeLog.isDebugEnabled()) { stateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: " @@ -886,7 +890,7 @@ class NameNodeRpcServer implements NamenodeProtocols { +" blocks."); } namesystem.getBlockManager().blockReceivedAndDeleted( - nodeReg, poolId, receivedAndDeletedBlocks); + nodeReg, poolId, receivedAndDeletedBlocks[0].getBlocks()); } @Override // DatanodeProtocol diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java index 2a57c267df5..a12b042d4d5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java @@ -80,13 +80,16 @@ public interface DatanodeProtocol extends VersionedProtocol { * Register Datanode. * * @see org.apache.hadoop.hdfs.server.namenode.FSNamesystem#registerDatanode(DatanodeRegistration) - * + * @param registration datanode registration information + * @param storages list of storages on the datanode`` * @return updated {@link org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration}, which contains * new storageID if the datanode did not have one and * registration ID for further communication. */ - public DatanodeRegistration registerDatanode(DatanodeRegistration registration - ) throws IOException; + public DatanodeRegistration registerDatanode( + DatanodeRegistration registration, DatanodeStorage[] storages) + throws IOException; + /** * sendHeartbeat() tells the NameNode that the DataNode is still * alive and well. Includes some status info, too. @@ -95,19 +98,14 @@ public interface DatanodeProtocol extends VersionedProtocol { * A DatanodeCommand tells the DataNode to invalidate local block(s), * or to copy them to other DataNodes, etc. * @param registration datanode registration information - * @param capacity total storage capacity available at the datanode - * @param dfsUsed storage used by HDFS - * @param remaining remaining storage available for HDFS - * @param blockPoolUsed storage used by the block pool + * @param reports utilization report per storage * @param xmitsInProgress number of transfers from this datanode to others * @param xceiverCount number of active transceiver threads * @param failedVolumes number of failed volumes * @throws IOException on error */ public DatanodeCommand[] sendHeartbeat(DatanodeRegistration registration, - long capacity, - long dfsUsed, long remaining, - long blockPoolUsed, + StorageReport[] reports, int xmitsInProgress, int xceiverCount, int failedVolumes) throws IOException; @@ -120,7 +118,7 @@ public interface DatanodeProtocol extends VersionedProtocol { * infrequently afterwards. * @param registration * @param poolId - the block pool ID for the blocks - * @param blocks - the block list as an array of longs. + * @param reports - report of blocks per storage * Each block is represented as 2 longs. * This is done instead of Block[] to reduce memory used by block reports. * @@ -128,8 +126,7 @@ public interface DatanodeProtocol extends VersionedProtocol { * @throws IOException */ public DatanodeCommand blockReport(DatanodeRegistration registration, - String poolId, - long[] blocks) throws IOException; + String poolId, StorageBlockReport[] reports) throws IOException; /** * blockReceivedAndDeleted() allows the DataNode to tell the NameNode about @@ -143,7 +140,7 @@ public interface DatanodeProtocol extends VersionedProtocol { */ public void blockReceivedAndDeleted(DatanodeRegistration registration, String poolId, - ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks) + StorageReceivedDeletedBlocks[] rcvdAndDeletedBlocks) throws IOException; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java new file mode 100644 index 00000000000..5166b091a9c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocol; + +/** + * Class capatures information about a storage in Datanode + */ +public class DatanodeStorage { + public enum State { + NORMAL, + READ_ONLY + } + + private final String storageID; + private final State state; + + public DatanodeStorage(String sid, State s) { + storageID = sid; + state = s; + } + + public String getStorageID() { + return storageID; + } + + public State getState() { + return state; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageBlockReport.java new file mode 100644 index 00000000000..d734406100c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageBlockReport.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.protocol; + +/** + * Block report for a Datanode storage + */ +public class StorageBlockReport { + private final String storageID; + private final long[] blocks; + + public StorageBlockReport(String sid, long[] blocks) { + this.storageID = sid; + this.blocks = blocks; + } + + public String getStorageID() { + return storageID; + } + + public long[] getBlocks() { + return blocks; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReceivedDeletedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReceivedDeletedBlocks.java new file mode 100644 index 00000000000..c3d44ab131d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReceivedDeletedBlocks.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.protocol; + +/** + * Report of block received and deleted per Datanode + * storage. + */ +public class StorageReceivedDeletedBlocks { + private final String storageID; + private final ReceivedDeletedBlockInfo[] blocks; + + public String getStorageID() { + return storageID; + } + + public ReceivedDeletedBlockInfo[] getBlocks() { + return blocks; + } + + public StorageReceivedDeletedBlocks(final String storageID, + final ReceivedDeletedBlockInfo[] blocks) { + this.storageID = storageID; + this.blocks = blocks; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReport.java new file mode 100644 index 00000000000..ff43c2f57f3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReport.java @@ -0,0 +1,47 @@ +package org.apache.hadoop.hdfs.server.protocol; + +/** + * Utilization report for a Datanode storage + */ +public class StorageReport { + private final String storageID; + private final boolean failed; + private final long capacity; + private final long dfsUsed; + private final long remaining; + private final long blockPoolUsed; + + public StorageReport(String sid, boolean failed, long capacity, long dfsUsed, + long remaining, long bpUsed) { + this.storageID = sid; + this.failed = failed; + this.capacity = capacity; + this.dfsUsed = dfsUsed; + this.remaining = remaining; + this.blockPoolUsed = bpUsed; + } + + public String getStorageID() { + return storageID; + } + + public boolean isFailed() { + return failed; + } + + public long getCapacity() { + return capacity; + } + + public long getDfsUsed() { + return dfsUsed; + } + + public long getRemaining() { + return remaining; + } + + public long getBlockPoolUsed() { + return blockPoolUsed; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java index 82e24e65c3a..a12cb334801 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils.DelayAnswer; import org.apache.log4j.Level; @@ -146,8 +147,9 @@ public class TestBlockReport { DataNode dn = cluster.getDataNodes().get(DN_N0); String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); - cluster.getNameNodeRpc().blockReport(dnR, poolId, - new BlockListAsLongs(blocks, null).getBlockListAsLongs()); + StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(), + new BlockListAsLongs(blocks, null).getBlockListAsLongs()) }; + cluster.getNameNodeRpc().blockReport(dnR, poolId, report); List blocksAfterReport = DFSTestUtil.getAllBlocks(fs.open(filePath)); @@ -180,7 +182,7 @@ public class TestBlockReport { Path filePath = new Path("/" + METHOD_NAME + ".dat"); DFSTestUtil.createFile(fs, filePath, - (long) FILE_SIZE, REPL_FACTOR, rand.nextLong()); + FILE_SIZE, REPL_FACTOR, rand.nextLong()); // mock around with newly created blocks and delete some File dataDir = new File(cluster.getDataDirectory()); @@ -226,8 +228,9 @@ public class TestBlockReport { DataNode dn = cluster.getDataNodes().get(DN_N0); String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); - cluster.getNameNodeRpc().blockReport(dnR, poolId, - new BlockListAsLongs(blocks, null).getBlockListAsLongs()); + StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(), + new BlockListAsLongs(blocks, null).getBlockListAsLongs()) }; + cluster.getNameNodeRpc().blockReport(dnR, poolId, report); BlockManagerTestUtil.getComputedDatanodeWork(cluster.getNamesystem() .getBlockManager()); @@ -266,9 +269,10 @@ public class TestBlockReport { DataNode dn = cluster.getDataNodes().get(DN_N0); String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); + StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(), + new BlockListAsLongs(blocks, null).getBlockListAsLongs()) }; DatanodeCommand dnCmd = - cluster.getNameNodeRpc().blockReport(dnR, poolId, - new BlockListAsLongs(blocks, null).getBlockListAsLongs()); + cluster.getNameNodeRpc().blockReport(dnR, poolId, report); if(LOG.isDebugEnabled()) { LOG.debug("Got the command: " + dnCmd); } @@ -284,9 +288,8 @@ public class TestBlockReport { * This test isn't a representative case for BlockReport * The empty method is going to be left here to keep the naming * of the test plan in synch with the actual implementation - * @throws IOException in case of errors */ - public void blockReport_04() throws IOException { + public void blockReport_04() { } // Client requests new block from NN. The test corrupts this very block @@ -295,7 +298,7 @@ public class TestBlockReport { // BlockScanner which is out of scope of this test // Keeping the name to be in synch with the test plan // - public void blockReport_05() throws IOException { + public void blockReport_05() { } /** @@ -319,8 +322,9 @@ public class TestBlockReport { DataNode dn = cluster.getDataNodes().get(DN_N1); String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); - cluster.getNameNodeRpc().blockReport(dnR, poolId, - new BlockListAsLongs(blocks, null).getBlockListAsLongs()); + StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(), + new BlockListAsLongs(blocks, null).getBlockListAsLongs()) }; + cluster.getNameNodeRpc().blockReport(dnR, poolId, report); printStats(); assertEquals("Wrong number of PendingReplication Blocks", 0, cluster.getNamesystem().getUnderReplicatedBlocks()); @@ -368,8 +372,9 @@ public class TestBlockReport { DataNode dn = cluster.getDataNodes().get(DN_N1); String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); - cluster.getNameNodeRpc().blockReport(dnR, poolId, - new BlockListAsLongs(blocks, null).getBlockListAsLongs()); + StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(), + new BlockListAsLongs(blocks, null).getBlockListAsLongs()) }; + cluster.getNameNodeRpc().blockReport(dnR, poolId, report); printStats(); assertEquals("Wrong number of Corrupted blocks", 1, cluster.getNamesystem().getCorruptReplicaBlocks() + @@ -390,8 +395,9 @@ public class TestBlockReport { LOG.debug("Done corrupting length of " + corruptedBlock.getBlockName()); } - cluster.getNameNodeRpc().blockReport(dnR, poolId, + report[0] = new StorageBlockReport(dnR.getStorageID(), new BlockListAsLongs(blocks, null).getBlockListAsLongs()); + cluster.getNameNodeRpc().blockReport(dnR, poolId, report); printStats(); assertEquals("Wrong number of Corrupted blocks", @@ -440,8 +446,9 @@ public class TestBlockReport { DataNode dn = cluster.getDataNodes().get(DN_N1); String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); - cluster.getNameNodeRpc().blockReport(dnR, poolId, - new BlockListAsLongs(blocks, null).getBlockListAsLongs()); + StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(), + new BlockListAsLongs(blocks, null).getBlockListAsLongs()) }; + cluster.getNameNodeRpc().blockReport(dnR, poolId, report); printStats(); assertEquals("Wrong number of PendingReplication blocks", blocks.size(), cluster.getNamesystem().getPendingReplicationBlocks()); @@ -486,8 +493,9 @@ public class TestBlockReport { DataNode dn = cluster.getDataNodes().get(DN_N1); String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); - cluster.getNameNodeRpc().blockReport(dnR, poolId, - new BlockListAsLongs(blocks, null).getBlockListAsLongs()); + StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(), + new BlockListAsLongs(blocks, null).getBlockListAsLongs()) }; + cluster.getNameNodeRpc().blockReport(dnR, poolId, report); printStats(); assertEquals("Wrong number of PendingReplication blocks", 2, cluster.getNamesystem().getPendingReplicationBlocks()); @@ -550,7 +558,7 @@ public class TestBlockReport { .when(spy).blockReport( Mockito.anyObject(), Mockito.anyString(), - Mockito.anyObject()); + Mockito.anyObject()); // Force a block report to be generated. The block report will have // an RBW replica in it. Wait for the RPC to be sent, but block @@ -638,8 +646,7 @@ public class TestBlockReport { // Write file and start second data node. private ArrayList writeFile(final String METHOD_NAME, final long fileSize, - Path filePath) - throws IOException { + Path filePath) { ArrayList blocks = null; try { REPL_FACTOR = 2; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java index c6fb4304da2..86d63a36d77 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.net.NetUtils; @@ -144,8 +145,9 @@ public class TestDataNodeVolumeFailure { DataNode dn = cluster.getDataNodes().get(1); //corresponds to dir data3 String bpid = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(bpid); - long[] bReport = dn.getFSDataset().getBlockReport(bpid).getBlockListAsLongs(); - cluster.getNameNodeRpc().blockReport(dnR, bpid, bReport); + StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(), + dn.getFSDataset().getBlockReport(bpid).getBlockListAsLongs()) }; + cluster.getNameNodeRpc().blockReport(dnR, bpid, report); // verify number of blocks and files... verify(filename, filesize); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java index afc003f9381..d128167e5b3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java @@ -25,8 +25,6 @@ import java.util.Arrays; import java.util.EnumSet; import java.util.List; -import javax.security.auth.login.LoginException; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; @@ -46,9 +44,13 @@ import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; +import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; +import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; +import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.io.EnumSetWritable; @@ -104,7 +106,7 @@ public class NNThroughputBenchmark { static NameNode nameNode; static NamenodeProtocols nameNodeProto; - NNThroughputBenchmark(Configuration conf) throws IOException, LoginException { + NNThroughputBenchmark(Configuration conf) throws IOException { config = conf; // We do not need many handlers, since each thread simulates a handler // by calling name-node methods directly @@ -125,7 +127,7 @@ public class NNThroughputBenchmark { nameNodeProto = nameNode.getRpcServer(); } - void close() throws IOException { + void close() { nameNode.stop(); } @@ -795,7 +797,10 @@ public class NNThroughputBenchmark { dnRegistration.setStorageInfo(new DataStorage(nsInfo, "")); DataNode.setNewStorageID(dnRegistration); // register datanode - dnRegistration = nameNodeProto.registerDatanode(dnRegistration); + + DatanodeStorage[] storages = { new DatanodeStorage( + dnRegistration.getStorageID(), DatanodeStorage.State.NORMAL) }; + dnRegistration = nameNodeProto.registerDatanode(dnRegistration, storages); } /** @@ -805,8 +810,10 @@ public class NNThroughputBenchmark { void sendHeartbeat() throws IOException { // register datanode // TODO:FEDERATION currently a single block pool is supported + StorageReport[] rep = { new StorageReport(dnRegistration.getStorageID(), + false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) }; DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, - DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0, 0, 0); + rep, 0, 0, 0); if(cmds != null) { for (DatanodeCommand cmd : cmds ) { if(LOG.isDebugEnabled()) { @@ -849,9 +856,10 @@ public class NNThroughputBenchmark { @SuppressWarnings("unused") // keep it for future blockReceived benchmark int replicateBlocks() throws IOException { // register datanode - // TODO:FEDERATION currently a single block pool is supported + StorageReport[] rep = { new StorageReport(dnRegistration.getStorageID(), + false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) }; DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, - DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0, 0, 0); + rep, 0, 0, 0); if (cmds != null) { for (DatanodeCommand cmd : cmds) { if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) { @@ -881,10 +889,12 @@ public class NNThroughputBenchmark { receivedDNReg.setStorageInfo( new DataStorage(nsInfo, dnInfo.getStorageID())); receivedDNReg.setInfoPort(dnInfo.getInfoPort()); + ReceivedDeletedBlockInfo[] rdBlocks = { new ReceivedDeletedBlockInfo( + blocks[i], DataNode.EMPTY_DEL_HINT) }; + StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks( + receivedDNReg.getStorageID(), rdBlocks) }; nameNodeProto.blockReceivedAndDeleted(receivedDNReg, nameNode - .getNamesystem().getBlockPoolId(), - new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo( - blocks[i], DataNode.EMPTY_DEL_HINT) }); + .getNamesystem().getBlockPoolId(), report); } } return blocks.length; @@ -916,7 +926,7 @@ public class NNThroughputBenchmark { config.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 3 * 60); parseArguments(args); // adjust replication to the number of data-nodes - this.replication = (short)Math.min((int)replication, getNumDatanodes()); + this.replication = (short)Math.min(replication, getNumDatanodes()); } /** @@ -996,10 +1006,12 @@ public class NNThroughputBenchmark { for(DatanodeInfo dnInfo : loc.getLocations()) { int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getName()); datanodes[dnIdx].addBlock(loc.getBlock().getLocalBlock()); + ReceivedDeletedBlockInfo[] rdBlocks = { new ReceivedDeletedBlockInfo( + loc.getBlock().getLocalBlock(), "") }; + StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks( + datanodes[dnIdx].dnRegistration.getStorageID(), rdBlocks) }; nameNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration, loc - .getBlock().getBlockPoolId(), - new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(loc - .getBlock().getLocalBlock(), "") }); + .getBlock().getBlockPoolId(), report); } } return prevBlock; @@ -1016,8 +1028,10 @@ public class NNThroughputBenchmark { assert daemonId < numThreads : "Wrong daemonId."; TinyDatanode dn = datanodes[daemonId]; long start = System.currentTimeMillis(); + StorageBlockReport[] report = { new StorageBlockReport( + dn.dnRegistration.getStorageID(), dn.getBlockReportList()) }; nameNodeProto.blockReport(dn.dnRegistration, nameNode.getNamesystem() - .getBlockPoolId(), dn.getBlockReportList()); + .getBlockPoolId(), report); long end = System.currentTimeMillis(); return end-start; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java index 2e73ec556a2..301c4d4f362 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java @@ -38,6 +38,9 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.RegisterCommand; +import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; +import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; +import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.junit.After; import org.junit.Test; @@ -108,19 +111,22 @@ public class TestDeadDatanode { ReceivedDeletedBlockInfo[] blocks = { new ReceivedDeletedBlockInfo( new Block(0), "") }; + StorageReceivedDeletedBlocks[] storageBlocks = { + new StorageReceivedDeletedBlocks(reg.getStorageID(), blocks) }; // Ensure blockReceived call from dead datanode is rejected with IOException try { - dnp.blockReceivedAndDeleted(reg, poolId, blocks); + dnp.blockReceivedAndDeleted(reg, poolId, storageBlocks); Assert.fail("Expected IOException is not thrown"); } catch (IOException ex) { // Expected } // Ensure blockReport from dead datanode is rejected with IOException - long[] blockReport = new long[] { 0L, 0L, 0L }; + StorageBlockReport[] report = { new StorageBlockReport(reg.getStorageID(), + new long[] { 0L, 0L, 0L }) }; try { - dnp.blockReport(reg, poolId, blockReport); + dnp.blockReport(reg, poolId, report); Assert.fail("Expected IOException is not thrown"); } catch (IOException ex) { // Expected @@ -128,7 +134,9 @@ public class TestDeadDatanode { // Ensure heartbeat from dead datanode is rejected with a command // that asks datanode to register again - DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, 0, 0, 0, 0, 0, 0, 0); + StorageReport[] rep = { new StorageReport(reg.getStorageID(), false, 0, 0, + 0, 0) }; + DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, 0, 0, 0); Assert.assertEquals(1, cmd.length); Assert.assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER .getAction());