HDFS-15113. Missing IBR when NameNode restart if open processCommand async feature. Contributed by Xiaoqiao He.
Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org> Reviewed-by: Brahma Reddy Battula <brahma@apache.org> Reviewed-by: Inigo Goiri <inigoiri@apache.org>
This commit is contained in:
parent
b9d825f178
commit
e9955bb8ff
|
@ -922,14 +922,17 @@ class BPServiceActor implements Runnable {
|
|||
// re-retrieve namespace info to make sure that, if the NN
|
||||
// was restarted, we still match its version (HDFS-2120)
|
||||
NamespaceInfo nsInfo = retrieveNamespaceInfo();
|
||||
// and re-register
|
||||
register(nsInfo);
|
||||
scheduler.scheduleHeartbeat();
|
||||
// HDFS-9917,Standby NN IBR can be very huge if standby namenode is down
|
||||
// for sometime.
|
||||
if (state == HAServiceState.STANDBY || state == HAServiceState.OBSERVER) {
|
||||
ibrManager.clearIBRs();
|
||||
}
|
||||
// HDFS-15113, register and trigger FBR after clean IBR to avoid missing
|
||||
// some blocks report to Standby util next FBR.
|
||||
// and re-register
|
||||
register(nsInfo);
|
||||
scheduler.scheduleHeartbeat();
|
||||
DataNodeFaultInjector.get().blockUtilSendFullBlockReport();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -100,4 +100,9 @@ public class DataNodeFaultInjector {
|
|||
* Used as a hook to inject intercept when BPOfferService hold lock.
|
||||
*/
|
||||
public void delayWhenOfferServiceHoldLock() {}
|
||||
|
||||
/**
|
||||
* Used as a hook to inject intercept when re-register.
|
||||
*/
|
||||
public void blockUtilSendFullBlockReport() {}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
|||
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
|
||||
|
@ -56,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|||
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
|
||||
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset.SimulatedStorage;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
||||
|
@ -104,6 +106,8 @@ public class TestBPOfferService {
|
|||
private long firstLeaseId = 0;
|
||||
private long secondLeaseId = 0;
|
||||
private long nextFullBlockReportLeaseId = 1L;
|
||||
private int fullBlockReportCount = 0;
|
||||
private int incrBlockReportCount = 0;
|
||||
|
||||
static {
|
||||
GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
|
||||
|
@ -226,6 +230,14 @@ public class TestBPOfferService {
|
|||
}
|
||||
}
|
||||
|
||||
private void setBlockReportCount(int count) {
|
||||
fullBlockReportCount = count;
|
||||
}
|
||||
|
||||
private void setIncreaseBlockReportCount(int count) {
|
||||
incrBlockReportCount += count;
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that the BPOS can register to talk to two different NNs,
|
||||
* sends block reports to both, etc.
|
||||
|
@ -262,6 +274,76 @@ public class TestBPOfferService {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* HDFS-15113: Test and verify missing block when re-register.
|
||||
*/
|
||||
@Test
|
||||
public void testMissBlocksWhenReregister() throws Exception {
|
||||
BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
|
||||
bpos.start();
|
||||
try {
|
||||
waitForBothActors(bpos);
|
||||
waitForInitialization(bpos);
|
||||
|
||||
DataNodeFaultInjector.set(new DataNodeFaultInjector() {
|
||||
public void blockUtilSendFullBlockReport() {
|
||||
try {
|
||||
Thread.sleep(200);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
countBlockReportItems(FAKE_BLOCK, mockNN1);
|
||||
int totalTestBlocks = 4000;
|
||||
Thread addNewBlockThread = new Thread(() -> {
|
||||
for (int i = 0; i < totalTestBlocks; i++) {
|
||||
SimulatedFSDataset fsDataset = (SimulatedFSDataset) mockFSDataset;
|
||||
SimulatedStorage simulatedStorage = fsDataset.getStorages().get(0);
|
||||
String storageId = simulatedStorage.getStorageUuid();
|
||||
ExtendedBlock b = new ExtendedBlock(bpos.getBlockPoolId(), i, 0, i);
|
||||
try {
|
||||
fsDataset.createRbw(StorageType.DEFAULT, storageId, b, false);
|
||||
bpos.notifyNamenodeReceivingBlock(b, storageId);
|
||||
fsDataset.finalizeBlock(b, false);
|
||||
Thread.sleep(1);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
});
|
||||
addNewBlockThread.start();
|
||||
|
||||
// Make sure that generate blocks for DataNode and IBR not empty now.
|
||||
Thread.sleep(200);
|
||||
// Trigger re-register using DataNode Command.
|
||||
datanodeCommands[0] = new DatanodeCommand[]{RegisterCommand.REGISTER};
|
||||
bpos.triggerHeartbeatForTests();
|
||||
|
||||
try {
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
if(fullBlockReportCount == totalTestBlocks ||
|
||||
incrBlockReportCount == totalTestBlocks) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}, 1000, 15000);
|
||||
} catch (Exception e) {}
|
||||
|
||||
// Verify FBR/IBR count is equal to generate number.
|
||||
assertTrue(fullBlockReportCount == totalTestBlocks ||
|
||||
incrBlockReportCount == totalTestBlocks);
|
||||
} finally {
|
||||
bpos.stop();
|
||||
bpos.join();
|
||||
|
||||
DataNodeFaultInjector.set(new DataNodeFaultInjector() {
|
||||
public void blockUtilSendFullBlockReport() {}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLocklessBlockPoolId() throws Exception {
|
||||
BPOfferService bpos = Mockito.spy(setupBPOSForNNs(mockNN1));
|
||||
|
@ -612,6 +694,36 @@ public class TestBPOfferService {
|
|||
secondCallTime = Time.now();
|
||||
}
|
||||
}
|
||||
|
||||
private void countBlockReportItems(final ExtendedBlock fakeBlock,
|
||||
final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
|
||||
final String fakeBlockPoolId = fakeBlock.getBlockPoolId();
|
||||
final ArgumentCaptor<StorageBlockReport[]> captor =
|
||||
ArgumentCaptor.forClass(StorageBlockReport[].class);
|
||||
|
||||
Mockito.doAnswer((Answer<Object>) invocation -> {
|
||||
Object[] arguments = invocation.getArguments();
|
||||
StorageBlockReport[] list = (StorageBlockReport[])arguments[2];
|
||||
setBlockReportCount(list[0].getBlocks().getNumberOfBlocks());
|
||||
return null;
|
||||
}).when(mockNN).blockReport(
|
||||
Mockito.any(),
|
||||
Mockito.eq(fakeBlockPoolId),
|
||||
captor.capture(),
|
||||
Mockito.any()
|
||||
);
|
||||
|
||||
Mockito.doAnswer((Answer<Object>) invocation -> {
|
||||
Object[] arguments = invocation.getArguments();
|
||||
StorageReceivedDeletedBlocks[] list =
|
||||
(StorageReceivedDeletedBlocks[])arguments[2];
|
||||
setIncreaseBlockReportCount(list[0].getBlocks().length);
|
||||
return null;
|
||||
}).when(mockNN).blockReceivedAndDeleted(
|
||||
Mockito.any(),
|
||||
Mockito.eq(fakeBlockPoolId),
|
||||
Mockito.any());
|
||||
}
|
||||
|
||||
private class BPOfferServiceSynchronousCallAnswer implements Answer<Void> {
|
||||
private final int nnIdx;
|
||||
|
|
Loading…
Reference in New Issue