HDFS-8863. The remaining space check in BlockPlacementPolicyDefault is flawed. (Kihwal Lee via yliu)
parent 63bdbb7793
commit 146db49f7f
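Why the check was flawed: DatanodeDescriptor#getRemaining(StorageType t) summed the free space of every storage of type t on a node, so BlockPlacementPolicyDefault could accept a node whose aggregate free space exceeded requiredSize even though no single volume could hold the reservation (and storages in a failed or read-only state still counted). The fix threads the required size into getRemaining() so that only NORMAL storages with at least that much free space contribute. Below is a minimal standalone sketch of the before/after arithmetic, not the HDFS code itself; all names in it are illustrative:

import java.util.List;

public class RemainingSpaceSketch {
  record Storage(long remainingBytes) {}

  // Old behavior: sum free space across every storage on the node.
  static long remainingAggregate(List<Storage> storages) {
    return storages.stream().mapToLong(Storage::remainingBytes).sum();
  }

  // Fixed behavior: a storage only counts if it individually has at
  // least minSize bytes free, so nearly-full volumes cannot inflate
  // the total.
  static long remainingFiltered(List<Storage> storages, long minSize) {
    return storages.stream()
        .mapToLong(Storage::remainingBytes)
        .filter(r -> r >= minSize)
        .sum();
  }

  public static void main(String[] args) {
    long requiredSize = 512L << 20; // stand-in for blockSize * MIN_BLOCKS_FOR_WRITE
    // Three volumes with 200 MB free each: 600 MB in aggregate, yet no
    // single volume can hold a 512 MB reservation.
    List<Storage> vols = List.of(new Storage(200L << 20),
        new Storage(200L << 20), new Storage(200L << 20));

    System.out.println(remainingAggregate(vols) >= requiredSize);              // true: node wrongly passes
    System.out.println(remainingFiltered(vols, requiredSize) >= requiredSize); // false: node correctly rejected
  }
}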
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -852,11 +852,11 @@ Release 2.7.2 - UNRELEASED
   IMPROVEMENTS
 
     HDFS-8659. Block scanner INFO message is spamming logs. (Yongjun Zhang)
 
   OPTIMIZATIONS
 
     HDFS-8722. Optimize datanode writes for small writes and flushes (kihwal)
 
   BUG FIXES
@@ -871,6 +871,9 @@ Release 2.7.2 - UNRELEASED
 
     HDFS-8867. Enable optimized block reports. (Daryn Sharp via jing9)
 
+    HDFS-8863. The remaining space check in BlockPlacementPolicyDefault is
+    flawed. (Kihwal Lee via yliu)
+
 Release 2.7.1 - 2015-07-06
 
   INCOMPATIBLE CHANGES
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java

@@ -868,7 +868,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     final long requiredSize = blockSize * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE;
     final long scheduledSize = blockSize * node.getBlocksScheduled(storage.getStorageType());
-    final long remaining = node.getRemaining(storage.getStorageType());
+    final long remaining = node.getRemaining(storage.getStorageType(),
+        requiredSize);
     if (requiredSize > remaining - scheduledSize) {
       logNodeIsNotChosen(storage, "the node does not have enough "
           + storage.getStorageType() + " space"
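In the fixed call site above, requiredSize (blockSize * MIN_BLOCKS_FOR_WRITE) doubles as the per-storage minimum: getRemaining() now ignores any storage that could not absorb the reservation on its own, while the surrounding requiredSize > remaining - scheduledSize test is unchanged.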
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java

@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.hdfs.util.EnumCounters;
@@ -664,16 +665,26 @@ public class DatanodeDescriptor extends DatanodeInfo {
   }
 
   /**
-   * @return Approximate number of blocks currently scheduled to be written
+   * Return the sum of remaining spaces of the specified type. If the remaining
+   * space of a storage is less than minSize, it won't be counted toward the
+   * sum.
+   *
+   * @param t The storage type. If null, the type is ignored.
+   * @param minSize The minimum free space required.
+   * @return the sum of remaining spaces that are bigger than minSize.
    */
-  public long getRemaining(StorageType t) {
+  public long getRemaining(StorageType t, long minSize) {
     long remaining = 0;
-    for(DatanodeStorageInfo s : getStorageInfos()) {
-      if (s.getStorageType() == t) {
-        remaining += s.getRemaining();
+    for (DatanodeStorageInfo s : getStorageInfos()) {
+      if (s.getState() == State.NORMAL &&
+          (t == null || s.getStorageType() == t)) {
+        long r = s.getRemaining();
+        if (r >= minSize) {
+          remaining += r;
+        }
       }
     }
     return remaining;
   }
 
   /**
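A note on the new contract as implemented above: a storage contributes to the sum only when it is in State.NORMAL, matches the requested StorageType (null acts as a wildcard over all types), and has at least minSize bytes free; the comparison is r >= minSize, so a volume with exactly minSize free still counts. The rewrite also replaces the old javadoc, which had wrongly described the method as returning the "approximate number of blocks currently scheduled to be written".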
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java

@@ -100,6 +100,16 @@ public class TestReplicationPolicy {
         dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null);
   }
 
+  private static void updateHeartbeatForExtraStorage(long capacity,
+      long dfsUsed, long remaining, long blockPoolUsed) {
+    DatanodeDescriptor dn = dataNodes[5];
+    dn.getStorageInfos()[1].setUtilizationForTesting(
+        capacity, dfsUsed, remaining, blockPoolUsed);
+    dn.updateHeartbeat(
+        BlockManagerTestUtil.getStorageReportsForDatanode(dn),
+        0L, 0L, 0, 0, null);
+  }
+
   @BeforeClass
   public static void setupCluster() throws Exception {
     Configuration conf = new HdfsConfiguration();
@@ -113,6 +123,16 @@ public class TestReplicationPolicy {
     storages = DFSTestUtil.createDatanodeStorageInfos(racks);
     dataNodes = DFSTestUtil.toDatanodeDescriptor(storages);
 
+    // create an extra storage for dn5.
+    DatanodeStorage extraStorage = new DatanodeStorage(
+        storages[5].getStorageID() + "-extra", DatanodeStorage.State.NORMAL,
+        StorageType.DEFAULT);
+/*  DatanodeStorageInfo si = new DatanodeStorageInfo(
+        storages[5].getDatanodeDescriptor(), extraStorage);
+*/
+    BlockManagerTestUtil.updateStorage(storages[5].getDatanodeDescriptor(),
+        extraStorage);
+
     FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
     conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
     File baseDir = PathUtils.getTestDir(TestReplicationPolicy.class);
@@ -135,11 +155,17 @@ public class TestReplicationPolicy {
       bm.getDatanodeManager().getHeartbeatManager().addDatanode(
           dataNodes[i]);
     }
+    resetHeartbeatForStorages();
+  }
+
+  private static void resetHeartbeatForStorages() {
     for (int i=0; i < NUM_OF_DATANODES; i++) {
       updateHeartbeatWithUsage(dataNodes[i],
           2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
           2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
     }
+    // No available space in the extra storage of dn5
+    updateHeartbeatForExtraStorage(0L, 0L, 0L, 0L);
   }
 
   private static boolean isOnSameRack(DatanodeStorageInfo left, DatanodeStorageInfo right) {
@@ -149,6 +175,31 @@ public class TestReplicationPolicy {
   private static boolean isOnSameRack(DatanodeStorageInfo left, DatanodeDescriptor right) {
     return cluster.isOnSameRack(left.getDatanodeDescriptor(), right);
   }
 
+  /**
+   * Test whether the remaining space per storage is individually
+   * considered.
+   */
+  @Test
+  public void testChooseNodeWithMultipleStorages() throws Exception {
+    updateHeartbeatWithUsage(dataNodes[5],
+        2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+        (2*HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE)/3, 0L,
+        0L, 0L, 0, 0);
+
+    updateHeartbeatForExtraStorage(
+        2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+        (2*HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE)/3, 0L);
+
+    DatanodeStorageInfo[] targets;
+    targets = chooseTarget (1, dataNodes[5],
+        new ArrayList<DatanodeStorageInfo>(), null);
+    assertEquals(1, targets.length);
+    assertEquals(storages[4], targets[0]);
+
+    resetHeartbeatForStorages();
+  }
+
   /**
    * In this testcase, client is dataNodes[0]. So the 1st replica should be
    * placed on dataNodes[0], the 2nd replica should be placed on
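The arithmetic behind the new test: each of dn5's two storages reports (2 * MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE) / 3 free, i.e. two thirds of the MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE required for one replica. Their sum is four thirds of the requirement, so the old aggregate check would have accepted the local writer dn5; under the fixed per-storage check neither volume qualifies, and placement falls back to another node (the test expects storages[4] on dataNodes[4]).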
@@ -190,10 +241,8 @@ public class TestReplicationPolicy {
     assertTrue(isOnSameRack(targets[1], targets[2]) ||
         isOnSameRack(targets[2], targets[3]));
     assertFalse(isOnSameRack(targets[0], targets[2]));
 
-    updateHeartbeatWithUsage(dataNodes[0],
-        2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-        HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
+    resetHeartbeatForStorages();
   }
 
   private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas) {
@@ -348,9 +397,7 @@ public class TestReplicationPolicy {
         isOnSameRack(targets[2], targets[3]));
     assertFalse(isOnSameRack(targets[1], targets[3]));
 
-    updateHeartbeatWithUsage(dataNodes[0],
-        2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-        HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
+    resetHeartbeatForStorages();
   }
 
   /**
@@ -391,12 +438,8 @@ public class TestReplicationPolicy {
     assertTrue(isOnSameRack(targets[0], targets[1]) ||
         isOnSameRack(targets[1], targets[2]));
     assertFalse(isOnSameRack(targets[0], targets[2]));
 
-    for(int i=0; i<2; i++) {
-      updateHeartbeatWithUsage(dataNodes[i],
-          2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-          HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
-    }
+    resetHeartbeatForStorages();
   }
 
   /**
@@ -474,6 +517,7 @@ public class TestReplicationPolicy {
     } finally {
       bm.getDatanodeManager().getNetworkTopology().remove(newDn);
     }
+    resetHeartbeatForStorages();
   }
 
 
@@ -527,12 +571,8 @@ public class TestReplicationPolicy {
     // Suppose to place replicas on each node but two data nodes are not
     // available for placing replica, so here we expect a short of 2
     assertTrue(((String)lastLogEntry.getMessage()).contains("in need of 2"));
 
-    for(int i=0; i<2; i++) {
-      updateHeartbeatWithUsage(dataNodes[i],
-          2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-          HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
-    }
+    resetHeartbeatForStorages();
   }
 
   private boolean containsWithinRange(DatanodeStorageInfo target,