HDFS-1477. Support reconfiguring dfs.heartbeat.interval and dfs.namenode.heartbeat.recheck-interval without NN restart. (Contributed by Xiaobing Zhou)

This commit is contained in:
Arpit Agarwal 2016-03-10 19:03:55 -08:00
parent 710811652c
commit b0ea50bb29
9 changed files with 357 additions and 86 deletions

View File

@ -305,7 +305,8 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY,
DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_DEFAULT) * 1000L;
invalidateBlocks = new InvalidateBlocks(
datanodeManager.blockInvalidateLimit, startupDelayBlockDeletionInMs);
datanodeManager.getBlockInvalidateLimit(),
startupDelayBlockDeletionInMs);
// Compute the map capacity by allocating 2% of total memory
blocksMap = new BlocksMap(

View File

@ -22,6 +22,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.net.InetAddresses;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
@ -67,6 +68,8 @@ public class DatanodeManager {
private final HeartbeatManager heartbeatManager;
private final FSClusterStats fsClusterStats;
private volatile long heartbeatIntervalSeconds;
private volatile int heartbeatRecheckInterval;
/**
* Stores the datanode -> block map.
* <p>
@ -110,7 +113,7 @@ public class DatanodeManager {
/** The period to wait for datanode heartbeat.*/
private long heartbeatExpireInterval;
/** Ask Datanode only up to this many blocks to delete. */
final int blockInvalidateLimit;
private volatile int blockInvalidateLimit;
/** The interval for judging stale DataNodes for read/write */
private final long staleInterval;
@ -224,10 +227,10 @@ public class DatanodeManager {
dnsToSwitchMapping.resolve(locations);
}
final long heartbeatIntervalSeconds = conf.getLong(
heartbeatIntervalSeconds = conf.getLong(
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
final int heartbeatRecheckInterval = conf.getInt(
heartbeatRecheckInterval = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval
@ -345,6 +348,10 @@ public FSClusterStats getFSClusterStats() {
return fsClusterStats;
}
int getBlockInvalidateLimit() {
return blockInvalidateLimit;
}
/** @return the datanode statistics. */
public DatanodeStatistics getDatanodeStatistics() {
return heartbeatManager;
@ -1093,6 +1100,14 @@ long getStaleInterval() {
return staleInterval;
}
public long getHeartbeatInterval() {
return this.heartbeatIntervalSeconds;
}
public long getHeartbeatRecheckInterval() {
return this.heartbeatRecheckInterval;
}
/**
* Set the number of current stale DataNodes. The HeartbeatManager got this
* number based on DataNodes' heartbeats.
@ -1646,5 +1661,28 @@ public double getInServiceXceiverAverage() {
}
};
}
public void setHeartbeatInterval(long intervalSeconds) {
setHeartbeatInterval(intervalSeconds,
this.heartbeatRecheckInterval);
}
public void setHeartbeatRecheckInterval(int recheckInterval) {
setHeartbeatInterval(this.heartbeatIntervalSeconds,
recheckInterval);
}
/**
* Set parameters derived from heartbeat interval.
*/
private void setHeartbeatInterval(long intervalSeconds,
int recheckInterval) {
this.heartbeatIntervalSeconds = intervalSeconds;
this.heartbeatRecheckInterval = recheckInterval;
this.heartbeatExpireInterval = 2L * recheckInterval + 10 * 1000
* intervalSeconds;
this.blockInvalidateLimit = Math.max(20 * (int) (intervalSeconds),
DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT);
}
}

View File

@ -2970,6 +2970,7 @@ public ReconfigurationTaskStatus getReconfigurationStatus() throws IOException {
@Override // ClientDatanodeProtocol & ReconfigurationProtocol
public List<String> listReconfigurableProperties()
throws IOException {
checkSuperuserPrivilege();
return RECONFIGURABLE_PROPERTIES;
}

View File

@ -21,11 +21,14 @@
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurableBase;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@ -41,6 +44,7 @@
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
@ -92,6 +96,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -140,6 +145,10 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT;
import static org.apache.hadoop.util.ExitUtil.terminate;
import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
@ -181,7 +190,8 @@
* NameNode state, for example partial blocksMap etc.
**********************************************************/
@InterfaceAudience.Private
public class NameNode implements NameNodeStatusMXBean {
public class NameNode extends ReconfigurableBase implements
NameNodeStatusMXBean {
static{
HdfsConfiguration.init();
}
@ -259,7 +269,12 @@ public static enum OperationCategory {
public static final String[] NAMESERVICE_SPECIFIC_KEYS = {
DFS_HA_AUTO_FAILOVER_ENABLED_KEY
};
/** A list of property that are reconfigurable at runtime. */
static final List<String> RECONFIGURABLE_PROPERTIES = Collections
.unmodifiableList(Arrays.asList(DFS_HEARTBEAT_INTERVAL_KEY,
DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY));
private static final String USAGE = "Usage: hdfs namenode ["
+ StartupOption.BACKUP.getName() + "] | \n\t["
+ StartupOption.CHECKPOINT.getName() + "] | \n\t["
@ -329,7 +344,6 @@ public long getProtocolVersion(String protocol,
LogFactory.getLog("NameNodeMetricsLog");
protected FSNamesystem namesystem;
protected final Configuration conf;
protected final NamenodeRole role;
private volatile HAState state;
private final boolean haEnabled;
@ -866,12 +880,12 @@ public NameNode(Configuration conf) throws IOException {
protected NameNode(Configuration conf, NamenodeRole role)
throws IOException {
super(conf);
this.tracer = new Tracer.Builder("NameNode").
conf(TraceUtils.wrapHadoopConf(NAMENODE_HTRACE_PREFIX, conf)).
build();
this.tracerConfigurationManager =
new TracerConfigurationManager(NAMENODE_HTRACE_PREFIX, conf);
this.conf = conf;
this.role = role;
setClientNamenodeAddress(conf);
String nsId = getNameServiceId(conf);
@ -882,7 +896,7 @@ protected NameNode(Configuration conf, NamenodeRole role)
this.haContext = createHAContext();
try {
initializeGenericKeys(conf, nsId, namenodeId);
initialize(conf);
initialize(getConf());
try {
haContext.writeLock();
state.prepareToEnterState(haContext);
@ -1811,7 +1825,7 @@ public HAState getState() {
public void startActiveServices() throws IOException {
try {
namesystem.startActiveServices();
startTrashEmptier(conf);
startTrashEmptier(getConf());
} catch (Throwable t) {
doImmediateShutdown(t);
}
@ -1832,7 +1846,7 @@ public void stopActiveServices() throws IOException {
@Override
public void startStandbyServices() throws IOException {
try {
namesystem.startStandbyServices(conf);
namesystem.startStandbyServices(getConf());
} catch (Throwable t) {
doImmediateShutdown(t);
}
@ -1909,8 +1923,8 @@ boolean isStarted() {
*/
void checkHaStateChange(StateChangeRequestInfo req)
throws AccessControlException {
boolean autoHaEnabled = conf.getBoolean(DFS_HA_AUTO_FAILOVER_ENABLED_KEY,
DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT);
boolean autoHaEnabled = getConf().getBoolean(
DFS_HA_AUTO_FAILOVER_ENABLED_KEY, DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT);
switch (req.getSource()) {
case REQUEST_BY_USER:
if (autoHaEnabled) {
@ -1937,4 +1951,75 @@ void checkHaStateChange(StateChangeRequestInfo req)
break;
}
}
/*
* {@inheritDoc}
* */
@Override // ReconfigurableBase
public Collection<String> getReconfigurableProperties() {
return RECONFIGURABLE_PROPERTIES;
}
/*
* {@inheritDoc}
* */
@Override // ReconfigurableBase
protected String reconfigurePropertyImpl(String property, String newVal)
throws ReconfigurationException {
final DatanodeManager datanodeManager = namesystem.getBlockManager()
.getDatanodeManager();
switch (property) {
case DFS_HEARTBEAT_INTERVAL_KEY:
namesystem.writeLock();
try {
if (newVal == null) {
// set to default
datanodeManager.setHeartbeatInterval(DFS_HEARTBEAT_INTERVAL_DEFAULT);
return String.valueOf(DFS_HEARTBEAT_INTERVAL_DEFAULT);
} else {
datanodeManager.setHeartbeatInterval(Long.parseLong(newVal));
return String.valueOf(datanodeManager.getHeartbeatInterval());
}
} catch (NumberFormatException nfe) {
throw new ReconfigurationException(property, newVal, getConf().get(
property), nfe);
} finally {
namesystem.writeUnlock();
LOG.info("RECONFIGURE* changed heartbeatInterval to "
+ datanodeManager.getHeartbeatInterval());
}
case DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY:
namesystem.writeLock();
try {
if (newVal == null) {
// set to default
datanodeManager
.setHeartbeatRecheckInterval(
DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT);
return String
.valueOf(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT);
} else {
datanodeManager.setHeartbeatRecheckInterval(Integer.parseInt(newVal));
return String.valueOf(datanodeManager.getHeartbeatRecheckInterval());
}
} catch (NumberFormatException nfe) {
throw new ReconfigurationException(property, newVal, getConf().get(
property), nfe);
} finally {
namesystem.writeUnlock();
LOG.info("RECONFIGURE* changed heartbeatRecheckInterval to "
+ datanodeManager.getHeartbeatRecheckInterval());
}
default:
break;
}
throw new ReconfigurationException(property, newVal, getConf()
.get(property));
}
@Override // ReconfigurableBase
protected Configuration getNewConf() {
return new HdfsConfiguration();
}
}

View File

@ -43,7 +43,6 @@
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.conf.ReconfigurationTaskStatus;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
@ -2086,7 +2085,7 @@ public EventBatchList getEditsFromTxid(long txid) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.READ); // only active
namesystem.checkSuperuserPrivilege();
int maxEventsPerRPC = nn.conf.getInt(
int maxEventsPerRPC = nn.getConf().getInt(
DFSConfigKeys.DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_KEY,
DFSConfigKeys.DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_DEFAULT);
FSEditLog log = namesystem.getFSImage().getEditLog();
@ -2189,23 +2188,24 @@ public void removeSpanReceiver(long id) throws IOException {
}
@Override // ReconfigurationProtocol
public void startReconfiguration() {
throw new UnsupportedOperationException(
"Namenode startReconfiguration is not implemented.",
new ReconfigurationException());
public void startReconfiguration() throws IOException {
checkNNStartup();
namesystem.checkSuperuserPrivilege();
nn.startReconfigurationTask();
}
@Override // ReconfigurationProtocol
public ReconfigurationTaskStatus getReconfigurationStatus() {
throw new UnsupportedOperationException(
" Namenode getReconfigurationStatus is not implemented.",
new ReconfigurationException());
public ReconfigurationTaskStatus getReconfigurationStatus()
throws IOException {
checkNNStartup();
namesystem.checkSuperuserPrivilege();
return nn.getReconfigurationTaskStatus();
}
@Override // ReconfigurationProtocol
public List<String> listReconfigurableProperties() {
throw new UnsupportedOperationException(
" Namenode listReconfigurableProperties is not implemented.",
new ReconfigurationException());
public List<String> listReconfigurableProperties() throws IOException {
checkNNStartup();
namesystem.checkSuperuserPrivilege();
return NameNode.RECONFIGURABLE_PROPERTIES;
}
}

View File

@ -855,7 +855,7 @@ private void copyBlock(final DFSClient dfs, LocatedBlock lblock,
setInetSocketAddress(targetAddr).
setCachingStrategy(CachingStrategy.newDropBehind()).
setClientCacheContext(dfs.getClientContext()).
setConfiguration(namenode.conf).
setConfiguration(namenode.getConf()).
setTracer(tracer).
setRemotePeerFactory(new RemotePeerFactory() {
@Override

View File

@ -81,7 +81,7 @@ public void teardown() throws Exception {
@Test(timeout=120000)
public void testCompInvalidate() throws Exception {
final int blockInvalidateLimit = bm.getDatanodeManager()
.blockInvalidateLimit;
.getBlockInvalidateLimit();
namesystem.writeLock();
try {
for (int i=0; i<nodes.length; i++) {

View File

@ -0,0 +1,126 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import org.junit.Test;
import org.junit.Before;
import org.junit.After;
import static org.junit.Assert.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT;
public class TestNameNodeReconfigure {
public static final Log LOG = LogFactory
.getLog(TestNameNodeReconfigure.class);
private MiniDFSCluster cluster;
@Before
public void setUp() throws IOException {
Configuration conf = new HdfsConfiguration();
cluster = new MiniDFSCluster.Builder(conf).build();
}
/**
* Test that we can modify configuration properties.
*/
@Test
public void testReconfigure() throws ReconfigurationException {
final NameNode nameNode = cluster.getNameNode();
final DatanodeManager datanodeManager = nameNode.namesystem
.getBlockManager().getDatanodeManager();
// change properties
nameNode.reconfigureProperty(DFS_HEARTBEAT_INTERVAL_KEY, "" + 6);
nameNode.reconfigureProperty(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
"" + (10 * 60 * 1000));
// try invalid values
try {
nameNode.reconfigureProperty(DFS_HEARTBEAT_INTERVAL_KEY, "text");
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue(expected.getCause() instanceof NumberFormatException);
}
try {
nameNode.reconfigureProperty(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
"text");
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue(expected.getCause() instanceof NumberFormatException);
}
// verify change
assertEquals(
DFS_HEARTBEAT_INTERVAL_KEY + " has wrong value",
6,
nameNode.getConf().getLong(DFS_HEARTBEAT_INTERVAL_KEY,
DFS_HEARTBEAT_INTERVAL_DEFAULT));
assertEquals(DFS_HEARTBEAT_INTERVAL_KEY + " has wrong value", 6,
datanodeManager.getHeartbeatInterval());
assertEquals(
DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY + " has wrong value",
10 * 60 * 1000,
nameNode.getConf().getInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT));
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY
+ " has wrong value", 10 * 60 * 1000,
datanodeManager.getHeartbeatRecheckInterval());
// revert to defaults
nameNode.reconfigureProperty(DFS_HEARTBEAT_INTERVAL_KEY, null);
nameNode.reconfigureProperty(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
null);
// verify defaults
assertEquals(DFS_HEARTBEAT_INTERVAL_KEY + " has wrong value", null,
nameNode.getConf().get(DFS_HEARTBEAT_INTERVAL_KEY));
assertEquals(DFS_HEARTBEAT_INTERVAL_KEY + " has wrong value",
DFS_HEARTBEAT_INTERVAL_DEFAULT, datanodeManager.getHeartbeatInterval());
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY
+ " has wrong value", null,
nameNode.getConf().get(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY));
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY
+ " has wrong value", DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT,
datanodeManager.getHeartbeatRecheckInterval());
}
@After
public void shutDown() throws IOException {
if (cluster != null) {
cluster.shutdown();
}
}
}

View File

@ -18,7 +18,11 @@
package org.apache.hadoop.hdfs.tools;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
@ -31,6 +35,7 @@
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -42,6 +47,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.anyOf;
@ -89,12 +95,6 @@ private void restartCluster() throws IOException {
namenode = cluster.getNameNode();
}
private void startReconfiguration(String nodeType, String address,
final List<String> outs, final List<String> errs) throws IOException {
reconfigurationOutErrFormatter("startReconfiguration", nodeType,
address, outs, errs);
}
private void getReconfigurableProperties(String nodeType, String address,
final List<String> outs, final List<String> errs) throws IOException {
reconfigurationOutErrFormatter("getReconfigurableProperties", nodeType,
@ -151,9 +151,10 @@ public void testDataNodeGetReconfigurableProperties() throws IOException {
* @param expectedSuccuss set true if the reconfiguration task should success.
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
*/
private void testDataNodeGetReconfigurationStatus(boolean expectedSuccuss)
throws IOException, InterruptedException {
throws IOException, InterruptedException, TimeoutException {
ReconfigurationUtil ru = mock(ReconfigurationUtil.class);
datanode.setReconfigurationUtil(ru);
@ -179,21 +180,10 @@ private void testDataNodeGetReconfigurationStatus(boolean expectedSuccuss)
assertThat(admin.startReconfiguration("datanode", address), is(0));
int count = 100;
final List<String> outs = Lists.newArrayList();
final List<String> errs = Lists.newArrayList();
while (count > 0) {
outs.clear();
errs.clear();
getReconfigurationStatus("datanode", address, outs, errs);
if (!outs.isEmpty() && outs.get(0).contains("finished")) {
break;
}
count--;
Thread.sleep(100);
}
LOG.info(String.format("count=%d", count));
assertTrue(count > 0);
awaitReconfigurationFinished("datanode", address, outs, errs);
if (expectedSuccuss) {
assertThat(outs.size(), is(4));
} else {
@ -232,59 +222,89 @@ private void testDataNodeGetReconfigurationStatus(boolean expectedSuccuss)
@Test(timeout = 30000)
public void testDataNodeGetReconfigurationStatus() throws IOException,
InterruptedException {
InterruptedException, TimeoutException {
testDataNodeGetReconfigurationStatus(true);
restartCluster();
testDataNodeGetReconfigurationStatus(false);
}
@Test(timeout = 30000)
public void testNameNodeStartReconfiguration() throws IOException {
final String address = namenode.getHostAndPort();
final List<String> outs = Lists.newArrayList();
final List<String> errs = Lists.newArrayList();
startReconfiguration("namenode", address, outs, errs);
assertEquals(0, outs.size());
assertTrue(errs.size() > 1);
assertThat(
errs.get(0),
is(allOf(containsString("Namenode"), containsString("reconfiguring:"),
containsString("startReconfiguration"),
containsString("is not implemented"),
containsString("UnsupportedOperationException"))));
}
@Test(timeout = 30000)
public void testNameNodeGetReconfigurableProperties() throws IOException {
final String address = namenode.getHostAndPort();
final List<String> outs = Lists.newArrayList();
final List<String> errs = Lists.newArrayList();
getReconfigurableProperties("namenode", address, outs, errs);
assertEquals(0, outs.size());
assertTrue(errs.size() > 1);
assertThat(
errs.get(0),
is(allOf(containsString("Namenode"),
containsString("reconfiguration:"),
containsString("listReconfigurableProperties"),
containsString("is not implemented"),
containsString("UnsupportedOperationException"))));
assertEquals(3, outs.size());
assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(1));
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(2));
assertEquals(errs.size(), 0);
}
void awaitReconfigurationFinished(final String nodeType,
final String address, final List<String> outs, final List<String> errs)
throws TimeoutException, IOException, InterruptedException {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
outs.clear();
errs.clear();
try {
getReconfigurationStatus(nodeType, address, outs, errs);
} catch (IOException e) {
LOG.error(String.format(
"call getReconfigurationStatus on %s[%s] failed.", nodeType,
address), e);
}
return !outs.isEmpty() && outs.get(0).contains("finished");
}
}, 100, 100 * 100);
}
@Test(timeout = 30000)
public void testNameNodeGetReconfigurationStatus() throws IOException {
public void testNameNodeGetReconfigurationStatus() throws IOException,
InterruptedException, TimeoutException {
ReconfigurationUtil ru = mock(ReconfigurationUtil.class);
namenode.setReconfigurationUtil(ru);
final String address = namenode.getHostAndPort();
List<ReconfigurationUtil.PropertyChange> changes =
new ArrayList<>();
changes.add(new ReconfigurationUtil.PropertyChange(
DFS_HEARTBEAT_INTERVAL_KEY, String.valueOf(6),
namenode.getConf().get(DFS_HEARTBEAT_INTERVAL_KEY)));
changes.add(new ReconfigurationUtil.PropertyChange(
"randomKey", "new123", "old456"));
when(ru.parseChangedProperties(any(Configuration.class),
any(Configuration.class))).thenReturn(changes);
assertThat(admin.startReconfiguration("namenode", address), is(0));
final List<String> outs = Lists.newArrayList();
final List<String> errs = Lists.newArrayList();
getReconfigurationStatus("namenode", address, outs, errs);
assertEquals(0, outs.size());
assertTrue(errs.size() > 1);
assertThat(
errs.get(0),
is(allOf(containsString("Namenode"),
containsString("reloading configuration:"),
containsString("getReconfigurationStatus"),
containsString("is not implemented"),
containsString("UnsupportedOperationException"))));
awaitReconfigurationFinished("namenode", address, outs, errs);
// verify change
assertEquals(
DFS_HEARTBEAT_INTERVAL_KEY + " has wrong value",
6,
namenode
.getConf()
.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
DFS_HEARTBEAT_INTERVAL_DEFAULT));
assertEquals(DFS_HEARTBEAT_INTERVAL_KEY + " has wrong value",
6,
namenode
.getNamesystem()
.getBlockManager()
.getDatanodeManager()
.getHeartbeatInterval());
int offset = 1;
assertThat(outs.get(offset), containsString("SUCCESS: Changed property "
+ DFS_HEARTBEAT_INTERVAL_KEY));
assertThat(outs.get(offset + 1),
is(allOf(containsString("From:"), containsString("3"))));
assertThat(outs.get(offset + 2),
is(allOf(containsString("To:"), containsString("6"))));
}
}