diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 39ac94ddd79..8649e489028 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -61,7 +61,6 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
@@ -1233,6 +1232,8 @@ public class BlockManager implements BlockStatsMXBean {
       invalidateBlocks.remove(node, block);
     }
     namesystem.checkSafeMode();
+    LOG.info("Removed blocks associated with storage {} from DataNode {}",
+        storageInfo, node);
   }
 
   /**
@@ -1942,8 +1943,8 @@ public class BlockManager implements BlockStatsMXBean {
    */
   public boolean processReport(final DatanodeID nodeID,
       final DatanodeStorage storage,
-      final BlockListAsLongs newReport, BlockReportContext context,
-      boolean lastStorageInRpc) throws IOException {
+      final BlockListAsLongs newReport,
+      BlockReportContext context) throws IOException {
     namesystem.writeLock();
     final long startTime = Time.monotonicNow(); //after acquiring write lock
     final long endTime;
@@ -1997,32 +1998,6 @@ public class BlockManager implements BlockStatsMXBean {
       }
 
       storageInfo.receivedBlockReport();
-      if (context != null) {
-        storageInfo.setLastBlockReportId(context.getReportId());
-        if (lastStorageInRpc) {
-          int rpcsSeen = node.updateBlockReportContext(context);
-          if (rpcsSeen >= context.getTotalRpcs()) {
-            long leaseId = blockReportLeaseManager.removeLease(node);
-            BlockManagerFaultInjector.getInstance().
-                removeBlockReportLease(node, leaseId);
-            List<DatanodeStorageInfo> zombies = node.removeZombieStorages();
-            if (zombies.isEmpty()) {
-              LOG.debug("processReport 0x{}: no zombie storages found.",
-                  Long.toHexString(context.getReportId()));
-            } else {
-              for (DatanodeStorageInfo zombie : zombies) {
-                removeZombieReplicas(context, zombie);
-              }
-            }
-            node.clearBlockReportContext();
-          } else {
-            LOG.debug("processReport 0x{}: {} more RPCs remaining in this " +
-                "report.", Long.toHexString(context.getReportId()),
-                (context.getTotalRpcs() - rpcsSeen)
-            );
-          }
-        }
-      }
     } finally {
       endTime = Time.monotonicNow();
       namesystem.writeUnlock();
@@ -2047,30 +2022,25 @@ public class BlockManager implements BlockStatsMXBean {
     return !node.hasStaleStorages();
   }
 
-  private void removeZombieReplicas(BlockReportContext context,
-      DatanodeStorageInfo zombie) {
-    LOG.warn("processReport 0x{}: removing zombie storage {}, which no " +
-        "longer exists on the DataNode.",
-        Long.toHexString(context.getReportId()), zombie.getStorageID());
-    assert(namesystem.hasWriteLock());
-    Iterator<BlockInfo> iter = zombie.getBlockIterator();
-    int prevBlocks = zombie.numBlocks();
-    while (iter.hasNext()) {
-      BlockInfo block = iter.next();
-      // We assume that a block can be on only one storage in a DataNode.
-      // That's why we pass in the DatanodeDescriptor rather than the
-      // DatanodeStorageInfo.
-      // TODO: remove this assumption in case we want to put a block on
-      // more than one storage on a datanode (and because it's a difficult
-      // assumption to really enforce)
-      removeStoredBlock(block, zombie.getDatanodeDescriptor());
-      invalidateBlocks.remove(zombie.getDatanodeDescriptor(), block);
+  public void removeBRLeaseIfNeeded(final DatanodeID nodeID,
+      final BlockReportContext context) throws IOException {
+    namesystem.writeLock();
+    DatanodeDescriptor node;
+    try {
+      node = datanodeManager.getDatanode(nodeID);
+      if (context != null) {
+        if (context.getTotalRpcs() == context.getCurRpc() + 1) {
+          long leaseId = this.getBlockReportLeaseManager().removeLease(node);
+          BlockManagerFaultInjector.getInstance().
+              removeBlockReportLease(node, leaseId);
+        }
+        LOG.debug("Processing RPC with index {} out of total {} RPCs in " +
+            "processReport 0x{}", context.getCurRpc(),
+            context.getTotalRpcs(), Long.toHexString(context.getReportId()));
+      }
+    } finally {
+      namesystem.writeUnlock();
     }
-    assert(zombie.numBlocks() == 0);
-    LOG.warn("processReport 0x{}: removed {} replicas from storage {}, " +
-        "which no longer exists on the DataNode.",
-        Long.toHexString(context.getReportId()), prevBlocks,
-        zombie.getStorageID());
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 9e25f536ff9..f2910a95a7c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.ArrayList;
-import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -31,7 +30,6 @@ import java.util.Queue;
 import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -41,7 +39,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
-import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -53,8 +50,6 @@ import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * This class extends the DatanodeInfo class with ephemeral information (eg
  * health, capacity, what blocks are associated with the Datanode) that is
@@ -67,8 +62,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
       LoggerFactory.getLogger(DatanodeDescriptor.class);
   public static final DatanodeDescriptor[] EMPTY_ARRAY = {};
   private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
-  private static final List<DatanodeStorageInfo> EMPTY_STORAGE_INFO_LIST =
-      ImmutableList.of();
 
   /** Block and targets pair */
   @InterfaceAudience.Private
@@ -153,10 +146,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
   public final DecommissioningStatus decommissioningStatus =
       new DecommissioningStatus();
 
-  private long curBlockReportId = 0;
-
-  private BitSet curBlockReportRpcsSeen = null;
-
   private final Map<String, DatanodeStorageInfo> storageMap =
       new HashMap<>();
 
@@ -253,20 +242,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
     updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0, null);
   }
 
-  public int updateBlockReportContext(BlockReportContext context) {
-    if (curBlockReportId != context.getReportId()) {
-      curBlockReportId = context.getReportId();
-      curBlockReportRpcsSeen = new BitSet(context.getTotalRpcs());
-    }
-    curBlockReportRpcsSeen.set(context.getCurRpc());
-    return curBlockReportRpcsSeen.cardinality();
-  }
-
-  public void clearBlockReportContext() {
-    curBlockReportId = 0;
-    curBlockReportRpcsSeen = null;
-  }
-
   public CachedBlocksList getPendingCached() {
     return pendingCached;
   }
@@ -330,31 +305,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
     }
   }
 
-  List<DatanodeStorageInfo> removeZombieStorages() {
-    List<DatanodeStorageInfo> zombies = null;
-    synchronized (storageMap) {
-      Iterator<Map.Entry<String, DatanodeStorageInfo>> iter =
-          storageMap.entrySet().iterator();
-      while (iter.hasNext()) {
-        Map.Entry<String, DatanodeStorageInfo> entry = iter.next();
-        DatanodeStorageInfo storageInfo = entry.getValue();
-        if (storageInfo.getLastBlockReportId() != curBlockReportId) {
-          LOG.info("{} had lastBlockReportId 0x{} but curBlockReportId = 0x{}",
-              storageInfo.getStorageID(),
-              Long.toHexString(storageInfo.getLastBlockReportId()),
-              Long.toHexString(curBlockReportId));
-          iter.remove();
-          if (zombies == null) {
-            zombies = new LinkedList<>();
-          }
-          zombies.add(storageInfo);
-        }
-        storageInfo.setLastBlockReportId(0);
-      }
-    }
-    return zombies == null ? EMPTY_STORAGE_INFO_LIST : zombies;
-  }
-
   public void resetBlocks() {
     setCapacity(0);
     setRemaining(0);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 319d2f375e4..862b1bfedfb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -123,9 +123,6 @@ public class DatanodeStorageInfo {
   private volatile BlockInfo blockList = null;
   private int numBlocks = 0;
 
-  // The ID of the last full block report which updated this storage.
-  private long lastBlockReportId = 0;
-
   /** The number of block reports received */
   private int blockReportCount = 0;
 
@@ -190,14 +187,6 @@ public class DatanodeStorageInfo {
     this.blockPoolUsed = blockPoolUsed;
   }
 
-  long getLastBlockReportId() {
-    return lastBlockReportId;
-  }
-
-  void setLastBlockReportId(long lastBlockReportId) {
-    this.lastBlockReportId = lastBlockReportId;
-  }
-
   State getState() {
     return this.state;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 09248e5f76a..6fa50723c8e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -1424,11 +1424,13 @@ public class NameNodeRpcServer implements NamenodeProtocols {
         @Override
         public Boolean call() throws IOException {
           return bm.processReport(nodeReg, reports[index].getStorage(),
-              blocks, context, (index == reports.length - 1));
+              blocks, context);
         }
       });
       metrics.incrStorageBlockReportOps();
     }
+    bm.removeBRLeaseIfNeeded(nodeReg, context);
+
     BlockManagerFaultInjector.getInstance().
         incomingBlockReportRpc(nodeReg, context);
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index c0f910cae51..f4de9685cf9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -83,6 +83,7 @@ import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeId;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.namenode.TestINodeFile;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@@ -713,12 +714,12 @@ public class TestBlockManager {
     reset(node);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        BlockListAsLongs.EMPTY, null, false);
+        BlockListAsLongs.EMPTY, null);
     assertEquals(1, ds.getBlockReportCount());
 
     // send block report again, should NOT be processed
     reset(node);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        BlockListAsLongs.EMPTY, null, false);
+        BlockListAsLongs.EMPTY, null);
     assertEquals(1, ds.getBlockReportCount());
 
     // re-register as if node restarted, should update existing node
@@ -729,7 +730,7 @@ public class TestBlockManager {
     // send block report, should be processed after restart
     reset(node);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        BlockListAsLongs.EMPTY, null, false);
+        BlockListAsLongs.EMPTY, null);
     // Reinitialize as registration with empty storage list pruned
     // node.storageMap.
     ds = node.getStorageInfos()[0];
@@ -758,7 +759,7 @@ public class TestBlockManager {
     reset(node);
     doReturn(1).when(node).numBlocks();
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        BlockListAsLongs.EMPTY, null, false);
+        BlockListAsLongs.EMPTY, null);
     assertEquals(1, ds.getBlockReportCount());
   }
 
@@ -831,7 +832,8 @@ public class TestBlockManager {
     // Make sure it's the first full report
     assertEquals(0, ds.getBlockReportCount());
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        builder.build(), null, false);
+        builder.build(),
+        new BlockReportContext(1, 0, System.nanoTime(), 0));
     assertEquals(1, ds.getBlockReportCount());
 
     // verify the storage info is correct
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
index cea686596e5..5214af3115a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
@@ -19,23 +19,22 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import com.google.common.base.Supplier;
+import java.util.ArrayList;
+import java.util.Collection;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -46,7 +45,6 @@ import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
@@ -55,13 +53,11 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.Writer;
-import java.util.Arrays;
-import java.util.EnumSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.UUID;
 
 import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertNotNull;
@@ -159,6 +155,8 @@ public class TestNameNodePrunesMissingStorages {
   public void testRemovingStorageDoesNotProduceZombies() throws Exception {
     Configuration conf = new HdfsConfiguration();
     conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+        1000);
     final int NUM_STORAGES_PER_DN = 2;
     final MiniDFSCluster cluster = new MiniDFSCluster
         .Builder(conf).numDataNodes(3)
         .storagesPerDatanode(NUM_STORAGES_PER_DN)
@@ -257,7 +255,7 @@ public class TestNameNodePrunesMissingStorages {
         assertEquals(NUM_STORAGES_PER_DN - 1, infos.length);
         return true;
       }
-    }, 10, 30000);
+    }, 1000, 30000);
   } finally {
     if (cluster != null) {
       cluster.shutdown();
     }
   }
@@ -365,4 +363,60 @@ public class TestNameNodePrunesMissingStorages {
       cluster.shutdown();
     }
   }
+
+  @Test(timeout=300000)
+  public void testNameNodePrunesUnreportedStorages() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    // Create a cluster with one datanode with two storages
+    MiniDFSCluster cluster = new MiniDFSCluster
+        .Builder(conf).numDataNodes(1)
+        .storagesPerDatanode(2)
+        .build();
+    // Create two files to ensure each storage has a block
+    DFSTestUtil.createFile(cluster.getFileSystem(), new Path("file1"),
+        102400, 102400, 102400, (short)1,
+        0x1BAD5EE);
+    DFSTestUtil.createFile(cluster.getFileSystem(), new Path("file2"),
+        102400, 102400, 102400, (short)1,
+        0x1BAD5EED);
+    // Get the datanode storages and data directories
+    DataNode dn = cluster.getDataNodes().get(0);
+    BlockManager bm = cluster.getNameNode().getNamesystem().getBlockManager();
+    DatanodeDescriptor dnDescriptor = bm.getDatanodeManager().
+        getDatanode(cluster.getDataNodes().get(0).getDatanodeUuid());
+    DatanodeStorageInfo[] dnStoragesInfosBeforeRestart =
+        dnDescriptor.getStorageInfos();
+    Collection<String> oldDirs = new ArrayList<String>(dn.getConf().
+        getTrimmedStringCollection(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
+    // Keep the first data directory and remove the second.
+    String newDirs = oldDirs.iterator().next();
+    conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs);
+    // Restart the datanode with the new conf
+    cluster.stopDataNode(0);
+    cluster.startDataNodes(conf, 1, false, null, null);
+    dn = cluster.getDataNodes().get(0);
+    cluster.waitActive();
+    // Assert that the dnDescriptor has both the storages after restart
+    assertArrayEquals(dnStoragesInfosBeforeRestart,
+        dnDescriptor.getStorageInfos());
+    // Assert that the removed storage is marked as FAILED
+    // when DN heartbeats to the NN
+    int numFailedStoragesWithBlocks = 0;
+    DatanodeStorageInfo failedStorageInfo = null;
+    for (DatanodeStorageInfo dnStorageInfo: dnDescriptor.getStorageInfos()) {
+      if (dnStorageInfo.areBlocksOnFailedStorage()) {
+        numFailedStoragesWithBlocks++;
+        failedStorageInfo = dnStorageInfo;
+      }
+    }
+    assertEquals(1, numFailedStoragesWithBlocks);
+    // Heartbeat manager removes the blocks associated with this failed storage
+    bm.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
+    assertTrue(!failedStorageInfo.areBlocksOnFailedStorage());
+    // pruneStorageMap removes the unreported storage
+    cluster.triggerHeartbeats();
+    // Assert that the unreported storage is pruned
+    assertEquals(DataNode.getStorageLocations(dn.getConf()).size(),
+        dnDescriptor.getStorageInfos().length);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
index 53b92636941..6810a0b3db3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
@@ -29,7 +29,12 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -50,7 +55,10 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
@@ -649,6 +657,48 @@ public abstract class BlockReportTestBase {
     DFSTestUtil.readFile(fs, filePath);
   }
 
+  // See HDFS-10301
+  @Test(timeout = 300000)
+  public void testInterleavedBlockReports()
+      throws IOException, ExecutionException, InterruptedException {
+    int numConcurrentBlockReports = 3;
+    DataNode dn = cluster.getDataNodes().get(DN_N0);
+    final String poolId = cluster.getNamesystem().getBlockPoolId();
+    LOG.info("Block pool id: " + poolId);
+    final DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+    final StorageBlockReport[] reports =
+        getBlockReports(dn, poolId, true, true);
+
+    // Get the list of storage ids associated with the datanode
+    // before the test
+    BlockManager bm = cluster.getNameNode().getNamesystem().getBlockManager();
+    final DatanodeDescriptor dnDescriptor =
+        bm.getDatanodeManager().getDatanode(dn.getDatanodeId());
+    DatanodeStorageInfo[] storageInfos = dnDescriptor.getStorageInfos();
+
+    // Send the block report concurrently using
+    // numThreads=numConcurrentBlockReports
+    ExecutorService executorService =
+        Executors.newFixedThreadPool(numConcurrentBlockReports);
+    List<Future<Void>> futureList = new ArrayList<>(numConcurrentBlockReports);
+    for (int i = 0; i < numConcurrentBlockReports; i++) {
+      futureList.add(executorService.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws IOException {
+          sendBlockReports(dnR, poolId, reports);
+          return null;
+        }
+      }));
+    }
+    for (Future<Void> future : futureList) {
+      future.get();
+    }
+    executorService.shutdown();
+
+    // Verify that the storages match before and after the test
+    Assert.assertArrayEquals(storageInfos, dnDescriptor.getStorageInfos());
+  }
+
   private void waitForTempReplica(Block bl, int DN_N1) throws IOException {
     final boolean tooLongWait = false;
     final int TIMEOUT = 40000;
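
Reviewer note, not part of the patch: with per-storage zombie tracking removed, the block report lease is now released once per report, on the RPC whose index is the last one in the report, instead of after zombie-storage cleanup. The standalone Java sketch below only illustrates the condition checked by the new BlockManager#removeBRLeaseIfNeeded; the class and method names in it are hypothetical and everything beyond the curRpc/totalRpcs semantics of BlockReportContext is assumed for illustration.

// Illustrative sketch only -- mirrors the lease-release condition added in
// this patch; it is not HDFS code.
public final class BrLeaseConditionSketch {
  // BlockReportContext numbers the RPCs of one full report 0..totalRpcs-1,
  // so the final RPC satisfies totalRpcs == curRpc + 1.
  static boolean isLastRpcOfReport(int curRpc, int totalRpcs) {
    return totalRpcs == curRpc + 1;
  }

  public static void main(String[] args) {
    // A report split into 3 per-storage RPCs: only the last RPC would
    // trigger removal of the block report lease on the NameNode.
    for (int curRpc = 0; curRpc < 3; curRpc++) {
      System.out.println("curRpc=" + curRpc
          + " releaseLease=" + isLastRpcOfReport(curRpc, 3));
    }
  }
}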