From 34f9ceab4a53007bba485b51fbd909dae5198148 Mon Sep 17 00:00:00 2001
From: Ming Ma
Date: Fri, 27 Jan 2017 16:16:42 -0800
Subject: [PATCH] HDFS-11378. Verify multiple DataNodes can be
 decommissioned/maintenance at the same time. (Manoj Govindassamy via mingma)

(cherry picked from commit 312b36d113d83640b92c62fdd91ede74bd04c00f)
---
 .../hadoop/hdfs/AdminStatesBaseTest.java      | 151 +++++++++++++-----
 .../apache/hadoop/hdfs/TestDecommission.java  |  43 +++++
 .../hadoop/hdfs/TestMaintenanceState.java     |  36 +++++
 3 files changed, 186 insertions(+), 44 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java
index 534c5e0c8f7..c4ccc676519 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java
@@ -22,11 +22,13 @@
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -149,10 +151,18 @@ static protected FSDataOutputStream writeFile(FileSystem fileSys, Path name,
     }
   }
 
-  /*
-   * decommission the DN or put the DN into maintenance for datanodeUuid or one
-   * random node if datanodeUuid is null.
-   * And wait for the node to reach the given {@code waitForState}.
+  /**
+   * Decommission or perform Maintenance for DataNodes and wait for them to
+   * reach the expected state.
+   *
+   * @param nnIndex NameNode index
+   * @param datanodeUuid DataNode to decommission/maintenance, or a random
+   *                     DataNode if null
+   * @param maintenanceExpirationInMS Maintenance expiration time
+   * @param decommissionedNodes List of DataNodes already decommissioned
+   * @param waitForState Await for this state for datanodeUuid DataNode
+   * @return DatanodeInfo DataNode taken out of service
+   * @throws IOException
    */
   protected DatanodeInfo takeNodeOutofService(int nnIndex,
       String datanodeUuid, long maintenanceExpirationInMS,
@@ -162,48 +172,91 @@ protected DatanodeInfo takeNodeOutofService(int nnIndex,
         maintenanceExpirationInMS, decommissionedNodes, null, waitForState);
   }
 
-  /*
-   * decommission the DN or put the DN to maintenance set by datanodeUuid
-   * Pick randome node if datanodeUuid == null
-   * wait for the node to reach the given {@code waitForState}.
+  /**
+   * Decommission or perform Maintenance for DataNodes and wait for them to
+   * reach the expected state.
+   *
+   * @param nnIndex NameNode index
+   * @param datanodeUuid DataNode to decommission/maintenance, or a random
+   *                     DataNode if null
+   * @param maintenanceExpirationInMS Maintenance expiration time
+   * @param decommissionedNodes List of DataNodes already decommissioned
+   * @param inMaintenanceNodes Map of DataNodes already entering/in maintenance
+   * @param waitForState Await for this state for datanodeUuid DataNode
+   * @return DatanodeInfo DataNode taken out of service
+   * @throws IOException
    */
   protected DatanodeInfo takeNodeOutofService(int nnIndex,
       String datanodeUuid, long maintenanceExpirationInMS,
       List<DatanodeInfo> decommissionedNodes,
       Map<DatanodeInfo, Long> inMaintenanceNodes, AdminStates waitForState)
       throws IOException {
+    return takeNodeOutofService(nnIndex, (datanodeUuid != null ?
+        Lists.newArrayList(datanodeUuid) : null),
+        maintenanceExpirationInMS, decommissionedNodes, inMaintenanceNodes,
+        waitForState).get(0);
+  }
+
+  /**
+   * Decommission or perform Maintenance for DataNodes and wait for them to
+   * reach the expected state.
+   *
+   * @param nnIndex NameNode index
+   * @param dataNodeUuids DataNodes to decommission/maintenance, or a random
+   *                      DataNode if null
+   * @param maintenanceExpirationInMS Maintenance expiration time
+   * @param decommissionedNodes List of DataNodes already decommissioned
+   * @param inMaintenanceNodes Map of DataNodes already entering/in maintenance
+   * @param waitForState Await for this state for the given DataNodes
+   * @return List of DatanodeInfo for the DataNodes taken out of service
+   * @throws IOException
+   */
+  protected List<DatanodeInfo> takeNodeOutofService(int nnIndex,
+      List<String> dataNodeUuids, long maintenanceExpirationInMS,
+      List<DatanodeInfo> decommissionedNodes,
+      Map<DatanodeInfo, Long> inMaintenanceNodes, AdminStates waitForState)
+      throws IOException {
     DFSClient client = getDfsClient(nnIndex);
     DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.ALL);
     boolean isDecommissionRequest =
         waitForState == AdminStates.DECOMMISSION_INPROGRESS ||
-        waitForState == AdminStates.DECOMMISSIONED;
+            waitForState == AdminStates.DECOMMISSIONED;
 
-    //
-    // pick one datanode randomly unless the caller specifies one.
-    //
-    int index = 0;
-    if (datanodeUuid == null) {
+    List<String> dataNodeNames = new ArrayList<>();
+    List<DatanodeInfo> datanodeInfos = new ArrayList<>();
+    // pick one DataNode randomly unless the caller specifies one.
+    if (dataNodeUuids == null) {
       boolean found = false;
       while (!found) {
-        index = myrand.nextInt(info.length);
+        int index = myrand.nextInt(info.length);
         if ((isDecommissionRequest && !info[index].isDecommissioned()) ||
             (!isDecommissionRequest && !info[index].isInMaintenance())) {
+          dataNodeNames.add(info[index].getXferAddr());
+          datanodeInfos.add(NameNodeAdapter.getDatanode(
+              cluster.getNamesystem(nnIndex), info[index]));
           found = true;
         }
       }
     } else {
-      // The caller specifies a DN
-      for (; index < info.length; index++) {
-        if (info[index].getDatanodeUuid().equals(datanodeUuid)) {
-          break;
+      // The caller specified a DataNode
+      for (String datanodeUuid : dataNodeUuids) {
+        boolean found = false;
+        for (int index = 0; index < info.length; index++) {
+          if (info[index].getDatanodeUuid().equals(datanodeUuid)) {
+            dataNodeNames.add(info[index].getXferAddr());
+            datanodeInfos.add(NameNodeAdapter.getDatanode(
+                cluster.getNamesystem(nnIndex), info[index]));
+            found = true;
+            break;
+          }
+        }
+        if (!found) {
+          throw new IOException("invalid datanodeUuid " + datanodeUuid);
         }
       }
-      if (index == info.length) {
-        throw new IOException("invalid datanodeUuid " + datanodeUuid);
-      }
     }
-    String nodename = info[index].getXferAddr();
-    LOG.info("Taking node: " + nodename + " out of service");
+    LOG.info("Taking node: " + Arrays.toString(dataNodeNames.toArray())
+        + " out of service");
 
     ArrayList<String> decommissionNodes = new ArrayList<String>();
     if (decommissionedNodes != null) {
@@ -220,18 +273,20 @@ protected DatanodeInfo takeNodeOutofService(int nnIndex,
     }
 
     if (isDecommissionRequest) {
-      decommissionNodes.add(nodename);
+      for (String dataNodeName : dataNodeNames) {
+        decommissionNodes.add(dataNodeName);
+      }
     } else {
-      maintenanceNodes.put(nodename, maintenanceExpirationInMS);
+      for (String dataNodeName : dataNodeNames) {
+        maintenanceNodes.put(dataNodeName, maintenanceExpirationInMS);
+      }
     }
 
     // write node names into the json host file.
     hostsFileWriter.initOutOfServiceHosts(decommissionNodes, maintenanceNodes);
     refreshNodes(nnIndex);
-    DatanodeInfo ret = NameNodeAdapter.getDatanode(
-        cluster.getNamesystem(nnIndex), info[index]);
-    waitNodeState(ret, waitForState);
-    return ret;
+    waitNodeState(datanodeInfos, waitForState);
+    return datanodeInfos;
   }
 
   /* Ask a specific NN to put the datanode in service and wait for it
@@ -270,23 +325,31 @@ protected void putNodeInService(int nnIndex,
     putNodeInService(nnIndex, datanodeInfo);
   }
 
-  /*
-   * Wait till node is transitioned to the expected state.
+  /**
+   * Wait till DataNode is transitioned to the expected state.
    */
-  protected void waitNodeState(DatanodeInfo node,
-      AdminStates state) {
-    boolean done = state == node.getAdminState();
-    while (!done) {
-      LOG.info("Waiting for node " + node + " to change state to "
-          + state + " current state: " + node.getAdminState());
-      try {
-        Thread.sleep(HEARTBEAT_INTERVAL * 500);
-      } catch (InterruptedException e) {
-        // nothing
+  protected void waitNodeState(DatanodeInfo node, AdminStates state) {
+    waitNodeState(Lists.newArrayList(node), state);
+  }
+
+  /**
+   * Wait till all DataNodes are transitioned to the expected state.
+   */
+  protected void waitNodeState(List<DatanodeInfo> nodes, AdminStates state) {
+    for (DatanodeInfo node : nodes) {
+      boolean done = (state == node.getAdminState());
+      while (!done) {
+        LOG.info("Waiting for node " + node + " to change state to "
+            + state + " current state: " + node.getAdminState());
+        try {
+          Thread.sleep(HEARTBEAT_INTERVAL * 500);
+        } catch (InterruptedException e) {
+          // nothing
+        }
+        done = (state == node.getAdminState());
       }
-      done = state == node.getAdminState();
+      LOG.info("node " + node + " reached the state " + state);
     }
-    LOG.info("node " + node + " reached the state " + state);
   }
 
   protected void initIncludeHost(String hostNameAndPort) throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
index cb815b198f9..bdd39d2e06a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
@@ -1109,4 +1109,47 @@ public void testUsedCapacity() throws Exception {
     assertTrue("BlockPoolUsed should not be the same after a node has " +
         "been decommissioned!",initialBlockPoolUsed != newBlockPoolUsed);
   }
+
+  /**
+   * Verify if multiple DataNodes can be decommissioned at the same time.
+   */
+  @Test(timeout = 360000)
+  public void testMultipleNodesDecommission() throws Exception {
+    startCluster(1, 5);
+    final Path file = new Path("/testMultipleNodesDecommission.dat");
+    final FileSystem fileSys = getCluster().getFileSystem(0);
+    final FSNamesystem ns = getCluster().getNamesystem(0);
+
+    int repl = 3;
+    writeFile(fileSys, file, repl, 1);
+    // Request Decommission for DataNodes 1 and 2.
+    List<DatanodeInfo> decomDataNodes = takeNodeOutofService(0,
+        Lists.newArrayList(getCluster().getDataNodes().get(0).getDatanodeUuid(),
+            getCluster().getDataNodes().get(1).getDatanodeUuid()),
+        Long.MAX_VALUE, null, null, AdminStates.DECOMMISSIONED);
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          String errMsg = checkFile(fileSys, file, repl,
+              decomDataNodes.get(0).getXferAddr(), 5);
+          if (errMsg != null) {
+            LOG.warn("Check file: " + errMsg);
+          }
+          return true;
+        } catch (IOException e) {
+          LOG.warn("Check file: " + e);
+          return false;
+        }
+      }
+    }, 500, 30000);
+
+    // Put the decommissioned nodes back in service.
+    for (DatanodeInfo datanodeInfo : decomDataNodes) {
+      putNodeInService(0, datanodeInfo);
+    }
+
+    cleanupFile(fileSys, file);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java
index 9cc130b5b11..bbf947fc148 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java
@@ -29,6 +29,7 @@
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -520,6 +521,41 @@ private void testDecommissionDifferentNodeAfterMaintenance(int repl)
     cleanupFile(fileSys, file);
   }
 
+  /**
+   * Verify if multiple DataNodes can transition to maintenance state
+   * at the same time.
+   */
+  @Test(timeout = 360000)
+  public void testMultipleNodesMaintenance() throws Exception {
+    startCluster(1, 5);
+    final Path file = new Path("/testMultipleNodesMaintenance.dat");
+    final FileSystem fileSys = getCluster().getFileSystem(0);
+    final FSNamesystem ns = getCluster().getNamesystem(0);
+
+    int repl = 3;
+    writeFile(fileSys, file, repl, 1);
+    final DatanodeInfo[] nodes = getFirstBlockReplicasDatanodeInfos(fileSys,
+        file);
+
+    // Request maintenance for DataNodes 1 and 2 which have the file blocks.
+    List<DatanodeInfo> maintenanceDN = takeNodeOutofService(0,
+        Lists.newArrayList(nodes[0].getDatanodeUuid(),
+            nodes[1].getDatanodeUuid()), Long.MAX_VALUE, null, null,
+        AdminStates.IN_MAINTENANCE);
+
+    // Verify file replication matches maintenance state min replication
+    assertNull(checkWithRetry(ns, fileSys, file, 1, null, nodes[0]));
+
+    // Put the maintenance nodes back in service
+    for (DatanodeInfo datanodeInfo : maintenanceDN) {
+      putNodeInService(0, datanodeInfo);
+    }
+
+    // Verify file replication catching up to the old state
+    assertNull(checkWithRetry(ns, fileSys, file, repl, null));
+
+    cleanupFile(fileSys, file);
+  }
 
   @Test(timeout = 360000)
   public void testChangeReplicationFactors() throws IOException {