HDFS-16397. Reconfig slow disk parameters for datanode (#3828)
This commit is contained in:
parent
c18b646020
commit
6b07c851f3
|
@ -17,6 +17,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.datanode;
|
package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY;
|
||||||
import static org.apache.hadoop.util.Time.monotonicNow;
|
import static org.apache.hadoop.util.Time.monotonicNow;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
|
@ -556,7 +558,7 @@ class BPServiceActor implements Runnable {
|
||||||
SlowPeerReports.create(dn.getPeerMetrics().getOutliers()) :
|
SlowPeerReports.create(dn.getPeerMetrics().getOutliers()) :
|
||||||
SlowPeerReports.EMPTY_REPORT;
|
SlowPeerReports.EMPTY_REPORT;
|
||||||
final SlowDiskReports slowDisks =
|
final SlowDiskReports slowDisks =
|
||||||
outliersReportDue && dn.getDiskMetrics() != null ?
|
outliersReportDue && dnConf.diskStatsEnabled && dn.getDiskMetrics() != null ?
|
||||||
SlowDiskReports.create(dn.getDiskMetrics().getDiskOutliersStats()) :
|
SlowDiskReports.create(dn.getDiskMetrics().getDiskOutliersStats()) :
|
||||||
SlowDiskReports.EMPTY_REPORT;
|
SlowDiskReports.EMPTY_REPORT;
|
||||||
|
|
||||||
|
@ -1195,7 +1197,7 @@ class BPServiceActor implements Runnable {
|
||||||
private final long heartbeatIntervalMs;
|
private final long heartbeatIntervalMs;
|
||||||
private final long lifelineIntervalMs;
|
private final long lifelineIntervalMs;
|
||||||
private volatile long blockReportIntervalMs;
|
private volatile long blockReportIntervalMs;
|
||||||
private final long outliersReportIntervalMs;
|
private volatile long outliersReportIntervalMs;
|
||||||
|
|
||||||
Scheduler(long heartbeatIntervalMs, long lifelineIntervalMs,
|
Scheduler(long heartbeatIntervalMs, long lifelineIntervalMs,
|
||||||
long blockReportIntervalMs, long outliersReportIntervalMs) {
|
long blockReportIntervalMs, long outliersReportIntervalMs) {
|
||||||
|
@ -1356,10 +1358,22 @@ class BPServiceActor implements Runnable {
|
||||||
}
|
}
|
||||||
|
|
||||||
void setBlockReportIntervalMs(long intervalMs) {
|
void setBlockReportIntervalMs(long intervalMs) {
|
||||||
Preconditions.checkArgument(intervalMs > 0);
|
Preconditions.checkArgument(intervalMs > 0,
|
||||||
|
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY + " should be larger than 0");
|
||||||
this.blockReportIntervalMs = intervalMs;
|
this.blockReportIntervalMs = intervalMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void setOutliersReportIntervalMs(long intervalMs) {
|
||||||
|
Preconditions.checkArgument(intervalMs > 0,
|
||||||
|
DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY + " should be larger than 0");
|
||||||
|
this.outliersReportIntervalMs = intervalMs;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
long getOutliersReportIntervalMs() {
|
||||||
|
return this.outliersReportIntervalMs;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Wrapped for testing.
|
* Wrapped for testing.
|
||||||
* @return
|
* @return
|
||||||
|
|
|
@ -109,8 +109,8 @@ public class DNConf {
|
||||||
volatile long blockReportInterval;
|
volatile long blockReportInterval;
|
||||||
volatile long blockReportSplitThreshold;
|
volatile long blockReportSplitThreshold;
|
||||||
volatile boolean peerStatsEnabled;
|
volatile boolean peerStatsEnabled;
|
||||||
final boolean diskStatsEnabled;
|
volatile boolean diskStatsEnabled;
|
||||||
final long outliersReportIntervalMs;
|
volatile long outliersReportIntervalMs;
|
||||||
final long ibrInterval;
|
final long ibrInterval;
|
||||||
volatile long initialBlockReportDelayMs;
|
volatile long initialBlockReportDelayMs;
|
||||||
volatile long cacheReportInterval;
|
volatile long cacheReportInterval;
|
||||||
|
@ -511,4 +511,15 @@ public class DNConf {
|
||||||
void setPeerStatsEnabled(boolean enablePeerStats) {
|
void setPeerStatsEnabled(boolean enablePeerStats) {
|
||||||
peerStatsEnabled = enablePeerStats;
|
peerStatsEnabled = enablePeerStats;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setFileIoProfilingSamplingPercentage(int samplingPercentage) {
|
||||||
|
diskStatsEnabled = Util.isDiskStatsEnabled(samplingPercentage);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setOutliersReportIntervalMs(String reportIntervalMs) {
|
||||||
|
dn.getConf().set(DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY, reportIntervalMs);
|
||||||
|
outliersReportIntervalMs = getConf().getTimeDuration(
|
||||||
|
DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY,
|
||||||
|
DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,6 +35,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_IN
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
|
||||||
|
@ -48,6 +50,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THR
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_DEFAULT;
|
||||||
|
@ -59,6 +63,10 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABL
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
|
||||||
|
@ -70,6 +78,7 @@ import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStag
|
||||||
import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
|
import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
|
||||||
import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY;
|
import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY;
|
||||||
import static org.apache.hadoop.util.ExitUtil.terminate;
|
import static org.apache.hadoop.util.ExitUtil.terminate;
|
||||||
|
import static org.apache.hadoop.util.Preconditions.checkNotNull;
|
||||||
import static org.apache.hadoop.util.Time.now;
|
import static org.apache.hadoop.util.Time.now;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
|
@ -327,7 +336,11 @@ public class DataNode extends ReconfigurableBase
|
||||||
DFS_DATANODE_PEER_STATS_ENABLED_KEY,
|
DFS_DATANODE_PEER_STATS_ENABLED_KEY,
|
||||||
DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY,
|
DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY,
|
||||||
DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY,
|
DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY,
|
||||||
DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY));
|
DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY,
|
||||||
|
DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY,
|
||||||
|
DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY,
|
||||||
|
DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY,
|
||||||
|
DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY));
|
||||||
|
|
||||||
public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");
|
public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");
|
||||||
|
|
||||||
|
@ -370,7 +383,7 @@ public class DataNode extends ReconfigurableBase
|
||||||
DataNodeMetrics metrics;
|
DataNodeMetrics metrics;
|
||||||
@Nullable
|
@Nullable
|
||||||
private volatile DataNodePeerMetrics peerMetrics;
|
private volatile DataNodePeerMetrics peerMetrics;
|
||||||
private DataNodeDiskMetrics diskMetrics;
|
private volatile DataNodeDiskMetrics diskMetrics;
|
||||||
private InetSocketAddress streamingAddr;
|
private InetSocketAddress streamingAddr;
|
||||||
|
|
||||||
private LoadingCache<String, Map<String, Long>> datanodeNetworkCounts;
|
private LoadingCache<String, Map<String, Long>> datanodeNetworkCounts;
|
||||||
|
@ -651,6 +664,11 @@ public class DataNode extends ReconfigurableBase
|
||||||
case DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY:
|
case DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY:
|
||||||
case DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY:
|
case DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY:
|
||||||
return reconfSlowPeerParameters(property, newVal);
|
return reconfSlowPeerParameters(property, newVal);
|
||||||
|
case DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY:
|
||||||
|
case DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY:
|
||||||
|
case DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY:
|
||||||
|
case DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY:
|
||||||
|
return reconfSlowDiskParameters(property, newVal);
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -777,6 +795,61 @@ public class DataNode extends ReconfigurableBase
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String reconfSlowDiskParameters(String property, String newVal)
|
||||||
|
throws ReconfigurationException {
|
||||||
|
String result = null;
|
||||||
|
try {
|
||||||
|
LOG.info("Reconfiguring {} to {}", property, newVal);
|
||||||
|
if (property.equals(DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY)) {
|
||||||
|
checkNotNull(dnConf, "DNConf has not been initialized.");
|
||||||
|
String reportInterval = (newVal == null ? DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT :
|
||||||
|
newVal);
|
||||||
|
result = reportInterval;
|
||||||
|
dnConf.setOutliersReportIntervalMs(reportInterval);
|
||||||
|
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
|
||||||
|
if (bpos != null) {
|
||||||
|
for (BPServiceActor actor : bpos.getBPServiceActors()) {
|
||||||
|
actor.getScheduler().setOutliersReportIntervalMs(
|
||||||
|
dnConf.outliersReportIntervalMs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if (property.equals(DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY)) {
|
||||||
|
checkNotNull(dnConf, "DNConf has not been initialized.");
|
||||||
|
int samplingPercentage = (newVal == null ?
|
||||||
|
DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_DEFAULT :
|
||||||
|
Integer.parseInt(newVal));
|
||||||
|
result = Integer.toString(samplingPercentage);
|
||||||
|
dnConf.setFileIoProfilingSamplingPercentage(samplingPercentage);
|
||||||
|
if (fileIoProvider != null) {
|
||||||
|
fileIoProvider.getProfilingEventHook().setSampleRangeMax(samplingPercentage);
|
||||||
|
}
|
||||||
|
if (samplingPercentage > 0 && diskMetrics == null) {
|
||||||
|
diskMetrics = new DataNodeDiskMetrics(this,
|
||||||
|
dnConf.outliersReportIntervalMs, getConf());
|
||||||
|
} else if (samplingPercentage <= 0 && diskMetrics != null) {
|
||||||
|
diskMetrics.shutdownAndWait();
|
||||||
|
}
|
||||||
|
} else if (property.equals(DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY)) {
|
||||||
|
checkNotNull(diskMetrics, "DataNode disk stats may be disabled.");
|
||||||
|
long minDisks = (newVal == null ? DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_DEFAULT :
|
||||||
|
Long.parseLong(newVal));
|
||||||
|
result = Long.toString(minDisks);
|
||||||
|
diskMetrics.setMinOutlierDetectionDisks(minDisks);
|
||||||
|
} else if (property.equals(DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY)) {
|
||||||
|
checkNotNull(diskMetrics, "DataNode disk stats may be disabled.");
|
||||||
|
long threshold = (newVal == null ? DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_DEFAULT :
|
||||||
|
Long.parseLong(newVal));
|
||||||
|
result = Long.toString(threshold);
|
||||||
|
diskMetrics.setLowThresholdMs(threshold);
|
||||||
|
}
|
||||||
|
LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
|
||||||
|
return result;
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
throw new ReconfigurationException(property, newVal, getConf().get(property), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a list of the keys of the re-configurable properties in configuration.
|
* Get a list of the keys of the re-configurable properties in configuration.
|
||||||
*/
|
*/
|
||||||
|
@ -2372,7 +2445,7 @@ public class DataNode extends ReconfigurableBase
|
||||||
if (metrics != null) {
|
if (metrics != null) {
|
||||||
metrics.shutdown();
|
metrics.shutdown();
|
||||||
}
|
}
|
||||||
if (diskMetrics != null) {
|
if (dnConf.diskStatsEnabled && diskMetrics != null) {
|
||||||
diskMetrics.shutdownAndWait();
|
diskMetrics.shutdownAndWait();
|
||||||
}
|
}
|
||||||
if (dataNodeInfoBeanName != null) {
|
if (dataNodeInfoBeanName != null) {
|
||||||
|
@ -3942,7 +4015,7 @@ public class DataNode extends ReconfigurableBase
|
||||||
|
|
||||||
@Override // DataNodeMXBean
|
@Override // DataNodeMXBean
|
||||||
public String getSlowDisks() {
|
public String getSlowDisks() {
|
||||||
if (diskMetrics == null) {
|
if (!dnConf.diskStatsEnabled || diskMetrics == null) {
|
||||||
//Disk Stats not enabled
|
//Disk Stats not enabled
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1071,4 +1071,8 @@ public class FileIoProvider {
|
||||||
}
|
}
|
||||||
profilingEventHook.onFailure(volume, begin);
|
profilingEventHook.onFailure(volume, begin);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ProfilingFileIoEvents getProfilingEventHook() {
|
||||||
|
return profilingEventHook;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hdfs.server.datanode;
|
package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.VisibleForTesting;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
@ -40,8 +41,8 @@ class ProfilingFileIoEvents {
|
||||||
static final Logger LOG =
|
static final Logger LOG =
|
||||||
LoggerFactory.getLogger(ProfilingFileIoEvents.class);
|
LoggerFactory.getLogger(ProfilingFileIoEvents.class);
|
||||||
|
|
||||||
private final boolean isEnabled;
|
private volatile boolean isEnabled;
|
||||||
private final int sampleRangeMax;
|
private volatile int sampleRangeMax;
|
||||||
|
|
||||||
public ProfilingFileIoEvents(@Nullable Configuration conf) {
|
public ProfilingFileIoEvents(@Nullable Configuration conf) {
|
||||||
if (conf != null) {
|
if (conf != null) {
|
||||||
|
@ -49,15 +50,7 @@ class ProfilingFileIoEvents {
|
||||||
DFSConfigKeys.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY,
|
DFSConfigKeys.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY,
|
||||||
DFSConfigKeys
|
DFSConfigKeys
|
||||||
.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_DEFAULT);
|
.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_DEFAULT);
|
||||||
isEnabled = Util.isDiskStatsEnabled(fileIOSamplingPercentage);
|
setSampleRangeMax(fileIOSamplingPercentage);
|
||||||
if (fileIOSamplingPercentage > 100) {
|
|
||||||
LOG.warn(DFSConfigKeys
|
|
||||||
.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY +
|
|
||||||
" value cannot be more than 100. Setting value to 100");
|
|
||||||
fileIOSamplingPercentage = 100;
|
|
||||||
}
|
|
||||||
sampleRangeMax = (int) ((double) fileIOSamplingPercentage / 100 *
|
|
||||||
Integer.MAX_VALUE);
|
|
||||||
} else {
|
} else {
|
||||||
isEnabled = false;
|
isEnabled = false;
|
||||||
sampleRangeMax = 0;
|
sampleRangeMax = 0;
|
||||||
|
@ -145,4 +138,26 @@ class ProfilingFileIoEvents {
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setSampleRangeMax(int fileIOSamplingPercentage) {
|
||||||
|
isEnabled = Util.isDiskStatsEnabled(fileIOSamplingPercentage);
|
||||||
|
if (fileIOSamplingPercentage > 100) {
|
||||||
|
LOG.warn(DFSConfigKeys
|
||||||
|
.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY +
|
||||||
|
" value cannot be more than 100. Setting value to 100");
|
||||||
|
fileIOSamplingPercentage = 100;
|
||||||
|
}
|
||||||
|
sampleRangeMax = (int) ((double) fileIOSamplingPercentage / 100 *
|
||||||
|
Integer.MAX_VALUE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public boolean getDiskStatsEnabled() {
|
||||||
|
return isEnabled;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public int getSampleRangeMax() {
|
||||||
|
return sampleRangeMax;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports.DiskOp;
|
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports.DiskOp;
|
||||||
import org.apache.hadoop.util.Daemon;
|
import org.apache.hadoop.util.Daemon;
|
||||||
|
import org.apache.hadoop.util.Preconditions;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -42,6 +43,9 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class detects and maintains DataNode disk outliers and their
|
* This class detects and maintains DataNode disk outliers and their
|
||||||
* latencies for different ops (metadata, read, write).
|
* latencies for different ops (metadata, read, write).
|
||||||
|
@ -68,11 +72,11 @@ public class DataNodeDiskMetrics {
|
||||||
/**
|
/**
|
||||||
* Minimum number of disks to run outlier detection.
|
* Minimum number of disks to run outlier detection.
|
||||||
*/
|
*/
|
||||||
private final long minOutlierDetectionDisks;
|
private volatile long minOutlierDetectionDisks;
|
||||||
/**
|
/**
|
||||||
* Threshold in milliseconds below which a disk is definitely not slow.
|
* Threshold in milliseconds below which a disk is definitely not slow.
|
||||||
*/
|
*/
|
||||||
private final long lowThresholdMs;
|
private volatile long lowThresholdMs;
|
||||||
/**
|
/**
|
||||||
* The number of slow disks that needs to be excluded.
|
* The number of slow disks that needs to be excluded.
|
||||||
*/
|
*/
|
||||||
|
@ -269,4 +273,31 @@ public class DataNodeDiskMetrics {
|
||||||
public List<String> getSlowDisksToExclude() {
|
public List<String> getSlowDisksToExclude() {
|
||||||
return slowDisksToExclude;
|
return slowDisksToExclude;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setLowThresholdMs(long thresholdMs) {
|
||||||
|
Preconditions.checkArgument(thresholdMs > 0,
|
||||||
|
DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY + " should be larger than 0");
|
||||||
|
lowThresholdMs = thresholdMs;
|
||||||
|
this.slowDiskDetector.setLowThresholdMs(thresholdMs);
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getLowThresholdMs() {
|
||||||
|
return lowThresholdMs;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMinOutlierDetectionDisks(long minDisks) {
|
||||||
|
Preconditions.checkArgument(minDisks > 0,
|
||||||
|
DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY + " should be larger than 0");
|
||||||
|
minOutlierDetectionDisks = minDisks;
|
||||||
|
this.slowDiskDetector.setMinNumResources(minDisks);
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getMinOutlierDetectionDisks() {
|
||||||
|
return minOutlierDetectionDisks;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public OutlierDetector getSlowDiskDetector() {
|
||||||
|
return this.slowDiskDetector;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,12 +28,18 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
|
@ -554,4 +560,117 @@ public class TestDataNodeReconfiguration {
|
||||||
DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT);
|
DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSlowDiskParameters() throws ReconfigurationException, IOException {
|
||||||
|
String[] slowDisksParameters1 = {
|
||||||
|
DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY,
|
||||||
|
DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY};
|
||||||
|
|
||||||
|
for (int i = 0; i < NUM_DATA_NODE; i++) {
|
||||||
|
DataNode dn = cluster.getDataNodes().get(i);
|
||||||
|
|
||||||
|
// Try invalid values.
|
||||||
|
try {
|
||||||
|
dn.reconfigureProperty(DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY, "text");
|
||||||
|
} catch (ReconfigurationException expected) {
|
||||||
|
assertTrue("expecting NumberFormatException",
|
||||||
|
expected.getCause() instanceof NumberFormatException);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
dn.reconfigureProperty(DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY, "text");
|
||||||
|
} catch (ReconfigurationException expected) {
|
||||||
|
assertTrue("expecting NumberFormatException",
|
||||||
|
expected.getCause() instanceof NumberFormatException);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Enable disk stats, make DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY > 0.
|
||||||
|
dn.reconfigureProperty(DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY, "1");
|
||||||
|
for (String parameter : slowDisksParameters1) {
|
||||||
|
try {
|
||||||
|
dn.reconfigureProperty(parameter, "text");
|
||||||
|
fail("ReconfigurationException expected");
|
||||||
|
} catch (ReconfigurationException expected) {
|
||||||
|
assertTrue("expecting NumberFormatException",
|
||||||
|
expected.getCause() instanceof NumberFormatException);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
dn.reconfigureProperty(parameter, String.valueOf(-1));
|
||||||
|
fail("ReconfigurationException expected");
|
||||||
|
} catch (ReconfigurationException expected) {
|
||||||
|
assertTrue("expecting IllegalArgumentException",
|
||||||
|
expected.getCause() instanceof IllegalArgumentException);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Change and verify properties.
|
||||||
|
dn.reconfigureProperty(DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY, "1ms");
|
||||||
|
assertEquals(1, dn.getDnConf().outliersReportIntervalMs);
|
||||||
|
|
||||||
|
BlockPoolManager blockPoolManager = new BlockPoolManager(dn);
|
||||||
|
blockPoolManager.refreshNamenodes(dn.getConf());
|
||||||
|
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
|
||||||
|
if (bpos != null) {
|
||||||
|
for (BPServiceActor actor : bpos.getBPServiceActors()) {
|
||||||
|
assertEquals(String.format("%s has wrong value",
|
||||||
|
DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY),
|
||||||
|
1, actor.getScheduler().getOutliersReportIntervalMs());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
String[] slowDisksParameters2 = {
|
||||||
|
DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY,
|
||||||
|
DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY,
|
||||||
|
DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY};
|
||||||
|
for (String parameter : slowDisksParameters2) {
|
||||||
|
dn.reconfigureProperty(parameter, "99");
|
||||||
|
}
|
||||||
|
// Assert diskMetrics.
|
||||||
|
assertEquals(99, dn.getDiskMetrics().getMinOutlierDetectionDisks());
|
||||||
|
assertEquals(99, dn.getDiskMetrics().getLowThresholdMs());
|
||||||
|
// Assert dnConf.
|
||||||
|
assertTrue(dn.getDnConf().diskStatsEnabled);
|
||||||
|
// Assert profilingEventHook.
|
||||||
|
assertTrue(dn.getFileIoProvider().getProfilingEventHook().getDiskStatsEnabled());
|
||||||
|
assertEquals((int) ((double) 99 / 100 * Integer.MAX_VALUE),
|
||||||
|
dn.getFileIoProvider().getProfilingEventHook().getSampleRangeMax());
|
||||||
|
// Assert slowDiskDetector.
|
||||||
|
assertEquals(99,
|
||||||
|
dn.getDiskMetrics().getSlowDiskDetector().getMinOutlierDetectionNodes());
|
||||||
|
assertEquals(99,
|
||||||
|
dn.getDiskMetrics().getSlowDiskDetector().getLowThresholdMs());
|
||||||
|
|
||||||
|
// Revert to default and verify.
|
||||||
|
dn.reconfigureProperty(DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY, null);
|
||||||
|
assertEquals(String.format("expect %s is not configured",
|
||||||
|
DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY), null,
|
||||||
|
dn.getConf().get(DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY));
|
||||||
|
|
||||||
|
dn.reconfigureProperty(DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY, null);
|
||||||
|
assertEquals(String.format("expect %s is not configured",
|
||||||
|
DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY), null,
|
||||||
|
dn.getConf().get(DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY));
|
||||||
|
assertFalse(dn.getFileIoProvider().getProfilingEventHook().getDiskStatsEnabled());
|
||||||
|
assertEquals(0,
|
||||||
|
dn.getFileIoProvider().getProfilingEventHook().getSampleRangeMax());
|
||||||
|
|
||||||
|
// Enable disk stats, make DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY > 0.
|
||||||
|
dn.reconfigureProperty(DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY, "1");
|
||||||
|
dn.reconfigureProperty(DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY, null);
|
||||||
|
dn.reconfigureProperty(DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY, null);
|
||||||
|
assertEquals(String.format("expect %s is not configured",
|
||||||
|
DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY), null,
|
||||||
|
dn.getConf().get(DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY));
|
||||||
|
assertEquals(String.format("expect %s is not configured",
|
||||||
|
DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY), null,
|
||||||
|
dn.getConf().get(DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY));
|
||||||
|
assertEquals(DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_DEFAULT,
|
||||||
|
dn.getDiskMetrics().getSlowDiskDetector().getMinOutlierDetectionNodes());
|
||||||
|
assertEquals(DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_DEFAULT,
|
||||||
|
dn.getDiskMetrics().getSlowDiskDetector().getLowThresholdMs());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -338,7 +338,7 @@ public class TestDFSAdmin {
|
||||||
final List<String> outs = Lists.newArrayList();
|
final List<String> outs = Lists.newArrayList();
|
||||||
final List<String> errs = Lists.newArrayList();
|
final List<String> errs = Lists.newArrayList();
|
||||||
getReconfigurableProperties("datanode", address, outs, errs);
|
getReconfigurableProperties("datanode", address, outs, errs);
|
||||||
assertEquals(12, outs.size());
|
assertEquals(16, outs.size());
|
||||||
assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
|
assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue