HDFS-15113. Addendum: Missing IBR when NameNode restart if open processCommand async feature. Contributed by Xiaoqiao He.

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
This commit is contained in:
He Xiaoqiao 2020-03-23 12:42:57 -07:00 committed by Wei-Chiu Chuang
parent e2c7ac71b5
commit af64ce2f4a
1 changed files with 27 additions and 6 deletions

View File

@ -281,23 +281,30 @@ public class TestBPOfferService {
public void testMissBlocksWhenReregister() throws Exception {
BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
bpos.start();
int totalTestBlocks = 4000;
Thread addNewBlockThread = null;
final AtomicInteger count = new AtomicInteger(0);
try {
waitForBothActors(bpos);
waitForInitialization(bpos);
DataNodeFaultInjector.set(new DataNodeFaultInjector() {
public void blockUtilSendFullBlockReport() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
GenericTestUtils.waitFor(() -> {
if(count.get() > 2000) {
return true;
}
return false;
}, 100, 1000);
} catch (Exception e) {
e.printStackTrace();
}
}
});
countBlockReportItems(FAKE_BLOCK, mockNN1);
int totalTestBlocks = 4000;
Thread addNewBlockThread = new Thread(() -> {
addNewBlockThread = new Thread(() -> {
for (int i = 0; i < totalTestBlocks; i++) {
SimulatedFSDataset fsDataset = (SimulatedFSDataset) mockFSDataset;
SimulatedStorage simulatedStorage = fsDataset.getStorages().get(0);
@ -307,6 +314,7 @@ public class TestBPOfferService {
fsDataset.createRbw(StorageType.DEFAULT, storageId, b, false);
bpos.notifyNamenodeReceivingBlock(b, storageId);
fsDataset.finalizeBlock(b, false);
count.addAndGet(1);
Thread.sleep(1);
} catch (Exception e) {
e.printStackTrace();
@ -316,7 +324,13 @@ public class TestBPOfferService {
addNewBlockThread.start();
// Make sure that generate blocks for DataNode and IBR not empty now.
Thread.sleep(200);
GenericTestUtils.waitFor(() -> {
if(count.get() > 0) {
return true;
}
return false;
}, 100, 1000);
// Trigger re-register using DataNode Command.
datanodeCommands[0] = new DatanodeCommand[]{RegisterCommand.REGISTER};
bpos.triggerHeartbeatForTests();
@ -335,6 +349,7 @@ public class TestBPOfferService {
assertTrue(fullBlockReportCount == totalTestBlocks ||
incrBlockReportCount == totalTestBlocks);
} finally {
addNewBlockThread.join();
bpos.stop();
bpos.join();
@ -695,12 +710,17 @@ public class TestBPOfferService {
}
}
/**
* Record blocks counts of block report and total adding blocks count of IBR
* which assume no deleting blocks here.
*/
private void countBlockReportItems(final ExtendedBlock fakeBlock,
final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
final String fakeBlockPoolId = fakeBlock.getBlockPoolId();
final ArgumentCaptor<StorageBlockReport[]> captor =
ArgumentCaptor.forClass(StorageBlockReport[].class);
// Record blocks count about the last time block report.
Mockito.doAnswer((Answer<Object>) invocation -> {
Object[] arguments = invocation.getArguments();
StorageBlockReport[] list = (StorageBlockReport[])arguments[2];
@ -713,6 +733,7 @@ public class TestBPOfferService {
Mockito.any()
);
// Record total adding blocks count and assume no deleting blocks here.
Mockito.doAnswer((Answer<Object>) invocation -> {
Object[] arguments = invocation.getArguments();
StorageReceivedDeletedBlocks[] list =