HDFS-8938. Extract BlockToMarkCorrupt and ReplicationWork as standalone classes from BlockManager. Contributed by Mingliang Liu.

This commit is contained in:
Haohui Mai 2015-08-27 15:36:44 -07:00
parent a9c8ea71aa
commit 4e9307f26d
4 changed files with 319 additions and 226 deletions

View File

@ -853,6 +853,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8962. Clean up checkstyle warnings in o.a.h.hdfs.DfsClientConf. HDFS-8962. Clean up checkstyle warnings in o.a.h.hdfs.DfsClientConf.
(Mingliang Liu via wheat9) (Mingliang Liu via wheat9)
HDFS-8938. Extract BlockToMarkCorrupt and ReplicationWork as standalone
classes from BlockManager. (Mingliang Liu via wheat9)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -1181,24 +1181,24 @@ private void markBlockAsCorrupt(BlockToMarkCorrupt b,
DatanodeStorageInfo storageInfo, DatanodeStorageInfo storageInfo,
DatanodeDescriptor node) throws IOException { DatanodeDescriptor node) throws IOException {
if (b.corrupted.isDeleted()) { if (b.getCorrupted().isDeleted()) {
blockLog.debug("BLOCK markBlockAsCorrupt: {} cannot be marked as" + blockLog.debug("BLOCK markBlockAsCorrupt: {} cannot be marked as" +
" corrupt as it does not belong to any file", b); " corrupt as it does not belong to any file", b);
addToInvalidates(b.corrupted, node); addToInvalidates(b.getCorrupted(), node);
return; return;
} }
short expectedReplicas = b.corrupted.getReplication(); short expectedReplicas = b.getCorrupted().getReplication();
// Add replica to the data-node if it is not already there // Add replica to the data-node if it is not already there
if (storageInfo != null) { if (storageInfo != null) {
storageInfo.addBlock(b.stored); storageInfo.addBlock(b.getStored());
} }
// Add this replica to corruptReplicas Map // Add this replica to corruptReplicas Map
corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason, corruptReplicas.addToCorruptReplicasMap(b.getCorrupted(), node,
b.reasonCode); b.getReason(), b.getReasonCode());
NumberReplicas numberOfReplicas = countNodes(b.stored); NumberReplicas numberOfReplicas = countNodes(b.getStored());
boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >= boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >=
expectedReplicas; expectedReplicas;
boolean minReplicationSatisfied = boolean minReplicationSatisfied =
@ -1207,7 +1207,7 @@ private void markBlockAsCorrupt(BlockToMarkCorrupt b,
(numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) > (numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) >
expectedReplicas; expectedReplicas;
boolean corruptedDuringWrite = minReplicationSatisfied && boolean corruptedDuringWrite = minReplicationSatisfied &&
(b.stored.getGenerationStamp() > b.corrupted.getGenerationStamp()); b.isCorruptedDuringWrite();
// case 1: have enough number of live replicas // case 1: have enough number of live replicas
// case 2: corrupted replicas + live replicas > Replication factor // case 2: corrupted replicas + live replicas > Replication factor
// case 3: Block is marked corrupt due to failure while writing. In this // case 3: Block is marked corrupt due to failure while writing. In this
@ -1220,7 +1220,7 @@ private void markBlockAsCorrupt(BlockToMarkCorrupt b,
invalidateBlock(b, node); invalidateBlock(b, node);
} else if (namesystem.isPopulatingReplQueues()) { } else if (namesystem.isPopulatingReplQueues()) {
// add the block to neededReplication // add the block to neededReplication
updateNeededReplications(b.stored, -1, 0); updateNeededReplications(b.getStored(), -1, 0);
} }
} }
@ -1239,18 +1239,18 @@ private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn
} }
// Check how many copies we have of the block // Check how many copies we have of the block
NumberReplicas nr = countNodes(b.stored); NumberReplicas nr = countNodes(b.getStored());
if (nr.replicasOnStaleNodes() > 0) { if (nr.replicasOnStaleNodes() > 0) {
blockLog.debug("BLOCK* invalidateBlocks: postponing " + blockLog.debug("BLOCK* invalidateBlocks: postponing " +
"invalidation of {} on {} because {} replica(s) are located on " + "invalidation of {} on {} because {} replica(s) are located on " +
"nodes with potentially out-of-date block reports", b, dn, "nodes with potentially out-of-date block reports", b, dn,
nr.replicasOnStaleNodes()); nr.replicasOnStaleNodes());
postponeBlock(b.corrupted); postponeBlock(b.getCorrupted());
return false; return false;
} else if (nr.liveReplicas() >= 1) { } else if (nr.liveReplicas() >= 1) {
// If we have at least one copy on a live node, then we can delete it. // If we have at least one copy on a live node, then we can delete it.
addToInvalidates(b.corrupted, dn); addToInvalidates(b.getCorrupted(), dn);
removeStoredBlock(b.stored, node); removeStoredBlock(b.getStored(), node);
blockLog.debug("BLOCK* invalidateBlocks: {} on {} listed for deletion.", blockLog.debug("BLOCK* invalidateBlocks: {} on {} listed for deletion.",
b, dn); b, dn);
return true; return true;
@ -1338,71 +1338,18 @@ int computeReplicationWork(int blocksToProcess) {
*/ */
@VisibleForTesting @VisibleForTesting
int computeReplicationWorkForBlocks(List<List<BlockInfo>> blocksToReplicate) { int computeReplicationWorkForBlocks(List<List<BlockInfo>> blocksToReplicate) {
int requiredReplication, numEffectiveReplicas;
List<DatanodeDescriptor> containingNodes;
DatanodeDescriptor srcNode;
BlockCollection bc = null;
int additionalReplRequired;
int scheduledWork = 0; int scheduledWork = 0;
List<ReplicationWork> work = new LinkedList<ReplicationWork>(); final List<ReplicationWork> work = new LinkedList<>();
namesystem.writeLock(); namesystem.writeLock();
try { try {
synchronized (neededReplications) { synchronized (neededReplications) {
for (int priority = 0; priority < blocksToReplicate.size(); priority++) { for (int priority = 0; priority < blocksToReplicate.size(); priority++) {
for (BlockInfo block : blocksToReplicate.get(priority)) { for (BlockInfo block : blocksToReplicate.get(priority)) {
// block should belong to a file ReplicationWork rw = scheduleReplication(block, priority);
bc = getBlockCollection(block); if (rw != null) {
// abandoned block or block reopened for append work.add(rw);
if (bc == null
|| (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
// remove from neededReplications
neededReplications.remove(block, priority);
continue;
} }
requiredReplication = getExpectedReplicaNum(block);
// get a source data-node
containingNodes = new ArrayList<DatanodeDescriptor>();
List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<DatanodeStorageInfo>();
NumberReplicas numReplicas = new NumberReplicas();
srcNode = chooseSourceDatanode(
block, containingNodes, liveReplicaNodes, numReplicas,
priority);
if(srcNode == null) { // block can not be replicated from any node
LOG.debug("Block " + block + " cannot be repl from any node");
continue;
}
// liveReplicaNodes can include READ_ONLY_SHARED replicas which are
// not included in the numReplicas.liveReplicas() count
assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
// do not schedule more if enough replicas is already pending
numEffectiveReplicas = numReplicas.liveReplicas() +
pendingReplications.getNumReplicas(block);
if (numEffectiveReplicas >= requiredReplication) {
if ( (pendingReplications.getNumReplicas(block) > 0) ||
(blockHasEnoughRacks(block)) ) {
neededReplications.remove(block, priority); // remove from neededReplications
blockLog.debug("BLOCK* Removing {} from neededReplications as" +
" it has enough replicas", block);
continue;
}
}
if (numReplicas.liveReplicas() < requiredReplication) {
additionalReplRequired = requiredReplication
- numEffectiveReplicas;
} else {
additionalReplRequired = 1; // Needed on a new rack
}
work.add(new ReplicationWork(block, bc, srcNode,
containingNodes, liveReplicaNodes, additionalReplRequired,
priority));
} }
} }
} }
@ -1410,12 +1357,12 @@ int computeReplicationWorkForBlocks(List<List<BlockInfo>> blocksToReplicate) {
namesystem.writeUnlock(); namesystem.writeUnlock();
} }
final Set<Node> excludedNodes = new HashSet<Node>(); final Set<Node> excludedNodes = new HashSet<>();
for(ReplicationWork rw : work){ for(ReplicationWork rw : work){
// Exclude all of the containing nodes from being targets. // Exclude all of the containing nodes from being targets.
// This list includes decommissioning or corrupt nodes. // This list includes decommissioning or corrupt nodes.
excludedNodes.clear(); excludedNodes.clear();
for (DatanodeDescriptor dn : rw.containingNodes) { for (DatanodeDescriptor dn : rw.getContainingNodes()) {
excludedNodes.add(dn); excludedNodes.add(dn);
} }
@ -1428,67 +1375,15 @@ int computeReplicationWorkForBlocks(List<List<BlockInfo>> blocksToReplicate) {
namesystem.writeLock(); namesystem.writeLock();
try { try {
for(ReplicationWork rw : work){ for(ReplicationWork rw : work){
final DatanodeStorageInfo[] targets = rw.targets; final DatanodeStorageInfo[] targets = rw.getTargets();
if(targets == null || targets.length == 0){ if(targets == null || targets.length == 0){
rw.targets = null; rw.resetTargets();
continue; continue;
} }
synchronized (neededReplications) { synchronized (neededReplications) {
BlockInfo block = rw.block; if (validateReplicationWork(rw)) {
int priority = rw.priority;
// Recheck since global lock was released
// block should belong to a file
bc = getBlockCollection(block);
// abandoned block or block reopened for append
if(bc == null || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
neededReplications.remove(block, priority); // remove from neededReplications
rw.targets = null;
continue;
}
requiredReplication = getExpectedReplicaNum(block);
// do not schedule more if enough replicas is already pending
NumberReplicas numReplicas = countNodes(block);
numEffectiveReplicas = numReplicas.liveReplicas() +
pendingReplications.getNumReplicas(block);
if (numEffectiveReplicas >= requiredReplication) {
if ( (pendingReplications.getNumReplicas(block) > 0) ||
(blockHasEnoughRacks(block)) ) {
neededReplications.remove(block, priority); // remove from neededReplications
rw.targets = null;
blockLog.debug("BLOCK* Removing {} from neededReplications as" +
" it has enough replicas", block);
continue;
}
}
if ( (numReplicas.liveReplicas() >= requiredReplication) &&
(!blockHasEnoughRacks(block)) ) {
if (rw.srcNode.getNetworkLocation().equals(
targets[0].getDatanodeDescriptor().getNetworkLocation())) {
//No use continuing, unless a new rack in this case
continue;
}
}
// Add block to the to be replicated list
rw.srcNode.addBlockToBeReplicated(block, targets);
scheduledWork++; scheduledWork++;
DatanodeStorageInfo.incrementBlocksScheduled(targets);
// Move the block-replication into a "pending" state.
// The reason we use 'pending' is so we can retry
// replications that fail after an appropriate amount of time.
pendingReplications.increment(block,
DatanodeStorageInfo.toDatanodeDescriptors(targets));
blockLog.debug("BLOCK* block {} is moved from neededReplications to "
+ "pendingReplications", block);
// remove from neededReplications
if(numEffectiveReplicas + targets.length >= requiredReplication) {
neededReplications.remove(block, priority); // remove from neededReplications
} }
} }
} }
@ -1499,15 +1394,15 @@ int computeReplicationWorkForBlocks(List<List<BlockInfo>> blocksToReplicate) {
if (blockLog.isInfoEnabled()) { if (blockLog.isInfoEnabled()) {
// log which blocks have been scheduled for replication // log which blocks have been scheduled for replication
for(ReplicationWork rw : work){ for(ReplicationWork rw : work){
DatanodeStorageInfo[] targets = rw.targets; DatanodeStorageInfo[] targets = rw.getTargets();
if (targets != null && targets.length != 0) { if (targets != null && targets.length != 0) {
StringBuilder targetList = new StringBuilder("datanode(s)"); StringBuilder targetList = new StringBuilder("datanode(s)");
for (int k = 0; k < targets.length; k++) { for (DatanodeStorageInfo target : targets) {
targetList.append(' '); targetList.append(' ');
targetList.append(targets[k].getDatanodeDescriptor()); targetList.append(target.getDatanodeDescriptor());
} }
blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.srcNode, blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.getSrcNode(),
rw.block, targetList); rw.getBlock(), targetList);
} }
} }
} }
@ -1519,6 +1414,118 @@ int computeReplicationWorkForBlocks(List<List<BlockInfo>> blocksToReplicate) {
return scheduledWork; return scheduledWork;
} }
boolean hasEnoughEffectiveReplicas(BlockInfo block,
NumberReplicas numReplicas, int pendingReplicaNum, int required) {
int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum;
return (numEffectiveReplicas >= required) &&
(pendingReplicaNum > 0 || blockHasEnoughRacks(block));
}
private ReplicationWork scheduleReplication(BlockInfo block, int priority) {
// block should belong to a file
BlockCollection bc = getBlockCollection(block);
// abandoned block or block reopened for append
if (bc == null
|| (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
// remove from neededReplications
neededReplications.remove(block, priority);
return null;
}
short requiredReplication = getExpectedReplicaNum(block);
// get a source data-node
List<DatanodeDescriptor> containingNodes = new ArrayList<>();
List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
NumberReplicas numReplicas = new NumberReplicas();
DatanodeDescriptor srcNode = chooseSourceDatanode(block, containingNodes,
liveReplicaNodes, numReplicas, priority);
if (srcNode == null) { // block can not be replicated from any node
LOG.debug("Block " + block + " cannot be repl from any node");
return null;
}
// liveReplicaNodes can include READ_ONLY_SHARED replicas which are
// not included in the numReplicas.liveReplicas() count
assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
int pendingNum = pendingReplications.getNumReplicas(block);
if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
requiredReplication)) {
neededReplications.remove(block, priority);
blockLog.debug("BLOCK* Removing {} from neededReplications as" +
" it has enough replicas", block);
return null;
}
final int additionalReplRequired;
if (numReplicas.liveReplicas() < requiredReplication) {
additionalReplRequired = requiredReplication - numReplicas.liveReplicas()
- pendingNum;
} else {
additionalReplRequired = 1; // Needed on a new rack
}
return new ReplicationWork(block, bc, srcNode, containingNodes,
liveReplicaNodes, additionalReplRequired, priority);
}
private boolean validateReplicationWork(ReplicationWork rw) {
BlockInfo block = rw.getBlock();
int priority = rw.getPriority();
// Recheck since global lock was released
// block should belong to a file
BlockCollection bc = getBlockCollection(block);
// abandoned block or block reopened for append
if(bc == null
|| (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
neededReplications.remove(block, priority);
rw.resetTargets();
return false;
}
// do not schedule more if enough replicas is already pending
final short requiredReplication = getExpectedReplicaNum(block);
NumberReplicas numReplicas = countNodes(block);
final int pendingNum = pendingReplications.getNumReplicas(block);
if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
requiredReplication)) {
neededReplications.remove(block, priority);
rw.resetTargets();
blockLog.debug("BLOCK* Removing {} from neededReplications as" +
" it has enough replicas", block);
return false;
}
DatanodeStorageInfo[] targets = rw.getTargets();
if ( (numReplicas.liveReplicas() >= requiredReplication) &&
(!blockHasEnoughRacks(block)) ) {
if (rw.getSrcNode().getNetworkLocation().equals(
targets[0].getDatanodeDescriptor().getNetworkLocation())) {
//No use continuing, unless a new rack in this case
return false;
}
}
// Add block to the to be replicated list
rw.getSrcNode().addBlockToBeReplicated(block, targets);
DatanodeStorageInfo.incrementBlocksScheduled(targets);
// Move the block-replication into a "pending" state.
// The reason we use 'pending' is so we can retry
// replications that fail after an appropriate amount of time.
pendingReplications.increment(block,
DatanodeStorageInfo.toDatanodeDescriptors(targets));
blockLog.debug("BLOCK* block {} is moved from neededReplications to "
+ "pendingReplications", block);
int numEffectiveReplicas = numReplicas.liveReplicas() + pendingNum;
// remove from neededReplications
if(numEffectiveReplicas + targets.length >= requiredReplication) {
neededReplications.remove(block, priority);
}
return true;
}
/** Choose target for WebHDFS redirection. */ /** Choose target for WebHDFS redirection. */
public DatanodeStorageInfo[] chooseTarget4WebHDFS(String src, public DatanodeStorageInfo[] chooseTarget4WebHDFS(String src,
DatanodeDescriptor clientnode, Set<Node> excludes, long blocksize) { DatanodeDescriptor clientnode, Set<Node> excludes, long blocksize) {
@ -1766,52 +1773,6 @@ static class StatefulBlockInfo {
} }
} }
/**
* BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
* list of blocks that should be considered corrupt due to a block report.
*/
private static class BlockToMarkCorrupt {
/** The corrupted block in a datanode. */
final BlockInfo corrupted;
/** The corresponding block stored in the BlockManager. */
final BlockInfo stored;
/** The reason to mark corrupt. */
final String reason;
/** The reason code to be stored */
final Reason reasonCode;
BlockToMarkCorrupt(BlockInfo corrupted,
BlockInfo stored, String reason,
Reason reasonCode) {
Preconditions.checkNotNull(corrupted, "corrupted is null");
Preconditions.checkNotNull(stored, "stored is null");
this.corrupted = corrupted;
this.stored = stored;
this.reason = reason;
this.reasonCode = reasonCode;
}
BlockToMarkCorrupt(BlockInfo stored, String reason,
Reason reasonCode) {
this(stored, stored, reason, reasonCode);
}
BlockToMarkCorrupt(BlockInfo stored, long gs, String reason,
Reason reasonCode) {
this(new BlockInfoContiguous((BlockInfoContiguous)stored), stored,
reason, reasonCode);
//the corrupted block in datanode has a different generation stamp
corrupted.setGenerationStamp(gs);
}
@Override
public String toString() {
return corrupted + "("
+ (corrupted == stored? "same as stored": "stored=" + stored) + ")";
}
}
/** /**
* The given storage is reporting all its blocks. * The given storage is reporting all its blocks.
* Update the (storage-->block list) and (block-->storage list) maps. * Update the (storage-->block list) and (block-->storage list) maps.
@ -3797,51 +3758,6 @@ public static LocatedBlock newLocatedBlock(
null); null);
} }
private static class ReplicationWork {
private final BlockInfo block;
private final BlockCollection bc;
private final DatanodeDescriptor srcNode;
private final List<DatanodeDescriptor> containingNodes;
private final List<DatanodeStorageInfo> liveReplicaStorages;
private final int additionalReplRequired;
private DatanodeStorageInfo targets[];
private final int priority;
public ReplicationWork(BlockInfo block,
BlockCollection bc,
DatanodeDescriptor srcNode,
List<DatanodeDescriptor> containingNodes,
List<DatanodeStorageInfo> liveReplicaStorages,
int additionalReplRequired,
int priority) {
this.block = block;
this.bc = bc;
this.srcNode = srcNode;
this.srcNode.incrementPendingReplicationWithoutTargets();
this.containingNodes = containingNodes;
this.liveReplicaStorages = liveReplicaStorages;
this.additionalReplRequired = additionalReplRequired;
this.priority = priority;
this.targets = null;
}
private void chooseTargets(BlockPlacementPolicy blockplacement,
BlockStoragePolicySuite storagePolicySuite,
Set<Node> excludedNodes) {
try {
targets = blockplacement.chooseTarget(bc.getName(),
additionalReplRequired, srcNode, liveReplicaStorages, false,
excludedNodes, block.getNumBytes(),
storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
} finally {
srcNode.decrementPendingReplicationWithoutTargets();
}
}
}
/** /**
* A simple result enum for the result of * A simple result enum for the result of
* {@link BlockManager#processMisReplicatedBlock(BlockInfo)}. * {@link BlockManager#processMisReplicatedBlock(BlockInfo)}.
@ -3855,9 +3771,9 @@ enum MisReplicationResult {
OVER_REPLICATED, OVER_REPLICATED,
/** A decision can't currently be made about this block. */ /** A decision can't currently be made about this block. */
POSTPONE, POSTPONE,
/** The block is under construction, so should be ignored */ /** The block is under construction, so should be ignored. */
UNDER_CONSTRUCTION, UNDER_CONSTRUCTION,
/** The block is properly replicated */ /** The block is properly replicated. */
OK OK
} }

View File

@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
import com.google.common.base.Preconditions;
/**
* BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
* list of blocks that should be considered corrupt due to a block report.
*/
class BlockToMarkCorrupt {
/** The corrupted block in a datanode. */
private final BlockInfo corrupted;
/** The corresponding block stored in the BlockManager. */
private final BlockInfo stored;
/** The reason to mark corrupt. */
private final String reason;
/** The reason code to be stored */
private final CorruptReplicasMap.Reason reasonCode;
BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason,
CorruptReplicasMap.Reason reasonCode) {
Preconditions.checkNotNull(corrupted, "corrupted is null");
Preconditions.checkNotNull(stored, "stored is null");
this.corrupted = corrupted;
this.stored = stored;
this.reason = reason;
this.reasonCode = reasonCode;
}
BlockToMarkCorrupt(BlockInfo stored, String reason,
CorruptReplicasMap.Reason reasonCode) {
this(stored, stored, reason, reasonCode);
}
BlockToMarkCorrupt(BlockInfo stored, long gs, String reason,
CorruptReplicasMap.Reason reasonCode) {
this(new BlockInfoContiguous((BlockInfoContiguous)stored), stored,
reason, reasonCode);
//the corrupted block in datanode has a different generation stamp
corrupted.setGenerationStamp(gs);
}
public boolean isCorruptedDuringWrite() {
return stored.getGenerationStamp() > corrupted.getGenerationStamp();
}
public BlockInfo getCorrupted() {
return corrupted;
}
public BlockInfo getStored() {
return stored;
}
public String getReason() {
return reason;
}
public Reason getReasonCode() {
return reasonCode;
}
@Override
public String toString() {
return corrupted + "("
+ (corrupted == stored ? "same as stored": "stored=" + stored) + ")";
}
}

View File

@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.net.Node;
import java.util.Collections;
import java.util.List;
import java.util.Set;
class ReplicationWork {
private final BlockInfo block;
private final BlockCollection bc;
private final DatanodeDescriptor srcNode;
private final int additionalReplRequired;
private final int priority;
private final List<DatanodeDescriptor> containingNodes;
private final List<DatanodeStorageInfo> liveReplicaStorages;
private DatanodeStorageInfo[] targets;
public ReplicationWork(BlockInfo block, BlockCollection bc,
DatanodeDescriptor srcNode, List<DatanodeDescriptor> containingNodes,
List<DatanodeStorageInfo> liveReplicaStorages, int additionalReplRequired,
int priority) {
this.block = block;
this.bc = bc;
this.srcNode = srcNode;
this.srcNode.incrementPendingReplicationWithoutTargets();
this.containingNodes = containingNodes;
this.liveReplicaStorages = liveReplicaStorages;
this.additionalReplRequired = additionalReplRequired;
this.priority = priority;
this.targets = null;
}
void chooseTargets(BlockPlacementPolicy blockplacement,
BlockStoragePolicySuite storagePolicySuite,
Set<Node> excludedNodes) {
try {
targets = blockplacement.chooseTarget(bc.getName(),
additionalReplRequired, srcNode, liveReplicaStorages, false,
excludedNodes, block.getNumBytes(),
storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
} finally {
srcNode.decrementPendingReplicationWithoutTargets();
}
}
DatanodeStorageInfo[] getTargets() {
return targets;
}
void resetTargets() {
this.targets = null;
}
List<DatanodeDescriptor> getContainingNodes() {
return Collections.unmodifiableList(containingNodes);
}
public int getPriority() {
return priority;
}
public BlockInfo getBlock() {
return block;
}
public DatanodeDescriptor getSrcNode() {
return srcNode;
}
}