diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 81a0d22215d..01fd66c83c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -1107,6 +1107,8 @@ public class BlockManager { invalidateBlocks.remove(node, block); } namesystem.checkSafeMode(); + LOG.info("Removed blocks associated with storage {} from DataNode {}", + storageInfo, node); } /** @@ -1827,8 +1829,8 @@ public class BlockManager { */ public boolean processReport(final DatanodeID nodeID, final DatanodeStorage storage, - final BlockListAsLongs newReport, BlockReportContext context, - boolean lastStorageInRpc) throws IOException { + final BlockListAsLongs newReport, + BlockReportContext context) throws IOException { namesystem.writeLock(); final long startTime = Time.monotonicNow(); //after acquiring write lock final long endTime; @@ -1870,29 +1872,6 @@ public class BlockManager { } storageInfo.receivedBlockReport(); - if (context != null) { - storageInfo.setLastBlockReportId(context.getReportId()); - if (lastStorageInRpc) { - int rpcsSeen = node.updateBlockReportContext(context); - if (rpcsSeen >= context.getTotalRpcs()) { - List zombies = node.removeZombieStorages(); - if (zombies.isEmpty()) { - LOG.debug("processReport 0x{}: no zombie storages found.", - Long.toHexString(context.getReportId())); - } else { - for (DatanodeStorageInfo zombie : zombies) { - removeZombieReplicas(context, zombie); - } - } - node.clearBlockReportContext(); - } else { - LOG.debug("processReport 0x{}: {} more RPCs remaining in this " + - "report.", Long.toHexString(context.getReportId()), - (context.getTotalRpcs() - rpcsSeen) - ); - } - } - } } finally { endTime = Time.monotonicNow(); namesystem.writeUnlock(); @@ -1919,32 +1898,6 @@ public class BlockManager { return !node.hasStaleStorages(); } - private void removeZombieReplicas(BlockReportContext context, - DatanodeStorageInfo zombie) { - LOG.warn("processReport 0x{}: removing zombie storage {}, which no " + - "longer exists on the DataNode.", - Long.toHexString(context.getReportId()), zombie.getStorageID()); - assert(namesystem.hasWriteLock()); - Iterator iter = zombie.getBlockIterator(); - int prevBlocks = zombie.numBlocks(); - while (iter.hasNext()) { - BlockInfoContiguous block = iter.next(); - // We assume that a block can be on only one storage in a DataNode. - // That's why we pass in the DatanodeDescriptor rather than the - // DatanodeStorageInfo. - // TODO: remove this assumption in case we want to put a block on - // more than one storage on a datanode (and because it's a difficult - // assumption to really enforce) - removeStoredBlock(block, zombie.getDatanodeDescriptor()); - invalidateBlocks.remove(zombie.getDatanodeDescriptor(), block); - } - assert(zombie.numBlocks() == 0); - LOG.warn("processReport 0x{}: removed {} replicas from storage {}, " + - "which no longer exists on the DataNode.", - Long.toHexString(context.getReportId()), prevBlocks, - zombie.getStorageID()); - } - /** * Rescan the list of blocks which were previously postponed. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index 840b2e8a6ca..2ec56786fde 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.util.ArrayList; -import java.util.BitSet; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -32,7 +31,6 @@ import java.util.Set; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -42,7 +40,6 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.namenode.CachedBlock; -import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State; import org.apache.hadoop.hdfs.server.protocol.StorageReport; @@ -67,24 +64,6 @@ public class DatanodeDescriptor extends DatanodeInfo { // If node is not decommissioning, do not use this object for anything. public final DecommissioningStatus decommissioningStatus = new DecommissioningStatus(); - private long curBlockReportId = 0; - - private BitSet curBlockReportRpcsSeen = null; - - public int updateBlockReportContext(BlockReportContext context) { - if (curBlockReportId != context.getReportId()) { - curBlockReportId = context.getReportId(); - curBlockReportRpcsSeen = new BitSet(context.getTotalRpcs()); - } - curBlockReportRpcsSeen.set(context.getCurRpc()); - return curBlockReportRpcsSeen.cardinality(); - } - - public void clearBlockReportContext() { - curBlockReportId = 0; - curBlockReportRpcsSeen = null; - } - /** Block and targets pair */ @InterfaceAudience.Private @InterfaceStability.Evolving @@ -306,34 +285,6 @@ public class DatanodeDescriptor extends DatanodeInfo { } } - static final private List EMPTY_STORAGE_INFO_LIST = - ImmutableList.of(); - - List removeZombieStorages() { - List zombies = null; - synchronized (storageMap) { - Iterator> iter = - storageMap.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry entry = iter.next(); - DatanodeStorageInfo storageInfo = entry.getValue(); - if (storageInfo.getLastBlockReportId() != curBlockReportId) { - LOG.info(storageInfo.getStorageID() + " had lastBlockReportId 0x" + - Long.toHexString(storageInfo.getLastBlockReportId()) + - ", but curBlockReportId = 0x" + - Long.toHexString(curBlockReportId)); - iter.remove(); - if (zombies == null) { - zombies = new LinkedList(); - } - zombies.add(storageInfo); - } - storageInfo.setLastBlockReportId(0); - } - } - return zombies == null ? EMPTY_STORAGE_INFO_LIST : zombies; - } - /** * Remove block from the list of blocks belonging to the data-node. Remove * data-node from the block. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java index 4aef7906422..20b8bd374d5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java @@ -116,9 +116,6 @@ public class DatanodeStorageInfo { private volatile BlockInfoContiguous blockList = null; private int numBlocks = 0; - // The ID of the last full block report which updated this storage. - private long lastBlockReportId = 0; - /** The number of block reports received */ private int blockReportCount = 0; @@ -183,14 +180,6 @@ public class DatanodeStorageInfo { this.blockPoolUsed = blockPoolUsed; } - long getLastBlockReportId() { - return lastBlockReportId; - } - - void setLastBlockReportId(long lastBlockReportId) { - this.lastBlockReportId = lastBlockReportId; - } - State getState() { return this.state; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 582e4920b22..ef1ea60e65e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -1324,7 +1324,7 @@ class NameNodeRpcServer implements NamenodeProtocols { @Override public Boolean call() throws IOException { return bm.processReport(nodeReg, reports[index].getStorage(), - blocks, context, (index == reports.length - 1)); + blocks, context); } }); metrics.incrStorageBlockReportOps(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index 5f08886bd58..6e4b747da90 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -678,12 +678,12 @@ public class TestBlockManager { reset(node); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), - BlockListAsLongs.EMPTY, null, false); + BlockListAsLongs.EMPTY, null); assertEquals(1, ds.getBlockReportCount()); // send block report again, should NOT be processed reset(node); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), - BlockListAsLongs.EMPTY, null, false); + BlockListAsLongs.EMPTY, null); assertEquals(1, ds.getBlockReportCount()); // re-register as if node restarted, should update existing node @@ -694,7 +694,7 @@ public class TestBlockManager { // send block report, should be processed after restart reset(node); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), - BlockListAsLongs.EMPTY, null, false); + BlockListAsLongs.EMPTY, null); // Reinitialize as registration with empty storage list pruned // node.storageMap. ds = node.getStorageInfos()[0]; @@ -723,7 +723,7 @@ public class TestBlockManager { reset(node); doReturn(1).when(node).numBlocks(); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), - BlockListAsLongs.EMPTY, null, false); + BlockListAsLongs.EMPTY, null); assertEquals(1, ds.getBlockReportCount()); } @@ -796,7 +796,7 @@ public class TestBlockManager { // Make sure it's the first full report assertEquals(0, ds.getBlockReportCount()); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), - builder.build(), null, false); + builder.build(), null); assertEquals(1, ds.getBlockReportCount()); // verify the storage info is correct diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java index 4b97d01b73e..74d197e2627 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import com.google.common.base.Supplier; +import java.util.ArrayList; +import java.util.Collection; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -43,11 +45,11 @@ import org.junit.Test; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; -import java.util.Arrays; import java.util.Iterator; import java.util.List; import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertNotNull; @@ -145,6 +147,8 @@ public class TestNameNodePrunesMissingStorages { public void testRemovingStorageDoesNotProduceZombies() throws Exception { Configuration conf = new HdfsConfiguration(); conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, + 1000); final int NUM_STORAGES_PER_DN = 2; final MiniDFSCluster cluster = new MiniDFSCluster .Builder(conf).numDataNodes(3) @@ -242,11 +246,67 @@ public class TestNameNodePrunesMissingStorages { assertEquals(NUM_STORAGES_PER_DN - 1, infos.length); return true; } - }, 10, 30000); + }, 1000, 30000); } finally { if (cluster != null) { cluster.shutdown(); } } } + + @Test(timeout=300000) + public void testNameNodePrunesUnreportedStorages() throws Exception { + Configuration conf = new HdfsConfiguration(); + // Create a cluster with one datanode with two storages + MiniDFSCluster cluster = new MiniDFSCluster + .Builder(conf).numDataNodes(1) + .storagesPerDatanode(2) + .build(); + // Create two files to ensure each storage has a block + DFSTestUtil.createFile(cluster.getFileSystem(), new Path("file1"), + 102400, 102400, 102400, (short)1, + 0x1BAD5EE); + DFSTestUtil.createFile(cluster.getFileSystem(), new Path("file2"), + 102400, 102400, 102400, (short)1, + 0x1BAD5EED); + // Get the datanode storages and data directories + DataNode dn = cluster.getDataNodes().get(0); + BlockManager bm = cluster.getNameNode().getNamesystem().getBlockManager(); + DatanodeDescriptor dnDescriptor = bm.getDatanodeManager(). + getDatanode(cluster.getDataNodes().get(0).getDatanodeUuid()); + DatanodeStorageInfo[] dnStoragesInfosBeforeRestart = + dnDescriptor.getStorageInfos(); + Collection oldDirs = new ArrayList(dn.getConf(). + getTrimmedStringCollection(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY)); + // Keep the first data directory and remove the second. + String newDirs = oldDirs.iterator().next(); + conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs); + // Restart the datanode with the new conf + cluster.stopDataNode(0); + cluster.startDataNodes(conf, 1, false, null, null); + dn = cluster.getDataNodes().get(0); + cluster.waitActive(); + // Assert that the dnDescriptor has both the storages after restart + assertArrayEquals(dnStoragesInfosBeforeRestart, + dnDescriptor.getStorageInfos()); + // Assert that the removed storage is marked as FAILED + // when DN heartbeats to the NN + int numFailedStoragesWithBlocks = 0; + DatanodeStorageInfo failedStorageInfo = null; + for (DatanodeStorageInfo dnStorageInfo: dnDescriptor.getStorageInfos()) { + if (dnStorageInfo.areBlocksOnFailedStorage()) { + numFailedStoragesWithBlocks++; + failedStorageInfo = dnStorageInfo; + } + } + assertEquals(1, numFailedStoragesWithBlocks); + // Heartbeat manager removes the blocks associated with this failed storage + bm.getDatanodeManager().getHeartbeatManager().heartbeatCheck(); + assertTrue(!failedStorageInfo.areBlocksOnFailedStorage()); + // pruneStorageMap removes the unreported storage + cluster.triggerHeartbeats(); + // Assert that the unreported storage is pruned + assertEquals(DataNode.getStorageLocations(dn.getConf()).size(), + dnDescriptor.getStorageInfos().length); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java index 0a57005ab69..42c4aecc059 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java @@ -29,7 +29,12 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -50,7 +55,10 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; @@ -647,6 +655,48 @@ public abstract class BlockReportTestBase { DFSTestUtil.readFile(fs, filePath); } + // See HDFS-10301 + @Test(timeout = 300000) + public void testInterleavedBlockReports() + throws IOException, ExecutionException, InterruptedException { + int numConcurrentBlockReports = 3; + DataNode dn = cluster.getDataNodes().get(DN_N0); + final String poolId = cluster.getNamesystem().getBlockPoolId(); + LOG.info("Block pool id: " + poolId); + final DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); + final StorageBlockReport[] reports = + getBlockReports(dn, poolId, true, true); + + // Get the list of storage ids associated with the datanode + // before the test + BlockManager bm = cluster.getNameNode().getNamesystem().getBlockManager(); + final DatanodeDescriptor dnDescriptor = + bm.getDatanodeManager().getDatanode(dn.getDatanodeId()); + DatanodeStorageInfo[] storageInfos = dnDescriptor.getStorageInfos(); + + // Send the block report concurrently using + // numThreads=numConcurrentBlockReports + ExecutorService executorService = + Executors.newFixedThreadPool(numConcurrentBlockReports); + List> futureList = new ArrayList<>(numConcurrentBlockReports); + for (int i = 0; i < numConcurrentBlockReports; i++) { + futureList.add(executorService.submit(new Callable() { + @Override + public Void call() throws IOException { + sendBlockReports(dnR, poolId, reports); + return null; + } + })); + } + for (Future future : futureList) { + future.get(); + } + executorService.shutdown(); + + // Verify that the storages match before and after the test + Assert.assertArrayEquals(storageInfos, dnDescriptor.getStorageInfos()); + } + private void waitForTempReplica(Block bl, int DN_N1) throws IOException { final boolean tooLongWait = false; final int TIMEOUT = 40000;