HDFS-8946. Improve choosing datanode storage for block placement. (yliu)
This commit is contained in:
parent
4eaa7fd3ea
commit
8fa41d9dd4
|
@ -870,6 +870,8 @@ Release 2.8.0 - UNRELEASED
|
|||
HDFS-8990. Move RemoteBlockReader to hdfs-client module.
|
||||
(Mingliang via wheat9)
|
||||
|
||||
HDFS-8946. Improve choosing datanode storage for block placement. (yliu)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||
|
|
|
@ -26,12 +26,9 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
|
||||
import org.apache.hadoop.net.NetworkTopology;
|
||||
import org.apache.hadoop.net.Node;
|
||||
import org.apache.hadoop.net.NodeBase;
|
||||
|
@ -458,19 +455,18 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
|
||||
.entrySet().iterator(); iter.hasNext(); ) {
|
||||
Map.Entry<StorageType, Integer> entry = iter.next();
|
||||
for (DatanodeStorageInfo localStorage : DFSUtil.shuffle(
|
||||
localDatanode.getStorageInfos())) {
|
||||
StorageType type = entry.getKey();
|
||||
if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
|
||||
results, type) >= 0) {
|
||||
int num = entry.getValue();
|
||||
if (num == 1) {
|
||||
iter.remove();
|
||||
} else {
|
||||
entry.setValue(num - 1);
|
||||
}
|
||||
return localStorage;
|
||||
DatanodeStorageInfo localStorage = chooseStorage4Block(
|
||||
localDatanode, blocksize, results, entry.getKey());
|
||||
if (localStorage != null) {
|
||||
// add node and related nodes to excludedNode
|
||||
addToExcludedNodes(localDatanode, excludedNodes);
|
||||
int num = entry.getValue();
|
||||
if (num == 1) {
|
||||
iter.remove();
|
||||
} else {
|
||||
entry.setValue(num - 1);
|
||||
}
|
||||
return localStorage;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -651,7 +647,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
boolean avoidStaleNodes,
|
||||
EnumMap<StorageType, Integer> storageTypes)
|
||||
throws NotEnoughReplicasException {
|
||||
|
||||
|
||||
int numOfAvailableNodes = clusterMap.countNumOfAvailableNodes(
|
||||
scope, excludedNodes);
|
||||
StringBuilder builder = null;
|
||||
|
@ -669,49 +665,39 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
builder.append("\nNode ").append(NodeBase.getPath(chosenNode)).append(" [");
|
||||
}
|
||||
numOfAvailableNodes--;
|
||||
if (!isGoodDatanode(chosenNode, maxNodesPerRack, considerLoad,
|
||||
DatanodeStorageInfo storage = null;
|
||||
if (isGoodDatanode(chosenNode, maxNodesPerRack, considerLoad,
|
||||
results, avoidStaleNodes)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
builder.append("\n]");
|
||||
}
|
||||
badTarget = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
final DatanodeStorageInfo[] storages = DFSUtil.shuffle(
|
||||
chosenNode.getStorageInfos());
|
||||
int i = 0;
|
||||
boolean search = true;
|
||||
for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
|
||||
.entrySet().iterator(); search && iter.hasNext(); ) {
|
||||
Map.Entry<StorageType, Integer> entry = iter.next();
|
||||
for (i = 0; i < storages.length; i++) {
|
||||
StorageType type = entry.getKey();
|
||||
final int newExcludedNodes = addIfIsGoodTarget(storages[i],
|
||||
excludedNodes, blocksize, results, type);
|
||||
if (newExcludedNodes >= 0) {
|
||||
for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
|
||||
.entrySet().iterator(); iter.hasNext(); ) {
|
||||
Map.Entry<StorageType, Integer> entry = iter.next();
|
||||
storage = chooseStorage4Block(
|
||||
chosenNode, blocksize, results, entry.getKey());
|
||||
if (storage != null) {
|
||||
numOfReplicas--;
|
||||
if (firstChosen == null) {
|
||||
firstChosen = storages[i];
|
||||
firstChosen = storage;
|
||||
}
|
||||
numOfAvailableNodes -= newExcludedNodes;
|
||||
// add node and related nodes to excludedNode
|
||||
numOfAvailableNodes -=
|
||||
addToExcludedNodes(chosenNode, excludedNodes);
|
||||
int num = entry.getValue();
|
||||
if (num == 1) {
|
||||
iter.remove();
|
||||
} else {
|
||||
entry.setValue(num - 1);
|
||||
}
|
||||
search = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
builder.append("\n]");
|
||||
}
|
||||
|
||||
// If no candidate storage was found on this DN then set badTarget.
|
||||
badTarget = (i == storages.length);
|
||||
badTarget = (storage == null);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -740,32 +726,27 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
}
|
||||
|
||||
/**
|
||||
* If the given storage is a good target, add it to the result list and
|
||||
* update the set of excluded nodes.
|
||||
* @return -1 if the given is not a good target;
|
||||
* otherwise, return the number of nodes added to excludedNodes set.
|
||||
* Choose a good storage of given storage type from datanode, and add it to
|
||||
* the result list.
|
||||
*
|
||||
* @param dnd datanode descriptor
|
||||
* @param blockSize requested block size
|
||||
* @param results the result storages
|
||||
* @param storageType requested storage type
|
||||
* @return the chosen datanode storage
|
||||
*/
|
||||
int addIfIsGoodTarget(DatanodeStorageInfo storage,
|
||||
Set<Node> excludedNodes,
|
||||
DatanodeStorageInfo chooseStorage4Block(DatanodeDescriptor dnd,
|
||||
long blockSize,
|
||||
List<DatanodeStorageInfo> results,
|
||||
StorageType storageType) {
|
||||
if (isGoodTarget(storage, blockSize, results, storageType)) {
|
||||
DatanodeStorageInfo storage =
|
||||
dnd.chooseStorage4Block(storageType, blockSize);
|
||||
if (storage != null) {
|
||||
results.add(storage);
|
||||
// add node and related nodes to excludedNode
|
||||
return addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
private static void logNodeIsNotChosen(DatanodeStorageInfo storage, String reason) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
// build the error message for later use.
|
||||
debugLoggingBuilder.get()
|
||||
.append("\n Storage ").append(storage)
|
||||
.append(" is not chosen since ").append(reason).append(".");
|
||||
} else {
|
||||
logNodeIsNotChosen(dnd, "no good storage to place the block ");
|
||||
}
|
||||
return storage;
|
||||
}
|
||||
|
||||
private static void logNodeIsNotChosen(DatanodeDescriptor node,
|
||||
|
@ -836,52 +817,6 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine if a storage is a good target.
|
||||
*
|
||||
* @param storage The target storage
|
||||
* @param blockSize Size of block
|
||||
* @param results A list containing currently chosen nodes. Used to check if
|
||||
* too many nodes has been chosen in the target rack.
|
||||
* @return Return true if <i>node</i> has enough space.
|
||||
*/
|
||||
private boolean isGoodTarget(DatanodeStorageInfo storage,
|
||||
long blockSize,
|
||||
List<DatanodeStorageInfo> results,
|
||||
StorageType requiredStorageType) {
|
||||
if (storage.getStorageType() != requiredStorageType) {
|
||||
logNodeIsNotChosen(storage, "storage types do not match,"
|
||||
+ " where the required storage type is " + requiredStorageType);
|
||||
return false;
|
||||
}
|
||||
if (storage.getState() == State.READ_ONLY_SHARED) {
|
||||
logNodeIsNotChosen(storage, "storage is read-only");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (storage.getState() == State.FAILED) {
|
||||
logNodeIsNotChosen(storage, "storage has failed");
|
||||
return false;
|
||||
}
|
||||
|
||||
DatanodeDescriptor node = storage.getDatanodeDescriptor();
|
||||
|
||||
final long requiredSize = blockSize * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE;
|
||||
final long scheduledSize = blockSize * node.getBlocksScheduled(storage.getStorageType());
|
||||
final long remaining = node.getRemaining(storage.getStorageType(),
|
||||
requiredSize);
|
||||
if (requiredSize > remaining - scheduledSize) {
|
||||
logNodeIsNotChosen(storage, "the node does not have enough "
|
||||
+ storage.getStorageType() + " space"
|
||||
+ " (required=" + requiredSize
|
||||
+ ", scheduled=" + scheduledSize
|
||||
+ ", remaining=" + remaining + ")");
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a pipeline of nodes.
|
||||
* The pipeline is formed finding a shortest path that
|
||||
|
|
|
@ -31,14 +31,15 @@ import java.util.Queue;
|
|||
import java.util.Set;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
|
@ -663,26 +664,39 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
|||
}
|
||||
|
||||
/**
|
||||
* Return the sum of remaining spaces of the specified type. If the remaining
|
||||
* space of a storage is less than minSize, it won't be counted toward the
|
||||
* sum.
|
||||
* Find whether the datanode contains good storage of given type to
|
||||
* place block of size <code>blockSize</code>.
|
||||
*
|
||||
* @param t The storage type. If null, the type is ignored.
|
||||
* @param minSize The minimum free space required.
|
||||
* @return the sum of remaining spaces that are bigger than minSize.
|
||||
* <p>Currently datanode only cares about the storage type, in this
|
||||
* method, the first storage of given type we see is returned.
|
||||
*
|
||||
* @param t requested storage type
|
||||
* @param blockSize requested block size
|
||||
* @return
|
||||
*/
|
||||
public long getRemaining(StorageType t, long minSize) {
|
||||
public DatanodeStorageInfo chooseStorage4Block(StorageType t,
|
||||
long blockSize) {
|
||||
final long requiredSize =
|
||||
blockSize * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE;
|
||||
final long scheduledSize = blockSize * getBlocksScheduled(t);
|
||||
long remaining = 0;
|
||||
DatanodeStorageInfo storage = null;
|
||||
for (DatanodeStorageInfo s : getStorageInfos()) {
|
||||
if (s.getState() == State.NORMAL &&
|
||||
(t == null || s.getStorageType() == t)) {
|
||||
s.getStorageType() == t) {
|
||||
if (storage == null) {
|
||||
storage = s;
|
||||
}
|
||||
long r = s.getRemaining();
|
||||
if (r >= minSize) {
|
||||
if (r >= requiredSize) {
|
||||
remaining += r;
|
||||
}
|
||||
}
|
||||
}
|
||||
return remaining;
|
||||
if (requiredSize > remaining - scheduledSize) {
|
||||
return null;
|
||||
}
|
||||
return storage;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -181,7 +181,7 @@ public class TestReplicationPolicy {
|
|||
* considered.
|
||||
*/
|
||||
@Test
|
||||
public void testChooseNodeWithMultipleStorages() throws Exception {
|
||||
public void testChooseNodeWithMultipleStorages1() throws Exception {
|
||||
updateHeartbeatWithUsage(dataNodes[5],
|
||||
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
|
||||
(2*HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE)/3, 0L,
|
||||
|
@ -200,6 +200,30 @@ public class TestReplicationPolicy {
|
|||
resetHeartbeatForStorages();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test whether all storages on the datanode are considered while
|
||||
* choosing target to place block.
|
||||
*/
|
||||
@Test
|
||||
public void testChooseNodeWithMultipleStorages2() throws Exception {
|
||||
updateHeartbeatWithUsage(dataNodes[5],
|
||||
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
|
||||
(2*HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE)/3, 0L,
|
||||
0L, 0L, 0, 0);
|
||||
|
||||
updateHeartbeatForExtraStorage(
|
||||
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
|
||||
HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L);
|
||||
|
||||
DatanodeStorageInfo[] targets;
|
||||
targets = chooseTarget (1, dataNodes[5],
|
||||
new ArrayList<DatanodeStorageInfo>(), null);
|
||||
assertEquals(1, targets.length);
|
||||
assertEquals(dataNodes[5], targets[0].getDatanodeDescriptor());
|
||||
|
||||
resetHeartbeatForStorages();
|
||||
}
|
||||
|
||||
/**
|
||||
* In this testcase, client is dataNodes[0]. So the 1st replica should be
|
||||
* placed on dataNodes[0], the 2nd replica should be placed on
|
||||
|
|
Loading…
Reference in New Issue