diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index c340e9a00d7..a70c7b9c00e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -1054,7 +1054,7 @@ public class BlockManager implements BlockStatsMXBean { return minReplicationToBeInMaintenance; } - private short getMinMaintenanceStorageNum(BlockInfo block) { + short getMinMaintenanceStorageNum(BlockInfo block) { if (block.isStriped()) { return ((BlockInfoStriped) block).getRealDataBlockNum(); } else { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java index 2ccc1eb2537..421d15f04d6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java @@ -349,8 +349,7 @@ public class DatanodeAdminManager { } } } - if (isMaintenance - && numLive >= blockManager.getMinReplicationToBeInMaintenance()) { + if (isMaintenance && numLive >= blockManager.getMinMaintenanceStorageNum(block)) { return true; } return false; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceWithStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceWithStriped.java new file mode 100644 index 00000000000..2e17b9681b7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceWithStriped.java @@ -0,0 +1,267 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeoutException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileChecksum; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.namenode.INodeFile; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; +import org.apache.hadoop.hdfs.util.HostsFileWriter; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class tests the in maintenance of datanode with striped blocks. + */ +public class TestMaintenanceWithStriped { + private static final Logger LOG = + LoggerFactory.getLogger(TestMaintenanceWithStriped.class); + + // heartbeat interval in seconds + private static final int HEARTBEAT_INTERVAL = 1; + // block report in msec + private static final int BLOCKREPORT_INTERVAL_MSEC = 1000; + // replication interval + private static final int NAMENODE_REPLICATION_INTERVAL = 1; + + private Configuration conf; + private MiniDFSCluster cluster; + private DistributedFileSystem dfs; + private final ErasureCodingPolicy ecPolicy = + StripedFileTestUtil.getDefaultECPolicy(); + private int numDNs; + private final int cellSize = ecPolicy.getCellSize(); + private final int dataBlocks = ecPolicy.getNumDataUnits(); + private final int parityBlocks = ecPolicy.getNumParityUnits(); + private final int blockSize = cellSize * 4; + private final int blockGroupSize = blockSize * dataBlocks; + private final Path ecDir = new Path("/" + this.getClass().getSimpleName()); + private HostsFileWriter hostsFileWriter; + private boolean useCombinedHostFileManager = true; + + private FSNamesystem fsn; + private BlockManager bm; + + protected Configuration createConfiguration() { + return new HdfsConfiguration(); + } + + @Before + public void setup() throws IOException { + // Set up the hosts/exclude files. + hostsFileWriter = new HostsFileWriter(); + conf = createConfiguration(); + if (useCombinedHostFileManager) { + conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY, + CombinedHostFileManager.class, HostConfigManager.class); + } + hostsFileWriter.initialize(conf, "temp/admin"); + + + conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, + 2000); + conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1); + conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, + BLOCKREPORT_INTERVAL_MSEC); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, + 4); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, + NAMENODE_REPLICATION_INTERVAL); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); + conf.setInt( + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY, + cellSize - 1); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY, + false); + + numDNs = dataBlocks + parityBlocks + 5; + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); + cluster.waitActive(); + dfs = cluster.getFileSystem(0); + fsn = cluster.getNamesystem(); + bm = fsn.getBlockManager(); + + dfs.enableErasureCodingPolicy( + StripedFileTestUtil.getDefaultECPolicy().getName()); + dfs.mkdirs(ecDir); + dfs.setErasureCodingPolicy(ecDir, + StripedFileTestUtil.getDefaultECPolicy().getName()); + } + + @After + public void teardown() throws IOException { + hostsFileWriter.cleanup(); + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + } + + /** + * test DN maintenance with striped blocks. + * @throws Exception + */ + @Test(timeout = 120000) + public void testInMaintenance() throws Exception { + //1. create EC file + // d0 d1 d2 d3 d4 d5 d6 d7 d8 + final Path ecFile = new Path(ecDir, "testInMaintenance"); + int writeBytes = cellSize * dataBlocks; + writeStripedFile(dfs, ecFile, writeBytes); + Assert.assertEquals(0, bm.numOfUnderReplicatedBlocks()); + FileChecksum fileChecksum1 = dfs.getFileChecksum(ecFile, writeBytes); + + final INodeFile fileNode = cluster.getNamesystem().getFSDirectory() + .getINode4Write(ecFile.toString()).asFile(); + BlockInfo firstBlock = fileNode.getBlocks()[0]; + DatanodeStorageInfo[] dnStorageInfos = bm.getStorages(firstBlock); + + //2. maintenance node + // d4 d5 d6 d7 d8 + int maintenanceDNIndex = 4; + int numMaintenance= 5; + List maintenanceNodes = new ArrayList<>(); + + for (int i = maintenanceDNIndex; i < numMaintenance + maintenanceDNIndex; ++i) { + maintenanceNodes.add(dnStorageInfos[i].getDatanodeDescriptor()); + } + + maintenanceNode(0, maintenanceNodes, AdminStates.IN_MAINTENANCE, Long.MAX_VALUE); + + //3. wait for maintenance block to replicate + GenericTestUtils.waitFor( + () -> maintenanceNodes.size() == fsn.getNumInMaintenanceLiveDataNodes(), + 100, 60000); + + //4. check DN status, it should be reconstructed again + LocatedBlocks lbs = cluster.getNameNodeRpc().getBlockLocations( + ecFile.toString(), 0, writeBytes); + LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0)); + + BlockInfoStriped blockInfo = + (BlockInfoStriped)bm.getStoredBlock( + new Block(bg.getBlock().getBlockId())); + + // So far, there are 11 total internal blocks, 6 live (d0 d1 d2 d3 d4' d5') + // and 5 in maintenance (d4 d5 d6 d7 d8) internal blocks. + + assertEquals(6, bm.countNodes(blockInfo).liveReplicas()); + assertEquals(5, bm.countNodes(blockInfo).maintenanceNotForReadReplicas()); + + FileChecksum fileChecksum2 = dfs.getFileChecksum(ecFile, writeBytes); + Assert.assertEquals("Checksum mismatches!", fileChecksum1, fileChecksum2); + } + + + /* Get DFSClient to the namenode */ + private static DFSClient getDfsClient(NameNode nn, Configuration conf) + throws IOException { + return new DFSClient(nn.getNameNodeAddress(), conf); + } + + private byte[] writeStripedFile(DistributedFileSystem fs, Path ecFile, + int writeBytes) throws Exception { + byte[] bytes = StripedFileTestUtil.generateBytes(writeBytes); + DFSTestUtil.writeFile(fs, ecFile, new String(bytes)); + StripedFileTestUtil.waitBlockGroupsReported(fs, ecFile.toString()); + + StripedFileTestUtil.checkData(fs, ecFile, writeBytes, + new ArrayList(), null, blockGroupSize); + return bytes; + } + + /* + * maintenance the DN at index dnIndex or one random node if dnIndex is set + * to -1 and wait for the node to reach the given {@code waitForState}. + */ + private void maintenanceNode(int nnIndex, List maintenancedNodes, + AdminStates waitForState, long maintenanceExpirationInMS) + throws IOException, TimeoutException, InterruptedException { + DFSClient client = getDfsClient(cluster.getNameNode(nnIndex), conf); + DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE); + + // write nodename into the exclude file. + Map maintenanceNodes = new HashMap<>(); + + for (DatanodeInfo dn : maintenancedNodes) { + boolean nodeExists = false; + for (DatanodeInfo dninfo : info) { + if (dninfo.getDatanodeUuid().equals(dn.getDatanodeUuid())) { + nodeExists = true; + break; + } + } + assertTrue("Datanode: " + dn + " is not LIVE", nodeExists); + maintenanceNodes.put(dn.getName(), maintenanceExpirationInMS); + LOG.info("Maintenance node: " + dn.getName()); + } + // write node names into the json host file. + hostsFileWriter.initOutOfServiceHosts(null, maintenanceNodes); + + refreshNodes(cluster.getNamesystem(nnIndex), conf); + for (DatanodeInfo dn : maintenancedNodes) { + DatanodeInfo ret = NameNodeAdapter + .getDatanode(cluster.getNamesystem(nnIndex), dn); + LOG.info("Waiting for node " + ret + " to change state to " + waitForState + + " current state: " + ret.getAdminState()); + GenericTestUtils.waitFor( + () -> ret.getAdminState() == waitForState, + 100, 60000); + LOG.info("node " + ret + " reached the state " + waitForState); + } + } + + private static void refreshNodes(final FSNamesystem ns, + final Configuration conf) throws IOException { + ns.getBlockManager().getDatanodeManager().refreshNodes(conf); + } + +}