HDFS-16386. Reduce DataNode load when FsDatasetAsyncDiskService is working. (#3806)

(cherry picked from commit 746b328554)
This commit is contained in:
jianghuazhu 2021-12-20 19:28:55 +08:00 committed by S O'Donnell
parent 01de8fd9d6
commit b68084ab43
3 changed files with 23 additions and 2 deletions

View File

@ -131,6 +131,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT = 0; public static final long DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT = 0;
public static final String DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY = "dfs.datanode.fsdatasetcache.max.threads.per.volume"; public static final String DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY = "dfs.datanode.fsdatasetcache.max.threads.per.volume";
public static final int DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT = 4; public static final int DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT = 4;
public static final String DFS_DATANODE_FSDATASETASYNCDISK_MAX_THREADS_PER_VOLUME_KEY =
"dfs.datanode.fsdatasetasyncdisk.max.threads.per.volume";
public static final int DFS_DATANODE_FSDATASETASYNCDISK_MAX_THREADS_PER_VOLUME_DEFAULT = 4;
public static final String DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC = "dfs.datanode.lazywriter.interval.sec"; public static final String DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC = "dfs.datanode.lazywriter.interval.sec";
public static final int DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC = 60; public static final int DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC = 60;
public static final String DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY = "dfs.datanode.ram.disk.replica.tracker"; public static final String DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY = "dfs.datanode.ram.disk.replica.tracker";

View File

@ -30,6 +30,8 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -65,7 +67,7 @@ class FsDatasetAsyncDiskService {
// ThreadPool core pool size // ThreadPool core pool size
private static final int CORE_THREADS_PER_VOLUME = 1; private static final int CORE_THREADS_PER_VOLUME = 1;
// ThreadPool maximum pool size // ThreadPool maximum pool size
private static final int MAXIMUM_THREADS_PER_VOLUME = 4; private final int maxNumThreadsPerVolume;
// ThreadPool keep-alive time for threads over core pool size // ThreadPool keep-alive time for threads over core pool size
private static final long THREADS_KEEP_ALIVE_SECONDS = 60; private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
@ -90,6 +92,12 @@ class FsDatasetAsyncDiskService {
this.datanode = datanode; this.datanode = datanode;
this.fsdatasetImpl = fsdatasetImpl; this.fsdatasetImpl = fsdatasetImpl;
this.threadGroup = new ThreadGroup(getClass().getSimpleName()); this.threadGroup = new ThreadGroup(getClass().getSimpleName());
maxNumThreadsPerVolume = datanode.getConf().getInt(
DFSConfigKeys.DFS_DATANODE_FSDATASETASYNCDISK_MAX_THREADS_PER_VOLUME_KEY,
DFSConfigKeys.DFS_DATANODE_FSDATASETASYNCDISK_MAX_THREADS_PER_VOLUME_DEFAULT);
Preconditions.checkArgument(maxNumThreadsPerVolume > 0,
DFSConfigKeys.DFS_DATANODE_FSDATASETASYNCDISK_MAX_THREADS_PER_VOLUME_KEY +
" must be a positive integer.");
} }
private void addExecutorForVolume(final FsVolumeImpl volume) { private void addExecutorForVolume(final FsVolumeImpl volume) {
@ -110,7 +118,7 @@ class FsDatasetAsyncDiskService {
}; };
ThreadPoolExecutor executor = new ThreadPoolExecutor( ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME, CORE_THREADS_PER_VOLUME, maxNumThreadsPerVolume,
THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), threadFactory); new LinkedBlockingQueue<Runnable>(), threadFactory);

View File

@ -2660,6 +2660,16 @@
</description> </description>
</property> </property>
<property>
<name>dfs.datanode.fsdatasetasyncdisk.max.threads.per.volume</name>
<value>4</value>
<description>
The maximum number of threads per volume used to process async disk
operations on the datanode. These threads consume I/O and CPU at the
same time. This will affect normal data node operations.
</description>
</property>
<property> <property>
<name>dfs.cachereport.intervalMsec</name> <name>dfs.cachereport.intervalMsec</name>
<value>10000</value> <value>10000</value>