HDFS-7990. IBR delete ack should not be delayed. Contributed by Daryn Sharp.

(cherry picked from commit 60882ab26d)
Author: Kihwal Lee  2015-03-27 09:06:23 -05:00
Parent: 36371cddd1
Commit: 577ea08865
5 changed files with 23 additions and 15 deletions
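
What the patch does, in brief: before this change the datanode acknowledged block deletions to the NameNode only when an immediate IBR was already pending or when a separate DELETEREPORT_INTERVAL timer (100 heartbeat intervals) expired; the patch removes lastDeletedReport and dnConf.deleteReportInterval and sends deletion IBRs whenever an immediate IBR is pending or a heartbeat goes out. The sketch below only models that scheduling decision; the class and method names are illustrative, not part of the patch or of Hadoop.

// Illustrative model of the IBR scheduling rule in BPServiceActor#offerService()
// after HDFS-7990; names in this sketch are made up.
public class IbrSchedulingSketch {

  // Old rule: sendImmediateIBR || (now - lastDeletedReport > 100 * heartBeatInterval).
  // New rule: a deletion IBR is never parked behind a separate timer; it goes out
  // immediately when requested and otherwise piggybacks on each heartbeat.
  static boolean shouldSendIbr(boolean sendImmediateIBR, long now,
      long lastHeartbeat, long heartBeatInterval) {
    boolean sendHeartbeat = now - lastHeartbeat >= heartBeatInterval;
    return sendImmediateIBR || sendHeartbeat;
  }

  public static void main(String[] args) {
    long heartBeatInterval = 3000L; // dfs.heartbeat.interval defaults to 3 seconds
    // A block was just deleted: the ack is sent without waiting for a heartbeat.
    System.out.println(shouldSendIbr(true, 1000L, 0L, heartBeatInterval));  // true
    // Nothing pending and no heartbeat due: nothing is sent this iteration.
    System.out.println(shouldSendIbr(false, 1000L, 0L, heartBeatInterval)); // false
    // Heartbeat due: queued incremental reports are flushed along with it.
    System.out.println(shouldSendIbr(false, 4000L, 0L, heartBeatInterval)); // true
  }
}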

CHANGES.txt

@@ -27,6 +27,8 @@ Release 2.8.0 - UNRELEASED
     HDFS-7928. Scanning blocks from disk during rolling upgrade startup takes
     a lot of time if disks are busy (Rushabh S Shah via kihwal)
 
+    HDFS-7990. IBR delete ack should not be delayed. (daryn via kihwal)
+
   OPTIMIZATIONS
 
   BUG FIXES

BPServiceActor.java

@@ -82,12 +82,11 @@ class BPServiceActor implements Runnable {
   final BPOfferService bpos;
 
-  // lastBlockReport, lastDeletedReport and lastHeartbeat may be assigned/read
+  // lastBlockReport and lastHeartbeat may be assigned/read
   // by testing threads (through BPServiceActor#triggerXXX), while also
   // assigned/read by the actor thread. Thus they should be declared as volatile
   // to make sure the "happens-before" consistency.
   volatile long lastBlockReport = 0;
-  volatile long lastDeletedReport = 0;
 
   boolean resetBlockReportTime = true;
@@ -417,10 +416,10 @@ class BPServiceActor implements Runnable {
   @VisibleForTesting
   void triggerDeletionReportForTests() {
     synchronized (pendingIncrementalBRperStorage) {
-      lastDeletedReport = 0;
+      sendImmediateIBR = true;
       pendingIncrementalBRperStorage.notifyAll();
-      while (lastDeletedReport == 0) {
+      while (sendImmediateIBR) {
         try {
           pendingIncrementalBRperStorage.wait(100);
         } catch (InterruptedException e) {
@@ -465,7 +464,6 @@ class BPServiceActor implements Runnable {
     // or we will report an RBW replica after the BlockReport already reports
     // a FINALIZED one.
     reportReceivedDeletedBlocks();
-    lastDeletedReport = startTime;
 
     long brCreateStartTime = monotonicNow();
     Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
@@ -674,7 +672,6 @@ class BPServiceActor implements Runnable {
    */
   private void offerService() throws Exception {
     LOG.info("For namenode " + nnAddr + " using"
-        + " DELETEREPORT_INTERVAL of " + dnConf.deleteReportInterval + " msec "
         + " BLOCKREPORT_INTERVAL of " + dnConf.blockReportInterval + "msec"
         + " CACHEREPORT_INTERVAL of " + dnConf.cacheReportInterval + "msec"
         + " Initial delay: " + dnConf.initialBlockReportDelay + "msec"
@@ -690,7 +687,9 @@ class BPServiceActor implements Runnable {
         //
         // Every so often, send heartbeat or block-report
         //
-        if (startTime - lastHeartbeat >= dnConf.heartBeatInterval) {
+        boolean sendHeartbeat =
+            startTime - lastHeartbeat >= dnConf.heartBeatInterval;
+        if (sendHeartbeat) {
           //
           // All heartbeat messages include following info:
           // -- Datanode name
@ -729,10 +728,8 @@ class BPServiceActor implements Runnable {
} }
} }
} }
if (sendImmediateIBR || if (sendImmediateIBR || sendHeartbeat) {
(startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
reportReceivedDeletedBlocks(); reportReceivedDeletedBlocks();
lastDeletedReport = startTime;
} }
List<DatanodeCommand> cmds = blockReport(); List<DatanodeCommand> cmds = blockReport();

DNConf.java

@@ -82,7 +82,6 @@ public class DNConf {
   final long heartBeatInterval;
   final long blockReportInterval;
   final long blockReportSplitThreshold;
-  final long deleteReportInterval;
   final long initialBlockReportDelay;
   final long cacheReportInterval;
   final long dfsclientSlowIoWarningThresholdMs;
@@ -164,7 +163,6 @@ public class DNConf {
     heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
         DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L;
-    this.deleteReportInterval = 100 * heartBeatInterval;
 
     // do we need to sync block file contents to disk when blockfile is closed?
     this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY,
         DFS_DATANODE_SYNCONCLOSE_DEFAULT);

SimulatedFSDataset.java

@@ -84,7 +84,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     @Override
     public SimulatedFSDataset newInstance(DataNode datanode,
         DataStorage storage, Configuration conf) throws IOException {
-      return new SimulatedFSDataset(storage, conf);
+      return new SimulatedFSDataset(datanode, storage, conf);
     }
 
     @Override
@@ -509,8 +509,15 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   private final SimulatedStorage storage;
   private final SimulatedVolume volume;
   private final String datanodeUuid;
+  private final DataNode datanode;
 
   public SimulatedFSDataset(DataStorage storage, Configuration conf) {
+    this(null, storage, conf);
+  }
+
+  public SimulatedFSDataset(DataNode datanode, DataStorage storage, Configuration conf) {
+    this.datanode = datanode;
     if (storage != null) {
       for (int i = 0; i < storage.getNumStorageDirs(); ++i) {
         storage.createStorageID(storage.getStorageDir(i), false);
@@ -737,6 +744,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       }
       storage.free(bpid, binfo.getNumBytes());
       map.remove(b);
+      if (datanode != null) {
+        datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, b),
+            binfo.getStorageUuid());
+      }
     }
     if (error) {
       throw new IOException("Invalidate: Missing blocks.");
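
Usage note on the SimulatedFSDataset change above: passing a DataNode into the new constructor is what lets invalidate() raise a deletion IBR in tests. The helper below is a rough, hypothetical sketch, not code from the patch; the caller, block-pool id and block are assumed to come from existing test setup, and the injectBlocks()-then-invalidate() sequence is only meant to show the path that now ends in DataNode#notifyNamenodeDeletedBlock().

import java.io.IOException;
import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;

// Hypothetical test helper: build the simulated dataset with a DataNode
// reference so that deleting a block also queues a deletion IBR.
class SimulatedDeletionSketch {
  static void deleteAndNotify(DataNode dn, DataStorage storage,
      Configuration conf, String bpid, Block block) throws IOException {
    SimulatedFSDataset dataset = new SimulatedFSDataset(dn, storage, conf);
    // Make the block known to the dataset, then delete it; with this patch the
    // deletion is reported to the DataNode, which will include it in an IBR.
    dataset.injectBlocks(bpid, Collections.singletonList(block));
    dataset.invalidate(bpid, new Block[] { block });
  }
}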

TestIncrementalBlockReports.java

@@ -159,8 +159,8 @@ public class TestIncrementalBlockReports {
           anyString(),
           any(StorageReceivedDeletedBlocks[].class));
 
-      // Trigger a block report, this also triggers an IBR.
-      DataNodeTestUtils.triggerBlockReport(singletonDn);
+      // Trigger a heartbeat, this also triggers an IBR.
+      DataNodeTestUtils.triggerHeartbeat(singletonDn);
       Thread.sleep(2000);
 
       // Ensure that the deleted block is reported.