From 9d817fa1b14b477e5440ae4edd78de849976d9b5 Mon Sep 17 00:00:00 2001
From: Arpit Agarwal
Date: Fri, 4 Dec 2015 14:46:46 -0800
Subject: [PATCH] HDFS-9214. Support reconfiguring
 dfs.datanode.balance.max.concurrent.moves without DN restart. (Contributed
 by Xiaobing Zhou)

---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |   3 +
 .../hadoop/hdfs/server/datanode/DataNode.java |  40 ++-
 .../server/datanode/DataXceiverServer.java    |  58 +++--
 .../datanode/TestDataNodeReconfiguration.java | 241 ++++++++++++++++++
 .../hadoop/hdfs/tools/TestDFSAdmin.java       |   2 +-
 5 files changed, 319 insertions(+), 25 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 34c3ff291c9..e10450d02ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1720,6 +1720,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9474. TestPipelinesFailover should not fail when printing debug
     message. (John Zhuge via Yongjun Zhang)
 
+    HDFS-9214. Support reconfiguring dfs.datanode.balance.max.concurrent.moves
+    without DN restart. (Xiaobing Zhou via Arpit Agarwal)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 0a6875806c5..150ce6b69e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -42,6 +42,8 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT;
@@ -92,7 +94,6 @@
 
 import javax.management.ObjectName;
 
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -212,6 +213,7 @@
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Lists;
 import com.google.protobuf.BlockingService;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -284,7 +286,9 @@ public class DataNode extends ReconfigurableBase
   /** A list of property that are reconfigurable at runtime.
    */
   private static final List<String> RECONFIGURABLE_PROPERTIES =
       Collections.unmodifiableList(
-          Arrays.asList(DFS_DATANODE_DATA_DIR_KEY));
+          Arrays.asList(
+              DFS_DATANODE_DATA_DIR_KEY,
+              DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY));
 
   public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");
 
@@ -522,6 +526,38 @@ public void reconfigurePropertyImpl(String property, String newVal)
           }
         }
       }
+    } else if (property.equals(
+        DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY)) {
+      ReconfigurationException rootException = null;
+      try {
+        LOG.info("Reconfiguring " + property + " to " + newVal);
+        int movers;
+        if (newVal == null) {
+          // set to default
+          movers = DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
+        } else {
+          movers = Integer.parseInt(newVal);
+          if (movers <= 0) {
+            rootException = new ReconfigurationException(
+                property,
+                newVal,
+                getConf().get(property),
+                new IllegalArgumentException(
+                    "balancer max concurrent movers must be larger than 0"));
+          }
+        }
+        xserver.updateBalancerMaxConcurrentMovers(movers);
+      } catch (NumberFormatException nfe) {
+        rootException = new ReconfigurationException(
+            property, newVal, getConf().get(property), nfe);
+      } finally {
+        if (rootException != null) {
+          LOG.warn(String.format(
+              "Exception in updating balancer max concurrent movers %s to %s",
+              property, newVal), rootException);
+          throw rootException;
+        }
+      }
     } else {
       throw new ReconfigurationException(
           property, newVal, getConf().get(property));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
index 36852eb3088..36cf8a18077 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
@@ -21,6 +21,7 @@
 import java.net.SocketTimeoutException;
 import java.nio.channels.AsynchronousCloseException;
 import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -31,6 +32,7 @@
 import org.apache.hadoop.util.Daemon;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import org.slf4j.Logger;
 
 /**
@@ -64,36 +66,45 @@ class DataXceiverServer implements Runnable {
    */
   static class BlockBalanceThrottler extends DataTransferThrottler {
     private int numThreads;
-    private int maxThreads;
-
+    private final AtomicInteger maxThreads = new AtomicInteger(0);
+
     /**Constructor
      *
      * @param bandwidth Total amount of bandwidth can be used for balancing
      */
-    private BlockBalanceThrottler(long bandwidth, int maxThreads) {
-      super(bandwidth);
-      this.maxThreads = maxThreads;
-      LOG.info("Balancing bandwith is "+ bandwidth + " bytes/s");
-      LOG.info("Number threads for balancing is "+ maxThreads);
-    }
-
+    private BlockBalanceThrottler(long bandwidth, int maxThreads) {
+      super(bandwidth);
+      this.maxThreads.set(maxThreads);
+      LOG.info("Balancing bandwith is " + bandwidth + " bytes/s");
+      LOG.info("Number threads for balancing is " + maxThreads);
+    }
+
+    private void setMaxConcurrentMovers(int movers) {
+      this.maxThreads.set(movers);
+    }
+
+    @VisibleForTesting
+    int getMaxConcurrentMovers() {
+      return this.maxThreads.get();
+    }
+
     /** Check if the block move can start.
     *
     * Return true if the thread quota is not exceeded and
     * the counter is incremented; False otherwise.
     */
-    synchronized boolean acquire() {
-      if (numThreads >= maxThreads) {
-        return false;
-      }
-      numThreads++;
-      return true;
-    }
-
-    /** Mark that the move is completed. The thread counter is decremented. */
-    synchronized void release() {
-      numThreads--;
-    }
+    synchronized boolean acquire() {
+      if (numThreads >= maxThreads.get()) {
+        return false;
+      }
+      numThreads++;
+      return true;
+    }
+
+    /** Mark that the move is completed. The thread counter is decremented. */
+    synchronized void release() {
+      numThreads--;
+    }
   }
 
   final BlockBalanceThrottler balanceThrottler;
 
@@ -108,7 +119,6 @@ synchronized void release() {
 
   DataXceiverServer(PeerServer peerServer, Configuration conf,
       DataNode datanode) {
-
     this.peerServer = peerServer;
     this.datanode = datanode;
 
@@ -288,4 +298,8 @@ synchronized void releasePeer(Peer peer) {
     peers.remove(peer);
     peersXceiver.remove(peer);
   }
+
+  public void updateBalancerMaxConcurrentMovers(int movers) {
+    balanceThrottler.setMaxConcurrentMovers(movers);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java
new file mode 100644
index 00000000000..edaf7abb628
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test reconfiguring some DataNode parameters without a DN restart.
+ */
+public class TestDataNodeReconfiguration {
+
+  private static final Log LOG =
+      LogFactory.getLog(TestDataNodeReconfiguration.class);
+  private static final String DATA_DIR = MiniDFSCluster.getBaseDirectory()
+      + "data";
+  private final static InetSocketAddress NN_ADDR = new InetSocketAddress(
+      "localhost", 5020);
+  private final int NUM_NAME_NODE = 1;
+  private final int NUM_DATA_NODE = 10;
+  private MiniDFSCluster cluster;
+
+  @Before
+  public void setUp() throws IOException {
+    startDFSCluster(NUM_NAME_NODE, NUM_DATA_NODE);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+
+    File dir = new File(DATA_DIR);
+    if (dir.exists()) {
+      Assert.assertTrue("Cannot delete data-node dirs",
+          FileUtil.fullyDelete(dir));
+    }
+  }
+
+  private void startDFSCluster(int numNameNodes, int numDataNodes)
+      throws IOException {
+    Configuration conf = new Configuration();
+
+    MiniDFSNNTopology nnTopology = MiniDFSNNTopology
+        .simpleFederatedTopology(numNameNodes);
+
+    cluster = new MiniDFSCluster.Builder(conf).nnTopology(nnTopology)
+        .numDataNodes(numDataNodes).build();
+    cluster.waitActive();
+  }
+
+  /**
+   * Starts the given number of DataNodes, each registered with a mock
+   * NameNode.
+   *
+   * @throws IOException
+   */
+  public DataNode[] createDNsForTest(int numDataNodes) throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, DATA_DIR);
+    conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0");
+    conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+    conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0");
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+
+    DataNode[] result = new DataNode[numDataNodes];
+    for (int i = 0; i < numDataNodes; i++) {
+      result[i] = DataNodeTestUtils.startDNWithMockNN(conf, NN_ADDR, DATA_DIR);
+    }
+    return result;
+  }
+
+  @Test
+  public void testMaxConcurrentMoversReconfiguration()
+      throws ReconfigurationException, IOException {
+    int maxConcurrentMovers = 10;
+    for (int i = 0; i < NUM_DATA_NODE; i++) {
+      DataNode dn = cluster.getDataNodes().get(i);
+
+      // try invalid values
+      try {
+        dn.reconfigureProperty(
+            DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, "text");
+        fail("ReconfigurationException expected");
+      } catch (ReconfigurationException expected) {
+        assertTrue("expecting NumberFormatException",
+            expected.getCause() instanceof NumberFormatException);
+      }
+      try {
+        dn.reconfigureProperty(
+            DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+            String.valueOf(-1));
+        fail("ReconfigurationException expected");
+      } catch (ReconfigurationException expected) {
+        assertTrue("expecting IllegalArgumentException",
+            expected.getCause() instanceof IllegalArgumentException);
+      }
+      try {
+        dn.reconfigureProperty(
+            DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+            String.valueOf(0));
+        fail("ReconfigurationException expected");
+      } catch (ReconfigurationException expected) {
+        assertTrue("expecting IllegalArgumentException",
+            expected.getCause() instanceof IllegalArgumentException);
+      }
+
+      // change properties
+      dn.reconfigureProperty(DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+          String.valueOf(maxConcurrentMovers));
+
+      // verify change
+      assertEquals(String.format("%s has wrong value",
+          DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY),
+          maxConcurrentMovers,
+          dn.xserver.balanceThrottler.getMaxConcurrentMovers());
+
+      assertEquals(String.format("%s has wrong value",
+          DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY),
+          maxConcurrentMovers, Integer.parseInt(dn.getConf().get(
+              DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY)));
+
+      // revert to default
+      dn.reconfigureProperty(DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+          null);
+
+      // verify default
+      assertEquals(String.format("%s has wrong value",
+          DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY),
+          DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT,
+          dn.xserver.balanceThrottler.getMaxConcurrentMovers());
+
+      assertEquals(String.format("expect %s is not configured",
+          DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY), null, dn
+          .getConf().get(DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY));
+    }
+  }
+
+  @Test
+  public void testAcquireWithMaxConcurrentMoversGreaterThanDefault()
+      throws IOException, ReconfigurationException {
+    testAcquireWithMaxConcurrentMoversShared(10);
+  }
+
+  @Test
+  public void testAcquireWithMaxConcurrentMoversLessThanDefault()
+      throws IOException, ReconfigurationException {
+    testAcquireWithMaxConcurrentMoversShared(3);
+  }
+
+  private void testAcquireWithMaxConcurrentMoversShared(
+      int maxConcurrentMovers)
+      throws IOException, ReconfigurationException {
+    DataNode[] dns = null;
+    try {
+      dns = createDNsForTest(1);
+      testAcquireOnMaxConcurrentMoversReconfiguration(dns[0],
+          maxConcurrentMovers);
+    } finally {
+      shutDownDNs(dns);
+    }
+  }
+
+  private void shutDownDNs(DataNode[] dns) {
+    if (dns == null) {
+      return;
+    }
+
+    for (int i = 0; i < dns.length; i++) {
+      try {
+        if (dns[i] == null) {
+          continue;
+        }
+        dns[i].shutdown();
+      } catch (Exception e) {
+        LOG.error("Cannot close: ", e);
+      }
+    }
+  }
+
+  private void testAcquireOnMaxConcurrentMoversReconfiguration(
+      DataNode dataNode, int maxConcurrentMovers) throws IOException,
+      ReconfigurationException {
+    int defaultMaxThreads = dataNode.getConf().getInt(
+        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
+    for (int i = 0; i < defaultMaxThreads; i++) {
+      assertTrue("should be able to get thread quota",
+          dataNode.xserver.balanceThrottler.acquire());
+    }
+
+    assertFalse("should not be able to get thread quota",
+        dataNode.xserver.balanceThrottler.acquire());
+
+    // change properties
+    dataNode.reconfigureProperty(
+        DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+        String.valueOf(maxConcurrentMovers));
+
+    // verify the new thread quota
+    assertEquals("thread quota is wrong", maxConcurrentMovers,
+        dataNode.xserver.balanceThrottler.getMaxConcurrentMovers());
+
+    int val = Math.abs(maxConcurrentMovers - defaultMaxThreads);
+    if (defaultMaxThreads < maxConcurrentMovers) {
+      for (int i = 0; i < val; i++) {
+        assertTrue("should be able to get thread quota",
+            dataNode.xserver.balanceThrottler.acquire());
+      }
+    } else if (defaultMaxThreads > maxConcurrentMovers) {
+      for (int i = 0; i < val; i++) {
+        assertFalse("should not be able to get thread quota",
+            dataNode.xserver.balanceThrottler.acquire());
+      }
+    }
+
+    assertFalse("should not be able to get thread quota",
+        dataNode.xserver.balanceThrottler.acquire());
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
index a2b5638be5b..3a30ccf1b92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
@@ -207,7 +207,7 @@ public void testGetReconfigAllowedProperties() throws IOException {
     final String address = "localhost:" + port;
     List<String> outputs =
         getReconfigurationAllowedProperties("datanode", address);
-    assertEquals(2, outputs.size());
+    assertEquals(3, outputs.size());
     assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
         outputs.get(1));
   }
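
Reviewer note (not part of the patch). The core pattern above is a thread
quota whose limit can change at runtime: the limit lives in an AtomicInteger
that the reconfiguration path may set at any time, while worker threads take
and return the quota under the monitor lock. A minimal standalone sketch of
the same idea, using only the JDK; the class and member names here are
illustrative and do not appear in the patch:

    import java.util.concurrent.atomic.AtomicInteger;

    class ReconfigurableQuota {
      private int running;                           // guarded by "this"
      private final AtomicInteger max = new AtomicInteger();

      ReconfigurableQuota(int initialMax) {
        max.set(initialMax);
      }

      /** Called from the reconfiguration path; needs no lock because
       *  acquire() re-reads the limit on every attempt. */
      void setMax(int newMax) {
        max.set(newMax);
      }

      /** Returns true and admits the caller if the quota has room. */
      synchronized boolean acquire() {
        if (running >= max.get()) {
          return false;
        }
        running++;
        return true;
      }

      /** Marks one admitted unit of work as finished. */
      synchronized void release() {
        running--;
      }
    }

Because the limit is re-read on every acquire(), lowering
dfs.datanode.balance.max.concurrent.moves takes effect gradually as
in-flight block moves drain; running moves are never interrupted.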
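Operationally, the new property is driven through the same dfsadmin flow
that the TestDFSAdmin change above exercises. A sketch of the expected admin
steps, with a placeholder hostname and the DataNode IPC port: update the
value in the DataNode's hdfs-site.xml, then run

    hdfs dfsadmin -reconfig datanode dn-host:50020 start
    hdfs dfsadmin -reconfig datanode dn-host:50020 status

and confirm the key is listed by
"hdfs dfsadmin -reconfig datanode dn-host:50020 properties".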