HDFS-9710. DN can be configured to send block receipt IBRs in batches.

This commit is contained in:
Tsz-Wo Nicholas Sze 2016-02-26 15:32:25 -08:00
parent eab52dfb35
commit d1d4e16690
19 changed files with 388 additions and 75 deletions

View File

@ -2058,6 +2058,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9425. Expose number of blocks per volume as a metric HDFS-9425. Expose number of blocks per volume as a metric
(Brahma Reddy Battula via vinayakumarb) (Brahma Reddy Battula via vinayakumarb)
HDFS-9710. DN can be configured to send block receipt IBRs in batches.
(szetszwo)
BUG FIXES BUG FIXES
HDFS-7501. TransactionsSinceLastCheckpoint can be negative on SBNs. HDFS-7501. TransactionsSinceLastCheckpoint can be negative on SBNs.

View File

@ -555,6 +555,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DF_INTERVAL_KEY = "dfs.df.interval"; public static final String DFS_DF_INTERVAL_KEY = "dfs.df.interval";
public static final int DFS_DF_INTERVAL_DEFAULT = 60000; public static final int DFS_DF_INTERVAL_DEFAULT = 60000;
public static final String DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY
= "dfs.blockreport.incremental.intervalMsec";
public static final long DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_DEFAULT
= 0;
public static final String DFS_BLOCKREPORT_INTERVAL_MSEC_KEY = "dfs.blockreport.intervalMsec"; public static final String DFS_BLOCKREPORT_INTERVAL_MSEC_KEY = "dfs.blockreport.intervalMsec";
public static final long DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT = 6 * 60 * 60 * 1000; public static final long DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT = 6 * 60 * 60 * 1000;
public static final String DFS_BLOCKREPORT_INITIAL_DELAY_KEY = "dfs.blockreport.initialDelay"; public static final String DFS_BLOCKREPORT_INITIAL_DELAY_KEY = "dfs.blockreport.initialDelay";

View File

@ -233,29 +233,32 @@ class BPOfferService {
* till namenode is informed before responding with success to the * till namenode is informed before responding with success to the
* client? For now we don't. * client? For now we don't.
*/ */
void notifyNamenodeReceivedBlock( void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint,
ExtendedBlock block, String delHint, String storageUuid) { String storageUuid, boolean isOnTransientStorage) {
notifyNamenodeBlock(block, BlockStatus.RECEIVED_BLOCK, delHint, notifyNamenodeBlock(block, BlockStatus.RECEIVED_BLOCK, delHint,
storageUuid); storageUuid, isOnTransientStorage);
} }
void notifyNamenodeReceivingBlock(ExtendedBlock block, String storageUuid) { void notifyNamenodeReceivingBlock(ExtendedBlock block, String storageUuid) {
notifyNamenodeBlock(block, BlockStatus.RECEIVING_BLOCK, null, storageUuid); notifyNamenodeBlock(block, BlockStatus.RECEIVING_BLOCK, null, storageUuid,
false);
} }
void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) { void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
notifyNamenodeBlock(block, BlockStatus.DELETED_BLOCK, null, storageUuid); notifyNamenodeBlock(block, BlockStatus.DELETED_BLOCK, null, storageUuid,
false);
} }
private void notifyNamenodeBlock(ExtendedBlock block, BlockStatus status, private void notifyNamenodeBlock(ExtendedBlock block, BlockStatus status,
String delHint, String storageUuid) { String delHint, String storageUuid, boolean isOnTransientStorage) {
checkBlock(block); checkBlock(block);
final ReceivedDeletedBlockInfo info = new ReceivedDeletedBlockInfo( final ReceivedDeletedBlockInfo info = new ReceivedDeletedBlockInfo(
block.getLocalBlock(), status, delHint); block.getLocalBlock(), status, delHint);
final DatanodeStorage storage = dn.getFSDataset().getStorage(storageUuid); final DatanodeStorage storage = dn.getFSDataset().getStorage(storageUuid);
for (BPServiceActor actor : bpServices) { for (BPServiceActor actor : bpServices) {
actor.getIbrManager().notifyNamenodeBlock(info, storage); actor.getIbrManager().notifyNamenodeBlock(info, storage,
isOnTransientStorage);
} }
} }

View File

