diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index bf4d6204e7a..ce8e01a7ead 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -1550,6 +1550,9 @@ Release 2.8.0 - UNRELEASED HDFS-9336. deleteSnapshot throws NPE when snapshotname is null. (Brahma Reddy Battula via aajisaka) + HDFS-6533. TestBPOfferService#testBasicFunctionalitytest fails + intermittently. (Wei-Chiu Chuang via Arpit Agarwal) + Release 2.7.3 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 0316535d251..1b72961d834 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -127,6 +127,10 @@ static enum RunningState { scheduler = new Scheduler(dnConf.heartBeatInterval, dnConf.blockReportInterval); } + public DatanodeRegistration getBpRegistration() { + return bpRegistration; + } + boolean isAlive() { if (!shouldServiceRun || !bpThread.isAlive()) { return false; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index ab69bb00150..cb5f272b314 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -180,7 +180,7 @@ public void testBasicFunctionality() throws Exception { BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2); bpos.start(); try { - waitForInitialization(bpos); + waitForBothActors(bpos); // The DN should have register to both NNs. Mockito.verify(mockNN1).registerDatanode( @@ -205,6 +205,7 @@ public void testBasicFunctionality() throws Exception { } finally { bpos.stop(); + bpos.join(); } } @@ -235,6 +236,7 @@ public void testIgnoreDeletionsFromNonActive() throws Exception { } finally { bpos.stop(); + bpos.join(); } // Should ignore the delete command from the standby @@ -260,6 +262,7 @@ public void testNNsFromDifferentClusters() throws Exception { waitForOneToFail(bpos); } finally { bpos.stop(); + bpos.join(); } } @@ -307,6 +310,7 @@ public void testPickActiveNameNode() throws Exception { } finally { bpos.stop(); + bpos.join(); } } @@ -349,6 +353,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable { waitForBlockReport(mockNN1, mockNN2); } finally { bpos.stop(); + bpos.join(); } } @@ -403,6 +408,27 @@ public Boolean get() { } }, 100, 10000); } + + private void waitForBothActors(final BPOfferService bpos) + throws Exception { + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + List actors = bpos.getBPServiceActors(); + + return bpos.isAlive() && getRegisteredActors(actors) == 2; + } + private int getRegisteredActors(List actors) { + int regActors = 0; + for (BPServiceActor actor : actors) { + if (actor.getBpRegistration() != null) { + regActors++; + } + } + return regActors; + } + }, 100, 10000); + } private void waitForBlockReport(final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception { @@ -540,6 +566,7 @@ public void testReportBadBlockWhenStandbyNNTimesOut() throws Exception { difference < 5000); } finally { bpos.stop(); + bpos.join(); } } @@ -579,6 +606,7 @@ public void testTrySendErrorReportWhenStandbyNNTimesOut() throws Exception { + " processing ", difference < 5000); } finally { bpos.stop(); + bpos.join(); } } /** @@ -624,6 +652,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable { + "when errorReport threw IOException", secondCallTime != 0); } finally { bpos.stop(); + bpos.join(); } } @@ -675,6 +704,7 @@ public void testReportBadBlocksWhenNNThrowsStandbyException() .reportBadBlocks(Mockito.any(LocatedBlock[].class)); } finally { bpos.stop(); + bpos.join(); } } }