HDFS-12523. Thread pools in ErasureCodingWorker do not shutdown. (Huafeng Wang via Lei)

This commit is contained in:
Lei Xu 2017-09-21 16:10:32 -07:00
parent 9d3e4cccf9
commit 1267ff22ce
3 changed files with 17 additions and 8 deletions

View File

@ -1115,7 +1115,7 @@ public class DataNode extends ReconfigurableBase
/** /**
* Shutdown disk balancer. * Shutdown disk balancer.
*/ */
private void shutdownDiskBalancer() { private void shutdownDiskBalancer() {
if (this.diskBalancer != null) { if (this.diskBalancer != null) {
this.diskBalancer.shutdown(); this.diskBalancer.shutdown();
this.diskBalancer = null; this.diskBalancer = null;
@ -2077,6 +2077,10 @@ public class DataNode extends ReconfigurableBase
ipcServer.stop(); ipcServer.stop();
} }
if (ecWorker != null) {
ecWorker.shutDown();
}
if(blockPoolManager != null) { if(blockPoolManager != null) {
try { try {
this.blockPoolManager.shutDownAll(bposArray); this.blockPoolManager.shutDownAll(bposArray);

View File

@ -27,6 +27,8 @@ import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue; import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
@ -149,7 +151,12 @@ public final class ErasureCodingWorker {
return conf; return conf;
} }
ThreadPoolExecutor getStripedReadPool() { CompletionService<Void> createReadService() {
return stripedReadPool; return new ExecutorCompletionService<>(stripedReadPool);
}
public void shutDown() {
stripedReconstructionPool.shutdown();
stripedReadPool.shutdown();
} }
} }

View File

@ -39,8 +39,6 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.BitSet; import java.util.BitSet;
import java.util.concurrent.CompletionService; import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
/** /**
@ -110,7 +108,7 @@ abstract class StripedReconstructor {
// position in striped internal block // position in striped internal block
private long positionInBlock; private long positionInBlock;
private StripedReader stripedReader; private StripedReader stripedReader;
private ThreadPoolExecutor stripedReadPool; private ErasureCodingWorker erasureCodingWorker;
private final CachingStrategy cachingStrategy; private final CachingStrategy cachingStrategy;
private long maxTargetLength = 0L; private long maxTargetLength = 0L;
private final BitSet liveBitSet; private final BitSet liveBitSet;
@ -122,7 +120,7 @@ abstract class StripedReconstructor {
StripedReconstructor(ErasureCodingWorker worker, StripedReconstructor(ErasureCodingWorker worker,
StripedReconstructionInfo stripedReconInfo) { StripedReconstructionInfo stripedReconInfo) {
this.stripedReadPool = worker.getStripedReadPool(); this.erasureCodingWorker = worker;
this.datanode = worker.getDatanode(); this.datanode = worker.getDatanode();
this.conf = worker.getConf(); this.conf = worker.getConf();
this.ecPolicy = stripedReconInfo.getEcPolicy(); this.ecPolicy = stripedReconInfo.getEcPolicy();
@ -225,7 +223,7 @@ abstract class StripedReconstructor {
} }
CompletionService<Void> createReadService() { CompletionService<Void> createReadService() {
return new ExecutorCompletionService<>(stripedReadPool); return erasureCodingWorker.createReadService();
} }
ExtendedBlock getBlockGroup() { ExtendedBlock getBlockGroup() {