diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 4c32a36f40c..cd43bd21844 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -929,6 +929,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7771. fuse_dfs should permit FILE: on the front of KRB5CCNAME
     (cmccabe)
 
+    HDFS-7704. DN heartbeat to Active NN may be blocked and expire if
+    connection to Standby NN continues to time out (Rushabh Shah via kihwal)
+
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index dfeacdef180..0289167b7ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -218,7 +218,9 @@ class BPOfferService {
       String storageUuid, StorageType storageType) {
     checkBlock(block);
     for (BPServiceActor actor : bpServices) {
-      actor.reportBadBlocks(block, storageUuid, storageType);
+      ReportBadBlockAction rbbAction = new ReportBadBlockAction
+          (block, storageUuid, storageType);
+      actor.bpThreadEnqueue(rbbAction);
     }
   }
 
@@ -414,7 +416,9 @@ class BPOfferService {
    */
   void trySendErrorReport(int errCode, String errMsg) {
     for (BPServiceActor actor : bpServices) {
-      actor.trySendErrorReport(errCode, errMsg);
+      ErrorReportAction errorReportAction = new ErrorReportAction
+          (errCode, errMsg);
+      actor.bpThreadEnqueue(errorReportAction);
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 917b5dde7c6..69210db8edf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -25,6 +25,7 @@ import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -34,7 +35,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.client.BlockReportOptions;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -120,6 +120,8 @@ class BPServiceActor implements Runnable {
 
   private final DNConf dnConf;
   private DatanodeRegistration bpRegistration;
+  final LinkedList<BPServiceActorAction> bpThreadQueue
+      = new LinkedList<BPServiceActorAction>();
 
   BPServiceActor(InetSocketAddress nnAddr, BPOfferService bpos) {
     this.bpos = bpos;
@@ -254,26 +256,6 @@ class BPServiceActor implements Runnable {
     resetBlockReportTime = true; // reset future BRs for randomness
   }
 
-  void reportBadBlocks(ExtendedBlock block,
-      String storageUuid, StorageType storageType) {
-    if (bpRegistration == null) {
-      return;
-    }
-    DatanodeInfo[] dnArr = { new DatanodeInfo(bpRegistration) };
-    String[] uuids = { storageUuid };
-    StorageType[] types = { storageType };
-    LocatedBlock[] blocks = { new LocatedBlock(block, dnArr, uuids, types) };
-
-    try {
-      bpNamenode.reportBadBlocks(blocks);
-    } catch (IOException e){
-      /* One common reason is that NameNode could be in safe mode.
-       * Should we keep on retrying in that case?
-       */
-      LOG.warn("Failed to report bad block " + block + " to namenode : "
-          + " Exception", e);
-    }
-  }
 
   /**
    * Report received blocks and delete hints to the Namenode for each
@@ -771,6 +753,7 @@ class BPServiceActor implements Runnable {
       } catch (IOException e) {
         LOG.warn("IOException in offerService", e);
       }
+      processQueueMessages();
     } // while (shouldRun())
   } // offerService
 
@@ -909,14 +892,6 @@ class BPServiceActor implements Runnable {
     return true;
   }
 
-  void trySendErrorReport(int errCode, String errMsg) {
-    try {
-      bpNamenode.errorReport(bpRegistration, errCode, errMsg);
-    } catch(IOException e) {
-      LOG.warn("Error reporting an error to NameNode " + nnAddr,
-          e);
-    }
-  }
 
   /**
    * Report a bad block from another DN in this cluster.
@@ -1018,4 +993,30 @@ class BPServiceActor implements Runnable {
       }
     }
   }
-}
+
+  public void bpThreadEnqueue(BPServiceActorAction action) {
+    synchronized (bpThreadQueue) {
+      if (!bpThreadQueue.contains(action)) {
+        bpThreadQueue.add(action);
+      }
+    }
+  }
+
+  private void processQueueMessages() {
+    LinkedList<BPServiceActorAction> duplicateQueue;
+    synchronized (bpThreadQueue) {
+      duplicateQueue = new LinkedList<BPServiceActorAction>(bpThreadQueue);
+      bpThreadQueue.clear();
+    }
+    while (!duplicateQueue.isEmpty()) {
+      BPServiceActorAction actionItem = duplicateQueue.remove();
+      try {
+        actionItem.reportTo(bpNamenode, bpRegistration);
+      } catch (BPServiceActorActionException baae) {
+        LOG.warn(baae.getMessage() + nnAddr, baae);
+        // Adding it back to the queue if not present
+        bpThreadEnqueue(actionItem);
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 7171c495ab6..e21ce3898a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
@@ -37,6 +38,7 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
@@ -53,6 +55,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.PathUtils;
+import org.apache.hadoop.util.Time;
 import org.apache.log4j.Level;
 import org.junit.Before;
 import org.junit.Test;
@@ -74,6 +77,8 @@ public class TestBPOfferService {
   private static final ExtendedBlock FAKE_BLOCK =
       new ExtendedBlock(FAKE_BPID, 12345L);
   private static final File TEST_BUILD_DATA =
       PathUtils.getTestDir(TestBPOfferService.class);
+  private long firstCallTime = 0;
+  private long secondCallTime = 0;
   static {
     ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
@@ -458,4 +463,156 @@ public class TestBPOfferService {
     return captor.getValue()[0].getBlocks();
   }
 
+  private void setTimeForSynchronousBPOSCalls() {
+    if (firstCallTime == 0) {
+      firstCallTime = Time.now();
+    } else {
+      secondCallTime = Time.now();
+    }
+  }
+
+  private class BPOfferServiceSynchronousCallAnswer implements Answer<Void> {
+    private final int nnIdx;
+
+    public BPOfferServiceSynchronousCallAnswer(int nnIdx) {
+      this.nnIdx = nnIdx;
+    }
+
+    // For the active namenode we record the processing time, and for the
+    // standby namenode we sleep for 5 seconds (this simulates the situation
+    // where the standby namenode is down).
+    @Override
+    public Void answer(InvocationOnMock invocation) throws Throwable {
+      if (nnIdx == 0) {
+        setTimeForSynchronousBPOSCalls();
+      } else {
+        Thread.sleep(5000);
+      }
+      return null;
+    }
+  }
+
+  /**
+   * This test case tests the {@link BPOfferService#reportBadBlocks} method
+   * such that if the call to the standby namenode times out, it should not
+   * affect the active namenode heartbeat processing, since this function
+   * is called under the writeLock.
+   * @throws Exception
+   */
+  @Test
+  public void testReportBadBlockWhenStandbyNNTimesOut() throws Exception {
+    BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
+    bpos.start();
+    try {
+      waitForInitialization(bpos);
+      // Should start with neither NN as active.
+      assertNull(bpos.getActiveNN());
+      // Have NN1 claim active at txid 1
+      mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1);
+      bpos.triggerHeartbeatForTests();
+      // Now mockNN1 is acting like active namenode and mockNN2 as Standby
+      assertSame(mockNN1, bpos.getActiveNN());
+      Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(0))
+          .when(mockNN1).reportBadBlocks(Mockito.any(LocatedBlock[].class));
+      Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(1))
+          .when(mockNN2).reportBadBlocks(Mockito.any(LocatedBlock[].class));
+      bpos.reportBadBlocks(FAKE_BLOCK, mockFSDataset.getVolume(FAKE_BLOCK)
+          .getStorageID(), mockFSDataset.getVolume(FAKE_BLOCK)
+          .getStorageType());
+      bpos.reportBadBlocks(FAKE_BLOCK, mockFSDataset.getVolume(FAKE_BLOCK)
+          .getStorageID(), mockFSDataset.getVolume(FAKE_BLOCK)
+          .getStorageType());
+      Thread.sleep(10000);
+      long difference = secondCallTime - firstCallTime;
+      assertTrue("Active namenode reportBadBlock processing should be "
+          + "independent of standby namenode reportBadBlock processing ",
+          difference < 5000);
+    } finally {
+      bpos.stop();
+    }
+  }
+
+  /**
+   * This test case tests the {@link BPOfferService#trySendErrorReport} method
+   * such that if the call to the standby namenode times out, it should not
+   * affect the active namenode heartbeat processing, since this function
+   * is called under the writeLock.
+   * @throws Exception
+   */
+  @Test
+  public void testTrySendErrorReportWhenStandbyNNTimesOut() throws Exception {
+    BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
+    bpos.start();
+    try {
+      waitForInitialization(bpos);
+      // Should start with neither NN as active.
+      assertNull(bpos.getActiveNN());
+      // Have NN1 claim active at txid 1
+      mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1);
+      bpos.triggerHeartbeatForTests();
+      // Now mockNN1 is acting like active namenode and mockNN2 as Standby
+      assertSame(mockNN1, bpos.getActiveNN());
+      Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(0))
+          .when(mockNN1).errorReport(Mockito.any(DatanodeRegistration.class),
+          Mockito.anyInt(), Mockito.anyString());
+      Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(1))
+          .when(mockNN2).errorReport(Mockito.any(DatanodeRegistration.class),
+          Mockito.anyInt(), Mockito.anyString());
+      String errorString = "Can't send invalid block " + FAKE_BLOCK;
+      bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString);
+      bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString);
+      Thread.sleep(10000);
+      long difference = secondCallTime - firstCallTime;
+      assertTrue("Active namenode trySendErrorReport processing "
+          + "should be independent of standby namenode trySendErrorReport"
+          + " processing ", difference < 5000);
+    } finally {
+      bpos.stop();
+    }
+  }
+
+  /**
+   * This test case tests whether {@link BPServiceActor#processQueueMessages}
+   * adds the error report back to the queue when
+   * {@link BPServiceActorAction#reportTo} throws an IOException.
+   * @throws Exception
+   */
+  @Test
+  public void testTrySendErrorReportWhenNNThrowsIOException()
+      throws Exception {
+    BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
+    bpos.start();
+    try {
+      waitForInitialization(bpos);
+      // Should start with neither NN as active.
+      assertNull(bpos.getActiveNN());
+      // Have NN1 claim active at txid 1
+      mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1);
+      bpos.triggerHeartbeatForTests();
+      // Now mockNN1 is acting like active namenode and mockNN2 as Standby
+      assertSame(mockNN1, bpos.getActiveNN());
+      Mockito.doAnswer(new Answer<Void>() {
+        // Throw an IOException when this function is first called, which will
+        // in turn add that errorReport back to the bpThreadQueue and let it
+        // be processed the next time.
+        @Override
+        public Void answer(InvocationOnMock invocation) throws Throwable {
+          if (firstCallTime == 0) {
+            firstCallTime = Time.now();
+            throw new IOException();
+          } else {
+            secondCallTime = Time.now();
+            return null;
+          }
+        }
+      }).when(mockNN1).errorReport(Mockito.any(DatanodeRegistration.class),
+          Mockito.anyInt(), Mockito.anyString());
+      String errorString = "Can't send invalid block " + FAKE_BLOCK;
+      bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString);
+      Thread.sleep(10000);
+      assertTrue("Active namenode didn't add the report back to the queue "
+          + "when errorReport threw IOException", secondCallTime != 0);
+    } finally {
+      bpos.stop();
+    }
+  }
 }
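
Note: the hunks above reference new DataNode-side classes that this change adds but that are not included in this excerpt: BPServiceActorAction, ReportBadBlockAction, ErrorReportAction and BPServiceActorActionException. The following is only a minimal sketch, inferred from how bpThreadEnqueue() and processQueueMessages() in BPServiceActor use these types; it is not the actual contents of those files.

  package org.apache.hadoop.hdfs.server.datanode;

  import java.io.IOException;

  import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
  import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;

  /**
   * An action (such as a bad-block report or an error report) that
   * BPOfferService enqueues and that the BPServiceActor delivers to its
   * NameNode from its own thread, so a slow or unreachable standby NN
   * cannot block the caller (e.g. heartbeat processing for the active NN).
   */
  interface BPServiceActorAction {
    // Deliver this action to the given NameNode. Implementations are expected
    // to wrap failures in BPServiceActorActionException so that
    // processQueueMessages() can re-queue the action and retry it on the
    // next offerService() iteration.
    void reportTo(DatanodeProtocolClientSideTranslatorPB bpNamenode,
        DatanodeRegistration bpRegistration) throws BPServiceActorActionException;
  }

  /** Checked exception signalling that a queued action should be retried. */
  class BPServiceActorActionException extends IOException {
    BPServiceActorActionException(String message) {
      super(message);
    }
  }

Because bpThreadEnqueue() deduplicates with bpThreadQueue.contains(action), the concrete actions (ReportBadBlockAction, ErrorReportAction) are also expected to provide meaningful equals() and hashCode() implementations.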