From 391ce535a739dc92cb90017d759217265a4fd969 Mon Sep 17 00:00:00 2001 From: Vinitha Reddy Gankidi Date: Fri, 14 Oct 2016 10:37:44 -0700 Subject: [PATCH] HDFS-10301. Remove FBR tracking state to fix false zombie storage detection for interleaving block reports. Contributed by Vinitha Gankidi. --- .../server/blockmanagement/BlockManager.java | 75 +++++-------------- .../blockmanagement/DatanodeDescriptor.java | 48 ------------ .../blockmanagement/DatanodeStorageInfo.java | 11 --- .../server/namenode/NameNodeRpcServer.java | 4 +- .../blockmanagement/TestBlockManager.java | 19 ++--- .../TestNameNodePrunesMissingStorages.java | 70 +++++++++++++++-- .../server/datanode/BlockReportTestBase.java | 50 +++++++++++++ .../TestAddOverReplicatedStripedBlocks.java | 4 + 8 files changed, 147 insertions(+), 134 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 7949439b431..7b13add4cb1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -1347,6 +1347,8 @@ public class BlockManager implements BlockStatsMXBean { } } checkSafeMode(); + LOG.info("Removed blocks associated with storage {} from DataNode {}", + storageInfo, node); } /** @@ -2191,7 +2193,7 @@ public class BlockManager implements BlockStatsMXBean { public boolean processReport(final DatanodeID nodeID, final DatanodeStorage storage, final BlockListAsLongs newReport, - BlockReportContext context, boolean lastStorageInRpc) throws IOException { + BlockReportContext context) throws IOException { namesystem.writeLock(); final long startTime = Time.monotonicNow(); //after acquiring write lock final long endTime; @@ -2245,32 +2247,6 @@ public class BlockManager implements BlockStatsMXBean { } storageInfo.receivedBlockReport(); - if (context != null) { - storageInfo.setLastBlockReportId(context.getReportId()); - if (lastStorageInRpc) { - int rpcsSeen = node.updateBlockReportContext(context); - if (rpcsSeen >= context.getTotalRpcs()) { - long leaseId = blockReportLeaseManager.removeLease(node); - BlockManagerFaultInjector.getInstance(). - removeBlockReportLease(node, leaseId); - List zombies = node.removeZombieStorages(); - if (zombies.isEmpty()) { - LOG.debug("processReport 0x{}: no zombie storages found.", - Long.toHexString(context.getReportId())); - } else { - for (DatanodeStorageInfo zombie : zombies) { - removeZombieReplicas(context, zombie); - } - } - node.clearBlockReportContext(); - } else { - LOG.debug("processReport 0x{}: {} more RPCs remaining in this " + - "report.", Long.toHexString(context.getReportId()), - (context.getTotalRpcs() - rpcsSeen) - ); - } - } - } } finally { endTime = Time.monotonicNow(); namesystem.writeUnlock(); @@ -2295,36 +2271,25 @@ public class BlockManager implements BlockStatsMXBean { return !node.hasStaleStorages(); } - private void removeZombieReplicas(BlockReportContext context, - DatanodeStorageInfo zombie) { - LOG.warn("processReport 0x{}: removing zombie storage {}, which no " + - "longer exists on the DataNode.", - Long.toHexString(context.getReportId()), zombie.getStorageID()); - assert(namesystem.hasWriteLock()); - Iterator iter = zombie.getBlockIterator(); - int prevBlocks = zombie.numBlocks(); - while (iter.hasNext()) { - BlockInfo block = iter.next(); - // We assume that a block can be on only one storage in a DataNode. - // That's why we pass in the DatanodeDescriptor rather than the - // DatanodeStorageInfo. - // TODO: remove this assumption in case we want to put a block on - // more than one storage on a datanode (and because it's a difficult - // assumption to really enforce) - // DatanodeStorageInfo must be removed using the iterator to avoid - // ConcurrentModificationException in the underlying storage - iter.remove(); - removeStoredBlock(block, zombie.getDatanodeDescriptor()); - Block b = getBlockOnStorage(block, zombie); - if (b != null) { - invalidateBlocks.remove(zombie.getDatanodeDescriptor(), b); + public void removeBRLeaseIfNeeded(final DatanodeID nodeID, + final BlockReportContext context) throws IOException { + namesystem.writeLock(); + DatanodeDescriptor node; + try { + node = datanodeManager.getDatanode(nodeID); + if (context != null) { + if (context.getTotalRpcs() == context.getCurRpc() + 1) { + long leaseId = this.getBlockReportLeaseManager().removeLease(node); + BlockManagerFaultInjector.getInstance(). + removeBlockReportLease(node, leaseId); + } + LOG.debug("Processing RPC with index {} out of total {} RPCs in " + + "processReport 0x{}", context.getCurRpc(), + context.getTotalRpcs(), Long.toHexString(context.getReportId())); } + } finally { + namesystem.writeUnlock(); } - assert(zombie.numBlocks() == 0); - LOG.warn("processReport 0x{}: removed {} replicas from storage {}, " + - "which no longer exists on the DataNode.", - Long.toHexString(context.getReportId()), prevBlocks, - zombie.getStorageID()); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index c74d7c553fd..6d163ecafa8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.util.ArrayList; -import java.util.BitSet; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -31,7 +30,6 @@ import java.util.Queue; import java.util.Set; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -43,7 +41,6 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.CachedBlock; import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; -import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State; import org.apache.hadoop.hdfs.server.protocol.StorageReport; @@ -68,8 +65,6 @@ public class DatanodeDescriptor extends DatanodeInfo { LoggerFactory.getLogger(DatanodeDescriptor.class); public static final DatanodeDescriptor[] EMPTY_ARRAY = {}; private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min - private static final List EMPTY_STORAGE_INFO_LIST = - ImmutableList.of(); /** Block and targets pair */ @InterfaceAudience.Private @@ -154,10 +149,6 @@ public class DatanodeDescriptor extends DatanodeInfo { public final DecommissioningStatus decommissioningStatus = new DecommissioningStatus(); - private long curBlockReportId = 0; - - private BitSet curBlockReportRpcsSeen = null; - private final Map storageMap = new HashMap<>(); @@ -257,20 +248,6 @@ public class DatanodeDescriptor extends DatanodeInfo { updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0, null); } - public int updateBlockReportContext(BlockReportContext context) { - if (curBlockReportId != context.getReportId()) { - curBlockReportId = context.getReportId(); - curBlockReportRpcsSeen = new BitSet(context.getTotalRpcs()); - } - curBlockReportRpcsSeen.set(context.getCurRpc()); - return curBlockReportRpcsSeen.cardinality(); - } - - public void clearBlockReportContext() { - curBlockReportId = 0; - curBlockReportRpcsSeen = null; - } - public CachedBlocksList getPendingCached() { return pendingCached; } @@ -334,31 +311,6 @@ public class DatanodeDescriptor extends DatanodeInfo { } } - List removeZombieStorages() { - List zombies = null; - synchronized (storageMap) { - Iterator> iter = - storageMap.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry entry = iter.next(); - DatanodeStorageInfo storageInfo = entry.getValue(); - if (storageInfo.getLastBlockReportId() != curBlockReportId) { - LOG.info("{} had lastBlockReportId 0x{} but curBlockReportId = 0x{}", - storageInfo.getStorageID(), - Long.toHexString(storageInfo.getLastBlockReportId()), - Long.toHexString(curBlockReportId)); - iter.remove(); - if (zombies == null) { - zombies = new LinkedList<>(); - } - zombies.add(storageInfo); - } - storageInfo.setLastBlockReportId(0); - } - } - return zombies == null ? EMPTY_STORAGE_INFO_LIST : zombies; - } - public void resetBlocks() { setCapacity(0); setRemaining(0); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java index d98a2c15cc2..b4c8aaae7cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java @@ -99,9 +99,6 @@ public class DatanodeStorageInfo { private final FoldedTreeSet blocks = new FoldedTreeSet<>(); - // The ID of the last full block report which updated this storage. - private long lastBlockReportId = 0; - /** The number of block reports received */ private int blockReportCount = 0; @@ -166,14 +163,6 @@ public class DatanodeStorageInfo { this.blockPoolUsed = blockPoolUsed; } - long getLastBlockReportId() { - return lastBlockReportId; - } - - void setLastBlockReportId(long lastBlockReportId) { - this.lastBlockReportId = lastBlockReportId; - } - State getState() { return this.state; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index a97a307aefd..78941631985 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -1441,11 +1441,13 @@ public class NameNodeRpcServer implements NamenodeProtocols { @Override public Boolean call() throws IOException { return bm.processReport(nodeReg, reports[index].getStorage(), - blocks, context, (index == reports.length - 1)); + blocks, context); } }); metrics.incrStorageBlockReportOps(); } + bm.removeBRLeaseIfNeeded(nodeReg, context); + BlockManagerFaultInjector.getInstance(). incomingBlockReportRpc(nodeReg, context); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index 942569a7dc2..2c7c72032b4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -716,12 +716,12 @@ public class TestBlockManager { reset(node); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), - BlockListAsLongs.EMPTY, null, false); + BlockListAsLongs.EMPTY, null); assertEquals(1, ds.getBlockReportCount()); // send block report again, should NOT be processed reset(node); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), - BlockListAsLongs.EMPTY, null, false); + BlockListAsLongs.EMPTY, null); assertEquals(1, ds.getBlockReportCount()); // re-register as if node restarted, should update existing node @@ -732,7 +732,7 @@ public class TestBlockManager { // send block report, should be processed after restart reset(node); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), - BlockListAsLongs.EMPTY, null, false); + BlockListAsLongs.EMPTY, null); // Reinitialize as registration with empty storage list pruned // node.storageMap. ds = node.getStorageInfos()[0]; @@ -761,7 +761,7 @@ public class TestBlockManager { reset(node); doReturn(1).when(node).numBlocks(); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), - BlockListAsLongs.EMPTY, null, false); + BlockListAsLongs.EMPTY, null); assertEquals(1, ds.getBlockReportCount()); } @@ -835,7 +835,7 @@ public class TestBlockManager { assertEquals(0, ds.getBlockReportCount()); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), builder.build(), - new BlockReportContext(1, 0, System.nanoTime(), 0, true), false); + new BlockReportContext(1, 0, System.nanoTime(), 0, true)); assertEquals(1, ds.getBlockReportCount()); // verify the storage info is correct @@ -874,8 +874,7 @@ public class TestBlockManager { assertEquals(0, ds.getBlockReportCount()); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), generateReport(blocks), - new BlockReportContext(1, 0, System.nanoTime(), 0, false), - false); + new BlockReportContext(1, 0, System.nanoTime(), 0, false)); assertEquals(1, ds.getBlockReportCount()); // verify the storage info is correct for (BlockInfo block : blocks) { @@ -885,8 +884,7 @@ public class TestBlockManager { // Send unsorted report bm.processReport(node, new DatanodeStorage(ds.getStorageID()), generateReport(blocks), - new BlockReportContext(1, 0, System.nanoTime(), 0, false), - false); + new BlockReportContext(1, 0, System.nanoTime(), 0, false)); assertEquals(2, ds.getBlockReportCount()); // verify the storage info is correct for (BlockInfo block : blocks) { @@ -897,8 +895,7 @@ public class TestBlockManager { Collections.sort(blocks); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), generateReport(blocks), - new BlockReportContext(1, 0, System.nanoTime(), 0, true), - false); + new BlockReportContext(1, 0, System.nanoTime(), 0, true)); assertEquals(3, ds.getBlockReportCount()); // verify the storage info is correct for (BlockInfo block : blocks) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java index 6efc53a8746..274627f9e25 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java @@ -19,24 +19,23 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import com.google.common.base.Supplier; +import java.util.ArrayList; +import java.util.Collection; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -47,7 +46,6 @@ import org.apache.log4j.Level; import org.junit.Assert; import org.junit.Test; -import java.io.BufferedOutputStream; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; @@ -56,13 +54,11 @@ import java.io.FileReader; import java.io.IOException; import java.io.OutputStreamWriter; import java.io.Writer; -import java.util.Arrays; -import java.util.EnumSet; import java.util.Iterator; -import java.util.List; import java.util.UUID; import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertNotNull; @@ -160,6 +156,8 @@ public class TestNameNodePrunesMissingStorages { public void testRemovingStorageDoesNotProduceZombies() throws Exception { Configuration conf = new HdfsConfiguration(); conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, + 1000); final int NUM_STORAGES_PER_DN = 2; final MiniDFSCluster cluster = new MiniDFSCluster .Builder(conf).numDataNodes(3) @@ -262,7 +260,7 @@ public class TestNameNodePrunesMissingStorages { assertEquals(NUM_STORAGES_PER_DN - 1, infos.length); return true; } - }, 10, 30000); + }, 1000, 30000); } finally { if (cluster != null) { cluster.shutdown(); @@ -371,4 +369,60 @@ public class TestNameNodePrunesMissingStorages { cluster.shutdown(); } } + + @Test(timeout=300000) + public void testNameNodePrunesUnreportedStorages() throws Exception { + Configuration conf = new HdfsConfiguration(); + // Create a cluster with one datanode with two storages + MiniDFSCluster cluster = new MiniDFSCluster + .Builder(conf).numDataNodes(1) + .storagesPerDatanode(2) + .build(); + // Create two files to ensure each storage has a block + DFSTestUtil.createFile(cluster.getFileSystem(), new Path("file1"), + 102400, 102400, 102400, (short)1, + 0x1BAD5EE); + DFSTestUtil.createFile(cluster.getFileSystem(), new Path("file2"), + 102400, 102400, 102400, (short)1, + 0x1BAD5EED); + // Get the datanode storages and data directories + DataNode dn = cluster.getDataNodes().get(0); + BlockManager bm = cluster.getNameNode().getNamesystem().getBlockManager(); + DatanodeDescriptor dnDescriptor = bm.getDatanodeManager(). + getDatanode(cluster.getDataNodes().get(0).getDatanodeUuid()); + DatanodeStorageInfo[] dnStoragesInfosBeforeRestart = + dnDescriptor.getStorageInfos(); + Collection oldDirs = new ArrayList(dn.getConf(). + getTrimmedStringCollection(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY)); + // Keep the first data directory and remove the second. + String newDirs = oldDirs.iterator().next(); + conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs); + // Restart the datanode with the new conf + cluster.stopDataNode(0); + cluster.startDataNodes(conf, 1, false, null, null); + dn = cluster.getDataNodes().get(0); + cluster.waitActive(); + // Assert that the dnDescriptor has both the storages after restart + assertArrayEquals(dnStoragesInfosBeforeRestart, + dnDescriptor.getStorageInfos()); + // Assert that the removed storage is marked as FAILED + // when DN heartbeats to the NN + int numFailedStoragesWithBlocks = 0; + DatanodeStorageInfo failedStorageInfo = null; + for (DatanodeStorageInfo dnStorageInfo: dnDescriptor.getStorageInfos()) { + if (dnStorageInfo.areBlocksOnFailedStorage()) { + numFailedStoragesWithBlocks++; + failedStorageInfo = dnStorageInfo; + } + } + assertEquals(1, numFailedStoragesWithBlocks); + // Heartbeat manager removes the blocks associated with this failed storage + bm.getDatanodeManager().getHeartbeatManager().heartbeatCheck(); + assertTrue(!failedStorageInfo.areBlocksOnFailedStorage()); + // pruneStorageMap removes the unreported storage + cluster.triggerHeartbeats(); + // Assert that the unreported storage is pruned + assertEquals(DataNode.getStorageLocations(dn.getConf()).size(), + dnDescriptor.getStorageInfos().length); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java index 53b92636941..6810a0b3db3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java @@ -29,7 +29,12 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -50,7 +55,10 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; @@ -649,6 +657,48 @@ public abstract class BlockReportTestBase { DFSTestUtil.readFile(fs, filePath); } + // See HDFS-10301 + @Test(timeout = 300000) + public void testInterleavedBlockReports() + throws IOException, ExecutionException, InterruptedException { + int numConcurrentBlockReports = 3; + DataNode dn = cluster.getDataNodes().get(DN_N0); + final String poolId = cluster.getNamesystem().getBlockPoolId(); + LOG.info("Block pool id: " + poolId); + final DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); + final StorageBlockReport[] reports = + getBlockReports(dn, poolId, true, true); + + // Get the list of storage ids associated with the datanode + // before the test + BlockManager bm = cluster.getNameNode().getNamesystem().getBlockManager(); + final DatanodeDescriptor dnDescriptor = + bm.getDatanodeManager().getDatanode(dn.getDatanodeId()); + DatanodeStorageInfo[] storageInfos = dnDescriptor.getStorageInfos(); + + // Send the block report concurrently using + // numThreads=numConcurrentBlockReports + ExecutorService executorService = + Executors.newFixedThreadPool(numConcurrentBlockReports); + List> futureList = new ArrayList<>(numConcurrentBlockReports); + for (int i = 0; i < numConcurrentBlockReports; i++) { + futureList.add(executorService.submit(new Callable() { + @Override + public Void call() throws IOException { + sendBlockReports(dnR, poolId, reports); + return null; + } + })); + } + for (Future future : futureList) { + future.get(); + } + executorService.shutdown(); + + // Verify that the storages match before and after the test + Assert.assertArrayEquals(storageInfos, dnDescriptor.getStorageInfos()); + } + private void waitForTempReplica(Block bl, int DN_N1) throws IOException { final boolean tooLongWait = false; final int TIMEOUT = 40000; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java index 7b281a6ef38..13dcccfe48d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; @@ -236,6 +237,9 @@ public class TestAddOverReplicatedStripedBlocks { } } + // This test is going to be rewritten in HDFS-10854. Ignoring this test + // temporarily as it fails with the fix for HDFS-10301. + @Ignore @Test public void testProcessOverReplicatedAndMissingStripedBlock() throws Exception {