From 5b95971f8a6dee09d1143c6cf121afa22fa6c16e Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Fri, 26 Feb 2016 15:32:25 -0800 Subject: [PATCH] HDFS-9710. DN can be configured to send block receipt IBRs in batches. --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 + .../hdfs/server/datanode/BPOfferService.java | 17 +- .../hdfs/server/datanode/BPServiceActor.java | 19 +- .../hdfs/server/datanode/BlockReceiver.java | 8 +- .../hadoop/hdfs/server/datanode/DNConf.java | 4 + .../hadoop/hdfs/server/datanode/DataNode.java | 26 +- .../hdfs/server/datanode/DataXceiver.java | 14 +- .../IncrementalBlockReportManager.java | 51 +++- .../datanode/fsdataset/FsDatasetSpi.java | 4 +- .../fsdataset/impl/FsDatasetImpl.java | 12 +- .../StorageReceivedDeletedBlocks.java | 7 + .../server/datanode/SimulatedFSDataset.java | 11 +- .../server/datanode/TestBPOfferService.java | 2 +- .../hdfs/server/datanode/TestBatchIbr.java | 263 ++++++++++++++++++ .../TestDataXceiverLazyPersistHint.java | 6 +- .../datanode/TestIncrementalBlockReports.java | 2 +- .../extdataset/ExternalDatasetImpl.java | 4 +- .../impl/TestInterDatanodeProtocol.java | 6 +- 19 files changed, 388 insertions(+), 75 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBatchIbr.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 652a87fc0b2..5b036491718 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -1014,6 +1014,9 @@ Release 2.8.0 - UNRELEASED HDFS-9425. Expose number of blocks per volume as a metric (Brahma Reddy Battula via vinayakumarb) + HDFS-9710. DN can be configured to send block receipt IBRs in batches. 
+ (szetszwo) + BUG FIXES HDFS-8091: ACLStatus and XAttributes should be presented to diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 99d944019a9..bd12bf5b819 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -517,6 +517,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_DF_INTERVAL_KEY = "dfs.df.interval"; public static final int DFS_DF_INTERVAL_DEFAULT = 60000; + public static final String DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY + = "dfs.blockreport.incremental.intervalMsec"; + public static final long DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_DEFAULT + = 0; public static final String DFS_BLOCKREPORT_INTERVAL_MSEC_KEY = "dfs.blockreport.intervalMsec"; public static final long DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT = 6 * 60 * 60 * 1000; public static final String DFS_BLOCKREPORT_INITIAL_DELAY_KEY = "dfs.blockreport.initialDelay"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index 37a10c73f8d..b4d89fc4ebc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -231,29 +231,32 @@ class BPOfferService { * till namenode is informed before responding with success to the * client? For now we don't. 
*/ - void notifyNamenodeReceivedBlock( - ExtendedBlock block, String delHint, String storageUuid) { + void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint, + String storageUuid, boolean isOnTransientStorage) { notifyNamenodeBlock(block, BlockStatus.RECEIVED_BLOCK, delHint, - storageUuid); + storageUuid, isOnTransientStorage); } void notifyNamenodeReceivingBlock(ExtendedBlock block, String storageUuid) { - notifyNamenodeBlock(block, BlockStatus.RECEIVING_BLOCK, null, storageUuid); + notifyNamenodeBlock(block, BlockStatus.RECEIVING_BLOCK, null, storageUuid, + false); } void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) { - notifyNamenodeBlock(block, BlockStatus.DELETED_BLOCK, null, storageUuid); + notifyNamenodeBlock(block, BlockStatus.DELETED_BLOCK, null, storageUuid, + false); } private void notifyNamenodeBlock(ExtendedBlock block, BlockStatus status, - String delHint, String storageUuid) { + String delHint, String storageUuid, boolean isOnTransientStorage) { checkBlock(block); final ReceivedDeletedBlockInfo info = new ReceivedDeletedBlockInfo( block.getLocalBlock(), status, delHint); final DatanodeStorage storage = dn.getFSDataset().getStorage(storageUuid); for (BPServiceActor actor : bpServices) { - actor.getIbrManager().notifyNamenodeBlock(info, storage); + actor.getIbrManager().notifyNamenodeBlock(info, storage, + isOnTransientStorage); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 336b943fca6..6d42e34cb4d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -97,8 +97,7 @@ class BPServiceActor implements Runnable { private final DNConf dnConf; private long prevBlockReportId; - private final IncrementalBlockReportManager ibrManager - = new IncrementalBlockReportManager(); + private final IncrementalBlockReportManager ibrManager; private DatanodeRegistration bpRegistration; final LinkedList bpThreadQueue @@ -109,6 +108,7 @@ class BPServiceActor implements Runnable { this.dn = bpos.getDataNode(); this.nnAddr = nnAddr; this.dnConf = dn.getDnConf(); + this.ibrManager = new IncrementalBlockReportManager(dnConf.ibrInterval); prevBlockReportId = ThreadLocalRandom.current().nextLong(); scheduler = new Scheduler(dnConf.heartBeatInterval, dnConf.blockReportInterval); } @@ -568,20 +568,9 @@ class BPServiceActor implements Runnable { processCommand(new DatanodeCommand[]{ cmd }); } - // // There is no work to do; sleep until hearbeat timer elapses, // or work arrives, and then iterate again. 
- // - long waitTime = scheduler.getHeartbeatWaitTime(); - synchronized(ibrManager) { - if (waitTime > 0 && !ibrManager.sendImmediately()) { - try { - ibrManager.wait(waitTime); - } catch (InterruptedException ie) { - LOG.warn("BPOfferService for " + this + " interrupted"); - } - } - } // synchronized + ibrManager.waitTillNextIBR(scheduler.getHeartbeatWaitTime()); } catch(RemoteException re) { String reClass = re.getClassName(); if (UnregisteredNodeException.class.getName().equals(reClass) || @@ -768,7 +757,7 @@ class BPServiceActor implements Runnable { void triggerBlockReport(BlockReportOptions options) { if (options.isIncremental()) { LOG.info(bpos.toString() + ": scheduling an incremental block report."); - ibrManager.triggerIBR(); + ibrManager.triggerIBR(true); } else { LOG.info(bpos.toString() + ": scheduling a full block report."); synchronized(ibrManager) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 5a8fa979758..3040489e377 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -299,8 +299,8 @@ class BlockReceiver implements Closeable { /** Return the datanode object. */ DataNode getDataNode() {return datanode;} - String getStorageUuid() { - return replicaInfo.getStorageUuid(); + Replica getReplica() { + return replicaInfo; } /** @@ -1428,8 +1428,8 @@ class BlockReceiver implements Closeable { datanode.data.setPinning(block); } - datanode.closeBlock( - block, DataNode.EMPTY_DEL_HINT, replicaInfo.getStorageUuid()); + datanode.closeBlock(block, null, replicaInfo.getStorageUuid(), + replicaInfo.isOnTransientStorage()); if (ClientTraceLog.isInfoEnabled() && isClient) { long offset = 0; DatanodeRegistration dnR = datanode.getDNRegistrationForBP(block diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java index fe80efea56e..a1e72140dfd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -89,6 +89,7 @@ public class DNConf { final long heartBeatInterval; final long blockReportInterval; final long blockReportSplitThreshold; + final long ibrInterval; final long initialBlockReportDelayMs; final long cacheReportInterval; final long dfsclientSlowIoWarningThresholdMs; @@ -156,6 +157,9 @@ public class DNConf { DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT); this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT); + this.ibrInterval = conf.getLong( + DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY, + DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_DEFAULT); this.blockReportSplitThreshold = conf.getLong(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT); this.cacheReportInterval = conf.getLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index f41dff384a1..f8430d20688 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -311,7 +311,6 @@ public class DataNode extends ReconfigurableBase volatile FsDatasetSpi data = null; private String clusterId = null; - public final static String EMPTY_DEL_HINT = ""; final AtomicInteger xmitsInProgress = new AtomicInteger(); Daemon dataXceiverServer = null; DataXceiverServer xserver = null; @@ -1088,11 +1087,12 @@ public class DataNode extends ReconfigurableBase } // calls specific to BP - public void notifyNamenodeReceivedBlock( - ExtendedBlock block, String delHint, String storageUuid) { + public void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint, + String storageUuid, boolean isOnTransientStorage) { BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId()); if(bpos != null) { - bpos.notifyNamenodeReceivedBlock(block, delHint, storageUuid); + bpos.notifyNamenodeReceivedBlock(block, delHint, storageUuid, + isOnTransientStorage); } else { LOG.error("Cannot find BPOfferService for reporting block received for bpid=" + block.getBlockPoolId()); @@ -2351,15 +2351,11 @@ public class DataNode extends ReconfigurableBase * @param delHint hint on which excess block to delete * @param storageUuid UUID of the storage where block is stored */ - void closeBlock(ExtendedBlock block, String delHint, String storageUuid) { + void closeBlock(ExtendedBlock block, String delHint, String storageUuid, + boolean isTransientStorage) { metrics.incrBlocksWritten(); - BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId()); - if(bpos != null) { - bpos.notifyNamenodeReceivedBlock(block, delHint, storageUuid); - } else { - LOG.warn("Cannot find BPOfferService for reporting block received for bpid=" - + block.getBlockPoolId()); - } + notifyNamenodeReceivedBlock(block, delHint, storageUuid, + isTransientStorage); } /** Start a single datanode daemon and wait for it to finish. @@ -2689,7 +2685,7 @@ public class DataNode extends ReconfigurableBase public String updateReplicaUnderRecovery(final ExtendedBlock oldBlock, final long recoveryId, final long newBlockId, final long newLength) throws IOException { - final String storageID = data.updateReplicaUnderRecovery(oldBlock, + final Replica r = data.updateReplicaUnderRecovery(oldBlock, recoveryId, newBlockId, newLength); // Notify the namenode of the updated block info. 
This is important // for HA, since otherwise the standby node may lose track of the @@ -2698,7 +2694,9 @@ public class DataNode extends ReconfigurableBase newBlock.setGenerationStamp(recoveryId); newBlock.setBlockId(newBlockId); newBlock.setNumBytes(newLength); - notifyNamenodeReceivedBlock(newBlock, "", storageID); + final String storageID = r.getStorageUuid(); + notifyNamenodeReceivedBlock(newBlock, null, storageID, + r.isOnTransientStorage()); return storageID; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index cb5bb71d11e..c23e8a76b54 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -672,7 +672,9 @@ class DataXceiver extends Receiver implements Runnable { String firstBadLink = ""; // first datanode that failed in connection setup Status mirrorInStatus = SUCCESS; final String storageUuid; + final boolean isOnTransientStorage; try { + final Replica replica; if (isDatanode || stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { // open a block receiver @@ -682,12 +684,13 @@ class DataXceiver extends Receiver implements Runnable { stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd, clientname, srcDataNode, datanode, requestedChecksum, cachingStrategy, allowLazyPersist, pinning); - - storageUuid = blockReceiver.getStorageUuid(); + replica = blockReceiver.getReplica(); } else { - storageUuid = datanode.data.recoverClose( + replica = datanode.data.recoverClose( block, latestGenerationStamp, minBytesRcvd); } + storageUuid = replica.getStorageUuid(); + isOnTransientStorage = replica.isOnTransientStorage(); // // Connect to downstream machine, if appropriate @@ -830,7 +833,7 @@ class DataXceiver extends Receiver implements Runnable { // the block is finalized in the PacketResponder. 
if (isDatanode || stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { - datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT, storageUuid); + datanode.closeBlock(block, null, storageUuid, isOnTransientStorage); LOG.info("Received " + block + " src: " + remoteAddress + " dest: " + localAddress + " of size " + block.getNumBytes()); } @@ -1146,8 +1149,9 @@ class DataXceiver extends Receiver implements Runnable { dataXceiverServer.balanceThrottler, null, true); // notify name node + final Replica r = blockReceiver.getReplica(); datanode.notifyNamenodeReceivedBlock( - block, delHint, blockReceiver.getStorageUuid()); + block, delHint, r.getStorageUuid(), r.isOnTransientStorage()); LOG.info("Moved " + block + " from " + peer.getRemoteAddressString() + ", delHint=" + delHint); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java index 4462f94283c..b9b348a7de2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.util.Time.monotonicNow; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; @@ -33,6 +34,8 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; @@ -42,6 +45,9 @@ import com.google.common.collect.Maps; */ @InterfaceAudience.Private class IncrementalBlockReportManager { + private static final Logger LOG = LoggerFactory.getLogger( + IncrementalBlockReportManager.class); + private static class PerStorageIBR { /** The blocks in this IBR. */ final Map blocks = Maps.newHashMap(); @@ -103,8 +109,29 @@ class IncrementalBlockReportManager { */ private volatile boolean readyToSend = false; + /** The time interval between two IBRs. */ + private final long ibrInterval; + + /** The timestamp of the last IBR. */ + private volatile long lastIBR; + + IncrementalBlockReportManager(final long ibrInterval) { + this.ibrInterval = ibrInterval; + this.lastIBR = monotonicNow() - ibrInterval; + } + boolean sendImmediately() { - return readyToSend; + return readyToSend && monotonicNow() - ibrInterval >= lastIBR; + } + + synchronized void waitTillNextIBR(long waitTime) { + if (waitTime > 0 && !sendImmediately()) { + try { + wait(ibrInterval > 0 && ibrInterval < waitTime? 
ibrInterval: waitTime); + } catch (InterruptedException ie) { + LOG.warn(getClass().getSimpleName() + " interrupted"); + } + } } private synchronized StorageReceivedDeletedBlocks[] generateIBRs() { @@ -144,6 +171,9 @@ class IncrementalBlockReportManager { } // Send incremental block reports to the Namenode outside the lock + if (LOG.isDebugEnabled()) { + LOG.debug("call blockReceivedAndDeleted: " + Arrays.toString(reports)); + } boolean success = false; final long startTime = monotonicNow(); try { @@ -151,7 +181,9 @@ class IncrementalBlockReportManager { success = true; } finally { metrics.addIncrementalBlockReport(monotonicNow() - startTime); - if (!success) { + if (success) { + lastIBR = startTime; + } else { // If we didn't succeed in sending the report, put all of the // blocks back onto our queue, but only in the case where we // didn't put something newer in the meantime. @@ -191,7 +223,7 @@ class IncrementalBlockReportManager { } synchronized void notifyNamenodeBlock(ReceivedDeletedBlockInfo rdbi, - DatanodeStorage storage) { + DatanodeStorage storage, boolean isOnTransientStorage) { addRDBI(rdbi, storage); final BlockStatus status = rdbi.getStatus(); @@ -200,18 +232,23 @@ class IncrementalBlockReportManager { readyToSend = true; } else if (status == BlockStatus.RECEIVED_BLOCK) { // the report is sent right away. - triggerIBR(); + triggerIBR(isOnTransientStorage); } } - synchronized void triggerIBR() { + synchronized void triggerIBR(boolean force) { readyToSend = true; - notifyAll(); + if (force) { + lastIBR = monotonicNow() - ibrInterval; + } + if (sendImmediately()) { + notifyAll(); + } } @VisibleForTesting synchronized void triggerDeletionReportForTests() { - triggerIBR(); + triggerIBR(true); while (sendImmediately()) { try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index af6a53244c1..788b75b5f5f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -375,7 +375,7 @@ public interface FsDatasetSpi extends FSDatasetMBean { * @return the storage uuid of the replica. * @throws IOException */ - String recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen + Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen ) throws IOException; /** @@ -525,7 +525,7 @@ public interface FsDatasetSpi extends FSDatasetMBean { * Update replica's generation stamp and length and finalize it. 
* @return the ID of storage that stores the block */ - String updateReplicaUnderRecovery(ExtendedBlock oldBlock, + Replica updateReplicaUnderRecovery(ExtendedBlock oldBlock, long recoveryId, long newBlockId, long newLength) throws IOException; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 677008ceee2..163c8d066eb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -1284,7 +1284,7 @@ class FsDatasetImpl implements FsDatasetSpi { } @Override // FsDatasetSpi - public synchronized String recoverClose(ExtendedBlock b, long newGS, + public synchronized Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { LOG.info("Recover failed close " + b); // check replica's state @@ -1295,7 +1295,7 @@ class FsDatasetImpl implements FsDatasetSpi { if (replicaInfo.getState() == ReplicaState.RBW) { finalizeReplica(b.getBlockPoolId(), replicaInfo); } - return replicaInfo.getStorageUuid(); + return replicaInfo; } /** @@ -2427,7 +2427,7 @@ class FsDatasetImpl implements FsDatasetSpi { } @Override // FsDatasetSpi - public synchronized String updateReplicaUnderRecovery( + public synchronized Replica updateReplicaUnderRecovery( final ExtendedBlock oldBlock, final long recoveryId, final long newBlockId, @@ -2487,8 +2487,7 @@ class FsDatasetImpl implements FsDatasetSpi { //check replica files after update checkReplicaFiles(finalized); - //return storage ID - return getVolume(new ExtendedBlock(bpid, finalized)).getStorageID(); + return finalized; } private FinalizedReplica updateReplicaUnderRecovery( @@ -2871,7 +2870,8 @@ class FsDatasetImpl implements FsDatasetSpi { datanode.getShortCircuitRegistry().processBlockInvalidation( ExtendedBlockId.fromExtendedBlock(extendedBlock)); datanode.notifyNamenodeReceivedBlock( - extendedBlock, null, newReplicaInfo.getStorageUuid()); + extendedBlock, null, newReplicaInfo.getStorageUuid(), + newReplicaInfo.isOnTransientStorage()); // Remove the old replicas if (blockFile.delete() || !blockFile.exists()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReceivedDeletedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReceivedDeletedBlocks.java index db9505a952f..29216cf063a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReceivedDeletedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReceivedDeletedBlocks.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hdfs.server.protocol; +import java.util.Arrays; + /** * Report of block received and deleted per Datanode * storage. 
@@ -51,4 +53,9 @@ public class StorageReceivedDeletedBlocks { this.storage = storage; this.blocks = blocks; } + + @Override + public String toString() { + return storage + Arrays.toString(blocks); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index cfa42b330ba..a2ae209adf3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -899,7 +899,7 @@ public class SimulatedFSDataset implements FsDatasetSpi { } @Override // FsDatasetSpi - public String recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen) + public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { final Map map = getMap(b.getBlockPoolId()); BInfo binfo = map.get(b.getLocalBlock()); @@ -913,7 +913,7 @@ public class SimulatedFSDataset implements FsDatasetSpi { map.remove(b.getLocalBlock()); binfo.theBlock.setGenerationStamp(newGS); map.put(binfo.theBlock, binfo); - return binfo.getStorageUuid(); + return binfo; } @Override // FsDatasetSpi @@ -1192,12 +1192,11 @@ public class SimulatedFSDataset implements FsDatasetSpi { } @Override // FsDatasetSpi - public String updateReplicaUnderRecovery(ExtendedBlock oldBlock, + public Replica updateReplicaUnderRecovery(ExtendedBlock oldBlock, long recoveryId, long newBlockId, - long newlength) { - // Caller does not care about the exact Storage UUID returned. - return datanodeUuid; + long newlength) throws IOException { + return getMap(oldBlock.getBlockPoolId()).get(oldBlock.getLocalBlock()); } @Override // FsDatasetSpi diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index cb5f272b314..1421f0fda09 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -193,7 +193,7 @@ public class TestBPOfferService { waitForBlockReport(mockNN2); // When we receive a block, it should report it to both NNs - bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, "", ""); + bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "", false); ReceivedDeletedBlockInfo[] ret = waitForBlockReceived(FAKE_BLOCK, mockNN1); assertEquals(1, ret.length); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBatchIbr.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBatchIbr.java new file mode 100644 index 00000000000..38c8a386831 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBatchIbr.java @@ -0,0 +1,263 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY; + +import java.io.IOException; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.MetricsAsserts; +import org.apache.hadoop.util.Time; +import org.apache.log4j.Level; +import org.junit.Assert; +import org.junit.Test; + +/** + * This test verifies that incremental block reports are sent in batch mode + * and the namenode allows closing a file with COMMITTED blocks. 
+ */ +public class TestBatchIbr { + public static final Log LOG = LogFactory.getLog(TestBatchIbr.class); + + private static final short NUM_DATANODES = 4; + private static final int BLOCK_SIZE = 1024; + private static final int MAX_BLOCK_NUM = 8; + private static final int NUM_FILES = 1000; + private static final int NUM_THREADS = 128; + + private static final ThreadLocalBuffer IO_BUF = new ThreadLocalBuffer(); + private static final ThreadLocalBuffer VERIFY_BUF = new ThreadLocalBuffer(); + + static { + GenericTestUtils.setLogLevel( + LogFactory.getLog(IncrementalBlockReportManager.class), Level.ALL); + } + + static HdfsConfiguration newConf(long ibrInterval) throws IOException { + final HdfsConfiguration conf = new HdfsConfiguration(); + conf.setLong(DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE); + conf.setBoolean(ReplaceDatanodeOnFailure.BEST_EFFORT_KEY, true); + + if (ibrInterval > 0) { + conf.setLong(DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY, ibrInterval); + } + return conf; + } + + static ExecutorService createExecutor() throws Exception { + final ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS); + final ExecutorCompletionService completion + = new ExecutorCompletionService<>(executor); + + // initialize all threads and buffers + for(int i = 0; i < NUM_THREADS; i++) { + completion.submit(new Callable() { + @Override + public Path call() throws Exception { + IO_BUF.get(); + VERIFY_BUF.get(); + return null; + } + }); + } + for(int i = 0; i < NUM_THREADS; i++) { + completion.take().get(); + } + return executor; + } + + static void runIbrTest(final long ibrInterval) throws Exception { + final ExecutorService executor = createExecutor(); + final Random ran = new Random(); + + final Configuration conf = newConf(ibrInterval); + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(NUM_DATANODES).build(); + final DistributedFileSystem dfs = cluster.getFileSystem(); + + try { + final String dirPathString = "/dir"; + final Path dir = new Path(dirPathString); + dfs.mkdirs(dir); + + // start testing + final long testStartTime = Time.monotonicNow(); + final ExecutorCompletionService createService + = new ExecutorCompletionService<>(executor); + final AtomicLong createFileTime = new AtomicLong(); + final AtomicInteger numBlockCreated = new AtomicInteger(); + + // create files + for(int i = 0; i < NUM_FILES; i++) { + createService.submit(new Callable() { + @Override + public Path call() throws Exception { + final long start = Time.monotonicNow(); + try { + final long seed = ran.nextLong(); + final int numBlocks = ran.nextInt(MAX_BLOCK_NUM) + 1; + numBlockCreated.addAndGet(numBlocks); + return createFile(dir, numBlocks, seed, dfs); + } finally { + createFileTime.addAndGet(Time.monotonicNow() - start); + } + } + }); + } + + // verify files + final ExecutorCompletionService verifyService + = new ExecutorCompletionService<>(executor); + final AtomicLong verifyFileTime = new AtomicLong(); + for(int i = 0; i < NUM_FILES; i++) { + final Path file = createService.take().get(); + verifyService.submit(new Callable() { + @Override + public Boolean call() throws Exception { + final long start = Time.monotonicNow(); + try { + return verifyFile(file, dfs); + } finally { + verifyFileTime.addAndGet(Time.monotonicNow() - start); + } + } + }); + } + for(int i = 0; i < NUM_FILES; i++) { + Assert.assertTrue(verifyService.take().get()); + } + final long testEndTime = Time.monotonicNow(); + + LOG.info("ibrInterval=" + ibrInterval + " (" + + 
toConfString(DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY, conf) + + "), numBlockCreated=" + numBlockCreated); + LOG.info("duration=" + toSecondString(testEndTime - testStartTime) + + ", createFileTime=" + toSecondString(createFileTime.get()) + + ", verifyFileTime=" + toSecondString(verifyFileTime.get())); + LOG.info("NUM_FILES=" + NUM_FILES + + ", MAX_BLOCK_NUM=" + MAX_BLOCK_NUM + + ", BLOCK_SIZE=" + BLOCK_SIZE + + ", NUM_THREADS=" + NUM_THREADS + + ", NUM_DATANODES=" + NUM_DATANODES); + logIbrCounts(cluster.getDataNodes()); + } finally { + executor.shutdown(); + cluster.shutdown(); + } + } + + static String toConfString(String key, Configuration conf) { + return key + "=" + conf.get(key); + } + + static String toSecondString(long ms) { + return (ms/1000.0) + "s"; + } + + static void logIbrCounts(List datanodes) { + final String name = "IncrementalBlockReportsNumOps"; + for(DataNode dn : datanodes) { + final MetricsRecordBuilder m = MetricsAsserts.getMetrics( + dn.getMetrics().name()); + final long ibr = MetricsAsserts.getLongCounter(name, m); + LOG.info(dn.getDisplayName() + ": " + name + "=" + ibr); + } + + } + + static class ThreadLocalBuffer extends ThreadLocal { + @Override + protected byte[] initialValue() { + return new byte[BLOCK_SIZE]; + } + } + + static byte[] nextBytes(int blockIndex, long seed, byte[] bytes) { + byte b = (byte)(seed ^ (seed >> blockIndex)); + for(int i = 0; i < bytes.length; i++) { + bytes[i] = b++; + } + return bytes; + } + + static Path createFile(Path dir, int numBlocks, long seed, + DistributedFileSystem dfs) throws IOException { + final Path f = new Path(dir, seed + "_" + numBlocks); + final byte[] bytes = IO_BUF.get(); + + try(FSDataOutputStream out = dfs.create(f)) { + for(int i = 0; i < numBlocks; i++) { + out.write(nextBytes(i, seed, bytes)); + } + } + return f; + } + + static boolean verifyFile(Path f, DistributedFileSystem dfs) { + final long seed; + final int numBlocks; + { + final String name = f.getName(); + final int i = name.indexOf('_'); + seed = Long.parseLong(name.substring(0, i)); + numBlocks = Integer.parseInt(name.substring(i + 1)); + } + + final byte[] computed = IO_BUF.get(); + final byte[] expected = VERIFY_BUF.get(); + + try(FSDataInputStream in = dfs.open(f)) { + for(int i = 0; i < numBlocks; i++) { + in.read(computed); + nextBytes(i, seed, expected); + Assert.assertArrayEquals(expected, computed); + } + return true; + } catch(Exception e) { + LOG.error("Failed to verify file " + f); + return false; + } + } + + @Test + public void testIbr() throws Exception { + runIbrTest(0L); + runIbrTest(100L); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java index 3af959c4c9d..c21cc86783e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java @@ -138,12 +138,14 @@ public class TestDataXceiverLazyPersistHint { PeerLocality locality, NonLocalLazyPersist nonLocalLazyPersist, final ArgumentCaptor captor) throws IOException { + final BlockReceiver mockBlockReceiver = mock(BlockReceiver.class); + doReturn(mock(Replica.class)).when(mockBlockReceiver).getReplica(); + DataXceiver xceiverSpy = spy(DataXceiver.create( getMockPeer(locality), 
getMockDn(nonLocalLazyPersist), mock(DataXceiverServer.class))); - - doReturn(mock(BlockReceiver.class)).when(xceiverSpy).getBlockReceiver( + doReturn(mockBlockReceiver).when(xceiverSpy).getBlockReceiver( any(ExtendedBlock.class), any(StorageType.class), any(DataInputStream.class), anyString(), anyString(), any(BlockConstructionStage.class), anyLong(), anyLong(), anyLong(), diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java index 676d85580ec..228897418e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java @@ -88,7 +88,7 @@ public class TestIncrementalBlockReports { ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo( getDummyBlock(), BlockStatus.RECEIVED_BLOCK, null); DatanodeStorage s = singletonDn.getFSDataset().getStorage(storageUuid); - actor.getIbrManager().notifyNamenodeBlock(rdbi, s); + actor.getIbrManager().notifyNamenodeBlock(rdbi, s, false); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java index 2c2e520131e..155202b7c58 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java @@ -177,7 +177,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi { } @Override - public String recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen) + public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlkLen) throws IOException { return null; } @@ -272,7 +272,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi { } @Override - public String updateReplicaUnderRecovery(ExtendedBlock oldBlock, + public Replica updateReplicaUnderRecovery(ExtendedBlock oldBlock, long recoveryId, long newBlockId, long newLength) throws IOException { return null; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java index b7efdf657b7..c054641611d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java @@ -328,7 +328,6 @@ public class TestInterDatanodeProtocol { try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); cluster.waitActive(); - String bpid = cluster.getNamesystem().getBlockPoolId(); //create a file DistributedFileSystem dfs = cluster.getFileSystem(); @@ -379,10 +378,11 @@ public class TestInterDatanodeProtocol { } //update - final String storageID = fsdataset.updateReplicaUnderRecovery( + final Replica r = fsdataset.updateReplicaUnderRecovery( new ExtendedBlock(b.getBlockPoolId(), rri), 
recoveryid, rri.getBlockId(), newlength); - assertTrue(storageID != null); + assertTrue(r != null); + assertTrue(r.getStorageUuid() != null); } finally { if (cluster != null) cluster.shutdown();
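
The batching behavior introduced by this patch is controlled entirely by the new dfs.blockreport.incremental.intervalMsec key (DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY, default 0, which keeps the old behavior of sending each block-receipt IBR immediately). Below is a minimal sketch of enabling batching programmatically, mirroring TestBatchIbr.newConf(); the class name BatchIbrConfigExample is illustrative only and not part of the patch, and the 100 ms value is the one exercised by the test. The same property can equally be set in hdfs-site.xml for a real DataNode.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class BatchIbrConfigExample {
  public static void main(String[] args) {
    // Build a DataNode configuration that batches block-receipt IBRs.
    Configuration conf = new HdfsConfiguration();

    // A positive interval makes the BPServiceActor wait at least this many
    // milliseconds between consecutive incremental block reports, so block
    // receipts accumulate into a single batched report instead of being
    // sent one at a time.
    conf.setLong(
        DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY,
        100L); // 100 ms batching window, as used in TestBatchIbr.testIbr()

    // Read the value back, falling back to the default (0 = send immediately).
    System.out.println("IBR interval (ms) = " + conf.getLong(
        DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY,
        DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_DEFAULT));
  }
}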