svn merge -c 1161992 from trunk for HDFS-395.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23-PB@1229881 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-01-11 03:36:36 +00:00
parent eaa9af5abf
commit 8eed021a88
13 changed files with 297 additions and 116 deletions

View File

@ -2,6 +2,11 @@ Hadoop HDFS Change Log
Release 0.23-PB - Unreleased Release 0.23-PB - Unreleased
NEW FEATURES
HDFS-395. DFS Scalability: Incremental block reports. (Tomasz Nykiel
via hairong)
IMPROVEMENTS IMPROVEMENTS
HDFS-2018. Move all journal stream management code into one place. HDFS-2018. Move all journal stream management code into one place.

View File

@ -59,10 +59,12 @@ import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction; import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.net.Node; import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
@ -2006,7 +2008,7 @@ public class BlockManager {
* Modify (block-->datanode) map. Possibly generate replication tasks, if the * Modify (block-->datanode) map. Possibly generate replication tasks, if the
* removed block is still valid. * removed block is still valid.
*/ */
private void removeStoredBlock(Block block, DatanodeDescriptor node) { public void removeStoredBlock(Block block, DatanodeDescriptor node) {
if(NameNode.stateChangeLog.isDebugEnabled()) { if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* removeStoredBlock: " NameNode.stateChangeLog.debug("BLOCK* removeStoredBlock: "
+ block + " from " + node.getName()); + block + " from " + node.getName());
@ -2125,27 +2127,48 @@ public class BlockManager {
} }
} }
/** The given node is reporting that it received a certain block. */ /** The given node is reporting that it received/deleted certain blocks. */
public void blockReceived(final DatanodeID nodeID, final String poolId, public void blockReceivedAndDeleted(final DatanodeID nodeID,
final Block block, final String delHint) throws IOException { final String poolId,
final ReceivedDeletedBlockInfo receivedAndDeletedBlocks[]
) throws IOException {
namesystem.writeLock(); namesystem.writeLock();
int received = 0;
int deleted = 0;
try { try {
final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID); final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
if (node == null || !node.isAlive) { if (node == null || !node.isAlive) {
final String s = block + " is received from dead or unregistered node " NameNode.stateChangeLog
+ nodeID.getName(); .warn("BLOCK* blockReceivedDeleted"
NameNode.stateChangeLog.warn("BLOCK* blockReceived: " + s); + " is received from dead or unregistered node "
throw new IOException(s); + nodeID.getName());
throw new IOException(
"Got blockReceivedDeleted message from unregistered or dead node");
} }
for (int i = 0; i < receivedAndDeletedBlocks.length; i++) {
if (receivedAndDeletedBlocks[i].isDeletedBlock()) {
removeStoredBlock(
receivedAndDeletedBlocks[i].getBlock(), node);
deleted++;
} else {
addBlock(node, receivedAndDeletedBlocks[i].getBlock(),
receivedAndDeletedBlocks[i].getDelHints());
received++;
}
if (NameNode.stateChangeLog.isDebugEnabled()) { if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* blockReceived: " + block NameNode.stateChangeLog.debug("BLOCK* block"
+ (receivedAndDeletedBlocks[i].isDeletedBlock() ? "Deleted"
: "Received") + ": " + receivedAndDeletedBlocks[i].getBlock()
+ " is received from " + nodeID.getName()); + " is received from " + nodeID.getName());
} }
}
addBlock(node, block, delHint);
} finally { } finally {
namesystem.writeUnlock(); namesystem.writeUnlock();
NameNode.stateChangeLog
.debug("*BLOCK* NameNode.blockReceivedAndDeleted: " + "from "
+ nodeID.getName() + " received: " + received + ", "
+ " deleted: " + deleted);
} }
} }
@ -2320,6 +2343,7 @@ public class BlockManager {
} }
public void removeBlock(Block block) { public void removeBlock(Block block) {
block.setNumBytes(BlockCommand.NO_ACK);
addToInvalidates(block); addToInvalidates(block);
corruptReplicas.removeFromCorruptReplicasMap(block); corruptReplicas.removeFromCorruptReplicasMap(block);
blocksMap.removeBlock(block); blocksMap.removeBlock(block);

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand; import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
@ -87,6 +88,7 @@ class BPOfferService implements Runnable {
DatanodeRegistration bpRegistration; DatanodeRegistration bpRegistration;
long lastBlockReport = 0; long lastBlockReport = 0;
long lastDeletedReport = 0;
boolean resetBlockReportTime = true; boolean resetBlockReportTime = true;
@ -94,8 +96,9 @@ class BPOfferService implements Runnable {
DatanodeProtocol bpNamenode; DatanodeProtocol bpNamenode;
private long lastHeartbeat = 0; private long lastHeartbeat = 0;
private volatile boolean initialized = false; private volatile boolean initialized = false;
private final LinkedList<Block> receivedBlockList = new LinkedList<Block>(); private final LinkedList<ReceivedDeletedBlockInfo> receivedAndDeletedBlockList
private final LinkedList<String> delHints = new LinkedList<String>(); = new LinkedList<ReceivedDeletedBlockInfo>();
private volatile int pendingReceivedRequests = 0;
private volatile boolean shouldServiceRun = true; private volatile boolean shouldServiceRun = true;
UpgradeManagerDatanode upgradeManager = null; UpgradeManagerDatanode upgradeManager = null;
private final DataNode dn; private final DataNode dn;
@ -271,39 +274,30 @@ class BPOfferService implements Runnable {
* Report received blocks and delete hints to the Namenode * Report received blocks and delete hints to the Namenode
* @throws IOException * @throws IOException
*/ */
private void reportReceivedBlocks() throws IOException { private void reportReceivedDeletedBlocks() throws IOException {
// check if there are newly received blocks // check if there are newly received blocks
Block [] blockArray=null; ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null;
String [] delHintArray=null; int currentReceivedRequestsCounter;
synchronized(receivedBlockList) { synchronized (receivedAndDeletedBlockList) {
synchronized(delHints){ currentReceivedRequestsCounter = pendingReceivedRequests;
int numBlocks = receivedBlockList.size(); int numBlocks = receivedAndDeletedBlockList.size();
if (numBlocks > 0) { if (numBlocks > 0) {
if(numBlocks!=delHints.size()) {
LOG.warn("Panic: receiveBlockList and delHints are not of " +
"the same length" );
}
// //
// Send newly-received blockids to namenode // Send newly-received and deleted blockids to namenode
// //
blockArray = receivedBlockList.toArray(new Block[numBlocks]); receivedAndDeletedBlockArray = receivedAndDeletedBlockList
delHintArray = delHints.toArray(new String[numBlocks]); .toArray(new ReceivedDeletedBlockInfo[numBlocks]);
} }
} }
if (receivedAndDeletedBlockArray != null) {
bpNamenode.blockReceivedAndDeleted(bpRegistration, getBlockPoolId(),
receivedAndDeletedBlockArray);
synchronized (receivedAndDeletedBlockList) {
for (int i = 0; i < receivedAndDeletedBlockArray.length; i++) {
receivedAndDeletedBlockList.remove(receivedAndDeletedBlockArray[i]);
} }
if (blockArray != null) { pendingReceivedRequests -= currentReceivedRequestsCounter;
if(delHintArray == null || delHintArray.length != blockArray.length ) {
LOG.warn("Panic: block array & delHintArray are not the same" );
}
bpNamenode.blockReceived(bpRegistration, getBlockPoolId(), blockArray,
delHintArray);
synchronized(receivedBlockList) {
synchronized(delHints){
for(int i=0; i<blockArray.length; i++) {
receivedBlockList.remove(blockArray[i]);
delHints.remove(delHintArray[i]);
}
}
} }
} }
} }
@ -315,8 +309,8 @@ class BPOfferService implements Runnable {
*/ */
void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) { void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
if (block == null || delHint == null) { if (block == null || delHint == null) {
throw new IllegalArgumentException( throw new IllegalArgumentException(block == null ? "Block is null"
block==null?"Block is null":"delHint is null"); : "delHint is null");
} }
if (!block.getBlockPoolId().equals(getBlockPoolId())) { if (!block.getBlockPoolId().equals(getBlockPoolId())) {
@ -325,13 +319,29 @@ class BPOfferService implements Runnable {
return; return;
} }
synchronized (receivedBlockList) { synchronized (receivedAndDeletedBlockList) {
synchronized (delHints) { receivedAndDeletedBlockList.add(new ReceivedDeletedBlockInfo(block
receivedBlockList.add(block.getLocalBlock()); .getLocalBlock(), delHint));
delHints.add(delHint); pendingReceivedRequests++;
receivedBlockList.notifyAll(); receivedAndDeletedBlockList.notifyAll();
} }
} }
void notifyNamenodeDeletedBlock(ExtendedBlock block) {
if (block == null) {
throw new IllegalArgumentException("Block is null");
}
if (!block.getBlockPoolId().equals(getBlockPoolId())) {
LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + " vs. "
+ getBlockPoolId());
return;
}
synchronized (receivedAndDeletedBlockList) {
receivedAndDeletedBlockList.add(new ReceivedDeletedBlockInfo(block
.getLocalBlock(), ReceivedDeletedBlockInfo.TODELETE_HINT));
}
} }
@ -443,7 +453,8 @@ class BPOfferService implements Runnable {
* forever calling remote NameNode functions. * forever calling remote NameNode functions.
*/ */
private void offerService() throws Exception { private void offerService() throws Exception {
LOG.info("For namenode " + nnAddr + " using BLOCKREPORT_INTERVAL of " LOG.info("For namenode " + nnAddr + " using DELETEREPORT_INTERVAL of "
+ dnConf.deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of "
+ dnConf.blockReportInterval + "msec" + " Initial delay: " + dnConf.blockReportInterval + "msec" + " Initial delay: "
+ dnConf.initialBlockReportDelay + "msec" + "; heartBeatInterval=" + dnConf.initialBlockReportDelay + "msec" + "; heartBeatInterval="
+ dnConf.heartBeatInterval); + dnConf.heartBeatInterval);
@ -481,8 +492,11 @@ class BPOfferService implements Runnable {
} }
} }
} }
if (pendingReceivedRequests > 0
reportReceivedBlocks(); || (startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
reportReceivedDeletedBlocks();
lastDeletedReport = startTime;
}
DatanodeCommand cmd = blockReport(); DatanodeCommand cmd = blockReport();
processCommand(cmd); processCommand(cmd);
@ -498,10 +512,10 @@ class BPOfferService implements Runnable {
// //
long waitTime = dnConf.heartBeatInterval - long waitTime = dnConf.heartBeatInterval -
(System.currentTimeMillis() - lastHeartbeat); (System.currentTimeMillis() - lastHeartbeat);
synchronized(receivedBlockList) { synchronized(receivedAndDeletedBlockList) {
if (waitTime > 0 && receivedBlockList.size() == 0) { if (waitTime > 0 && pendingReceivedRequests == 0) {
try { try {
receivedBlockList.wait(waitTime); receivedAndDeletedBlockList.wait(waitTime);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
LOG.warn("BPOfferService for " + this + " interrupted"); LOG.warn("BPOfferService for " + this + " interrupted");
} }

View File

@ -55,6 +55,7 @@ class DNConf {
final long readaheadLength; final long readaheadLength;
final long heartBeatInterval; final long heartBeatInterval;
final long blockReportInterval; final long blockReportInterval;
final long deleteReportInterval;
final long initialBlockReportDelay; final long initialBlockReportDelay;
final int writePacketSize; final int writePacketSize;
@ -105,6 +106,7 @@ class DNConf {
heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY, heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L; DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L;
this.deleteReportInterval = 100 * heartBeatInterval;
// do we need to sync block file contents to disk when blockfile is closed? // do we need to sync block file contents to disk when blockfile is closed?
this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY, this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY,
DFS_DATANODE_SYNCONCLOSE_DEFAULT); DFS_DATANODE_SYNCONCLOSE_DEFAULT);

View File

@ -129,6 +129,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocolR23Compatible.InterDatanodeProtocolServerSideTranslatorR23; import org.apache.hadoop.hdfs.server.protocolR23Compatible.InterDatanodeProtocolServerSideTranslatorR23;
import org.apache.hadoop.hdfs.server.protocolR23Compatible.InterDatanodeProtocolTranslatorR23; import org.apache.hadoop.hdfs.server.protocolR23Compatible.InterDatanodeProtocolTranslatorR23;
@ -659,6 +660,17 @@ public class DataNode extends Configured
} }
} }
// calls specific to BP
protected void notifyNamenodeDeletedBlock(ExtendedBlock block) {
BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
if (bpos != null) {
bpos.notifyNamenodeDeletedBlock(block);
} else {
LOG.warn("Cannot find BPOfferService for reporting block deleted for bpid="
+ block.getBlockPoolId());
}
}
public void reportBadBlocks(ExtendedBlock block) throws IOException{ public void reportBadBlocks(ExtendedBlock block) throws IOException{
BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId()); BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
if(bpos == null || bpos.bpNamenode == null) { if(bpos == null || bpos.bpNamenode == null) {

View File

@ -1182,7 +1182,7 @@ public class FSDataset implements FSDatasetInterface {
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) { for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
roots[idx] = storage.getStorageDir(idx).getCurrentDir(); roots[idx] = storage.getStorageDir(idx).getCurrentDir();
} }
asyncDiskService = new FSDatasetAsyncDiskService(roots); asyncDiskService = new FSDatasetAsyncDiskService(this, roots);
registerMBean(storage.getStorageID()); registerMBean(storage.getStorageID());
} }
@ -2118,14 +2118,18 @@ public class FSDataset implements FSDatasetInterface {
File metaFile = getMetaFile(f, invalidBlks[i].getGenerationStamp()); File metaFile = getMetaFile(f, invalidBlks[i].getGenerationStamp());
// Delete the block asynchronously to make sure we can do it fast enough // Delete the block asynchronously to make sure we can do it fast enough
asyncDiskService.deleteAsync(v, bpid, f, metaFile, asyncDiskService.deleteAsync(v, f, metaFile,
invalidBlks[i].toString()); new ExtendedBlock(bpid, invalidBlks[i]));
} }
if (error) { if (error) {
throw new IOException("Error in deleting blocks."); throw new IOException("Error in deleting blocks.");
} }
} }
public void notifyNamenodeDeletedBlock(ExtendedBlock block){
datanode.notifyNamenodeDeletedBlock(block);
}
/** /**
* Turn the block identifier into a filename; ignore generation stamp!!! * Turn the block identifier into a filename; ignore generation stamp!!!
*/ */

View File

@ -28,6 +28,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
/* /*
* This class is a container of multiple thread pools, each for a volume, * This class is a container of multiple thread pools, each for a volume,
@ -47,6 +49,8 @@ import org.apache.commons.logging.LogFactory;
*/ */
class FSDatasetAsyncDiskService { class FSDatasetAsyncDiskService {
final FSDataset dataset;
public static final Log LOG = LogFactory.getLog(FSDatasetAsyncDiskService.class); public static final Log LOG = LogFactory.getLog(FSDatasetAsyncDiskService.class);
// ThreadPool core pool size // ThreadPool core pool size
@ -70,8 +74,8 @@ class FSDatasetAsyncDiskService {
* *
* @param volumes The roots of the data volumes. * @param volumes The roots of the data volumes.
*/ */
FSDatasetAsyncDiskService(File[] volumes) { FSDatasetAsyncDiskService(FSDataset dataset, File[] volumes) {
this.dataset = dataset;
// Create one ThreadPool per volume // Create one ThreadPool per volume
for (int v = 0 ; v < volumes.length; v++) { for (int v = 0 ; v < volumes.length; v++) {
final File vol = volumes[v]; final File vol = volumes[v];
@ -147,13 +151,12 @@ class FSDatasetAsyncDiskService {
* Delete the block file and meta file from the disk asynchronously, adjust * Delete the block file and meta file from the disk asynchronously, adjust
* dfsUsed statistics accordingly. * dfsUsed statistics accordingly.
*/ */
void deleteAsync(FSDataset.FSVolume volume, String bpid, File blockFile, void deleteAsync(FSDataset.FSVolume volume, File blockFile, File metaFile,
File metaFile, String blockName) { ExtendedBlock block) {
DataNode.LOG.info("Scheduling block " + blockName + " file " + blockFile DataNode.LOG.info("Scheduling block " + block.getLocalBlock().toString()
+ " for deletion"); + " file " + blockFile + " for deletion");
ReplicaFileDeleteTask deletionTask = ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(dataset,
new ReplicaFileDeleteTask(volume, bpid, blockFile, metaFile, volume, blockFile, metaFile, block);
blockName);
execute(volume.getCurrentDir(), deletionTask); execute(volume.getCurrentDir(), deletionTask);
} }
@ -161,19 +164,19 @@ class FSDatasetAsyncDiskService {
* as decrement the dfs usage of the volume. * as decrement the dfs usage of the volume.
*/ */
static class ReplicaFileDeleteTask implements Runnable { static class ReplicaFileDeleteTask implements Runnable {
final FSDataset dataset;
final FSDataset.FSVolume volume; final FSDataset.FSVolume volume;
final String blockPoolId;
final File blockFile; final File blockFile;
final File metaFile; final File metaFile;
final String blockName; final ExtendedBlock block;
ReplicaFileDeleteTask(FSDataset.FSVolume volume, String bpid, ReplicaFileDeleteTask(FSDataset dataset, FSDataset.FSVolume volume, File blockFile,
File blockFile, File metaFile, String blockName) { File metaFile, ExtendedBlock block) {
this.dataset = dataset;
this.volume = volume; this.volume = volume;
this.blockPoolId = bpid;
this.blockFile = blockFile; this.blockFile = blockFile;
this.metaFile = metaFile; this.metaFile = metaFile;
this.blockName = blockName; this.block = block;
} }
FSDataset.FSVolume getVolume() { FSDataset.FSVolume getVolume() {
@ -183,9 +186,9 @@ class FSDatasetAsyncDiskService {
@Override @Override
public String toString() { public String toString() {
// Called in AsyncDiskService.execute for displaying error messages. // Called in AsyncDiskService.execute for displaying error messages.
return "deletion of block " + blockPoolId + " " + blockName return "deletion of block " + block.getBlockPoolId() + " "
+ " with block file " + blockFile + " and meta file " + metaFile + block.getLocalBlock().toString() + " with block file " + blockFile
+ " from volume " + volume; + " and meta file " + metaFile + " from volume " + volume;
} }
@Override @Override
@ -193,12 +196,15 @@ class FSDatasetAsyncDiskService {
long dfsBytes = blockFile.length() + metaFile.length(); long dfsBytes = blockFile.length() + metaFile.length();
if ( !blockFile.delete() || ( !metaFile.delete() && metaFile.exists() ) ) { if ( !blockFile.delete() || ( !metaFile.delete() && metaFile.exists() ) ) {
DataNode.LOG.warn("Unexpected error trying to delete block " DataNode.LOG.warn("Unexpected error trying to delete block "
+ blockPoolId + " " + blockName + " at file " + blockFile + block.getBlockPoolId() + " " + block.getLocalBlock().toString()
+ ". Ignored."); + " at file " + blockFile + ". Ignored.");
} else { } else {
volume.decDfsUsed(blockPoolId, dfsBytes); if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){
DataNode.LOG.info("Deleted block " + blockPoolId + " " + blockName dataset.notifyNamenodeDeletedBlock(block);
+ " at file " + blockFile); }
volume.decDfsUsed(block.getBlockPoolId(), dfsBytes);
DataNode.LOG.info("Deleted block " + block.getBlockPoolId() + " "
+ block.getLocalBlock().toString() + " at file " + blockFile);
} }
} }
}; };

View File

@ -77,6 +77,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.NodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.EnumSetWritable;
@ -820,17 +821,16 @@ class NameNodeRpcServer implements NamenodeProtocols {
} }
@Override // DatanodeProtocol @Override // DatanodeProtocol
public void blockReceived(DatanodeRegistration nodeReg, String poolId, public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId,
Block blocks[], String delHints[]) throws IOException { ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks) throws IOException {
verifyRequest(nodeReg); verifyRequest(nodeReg);
if(stateChangeLog.isDebugEnabled()) { if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*BLOCK* NameNode.blockReceived: " stateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: "
+"from "+nodeReg.getName()+" "+blocks.length+" blocks."); +"from "+nodeReg.getName()+" "+receivedAndDeletedBlocks.length
} +" blocks.");
for (int i = 0; i < blocks.length; i++) {
namesystem.getBlockManager().blockReceived(
nodeReg, poolId, blocks[i], delHints[i]);
} }
namesystem.getBlockManager().blockReceivedAndDeleted(
nodeReg, poolId, receivedAndDeletedBlocks);
} }
@Override // DatanodeProtocol @Override // DatanodeProtocol

View File

@ -44,6 +44,16 @@ import org.apache.hadoop.io.WritableFactory;
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class BlockCommand extends DatanodeCommand { public class BlockCommand extends DatanodeCommand {
/**
* This constant is used to indicate that the block deletion does not need
* explicit ACK from the datanode. When a block is put into the list of blocks
* to be deleted, it's size is set to this constant. We assume that no block
* would actually have this size. Otherwise, we would miss ACKs for blocks
* with such size. Positive number is used for compatibility reasons.
*/
public static final long NO_ACK = Long.MAX_VALUE;
String poolId; String poolId;
Block blocks[]; Block blocks[];
DatanodeInfo targets[][]; DatanodeInfo targets[][];

View File

@ -126,17 +126,19 @@ public interface DatanodeProtocol extends VersionedProtocol {
long[] blocks) throws IOException; long[] blocks) throws IOException;
/** /**
* blockReceived() allows the DataNode to tell the NameNode about * blockReceivedAndDeleted() allows the DataNode to tell the NameNode about
* recently-received block data, with a hint for pereferred replica * recently-received and -deleted block data.
* to be deleted when there is any excessive blocks. *
* For the case of received blocks, a hint for preferred replica to be
* deleted when there is any excessive blocks is provided.
* For example, whenever client code * For example, whenever client code
* writes a new Block here, or another DataNode copies a Block to * writes a new Block here, or another DataNode copies a Block to
* this DataNode, it will call blockReceived(). * this DataNode, it will call blockReceived().
*/ */
public void blockReceived(DatanodeRegistration registration, public void blockReceivedAndDeleted(DatanodeRegistration registration,
String poolId, String poolId,
Block blocks[], ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks)
String[] delHints) throws IOException; throws IOException;
/** /**
* errorReport() tells the NameNode about something that has gone * errorReport() tells the NameNode about something that has gone

View File

@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocol;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
/**
* A data structure to store Block and delHints together, used to send
* received/deleted ACKs.
*/
public class ReceivedDeletedBlockInfo implements Writable {
Block block;
String delHints;
public final static String TODELETE_HINT = "-";
public ReceivedDeletedBlockInfo() {
}
public ReceivedDeletedBlockInfo(Block blk, String delHints) {
this.block = blk;
this.delHints = delHints;
}
public Block getBlock() {
return this.block;
}
public void setBlock(Block blk) {
this.block = blk;
}
public String getDelHints() {
return this.delHints;
}
public void setDelHints(String hints) {
this.delHints = hints;
}
public boolean equals(Object o) {
if (!(o instanceof ReceivedDeletedBlockInfo)) {
return false;
}
ReceivedDeletedBlockInfo other = (ReceivedDeletedBlockInfo) o;
return this.block.equals(other.getBlock())
&& this.delHints.equals(other.delHints);
}
public int hashCode() {
assert false : "hashCode not designed";
return 0;
}
public boolean blockEquals(Block b) {
return this.block.equals(b);
}
public boolean isDeletedBlock() {
return delHints.equals(TODELETE_HINT);
}
@Override
public void write(DataOutput out) throws IOException {
this.block.write(out);
Text.writeString(out, this.delHints);
}
@Override
public void readFields(DataInput in) throws IOException {
this.block = new Block();
this.block.readFields(in);
this.delHints = Text.readString(in);
}
public String toString() {
return block.toString() + ", delHint: " + delHints;
}
}

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.EnumSetWritable;
@ -880,10 +881,10 @@ public class NNThroughputBenchmark {
receivedDNReg.setStorageInfo( receivedDNReg.setStorageInfo(
new DataStorage(nsInfo, dnInfo.getStorageID())); new DataStorage(nsInfo, dnInfo.getStorageID()));
receivedDNReg.setInfoPort(dnInfo.getInfoPort()); receivedDNReg.setInfoPort(dnInfo.getInfoPort());
nameNodeProto.blockReceived( receivedDNReg, nameNodeProto.blockReceivedAndDeleted(receivedDNReg, nameNode
nameNode.getNamesystem().getBlockPoolId(), .getNamesystem().getBlockPoolId(),
new Block[] {blocks[i]}, new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(
new String[] {DataNode.EMPTY_DEL_HINT}); blocks[i], DataNode.EMPTY_DEL_HINT) });
} }
} }
return blocks.length; return blocks.length;
@ -995,11 +996,10 @@ public class NNThroughputBenchmark {
for(DatanodeInfo dnInfo : loc.getLocations()) { for(DatanodeInfo dnInfo : loc.getLocations()) {
int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getName()); int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getName());
datanodes[dnIdx].addBlock(loc.getBlock().getLocalBlock()); datanodes[dnIdx].addBlock(loc.getBlock().getLocalBlock());
nameNodeProto.blockReceived( nameNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration, loc
datanodes[dnIdx].dnRegistration, .getBlock().getBlockPoolId(),
loc.getBlock().getBlockPoolId(), new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(loc
new Block[] {loc.getBlock().getLocalBlock()}, .getBlock().getLocalBlock(), "") });
new String[] {""});
} }
} }
return prevBlock; return prevBlock;

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand; import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.junit.After; import org.junit.After;
import org.junit.Test; import org.junit.Test;
@ -105,12 +106,12 @@ public class TestDeadDatanode {
DatanodeProtocol dnp = cluster.getNameNodeRpc(); DatanodeProtocol dnp = cluster.getNameNodeRpc();
Block[] blocks = new Block[] { new Block(0) }; ReceivedDeletedBlockInfo[] blocks = { new ReceivedDeletedBlockInfo(
String[] delHints = new String[] { "" }; new Block(0), "") };
// Ensure blockReceived call from dead datanode is rejected with IOException // Ensure blockReceived call from dead datanode is rejected with IOException
try { try {
dnp.blockReceived(reg, poolId, blocks, delHints); dnp.blockReceivedAndDeleted(reg, poolId, blocks);
Assert.fail("Expected IOException is not thrown"); Assert.fail("Expected IOException is not thrown");
} catch (IOException ex) { } catch (IOException ex) {
// Expected // Expected