HDFS-9456. BlockPlacementPolicyWithNodeGroup should override verifyBlockPlacement(). Contributed by Xiaobing Zhou.
(cherry picked from commit77ba5add0d
) (cherry picked from commit6874a142d8
)
This commit is contained in:
parent
8d6fb15b03
commit
c7be3deff7
|
@ -1763,6 +1763,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
HDFS-9801. ReconfigurableBase should update the cached configuration.
|
HDFS-9801. ReconfigurableBase should update the cached configuration.
|
||||||
(Arpit Agarwal)
|
(Arpit Agarwal)
|
||||||
|
|
||||||
|
HDFS-9456. BlockPlacementPolicyWithNodeGroup should override
|
||||||
|
verifyBlockPlacement(). (Xiaobing Zhou via junping_du)
|
||||||
|
|
||||||
Release 2.7.3 - UNRELEASED
|
Release 2.7.3 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -391,4 +391,50 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs,
|
||||||
|
int numberOfReplicas) {
|
||||||
|
if (locs == null) {
|
||||||
|
locs = DatanodeDescriptor.EMPTY_ARRAY;
|
||||||
|
}
|
||||||
|
|
||||||
|
List<String> locList = new ArrayList<String>();
|
||||||
|
/*
|
||||||
|
* remove the part of node group for BlockPlacementPolicyDefault to count
|
||||||
|
* distinct racks, e.g. "/d1/r1/n1" --> "/d1/r1"
|
||||||
|
*/
|
||||||
|
for (int i = 0; i < locs.length; i++) {
|
||||||
|
locList.add(locs[i].getNetworkLocation());
|
||||||
|
locs[i].setNetworkLocation(NetworkTopology.getFirstHalf(locs[i]
|
||||||
|
.getNetworkLocation()));
|
||||||
|
}
|
||||||
|
|
||||||
|
BlockPlacementStatus defaultStatus = super.verifyBlockPlacement(locs,
|
||||||
|
numberOfReplicas);
|
||||||
|
|
||||||
|
// restore the part of node group back
|
||||||
|
for (int i = 0; i < locs.length; i++) {
|
||||||
|
locs[i].setNetworkLocation(locList.get(i));
|
||||||
|
}
|
||||||
|
|
||||||
|
int minNodeGroups = numberOfReplicas;
|
||||||
|
BlockPlacementStatusWithNodeGroup nodeGroupStatus =
|
||||||
|
new BlockPlacementStatusWithNodeGroup(
|
||||||
|
defaultStatus, getNodeGroupsFromNode(locs), minNodeGroups);
|
||||||
|
return nodeGroupStatus;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Set<String> getNodeGroupsFromNode(DatanodeInfo[] nodes) {
|
||||||
|
Set<String> nodeGroups = new HashSet<>();
|
||||||
|
if (nodes == null) {
|
||||||
|
return nodeGroups;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (DatanodeInfo node : nodes) {
|
||||||
|
nodeGroups.add(NetworkTopology.getLastHalf(node.getNetworkLocation()));
|
||||||
|
}
|
||||||
|
return nodeGroups;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,81 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An implementation of @see BlockPlacementStatus for
|
||||||
|
* @see BlockPlacementPolicyWithNodeGroup
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public class BlockPlacementStatusWithNodeGroup implements BlockPlacementStatus {
|
||||||
|
|
||||||
|
private final BlockPlacementStatus parentBlockPlacementStatus;
|
||||||
|
private final Set<String> currentNodeGroups;
|
||||||
|
private final int requiredNodeGroups;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param parentBlockPlacementStatus the parent class' status
|
||||||
|
* @param currentNodeGroups the current set of node groups of the replicas
|
||||||
|
* @param requiredNodeGroups the number of required node groups
|
||||||
|
*/
|
||||||
|
public BlockPlacementStatusWithNodeGroup(
|
||||||
|
BlockPlacementStatus parentBlockPlacementStatus,
|
||||||
|
Set<String> currentNodeGroups, int requiredNodeGroups) {
|
||||||
|
this.parentBlockPlacementStatus = parentBlockPlacementStatus;
|
||||||
|
this.currentNodeGroups = currentNodeGroups;
|
||||||
|
this.requiredNodeGroups = requiredNodeGroups;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isPlacementPolicySatisfied() {
|
||||||
|
return parentBlockPlacementStatus.isPlacementPolicySatisfied()
|
||||||
|
&& isNodeGroupPolicySatisfied();
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isNodeGroupPolicySatisfied() {
|
||||||
|
return requiredNodeGroups <= currentNodeGroups.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getErrorDescription() {
|
||||||
|
if (isPlacementPolicySatisfied()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
StringBuilder errorDescription = new StringBuilder();
|
||||||
|
if (!parentBlockPlacementStatus.isPlacementPolicySatisfied()) {
|
||||||
|
errorDescription.append(parentBlockPlacementStatus.getErrorDescription());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!isNodeGroupPolicySatisfied()) {
|
||||||
|
if (errorDescription.length() != 0) {
|
||||||
|
errorDescription.append(" ");
|
||||||
|
}
|
||||||
|
errorDescription.append("The block has " + requiredNodeGroups
|
||||||
|
+ " replicas. But it only has " + currentNodeGroups.size()
|
||||||
|
+ " node groups " + currentNodeGroups + ".");
|
||||||
|
}
|
||||||
|
return errorDescription.toString();
|
||||||
|
}
|
||||||
|
}
|
|
@ -34,6 +34,9 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
|
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
import org.apache.hadoop.net.NetworkTopology;
|
import org.apache.hadoop.net.NetworkTopology;
|
||||||
import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
|
import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
|
||||||
|
@ -129,6 +132,103 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test block placement verification.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testVerifyBlockPlacement() throws Exception {
|
||||||
|
LocatedBlock locatedBlock;
|
||||||
|
BlockPlacementStatus status;
|
||||||
|
ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L));
|
||||||
|
List<DatanodeStorageInfo> set = new ArrayList<>();
|
||||||
|
|
||||||
|
// 2 node groups (not enough), 2 racks (enough)
|
||||||
|
set.clear();
|
||||||
|
set.add(storages[0]);
|
||||||
|
set.add(storages[1]);
|
||||||
|
set.add(storages[4]);
|
||||||
|
locatedBlock = BlockManager.newLocatedBlock(b,
|
||||||
|
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
|
||||||
|
status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
|
||||||
|
set.size());
|
||||||
|
assertFalse(status.isPlacementPolicySatisfied());
|
||||||
|
|
||||||
|
// 3 node groups (enough), 2 racks (enough)
|
||||||
|
set.clear();
|
||||||
|
set.add(storages[0]);
|
||||||
|
set.add(storages[2]);
|
||||||
|
set.add(storages[5]);
|
||||||
|
locatedBlock = BlockManager.newLocatedBlock(b,
|
||||||
|
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
|
||||||
|
status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
|
||||||
|
set.size());
|
||||||
|
assertTrue(status.isPlacementPolicySatisfied());
|
||||||
|
|
||||||
|
// 2 node groups (not enough), 1 rack (not enough)
|
||||||
|
set.clear();
|
||||||
|
set.add(storages[0]);
|
||||||
|
set.add(storages[1]);
|
||||||
|
set.add(storages[2]);
|
||||||
|
locatedBlock = BlockManager.newLocatedBlock(b,
|
||||||
|
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
|
||||||
|
status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
|
||||||
|
set.size());
|
||||||
|
assertFalse(status.isPlacementPolicySatisfied());
|
||||||
|
assertTrue(status.getErrorDescription().contains("node group"));
|
||||||
|
assertTrue(status.getErrorDescription().contains("more rack(s)"));
|
||||||
|
|
||||||
|
// 3 node groups (enough), 3 racks (enough)
|
||||||
|
set.clear();
|
||||||
|
set.add(storages[0]);
|
||||||
|
set.add(storages[5]);
|
||||||
|
set.add(storages[7]);
|
||||||
|
locatedBlock = BlockManager.newLocatedBlock(b,
|
||||||
|
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
|
||||||
|
status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
|
||||||
|
set.size());
|
||||||
|
assertTrue(status.isPlacementPolicySatisfied());
|
||||||
|
|
||||||
|
// 3 node groups (not enough), 3 racks (enough), 4 replicas
|
||||||
|
set.clear();
|
||||||
|
set.add(storages[0]);
|
||||||
|
set.add(storages[1]);
|
||||||
|
set.add(storages[5]);
|
||||||
|
set.add(storages[7]);
|
||||||
|
locatedBlock = BlockManager.newLocatedBlock(b,
|
||||||
|
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
|
||||||
|
status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
|
||||||
|
set.size());
|
||||||
|
assertFalse(status.isPlacementPolicySatisfied());
|
||||||
|
assertTrue(status.getErrorDescription().contains("node group"));
|
||||||
|
assertFalse(status.getErrorDescription().contains("more rack(s)"));
|
||||||
|
|
||||||
|
// 2 node groups (not enough), 1 rack (not enough)
|
||||||
|
set.clear();
|
||||||
|
set.add(storages[0]);
|
||||||
|
set.add(storages[1]);
|
||||||
|
set.add(storages[2]);
|
||||||
|
locatedBlock = BlockManager.newLocatedBlock(b,
|
||||||
|
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
|
||||||
|
status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
|
||||||
|
set.size());
|
||||||
|
assertFalse(status.isPlacementPolicySatisfied());
|
||||||
|
assertTrue(status.getErrorDescription().contains("node group"));
|
||||||
|
assertTrue(status.getErrorDescription().contains("more rack(s)"));
|
||||||
|
|
||||||
|
// 1 node group (not enough), 1 rack (not enough)
|
||||||
|
set.clear();
|
||||||
|
set.add(storages[0]);
|
||||||
|
set.add(storages[1]);
|
||||||
|
locatedBlock = BlockManager.newLocatedBlock(b,
|
||||||
|
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
|
||||||
|
status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
|
||||||
|
set.size());
|
||||||
|
assertFalse(status.isPlacementPolicySatisfied());
|
||||||
|
assertTrue(status.getErrorDescription().contains("node group"));
|
||||||
|
assertTrue(status.getErrorDescription().contains("more rack(s)"));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Scan the targets list: all targets should be on different NodeGroups.
|
* Scan the targets list: all targets should be on different NodeGroups.
|
||||||
* Return false if two targets are found on the same NodeGroup.
|
* Return false if two targets are found on the same NodeGroup.
|
||||||
|
|
Loading…
Reference in New Issue