HDFS-15651. Client could not obtain block when DN CommandProcessingThread exit. Contributed by Aiphago.
Reviewed-by: He Xiaoqiao <hexiaoqiao@apache.org>
Reviewed-by: Yiqun Lin <yqlin@apache.org>
(cherry picked from commit 3067a25fa1
)
This commit is contained in:
parent
5da46e197b
commit
fc5fe67b3f
|
@ -1271,6 +1271,10 @@ class BPServiceActor implements Runnable {
|
||||||
processQueue();
|
processQueue();
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
LOG.error("{} encountered fatal exception and exit.", getName(), t);
|
LOG.error("{} encountered fatal exception and exit.", getName(), t);
|
||||||
|
runningState = RunningState.FAILED;
|
||||||
|
} finally {
|
||||||
|
LOG.warn("Ending command processor service for: " + this);
|
||||||
|
shouldServiceRun = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1286,6 +1290,7 @@ class BPServiceActor implements Runnable {
|
||||||
dn.getMetrics().incrNumProcessedCommands();
|
dn.getMetrics().incrNumProcessedCommands();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
LOG.error("{} encountered interrupt and exit.", getName());
|
LOG.error("{} encountered interrupt and exit.", getName());
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
// ignore unless thread was specifically interrupted.
|
// ignore unless thread was specifically interrupted.
|
||||||
if (Thread.interrupted()) {
|
if (Thread.interrupted()) {
|
||||||
break;
|
break;
|
||||||
|
@ -1357,4 +1362,11 @@ class BPServiceActor implements Runnable {
|
||||||
dn.getMetrics().incrActorCmdQueueLength(1);
|
dn.getMetrics().incrActorCmdQueueLength(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
void stopCommandProcessingThread() {
|
||||||
|
if (commandProcessingThread != null) {
|
||||||
|
commandProcessingThread.interrupt();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -1088,4 +1088,26 @@ public class TestBPOfferService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 5000)
|
||||||
|
public void testCommandProcessingThreadExit() throws Exception {
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).
|
||||||
|
numDataNodes(1).build();
|
||||||
|
try {
|
||||||
|
List<DataNode> datanodes = cluster.getDataNodes();
|
||||||
|
DataNode dataNode = datanodes.get(0);
|
||||||
|
List<BPOfferService> allBpOs = dataNode.getAllBpOs();
|
||||||
|
BPOfferService bpos = allBpOs.get(0);
|
||||||
|
waitForInitialization(bpos);
|
||||||
|
BPServiceActor actor = bpos.getBPServiceActors().get(0);
|
||||||
|
// Stop and wait util actor exit.
|
||||||
|
actor.stopCommandProcessingThread();
|
||||||
|
GenericTestUtils.waitFor(() -> !actor.isAlive(), 100, 3000);
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue