From 0ae19f76f428264285073b151bf1e340a5d253ef Mon Sep 17 00:00:00 2001 From: Haohui Mai Date: Thu, 27 Aug 2015 15:36:44 -0700 Subject: [PATCH] HDFS-8938. Extract BlockToMarkCorrupt and ReplicationWork as standalone classes from BlockManager. Contributed by Mingliang Liu. --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../server/blockmanagement/BlockManager.java | 366 +++++++----------- .../blockmanagement/BlockToMarkCorrupt.java | 87 +++++ .../blockmanagement/ReplicationWork.java | 87 +++++ 4 files changed, 319 insertions(+), 224 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index a6c0e74211a..e523b27e4ab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -508,6 +508,9 @@ Release 2.8.0 - UNRELEASED HDFS-8962. Clean up checkstyle warnings in o.a.h.hdfs.DfsClientConf. (Mingliang Liu via wheat9) + HDFS-8938. Extract BlockToMarkCorrupt and ReplicationWork as standalone + classes from BlockManager. (Mingliang Liu via wheat9) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index e60ffde70bd..389b78c15b0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -1172,24 +1172,24 @@ public class BlockManager implements BlockStatsMXBean { DatanodeStorageInfo storageInfo, DatanodeDescriptor node) throws IOException { - if (b.corrupted.isDeleted()) { + if (b.getCorrupted().isDeleted()) { blockLog.debug("BLOCK markBlockAsCorrupt: {} cannot be marked as" + " corrupt as it does not belong to any file", b); - addToInvalidates(b.corrupted, node); + addToInvalidates(b.getCorrupted(), node); return; } - short expectedReplicas = b.corrupted.getReplication(); + short expectedReplicas = b.getCorrupted().getReplication(); // Add replica to the data-node if it is not already there if (storageInfo != null) { - storageInfo.addBlock(b.stored); + storageInfo.addBlock(b.getStored()); } // Add this replica to corruptReplicas Map - corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason, - b.reasonCode); + corruptReplicas.addToCorruptReplicasMap(b.getCorrupted(), node, + b.getReason(), b.getReasonCode()); - NumberReplicas numberOfReplicas = countNodes(b.stored); + NumberReplicas numberOfReplicas = countNodes(b.getStored()); boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >= expectedReplicas; boolean minReplicationSatisfied = @@ -1198,7 +1198,7 @@ public class BlockManager implements BlockStatsMXBean { (numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) > expectedReplicas; boolean corruptedDuringWrite = minReplicationSatisfied && - (b.stored.getGenerationStamp() > b.corrupted.getGenerationStamp()); + b.isCorruptedDuringWrite(); // case 1: have enough number of live replicas // case 2: corrupted replicas + live replicas > Replication factor // case 3: Block is marked corrupt due to failure while writing. In this @@ -1211,7 +1211,7 @@ public class BlockManager implements BlockStatsMXBean { invalidateBlock(b, node); } else if (namesystem.isPopulatingReplQueues()) { // add the block to neededReplication - updateNeededReplications(b.stored, -1, 0); + updateNeededReplications(b.getStored(), -1, 0); } } @@ -1230,18 +1230,18 @@ public class BlockManager implements BlockStatsMXBean { } // Check how many copies we have of the block - NumberReplicas nr = countNodes(b.stored); + NumberReplicas nr = countNodes(b.getStored()); if (nr.replicasOnStaleNodes() > 0) { blockLog.debug("BLOCK* invalidateBlocks: postponing " + "invalidation of {} on {} because {} replica(s) are located on " + "nodes with potentially out-of-date block reports", b, dn, nr.replicasOnStaleNodes()); - postponeBlock(b.corrupted); + postponeBlock(b.getCorrupted()); return false; } else if (nr.liveReplicas() >= 1) { // If we have at least one copy on a live node, then we can delete it. - addToInvalidates(b.corrupted, dn); - removeStoredBlock(b.stored, node); + addToInvalidates(b.getCorrupted(), dn); + removeStoredBlock(b.getStored(), node); blockLog.debug("BLOCK* invalidateBlocks: {} on {} listed for deletion.", b, dn); return true; @@ -1329,69 +1329,18 @@ public class BlockManager implements BlockStatsMXBean { */ @VisibleForTesting int computeReplicationWorkForBlocks(List> blocksToReplicate) { - int requiredReplication, numEffectiveReplicas; - List containingNodes; - DatanodeDescriptor srcNode; - BlockCollection bc = null; - int additionalReplRequired; - int scheduledWork = 0; - List work = new LinkedList(); + final List work = new LinkedList<>(); namesystem.writeLock(); try { synchronized (neededReplications) { for (int priority = 0; priority < blocksToReplicate.size(); priority++) { for (BlockInfo block : blocksToReplicate.get(priority)) { - // block should belong to a file - bc = getBlockCollection(block); - // abandoned block or block reopened for append - if(bc == null || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) { - neededReplications.remove(block, priority); // remove from neededReplications - continue; + ReplicationWork rw = scheduleReplication(block, priority); + if (rw != null) { + work.add(rw); } - - requiredReplication = getExpectedReplicaNum(block); - - // get a source data-node - containingNodes = new ArrayList(); - List liveReplicaNodes = new ArrayList(); - NumberReplicas numReplicas = new NumberReplicas(); - srcNode = chooseSourceDatanode( - block, containingNodes, liveReplicaNodes, numReplicas, - priority); - if(srcNode == null) { // block can not be replicated from any node - LOG.debug("Block " + block + " cannot be repl from any node"); - continue; - } - - // liveReplicaNodes can include READ_ONLY_SHARED replicas which are - // not included in the numReplicas.liveReplicas() count - assert liveReplicaNodes.size() >= numReplicas.liveReplicas(); - - // do not schedule more if enough replicas is already pending - numEffectiveReplicas = numReplicas.liveReplicas() + - pendingReplications.getNumReplicas(block); - - if (numEffectiveReplicas >= requiredReplication) { - if ( (pendingReplications.getNumReplicas(block) > 0) || - (blockHasEnoughRacks(block)) ) { - neededReplications.remove(block, priority); // remove from neededReplications - blockLog.debug("BLOCK* Removing {} from neededReplications as" + - " it has enough replicas", block); - continue; - } - } - - if (numReplicas.liveReplicas() < requiredReplication) { - additionalReplRequired = requiredReplication - - numEffectiveReplicas; - } else { - additionalReplRequired = 1; // Needed on a new rack - } - work.add(new ReplicationWork(block, bc, srcNode, - containingNodes, liveReplicaNodes, additionalReplRequired, - priority)); } } } @@ -1399,12 +1348,12 @@ public class BlockManager implements BlockStatsMXBean { namesystem.writeUnlock(); } - final Set excludedNodes = new HashSet(); + final Set excludedNodes = new HashSet<>(); for(ReplicationWork rw : work){ // Exclude all of the containing nodes from being targets. // This list includes decommissioning or corrupt nodes. excludedNodes.clear(); - for (DatanodeDescriptor dn : rw.containingNodes) { + for (DatanodeDescriptor dn : rw.getContainingNodes()) { excludedNodes.add(dn); } @@ -1417,67 +1366,15 @@ public class BlockManager implements BlockStatsMXBean { namesystem.writeLock(); try { for(ReplicationWork rw : work){ - final DatanodeStorageInfo[] targets = rw.targets; + final DatanodeStorageInfo[] targets = rw.getTargets(); if(targets == null || targets.length == 0){ - rw.targets = null; + rw.resetTargets(); continue; } synchronized (neededReplications) { - BlockInfo block = rw.block; - int priority = rw.priority; - // Recheck since global lock was released - // block should belong to a file - bc = getBlockCollection(block); - // abandoned block or block reopened for append - if(bc == null || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) { - neededReplications.remove(block, priority); // remove from neededReplications - rw.targets = null; - continue; - } - requiredReplication = getExpectedReplicaNum(block); - - // do not schedule more if enough replicas is already pending - NumberReplicas numReplicas = countNodes(block); - numEffectiveReplicas = numReplicas.liveReplicas() + - pendingReplications.getNumReplicas(block); - - if (numEffectiveReplicas >= requiredReplication) { - if ( (pendingReplications.getNumReplicas(block) > 0) || - (blockHasEnoughRacks(block)) ) { - neededReplications.remove(block, priority); // remove from neededReplications - rw.targets = null; - blockLog.debug("BLOCK* Removing {} from neededReplications as" + - " it has enough replicas", block); - continue; - } - } - - if ( (numReplicas.liveReplicas() >= requiredReplication) && - (!blockHasEnoughRacks(block)) ) { - if (rw.srcNode.getNetworkLocation().equals( - targets[0].getDatanodeDescriptor().getNetworkLocation())) { - //No use continuing, unless a new rack in this case - continue; - } - } - - // Add block to the to be replicated list - rw.srcNode.addBlockToBeReplicated(block, targets); - scheduledWork++; - DatanodeStorageInfo.incrementBlocksScheduled(targets); - - // Move the block-replication into a "pending" state. - // The reason we use 'pending' is so we can retry - // replications that fail after an appropriate amount of time. - pendingReplications.increment(block, - DatanodeStorageInfo.toDatanodeDescriptors(targets)); - blockLog.debug("BLOCK* block {} is moved from neededReplications to " - + "pendingReplications", block); - - // remove from neededReplications - if(numEffectiveReplicas + targets.length >= requiredReplication) { - neededReplications.remove(block, priority); // remove from neededReplications + if (validateReplicationWork(rw)) { + scheduledWork++; } } } @@ -1488,15 +1385,15 @@ public class BlockManager implements BlockStatsMXBean { if (blockLog.isInfoEnabled()) { // log which blocks have been scheduled for replication for(ReplicationWork rw : work){ - DatanodeStorageInfo[] targets = rw.targets; + DatanodeStorageInfo[] targets = rw.getTargets(); if (targets != null && targets.length != 0) { StringBuilder targetList = new StringBuilder("datanode(s)"); - for (int k = 0; k < targets.length; k++) { + for (DatanodeStorageInfo target : targets) { targetList.append(' '); - targetList.append(targets[k].getDatanodeDescriptor()); + targetList.append(target.getDatanodeDescriptor()); } - blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.srcNode, - rw.block, targetList); + blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.getSrcNode(), + rw.getBlock(), targetList); } } } @@ -1508,6 +1405,118 @@ public class BlockManager implements BlockStatsMXBean { return scheduledWork; } + boolean hasEnoughEffectiveReplicas(BlockInfo block, + NumberReplicas numReplicas, int pendingReplicaNum, int required) { + int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum; + return (numEffectiveReplicas >= required) && + (pendingReplicaNum > 0 || blockHasEnoughRacks(block)); + } + + private ReplicationWork scheduleReplication(BlockInfo block, int priority) { + // block should belong to a file + BlockCollection bc = getBlockCollection(block); + // abandoned block or block reopened for append + if (bc == null + || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) { + // remove from neededReplications + neededReplications.remove(block, priority); + return null; + } + + short requiredReplication = getExpectedReplicaNum(block); + + // get a source data-node + List containingNodes = new ArrayList<>(); + List liveReplicaNodes = new ArrayList<>(); + NumberReplicas numReplicas = new NumberReplicas(); + DatanodeDescriptor srcNode = chooseSourceDatanode(block, containingNodes, + liveReplicaNodes, numReplicas, priority); + if (srcNode == null) { // block can not be replicated from any node + LOG.debug("Block " + block + " cannot be repl from any node"); + return null; + } + + // liveReplicaNodes can include READ_ONLY_SHARED replicas which are + // not included in the numReplicas.liveReplicas() count + assert liveReplicaNodes.size() >= numReplicas.liveReplicas(); + + int pendingNum = pendingReplications.getNumReplicas(block); + if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum, + requiredReplication)) { + neededReplications.remove(block, priority); + blockLog.debug("BLOCK* Removing {} from neededReplications as" + + " it has enough replicas", block); + return null; + } + + final int additionalReplRequired; + if (numReplicas.liveReplicas() < requiredReplication) { + additionalReplRequired = requiredReplication - numReplicas.liveReplicas() + - pendingNum; + } else { + additionalReplRequired = 1; // Needed on a new rack + } + return new ReplicationWork(block, bc, srcNode, containingNodes, + liveReplicaNodes, additionalReplRequired, priority); + } + + private boolean validateReplicationWork(ReplicationWork rw) { + BlockInfo block = rw.getBlock(); + int priority = rw.getPriority(); + // Recheck since global lock was released + // block should belong to a file + BlockCollection bc = getBlockCollection(block); + // abandoned block or block reopened for append + if(bc == null + || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) { + neededReplications.remove(block, priority); + rw.resetTargets(); + return false; + } + + // do not schedule more if enough replicas is already pending + final short requiredReplication = getExpectedReplicaNum(block); + NumberReplicas numReplicas = countNodes(block); + final int pendingNum = pendingReplications.getNumReplicas(block); + if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum, + requiredReplication)) { + neededReplications.remove(block, priority); + rw.resetTargets(); + blockLog.debug("BLOCK* Removing {} from neededReplications as" + + " it has enough replicas", block); + return false; + } + + DatanodeStorageInfo[] targets = rw.getTargets(); + if ( (numReplicas.liveReplicas() >= requiredReplication) && + (!blockHasEnoughRacks(block)) ) { + if (rw.getSrcNode().getNetworkLocation().equals( + targets[0].getDatanodeDescriptor().getNetworkLocation())) { + //No use continuing, unless a new rack in this case + return false; + } + } + + // Add block to the to be replicated list + rw.getSrcNode().addBlockToBeReplicated(block, targets); + DatanodeStorageInfo.incrementBlocksScheduled(targets); + + // Move the block-replication into a "pending" state. + // The reason we use 'pending' is so we can retry + // replications that fail after an appropriate amount of time. + pendingReplications.increment(block, + DatanodeStorageInfo.toDatanodeDescriptors(targets)); + blockLog.debug("BLOCK* block {} is moved from neededReplications to " + + "pendingReplications", block); + + int numEffectiveReplicas = numReplicas.liveReplicas() + pendingNum; + // remove from neededReplications + if(numEffectiveReplicas + targets.length >= requiredReplication) { + neededReplications.remove(block, priority); + } + return true; + } + /** Choose target for WebHDFS redirection. */ public DatanodeStorageInfo[] chooseTarget4WebHDFS(String src, DatanodeDescriptor clientnode, Set excludes, long blocksize) { @@ -1754,52 +1763,6 @@ public class BlockManager implements BlockStatsMXBean { this.reportedState = reportedState; } } - - /** - * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a - * list of blocks that should be considered corrupt due to a block report. - */ - private static class BlockToMarkCorrupt { - /** The corrupted block in a datanode. */ - final BlockInfo corrupted; - /** The corresponding block stored in the BlockManager. */ - final BlockInfo stored; - /** The reason to mark corrupt. */ - final String reason; - /** The reason code to be stored */ - final Reason reasonCode; - - BlockToMarkCorrupt(BlockInfo corrupted, - BlockInfo stored, String reason, - Reason reasonCode) { - Preconditions.checkNotNull(corrupted, "corrupted is null"); - Preconditions.checkNotNull(stored, "stored is null"); - - this.corrupted = corrupted; - this.stored = stored; - this.reason = reason; - this.reasonCode = reasonCode; - } - - BlockToMarkCorrupt(BlockInfo stored, String reason, - Reason reasonCode) { - this(stored, stored, reason, reasonCode); - } - - BlockToMarkCorrupt(BlockInfo stored, long gs, String reason, - Reason reasonCode) { - this(new BlockInfoContiguous((BlockInfoContiguous)stored), stored, - reason, reasonCode); - //the corrupted block in datanode has a different generation stamp - corrupted.setGenerationStamp(gs); - } - - @Override - public String toString() { - return corrupted + "(" - + (corrupted == stored? "same as stored": "stored=" + stored) + ")"; - } - } /** * The given storage is reporting all its blocks. @@ -3786,51 +3749,6 @@ public class BlockManager implements BlockStatsMXBean { null); } - private static class ReplicationWork { - - private final BlockInfo block; - private final BlockCollection bc; - - private final DatanodeDescriptor srcNode; - private final List containingNodes; - private final List liveReplicaStorages; - private final int additionalReplRequired; - - private DatanodeStorageInfo targets[]; - private final int priority; - - public ReplicationWork(BlockInfo block, - BlockCollection bc, - DatanodeDescriptor srcNode, - List containingNodes, - List liveReplicaStorages, - int additionalReplRequired, - int priority) { - this.block = block; - this.bc = bc; - this.srcNode = srcNode; - this.srcNode.incrementPendingReplicationWithoutTargets(); - this.containingNodes = containingNodes; - this.liveReplicaStorages = liveReplicaStorages; - this.additionalReplRequired = additionalReplRequired; - this.priority = priority; - this.targets = null; - } - - private void chooseTargets(BlockPlacementPolicy blockplacement, - BlockStoragePolicySuite storagePolicySuite, - Set excludedNodes) { - try { - targets = blockplacement.chooseTarget(bc.getName(), - additionalReplRequired, srcNode, liveReplicaStorages, false, - excludedNodes, block.getNumBytes(), - storagePolicySuite.getPolicy(bc.getStoragePolicyID())); - } finally { - srcNode.decrementPendingReplicationWithoutTargets(); - } - } - } - /** * A simple result enum for the result of * {@link BlockManager#processMisReplicatedBlock(BlockInfo)}. @@ -3844,9 +3762,9 @@ public class BlockManager implements BlockStatsMXBean { OVER_REPLICATED, /** A decision can't currently be made about this block. */ POSTPONE, - /** The block is under construction, so should be ignored */ + /** The block is under construction, so should be ignored. */ UNDER_CONSTRUCTION, - /** The block is properly replicated */ + /** The block is properly replicated. */ OK } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java new file mode 100644 index 00000000000..3842e562ac3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import static org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason; + +import com.google.common.base.Preconditions; + +/** + * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a + * list of blocks that should be considered corrupt due to a block report. + */ +class BlockToMarkCorrupt { + /** The corrupted block in a datanode. */ + private final BlockInfo corrupted; + /** The corresponding block stored in the BlockManager. */ + private final BlockInfo stored; + /** The reason to mark corrupt. */ + private final String reason; + /** The reason code to be stored */ + private final CorruptReplicasMap.Reason reasonCode; + + BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason, + CorruptReplicasMap.Reason reasonCode) { + Preconditions.checkNotNull(corrupted, "corrupted is null"); + Preconditions.checkNotNull(stored, "stored is null"); + + this.corrupted = corrupted; + this.stored = stored; + this.reason = reason; + this.reasonCode = reasonCode; + } + + BlockToMarkCorrupt(BlockInfo stored, String reason, + CorruptReplicasMap.Reason reasonCode) { + this(stored, stored, reason, reasonCode); + } + + BlockToMarkCorrupt(BlockInfo stored, long gs, String reason, + CorruptReplicasMap.Reason reasonCode) { + this(new BlockInfoContiguous((BlockInfoContiguous)stored), stored, + reason, reasonCode); + //the corrupted block in datanode has a different generation stamp + corrupted.setGenerationStamp(gs); + } + + public boolean isCorruptedDuringWrite() { + return stored.getGenerationStamp() > corrupted.getGenerationStamp(); + } + + public BlockInfo getCorrupted() { + return corrupted; + } + + public BlockInfo getStored() { + return stored; + } + + public String getReason() { + return reason; + } + + public Reason getReasonCode() { + return reasonCode; + } + + @Override + public String toString() { + return corrupted + "(" + + (corrupted == stored ? "same as stored": "stored=" + stored) + ")"; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java new file mode 100644 index 00000000000..f8a6dad1d1d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.net.Node; + +import java.util.Collections; +import java.util.List; +import java.util.Set; + +class ReplicationWork { + private final BlockInfo block; + private final BlockCollection bc; + private final DatanodeDescriptor srcNode; + private final int additionalReplRequired; + private final int priority; + private final List containingNodes; + private final List liveReplicaStorages; + private DatanodeStorageInfo[] targets; + + public ReplicationWork(BlockInfo block, BlockCollection bc, + DatanodeDescriptor srcNode, List containingNodes, + List liveReplicaStorages, int additionalReplRequired, + int priority) { + this.block = block; + this.bc = bc; + this.srcNode = srcNode; + this.srcNode.incrementPendingReplicationWithoutTargets(); + this.containingNodes = containingNodes; + this.liveReplicaStorages = liveReplicaStorages; + this.additionalReplRequired = additionalReplRequired; + this.priority = priority; + this.targets = null; + } + + void chooseTargets(BlockPlacementPolicy blockplacement, + BlockStoragePolicySuite storagePolicySuite, + Set excludedNodes) { + try { + targets = blockplacement.chooseTarget(bc.getName(), + additionalReplRequired, srcNode, liveReplicaStorages, false, + excludedNodes, block.getNumBytes(), + storagePolicySuite.getPolicy(bc.getStoragePolicyID())); + } finally { + srcNode.decrementPendingReplicationWithoutTargets(); + } + } + + DatanodeStorageInfo[] getTargets() { + return targets; + } + + void resetTargets() { + this.targets = null; + } + + List getContainingNodes() { + return Collections.unmodifiableList(containingNodes); + } + + public int getPriority() { + return priority; + } + + public BlockInfo getBlock() { + return block; + } + + public DatanodeDescriptor getSrcNode() { + return srcNode; + } +}