HDFS-7990. IBR delete ack should not be delayed. Contributed by Daryn Sharp.
This commit is contained in:
parent
af618f23a7
commit
60882ab26d
|
@ -342,6 +342,8 @@ Release 2.8.0 - UNRELEASED
|
||||||
HDFS-7928. Scanning blocks from disk during rolling upgrade startup takes
|
HDFS-7928. Scanning blocks from disk during rolling upgrade startup takes
|
||||||
a lot of time if disks are busy (Rushabh S Shah via kihwal)
|
a lot of time if disks are busy (Rushabh S Shah via kihwal)
|
||||||
|
|
||||||
|
HDFS-7990. IBR delete ack should not be delayed. (daryn via kihwal)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -82,12 +82,11 @@ class BPServiceActor implements Runnable {
|
||||||
|
|
||||||
final BPOfferService bpos;
|
final BPOfferService bpos;
|
||||||
|
|
||||||
// lastBlockReport, lastDeletedReport and lastHeartbeat may be assigned/read
|
// lastBlockReport and lastHeartbeat may be assigned/read
|
||||||
// by testing threads (through BPServiceActor#triggerXXX), while also
|
// by testing threads (through BPServiceActor#triggerXXX), while also
|
||||||
// assigned/read by the actor thread. Thus they should be declared as volatile
|
// assigned/read by the actor thread. Thus they should be declared as volatile
|
||||||
// to make sure the "happens-before" consistency.
|
// to make sure the "happens-before" consistency.
|
||||||
volatile long lastBlockReport = 0;
|
volatile long lastBlockReport = 0;
|
||||||
volatile long lastDeletedReport = 0;
|
|
||||||
|
|
||||||
boolean resetBlockReportTime = true;
|
boolean resetBlockReportTime = true;
|
||||||
|
|
||||||
|
@ -417,10 +416,10 @@ class BPServiceActor implements Runnable {
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
void triggerDeletionReportForTests() {
|
void triggerDeletionReportForTests() {
|
||||||
synchronized (pendingIncrementalBRperStorage) {
|
synchronized (pendingIncrementalBRperStorage) {
|
||||||
lastDeletedReport = 0;
|
sendImmediateIBR = true;
|
||||||
pendingIncrementalBRperStorage.notifyAll();
|
pendingIncrementalBRperStorage.notifyAll();
|
||||||
|
|
||||||
while (lastDeletedReport == 0) {
|
while (sendImmediateIBR) {
|
||||||
try {
|
try {
|
||||||
pendingIncrementalBRperStorage.wait(100);
|
pendingIncrementalBRperStorage.wait(100);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
@ -465,7 +464,6 @@ class BPServiceActor implements Runnable {
|
||||||
// or we will report an RBW replica after the BlockReport already reports
|
// or we will report an RBW replica after the BlockReport already reports
|
||||||
// a FINALIZED one.
|
// a FINALIZED one.
|
||||||
reportReceivedDeletedBlocks();
|
reportReceivedDeletedBlocks();
|
||||||
lastDeletedReport = startTime;
|
|
||||||
|
|
||||||
long brCreateStartTime = monotonicNow();
|
long brCreateStartTime = monotonicNow();
|
||||||
Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
|
Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
|
||||||
|
@ -674,7 +672,6 @@ class BPServiceActor implements Runnable {
|
||||||
*/
|
*/
|
||||||
private void offerService() throws Exception {
|
private void offerService() throws Exception {
|
||||||
LOG.info("For namenode " + nnAddr + " using"
|
LOG.info("For namenode " + nnAddr + " using"
|
||||||
+ " DELETEREPORT_INTERVAL of " + dnConf.deleteReportInterval + " msec "
|
|
||||||
+ " BLOCKREPORT_INTERVAL of " + dnConf.blockReportInterval + "msec"
|
+ " BLOCKREPORT_INTERVAL of " + dnConf.blockReportInterval + "msec"
|
||||||
+ " CACHEREPORT_INTERVAL of " + dnConf.cacheReportInterval + "msec"
|
+ " CACHEREPORT_INTERVAL of " + dnConf.cacheReportInterval + "msec"
|
||||||
+ " Initial delay: " + dnConf.initialBlockReportDelay + "msec"
|
+ " Initial delay: " + dnConf.initialBlockReportDelay + "msec"
|
||||||
|
@ -690,7 +687,9 @@ class BPServiceActor implements Runnable {
|
||||||
//
|
//
|
||||||
// Every so often, send heartbeat or block-report
|
// Every so often, send heartbeat or block-report
|
||||||
//
|
//
|
||||||
if (startTime - lastHeartbeat >= dnConf.heartBeatInterval) {
|
boolean sendHeartbeat =
|
||||||
|
startTime - lastHeartbeat >= dnConf.heartBeatInterval;
|
||||||
|
if (sendHeartbeat) {
|
||||||
//
|
//
|
||||||
// All heartbeat messages include following info:
|
// All heartbeat messages include following info:
|
||||||
// -- Datanode name
|
// -- Datanode name
|
||||||
|
@ -729,10 +728,8 @@ class BPServiceActor implements Runnable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (sendImmediateIBR ||
|
if (sendImmediateIBR || sendHeartbeat) {
|
||||||
(startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
|
|
||||||
reportReceivedDeletedBlocks();
|
reportReceivedDeletedBlocks();
|
||||||
lastDeletedReport = startTime;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
List<DatanodeCommand> cmds = blockReport();
|
List<DatanodeCommand> cmds = blockReport();
|
||||||
|
|
|
@ -82,7 +82,6 @@ public class DNConf {
|
||||||
final long heartBeatInterval;
|
final long heartBeatInterval;
|
||||||
final long blockReportInterval;
|
final long blockReportInterval;
|
||||||
final long blockReportSplitThreshold;
|
final long blockReportSplitThreshold;
|
||||||
final long deleteReportInterval;
|
|
||||||
final long initialBlockReportDelay;
|
final long initialBlockReportDelay;
|
||||||
final long cacheReportInterval;
|
final long cacheReportInterval;
|
||||||
final long dfsclientSlowIoWarningThresholdMs;
|
final long dfsclientSlowIoWarningThresholdMs;
|
||||||
|
@ -164,7 +163,6 @@ public class DNConf {
|
||||||
heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
|
heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
|
||||||
DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L;
|
DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L;
|
||||||
|
|
||||||
this.deleteReportInterval = 100 * heartBeatInterval;
|
|
||||||
// do we need to sync block file contents to disk when blockfile is closed?
|
// do we need to sync block file contents to disk when blockfile is closed?
|
||||||
this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY,
|
this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY,
|
||||||
DFS_DATANODE_SYNCONCLOSE_DEFAULT);
|
DFS_DATANODE_SYNCONCLOSE_DEFAULT);
|
||||||
|
|
|
@ -84,7 +84,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
||||||
@Override
|
@Override
|
||||||
public SimulatedFSDataset newInstance(DataNode datanode,
|
public SimulatedFSDataset newInstance(DataNode datanode,
|
||||||
DataStorage storage, Configuration conf) throws IOException {
|
DataStorage storage, Configuration conf) throws IOException {
|
||||||
return new SimulatedFSDataset(storage, conf);
|
return new SimulatedFSDataset(datanode, storage, conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -509,8 +509,15 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
||||||
private final SimulatedStorage storage;
|
private final SimulatedStorage storage;
|
||||||
private final SimulatedVolume volume;
|
private final SimulatedVolume volume;
|
||||||
private final String datanodeUuid;
|
private final String datanodeUuid;
|
||||||
|
private final DataNode datanode;
|
||||||
|
|
||||||
|
|
||||||
public SimulatedFSDataset(DataStorage storage, Configuration conf) {
|
public SimulatedFSDataset(DataStorage storage, Configuration conf) {
|
||||||
|
this(null, storage, conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
public SimulatedFSDataset(DataNode datanode, DataStorage storage, Configuration conf) {
|
||||||
|
this.datanode = datanode;
|
||||||
if (storage != null) {
|
if (storage != null) {
|
||||||
for (int i = 0; i < storage.getNumStorageDirs(); ++i) {
|
for (int i = 0; i < storage.getNumStorageDirs(); ++i) {
|
||||||
storage.createStorageID(storage.getStorageDir(i), false);
|
storage.createStorageID(storage.getStorageDir(i), false);
|
||||||
|
@ -737,6 +744,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
||||||
}
|
}
|
||||||
storage.free(bpid, binfo.getNumBytes());
|
storage.free(bpid, binfo.getNumBytes());
|
||||||
map.remove(b);
|
map.remove(b);
|
||||||
|
if (datanode != null) {
|
||||||
|
datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, b),
|
||||||
|
binfo.getStorageUuid());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (error) {
|
if (error) {
|
||||||
throw new IOException("Invalidate: Missing blocks.");
|
throw new IOException("Invalidate: Missing blocks.");
|
||||||
|
|
|
@ -159,8 +159,8 @@ public class TestIncrementalBlockReports {
|
||||||
anyString(),
|
anyString(),
|
||||||
any(StorageReceivedDeletedBlocks[].class));
|
any(StorageReceivedDeletedBlocks[].class));
|
||||||
|
|
||||||
// Trigger a block report, this also triggers an IBR.
|
// Trigger a heartbeat, this also triggers an IBR.
|
||||||
DataNodeTestUtils.triggerBlockReport(singletonDn);
|
DataNodeTestUtils.triggerHeartbeat(singletonDn);
|
||||||
Thread.sleep(2000);
|
Thread.sleep(2000);
|
||||||
|
|
||||||
// Ensure that the deleted block is reported.
|
// Ensure that the deleted block is reported.
|
||||||
|
|
Loading…
Reference in New Issue