HDFS-6289. HA failover can fail if there are pending DN messages for DNs which no longer exist. Contributed by Aaron T. Myers.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1591414 13f79535-47bb-0310-9956-ffa450edef68
Aaron Myers 2014-04-30 17:48:17 +00:00
parent 915b7bc3a6
commit fdf685576e
6 changed files with 175 additions and 2 deletions


@@ -171,6 +171,9 @@ Release 2.5.0 - UNRELEASED
HDFS-5892. TestDeleteBlockPool fails in branch-2. (Ted Yu via wheat9)
HDFS-6289. HA failover can fail if there are pending DN messages for DNs
which no longer exist. (atm)
Release 2.4.1 - UNRELEASED
INCOMPATIBLE CHANGES


@@ -1007,6 +1007,8 @@ public class BlockManager {
while(it.hasNext()) {
removeStoredBlock(it.next(), node);
}
// Remove all pending DN messages referencing this DN.
pendingDNMessages.removeAllMessagesForDatanode(node);
node.resetBlocks();
invalidateBlocks.remove(node.getDatanodeUuid());
@@ -1081,7 +1083,8 @@ public class BlockManager {
DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
if (node == null) {
throw new IOException("Cannot mark " + b
+ " as corrupt because datanode " + dn + " does not exist");
+ " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid()
+ ") does not exist");
}
BlockCollection bc = b.corrupted.getBlockCollection();
@@ -1984,6 +1987,9 @@ public class BlockManager {
// If the block is an out-of-date generation stamp or state,
// but we're the standby, we shouldn't treat it as corrupt,
// but instead just queue it for later processing.
// TODO: Pretty confident this should be s/storedBlock/block below,
// since we should be postponing the info of the reported block, not
// the stored block. See HDFS-6289 for more context.
queueReportedBlock(dn, storageID, storedBlock, reportedState,
QUEUE_REASON_CORRUPT_STATE);
} else {

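Taken together, the BlockManager changes in this file do three things: purge a datanode's queued messages when that node is removed, include the datanode UUID when a mark-corrupt request names an unknown node, and leave a TODO questioning whether the stored or the reported block should be queued on the standby. Below is a minimal, self-contained sketch (hypothetical names, not the real BlockManager/PendingDataNodeMessages API) of why the purge matters: if the standby keeps a queued message for a DatanodeDescriptor that has since been removed, for example because the DN was reformatted and re-registered under a new UUID, replaying that queue during failover runs into a node that no longer exists.

// Self-contained illustration (hypothetical names, not HDFS classes): queued messages
// keyed by datanode UUID, a removal hook that purges them, and a replay step that
// would fail if a message still referenced a datanode that is no longer registered.
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;

class StandbyQueueSketch {
  static class QueuedMsg {
    final String dnUuid;
    final long blockId;
    QueuedMsg(String dnUuid, long blockId) { this.dnUuid = dnUuid; this.blockId = blockId; }
  }

  private final Queue<QueuedMsg> pending = new LinkedList<QueuedMsg>();
  private final Set<String> registeredDns = new HashSet<String>();

  void registerDatanode(String dnUuid) { registeredDns.add(dnUuid); }

  void queueReportedBlock(String dnUuid, long blockId) {
    pending.add(new QueuedMsg(dnUuid, blockId));
  }

  // Mirrors the new hook above: forget queued messages when the node goes away.
  void removeDatanode(String dnUuid) {
    registeredDns.remove(dnUuid);
    for (Iterator<QueuedMsg> it = pending.iterator(); it.hasNext();) {
      if (it.next().dnUuid.equals(dnUuid)) {
        it.remove();
      }
    }
  }

  // Run on failover; without the purge above this throws for stale messages.
  void replayOnFailover() {
    for (QueuedMsg m : pending) {
      if (!registeredDns.contains(m.dnUuid)) {
        throw new IllegalStateException("Queued message for unknown datanode " + m.dnUuid);
      }
      // ...apply the reported block state to the block map...
    }
  }
}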

@@ -76,6 +76,27 @@ class PendingDataNodeMessages {
}
}
/**
* Remove all pending DN messages which reference the given DN.
* @param dn the datanode whose messages we should remove.
*/
void removeAllMessagesForDatanode(DatanodeDescriptor dn) {
for (Map.Entry<Block, Queue<ReportedBlockInfo>> entry :
queueByBlockId.entrySet()) {
Queue<ReportedBlockInfo> newQueue = Lists.newLinkedList();
Queue<ReportedBlockInfo> oldQueue = entry.getValue();
while (!oldQueue.isEmpty()) {
ReportedBlockInfo rbi = oldQueue.remove();
if (!rbi.getNode().equals(dn)) {
newQueue.add(rbi);
} else {
count--;
}
}
queueByBlockId.put(entry.getKey(), newQueue);
}
}
void enqueueReportedBlock(DatanodeDescriptor dn, String storageID, Block block,
ReplicaState reportedState) {
block = new Block(block);

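The new removeAllMessagesForDatanode drains each per-block queue into a fresh queue, keeping entries that belong to other datanodes and decrementing the shared count for every dropped entry; emptied queues simply stay behind in queueByBlockId. On a Java 8 source level the same filtering could be written more compactly; the snippet below is a hypothetical alternative only, not the committed code (branch-2 at the time still targeted an older Java source level):

// Hypothetical Java 8 variant of removeAllMessagesForDatanode: filter each per-block
// queue in place and adjust the shared count by the number of entries removed.
for (Queue<ReportedBlockInfo> q : queueByBlockId.values()) {
  int before = q.size();
  q.removeIf(rbi -> rbi.getNode().equals(dn));
  count -= before - q.size();
}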

@@ -50,7 +50,7 @@ public class FsDatasetUtil {
}
/** Find the corresponding meta data file from a given block file */
- static File findMetaFile(final File blockFile) throws IOException {
+ public static File findMetaFile(final File blockFile) throws IOException {
final String prefix = blockFile.getName() + "_";
final File parent = blockFile.getParentFile();
final File[] matches = parent.listFiles(new FilenameFilter() {

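findMetaFile is widened from package-private to public so that MiniDFSCluster (changed below) can locate a replica's meta file; it searches the block file's directory for names that start with the block file name plus an underscore. A small runnable sketch of the on-disk naming convention that prefix search relies on (the convention is assumed here, it is not spelled out in this patch):

// Assumed replica layout: a finalized block is stored as "blk_<blockId>" and its
// checksum file as "blk_<blockId>_<genStamp>.meta", so the block file name plus "_"
// is a prefix of the corresponding meta file name.
public class MetaNameSketch {
  public static void main(String[] args) {
    String blockFileName = "blk_1073741825";   // data file for block id 1073741825
    long genStamp = 1001L;                     // generation stamp of the replica
    String metaFileName = blockFileName + "_" + genStamp + ".meta";
    System.out.println(metaFileName);          // prints blk_1073741825_1001.meta
  }
}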

@@ -83,10 +83,12 @@ import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
@@ -1707,6 +1709,14 @@ public class MiniDFSCluster {
LOG.warn("Corrupting the block " + blockFile);
return true;
}
public static boolean changeGenStampOfBlock(int dnIndex, ExtendedBlock blk,
long newGenStamp) throws IOException {
File blockFile = getBlockFile(dnIndex, blk);
File metaFile = FsDatasetUtil.findMetaFile(blockFile);
return metaFile.renameTo(new File(DatanodeUtil.getMetaName(
blockFile.getAbsolutePath(), newGenStamp)));
}
/*
* Shutdown a particular datanode

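The new changeGenStampOfBlock helper changes a replica's generation stamp purely by renaming its meta file, since the gen stamp is encoded in the meta file name produced by DatanodeUtil.getMetaName. The new test uses it like this:

// As used in TestPendingCorruptDnMessages below: roll the replica on datanode 0 back
// to gen stamp 900 so the DN later reports a replica older than the NameNode expects.
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
assertTrue(MiniDFSCluster.changeGenStampOfBlock(0, block, 900));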

@@ -0,0 +1,133 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URISyntaxException;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.util.ThreadUtil;
import org.junit.Test;
public class TestPendingCorruptDnMessages {
private static final Path filePath = new Path("/foo.txt");
@Test
public void testChangedStorageId() throws IOException, URISyntaxException,
InterruptedException {
HdfsConfiguration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.build();
try {
cluster.transitionToActive(0);
FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
OutputStream out = fs.create(filePath);
out.write("foo bar baz".getBytes());
out.close();
HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0),
cluster.getNameNode(1));
// Change the gen stamp of the block on datanode to go back in time (gen
// stamps start at 1000)
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
assertTrue(MiniDFSCluster.changeGenStampOfBlock(0, block, 900));
// Stop the DN so the replica with the changed gen stamp will be reported
// when this DN starts up.
DataNodeProperties dnProps = cluster.stopDataNode(0);
// Restart the namenode so that when the DN comes up it will see an initial
// block report.
cluster.restartNameNode(1, false);
assertTrue(cluster.restartDataNode(dnProps, true));
// Wait until the standby NN queues up the corrupt block in the pending DN
// message queue.
while (cluster.getNamesystem(1).getBlockManager()
.getPendingDataNodeMessageCount() < 1) {
ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
}
assertEquals(1, cluster.getNamesystem(1).getBlockManager()
.getPendingDataNodeMessageCount());
String oldStorageId = getRegisteredDatanodeUid(cluster, 1);
// Reformat/restart the DN.
assertTrue(wipeAndRestartDn(cluster, 0));
// Give the DN time to start up and register, which will cause the
// DatanodeManager to dissociate the old storage ID from the DN xfer addr.
String newStorageId = "";
do {
ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
newStorageId = getRegisteredDatanodeUid(cluster, 1);
System.out.println("====> oldStorageId: " + oldStorageId +
" newStorageId: " + newStorageId);
} while (newStorageId.equals(oldStorageId));
assertEquals(0, cluster.getNamesystem(1).getBlockManager()
.getPendingDataNodeMessageCount());
// Now try to fail over.
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
} finally {
cluster.shutdown();
}
}
private static String getRegisteredDatanodeUid(
MiniDFSCluster cluster, int nnIndex) {
List<DatanodeDescriptor> registeredDatanodes = cluster.getNamesystem(nnIndex)
.getBlockManager().getDatanodeManager()
.getDatanodeListForReport(DatanodeReportType.ALL);
assertEquals(1, registeredDatanodes.size());
return registeredDatanodes.get(0).getDatanodeUuid();
}
private static boolean wipeAndRestartDn(MiniDFSCluster cluster, int dnIndex)
throws IOException {
// stop the DN, reformat it, then start it again with the same xfer port.
DataNodeProperties dnProps = cluster.stopDataNode(dnIndex);
cluster.formatDataNodeDirs();
return cluster.restartDataNode(dnProps, true);
}
}
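The failover at the end is the actual regression check: the test first gets a corrupt-looking replica queued as a pending DN message on the standby, then reformats the DN so it re-registers under a new storage ID, waits for the pending message count to drop back to zero (the queued message is purged along with the old DatanodeDescriptor), and only then transitions NN 1 to active, a step that could fail before this patch as described in HDFS-6289.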