HDFS-9214. Support reconfiguring dfs.datanode.balance.max.concurrent.moves without DN restart. (Contributed by Xiaobing Zhou)
This commit is contained in:
parent
59dbe8b3e9
commit
9d817fa1b1
|
@ -1720,6 +1720,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
HDFS-9474. TestPipelinesFailover should not fail when printing debug
|
HDFS-9474. TestPipelinesFailover should not fail when printing debug
|
||||||
message. (John Zhuge via Yongjun Zhang)
|
message. (John Zhuge via Yongjun Zhang)
|
||||||
|
|
||||||
|
HDFS-9214. Support reconfiguring dfs.datanode.balance.max.concurrent.moves
|
||||||
|
without DN restart. (Xiaobing Zhou via Arpit Agarwal)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||||
|
|
|
@ -42,6 +42,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_DEFA
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT;
|
||||||
|
@ -92,7 +94,6 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import javax.management.ObjectName;
|
import javax.management.ObjectName;
|
||||||
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
@ -212,6 +213,7 @@ import com.google.common.cache.CacheLoader;
|
||||||
import com.google.common.cache.LoadingCache;
|
import com.google.common.cache.LoadingCache;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.protobuf.BlockingService;
|
import com.google.protobuf.BlockingService;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -284,7 +286,9 @@ public class DataNode extends ReconfigurableBase
|
||||||
/** A list of property that are reconfigurable at runtime. */
|
/** A list of property that are reconfigurable at runtime. */
|
||||||
private static final List<String> RECONFIGURABLE_PROPERTIES =
|
private static final List<String> RECONFIGURABLE_PROPERTIES =
|
||||||
Collections.unmodifiableList(
|
Collections.unmodifiableList(
|
||||||
Arrays.asList(DFS_DATANODE_DATA_DIR_KEY));
|
Arrays.asList(
|
||||||
|
DFS_DATANODE_DATA_DIR_KEY,
|
||||||
|
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY));
|
||||||
|
|
||||||
public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");
|
public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");
|
||||||
|
|
||||||
|
@ -522,6 +526,38 @@ public class DataNode extends ReconfigurableBase
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else if (property.equals(
|
||||||
|
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY)) {
|
||||||
|
ReconfigurationException rootException = null;
|
||||||
|
try {
|
||||||
|
LOG.info("Reconfiguring " + property + " to " + newVal);
|
||||||
|
int movers;
|
||||||
|
if (newVal == null) {
|
||||||
|
// set to default
|
||||||
|
movers = DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
|
||||||
|
} else {
|
||||||
|
movers = Integer.parseInt(newVal);
|
||||||
|
if (movers <= 0) {
|
||||||
|
rootException = new ReconfigurationException(
|
||||||
|
property,
|
||||||
|
newVal,
|
||||||
|
getConf().get(property),
|
||||||
|
new IllegalArgumentException(
|
||||||
|
"balancer max concurrent movers must be larger than 0"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
xserver.updateBalancerMaxConcurrentMovers(movers);
|
||||||
|
} catch(NumberFormatException nfe) {
|
||||||
|
rootException = new ReconfigurationException(
|
||||||
|
property, newVal, getConf().get(property), nfe);
|
||||||
|
} finally {
|
||||||
|
if (rootException != null) {
|
||||||
|
LOG.warn(String.format(
|
||||||
|
"Exception in updating balancer max concurrent movers %s to %s",
|
||||||
|
property, newVal), rootException);
|
||||||
|
throw rootException;
|
||||||
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
throw new ReconfigurationException(
|
throw new ReconfigurationException(
|
||||||
property, newVal, getConf().get(property));
|
property, newVal, getConf().get(property));
|
||||||
|
|
|
@ -21,6 +21,7 @@ import java.io.IOException;
|
||||||
import java.net.SocketTimeoutException;
|
import java.net.SocketTimeoutException;
|
||||||
import java.nio.channels.AsynchronousCloseException;
|
import java.nio.channels.AsynchronousCloseException;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
@ -31,6 +32,7 @@ import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.util.Daemon;
|
import org.apache.hadoop.util.Daemon;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -64,36 +66,45 @@ class DataXceiverServer implements Runnable {
|
||||||
*/
|
*/
|
||||||
static class BlockBalanceThrottler extends DataTransferThrottler {
|
static class BlockBalanceThrottler extends DataTransferThrottler {
|
||||||
private int numThreads;
|
private int numThreads;
|
||||||
private int maxThreads;
|
private final AtomicInteger maxThreads = new AtomicInteger(0);
|
||||||
|
|
||||||
/**Constructor
|
/**Constructor
|
||||||
*
|
*
|
||||||
* @param bandwidth Total amount of bandwidth can be used for balancing
|
* @param bandwidth Total amount of bandwidth can be used for balancing
|
||||||
*/
|
*/
|
||||||
private BlockBalanceThrottler(long bandwidth, int maxThreads) {
|
private BlockBalanceThrottler(long bandwidth, int maxThreads) {
|
||||||
super(bandwidth);
|
super(bandwidth);
|
||||||
this.maxThreads = maxThreads;
|
this.maxThreads.set(maxThreads);
|
||||||
LOG.info("Balancing bandwith is "+ bandwidth + " bytes/s");
|
LOG.info("Balancing bandwith is " + bandwidth + " bytes/s");
|
||||||
LOG.info("Number threads for balancing is "+ maxThreads);
|
LOG.info("Number threads for balancing is " + maxThreads);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void setMaxConcurrentMovers(int movers) {
|
||||||
|
this.maxThreads.set(movers);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
int getMaxConcurrentMovers() {
|
||||||
|
return this.maxThreads.get();
|
||||||
|
}
|
||||||
|
|
||||||
/** Check if the block move can start.
|
/** Check if the block move can start.
|
||||||
*
|
*
|
||||||
* Return true if the thread quota is not exceeded and
|
* Return true if the thread quota is not exceeded and
|
||||||
* the counter is incremented; False otherwise.
|
* the counter is incremented; False otherwise.
|
||||||
*/
|
*/
|
||||||
synchronized boolean acquire() {
|
synchronized boolean acquire() {
|
||||||
if (numThreads >= maxThreads) {
|
if (numThreads >= maxThreads.get()) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
numThreads++;
|
numThreads++;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Mark that the move is completed. The thread counter is decremented. */
|
/** Mark that the move is completed. The thread counter is decremented. */
|
||||||
synchronized void release() {
|
synchronized void release() {
|
||||||
numThreads--;
|
numThreads--;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
final BlockBalanceThrottler balanceThrottler;
|
final BlockBalanceThrottler balanceThrottler;
|
||||||
|
@ -108,7 +119,6 @@ class DataXceiverServer implements Runnable {
|
||||||
|
|
||||||
DataXceiverServer(PeerServer peerServer, Configuration conf,
|
DataXceiverServer(PeerServer peerServer, Configuration conf,
|
||||||
DataNode datanode) {
|
DataNode datanode) {
|
||||||
|
|
||||||
this.peerServer = peerServer;
|
this.peerServer = peerServer;
|
||||||
this.datanode = datanode;
|
this.datanode = datanode;
|
||||||
|
|
||||||
|
@ -288,4 +298,8 @@ class DataXceiverServer implements Runnable {
|
||||||
peers.remove(peer);
|
peers.remove(peer);
|
||||||
peersXceiver.remove(peer);
|
peersXceiver.remove(peer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void updateBalancerMaxConcurrentMovers(int movers) {
|
||||||
|
balanceThrottler.setMaxConcurrentMovers(movers);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,241 @@
|
||||||
|
package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.conf.ReconfigurationException;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test to reconfigure some parameters for DataNode without restart
|
||||||
|
*/
|
||||||
|
public class TestDataNodeReconfiguration {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestBlockRecovery.class);
|
||||||
|
private static final String DATA_DIR = MiniDFSCluster.getBaseDirectory()
|
||||||
|
+ "data";
|
||||||
|
private final static InetSocketAddress NN_ADDR = new InetSocketAddress(
|
||||||
|
"localhost", 5020);
|
||||||
|
private final int NUM_NAME_NODE = 1;
|
||||||
|
private final int NUM_DATA_NODE = 10;
|
||||||
|
private MiniDFSCluster cluster;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void Setup() throws IOException {
|
||||||
|
startDFSCluster(NUM_NAME_NODE, NUM_DATA_NODE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
cluster = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
File dir = new File(DATA_DIR);
|
||||||
|
if (dir.exists())
|
||||||
|
Assert.assertTrue("Cannot delete data-node dirs",
|
||||||
|
FileUtil.fullyDelete(dir));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startDFSCluster(int numNameNodes, int numDataNodes)
|
||||||
|
throws IOException {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
|
||||||
|
MiniDFSNNTopology nnTopology = MiniDFSNNTopology
|
||||||
|
.simpleFederatedTopology(numNameNodes);
|
||||||
|
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).nnTopology(nnTopology)
|
||||||
|
.numDataNodes(numDataNodes).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Starts an instance of DataNode
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public DataNode[] createDNsForTest(int numDateNode) throws IOException {
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, DATA_DIR);
|
||||||
|
conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0");
|
||||||
|
conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
|
||||||
|
conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0");
|
||||||
|
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
|
||||||
|
|
||||||
|
DataNode[] result = new DataNode[numDateNode];
|
||||||
|
for (int i = 0; i < numDateNode; i++) {
|
||||||
|
result[i] = DataNodeTestUtils.startDNWithMockNN(conf, NN_ADDR, DATA_DIR);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMaxConcurrentMoversReconfiguration()
|
||||||
|
throws ReconfigurationException, IOException {
|
||||||
|
int maxConcurrentMovers = 10;
|
||||||
|
for (int i = 0; i < NUM_DATA_NODE; i++) {
|
||||||
|
DataNode dn = cluster.getDataNodes().get(i);
|
||||||
|
|
||||||
|
// try invalid values
|
||||||
|
try {
|
||||||
|
dn.reconfigureProperty(
|
||||||
|
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, "text");
|
||||||
|
fail("ReconfigurationException expected");
|
||||||
|
} catch (ReconfigurationException expected) {
|
||||||
|
assertTrue("expecting NumberFormatException",
|
||||||
|
expected.getCause() instanceof NumberFormatException);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
dn.reconfigureProperty(
|
||||||
|
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
|
||||||
|
String.valueOf(-1));
|
||||||
|
fail("ReconfigurationException expected");
|
||||||
|
} catch (ReconfigurationException expected) {
|
||||||
|
assertTrue("expecting IllegalArgumentException",
|
||||||
|
expected.getCause() instanceof IllegalArgumentException);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
dn.reconfigureProperty(
|
||||||
|
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
|
||||||
|
String.valueOf(0));
|
||||||
|
fail("ReconfigurationException expected");
|
||||||
|
} catch (ReconfigurationException expected) {
|
||||||
|
assertTrue("expecting IllegalArgumentException",
|
||||||
|
expected.getCause() instanceof IllegalArgumentException);
|
||||||
|
}
|
||||||
|
|
||||||
|
// change properties
|
||||||
|
dn.reconfigureProperty(DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
|
||||||
|
String.valueOf(maxConcurrentMovers));
|
||||||
|
|
||||||
|
// verify change
|
||||||
|
assertEquals(String.format("%s has wrong value",
|
||||||
|
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY),
|
||||||
|
maxConcurrentMovers, dn.xserver.balanceThrottler.getMaxConcurrentMovers());
|
||||||
|
|
||||||
|
assertEquals(String.format("%s has wrong value",
|
||||||
|
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY),
|
||||||
|
maxConcurrentMovers, Integer.parseInt(dn.getConf().get(
|
||||||
|
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY)));
|
||||||
|
|
||||||
|
// revert to default
|
||||||
|
dn.reconfigureProperty(DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
|
||||||
|
null);
|
||||||
|
|
||||||
|
// verify default
|
||||||
|
assertEquals(String.format("%s has wrong value",
|
||||||
|
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY),
|
||||||
|
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT,
|
||||||
|
dn.xserver.balanceThrottler.getMaxConcurrentMovers());
|
||||||
|
|
||||||
|
assertEquals(String.format("expect %s is not configured",
|
||||||
|
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY), null, dn
|
||||||
|
.getConf().get(DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAcquireWithMaxConcurrentMoversGreaterThanDefault()
|
||||||
|
throws IOException, ReconfigurationException {
|
||||||
|
testAcquireWithMaxConcurrentMoversShared(10);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAcquireWithMaxConcurrentMoversLessThanDefault()
|
||||||
|
throws IOException, ReconfigurationException {
|
||||||
|
testAcquireWithMaxConcurrentMoversShared(3);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testAcquireWithMaxConcurrentMoversShared(
|
||||||
|
int maxConcurrentMovers)
|
||||||
|
throws IOException, ReconfigurationException {
|
||||||
|
DataNode[] dns = null;
|
||||||
|
try {
|
||||||
|
dns = createDNsForTest(1);
|
||||||
|
testAcquireOnMaxConcurrentMoversReconfiguration(dns[0],
|
||||||
|
maxConcurrentMovers);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
throw ioe;
|
||||||
|
} catch (ReconfigurationException re) {
|
||||||
|
throw re;
|
||||||
|
} finally {
|
||||||
|
shutDownDNs(dns);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void shutDownDNs(DataNode[] dns) {
|
||||||
|
if (dns == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < dns.length; i++) {
|
||||||
|
try {
|
||||||
|
if (dns[i] == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
dns[i].shutdown();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Cannot close: ", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testAcquireOnMaxConcurrentMoversReconfiguration(
|
||||||
|
DataNode dataNode, int maxConcurrentMovers) throws IOException,
|
||||||
|
ReconfigurationException {
|
||||||
|
int defaultMaxThreads = dataNode.getConf().getInt(
|
||||||
|
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
|
||||||
|
for (int i = 0; i < defaultMaxThreads; i++) {
|
||||||
|
assertEquals("should be able to get thread quota", true,
|
||||||
|
dataNode.xserver.balanceThrottler.acquire());
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals("should not be able to get thread quota", false,
|
||||||
|
dataNode.xserver.balanceThrottler.acquire());
|
||||||
|
|
||||||
|
// change properties
|
||||||
|
dataNode.reconfigureProperty(
|
||||||
|
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
|
||||||
|
String.valueOf(maxConcurrentMovers));
|
||||||
|
|
||||||
|
assertEquals("thread quota is wrong", maxConcurrentMovers,
|
||||||
|
dataNode.xserver.balanceThrottler.getMaxConcurrentMovers()); // thread quota
|
||||||
|
|
||||||
|
int val = Math.abs(maxConcurrentMovers - defaultMaxThreads);
|
||||||
|
if (defaultMaxThreads < maxConcurrentMovers) {
|
||||||
|
for (int i = 0; i < val; i++) {
|
||||||
|
assertEquals("should be able to get thread quota", true,
|
||||||
|
dataNode.xserver.balanceThrottler.acquire());
|
||||||
|
}
|
||||||
|
} else if (defaultMaxThreads > maxConcurrentMovers) {
|
||||||
|
for (int i = 0; i < val; i++) {
|
||||||
|
assertEquals("should not be able to get thread quota", false,
|
||||||
|
dataNode.xserver.balanceThrottler.acquire());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals("should not be able to get thread quota", false,
|
||||||
|
dataNode.xserver.balanceThrottler.acquire());
|
||||||
|
}
|
||||||
|
}
|
|
@ -207,7 +207,7 @@ public class TestDFSAdmin {
|
||||||
final String address = "localhost:" + port;
|
final String address = "localhost:" + port;
|
||||||
List<String> outputs =
|
List<String> outputs =
|
||||||
getReconfigurationAllowedProperties("datanode", address);
|
getReconfigurationAllowedProperties("datanode", address);
|
||||||
assertEquals(2, outputs.size());
|
assertEquals(3, outputs.size());
|
||||||
assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
|
assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
|
||||||
outputs.get(1));
|
outputs.get(1));
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue