HDFS-8938. Extract BlockToMarkCorrupt and ReplicationWork as standalone classes from BlockManager. Contributed by Mingliang Liu.
This commit is contained in:
parent
cbb249534a
commit
6d12cd8d60
|
@ -855,6 +855,9 @@ Release 2.8.0 - UNRELEASED
|
|||
|
||||
HDFS-8865. Improve quota initialization performance. (kihwal)
|
||||
|
||||
HDFS-8938. Extract BlockToMarkCorrupt and ReplicationWork as standalone
|
||||
classes from BlockManager. (Mingliang Liu via wheat9)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||
|
|
|
@ -1181,24 +1181,24 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
DatanodeStorageInfo storageInfo,
|
||||
DatanodeDescriptor node) throws IOException {
|
||||
|
||||
if (b.corrupted.isDeleted()) {
|
||||
if (b.getCorrupted().isDeleted()) {
|
||||
blockLog.debug("BLOCK markBlockAsCorrupt: {} cannot be marked as" +
|
||||
" corrupt as it does not belong to any file", b);
|
||||
addToInvalidates(b.corrupted, node);
|
||||
addToInvalidates(b.getCorrupted(), node);
|
||||
return;
|
||||
}
|
||||
short expectedReplicas = b.corrupted.getReplication();
|
||||
short expectedReplicas = b.getCorrupted().getReplication();
|
||||
|
||||
// Add replica to the data-node if it is not already there
|
||||
if (storageInfo != null) {
|
||||
storageInfo.addBlock(b.stored);
|
||||
storageInfo.addBlock(b.getStored());
|
||||
}
|
||||
|
||||
// Add this replica to corruptReplicas Map
|
||||
corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
|
||||
b.reasonCode);
|
||||
corruptReplicas.addToCorruptReplicasMap(b.getCorrupted(), node,
|
||||
b.getReason(), b.getReasonCode());
|
||||
|
||||
NumberReplicas numberOfReplicas = countNodes(b.stored);
|
||||
NumberReplicas numberOfReplicas = countNodes(b.getStored());
|
||||
boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >=
|
||||
expectedReplicas;
|
||||
boolean minReplicationSatisfied =
|
||||
|
@ -1207,7 +1207,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
(numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) >
|
||||
expectedReplicas;
|
||||
boolean corruptedDuringWrite = minReplicationSatisfied &&
|
||||
(b.stored.getGenerationStamp() > b.corrupted.getGenerationStamp());
|
||||
b.isCorruptedDuringWrite();
|
||||
// case 1: have enough number of live replicas
|
||||
// case 2: corrupted replicas + live replicas > Replication factor
|
||||
// case 3: Block is marked corrupt due to failure while writing. In this
|
||||
|
@ -1220,7 +1220,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
invalidateBlock(b, node);
|
||||
} else if (namesystem.isPopulatingReplQueues()) {
|
||||
// add the block to neededReplication
|
||||
updateNeededReplications(b.stored, -1, 0);
|
||||
updateNeededReplications(b.getStored(), -1, 0);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1239,18 +1239,18 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
}
|
||||
|
||||
// Check how many copies we have of the block
|
||||
NumberReplicas nr = countNodes(b.stored);
|
||||
NumberReplicas nr = countNodes(b.getStored());
|
||||
if (nr.replicasOnStaleNodes() > 0) {
|
||||
blockLog.debug("BLOCK* invalidateBlocks: postponing " +
|
||||
"invalidation of {} on {} because {} replica(s) are located on " +
|
||||
"nodes with potentially out-of-date block reports", b, dn,
|
||||
nr.replicasOnStaleNodes());
|
||||
postponeBlock(b.corrupted);
|
||||
postponeBlock(b.getCorrupted());
|
||||
return false;
|
||||
} else if (nr.liveReplicas() >= 1) {
|
||||
// If we have at least one copy on a live node, then we can delete it.
|
||||
addToInvalidates(b.corrupted, dn);
|
||||
removeStoredBlock(b.stored, node);
|
||||
addToInvalidates(b.getCorrupted(), dn);
|
||||
removeStoredBlock(b.getStored(), node);
|
||||
blockLog.debug("BLOCK* invalidateBlocks: {} on {} listed for deletion.",
|
||||
b, dn);
|
||||
return true;
|
||||
|
@ -1338,71 +1338,18 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
*/
|
||||
@VisibleForTesting
|
||||
int computeReplicationWorkForBlocks(List<List<BlockInfo>> blocksToReplicate) {
|
||||
int requiredReplication, numEffectiveReplicas;
|
||||
List<DatanodeDescriptor> containingNodes;
|
||||
DatanodeDescriptor srcNode;
|
||||
BlockCollection bc = null;
|
||||
int additionalReplRequired;
|
||||
|
||||
int scheduledWork = 0;
|
||||
List<ReplicationWork> work = new LinkedList<ReplicationWork>();
|
||||
final List<ReplicationWork> work = new LinkedList<>();
|
||||
|
||||
namesystem.writeLock();
|
||||
try {
|
||||
synchronized (neededReplications) {
|
||||
for (int priority = 0; priority < blocksToReplicate.size(); priority++) {
|
||||
for (BlockInfo block : blocksToReplicate.get(priority)) {
|
||||
// block should belong to a file
|
||||
bc = getBlockCollection(block);
|
||||
// abandoned block or block reopened for append
|
||||
if (bc == null
|
||||
|| (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
|
||||
// remove from neededReplications
|
||||
neededReplications.remove(block, priority);
|
||||
continue;
|
||||
ReplicationWork rw = scheduleReplication(block, priority);
|
||||
if (rw != null) {
|
||||
work.add(rw);
|
||||
}
|
||||
|
||||
requiredReplication = getExpectedReplicaNum(block);
|
||||
|
||||
// get a source data-node
|
||||
containingNodes = new ArrayList<DatanodeDescriptor>();
|
||||
List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<DatanodeStorageInfo>();
|
||||
NumberReplicas numReplicas = new NumberReplicas();
|
||||
srcNode = chooseSourceDatanode(
|
||||
block, containingNodes, liveReplicaNodes, numReplicas,
|
||||
priority);
|
||||
if(srcNode == null) { // block can not be replicated from any node
|
||||
LOG.debug("Block " + block + " cannot be repl from any node");
|
||||
continue;
|
||||
}
|
||||
|
||||
// liveReplicaNodes can include READ_ONLY_SHARED replicas which are
|
||||
// not included in the numReplicas.liveReplicas() count
|
||||
assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
|
||||
|
||||
// do not schedule more if enough replicas is already pending
|
||||
numEffectiveReplicas = numReplicas.liveReplicas() +
|
||||
pendingReplications.getNumReplicas(block);
|
||||
|
||||
if (numEffectiveReplicas >= requiredReplication) {
|
||||
if ( (pendingReplications.getNumReplicas(block) > 0) ||
|
||||
(blockHasEnoughRacks(block)) ) {
|
||||
neededReplications.remove(block, priority); // remove from neededReplications
|
||||
blockLog.debug("BLOCK* Removing {} from neededReplications as" +
|
||||
" it has enough replicas", block);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (numReplicas.liveReplicas() < requiredReplication) {
|
||||
additionalReplRequired = requiredReplication
|
||||
- numEffectiveReplicas;
|
||||
} else {
|
||||
additionalReplRequired = 1; // Needed on a new rack
|
||||
}
|
||||
work.add(new ReplicationWork(block, bc, srcNode,
|
||||
containingNodes, liveReplicaNodes, additionalReplRequired,
|
||||
priority));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1410,12 +1357,12 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
namesystem.writeUnlock();
|
||||
}
|
||||
|
||||
final Set<Node> excludedNodes = new HashSet<Node>();
|
||||
final Set<Node> excludedNodes = new HashSet<>();
|
||||
for(ReplicationWork rw : work){
|
||||
// Exclude all of the containing nodes from being targets.
|
||||
// This list includes decommissioning or corrupt nodes.
|
||||
excludedNodes.clear();
|
||||
for (DatanodeDescriptor dn : rw.containingNodes) {
|
||||
for (DatanodeDescriptor dn : rw.getContainingNodes()) {
|
||||
excludedNodes.add(dn);
|
||||
}
|
||||
|
||||
|
@ -1428,67 +1375,15 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
namesystem.writeLock();
|
||||
try {
|
||||
for(ReplicationWork rw : work){
|
||||
final DatanodeStorageInfo[] targets = rw.targets;
|
||||
final DatanodeStorageInfo[] targets = rw.getTargets();
|
||||
if(targets == null || targets.length == 0){
|
||||
rw.targets = null;
|
||||
rw.resetTargets();
|
||||
continue;
|
||||
}
|
||||
|
||||
synchronized (neededReplications) {
|
||||
BlockInfo block = rw.block;
|
||||
int priority = rw.priority;
|
||||
// Recheck since global lock was released
|
||||
// block should belong to a file
|
||||
bc = getBlockCollection(block);
|
||||
// abandoned block or block reopened for append
|
||||
if(bc == null || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
|
||||
neededReplications.remove(block, priority); // remove from neededReplications
|
||||
rw.targets = null;
|
||||
continue;
|
||||
}
|
||||
requiredReplication = getExpectedReplicaNum(block);
|
||||
|
||||
// do not schedule more if enough replicas is already pending
|
||||
NumberReplicas numReplicas = countNodes(block);
|
||||
numEffectiveReplicas = numReplicas.liveReplicas() +
|
||||
pendingReplications.getNumReplicas(block);
|
||||
|
||||
if (numEffectiveReplicas >= requiredReplication) {
|
||||
if ( (pendingReplications.getNumReplicas(block) > 0) ||
|
||||
(blockHasEnoughRacks(block)) ) {
|
||||
neededReplications.remove(block, priority); // remove from neededReplications
|
||||
rw.targets = null;
|
||||
blockLog.debug("BLOCK* Removing {} from neededReplications as" +
|
||||
" it has enough replicas", block);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if ( (numReplicas.liveReplicas() >= requiredReplication) &&
|
||||
(!blockHasEnoughRacks(block)) ) {
|
||||
if (rw.srcNode.getNetworkLocation().equals(
|
||||
targets[0].getDatanodeDescriptor().getNetworkLocation())) {
|
||||
//No use continuing, unless a new rack in this case
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// Add block to the to be replicated list
|
||||
rw.srcNode.addBlockToBeReplicated(block, targets);
|
||||
scheduledWork++;
|
||||
DatanodeStorageInfo.incrementBlocksScheduled(targets);
|
||||
|
||||
// Move the block-replication into a "pending" state.
|
||||
// The reason we use 'pending' is so we can retry
|
||||
// replications that fail after an appropriate amount of time.
|
||||
pendingReplications.increment(block,
|
||||
DatanodeStorageInfo.toDatanodeDescriptors(targets));
|
||||
blockLog.debug("BLOCK* block {} is moved from neededReplications to "
|
||||
+ "pendingReplications", block);
|
||||
|
||||
// remove from neededReplications
|
||||
if(numEffectiveReplicas + targets.length >= requiredReplication) {
|
||||
neededReplications.remove(block, priority); // remove from neededReplications
|
||||
if (validateReplicationWork(rw)) {
|
||||
scheduledWork++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1499,15 +1394,15 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
if (blockLog.isInfoEnabled()) {
|
||||
// log which blocks have been scheduled for replication
|
||||
for(ReplicationWork rw : work){
|
||||
DatanodeStorageInfo[] targets = rw.targets;
|
||||
DatanodeStorageInfo[] targets = rw.getTargets();
|
||||
if (targets != null && targets.length != 0) {
|
||||
StringBuilder targetList = new StringBuilder("datanode(s)");
|
||||
for (int k = 0; k < targets.length; k++) {
|
||||
for (DatanodeStorageInfo target : targets) {
|
||||
targetList.append(' ');
|
||||
targetList.append(targets[k].getDatanodeDescriptor());
|
||||
targetList.append(target.getDatanodeDescriptor());
|
||||
}
|
||||
blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.srcNode,
|
||||
rw.block, targetList);
|
||||
blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.getSrcNode(),
|
||||
rw.getBlock(), targetList);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1519,6 +1414,118 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
return scheduledWork;
|
||||
}
|
||||
|
||||
boolean hasEnoughEffectiveReplicas(BlockInfo block,
|
||||
NumberReplicas numReplicas, int pendingReplicaNum, int required) {
|
||||
int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum;
|
||||
return (numEffectiveReplicas >= required) &&
|
||||
(pendingReplicaNum > 0 || blockHasEnoughRacks(block));
|
||||
}
|
||||
|
||||
private ReplicationWork scheduleReplication(BlockInfo block, int priority) {
|
||||
// block should belong to a file
|
||||
BlockCollection bc = getBlockCollection(block);
|
||||
// abandoned block or block reopened for append
|
||||
if (bc == null
|
||||
|| (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
|
||||
// remove from neededReplications
|
||||
neededReplications.remove(block, priority);
|
||||
return null;
|
||||
}
|
||||
|
||||
short requiredReplication = getExpectedReplicaNum(block);
|
||||
|
||||
// get a source data-node
|
||||
List<DatanodeDescriptor> containingNodes = new ArrayList<>();
|
||||
List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
|
||||
NumberReplicas numReplicas = new NumberReplicas();
|
||||
DatanodeDescriptor srcNode = chooseSourceDatanode(block, containingNodes,
|
||||
liveReplicaNodes, numReplicas, priority);
|
||||
if (srcNode == null) { // block can not be replicated from any node
|
||||
LOG.debug("Block " + block + " cannot be repl from any node");
|
||||
return null;
|
||||
}
|
||||
|
||||
// liveReplicaNodes can include READ_ONLY_SHARED replicas which are
|
||||
// not included in the numReplicas.liveReplicas() count
|
||||
assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
|
||||
|
||||
int pendingNum = pendingReplications.getNumReplicas(block);
|
||||
if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
|
||||
requiredReplication)) {
|
||||
neededReplications.remove(block, priority);
|
||||
blockLog.debug("BLOCK* Removing {} from neededReplications as" +
|
||||
" it has enough replicas", block);
|
||||
return null;
|
||||
}
|
||||
|
||||
final int additionalReplRequired;
|
||||
if (numReplicas.liveReplicas() < requiredReplication) {
|
||||
additionalReplRequired = requiredReplication - numReplicas.liveReplicas()
|
||||
- pendingNum;
|
||||
} else {
|
||||
additionalReplRequired = 1; // Needed on a new rack
|
||||
}
|
||||
return new ReplicationWork(block, bc, srcNode, containingNodes,
|
||||
liveReplicaNodes, additionalReplRequired, priority);
|
||||
}
|
||||
|
||||
private boolean validateReplicationWork(ReplicationWork rw) {
|
||||
BlockInfo block = rw.getBlock();
|
||||
int priority = rw.getPriority();
|
||||
// Recheck since global lock was released
|
||||
// block should belong to a file
|
||||
BlockCollection bc = getBlockCollection(block);
|
||||
// abandoned block or block reopened for append
|
||||
if (bc == null
|
||||
|| (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
|
||||
neededReplications.remove(block, priority);
|
||||
rw.resetTargets();
|
||||
return false;
|
||||
}
|
||||
|
||||
// do not schedule more if enough replicas is already pending
|
||||
final short requiredReplication = getExpectedReplicaNum(block);
|
||||
NumberReplicas numReplicas = countNodes(block);
|
||||
final int pendingNum = pendingReplications.getNumReplicas(block);
|
||||
if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
|
||||
requiredReplication)) {
|
||||
neededReplications.remove(block, priority);
|
||||
rw.resetTargets();
|
||||
blockLog.debug("BLOCK* Removing {} from neededReplications as" +
|
||||
" it has enough replicas", block);
|
||||
return false;
|
||||
}
|
||||
|
||||
DatanodeStorageInfo[] targets = rw.getTargets();
|
||||
if ( (numReplicas.liveReplicas() >= requiredReplication) &&
|
||||
(!blockHasEnoughRacks(block)) ) {
|
||||
if (rw.getSrcNode().getNetworkLocation().equals(
|
||||
targets[0].getDatanodeDescriptor().getNetworkLocation())) {
|
||||
//No use continuing, unless a new rack in this case
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// Add block to the to be replicated list
|
||||
rw.getSrcNode().addBlockToBeReplicated(block, targets);
|
||||
DatanodeStorageInfo.incrementBlocksScheduled(targets);
|
||||
|
||||
// Move the block-replication into a "pending" state.
|
||||
// The reason we use 'pending' is so we can retry
|
||||
// replications that fail after an appropriate amount of time.
|
||||
pendingReplications.increment(block,
|
||||
DatanodeStorageInfo.toDatanodeDescriptors(targets));
|
||||
blockLog.debug("BLOCK* block {} is moved from neededReplications to "
|
||||
+ "pendingReplications", block);
|
||||
|
||||
int numEffectiveReplicas = numReplicas.liveReplicas() + pendingNum;
|
||||
// remove from neededReplications
|
||||
if(numEffectiveReplicas + targets.length >= requiredReplication) {
|
||||
neededReplications.remove(block, priority);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/** Choose target for WebHDFS redirection. */
|
||||
public DatanodeStorageInfo[] chooseTarget4WebHDFS(String src,
|
||||
DatanodeDescriptor clientnode, Set<Node> excludes, long blocksize) {
|
||||
|
@ -1766,52 +1773,6 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
|
||||
* list of blocks that should be considered corrupt due to a block report.
|
||||
*/
|
||||
private static class BlockToMarkCorrupt {
|
||||
/** The corrupted block in a datanode. */
|
||||
final BlockInfo corrupted;
|
||||
/** The corresponding block stored in the BlockManager. */
|
||||
final BlockInfo stored;
|
||||
/** The reason to mark corrupt. */
|
||||
final String reason;
|
||||
/** The reason code to be stored */
|
||||
final Reason reasonCode;
|
||||
|
||||
BlockToMarkCorrupt(BlockInfo corrupted,
|
||||
BlockInfo stored, String reason,
|
||||
Reason reasonCode) {
|
||||
Preconditions.checkNotNull(corrupted, "corrupted is null");
|
||||
Preconditions.checkNotNull(stored, "stored is null");
|
||||
|
||||
this.corrupted = corrupted;
|
||||
this.stored = stored;
|
||||
this.reason = reason;
|
||||
this.reasonCode = reasonCode;
|
||||
}
|
||||
|
||||
BlockToMarkCorrupt(BlockInfo stored, String reason,
|
||||
Reason reasonCode) {
|
||||
this(stored, stored, reason, reasonCode);
|
||||
}
|
||||
|
||||
BlockToMarkCorrupt(BlockInfo stored, long gs, String reason,
|
||||
Reason reasonCode) {
|
||||
this(new BlockInfoContiguous((BlockInfoContiguous)stored), stored,
|
||||
reason, reasonCode);
|
||||
//the corrupted block in datanode has a different generation stamp
|
||||
corrupted.setGenerationStamp(gs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return corrupted + "("
|
||||
+ (corrupted == stored? "same as stored": "stored=" + stored) + ")";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The given storage is reporting all its blocks.
|
||||
* Update the (storage-->block list) and (block-->storage list) maps.
|
||||
|
@ -3797,51 +3758,6 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
null);
|
||||
}
|
||||
|
||||
private static class ReplicationWork {
|
||||
|
||||
private final BlockInfo block;
|
||||
private final BlockCollection bc;
|
||||
|
||||
private final DatanodeDescriptor srcNode;
|
||||
private final List<DatanodeDescriptor> containingNodes;
|
||||
private final List<DatanodeStorageInfo> liveReplicaStorages;
|
||||
private final int additionalReplRequired;
|
||||
|
||||
private DatanodeStorageInfo targets[];
|
||||
private final int priority;
|
||||
|
||||
public ReplicationWork(BlockInfo block,
|
||||
BlockCollection bc,
|
||||
DatanodeDescriptor srcNode,
|
||||
List<DatanodeDescriptor> containingNodes,
|
||||
List<DatanodeStorageInfo> liveReplicaStorages,
|
||||
int additionalReplRequired,
|
||||
int priority) {
|
||||
this.block = block;
|
||||
this.bc = bc;
|
||||
this.srcNode = srcNode;
|
||||
this.srcNode.incrementPendingReplicationWithoutTargets();
|
||||
this.containingNodes = containingNodes;
|
||||
this.liveReplicaStorages = liveReplicaStorages;
|
||||
this.additionalReplRequired = additionalReplRequired;
|
||||
this.priority = priority;
|
||||
this.targets = null;
|
||||
}
|
||||
|
||||
private void chooseTargets(BlockPlacementPolicy blockplacement,
|
||||
BlockStoragePolicySuite storagePolicySuite,
|
||||
Set<Node> excludedNodes) {
|
||||
try {
|
||||
targets = blockplacement.chooseTarget(bc.getName(),
|
||||
additionalReplRequired, srcNode, liveReplicaStorages, false,
|
||||
excludedNodes, block.getNumBytes(),
|
||||
storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
|
||||
} finally {
|
||||
srcNode.decrementPendingReplicationWithoutTargets();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A simple result enum for the result of
|
||||
* {@link BlockManager#processMisReplicatedBlock(BlockInfo)}.
|
||||
|
@ -3855,9 +3771,9 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
OVER_REPLICATED,
|
||||
/** A decision can't currently be made about this block. */
|
||||
POSTPONE,
|
||||
/** The block is under construction, so should be ignored */
|
||||
/** The block is under construction, so should be ignored. */
|
||||
UNDER_CONSTRUCTION,
|
||||
/** The block is properly replicated */
|
||||
/** The block is properly replicated. */
|
||||
OK
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,87 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||
|
||||
import static org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
|
||||
* list of blocks that should be considered corrupt due to a block report.
|
||||
*/
|
||||
class BlockToMarkCorrupt {
|
||||
/** The corrupted block in a datanode. */
|
||||
private final BlockInfo corrupted;
|
||||
/** The corresponding block stored in the BlockManager. */
|
||||
private final BlockInfo stored;
|
||||
/** The reason to mark corrupt. */
|
||||
private final String reason;
|
||||
/** The reason code to be stored */
|
||||
private final CorruptReplicasMap.Reason reasonCode;
|
||||
|
||||
BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason,
|
||||
CorruptReplicasMap.Reason reasonCode) {
|
||||
Preconditions.checkNotNull(corrupted, "corrupted is null");
|
||||
Preconditions.checkNotNull(stored, "stored is null");
|
||||
|
||||
this.corrupted = corrupted;
|
||||
this.stored = stored;
|
||||
this.reason = reason;
|
||||
this.reasonCode = reasonCode;
|
||||
}
|
||||
|
||||
BlockToMarkCorrupt(BlockInfo stored, String reason,
|
||||
CorruptReplicasMap.Reason reasonCode) {
|
||||
this(stored, stored, reason, reasonCode);
|
||||
}
|
||||
|
||||
BlockToMarkCorrupt(BlockInfo stored, long gs, String reason,
|
||||
CorruptReplicasMap.Reason reasonCode) {
|
||||
this(new BlockInfoContiguous((BlockInfoContiguous)stored), stored,
|
||||
reason, reasonCode);
|
||||
//the corrupted block in datanode has a different generation stamp
|
||||
corrupted.setGenerationStamp(gs);
|
||||
}
|
||||
|
||||
public boolean isCorruptedDuringWrite() {
|
||||
return stored.getGenerationStamp() > corrupted.getGenerationStamp();
|
||||
}
|
||||
|
||||
public BlockInfo getCorrupted() {
|
||||
return corrupted;
|
||||
}
|
||||
|
||||
public BlockInfo getStored() {
|
||||
return stored;
|
||||
}
|
||||
|
||||
public String getReason() {
|
||||
return reason;
|
||||
}
|
||||
|
||||
public Reason getReasonCode() {
|
||||
return reasonCode;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return corrupted + "("
|
||||
+ (corrupted == stored ? "same as stored": "stored=" + stored) + ")";
|
||||
}
|
||||
}
|
|
@ -0,0 +1,87 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||
|
||||
import org.apache.hadoop.net.Node;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
class ReplicationWork {
|
||||
private final BlockInfo block;
|
||||
private final BlockCollection bc;
|
||||
private final DatanodeDescriptor srcNode;
|
||||
private final int additionalReplRequired;
|
||||
private final int priority;
|
||||
private final List<DatanodeDescriptor> containingNodes;
|
||||
private final List<DatanodeStorageInfo> liveReplicaStorages;
|
||||
private DatanodeStorageInfo[] targets;
|
||||
|
||||
public ReplicationWork(BlockInfo block, BlockCollection bc,
|
||||
DatanodeDescriptor srcNode, List<DatanodeDescriptor> containingNodes,
|
||||
List<DatanodeStorageInfo> liveReplicaStorages, int additionalReplRequired,
|
||||
int priority) {
|
||||
this.block = block;
|
||||
this.bc = bc;
|
||||
this.srcNode = srcNode;
|
||||
this.srcNode.incrementPendingReplicationWithoutTargets();
|
||||
this.containingNodes = containingNodes;
|
||||
this.liveReplicaStorages = liveReplicaStorages;
|
||||
this.additionalReplRequired = additionalReplRequired;
|
||||
this.priority = priority;
|
||||
this.targets = null;
|
||||
}
|
||||
|
||||
void chooseTargets(BlockPlacementPolicy blockplacement,
|
||||
BlockStoragePolicySuite storagePolicySuite,
|
||||
Set<Node> excludedNodes) {
|
||||
try {
|
||||
targets = blockplacement.chooseTarget(bc.getName(),
|
||||
additionalReplRequired, srcNode, liveReplicaStorages, false,
|
||||
excludedNodes, block.getNumBytes(),
|
||||
storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
|
||||
} finally {
|
||||
srcNode.decrementPendingReplicationWithoutTargets();
|
||||
}
|
||||
}
|
||||
|
||||
DatanodeStorageInfo[] getTargets() {
|
||||
return targets;
|
||||
}
|
||||
|
||||
void resetTargets() {
|
||||
this.targets = null;
|
||||
}
|
||||
|
||||
List<DatanodeDescriptor> getContainingNodes() {
|
||||
return Collections.unmodifiableList(containingNodes);
|
||||
}
|
||||
|
||||
public int getPriority() {
|
||||
return priority;
|
||||
}
|
||||
|
||||
public BlockInfo getBlock() {
|
||||
return block;
|
||||
}
|
||||
|
||||
public DatanodeDescriptor getSrcNode() {
|
||||
return srcNode;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue