diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 43e9745c9da..222ee49b3ec 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -922,14 +922,17 @@ class BPServiceActor implements Runnable {
       // re-retrieve namespace info to make sure that, if the NN
       // was restarted, we still match its version (HDFS-2120)
       NamespaceInfo nsInfo = retrieveNamespaceInfo();
-      // and re-register
-      register(nsInfo);
-      scheduler.scheduleHeartbeat();
       // HDFS-9917,Standby NN IBR can be very huge if standby namenode is down
       // for sometime.
       if (state == HAServiceState.STANDBY || state == HAServiceState.OBSERVER) {
         ibrManager.clearIBRs();
       }
+      // HDFS-15113, register and trigger FBR after clearing IBRs to avoid
+      // missing some blocks reported to Standby until next FBR.
+      // and re-register
+      register(nsInfo);
+      scheduler.scheduleHeartbeat();
+      DataNodeFaultInjector.get().blockUtilSendFullBlockReport();
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index 7e661117725..c031f6c4c57 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -100,4 +100,9 @@ public class DataNodeFaultInjector {
    * Used as a hook to inject intercept when BPOfferService hold lock.
    */
   public void delayWhenOfferServiceHoldLock() {}
+
+  /**
+   * Used as a hook to inject intercept right after re-registration.
+   */
+  public void blockUtilSendFullBlockReport() {}
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 2e8cf242461..54d09c2d7b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
@@ -56,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset.SimulatedStorage;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -104,6 +106,10 @@ public class TestBPOfferService {
   private long firstLeaseId = 0;
   private long secondLeaseId = 0;
   private long nextFullBlockReportLeaseId = 1L;
+  // Written by the mocked NN answers and polled by the test thread, so
+  // they must be volatile to make the updates visible across threads.
+  private volatile int fullBlockReportCount = 0;
+  private volatile int incrBlockReportCount = 0;
 
   static {
     GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
@@ -226,6 +232,16 @@ public class TestBPOfferService {
     }
   }
 
+  /** Record the block count carried by the latest full block report. */
+  private void setBlockReportCount(int count) {
+    fullBlockReportCount = count;
+  }
+
+  /** Add the block count of one incremental report to the running total. */
+  private void setIncreaseBlockReportCount(int count) {
+    incrBlockReportCount += count;
+  }
+
   /**
    * Test that the BPOS can register to talk to two different NNs,
    * sends block reports to both, etc.
@@ -262,6 +278,73 @@ public class TestBPOfferService {
     }
   }
 
+  /**
+   * HDFS-15113: Verify that no blocks are missed by the NameNode when the
+   * DataNode re-registers while new blocks are still being added.
+   */
+  @Test
+  public void testMissBlocksWhenReregister() throws Exception {
+    BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
+    // Remember the current injector so that it can be restored afterwards.
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+    bpos.start();
+    try {
+      waitForBothActors(bpos);
+      waitForInitialization(bpos);
+
+      DataNodeFaultInjector.set(new DataNodeFaultInjector() {
+        @Override
+        public void blockUtilSendFullBlockReport() {
+          try {
+            Thread.sleep(200);
+          } catch (InterruptedException e) {
+            // Preserve the interrupt status for the interrupted thread.
+            Thread.currentThread().interrupt();
+          }
+        }
+      });
+
+      countBlockReportItems(FAKE_BLOCK, mockNN1);
+      int totalTestBlocks = 4000;
+      Thread addNewBlockThread = new Thread(() -> {
+        for (int i = 0; i < totalTestBlocks; i++) {
+          SimulatedFSDataset fsDataset = (SimulatedFSDataset) mockFSDataset;
+          SimulatedStorage simulatedStorage = fsDataset.getStorages().get(0);
+          String storageId = simulatedStorage.getStorageUuid();
+          ExtendedBlock b = new ExtendedBlock(bpos.getBlockPoolId(), i, 0, i);
+          try {
+            fsDataset.createRbw(StorageType.DEFAULT, storageId, b, false);
+            bpos.notifyNamenodeReceivingBlock(b, storageId);
+            fsDataset.finalizeBlock(b, false);
+            Thread.sleep(1);
+          } catch (Exception e) {
+            e.printStackTrace();
+          }
+        }
+      });
+      addNewBlockThread.start();
+
+      // Let the writer thread add blocks so that pending IBRs are not empty.
+      Thread.sleep(200);
+      // Trigger re-register using DataNode Command.
+      datanodeCommands[0] = new DatanodeCommand[]{RegisterCommand.REGISTER};
+      bpos.triggerHeartbeatForTests();
+
+      // Fails with a TimeoutException if neither count reaches the target.
+      GenericTestUtils.waitFor(() ->
+          fullBlockReportCount == totalTestBlocks ||
+          incrBlockReportCount == totalTestBlocks, 1000, 15000);
+
+      // Verify that the FBR or IBR count equals the number of generated blocks.
+      assertTrue(fullBlockReportCount == totalTestBlocks ||
+          incrBlockReportCount == totalTestBlocks);
+    } finally {
+      bpos.stop();
+      bpos.join();
+      DataNodeFaultInjector.set(oldInjector);
+    }
+  }
+
   @Test
   public void testLocklessBlockPoolId() throws Exception {
     BPOfferService bpos = Mockito.spy(setupBPOSForNNs(mockNN1));
@@ -612,6 +695,39 @@ public class TestBPOfferService {
       secondCallTime = Time.now();
     }
   }
+
+  /**
+   * Stub the mocked NN so that the test can observe block report sizes:
+   * the block count of the first storage in the latest full block report,
+   * and the running total of first-storage entries across incremental ones.
+   */
+  private void countBlockReportItems(final ExtendedBlock fakeBlock,
+      final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
+    final String fakeBlockPoolId = fakeBlock.getBlockPoolId();
+
+    Mockito.doAnswer((Answer) invocation -> {
+      Object[] arguments = invocation.getArguments();
+      StorageBlockReport[] list = (StorageBlockReport[])arguments[2];
+      setBlockReportCount(list[0].getBlocks().getNumberOfBlocks());
+      return null;
+    }).when(mockNN).blockReport(
+      Mockito.any(),
+      Mockito.eq(fakeBlockPoolId),
+      Mockito.any(),
+      Mockito.any()
+    );
+
+    Mockito.doAnswer((Answer) invocation -> {
+      Object[] arguments = invocation.getArguments();
+      StorageReceivedDeletedBlocks[] list =
+          (StorageReceivedDeletedBlocks[])arguments[2];
+      setIncreaseBlockReportCount(list[0].getBlocks().length);
+      return null;
+    }).when(mockNN).blockReceivedAndDeleted(
+      Mockito.any(),
+      Mockito.eq(fakeBlockPoolId),
+      Mockito.any());
+  }
 
   private class BPOfferServiceSynchronousCallAnswer implements Answer {
     private final int nnIdx;