HDFS-15113. Addendum: Missing IBR when NameNode restart if open processCommand async feature. Contributed by Xiaoqiao He.
Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org> (cherry picked from commit af64ce2f4a72705c5b68ddfe4d29f0d208fd38e7)
This commit is contained in:
parent
50b0f0dc42
commit
4944dafa29
@ -286,23 +286,30 @@ public void testBasicFunctionality() throws Exception {
|
|||||||
public void testMissBlocksWhenReregister() throws Exception {
|
public void testMissBlocksWhenReregister() throws Exception {
|
||||||
BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
|
BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
|
||||||
bpos.start();
|
bpos.start();
|
||||||
|
int totalTestBlocks = 4000;
|
||||||
|
Thread addNewBlockThread = null;
|
||||||
|
final AtomicInteger count = new AtomicInteger(0);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
waitForBothActors(bpos);
|
waitForBothActors(bpos);
|
||||||
waitForInitialization(bpos);
|
waitForInitialization(bpos);
|
||||||
|
|
||||||
DataNodeFaultInjector.set(new DataNodeFaultInjector() {
|
DataNodeFaultInjector.set(new DataNodeFaultInjector() {
|
||||||
public void blockUtilSendFullBlockReport() {
|
public void blockUtilSendFullBlockReport() {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(200);
|
GenericTestUtils.waitFor(() -> {
|
||||||
} catch (InterruptedException e) {
|
if(count.get() > 2000) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}, 100, 1000);
|
||||||
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
countBlockReportItems(FAKE_BLOCK, mockNN1);
|
countBlockReportItems(FAKE_BLOCK, mockNN1);
|
||||||
int totalTestBlocks = 4000;
|
addNewBlockThread = new Thread(() -> {
|
||||||
Thread addNewBlockThread = new Thread(() -> {
|
|
||||||
for (int i = 0; i < totalTestBlocks; i++) {
|
for (int i = 0; i < totalTestBlocks; i++) {
|
||||||
SimulatedFSDataset fsDataset = (SimulatedFSDataset) mockFSDataset;
|
SimulatedFSDataset fsDataset = (SimulatedFSDataset) mockFSDataset;
|
||||||
SimulatedStorage simulatedStorage = fsDataset.getStorages().get(0);
|
SimulatedStorage simulatedStorage = fsDataset.getStorages().get(0);
|
||||||
@ -312,6 +319,7 @@ public void blockUtilSendFullBlockReport() {
|
|||||||
fsDataset.createRbw(StorageType.DEFAULT, storageId, b, false);
|
fsDataset.createRbw(StorageType.DEFAULT, storageId, b, false);
|
||||||
bpos.notifyNamenodeReceivingBlock(b, storageId);
|
bpos.notifyNamenodeReceivingBlock(b, storageId);
|
||||||
fsDataset.finalizeBlock(b, false);
|
fsDataset.finalizeBlock(b, false);
|
||||||
|
count.addAndGet(1);
|
||||||
Thread.sleep(1);
|
Thread.sleep(1);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
@ -321,7 +329,13 @@ public void blockUtilSendFullBlockReport() {
|
|||||||
addNewBlockThread.start();
|
addNewBlockThread.start();
|
||||||
|
|
||||||
// Make sure that generate blocks for DataNode and IBR not empty now.
|
// Make sure that generate blocks for DataNode and IBR not empty now.
|
||||||
Thread.sleep(200);
|
GenericTestUtils.waitFor(() -> {
|
||||||
|
if(count.get() > 0) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}, 100, 1000);
|
||||||
|
|
||||||
// Trigger re-register using DataNode Command.
|
// Trigger re-register using DataNode Command.
|
||||||
datanodeCommands[0] = new DatanodeCommand[]{RegisterCommand.REGISTER};
|
datanodeCommands[0] = new DatanodeCommand[]{RegisterCommand.REGISTER};
|
||||||
bpos.triggerHeartbeatForTests();
|
bpos.triggerHeartbeatForTests();
|
||||||
@ -340,6 +354,7 @@ public void blockUtilSendFullBlockReport() {
|
|||||||
assertTrue(fullBlockReportCount == totalTestBlocks ||
|
assertTrue(fullBlockReportCount == totalTestBlocks ||
|
||||||
incrBlockReportCount == totalTestBlocks);
|
incrBlockReportCount == totalTestBlocks);
|
||||||
} finally {
|
} finally {
|
||||||
|
addNewBlockThread.join();
|
||||||
bpos.stop();
|
bpos.stop();
|
||||||
bpos.join();
|
bpos.join();
|
||||||
|
|
||||||
@ -698,12 +713,17 @@ private void setTimeForSynchronousBPOSCalls() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Record blocks counts of block report and total adding blocks count of IBR
|
||||||
|
* which assume no deleting blocks here.
|
||||||
|
*/
|
||||||
private void countBlockReportItems(final ExtendedBlock fakeBlock,
|
private void countBlockReportItems(final ExtendedBlock fakeBlock,
|
||||||
final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
|
final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
|
||||||
final String fakeBlockPoolId = fakeBlock.getBlockPoolId();
|
final String fakeBlockPoolId = fakeBlock.getBlockPoolId();
|
||||||
final ArgumentCaptor<StorageBlockReport[]> captor =
|
final ArgumentCaptor<StorageBlockReport[]> captor =
|
||||||
ArgumentCaptor.forClass(StorageBlockReport[].class);
|
ArgumentCaptor.forClass(StorageBlockReport[].class);
|
||||||
|
|
||||||
|
// Record blocks count about the last time block report.
|
||||||
Mockito.doAnswer((Answer<Object>) invocation -> {
|
Mockito.doAnswer((Answer<Object>) invocation -> {
|
||||||
Object[] arguments = invocation.getArguments();
|
Object[] arguments = invocation.getArguments();
|
||||||
StorageBlockReport[] list = (StorageBlockReport[])arguments[2];
|
StorageBlockReport[] list = (StorageBlockReport[])arguments[2];
|
||||||
@ -716,6 +736,7 @@ private void countBlockReportItems(final ExtendedBlock fakeBlock,
|
|||||||
Mockito.any()
|
Mockito.any()
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Record total adding blocks count and assume no deleting blocks here.
|
||||||
Mockito.doAnswer((Answer<Object>) invocation -> {
|
Mockito.doAnswer((Answer<Object>) invocation -> {
|
||||||
Object[] arguments = invocation.getArguments();
|
Object[] arguments = invocation.getArguments();
|
||||||
StorageReceivedDeletedBlocks[] list =
|
StorageReceivedDeletedBlocks[] list =
|
||||||
|
Loading…
x
Reference in New Issue
Block a user