HDFS-5390. Send one incremental block report per storage directory.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1534891 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arpit Agarwal 2013-10-23 01:28:48 +00:00
parent b884af72c5
commit 01f37e42f0
16 changed files with 204 additions and 83 deletions

View File

@ -42,3 +42,6 @@ IMPROVEMENTS:
HDFS-5398. NameNode changes to process storage reports per storage HDFS-5398. NameNode changes to process storage reports per storage
directory. (Arpit Agarwal) directory. (Arpit Agarwal)
HDFS-5390. Send one incremental block report per storage directory.
(Arpit Agarwal)

View File

@ -192,7 +192,8 @@ class BPOfferService {
* till namenode is informed before responding with success to the * till namenode is informed before responding with success to the
* client? For now we don't. * client? For now we don't.
*/ */
void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) { void notifyNamenodeReceivedBlock(
ExtendedBlock block, String delHint, String storageUuid) {
checkBlock(block); checkBlock(block);
checkDelHint(delHint); checkDelHint(delHint);
ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo( ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
@ -201,7 +202,7 @@ class BPOfferService {
delHint); delHint);
for (BPServiceActor actor : bpServices) { for (BPServiceActor actor : bpServices) {
actor.notifyNamenodeBlockImmediately(bInfo); actor.notifyNamenodeBlockImmediately(bInfo, storageUuid);
} }
} }
@ -218,23 +219,23 @@ class BPOfferService {
"delHint is null"); "delHint is null");
} }
void notifyNamenodeDeletedBlock(ExtendedBlock block) { void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
checkBlock(block); checkBlock(block);
ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo( ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
block.getLocalBlock(), BlockStatus.DELETED_BLOCK, null); block.getLocalBlock(), BlockStatus.DELETED_BLOCK, null);
for (BPServiceActor actor : bpServices) { for (BPServiceActor actor : bpServices) {
actor.notifyNamenodeDeletedBlock(bInfo); actor.notifyNamenodeDeletedBlock(bInfo, storageUuid);
} }
} }
void notifyNamenodeReceivingBlock(ExtendedBlock block) { void notifyNamenodeReceivingBlock(ExtendedBlock block, String storageUuid) {
checkBlock(block); checkBlock(block);
ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo( ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
block.getLocalBlock(), BlockStatus.RECEIVING_BLOCK, null); block.getLocalBlock(), BlockStatus.RECEIVING_BLOCK, null);
for (BPServiceActor actor : bpServices) { for (BPServiceActor actor : bpServices) {
actor.notifyNamenodeBlockImmediately(bInfo); actor.notifyNamenodeBlockImmediately(bInfo, storageUuid);
} }
} }

View File

