diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 18137877cac..2ed1e35c045 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -131,6 +131,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT = 0;
   public static final String DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY = "dfs.datanode.fsdatasetcache.max.threads.per.volume";
   public static final int DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT = 4;
+  public static final String DFS_DATANODE_FSDATASETASYNCDISK_MAX_THREADS_PER_VOLUME_KEY =
+      "dfs.datanode.fsdatasetasyncdisk.max.threads.per.volume";
+  public static final int DFS_DATANODE_FSDATASETASYNCDISK_MAX_THREADS_PER_VOLUME_DEFAULT = 4;
   public static final String DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC = "dfs.datanode.lazywriter.interval.sec";
   public static final int DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC = 60;
   public static final String DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY = "dfs.datanode.ram.disk.replica.tracker";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
index 706c078e648..aa577a4d2a1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
@@ -30,6 +30,8 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -65,7 +67,7 @@ class FsDatasetAsyncDiskService {
   // ThreadPool core pool size
   private static final int CORE_THREADS_PER_VOLUME = 1;
   // ThreadPool maximum pool size
-  private static final int MAXIMUM_THREADS_PER_VOLUME = 4;
+  private final int maxNumThreadsPerVolume;
   // ThreadPool keep-alive time for threads over core pool size
   private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
 
@@ -90,6 +92,12 @@ class FsDatasetAsyncDiskService {
     this.datanode = datanode;
     this.fsdatasetImpl = fsdatasetImpl;
     this.threadGroup = new ThreadGroup(getClass().getSimpleName());
+    maxNumThreadsPerVolume = datanode.getConf().getInt(
+        DFSConfigKeys.DFS_DATANODE_FSDATASETASYNCDISK_MAX_THREADS_PER_VOLUME_KEY,
+        DFSConfigKeys.DFS_DATANODE_FSDATASETASYNCDISK_MAX_THREADS_PER_VOLUME_DEFAULT);
+    Preconditions.checkArgument(maxNumThreadsPerVolume > 0,
+        DFSConfigKeys.DFS_DATANODE_FSDATASETASYNCDISK_MAX_THREADS_PER_VOLUME_KEY
+            + " must be a positive integer.");
   }
 
   private void addExecutorForVolume(final FsVolumeImpl volume) {
@@ -110,7 +118,7 @@
     };
 
     ThreadPoolExecutor executor = new ThreadPoolExecutor(
-        CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
+        CORE_THREADS_PER_VOLUME, maxNumThreadsPerVolume,
         THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
         new LinkedBlockingQueue<Runnable>(), threadFactory);
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 5dd6e83fc6c..0d31b286f07 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2660,6 +2660,16 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.datanode.fsdatasetasyncdisk.max.threads.per.volume</name>
+  <value>4</value>
+  <description>
+    The maximum number of threads per volume used to process async disk
+    operations on the datanode. These threads consume I/O and CPU at the
+    same time. This will affect normal data node operations.
+  </description>
+</property>
+
 <property>
   <name>dfs.cachereport.intervalMsec</name>
   <value>10000</value>
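
For reference, an operator would override the new setting in hdfs-site.xml. A minimal sketch: the key name and its default of 4 come from the patch above, while the value 8 is purely illustrative, not a recommendation.

<!-- hdfs-site.xml (illustrative): raise the cap on async disk operation
     threads per volume. The value 8 is an example; the default is 4. -->
<property>
  <name>dfs.datanode.fsdatasetasyncdisk.max.threads.per.volume</name>
  <value>8</value>
</property>

Per the Preconditions.checkArgument call added in the constructor, a non-positive value (e.g. 0) fails fast with an IllegalArgumentException when FsDatasetAsyncDiskService is constructed, rather than silently misconfiguring the per-volume ThreadPoolExecutor.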