HDFS-9876. shouldProcessOverReplicated should not count number of pending replicas. Contributed by Jing Zhao.
This commit is contained in:
parent
4abb2fa687
commit
f2ba7da4f0
@ -455,6 +455,9 @@ Trunk (Unreleased)
|
|||||||
HDFS-9867. Missing block exception should carry locatedBlocks information.
|
HDFS-9867. Missing block exception should carry locatedBlocks information.
|
||||||
(Mingliang Liu via jing9)
|
(Mingliang Liu via jing9)
|
||||||
|
|
||||||
|
HDFS-9876. shouldProcessOverReplicated should not count number of pending
|
||||||
|
replicas. (jing9)
|
||||||
|
|
||||||
BREAKDOWN OF HDFS-7285 SUBTASKS AND RELATED JIRAS
|
BREAKDOWN OF HDFS-7285 SUBTASKS AND RELATED JIRAS
|
||||||
|
|
||||||
HDFS-7347. Configurable erasure coding policy for individual files and
|
HDFS-7347. Configurable erasure coding policy for individual files and
|
||||||
|
@ -2893,7 +2893,7 @@ private Block addStoredBlock(final BlockInfo block,
|
|||||||
} else {
|
} else {
|
||||||
updateNeededReplications(storedBlock, curReplicaDelta, 0);
|
updateNeededReplications(storedBlock, curReplicaDelta, 0);
|
||||||
}
|
}
|
||||||
if (shouldProcessOverReplicated(num, pendingNum, fileReplication)) {
|
if (shouldProcessOverReplicated(num, fileReplication)) {
|
||||||
processOverReplicatedBlock(storedBlock, fileReplication, node, delNodeHint);
|
processOverReplicatedBlock(storedBlock, fileReplication, node, delNodeHint);
|
||||||
}
|
}
|
||||||
// If the file replication has reached desired value
|
// If the file replication has reached desired value
|
||||||
@ -2912,8 +2912,8 @@ private Block addStoredBlock(final BlockInfo block,
|
|||||||
}
|
}
|
||||||
|
|
||||||
private boolean shouldProcessOverReplicated(NumberReplicas num,
|
private boolean shouldProcessOverReplicated(NumberReplicas num,
|
||||||
int pendingNum, int expectedNum) {
|
int expectedNum) {
|
||||||
int numCurrent = num.liveReplicas() + pendingNum;
|
final int numCurrent = num.liveReplicas();
|
||||||
return numCurrent > expectedNum ||
|
return numCurrent > expectedNum ||
|
||||||
(numCurrent == expectedNum && num.redundantInternalBlocks() > 0);
|
(numCurrent == expectedNum && num.redundantInternalBlocks() > 0);
|
||||||
}
|
}
|
||||||
@ -3131,7 +3131,7 @@ private MisReplicationResult processMisReplicatedBlock(BlockInfo block) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (shouldProcessOverReplicated(num, 0, expectedReplication)) {
|
if (shouldProcessOverReplicated(num, expectedReplication)) {
|
||||||
if (num.replicasOnStaleNodes() > 0) {
|
if (num.replicasOnStaleNodes() > 0) {
|
||||||
// If any of the replicas of this block are on nodes that are
|
// If any of the replicas of this block are on nodes that are
|
||||||
// considered "stale", then these replicas may in fact have
|
// considered "stale", then these replicas may in fact have
|
||||||
@ -3268,7 +3268,6 @@ private void chooseExcessReplicasStriped(BlockCollection bc,
|
|||||||
assert storedBlock instanceof BlockInfoStriped;
|
assert storedBlock instanceof BlockInfoStriped;
|
||||||
BlockInfoStriped sblk = (BlockInfoStriped) storedBlock;
|
BlockInfoStriped sblk = (BlockInfoStriped) storedBlock;
|
||||||
short groupSize = sblk.getTotalBlockNum();
|
short groupSize = sblk.getTotalBlockNum();
|
||||||
BlockPlacementPolicy placementPolicy = placementPolicies.getPolicy(true);
|
|
||||||
|
|
||||||
// find all duplicated indices
|
// find all duplicated indices
|
||||||
BitSet found = new BitSet(groupSize); //indices found
|
BitSet found = new BitSet(groupSize); //indices found
|
||||||
@ -3283,14 +3282,6 @@ private void chooseExcessReplicasStriped(BlockCollection bc,
|
|||||||
found.set(index);
|
found.set(index);
|
||||||
storage2index.put(storage, index);
|
storage2index.put(storage, index);
|
||||||
}
|
}
|
||||||
// the number of target left replicas equals to the of number of the found
|
|
||||||
// indices.
|
|
||||||
int numOfTarget = found.cardinality();
|
|
||||||
|
|
||||||
final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(
|
|
||||||
bc.getStoragePolicyID());
|
|
||||||
final List<StorageType> excessTypes = storagePolicy.chooseExcess(
|
|
||||||
(short)numOfTarget, DatanodeStorageInfo.toStorageTypes(nonExcess));
|
|
||||||
|
|
||||||
// use delHint only if delHint is duplicated
|
// use delHint only if delHint is duplicated
|
||||||
final DatanodeStorageInfo delStorageHint =
|
final DatanodeStorageInfo delStorageHint =
|
||||||
@ -3302,6 +3293,19 @@ private void chooseExcessReplicasStriped(BlockCollection bc,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cardinality of found indicates the expected number of internal blocks
|
||||||
|
final int numOfTarget = found.cardinality();
|
||||||
|
final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(
|
||||||
|
bc.getStoragePolicyID());
|
||||||
|
final List<StorageType> excessTypes = storagePolicy.chooseExcess(
|
||||||
|
(short) numOfTarget, DatanodeStorageInfo.toStorageTypes(nonExcess));
|
||||||
|
if (excessTypes.isEmpty()) {
|
||||||
|
LOG.warn("excess types chosen for block {} among storages {} is empty",
|
||||||
|
storedBlock, nonExcess);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
BlockPlacementPolicy placementPolicy = placementPolicies.getPolicy(true);
|
||||||
// for each duplicated index, delete some replicas until only one left
|
// for each duplicated index, delete some replicas until only one left
|
||||||
for (int targetIndex = duplicated.nextSetBit(0); targetIndex >= 0;
|
for (int targetIndex = duplicated.nextSetBit(0); targetIndex >= 0;
|
||||||
targetIndex = duplicated.nextSetBit(targetIndex + 1)) {
|
targetIndex = duplicated.nextSetBit(targetIndex + 1)) {
|
||||||
@ -3312,9 +3316,7 @@ private void chooseExcessReplicasStriped(BlockCollection bc,
|
|||||||
candidates.add(storage);
|
candidates.add(storage);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Block internalBlock = new Block(storedBlock);
|
if (candidates.size() > 1) {
|
||||||
internalBlock.setBlockId(storedBlock.getBlockId() + targetIndex);
|
|
||||||
while (candidates.size() > 1) {
|
|
||||||
List<DatanodeStorageInfo> replicasToDelete = placementPolicy
|
List<DatanodeStorageInfo> replicasToDelete = placementPolicy
|
||||||
.chooseReplicasToDelete(nonExcess, candidates, (short) 1,
|
.chooseReplicasToDelete(nonExcess, candidates, (short) 1,
|
||||||
excessTypes, null, null);
|
excessTypes, null, null);
|
||||||
@ -3749,7 +3751,7 @@ void processOverReplicatedBlocksOnReCommission(
|
|||||||
final BlockInfo block = it.next();
|
final BlockInfo block = it.next();
|
||||||
int expectedReplication = this.getReplication(block);
|
int expectedReplication = this.getReplication(block);
|
||||||
NumberReplicas num = countNodes(block);
|
NumberReplicas num = countNodes(block);
|
||||||
if (shouldProcessOverReplicated(num, 0, expectedReplication)) {
|
if (shouldProcessOverReplicated(num, expectedReplication)) {
|
||||||
// over-replicated block
|
// over-replicated block
|
||||||
processOverReplicatedBlock(block, (short) expectedReplication, null,
|
processOverReplicatedBlock(block, (short) expectedReplication, null,
|
||||||
null);
|
null);
|
||||||
@ -3886,7 +3888,7 @@ public void checkReplication(BlockCollection bc) {
|
|||||||
neededReplications.add(block, n.liveReplicas() + pending,
|
neededReplications.add(block, n.liveReplicas() + pending,
|
||||||
n.readOnlyReplicas(),
|
n.readOnlyReplicas(),
|
||||||
n.decommissionedAndDecommissioning(), expected);
|
n.decommissionedAndDecommissioning(), expected);
|
||||||
} else if (shouldProcessOverReplicated(n, 0, expected)) {
|
} else if (shouldProcessOverReplicated(n, expected)) {
|
||||||
processOverReplicatedBlock(block, expected, null, null);
|
processOverReplicatedBlock(block, expected, null, null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user