@ -97,8 +97,7 @@ class BPServiceActor implements Runnable {
private final DNConf dnConf; private final DNConf dnConf;
private long prevBlockReportId; private long prevBlockReportId;
private final IncrementalBlockReportManager ibrManager private final IncrementalBlockReportManager ibrManager;
= new IncrementalBlockReportManager();
private DatanodeRegistration bpRegistration; private DatanodeRegistration bpRegistration;
final LinkedList<BPServiceActorAction> bpThreadQueue final LinkedList<BPServiceActorAction> bpThreadQueue
@ -109,6 +108,7 @@ class BPServiceActor implements Runnable {
this.dn = bpos.getDataNode(); this.dn = bpos.getDataNode();
this.nnAddr = nnAddr; this.nnAddr = nnAddr;
this.dnConf = dn.getDnConf(); this.dnConf = dn.getDnConf();
this.ibrManager = new IncrementalBlockReportManager(dnConf.ibrInterval);
prevBlockReportId = ThreadLocalRandom.current().nextLong(); prevBlockReportId = ThreadLocalRandom.current().nextLong();
scheduler = new Scheduler(dnConf.heartBeatInterval, dnConf.blockReportInterval); scheduler = new Scheduler(dnConf.heartBeatInterval, dnConf.blockReportInterval);
} }
@ -568,20 +568,9 @@ class BPServiceActor implements Runnable {
processCommand(new DatanodeCommand[]{ cmd }); processCommand(new DatanodeCommand[]{ cmd });
} }
//
// There is no work to do; sleep until hearbeat timer elapses, // There is no work to do; sleep until hearbeat timer elapses,
// or work arrives, and then iterate again. // or work arrives, and then iterate again.
// ibrManager.waitTillNextIBR(scheduler.getHeartbeatWaitTime());
long waitTime = scheduler.getHeartbeatWaitTime();
synchronized(ibrManager) {
if (waitTime > 0 && !ibrManager.sendImmediately()) {
try {
ibrManager.wait(waitTime);
} catch (InterruptedException ie) {
LOG.warn("BPOfferService for " + this + " interrupted");
}
}
} // synchronized
} catch(RemoteException re) { } catch(RemoteException re) {
String reClass = re.getClassName(); String reClass = re.getClassName();
if (UnregisteredNodeException.class.getName().equals(reClass) || if (UnregisteredNodeException.class.getName().equals(reClass) ||
@ -768,7 +757,7 @@ class BPServiceActor implements Runnable {
void triggerBlockReport(BlockReportOptions options) { void triggerBlockReport(BlockReportOptions options) {
if (options.isIncremental()) { if (options.isIncremental()) {
LOG.info(bpos.toString() + ": scheduling an incremental block report."); LOG.info(bpos.toString() + ": scheduling an incremental block report.");
ibrManager.triggerIBR(); ibrManager.triggerIBR(true);
} else { } else {
LOG.info(bpos.toString() + ": scheduling a full block report."); LOG.info(bpos.toString() + ": scheduling a full block report.");
synchronized(ibrManager) { synchronized(ibrManager) {

View File

@ -302,8 +302,8 @@ class BlockReceiver implements Closeable {
/** Return the datanode object. */ /** Return the datanode object. */
DataNode getDataNode() {return datanode;} DataNode getDataNode() {return datanode;}
String getStorageUuid() { Replica getReplica() {
return replicaInfo.getStorageUuid(); return replicaInfo;
} }
/** /**
@ -1439,8 +1439,8 @@ class BlockReceiver implements Closeable {
datanode.data.setPinning(block); datanode.data.setPinning(block);
} }
datanode.closeBlock( datanode.closeBlock(block, null, replicaInfo.getStorageUuid(),
block, DataNode.EMPTY_DEL_HINT, replicaInfo.getStorageUuid()); replicaInfo.isOnTransientStorage());
if (ClientTraceLog.isInfoEnabled() && isClient) { if (ClientTraceLog.isInfoEnabled() && isClient) {
long offset = 0; long offset = 0;
DatanodeRegistration dnR = datanode.getDNRegistrationForBP(block DatanodeRegistration dnR = datanode.getDNRegistrationForBP(block

View File

@ -89,6 +89,7 @@ public class DNConf {
final long heartBeatInterval; final long heartBeatInterval;
final long blockReportInterval; final long blockReportInterval;
final long blockReportSplitThreshold; final long blockReportSplitThreshold;
final long ibrInterval;
final long initialBlockReportDelayMs; final long initialBlockReportDelayMs;
final long cacheReportInterval; final long cacheReportInterval;
final long dfsclientSlowIoWarningThresholdMs; final long dfsclientSlowIoWarningThresholdMs;
@ -156,6 +157,9 @@ public class DNConf {
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT); DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT); DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
this.ibrInterval = conf.getLong(
DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY,
DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_DEFAULT);
this.blockReportSplitThreshold = conf.getLong(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, this.blockReportSplitThreshold = conf.getLong(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY,
DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT); DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT);
this.cacheReportInterval = conf.getLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, this.cacheReportInterval = conf.getLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY,

View File

@ -315,7 +315,6 @@ public class DataNode extends ReconfigurableBase
volatile FsDatasetSpi<? extends FsVolumeSpi> data = null; volatile FsDatasetSpi<? extends FsVolumeSpi> data = null;
private String clusterId = null; private String clusterId = null;
public final static String EMPTY_DEL_HINT = "";
final AtomicInteger xmitsInProgress = new AtomicInteger(); final AtomicInteger xmitsInProgress = new AtomicInteger();
Daemon dataXceiverServer = null; Daemon dataXceiverServer = null;
DataXceiverServer xserver = null; DataXceiverServer xserver = null;
@ -1095,11 +1094,12 @@ public class DataNode extends ReconfigurableBase
} }
// calls specific to BP // calls specific to BP
public void notifyNamenodeReceivedBlock( public void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint,
ExtendedBlock block, String delHint, String storageUuid) { String storageUuid, boolean isOnTransientStorage) {
BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId()); BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
if(bpos != null) { if(bpos != null) {
bpos.notifyNamenodeReceivedBlock(block, delHint, storageUuid); bpos.notifyNamenodeReceivedBlock(block, delHint, storageUuid,
isOnTransientStorage);
} else { } else {
LOG.error("Cannot find BPOfferService for reporting block received for bpid=" LOG.error("Cannot find BPOfferService for reporting block received for bpid="
+ block.getBlockPoolId()); + block.getBlockPoolId());
@ -2380,15 +2380,11 @@ public class DataNode extends ReconfigurableBase
* @param delHint hint on which excess block to delete * @param delHint hint on which excess block to delete
* @param storageUuid UUID of the storage where block is stored * @param storageUuid UUID of the storage where block is stored
*/ */
void closeBlock(ExtendedBlock block, String delHint, String storageUuid) { void closeBlock(ExtendedBlock block, String delHint, String storageUuid,
boolean isTransientStorage) {
metrics.incrBlocksWritten(); metrics.incrBlocksWritten();
BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId()); notifyNamenodeReceivedBlock(block, delHint, storageUuid,
if(bpos != null) { isTransientStorage);
bpos.notifyNamenodeReceivedBlock(block, delHint, storageUuid);
} else {
LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
+ block.getBlockPoolId());
}
} }
/** Start a single datanode daemon and wait for it to finish. /** Start a single datanode daemon and wait for it to finish.
@ -2718,7 +2714,7 @@ public class DataNode extends ReconfigurableBase
public String updateReplicaUnderRecovery(final ExtendedBlock oldBlock, public String updateReplicaUnderRecovery(final ExtendedBlock oldBlock,
final long recoveryId, final long newBlockId, final long newLength) final long recoveryId, final long newBlockId, final long newLength)
throws IOException { throws IOException {
final String storageID = data.updateReplicaUnderRecovery(oldBlock, final Replica r = data.updateReplicaUnderRecovery(oldBlock,
recoveryId, newBlockId, newLength); recoveryId, newBlockId, newLength);
// Notify the namenode of the updated block info. This is important // Notify the namenode of the updated block info. This is important
// for HA, since otherwise the standby node may lose track of the // for HA, since otherwise the standby node may lose track of the
@ -2727,7 +2723,9 @@ public class DataNode extends ReconfigurableBase
newBlock.setGenerationStamp(recoveryId); newBlock.setGenerationStamp(recoveryId);
newBlock.setBlockId(newBlockId); newBlock.setBlockId(newBlockId);
newBlock.setNumBytes(newLength); newBlock.setNumBytes(newLength);
notifyNamenodeReceivedBlock(newBlock, "", storageID); final String storageID = r.getStorageUuid();
notifyNamenodeReceivedBlock(newBlock, null, storageID,
r.isOnTransientStorage());
return storageID; return storageID;
} }

View File

@ -672,7 +672,9 @@ class DataXceiver extends Receiver implements Runnable {
String firstBadLink = ""; // first datanode that failed in connection setup String firstBadLink = ""; // first datanode that failed in connection setup
Status mirrorInStatus = SUCCESS; Status mirrorInStatus = SUCCESS;
final String storageUuid; final String storageUuid;
final boolean isOnTransientStorage;
try { try {
final Replica replica;
if (isDatanode || if (isDatanode ||
stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
// open a block receiver // open a block receiver
@ -682,12 +684,13 @@ class DataXceiver extends Receiver implements Runnable {
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd, stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode, requestedChecksum, clientname, srcDataNode, datanode, requestedChecksum,
cachingStrategy, allowLazyPersist, pinning); cachingStrategy, allowLazyPersist, pinning);
replica = blockReceiver.getReplica();
storageUuid = blockReceiver.getStorageUuid();
} else { } else {
storageUuid = datanode.data.recoverClose( replica = datanode.data.recoverClose(
block, latestGenerationStamp, minBytesRcvd); block, latestGenerationStamp, minBytesRcvd);
} }
storageUuid = replica.getStorageUuid();
isOnTransientStorage = replica.isOnTransientStorage();
// //
// Connect to downstream machine, if appropriate // Connect to downstream machine, if appropriate
@ -830,7 +833,7 @@ class DataXceiver extends Receiver implements Runnable {
// the block is finalized in the PacketResponder. // the block is finalized in the PacketResponder.
if (isDatanode || if (isDatanode ||
stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT, storageUuid); datanode.closeBlock(block, null, storageUuid, isOnTransientStorage);
LOG.info("Received " + block + " src: " + remoteAddress + " dest: " LOG.info("Received " + block + " src: " + remoteAddress + " dest: "
+ localAddress + " of size " + block.getNumBytes()); + localAddress + " of size " + block.getNumBytes());
} }
@ -1146,8 +1149,9 @@ class DataXceiver extends Receiver implements Runnable {
dataXceiverServer.balanceThrottler, null, true); dataXceiverServer.balanceThrottler, null, true);
// notify name node // notify name node
final Replica r = blockReceiver.getReplica();
datanode.notifyNamenodeReceivedBlock( datanode.notifyNamenodeReceivedBlock(
block, delHint, blockReceiver.getStorageUuid()); block, delHint, r.getStorageUuid(), r.isOnTransientStorage());
LOG.info("Moved " + block + " from " + peer.getRemoteAddressString() LOG.info("Moved " + block + " from " + peer.getRemoteAddressString()
+ ", delHint=" + delHint); + ", delHint=" + delHint);

View File

@ -21,6 +21,7 @@ import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -33,6 +34,8 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
@ -42,6 +45,9 @@ import com.google.common.collect.Maps;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
class IncrementalBlockReportManager { class IncrementalBlockReportManager {
private static final Logger LOG = LoggerFactory.getLogger(
IncrementalBlockReportManager.class);
private static class PerStorageIBR { private static class PerStorageIBR {
/** The blocks in this IBR. */ /** The blocks in this IBR. */
final Map<Block, ReceivedDeletedBlockInfo> blocks = Maps.newHashMap(); final Map<Block, ReceivedDeletedBlockInfo> blocks = Maps.newHashMap();
@ -103,8 +109,29 @@ class IncrementalBlockReportManager {
*/ */
private volatile boolean readyToSend = false; private volatile boolean readyToSend = false;
/** The time interval between two IBRs. */
private final long ibrInterval;
/** The timestamp of the last IBR. */
private volatile long lastIBR;
IncrementalBlockReportManager(final long ibrInterval) {
this.ibrInterval = ibrInterval;
this.lastIBR = monotonicNow() - ibrInterval;
}
boolean sendImmediately() { boolean sendImmediately() {
return readyToSend; return readyToSend && monotonicNow() - ibrInterval >= lastIBR;
}
synchronized void waitTillNextIBR(long waitTime) {
if (waitTime > 0 && !sendImmediately()) {
try {
wait(ibrInterval > 0 && ibrInterval < waitTime? ibrInterval: waitTime);
} catch (InterruptedException ie) {
LOG.warn(getClass().getSimpleName() + " interrupted");
}
}
} }
private synchronized StorageReceivedDeletedBlocks[] generateIBRs() { private synchronized StorageReceivedDeletedBlocks[] generateIBRs() {
@ -144,6 +171,9 @@ class IncrementalBlockReportManager {
} }
// Send incremental block reports to the Namenode outside the lock // Send incremental block reports to the Namenode outside the lock
if (LOG.isDebugEnabled()) {
LOG.debug("call blockReceivedAndDeleted: " + Arrays.toString(reports));
}
boolean success = false; boolean success = false;
final long startTime = monotonicNow(); final long startTime = monotonicNow();
try { try {
@ -151,7 +181,9 @@ class IncrementalBlockReportManager {
success = true; success = true;
} finally { } finally {
metrics.addIncrementalBlockReport(monotonicNow() - startTime); metrics.addIncrementalBlockReport(monotonicNow() - startTime);
if (!success) { if (success) {
lastIBR = startTime;
} else {
// If we didn't succeed in sending the report, put all of the // If we didn't succeed in sending the report, put all of the
// blocks back onto our queue, but only in the case where we // blocks back onto our queue, but only in the case where we
// didn't put something newer in the meantime. // didn't put something newer in the meantime.
@ -191,7 +223,7 @@ class IncrementalBlockReportManager {
} }
synchronized void notifyNamenodeBlock(ReceivedDeletedBlockInfo rdbi, synchronized void notifyNamenodeBlock(ReceivedDeletedBlockInfo rdbi,
DatanodeStorage storage) { DatanodeStorage storage, boolean isOnTransientStorage) {
addRDBI(rdbi, storage); addRDBI(rdbi, storage);
final BlockStatus status = rdbi.getStatus(); final BlockStatus status = rdbi.getStatus();
@ -200,18 +232,23 @@ class IncrementalBlockReportManager {
readyToSend = true; readyToSend = true;
} else if (status == BlockStatus.RECEIVED_BLOCK) { } else if (status == BlockStatus.RECEIVED_BLOCK) {
// the report is sent right away. // the report is sent right away.
triggerIBR(); triggerIBR(isOnTransientStorage);
} }
} }
synchronized void triggerIBR() { synchronized void triggerIBR(boolean force) {
readyToSend = true; readyToSend = true;
if (force) {
lastIBR = monotonicNow() - ibrInterval;
}
if (sendImmediately()) {
notifyAll(); notifyAll();
} }
}
@VisibleForTesting @VisibleForTesting
synchronized void triggerDeletionReportForTests() { synchronized void triggerDeletionReportForTests() {
triggerIBR(); triggerIBR(true);
while (sendImmediately()) { while (sendImmediately()) {
try { try {

View File

@ -374,7 +374,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* @return the storage uuid of the replica. * @return the storage uuid of the replica.
* @throws IOException * @throws IOException
*/ */
String recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen
) throws IOException; ) throws IOException;
/** /**
@ -524,7 +524,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* Update replica's generation stamp and length and finalize it. * Update replica's generation stamp and length and finalize it.
* @return the ID of storage that stores the block * @return the ID of storage that stores the block
*/ */
String updateReplicaUnderRecovery(ExtendedBlock oldBlock, Replica updateReplicaUnderRecovery(ExtendedBlock oldBlock,
long recoveryId, long newBlockId, long newLength) throws IOException; long recoveryId, long newBlockId, long newLength) throws IOException;
/** /**

View File

@ -1281,7 +1281,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
} }
@Override // FsDatasetSpi @Override // FsDatasetSpi
public synchronized String recoverClose(ExtendedBlock b, long newGS, public synchronized Replica recoverClose(ExtendedBlock b, long newGS,
long expectedBlockLen) throws IOException { long expectedBlockLen) throws IOException {
LOG.info("Recover failed close " + b); LOG.info("Recover failed close " + b);
// check replica's state // check replica's state
@ -1292,7 +1292,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
if (replicaInfo.getState() == ReplicaState.RBW) { if (replicaInfo.getState() == ReplicaState.RBW) {
finalizeReplica(b.getBlockPoolId(), replicaInfo); finalizeReplica(b.getBlockPoolId(), replicaInfo);
} }
return replicaInfo.getStorageUuid(); return replicaInfo;
} }
/** /**
@ -2424,7 +2424,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
} }
@Override // FsDatasetSpi @Override // FsDatasetSpi
public synchronized String updateReplicaUnderRecovery( public synchronized Replica updateReplicaUnderRecovery(
final ExtendedBlock oldBlock, final ExtendedBlock oldBlock,
final long recoveryId, final long recoveryId,
final long newBlockId, final long newBlockId,
@ -2484,8 +2484,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
//check replica files after update //check replica files after update
checkReplicaFiles(finalized); checkReplicaFiles(finalized);
//return storage ID return finalized;
return getVolume(new ExtendedBlock(bpid, finalized)).getStorageID();
} }
private FinalizedReplica updateReplicaUnderRecovery( private FinalizedReplica updateReplicaUnderRecovery(
@ -2826,7 +2825,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
datanode.getShortCircuitRegistry().processBlockInvalidation( datanode.getShortCircuitRegistry().processBlockInvalidation(
ExtendedBlockId.fromExtendedBlock(extendedBlock)); ExtendedBlockId.fromExtendedBlock(extendedBlock));
datanode.notifyNamenodeReceivedBlock( datanode.notifyNamenodeReceivedBlock(
extendedBlock, null, newReplicaInfo.getStorageUuid()); extendedBlock, null, newReplicaInfo.getStorageUuid(),
newReplicaInfo.isOnTransientStorage());
// Remove the old replicas // Remove the old replicas
if (blockFile.delete() || !blockFile.exists()) { if (blockFile.delete() || !blockFile.exists()) {

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.server.protocol; package org.apache.hadoop.hdfs.server.protocol;
import java.util.Arrays;
/** /**
* Report of block received and deleted per Datanode * Report of block received and deleted per Datanode
* storage. * storage.
@ -51,4 +53,9 @@ public class StorageReceivedDeletedBlocks {
this.storage = storage; this.storage = storage;
this.blocks = blocks; this.blocks = blocks;
} }
@Override
public String toString() {
return storage + Arrays.toString(blocks);
}
} }

View File

@ -899,7 +899,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
} }
@Override // FsDatasetSpi @Override // FsDatasetSpi
public String recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen) public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)
throws IOException { throws IOException {
final Map<Block, BInfo> map = getMap(b.getBlockPoolId()); final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
BInfo binfo = map.get(b.getLocalBlock()); BInfo binfo = map.get(b.getLocalBlock());
@ -913,7 +913,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
map.remove(b.getLocalBlock()); map.remove(b.getLocalBlock());
binfo.theBlock.setGenerationStamp(newGS); binfo.theBlock.setGenerationStamp(newGS);
map.put(binfo.theBlock, binfo); map.put(binfo.theBlock, binfo);
return binfo.getStorageUuid(); return binfo;
} }
@Override // FsDatasetSpi @Override // FsDatasetSpi
@ -1192,12 +1192,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
} }
@Override // FsDatasetSpi @Override // FsDatasetSpi
public String updateReplicaUnderRecovery(ExtendedBlock oldBlock, public Replica updateReplicaUnderRecovery(ExtendedBlock oldBlock,
long recoveryId, long recoveryId,
long newBlockId, long newBlockId,
long newlength) { long newlength) throws IOException {
// Caller does not care about the exact Storage UUID returned. return getMap(oldBlock.getBlockPoolId()).get(oldBlock.getLocalBlock());
return datanodeUuid;
} }
@Override // FsDatasetSpi @Override // FsDatasetSpi

View File

@ -193,7 +193,7 @@ public class TestBPOfferService {
waitForBlockReport(mockNN2); waitForBlockReport(mockNN2);
// When we receive a block, it should report it to both NNs // When we receive a block, it should report it to both NNs
bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, "", ""); bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "", false);
ReceivedDeletedBlockInfo[] ret = waitForBlockReceived(FAKE_BLOCK, mockNN1); ReceivedDeletedBlockInfo[] ret = waitForBlockReceived(FAKE_BLOCK, mockNN1);
assertEquals(1, ret.length); assertEquals(1, ret.length);

View File

@ -0,0 +1,263 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
/**
* This test verifies that incremental block reports are sent in batch mode
* and the namenode allows closing a file with COMMITTED blocks.
*/
public class TestBatchIbr {
public static final Log LOG = LogFactory.getLog(TestBatchIbr.class);
private static final short NUM_DATANODES = 4;
private static final int BLOCK_SIZE = 1024;
private static final int MAX_BLOCK_NUM = 8;
private static final int NUM_FILES = 1000;
private static final int NUM_THREADS = 128;
private static final ThreadLocalBuffer IO_BUF = new ThreadLocalBuffer();
private static final ThreadLocalBuffer VERIFY_BUF = new ThreadLocalBuffer();
static {
GenericTestUtils.setLogLevel(
LogFactory.getLog(IncrementalBlockReportManager.class), Level.ALL);
}
static HdfsConfiguration newConf(long ibrInterval) throws IOException {
final HdfsConfiguration conf = new HdfsConfiguration();
conf.setLong(DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setBoolean(ReplaceDatanodeOnFailure.BEST_EFFORT_KEY, true);
if (ibrInterval > 0) {
conf.setLong(DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY, ibrInterval);
}
return conf;
}
static ExecutorService createExecutor() throws Exception {
final ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
final ExecutorCompletionService<Path> completion
= new ExecutorCompletionService<>(executor);
// initialize all threads and buffers
for(int i = 0; i < NUM_THREADS; i++) {
completion.submit(new Callable<Path>() {
@Override
public Path call() throws Exception {
IO_BUF.get();
VERIFY_BUF.get();
return null;
}
});
}
for(int i = 0; i < NUM_THREADS; i++) {
completion.take().get();
}
return executor;
}
static void runIbrTest(final long ibrInterval) throws Exception {
final ExecutorService executor = createExecutor();
final Random ran = new Random();
final Configuration conf = newConf(ibrInterval);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(NUM_DATANODES).build();
final DistributedFileSystem dfs = cluster.getFileSystem();
try {
final String dirPathString = "/dir";
final Path dir = new Path(dirPathString);
dfs.mkdirs(dir);
// start testing
final long testStartTime = Time.monotonicNow();
final ExecutorCompletionService<Path> createService
= new ExecutorCompletionService<>(executor);
final AtomicLong createFileTime = new AtomicLong();
final AtomicInteger numBlockCreated = new AtomicInteger();
// create files
for(int i = 0; i < NUM_FILES; i++) {
createService.submit(new Callable<Path>() {
@Override
public Path call() throws Exception {
final long start = Time.monotonicNow();
try {
final long seed = ran.nextLong();
final int numBlocks = ran.nextInt(MAX_BLOCK_NUM) + 1;
numBlockCreated.addAndGet(numBlocks);
return createFile(dir, numBlocks, seed, dfs);
} finally {
createFileTime.addAndGet(Time.monotonicNow() - start);
}
}
});
}
// verify files
final ExecutorCompletionService<Boolean> verifyService
= new ExecutorCompletionService<>(executor);
final AtomicLong verifyFileTime = new AtomicLong();
for(int i = 0; i < NUM_FILES; i++) {
final Path file = createService.take().get();
verifyService.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
final long start = Time.monotonicNow();
try {
return verifyFile(file, dfs);
} finally {
verifyFileTime.addAndGet(Time.monotonicNow() - start);
}
}
});
}
for(int i = 0; i < NUM_FILES; i++) {
Assert.assertTrue(verifyService.take().get());
}
final long testEndTime = Time.monotonicNow();
LOG.info("ibrInterval=" + ibrInterval + " ("
+ toConfString(DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY, conf)
+ "), numBlockCreated=" + numBlockCreated);
LOG.info("duration=" + toSecondString(testEndTime - testStartTime)
+ ", createFileTime=" + toSecondString(createFileTime.get())
+ ", verifyFileTime=" + toSecondString(verifyFileTime.get()));
LOG.info("NUM_FILES=" + NUM_FILES
+ ", MAX_BLOCK_NUM=" + MAX_BLOCK_NUM
+ ", BLOCK_SIZE=" + BLOCK_SIZE
+ ", NUM_THREADS=" + NUM_THREADS
+ ", NUM_DATANODES=" + NUM_DATANODES);
logIbrCounts(cluster.getDataNodes());
} finally {
executor.shutdown();
cluster.shutdown();
}
}
static String toConfString(String key, Configuration conf) {
return key + "=" + conf.get(key);
}
static String toSecondString(long ms) {
return (ms/1000.0) + "s";
}
static void logIbrCounts(List<DataNode> datanodes) {
final String name = "IncrementalBlockReportsNumOps";
for(DataNode dn : datanodes) {
final MetricsRecordBuilder m = MetricsAsserts.getMetrics(
dn.getMetrics().name());
final long ibr = MetricsAsserts.getLongCounter(name, m);
LOG.info(dn.getDisplayName() + ": " + name + "=" + ibr);
}
}
static class ThreadLocalBuffer extends ThreadLocal<byte[]> {
@Override
protected byte[] initialValue() {
return new byte[BLOCK_SIZE];
}
}
static byte[] nextBytes(int blockIndex, long seed, byte[] bytes) {
byte b = (byte)(seed ^ (seed >> blockIndex));
for(int i = 0; i < bytes.length; i++) {
bytes[i] = b++;
}
return bytes;
}
static Path createFile(Path dir, int numBlocks, long seed,
DistributedFileSystem dfs) throws IOException {
final Path f = new Path(dir, seed + "_" + numBlocks);
final byte[] bytes = IO_BUF.get();
try(FSDataOutputStream out = dfs.create(f)) {
for(int i = 0; i < numBlocks; i++) {
out.write(nextBytes(i, seed, bytes));
}
}
return f;
}
static boolean verifyFile(Path f, DistributedFileSystem dfs) {
final long seed;
final int numBlocks;
{
final String name = f.getName();
final int i = name.indexOf('_');
seed = Long.parseLong(name.substring(0, i));
numBlocks = Integer.parseInt(name.substring(i + 1));
}
final byte[] computed = IO_BUF.get();
final byte[] expected = VERIFY_BUF.get();
try(FSDataInputStream in = dfs.open(f)) {
for(int i = 0; i < numBlocks; i++) {
in.read(computed);
nextBytes(i, seed, expected);
Assert.assertArrayEquals(expected, computed);
}
return true;
} catch(Exception e) {
LOG.error("Failed to verify file " + f);
return false;
}
}
@Test
public void testIbr() throws Exception {
runIbrTest(0L);
runIbrTest(100L);
}
}

View File

@ -138,12 +138,14 @@ public class TestDataXceiverLazyPersistHint {
PeerLocality locality, PeerLocality locality,
NonLocalLazyPersist nonLocalLazyPersist, NonLocalLazyPersist nonLocalLazyPersist,
final ArgumentCaptor<Boolean> captor) throws IOException { final ArgumentCaptor<Boolean> captor) throws IOException {
final BlockReceiver mockBlockReceiver = mock(BlockReceiver.class);
doReturn(mock(Replica.class)).when(mockBlockReceiver).getReplica();
DataXceiver xceiverSpy = spy(DataXceiver.create( DataXceiver xceiverSpy = spy(DataXceiver.create(
getMockPeer(locality), getMockPeer(locality),
getMockDn(nonLocalLazyPersist), getMockDn(nonLocalLazyPersist),
mock(DataXceiverServer.class))); mock(DataXceiverServer.class)));
doReturn(mockBlockReceiver).when(xceiverSpy).getBlockReceiver(
doReturn(mock(BlockReceiver.class)).when(xceiverSpy).getBlockReceiver(
any(ExtendedBlock.class), any(StorageType.class), any(ExtendedBlock.class), any(StorageType.class),
any(DataInputStream.class), anyString(), anyString(), any(DataInputStream.class), anyString(), anyString(),
any(BlockConstructionStage.class), anyLong(), anyLong(), anyLong(), any(BlockConstructionStage.class), anyLong(), anyLong(), anyLong(),

View File

@ -88,7 +88,7 @@ public class TestIncrementalBlockReports {
ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo( ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo(
getDummyBlock(), BlockStatus.RECEIVED_BLOCK, null); getDummyBlock(), BlockStatus.RECEIVED_BLOCK, null);
DatanodeStorage s = singletonDn.getFSDataset().getStorage(storageUuid); DatanodeStorage s = singletonDn.getFSDataset().getStorage(storageUuid);
actor.getIbrManager().notifyNamenodeBlock(rdbi, s); actor.getIbrManager().notifyNamenodeBlock(rdbi, s, false);
} }
/** /**

View File

@ -176,7 +176,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
} }
@Override @Override
public String recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen) public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlkLen)
throws IOException { throws IOException {
return null; return null;
} }
@ -271,7 +271,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
} }
@Override @Override
public String updateReplicaUnderRecovery(ExtendedBlock oldBlock, public Replica updateReplicaUnderRecovery(ExtendedBlock oldBlock,
long recoveryId, long newBlockId, long newLength) throws IOException { long recoveryId, long newBlockId, long newLength) throws IOException {
return null; return null;
} }

View File

@ -328,7 +328,6 @@ public class TestInterDatanodeProtocol {
try { try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive(); cluster.waitActive();
String bpid = cluster.getNamesystem().getBlockPoolId();
//create a file //create a file
DistributedFileSystem dfs = cluster.getFileSystem(); DistributedFileSystem dfs = cluster.getFileSystem();
@ -379,10 +378,11 @@ public class TestInterDatanodeProtocol {
} }
//update //update
final String storageID = fsdataset.updateReplicaUnderRecovery( final Replica r = fsdataset.updateReplicaUnderRecovery(
new ExtendedBlock(b.getBlockPoolId(), rri), recoveryid, new ExtendedBlock(b.getBlockPoolId(), rri), recoveryid,
rri.getBlockId(), newlength); rri.getBlockId(), newlength);
assertTrue(storageID != null); assertTrue(r != null);
assertTrue(r.getStorageUuid() != null);
} finally { } finally {
if (cluster != null) cluster.shutdown(); if (cluster != null) cluster.shutdown();