From 01f37e42f050207b7659bf74e2484cf8bdae2d89 Mon Sep 17 00:00:00 2001
From: Arpit Agarwal
Date: Wed, 23 Oct 2013 01:28:48 +0000
Subject: [PATCH] HDFS-5390. Send one incremental block report per storage
 directory.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1534891 13f79535-47bb-0310-9956-ffa450edef68
---
 .../hadoop-hdfs/CHANGES_HDFS-2832.txt         |   3 +
 .../hdfs/server/datanode/BPOfferService.java  |  13 +-
 .../hdfs/server/datanode/BPServiceActor.java  | 179 ++++++++++++------
 .../hdfs/server/datanode/BlockReceiver.java   |  16 +-
 .../hadoop/hdfs/server/datanode/DataNode.java |  20 +-
 .../hdfs/server/datanode/DataXceiver.java     |  11 +-
 .../hadoop/hdfs/server/datanode/Replica.java  |   5 +
 .../hdfs/server/datanode/ReplicaInfo.java     |   8 +
 .../datanode/fsdataset/FsDatasetSpi.java      |   2 +-
 .../datanode/fsdataset/FsVolumeSpi.java       |   3 +
 .../impl/FsDatasetAsyncDiskService.java       |   2 +-
 .../fsdataset/impl/FsDatasetImpl.java         |   3 +-
 .../datanode/fsdataset/impl/FsVolumeImpl.java |   1 +
 .../server/datanode/SimulatedFSDataset.java   |  14 +-
 .../server/datanode/TestBPOfferService.java   |   2 +-
 .../server/datanode/TestDirectoryScanner.java |   5 +
 16 files changed, 204 insertions(+), 83 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt
index edc6bee4fe0..80b8ba800d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt
@@ -42,3 +42,6 @@ IMPROVEMENTS:
 
     HDFS-5398. NameNode changes to process storage reports per storage
     directory. (Arpit Agarwal)
+
+    HDFS-5390. Send one incremental block report per storage directory.
+    (Arpit Agarwal)
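The substance of the change, summarized: the DataNode previously batched every
pending ReceivedDeletedBlockInfo into a single incremental report keyed by its
DataNode UUID; with this patch it keeps one pending queue per storage directory
and sends one StorageReceivedDeletedBlocks entry per storage, keyed by that
storage's UUID. A minimal sketch of the reporting loop this patch introduces;
the names are the BPServiceActor members from the hunks below, and locking plus
the re-queue-on-failure path are elided here:

    for (Map.Entry<String, PerStoragePendingIncrementalBR> entry :
             pendingIncrementalBRperStorage.entrySet()) {
      ReceivedDeletedBlockInfo[] blocks = entry.getValue().dequeueBlockInfos();
      if (blocks.length == 0) {
        continue; // nothing pending for this storage
      }
      // The report is now keyed by the storage UUID, not the DataNode UUID.
      StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
          entry.getKey(), blocks) };
      bpNamenode.blockReceivedAndDeleted(bpRegistration,
          bpos.getBlockPoolId(), report);
    }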
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index e11cc87b4a8..c9486e9e4bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -192,7 +192,8 @@ class BPOfferService {
    * till namenode is informed before responding with success to the
    * client? For now we don't.
    */
-  void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
+  void notifyNamenodeReceivedBlock(
+      ExtendedBlock block, String delHint, String storageUuid) {
     checkBlock(block);
     checkDelHint(delHint);
     ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
@@ -201,7 +202,7 @@ class BPOfferService {
         delHint);
 
     for (BPServiceActor actor : bpServices) {
-      actor.notifyNamenodeBlockImmediately(bInfo);
+      actor.notifyNamenodeBlockImmediately(bInfo, storageUuid);
     }
   }
 
@@ -218,23 +219,23 @@ class BPOfferService {
         "delHint is null");
   }
 
-  void notifyNamenodeDeletedBlock(ExtendedBlock block) {
+  void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
     checkBlock(block);
     ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
         block.getLocalBlock(), BlockStatus.DELETED_BLOCK, null);
 
     for (BPServiceActor actor : bpServices) {
-      actor.notifyNamenodeDeletedBlock(bInfo);
+      actor.notifyNamenodeDeletedBlock(bInfo, storageUuid);
     }
   }
 
-  void notifyNamenodeReceivingBlock(ExtendedBlock block) {
+  void notifyNamenodeReceivingBlock(ExtendedBlock block, String storageUuid) {
     checkBlock(block);
     ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
         block.getLocalBlock(), BlockStatus.RECEIVING_BLOCK, null);
 
     for (BPServiceActor actor : bpServices) {
-      actor.notifyNamenodeBlockImmediately(bInfo);
+      actor.notifyNamenodeBlockImmediately(bInfo, storageUuid);
     }
   }
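Every notification into BPOfferService now carries the storage UUID of the
volume holding the replica, and the same (bInfo, storageUuid) pair is fanned
out to every BPServiceActor, so each NameNode this block pool reports to sees
the same per-storage attribution. A hedged sketch of the producer side,
mirroring the BlockReceiver and FsDatasetAsyncDiskService call sites later in
this patch; datanode, block, replicaInfo and volume stand in for the fields
used at those sites:

    // A replica learns its storage UUID from the volume it lives on:
    datanode.notifyNamenodeReceivingBlock(block, replicaInfo.getStorageUuid());
    // A deletion is attributed to the volume the block file was removed from:
    datanode.notifyNamenodeDeletedBlock(block, volume.getStorageID());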
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 32e32966f79..4eb843d247f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -94,9 +93,9 @@ class BPServiceActor implements Runnable {
    * keyed by block ID, contains the pending changes which have yet to be
    * reported to the NN. Access should be synchronized on this object.
    */
-  private final Map<Long, ReceivedDeletedBlockInfo> pendingIncrementalBR
-    = Maps.newHashMap();
-
+  private final Map<String, PerStoragePendingIncrementalBR>
+      pendingIncrementalBRperStorage = Maps.newConcurrentMap();
+
   private volatile int pendingReceivedRequests = 0;
   private volatile boolean shouldServiceRun = true;
   private final DataNode dn;
@@ -263,64 +262,84 @@
    * @throws IOException
    */
   private void reportReceivedDeletedBlocks() throws IOException {
-
-    // check if there are newly received blocks
-    ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null;
-    synchronized (pendingIncrementalBR) {
-      int numBlocks = pendingIncrementalBR.size();
-      if (numBlocks > 0) {
-        //
-        // Send newly-received and deleted blockids to namenode
-        //
-        receivedAndDeletedBlockArray = pendingIncrementalBR
-            .values().toArray(new ReceivedDeletedBlockInfo[numBlocks]);
+    // For each storage, check if there are newly received blocks and if
+    // so then send an incremental report to the NameNode.
+    for (Map.Entry<String, PerStoragePendingIncrementalBR> entry :
+           pendingIncrementalBRperStorage.entrySet()) {
+      final String storageUuid = entry.getKey();
+      final PerStoragePendingIncrementalBR perStorageMap = entry.getValue();
+      ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null;
+      // TODO: We can probably use finer-grained synchronization now.
+      synchronized (pendingIncrementalBRperStorage) {
+        if (perStorageMap.getBlockInfoCount() > 0) {
+          // Send newly-received and deleted blockids to namenode
+          receivedAndDeletedBlockArray = perStorageMap.dequeueBlockInfos();
+          pendingReceivedRequests -= receivedAndDeletedBlockArray.length;
+        }
       }
-      pendingIncrementalBR.clear();
-    }
-    if (receivedAndDeletedBlockArray != null) {
-      StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
-          bpRegistration.getDatanodeUuid(), receivedAndDeletedBlockArray) };
-      boolean success = false;
-      try {
-        bpNamenode.blockReceivedAndDeleted(bpRegistration, bpos.getBlockPoolId(),
-            report);
-        success = true;
-      } finally {
-        synchronized (pendingIncrementalBR) {
-          if (!success) {
-            // If we didn't succeed in sending the report, put all of the
-            // blocks back onto our queue, but only in the case where we didn't
-            // put something newer in the meantime.
-            for (ReceivedDeletedBlockInfo rdbi : receivedAndDeletedBlockArray) {
-              if (!pendingIncrementalBR.containsKey(rdbi.getBlock().getBlockId())) {
-                pendingIncrementalBR.put(rdbi.getBlock().getBlockId(), rdbi);
-              }
+
+      if (receivedAndDeletedBlockArray != null) {
+        StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
+            storageUuid, receivedAndDeletedBlockArray) };
+        boolean success = false;
+        try {
+          bpNamenode.blockReceivedAndDeleted(bpRegistration, bpos.getBlockPoolId(),
+              report);
+          success = true;
+        } finally {
+          synchronized (pendingIncrementalBRperStorage) {
+            if (!success) {
+              // If we didn't succeed in sending the report, put all of the
+              // blocks back onto our queue, but only in the case where we
+              // didn't put something newer in the meantime.
+              perStorageMap.putMissingBlockInfos(receivedAndDeletedBlockArray);
+              pendingReceivedRequests += perStorageMap.getBlockInfoCount();
             }
           }
-          pendingReceivedRequests = pendingIncrementalBR.size();
         }
       }
     }
   }
 
+  /**
+   * Retrieve the incremental BR state for a given storage UUID.
+   * @param storageUuid the storage to look up
+   * @return the pending incremental block report state for that storage
+   */
+  private PerStoragePendingIncrementalBR getIncrementalBRMapForStorage(
+      String storageUuid) {
+    PerStoragePendingIncrementalBR mapForStorage =
+        pendingIncrementalBRperStorage.get(storageUuid);
+
+    if (mapForStorage == null) {
+      // This is the first time we are adding incremental BR state for
+      // this storage so create a new map. This is required once per
+      // storage, per service actor.
+      mapForStorage = new PerStoragePendingIncrementalBR();
+      pendingIncrementalBRperStorage.put(storageUuid, mapForStorage);
+    }
+
+    return mapForStorage;
+  }
+
   /*
    * Informing the name node could take a long long time! Should we wait
    * till namenode is informed before responding with success to the
    * client? For now we don't.
    */
-  void notifyNamenodeBlockImmediately(ReceivedDeletedBlockInfo bInfo) {
-    synchronized (pendingIncrementalBR) {
-      pendingIncrementalBR.put(
-          bInfo.getBlock().getBlockId(), bInfo);
+  void notifyNamenodeBlockImmediately(
+      ReceivedDeletedBlockInfo bInfo, String storageUuid) {
+    synchronized (pendingIncrementalBRperStorage) {
+      getIncrementalBRMapForStorage(storageUuid).putBlockInfo(bInfo);
       pendingReceivedRequests++;
-      pendingIncrementalBR.notifyAll();
+      pendingIncrementalBRperStorage.notifyAll();
     }
   }
 
-  void notifyNamenodeDeletedBlock(ReceivedDeletedBlockInfo bInfo) {
-    synchronized (pendingIncrementalBR) {
-      pendingIncrementalBR.put(
-          bInfo.getBlock().getBlockId(), bInfo);
+  void notifyNamenodeDeletedBlock(
+      ReceivedDeletedBlockInfo bInfo, String storageUuid) {
+    synchronized (pendingIncrementalBRperStorage) {
+      getIncrementalBRMapForStorage(storageUuid).putBlockInfo(bInfo);
     }
   }
 
@@ -329,13 +348,13 @@
    */
   @VisibleForTesting
   void triggerBlockReportForTests() {
-    synchronized (pendingIncrementalBR) {
+    synchronized (pendingIncrementalBRperStorage) {
      lastBlockReport = 0;
      lastHeartbeat = 0;
-      pendingIncrementalBR.notifyAll();
+      pendingIncrementalBRperStorage.notifyAll();
      while (lastBlockReport == 0) {
        try {
-          pendingIncrementalBR.wait(100);
+          pendingIncrementalBRperStorage.wait(100);
        } catch (InterruptedException e) {
          return;
        }
@@ -345,12 +364,12 @@
 
   @VisibleForTesting
   void triggerHeartbeatForTests() {
-    synchronized (pendingIncrementalBR) {
+    synchronized (pendingIncrementalBRperStorage) {
      lastHeartbeat = 0;
-      pendingIncrementalBR.notifyAll();
+      pendingIncrementalBRperStorage.notifyAll();
      while (lastHeartbeat == 0) {
        try {
-          pendingIncrementalBR.wait(100);
+          pendingIncrementalBRperStorage.wait(100);
        } catch (InterruptedException e) {
          return;
        }
@@ -360,13 +379,13 @@
 
   @VisibleForTesting
   void triggerDeletionReportForTests() {
-    synchronized (pendingIncrementalBR) {
+    synchronized (pendingIncrementalBRperStorage) {
      lastDeletedReport = 0;
-      pendingIncrementalBR.notifyAll();
+      pendingIncrementalBRperStorage.notifyAll();
      while (lastDeletedReport == 0) {
        try {
-          pendingIncrementalBR.wait(100);
+          pendingIncrementalBRperStorage.wait(100);
        } catch (InterruptedException e) {
          return;
        }
@@ -582,10 +601,10 @@
         //
         long waitTime = dnConf.heartBeatInterval -
             (Time.now() - lastHeartbeat);
 
-        synchronized(pendingIncrementalBR) {
+        synchronized(pendingIncrementalBRperStorage) {
           if (waitTime > 0 && pendingReceivedRequests == 0) {
             try {
-              pendingIncrementalBR.wait(waitTime);
+              pendingIncrementalBRperStorage.wait(waitTime);
             } catch (InterruptedException ie) {
               LOG.warn("BPOfferService for " + this + " interrupted");
             }
@@ -756,4 +775,52 @@
     }
   }
 
+  private static class PerStoragePendingIncrementalBR {
+    private Map<Long, ReceivedDeletedBlockInfo> pendingIncrementalBR =
+        Maps.newHashMap();
+
+    /**
+     * Return the number of blocks on this storage that have pending
+     * incremental block reports.
+     * @return the count of pending report entries
+     */
+    int getBlockInfoCount() {
+      return pendingIncrementalBR.size();
+    }
+
+    /**
+     * Dequeue and return all pending incremental block report state.
+     * @return all pending entries, removed from the queue
+     */
+    ReceivedDeletedBlockInfo[] dequeueBlockInfos() {
+      ReceivedDeletedBlockInfo[] blockInfos =
+          pendingIncrementalBR.values().toArray(
+              new ReceivedDeletedBlockInfo[getBlockInfoCount()]);
+
+      pendingIncrementalBR.clear();
+      return blockInfos;
+    }
+
+    /**
+     * Add blocks from blockArray to pendingIncrementalBR, unless the
+     * block already exists in pendingIncrementalBR.
+     * @param blockArray list of blocks to add.
+     */
+    void putMissingBlockInfos(ReceivedDeletedBlockInfo[] blockArray) {
+      for (ReceivedDeletedBlockInfo rdbi : blockArray) {
+        if (!pendingIncrementalBR.containsKey(rdbi.getBlock().getBlockId())) {
+          pendingIncrementalBR.put(rdbi.getBlock().getBlockId(), rdbi);
+        }
+      }
+    }
+
+    /**
+     * Add a pending incremental block report for a single block,
+     * replacing any existing entry for the same block ID.
+     * @param blockInfo the report entry to queue
+     */
+    void putBlockInfo(ReceivedDeletedBlockInfo blockInfo) {
+      pendingIncrementalBR.put(blockInfo.getBlock().getBlockId(), blockInfo);
+    }
+  }
 }
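The per-storage queue keeps the old failure semantics: dequeueBlockInfos()
empties the map under the shared lock, and if the RPC fails,
putMissingBlockInfos() restores only those entries whose block ID was not
superseded by a newer event while the report was in flight. A self-contained
sketch of that merge rule; PerStoragePendingIncrementalBR is private, so this
models the queue with a plain map from block ID to an event-name string:

    import java.util.HashMap;
    import java.util.Map;

    public class RequeueSemanticsSketch {
      static final Map<Long, String> pending = new HashMap<Long, String>();

      // Mirrors putMissingBlockInfos(): never clobber a newer event.
      static void putMissing(Map<Long, String> dequeued) {
        for (Map.Entry<Long, String> e : dequeued.entrySet()) {
          if (!pending.containsKey(e.getKey())) {
            pending.put(e.getKey(), e.getValue());
          }
        }
      }

      public static void main(String[] args) {
        pending.put(1L, "RECEIVED_BLOCK");
        Map<Long, String> inFlight = new HashMap<Long, String>(pending);
        pending.clear();                     // dequeueBlockInfos()
        pending.put(1L, "DELETED_BLOCK");    // newer event arrives mid-RPC
        putMissing(inFlight);                // RPC failed, restore the rest
        System.out.println(pending.get(1L)); // DELETED_BLOCK wins
      }
    }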
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 625daf06253..2dd4194114b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -162,7 +162,8 @@ class BlockReceiver implements Closeable {
       switch (stage) {
       case PIPELINE_SETUP_CREATE:
         replicaInfo = datanode.data.createRbw(block);
-        datanode.notifyNamenodeReceivingBlock(block);
+        datanode.notifyNamenodeReceivingBlock(
+            block, replicaInfo.getStorageUuid());
         break;
       case PIPELINE_SETUP_STREAMING_RECOVERY:
         replicaInfo = datanode.data.recoverRbw(
@@ -176,7 +177,8 @@
               block.getLocalBlock());
         }
         block.setGenerationStamp(newGs);
-        datanode.notifyNamenodeReceivingBlock(block);
+        datanode.notifyNamenodeReceivingBlock(
+            block, replicaInfo.getStorageUuid());
         break;
       case PIPELINE_SETUP_APPEND_RECOVERY:
         replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
@@ -185,7 +187,8 @@
               block.getLocalBlock());
         }
         block.setGenerationStamp(newGs);
-        datanode.notifyNamenodeReceivingBlock(block);
+        datanode.notifyNamenodeReceivingBlock(
+            block, replicaInfo.getStorageUuid());
         break;
       case TRANSFER_RBW:
       case TRANSFER_FINALIZED:
@@ -252,6 +255,10 @@
   /** Return the datanode object. */
   DataNode getDataNode() {return datanode;}
 
+  public Replica getReplicaInfo() {
+    return replicaInfo;
+  }
+
   /**
    * close files.
    */
@@ -1072,7 +1079,8 @@
               : 0;
           block.setNumBytes(replicaInfo.getNumBytes());
           datanode.data.finalizeBlock(block);
-          datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
+          datanode.closeBlock(
+              block, DataNode.EMPTY_DEL_HINT, replicaInfo.getStorageUuid());
           if (ClientTraceLog.isInfoEnabled() && isClient) {
             long offset = 0;
             DatanodeRegistration dnR = datanode.getDNRegistrationForBP(block
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index c051452aaca..526c89e7a43 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -520,10 +520,11 @@ public class DataNode extends Configured
   }
 
   // calls specific to BP
-  protected void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
+  protected void notifyNamenodeReceivedBlock(
+      ExtendedBlock block, String delHint, String storageUuid) {
     BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
     if(bpos != null) {
-      bpos.notifyNamenodeReceivedBlock(block, delHint);
+      bpos.notifyNamenodeReceivedBlock(block, delHint, storageUuid);
     } else {
       LOG.error("Cannot find BPOfferService for reporting block received for bpid="
           + block.getBlockPoolId());
@@ -531,10 +532,11 @@ public class DataNode extends Configured
   }
 
   // calls specific to BP
-  protected void notifyNamenodeReceivingBlock(ExtendedBlock block) {
+  protected void notifyNamenodeReceivingBlock(
+      ExtendedBlock block, String storageUuid) {
     BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
     if(bpos != null) {
-      bpos.notifyNamenodeReceivingBlock(block);
+      bpos.notifyNamenodeReceivingBlock(block, storageUuid);
     } else {
       LOG.error("Cannot find BPOfferService for reporting block receiving for bpid="
           + block.getBlockPoolId());
@@ -542,10 +544,10 @@ public class DataNode extends Configured
   }
 
   /** Notify the corresponding namenode to delete the block.
   */
-  public void notifyNamenodeDeletedBlock(ExtendedBlock block) {
+  public void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
     BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
     if (bpos != null) {
-      bpos.notifyNamenodeDeletedBlock(block);
+      bpos.notifyNamenodeDeletedBlock(block, storageUuid);
     } else {
       LOG.error("Cannot find BPOfferService for reporting block deleted for bpid="
           + block.getBlockPoolId());
@@ -1528,11 +1530,11 @@ public class DataNode extends Configured
    * @param block
    * @param delHint
    */
-  void closeBlock(ExtendedBlock block, String delHint) {
+  void closeBlock(ExtendedBlock block, String delHint, String storageUuid) {
     metrics.incrBlocksWritten();
     BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
     if(bpos != null) {
-      bpos.notifyNamenodeReceivedBlock(block, delHint);
+      bpos.notifyNamenodeReceivedBlock(block, delHint, storageUuid);
     } else {
       LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
           + block.getBlockPoolId());
@@ -1892,7 +1894,7 @@ public class DataNode extends Configured
     ExtendedBlock newBlock = new ExtendedBlock(oldBlock);
     newBlock.setGenerationStamp(recoveryId);
     newBlock.setNumBytes(newLength);
-    notifyNamenodeReceivedBlock(newBlock, "");
+    notifyNamenodeReceivedBlock(newBlock, "", storageID);
     return storageID;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index e1de714d2ba..b4da358bda6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -447,6 +447,7 @@ class DataXceiver extends Receiver implements Runnable {
     String mirrorNode = null;           // the name:port of next target
     String firstBadLink = "";           // first datanode that failed in connection setup
     Status mirrorInStatus = SUCCESS;
+    Replica replica;
     try {
       if (isDatanode ||
           stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
@@ -457,8 +458,10 @@
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
             clientname, srcDataNode, datanode, requestedChecksum,
             cachingStrategy);
+        replica = blockReceiver.getReplicaInfo();
       } else {
-        datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
+        replica =
+            datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
       }
 
       //
@@ -590,7 +593,8 @@
       // the block is finalized in the PacketResponder.
       if (isDatanode ||
           stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
-        datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
+        datanode.closeBlock(
+            block, DataNode.EMPTY_DEL_HINT, replica.getStorageUuid());
         LOG.info("Received " + block + " src: " + remoteAddress + " dest: "
             + localAddress + " of size " + block.getNumBytes());
       }
@@ -859,7 +863,8 @@
           dataXceiverServer.balanceThrottler, null);
 
       // notify name node
-      datanode.notifyNamenodeReceivedBlock(block, delHint);
+      datanode.notifyNamenodeReceivedBlock(
+          block, delHint, blockReceiver.getReplicaInfo().getStorageUuid());
 
       LOG.info("Moved " + block + " from " + peer.getRemoteAddressString());
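Returning the Replica from recoverClose() is what lets the close-recovery path
above attribute the block to a storage: when the stage is
PIPELINE_CLOSE_RECOVERY no BlockReceiver is constructed, so without the return
value DataXceiver would have no way to learn which volume holds the replica.
The pattern, reduced to a sketch rather than the full writeBlock() method:

    Replica replica;
    if (blockReceiver != null) {
      replica = blockReceiver.getReplicaInfo();  // normal pipeline setup
    } else {
      replica = datanode.data.recoverClose(      // PIPELINE_CLOSE_RECOVERY
          block, latestGenerationStamp, minBytesRcvd);
    }
    datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT, replica.getStorageUuid());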
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java
index bd0485394a3..a480bb161f1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java
@@ -54,4 +54,9 @@ public interface Replica {
    * @return the number of bytes that are visible to readers
    */
   public long getVisibleLength();
+
+  /**
+   * Return the storageUuid of the volume that stores this replica.
+   */
+  public String getStorageUuid();
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
index 76bd1c0c6a4..27467f23ff3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
@@ -137,6 +137,14 @@
   void setVolume(FsVolumeSpi vol) {
     this.volume = vol;
   }
+
+  /**
+   * Get the storageUuid of the volume that stores this replica.
+   */
+  @Override
+  public String getStorageUuid() {
+    return volume.getStorageID();
+  }
 
   /**
    * Return the parent directory path where this replica is located
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index f4c49bc04e8..3f8158b9a9b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -243,7 +243,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * @param expectedBlockLen the number of bytes the replica is expected to have
    * @throws IOException
    */
-  public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen
+  public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen
       ) throws IOException;
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
index c0d71303876..b14ef562541 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
@@ -26,6 +26,9 @@ import org.apache.hadoop.hdfs.StorageType;
  * This is an interface for the underlying volume.
  */
 public interface FsVolumeSpi {
+  /** @return the StorageUuid of the volume */
+  public String getStorageID();
+
   /** @return a list of block pools. */
   public String[] getBlockPoolList();
Ignored."); } else { if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){ - datanode.notifyNamenodeDeletedBlock(block); + datanode.notifyNamenodeDeletedBlock(block, volume.getStorageID()); } volume.decDfsUsed(block.getBlockPoolId(), dfsBytes); LOG.info("Deleted " + block.getBlockPoolId() + " " diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 85238e459bf..f5928e47cdb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -699,7 +699,7 @@ class FsDatasetImpl implements FsDatasetSpi { } @Override // FsDatasetSpi - public void recoverClose(ExtendedBlock b, long newGS, + public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { LOG.info("Recover failed close " + b); // check replica's state @@ -710,6 +710,7 @@ class FsDatasetImpl implements FsDatasetSpi { if (replicaInfo.getState() == ReplicaState.RBW) { finalizeReplica(b.getBlockPoolId(), replicaInfo); } + return replicaInfo; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index 30574b3b6d9..24eb5779070 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -290,6 +290,7 @@ class FsVolumeImpl implements FsVolumeSpi { } } + @Override public String getStorageID() { return storageID; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 05033d6ff5c..bf211afdcea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -131,6 +131,11 @@ public class SimulatedFSDataset implements FsDatasetSpi { } } + @Override + public String getStorageUuid() { + return storage.getStorageUuid(); + } + @Override synchronized public long getGenerationStamp() { return theBlock.getGenerationStamp(); @@ -314,6 +319,8 @@ public class SimulatedFSDataset implements FsDatasetSpi { private static class SimulatedStorage { private Map map = new HashMap(); + private final String storageUuid = "SimulatedStorage-UUID"; + private long capacity; // in bytes synchronized long getFree() { @@ -375,6 +382,10 @@ public class SimulatedFSDataset implements FsDatasetSpi { } return bpStorage; } + + public String getStorageUuid() { + return storageUuid; + } } private final Map> blockMap @@ -625,7 +636,7 @@ public class SimulatedFSDataset implements FsDatasetSpi { } @Override // FsDatasetSpi - public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen) + public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen) throws 
       throws IOException {
     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
     BInfo binfo = map.get(b.getLocalBlock());
@@ -639,6 +650,7 @@
     map.remove(b.getLocalBlock());
     binfo.theBlock.setGenerationStamp(newGS);
     map.put(binfo.theBlock, binfo);
+    return binfo;
   }
 
   @Override // FsDatasetSpi
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 2b027004fe0..5374917fb70 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -176,7 +176,7 @@ public class TestBPOfferService {
     waitForBlockReport(mockNN2);
 
     // When we receive a block, it should report it to both NNs
-    bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, "");
+    bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, "", "");
 
     ReceivedDeletedBlockInfo[] ret = waitForBlockReceived(FAKE_BLOCK, mockNN1);
     assertEquals(1, ret.length);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index cec8a15daa4..25182669f92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -412,6 +412,11 @@
     public StorageType getStorageType() {
       return StorageType.DEFAULT;
     }
+
+    @Override
+    public String getStorageID() {
+      return "";
+    }
   }
 
   private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi();
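The test stubs make the new obligations concrete: every FsVolumeSpi
implementation must now answer getStorageID(), and every Replica must answer
getStorageUuid(), which ReplicaInfo supplies by delegating to its volume. For
an out-of-tree or test volume, a minimal conforming method can return a fixed
identifier; the ID below is hypothetical, in the same spirit as the empty
string returned by TestDirectoryScanner's stub above:

    @Override
    public String getStorageID() {
      return "DS-test-volume-0001"; // hypothetical fixed ID for a test volume
    }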