HDFS-14295. Add Threadpool for DataTransfers. Contributed by David Mollitor.

Inigo Goiri 2019-03-28 03:37:33 -07:00
parent 8a59efee34
commit 15d38b1bf9
1 changed file with 34 additions and 17 deletions

@@ -214,6 +214,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Timer;
 import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.htrace.core.Tracer;
 import org.eclipse.jetty.util.ajax.JSON;
@@ -396,6 +397,8 @@ public class DataNode extends ReconfigurableBase
   private static final double CONGESTION_RATIO = 1.5;
   private DiskBalancer diskBalancer;

+  private final ExecutorService xferService;
+
   @Nullable
   private final StorageLocationChecker storageLocationChecker;
@@ -436,6 +439,8 @@ public class DataNode extends ReconfigurableBase
     initOOBTimeout();
     storageLocationChecker = null;
     volumeChecker = new DatasetVolumeChecker(conf, new Timer());
+    this.xferService =
+        HadoopExecutors.newCachedThreadPool(new Daemon.DaemonFactory());
   }

   /**
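For readers unfamiliar with the pattern: HadoopExecutors.newCachedThreadPool is a thin wrapper over the JDK's Executors, and Daemon.DaemonFactory supplies daemon threads, so the transfer pool never pins the JVM open at shutdown. A minimal plain-JDK sketch of the same idea; the class and method names below are illustrative, not part of this patch:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

// Plain-JDK analogue (illustrative only) of the pool created via
// HadoopExecutors.newCachedThreadPool(new Daemon.DaemonFactory()).
public class DaemonPoolSketch {
  public static ExecutorService newDaemonCachedPool() {
    ThreadFactory daemonFactory = r -> {
      Thread t = new Thread(r);
      t.setDaemon(true); // daemon threads never block JVM exit
      return t;
    };
    // A cached pool creates threads on demand and reuses idle ones,
    // so concurrent transfers share threads instead of each spawning
    // its own Daemon.
    return Executors.newCachedThreadPool(daemonFactory);
  }
}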
@@ -476,6 +481,8 @@ public class DataNode extends ReconfigurableBase
         conf.get("hadoop.hdfs.configuration.version", "UNSPECIFIED");
     this.volumeChecker = new DatasetVolumeChecker(conf, new Timer());
+    this.xferService =
+        HadoopExecutors.newCachedThreadPool(new Daemon.DaemonFactory());

     // Determine whether we should try to pass file descriptors to clients.
     if (conf.getBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY,
@@ -2081,6 +2088,9 @@ public class DataNode extends ReconfigurableBase
     // wait reconfiguration thread, if any, to exit
     shutdownReconfigurationTask();

+    LOG.info("Waiting up to 30 seconds for transfer threads to complete");
+    HadoopExecutors.shutdown(this.xferService, LOG, 15L, TimeUnit.SECONDS);
+
     // wait for all data receiver threads to exit
     if (this.threadGroup != null) {
       int sleepMs = 2;
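The 30-second figure in the log line versus the 15-second argument is consistent with a two-phase shutdown: an orderly shutdown() with a bounded wait, then shutdownNow() with a second bounded wait of the same length. Assuming HadoopExecutors.shutdown follows that common shape, a JDK-only sketch with hypothetical names:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative two-phase shutdown; the class and method names here are
// hypothetical, not Hadoop's.
public class ShutdownSketch {
  static void shutdownGracefully(ExecutorService pool, long timeout,
      TimeUnit unit) throws InterruptedException {
    pool.shutdown();                          // reject new transfer tasks
    if (!pool.awaitTermination(timeout, unit)) {
      pool.shutdownNow();                     // interrupt in-flight transfers
      pool.awaitTermination(timeout, unit);   // second bounded wait
    }
  }
}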
@@ -2354,16 +2364,16 @@ public class DataNode extends ReconfigurableBase
     int numTargets = xferTargets.length;
     if (numTargets > 0) {
-      StringBuilder xfersBuilder = new StringBuilder();
-      for (int i = 0; i < numTargets; i++) {
-        xfersBuilder.append(xferTargets[i]).append(" ");
-      }
-      LOG.info(bpReg + " Starting thread to transfer " +
-          block + " to " + xfersBuilder);
+      final String xferTargetsString =
+          StringUtils.join(" ", Arrays.asList(xferTargets));
+      LOG.info("{} Starting thread to transfer {} to {}", bpReg, block,
+          xferTargetsString);

-      new Daemon(new DataTransfer(xferTargets, xferTargetStorageTypes,
-          xferTargetStorageIDs, block,
-          BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start();
+      final DataTransfer dataTransferTask = new DataTransfer(xferTargets,
+          xferTargetStorageTypes, xferTargetStorageIDs, block,
+          BlockConstructionStage.PIPELINE_SETUP_CREATE, "");
+      this.xferService.execute(dataTransferTask);
     }
   }
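Submission on this path stays fire-and-forget: execute() returns immediately, just as Daemon.start() did, but thread creation and reuse become the pool's concern rather than one fresh thread per transfer. A hypothetical standalone comparison of the two styles:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical side-by-side of the old and new submission styles.
public class SubmitSketch {
  public static void main(String[] args) {
    Runnable transfer = () -> System.out.println("transferring block...");

    // Before this patch: one dedicated daemon thread per transfer,
    // started directly.
    Thread perTransfer = new Thread(transfer);
    perTransfer.setDaemon(true);
    perTransfer.start();

    // After this patch: fire-and-forget hand-off to a shared pool;
    // execute() returns immediately and the pool may reuse an idle thread.
    ExecutorService pool = Executors.newCachedThreadPool();
    pool.execute(transfer);
    pool.shutdown();
  }
}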
@@ -3041,15 +3051,22 @@ public class DataNode extends ReconfigurableBase
     b.setNumBytes(visible);

     if (targets.length > 0) {
-      Daemon daemon = new Daemon(threadGroup,
-          new DataTransfer(targets, targetStorageTypes, targetStorageIds, b,
-              stage, client));
-      daemon.start();
+      if (LOG.isDebugEnabled()) {
+        final String xferTargetsString =
+            StringUtils.join(" ", Arrays.asList(targets));
+        LOG.debug("Transferring a replica to {}", xferTargetsString);
+      }
+
+      final DataTransfer dataTransferTask = new DataTransfer(targets,
+          targetStorageTypes, targetStorageIds, b, stage, client);
+
+      @SuppressWarnings("unchecked")
+      Future<Void> f = (Future<Void>) this.xferService.submit(dataTransferTask);
       try {
-        daemon.join();
-      } catch (InterruptedException e) {
-        throw new IOException(
-            "Pipeline recovery for " + b + " is interrupted.", e);
+        f.get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new IOException("Pipeline recovery for " + b + " is interrupted.",
+            e);
       }
     }
   }
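In the pipeline-recovery path the call remains synchronous: submit() followed by a blocking Future.get(), mirroring the old daemon.join(). The catch clause must widen to ExecutionException because a failure inside a pooled task no longer unwinds a dedicated thread; it is captured by the executor and rethrown from get(). An illustrative sketch (names hypothetical):

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative only: shows why the catch clause widens to ExecutionException.
public class FutureSketch {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newCachedThreadPool();
    Runnable failingTransfer = () -> {
      throw new RuntimeException("simulated transfer failure");
    };
    Future<?> f = pool.submit(failingTransfer);
    try {
      f.get(); // blocks until the task finishes, like the daemon.join() it replaces
    } catch (ExecutionException e) {
      // The task's exception arrives here wrapped, with the original as cause.
      System.out.println("transfer failed: " + e.getCause());
    } finally {
      pool.shutdown();
    }
  }
}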