HDFS-2742. HA: observed dataloss in replication stress test. Contributed by Todd Lipcon

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1238940 13f79535-47bb-0310-9956-ffa450edef68
Eli Collins 2012-02-01 05:16:49 +00:00
parent 43679fcccd
commit cf611255d6
13 changed files with 923 additions and 431 deletions

View File: CHANGES.HDFS-1623.txt

@ -145,3 +145,5 @@ HDFS-2824. Fix failover when prior NN died just after creating an edit log segme
HDFS-2853. HA: NN fails to start if the shared edits dir is marked required (atm via eli)
HDFS-2845. SBN should not allow browsing of the file system via web UI. (Bikas Saha via atm)
HDFS-2742. HA: observed dataloss in replication stress test. (todd via eli)

View File: BlockInfo.java

@ -180,7 +180,7 @@ public class BlockInfo extends Block implements LightWeightGSet.LinkedElement {
/**
* Count the number of data-nodes the block belongs to.
*/
int numNodes() {
public int numNodes() {
assert this.triplets != null : "BlockInfo is not initialized";
assert triplets.length % 3 == 0 : "Malformed BlockInfo";
for(int idx = getCapacity()-1; idx >= 0; idx--) {

View File: BlockManager.java

@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -28,6 +29,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
@ -49,6 +51,7 @@ import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.Util;
@ -58,7 +61,6 @@ import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@ -69,7 +71,6 @@ import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.Daemon;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.collect.Sets;
/**
@ -83,12 +84,21 @@ public class BlockManager {
/** Default load factor of map */
public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f;
private static final String QUEUE_REASON_CORRUPT_STATE =
"it has the wrong state or generation stamp";
private static final String QUEUE_REASON_FUTURE_GENSTAMP =
"generation stamp is in the future";
private final Namesystem namesystem;
private final DatanodeManager datanodeManager;
private final HeartbeatManager heartbeatManager;
private final BlockTokenSecretManager blockTokenSecretManager;
private final PendingDataNodeMessages pendingDNMessages =
new PendingDataNodeMessages();
private volatile long pendingReplicationBlocksCount = 0L;
private volatile long corruptReplicaBlocksCount = 0L;
private volatile long underReplicatedBlocksCount = 0L;
@ -124,6 +134,10 @@ public class BlockManager {
public long getPostponedMisreplicatedBlocksCount() {
return postponedMisreplicatedBlocksCount;
}
/** Used by metrics */
public int getPendingDataNodeMessageCount() {
return pendingDNMessages.count();
}
/** replicationRecheckInterval is how often namenode checks for new replication work */
private final long replicationRecheckInterval;
@ -479,12 +493,24 @@ public class BlockManager {
if(curBlock.isComplete())
return curBlock;
BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)curBlock;
if (!force && ucBlock.numNodes() < minReplication)
int numNodes = ucBlock.numNodes();
if (!force && numNodes < minReplication)
throw new IOException("Cannot complete block: " +
"block does not satisfy minimal replication requirement.");
BlockInfo completeBlock = ucBlock.convertToCompleteBlock();
// replace penultimate block in file
fileINode.setBlock(blkIndex, completeBlock);
// Since safe-mode only counts complete blocks, and we now have
// one more complete block, we need to adjust the total up, and
// also count it as safe, if we have at least the minimum replica
// count. (We may not have the minimum replica count yet if this is
// a "forced" completion when a file is getting closed by an
// OP_CLOSE edit on the standby).
namesystem.adjustSafeModeBlockTotals(0, 1);
namesystem.incrementSafeBlockCount(
Math.min(numNodes, minReplication));
// replace block in the blocksMap
return blocksMap.replaceBlock(completeBlock);
}
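A minimal sketch of the safe-mode bookkeeping introduced here. The SafeModeCounts class below is hypothetical, but the arithmetic mirrors completeBlock() above and convertLastBlockToUnderConstruction() in the next hunk: completing a block raises both the expected total and, if sufficiently replicated, the safe count; re-opening the last block for append lowers them again.

// Hypothetical illustration, not part of the patch: safe mode counts
// only complete blocks, so both totals move when a block changes state.
class SafeModeCounts {
  int blockTotal; // complete blocks safe mode expects
  int blockSafe;  // complete blocks with at least minReplication replicas

  // Mirrors completeBlock(): adjustSafeModeBlockTotals(0, 1) followed by
  // incrementSafeBlockCount(Math.min(numNodes, minReplication)), which
  // only takes effect when numNodes >= minReplication.
  void onBlockCompleted(int numNodes, int minReplication) {
    blockTotal++;
    if (numNodes >= minReplication) {
      blockSafe++;
    }
  }

  // Mirrors convertLastBlockToUnderConstruction(): always decrement the
  // total; decrement safe only if the block had enough replicas.
  void onLastBlockReopened(int numTargets, int minReplication) {
    blockTotal--;
    if (numTargets >= minReplication) {
      blockSafe--;
    }
  }
}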
@ -548,6 +574,14 @@ public class BlockManager {
invalidateBlocks.remove(datanodeId, oldBlock);
}
// Adjust safe-mode totals, since under-construction blocks don't
// count in safe-mode.
namesystem.adjustSafeModeBlockTotals(
// decrement safe if we had enough
targets.length >= minReplication ? -1 : 0,
// always decrement total blocks
-1);
final long fileLength = fileINode.computeContentSummary().getLength();
final long pos = fileLength - ucBlock.getNumBytes();
return createLocatedBlock(ucBlock, pos, AccessMode.WRITE);
@ -1483,9 +1517,19 @@ public class BlockManager {
assert (node.numBlocks() == 0);
BlockReportIterator itBR = report.getBlockReportIterator();
boolean isStandby = namesystem.isInStandbyState();
while(itBR.hasNext()) {
Block iblk = itBR.next();
ReplicaState reportedState = itBR.getCurrentReplicaState();
if (isStandby &&
namesystem.isGenStampInFuture(iblk.getGenerationStamp())) {
queueReportedBlock(node, iblk, reportedState,
QUEUE_REASON_FUTURE_GENSTAMP);
continue;
}
BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
// If block does not belong to any file, we are done.
if (storedBlock == null) continue;
@ -1493,7 +1537,14 @@ public class BlockManager {
// If block is corrupt, mark it and continue to next block.
BlockUCState ucState = storedBlock.getBlockUCState();
if (isReplicaCorrupt(iblk, reportedState, storedBlock, ucState, node)) {
if (namesystem.isInStandbyState()) {
// In the Standby, we may receive a block report for a file for which
// we have only an out-of-date gen-stamp or state, for example.
queueReportedBlock(node, iblk, reportedState,
QUEUE_REASON_CORRUPT_STATE);
} else {
markBlockAsCorrupt(storedBlock, node);
}
continue;
}
@ -1576,7 +1627,8 @@ public class BlockManager {
* @param toCorrupt replicas with unexpected length or generation stamp;
* add to corrupt replicas
* @param toUC replicas of blocks currently under construction
* @return
* @return the up-to-date stored block, if it should be kept.
* Otherwise, null.
*/
private BlockInfo processReportedBlock(final DatanodeDescriptor dn,
final Block block, final ReplicaState reportedState,
@ -1591,6 +1643,13 @@ public class BlockManager {
+ " replicaState = " + reportedState);
}
if (namesystem.isInStandbyState() &&
namesystem.isGenStampInFuture(block.getGenerationStamp())) {
queueReportedBlock(dn, block, reportedState,
QUEUE_REASON_FUTURE_GENSTAMP);
return null;
}
// find block by blockId
BlockInfo storedBlock = blocksMap.getStoredBlock(block);
if(storedBlock == null) {
@ -1615,7 +1674,16 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
}
if (isReplicaCorrupt(block, reportedState, storedBlock, ucState, dn)) {
if (namesystem.isInStandbyState()) {
// If the block has an out-of-date generation stamp or state,
// but we're the standby, we shouldn't treat it as corrupt,
// but instead just queue it for later processing.
queueReportedBlock(dn, storedBlock, reportedState,
QUEUE_REASON_CORRUPT_STATE);
} else {
toCorrupt.add(storedBlock);
}
return storedBlock;
}
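The initial block-report path and the incremental path above reduce to the same standby-side decision. A condensed sketch of that predicate (a hypothetical helper; the real code inlines these checks):

private boolean shouldQueueOnStandby(boolean isStandby, long reportedGenStamp,
    long nnGenStamp, boolean looksCorrupt) {
  if (!isStandby) {
    return false; // the active NN applies reports immediately
  }
  if (reportedGenStamp > nnGenStamp) {
    return true;  // QUEUE_REASON_FUTURE_GENSTAMP
  }
  // An apparently corrupt replica may only reflect a state the standby
  // has not caught up to yet, so defer it rather than mark it corrupt.
  return looksCorrupt; // QUEUE_REASON_CORRUPT_STATE
}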
@ -1633,6 +1701,68 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
return storedBlock;
}
/**
* Queue the given reported block for later processing in the
* standby node. See {@link PendingDataNodeMessages}.
* @param reason a textual reason to report in the debug logs
*/
private void queueReportedBlock(DatanodeDescriptor dn, Block block,
ReplicaState reportedState, String reason) {
assert namesystem.isInStandbyState();
if (LOG.isDebugEnabled()) {
LOG.debug("Queueing reported block " + block +
" in state " + reportedState +
" from datanode " + dn + " for later processing " +
"because " + reason + ".");
}
pendingDNMessages.enqueueReportedBlock(dn, block, reportedState);
}
/**
* Try to process any messages that were previously queued for the given
* block. This is called from FSEditLogLoader whenever a block's state
* in the namespace has changed or a new block has been created.
*/
public void processQueuedMessagesForBlock(Block b) throws IOException {
Queue<ReportedBlockInfo> queue = pendingDNMessages.takeBlockQueue(b);
if (queue == null) {
// Nothing to re-process
return;
}
processQueuedMessages(queue);
}
private void processQueuedMessages(Iterable<ReportedBlockInfo> rbis)
throws IOException {
for (ReportedBlockInfo rbi : rbis) {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing previouly queued message " + rbi);
}
processAndHandleReportedBlock(
rbi.getNode(), rbi.getBlock(), rbi.getReportedState(), null);
}
}
/**
* Process any remaining queued datanode messages after entering
* active state. At this point they will not be re-queued since
* we are the definitive master node and thus should be up-to-date
* with the namespace information.
*/
public void processAllPendingDNMessages() throws IOException {
assert !namesystem.isInStandbyState() :
"processAllPendingDNMessages() should be called after exiting " +
"standby state!";
int count = pendingDNMessages.count();
if (count > 0) {
LOG.info("Processing " + count + " messages from DataNodes " +
"that were previously queued during standby state.");
}
processQueuedMessages(pendingDNMessages.takeAll());
assert pendingDNMessages.count() == 0;
}
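Taken together, these methods give deferred datanode messages a three-stage lifecycle. A sketch of the trigger points (the driver lines are illustrative; the method names are from this patch):

// 1. Standby: processReport()/processReportedBlock() call
//    queueReportedBlock() for messages that cannot be applied yet.
// 2. Edit tailing: after FSEditLogLoader replays an op that changes a
//    block's state or gen stamp:
blockManager.processQueuedMessagesForBlock(block);
// 3. Failover: once this NN is active and its namespace is authoritative:
blockManager.processAllPendingDNMessages();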
/*
* The next two methods test the various cases under which we must conclude
* the replica is corrupt, or under construction. These are laid out
@ -1742,14 +1872,16 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
// Now check for completion of blocks and safe block count
int numCurrentReplica = countLiveNodes(storedBlock);
if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED
&& numCurrentReplica >= minReplication)
&& numCurrentReplica >= minReplication) {
storedBlock = completeBlock(storedBlock.getINode(), storedBlock, false);
} else if (storedBlock.isComplete()) {
// check whether safe replication is reached for the block
// only complete blocks are counted towards that
if(storedBlock.isComplete())
// only complete blocks are counted towards that.
// In the case that the block just became complete above, completeBlock()
// handles the safe block count maintenance.
namesystem.incrementSafeBlockCount(numCurrentReplica);
}
}
/**
* Modify (block-->datanode) map. Remove block from set of
@ -1807,14 +1939,16 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
+ pendingReplications.getNumReplicas(storedBlock);
if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
numLiveReplicas >= minReplication)
numLiveReplicas >= minReplication) {
storedBlock = completeBlock(fileINode, storedBlock, false);
} else if (storedBlock.isComplete()) {
// check whether safe replication is reached for the block
// only complete blocks are counted towards that
// Is no-op if not in safe mode.
if(storedBlock.isComplete())
// In the case that the block just became complete above, completeBlock()
// handles the safe block count maintenance.
namesystem.incrementSafeBlockCount(numCurrentReplica);
}
// if file is under construction, then done for now
if (fileINode.isUnderConstruction()) {
@ -2514,7 +2648,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
}
public int getActiveBlockCount() {
return blocksMap.size() - (int)invalidateBlocks.numBlocks();
return blocksMap.size();
}
public DatanodeDescriptor[] getNodes(BlockInfo block) {

View File: PendingDataNodeMessages.java (new file, org.apache.hadoop.hdfs.server.blockmanagement)

@ -0,0 +1,134 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
* In the Standby Node, we can receive messages about blocks
* before they are actually available in the namespace, or while
* they have an outdated state in the namespace. In those cases,
* we queue those block-related messages in this structure.
*/
class PendingDataNodeMessages {
Map<Block, Queue<ReportedBlockInfo>> queueByBlockId =
Maps.newHashMap();
private int count = 0;
static class ReportedBlockInfo {
private final Block block;
private final DatanodeDescriptor dn;
private final ReplicaState reportedState;
ReportedBlockInfo(DatanodeDescriptor dn, Block block,
ReplicaState reportedState) {
this.dn = dn;
this.block = block;
this.reportedState = reportedState;
}
Block getBlock() {
return block;
}
DatanodeDescriptor getNode() {
return dn;
}
ReplicaState getReportedState() {
return reportedState;
}
@Override
public String toString() {
return "ReportedBlockInfo [block=" + block + ", dn=" + dn
+ ", reportedState=" + reportedState + "]";
}
}
void enqueueReportedBlock(DatanodeDescriptor dn, Block block,
ReplicaState reportedState) {
block = new Block(block); // defensive copy: detach the key from the caller's instance
getBlockQueue(block).add(
new ReportedBlockInfo(dn, block, reportedState));
count++;
}
/**
* @return any messages that were previously queued for the given block,
* or null if no messages were queued.
*/
Queue<ReportedBlockInfo> takeBlockQueue(Block block) {
Queue<ReportedBlockInfo> queue = queueByBlockId.remove(block);
if (queue != null) {
count -= queue.size();
}
return queue;
}
private Queue<ReportedBlockInfo> getBlockQueue(Block block) {
Queue<ReportedBlockInfo> queue = queueByBlockId.get(block);
if (queue == null) {
queue = Lists.newLinkedList();
queueByBlockId.put(block, queue);
}
return queue;
}
public int count() {
return count;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
for (Map.Entry<Block, Queue<ReportedBlockInfo>> entry :
queueByBlockId.entrySet()) {
sb.append("Block " + entry.getKey() + ":\n");
for (ReportedBlockInfo rbi : entry.getValue()) {
sb.append(" ").append(rbi).append("\n");
}
}
return sb.toString();
}
public Iterable<ReportedBlockInfo> takeAll() {
List<ReportedBlockInfo> rbis = Lists.newArrayListWithCapacity(
count);
for (Queue<ReportedBlockInfo> q : queueByBlockId.values()) {
rbis.addAll(q);
}
queueByBlockId.clear();
count = 0;
return rbis;
}
}
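A short usage sketch of this class, where dn and block stand for an existing DatanodeDescriptor and Block; the sequence mirrors TestPendingDataNodeMessages later in this commit. Note that Block equality is keyed on the block ID, so a queue created under one generation stamp is retrievable with another:

PendingDataNodeMessages msgs = new PendingDataNodeMessages();
msgs.enqueueReportedBlock(dn, block, ReplicaState.RBW); // copies the key block
// ... later, once the namespace has caught up for this block:
Queue<ReportedBlockInfo> q = msgs.takeBlockQueue(block);
if (q != null) {
  for (ReportedBlockInfo rbi : q) {
    // replay each message through the normal report-processing path
  }
}
// the queue for this block has been removed; a second take returns null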

View File: FSEditLogLoader.java

@ -66,7 +66,6 @@ import com.google.common.base.Joiner;
@InterfaceStability.Evolving
public class FSEditLogLoader {
private final FSNamesystem fsNamesys;
private long maxGenStamp = 0;
public FSEditLogLoader(FSNamesystem fsNamesys) {
this.fsNamesys = fsNamesys;
@ -91,15 +90,6 @@ public class FSEditLogLoader {
+ " of size " + edits.length() + " edits # " + numEdits
+ " loaded in " + (now()-startTime)/1000 + " seconds.");
} finally {
fsNamesys.setBlockTotal();
// Delay the notification of genstamp updates until after
// setBlockTotal() above. Otherwise, we will mark blocks
// as "safe" before they've been incorporated in the expected
// totalBlocks and threshold for SafeMode -- triggering an
// assertion failure and/or exiting safemode too early!
fsNamesys.notifyGenStampUpdate(maxGenStamp);
edits.close();
fsNamesys.writeUnlock();
}
@ -183,6 +173,12 @@ public class FSEditLogLoader {
switch (op.opCode) {
case OP_ADD: {
AddCloseOp addCloseOp = (AddCloseOp)op;
if (FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG.debug(op.opCode + ": " + addCloseOp.path +
" numblocks : " + addCloseOp.blocks.length +
" clientHolder " + addCloseOp.clientName +
" clientMachine " + addCloseOp.clientMachine);
}
// See if the file already exists (persistBlocks call)
INodeFile oldFile = getINodeFile(fsDir, addCloseOp.path);
@ -197,13 +193,6 @@ public class FSEditLogLoader {
}
long blockSize = addCloseOp.blockSize;
if (FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG.debug(op.opCode + ": " + addCloseOp.path +
" numblocks : " + addCloseOp.blocks.length +
" clientHolder " + addCloseOp.clientName +
" clientMachine " + addCloseOp.clientMachine);
}
// Older versions of HDFS do not store the block size in the inode.
// If the file has more than one block, use the size of the
// first block as the blocksize. Otherwise use the default
@ -227,12 +216,18 @@ public class FSEditLogLoader {
addCloseOp.atime, blockSize);
fsNamesys.prepareFileForWrite(addCloseOp.path, node,
addCloseOp.clientName, addCloseOp.clientMachine, null);
addCloseOp.clientName, addCloseOp.clientMachine, null,
false);
} else { // This is OP_ADD on an existing file
if (!oldFile.isUnderConstruction()) {
// This is a call to append() on an already-closed file.
if (FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG.debug("Reopening an already-closed file " +
"for append");
}
fsNamesys.prepareFileForWrite(addCloseOp.path, oldFile,
addCloseOp.clientName, addCloseOp.clientMachine, null);
addCloseOp.clientName, addCloseOp.clientMachine, null,
false);
oldFile = getINodeFile(fsDir, addCloseOp.path);
}
@ -243,6 +238,13 @@ public class FSEditLogLoader {
case OP_CLOSE: {
AddCloseOp addCloseOp = (AddCloseOp)op;
if (FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG.debug(op.opCode + ": " + addCloseOp.path +
" numblocks : " + addCloseOp.blocks.length +
" clientHolder " + addCloseOp.clientName +
" clientMachine " + addCloseOp.clientMachine);
}
INodeFile oldFile = getINodeFile(fsDir, addCloseOp.path);
if (oldFile == null) {
throw new IOException("Operation trying to close non-existent file " +
@ -478,14 +480,23 @@ public class FSEditLogLoader {
}
oldBlock.setNumBytes(newBlock.getNumBytes());
boolean changeMade =
oldBlock.getGenerationStamp() != newBlock.getGenerationStamp();
oldBlock.setGenerationStamp(newBlock.getGenerationStamp());
if (oldBlock instanceof BlockInfoUnderConstruction &&
(!isLastBlock || addCloseOp.opCode == FSEditLogOpCodes.OP_CLOSE)) {
changeMade = true;
fsNamesys.getBlockManager().forceCompleteBlock(
(INodeFileUnderConstruction)file,
(BlockInfoUnderConstruction)oldBlock);
}
if (changeMade) {
// The state or gen-stamp of the block has changed. So, we may be
// able to process some messages from datanodes that we previously
// were unable to process.
fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock);
}
}
if (addCloseOp.blocks.length < oldBlocks.length) {
@ -517,13 +528,9 @@ public class FSEditLogLoader {
}
fsNamesys.getBlockManager().addINode(newBI, file);
file.addBlock(newBI);
fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock);
}
}
// Record the max genstamp seen
for (Block b : addCloseOp.blocks) {
maxGenStamp = Math.max(maxGenStamp, b.getGenerationStamp());
}
}
private static void dumpOpCounts(

View File: FSNamesystem.java

@ -154,10 +154,6 @@ import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.BlockReceivedDeleteMessage;
import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.BlockReportMessage;
import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.CommitBlockSynchronizationMessage;
import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.DataNodeMessage;
import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState;
import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
@ -321,8 +317,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
// lock to protect FSNamesystem.
private ReentrantReadWriteLock fsLock;
private PendingDataNodeMessages pendingDatanodeMessages = new PendingDataNodeMessages();
/**
* Used when this NN is in standby state to read from the shared edit log.
*/
@ -343,10 +337,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
private final Configuration conf;
PendingDataNodeMessages getPendingDataNodeMessages() {
return pendingDatanodeMessages;
}
/**
* Instantiates an FSNamesystem loaded from the image and edits
* directories specified in the passed Configuration.
@ -481,6 +471,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
try {
nnResourceChecker = new NameNodeResourceChecker(conf);
checkAvailableResources();
assert safeMode != null &&
!safeMode.initializedReplQueues;
setBlockTotal();
blockManager.activate(conf);
this.nnrmthread = new Daemon(new NameNodeResourceMonitor());
@ -531,6 +523,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
LOG.info("Reprocessing replication and invalidation queues...");
blockManager.getDatanodeManager().markAllDatanodesStale();
blockManager.clearQueues();
blockManager.processAllPendingDNMessages();
blockManager.processMisReplicatedBlocks();
if (LOG.isDebugEnabled()) {
@ -850,7 +843,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return fsRunning;
}
private boolean isInStandbyState() {
@Override
public boolean isInStandbyState() {
if (haContext == null || haContext.getState() == null) {
// We're still starting up. In this case, if HA is
// on for the cluster, we always start in standby. Otherwise
@ -1543,7 +1537,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
if (append && myFile != null) {
return prepareFileForWrite(src, myFile, holder, clientMachine, clientNode);
return prepareFileForWrite(
src, myFile, holder, clientMachine, clientNode, true);
} else {
// Now we can add the name to the filesystem. This file has no
// blocks associated with it.
@ -1581,12 +1576,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* @param leaseHolder identifier of the lease holder on this file
* @param clientMachine identifier of the client machine
* @param clientNode if the client is collocated with a DN, that DN's descriptor
* @param writeToEditLog whether to persist this change to the edit log
* @return the last block locations if the block is partial or null otherwise
* @throws UnresolvedLinkException
* @throws IOException
*/
public LocatedBlock prepareFileForWrite(String src, INode file,
String leaseHolder, String clientMachine, DatanodeDescriptor clientNode)
String leaseHolder, String clientMachine, DatanodeDescriptor clientNode,
boolean writeToEditLog)
throws UnresolvedLinkException, IOException {
INodeFile node = (INodeFile) file;
INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
@ -1602,6 +1599,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
dir.replaceNode(src, node, cons);
leaseManager.addLease(cons.getClientName(), src);
if (writeToEditLog) {
getEditLog().logOpenFile(src, cons);
}
return blockManager.convertLastBlockToUnderConstruction(cons);
}
@ -2346,9 +2347,45 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
if (blocks == null) {
return;
}
// In the case that we are a Standby tailing edits from the
// active while in safe-mode, we need to track the total number
// of blocks and safe blocks in the system.
boolean trackBlockCounts = isSafeModeTrackingBlocks();
int numRemovedComplete = 0, numRemovedSafe = 0;
for (Block b : blocks) {
if (trackBlockCounts) {
BlockInfo bi = blockManager.getStoredBlock(b);
if (bi.isComplete()) {
numRemovedComplete++;
if (bi.numNodes() >= blockManager.minReplication) {
numRemovedSafe++;
}
}
}
blockManager.removeBlock(b);
}
if (trackBlockCounts) {
if (LOG.isDebugEnabled()) {
LOG.debug("Adjusting safe-mode totals for deletion of " + src + ":" +
"decreasing safeBlocks by " + numRemovedSafe +
", totalBlocks by " + numRemovedComplete);
}
adjustSafeModeBlockTotals(-numRemovedSafe, -numRemovedComplete);
}
}
/**
* @see SafeModeInfo#shouldIncrementallyTrackBlocks
*/
private boolean isSafeModeTrackingBlocks() {
if (!haEnabled) {
// Never track blocks incrementally in non-HA code.
return false;
}
SafeModeInfo sm = this.safeMode;
return sm != null && sm.shouldIncrementallyTrackBlocks();
}
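A worked example of the deletion-time adjustment above, with hypothetical numbers:

// Illustrative trace of removePathAndBlocks() with trackBlockCounts true,
// deleting a file of 4 complete, safely replicated blocks plus 1 block
// still under construction:
//   each complete + safe block   -> numRemovedComplete++, numRemovedSafe++
//   the under-construction block -> skipped; it was never counted in the
//                                   safe-mode totals
//   result: adjustSafeModeBlockTotals(-4, -4)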
/**
@ -2712,15 +2749,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
checkOperation(OperationCategory.WRITE);
if (haContext.getState().equals(NameNode.STANDBY_STATE)) {
// TODO(HA) we'll never get here, since we check for WRITE operation above!
if (isGenStampInFuture(newgenerationstamp)) {
LOG.info("Required GS=" + newgenerationstamp
+ ", Queuing commitBlockSynchronization message");
getPendingDataNodeMessages().queueMessage(
new PendingDataNodeMessages.CommitBlockSynchronizationMessage(
lastblock, newgenerationstamp, newlength, closeFile, deleteblock,
newtargets, newgenerationstamp));
return;
}
// Need to implement tests, etc, for this - block recovery spanning
// failover.
}
if (isInSafeMode()) {
@ -3264,6 +3294,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
boolean initializedReplQueues = false;
/** Was safemode entered automatically because available resources were low. */
private boolean resourcesLow = false;
/** Should safemode adjust its block totals as blocks come in */
private boolean shouldIncrementallyTrackBlocks = false;
/**
* Creates SafeModeInfo when the name node enters
@ -3291,6 +3323,18 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
this.blockSafe = 0;
}
/**
* In the HA case, the StandbyNode can be in safemode while the namespace
* is modified by the edit log tailer. In this case, the number of total
* blocks changes as edits are processed (e.g. blocks are added and deleted).
* However, we don't want to do the incremental tracking during the
* startup-time loading process -- only once the initial total has been
* set after the image has been loaded.
*/
private boolean shouldIncrementallyTrackBlocks() {
return shouldIncrementallyTrackBlocks;
}
/**
* Creates SafeModeInfo when safe mode is entered manually, or because
* available resources are low.
@ -3476,6 +3520,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
this.blockThreshold = (int) (blockTotal * threshold);
this.blockReplQueueThreshold =
(int) (blockTotal * replQueueThreshold);
if (haEnabled) {
// After we initialize the block count, any further namespace
// modifications done while in safe mode need to keep track
// of the number of total blocks in the system.
this.shouldIncrementallyTrackBlocks = true;
}
checkMode();
}
@ -3485,10 +3536,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* @param replication current replication
*/
private synchronized void incrementSafeBlockCount(short replication) {
if (replication == safeReplication)
if (replication == safeReplication) {
this.blockSafe++;
checkMode();
}
}
/**
* Decrement number of safe blocks if current block has
@ -3496,10 +3548,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* @param replication current replication
*/
private synchronized void decrementSafeBlockCount(short replication) {
if (replication == safeReplication-1)
if (replication == safeReplication-1) {
this.blockSafe--;
assert blockSafe >= 0 || isManual();
checkMode();
}
}
/**
* Check if safe mode was entered manually or automatically (at startup, or
@ -3636,6 +3690,26 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
+ "BlockManager data: active=" + activeBlocks);
}
}
private void adjustBlockTotals(int deltaSafe, int deltaTotal) {
if (!shouldIncrementallyTrackBlocks) {
return;
}
assert haEnabled;
if (LOG.isDebugEnabled()) {
LOG.debug("Adjusting block totals from " +
blockSafe + "/" + blockTotal + " to " +
(blockSafe + deltaSafe) + "/" + (blockTotal + deltaTotal));
}
assert blockSafe + deltaSafe >= 0 : "Can't reduce blockSafe " +
blockSafe + " by " + deltaSafe + ": would be negative";
assert blockTotal + deltaTotal >= 0 : "Can't reduce blockTotal " +
blockTotal + " by " + deltaTotal + ": would be negative";
blockSafe += deltaSafe;
setBlockTotal(blockTotal + deltaTotal);
}
}
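Concretely, this incremental tracking drives the safe-mode transitions asserted by testAppendWhileInSafeMode later in this commit. An assumed trace, with state shown as blockSafe/blockTotal:

// image loaded, setBlockTotal() called:               5/5
// OP_ADD re-opens the last block for append:
//   adjustBlockTotals(-1, -1)                      -> 4/4
// OP_CLOSE completes the block again (completeBlock()):
//   adjustSafeModeBlockTotals(0, +1) and
//   incrementSafeBlockCount(...)                   -> 5/5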
/**
@ -3741,8 +3815,25 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
SafeModeInfo safeMode = this.safeMode;
if (safeMode == null) // mostly true
return;
BlockInfo storedBlock = blockManager.getStoredBlock(b);
if (storedBlock.isComplete()) {
safeMode.decrementSafeBlockCount((short)blockManager.countNodes(b).liveReplicas());
}
}
/**
* Adjust the total number of blocks safe and expected during safe mode.
* If safe mode is not currently on, this is a no-op.
* @param deltaSafe the change in number of safe blocks
* @param deltaTotal the change in the number of total blocks expected
*/
public void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal) {
// safeMode is volatile, and may be set to null at any time
SafeModeInfo safeMode = this.safeMode;
if (safeMode == null)
return;
safeMode.adjustBlockTotals(deltaSafe, deltaTotal);
}
/**
* Set the total number of blocks in the system.
@ -4065,6 +4156,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return blockManager.getPostponedMisreplicatedBlocksCount();
}
@Metric
public int getPendingDataNodeMessageCount() {
return blockManager.getPendingDataNodeMessageCount();
}
@Metric
public int getBlockCapacity() {
return blockManager.getCapacity();
@ -4912,54 +5008,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
public boolean isGenStampInFuture(long genStamp) {
return (genStamp > getGenerationStamp());
}
public void notifyGenStampUpdate(long gs) {
if (LOG.isDebugEnabled()) {
LOG.debug("Generation stamp " + gs + " has been reached. " +
"Processing pending messages from DataNodes...");
}
DataNodeMessage msg = pendingDatanodeMessages.take(gs);
while (msg != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing previously pending message: " + msg);
}
try {
switch (msg.getType()) {
case BLOCK_RECEIVED_DELETE:
BlockReceivedDeleteMessage m = (BlockReceivedDeleteMessage) msg;
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog
.debug("*BLOCK* NameNode.blockReceivedAndDeleted: " + "from "
+ m.getNodeReg().getName() + " "
+ m.getReceivedAndDeletedBlocks().length + " blocks.");
}
this.getBlockManager().processIncrementalBlockReport(m.getNodeReg(),
m.getPoolId(), m.getReceivedAndDeletedBlocks());
break;
case BLOCK_REPORT:
BlockReportMessage mbr = (BlockReportMessage) msg;
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("*BLOCK* NameNode.blockReport: "
+ "from " + mbr.getNodeReg().getName() + " "
+ mbr.getBlockList().getNumberOfBlocks() + " blocks");
}
this.getBlockManager().processReport(mbr.getNodeReg(),
mbr.getPoolId(), mbr.getBlockList());
break;
case COMMIT_BLOCK_SYNCHRONIZATION:
CommitBlockSynchronizationMessage mcbm = (CommitBlockSynchronizationMessage) msg;
this.commitBlockSynchronization(mcbm.getBlock(),
mcbm.getNewgenerationstamp(), mcbm.getNewlength(),
mcbm.isCloseFile(), mcbm.isDeleteblock(), mcbm.getNewtargets());
break;
}
} catch (IOException ex) {
LOG.warn("Could not process the message " + msg.getType(), ex);
}
msg = pendingDatanodeMessages.take(gs);
}
}
@VisibleForTesting
public EditLogTailer getEditLogTailer() {
return editLogTailer;

View File: NameNodeRpcServer.java

@ -878,16 +878,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
String poolId, long[] blocks) throws IOException {
verifyRequest(nodeReg);
BlockListAsLongs blist = new BlockListAsLongs(blocks);
if (nn.isStandbyState()) {
long maxGs = blist.getMaxGsInBlockList();
if (namesystem.isGenStampInFuture(maxGs)) {
LOG.info("Required GS="+maxGs+", Queuing blockReport message");
namesystem.getPendingDataNodeMessages().queueMessage(
new PendingDataNodeMessages.BlockReportMessage(nodeReg, poolId,
blist, maxGs));
return null;
}
}
if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*BLOCK* NameNode.blockReport: "
+ "from " + nodeReg.getName() + " " + blist.getNumberOfBlocks()
@ -904,25 +894,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId,
ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks) throws IOException {
verifyRequest(nodeReg);
if (nn.isStandbyState()) {
if (receivedAndDeletedBlocks.length > 0) {
long maxGs = receivedAndDeletedBlocks[0].getBlock()
.getGenerationStamp();
for (ReceivedDeletedBlockInfo binfo : receivedAndDeletedBlocks) {
if (binfo.getBlock().getGenerationStamp() > maxGs) {
maxGs = binfo.getBlock().getGenerationStamp();
}
}
if (namesystem.isGenStampInFuture(maxGs)) {
LOG.info("Required GS=" + maxGs
+ ", Queuing blockReceivedAndDeleted message");
namesystem.getPendingDataNodeMessages().queueMessage(
new PendingDataNodeMessages.BlockReceivedDeleteMessage(nodeReg,
poolId, receivedAndDeletedBlocks, maxGs));
return;
}
}
}
if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: "
+"from "+nodeReg.getName()+" "+receivedAndDeletedBlocks.length

View File: Namesystem.java

@ -32,4 +32,10 @@ public interface Namesystem extends RwLock, SafeMode {
/** @return the block pool ID */
public String getBlockPoolId();
public boolean isInStandbyState();
public boolean isGenStampInFuture(long generationStamp);
public void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal);
}

View File: PendingDataNodeMessages.java (deleted, org.apache.hadoop.hdfs.server.namenode)

@ -1,201 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.util.PriorityQueue;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
public class PendingDataNodeMessages {
PriorityQueue<DataNodeMessage> queue = new PriorityQueue<DataNodeMessage>();
enum MessageType {
BLOCK_RECEIVED_DELETE,
BLOCK_REPORT,
COMMIT_BLOCK_SYNCHRONIZATION
}
static abstract class DataNodeMessage
implements Comparable<DataNodeMessage> {
final MessageType type;
private final long targetGs;
DataNodeMessage(MessageType type, long targetGenStamp) {
this.type = type;
this.targetGs = targetGenStamp;
}
protected MessageType getType() {
return type;
}
protected long getTargetGs() {
return targetGs;
}
public int compareTo(DataNodeMessage other) {
if (targetGs == other.targetGs) {
return 0;
} else if (targetGs < other.targetGs) {
return -1;
}
return 1;
}
}
static class BlockReceivedDeleteMessage extends DataNodeMessage {
final DatanodeRegistration nodeReg;
final String poolId;
final ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks;
BlockReceivedDeleteMessage(DatanodeRegistration nodeReg, String poolId,
ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks, long targetGs) {
super(MessageType.BLOCK_RECEIVED_DELETE, targetGs);
this.nodeReg = nodeReg;
this.poolId = poolId;
this.receivedAndDeletedBlocks = receivedAndDeletedBlocks;
}
DatanodeRegistration getNodeReg() {
return nodeReg;
}
String getPoolId() {
return poolId;
}
ReceivedDeletedBlockInfo[] getReceivedAndDeletedBlocks() {
return receivedAndDeletedBlocks;
}
public String toString() {
return "BlockReceivedDeletedMessage with " +
receivedAndDeletedBlocks.length + " blocks";
}
}
static class CommitBlockSynchronizationMessage extends DataNodeMessage {
private final ExtendedBlock block;
private final long newgenerationstamp;
private final long newlength;
private final boolean closeFile;
private final boolean deleteblock;
private final DatanodeID[] newtargets;
CommitBlockSynchronizationMessage(ExtendedBlock block,
long newgenerationstamp, long newlength, boolean closeFile,
boolean deleteblock, DatanodeID[] newtargets, long targetGenStamp) {
super(MessageType.COMMIT_BLOCK_SYNCHRONIZATION, targetGenStamp);
this.block = block;
this.newgenerationstamp = newgenerationstamp;
this.newlength = newlength;
this.closeFile = closeFile;
this.deleteblock = deleteblock;
this.newtargets = newtargets;
}
ExtendedBlock getBlock() {
return block;
}
long getNewgenerationstamp() {
return newgenerationstamp;
}
long getNewlength() {
return newlength;
}
boolean isCloseFile() {
return closeFile;
}
boolean isDeleteblock() {
return deleteblock;
}
DatanodeID[] getNewtargets() {
return newtargets;
}
public String toString() {
return "CommitBlockSynchronizationMessage for " + block;
}
}
static class BlockReportMessage extends DataNodeMessage {
private final DatanodeRegistration nodeReg;
private final String poolId;
private final BlockListAsLongs blockList;
BlockReportMessage(DatanodeRegistration nodeReg, String poolId,
BlockListAsLongs blist, long targetGenStamp) {
super(MessageType.BLOCK_REPORT, targetGenStamp);
this.nodeReg = nodeReg;
this.poolId = poolId;
this.blockList = blist;
}
DatanodeRegistration getNodeReg() {
return nodeReg;
}
String getPoolId() {
return poolId;
}
BlockListAsLongs getBlockList() {
return blockList;
}
public String toString() {
return "BlockReport from " + nodeReg + " with " + blockList.getNumberOfBlocks() + " blocks";
}
}
synchronized void queueMessage(DataNodeMessage msg) {
queue.add(msg);
}
/**
* Returns a message whose target gen stamp is less than or equal to the
* given gs, or null if no such message is queued.
*
* @param gs
*/
synchronized DataNodeMessage take(long gs) {
DataNodeMessage m = queue.peek();
if (m != null && m.getTargetGs() <= gs) {
return queue.remove();
} else {
return null;
}
}
synchronized boolean isEmpty() {
return queue.isEmpty();
}
}

View File: TestPendingDataNodeMessages.java

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.junit.Assert.*;
import java.util.Queue;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.junit.Test;
import com.google.common.base.Joiner;
public class TestPendingDataNodeMessages {
PendingDataNodeMessages msgs = new PendingDataNodeMessages();
private final Block block1Gs1 = new Block(1, 0, 1);
private final Block block1Gs2 = new Block(1, 0, 2);
private final Block block1Gs2DifferentInstance =
new Block(1, 0, 2);
private final Block block2Gs1 = new Block(2, 0, 1);
private final DatanodeDescriptor fakeDN = new DatanodeDescriptor(
new DatanodeID("fake"));
@Test
public void testQueues() {
msgs.enqueueReportedBlock(fakeDN, block1Gs1, ReplicaState.FINALIZED);
msgs.enqueueReportedBlock(fakeDN, block1Gs2, ReplicaState.FINALIZED);
assertEquals(2, msgs.count());
// Nothing queued yet for block 2
assertNull(msgs.takeBlockQueue(block2Gs1));
assertEquals(2, msgs.count());
Queue<ReportedBlockInfo> q =
msgs.takeBlockQueue(block1Gs2DifferentInstance);
assertEquals(
"ReportedBlockInfo [block=blk_1_1, dn=fake, reportedState=FINALIZED]," +
"ReportedBlockInfo [block=blk_1_2, dn=fake, reportedState=FINALIZED]",
Joiner.on(",").join(q));
assertEquals(0, msgs.count());
// Should be null if we pull again
assertNull(msgs.takeBlockQueue(block1Gs1));
assertEquals(0, msgs.count());
}
}

View File: NameNodeAdapter.java (test)

@ -30,8 +30,8 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretMan
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.SafeModeInfo;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.ipc.Server;

View File: TestDNFencing.java

@ -21,18 +21,18 @@ import static org.junit.Assert.*;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
@ -40,23 +40,29 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.FSInodeInfo;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
@ -361,6 +367,164 @@ public class TestDNFencing {
DFSTestUtil.readFile(fs2, TEST_FILE_PATH);
}
/**
* Regression test for HDFS-2742. The issue in this bug was:
* - DN does a block report while file is open. This BR contains
* the block in RBW state.
* - Standby queues the RBW state in PendingDatanodeMessages
* - Standby processes edit logs during failover. Before fixing
* this bug, it was mistakenly applying the RBW reported state
* after the block had been completed, causing the block to get
* marked corrupt. Instead, we should now be applying the RBW
* message on OP_ADD, and then the FINALIZED message on OP_CLOSE.
*/
@Test
public void testBlockReportsWhileFileBeingWritten() throws Exception {
FSDataOutputStream out = fs.create(TEST_FILE_PATH);
try {
AppendTestUtil.write(out, 0, 10);
out.hflush();
// Block report will include the RBW replica, but will be
// queued on the StandbyNode.
cluster.triggerBlockReports();
} finally {
IOUtils.closeStream(out);
}
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
// Verify that no replicas are marked corrupt, and that the
// file is readable from the failed-over standby.
BlockManagerTestUtil.updateState(nn1.getNamesystem().getBlockManager());
BlockManagerTestUtil.updateState(nn2.getNamesystem().getBlockManager());
assertEquals(0, nn1.getNamesystem().getCorruptReplicaBlocks());
assertEquals(0, nn2.getNamesystem().getCorruptReplicaBlocks());
DFSTestUtil.readFile(fs, TEST_FILE_PATH);
}
/**
* Test that, when a block is re-opened for append, the related
* datanode messages are correctly queued by the SBN because
* they have future states and genstamps.
*/
@Test
public void testQueueingWithAppend() throws Exception {
int numQueued = 0;
int numDN = cluster.getDataNodes().size();
FSDataOutputStream out = fs.create(TEST_FILE_PATH);
try {
AppendTestUtil.write(out, 0, 10);
out.hflush();
// Opening the file triggers RBW replica reports, which will be
// queued on the StandbyNode.
numQueued += numDN; // RBW messages
} finally {
IOUtils.closeStream(out);
numQueued += numDN; // blockReceived messages
}
cluster.triggerBlockReports();
numQueued += numDN;
try {
out = fs.append(TEST_FILE_PATH);
AppendTestUtil.write(out, 10, 10);
// DNs will report RBW replicas once the file is re-opened for append
numQueued += numDN;
} finally {
IOUtils.closeStream(out);
numQueued += numDN; // blockReceived
}
cluster.triggerBlockReports();
numQueued += numDN;
assertEquals(numQueued, cluster.getNameNode(1).getNamesystem().
getPendingDataNodeMessageCount());
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
// Verify that no replicas are marked corrupt, and that the
// file is readable from the failed-over standby.
BlockManagerTestUtil.updateState(nn1.getNamesystem().getBlockManager());
BlockManagerTestUtil.updateState(nn2.getNamesystem().getBlockManager());
assertEquals(0, nn1.getNamesystem().getCorruptReplicaBlocks());
assertEquals(0, nn2.getNamesystem().getCorruptReplicaBlocks());
AppendTestUtil.check(fs, TEST_FILE_PATH, 20);
}
/**
* Another regression test for HDFS-2742. This tests the following sequence:
* - DN does a block report while file is open. This BR contains
* the block in RBW state.
* - The block report is delayed in reaching the standby.
* - The file is closed.
* - The standby processes the OP_ADD and OP_CLOSE operations before
* the RBW block report arrives.
* - The standby should not mark the block as corrupt.
*/
@Test
public void testRBWReportArrivesAfterEdits() throws Exception {
final CountDownLatch brFinished = new CountDownLatch(1);
DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG) {
@Override
protected Object passThrough(InvocationOnMock invocation)
throws Throwable {
try {
return super.passThrough(invocation);
} finally {
// inform the test that our block report went through.
brFinished.countDown();
}
}
};
FSDataOutputStream out = fs.create(TEST_FILE_PATH);
try {
AppendTestUtil.write(out, 0, 10);
out.hflush();
DataNode dn = cluster.getDataNodes().get(0);
DatanodeProtocolClientSideTranslatorPB spy =
DataNodeAdapter.spyOnBposToNN(dn, nn2);
Mockito.doAnswer(delayer)
.when(spy).blockReport(
Mockito.<DatanodeRegistration>anyObject(),
Mockito.anyString(),
Mockito.<long[]>anyObject());
dn.scheduleAllBlockReport(0);
delayer.waitForCall();
} finally {
IOUtils.closeStream(out);
}
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
delayer.proceed();
brFinished.await();
// Verify that no replicas are marked corrupt, and that the
// file is readable from the failed-over standby.
BlockManagerTestUtil.updateState(nn1.getNamesystem().getBlockManager());
BlockManagerTestUtil.updateState(nn2.getNamesystem().getBlockManager());
assertEquals(0, nn1.getNamesystem().getCorruptReplicaBlocks());
assertEquals(0, nn2.getNamesystem().getCorruptReplicaBlocks());
DFSTestUtil.readFile(fs, TEST_FILE_PATH);
}
/**
* Print a big banner in the test log to make debug easier.
*/

View File: TestHASafeMode.java

@ -25,10 +25,13 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -38,15 +41,19 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
/**
* Tests that exercise safemode in an HA cluster.
@ -60,6 +67,12 @@ public class TestHASafeMode {
private MiniDFSCluster cluster;
private Runtime mockRuntime = mock(Runtime.class);
static {
((Log4JLogger)LogFactory.getLog(FSImage.class)).getLogger().setLevel(Level.ALL);
((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
}
@Before
public void setupCluster() throws Exception {
Configuration conf = new Configuration();
@ -112,7 +125,11 @@ public class TestHASafeMode {
@Test
public void testEnterSafeModeInANNShouldNotThrowNPE() throws Exception {
banner("Restarting active");
DFSTestUtil
.createFile(fs, new Path("/test"), 3 * BLOCK_SIZE, (short) 3, 1L);
restartActive();
nn0.getRpcServer().transitionToActive();
FSNamesystem namesystem = nn0.getNamesystem();
String status = namesystem.getSafemode();
assertTrue("Bad safemode status: '" + status + "'", status
@ -187,24 +204,14 @@ public class TestHASafeMode {
banner("Restarting standby");
restartStandby();
// We expect it to be stuck in safemode (not the extension) because
// the block reports are delayed (since they include blocks
// from /test2 which have too-high genstamps).
String status = nn1.getNamesystem().getSafemode();
assertTrue("Bad safemode status: '" + status + "'",
status.startsWith(
"Safe mode is ON." +
"The reported blocks 0 needs additional 3 blocks to reach"));
// We expect it not to be stuck in safemode, since those blocks
// that are already visible to the SBN should be processed
// in the initial block reports.
assertSafeMode(nn1, 3, 3);
banner("Waiting for standby to catch up to active namespace");
HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
status = nn1.getNamesystem().getSafemode();
assertTrue("Bad safemode status: '" + status + "'",
status.startsWith(
"Safe mode is ON." +
"The reported blocks 8 has reached the threshold 0.9990 of " +
"total blocks 8. Safe mode will be turned off automatically"));
assertSafeMode(nn1, 8, 8);
}
/**
@ -224,12 +231,7 @@ public class TestHASafeMode {
banner("Restarting standby");
restartStandby();
String status = nn1.getNamesystem().getSafemode();
assertTrue("Bad safemode status: '" + status + "'",
status.startsWith(
"Safe mode is ON." +
"The reported blocks 3 has reached the threshold 0.9990 of " +
"total blocks 3. Safe mode will be turned off automatically"));
assertSafeMode(nn1, 3, 3);
// Create a few blocks which will send blockReceived calls to the
// SBN.
@ -240,12 +242,7 @@ public class TestHASafeMode {
banner("Waiting for standby to catch up to active namespace");
HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
status = nn1.getNamesystem().getSafemode();
assertTrue("Bad safemode status: '" + status + "'",
status.startsWith(
"Safe mode is ON." +
"The reported blocks 8 has reached the threshold 0.9990 of " +
"total blocks 8. Safe mode will be turned off automatically"));
assertSafeMode(nn1, 8, 8);
}
/**
@ -285,20 +282,11 @@ public class TestHASafeMode {
banner("Restarting standby");
restartStandby();
String status = nn1.getNamesystem().getSafemode();
assertTrue("Bad safemode status: '" + status + "'",
status.startsWith(
"Safe mode is ON." +
"The reported blocks 0 needs additional 5 blocks to reach"));
assertSafeMode(nn1, 0, 5);
banner("Waiting for standby to catch up to active namespace");
HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
status = nn1.getNamesystem().getSafemode();
assertTrue("Bad safemode status: '" + status + "'",
status.startsWith(
"Safe mode is ON." +
"The reported blocks 0 has reached the threshold 0.9990 of " +
"total blocks 0. Safe mode will be turned off automatically"));
assertSafeMode(nn1, 0, 0);
}
/**
@ -320,12 +308,7 @@ public class TestHASafeMode {
restartStandby();
// It will initially have all of the blocks necessary.
String status = nn1.getNamesystem().getSafemode();
assertTrue("Bad safemode status: '" + status + "'",
status.startsWith(
"Safe mode is ON." +
"The reported blocks 10 has reached the threshold 0.9990 of " +
"total blocks 10. Safe mode will be turned off automatically"));
assertSafeMode(nn1, 10, 10);
// Delete those blocks while the SBN is in safe mode - this
// should reduce it back below the threshold
@ -339,21 +322,121 @@ public class TestHASafeMode {
HATestUtil.waitForDNDeletions(cluster);
cluster.triggerDeletionReports();
status = nn1.getNamesystem().getSafemode();
assertTrue("Bad safemode status: '" + status + "'",
status.startsWith(
"Safe mode is ON." +
"The reported blocks 0 needs additional 10 blocks"));
assertSafeMode(nn1, 0, 10);
banner("Waiting for standby to catch up to active namespace");
HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
status = nn1.getNamesystem().getSafemode();
assertSafeMode(nn1, 0, 0);
}
/**
* Tests that the standby node properly tracks the number of total
* and safe blocks while it is in safe mode. Since safe-mode only
* counts completed blocks, append needs to decrement the total
* number of blocks and then re-increment when the file is closed
* again.
*/
@Test
public void testAppendWhileInSafeMode() throws Exception {
banner("Starting with NN0 active and NN1 standby, creating some blocks");
// Make 4.5 blocks so that append() will re-open an existing block
// instead of just adding a new one
DFSTestUtil.createFile(fs, new Path("/test"),
4*BLOCK_SIZE + BLOCK_SIZE/2, (short) 3, 1L);
// Roll edit log so that, when the SBN restarts, it will load
// the namespace during startup.
nn0.getRpcServer().rollEditLog();
banner("Restarting standby");
restartStandby();
// It will initially have all of the blocks necessary.
assertSafeMode(nn1, 5, 5);
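// All 5 blocks, including the partially-filled last one, are COMPLETE
// at this point, so they all count as safe.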
// Append to a block while SBN is in safe mode. This should
// not affect safemode initially, since the DN message
// will get queued.
FSDataOutputStream stm = fs.append(new Path("/test"));
try {
assertSafeMode(nn1, 5, 5);
// If we roll the edits now, the SBN should see that the block is under
// construction and decrement both its total and safe counts by one,
// since UC blocks are not counted by safe mode.
HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
assertSafeMode(nn1, 4, 4);
} finally {
IOUtils.closeStream(stm);
}
// Delete those blocks while the SBN is in safe mode - this
// should reduce it back below the threshold
banner("Removing the blocks without rolling the edit log");
fs.delete(new Path("/test"), true);
BlockManagerTestUtil.computeAllPendingWork(
nn0.getNamesystem().getBlockManager());
banner("Triggering deletions on DNs and Deletion Reports");
cluster.triggerHeartbeats();
HATestUtil.waitForDNDeletions(cluster);
cluster.triggerDeletionReports();
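// The SBN's namespace still reflects the edits up through the append
// (4 total blocks), but the queued deletion reports have dropped the
// safe count to 0.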
assertSafeMode(nn1, 0, 4);
banner("Waiting for standby to catch up to active namespace");
HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
assertSafeMode(nn1, 0, 0);
}
/**
* Regression test for a bug encountered while developing
* HDFS-2742. The scenario here is:
* - the image contains some blocks
* - the edit log contains at least one block addition, followed
* by the deletion of more blocks than were added
* - when the node started up, incorrect accounting of block
* totals caused an assertion failure
*/
@Test
public void testBlocksDeletedInEditLog() throws Exception {
banner("Starting with NN0 active and NN1 standby, creating some blocks");
// Make 4 blocks persisted in the image.
DFSTestUtil.createFile(fs, new Path("/test"),
4*BLOCK_SIZE, (short) 3, 1L);
NameNodeAdapter.enterSafeMode(nn0, false);
NameNodeAdapter.saveNamespace(nn0);
NameNodeAdapter.leaveSafeMode(nn0, false);
// OP_ADD for 2 blocks
DFSTestUtil.createFile(fs, new Path("/test2"),
2*BLOCK_SIZE, (short) 3, 1L);
// OP_DELETE for 4 blocks
fs.delete(new Path("/test"), true);
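// Restarting the active NN replays the saved image plus these edits;
// before the fix, the incorrect block-total accounting tripped an
// assertion during startup.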
restartActive();
}
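/**
* Assert that the given NN is in safe mode, reporting the given
* number of safe blocks out of the given total.
*/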
private void assertSafeMode(NameNode nn, int safe, int total) {
String status = nn.getNamesystem().getSafemode();
if (safe == total) {
assertTrue("Bad safemode status: '" + status + "'",
status.startsWith(
"Safe mode is ON." +
"The reported blocks " + safe + " has reached the threshold " +
"0.9990 of total blocks " + total + ". Safe mode will be " +
"turned off automatically"));
} else {
int additional = total - safe;
assertTrue("Bad safemode status: '" + status + "'",
status.startsWith(
"Safe mode is ON." +
"The reported blocks " + safe + " needs additional " +
additional + " blocks"));
}
}
/**
@ -378,26 +461,107 @@ public class TestHASafeMode {
banner("Restarting standby");
restartStandby();
// We expect it to be on its way out of safemode, since all of the blocks
// from the edit log have been reported.
assertSafeMode(nn1, 3, 3);
// Initiate a failover into it while it's in safemode
banner("Initiating a failover into NN1 in safemode");
NameNodeAdapter.abortEditLogs(nn0);
cluster.transitionToActive(1);
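// Once NN1 becomes active it processes the queued reports for the
// /test2 blocks whose genstamps were ahead of its namespace, bringing
// it to 5 safe out of 5 total blocks.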
assertSafeMode(nn1, 5, 5);
}
/**
* Similar to {@link #testBlocksRemovedWhileInSafeMode()} except that
* the OP_DELETE edits arrive at the SBN before the block deletion reports.
* The tracking of safe blocks needs to properly account for the removal
* of the blocks from the total count as well as from the safe count.
* This is a regression test for HDFS-2742.
*/
@Test
public void testBlocksRemovedWhileInSafeModeEditsArriveFirst() throws Exception {
banner("Starting with NN0 active and NN1 standby, creating some blocks");
DFSTestUtil.createFile(fs, new Path("/test"), 10*BLOCK_SIZE, (short) 3, 1L);
// Roll edit log so that, when the SBN restarts, it will load
// the namespace during startup.
nn0.getRpcServer().rollEditLog();
banner("Restarting standby");
restartStandby();
// It will initially have all of the blocks necessary.
String status = nn1.getNamesystem().getSafemode();
assertTrue("Bad safemode status: '" + status + "'",
status.startsWith(
"Safe mode is ON." +
"The reported blocks 5 has reached the threshold 0.9990 of " +
"total blocks 5. Safe mode will be turned off automatically"));
"The reported blocks 10 has reached the threshold 0.9990 of " +
"total blocks 10. Safe mode will be turned off automatically"));
// Delete those blocks while the SBN is in safe mode.
// Immediately roll the edit log before the actual deletions are sent
// to the DNs.
banner("Removing the blocks without rolling the edit log");
fs.delete(new Path("/test"), true);
HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
// The SBN should see the blocks removed from the total count as well
// as from the safe block count.
assertSafeMode(nn1, 0, 0);
banner("Triggering sending deletions to DNs and Deletion Reports");
BlockManagerTestUtil.computeAllPendingWork(
nn0.getNamesystem().getBlockManager());
cluster.triggerHeartbeats();
HATestUtil.waitForDNDeletions(cluster);
cluster.triggerDeletionReports();
// No change in the assertion here, but the consistency checks in
// safemode will fire if we accidentally decrement the safe block
// count below 0.
assertSafeMode(nn1, 0, 0);
}
/**
* Test that the number of safe blocks is accounted for correctly even when
* blocks move between under-construction state and completed state.
* If a FINALIZED report arrives at the SBN before the block is marked
* COMPLETE, then when we get the OP_CLOSE we need to count it as "safe"
* at that point. This is a regression test for HDFS-2742.
*/
@Test
public void testSafeBlockTracking() throws Exception {
banner("Starting with NN0 active and NN1 standby, creating some " +
"UC blocks plus some other blocks to force safemode");
DFSTestUtil.createFile(fs, new Path("/other-blocks"), 10*BLOCK_SIZE, (short) 3, 1L);
List<FSDataOutputStream> stms = Lists.newArrayList();
try {
for (int i = 0; i < 5; i++) {
FSDataOutputStream stm = fs.create(new Path("/test-uc-" + i));
stms.add(stm);
stm.write(1);
stm.hflush();
}
// Roll edit log so that, when the SBN restarts, it will load
// the namespace during startup and enter safemode.
nn0.getRpcServer().rollEditLog();
} finally {
for (FSDataOutputStream stm : stms) {
IOUtils.closeStream(stm);
}
}
banner("Restarting SBN");
restartStandby();
assertSafeMode(nn1, 10, 10);
banner("Allowing SBN to catch up");
HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
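// Reading the OP_CLOSE edits marks the 5 formerly-UC blocks COMPLETE,
// so they now count as safe in addition to the original 10.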
assertSafeMode(nn1, 15, 15);
}
/**
@ -425,12 +589,7 @@ public class TestHASafeMode {
nn0.getRpcServer().rollEditLog();
restartStandby();
assertSafeMode(nn1, 6, 6);
}
/**