HDFS-11248: [SPS]: Handle partial block location movements. Contributed by Rakesh R

This commit is contained in:
Uma Maheswara Rao G 2016-12-28 23:21:07 -08:00 committed by Uma Maheswara Rao Gangumalla
parent d81611fe55
commit b7bed9f00a
6 changed files with 460 additions and 167 deletions

View File

@ -28,7 +28,6 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult; import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult;
import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementStatus;
import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsCompletionHandler; import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsCompletionHandler;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -109,20 +108,6 @@ public void run() {
} }
} }
/**
* Mark as block movement failure for the given trackId and blockId.
*
* @param trackId tracking id
* @param blockId block id
*/
void markBlockMovementFailure(long trackId, long blockId) {
LOG.debug("Mark as block movement failure for the given "
+ "trackId:{} and blockId:{}", trackId, blockId);
BlockMovementResult result = new BlockMovementResult(trackId, blockId, null,
BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE);
addMovementResultToTrackIdList(result);
}
private List<BlockMovementResult> addMovementResultToTrackIdList( private List<BlockMovementResult> addMovementResultToTrackIdList(
BlockMovementResult result) { BlockMovementResult result) {
long trackId = result.getTrackId(); long trackId = result.getTrackId();

View File

@ -154,18 +154,9 @@ public void processBlockMovingTasks(long trackID, String blockPoolID,
Collection<BlockMovingInfo> blockMovingInfos) { Collection<BlockMovingInfo> blockMovingInfos) {
LOG.debug("Received BlockMovingTasks {}", blockMovingInfos); LOG.debug("Received BlockMovingTasks {}", blockMovingInfos);
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) { for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
// Iterating backwards. This is to ensure that all the block src location assert blkMovingInfo.getSources().length == blkMovingInfo
// which doesn't have a target node will be marked as failure before .getTargets().length;
// scheduling the block movement to valid target nodes. for (int i = 0; i < blkMovingInfo.getSources().length; i++) {
for (int i = blkMovingInfo.getSources().length - 1; i >= 0; i--) {
if (i >= blkMovingInfo.getTargets().length) {
// Since there is no target selected for scheduling the block,
// just mark this block storage movement as failure. Later, namenode
// can take action on this.
movementTracker.markBlockMovementFailure(trackID,
blkMovingInfo.getBlock().getBlockId());
continue;
}
DatanodeInfo target = blkMovingInfo.getTargets()[i]; DatanodeInfo target = blkMovingInfo.getTargets()[i];
BlockMovingTask blockMovingTask = new BlockMovingTask( BlockMovingTask blockMovingTask = new BlockMovingTask(
trackID, blockPoolID, blkMovingInfo.getBlock(), trackID, blockPoolID, blkMovingInfo.getBlock(),

View File

@ -43,11 +43,14 @@
* automatically after timeout. The default timeout would be 30mins. * automatically after timeout. The default timeout would be 30mins.
*/ */
public class BlockStorageMovementAttemptedItems { public class BlockStorageMovementAttemptedItems {
public static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class); LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class);
// A map holds the items which are already taken for blocks movements
// processing and sent to DNs. /**
private final Map<Long, Long> storageMovementAttemptedItems; * A map holds the items which are already taken for blocks movements
* processing and sent to DNs.
*/
private final Map<Long, ItemInfo> storageMovementAttemptedItems;
private final List<BlocksStorageMovementResult> storageMovementAttemptedResults; private final List<BlocksStorageMovementResult> storageMovementAttemptedResults;
private volatile boolean monitorRunning = true; private volatile boolean monitorRunning = true;
private Daemon timerThread = null; private Daemon timerThread = null;
@ -83,10 +86,16 @@ public BlockStorageMovementAttemptedItems(long timeoutPeriod,
* *
* @param blockCollectionID * @param blockCollectionID
* - tracking id / block collection id * - tracking id / block collection id
* @param allBlockLocsAttemptedToSatisfy
* - failed to find matching target nodes to satisfy storage type for
* all the block locations of the given blockCollectionID
*/ */
public void add(Long blockCollectionID) { public void add(Long blockCollectionID,
boolean allBlockLocsAttemptedToSatisfy) {
synchronized (storageMovementAttemptedItems) { synchronized (storageMovementAttemptedItems) {
storageMovementAttemptedItems.put(blockCollectionID, monotonicNow()); ItemInfo itemInfo = new ItemInfo(monotonicNow(),
allBlockLocsAttemptedToSatisfy);
storageMovementAttemptedItems.put(blockCollectionID, itemInfo);
} }
} }
@ -121,14 +130,61 @@ public synchronized void start() {
*/ */
public synchronized void stop() { public synchronized void stop() {
monitorRunning = false; monitorRunning = false;
timerThread.interrupt(); if (timerThread != null) {
try { timerThread.interrupt();
timerThread.join(3000); try {
} catch (InterruptedException ie) { timerThread.join(3000);
} catch (InterruptedException ie) {
}
} }
this.clearQueues(); this.clearQueues();
} }
/**
* This class contains information of an attempted trackID. Information such
* as, (a)last attempted time stamp, (b)whether all the blocks in the trackID
* were attempted and blocks movement has been scheduled to satisfy storage
* policy. This is used by
* {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
*/
private final static class ItemInfo {
private final long lastAttemptedTimeStamp;
private final boolean allBlockLocsAttemptedToSatisfy;
/**
* ItemInfo constructor.
*
* @param lastAttemptedTimeStamp
* last attempted time stamp
* @param allBlockLocsAttemptedToSatisfy
* whether all the blocks in the trackID were attempted and blocks
* movement has been scheduled to satisfy storage policy
*/
private ItemInfo(long lastAttemptedTimeStamp,
boolean allBlockLocsAttemptedToSatisfy) {
this.lastAttemptedTimeStamp = lastAttemptedTimeStamp;
this.allBlockLocsAttemptedToSatisfy = allBlockLocsAttemptedToSatisfy;
}
/**
* @return last attempted time stamp.
*/
private long getLastAttemptedTimeStamp() {
return lastAttemptedTimeStamp;
}
/**
* @return true/false. True value represents that, all the block locations
* under the trackID has found matching target nodes to satisfy
* storage policy. False value represents that, trackID needed
* retries to satisfy the storage policy for some of the block
* locations.
*/
private boolean isAllBlockLocsAttemptedToSatisfy() {
return allBlockLocsAttemptedToSatisfy;
}
}
/** /**
* A monitor class for checking block storage movement result and long waiting * A monitor class for checking block storage movement result and long waiting
* items periodically. * items periodically.
@ -147,76 +203,108 @@ public void run() {
} }
} }
} }
}
private void blocksStorageMovementUnReportedItemsCheck() { @VisibleForTesting
synchronized (storageMovementAttemptedItems) { void blocksStorageMovementUnReportedItemsCheck() {
Iterator<Entry<Long, Long>> iter = synchronized (storageMovementAttemptedItems) {
storageMovementAttemptedItems.entrySet().iterator(); Iterator<Entry<Long, ItemInfo>> iter = storageMovementAttemptedItems
long now = monotonicNow(); .entrySet().iterator();
while (iter.hasNext()) { long now = monotonicNow();
Entry<Long, Long> entry = iter.next(); while (iter.hasNext()) {
if (now > entry.getValue() + selfRetryTimeout) { Entry<Long, ItemInfo> entry = iter.next();
Long blockCollectionID = entry.getKey(); ItemInfo itemInfo = entry.getValue();
synchronized (storageMovementAttemptedResults) { if (now > itemInfo.getLastAttemptedTimeStamp() + selfRetryTimeout) {
boolean exist = isExistInResult(blockCollectionID); Long blockCollectionID = entry.getKey();
if (!exist) { synchronized (storageMovementAttemptedResults) {
blockStorageMovementNeeded.add(blockCollectionID); if (!isExistInResult(blockCollectionID)) {
} else { blockStorageMovementNeeded.add(blockCollectionID);
LOG.info("Blocks storage movement results for the"
+ " tracking id : " + blockCollectionID
+ " is reported from one of the co-ordinating datanode."
+ " So, the result will be processed soon.");
}
iter.remove(); iter.remove();
LOG.info("TrackID: {} becomes timed out and moved to needed "
+ "retries queue for next iteration.", blockCollectionID);
} else {
LOG.info("Blocks storage movement results for the"
+ " tracking id : " + blockCollectionID
+ " is reported from one of the co-ordinating datanode."
+ " So, the result will be processed soon.");
} }
} }
} }
}
}
}
private boolean isExistInResult(Long blockCollectionID) {
Iterator<BlocksStorageMovementResult> iter = storageMovementAttemptedResults
.iterator();
while (iter.hasNext()) {
BlocksStorageMovementResult storageMovementAttemptedResult = iter.next();
if (storageMovementAttemptedResult.getTrackId() == blockCollectionID) {
return true;
} }
} }
return false;
}
private boolean isExistInResult(Long blockCollectionID) { @VisibleForTesting
Iterator<BlocksStorageMovementResult> iter = void blockStorageMovementResultCheck() {
synchronized (storageMovementAttemptedResults) {
Iterator<BlocksStorageMovementResult> resultsIter =
storageMovementAttemptedResults.iterator(); storageMovementAttemptedResults.iterator();
while (iter.hasNext()) { while (resultsIter.hasNext()) {
BlocksStorageMovementResult storageMovementAttemptedResult = // TrackID need to be retried in the following cases:
iter.next(); // 1) All or few scheduled block(s) movement has been failed.
if (storageMovementAttemptedResult.getTrackId() == blockCollectionID) { // 2) All the scheduled block(s) movement has been succeeded but there
return true; // are unscheduled block(s) movement in this trackID. Say, some of
} // the blocks in the trackID couldn't finding any matching target node
} // for scheduling block movement in previous SPS iteration.
return false; BlocksStorageMovementResult storageMovementAttemptedResult = resultsIter
} .next();
synchronized (storageMovementAttemptedItems) {
private void blockStorageMovementResultCheck() {
synchronized (storageMovementAttemptedResults) {
Iterator<BlocksStorageMovementResult> iter =
storageMovementAttemptedResults.iterator();
while (iter.hasNext()) {
BlocksStorageMovementResult storageMovementAttemptedResult =
iter.next();
if (storageMovementAttemptedResult if (storageMovementAttemptedResult
.getStatus() == BlocksStorageMovementResult.Status.FAILURE) { .getStatus() == BlocksStorageMovementResult.Status.FAILURE) {
blockStorageMovementNeeded blockStorageMovementNeeded
.add(storageMovementAttemptedResult.getTrackId()); .add(storageMovementAttemptedResult.getTrackId());
LOG.warn("Blocks storage movement results for the tracking id : " LOG.warn("Blocks storage movement results for the tracking id: {}"
+ storageMovementAttemptedResult.getTrackId()
+ " is reported from co-ordinating datanode, but result" + " is reported from co-ordinating datanode, but result"
+ " status is FAILURE. So, added for retry"); + " status is FAILURE. So, added for retry",
storageMovementAttemptedResult.getTrackId());
} else { } else {
synchronized (storageMovementAttemptedItems) { ItemInfo itemInfo = storageMovementAttemptedItems
storageMovementAttemptedItems .get(storageMovementAttemptedResult.getTrackId());
.remove(storageMovementAttemptedResult.getTrackId());
}
LOG.info("Blocks storage movement results for the tracking id : "
+ storageMovementAttemptedResult.getTrackId()
+ " is reported from co-ordinating datanode. "
+ "The result status is SUCCESS.");
}
iter.remove(); // remove from results as processed above
}
}
// ItemInfo could be null. One case is, before the blocks movements
// result arrives the attempted trackID became timed out and then
// removed the trackID from the storageMovementAttemptedItems list.
// TODO: Need to ensure that trackID is added to the
// 'blockStorageMovementNeeded' queue for retries to handle the
// following condition. If all the block locations under the trackID
// are attempted and failed to find matching target nodes to satisfy
// storage policy in previous SPS iteration.
if (itemInfo != null
&& !itemInfo.isAllBlockLocsAttemptedToSatisfy()) {
blockStorageMovementNeeded
.add(storageMovementAttemptedResult.getTrackId());
LOG.warn("Blocks storage movement is SUCCESS for the track id: {}"
+ " reported from co-ordinating datanode. But adding trackID"
+ " back to retry queue as some of the blocks couldn't find"
+ " matching target nodes in previous SPS iteration.",
storageMovementAttemptedResult.getTrackId());
} else {
LOG.info("Blocks storage movement is SUCCESS for the track id: {}"
+ " reported from co-ordinating datanode. But the trackID "
+ "doesn't exists in storageMovementAttemptedItems list",
storageMovementAttemptedResult.getTrackId());
}
}
// Remove trackID from the attempted list, if any.
storageMovementAttemptedItems
.remove(storageMovementAttemptedResult.getTrackId());
}
// Remove trackID from results as processed above.
resultsIter.remove();
}
} }
} }

View File

@ -162,8 +162,15 @@ public void run() {
try { try {
Long blockCollectionID = storageMovementNeeded.get(); Long blockCollectionID = storageMovementNeeded.get();
if (blockCollectionID != null) { if (blockCollectionID != null) {
computeAndAssignStorageMismatchedBlocksToDNs(blockCollectionID); BlockCollection blockCollection =
this.storageMovementsMonitor.add(blockCollectionID); namesystem.getBlockCollection(blockCollectionID);
// Check blockCollectionId existence.
if (blockCollection != null) {
boolean allBlockLocsAttemptedToSatisfy =
computeAndAssignStorageMismatchedBlocksToDNs(blockCollection);
this.storageMovementsMonitor.add(blockCollectionID,
allBlockLocsAttemptedToSatisfy);
}
} }
// TODO: We can think to make this as configurable later, how frequently // TODO: We can think to make this as configurable later, how frequently
// we want to check block movements. // we want to check block movements.
@ -192,20 +199,17 @@ public void run() {
} }
} }
private void computeAndAssignStorageMismatchedBlocksToDNs( private boolean computeAndAssignStorageMismatchedBlocksToDNs(
long blockCollectionID) { BlockCollection blockCollection) {
BlockCollection blockCollection =
namesystem.getBlockCollection(blockCollectionID);
if (blockCollection == null) {
return;
}
byte existingStoragePolicyID = blockCollection.getStoragePolicyID(); byte existingStoragePolicyID = blockCollection.getStoragePolicyID();
BlockStoragePolicy existingStoragePolicy = BlockStoragePolicy existingStoragePolicy =
blockManager.getStoragePolicy(existingStoragePolicyID); blockManager.getStoragePolicy(existingStoragePolicyID);
if (!blockCollection.getLastBlock().isComplete()) { if (!blockCollection.getLastBlock().isComplete()) {
// Postpone, currently file is under construction // Postpone, currently file is under construction
// So, should we add back? or leave it to user // So, should we add back? or leave it to user
return; LOG.info("BlockCollectionID: {} file is under construction. So, postpone"
+ " this to the next retry iteration", blockCollection.getId());
return true;
} }
// First datanode will be chosen as the co-ordinator node for storage // First datanode will be chosen as the co-ordinator node for storage
@ -213,61 +217,87 @@ private void computeAndAssignStorageMismatchedBlocksToDNs(
DatanodeDescriptor coordinatorNode = null; DatanodeDescriptor coordinatorNode = null;
BlockInfo[] blocks = blockCollection.getBlocks(); BlockInfo[] blocks = blockCollection.getBlocks();
List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>(); List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>();
// True value represents that, SPS is able to find matching target nodes
// to satisfy storage type for all the blocks locations of the given
// blockCollection. A false value represents that, blockCollection needed
// retries to satisfy the storage policy for some of the block locations.
boolean foundMatchingTargetNodesForAllBlocks = true;
for (int i = 0; i < blocks.length; i++) { for (int i = 0; i < blocks.length; i++) {
BlockInfo blockInfo = blocks[i]; BlockInfo blockInfo = blocks[i];
List<StorageType> expectedStorageTypes = List<StorageType> expectedStorageTypes = existingStoragePolicy
existingStoragePolicy.chooseStorageTypes(blockInfo.getReplication()); .chooseStorageTypes(blockInfo.getReplication());
DatanodeStorageInfo[] storages = blockManager.getStorages(blockInfo); foundMatchingTargetNodesForAllBlocks |= computeBlockMovingInfos(
StorageType[] storageTypes = new StorageType[storages.length]; blockMovingInfos, blockInfo, expectedStorageTypes);
for (int j = 0; j < storages.length; j++) {
DatanodeStorageInfo datanodeStorageInfo = storages[j];
StorageType storageType = datanodeStorageInfo.getStorageType();
storageTypes[j] = storageType;
}
List<StorageType> existing =
new LinkedList<StorageType>(Arrays.asList(storageTypes));
if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
existing, true)) {
List<StorageTypeNodePair> sourceWithStorageMap =
new ArrayList<StorageTypeNodePair>();
List<DatanodeStorageInfo> existingBlockStorages =
new ArrayList<DatanodeStorageInfo>(Arrays.asList(storages));
for (StorageType existingType : existing) {
Iterator<DatanodeStorageInfo> iterator =
existingBlockStorages.iterator();
while (iterator.hasNext()) {
DatanodeStorageInfo datanodeStorageInfo = iterator.next();
StorageType storageType = datanodeStorageInfo.getStorageType();
if (storageType == existingType) {
iterator.remove();
sourceWithStorageMap.add(new StorageTypeNodePair(storageType,
datanodeStorageInfo.getDatanodeDescriptor()));
break;
}
}
}
StorageTypeNodeMap locsForExpectedStorageTypes =
findTargetsForExpectedStorageTypes(expectedStorageTypes);
BlockMovingInfo blockMovingInfo =
findSourceAndTargetToMove(blockInfo, existing, sourceWithStorageMap,
expectedStorageTypes, locsForExpectedStorageTypes);
if (coordinatorNode == null) {
// For now, first datanode will be chosen as the co-ordinator. Later
// this can be optimized if needed.
coordinatorNode =
(DatanodeDescriptor) blockMovingInfo.getSources()[0];
}
blockMovingInfos.add(blockMovingInfo);
}
} }
addBlockMovingInfosToCoordinatorDn(blockCollectionID, blockMovingInfos, assignBlockMovingInfosToCoordinatorDn(blockCollection.getId(),
coordinatorNode); blockMovingInfos, coordinatorNode);
return foundMatchingTargetNodesForAllBlocks;
} }
private void addBlockMovingInfosToCoordinatorDn(long blockCollectionID, /**
* Compute the list of block moving information corresponding to the given
* blockId. This will check that each block location of the given block is
* satisfying the expected storage policy. If block location is not satisfied
* the policy then find out the target node with the expected storage type to
* satisfy the storage policy.
*
* @param blockMovingInfos
* - list of block source and target node pair
* @param blockInfo
* - block details
* @param expectedStorageTypes
* - list of expected storage type to satisfy the storage policy
* @return false if some of the block locations failed to find target node to
* satisfy the storage policy, true otherwise
*/
private boolean computeBlockMovingInfos(
List<BlockMovingInfo> blockMovingInfos, BlockInfo blockInfo,
List<StorageType> expectedStorageTypes) {
boolean foundMatchingTargetNodesForBlock = true;
DatanodeStorageInfo[] storages = blockManager.getStorages(blockInfo);
StorageType[] storageTypes = new StorageType[storages.length];
for (int j = 0; j < storages.length; j++) {
DatanodeStorageInfo datanodeStorageInfo = storages[j];
StorageType storageType = datanodeStorageInfo.getStorageType();
storageTypes[j] = storageType;
}
List<StorageType> existing =
new LinkedList<StorageType>(Arrays.asList(storageTypes));
if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
existing, true)) {
List<StorageTypeNodePair> sourceWithStorageMap =
new ArrayList<StorageTypeNodePair>();
List<DatanodeStorageInfo> existingBlockStorages =
new ArrayList<DatanodeStorageInfo>(Arrays.asList(storages));
for (StorageType existingType : existing) {
Iterator<DatanodeStorageInfo> iterator =
existingBlockStorages.iterator();
while (iterator.hasNext()) {
DatanodeStorageInfo datanodeStorageInfo = iterator.next();
StorageType storageType = datanodeStorageInfo.getStorageType();
if (storageType == existingType) {
iterator.remove();
sourceWithStorageMap.add(new StorageTypeNodePair(storageType,
datanodeStorageInfo.getDatanodeDescriptor()));
break;
}
}
}
StorageTypeNodeMap locsForExpectedStorageTypes =
findTargetsForExpectedStorageTypes(expectedStorageTypes);
foundMatchingTargetNodesForBlock |= findSourceAndTargetToMove(
blockMovingInfos, blockInfo, existing, sourceWithStorageMap,
expectedStorageTypes, locsForExpectedStorageTypes);
}
return foundMatchingTargetNodesForBlock;
}
private void assignBlockMovingInfosToCoordinatorDn(long blockCollectionID,
List<BlockMovingInfo> blockMovingInfos, List<BlockMovingInfo> blockMovingInfos,
DatanodeDescriptor coordinatorNode) { DatanodeDescriptor coordinatorNode) {
@ -278,6 +308,11 @@ private void addBlockMovingInfosToCoordinatorDn(long blockCollectionID,
return; return;
} }
// For now, first datanode will be chosen as the co-ordinator. Later
// this can be optimized if needed.
coordinatorNode = (DatanodeDescriptor) blockMovingInfos.get(0)
.getSources()[0];
boolean needBlockStorageMovement = false; boolean needBlockStorageMovement = false;
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) { for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
// Check for atleast one block storage movement has been chosen // Check for atleast one block storage movement has been chosen
@ -301,6 +336,8 @@ private void addBlockMovingInfosToCoordinatorDn(long blockCollectionID,
* Find the good target node for each source node for which block storages was * Find the good target node for each source node for which block storages was
* misplaced. * misplaced.
* *
* @param blockMovingInfos
* - list of block source and target node pair
* @param blockInfo * @param blockInfo
* - Block * - Block
* @param existing * @param existing
@ -311,23 +348,49 @@ private void addBlockMovingInfosToCoordinatorDn(long blockCollectionID,
* - Expecting storages to move * - Expecting storages to move
* @param locsForExpectedStorageTypes * @param locsForExpectedStorageTypes
* - Available DNs for expected storage types * - Available DNs for expected storage types
* @return list of block source and target node pair * @return false if some of the block locations failed to find target node to
* satisfy the storage policy
*/ */
private BlockMovingInfo findSourceAndTargetToMove(BlockInfo blockInfo, private boolean findSourceAndTargetToMove(
List<BlockMovingInfo> blockMovingInfos, BlockInfo blockInfo,
List<StorageType> existing, List<StorageType> existing,
List<StorageTypeNodePair> sourceWithStorageList, List<StorageTypeNodePair> sourceWithStorageList,
List<StorageType> expected, List<StorageType> expected,
StorageTypeNodeMap locsForExpectedStorageTypes) { StorageTypeNodeMap locsForExpectedStorageTypes) {
boolean foundMatchingTargetNodesForBlock = true;
List<DatanodeInfo> sourceNodes = new ArrayList<>(); List<DatanodeInfo> sourceNodes = new ArrayList<>();
List<StorageType> sourceStorageTypes = new ArrayList<>(); List<StorageType> sourceStorageTypes = new ArrayList<>();
List<DatanodeInfo> targetNodes = new ArrayList<>(); List<DatanodeInfo> targetNodes = new ArrayList<>();
List<StorageType> targetStorageTypes = new ArrayList<>(); List<StorageType> targetStorageTypes = new ArrayList<>();
List<DatanodeDescriptor> chosenNodes = new ArrayList<>(); List<DatanodeDescriptor> chosenNodes = new ArrayList<>();
// Looping over all the source node locations and choose the target
// storage within same node if possible. This is done separately to
// avoid choosing a target which already has this block.
for (int i = 0; i < sourceWithStorageList.size(); i++) { for (int i = 0; i < sourceWithStorageList.size(); i++) {
StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i); StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i);
StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode( StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode(
existingTypeNodePair.dn, expected); existingTypeNodePair.dn, expected);
if (chosenTarget != null) {
sourceNodes.add(existingTypeNodePair.dn);
sourceStorageTypes.add(existingTypeNodePair.storageType);
targetNodes.add(chosenTarget.dn);
targetStorageTypes.add(chosenTarget.storageType);
chosenNodes.add(chosenTarget.dn);
// TODO: We can increment scheduled block count for this node?
}
}
// Looping over all the source node locations. Choose a remote target
// storage node if it was not found out within same node.
for (int i = 0; i < sourceWithStorageList.size(); i++) {
StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i);
StorageTypeNodePair chosenTarget = null;
// Chosen the target storage within same datanode. So just skipping this
// source node.
if (sourceNodes.contains(existingTypeNodePair.dn)) {
continue;
}
if (chosenTarget == null && blockManager.getDatanodeManager() if (chosenTarget == null && blockManager.getDatanodeManager()
.getNetworkTopology().isNodeGroupAware()) { .getNetworkTopology().isNodeGroupAware()) {
chosenTarget = chooseTarget(blockInfo, existingTypeNodePair.dn, chosenTarget = chooseTarget(blockInfo, existingTypeNodePair.dn,
@ -359,18 +422,40 @@ private BlockMovingInfo findSourceAndTargetToMove(BlockInfo blockInfo,
"Failed to choose target datanode for the required" "Failed to choose target datanode for the required"
+ " storage types {}, block:{}, existing storage type:{}", + " storage types {}, block:{}, existing storage type:{}",
expected, blockInfo, existingTypeNodePair.storageType); expected, blockInfo, existingTypeNodePair.storageType);
sourceNodes.add(existingTypeNodePair.dn); foundMatchingTargetNodesForBlock = false;
sourceStorageTypes.add(existingTypeNodePair.storageType);
// Imp: Not setting the target details, empty targets. Later, this is
// used as an indicator for retrying this block movement.
} }
} }
BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blockInfo,
blockMovingInfos.addAll(getBlockMovingInfos(blockInfo, sourceNodes,
sourceStorageTypes, targetNodes, targetStorageTypes));
return foundMatchingTargetNodesForBlock;
}
private List<BlockMovingInfo> getBlockMovingInfos(BlockInfo blockInfo,
List<DatanodeInfo> sourceNodes, List<StorageType> sourceStorageTypes,
List<DatanodeInfo> targetNodes, List<StorageType> targetStorageTypes) {
List<BlockMovingInfo> blkMovingInfos = new ArrayList<>();
// No source-target node pair exists.
if (sourceNodes.size() <= 0) {
return blkMovingInfos;
}
buildBlockMovingInfos(blockInfo, sourceNodes, sourceStorageTypes,
targetNodes, targetStorageTypes, blkMovingInfos);
return blkMovingInfos;
}
private void buildBlockMovingInfos(BlockInfo blockInfo,
List<DatanodeInfo> sourceNodes, List<StorageType> sourceStorageTypes,
List<DatanodeInfo> targetNodes, List<StorageType> targetStorageTypes,
List<BlockMovingInfo> blkMovingInfos) {
Block blk = new Block(blockInfo.getBlockId(), blockInfo.getNumBytes(),
blockInfo.getGenerationStamp());
BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk,
sourceNodes.toArray(new DatanodeInfo[sourceNodes.size()]), sourceNodes.toArray(new DatanodeInfo[sourceNodes.size()]),
targetNodes.toArray(new DatanodeInfo[targetNodes.size()]), targetNodes.toArray(new DatanodeInfo[targetNodes.size()]),
sourceStorageTypes.toArray(new StorageType[sourceStorageTypes.size()]), sourceStorageTypes.toArray(new StorageType[sourceStorageTypes.size()]),
targetStorageTypes.toArray(new StorageType[targetStorageTypes.size()])); targetStorageTypes.toArray(new StorageType[targetStorageTypes.size()]));
return blkMovingInfo; blkMovingInfos.add(blkMovingInfo);
} }
/** /**

View File

@ -33,13 +33,13 @@ public class TestBlockStorageMovementAttemptedItems {
private BlockStorageMovementAttemptedItems bsmAttemptedItems = null; private BlockStorageMovementAttemptedItems bsmAttemptedItems = null;
private BlockStorageMovementNeeded unsatisfiedStorageMovementFiles = null; private BlockStorageMovementNeeded unsatisfiedStorageMovementFiles = null;
private final int selfRetryTimeout = 500;
@Before @Before
public void setup() { public void setup() {
unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(); unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded();
bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100, 500, bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100,
unsatisfiedStorageMovementFiles); selfRetryTimeout, unsatisfiedStorageMovementFiles);
bsmAttemptedItems.start();
} }
@After @After
@ -72,8 +72,9 @@ private boolean checkItemMovedForRetry(Long item, long retryTimeout)
@Test(timeout = 30000) @Test(timeout = 30000)
public void testAddResultWithFailureResult() throws Exception { public void testAddResultWithFailureResult() throws Exception {
bsmAttemptedItems.start(); // start block movement result monitor thread
Long item = new Long(1234); Long item = new Long(1234);
bsmAttemptedItems.add(item); bsmAttemptedItems.add(item, true);
bsmAttemptedItems.addResults( bsmAttemptedItems.addResults(
new BlocksStorageMovementResult[]{new BlocksStorageMovementResult( new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
item.longValue(), BlocksStorageMovementResult.Status.FAILURE)}); item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});
@ -82,8 +83,9 @@ public void testAddResultWithFailureResult() throws Exception {
@Test(timeout = 30000) @Test(timeout = 30000)
public void testAddResultWithSucessResult() throws Exception { public void testAddResultWithSucessResult() throws Exception {
bsmAttemptedItems.start(); // start block movement result monitor thread
Long item = new Long(1234); Long item = new Long(1234);
bsmAttemptedItems.add(item); bsmAttemptedItems.add(item, true);
bsmAttemptedItems.addResults( bsmAttemptedItems.addResults(
new BlocksStorageMovementResult[]{new BlocksStorageMovementResult( new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)}); item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
@ -92,10 +94,93 @@ public void testAddResultWithSucessResult() throws Exception {
@Test(timeout = 30000) @Test(timeout = 30000)
public void testNoResultAdded() throws Exception { public void testNoResultAdded() throws Exception {
bsmAttemptedItems.start(); // start block movement result monitor thread
Long item = new Long(1234); Long item = new Long(1234);
bsmAttemptedItems.add(item); bsmAttemptedItems.add(item, true);
// After selfretry timeout, it should be added back for retry // After self retry timeout, it should be added back for retry
assertTrue(checkItemMovedForRetry(item, 600)); assertTrue("Failed to add to the retry list",
checkItemMovedForRetry(item, 600));
assertEquals("Failed to remove from the attempted list", 0,
bsmAttemptedItems.getAttemptedItemsCount());
} }
/**
* Partial block movement with BlocksStorageMovementResult#SUCCESS. Here,
* first occurrence is #blockStorageMovementResultCheck() and then
* #blocksStorageMovementUnReportedItemsCheck().
*/
@Test(timeout = 30000)
public void testPartialBlockMovementShouldBeRetried1() throws Exception {
Long item = new Long(1234);
bsmAttemptedItems.add(item, false);
bsmAttemptedItems.addResults(
new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
// start block movement result monitor thread
bsmAttemptedItems.start();
assertTrue("Failed to add to the retry list",
checkItemMovedForRetry(item, 5000));
assertEquals("Failed to remove from the attempted list", 0,
bsmAttemptedItems.getAttemptedItemsCount());
}
/**
* Partial block movement with BlocksStorageMovementResult#SUCCESS. Here,
* first occurrence is #blocksStorageMovementUnReportedItemsCheck() and then
* #blockStorageMovementResultCheck().
*/
@Test(timeout = 30000)
public void testPartialBlockMovementShouldBeRetried2() throws Exception {
Long item = new Long(1234);
bsmAttemptedItems.add(item, false);
bsmAttemptedItems.addResults(
new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
Thread.sleep(selfRetryTimeout * 2); // Waiting to get timed out
bsmAttemptedItems.blocksStorageMovementUnReportedItemsCheck();
bsmAttemptedItems.blockStorageMovementResultCheck();
assertTrue("Failed to add to the retry list",
checkItemMovedForRetry(item, 5000));
assertEquals("Failed to remove from the attempted list", 0,
bsmAttemptedItems.getAttemptedItemsCount());
}
/**
* Partial block movement with only BlocksStorageMovementResult#FAILURE result
* and storageMovementAttemptedItems list is empty.
*/
@Test(timeout = 30000)
public void testPartialBlockMovementShouldBeRetried3() throws Exception {
Long item = new Long(1234);
bsmAttemptedItems.addResults(
new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});
bsmAttemptedItems.blockStorageMovementResultCheck();
assertTrue("Failed to add to the retry list",
checkItemMovedForRetry(item, 5000));
assertEquals("Failed to remove from the attempted list", 0,
bsmAttemptedItems.getAttemptedItemsCount());
}
/**
* Partial block movement with BlocksStorageMovementResult#FAILURE result and
* storageMovementAttemptedItems.
*/
@Test(timeout = 30000)
public void testPartialBlockMovementShouldBeRetried4() throws Exception {
Long item = new Long(1234);
bsmAttemptedItems.add(item, false);
bsmAttemptedItems.addResults(
new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});
bsmAttemptedItems.blockStorageMovementResultCheck();
assertTrue("Failed to add to the retry list",
checkItemMovedForRetry(item, 5000));
assertEquals("Failed to remove from the attempted list", 0,
bsmAttemptedItems.getAttemptedItemsCount());
}
} }

View File

@ -527,6 +527,59 @@ public void testMoveWithBlockPinning() throws Exception {
waitExpectedStorageType(file1, StorageType.DISK, 2, 30000); waitExpectedStorageType(file1, StorageType.DISK, 2, 30000);
} }
/**
* Tests to verify that for the given path, only few of the blocks or block
* src locations(src nodes) under the given path will be scheduled for block
* movement.
*
* For example, there are two block for a file:
*
* File1 => two blocks and default storage policy(HOT).
* blk_1[locations=A(DISK),B(DISK),C(DISK),D(DISK),E(DISK)],
* blk_2[locations=A(DISK),B(DISK),C(DISK),D(DISK),E(DISK)].
*
* Now, set storage policy to COLD.
* Only two Dns are available with expected storage type ARCHIVE, say A, E.
*
* SPS will schedule block movement to the coordinator node with the details,
* blk_1[move A(DISK) -> A(ARCHIVE), move E(DISK) -> E(ARCHIVE)],
* blk_2[move A(DISK) -> A(ARCHIVE), move E(DISK) -> E(ARCHIVE)].
*/
@Test(timeout = 300000)
public void testWhenOnlyFewSourceNodesHaveMatchingTargetNodes()
throws Exception {
try {
int numOfDns = 5;
config.setLong("dfs.block.size", 1024);
allDiskTypes =
new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.ARCHIVE}};
hdfsCluster = startCluster(config, allDiskTypes, numOfDns,
storagesPerDatanode, capacity);
dfs = hdfsCluster.getFileSystem();
writeContent(file, (short) 5);
// Change policy to COLD
dfs.setStoragePolicy(new Path(file), "COLD");
FSNamesystem namesystem = hdfsCluster.getNamesystem();
INode inode = namesystem.getFSDirectory().getINode(file);
namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
hdfsCluster.triggerHeartbeats();
// Wait till StorgePolicySatisfier identified that block to move to
// ARCHIVE area.
waitExpectedStorageType(file, StorageType.ARCHIVE, 2, 30000);
waitExpectedStorageType(file, StorageType.DISK, 3, 30000);
waitForBlocksMovementResult(1, 30000);
} finally {
shutdownCluster();
}
}
private String createFileAndSimulateFavoredNodes(int favoredNodesCount) private String createFileAndSimulateFavoredNodes(int favoredNodesCount)
throws IOException { throws IOException {
ArrayList<DataNode> dns = hdfsCluster.getDataNodes(); ArrayList<DataNode> dns = hdfsCluster.getDataNodes();
@ -561,7 +614,7 @@ private String createFileAndSimulateFavoredNodes(int favoredNodesCount)
DataNodeTestUtils.mockDatanodeBlkPinning(dn, true); DataNodeTestUtils.mockDatanodeBlkPinning(dn, true);
favoredNodesCount--; favoredNodesCount--;
if (favoredNodesCount <= 0) { if (favoredNodesCount <= 0) {
break;// marked favoredNodesCount number of pinned block location break; // marked favoredNodesCount number of pinned block location
} }
} }
return file1; return file1;
@ -600,8 +653,14 @@ public Boolean get() {
} }
private void writeContent(final String fileName) throws IOException { private void writeContent(final String fileName) throws IOException {
writeContent(fileName, (short) 3);
}
private void writeContent(final String fileName, short replicatonFactor)
throws IOException {
// write to DISK // write to DISK
final FSDataOutputStream out = dfs.create(new Path(fileName)); final FSDataOutputStream out = dfs.create(new Path(fileName),
replicatonFactor);
for (int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
out.writeChars("t"); out.writeChars("t");
} }