diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index c3aec40d7da..ca06efd5e15 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1763,6 +1763,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9801. ReconfigurableBase should update the cached configuration.
     (Arpit Agarwal)
 
+    HDFS-9456. BlockPlacementPolicyWithNodeGroup should override
+    verifyBlockPlacement(). (Xiaobing Zhou via junping_du)
+
 Release 2.7.3 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
index ad1799e7977..a5193b3e552 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
@@ -391,4 +391,50 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
     }
     return true;
   }
+
+
+  @Override
+  public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs,
+      int numberOfReplicas) {
+    if (locs == null) {
+      locs = DatanodeDescriptor.EMPTY_ARRAY;
+    }
+
+    List<String> locList = new ArrayList<>();
+    /*
+     * Strip the node-group part so that BlockPlacementPolicyDefault counts
+     * distinct racks, e.g. "/d1/r1/n1" --> "/d1/r1".
+     */
+    for (int i = 0; i < locs.length; i++) {
+      locList.add(locs[i].getNetworkLocation());
+      locs[i].setNetworkLocation(NetworkTopology.getFirstHalf(locs[i]
+          .getNetworkLocation()));
+    }
+
+    BlockPlacementStatus defaultStatus = super.verifyBlockPlacement(locs,
+        numberOfReplicas);
+
+    // restore the node-group part of the network locations
+    for (int i = 0; i < locs.length; i++) {
+      locs[i].setNetworkLocation(locList.get(i));
+    }
+
+    int minNodeGroups = numberOfReplicas;
+    BlockPlacementStatusWithNodeGroup nodeGroupStatus =
+        new BlockPlacementStatusWithNodeGroup(
+            defaultStatus, getNodeGroupsFromNode(locs), minNodeGroups);
+    return nodeGroupStatus;
+  }
+
+  private Set<String> getNodeGroupsFromNode(DatanodeInfo[] nodes) {
+    Set<String> nodeGroups = new HashSet<>();
+    if (nodes == null) {
+      return nodeGroups;
+    }
+
+    for (DatanodeInfo node : nodes) {
+      nodeGroups.add(NetworkTopology.getLastHalf(node.getNetworkLocation()));
+    }
+    return nodeGroups;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithNodeGroup.java
new file mode 100644
index 00000000000..b98b3dac362
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithNodeGroup.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * An implementation of {@link BlockPlacementStatus} for
+ * {@link BlockPlacementPolicyWithNodeGroup}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockPlacementStatusWithNodeGroup implements BlockPlacementStatus {
+
+  private final BlockPlacementStatus parentBlockPlacementStatus;
+  private final Set<String> currentNodeGroups;
+  private final int requiredNodeGroups;
+
+  /**
+   * @param parentBlockPlacementStatus the status of the parent placement policy
+   * @param currentNodeGroups the current set of node groups of the replicas
+   * @param requiredNodeGroups the number of required node groups
+   */
+  public BlockPlacementStatusWithNodeGroup(
+      BlockPlacementStatus parentBlockPlacementStatus,
+      Set<String> currentNodeGroups, int requiredNodeGroups) {
+    this.parentBlockPlacementStatus = parentBlockPlacementStatus;
+    this.currentNodeGroups = currentNodeGroups;
+    this.requiredNodeGroups = requiredNodeGroups;
+  }
+
+  @Override
+  public boolean isPlacementPolicySatisfied() {
+    return parentBlockPlacementStatus.isPlacementPolicySatisfied()
+        && isNodeGroupPolicySatisfied();
+  }
+
+  private boolean isNodeGroupPolicySatisfied() {
+    return requiredNodeGroups <= currentNodeGroups.size();
+  }
+
+  @Override
+  public String getErrorDescription() {
+    if (isPlacementPolicySatisfied()) {
+      return null;
+    }
+
+    StringBuilder errorDescription = new StringBuilder();
+    if (!parentBlockPlacementStatus.isPlacementPolicySatisfied()) {
+      errorDescription.append(parentBlockPlacementStatus.getErrorDescription());
+    }
+
+    if (!isNodeGroupPolicySatisfied()) {
+      if (errorDescription.length() != 0) {
+        errorDescription.append(" ");
+      }
+      errorDescription.append("The block has " + requiredNodeGroups
+          + " replicas, but only " + currentNodeGroups.size()
+          + " node groups " + currentNodeGroups + ".");
+    }
+    return errorDescription.toString();
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
index 7a00a3b46b6..edcab1000c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
@@ -34,6 +34,9 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
@@ -129,6 +132,103 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
 
   };
 
+  /**
+   * Test block placement verification.
+   * @throws Exception
+   */
+  @Test
+  public void testVerifyBlockPlacement() throws Exception {
+    LocatedBlock locatedBlock;
+    BlockPlacementStatus status;
+    ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L));
+    List<DatanodeStorageInfo> set = new ArrayList<>();
+
+    // 2 node groups (not enough), 2 racks (enough)
+    set.clear();
+    set.add(storages[0]);
+    set.add(storages[1]);
+    set.add(storages[4]);
+    locatedBlock = BlockManager.newLocatedBlock(b,
+        set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
+    status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
+        set.size());
+    assertFalse(status.isPlacementPolicySatisfied());
+
+    // 3 node groups (enough), 2 racks (enough)
+    set.clear();
+    set.add(storages[0]);
+    set.add(storages[2]);
+    set.add(storages[5]);
+    locatedBlock = BlockManager.newLocatedBlock(b,
+        set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
+    status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
+        set.size());
+    assertTrue(status.isPlacementPolicySatisfied());
+
+    // 2 node groups (not enough), 1 rack (not enough)
+    set.clear();
+    set.add(storages[0]);
+    set.add(storages[1]);
+    set.add(storages[2]);
+    locatedBlock = BlockManager.newLocatedBlock(b,
+        set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
+    status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
+        set.size());
+    assertFalse(status.isPlacementPolicySatisfied());
+    assertTrue(status.getErrorDescription().contains("node group"));
+    assertTrue(status.getErrorDescription().contains("more rack(s)"));
+
+    // 3 node groups (enough), 3 racks (enough)
+    set.clear();
+    set.add(storages[0]);
+    set.add(storages[5]);
+    set.add(storages[7]);
+    locatedBlock = BlockManager.newLocatedBlock(b,
+        set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
+    status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
+        set.size());
+    assertTrue(status.isPlacementPolicySatisfied());
+
+    // 3 node groups (not enough), 3 racks (enough), 4 replicas
+    set.clear();
+    set.add(storages[0]);
+    set.add(storages[1]);
+    set.add(storages[5]);
+    set.add(storages[7]);
+    locatedBlock = BlockManager.newLocatedBlock(b,
+        set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
+    status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
+        set.size());
+    assertFalse(status.isPlacementPolicySatisfied());
+    assertTrue(status.getErrorDescription().contains("node group"));
+    assertFalse(status.getErrorDescription().contains("more rack(s)"));
+
+    // 2 node groups (not enough), 1 rack (not enough)
+    set.clear();
+    set.add(storages[0]);
+    set.add(storages[1]);
+    set.add(storages[2]);
+    locatedBlock = BlockManager.newLocatedBlock(b,
+        set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
+    status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
+        set.size());
+    assertFalse(status.isPlacementPolicySatisfied());
+    assertTrue(status.getErrorDescription().contains("node group"));
+    assertTrue(status.getErrorDescription().contains("more rack(s)"));
+
+    // 1 node group (not enough), 1 rack (not enough)
+    set.clear();
+    set.add(storages[0]);
+    set.add(storages[1]);
+    locatedBlock = BlockManager.newLocatedBlock(b,
+        set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
+    status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
+        set.size());
+    assertFalse(status.isPlacementPolicySatisfied());
+    assertTrue(status.getErrorDescription().contains("node group"));
+    assertTrue(status.getErrorDescription().contains("more rack(s)"));
+  }
+
   /**
    * Scan the targets list: all targets should be on different NodeGroups.
    * Return false if two targets are found on the same NodeGroup.
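
For reviewers who want to see the new status class in isolation, below is a minimal, illustrative sketch (not part of the patch) of how BlockPlacementStatusWithNodeGroup layers the node-group count check on top of a rack-level status. The class name NodeGroupStatusSketch, the anonymous rackStatus stand-in, and the literal node-group paths are invented for the example; it assumes the sketch is compiled in the org.apache.hadoop.hdfs.server.blockmanagement package and that BlockPlacementStatus exposes only the two methods the new class overrides.

// Illustrative only -- not part of the patch.
package org.apache.hadoop.hdfs.server.blockmanagement;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class NodeGroupStatusSketch {
  public static void main(String[] args) {
    // Stand-in for the rack-level status that
    // BlockPlacementPolicyDefault#verifyBlockPlacement() would return;
    // here the rack policy is treated as satisfied.
    BlockPlacementStatus rackStatus = new BlockPlacementStatus() {
      @Override
      public boolean isPlacementPolicySatisfied() {
        return true;
      }

      @Override
      public String getErrorDescription() {
        return null;
      }
    };

    // The replicas span only two node groups, but three are required
    // (one per replica), so the combined status reports a violation.
    Set<String> nodeGroups = new HashSet<>(Arrays.asList("/n1", "/n2"));
    BlockPlacementStatusWithNodeGroup status =
        new BlockPlacementStatusWithNodeGroup(rackStatus, nodeGroups, 3);

    System.out.println(status.isPlacementPolicySatisfied()); // prints: false
    System.out.println(status.getErrorDescription()); // describes the node-group shortfall
  }
}

This mirrors what the overridden verifyBlockPlacement() does: it first runs the default (rack) verification on locations with the node-group suffix stripped, then layers the requirement that the replicas cover at least numberOfReplicas distinct node groups on top of that result.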