HDFS-16331. Make dfs.blockreport.intervalMsec reconfigurable (#3676)
Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
This commit is contained in:
parent
0c62a514f7
commit
52ec65fd10
|
@ -70,6 +70,7 @@ import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
import org.apache.hadoop.util.Preconditions;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.util.VersionInfo;
|
import org.apache.hadoop.util.VersionInfo;
|
||||||
import org.apache.hadoop.util.VersionUtil;
|
import org.apache.hadoop.util.VersionUtil;
|
||||||
|
@ -1193,7 +1194,7 @@ class BPServiceActor implements Runnable {
|
||||||
|
|
||||||
private final long heartbeatIntervalMs;
|
private final long heartbeatIntervalMs;
|
||||||
private final long lifelineIntervalMs;
|
private final long lifelineIntervalMs;
|
||||||
private final long blockReportIntervalMs;
|
private volatile long blockReportIntervalMs;
|
||||||
private final long outliersReportIntervalMs;
|
private final long outliersReportIntervalMs;
|
||||||
|
|
||||||
Scheduler(long heartbeatIntervalMs, long lifelineIntervalMs,
|
Scheduler(long heartbeatIntervalMs, long lifelineIntervalMs,
|
||||||
|
@ -1349,6 +1350,15 @@ class BPServiceActor implements Runnable {
|
||||||
this.nextBlockReportTime.getAndSet(nextBlockReportTime);
|
this.nextBlockReportTime.getAndSet(nextBlockReportTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
long getBlockReportIntervalMs() {
|
||||||
|
return this.blockReportIntervalMs;
|
||||||
|
}
|
||||||
|
|
||||||
|
void setBlockReportIntervalMs(long intervalMs) {
|
||||||
|
Preconditions.checkArgument(intervalMs > 0);
|
||||||
|
this.blockReportIntervalMs = intervalMs;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Wrapped for testing.
|
* Wrapped for testing.
|
||||||
* @return
|
* @return
|
||||||
|
|
|
@ -74,6 +74,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
|
||||||
import org.apache.hadoop.hdfs.server.common.Util;
|
import org.apache.hadoop.hdfs.server.common.Util;
|
||||||
import org.apache.hadoop.security.SaslPropertiesResolver;
|
import org.apache.hadoop.security.SaslPropertiesResolver;
|
||||||
|
import org.apache.hadoop.util.Preconditions;
|
||||||
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@ -105,7 +106,7 @@ public class DNConf {
|
||||||
final long readaheadLength;
|
final long readaheadLength;
|
||||||
final long heartBeatInterval;
|
final long heartBeatInterval;
|
||||||
private final long lifelineIntervalMs;
|
private final long lifelineIntervalMs;
|
||||||
final long blockReportInterval;
|
volatile long blockReportInterval;
|
||||||
final long blockReportSplitThreshold;
|
final long blockReportSplitThreshold;
|
||||||
final boolean peerStatsEnabled;
|
final boolean peerStatsEnabled;
|
||||||
final boolean diskStatsEnabled;
|
final boolean diskStatsEnabled;
|
||||||
|
@ -475,6 +476,11 @@ public class DNConf {
|
||||||
return processCommandsThresholdMs;
|
return processCommandsThresholdMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void setBlockReportInterval(long intervalMs) {
|
||||||
|
Preconditions.checkArgument(intervalMs > 0);
|
||||||
|
blockReportInterval = intervalMs;
|
||||||
|
}
|
||||||
|
|
||||||
public long getBlockReportInterval() {
|
public long getBlockReportInterval() {
|
||||||
return blockReportInterval;
|
return blockReportInterval;
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
package org.apache.hadoop.hdfs.server.datanode;
|
package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING;
|
||||||
|
@ -300,7 +302,8 @@ public class DataNode extends ReconfigurableBase
|
||||||
Collections.unmodifiableList(
|
Collections.unmodifiableList(
|
||||||
Arrays.asList(
|
Arrays.asList(
|
||||||
DFS_DATANODE_DATA_DIR_KEY,
|
DFS_DATANODE_DATA_DIR_KEY,
|
||||||
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY));
|
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
|
||||||
|
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY));
|
||||||
|
|
||||||
public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");
|
public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");
|
||||||
|
|
||||||
|
@ -536,78 +539,111 @@ public class DataNode extends ReconfigurableBase
|
||||||
public String reconfigurePropertyImpl(String property, String newVal)
|
public String reconfigurePropertyImpl(String property, String newVal)
|
||||||
throws ReconfigurationException {
|
throws ReconfigurationException {
|
||||||
switch (property) {
|
switch (property) {
|
||||||
case DFS_DATANODE_DATA_DIR_KEY: {
|
case DFS_DATANODE_DATA_DIR_KEY: {
|
||||||
IOException rootException = null;
|
IOException rootException = null;
|
||||||
|
try {
|
||||||
|
LOG.info("Reconfiguring {} to {}", property, newVal);
|
||||||
|
this.refreshVolumes(newVal);
|
||||||
|
return getConf().get(DFS_DATANODE_DATA_DIR_KEY);
|
||||||
|
} catch (IOException e) {
|
||||||
|
rootException = e;
|
||||||
|
} finally {
|
||||||
|
// Send a full block report to let NN acknowledge the volume changes.
|
||||||
try {
|
try {
|
||||||
LOG.info("Reconfiguring {} to {}", property, newVal);
|
triggerBlockReport(
|
||||||
this.refreshVolumes(newVal);
|
new BlockReportOptions.Factory().setIncremental(false).build());
|
||||||
return getConf().get(DFS_DATANODE_DATA_DIR_KEY);
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
rootException = e;
|
LOG.warn("Exception while sending the block report after refreshing"
|
||||||
|
+ " volumes {} to {}", property, newVal, e);
|
||||||
|
if (rootException == null) {
|
||||||
|
rootException = e;
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
// Send a full block report to let NN acknowledge the volume changes.
|
if (rootException != null) {
|
||||||
try {
|
throw new ReconfigurationException(property, newVal,
|
||||||
triggerBlockReport(
|
getConf().get(property), rootException);
|
||||||
new BlockReportOptions.Factory().setIncremental(false).build());
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.warn("Exception while sending the block report after refreshing"
|
|
||||||
+ " volumes {} to {}", property, newVal, e);
|
|
||||||
if (rootException == null) {
|
|
||||||
rootException = e;
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
if (rootException != null) {
|
|
||||||
throw new ReconfigurationException(property, newVal,
|
|
||||||
getConf().get(property), rootException);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
case DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY: {
|
break;
|
||||||
ReconfigurationException rootException = null;
|
}
|
||||||
try {
|
case DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY: {
|
||||||
LOG.info("Reconfiguring {} to {}", property, newVal);
|
ReconfigurationException rootException = null;
|
||||||
int movers;
|
try {
|
||||||
if (newVal == null) {
|
LOG.info("Reconfiguring {} to {}", property, newVal);
|
||||||
// set to default
|
int movers;
|
||||||
movers = DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
|
if (newVal == null) {
|
||||||
} else {
|
// set to default
|
||||||
movers = Integer.parseInt(newVal);
|
movers = DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
|
||||||
if (movers <= 0) {
|
} else {
|
||||||
rootException = new ReconfigurationException(
|
movers = Integer.parseInt(newVal);
|
||||||
property,
|
if (movers <= 0) {
|
||||||
newVal,
|
|
||||||
getConf().get(property),
|
|
||||||
new IllegalArgumentException(
|
|
||||||
"balancer max concurrent movers must be larger than 0"));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
boolean success = xserver.updateBalancerMaxConcurrentMovers(movers);
|
|
||||||
if (!success) {
|
|
||||||
rootException = new ReconfigurationException(
|
rootException = new ReconfigurationException(
|
||||||
property,
|
property,
|
||||||
newVal,
|
newVal,
|
||||||
getConf().get(property),
|
getConf().get(property),
|
||||||
new IllegalArgumentException(
|
new IllegalArgumentException(
|
||||||
"Could not modify concurrent moves thread count"));
|
"balancer max concurrent movers must be larger than 0"));
|
||||||
}
|
|
||||||
return Integer.toString(movers);
|
|
||||||
} catch (NumberFormatException nfe) {
|
|
||||||
rootException = new ReconfigurationException(
|
|
||||||
property, newVal, getConf().get(property), nfe);
|
|
||||||
} finally {
|
|
||||||
if (rootException != null) {
|
|
||||||
LOG.warn(String.format(
|
|
||||||
"Exception in updating balancer max concurrent movers %s to %s",
|
|
||||||
property, newVal), rootException);
|
|
||||||
throw rootException;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
break;
|
boolean success = xserver.updateBalancerMaxConcurrentMovers(movers);
|
||||||
|
if (!success) {
|
||||||
|
rootException = new ReconfigurationException(
|
||||||
|
property,
|
||||||
|
newVal,
|
||||||
|
getConf().get(property),
|
||||||
|
new IllegalArgumentException(
|
||||||
|
"Could not modify concurrent moves thread count"));
|
||||||
|
}
|
||||||
|
return Integer.toString(movers);
|
||||||
|
} catch (NumberFormatException nfe) {
|
||||||
|
rootException = new ReconfigurationException(
|
||||||
|
property, newVal, getConf().get(property), nfe);
|
||||||
|
} finally {
|
||||||
|
if (rootException != null) {
|
||||||
|
LOG.warn(String.format(
|
||||||
|
"Exception in updating balancer max concurrent movers %s to %s",
|
||||||
|
property, newVal), rootException);
|
||||||
|
throw rootException;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
default:
|
break;
|
||||||
break;
|
}
|
||||||
|
case DFS_BLOCKREPORT_INTERVAL_MSEC_KEY: {
|
||||||
|
ReconfigurationException rootException = null;
|
||||||
|
try {
|
||||||
|
LOG.info("Reconfiguring {} to {}", property, newVal);
|
||||||
|
long intervalMs;
|
||||||
|
if (newVal == null) {
|
||||||
|
// Set to default.
|
||||||
|
intervalMs = DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
|
||||||
|
} else {
|
||||||
|
intervalMs = Long.parseLong(newVal);
|
||||||
|
}
|
||||||
|
dnConf.setBlockReportInterval(intervalMs);
|
||||||
|
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
|
||||||
|
if (bpos != null) {
|
||||||
|
for (BPServiceActor actor : bpos.getBPServiceActors()) {
|
||||||
|
actor.getScheduler().setBlockReportIntervalMs(intervalMs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Long.toString(intervalMs);
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
rootException = new ReconfigurationException(
|
||||||
|
property, newVal, getConf().get(property), e);
|
||||||
|
} finally {
|
||||||
|
if (rootException != null) {
|
||||||
|
LOG.warn(String.format(
|
||||||
|
"Exception in updating block report interval %s to %s",
|
||||||
|
property, newVal), rootException);
|
||||||
|
throw rootException;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
throw new ReconfigurationException(
|
throw new ReconfigurationException(
|
||||||
property, newVal, getConf().get(property));
|
property, newVal, getConf().get(property));
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hdfs.server.datanode;
|
package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
@ -293,4 +295,74 @@ public class TestDataNodeReconfiguration {
|
||||||
assertEquals("should not be able to get thread quota", false,
|
assertEquals("should not be able to get thread quota", false,
|
||||||
dataNode.xserver.balanceThrottler.acquire());
|
dataNode.xserver.balanceThrottler.acquire());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReportIntervalReconfiguration()
|
||||||
|
throws ReconfigurationException, IOException {
|
||||||
|
int blockReportInterval = 300 * 1000;
|
||||||
|
for (int i = 0; i < NUM_DATA_NODE; i++) {
|
||||||
|
DataNode dn = cluster.getDataNodes().get(i);
|
||||||
|
|
||||||
|
// Try invalid values.
|
||||||
|
try {
|
||||||
|
dn.reconfigureProperty(
|
||||||
|
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, "text");
|
||||||
|
fail("ReconfigurationException expected");
|
||||||
|
} catch (ReconfigurationException expected) {
|
||||||
|
assertTrue("expecting NumberFormatException",
|
||||||
|
expected.getCause() instanceof NumberFormatException);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
dn.reconfigureProperty(
|
||||||
|
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
|
||||||
|
String.valueOf(-1));
|
||||||
|
fail("ReconfigurationException expected");
|
||||||
|
} catch (ReconfigurationException expected) {
|
||||||
|
assertTrue("expecting IllegalArgumentException",
|
||||||
|
expected.getCause() instanceof IllegalArgumentException);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Change properties.
|
||||||
|
dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
|
||||||
|
String.valueOf(blockReportInterval));
|
||||||
|
|
||||||
|
// Verify change.
|
||||||
|
assertEquals(String.format("%s has wrong value",
|
||||||
|
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY),
|
||||||
|
blockReportInterval,
|
||||||
|
dn.getDnConf().getBlockReportInterval());
|
||||||
|
for (BPOfferService bpos : dn.getAllBpOs()) {
|
||||||
|
if (bpos != null) {
|
||||||
|
for (BPServiceActor actor : bpos.getBPServiceActors()) {
|
||||||
|
assertEquals(String.format("%s has wrong value",
|
||||||
|
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY),
|
||||||
|
blockReportInterval,
|
||||||
|
actor.getScheduler().getBlockReportIntervalMs());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Revert to default.
|
||||||
|
dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
|
||||||
|
null);
|
||||||
|
assertEquals(String.format("%s has wrong value",
|
||||||
|
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY),
|
||||||
|
DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT,
|
||||||
|
dn.getDnConf().getBlockReportInterval());
|
||||||
|
// Verify default.
|
||||||
|
for (BPOfferService bpos : dn.getAllBpOs()) {
|
||||||
|
if (bpos != null) {
|
||||||
|
for (BPServiceActor actor : bpos.getBPServiceActors()) {
|
||||||
|
assertEquals(String.format("%s has wrong value",
|
||||||
|
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY),
|
||||||
|
DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT,
|
||||||
|
actor.getScheduler().getBlockReportIntervalMs());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertEquals(String.format("expect %s is not configured",
|
||||||
|
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY), null, dn
|
||||||
|
.getConf().get(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -336,7 +336,7 @@ public class TestDFSAdmin {
|
||||||
final List<String> outs = Lists.newArrayList();
|
final List<String> outs = Lists.newArrayList();
|
||||||
final List<String> errs = Lists.newArrayList();
|
final List<String> errs = Lists.newArrayList();
|
||||||
getReconfigurableProperties("datanode", address, outs, errs);
|
getReconfigurableProperties("datanode", address, outs, errs);
|
||||||
assertEquals(3, outs.size());
|
assertEquals(4, outs.size());
|
||||||
assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
|
assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue