HDFS-8946. Improve choosing datanode storage for block placement. (yliu)

parent 21b478e1bf
commit d852ec1f77
CHANGES.txt
@@ -525,6 +525,8 @@ Release 2.8.0 - UNRELEASED
     HDFS-8990. Move RemoteBlockReader to hdfs-client module.
     (Mingliang via wheat9)
 
+    HDFS-8946. Improve choosing datanode storage for block placement. (yliu)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
BlockPlacementPolicyDefault.java
@@ -26,12 +26,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
@ -458,19 +455,18 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
||||||
for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
|
for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
|
||||||
.entrySet().iterator(); iter.hasNext(); ) {
|
.entrySet().iterator(); iter.hasNext(); ) {
|
||||||
Map.Entry<StorageType, Integer> entry = iter.next();
|
Map.Entry<StorageType, Integer> entry = iter.next();
|
||||||
for (DatanodeStorageInfo localStorage : DFSUtil.shuffle(
|
DatanodeStorageInfo localStorage = chooseStorage4Block(
|
||||||
localDatanode.getStorageInfos())) {
|
localDatanode, blocksize, results, entry.getKey());
|
||||||
StorageType type = entry.getKey();
|
if (localStorage != null) {
|
||||||
if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
|
// add node and related nodes to excludedNode
|
||||||
results, type) >= 0) {
|
addToExcludedNodes(localDatanode, excludedNodes);
|
||||||
int num = entry.getValue();
|
int num = entry.getValue();
|
||||||
if (num == 1) {
|
if (num == 1) {
|
||||||
iter.remove();
|
iter.remove();
|
||||||
} else {
|
} else {
|
||||||
entry.setValue(num - 1);
|
entry.setValue(num - 1);
|
||||||
}
|
|
||||||
return localStorage;
|
|
||||||
}
|
}
|
||||||
|
return localStorage;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -651,7 +647,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
                             boolean avoidStaleNodes,
                             EnumMap<StorageType, Integer> storageTypes)
                             throws NotEnoughReplicasException {
-    
+
     int numOfAvailableNodes = clusterMap.countNumOfAvailableNodes(
         scope, excludedNodes);
     StringBuilder builder = null;
@@ -669,49 +665,39 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
         builder.append("\nNode ").append(NodeBase.getPath(chosenNode)).append(" [");
       }
       numOfAvailableNodes--;
-      if (!isGoodDatanode(chosenNode, maxNodesPerRack, considerLoad,
+      DatanodeStorageInfo storage = null;
+      if (isGoodDatanode(chosenNode, maxNodesPerRack, considerLoad,
           results, avoidStaleNodes)) {
-        if (LOG.isDebugEnabled()) {
-          builder.append("\n]");
-        }
-        badTarget = true;
-        continue;
-      }
-
-      final DatanodeStorageInfo[] storages = DFSUtil.shuffle(
-          chosenNode.getStorageInfos());
-      int i = 0;
-      boolean search = true;
-      for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
-          .entrySet().iterator(); search && iter.hasNext(); ) {
-        Map.Entry<StorageType, Integer> entry = iter.next();
-        for (i = 0; i < storages.length; i++) {
-          StorageType type = entry.getKey();
-          final int newExcludedNodes = addIfIsGoodTarget(storages[i],
-              excludedNodes, blocksize, results, type);
-          if (newExcludedNodes >= 0) {
+        for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
+            .entrySet().iterator(); iter.hasNext(); ) {
+          Map.Entry<StorageType, Integer> entry = iter.next();
+          storage = chooseStorage4Block(
+              chosenNode, blocksize, results, entry.getKey());
+          if (storage != null) {
            numOfReplicas--;
            if (firstChosen == null) {
-              firstChosen = storages[i];
+              firstChosen = storage;
            }
-            numOfAvailableNodes -= newExcludedNodes;
+            // add node and related nodes to excludedNode
+            numOfAvailableNodes -=
+                addToExcludedNodes(chosenNode, excludedNodes);
            int num = entry.getValue();
            if (num == 1) {
              iter.remove();
            } else {
              entry.setValue(num - 1);
            }
-            search = false;
            break;
          }
        }
      }
 
       if (LOG.isDebugEnabled()) {
         builder.append("\n]");
       }
 
       // If no candidate storage was found on this DN then set badTarget.
-      badTarget = (i == storages.length);
+      badTarget = (storage == null);
     }
@@ -740,32 +726,27 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
   }
 
   /**
-   * If the given storage is a good target, add it to the result list and
-   * update the set of excluded nodes.
-   * @return -1 if the given is not a good target;
-   *         otherwise, return the number of nodes added to excludedNodes set.
+   * Choose a good storage of the given storage type from the datanode, and
+   * add it to the result list.
+   *
+   * @param dnd datanode descriptor
+   * @param blockSize requested block size
+   * @param results the result storages
+   * @param storageType requested storage type
+   * @return the chosen datanode storage
    */
-  int addIfIsGoodTarget(DatanodeStorageInfo storage,
-      Set<Node> excludedNodes,
+  DatanodeStorageInfo chooseStorage4Block(DatanodeDescriptor dnd,
       long blockSize,
       List<DatanodeStorageInfo> results,
       StorageType storageType) {
-    if (isGoodTarget(storage, blockSize, results, storageType)) {
+    DatanodeStorageInfo storage =
+        dnd.chooseStorage4Block(storageType, blockSize);
+    if (storage != null) {
       results.add(storage);
-      // add node and related nodes to excludedNode
-      return addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
     } else {
-      return -1;
-    }
-  }
-
-  private static void logNodeIsNotChosen(DatanodeStorageInfo storage, String reason) {
-    if (LOG.isDebugEnabled()) {
-      // build the error message for later use.
-      debugLoggingBuilder.get()
-          .append("\n  Storage ").append(storage)
-          .append(" is not chosen since ").append(reason).append(".");
+      logNodeIsNotChosen(dnd, "no good storage to place the block ");
     }
+    return storage;
   }
 
   private static void logNodeIsNotChosen(DatanodeDescriptor node,
@@ -836,52 +817,6 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     return true;
   }
 
-  /**
-   * Determine if a storage is a good target.
-   *
-   * @param storage The target storage
-   * @param blockSize Size of block
-   * @param results A list containing currently chosen nodes. Used to check if
-   *                too many nodes has been chosen in the target rack.
-   * @return Return true if <i>node</i> has enough space.
-   */
-  private boolean isGoodTarget(DatanodeStorageInfo storage,
-      long blockSize,
-      List<DatanodeStorageInfo> results,
-      StorageType requiredStorageType) {
-    if (storage.getStorageType() != requiredStorageType) {
-      logNodeIsNotChosen(storage, "storage types do not match,"
-          + " where the required storage type is " + requiredStorageType);
-      return false;
-    }
-    if (storage.getState() == State.READ_ONLY_SHARED) {
-      logNodeIsNotChosen(storage, "storage is read-only");
-      return false;
-    }
-
-    if (storage.getState() == State.FAILED) {
-      logNodeIsNotChosen(storage, "storage has failed");
-      return false;
-    }
-
-    DatanodeDescriptor node = storage.getDatanodeDescriptor();
-
-    final long requiredSize = blockSize * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE;
-    final long scheduledSize = blockSize * node.getBlocksScheduled(storage.getStorageType());
-    final long remaining = node.getRemaining(storage.getStorageType(),
-        requiredSize);
-    if (requiredSize > remaining - scheduledSize) {
-      logNodeIsNotChosen(storage, "the node does not have enough "
-          + storage.getStorageType() + " space"
-          + " (required=" + requiredSize
-          + ", scheduled=" + scheduledSize
-          + ", remaining=" + remaining + ")");
-      return false;
-    }
-
-    return true;
-  }
-
   /**
    * Return a pipeline of nodes.
    * The pipeline is formed finding a shortest path that
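To summarize the control-flow change above: instead of shuffling a node's storages and probing each one with addIfIsGoodTarget, the policy now asks once per still-needed storage type and lets the datanode pick a storage. Below is a minimal, runnable sketch of that per-type bookkeeping, not the committed Hadoop code: StorageType here is a local enum, and pickStorage is a hypothetical stand-in for chooseStorage4Block that always succeeds.

import java.util.EnumMap;
import java.util.Iterator;
import java.util.Map;

public class StorageTypeCountSketch {
  enum StorageType { DISK, SSD, ARCHIVE }

  // Hypothetical stand-in for BlockPlacementPolicyDefault#chooseStorage4Block:
  // pretend the candidate node always has a usable storage of the asked type.
  static String pickStorage(StorageType t) {
    return "storage-" + t;
  }

  public static void main(String[] args) {
    // Replicas still needed per storage type, e.g. a storage policy asking
    // for one SSD replica and two DISK replicas.
    EnumMap<StorageType, Integer> storageTypes =
        new EnumMap<>(StorageType.class);
    storageTypes.put(StorageType.DISK, 2);
    storageTypes.put(StorageType.SSD, 1);

    // One "node visit": try each still-needed type in order; on success,
    // decrement that type's quota (removing it at zero) and stop, since a
    // node contributes at most one replica.
    for (Iterator<Map.Entry<StorageType, Integer>> iter =
        storageTypes.entrySet().iterator(); iter.hasNext(); ) {
      Map.Entry<StorageType, Integer> entry = iter.next();
      String storage = pickStorage(entry.getKey());
      if (storage != null) {
        int num = entry.getValue();
        if (num == 1) {
          iter.remove();
        } else {
          entry.setValue(num - 1);
        }
        System.out.println("chose " + storage
            + ", still needed: " + storageTypes);
        break;
      }
    }
  }
}

The break mirrors the patched loops in chooseLocalStorage and chooseRandom: the EnumMap shrinks as each type's quota is satisfied, and a single node is never asked to host two replicas of the same block.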
DatanodeDescriptor.java
@@ -31,14 +31,15 @@ import java.util.Queue;
 import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -665,26 +666,39 @@ public class DatanodeDescriptor extends DatanodeInfo {
   }
 
   /**
-   * Return the sum of remaining spaces of the specified type. If the remaining
-   * space of a storage is less than minSize, it won't be counted toward the
-   * sum.
+   * Find whether the datanode contains a good storage of the given type to
+   * place a block of size <code>blockSize</code>.
    *
-   * @param t The storage type. If null, the type is ignored.
-   * @param minSize The minimum free space required.
-   * @return the sum of remaining spaces that are bigger than minSize.
+   * <p>Currently the datanode only cares about the storage type; the first
+   * storage of the given type encountered is returned.
+   *
+   * @param t requested storage type
+   * @param blockSize requested block size
+   * @return the chosen storage, or null if there is not enough space
    */
-  public long getRemaining(StorageType t, long minSize) {
+  public DatanodeStorageInfo chooseStorage4Block(StorageType t,
+      long blockSize) {
+    final long requiredSize =
+        blockSize * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE;
+    final long scheduledSize = blockSize * getBlocksScheduled(t);
     long remaining = 0;
+    DatanodeStorageInfo storage = null;
     for (DatanodeStorageInfo s : getStorageInfos()) {
       if (s.getState() == State.NORMAL &&
-          (t == null || s.getStorageType() == t)) {
+          s.getStorageType() == t) {
+        if (storage == null) {
+          storage = s;
+        }
         long r = s.getRemaining();
-        if (r >= minSize) {
+        if (r >= requiredSize) {
           remaining += r;
         }
       }
     }
-    return remaining;
+    if (requiredSize > remaining - scheduledSize) {
+      return null;
+    }
+    return storage;
   }
 
   /**
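For orientation, here is a minimal, self-contained model of the check the new DatanodeDescriptor#chooseStorage4Block performs. It is a sketch, not the committed code: StorageKind, SimpleStorage, and SimpleNode are invented stand-ins for StorageType, DatanodeStorageInfo, and DatanodeDescriptor; storage-state checks are omitted; and MIN_BLOCKS_FOR_WRITE is assumed to be 1, since the diff references the constant without showing its value.

import java.util.Arrays;
import java.util.List;

public class ChooseStorage4BlockSketch {
  enum StorageKind { DISK, SSD }

  // Hadoop multiplies by HdfsServerConstants.MIN_BLOCKS_FOR_WRITE; the exact
  // value is not shown in this diff, so assume 1 here.
  static final long MIN_BLOCKS_FOR_WRITE = 1;

  record SimpleStorage(StorageKind kind, long remaining) {}

  static class SimpleNode {
    final List<SimpleStorage> storages;
    final long scheduledBlocks; // blocks already scheduled to this node

    SimpleNode(long scheduledBlocks, SimpleStorage... storages) {
      this.scheduledBlocks = scheduledBlocks;
      this.storages = Arrays.asList(storages);
    }

    // Mirrors the new DatanodeDescriptor#chooseStorage4Block contract:
    // return the first storage of the requested kind, but only if the
    // node-wide remaining space of that kind (counting just the storages
    // with at least requiredSize free) covers required plus scheduled bytes.
    SimpleStorage chooseStorage4Block(StorageKind kind, long blockSize) {
      final long requiredSize = blockSize * MIN_BLOCKS_FOR_WRITE;
      final long scheduledSize = blockSize * scheduledBlocks;
      long remaining = 0;
      SimpleStorage chosen = null;
      for (SimpleStorage s : storages) {
        if (s.kind() == kind) {
          if (chosen == null) {
            chosen = s; // first storage of the kind is the candidate
          }
          if (s.remaining() >= requiredSize) {
            remaining += s.remaining(); // too-small storages do not count
          }
        }
      }
      return (requiredSize > remaining - scheduledSize) ? null : chosen;
    }
  }

  public static void main(String[] args) {
    SimpleNode node = new SimpleNode(0,
        new SimpleStorage(StorageKind.DISK, 200),
        new SimpleStorage(StorageKind.DISK, 200));
    System.out.println(node.chooseStorage4Block(StorageKind.DISK, 128)); // first DISK storage
    System.out.println(node.chooseStorage4Block(StorageKind.SSD, 128));  // null: no SSD
  }
}

This matches the javadoc above: only the storage type really matters to the datanode, so the first storage of that type serves as a representative while the space check runs against the node-wide total for the type. It is also what the new test below exercises: a node whose individual storages differ in free space is still chosen when the type-level total is sufficient.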
TestReplicationPolicy.java
@@ -181,7 +181,7 @@ public class TestReplicationPolicy {
    * considered.
    */
   @Test
-  public void testChooseNodeWithMultipleStorages() throws Exception {
+  public void testChooseNodeWithMultipleStorages1() throws Exception {
     updateHeartbeatWithUsage(dataNodes[5],
         2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
         (2*HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE)/3, 0L,
@@ -200,6 +200,30 @@ public class TestReplicationPolicy {
     resetHeartbeatForStorages();
   }
 
+  /**
+   * Test whether all storages on the datanode are considered while
+   * choosing a target to place the block.
+   */
+  @Test
+  public void testChooseNodeWithMultipleStorages2() throws Exception {
+    updateHeartbeatWithUsage(dataNodes[5],
+        2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+        (2*HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE)/3, 0L,
+        0L, 0L, 0, 0);
+
+    updateHeartbeatForExtraStorage(
+        2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+        HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L);
+
+    DatanodeStorageInfo[] targets;
+    targets = chooseTarget (1, dataNodes[5],
+        new ArrayList<DatanodeStorageInfo>(), null);
+    assertEquals(1, targets.length);
+    assertEquals(dataNodes[5], targets[0].getDatanodeDescriptor());
+
+    resetHeartbeatForStorages();
+  }
+
   /**
    * In this testcase, client is dataNodes[0]. So the 1st replica should be
    * placed on dataNodes[0], the 2nd replica should be placed on