diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index ff972eb30ed..b00430974a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -340,7 +340,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     }
     // No calculation needed when there is only one rack or picking one node.
     int numOfRacks = clusterMap.getNumOfRacks();
-    if (numOfRacks == 1 || totalNumOfReplicas <= 1) {
+    // HDFS-14527: return the default when numOfRacks == 0 to avoid an
+    // ArithmeticException when computing maxNodesPerRack in the logic below.
+    if (numOfRacks <= 1 || totalNumOfReplicas <= 1) {
       return new int[] {numOfReplicas, totalNumOfReplicas};
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
index 95c5c880a9a..b204450491a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
@@ -43,7 +43,9 @@ public class BlockPlacementPolicyRackFaultTolerant extends BlockPlacementPolicyDefault {
     }
     // No calculation needed when there is only one rack or picking one node.
     int numOfRacks = clusterMap.getNumOfRacks();
-    if (numOfRacks == 1 || totalNumOfReplicas <= 1) {
+    // HDFS-14527: return the default when numOfRacks == 0 to avoid an
+    // ArithmeticException when computing maxNodesPerRack in the logic below.
+    if (numOfRacks <= 1 || totalNumOfReplicas <= 1) {
      return new int[] {numOfReplicas, totalNumOfReplicas};
    }
    // If more racks than replicas, put one replica per rack.
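For context, both guards matter because the code that follows divides by numOfRacks. Below is a minimal standalone sketch of the failure mode, assuming a maxNodesPerRack expression of the form (totalNumOfReplicas - 1) / numOfRacks + 2; the actual expression sits below the changed lines and is not shown in this diff.

// Standalone sketch, not part of the patch. Reproduces the ArithmeticException
// the new guard prevents; the divisor mirrors the shape of the
// maxNodesPerRack calculation that follows the changed condition.
public class MaxNodesPerRackSketch {

  static int[] adjustReplicas(int numOfReplicas, int totalNumOfReplicas,
      int numOfRacks) {
    // Old guard: numOfRacks == 1. When every DataNode (and hence every rack)
    // has been removed, numOfRacks == 0 fell through to the division below
    // and threw "ArithmeticException: / by zero", terminating the NameNode.
    if (numOfRacks <= 1 || totalNumOfReplicas <= 1) {
      return new int[] {numOfReplicas, totalNumOfReplicas};
    }
    // Divides by numOfRacks; safe here because numOfRacks >= 2 past the guard.
    int maxNodesPerRack = (totalNumOfReplicas - 1) / numOfRacks + 2;
    return new int[] {numOfReplicas, maxNodesPerRack};
  }

  public static void main(String[] args) {
    // All racks gone: with the old "== 1" guard this threw an exception;
    // with "<= 1" it returns the default pair.
    int[] result = adjustReplicas(2, 2, 0);
    System.out.println(result[0] + ", " + result[1]);
  }
}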
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRedundancyMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRedundancyMonitor.java
new file mode 100644
index 00000000000..0667e2611b4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRedundancyMonitor.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.apache.hadoop.fs.contract.hdfs.HDFSContract.BLOCK_SIZE;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+/**
+ * This class tests RedundancyMonitor in BlockManager.
+ */
+public class TestRedundancyMonitor {
+  private static final String FILENAME = "/dummyfile.txt";
+
+  /**
+   * RedundancyMonitor invokes chooseTarget outside the global lock in
+   * #computeDatanodeWork. The NameNode could therefore terminate if
+   * chooseTarget hit a runtime exception (ArithmeticException) after all
+   * DataNodes had been stopped in the meantime.
+   * Verify that the NameNode does not terminate even when all DataNodes stop.
+   */
+  @Test
+  public void testChooseTargetWhenAllDataNodesStop() throws Throwable {
+
+    HdfsConfiguration conf = new HdfsConfiguration();
+    String[] hosts = new String[]{"host1", "host2"};
+    String[] racks = new String[]{"/d1/r1", "/d1/r1"};
+    try (MiniDFSCluster miniCluster = new MiniDFSCluster.Builder(conf)
+        .racks(racks).hosts(hosts).numDataNodes(hosts.length).build()) {
+      miniCluster.waitActive();
+
+      FSNamesystem fsn = miniCluster.getNamesystem();
+      BlockManager blockManager = fsn.getBlockManager();
+
+      BlockPlacementPolicyDefault replicator
+          = (BlockPlacementPolicyDefault) blockManager
+          .getBlockPlacementPolicy();
+      Set<DatanodeDescriptor> dns = blockManager.getDatanodeManager()
+          .getDatanodes();
+
+      DelayAnswer delayer = new DelayAnswer(BlockPlacementPolicyDefault.LOG);
+      NetworkTopology clusterMap = replicator.clusterMap;
+      NetworkTopology spyClusterMap = spy(clusterMap);
+      replicator.clusterMap = spyClusterMap;
+      doAnswer(delayer).when(spyClusterMap).getNumOfRacks();
+
+      ExecutorService pool = Executors.newFixedThreadPool(2);
+
+      // Trigger chooseTarget.
+      Future<Void> chooseTargetFuture = pool.submit(() -> {
+        replicator.chooseTarget(FILENAME, 2, dns.iterator().next(),
+            new ArrayList<DatanodeStorageInfo>(), false, null, BLOCK_SIZE,
+            TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
+        return null;
+      });
+
+      // Wait until chooseTarget calls NetworkTopology#getNumOfRacks.
+      delayer.waitForCall();
+      // Remove all DataNodes.
+      Future<Void> stopDatanodesFuture = pool.submit(() -> {
+        for (DatanodeDescriptor dn : dns) {
+          spyClusterMap.remove(dn);
+        }
+        return null;
+      });
+      // Wait for stopDatanodesFuture to finish.
+      stopDatanodesFuture.get();
+
+      // Allow chooseTarget to proceed.
+      delayer.proceed();
+      try {
+        chooseTargetFuture.get();
+      } catch (ExecutionException ee) {
+        throw ee.getCause();
+      }
+    }
+  }
+}
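The test's timing hinges on GenericTestUtils.DelayAnswer: it parks the spied getNumOfRacks() call so the racing DataNode removal can finish first, then releases it. Below is a self-contained sketch of that rendezvous pattern using plain CountDownLatches; the class and method names are illustrative, not Hadoop APIs.

// Standalone sketch, not part of the patch. Thread A blocks inside an
// instrumented call, thread B mutates shared state, then A is released so
// it observes the post-mutation value, exactly the window the test creates.
import java.util.concurrent.CountDownLatch;

public class DelayAnswerPatternSketch {
  // waitingForCall fires when the instrumented call is entered;
  // allowProceed releases the call so it can read the mutated state.
  private static final CountDownLatch waitingForCall = new CountDownLatch(1);
  private static final CountDownLatch allowProceed = new CountDownLatch(1);
  private static volatile int numOfRacks = 1; // shared state under race

  // Stands in for the spied NetworkTopology#getNumOfRacks.
  static int instrumentedGetNumOfRacks() throws InterruptedException {
    waitingForCall.countDown(); // signal: the racing call has started
    allowProceed.await();       // park until the other thread is done
    return numOfRacks;          // now observes the post-mutation value
  }

  public static void main(String[] args) throws Exception {
    Thread chooser = new Thread(() -> {
      try {
        int racks = instrumentedGetNumOfRacks();
        // With the old guard (racks == 1), racks == 0 reached a division
        // by racks and threw ArithmeticException; the fix (racks <= 1)
        // returns early instead.
        System.out.println("observed numOfRacks = " + racks);
      } catch (InterruptedException ignored) {
      }
    });
    chooser.start();

    waitingForCall.await();   // like delayer.waitForCall()
    numOfRacks = 0;           // like removing every DataNode (and rack)
    allowProceed.countDown(); // like delayer.proceed()
    chooser.join();
  }
}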