HDFS-9847. HDFS configuration should accept time units. Contributed by Yiqun Lin

This commit is contained in:
Chris Douglas 2016-09-06 10:37:54 -07:00
parent f388a20943
commit d37dc5d1b8
17 changed files with 187 additions and 66 deletions

View File

@ -630,10 +630,15 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
ArrayList<String > names = new ArrayList<String>(); ArrayList<String > names = new ArrayList<String>();
if (isDeprecated(name)) { if (isDeprecated(name)) {
DeprecatedKeyInfo keyInfo = deprecations.getDeprecatedKeyMap().get(name); DeprecatedKeyInfo keyInfo = deprecations.getDeprecatedKeyMap().get(name);
warnOnceIfDeprecated(deprecations, name); if (keyInfo != null) {
for (String newKey : keyInfo.newKeys) { if (!keyInfo.getAndSetAccessed()) {
if(newKey != null) { logDeprecation(keyInfo.getWarningMessage(name));
names.add(newKey); }
for (String newKey : keyInfo.newKeys) {
if (newKey != null) {
names.add(newKey);
}
} }
} }
} }
@ -1232,11 +1237,9 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
} }
} }
private void warnOnceIfDeprecated(DeprecationContext deprecations, String name) { @VisibleForTesting
DeprecatedKeyInfo keyInfo = deprecations.getDeprecatedKeyMap().get(name); void logDeprecation(String message) {
if (keyInfo != null && !keyInfo.getAndSetAccessed()) { LOG_DEPRECATION.info(message);
LOG_DEPRECATION.info(keyInfo.getWarningMessage(name));
}
} }
/** /**
@ -1625,20 +1628,38 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
String vStr = get(name); String vStr = get(name);
if (null == vStr) { if (null == vStr) {
return defaultValue; return defaultValue;
} else {
return getTimeDurationHelper(name, vStr, unit);
}
}
public long getTimeDuration(String name, String defaultValue, TimeUnit unit) {
String vStr = get(name);
if (null == vStr) {
return getTimeDurationHelper(name, defaultValue, unit);
} else {
return getTimeDurationHelper(name, vStr, unit);
} }
vStr = vStr.trim();
return getTimeDurationHelper(name, vStr, unit);
} }
private long getTimeDurationHelper(String name, String vStr, TimeUnit unit) { private long getTimeDurationHelper(String name, String vStr, TimeUnit unit) {
vStr = vStr.trim();
vStr = StringUtils.toLowerCase(vStr);
ParsedTimeDuration vUnit = ParsedTimeDuration.unitFor(vStr); ParsedTimeDuration vUnit = ParsedTimeDuration.unitFor(vStr);
if (null == vUnit) { if (null == vUnit) {
LOG.warn("No unit for " + name + "(" + vStr + ") assuming " + unit); logDeprecation("No unit for " + name + "(" + vStr + ") assuming " + unit);
vUnit = ParsedTimeDuration.unitFor(unit); vUnit = ParsedTimeDuration.unitFor(unit);
} else { } else {
vStr = vStr.substring(0, vStr.lastIndexOf(vUnit.suffix())); vStr = vStr.substring(0, vStr.lastIndexOf(vUnit.suffix()));
} }
return unit.convert(Long.parseLong(vStr), vUnit.unit());
long raw = Long.parseLong(vStr);
long converted = unit.convert(raw, vUnit.unit());
if (vUnit.unit().convert(converted, unit) < raw) {
logDeprecation("Possible loss of precision converting " + vStr
+ vUnit.suffix() + " to " + unit + " for " + name);
}
return converted;
} }
public long[] getTimeDurations(String name, TimeUnit unit) { public long[] getTimeDurations(String name, TimeUnit unit) {

View File

@ -880,6 +880,15 @@ public class TestConfiguration extends TestCase {
assertEquals(30L, conf.getTimeDuration("test.time.X", 30, SECONDS)); assertEquals(30L, conf.getTimeDuration("test.time.X", 30, SECONDS));
conf.set("test.time.X", "30"); conf.set("test.time.X", "30");
assertEquals(30L, conf.getTimeDuration("test.time.X", 40, SECONDS)); assertEquals(30L, conf.getTimeDuration("test.time.X", 40, SECONDS));
assertEquals(10L, conf.getTimeDuration("test.time.c", "10", SECONDS));
assertEquals(30L, conf.getTimeDuration("test.time.c", "30s", SECONDS));
assertEquals(120L, conf.getTimeDuration("test.time.c", "2m", SECONDS));
conf.set("test.time.c", "30");
assertEquals(30L, conf.getTimeDuration("test.time.c", "40s", SECONDS));
// check suffix insensitive
conf.set("test.time.d", "30S");
assertEquals(30L, conf.getTimeDuration("test.time.d", 40, SECONDS));
for (Configuration.ParsedTimeDuration ptd : for (Configuration.ParsedTimeDuration ptd :
Configuration.ParsedTimeDuration.values()) { Configuration.ParsedTimeDuration.values()) {
@ -889,6 +898,43 @@ public class TestConfiguration extends TestCase {
} }
} }
public void testTimeDurationWarning() {
// check warn for possible loss of precision
final String warnFormat = "Possible loss of precision converting %s" +
" to %s for test.time.warn";
final ArrayList<String> warnchk = new ArrayList<>();
Configuration wconf = new Configuration(false) {
@Override
void logDeprecation(String message) {
warnchk.add(message);
}
};
String[] convDAYS = new String[]{"23h", "30m", "40s", "10us", "40000ms"};
for (String s : convDAYS) {
wconf.set("test.time.warn", s);
assertEquals(0, wconf.getTimeDuration("test.time.warn", 1, DAYS));
}
for (int i = 0; i < convDAYS.length; ++i) {
String wchk = String.format(warnFormat, convDAYS[i], "DAYS");
assertEquals(wchk, warnchk.get(i));
}
warnchk.clear();
wconf.setTimeDuration("test.time.warn", 1441, MINUTES);
assertEquals(1, wconf.getTimeDuration("test.time.warn", 0, DAYS));
assertEquals(24, wconf.getTimeDuration("test.time.warn", 0, HOURS));
String dchk = String.format(warnFormat, "1441m", "DAYS");
assertEquals(dchk, warnchk.get(0));
String hchk = String.format(warnFormat, "1441m", "HOURS");
assertEquals(hchk, warnchk.get(1));
assertEquals(1441, wconf.getTimeDuration("test.time.warn", 0, MINUTES));
// no warning
assertEquals(2, warnchk.size());
assertEquals(86460, wconf.getTimeDuration("test.time.warn", 0, SECONDS));
// no warning
assertEquals(2, warnchk.size());
}
public void testPattern() throws IOException { public void testPattern() throws IOException {
out = new BufferedWriter(new FileWriter(CONFIG)); out = new BufferedWriter(new FileWriter(CONFIG));
startConfig(); startConfig();

View File

@ -85,6 +85,7 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
/** /**
* DFSClient configuration. * DFSClient configuration.
@ -233,9 +234,10 @@ public class DfsClientConf {
connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
datanodeRestartTimeout = conf.getLong( datanodeRestartTimeout = conf.getTimeDuration(
DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY, DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY,
DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT) * 1000; DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT,
TimeUnit.SECONDS) * 1000;
slowIoWarningThresholdMs = conf.getLong( slowIoWarningThresholdMs = conf.getLong(
DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY, DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT); DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);

View File

@ -33,6 +33,7 @@ import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -662,10 +663,13 @@ public class Balancer {
static int run(Collection<URI> namenodes, final BalancerParameters p, static int run(Collection<URI> namenodes, final BalancerParameters p,
Configuration conf) throws IOException, InterruptedException { Configuration conf) throws IOException, InterruptedException {
final long sleeptime = final long sleeptime =
conf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, conf.getTimeDuration(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 2000 + DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT,
conf.getLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, TimeUnit.SECONDS) * 2000 +
DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000; conf.getTimeDuration(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT,
TimeUnit.SECONDS) * 1000;
LOG.info("namenodes = " + namenodes); LOG.info("namenodes = " + namenodes);
LOG.info("parameters = " + p); LOG.info("parameters = " + p);
LOG.info("included nodes = " + p.getIncludedNodes()); LOG.info("included nodes = " + p.getIncludedNodes());

View File

@ -42,6 +42,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask; import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import javax.management.ObjectName; import javax.management.ObjectName;
@ -405,8 +406,9 @@ public class BlockManager implements BlockStatsMXBean {
this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf); this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
this.replicationRecheckInterval = this.replicationRecheckInterval =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, conf.getTimeDuration(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L; DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT,
TimeUnit.SECONDS) * 1000L;
this.storageInfoDefragmentInterval = this.storageInfoDefragmentInterval =
conf.getLong( conf.getLong(

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.apache.hadoop.util.Time.monotonicNow; import static org.apache.hadoop.util.Time.monotonicNow;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -89,9 +90,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
this.stats = stats; this.stats = stats;
this.clusterMap = clusterMap; this.clusterMap = clusterMap;
this.host2datanodeMap = host2datanodeMap; this.host2datanodeMap = host2datanodeMap;
this.heartbeatInterval = conf.getLong( this.heartbeatInterval = conf.getTimeDuration(
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000; DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS) * 1000;
this.tolerateHeartbeatMultiplier = conf.getInt( this.tolerateHeartbeatMultiplier = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY, DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY,
DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT); DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT);

View File

@ -58,6 +58,7 @@ import java.net.InetSocketAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.*; import java.util.*;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/** /**
* Manage datanodes, include decommission and other activities. * Manage datanodes, include decommission and other activities.
@ -234,9 +235,9 @@ public class DatanodeManager {
dnsToSwitchMapping.resolve(locations); dnsToSwitchMapping.resolve(locations);
} }
heartbeatIntervalSeconds = conf.getLong( heartbeatIntervalSeconds = conf.getTimeDuration(
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT); DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
heartbeatRecheckInterval = conf.getInt( heartbeatRecheckInterval = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
@ -289,9 +290,9 @@ public class DatanodeManager {
" = '" + staleInterval + "' is invalid. " + " = '" + staleInterval + "' is invalid. " +
"It should be a positive non-zero value."); "It should be a positive non-zero value.");
final long heartbeatIntervalSeconds = conf.getLong( final long heartbeatIntervalSeconds = conf.getTimeDuration(
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT); DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
// The stale interval value cannot be smaller than // The stale interval value cannot be smaller than
// 3 times of heartbeat interval // 3 times of heartbeat interval
final long minStaleInterval = conf.getInt( final long minStaleInterval = conf.getInt(

View File

@ -134,9 +134,10 @@ public class DecommissionManager {
* @param conf * @param conf
*/ */
void activate(Configuration conf) { void activate(Configuration conf) {
final int intervalSecs = final int intervalSecs = (int) conf.getTimeDuration(
conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT); DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT,
TimeUnit.SECONDS);
checkArgument(intervalSecs >= 0, "Cannot set a negative " + checkArgument(intervalSecs >= 0, "Cannot set a negative " +
"value for " + DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY); "value for " + DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY);

View File

@ -64,6 +64,8 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.security.SaslPropertiesResolver; import org.apache.hadoop.security.SaslPropertiesResolver;
import java.util.concurrent.TimeUnit;
/** /**
* Simple class encapsulating all of the configuration that the DataNode * Simple class encapsulating all of the configuration that the DataNode
* loads at startup time. * loads at startup time.
@ -184,9 +186,9 @@ public class DNConf {
DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY, DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY,
DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT); DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
long initBRDelay = conf.getLong( long initBRDelay = conf.getTimeDuration(
DFS_BLOCKREPORT_INITIAL_DELAY_KEY, DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT) * 1000L; DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT, TimeUnit.SECONDS) * 1000L;
if (initBRDelay >= blockReportInterval) { if (initBRDelay >= blockReportInterval) {
initBRDelay = 0; initBRDelay = 0;
DataNode.LOG.info("dfs.blockreport.initialDelay is " DataNode.LOG.info("dfs.blockreport.initialDelay is "
@ -195,12 +197,12 @@ public class DNConf {
} }
initialBlockReportDelayMs = initBRDelay; initialBlockReportDelayMs = initBRDelay;
heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY, heartBeatInterval = conf.getTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY,
DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L; DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS) * 1000L;
long confLifelineIntervalMs = long confLifelineIntervalMs =
conf.getLong(DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY, conf.getLong(DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY,
3 * conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY, 3 * conf.getTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY,
DFS_HEARTBEAT_INTERVAL_DEFAULT)) * 1000L; DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS)) * 1000L;
if (confLifelineIntervalMs <= heartBeatInterval) { if (confLifelineIntervalMs <= heartBeatInterval) {
confLifelineIntervalMs = 3 * heartBeatInterval; confLifelineIntervalMs = 3 * heartBeatInterval;
DataNode.LOG.warn( DataNode.LOG.warn(
@ -245,9 +247,9 @@ public class DNConf {
DFS_DATANODE_NON_LOCAL_LAZY_PERSIST, DFS_DATANODE_NON_LOCAL_LAZY_PERSIST,
DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT); DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT);
this.bpReadyTimeout = conf.getLong( this.bpReadyTimeout = conf.getTimeDuration(
DFS_DATANODE_BP_READY_TIMEOUT_KEY, DFS_DATANODE_BP_READY_TIMEOUT_KEY,
DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT); DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT, TimeUnit.SECONDS);
this.volFailuresTolerated = this.volFailuresTolerated =
conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,

View File

@ -1024,8 +1024,8 @@ public class DataNode extends ReconfigurableBase
return; return;
} }
String reason = null; String reason = null;
if (conf.getInt(DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, if (conf.getTimeDuration(DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT) < 0) { DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT, TimeUnit.SECONDS) < 0) {
reason = "verification is turned off by configuration"; reason = "verification is turned off by configuration";
} else if ("SimulatedFSDataset".equals(data.getClass().getSimpleName())) { } else if ("SimulatedFSDataset".equals(data.getClass().getSimpleName())) {
reason = "verifcation is not supported by SimulatedFSDataset"; reason = "verifcation is not supported by SimulatedFSDataset";

View File

@ -414,8 +414,10 @@ public class DirectoryScanner implements Runnable {
DirectoryScanner(DataNode datanode, FsDatasetSpi<?> dataset, Configuration conf) { DirectoryScanner(DataNode datanode, FsDatasetSpi<?> dataset, Configuration conf) {
this.datanode = datanode; this.datanode = datanode;
this.dataset = dataset; this.dataset = dataset;
int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, int interval = (int) conf.getTimeDuration(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT); DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT,
TimeUnit.SECONDS);
scanPeriodMsecs = interval * MILLIS_PER_SECOND; //msec scanPeriodMsecs = interval * MILLIS_PER_SECOND; //msec
int throttle = int throttle =

View File

@ -64,6 +64,7 @@ import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.text.DateFormat; import java.text.DateFormat;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@InterfaceAudience.Private @InterfaceAudience.Private
@ -602,10 +603,13 @@ public class Mover {
static int run(Map<URI, List<Path>> namenodes, Configuration conf) static int run(Map<URI, List<Path>> namenodes, Configuration conf)
throws IOException, InterruptedException { throws IOException, InterruptedException {
final long sleeptime = final long sleeptime =
conf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, conf.getTimeDuration(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 2000 + DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT,
conf.getLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, TimeUnit.SECONDS) * 2000 +
DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000; conf.getTimeDuration(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT,
TimeUnit.SECONDS) * 1000;
AtomicInteger retryCount = new AtomicInteger(0); AtomicInteger retryCount = new AtomicInteger(0);
LOG.info("namenodes = " + namenodes); LOG.info("namenodes = " + namenodes);

View File

@ -26,6 +26,8 @@ import org.apache.hadoop.conf.Configuration;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import java.util.concurrent.TimeUnit;
@InterfaceAudience.Private @InterfaceAudience.Private
public class CheckpointConf { public class CheckpointConf {
private static final Log LOG = LogFactory.getLog(CheckpointConf.class); private static final Log LOG = LogFactory.getLog(CheckpointConf.class);
@ -52,12 +54,12 @@ public class CheckpointConf {
private double quietMultiplier; private double quietMultiplier;
public CheckpointConf(Configuration conf) { public CheckpointConf(Configuration conf) {
checkpointCheckPeriod = conf.getLong( checkpointCheckPeriod = conf.getTimeDuration(
DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY,
DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_DEFAULT); DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_DEFAULT, TimeUnit.SECONDS);
checkpointPeriod = conf.getLong(DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, checkpointPeriod = conf.getTimeDuration(DFS_NAMENODE_CHECKPOINT_PERIOD_KEY,
DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT); DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT, TimeUnit.SECONDS);
checkpointTxnCount = conf.getLong(DFS_NAMENODE_CHECKPOINT_TXNS_KEY, checkpointTxnCount = conf.getLong(DFS_NAMENODE_CHECKPOINT_TXNS_KEY,
DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT); DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT);
maxRetriesOnMergeError = conf.getInt(DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_KEY, maxRetriesOnMergeError = conf.getInt(DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_KEY,

View File

@ -34,6 +34,7 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -798,9 +799,9 @@ public class FSImage implements Closeable {
*/ */
private boolean needsResaveBasedOnStaleCheckpoint( private boolean needsResaveBasedOnStaleCheckpoint(
File imageFile, long numEditsLoaded) { File imageFile, long numEditsLoaded) {
final long checkpointPeriod = conf.getLong( final long checkpointPeriod = conf.getTimeDuration(
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY,
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT); DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT, TimeUnit.SECONDS);
final long checkpointTxnCount = conf.getLong( final long checkpointTxnCount = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY,
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT); DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT);

View File

@ -146,8 +146,9 @@ public class EditLogTailer {
lastLoadTimeMs = monotonicNow(); lastLoadTimeMs = monotonicNow();
logRollPeriodMs = conf.getInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, logRollPeriodMs = conf.getTimeDuration(
DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_DEFAULT) * 1000; DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY,
DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_DEFAULT, TimeUnit.SECONDS) * 1000;
List<RemoteNameNodeInfo> nns = Collections.emptyList(); List<RemoteNameNodeInfo> nns = Collections.emptyList();
if (logRollPeriodMs >= 0) { if (logRollPeriodMs >= 0) {
try { try {
@ -172,8 +173,9 @@ public class EditLogTailer {
DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY + " is negative."); DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY + " is negative.");
} }
sleepTimeMs = conf.getInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, sleepTimeMs = conf.getTimeDuration(
DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_DEFAULT) * 1000; DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY,
DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_DEFAULT, TimeUnit.SECONDS) * 1000;
rollEditsTimeoutMs = conf.getInt( rollEditsTimeoutMs = conf.getInt(
DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY, DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY,

View File

@ -34,6 +34,7 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Optional; import com.google.common.base.Optional;
@ -742,9 +743,10 @@ public class DFSAdmin extends FsShell {
long timeWindow = 0; long timeWindow = 0;
long txGap = 0; long txGap = 0;
if (argv.length > 1 && "-beforeShutdown".equals(argv[1])) { if (argv.length > 1 && "-beforeShutdown".equals(argv[1])) {
final long checkpointPeriod = dfsConf.getLong( final long checkpointPeriod = dfsConf.getTimeDuration(
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY,
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT); DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT,
TimeUnit.SECONDS);
final long checkpointTxnCount = dfsConf.getLong( final long checkpointTxnCount = dfsConf.getLong(
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY,
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT); DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT);

View File

@ -637,8 +637,12 @@
</property> </property>
<property> <property>
<name>dfs.blockreport.initialDelay</name> <value>0</value> <name>dfs.blockreport.initialDelay</name>
<description>Delay for first block report in seconds.</description> <value>0</value>
<description>
Delay for first block report in seconds. Support multiple time unit
suffix(case insensitive), as described in dfs.heartbeat.interval.
</description>
</property> </property>
<property> <property>
@ -681,6 +685,8 @@
<value>21600</value> <value>21600</value>
<description>Interval in seconds for Datanode to scan data directories and <description>Interval in seconds for Datanode to scan data directories and
reconcile the difference between blocks in memory and on the disk. reconcile the difference between blocks in memory and on the disk.
Support multiple time unit suffix(case insensitive), as described
in dfs.heartbeat.interval.
</description> </description>
</property> </property>
@ -715,7 +721,13 @@
<property> <property>
<name>dfs.heartbeat.interval</name> <name>dfs.heartbeat.interval</name>
<value>3</value> <value>3</value>
<description>Determines datanode heartbeat interval in seconds.</description> <description>
Determines datanode heartbeat interval in seconds.
Can use the following suffix (case insensitive):
ms(millis), s(sec), m(min), h(hour), d(day)
to specify the time (such as 2s, 2m, 1h, etc.).
Or provide complete number in seconds (such as 30 for 30 seconds).
</description>
</property> </property>
<property> <property>
@ -920,7 +932,9 @@
<name>dfs.namenode.decommission.interval</name> <name>dfs.namenode.decommission.interval</name>
<value>30</value> <value>30</value>
<description>Namenode periodicity in seconds to check if decommission is <description>Namenode periodicity in seconds to check if decommission is
complete.</description> complete. Support multiple time unit suffix(case insensitive), as described
in dfs.heartbeat.interval.
</description>
</property> </property>
<property> <property>
@ -949,7 +963,9 @@
<name>dfs.namenode.replication.interval</name> <name>dfs.namenode.replication.interval</name>
<value>3</value> <value>3</value>
<description>The periodicity in seconds with which the namenode computes <description>The periodicity in seconds with which the namenode computes
replication work for datanodes. </description> replication work for datanodes. Support multiple time unit suffix(case insensitive),
as described in dfs.heartbeat.interval.
</description>
</property> </property>
<property> <property>
@ -1044,7 +1060,10 @@
<property> <property>
<name>dfs.namenode.checkpoint.period</name> <name>dfs.namenode.checkpoint.period</name>
<value>3600</value> <value>3600</value>
<description>The number of seconds between two periodic checkpoints. <description>
The number of seconds between two periodic checkpoints.
Support multiple time unit suffix(case insensitive), as described
in dfs.heartbeat.interval.
</description> </description>
</property> </property>
@ -1062,7 +1081,8 @@
<value>60</value> <value>60</value>
<description>The SecondaryNameNode and CheckpointNode will poll the NameNode <description>The SecondaryNameNode and CheckpointNode will poll the NameNode
every 'dfs.namenode.checkpoint.check.period' seconds to query the number every 'dfs.namenode.checkpoint.check.period' seconds to query the number
of uncheckpointed transactions. of uncheckpointed transactions. Support multiple time unit suffix(case insensitive),
as described in dfs.heartbeat.interval.
</description> </description>
</property> </property>
@ -1408,6 +1428,8 @@
the datanode dead and invoking the normal recovery mechanisms. the datanode dead and invoking the normal recovery mechanisms.
The notification is sent by a datanode when it is being shutdown The notification is sent by a datanode when it is being shutdown
using the shutdownDatanode admin command with the upgrade option. using the shutdownDatanode admin command with the upgrade option.
Support multiple time unit suffix(case insensitive), as described
in dfs.heartbeat.interval.
</description> </description>
</property> </property>
@ -1475,6 +1497,8 @@
log segments, the StandbyNode will only be as up-to-date as how log segments, the StandbyNode will only be as up-to-date as how
often the logs are rolled. Note that failover triggers a log roll often the logs are rolled. Note that failover triggers a log roll
so the StandbyNode will be up to date before it becomes active. so the StandbyNode will be up to date before it becomes active.
Support multiple time unit suffix(case insensitive), as described
in dfs.heartbeat.interval.
</description> </description>
</property> </property>
@ -1484,6 +1508,8 @@
<description> <description>
How often, in seconds, the StandbyNode should check for new How often, in seconds, the StandbyNode should check for new
finalized log segments in the shared edits log. finalized log segments in the shared edits log.
Support multiple time unit suffix(case insensitive), as described
in dfs.heartbeat.interval.
</description> </description>
</property> </property>
@ -2918,6 +2944,8 @@
received request. Setting this to 0 fails requests right away if the received request. Setting this to 0 fails requests right away if the
datanode is not yet registered with the namenode. This wait time datanode is not yet registered with the namenode. This wait time
reduces initial request failures after datanode restart. reduces initial request failures after datanode restart.
Support multiple time unit suffix(case insensitive), as described
in dfs.heartbeat.interval.
</description> </description>
</property> </property>