From cf611255d6fcd7016e0ce2a3f80ccd0d4e051d9f Mon Sep 17 00:00:00 2001 From: Eli Collins Date: Wed, 1 Feb 2012 05:16:49 +0000 Subject: [PATCH] HDFS-2742. HA: observed dataloss in replication stress test. Contributed by Todd Lipcon git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1238940 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-hdfs/CHANGES.HDFS-1623.txt | 2 + .../server/blockmanagement/BlockInfo.java | 2 +- .../server/blockmanagement/BlockManager.java | 172 ++++++++-- .../PendingDataNodeMessages.java | 134 ++++++++ .../hdfs/server/namenode/FSEditLogLoader.java | 55 ++-- .../hdfs/server/namenode/FSNamesystem.java | 204 +++++++----- .../server/namenode/NameNodeRpcServer.java | 29 -- .../hdfs/server/namenode/Namesystem.java | 6 + .../namenode/PendingDataNodeMessages.java | 201 ------------ .../TestPendingDataNodeMessages.java | 68 ++++ .../hdfs/server/namenode/NameNodeAdapter.java | 2 +- .../server/namenode/ha/TestDNFencing.java | 174 +++++++++- .../server/namenode/ha/TestHASafeMode.java | 305 +++++++++++++----- 13 files changed, 923 insertions(+), 431 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java delete mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/PendingDataNodeMessages.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt index d3a725ba86b..0406f192a24 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt @@ -145,3 +145,5 @@ HDFS-2824. Fix failover when prior NN died just after creating an edit log segme HDFS-2853. 
HA: NN fails to start if the shared edits dir is marked required (atm via eli) HDFS-2845. SBN should not allow browsing of the file system via web UI. (Bikas Saha via atm) + +HDFS-2742. HA: observed dataloss in replication stress test. (todd via eli) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java index 4c46d01b525..d0c7692228c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java @@ -180,7 +180,7 @@ public class BlockInfo extends Block implements LightWeightGSet.LinkedElement { /** * Count the number of data-nodes the block belongs to. */ - int numNodes() { + public int numNodes() { assert this.triplets != null : "BlockInfo is not initialized"; assert triplets.length % 3 == 0 : "Malformed BlockInfo"; for(int idx = getCapacity()-1; idx >= 0; idx--) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 9f2dfba55ea..ca861318820 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.io.IOException; import java.io.PrintWriter; +import java.io.StringWriter; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -28,6 +29,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import 
java.util.Queue; import java.util.Set; import java.util.TreeMap; import org.apache.commons.logging.Log; @@ -49,6 +51,7 @@ import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; +import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.Util; @@ -58,7 +61,6 @@ import org.apache.hadoop.hdfs.server.namenode.INodeFile; import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.Namesystem; -import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; @@ -69,7 +71,6 @@ import org.apache.hadoop.net.Node; import org.apache.hadoop.util.Daemon; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; import com.google.common.collect.Sets; /** @@ -83,11 +84,20 @@ public class BlockManager { /** Default load factor of map */ public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f; + private static final String QUEUE_REASON_CORRUPT_STATE = + "it has the wrong state or generation stamp"; + + private static final String QUEUE_REASON_FUTURE_GENSTAMP = + "generation stamp is in the future"; + private final Namesystem namesystem; private final DatanodeManager datanodeManager; private final HeartbeatManager heartbeatManager; private final 
BlockTokenSecretManager blockTokenSecretManager; + + private final PendingDataNodeMessages pendingDNMessages = + new PendingDataNodeMessages(); private volatile long pendingReplicationBlocksCount = 0L; private volatile long corruptReplicaBlocksCount = 0L; @@ -124,6 +134,10 @@ public class BlockManager { public long getPostponedMisreplicatedBlocksCount() { return postponedMisreplicatedBlocksCount; } + /** Used by metrics */ + public int getPendingDataNodeMessageCount() { + return pendingDNMessages.count(); + } /**replicationRecheckInterval is how often namenode checks for new replication work*/ private final long replicationRecheckInterval; @@ -479,12 +493,24 @@ public class BlockManager { if(curBlock.isComplete()) return curBlock; BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)curBlock; - if (!force && ucBlock.numNodes() < minReplication) + int numNodes = ucBlock.numNodes(); + if (!force && numNodes < minReplication) throw new IOException("Cannot complete block: " + "block does not satisfy minimal replication requirement."); BlockInfo completeBlock = ucBlock.convertToCompleteBlock(); // replace penultimate block in file fileINode.setBlock(blkIndex, completeBlock); + + // Since safe-mode only counts complete blocks, and we now have + // one more complete block, we need to adjust the total up, and + // also count it as safe, if we have at least the minimum replica + // count. (We may not have the minimum replica count yet if this is + // a "forced" completion when a file is getting closed by an + // OP_CLOSE edit on the standby). 
+ namesystem.adjustSafeModeBlockTotals(0, 1); + namesystem.incrementSafeBlockCount( + Math.min(numNodes, minReplication)); + // replace block in the blocksMap return blocksMap.replaceBlock(completeBlock); } @@ -547,6 +573,14 @@ public class BlockManager { String datanodeId = dd.getStorageID(); invalidateBlocks.remove(datanodeId, oldBlock); } + + // Adjust safe-mode totals, since under-construction blocks don't + // count in safe-mode. + namesystem.adjustSafeModeBlockTotals( + // decrement safe if we had enough + targets.length >= minReplication ? -1 : 0, + // always decrement total blocks + -1); final long fileLength = fileINode.computeContentSummary().getLength(); final long pos = fileLength - ucBlock.getNumBytes(); @@ -1483,9 +1517,19 @@ public class BlockManager { assert (node.numBlocks() == 0); BlockReportIterator itBR = report.getBlockReportIterator(); + boolean isStandby = namesystem.isInStandbyState(); + while(itBR.hasNext()) { Block iblk = itBR.next(); ReplicaState reportedState = itBR.getCurrentReplicaState(); + + if (isStandby && + namesystem.isGenStampInFuture(iblk.getGenerationStamp())) { + queueReportedBlock(node, iblk, reportedState, + QUEUE_REASON_FUTURE_GENSTAMP); + continue; + } + BlockInfo storedBlock = blocksMap.getStoredBlock(iblk); // If block does not belong to any file, we are done. if (storedBlock == null) continue; @@ -1493,7 +1537,14 @@ public class BlockManager { // If block is corrupt, mark it and continue to next block. BlockUCState ucState = storedBlock.getBlockUCState(); if (isReplicaCorrupt(iblk, reportedState, storedBlock, ucState, node)) { - markBlockAsCorrupt(storedBlock, node); + if (namesystem.isInStandbyState()) { + // In the Standby, we may receive a block report for a file that we + // just have an out-of-date gen-stamp or state for, for example. 
+ queueReportedBlock(node, iblk, reportedState, + QUEUE_REASON_CORRUPT_STATE); + } else { + markBlockAsCorrupt(storedBlock, node); + } continue; } @@ -1576,7 +1627,8 @@ public class BlockManager { * @param toCorrupt replicas with unexpected length or generation stamp; * add to corrupt replicas * @param toUC replicas of blocks currently under construction - * @return + * @return the up-to-date stored block, if it should be kept. + * Otherwise, null. */ private BlockInfo processReportedBlock(final DatanodeDescriptor dn, final Block block, final ReplicaState reportedState, @@ -1591,6 +1643,13 @@ public class BlockManager { + " replicaState = " + reportedState); } + if (namesystem.isInStandbyState() && + namesystem.isGenStampInFuture(block.getGenerationStamp())) { + queueReportedBlock(dn, block, reportedState, + QUEUE_REASON_FUTURE_GENSTAMP); + return null; + } + // find block by blockId BlockInfo storedBlock = blocksMap.getStoredBlock(block); if(storedBlock == null) { @@ -1615,7 +1674,16 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block } if (isReplicaCorrupt(block, reportedState, storedBlock, ucState, dn)) { - toCorrupt.add(storedBlock); + if (namesystem.isInStandbyState()) { + // If the block is an out-of-date generation stamp or state, + // but we're the standby, we shouldn't treat it as corrupt, + // but instead just queue it for later processing. + queueReportedBlock(dn, storedBlock, reportedState, + QUEUE_REASON_CORRUPT_STATE); + + } else { + toCorrupt.add(storedBlock); + } return storedBlock; } @@ -1633,6 +1701,68 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block return storedBlock; } + /** + * Queue the given reported block for later processing in the + * standby node. {@link PendingDataNodeMessages}. 
+ * @param reason a textual reason to report in the debug logs + */ + private void queueReportedBlock(DatanodeDescriptor dn, Block block, + ReplicaState reportedState, String reason) { + assert namesystem.isInStandbyState(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Queueing reported block " + block + + " in state " + reportedState + + " from datanode " + dn + " for later processing " + + "because " + reason + "."); + } + pendingDNMessages.enqueueReportedBlock(dn, block, reportedState); + } + + /** + * Try to process any messages that were previously queued for the given + * block. This is called from FSEditLogLoader whenever a block's state + * in the namespace has changed or a new block has been created. + */ + public void processQueuedMessagesForBlock(Block b) throws IOException { + Queue queue = pendingDNMessages.takeBlockQueue(b); + if (queue == null) { + // Nothing to re-process + return; + } + processQueuedMessages(queue); + } + + private void processQueuedMessages(Iterable rbis) + throws IOException { + for (ReportedBlockInfo rbi : rbis) { + if (LOG.isDebugEnabled()) { + LOG.debug("Processing previouly queued message " + rbi); + } + processAndHandleReportedBlock( + rbi.getNode(), rbi.getBlock(), rbi.getReportedState(), null); + } + } + + /** + * Process any remaining queued datanode messages after entering + * active state. At this point they will not be re-queued since + * we are the definitive master node and thus should be up-to-date + * with the namespace information. 
+ */ + public void processAllPendingDNMessages() throws IOException { + assert !namesystem.isInStandbyState() : + "processAllPendingDNMessages() should be called after exiting " + + "standby state!"; + int count = pendingDNMessages.count(); + if (count > 0) { + LOG.info("Processing " + count + " messages from DataNodes " + + "that were previously queued during standby state."); + } + processQueuedMessages(pendingDNMessages.takeAll()); + assert pendingDNMessages.count() == 0; + } + /* * The next two methods test the various cases under which we must conclude * the replica is corrupt, or under construction. These are laid out @@ -1742,13 +1872,15 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block // Now check for completion of blocks and safe block count int numCurrentReplica = countLiveNodes(storedBlock); if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED - && numCurrentReplica >= minReplication) + && numCurrentReplica >= minReplication) { storedBlock = completeBlock(storedBlock.getINode(), storedBlock, false); - - // check whether safe replication is reached for the block - // only complete blocks are counted towards that - if(storedBlock.isComplete()) + } else if (storedBlock.isComplete()) { + // check whether safe replication is reached for the block + // only complete blocks are counted towards that. + // In the case that the block just became complete above, completeBlock() + // handles the safe block count maintenance. 
namesystem.incrementSafeBlockCount(numCurrentReplica); + } } /** @@ -1807,15 +1939,17 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block + pendingReplications.getNumReplicas(storedBlock); if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED && - numLiveReplicas >= minReplication) + numLiveReplicas >= minReplication) { storedBlock = completeBlock(fileINode, storedBlock, false); - - // check whether safe replication is reached for the block - // only complete blocks are counted towards that - // Is no-op if not in safe mode. - if(storedBlock.isComplete()) + } else if (storedBlock.isComplete()) { + // check whether safe replication is reached for the block + // only complete blocks are counted towards that + // Is no-op if not in safe mode. + // In the case that the block just became complete above, completeBlock() + // handles the safe block count maintenance. namesystem.incrementSafeBlockCount(numCurrentReplica); - + } + // if file is under construction, then done for now if (fileINode.isUnderConstruction()) { return storedBlock; @@ -2514,7 +2648,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block } public int getActiveBlockCount() { - return blocksMap.size() - (int)invalidateBlocks.numBlocks(); + return blocksMap.size(); } public DatanodeDescriptor[] getNodes(BlockInfo block) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java new file mode 100644 index 00000000000..b7da1160484 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import java.util.List; +import java.util.Map; +import java.util.Queue; + +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; + +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** + * In the Standby Node, we can receive messages about blocks + * before they are actually available in the namespace, or while + * they have an outdated state in the namespace. In those cases, + * we queue those block-related messages in this structure. 
+ * */ +class PendingDataNodeMessages { + + Map> queueByBlockId = + Maps.newHashMap(); + private int count = 0; + + + static class ReportedBlockInfo { + private final Block block; + private final DatanodeDescriptor dn; + private final ReplicaState reportedState; + + ReportedBlockInfo(DatanodeDescriptor dn, Block block, + ReplicaState reportedState) { + this.dn = dn; + this.block = block; + this.reportedState = reportedState; + } + + Block getBlock() { + return block; + } + + DatanodeDescriptor getNode() { + return dn; + } + + ReplicaState getReportedState() { + return reportedState; + } + + @Override + public String toString() { + return "ReportedBlockInfo [block=" + block + ", dn=" + dn + + ", reportedState=" + reportedState + "]"; + } + } + + void enqueueReportedBlock(DatanodeDescriptor dn, Block block, + ReplicaState reportedState) { + block = new Block(block); + getBlockQueue(block).add( + new ReportedBlockInfo(dn, block, reportedState)); + count++; + } + + /** + * @return any messages that were previously queued for the given block, + * or null if no messages were queued. 
+ */ + Queue takeBlockQueue(Block block) { + Queue queue = queueByBlockId.remove(block); + if (queue != null) { + count -= queue.size(); + } + return queue; + } + + + private Queue getBlockQueue(Block block) { + Queue queue = queueByBlockId.get(block); + if (queue == null) { + queue = Lists.newLinkedList(); + queueByBlockId.put(block, queue); + } + return queue; + } + + public int count() { + return count ; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + for (Map.Entry> entry : + queueByBlockId.entrySet()) { + sb.append("Block " + entry.getKey() + ":\n"); + for (ReportedBlockInfo rbi : entry.getValue()) { + sb.append(" ").append(rbi).append("\n"); + } + } + return sb.toString(); + } + + public Iterable takeAll() { + List rbis = Lists.newArrayListWithCapacity( + count); + for (Queue q : queueByBlockId.values()) { + rbis.addAll(q); + } + queueByBlockId.clear(); + count = 0; + return rbis; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index 8c664d0695e..d51752f5a31 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -66,7 +66,6 @@ import com.google.common.base.Joiner; @InterfaceStability.Evolving public class FSEditLogLoader { private final FSNamesystem fsNamesys; - private long maxGenStamp = 0; public FSEditLogLoader(FSNamesystem fsNamesys) { this.fsNamesys = fsNamesys; @@ -91,15 +90,6 @@ public class FSEditLogLoader { + " of size " + edits.length() + " edits # " + numEdits + " loaded in " + (now()-startTime)/1000 + " seconds."); } finally { - fsNamesys.setBlockTotal(); - - // Delay the notification of genstamp updates until after - 
// setBlockTotal() above. Otherwise, we will mark blocks - // as "safe" before they've been incorporated in the expected - // totalBlocks and threshold for SafeMode -- triggering an - // assertion failure and/or exiting safemode too early! - fsNamesys.notifyGenStampUpdate(maxGenStamp); - edits.close(); fsNamesys.writeUnlock(); } @@ -183,6 +173,12 @@ public class FSEditLogLoader { switch (op.opCode) { case OP_ADD: { AddCloseOp addCloseOp = (AddCloseOp)op; + if (FSNamesystem.LOG.isDebugEnabled()) { + FSNamesystem.LOG.debug(op.opCode + ": " + addCloseOp.path + + " numblocks : " + addCloseOp.blocks.length + + " clientHolder " + addCloseOp.clientName + + " clientMachine " + addCloseOp.clientMachine); + } // See if the file already exists (persistBlocks call) INodeFile oldFile = getINodeFile(fsDir, addCloseOp.path); @@ -197,13 +193,6 @@ public class FSEditLogLoader { } long blockSize = addCloseOp.blockSize; - if (FSNamesystem.LOG.isDebugEnabled()) { - FSNamesystem.LOG.debug(op.opCode + ": " + addCloseOp.path + - " numblocks : " + addCloseOp.blocks.length + - " clientHolder " + addCloseOp.clientName + - " clientMachine " + addCloseOp.clientMachine); - } - // Older versions of HDFS does not store the block size in inode. // If the file has more than one block, use the size of the // first block as the blocksize. Otherwise use the default @@ -227,12 +216,18 @@ public class FSEditLogLoader { addCloseOp.atime, blockSize); fsNamesys.prepareFileForWrite(addCloseOp.path, node, - addCloseOp.clientName, addCloseOp.clientMachine, null); + addCloseOp.clientName, addCloseOp.clientMachine, null, + false); } else { // This is OP_ADD on an existing file if (!oldFile.isUnderConstruction()) { // This is a call to append() on an already-closed file. 
+ if (FSNamesystem.LOG.isDebugEnabled()) { + FSNamesystem.LOG.debug("Reopening an already-closed file " + + "for append"); + } fsNamesys.prepareFileForWrite(addCloseOp.path, oldFile, - addCloseOp.clientName, addCloseOp.clientMachine, null); + addCloseOp.clientName, addCloseOp.clientMachine, null, + false); oldFile = getINodeFile(fsDir, addCloseOp.path); } @@ -243,6 +238,13 @@ public class FSEditLogLoader { case OP_CLOSE: { AddCloseOp addCloseOp = (AddCloseOp)op; + if (FSNamesystem.LOG.isDebugEnabled()) { + FSNamesystem.LOG.debug(op.opCode + ": " + addCloseOp.path + + " numblocks : " + addCloseOp.blocks.length + + " clientHolder " + addCloseOp.clientName + + " clientMachine " + addCloseOp.clientMachine); + } + INodeFile oldFile = getINodeFile(fsDir, addCloseOp.path); if (oldFile == null) { throw new IOException("Operation trying to close non-existent file " + @@ -478,14 +480,23 @@ public class FSEditLogLoader { } oldBlock.setNumBytes(newBlock.getNumBytes()); + boolean changeMade = + oldBlock.getGenerationStamp() != newBlock.getGenerationStamp(); oldBlock.setGenerationStamp(newBlock.getGenerationStamp()); if (oldBlock instanceof BlockInfoUnderConstruction && (!isLastBlock || addCloseOp.opCode == FSEditLogOpCodes.OP_CLOSE)) { + changeMade = true; fsNamesys.getBlockManager().forceCompleteBlock( (INodeFileUnderConstruction)file, (BlockInfoUnderConstruction)oldBlock); } + if (changeMade) { + // The state or gen-stamp of the block has changed. So, we may be + // able to process some messages from datanodes that we previously + // were unable to process. 
+ fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock); + } } if (addCloseOp.blocks.length < oldBlocks.length) { @@ -517,13 +528,9 @@ public class FSEditLogLoader { } fsNamesys.getBlockManager().addINode(newBI, file); file.addBlock(newBI); + fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock); } } - - // Record the max genstamp seen - for (Block b : addCloseOp.blocks) { - maxGenStamp = Math.max(maxGenStamp, b.getGenerationStamp()); - } } private static void dumpOpCounts( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index aef137c3650..bede75ebde5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -154,10 +154,6 @@ import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport; import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease; import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; -import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.BlockReceivedDeleteMessage; -import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.BlockReportMessage; -import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.CommitBlockSynchronizationMessage; -import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.DataNodeMessage; import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState; import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer; import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; @@ -321,8 +317,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats, // lock to protect FSNamesystem. 
private ReentrantReadWriteLock fsLock; - private PendingDataNodeMessages pendingDatanodeMessages = new PendingDataNodeMessages(); - /** * Used when this NN is in standby state to read from the shared edit log. */ @@ -342,11 +336,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, private boolean haEnabled; private final Configuration conf; - - PendingDataNodeMessages getPendingDataNodeMessages() { - return pendingDatanodeMessages; - } - + /** * Instantiates an FSNamesystem loaded from the image and edits * directories specified in the passed Configuration. @@ -481,6 +471,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, try { nnResourceChecker = new NameNodeResourceChecker(conf); checkAvailableResources(); + assert safeMode != null && + !safeMode.initializedReplQueues; setBlockTotal(); blockManager.activate(conf); this.nnrmthread = new Daemon(new NameNodeResourceMonitor()); @@ -531,6 +523,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, LOG.info("Reprocessing replication and invalidation queues..."); blockManager.getDatanodeManager().markAllDatanodesStale(); blockManager.clearQueues(); + blockManager.processAllPendingDNMessages(); blockManager.processMisReplicatedBlocks(); if (LOG.isDebugEnabled()) { @@ -849,8 +842,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats, public boolean isRunning() { return fsRunning; } - - private boolean isInStandbyState() { + + @Override + public boolean isInStandbyState() { if (haContext == null || haContext.getState() == null) { // We're still starting up. In this case, if HA is // on for the cluster, we always start in standby. 
Otherwise @@ -1543,7 +1537,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, blockManager.getDatanodeManager().getDatanodeByHost(clientMachine); if (append && myFile != null) { - return prepareFileForWrite(src, myFile, holder, clientMachine, clientNode); + return prepareFileForWrite( + src, myFile, holder, clientMachine, clientNode, true); } else { // Now we can add the name to the filesystem. This file has no // blocks associated with it. @@ -1581,12 +1576,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats, * @param leaseHolder identifier of the lease holder on this file * @param clientMachine identifier of the client machine * @param clientNode if the client is collocated with a DN, that DN's descriptor + * @param writeToEditLog whether to persist this change to the edit log * @return the last block locations if the block is partial or null otherwise * @throws UnresolvedLinkException * @throws IOException */ public LocatedBlock prepareFileForWrite(String src, INode file, - String leaseHolder, String clientMachine, DatanodeDescriptor clientNode) + String leaseHolder, String clientMachine, DatanodeDescriptor clientNode, + boolean writeToEditLog) throws UnresolvedLinkException, IOException { INodeFile node = (INodeFile) file; INodeFileUnderConstruction cons = new INodeFileUnderConstruction( @@ -1601,6 +1598,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats, clientNode); dir.replaceNode(src, node, cons); leaseManager.addLease(cons.getClientName(), src); + + if (writeToEditLog) { + getEditLog().logOpenFile(src, cons); + } return blockManager.convertLastBlockToUnderConstruction(cons); } @@ -2346,9 +2347,45 @@ public class FSNamesystem implements Namesystem, FSClusterStats, if (blocks == null) { return; } - for(Block b : blocks) { + + // In the case that we are a Standby tailing edits from the + // active while in safe-mode, we need to track the total number + // of blocks and safe blocks in the system. 
+ boolean trackBlockCounts = isSafeModeTrackingBlocks(); + int numRemovedComplete = 0, numRemovedSafe = 0; + + for (Block b : blocks) { + if (trackBlockCounts) { + BlockInfo bi = blockManager.getStoredBlock(b); + if (bi.isComplete()) { + numRemovedComplete++; + if (bi.numNodes() >= blockManager.minReplication) { + numRemovedSafe++; + } + } + } blockManager.removeBlock(b); } + if (trackBlockCounts) { + if (LOG.isDebugEnabled()) { + LOG.debug("Adjusting safe-mode totals for deletion of " + src + ":" + + "decreasing safeBlocks by " + numRemovedSafe + + ", totalBlocks by " + numRemovedComplete); + } + adjustSafeModeBlockTotals(-numRemovedSafe, -numRemovedComplete); + } + } + + /** + * @see SafeModeInfo#shouldIncrementallyTrackBlocks + */ + private boolean isSafeModeTrackingBlocks() { + if (!haEnabled) { + // Never track blocks incrementally in non-HA code. + return false; + } + SafeModeInfo sm = this.safeMode; + return sm != null && sm.shouldIncrementallyTrackBlocks(); } /** @@ -2712,15 +2749,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, checkOperation(OperationCategory.WRITE); if (haContext.getState().equals(NameNode.STANDBY_STATE)) { // TODO(HA) we'll never get here, since we check for WRITE operation above! - if (isGenStampInFuture(newgenerationstamp)) { - LOG.info("Required GS=" + newgenerationstamp - + ", Queuing commitBlockSynchronization message"); - getPendingDataNodeMessages().queueMessage( - new PendingDataNodeMessages.CommitBlockSynchronizationMessage( - lastblock, newgenerationstamp, newlength, closeFile, deleteblock, - newtargets, newgenerationstamp)); - return; - } + // Need to implement tests, etc, for this - block recovery spanning + // failover. } if (isInSafeMode()) { @@ -3264,6 +3294,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, boolean initializedReplQueues = false; /** Was safemode entered automatically because available resources were low. 
*/ private boolean resourcesLow = false; + /** Should safemode adjust its block totals as blocks come in */ + private boolean shouldIncrementallyTrackBlocks = false; /** * Creates SafeModeInfo when the name node enters @@ -3291,6 +3323,18 @@ public class FSNamesystem implements Namesystem, FSClusterStats, this.blockSafe = 0; } + /** + * In the HA case, the StandbyNode can be in safemode while the namespace + * is modified by the edit log tailer. In this case, the number of total + * blocks changes as edits are processed (eg blocks are added and deleted). + * However, we don't want to do the incremental tracking during the + * startup-time loading process -- only once the initial total has been + * set after the image has been loaded. + */ + private boolean shouldIncrementallyTrackBlocks() { + return shouldIncrementallyTrackBlocks; + } + /** * Creates SafeModeInfo when safe mode is entered manually, or because * available resources are low. @@ -3476,6 +3520,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats, this.blockThreshold = (int) (blockTotal * threshold); this.blockReplQueueThreshold = (int) (blockTotal * replQueueThreshold); + if (haEnabled) { + // After we initialize the block count, any further namespace + // modifications done while in safe mode need to keep track + // of the number of total blocks in the system. 
+ this.shouldIncrementallyTrackBlocks = true; + } + checkMode(); } @@ -3485,9 +3536,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats, * @param replication current replication */ private synchronized void incrementSafeBlockCount(short replication) { - if (replication == safeReplication) + if (replication == safeReplication) { this.blockSafe++; - checkMode(); + checkMode(); + } } /** @@ -3496,9 +3548,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats, * @param replication current replication */ private synchronized void decrementSafeBlockCount(short replication) { - if (replication == safeReplication-1) + if (replication == safeReplication-1) { this.blockSafe--; - checkMode(); + assert blockSafe >= 0 || isManual(); + checkMode(); + } } /** @@ -3636,6 +3690,26 @@ public class FSNamesystem implements Namesystem, FSClusterStats, + "BlockManager data: active=" + activeBlocks); } } + + private void adjustBlockTotals(int deltaSafe, int deltaTotal) { + if (!shouldIncrementallyTrackBlocks) { + return; + } + assert haEnabled; + + if (LOG.isDebugEnabled()) { + LOG.debug("Adjusting block totals from " + + blockSafe + "/" + blockTotal + " to " + + (blockSafe + deltaSafe) + "/" + (blockTotal + deltaTotal)); + } + assert blockSafe + deltaSafe >= 0 : "Can't reduce blockSafe " + + blockSafe + " by " + deltaSafe + ": would be negative"; + assert blockTotal + deltaTotal >= 0 : "Can't reduce blockTotal " + + blockTotal + " by " + deltaTotal + ": would be negative"; + + blockSafe += deltaSafe; + setBlockTotal(blockTotal + deltaTotal); + } } /** @@ -3741,7 +3815,24 @@ public class FSNamesystem implements Namesystem, FSClusterStats, SafeModeInfo safeMode = this.safeMode; if (safeMode == null) // mostly true return; - safeMode.decrementSafeBlockCount((short)blockManager.countNodes(b).liveReplicas()); + BlockInfo storedBlock = blockManager.getStoredBlock(b); + if (storedBlock.isComplete()) { + 
safeMode.decrementSafeBlockCount((short)blockManager.countNodes(b).liveReplicas()); + } + } + + /** + * Adjust the total number of blocks safe and expected during safe mode. + * If safe mode is not currently on, this is a no-op. + * @param deltaSafe the change in number of safe blocks + * @param deltaTotal the change in number of total blocks expected + */ + public void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal) { + // safeMode is volatile, and may be set to null at any time + SafeModeInfo safeMode = this.safeMode; + if (safeMode == null) + return; + safeMode.adjustBlockTotals(deltaSafe, deltaTotal); } /** @@ -4065,6 +4156,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats, return blockManager.getPostponedMisreplicatedBlocksCount(); } + @Metric + public int getPendingDataNodeMessageCount() { + return blockManager.getPendingDataNodeMessageCount(); + } + @Metric public int getBlockCapacity() { return blockManager.getCapacity(); @@ -4912,54 +5008,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats, public boolean isGenStampInFuture(long genStamp) { return (genStamp > getGenerationStamp()); } - - public void notifyGenStampUpdate(long gs) { - if (LOG.isDebugEnabled()) { - LOG.debug("Generation stamp " + gs + " has been reached.
" + - "Processing pending messages from DataNodes..."); - } - DataNodeMessage msg = pendingDatanodeMessages.take(gs); - while (msg != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Processing previously pending message: " + msg); - } - try { - switch (msg.getType()) { - case BLOCK_RECEIVED_DELETE: - BlockReceivedDeleteMessage m = (BlockReceivedDeleteMessage) msg; - if (NameNode.stateChangeLog.isDebugEnabled()) { - NameNode.stateChangeLog - .debug("*BLOCK* NameNode.blockReceivedAndDeleted: " + "from " - + m.getNodeReg().getName() + " " - + m.getReceivedAndDeletedBlocks().length + " blocks."); - } - this.getBlockManager().processIncrementalBlockReport(m.getNodeReg(), - m.getPoolId(), m.getReceivedAndDeletedBlocks()); - break; - case BLOCK_REPORT: - BlockReportMessage mbr = (BlockReportMessage) msg; - if (NameNode.stateChangeLog.isDebugEnabled()) { - NameNode.stateChangeLog.debug("*BLOCK* NameNode.blockReport: " - + "from " + mbr.getNodeReg().getName() + " " - + mbr.getBlockList().getNumberOfBlocks() + " blocks"); - } - this.getBlockManager().processReport(mbr.getNodeReg(), - mbr.getPoolId(), mbr.getBlockList()); - break; - case COMMIT_BLOCK_SYNCHRONIZATION: - CommitBlockSynchronizationMessage mcbm = (CommitBlockSynchronizationMessage) msg; - this.commitBlockSynchronization(mcbm.getBlock(), - mcbm.getNewgenerationstamp(), mcbm.getNewlength(), - mcbm.isCloseFile(), mcbm.isDeleteblock(), mcbm.getNewtargets()); - break; - } - } catch (IOException ex) { - LOG.warn("Could not process the message " + msg.getType(), ex); - } - msg = pendingDatanodeMessages.take(gs); - } - } - @VisibleForTesting public EditLogTailer getEditLogTailer() { return editLogTailer; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 5920762ac83..b293b5a14fc 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -878,16 +878,6 @@ class NameNodeRpcServer implements NamenodeProtocols { String poolId, long[] blocks) throws IOException { verifyRequest(nodeReg); BlockListAsLongs blist = new BlockListAsLongs(blocks); - if (nn.isStandbyState()) { - long maxGs = blist.getMaxGsInBlockList(); - if (namesystem.isGenStampInFuture(maxGs)) { - LOG.info("Required GS="+maxGs+", Queuing blockReport message"); - namesystem.getPendingDataNodeMessages().queueMessage( - new PendingDataNodeMessages.BlockReportMessage(nodeReg, poolId, - blist, maxGs)); - return null; - } - } if(stateChangeLog.isDebugEnabled()) { stateChangeLog.debug("*BLOCK* NameNode.blockReport: " + "from " + nodeReg.getName() + " " + blist.getNumberOfBlocks() @@ -904,25 +894,6 @@ class NameNodeRpcServer implements NamenodeProtocols { public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId, ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks) throws IOException { verifyRequest(nodeReg); - if (nn.isStandbyState()) { - if (receivedAndDeletedBlocks.length > 0) { - long maxGs = receivedAndDeletedBlocks[0].getBlock() - .getGenerationStamp(); - for (ReceivedDeletedBlockInfo binfo : receivedAndDeletedBlocks) { - if (binfo.getBlock().getGenerationStamp() > maxGs) { - maxGs = binfo.getBlock().getGenerationStamp(); - } - } - if (namesystem.isGenStampInFuture(maxGs)) { - LOG.info("Required GS=" + maxGs - + ", Queuing blockReceivedAndDeleted message"); - namesystem.getPendingDataNodeMessages().queueMessage( - new PendingDataNodeMessages.BlockReceivedDeleteMessage(nodeReg, - poolId, receivedAndDeletedBlocks, maxGs)); - return; - } - } - } if(stateChangeLog.isDebugEnabled()) { stateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: " +"from "+nodeReg.getName()+" 
"+receivedAndDeletedBlocks.length diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java index 6846e959a49..c453db561eb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java @@ -32,4 +32,10 @@ public interface Namesystem extends RwLock, SafeMode { /** @return the block pool ID */ public String getBlockPoolId(); + + public boolean isInStandbyState(); + + public boolean isGenStampInFuture(long generationStamp); + + public void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal); } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/PendingDataNodeMessages.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/PendingDataNodeMessages.java deleted file mode 100644 index 04eb4b9ccc0..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/PendingDataNodeMessages.java +++ /dev/null @@ -1,201 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.namenode; - -import java.util.PriorityQueue; - -import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; -import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; - -public class PendingDataNodeMessages { - - PriorityQueue queue = new PriorityQueue(); - - enum MessageType { - BLOCK_RECEIVED_DELETE, - BLOCK_REPORT, - COMMIT_BLOCK_SYNCHRONIZATION - } - - static abstract class DataNodeMessage - implements Comparable { - - final MessageType type; - private final long targetGs; - - DataNodeMessage(MessageType type, long targetGenStamp) { - this.type = type; - this.targetGs = targetGenStamp; - } - - protected MessageType getType() { - return type; - } - - protected long getTargetGs() { - return targetGs; - } - - public int compareTo(DataNodeMessage other) { - if (targetGs == other.targetGs) { - return 0; - } else if (targetGs < other.targetGs) { - return -1; - } - return 1; - } - } - - static class BlockReceivedDeleteMessage extends DataNodeMessage { - final DatanodeRegistration nodeReg; - final String poolId; - final ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks; - - BlockReceivedDeleteMessage(DatanodeRegistration nodeReg, String poolId, - ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks, long targetGs) { - super(MessageType.BLOCK_RECEIVED_DELETE, targetGs); - this.nodeReg = nodeReg; - this.poolId = poolId; - 
this.receivedAndDeletedBlocks = receivedAndDeletedBlocks; - } - - DatanodeRegistration getNodeReg() { - return nodeReg; - } - - String getPoolId() { - return poolId; - } - - ReceivedDeletedBlockInfo[] getReceivedAndDeletedBlocks() { - return receivedAndDeletedBlocks; - } - - public String toString() { - return "BlockReceivedDeletedMessage with " + - receivedAndDeletedBlocks.length + " blocks"; - } - } - - static class CommitBlockSynchronizationMessage extends DataNodeMessage { - - private final ExtendedBlock block; - private final long newgenerationstamp; - private final long newlength; - private final boolean closeFile; - private final boolean deleteblock; - private final DatanodeID[] newtargets; - - CommitBlockSynchronizationMessage(ExtendedBlock block, - long newgenerationstamp, long newlength, boolean closeFile, - boolean deleteblock, DatanodeID[] newtargets, long targetGenStamp) { - super(MessageType.COMMIT_BLOCK_SYNCHRONIZATION, targetGenStamp); - this.block = block; - this.newgenerationstamp = newgenerationstamp; - this.newlength = newlength; - this.closeFile = closeFile; - this.deleteblock = deleteblock; - this.newtargets = newtargets; - } - - ExtendedBlock getBlock() { - return block; - } - - long getNewgenerationstamp() { - return newgenerationstamp; - } - - long getNewlength() { - return newlength; - } - - boolean isCloseFile() { - return closeFile; - } - - boolean isDeleteblock() { - return deleteblock; - } - - DatanodeID[] getNewtargets() { - return newtargets; - } - - public String toString() { - return "CommitBlockSynchronizationMessage for " + block; - } - } - - static class BlockReportMessage extends DataNodeMessage { - - private final DatanodeRegistration nodeReg; - private final String poolId; - private final BlockListAsLongs blockList; - - BlockReportMessage(DatanodeRegistration nodeReg, String poolId, - BlockListAsLongs blist, long targetGenStamp) { - super(MessageType.BLOCK_REPORT, targetGenStamp); - this.nodeReg = nodeReg; - this.poolId = 
poolId; - this.blockList = blist; - } - - DatanodeRegistration getNodeReg() { - return nodeReg; - } - - String getPoolId() { - return poolId; - } - - BlockListAsLongs getBlockList() { - return blockList; - } - - public String toString() { - return "BlockReport from " + nodeReg + " with " + blockList.getNumberOfBlocks() + " blocks"; - } - } - - synchronized void queueMessage(DataNodeMessage msg) { - queue.add(msg); - } - - /** - * Returns a message if contains a message less or equal to the given gs, - * otherwise returns null. - * - * @param gs - */ - synchronized DataNodeMessage take(long gs) { - DataNodeMessage m = queue.peek(); - if (m != null && m.getTargetGs() <= gs) { - return queue.remove(); - } else { - return null; - } - } - - synchronized boolean isEmpty() { - return queue.isEmpty(); - } -} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java new file mode 100644 index 00000000000..16977bb820e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import static org.junit.Assert.*; + +import java.util.Queue; + +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; +import org.junit.Test; + +import com.google.common.base.Joiner; + + +public class TestPendingDataNodeMessages { + PendingDataNodeMessages msgs = new PendingDataNodeMessages(); + + private final Block block1Gs1 = new Block(1, 0, 1); + private final Block block1Gs2 = new Block(1, 0, 2); + private final Block block1Gs2DifferentInstance = + new Block(1, 0, 2); + private final Block block2Gs1 = new Block(2, 0, 1); + + private final DatanodeDescriptor fakeDN = new DatanodeDescriptor( + new DatanodeID("fake")); + + @Test + public void testQueues() { + msgs.enqueueReportedBlock(fakeDN, block1Gs1, ReplicaState.FINALIZED); + msgs.enqueueReportedBlock(fakeDN, block1Gs2, ReplicaState.FINALIZED); + + assertEquals(2, msgs.count()); + + // Nothing queued yet for block 2 + assertNull(msgs.takeBlockQueue(block2Gs1)); + assertEquals(2, msgs.count()); + + Queue q = + msgs.takeBlockQueue(block1Gs2DifferentInstance); + assertEquals( + "ReportedBlockInfo [block=blk_1_1, dn=fake, reportedState=FINALIZED]," + + "ReportedBlockInfo [block=blk_1_2, dn=fake, reportedState=FINALIZED]", + Joiner.on(",").join(q)); + assertEquals(0, msgs.count()); + + // Should be null if we pull again + assertNull(msgs.takeBlockQueue(block1Gs1)); + assertEquals(0, msgs.count()); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java 
index 181de70f336..fead3b6162f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java @@ -30,8 +30,8 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretMan import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp; -import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.SafeModeInfo; +import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.ipc.Server; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java index 1f43e057f7b..a7a939c0081 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java @@ -21,18 +21,18 @@ import static org.junit.Assert.*; import java.io.IOException; import java.io.PrintWriter; -import java.io.StringWriter; -import java.net.URISyntaxException; import java.util.Collection; import java.util.List; +import java.util.concurrent.CountDownLatch; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import 
org.apache.hadoop.fs.Path; -import org.apache.hadoop.ha.ServiceFailedException; +import org.apache.hadoop.hdfs.AppendTestUtil; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSUtil; @@ -40,23 +40,29 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.hadoop.hdfs.server.namenode.FSClusterStats; +import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.FSInodeInfo; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; -import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.GenericTestUtils.DelayAnswer; import org.apache.log4j.Level; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; import com.google.common.base.Supplier; import com.google.common.collect.Lists; @@ -360,6 +366,164 @@ public class TestDNFencing { FileSystem fs2 = cluster.getFileSystem(1); 
DFSTestUtil.readFile(fs2, TEST_FILE_PATH); } + + /** + * Regression test for HDFS-2742. The issue in this bug was: + * - DN does a block report while file is open. This BR contains + * the block in RBW state. + * - Standby queues the RBW state in PendingDataNodeMessages + * - Standby processes edit logs during failover. Before fixing + * this bug, it was mistakenly applying the RBW reported state + * after the block had been completed, causing the block to get + * marked corrupt. Instead, we should now be applying the RBW + * message on OP_ADD, and then the FINALIZED message on OP_CLOSE. + */ + @Test + public void testBlockReportsWhileFileBeingWritten() throws Exception { + FSDataOutputStream out = fs.create(TEST_FILE_PATH); + try { + AppendTestUtil.write(out, 0, 10); + out.hflush(); + + // Block report will include the RBW replica, but will be + // queued on the StandbyNode. + cluster.triggerBlockReports(); + + } finally { + IOUtils.closeStream(out); + } + + cluster.transitionToStandby(0); + cluster.transitionToActive(1); + + // Verify that no replicas are marked corrupt, and that the + // file is readable from the failed-over standby. + BlockManagerTestUtil.updateState(nn1.getNamesystem().getBlockManager()); + BlockManagerTestUtil.updateState(nn2.getNamesystem().getBlockManager()); + assertEquals(0, nn1.getNamesystem().getCorruptReplicaBlocks()); + assertEquals(0, nn2.getNamesystem().getCorruptReplicaBlocks()); + + DFSTestUtil.readFile(fs, TEST_FILE_PATH); + } + + /** + * Test that, when a block is re-opened for append, the related + * datanode messages are correctly queued by the SBN because + * they have future states and genstamps.
+ */ + @Test + public void testQueueingWithAppend() throws Exception { + int numQueued = 0; + int numDN = cluster.getDataNodes().size(); + + FSDataOutputStream out = fs.create(TEST_FILE_PATH); + try { + AppendTestUtil.write(out, 0, 10); + out.hflush(); + + // Opening the file will report RBW replicas, but will be + // queued on the StandbyNode. + numQueued += numDN; // RBW messages + } finally { + IOUtils.closeStream(out); + numQueued += numDN; // blockReceived messages + } + + cluster.triggerBlockReports(); + numQueued += numDN; + + try { + out = fs.append(TEST_FILE_PATH); + AppendTestUtil.write(out, 10, 10); + // RBW replicas once it's opened for append + numQueued += numDN; + + } finally { + IOUtils.closeStream(out); + numQueued += numDN; // blockReceived + } + + cluster.triggerBlockReports(); + numQueued += numDN; + + assertEquals(numQueued, cluster.getNameNode(1).getNamesystem(). + getPendingDataNodeMessageCount()); + + cluster.transitionToStandby(0); + cluster.transitionToActive(1); + + // Verify that no replicas are marked corrupt, and that the + // file is readable from the failed-over standby. + BlockManagerTestUtil.updateState(nn1.getNamesystem().getBlockManager()); + BlockManagerTestUtil.updateState(nn2.getNamesystem().getBlockManager()); + assertEquals(0, nn1.getNamesystem().getCorruptReplicaBlocks()); + assertEquals(0, nn2.getNamesystem().getCorruptReplicaBlocks()); + + AppendTestUtil.check(fs, TEST_FILE_PATH, 20); + } + + /** + * Another regression test for HDFS-2742. This tests the following sequence: + * - DN does a block report while file is open. This BR contains + * the block in RBW state. + * - The block report is delayed in reaching the standby. + * - The file is closed. + * - The standby processes the OP_ADD and OP_CLOSE operations before + * the RBW block report arrives. + * - The standby should not mark the block as corrupt. 
+ */ + @Test + public void testRBWReportArrivesAfterEdits() throws Exception { + final CountDownLatch brFinished = new CountDownLatch(1); + DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG) { + @Override + protected Object passThrough(InvocationOnMock invocation) + throws Throwable { + try { + return super.passThrough(invocation); + } finally { + // inform the test that our block report went through. + brFinished.countDown(); + } + } + }; + + FSDataOutputStream out = fs.create(TEST_FILE_PATH); + try { + AppendTestUtil.write(out, 0, 10); + out.hflush(); + + DataNode dn = cluster.getDataNodes().get(0); + DatanodeProtocolClientSideTranslatorPB spy = + DataNodeAdapter.spyOnBposToNN(dn, nn2); + + Mockito.doAnswer(delayer) + .when(spy).blockReport( + Mockito.anyObject(), + Mockito.anyString(), + Mockito.anyObject()); + dn.scheduleAllBlockReport(0); + delayer.waitForCall(); + + } finally { + IOUtils.closeStream(out); + } + + cluster.transitionToStandby(0); + cluster.transitionToActive(1); + + delayer.proceed(); + brFinished.await(); + + // Verify that no replicas are marked corrupt, and that the + // file is readable from the failed-over standby. + BlockManagerTestUtil.updateState(nn1.getNamesystem().getBlockManager()); + BlockManagerTestUtil.updateState(nn2.getNamesystem().getBlockManager()); + assertEquals(0, nn1.getNamesystem().getCorruptReplicaBlocks()); + assertEquals(0, nn2.getNamesystem().getCorruptReplicaBlocks()); + + DFSTestUtil.readFile(fs, TEST_FILE_PATH); + } /** * Print a big banner in the test log to make debug easier. 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java index d423ce26617..d6babb788a7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java @@ -25,10 +25,13 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.io.IOException; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -38,15 +41,19 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; +import org.apache.hadoop.hdfs.server.namenode.FSImage; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.log4j.Level; import org.junit.After; import org.junit.Before; import org.junit.Test; import com.google.common.base.Supplier; +import com.google.common.collect.Lists; /** * Tests that exercise safemode in an HA cluster. 
@@ -60,6 +67,12 @@ public class TestHASafeMode { private MiniDFSCluster cluster; private Runtime mockRuntime = mock(Runtime.class); + static { + ((Log4JLogger)LogFactory.getLog(FSImage.class)).getLogger().setLevel(Level.ALL); + ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL); + ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL); + } + @Before public void setupCluster() throws Exception { Configuration conf = new Configuration(); @@ -112,7 +125,11 @@ public class TestHASafeMode { @Test public void testEnterSafeModeInANNShouldNotThrowNPE() throws Exception { banner("Restarting active"); + DFSTestUtil + .createFile(fs, new Path("/test"), 3 * BLOCK_SIZE, (short) 3, 1L); restartActive(); + nn0.getRpcServer().transitionToActive(); + FSNamesystem namesystem = nn0.getNamesystem(); String status = namesystem.getSafemode(); assertTrue("Bad safemode status: '" + status + "'", status @@ -187,24 +204,14 @@ public class TestHASafeMode { banner("Restarting standby"); restartStandby(); - // We expect it to be stuck in safemode (not the extension) because - // the block reports are delayed (since they include blocks - // from /test2 which are too-high genstamps. - String status = nn1.getNamesystem().getSafemode(); - assertTrue("Bad safemode status: '" + status + "'", - status.startsWith( - "Safe mode is ON." + - "The reported blocks 0 needs additional 3 blocks to reach")); + // We expect it not to be stuck in safemode, since those blocks + // that are already visible to the SBN should be processed + // in the initial block reports. + assertSafeMode(nn1, 3, 3); banner("Waiting for standby to catch up to active namespace"); HATestUtil.waitForStandbyToCatchUp(nn0, nn1); - - status = nn1.getNamesystem().getSafemode(); - assertTrue("Bad safemode status: '" + status + "'", - status.startsWith( - "Safe mode is ON." + - "The reported blocks 8 has reached the threshold 0.9990 of " + - "total blocks 8. 
Safe mode will be turned off automatically")); + assertSafeMode(nn1, 8, 8); } /** @@ -224,12 +231,7 @@ public class TestHASafeMode { banner("Restarting standby"); restartStandby(); - String status = nn1.getNamesystem().getSafemode(); - assertTrue("Bad safemode status: '" + status + "'", - status.startsWith( - "Safe mode is ON." + - "The reported blocks 3 has reached the threshold 0.9990 of " + - "total blocks 3. Safe mode will be turned off automatically")); + assertSafeMode(nn1, 3, 3); // Create a few blocks which will send blockReceived calls to the // SBN. @@ -240,12 +242,7 @@ public class TestHASafeMode { banner("Waiting for standby to catch up to active namespace"); HATestUtil.waitForStandbyToCatchUp(nn0, nn1); - status = nn1.getNamesystem().getSafemode(); - assertTrue("Bad safemode status: '" + status + "'", - status.startsWith( - "Safe mode is ON." + - "The reported blocks 8 has reached the threshold 0.9990 of " + - "total blocks 8. Safe mode will be turned off automatically")); + assertSafeMode(nn1, 8, 8); } /** @@ -285,20 +282,11 @@ public class TestHASafeMode { banner("Restarting standby"); restartStandby(); - String status = nn1.getNamesystem().getSafemode(); - assertTrue("Bad safemode status: '" + status + "'", - status.startsWith( - "Safe mode is ON." + - "The reported blocks 0 needs additional 5 blocks to reach")); + assertSafeMode(nn1, 0, 5); banner("Waiting for standby to catch up to active namespace"); HATestUtil.waitForStandbyToCatchUp(nn0, nn1); - status = nn1.getNamesystem().getSafemode(); - assertTrue("Bad safemode status: '" + status + "'", - status.startsWith( - "Safe mode is ON." + - "The reported blocks 0 has reached the threshold 0.9990 of " + - "total blocks 0. Safe mode will be turned off automatically")); + assertSafeMode(nn1, 0, 0); } /** @@ -320,12 +308,7 @@ public class TestHASafeMode { restartStandby(); // It will initially have all of the blocks necessary. 
- String status = nn1.getNamesystem().getSafemode(); - assertTrue("Bad safemode status: '" + status + "'", - status.startsWith( - "Safe mode is ON." + - "The reported blocks 10 has reached the threshold 0.9990 of " + - "total blocks 10. Safe mode will be turned off automatically")); + assertSafeMode(nn1, 10, 10); // Delete those blocks while the SBN is in safe mode - this // should reduce it back below the threshold @@ -339,23 +322,123 @@ public class TestHASafeMode { HATestUtil.waitForDNDeletions(cluster); cluster.triggerDeletionReports(); - status = nn1.getNamesystem().getSafemode(); - assertTrue("Bad safemode status: '" + status + "'", - status.startsWith( - "Safe mode is ON." + - "The reported blocks 0 needs additional 10 blocks")); + assertSafeMode(nn1, 0, 10); banner("Waiting for standby to catch up to active namespace"); HATestUtil.waitForStandbyToCatchUp(nn0, nn1); - status = nn1.getNamesystem().getSafemode(); - assertTrue("Bad safemode status: '" + status + "'", - status.startsWith( - "Safe mode is ON." + - "The reported blocks 0 has reached the threshold 0.9990 of " + - "total blocks 0. Safe mode will be turned off automatically")); + assertSafeMode(nn1, 0, 0); } + /** + * Tests that the standby node properly tracks the number of total + * and safe blocks while it is in safe mode. Since safe-mode only + * counts completed blocks, append needs to decrement the total + * number of blocks and then re-increment when the file is closed + * again. + */ + @Test + public void testAppendWhileInSafeMode() throws Exception { + banner("Starting with NN0 active and NN1 standby, creating some blocks"); + // Make 4.5 blocks so that append() will re-open an existing block + // instead of just adding a new one + DFSTestUtil.createFile(fs, new Path("/test"), + 4*BLOCK_SIZE + BLOCK_SIZE/2, (short) 3, 1L); + + // Roll edit log so that, when the SBN restarts, it will load + // the namespace during startup. 
+ nn0.getRpcServer().rollEditLog(); + + banner("Restarting standby"); + restartStandby(); + + // It will initially have all of the blocks necessary. + assertSafeMode(nn1, 5, 5); + + // Append to a block while SBN is in safe mode. This should + // not affect safemode initially, since the DN message + // will get queued. + FSDataOutputStream stm = fs.append(new Path("/test")); + try { + assertSafeMode(nn1, 5, 5); + + // if we roll edits now, the SBN should see that it's under construction + // and change its total count and safe count down by one, since UC + // blocks are not counted by safe mode. + HATestUtil.waitForStandbyToCatchUp(nn0, nn1); + assertSafeMode(nn1, 4, 4); + } finally { + IOUtils.closeStream(stm); + } + + // Delete those blocks while the SBN is in safe mode - this + // should reduce it back below the threshold + banner("Removing the blocks without rolling the edit log"); + fs.delete(new Path("/test"), true); + BlockManagerTestUtil.computeAllPendingWork( + nn0.getNamesystem().getBlockManager()); + + banner("Triggering deletions on DNs and Deletion Reports"); + cluster.triggerHeartbeats(); + HATestUtil.waitForDNDeletions(cluster); + cluster.triggerDeletionReports(); + + assertSafeMode(nn1, 0, 4); + + banner("Waiting for standby to catch up to active namespace"); + HATestUtil.waitForStandbyToCatchUp(nn0, nn1); + + assertSafeMode(nn1, 0, 0); + } + + /** + * Regression test for a bug experienced while developing + * HDFS-2742. The scenario here is: + * - image contains some blocks + * - edits log contains at least one block addition, followed + * by deletion of more blocks than were added. + * - When node starts up, some incorrect accounting of block + * totals caused an assertion failure. + */ + @Test + public void testBlocksDeletedInEditLog() throws Exception { + banner("Starting with NN0 active and NN1 standby, creating some blocks"); + // Make 4 blocks persisted in the image. 
+ DFSTestUtil.createFile(fs, new Path("/test"), + 4*BLOCK_SIZE, (short) 3, 1L); + NameNodeAdapter.enterSafeMode(nn0, false); + NameNodeAdapter.saveNamespace(nn0); + NameNodeAdapter.leaveSafeMode(nn0, false); + + // OP_ADD for 2 blocks + DFSTestUtil.createFile(fs, new Path("/test2"), + 2*BLOCK_SIZE, (short) 3, 1L); + + // OP_DELETE for 4 blocks + fs.delete(new Path("/test"), true); + + restartActive(); + } + + private void assertSafeMode(NameNode nn, int safe, int total) { + String status = nn.getNamesystem().getSafemode(); + if (safe == total) { + assertTrue("Bad safemode status: '" + status + "'", + status.startsWith( + "Safe mode is ON." + + "The reported blocks " + safe + " has reached the threshold " + + "0.9990 of total blocks " + total + ". Safe mode will be " + + "turned off automatically")); + } else { + int additional = total - safe; + assertTrue("Bad safemode status: '" + status + "'", + status.startsWith( + "Safe mode is ON." + + "The reported blocks " + safe + " needs additional " + + additional + " blocks")); + } + } + /** * Set up a namesystem with several edits, both deletions and * additions, and failover to a new NN while that NN is in @@ -378,26 +461,107 @@ public class TestHASafeMode { banner("Restarting standby"); restartStandby(); - // We expect it to be stuck in safemode (not the extension) because - // the block reports are delayed (since they include blocks - // from /test2 which are too-high genstamps. - String status = nn1.getNamesystem().getSafemode(); - assertTrue("Bad safemode status: '" + status + "'", - status.startsWith( - "Safe mode is ON." + - "The reported blocks 0 needs additional 3 blocks to reach")); - + // We expect it to be on its way out of safemode, since all of the blocks + // from the edit log have been reported.
+ assertSafeMode(nn1, 3, 3); + // Initiate a failover into it while it's in safemode banner("Initiating a failover into NN1 in safemode"); NameNodeAdapter.abortEditLogs(nn0); cluster.transitionToActive(1); - status = nn1.getNamesystem().getSafemode(); + assertSafeMode(nn1, 5, 5); + } + + /** + * Similar to {@link #testBlocksRemovedWhileInSafeMode()} except that + * the OP_DELETE edits arrive at the SBN before the block deletion reports. + * The tracking of safe blocks needs to properly account for the removal + * of the blocks as well as the safe count. This is a regression test for + * HDFS-2742. + */ + @Test + public void testBlocksRemovedWhileInSafeModeEditsArriveFirst() throws Exception { + banner("Starting with NN0 active and NN1 standby, creating some blocks"); + DFSTestUtil.createFile(fs, new Path("/test"), 10*BLOCK_SIZE, (short) 3, 1L); + + // Roll edit log so that, when the SBN restarts, it will load + // the namespace during startup. + nn0.getRpcServer().rollEditLog(); + + banner("Restarting standby"); + restartStandby(); + + // It will initially have all of the blocks necessary. + String status = nn1.getNamesystem().getSafemode(); assertTrue("Bad safemode status: '" + status + "'", status.startsWith( "Safe mode is ON." + - "The reported blocks 5 has reached the threshold 0.9990 of " + - "total blocks 5. Safe mode will be turned off automatically")); + "The reported blocks 10 has reached the threshold 0.9990 of " + + "total blocks 10. Safe mode will be turned off automatically")); + + // Delete those blocks while the SBN is in safe mode. + // Immediately roll the edit log before the actual deletions are sent + // to the DNs. + banner("Removing the blocks without rolling the edit log"); + fs.delete(new Path("/test"), true); + HATestUtil.waitForStandbyToCatchUp(nn0, nn1); + + // Should see removal of the blocks as well as their contribution to safe block count. 
+ assertSafeMode(nn1, 0, 0); + + + banner("Triggering sending deletions to DNs and Deletion Reports"); + BlockManagerTestUtil.computeAllPendingWork( + nn0.getNamesystem().getBlockManager()); + cluster.triggerHeartbeats(); + HATestUtil.waitForDNDeletions(cluster); + cluster.triggerDeletionReports(); + + // No change in assertion status here, but some of the consistency checks + // in safemode will fire here if we accidentally decrement safe block count + // below 0. + assertSafeMode(nn1, 0, 0); + } + + + /** + * Test that the number of safe blocks is accounted correctly even when + * blocks move between under-construction state and completed state. + * If a FINALIZED report arrives at the SBN before the block is marked + * COMPLETE, then when we get the OP_CLOSE we need to count it as "safe" + * at that point. This is a regression test for HDFS-2742. + */ + @Test + public void testSafeBlockTracking() throws Exception { + banner("Starting with NN0 active and NN1 standby, creating some " + + "UC blocks plus some other blocks to force safemode"); + DFSTestUtil.createFile(fs, new Path("/other-blocks"), 10*BLOCK_SIZE, (short) 3, 1L); + + List<FSDataOutputStream> stms = Lists.newArrayList(); + try { + for (int i = 0; i < 5; i++) { + FSDataOutputStream stm = fs.create(new Path("/test-uc-" + i)); + stms.add(stm); + stm.write(1); + stm.hflush(); + } + // Roll edit log so that, when the SBN restarts, it will load + // the namespace during startup and enter safemode.
+ nn0.getRpcServer().rollEditLog(); + } finally { + for (FSDataOutputStream stm : stms) { + IOUtils.closeStream(stm); + } + } + + banner("Restarting SBN"); + restartStandby(); + assertSafeMode(nn1, 10, 10); + + banner("Allowing SBN to catch up"); + HATestUtil.waitForStandbyToCatchUp(nn0, nn1); + assertSafeMode(nn1, 15, 15); } /** @@ -425,12 +589,7 @@ public class TestHASafeMode { nn0.getRpcServer().rollEditLog(); restartStandby(); - String status = nn1.getNamesystem().getSafemode(); - assertTrue("Bad safemode status: '" + status + "'", - status.startsWith( - "Safe mode is ON." + - "The reported blocks 6 has reached the threshold 0.9990 of " + - "total blocks 6. Safe mode will be turned off automatically")); + assertSafeMode(nn1, 6, 6); } /**