HDFS-4987. Namenode changes to track multiple storages per datanode.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1518087 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2013-08-28 06:30:15 +00:00
parent 8a9db6782e
commit 5d9d702607
16 changed files with 458 additions and 340 deletions

View File

@ -0,0 +1,4 @@
BREAKDOWN OF HDFS-2832 ENABLE SUPPORT FOR HETEROGENEOUS STORAGES IN HDFS
HDFS-4987. Namenode changes to track multiple storages per datanode.
(szetszwo)

View File

@ -21,6 +21,7 @@ import java.util.LinkedList;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.util.LightWeightGSet;
@ -39,11 +40,11 @@ public class BlockInfo extends Block implements LightWeightGSet.LinkedElement {
private LightWeightGSet.LinkedElement nextLinkedElement;
/**
* This array contains triplets of references. For each i-th datanode the
* block belongs to triplets[3*i] is the reference to the DatanodeDescriptor
* and triplets[3*i+1] and triplets[3*i+2] are references to the previous and
* the next blocks, respectively, in the list of blocks belonging to this
* data-node.
* This array contains triplets of references. For each i-th storage, the
* block belongs to triplets[3*i] is the reference to the
* {@link DatanodeStorageInfo} and triplets[3*i+1] and triplets[3*i+2] are
* references to the previous and the next blocks, respectively, in the list
* of blocks belonging to this storage.
*
* Using previous and next in Object triplets is done instead of a
* {@link LinkedList} list to efficiently use memory. With LinkedList the cost
@ -86,9 +87,15 @@ public class BlockInfo extends Block implements LightWeightGSet.LinkedElement {
}
DatanodeDescriptor getDatanode(int index) {
DatanodeStorageInfo storage = getStorageInfo(index);
return storage == null ? null : storage.getDatanodeDescriptor();
}
DatanodeStorageInfo getStorageInfo(int index) {
assert this.triplets != null : "BlockInfo is not initialized";
assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
return (DatanodeDescriptor)triplets[index*3];
DatanodeStorageInfo storage = (DatanodeStorageInfo)triplets[index*3];
return storage;
}
private BlockInfo getPrevious(int index) {
@ -111,14 +118,10 @@ public class BlockInfo extends Block implements LightWeightGSet.LinkedElement {
return info;
}
private void setDatanode(int index, DatanodeDescriptor node, BlockInfo previous,
BlockInfo next) {
void setStorageInfo(int index, DatanodeStorageInfo storage) {
assert this.triplets != null : "BlockInfo is not initialized";
int i = index * 3;
assert index >= 0 && i+2 < triplets.length : "Index is out of bound";
triplets[i] = node;
triplets[i+1] = previous;
triplets[i+2] = next;
assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
triplets[index*3] = storage;
}
/**
@ -190,22 +193,24 @@ public class BlockInfo extends Block implements LightWeightGSet.LinkedElement {
}
/**
* Add data-node this block belongs to.
* Add a {@link DatanodeStorageInfo} location for a block
*/
public boolean addNode(DatanodeDescriptor node) {
if(findDatanode(node) >= 0) // the node is already there
boolean addStorage(DatanodeStorageInfo storage) {
if(findStorageInfo(storage) >= 0) // the node is already there
return false;
// find the last null node
int lastNode = ensureCapacity(1);
setDatanode(lastNode, node, null, null);
setStorageInfo(lastNode, storage);
setNext(lastNode, null);
setPrevious(lastNode, null);
return true;
}
/**
* Remove data-node from the block.
* Remove {@link DatanodeStorageInfo} location for a block
*/
public boolean removeNode(DatanodeDescriptor node) {
int dnIndex = findDatanode(node);
boolean removeStorage(DatanodeStorageInfo storage) {
int dnIndex = findStorageInfo(storage);
if(dnIndex < 0) // the node is not found
return false;
assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
@ -213,10 +218,13 @@ public class BlockInfo extends Block implements LightWeightGSet.LinkedElement {
// find the last not null node
int lastNode = numNodes()-1;
// replace current node triplet by the lastNode one
setDatanode(dnIndex, getDatanode(lastNode), getPrevious(lastNode),
getNext(lastNode));
setStorageInfo(dnIndex, getStorageInfo(lastNode));
setNext(dnIndex, getNext(lastNode));
setPrevious(dnIndex, getPrevious(lastNode));
// set the last triplet to null
setDatanode(lastNode, null, null, null);
setStorageInfo(lastNode, null);
setNext(lastNode, null);
setPrevious(lastNode, null);
return true;
}
@ -236,37 +244,70 @@ public class BlockInfo extends Block implements LightWeightGSet.LinkedElement {
}
return -1;
}
/**
* Find specified DatanodeStorageInfo.
* @param dn
* @return index or -1 if not found.
*/
int findStorageInfo(DatanodeInfo dn) {
int len = getCapacity();
for(int idx = 0; idx < len; idx++) {
DatanodeStorageInfo cur = getStorageInfo(idx);
if(cur == null)
break;
if(cur.getDatanodeDescriptor() == dn)
return idx;
}
return -1;
}
/**
* Find specified DatanodeStorageInfo.
* @param storageInfo
* @return index or -1 if not found.
*/
int findStorageInfo(DatanodeStorageInfo storageInfo) {
int len = getCapacity();
for(int idx = 0; idx < len; idx++) {
DatanodeStorageInfo cur = getStorageInfo(idx);
if(cur == storageInfo)
return idx;
if(cur == null)
break;
}
return -1;
}
/**
* Insert this block into the head of the list of blocks
* related to the specified DatanodeDescriptor.
* related to the specified DatanodeStorageInfo.
* If the head is null then form a new list.
* @return current block as the new head of the list.
*/
public BlockInfo listInsert(BlockInfo head, DatanodeDescriptor dn) {
int dnIndex = this.findDatanode(dn);
BlockInfo listInsert(BlockInfo head, DatanodeStorageInfo storage) {
int dnIndex = this.findStorageInfo(storage);
assert dnIndex >= 0 : "Data node is not found: current";
assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
"Block is already in the list and cannot be inserted.";
this.setPrevious(dnIndex, null);
this.setNext(dnIndex, head);
if(head != null)
head.setPrevious(head.findDatanode(dn), this);
head.setPrevious(head.findStorageInfo(storage), this);
return this;
}
/**
* Remove this block from the list of blocks
* related to the specified DatanodeDescriptor.
* related to the specified DatanodeStorageInfo.
* If this block is the head of the list then return the next block as
* the new head.
* @return the new head of the list or null if the list becomes
* empty after deletion.
* empy after deletion.
*/
public BlockInfo listRemove(BlockInfo head, DatanodeDescriptor dn) {
BlockInfo listRemove(BlockInfo head, DatanodeStorageInfo storage) {
if(head == null)
return null;
int dnIndex = this.findDatanode(dn);
int dnIndex = this.findStorageInfo(storage);
if(dnIndex < 0) // this block is not on the data-node list
return head;
@ -275,33 +316,20 @@ public class BlockInfo extends Block implements LightWeightGSet.LinkedElement {
this.setNext(dnIndex, null);
this.setPrevious(dnIndex, null);
if(prev != null)
prev.setNext(prev.findDatanode(dn), next);
prev.setNext(prev.findStorageInfo(storage), next);
if(next != null)
next.setPrevious(next.findDatanode(dn), prev);
next.setPrevious(next.findStorageInfo(storage), prev);
if(this == head) // removing the head
head = next;
return head;
}
/**
* Remove this block from the list of blocks related to the specified
* DatanodeDescriptor. Insert it into the head of the list of blocks.
*
* @return the new head of the list.
*/
public BlockInfo moveBlockToHead(BlockInfo head, DatanodeDescriptor dn,
int curIndex, int headIndex) {
if (head == this) {
return this;
}
BlockInfo next = this.setNext(curIndex, head);
BlockInfo prev = this.setPrevious(curIndex, null);
head.setPrevious(headIndex, this);
prev.setNext(prev.findDatanode(dn), next);
if (next != null)
next.setPrevious(next.findDatanode(dn), prev);
return this;
int listCount(DatanodeStorageInfo storage) {
int count = 0;
for(BlockInfo cur = this; cur != null;
cur = cur.getNext(cur.findStorageInfo(storage)))
count++;
return count;
}
/**

View File

@ -33,6 +33,7 @@ import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
@ -68,8 +69,10 @@ import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
@ -1034,7 +1037,7 @@ public class BlockManager {
* for logging purposes
*/
public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
final DatanodeInfo dn, String reason) throws IOException {
final DatanodeInfo dn, String storageID, String reason) throws IOException {
assert namesystem.hasWriteLock();
final BlockInfo storedBlock = getStoredBlock(blk.getLocalBlock());
if (storedBlock == null) {
@ -1046,11 +1049,11 @@ public class BlockManager {
+ blk + " not found");
return;
}
markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason), dn);
markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason), dn, storageID);
}
private void markBlockAsCorrupt(BlockToMarkCorrupt b,
DatanodeInfo dn) throws IOException {
DatanodeInfo dn, String storageID) throws IOException {
DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
if (node == null) {
throw new IOException("Cannot mark " + b
@ -1066,7 +1069,7 @@ public class BlockManager {
}
// Add replica to the data-node if it is not already there
node.addBlock(b.stored);
node.addBlock(storageID, b.stored);
// Add this replica to corruptReplicas Map
corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason);
@ -1601,10 +1604,11 @@ public class BlockManager {
}
/**
* The given datanode is reporting all its blocks.
* Update the (machine-->blocklist) and (block-->machinelist) maps.
* The given storage is reporting all its blocks.
* Update the (storage-->block list) and (block-->storage list) maps.
*/
public void processReport(final DatanodeID nodeID, final String poolId,
public void processReport(final DatanodeID nodeID,
final DatanodeStorage storage, final String poolId,
final BlockListAsLongs newReport) throws IOException {
namesystem.writeLock();
final long startTime = Time.now(); //after acquiring write lock
@ -1628,9 +1632,9 @@ public class BlockManager {
if (node.numBlocks() == 0) {
// The first block report can be processed a lot more efficiently than
// ordinary block reports. This shortens restart times.
processFirstBlockReport(node, newReport);
processFirstBlockReport(node, storage.getStorageID(), newReport);
} else {
processReport(node, newReport);
processReport(node, storage, newReport);
}
// Now that we have an up-to-date block report, we know that any
@ -1691,28 +1695,31 @@ public class BlockManager {
}
private void processReport(final DatanodeDescriptor node,
final DatanodeStorage storage,
final BlockListAsLongs report) throws IOException {
// Normal case:
// Modify the (block-->datanode) map, according to the difference
// between the old and new block report.
//
Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
Collection<Block> toRemove = new LinkedList<Block>();
Collection<Block> toRemove = new TreeSet<Block>();
Collection<Block> toInvalidate = new LinkedList<Block>();
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
reportDiff(node, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC);
reportDiff(node, storage, report,
toAdd, toRemove, toInvalidate, toCorrupt, toUC);
// Process the blocks on each queue
for (StatefulBlockInfo b : toUC) {
addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState);
addStoredBlockUnderConstruction(b.storedBlock, node,
storage.getStorageID(), b.reportedState);
}
for (Block b : toRemove) {
removeStoredBlock(b, node);
}
int numBlocksLogged = 0;
for (BlockInfo b : toAdd) {
addStoredBlock(b, node, null, numBlocksLogged < maxNumBlocksToLog);
addStoredBlock(b, node, storage.getStorageID(), null, numBlocksLogged < maxNumBlocksToLog);
numBlocksLogged++;
}
if (numBlocksLogged > maxNumBlocksToLog) {
@ -1726,7 +1733,7 @@ public class BlockManager {
addToInvalidates(b, node);
}
for (BlockToMarkCorrupt b : toCorrupt) {
markBlockAsCorrupt(b, node);
markBlockAsCorrupt(b, node, storage.getStorageID());
}
}
@ -1742,6 +1749,7 @@ public class BlockManager {
* @throws IOException
*/
private void processFirstBlockReport(final DatanodeDescriptor node,
final String storageID,
final BlockListAsLongs report) throws IOException {
if (report == null) return;
assert (namesystem.hasWriteLock());
@ -1754,7 +1762,7 @@ public class BlockManager {
if (shouldPostponeBlocksFromFuture &&
namesystem.isGenStampInFuture(iblk)) {
queueReportedBlock(node, iblk, reportedState,
queueReportedBlock(node, storageID, iblk, reportedState,
QUEUE_REASON_FUTURE_GENSTAMP);
continue;
}
@ -1771,10 +1779,10 @@ public class BlockManager {
if (shouldPostponeBlocksFromFuture) {
// In the Standby, we may receive a block report for a file that we
// just have an out-of-date gen-stamp or state for, for example.
queueReportedBlock(node, iblk, reportedState,
queueReportedBlock(node, storageID, iblk, reportedState,
QUEUE_REASON_CORRUPT_STATE);
} else {
markBlockAsCorrupt(c, node);
markBlockAsCorrupt(c, node, storageID);
}
continue;
}
@ -1787,25 +1795,26 @@ public class BlockManager {
}
//add replica if appropriate
if (reportedState == ReplicaState.FINALIZED) {
addStoredBlockImmediate(storedBlock, node);
addStoredBlockImmediate(storedBlock, node, storageID);
}
}
}
private void reportDiff(DatanodeDescriptor dn,
private void reportDiff(DatanodeDescriptor dn, DatanodeStorage storage,
BlockListAsLongs newReport,
Collection<BlockInfo> toAdd, // add to DatanodeDescriptor
Collection<Block> toRemove, // remove from DatanodeDescriptor
Collection<Block> toInvalidate, // should be removed from DN
Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
Collection<StatefulBlockInfo> toUC) { // add to under-construction list
// place a delimiter in the list which separates blocks
// that have been reported from those that have not
BlockInfo delimiter = new BlockInfo(new Block(), 1);
boolean added = dn.addBlock(delimiter);
assert added : "Delimiting block cannot be present in the node";
int headIndex = 0; //currently the delimiter is in the head of the list
int curIndex;
dn.updateStorage(storage);
// add all blocks to remove list
for(Iterator<BlockInfo> it = dn.getBlockIterator(storage.getStorageID());
it.hasNext(); ) {
toRemove.add(it.next());
}
if (newReport == null)
newReport = new BlockListAsLongs();
@ -1814,20 +1823,10 @@ public class BlockManager {
while(itBR.hasNext()) {
Block iblk = itBR.next();
ReplicaState iState = itBR.getCurrentReplicaState();
BlockInfo storedBlock = processReportedBlock(dn, iblk, iState,
toAdd, toInvalidate, toCorrupt, toUC);
// move block to the head of the list
if (storedBlock != null && (curIndex = storedBlock.findDatanode(dn)) >= 0) {
headIndex = dn.moveBlockToHead(storedBlock, curIndex, headIndex);
}
BlockInfo storedBlock = processReportedBlock(dn, storage.getStorageID(),
iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
toRemove.remove(storedBlock);
}
// collect blocks that have not been reported
// all of them are next to the delimiter
Iterator<? extends Block> it = new DatanodeDescriptor.BlockIterator(
delimiter.getNext(0), dn);
while(it.hasNext())
toRemove.add(it.next());
dn.removeBlock(delimiter);
}
/**
@ -1862,6 +1861,7 @@ public class BlockManager {
* Otherwise, null.
*/
private BlockInfo processReportedBlock(final DatanodeDescriptor dn,
final String storageID,
final Block block, final ReplicaState reportedState,
final Collection<BlockInfo> toAdd,
final Collection<Block> toInvalidate,
@ -1876,7 +1876,7 @@ public class BlockManager {
if (shouldPostponeBlocksFromFuture &&
namesystem.isGenStampInFuture(block)) {
queueReportedBlock(dn, block, reportedState,
queueReportedBlock(dn, storageID, block, reportedState,
QUEUE_REASON_FUTURE_GENSTAMP);
return null;
}
@ -1911,7 +1911,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
// If the block is an out-of-date generation stamp or state,
// but we're the standby, we shouldn't treat it as corrupt,
// but instead just queue it for later processing.
queueReportedBlock(dn, storedBlock, reportedState,
queueReportedBlock(dn, storageID, storedBlock, reportedState,
QUEUE_REASON_CORRUPT_STATE);
} else {
toCorrupt.add(c);
@ -1938,7 +1938,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
* standby node. @see PendingDataNodeMessages.
* @param reason a textual reason to report in the debug logs
*/
private void queueReportedBlock(DatanodeDescriptor dn, Block block,
private void queueReportedBlock(DatanodeDescriptor dn, String storageID, Block block,
ReplicaState reportedState, String reason) {
assert shouldPostponeBlocksFromFuture;
@ -1948,7 +1948,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
" from datanode " + dn + " for later processing " +
"because " + reason + ".");
}
pendingDNMessages.enqueueReportedBlock(dn, block, reportedState);
pendingDNMessages.enqueueReportedBlock(dn, storageID, block, reportedState);
}
/**
@ -1971,8 +1971,8 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
if (LOG.isDebugEnabled()) {
LOG.debug("Processing previouly queued message " + rbi);
}
processAndHandleReportedBlock(
rbi.getNode(), rbi.getBlock(), rbi.getReportedState(), null);
processAndHandleReportedBlock(rbi.getNode(), rbi.getStorageID(),
rbi.getBlock(), rbi.getReportedState(), null);
}
}
@ -2090,18 +2090,18 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
void addStoredBlockUnderConstruction(
BlockInfoUnderConstruction block,
DatanodeDescriptor node,
DatanodeDescriptor node, String storageID,
ReplicaState reportedState)
throws IOException {
block.addReplicaIfNotPresent(node, block, reportedState);
if (reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) {
addStoredBlock(block, node, null, true);
addStoredBlock(block, node, storageID, null, true);
}
}
/**
* Faster version of
* {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, DatanodeDescriptor, boolean)}
* {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, String, DatanodeDescriptor, boolean)}
* , intended for use with initial block report at startup. If not in startup
* safe mode, will call standard addStoredBlock(). Assumes this method is
* called "immediately" so there is no need to refresh the storedBlock from
@ -2112,17 +2112,17 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
* @throws IOException
*/
private void addStoredBlockImmediate(BlockInfo storedBlock,
DatanodeDescriptor node)
DatanodeDescriptor node, String storageID)
throws IOException {
assert (storedBlock != null && namesystem.hasWriteLock());
if (!namesystem.isInStartupSafeMode()
|| namesystem.isPopulatingReplQueues()) {
addStoredBlock(storedBlock, node, null, false);
addStoredBlock(storedBlock, node, storageID, null, false);
return;
}
// just add it
node.addBlock(storedBlock);
node.addBlock(storageID, storedBlock);
// Now check for completion of blocks and safe block count
int numCurrentReplica = countLiveNodes(storedBlock);
@ -2145,6 +2145,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
*/
private Block addStoredBlock(final BlockInfo block,
DatanodeDescriptor node,
String storageID,
DatanodeDescriptor delNodeHint,
boolean logEveryBlock)
throws IOException {
@ -2170,7 +2171,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
assert bc != null : "Block must belong to a file";
// add block to the datanode
boolean added = node.addBlock(storedBlock);
boolean added = node.addBlock(storageID, storedBlock);
int curReplicaDelta;
if (added) {
@ -2614,7 +2615,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
* The given node is reporting that it received a certain block.
*/
@VisibleForTesting
void addBlock(DatanodeDescriptor node, Block block, String delHint)
void addBlock(DatanodeDescriptor node, String storageID, Block block, String delHint)
throws IOException {
// decrement number of blocks scheduled to this datanode.
// for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with
@ -2635,11 +2636,12 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
// Modify the blocks->datanode map and node's map.
//
pendingReplications.decrement(block, node);
processAndHandleReportedBlock(node, block, ReplicaState.FINALIZED,
processAndHandleReportedBlock(node, storageID, block, ReplicaState.FINALIZED,
delHintNode);
}
private void processAndHandleReportedBlock(DatanodeDescriptor node, Block block,
private void processAndHandleReportedBlock(DatanodeDescriptor node,
String storageID, Block block,
ReplicaState reportedState, DatanodeDescriptor delHintNode)
throws IOException {
// blockReceived reports a finalized block
@ -2647,7 +2649,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
Collection<Block> toInvalidate = new LinkedList<Block>();
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
processReportedBlock(node, block, reportedState,
processReportedBlock(node, storageID, block, reportedState,
toAdd, toInvalidate, toCorrupt, toUC);
// the block is only in one of the to-do lists
// if it is in none then data-node already has it
@ -2655,11 +2657,11 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
: "The block should be only in one of the lists.";
for (StatefulBlockInfo b : toUC) {
addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState);
addStoredBlockUnderConstruction(b.storedBlock, node, storageID, b.reportedState);
}
long numBlocksLogged = 0;
for (BlockInfo b : toAdd) {
addStoredBlock(b, node, delHintNode, numBlocksLogged < maxNumBlocksToLog);
addStoredBlock(b, node, storageID, delHintNode, numBlocksLogged < maxNumBlocksToLog);
numBlocksLogged++;
}
if (numBlocksLogged > maxNumBlocksToLog) {
@ -2673,7 +2675,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
addToInvalidates(b, node);
}
for (BlockToMarkCorrupt b : toCorrupt) {
markBlockAsCorrupt(b, node);
markBlockAsCorrupt(b, node, storageID);
}
}
@ -2685,7 +2687,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
* This method must be called with FSNamesystem lock held.
*/
public void processIncrementalBlockReport(final DatanodeID nodeID,
final String poolId, final ReceivedDeletedBlockInfo blockInfos[])
final String poolId, final StorageReceivedDeletedBlocks srdb)
throws IOException {
assert namesystem.hasWriteLock();
int received = 0;
@ -2701,19 +2703,19 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
"Got incremental block report from unregistered or dead node");
}
for (ReceivedDeletedBlockInfo rdbi : blockInfos) {
for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
switch (rdbi.getStatus()) {
case DELETED_BLOCK:
removeStoredBlock(rdbi.getBlock(), node);
deleted++;
break;
case RECEIVED_BLOCK:
addBlock(node, rdbi.getBlock(), rdbi.getDelHints());
addBlock(node, srdb.getStorageID(), rdbi.getBlock(), rdbi.getDelHints());
received++;
break;
case RECEIVING_BLOCK:
receiving++;
processAndHandleReportedBlock(node, rdbi.getBlock(),
processAndHandleReportedBlock(node, srdb.getStorageID(), rdbi.getBlock(),
ReplicaState.RBW, null);
break;
default:

View File

@ -18,15 +18,20 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.apache.hadoop.util.Time;
@ -93,8 +98,9 @@ public class DatanodeDescriptor extends DatanodeInfo {
}
}
private volatile BlockInfo blockList = null;
private int numBlocks = 0;
private final Map<String, DatanodeStorageInfo> storageMap =
new HashMap<String, DatanodeStorageInfo>();
// isAlive == heartbeats.contains(this)
// This is an optimization, because contains takes O(n) time on Arraylist
public boolean isAlive = false;
@ -217,39 +223,47 @@ public class DatanodeDescriptor extends DatanodeInfo {
}
/**
* Add datanode to the block.
* Add block to the head of the list of blocks belonging to the data-node.
* Add data-node to the block. Add block to the head of the list of blocks
* belonging to the data-node.
*/
public boolean addBlock(BlockInfo b) {
if(!b.addNode(this))
return false;
// add to the head of the data-node list
blockList = b.listInsert(blockList, this);
numBlocks++;
return true;
}
/**
* Remove block from the list of blocks belonging to the data-node.
* Remove datanode from the block.
*/
public boolean removeBlock(BlockInfo b) {
blockList = b.listRemove(blockList, this);
if ( b.removeNode(this) ) {
numBlocks--;
return true;
} else {
return false;
public boolean addBlock(String storageID, BlockInfo b) {
DatanodeStorageInfo s = getStorageInfo(storageID);
if (s != null) {
return s.addBlock(b);
}
return false;
}
DatanodeStorageInfo getStorageInfo(String storageID) {
return storageMap.get(storageID);
}
public Collection<DatanodeStorageInfo> getStorageInfos() {
return storageMap.values();
}
/**
* Move block to the head of the list of blocks belonging to the data-node.
* @return the index of the head of the blockList
* Remove block from the list of blocks belonging to the data-node. Remove
* data-node from the block.
*/
int moveBlockToHead(BlockInfo b, int curIndex, int headIndex) {
blockList = b.moveBlockToHead(blockList, this, curIndex, headIndex);
return curIndex;
boolean removeBlock(BlockInfo b) {
int index = b.findStorageInfo(this);
DatanodeStorageInfo s = b.getStorageInfo(index);
if (s != null) {
return s.removeBlock(b);
}
return false;
}
/**
* Remove block from the list of blocks belonging to the data-node. Remove
* data-node from the block.
*/
boolean removeBlock(String storageID, BlockInfo b) {
DatanodeStorageInfo s = getStorageInfo(storageID);
if (s != null) {
return s.removeBlock(b);
}
return false;
}
/**
@ -257,7 +271,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
* @return the head of the blockList
*/
protected BlockInfo getHead(){
return blockList;
return getBlockIterator().next();
}
/**
@ -268,9 +282,12 @@ public class DatanodeDescriptor extends DatanodeInfo {
* @return the new block
*/
public BlockInfo replaceBlock(BlockInfo oldBlock, BlockInfo newBlock) {
boolean done = removeBlock(oldBlock);
int index = oldBlock.findStorageInfo(this);
DatanodeStorageInfo s = oldBlock.getStorageInfo(index);
boolean done = s.removeBlock(oldBlock);
assert done : "Old block should belong to the data-node when replacing";
done = addBlock(newBlock);
done = s.addBlock(newBlock);
assert done : "New block should not belong to the data-node when replacing";
return newBlock;
}
@ -281,7 +298,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
setBlockPoolUsed(0);
setDfsUsed(0);
setXceiverCount(0);
this.blockList = null;
this.invalidateBlocks.clear();
this.volumeFailures = 0;
}
@ -295,7 +311,12 @@ public class DatanodeDescriptor extends DatanodeInfo {
}
public int numBlocks() {
return numBlocks;
// TODO: synchronization
int blocks = 0;
for (DatanodeStorageInfo entry : storageMap.values()) {
blocks += entry.numBlocks();
}
return blocks;
}
/**
@ -314,38 +335,52 @@ public class DatanodeDescriptor extends DatanodeInfo {
rollBlocksScheduled(getLastUpdate());
}
/**
* Iterates over the list of blocks belonging to the datanode.
*/
public static class BlockIterator implements Iterator<BlockInfo> {
private BlockInfo current;
private DatanodeDescriptor node;
private static class BlockIterator implements Iterator<BlockInfo> {
private final int maxIndex;
private int index = 0;
private List<Iterator<BlockInfo>> iterators = new ArrayList<Iterator<BlockInfo>>();
BlockIterator(BlockInfo head, DatanodeDescriptor dn) {
this.current = head;
this.node = dn;
private BlockIterator(final Iterable<DatanodeStorageInfo> storages) {
for (DatanodeStorageInfo e : storages) {
iterators.add(e.getBlockIterator());
}
maxIndex = iterators.size() - 1;
}
private BlockIterator(final DatanodeStorageInfo storage) {
iterators.add(storage.getBlockIterator());
maxIndex = iterators.size() - 1;
}
@Override
public boolean hasNext() {
return current != null;
update();
return iterators.get(index).hasNext();
}
@Override
public BlockInfo next() {
BlockInfo res = current;
current = current.getNext(current.findDatanode(node));
return res;
update();
return iterators.get(index).next();
}
@Override
public void remove() {
throw new UnsupportedOperationException("Sorry. can't remove.");
public void remove() {
throw new UnsupportedOperationException("Remove unsupported.");
}
private void update() {
while(index < maxIndex && !iterators.get(index).hasNext()) {
index++;
}
}
}
public Iterator<BlockInfo> getBlockIterator() {
return new BlockIterator(this.blockList, this);
Iterator<BlockInfo> getBlockIterator() {
return new BlockIterator(storageMap.values());
}
Iterator<BlockInfo> getBlockIterator(final String storageID) {
return new BlockIterator(storageMap.get(storageID));
}
/**
@ -601,4 +636,15 @@ public class DatanodeDescriptor extends DatanodeInfo {
}
return sb.toString();
}
DatanodeStorageInfo updateStorage(DatanodeStorage s) {
DatanodeStorageInfo storage = getStorageInfo(s.getStorageID());
if (storage == null) {
storage = new DatanodeStorageInfo(this, s);
storageMap.put(s.getStorageID(), storage);
} else {
storage.setState(s.getState());
}
return storage;
}
}

View File

@ -0,0 +1,142 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.Iterator;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
/**
* A Datanode has one or more storages. A storage in the Datanode is represented
* by this class.
*/
public class DatanodeStorageInfo {
/**
* Iterates over the list of blocks belonging to the data-node.
*/
static class BlockIterator implements Iterator<BlockInfo> {
private BlockInfo current;
private DatanodeStorageInfo node;
BlockIterator(BlockInfo head, DatanodeStorageInfo dn) {
this.current = head;
this.node = dn;
}
public boolean hasNext() {
return current != null;
}
public BlockInfo next() {
BlockInfo res = current;
current = current.getNext(current.findStorageInfo(node));
return res;
}
public void remove() {
throw new UnsupportedOperationException("Sorry. can't remove.");
}
}
private final DatanodeDescriptor dn;
private final String storageID;
private final StorageType storageType;
private State state;
private long capacity;
private long dfsUsed;
private long remaining;
private volatile BlockInfo blockList = null;
DatanodeStorageInfo(DatanodeDescriptor dn, DatanodeStorage s) {
this.dn = dn;
this.storageID = s.getStorageID();
this.storageType = s.getStorageType();
this.state = s.getState();
}
public void setUtilization(long capacity, long dfsUsed, long remaining) {
this.capacity = capacity;
this.dfsUsed = dfsUsed;
this.remaining = remaining;
}
public void setState(State s) {
this.state = s;
// TODO: if goes to failed state cleanup the block list
}
public State getState() {
return this.state;
}
public String getStorageID() {
return storageID;
}
public long getCapacity() {
return capacity;
}
public long getDfsUsed() {
return dfsUsed;
}
public long getRemaining() {
return remaining;
}
public boolean addBlock(BlockInfo b) {
if(!b.addStorage(this))
return false;
// add to the head of the data-node list
blockList = b.listInsert(blockList, this);
return true;
}
public boolean removeBlock(BlockInfo b) {
blockList = b.listRemove(blockList, this);
return b.removeStorage(this);
}
public int numBlocks() {
return blockList == null ? 0 : blockList.listCount(this);
}
Iterator<BlockInfo> getBlockIterator() {
return new BlockIterator(this.blockList, this);
}
public void updateState(StorageReport r) {
capacity = r.getCapacity();
dfsUsed = r.getDfsUsed();
remaining = r.getRemaining();
}
public DatanodeDescriptor getDatanodeDescriptor() {
return dn;
}
@Override
public String toString() {
return "[" + storageType + "]" + storageID + ":" + state;
}
}

View File

@ -42,11 +42,13 @@ class PendingDataNodeMessages {
static class ReportedBlockInfo {
private final Block block;
private final DatanodeDescriptor dn;
private final String storageID;
private final ReplicaState reportedState;
ReportedBlockInfo(DatanodeDescriptor dn, Block block,
ReportedBlockInfo(DatanodeDescriptor dn, String storageID, Block block,
ReplicaState reportedState) {
this.dn = dn;
this.storageID = storageID;
this.block = block;
this.reportedState = reportedState;
}
@ -59,6 +61,10 @@ class PendingDataNodeMessages {
return dn;
}
String getStorageID() {
return storageID;
}
ReplicaState getReportedState() {
return reportedState;
}
@ -70,11 +76,11 @@ class PendingDataNodeMessages {
}
}
void enqueueReportedBlock(DatanodeDescriptor dn, Block block,
void enqueueReportedBlock(DatanodeDescriptor dn, String storageID, Block block,
ReplicaState reportedState) {
block = new Block(block);
getBlockQueue(block).add(
new ReportedBlockInfo(dn, block, reportedState));
new ReportedBlockInfo(dn, storageID, block, reportedState));
count++;
}

View File

@ -162,7 +162,13 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.blockmanagement.*;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
import org.apache.hadoop.hdfs.server.blockmanagement.OutOfV1GenerationStampsException;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
@ -174,14 +180,7 @@ import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Status;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
@ -193,6 +192,12 @@ import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottab
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithSnapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Status;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -201,7 +206,7 @@ import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RetryCache;
@ -3785,7 +3790,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
// Otherwise fsck will report these blocks as MISSING, especially if the
// blocksReceived from Datanodes take a long time to arrive.
for (int i = 0; i < descriptors.length; i++) {
descriptors[i].addBlock(storedBlock);
descriptors[i].addBlock(newtargetstorages[i], storedBlock);
}
}
// add pipeline locations into the INodeUnderConstruction
@ -5088,11 +5093,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
public void processIncrementalBlockReport(final DatanodeID nodeID,
final String poolId, final ReceivedDeletedBlockInfo blockInfos[])
final String poolId, final StorageReceivedDeletedBlocks srdb)
throws IOException {
writeLock();
try {
blockManager.processIncrementalBlockReport(nodeID, poolId, blockInfos);
blockManager.processIncrementalBlockReport(nodeID, poolId, srdb);
} finally {
writeUnlock();
}
@ -5578,8 +5583,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
ExtendedBlock blk = blocks[i].getBlock();
DatanodeInfo[] nodes = blocks[i].getLocations();
for (int j = 0; j < nodes.length; j++) {
DatanodeInfo dn = nodes[j];
blockManager.findAndMarkBlockAsCorrupt(blk, dn,
//TODO: add "storageID to LocatedBlock
blockManager.findAndMarkBlockAsCorrupt(blk, nodes[j], "STORAGE_ID",
"client machine reported it");
}
}

View File

@ -42,8 +42,8 @@ import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
@ -59,21 +59,21 @@ import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
@ -89,9 +89,10 @@ import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
@ -943,14 +944,18 @@ class NameNodeRpcServer implements NamenodeProtocols {
public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
String poolId, StorageBlockReport[] reports) throws IOException {
verifyRequest(nodeReg);
BlockListAsLongs blist = new BlockListAsLongs(reports[0].getBlocks());
if(blockStateChangeLog.isDebugEnabled()) {
blockStateChangeLog.debug("*BLOCK* NameNode.blockReport: "
+ "from " + nodeReg + " " + blist.getNumberOfBlocks()
+ " blocks");
+ "from " + nodeReg + ", reports.length=" + reports.length);
}
final BlockManager bm = namesystem.getBlockManager();
for(StorageBlockReport r : reports) {
final BlockListAsLongs blocks = new BlockListAsLongs(r.getBlocks());
bm.processReport(nodeReg, r.getStorage(), poolId, blocks);
}
namesystem.getBlockManager().processReport(nodeReg, poolId, blist);
DatanodeDescriptor datanode = bm.getDatanodeManager().getDatanode(nodeReg);
datanode.receivedBlockReport();
if (nn.getFSImage().isUpgradeFinalized() && !nn.isStandbyState())
return new FinalizeCommand(poolId);
return null;
@ -965,8 +970,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
+"from "+nodeReg+" "+receivedAndDeletedBlocks.length
+" blocks.");
}
namesystem.processIncrementalBlockReport(
nodeReg, poolId, receivedAndDeletedBlocks[0].getBlocks());
for(StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) {
namesystem.processIncrementalBlockReport(nodeReg, poolId, r);
}
}
@Override // DatanodeProtocol

View File

@ -157,7 +157,7 @@ public class TestFileCorruption {
ns.writeLock();
try {
cluster.getNamesystem().getBlockManager().findAndMarkBlockAsCorrupt(
blk, new DatanodeInfo(dnR), "TEST");
blk, new DatanodeInfo(dnR), "TEST", "STORAGE_ID");
} finally {
ns.writeUnlock();
}

View File

@ -1,124 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.junit.Test;
/**
* This class provides tests for BlockInfo class, which is used in BlocksMap.
* The test covers BlockList.listMoveToHead, used for faster block report
* processing in DatanodeDescriptor.reportDiff.
*/
public class TestBlockInfo {
private static final Log LOG = LogFactory
.getLog("org.apache.hadoop.hdfs.TestBlockInfo");
@Test
public void testBlockListMoveToHead() throws Exception {
LOG.info("BlockInfo moveToHead tests...");
final int MAX_BLOCKS = 10;
DatanodeDescriptor dd = DFSTestUtil.getLocalDatanodeDescriptor();
ArrayList<Block> blockList = new ArrayList<Block>(MAX_BLOCKS);
ArrayList<BlockInfo> blockInfoList = new ArrayList<BlockInfo>();
int headIndex;
int curIndex;
LOG.info("Building block list...");
for (int i = 0; i < MAX_BLOCKS; i++) {
blockList.add(new Block(i, 0, GenerationStamp.LAST_RESERVED_STAMP));
blockInfoList.add(new BlockInfo(blockList.get(i), 3));
dd.addBlock(blockInfoList.get(i));
// index of the datanode should be 0
assertEquals("Find datanode should be 0", 0, blockInfoList.get(i)
.findDatanode(dd));
}
// list length should be equal to the number of blocks we inserted
LOG.info("Checking list length...");
assertEquals("Length should be MAX_BLOCK", MAX_BLOCKS, dd.numBlocks());
Iterator<BlockInfo> it = dd.getBlockIterator();
int len = 0;
while (it.hasNext()) {
it.next();
len++;
}
assertEquals("There should be MAX_BLOCK blockInfo's", MAX_BLOCKS, len);
headIndex = dd.getHead().findDatanode(dd);
LOG.info("Moving each block to the head of the list...");
for (int i = 0; i < MAX_BLOCKS; i++) {
curIndex = blockInfoList.get(i).findDatanode(dd);
headIndex = dd.moveBlockToHead(blockInfoList.get(i), curIndex, headIndex);
// the moved element must be at the head of the list
assertEquals("Block should be at the head of the list now.",
blockInfoList.get(i), dd.getHead());
}
// move head of the list to the head - this should not change the list
LOG.info("Moving head to the head...");
BlockInfo temp = dd.getHead();
curIndex = 0;
headIndex = 0;
dd.moveBlockToHead(temp, curIndex, headIndex);
assertEquals(
"Moving head to the head of the list shopuld not change the list",
temp, dd.getHead());
// check all elements of the list against the original blockInfoList
LOG.info("Checking elements of the list...");
temp = dd.getHead();
assertNotNull("Head should not be null", temp);
int c = MAX_BLOCKS - 1;
while (temp != null) {
assertEquals("Expected element is not on the list",
blockInfoList.get(c--), temp);
temp = temp.getNext(0);
}
LOG.info("Moving random blocks to the head of the list...");
headIndex = dd.getHead().findDatanode(dd);
Random rand = new Random();
for (int i = 0; i < MAX_BLOCKS; i++) {
int j = rand.nextInt(MAX_BLOCKS);
curIndex = blockInfoList.get(j).findDatanode(dd);
headIndex = dd.moveBlockToHead(blockInfoList.get(j), curIndex, headIndex);
// the moved element must be at the head of the list
assertEquals("Block should be at the head of the list now.",
blockInfoList.get(j), dd.getHead());
}
}
}

View File

@ -124,7 +124,7 @@ public class TestBlockManager {
private void doBasicTest(int testIndex) {
List<DatanodeDescriptor> origNodes = getNodes(0, 1);
BlockInfo blockInfo = addBlockOnNodes((long)testIndex, origNodes);
BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
assertEquals(2, pipeline.length);
@ -307,7 +307,7 @@ public class TestBlockManager {
private void doTestSufficientlyReplBlocksUsesNewRack(int testIndex) {
// Originally on only nodes in rack A.
List<DatanodeDescriptor> origNodes = rackA;
BlockInfo blockInfo = addBlockOnNodes((long)testIndex, origNodes);
BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
DatanodeDescriptor pipeline[] = scheduleSingleReplication(blockInfo);
assertEquals(2, pipeline.length); // single new copy
@ -340,7 +340,7 @@ public class TestBlockManager {
List<DatanodeDescriptor> origNodes)
throws Exception {
assertEquals(0, bm.numOfUnderReplicatedBlocks());
addBlockOnNodes((long)testIndex, origNodes);
addBlockOnNodes(testIndex, origNodes);
bm.processMisReplicatedBlocks();
assertEquals(0, bm.numOfUnderReplicatedBlocks());
}
@ -353,7 +353,7 @@ public class TestBlockManager {
private void fulfillPipeline(BlockInfo blockInfo,
DatanodeDescriptor[] pipeline) throws IOException {
for (int i = 1; i < pipeline.length; i++) {
bm.addBlock(pipeline[i], blockInfo, null);
bm.addBlock(pipeline[i], "STORAGE_ID", blockInfo, null);
}
}
@ -362,7 +362,9 @@ public class TestBlockManager {
BlockInfo blockInfo = new BlockInfo(block, 3);
for (DatanodeDescriptor dn : nodes) {
blockInfo.addNode(dn);
for (DatanodeStorageInfo storage : dn.getStorageInfos()) {
blockInfo.addStorage(storage);
}
}
return blockInfo;
}
@ -508,12 +510,12 @@ public class TestBlockManager {
assertTrue(node.isFirstBlockReport());
// send block report, should be processed
reset(node);
bm.processReport(node, "pool", new BlockListAsLongs(null, null));
bm.processReport(node, null, "pool", new BlockListAsLongs(null, null));
verify(node).receivedBlockReport();
assertFalse(node.isFirstBlockReport());
// send block report again, should NOT be processed
reset(node);
bm.processReport(node, "pool", new BlockListAsLongs(null, null));
bm.processReport(node, null, "pool", new BlockListAsLongs(null, null));
verify(node, never()).receivedBlockReport();
assertFalse(node.isFirstBlockReport());
@ -525,7 +527,7 @@ public class TestBlockManager {
assertTrue(node.isFirstBlockReport()); // ready for report again
// send block report, should be processed after restart
reset(node);
bm.processReport(node, "pool", new BlockListAsLongs(null, null));
bm.processReport(node, null, "pool", new BlockListAsLongs(null, null));
verify(node).receivedBlockReport();
assertFalse(node.isFirstBlockReport());
}
@ -550,7 +552,7 @@ public class TestBlockManager {
// send block report while pretending to already have blocks
reset(node);
doReturn(1).when(node).numBlocks();
bm.processReport(node, "pool", new BlockListAsLongs(null, null));
bm.processReport(node, null, "pool", new BlockListAsLongs(null, null));
verify(node).receivedBlockReport();
assertFalse(node.isFirstBlockReport());
}

View File

@ -59,17 +59,18 @@ public class TestDatanodeDescriptor {
assertEquals(0, dd.numBlocks());
BlockInfo blk = new BlockInfo(new Block(1L), 1);
BlockInfo blk1 = new BlockInfo(new Block(2L), 2);
final String storageID = "STORAGE_ID";
// add first block
assertTrue(dd.addBlock(blk));
assertTrue(dd.addBlock(storageID, blk));
assertEquals(1, dd.numBlocks());
// remove a non-existent block
assertFalse(dd.removeBlock(blk1));
assertEquals(1, dd.numBlocks());
// add an existent block
assertFalse(dd.addBlock(blk));
assertFalse(dd.addBlock(storageID, blk));
assertEquals(1, dd.numBlocks());
// add second block
assertTrue(dd.addBlock(blk1));
assertTrue(dd.addBlock(storageID, blk1));
assertEquals(2, dd.numBlocks());
// remove first block
assertTrue(dd.removeBlock(blk));

View File

@ -43,8 +43,8 @@ public class TestPendingDataNodeMessages {
@Test
public void testQueues() {
DatanodeDescriptor fakeDN = DFSTestUtil.getLocalDatanodeDescriptor();
msgs.enqueueReportedBlock(fakeDN, block1Gs1, ReplicaState.FINALIZED);
msgs.enqueueReportedBlock(fakeDN, block1Gs2, ReplicaState.FINALIZED);
msgs.enqueueReportedBlock(fakeDN, "STORAGE_ID", block1Gs1, ReplicaState.FINALIZED);
msgs.enqueueReportedBlock(fakeDN, "STORAGE_ID", block1Gs2, ReplicaState.FINALIZED);
assertEquals(2, msgs.count());

View File

@ -291,9 +291,9 @@ public class TestPendingReplication {
cluster.getNamesystem().writeLock();
try {
bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0],
"TEST");
"STORAGE_ID", "TEST");
bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[1],
"TEST");
"STORAGE_ID", "TEST");
} finally {
cluster.getNamesystem().writeUnlock();
}

View File

@ -1096,7 +1096,7 @@ public class TestReplicationPolicy {
// Adding this block will increase its current replication, and that will
// remove it from the queue.
bm.addStoredBlockUnderConstruction(info,
TestReplicationPolicy.dataNodes[0], ReplicaState.FINALIZED);
TestReplicationPolicy.dataNodes[0], "STORAGE", ReplicaState.FINALIZED);
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED.

View File

@ -235,7 +235,7 @@ public class TestNameNodeMetrics {
cluster.getNamesystem().writeLock();
try {
bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0],
"TEST");
"STORAGE_ID", "TEST");
} finally {
cluster.getNamesystem().writeUnlock();
}
@ -286,7 +286,7 @@ public class TestNameNodeMetrics {
cluster.getNamesystem().writeLock();
try {
bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0],
"TEST");
"STORAGE_ID", "TEST");
} finally {
cluster.getNamesystem().writeUnlock();
}