From 035ed26147f10620fc6ed3a514d9ebbcc31304b5 Mon Sep 17 00:00:00 2001 From: Haohui Mai Date: Thu, 27 Aug 2015 16:09:35 -0700 Subject: [PATCH] Revert "HDFS-8938. Extract BlockToMarkCorrupt and ReplicationWork as standalone classes from BlockManager. Contributed by Mingliang Liu." This reverts commit 4e9307f26dd41270f95fb50166e1a091852e4d58. --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 - .../server/blockmanagement/BlockManager.java | 368 +++++++++++------- .../blockmanagement/BlockToMarkCorrupt.java | 87 ----- .../blockmanagement/ReplicationWork.java | 87 ----- 4 files changed, 226 insertions(+), 319 deletions(-) delete mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java delete mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index c4c8c3bd85f..9cc3326d070 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -853,9 +853,6 @@ Release 2.8.0 - UNRELEASED HDFS-8962. Clean up checkstyle warnings in o.a.h.hdfs.DfsClientConf. (Mingliang Liu via wheat9) - HDFS-8938. Extract BlockToMarkCorrupt and ReplicationWork as standalone - classes from BlockManager. (Mingliang Liu via wheat9) - OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 5a05fa73845..95933d203a4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -1181,24 +1181,24 @@ public class BlockManager implements BlockStatsMXBean { DatanodeStorageInfo storageInfo, DatanodeDescriptor node) throws IOException { - if (b.getCorrupted().isDeleted()) { + if (b.corrupted.isDeleted()) { blockLog.debug("BLOCK markBlockAsCorrupt: {} cannot be marked as" + " corrupt as it does not belong to any file", b); - addToInvalidates(b.getCorrupted(), node); + addToInvalidates(b.corrupted, node); return; } - short expectedReplicas = b.getCorrupted().getReplication(); + short expectedReplicas = b.corrupted.getReplication(); // Add replica to the data-node if it is not already there if (storageInfo != null) { - storageInfo.addBlock(b.getStored()); + storageInfo.addBlock(b.stored); } // Add this replica to corruptReplicas Map - corruptReplicas.addToCorruptReplicasMap(b.getCorrupted(), node, - b.getReason(), b.getReasonCode()); + corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason, + b.reasonCode); - NumberReplicas numberOfReplicas = countNodes(b.getStored()); + NumberReplicas numberOfReplicas = countNodes(b.stored); boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >= expectedReplicas; boolean minReplicationSatisfied = @@ -1207,7 +1207,7 @@ public class BlockManager implements BlockStatsMXBean { (numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) > expectedReplicas; boolean corruptedDuringWrite = minReplicationSatisfied && - b.isCorruptedDuringWrite(); + (b.stored.getGenerationStamp() > b.corrupted.getGenerationStamp()); // case 1: have enough number of live replicas // case 2: corrupted replicas + live replicas > Replication factor // case 3: Block is marked corrupt due to failure while writing. In this @@ -1220,7 +1220,7 @@ public class BlockManager implements BlockStatsMXBean { invalidateBlock(b, node); } else if (namesystem.isPopulatingReplQueues()) { // add the block to neededReplication - updateNeededReplications(b.getStored(), -1, 0); + updateNeededReplications(b.stored, -1, 0); } } @@ -1239,18 +1239,18 @@ public class BlockManager implements BlockStatsMXBean { } // Check how many copies we have of the block - NumberReplicas nr = countNodes(b.getStored()); + NumberReplicas nr = countNodes(b.stored); if (nr.replicasOnStaleNodes() > 0) { blockLog.debug("BLOCK* invalidateBlocks: postponing " + "invalidation of {} on {} because {} replica(s) are located on " + "nodes with potentially out-of-date block reports", b, dn, nr.replicasOnStaleNodes()); - postponeBlock(b.getCorrupted()); + postponeBlock(b.corrupted); return false; } else if (nr.liveReplicas() >= 1) { // If we have at least one copy on a live node, then we can delete it. - addToInvalidates(b.getCorrupted(), dn); - removeStoredBlock(b.getStored(), node); + addToInvalidates(b.corrupted, dn); + removeStoredBlock(b.stored, node); blockLog.debug("BLOCK* invalidateBlocks: {} on {} listed for deletion.", b, dn); return true; @@ -1338,18 +1338,71 @@ public class BlockManager implements BlockStatsMXBean { */ @VisibleForTesting int computeReplicationWorkForBlocks(List> blocksToReplicate) { + int requiredReplication, numEffectiveReplicas; + List containingNodes; + DatanodeDescriptor srcNode; + BlockCollection bc = null; + int additionalReplRequired; + int scheduledWork = 0; - final List work = new LinkedList<>(); + List work = new LinkedList(); namesystem.writeLock(); try { synchronized (neededReplications) { for (int priority = 0; priority < blocksToReplicate.size(); priority++) { for (BlockInfo block : blocksToReplicate.get(priority)) { - ReplicationWork rw = scheduleReplication(block, priority); - if (rw != null) { - work.add(rw); + // block should belong to a file + bc = getBlockCollection(block); + // abandoned block or block reopened for append + if (bc == null + || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) { + // remove from neededReplications + neededReplications.remove(block, priority); + continue; } + + requiredReplication = getExpectedReplicaNum(block); + + // get a source data-node + containingNodes = new ArrayList(); + List liveReplicaNodes = new ArrayList(); + NumberReplicas numReplicas = new NumberReplicas(); + srcNode = chooseSourceDatanode( + block, containingNodes, liveReplicaNodes, numReplicas, + priority); + if(srcNode == null) { // block can not be replicated from any node + LOG.debug("Block " + block + " cannot be repl from any node"); + continue; + } + + // liveReplicaNodes can include READ_ONLY_SHARED replicas which are + // not included in the numReplicas.liveReplicas() count + assert liveReplicaNodes.size() >= numReplicas.liveReplicas(); + + // do not schedule more if enough replicas is already pending + numEffectiveReplicas = numReplicas.liveReplicas() + + pendingReplications.getNumReplicas(block); + + if (numEffectiveReplicas >= requiredReplication) { + if ( (pendingReplications.getNumReplicas(block) > 0) || + (blockHasEnoughRacks(block)) ) { + neededReplications.remove(block, priority); // remove from neededReplications + blockLog.debug("BLOCK* Removing {} from neededReplications as" + + " it has enough replicas", block); + continue; + } + } + + if (numReplicas.liveReplicas() < requiredReplication) { + additionalReplRequired = requiredReplication + - numEffectiveReplicas; + } else { + additionalReplRequired = 1; // Needed on a new rack + } + work.add(new ReplicationWork(block, bc, srcNode, + containingNodes, liveReplicaNodes, additionalReplRequired, + priority)); } } } @@ -1357,12 +1410,12 @@ public class BlockManager implements BlockStatsMXBean { namesystem.writeUnlock(); } - final Set excludedNodes = new HashSet<>(); + final Set excludedNodes = new HashSet(); for(ReplicationWork rw : work){ // Exclude all of the containing nodes from being targets. // This list includes decommissioning or corrupt nodes. excludedNodes.clear(); - for (DatanodeDescriptor dn : rw.getContainingNodes()) { + for (DatanodeDescriptor dn : rw.containingNodes) { excludedNodes.add(dn); } @@ -1375,15 +1428,67 @@ public class BlockManager implements BlockStatsMXBean { namesystem.writeLock(); try { for(ReplicationWork rw : work){ - final DatanodeStorageInfo[] targets = rw.getTargets(); + final DatanodeStorageInfo[] targets = rw.targets; if(targets == null || targets.length == 0){ - rw.resetTargets(); + rw.targets = null; continue; } synchronized (neededReplications) { - if (validateReplicationWork(rw)) { - scheduledWork++; + BlockInfo block = rw.block; + int priority = rw.priority; + // Recheck since global lock was released + // block should belong to a file + bc = getBlockCollection(block); + // abandoned block or block reopened for append + if(bc == null || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) { + neededReplications.remove(block, priority); // remove from neededReplications + rw.targets = null; + continue; + } + requiredReplication = getExpectedReplicaNum(block); + + // do not schedule more if enough replicas is already pending + NumberReplicas numReplicas = countNodes(block); + numEffectiveReplicas = numReplicas.liveReplicas() + + pendingReplications.getNumReplicas(block); + + if (numEffectiveReplicas >= requiredReplication) { + if ( (pendingReplications.getNumReplicas(block) > 0) || + (blockHasEnoughRacks(block)) ) { + neededReplications.remove(block, priority); // remove from neededReplications + rw.targets = null; + blockLog.debug("BLOCK* Removing {} from neededReplications as" + + " it has enough replicas", block); + continue; + } + } + + if ( (numReplicas.liveReplicas() >= requiredReplication) && + (!blockHasEnoughRacks(block)) ) { + if (rw.srcNode.getNetworkLocation().equals( + targets[0].getDatanodeDescriptor().getNetworkLocation())) { + //No use continuing, unless a new rack in this case + continue; + } + } + + // Add block to the to be replicated list + rw.srcNode.addBlockToBeReplicated(block, targets); + scheduledWork++; + DatanodeStorageInfo.incrementBlocksScheduled(targets); + + // Move the block-replication into a "pending" state. + // The reason we use 'pending' is so we can retry + // replications that fail after an appropriate amount of time. + pendingReplications.increment(block, + DatanodeStorageInfo.toDatanodeDescriptors(targets)); + blockLog.debug("BLOCK* block {} is moved from neededReplications to " + + "pendingReplications", block); + + // remove from neededReplications + if(numEffectiveReplicas + targets.length >= requiredReplication) { + neededReplications.remove(block, priority); // remove from neededReplications } } } @@ -1394,15 +1499,15 @@ public class BlockManager implements BlockStatsMXBean { if (blockLog.isInfoEnabled()) { // log which blocks have been scheduled for replication for(ReplicationWork rw : work){ - DatanodeStorageInfo[] targets = rw.getTargets(); + DatanodeStorageInfo[] targets = rw.targets; if (targets != null && targets.length != 0) { StringBuilder targetList = new StringBuilder("datanode(s)"); - for (DatanodeStorageInfo target : targets) { + for (int k = 0; k < targets.length; k++) { targetList.append(' '); - targetList.append(target.getDatanodeDescriptor()); + targetList.append(targets[k].getDatanodeDescriptor()); } - blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.getSrcNode(), - rw.getBlock(), targetList); + blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.srcNode, + rw.block, targetList); } } } @@ -1414,118 +1519,6 @@ public class BlockManager implements BlockStatsMXBean { return scheduledWork; } - boolean hasEnoughEffectiveReplicas(BlockInfo block, - NumberReplicas numReplicas, int pendingReplicaNum, int required) { - int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum; - return (numEffectiveReplicas >= required) && - (pendingReplicaNum > 0 || blockHasEnoughRacks(block)); - } - - private ReplicationWork scheduleReplication(BlockInfo block, int priority) { - // block should belong to a file - BlockCollection bc = getBlockCollection(block); - // abandoned block or block reopened for append - if (bc == null - || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) { - // remove from neededReplications - neededReplications.remove(block, priority); - return null; - } - - short requiredReplication = getExpectedReplicaNum(block); - - // get a source data-node - List containingNodes = new ArrayList<>(); - List liveReplicaNodes = new ArrayList<>(); - NumberReplicas numReplicas = new NumberReplicas(); - DatanodeDescriptor srcNode = chooseSourceDatanode(block, containingNodes, - liveReplicaNodes, numReplicas, priority); - if (srcNode == null) { // block can not be replicated from any node - LOG.debug("Block " + block + " cannot be repl from any node"); - return null; - } - - // liveReplicaNodes can include READ_ONLY_SHARED replicas which are - // not included in the numReplicas.liveReplicas() count - assert liveReplicaNodes.size() >= numReplicas.liveReplicas(); - - int pendingNum = pendingReplications.getNumReplicas(block); - if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum, - requiredReplication)) { - neededReplications.remove(block, priority); - blockLog.debug("BLOCK* Removing {} from neededReplications as" + - " it has enough replicas", block); - return null; - } - - final int additionalReplRequired; - if (numReplicas.liveReplicas() < requiredReplication) { - additionalReplRequired = requiredReplication - numReplicas.liveReplicas() - - pendingNum; - } else { - additionalReplRequired = 1; // Needed on a new rack - } - return new ReplicationWork(block, bc, srcNode, containingNodes, - liveReplicaNodes, additionalReplRequired, priority); - } - - private boolean validateReplicationWork(ReplicationWork rw) { - BlockInfo block = rw.getBlock(); - int priority = rw.getPriority(); - // Recheck since global lock was released - // block should belong to a file - BlockCollection bc = getBlockCollection(block); - // abandoned block or block reopened for append - if(bc == null - || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) { - neededReplications.remove(block, priority); - rw.resetTargets(); - return false; - } - - // do not schedule more if enough replicas is already pending - final short requiredReplication = getExpectedReplicaNum(block); - NumberReplicas numReplicas = countNodes(block); - final int pendingNum = pendingReplications.getNumReplicas(block); - if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum, - requiredReplication)) { - neededReplications.remove(block, priority); - rw.resetTargets(); - blockLog.debug("BLOCK* Removing {} from neededReplications as" + - " it has enough replicas", block); - return false; - } - - DatanodeStorageInfo[] targets = rw.getTargets(); - if ( (numReplicas.liveReplicas() >= requiredReplication) && - (!blockHasEnoughRacks(block)) ) { - if (rw.getSrcNode().getNetworkLocation().equals( - targets[0].getDatanodeDescriptor().getNetworkLocation())) { - //No use continuing, unless a new rack in this case - return false; - } - } - - // Add block to the to be replicated list - rw.getSrcNode().addBlockToBeReplicated(block, targets); - DatanodeStorageInfo.incrementBlocksScheduled(targets); - - // Move the block-replication into a "pending" state. - // The reason we use 'pending' is so we can retry - // replications that fail after an appropriate amount of time. - pendingReplications.increment(block, - DatanodeStorageInfo.toDatanodeDescriptors(targets)); - blockLog.debug("BLOCK* block {} is moved from neededReplications to " - + "pendingReplications", block); - - int numEffectiveReplicas = numReplicas.liveReplicas() + pendingNum; - // remove from neededReplications - if(numEffectiveReplicas + targets.length >= requiredReplication) { - neededReplications.remove(block, priority); - } - return true; - } - /** Choose target for WebHDFS redirection. */ public DatanodeStorageInfo[] chooseTarget4WebHDFS(String src, DatanodeDescriptor clientnode, Set excludes, long blocksize) { @@ -1772,6 +1765,52 @@ public class BlockManager implements BlockStatsMXBean { this.reportedState = reportedState; } } + + /** + * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a + * list of blocks that should be considered corrupt due to a block report. + */ + private static class BlockToMarkCorrupt { + /** The corrupted block in a datanode. */ + final BlockInfo corrupted; + /** The corresponding block stored in the BlockManager. */ + final BlockInfo stored; + /** The reason to mark corrupt. */ + final String reason; + /** The reason code to be stored */ + final Reason reasonCode; + + BlockToMarkCorrupt(BlockInfo corrupted, + BlockInfo stored, String reason, + Reason reasonCode) { + Preconditions.checkNotNull(corrupted, "corrupted is null"); + Preconditions.checkNotNull(stored, "stored is null"); + + this.corrupted = corrupted; + this.stored = stored; + this.reason = reason; + this.reasonCode = reasonCode; + } + + BlockToMarkCorrupt(BlockInfo stored, String reason, + Reason reasonCode) { + this(stored, stored, reason, reasonCode); + } + + BlockToMarkCorrupt(BlockInfo stored, long gs, String reason, + Reason reasonCode) { + this(new BlockInfoContiguous((BlockInfoContiguous)stored), stored, + reason, reasonCode); + //the corrupted block in datanode has a different generation stamp + corrupted.setGenerationStamp(gs); + } + + @Override + public String toString() { + return corrupted + "(" + + (corrupted == stored? "same as stored": "stored=" + stored) + ")"; + } + } /** * The given storage is reporting all its blocks. @@ -3758,6 +3797,51 @@ public class BlockManager implements BlockStatsMXBean { null); } + private static class ReplicationWork { + + private final BlockInfo block; + private final BlockCollection bc; + + private final DatanodeDescriptor srcNode; + private final List containingNodes; + private final List liveReplicaStorages; + private final int additionalReplRequired; + + private DatanodeStorageInfo targets[]; + private final int priority; + + public ReplicationWork(BlockInfo block, + BlockCollection bc, + DatanodeDescriptor srcNode, + List containingNodes, + List liveReplicaStorages, + int additionalReplRequired, + int priority) { + this.block = block; + this.bc = bc; + this.srcNode = srcNode; + this.srcNode.incrementPendingReplicationWithoutTargets(); + this.containingNodes = containingNodes; + this.liveReplicaStorages = liveReplicaStorages; + this.additionalReplRequired = additionalReplRequired; + this.priority = priority; + this.targets = null; + } + + private void chooseTargets(BlockPlacementPolicy blockplacement, + BlockStoragePolicySuite storagePolicySuite, + Set excludedNodes) { + try { + targets = blockplacement.chooseTarget(bc.getName(), + additionalReplRequired, srcNode, liveReplicaStorages, false, + excludedNodes, block.getNumBytes(), + storagePolicySuite.getPolicy(bc.getStoragePolicyID())); + } finally { + srcNode.decrementPendingReplicationWithoutTargets(); + } + } + } + /** * A simple result enum for the result of * {@link BlockManager#processMisReplicatedBlock(BlockInfo)}. @@ -3771,9 +3855,9 @@ public class BlockManager implements BlockStatsMXBean { OVER_REPLICATED, /** A decision can't currently be made about this block. */ POSTPONE, - /** The block is under construction, so should be ignored. */ + /** The block is under construction, so should be ignored */ UNDER_CONSTRUCTION, - /** The block is properly replicated. */ + /** The block is properly replicated */ OK } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java deleted file mode 100644 index 3842e562ac3..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.blockmanagement; - -import static org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason; - -import com.google.common.base.Preconditions; - -/** - * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a - * list of blocks that should be considered corrupt due to a block report. - */ -class BlockToMarkCorrupt { - /** The corrupted block in a datanode. */ - private final BlockInfo corrupted; - /** The corresponding block stored in the BlockManager. */ - private final BlockInfo stored; - /** The reason to mark corrupt. */ - private final String reason; - /** The reason code to be stored */ - private final CorruptReplicasMap.Reason reasonCode; - - BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason, - CorruptReplicasMap.Reason reasonCode) { - Preconditions.checkNotNull(corrupted, "corrupted is null"); - Preconditions.checkNotNull(stored, "stored is null"); - - this.corrupted = corrupted; - this.stored = stored; - this.reason = reason; - this.reasonCode = reasonCode; - } - - BlockToMarkCorrupt(BlockInfo stored, String reason, - CorruptReplicasMap.Reason reasonCode) { - this(stored, stored, reason, reasonCode); - } - - BlockToMarkCorrupt(BlockInfo stored, long gs, String reason, - CorruptReplicasMap.Reason reasonCode) { - this(new BlockInfoContiguous((BlockInfoContiguous)stored), stored, - reason, reasonCode); - //the corrupted block in datanode has a different generation stamp - corrupted.setGenerationStamp(gs); - } - - public boolean isCorruptedDuringWrite() { - return stored.getGenerationStamp() > corrupted.getGenerationStamp(); - } - - public BlockInfo getCorrupted() { - return corrupted; - } - - public BlockInfo getStored() { - return stored; - } - - public String getReason() { - return reason; - } - - public Reason getReasonCode() { - return reasonCode; - } - - @Override - public String toString() { - return corrupted + "(" - + (corrupted == stored ? "same as stored": "stored=" + stored) + ")"; - } -} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java deleted file mode 100644 index f8a6dad1d1d..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.blockmanagement; - -import org.apache.hadoop.net.Node; - -import java.util.Collections; -import java.util.List; -import java.util.Set; - -class ReplicationWork { - private final BlockInfo block; - private final BlockCollection bc; - private final DatanodeDescriptor srcNode; - private final int additionalReplRequired; - private final int priority; - private final List containingNodes; - private final List liveReplicaStorages; - private DatanodeStorageInfo[] targets; - - public ReplicationWork(BlockInfo block, BlockCollection bc, - DatanodeDescriptor srcNode, List containingNodes, - List liveReplicaStorages, int additionalReplRequired, - int priority) { - this.block = block; - this.bc = bc; - this.srcNode = srcNode; - this.srcNode.incrementPendingReplicationWithoutTargets(); - this.containingNodes = containingNodes; - this.liveReplicaStorages = liveReplicaStorages; - this.additionalReplRequired = additionalReplRequired; - this.priority = priority; - this.targets = null; - } - - void chooseTargets(BlockPlacementPolicy blockplacement, - BlockStoragePolicySuite storagePolicySuite, - Set excludedNodes) { - try { - targets = blockplacement.chooseTarget(bc.getName(), - additionalReplRequired, srcNode, liveReplicaStorages, false, - excludedNodes, block.getNumBytes(), - storagePolicySuite.getPolicy(bc.getStoragePolicyID())); - } finally { - srcNode.decrementPendingReplicationWithoutTargets(); - } - } - - DatanodeStorageInfo[] getTargets() { - return targets; - } - - void resetTargets() { - this.targets = null; - } - - List getContainingNodes() { - return Collections.unmodifiableList(containingNodes); - } - - public int getPriority() { - return priority; - } - - public BlockInfo getBlock() { - return block; - } - - public DatanodeDescriptor getSrcNode() { - return srcNode; - } -}