From 15d38b1bf9fbd41658f6980c1a484dd28f746654 Mon Sep 17 00:00:00 2001
From: Inigo Goiri
Date: Thu, 28 Mar 2019 03:37:33 -0700
Subject: [PATCH] HDFS-14295. Add Threadpool for DataTransfers. Contributed
 by David Mollitor.

---
 .../hadoop/hdfs/server/datanode/DataNode.java | 51 ++++++++++++-------
 1 file changed, 34 insertions(+), 17 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 4df9225a746..1e432911c0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -214,6 +214,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Timer;
 import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.htrace.core.Tracer;
 import org.eclipse.jetty.util.ajax.JSON;
 
@@ -396,6 +397,8 @@ public class DataNode extends ReconfigurableBase
   private static final double CONGESTION_RATIO = 1.5;
   private DiskBalancer diskBalancer;
 
+  private final ExecutorService xferService;
+
   @Nullable
   private final StorageLocationChecker storageLocationChecker;
 
@@ -436,6 +439,8 @@ public class DataNode extends ReconfigurableBase
     initOOBTimeout();
     storageLocationChecker = null;
     volumeChecker = new DatasetVolumeChecker(conf, new Timer());
+    this.xferService =
+        HadoopExecutors.newCachedThreadPool(new Daemon.DaemonFactory());
   }
 
   /**
@@ -476,6 +481,8 @@ public class DataNode extends ReconfigurableBase
         conf.get("hadoop.hdfs.configuration.version", "UNSPECIFIED");
     this.volumeChecker = new DatasetVolumeChecker(conf, new Timer());
 
+    this.xferService =
+        HadoopExecutors.newCachedThreadPool(new Daemon.DaemonFactory());
+
     // Determine whether we should try to pass file descriptors to clients.
     if (conf.getBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY,
@@ -2081,6 +2088,9 @@ public class DataNode extends ReconfigurableBase
     // wait reconfiguration thread, if any, to exit
     shutdownReconfigurationTask();
 
+    LOG.info("Waiting up to 30 seconds for transfer threads to complete");
+    HadoopExecutors.shutdown(this.xferService, LOG, 15L, TimeUnit.SECONDS);
+
     // wait for all data receiver threads to exit
     if (this.threadGroup != null) {
       int sleepMs = 2;
@@ -2354,16 +2364,16 @@ public class DataNode extends ReconfigurableBase
     int numTargets = xferTargets.length;
     if (numTargets > 0) {
-      StringBuilder xfersBuilder = new StringBuilder();
-      for (int i = 0; i < numTargets; i++) {
-        xfersBuilder.append(xferTargets[i]).append(" ");
-      }
-      LOG.info(bpReg + " Starting thread to transfer " +
-          block + " to " + xfersBuilder);
+      final String xferTargetsString =
+          StringUtils.join(" ", Arrays.asList(xferTargets));
+      LOG.info("{} Starting thread to transfer {} to {}", bpReg, block,
+          xferTargetsString);
 
-      new Daemon(new DataTransfer(xferTargets, xferTargetStorageTypes,
-          xferTargetStorageIDs, block,
-          BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start();
+      final DataTransfer dataTransferTask = new DataTransfer(xferTargets,
+          xferTargetStorageTypes, xferTargetStorageIDs, block,
+          BlockConstructionStage.PIPELINE_SETUP_CREATE, "");
+
+      this.xferService.execute(dataTransferTask);
     }
   }
 
@@ -3041,15 +3051,22 @@ public class DataNode extends ReconfigurableBase
     b.setNumBytes(visible);
 
     if (targets.length > 0) {
-      Daemon daemon = new Daemon(threadGroup,
-          new DataTransfer(targets, targetStorageTypes, targetStorageIds, b,
-              stage, client));
-      daemon.start();
+      if (LOG.isDebugEnabled()) {
+        final String xferTargetsString =
+            StringUtils.join(" ", Arrays.asList(targets));
+        LOG.debug("Transferring a replica to {}", xferTargetsString);
+      }
+
+      final DataTransfer dataTransferTask = new DataTransfer(targets,
+          targetStorageTypes, targetStorageIds, b, stage, client);
+
+      @SuppressWarnings("unchecked")
+      Future f = (Future) this.xferService.submit(dataTransferTask);
       try {
-        daemon.join();
-      } catch (InterruptedException e) {
-        throw new IOException(
-            "Pipeline recovery for " + b + " is interrupted.", e);
+        f.get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new IOException("Pipeline recovery for " + b + " is interrupted.",
+            e);
       }
     }
   }
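
Note on the threading model (a sketch, not part of the patch): the change
replaces the old one-Daemon-thread-per-transfer scheme with a single shared
cached thread pool built from daemon threads. transferBlock() now hands its
DataTransfer task off with execute() and returns immediately, while
transferReplicaForPipelineRecovery() keeps its blocking behavior by calling
Future.get(), which matches the old daemon.join() and additionally surfaces
task failures as ExecutionException. The sketch below approximates the same
pattern using only the JDK; the class name, thread name, and printed strings
are illustrative and do not appear in the patch.

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.TimeUnit;

    public class XferPoolSketch {
      public static void main(String[] args) throws Exception {
        // Daemon threads, in the spirit of Daemon.DaemonFactory: they never
        // keep the JVM alive on their own.
        ThreadFactory daemonFactory = r -> {
          Thread t = new Thread(r, "dataXfer-sketch");
          t.setDaemon(true);
          return t;
        };
        // Cached pool: grows on demand, reuses threads idle for up to 60s.
        ExecutorService xferService =
            Executors.newCachedThreadPool(daemonFactory);

        // Fire-and-forget, as in transferBlock().
        xferService.execute(() -> System.out.println("async transfer"));

        // Blocking hand-off, as in transferReplicaForPipelineRecovery():
        // submit().get() preserves the old join() semantics.
        try {
          xferService.submit(() -> System.out.println("recovery xfer")).get();
        } catch (InterruptedException | ExecutionException e) {
          throw new java.io.IOException("Pipeline recovery interrupted.", e);
        }

        // Two-phase shutdown approximating HadoopExecutors.shutdown(service,
        // LOG, 15L, TimeUnit.SECONDS): up to 15s to drain gracefully, then
        // cancel and wait up to 15s more, which is why the shutdown log
        // message says "up to 30 seconds".
        xferService.shutdown();
        if (!xferService.awaitTermination(15, TimeUnit.SECONDS)) {
          xferService.shutdownNow();
          xferService.awaitTermination(15, TimeUnit.SECONDS);
        }
      }
    }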