HDFS-9456. BlockPlacementPolicyWithNodeGroup should override verifyBlockPlacement(). Contributed by Xiaobing Zhou.

(cherry picked from commit 77ba5add0d)
This commit is contained in:
Junping Du 2016-02-16 18:55:55 -08:00
parent e79a5e4460
commit 6874a142d8
4 changed files with 230 additions and 0 deletions

View File

@ -1847,6 +1847,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9801. ReconfigurableBase should update the cached configuration.
(Arpit Agarwal)
HDFS-9456. BlockPlacementPolicyWithNodeGroup should override
verifyBlockPlacement(). (Xiaobing Zhou via junping_du)
Release 2.7.3 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -391,4 +391,50 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
}
return true;
}
@Override
public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs,
int numberOfReplicas) {
if (locs == null) {
locs = DatanodeDescriptor.EMPTY_ARRAY;
}
List<String> locList = new ArrayList<String>();
/*
* remove the part of node group for BlockPlacementPolicyDefault to count
* distinct racks, e.g. "/d1/r1/n1" --> "/d1/r1"
*/
for (int i = 0; i < locs.length; i++) {
locList.add(locs[i].getNetworkLocation());
locs[i].setNetworkLocation(NetworkTopology.getFirstHalf(locs[i]
.getNetworkLocation()));
}
BlockPlacementStatus defaultStatus = super.verifyBlockPlacement(locs,
numberOfReplicas);
// restore the part of node group back
for (int i = 0; i < locs.length; i++) {
locs[i].setNetworkLocation(locList.get(i));
}
int minNodeGroups = numberOfReplicas;
BlockPlacementStatusWithNodeGroup nodeGroupStatus =
new BlockPlacementStatusWithNodeGroup(
defaultStatus, getNodeGroupsFromNode(locs), minNodeGroups);
return nodeGroupStatus;
}
private Set<String> getNodeGroupsFromNode(DatanodeInfo[] nodes) {
Set<String> nodeGroups = new HashSet<>();
if (nodes == null) {
return nodeGroups;
}
for (DatanodeInfo node : nodes) {
nodeGroups.add(NetworkTopology.getLastHalf(node.getNetworkLocation()));
}
return nodeGroups;
}
}

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* An implementation of @see BlockPlacementStatus for
* @see BlockPlacementPolicyWithNodeGroup
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class BlockPlacementStatusWithNodeGroup implements BlockPlacementStatus {
private final BlockPlacementStatus parentBlockPlacementStatus;
private final Set<String> currentNodeGroups;
private final int requiredNodeGroups;
/**
* @param parentBlockPlacementStatus the parent class' status
* @param currentNodeGroups the current set of node groups of the replicas
* @param requiredNodeGroups the number of required node groups
*/
public BlockPlacementStatusWithNodeGroup(
BlockPlacementStatus parentBlockPlacementStatus,
Set<String> currentNodeGroups, int requiredNodeGroups) {
this.parentBlockPlacementStatus = parentBlockPlacementStatus;
this.currentNodeGroups = currentNodeGroups;
this.requiredNodeGroups = requiredNodeGroups;
}
@Override
public boolean isPlacementPolicySatisfied() {
return parentBlockPlacementStatus.isPlacementPolicySatisfied()
&& isNodeGroupPolicySatisfied();
}
private boolean isNodeGroupPolicySatisfied() {
return requiredNodeGroups <= currentNodeGroups.size();
}
@Override
public String getErrorDescription() {
if (isPlacementPolicySatisfied()) {
return null;
}
StringBuilder errorDescription = new StringBuilder();
if (!parentBlockPlacementStatus.isPlacementPolicySatisfied()) {
errorDescription.append(parentBlockPlacementStatus.getErrorDescription());
}
if (!isNodeGroupPolicySatisfied()) {
if (errorDescription.length() != 0) {
errorDescription.append(" ");
}
errorDescription.append("The block has " + requiredNodeGroups
+ " replicas. But it only has " + currentNodeGroups.size()
+ " node groups " + currentNodeGroups + ".");
}
return errorDescription.toString();
}
}

View File

@ -34,6 +34,9 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
@ -129,6 +132,103 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
};
/**
* Test block placement verification.
* @throws Exception
*/
@Test
public void testVerifyBlockPlacement() throws Exception {
LocatedBlock locatedBlock;
BlockPlacementStatus status;
ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L));
List<DatanodeStorageInfo> set = new ArrayList<>();
// 2 node groups (not enough), 2 racks (enough)
set.clear();
set.add(storages[0]);
set.add(storages[1]);
set.add(storages[4]);
locatedBlock = BlockManager.newLocatedBlock(b,
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
set.size());
assertFalse(status.isPlacementPolicySatisfied());
// 3 node groups (enough), 2 racks (enough)
set.clear();
set.add(storages[0]);
set.add(storages[2]);
set.add(storages[5]);
locatedBlock = BlockManager.newLocatedBlock(b,
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
set.size());
assertTrue(status.isPlacementPolicySatisfied());
// 2 node groups (not enough), 1 rack (not enough)
set.clear();
set.add(storages[0]);
set.add(storages[1]);
set.add(storages[2]);
locatedBlock = BlockManager.newLocatedBlock(b,
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
set.size());
assertFalse(status.isPlacementPolicySatisfied());
assertTrue(status.getErrorDescription().contains("node group"));
assertTrue(status.getErrorDescription().contains("more rack(s)"));
// 3 node groups (enough), 3 racks (enough)
set.clear();
set.add(storages[0]);
set.add(storages[5]);
set.add(storages[7]);
locatedBlock = BlockManager.newLocatedBlock(b,
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
set.size());
assertTrue(status.isPlacementPolicySatisfied());
// 3 node groups (not enough), 3 racks (enough), 4 replicas
set.clear();
set.add(storages[0]);
set.add(storages[1]);
set.add(storages[5]);
set.add(storages[7]);
locatedBlock = BlockManager.newLocatedBlock(b,
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
set.size());
assertFalse(status.isPlacementPolicySatisfied());
assertTrue(status.getErrorDescription().contains("node group"));
assertFalse(status.getErrorDescription().contains("more rack(s)"));
// 2 node groups (not enough), 1 rack (not enough)
set.clear();
set.add(storages[0]);
set.add(storages[1]);
set.add(storages[2]);
locatedBlock = BlockManager.newLocatedBlock(b,
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
set.size());
assertFalse(status.isPlacementPolicySatisfied());
assertTrue(status.getErrorDescription().contains("node group"));
assertTrue(status.getErrorDescription().contains("more rack(s)"));
// 1 node group (not enough), 1 rack (not enough)
set.clear();
set.add(storages[0]);
set.add(storages[1]);
locatedBlock = BlockManager.newLocatedBlock(b,
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
set.size());
assertFalse(status.isPlacementPolicySatisfied());
assertTrue(status.getErrorDescription().contains("node group"));
assertTrue(status.getErrorDescription().contains("more rack(s)"));
}
/**
* Scan the targets list: all targets should be on different NodeGroups.
* Return false if two targets are found on the same NodeGroup.