@ -23,7 +23,6 @@ import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator;
import java.util.Map; import java.util.Map;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -94,9 +93,9 @@ class BPServiceActor implements Runnable {
* keyed by block ID, contains the pending changes which have yet to be * keyed by block ID, contains the pending changes which have yet to be
* reported to the NN. Access should be synchronized on this object. * reported to the NN. Access should be synchronized on this object.
*/ */
private final Map<Long, ReceivedDeletedBlockInfo> pendingIncrementalBR private final Map<String, PerStoragePendingIncrementalBR>
= Maps.newHashMap(); pendingIncrementalBRperStorage = Maps.newConcurrentMap();
private volatile int pendingReceivedRequests = 0; private volatile int pendingReceivedRequests = 0;
private volatile boolean shouldServiceRun = true; private volatile boolean shouldServiceRun = true;
private final DataNode dn; private final DataNode dn;
@ -263,64 +262,84 @@ class BPServiceActor implements Runnable {
* @throws IOException * @throws IOException
*/ */
private void reportReceivedDeletedBlocks() throws IOException { private void reportReceivedDeletedBlocks() throws IOException {
// For each storage, check if there are newly received blocks and if
// check if there are newly received blocks // so then send an incremental report to the NameNode.
ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null; for (Map.Entry<String, PerStoragePendingIncrementalBR> entry :
synchronized (pendingIncrementalBR) { pendingIncrementalBRperStorage.entrySet()) {
int numBlocks = pendingIncrementalBR.size(); final String storageUuid = entry.getKey();
if (numBlocks > 0) { final PerStoragePendingIncrementalBR perStorageMap = entry.getValue();
// ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null;
// Send newly-received and deleted blockids to namenode // TODO: We can probably use finer-grained synchronization now.
// synchronized (pendingIncrementalBRperStorage) {
receivedAndDeletedBlockArray = pendingIncrementalBR if (perStorageMap.getBlockInfoCount() > 0) {
.values().toArray(new ReceivedDeletedBlockInfo[numBlocks]); // Send newly-received and deleted blockids to namenode
receivedAndDeletedBlockArray = perStorageMap.dequeueBlockInfos();
pendingReceivedRequests -= receivedAndDeletedBlockArray.length;
}
} }
pendingIncrementalBR.clear();
} if (receivedAndDeletedBlockArray != null) {
if (receivedAndDeletedBlockArray != null) { StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks( storageUuid, receivedAndDeletedBlockArray) };
bpRegistration.getDatanodeUuid(), receivedAndDeletedBlockArray) }; boolean success = false;
boolean success = false; try {
try { bpNamenode.blockReceivedAndDeleted(bpRegistration, bpos.getBlockPoolId(),
bpNamenode.blockReceivedAndDeleted(bpRegistration, bpos.getBlockPoolId(), report);
report); success = true;
success = true; } finally {
} finally { synchronized (pendingIncrementalBRperStorage) {
synchronized (pendingIncrementalBR) { if (!success) {
if (!success) { // If we didn't succeed in sending the report, put all of the
// If we didn't succeed in sending the report, put all of the // blocks back onto our queue, but only in the case where we
// blocks back onto our queue, but only in the case where we didn't // didn't put something newer in the meantime.
// put something newer in the meantime. perStorageMap.putMissingBlockInfos(receivedAndDeletedBlockArray);
for (ReceivedDeletedBlockInfo rdbi : receivedAndDeletedBlockArray) { pendingReceivedRequests += perStorageMap.getBlockInfoCount();
if (!pendingIncrementalBR.containsKey(rdbi.getBlock().getBlockId())) {
pendingIncrementalBR.put(rdbi.getBlock().getBlockId(), rdbi);
}
} }
} }
pendingReceivedRequests = pendingIncrementalBR.size();
} }
} }
} }
} }
/**
* Retrieve the incremental BR state for a given storage UUID
* @param storageUuid
* @return
*/
private PerStoragePendingIncrementalBR getIncrementalBRMapForStorage(
String storageUuid) {
PerStoragePendingIncrementalBR mapForStorage =
pendingIncrementalBRperStorage.get(storageUuid);
if (mapForStorage == null) {
// This is the first time we are adding incremental BR state for
// this storage so create a new map. This is required once per
// storage, per service actor.
mapForStorage = new PerStoragePendingIncrementalBR();
pendingIncrementalBRperStorage.put(storageUuid, mapForStorage);
}
return mapForStorage;
}
/* /*
* Informing the name node could take a long long time! Should we wait * Informing the name node could take a long long time! Should we wait
* till namenode is informed before responding with success to the * till namenode is informed before responding with success to the
* client? For now we don't. * client? For now we don't.
*/ */
void notifyNamenodeBlockImmediately(ReceivedDeletedBlockInfo bInfo) { void notifyNamenodeBlockImmediately(
synchronized (pendingIncrementalBR) { ReceivedDeletedBlockInfo bInfo, String storageUuid) {
pendingIncrementalBR.put( synchronized (pendingIncrementalBRperStorage) {
bInfo.getBlock().getBlockId(), bInfo); getIncrementalBRMapForStorage(storageUuid).putBlockInfo(bInfo);
pendingReceivedRequests++; pendingReceivedRequests++;
pendingIncrementalBR.notifyAll(); pendingIncrementalBRperStorage.notifyAll();
} }
} }
void notifyNamenodeDeletedBlock(ReceivedDeletedBlockInfo bInfo) { void notifyNamenodeDeletedBlock(
synchronized (pendingIncrementalBR) { ReceivedDeletedBlockInfo bInfo, String storageUuid) {
pendingIncrementalBR.put( synchronized (pendingIncrementalBRperStorage) {
bInfo.getBlock().getBlockId(), bInfo); getIncrementalBRMapForStorage(storageUuid).putBlockInfo(bInfo);
} }
} }
@ -329,13 +348,13 @@ class BPServiceActor implements Runnable {
*/ */
@VisibleForTesting @VisibleForTesting
void triggerBlockReportForTests() { void triggerBlockReportForTests() {
synchronized (pendingIncrementalBR) { synchronized (pendingIncrementalBRperStorage) {
lastBlockReport = 0; lastBlockReport = 0;
lastHeartbeat = 0; lastHeartbeat = 0;
pendingIncrementalBR.notifyAll(); pendingIncrementalBRperStorage.notifyAll();
while (lastBlockReport == 0) { while (lastBlockReport == 0) {
try { try {
pendingIncrementalBR.wait(100); pendingIncrementalBRperStorage.wait(100);
} catch (InterruptedException e) { } catch (InterruptedException e) {
return; return;
} }
@ -345,12 +364,12 @@ class BPServiceActor implements Runnable {
@VisibleForTesting @VisibleForTesting
void triggerHeartbeatForTests() { void triggerHeartbeatForTests() {
synchronized (pendingIncrementalBR) { synchronized (pendingIncrementalBRperStorage) {
lastHeartbeat = 0; lastHeartbeat = 0;
pendingIncrementalBR.notifyAll(); pendingIncrementalBRperStorage.notifyAll();
while (lastHeartbeat == 0) { while (lastHeartbeat == 0) {
try { try {
pendingIncrementalBR.wait(100); pendingIncrementalBRperStorage.wait(100);
} catch (InterruptedException e) { } catch (InterruptedException e) {
return; return;
} }
@ -360,13 +379,13 @@ class BPServiceActor implements Runnable {
@VisibleForTesting @VisibleForTesting
void triggerDeletionReportForTests() { void triggerDeletionReportForTests() {
synchronized (pendingIncrementalBR) { synchronized (pendingIncrementalBRperStorage) {
lastDeletedReport = 0; lastDeletedReport = 0;
pendingIncrementalBR.notifyAll(); pendingIncrementalBRperStorage.notifyAll();
while (lastDeletedReport == 0) { while (lastDeletedReport == 0) {
try { try {
pendingIncrementalBR.wait(100); pendingIncrementalBRperStorage.wait(100);
} catch (InterruptedException e) { } catch (InterruptedException e) {
return; return;
} }
@ -582,10 +601,10 @@ class BPServiceActor implements Runnable {
// //
long waitTime = dnConf.heartBeatInterval - long waitTime = dnConf.heartBeatInterval -
(Time.now() - lastHeartbeat); (Time.now() - lastHeartbeat);
synchronized(pendingIncrementalBR) { synchronized(pendingIncrementalBRperStorage) {
if (waitTime > 0 && pendingReceivedRequests == 0) { if (waitTime > 0 && pendingReceivedRequests == 0) {
try { try {
pendingIncrementalBR.wait(waitTime); pendingIncrementalBRperStorage.wait(waitTime);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
LOG.warn("BPOfferService for " + this + " interrupted"); LOG.warn("BPOfferService for " + this + " interrupted");
} }
@ -756,4 +775,52 @@ class BPServiceActor implements Runnable {
} }
} }
private static class PerStoragePendingIncrementalBR {
private Map<Long, ReceivedDeletedBlockInfo> pendingIncrementalBR =
Maps.newHashMap();
/**
* Return the number of blocks on this storage that have pending
* incremental block reports.
* @return
*/
int getBlockInfoCount() {
return pendingIncrementalBR.size();
}
/**
* Dequeue and return all pending incremental block report state.
* @return
*/
ReceivedDeletedBlockInfo[] dequeueBlockInfos() {
ReceivedDeletedBlockInfo[] blockInfos =
pendingIncrementalBR.values().toArray(
new ReceivedDeletedBlockInfo[getBlockInfoCount()]);
pendingIncrementalBR.clear();
return blockInfos;
}
/**
* Add blocks from blockArray to pendingIncrementalBR, unless the
* block already exists in pendingIncrementalBR.
* @param blockArray list of blocks to add.
*/
void putMissingBlockInfos(ReceivedDeletedBlockInfo[] blockArray) {
for (ReceivedDeletedBlockInfo rdbi : blockArray) {
if (!pendingIncrementalBR.containsKey(rdbi.getBlock().getBlockId())) {
pendingIncrementalBR.put(rdbi.getBlock().getBlockId(), rdbi);
}
}
}
/**
* Add pending incremental block report for a single block.
* @param blockID
* @param blockInfo
*/
void putBlockInfo(ReceivedDeletedBlockInfo blockInfo) {
pendingIncrementalBR.put(blockInfo.getBlock().getBlockId(), blockInfo);
}
}
} }

View File

@ -162,7 +162,8 @@ class BlockReceiver implements Closeable {
switch (stage) { switch (stage) {
case PIPELINE_SETUP_CREATE: case PIPELINE_SETUP_CREATE:
replicaInfo = datanode.data.createRbw(block); replicaInfo = datanode.data.createRbw(block);
datanode.notifyNamenodeReceivingBlock(block); datanode.notifyNamenodeReceivingBlock(
block, replicaInfo.getStorageUuid());
break; break;
case PIPELINE_SETUP_STREAMING_RECOVERY: case PIPELINE_SETUP_STREAMING_RECOVERY:
replicaInfo = datanode.data.recoverRbw( replicaInfo = datanode.data.recoverRbw(
@ -176,7 +177,8 @@ class BlockReceiver implements Closeable {
block.getLocalBlock()); block.getLocalBlock());
} }
block.setGenerationStamp(newGs); block.setGenerationStamp(newGs);
datanode.notifyNamenodeReceivingBlock(block); datanode.notifyNamenodeReceivingBlock(
block, replicaInfo.getStorageUuid());
break; break;
case PIPELINE_SETUP_APPEND_RECOVERY: case PIPELINE_SETUP_APPEND_RECOVERY:
replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd); replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
@ -185,7 +187,8 @@ class BlockReceiver implements Closeable {
block.getLocalBlock()); block.getLocalBlock());
} }
block.setGenerationStamp(newGs); block.setGenerationStamp(newGs);
datanode.notifyNamenodeReceivingBlock(block); datanode.notifyNamenodeReceivingBlock(
block, replicaInfo.getStorageUuid());
break; break;
case TRANSFER_RBW: case TRANSFER_RBW:
case TRANSFER_FINALIZED: case TRANSFER_FINALIZED:
@ -252,6 +255,10 @@ class BlockReceiver implements Closeable {
/** Return the datanode object. */ /** Return the datanode object. */
DataNode getDataNode() {return datanode;} DataNode getDataNode() {return datanode;}
public Replica getReplicaInfo() {
return replicaInfo;
}
/** /**
* close files. * close files.
*/ */
@ -1072,7 +1079,8 @@ class BlockReceiver implements Closeable {
: 0; : 0;
block.setNumBytes(replicaInfo.getNumBytes()); block.setNumBytes(replicaInfo.getNumBytes());
datanode.data.finalizeBlock(block); datanode.data.finalizeBlock(block);
datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT); datanode.closeBlock(
block, DataNode.EMPTY_DEL_HINT, replicaInfo.getStorageUuid());
if (ClientTraceLog.isInfoEnabled() && isClient) { if (ClientTraceLog.isInfoEnabled() && isClient) {
long offset = 0; long offset = 0;
DatanodeRegistration dnR = datanode.getDNRegistrationForBP(block DatanodeRegistration dnR = datanode.getDNRegistrationForBP(block

View File

@ -520,10 +520,11 @@ public class DataNode extends Configured
} }
// calls specific to BP // calls specific to BP
protected void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) { protected void notifyNamenodeReceivedBlock(
ExtendedBlock block, String delHint, String storageUuid) {
BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId()); BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
if(bpos != null) { if(bpos != null) {
bpos.notifyNamenodeReceivedBlock(block, delHint); bpos.notifyNamenodeReceivedBlock(block, delHint, storageUuid);
} else { } else {
LOG.error("Cannot find BPOfferService for reporting block received for bpid=" LOG.error("Cannot find BPOfferService for reporting block received for bpid="
+ block.getBlockPoolId()); + block.getBlockPoolId());
@ -531,10 +532,11 @@ public class DataNode extends Configured
} }
// calls specific to BP // calls specific to BP
protected void notifyNamenodeReceivingBlock(ExtendedBlock block) { protected void notifyNamenodeReceivingBlock(
ExtendedBlock block, String storageUuid) {
BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId()); BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
if(bpos != null) { if(bpos != null) {
bpos.notifyNamenodeReceivingBlock(block); bpos.notifyNamenodeReceivingBlock(block, storageUuid);
} else { } else {
LOG.error("Cannot find BPOfferService for reporting block receiving for bpid=" LOG.error("Cannot find BPOfferService for reporting block receiving for bpid="
+ block.getBlockPoolId()); + block.getBlockPoolId());
@ -542,10 +544,10 @@ public class DataNode extends Configured
} }
/** Notify the corresponding namenode to delete the block. */ /** Notify the corresponding namenode to delete the block. */
public void notifyNamenodeDeletedBlock(ExtendedBlock block) { public void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId()); BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
if (bpos != null) { if (bpos != null) {
bpos.notifyNamenodeDeletedBlock(block); bpos.notifyNamenodeDeletedBlock(block, storageUuid);
} else { } else {
LOG.error("Cannot find BPOfferService for reporting block deleted for bpid=" LOG.error("Cannot find BPOfferService for reporting block deleted for bpid="
+ block.getBlockPoolId()); + block.getBlockPoolId());
@ -1528,11 +1530,11 @@ public class DataNode extends Configured
* @param block * @param block
* @param delHint * @param delHint
*/ */
void closeBlock(ExtendedBlock block, String delHint) { void closeBlock(ExtendedBlock block, String delHint, String storageUuid) {
metrics.incrBlocksWritten(); metrics.incrBlocksWritten();
BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId()); BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
if(bpos != null) { if(bpos != null) {
bpos.notifyNamenodeReceivedBlock(block, delHint); bpos.notifyNamenodeReceivedBlock(block, delHint, storageUuid);
} else { } else {
LOG.warn("Cannot find BPOfferService for reporting block received for bpid=" LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
+ block.getBlockPoolId()); + block.getBlockPoolId());
@ -1892,7 +1894,7 @@ public class DataNode extends Configured
ExtendedBlock newBlock = new ExtendedBlock(oldBlock); ExtendedBlock newBlock = new ExtendedBlock(oldBlock);
newBlock.setGenerationStamp(recoveryId); newBlock.setGenerationStamp(recoveryId);
newBlock.setNumBytes(newLength); newBlock.setNumBytes(newLength);
notifyNamenodeReceivedBlock(newBlock, ""); notifyNamenodeReceivedBlock(newBlock, "", storageID);
return storageID; return storageID;
} }

View File

@ -447,6 +447,7 @@ class DataXceiver extends Receiver implements Runnable {
String mirrorNode = null; // the name:port of next target String mirrorNode = null; // the name:port of next target
String firstBadLink = ""; // first datanode that failed in connection setup String firstBadLink = ""; // first datanode that failed in connection setup
Status mirrorInStatus = SUCCESS; Status mirrorInStatus = SUCCESS;
Replica replica;
try { try {
if (isDatanode || if (isDatanode ||
stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
@ -457,8 +458,10 @@ class DataXceiver extends Receiver implements Runnable {
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd, stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode, requestedChecksum, clientname, srcDataNode, datanode, requestedChecksum,
cachingStrategy); cachingStrategy);
replica = blockReceiver.getReplicaInfo();
} else { } else {
datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd); replica =
datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
} }
// //
@ -590,7 +593,8 @@ class DataXceiver extends Receiver implements Runnable {
// the block is finalized in the PacketResponder. // the block is finalized in the PacketResponder.
if (isDatanode || if (isDatanode ||
stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT); datanode.closeBlock(
block, DataNode.EMPTY_DEL_HINT, replica.getStorageUuid());
LOG.info("Received " + block + " src: " + remoteAddress + " dest: " LOG.info("Received " + block + " src: " + remoteAddress + " dest: "
+ localAddress + " of size " + block.getNumBytes()); + localAddress + " of size " + block.getNumBytes());
} }
@ -859,7 +863,8 @@ class DataXceiver extends Receiver implements Runnable {
dataXceiverServer.balanceThrottler, null); dataXceiverServer.balanceThrottler, null);
// notify name node // notify name node
datanode.notifyNamenodeReceivedBlock(block, delHint); datanode.notifyNamenodeReceivedBlock(
block, delHint, blockReceiver.getReplicaInfo().getStorageUuid());
LOG.info("Moved " + block + " from " + peer.getRemoteAddressString()); LOG.info("Moved " + block + " from " + peer.getRemoteAddressString());

View File

@ -54,4 +54,9 @@ public interface Replica {
* @return the number of bytes that are visible to readers * @return the number of bytes that are visible to readers
*/ */
public long getVisibleLength(); public long getVisibleLength();
/**
* Return the storageUuid of the volume that stores this replica.
*/
public String getStorageUuid();
} }

View File

@ -137,6 +137,14 @@ abstract public class ReplicaInfo extends Block implements Replica {
void setVolume(FsVolumeSpi vol) { void setVolume(FsVolumeSpi vol) {
this.volume = vol; this.volume = vol;
} }
/**
* Get the storageUuid of the volume that stores this replica.
*/
@Override
public String getStorageUuid() {
return volume.getStorageID();
}
/** /**
* Return the parent directory path where this replica is located * Return the parent directory path where this replica is located

View File

@ -243,7 +243,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* @param expectedBlockLen the number of bytes the replica is expected to have * @param expectedBlockLen the number of bytes the replica is expected to have
* @throws IOException * @throws IOException
*/ */
public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen
) throws IOException; ) throws IOException;
/** /**

View File

@ -26,6 +26,9 @@ import org.apache.hadoop.hdfs.StorageType;
* This is an interface for the underlying volume. * This is an interface for the underlying volume.
*/ */
public interface FsVolumeSpi { public interface FsVolumeSpi {
/** @return the StorageUuid of the volume */
public String getStorageID();
/** @return a list of block pools. */ /** @return a list of block pools. */
public String[] getBlockPoolList(); public String[] getBlockPoolList();

View File

@ -195,7 +195,7 @@ class FsDatasetAsyncDiskService {
+ " at file " + blockFile + ". Ignored."); + " at file " + blockFile + ". Ignored.");
} else { } else {
if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){ if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){
datanode.notifyNamenodeDeletedBlock(block); datanode.notifyNamenodeDeletedBlock(block, volume.getStorageID());
} }
volume.decDfsUsed(block.getBlockPoolId(), dfsBytes); volume.decDfsUsed(block.getBlockPoolId(), dfsBytes);
LOG.info("Deleted " + block.getBlockPoolId() + " " LOG.info("Deleted " + block.getBlockPoolId() + " "

View File

@ -699,7 +699,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
} }
@Override // FsDatasetSpi @Override // FsDatasetSpi
public void recoverClose(ExtendedBlock b, long newGS, public Replica recoverClose(ExtendedBlock b, long newGS,
long expectedBlockLen) throws IOException { long expectedBlockLen) throws IOException {
LOG.info("Recover failed close " + b); LOG.info("Recover failed close " + b);
// check replica's state // check replica's state
@ -710,6 +710,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
if (replicaInfo.getState() == ReplicaState.RBW) { if (replicaInfo.getState() == ReplicaState.RBW) {
finalizeReplica(b.getBlockPoolId(), replicaInfo); finalizeReplica(b.getBlockPoolId(), replicaInfo);
} }
return replicaInfo;
} }
/** /**

View File

@ -290,6 +290,7 @@ class FsVolumeImpl implements FsVolumeSpi {
} }
} }
@Override
public String getStorageID() { public String getStorageID() {
return storageID; return storageID;
} }

View File

@ -131,6 +131,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
} }
} }
@Override
public String getStorageUuid() {
return storage.getStorageUuid();
}
@Override @Override
synchronized public long getGenerationStamp() { synchronized public long getGenerationStamp() {
return theBlock.getGenerationStamp(); return theBlock.getGenerationStamp();
@ -314,6 +319,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
private static class SimulatedStorage { private static class SimulatedStorage {
private Map<String, SimulatedBPStorage> map = private Map<String, SimulatedBPStorage> map =
new HashMap<String, SimulatedBPStorage>(); new HashMap<String, SimulatedBPStorage>();
private final String storageUuid = "SimulatedStorage-UUID";
private long capacity; // in bytes private long capacity; // in bytes
synchronized long getFree() { synchronized long getFree() {
@ -375,6 +382,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
} }
return bpStorage; return bpStorage;
} }
public String getStorageUuid() {
return storageUuid;
}
} }
private final Map<String, Map<Block, BInfo>> blockMap private final Map<String, Map<Block, BInfo>> blockMap
@ -625,7 +636,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
} }
@Override // FsDatasetSpi @Override // FsDatasetSpi
public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen) public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)
throws IOException { throws IOException {
final Map<Block, BInfo> map = getMap(b.getBlockPoolId()); final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
BInfo binfo = map.get(b.getLocalBlock()); BInfo binfo = map.get(b.getLocalBlock());
@ -639,6 +650,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
map.remove(b.getLocalBlock()); map.remove(b.getLocalBlock());
binfo.theBlock.setGenerationStamp(newGS); binfo.theBlock.setGenerationStamp(newGS);
map.put(binfo.theBlock, binfo); map.put(binfo.theBlock, binfo);
return binfo;
} }
@Override // FsDatasetSpi @Override // FsDatasetSpi

View File

@ -176,7 +176,7 @@ public class TestBPOfferService {
waitForBlockReport(mockNN2); waitForBlockReport(mockNN2);
// When we receive a block, it should report it to both NNs // When we receive a block, it should report it to both NNs
bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, ""); bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, "", "");
ReceivedDeletedBlockInfo[] ret = waitForBlockReceived(FAKE_BLOCK, mockNN1); ReceivedDeletedBlockInfo[] ret = waitForBlockReceived(FAKE_BLOCK, mockNN1);
assertEquals(1, ret.length); assertEquals(1, ret.length);

View File

@ -412,6 +412,11 @@ public class TestDirectoryScanner {
public StorageType getStorageType() { public StorageType getStorageType() {
return StorageType.DEFAULT; return StorageType.DEFAULT;
} }
@Override
public String getStorageID() {
return "";
}
} }
private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi(); private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi();