diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index fbb61cb7d93..62a5947d3e5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -171,6 +171,9 @@ Release 2.5.0 - UNRELEASED HDFS-5892. TestDeleteBlockPool fails in branch-2. (Ted Yu via wheat9) + HDFS-6289. HA failover can fail if there are pending DN messages for DNs + which no longer exist. (atm) + Release 2.4.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 0a4941210fe..3c771860bca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -1007,6 +1007,8 @@ public class BlockManager { while(it.hasNext()) { removeStoredBlock(it.next(), node); } + // Remove all pending DN messages referencing this DN. + pendingDNMessages.removeAllMessagesForDatanode(node); node.resetBlocks(); invalidateBlocks.remove(node.getDatanodeUuid()); @@ -1081,7 +1083,8 @@ public class BlockManager { DatanodeDescriptor node = getDatanodeManager().getDatanode(dn); if (node == null) { throw new IOException("Cannot mark " + b - + " as corrupt because datanode " + dn + " does not exist"); + + " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid() + + ") does not exist"); } BlockCollection bc = b.corrupted.getBlockCollection(); @@ -1984,6 +1987,9 @@ public class BlockManager { // If the block is an out-of-date generation stamp or state, // but we're the standby, we shouldn't treat it as corrupt, // but instead just queue it for later processing. + // TODO: Pretty confident this should be s/storedBlock/block below, + // since we should be postponing the info of the reported block, not + // the stored block. See HDFS-6289 for more context. queueReportedBlock(dn, storageID, storedBlock, reportedState, QUEUE_REASON_CORRUPT_STATE); } else { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java index 1a9e582c5e9..0a1ba65f125 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java @@ -76,6 +76,27 @@ class PendingDataNodeMessages { } } + /** + * Remove all pending DN messages which reference the given DN. + * @param dn the datanode whose messages we should remove. + */ + void removeAllMessagesForDatanode(DatanodeDescriptor dn) { + for (Map.Entry> entry : + queueByBlockId.entrySet()) { + Queue newQueue = Lists.newLinkedList(); + Queue oldQueue = entry.getValue(); + while (!oldQueue.isEmpty()) { + ReportedBlockInfo rbi = oldQueue.remove(); + if (!rbi.getNode().equals(dn)) { + newQueue.add(rbi); + } else { + count--; + } + } + queueByBlockId.put(entry.getKey(), newQueue); + } + } + void enqueueReportedBlock(DatanodeDescriptor dn, String storageID, Block block, ReplicaState reportedState) { block = new Block(block); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java index 9750a9c4324..adefbdb56f1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java @@ -50,7 +50,7 @@ public class FsDatasetUtil { } /** Find the corresponding meta data file from a given block file */ - static File findMetaFile(final File blockFile) throws IOException { + public static File findMetaFile(final File blockFile) throws IOException { final String prefix = blockFile.getName() + "_"; final File parent = blockFile.getParentFile(); final File[] matches = parent.listFiles(new FilenameFilter() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 5d69e04e80b..1a93500623d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -83,10 +83,12 @@ import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DataStorage; +import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter; import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; @@ -1707,6 +1709,14 @@ public class MiniDFSCluster { LOG.warn("Corrupting the block " + blockFile); return true; } + + public static boolean changeGenStampOfBlock(int dnIndex, ExtendedBlock blk, + long newGenStamp) throws IOException { + File blockFile = getBlockFile(dnIndex, blk); + File metaFile = FsDatasetUtil.findMetaFile(blockFile); + return metaFile.renameTo(new File(DatanodeUtil.getMetaName( + blockFile.getAbsolutePath(), newGenStamp))); + } /* * Shutdown a particular datanode diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPendingCorruptDnMessages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPendingCorruptDnMessages.java new file mode 100644 index 00000000000..37c7df9a956 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPendingCorruptDnMessages.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.URISyntaxException; +import java.util.List; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.util.ThreadUtil; +import org.junit.Test; + +public class TestPendingCorruptDnMessages { + + private static final Path filePath = new Path("/foo.txt"); + + @Test + public void testChangedStorageId() throws IOException, URISyntaxException, + InterruptedException { + HdfsConfiguration conf = new HdfsConfiguration(); + conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(1) + .nnTopology(MiniDFSNNTopology.simpleHATopology()) + .build(); + + try { + cluster.transitionToActive(0); + + FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf); + OutputStream out = fs.create(filePath); + out.write("foo bar baz".getBytes()); + out.close(); + + HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0), + cluster.getNameNode(1)); + + // Change the gen stamp of the block on datanode to go back in time (gen + // stamps start at 1000) + ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath); + assertTrue(MiniDFSCluster.changeGenStampOfBlock(0, block, 900)); + + // Stop the DN so the replica with the changed gen stamp will be reported + // when this DN starts up. + DataNodeProperties dnProps = cluster.stopDataNode(0); + + // Restart the namenode so that when the DN comes up it will see an initial + // block report. + cluster.restartNameNode(1, false); + assertTrue(cluster.restartDataNode(dnProps, true)); + + // Wait until the standby NN queues up the corrupt block in the pending DN + // message queue. + while (cluster.getNamesystem(1).getBlockManager() + .getPendingDataNodeMessageCount() < 1) { + ThreadUtil.sleepAtLeastIgnoreInterrupts(1000); + } + + assertEquals(1, cluster.getNamesystem(1).getBlockManager() + .getPendingDataNodeMessageCount()); + String oldStorageId = getRegisteredDatanodeUid(cluster, 1); + + // Reformat/restart the DN. + assertTrue(wipeAndRestartDn(cluster, 0)); + + // Give the DN time to start up and register, which will cause the + // DatanodeManager to dissociate the old storage ID from the DN xfer addr. + String newStorageId = ""; + do { + ThreadUtil.sleepAtLeastIgnoreInterrupts(1000); + newStorageId = getRegisteredDatanodeUid(cluster, 1); + System.out.println("====> oldStorageId: " + oldStorageId + + " newStorageId: " + newStorageId); + } while (newStorageId.equals(oldStorageId)); + + assertEquals(0, cluster.getNamesystem(1).getBlockManager() + .getPendingDataNodeMessageCount()); + + // Now try to fail over. + cluster.transitionToStandby(0); + cluster.transitionToActive(1); + } finally { + cluster.shutdown(); + } + } + + private static String getRegisteredDatanodeUid( + MiniDFSCluster cluster, int nnIndex) { + List registeredDatanodes = cluster.getNamesystem(nnIndex) + .getBlockManager().getDatanodeManager() + .getDatanodeListForReport(DatanodeReportType.ALL); + assertEquals(1, registeredDatanodes.size()); + return registeredDatanodes.get(0).getDatanodeUuid(); + } + + private static boolean wipeAndRestartDn(MiniDFSCluster cluster, int dnIndex) + throws IOException { + // stop the DN, reformat it, then start it again with the same xfer port. + DataNodeProperties dnProps = cluster.stopDataNode(dnIndex); + cluster.formatDataNodeDirs(); + return cluster.restartDataNode(dnProps, true); + } + +}