diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index e4a30cb4657..e8c64d281f0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -1271,6 +1271,10 @@ class BPServiceActor implements Runnable { processQueue(); } catch (Throwable t) { LOG.error("{} encountered fatal exception and exit.", getName(), t); + runningState = RunningState.FAILED; + } finally { + LOG.warn("Ending command processor service for: " + this); + shouldServiceRun = false; } } @@ -1286,6 +1290,7 @@ class BPServiceActor implements Runnable { dn.getMetrics().incrNumProcessedCommands(); } catch (InterruptedException e) { LOG.error("{} encountered interrupt and exit.", getName()); + Thread.currentThread().interrupt(); // ignore unless thread was specifically interrupted. if (Thread.interrupted()) { break; @@ -1357,4 +1362,11 @@ class BPServiceActor implements Runnable { dn.getMetrics().incrActorCmdQueueLength(1); } } + + @VisibleForTesting + void stopCommandProcessingThread() { + if (commandProcessingThread != null) { + commandProcessingThread.interrupt(); + } + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index 8cf7299dfc1..98af2fa78ba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -1088,4 +1088,26 @@ public class TestBPOfferService { } } } + + @Test(timeout = 5000) + public void testCommandProcessingThreadExit() throws Exception { + Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf). + numDataNodes(1).build(); + try { + List datanodes = cluster.getDataNodes(); + DataNode dataNode = datanodes.get(0); + List allBpOs = dataNode.getAllBpOs(); + BPOfferService bpos = allBpOs.get(0); + waitForInitialization(bpos); + BPServiceActor actor = bpos.getBPServiceActors().get(0); + // Stop and wait util actor exit. + actor.stopCommandProcessingThread(); + GenericTestUtils.waitFor(() -> !actor.isAlive(), 100, 3000); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } } \ No newline at end of file