diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 6c881ae9753..3a77f027fd6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -155,6 +155,9 @@ Release 2.7.3 - UNRELEASED HDFS-10178. Permanent write failures can happen if pipeline recoveries occur for the first packet (kihwal) + HDFS-9917. IBR accumulate more objects when SNN was down for sometime. + (Brahma Reddy Battula via vinayakumarb) + Release 2.7.2 - 2016-01-25 INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 45b11231260..05aca6ce5d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -885,6 +885,11 @@ void reRegister() throws IOException { // and re-register register(nsInfo); scheduler.scheduleHeartbeat(); + // HDFS-9917: clear IBRs queued for a standby NN on re-register; they can + // grow very large if the standby namenode was down for some time. 
+ if (state == HAServiceState.STANDBY) { + pendingIncrementalBRperStorage.clear(); + } } } @@ -993,6 +998,13 @@ private void processQueueMessages() { } } } + /** + * Returns the current size of {@code pendingIncrementalBRperStorage}. + */ + @VisibleForTesting + int getPendingIBRSize() { + return pendingIncrementalBRperStorage.size(); + } Scheduler getScheduler() { return scheduler; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index 64cc78bf218..ea5792edf4e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -28,6 +28,7 @@ import java.net.InetSocketAddress; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; @@ -47,10 +48,12 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; +import org.apache.hadoop.hdfs.server.protocol.RegisterCommand; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.server.protocol.StorageReport; @@ -91,7 +94,10 @@ public class TestBPOfferService { private DatanodeProtocolClientSideTranslatorPB mockNN1; private 
DatanodeProtocolClientSideTranslatorPB mockNN2; - private final NNHAStatusHeartbeat[] mockHaStatuses = new NNHAStatusHeartbeat[2]; + private final NNHAStatusHeartbeat[] mockHaStatuses = + new NNHAStatusHeartbeat[2]; + private final DatanodeCommand[][] datanodeCommands = + new DatanodeCommand[2][0]; private final int[] heartbeatCounts = new int[2]; private DataNode mockDn; private FsDatasetSpi mockFSDataset; @@ -145,6 +151,7 @@ private DatanodeProtocolClientSideTranslatorPB setupNNMock(int nnIdx) Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class)); mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0); + datanodeCommands[nnIdx] = new DatanodeCommand[0]; return mock; } @@ -163,8 +170,12 @@ public HeartbeatAnswer(int nnIdx) { @Override public HeartbeatResponse answer(InvocationOnMock invocation) throws Throwable { heartbeatCounts[nnIdx]++; - return new HeartbeatResponse(new DatanodeCommand[0], - mockHaStatuses[nnIdx], null); + HeartbeatResponse heartbeatResponse = + new HeartbeatResponse(datanodeCommands[nnIdx], mockHaStatuses[nnIdx], + null); + // reset the command so it is returned only once + datanodeCommands[nnIdx] = new DatanodeCommand[0]; + return heartbeatResponse; } } @@ -675,4 +686,88 @@ public void testReportBadBlocksWhenNNThrowsStandbyException() bpos.stop(); } } + + /* + * HDFS-9917: IBRs queued for the standby NN must be cleared on reRegister(). + */ + @Test + public void testIBRClearanceForStandbyOnReRegister() throws Exception { + final BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2); + bpos.start(); + try { + waitForInitialization(bpos); + // Should start with neither NN as active. 
+ assertNull(bpos.getActiveNN()); + // Have NN1 claim active at txid 1 + mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1); + bpos.triggerHeartbeatForTests(); + // Now mockNN1 is acting like active namenode and mockNN2 as Standby + assertSame(mockNN1, bpos.getActiveNN()); + // Return nothing when the Active Namenode gets IBRs + Mockito.doNothing().when(mockNN1).blockReceivedAndDeleted( + Mockito.any(DatanodeRegistration.class), Mockito.anyString(), Mockito + .any(StorageReceivedDeletedBlocks[].class)); + + final IOException re = new IOException( + "Standby NN is currently not able to process IBR"); + + final AtomicBoolean ibrReported = new AtomicBoolean(false); + // throw exception for standby when first IBR is received + Mockito.doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + ibrReported.set(true); + throw re; + } + }).when(mockNN2).blockReceivedAndDeleted( + Mockito.any(DatanodeRegistration.class), Mockito.anyString(), Mockito + .any(StorageReceivedDeletedBlocks[].class)); + + DatanodeStorage storage = Mockito.mock(DatanodeStorage.class); + Mockito.doReturn(storage).when(mockFSDataset).getStorage("storage0"); + // Add IBRs + bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "storage0"); + // Send heartbeat so that the BpServiceActor can send IBR to + // namenode + bpos.triggerHeartbeatForTests(); + // Wait till first IBR is received at standbyNN. Just for confirmation. + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + return ibrReported.get(); + } + }, 100, 1000); + + // Send register command back to Datanode to reRegister(). + // After reRegister IBRs should be cleared. 
+ datanodeCommands[1] = new DatanodeCommand[] { new RegisterCommand() }; + assertEquals( + "IBR size before reRegister should be non-0", 1, getStandbyIBRSize( + bpos)); + bpos.triggerHeartbeatForTests(); + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + return getStandbyIBRSize(bpos) == 0; + } + }, 100, 1000); + } finally { + bpos.stop(); + bpos.join(); + } + } + + /** + * Returns the pending IBR size of the first BPServiceActor whose state is + * STANDBY, or -1 if no actor is in that state. + */ + private int getStandbyIBRSize(BPOfferService bpos) { + List<BPServiceActor> bpServiceActors = bpos.getBPServiceActors(); + for (BPServiceActor bpServiceActor : bpServiceActors) { + if (bpServiceActor.state == HAServiceState.STANDBY) { + return bpServiceActor.getPendingIBRSize(); + } + } + return -1; + } }