HDFS-11151. [SPS]: StoragePolicySatisfier should gracefully handle when there is no target node with the required storage type. Contributed by Rakesh R
This commit is contained in:
parent
b67ae6d9d7
commit
b07291e176
|
@ -4999,7 +4999,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
*/
|
||||
public void satisfyStoragePolicy(long id) {
|
||||
storageMovementNeeded.add(id);
|
||||
if(LOG.isDebugEnabled()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Added block collection id {} to block "
|
||||
+ "storageMovementNeeded queue", id);
|
||||
}
|
||||
|
|
|
@ -27,8 +27,9 @@ import java.util.concurrent.Future;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsCompletionHandler;
|
||||
import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult;
|
||||
import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementStatus;
|
||||
import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsCompletionHandler;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -108,15 +109,32 @@ public class BlockStorageMovementTracker implements Runnable {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark as block movement failure for the given trackId and blockId.
|
||||
*
|
||||
* @param trackId tracking id
|
||||
* @param blockId block id
|
||||
*/
|
||||
void markBlockMovementFailure(long trackId, long blockId) {
|
||||
LOG.debug("Mark as block movement failure for the given "
|
||||
+ "trackId:{} and blockId:{}", trackId, blockId);
|
||||
BlockMovementResult result = new BlockMovementResult(trackId, blockId, null,
|
||||
BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE);
|
||||
addMovementResultToTrackIdList(result);
|
||||
}
|
||||
|
||||
private List<BlockMovementResult> addMovementResultToTrackIdList(
|
||||
BlockMovementResult result) {
|
||||
long trackId = result.getTrackId();
|
||||
List<BlockMovementResult> perTrackIdList = movementResults.get(trackId);
|
||||
if (perTrackIdList == null) {
|
||||
perTrackIdList = new ArrayList<>();
|
||||
movementResults.put(trackId, perTrackIdList);
|
||||
List<BlockMovementResult> perTrackIdList;
|
||||
synchronized (movementResults) {
|
||||
perTrackIdList = movementResults.get(trackId);
|
||||
if (perTrackIdList == null) {
|
||||
perTrackIdList = new ArrayList<>();
|
||||
movementResults.put(trackId, perTrackIdList);
|
||||
}
|
||||
perTrackIdList.add(result);
|
||||
}
|
||||
perTrackIdList.add(result);
|
||||
return perTrackIdList;
|
||||
}
|
||||
|
||||
|
|
|
@ -151,14 +151,24 @@ public class StoragePolicySatisfyWorker {
|
|||
*/
|
||||
public void processBlockMovingTasks(long trackID, String blockPoolID,
|
||||
Collection<BlockMovingInfo> blockMovingInfos) {
|
||||
LOG.debug("Received BlockMovingTasks {}", blockMovingInfos);
|
||||
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
|
||||
assert blkMovingInfo
|
||||
.getSources().length == blkMovingInfo.getTargets().length;
|
||||
|
||||
for (int i = 0; i < blkMovingInfo.getSources().length; i++) {
|
||||
// Iterating backwards. This is to ensure that all the block src location
|
||||
// which doesn't have a target node will be marked as failure before
|
||||
// scheduling the block movement to valid target nodes.
|
||||
for (int i = blkMovingInfo.getSources().length - 1; i >= 0; i--) {
|
||||
if (i >= blkMovingInfo.getTargets().length) {
|
||||
// Since there is no target selected for scheduling the block,
|
||||
// just mark this block storage movement as failure. Later, namenode
|
||||
// can take action on this.
|
||||
movementTracker.markBlockMovementFailure(trackID,
|
||||
blkMovingInfo.getBlock().getBlockId());
|
||||
continue;
|
||||
}
|
||||
DatanodeInfo target = blkMovingInfo.getTargets()[i];
|
||||
BlockMovingTask blockMovingTask = new BlockMovingTask(
|
||||
trackID, blockPoolID, blkMovingInfo.getBlock(),
|
||||
blkMovingInfo.getSources()[i], blkMovingInfo.getTargets()[i],
|
||||
blkMovingInfo.getSources()[i], target,
|
||||
blkMovingInfo.getSourceStorageTypes()[i],
|
||||
blkMovingInfo.getTargetStorageTypes()[i]);
|
||||
Future<BlockMovementResult> moveCallable = moverCompletionService
|
||||
|
|
|
@ -218,4 +218,8 @@ public class BlockStorageMovementAttemptedItems {
|
|||
return storageMovementAttemptedResults.size();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public int getAttemptedItemsCount() {
|
||||
return storageMovementAttemptedItems.size();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -211,6 +211,14 @@ public class StoragePolicySatisfier implements Runnable {
|
|||
}
|
||||
}
|
||||
|
||||
addBlockMovingInfosToCoordinatorDn(blockCollectionID, blockMovingInfos,
|
||||
coordinatorNode);
|
||||
}
|
||||
|
||||
private void addBlockMovingInfosToCoordinatorDn(long blockCollectionID,
|
||||
List<BlockMovingInfo> blockMovingInfos,
|
||||
DatanodeDescriptor coordinatorNode) {
|
||||
|
||||
if (blockMovingInfos.size() < 1) {
|
||||
// TODO: Major: handle this case. I think we need retry cases to
|
||||
// be implemented. Idea is, if some files are not getting storage movement
|
||||
|
@ -218,6 +226,20 @@ public class StoragePolicySatisfier implements Runnable {
|
|||
return;
|
||||
}
|
||||
|
||||
boolean needBlockStorageMovement = false;
|
||||
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
|
||||
// Check for atleast one block storage movement has been chosen
|
||||
if (blkMovingInfo.getTargets().length > 0){
|
||||
needBlockStorageMovement = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!needBlockStorageMovement) {
|
||||
// Simply return as there is no targets selected for scheduling the block
|
||||
// movement.
|
||||
return;
|
||||
}
|
||||
|
||||
// 'BlockCollectionId' is used as the tracking ID. All the blocks under this
|
||||
// blockCollectionID will be added to this datanode.
|
||||
coordinatorNode.addBlocksToMoveStorage(blockCollectionID, blockMovingInfos);
|
||||
|
@ -251,9 +273,8 @@ public class StoragePolicySatisfier implements Runnable {
|
|||
List<DatanodeDescriptor> chosenNodes = new ArrayList<>();
|
||||
for (int i = 0; i < sourceWithStorageList.size(); i++) {
|
||||
StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i);
|
||||
StorageTypeNodePair chosenTarget =
|
||||
chooseTargetTypeInSameNode(existingTypeNodePair.dn, expected,
|
||||
locsForExpectedStorageTypes, chosenNodes);
|
||||
StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode(
|
||||
existingTypeNodePair.dn, expected);
|
||||
|
||||
if (chosenTarget == null && blockManager.getDatanodeManager()
|
||||
.getNetworkTopology().isNodeGroupAware()) {
|
||||
|
@ -282,15 +303,14 @@ public class StoragePolicySatisfier implements Runnable {
|
|||
chosenNodes.add(chosenTarget.dn);
|
||||
// TODO: We can increment scheduled block count for this node?
|
||||
} else {
|
||||
// TODO: Failed to ChooseTargetNodes...So let just retry. Shall we
|
||||
// proceed without this targets? Then what should be final result?
|
||||
// How about pack empty target, means target node could not be chosen ,
|
||||
// so result should be RETRY_REQUIRED from DN always.
|
||||
// Log..unable to choose target node for source datanodeDescriptor
|
||||
LOG.warn(
|
||||
"Failed to choose target datanode for the required"
|
||||
+ " storage types {}, block:{}, existing storage type:{}",
|
||||
expected, blockInfo, existingTypeNodePair.storageType);
|
||||
sourceNodes.add(existingTypeNodePair.dn);
|
||||
sourceStorageTypes.add(existingTypeNodePair.storageType);
|
||||
targetNodes.add(null);
|
||||
targetStorageTypes.add(null);
|
||||
// Imp: Not setting the target details, empty targets. Later, this is
|
||||
// used as an indicator for retrying this block movement.
|
||||
}
|
||||
}
|
||||
BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blockInfo,
|
||||
|
@ -302,15 +322,13 @@ public class StoragePolicySatisfier implements Runnable {
|
|||
}
|
||||
|
||||
/**
|
||||
* Choose the target storage within same Datanode if possible.
|
||||
* Choose the target storage within same datanode if possible.
|
||||
*
|
||||
* @param locsForExpectedStorageTypes
|
||||
* @param chosenNodes
|
||||
* @param source source datanode
|
||||
* @param targetTypes list of target storage types
|
||||
*/
|
||||
private StorageTypeNodePair chooseTargetTypeInSameNode(
|
||||
DatanodeDescriptor source, List<StorageType> targetTypes,
|
||||
StorageTypeNodeMap locsForExpectedStorageTypes,
|
||||
List<DatanodeDescriptor> chosenNodes) {
|
||||
DatanodeDescriptor source, List<StorageType> targetTypes) {
|
||||
for (StorageType t : targetTypes) {
|
||||
DatanodeStorageInfo chooseStorage4Block =
|
||||
source.chooseStorage4Block(t, 0);
|
||||
|
@ -328,6 +346,9 @@ public class StoragePolicySatisfier implements Runnable {
|
|||
for (StorageType t : targetTypes) {
|
||||
List<DatanodeDescriptor> nodesWithStorages =
|
||||
locsForExpectedStorageTypes.getNodesWithStorages(t);
|
||||
if (nodesWithStorages == null || nodesWithStorages.isEmpty()) {
|
||||
continue; // no target nodes with the required storage type.
|
||||
}
|
||||
Collections.shuffle(nodesWithStorages);
|
||||
for (DatanodeDescriptor target : nodesWithStorages) {
|
||||
if (!chosenNodes.contains(target) && matcher.match(
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -44,8 +46,6 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import com.google.common.base.Supplier;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
|
||||
|
||||
/**
|
||||
* Tests that StoragePolicySatisfier daemon is able to check the blocks to be
|
||||
* moved and finding its suggested target locations to move.
|
||||
|
@ -79,7 +79,7 @@ public class TestStoragePolicySatisfier {
|
|||
throws Exception {
|
||||
|
||||
try {
|
||||
// Change policy to ALL_SSD
|
||||
// Change policy to COLD
|
||||
dfs.setStoragePolicy(new Path(file), "COLD");
|
||||
FSNamesystem namesystem = hdfsCluster.getNamesystem();
|
||||
INode inode = namesystem.getFSDirectory().getINode(file);
|
||||
|
@ -356,6 +356,108 @@ public class TestStoragePolicySatisfier {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests to verify that for the given path, some of the blocks or block src
|
||||
* locations(src nodes) under the given path will be scheduled for block
|
||||
* movement.
|
||||
*
|
||||
* For example, there are two block for a file:
|
||||
*
|
||||
* File1 => blk_1[locations=A(DISK),B(DISK),C(DISK)],
|
||||
* blk_2[locations=A(DISK),B(DISK),C(DISK)]. Now, set storage policy to COLD.
|
||||
* Only one datanode is available with storage type ARCHIVE, say D.
|
||||
*
|
||||
* SPS will schedule block movement to the coordinator node with the details,
|
||||
* blk_1[move A(DISK) -> D(ARCHIVE)], blk_2[move A(DISK) -> D(ARCHIVE)].
|
||||
*/
|
||||
@Test(timeout = 300000)
|
||||
public void testWhenOnlyFewTargetDatanodeAreAvailableToSatisfyStoragePolicy()
|
||||
throws Exception {
|
||||
try {
|
||||
// Change policy to COLD
|
||||
dfs.setStoragePolicy(new Path(file), "COLD");
|
||||
FSNamesystem namesystem = hdfsCluster.getNamesystem();
|
||||
INode inode = namesystem.getFSDirectory().getINode(file);
|
||||
|
||||
StorageType[][] newtypes =
|
||||
new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE}};
|
||||
|
||||
// Adding ARCHIVE based datanodes.
|
||||
startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
|
||||
storagesPerDatanode, capacity, hdfsCluster);
|
||||
|
||||
namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
|
||||
hdfsCluster.triggerHeartbeats();
|
||||
// Wait till StorgePolicySatisfier identified that block to move to
|
||||
// ARCHIVE area.
|
||||
waitExpectedStorageType(file, StorageType.ARCHIVE, 1, 30000);
|
||||
waitExpectedStorageType(file, StorageType.DISK, 2, 30000);
|
||||
|
||||
waitForBlocksMovementResult(1, 30000);
|
||||
} finally {
|
||||
hdfsCluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests to verify that for the given path, no blocks or block src
|
||||
* locations(src nodes) under the given path will be scheduled for block
|
||||
* movement as there are no available datanode with required storage type.
|
||||
*
|
||||
* For example, there are two block for a file:
|
||||
*
|
||||
* File1 => blk_1[locations=A(DISK),B(DISK),C(DISK)],
|
||||
* blk_2[locations=A(DISK),B(DISK),C(DISK)]. Now, set storage policy to COLD.
|
||||
* No datanode is available with storage type ARCHIVE.
|
||||
*
|
||||
* SPS won't schedule any block movement for this path.
|
||||
*/
|
||||
@Test(timeout = 300000)
|
||||
public void testWhenNoTargetDatanodeToSatisfyStoragePolicy()
|
||||
throws Exception {
|
||||
try {
|
||||
// Change policy to COLD
|
||||
dfs.setStoragePolicy(new Path(file), "COLD");
|
||||
FSNamesystem namesystem = hdfsCluster.getNamesystem();
|
||||
INode inode = namesystem.getFSDirectory().getINode(file);
|
||||
|
||||
StorageType[][] newtypes =
|
||||
new StorageType[][]{{StorageType.DISK, StorageType.DISK}};
|
||||
// Adding DISK based datanodes
|
||||
startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
|
||||
storagesPerDatanode, capacity, hdfsCluster);
|
||||
|
||||
namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
|
||||
hdfsCluster.triggerHeartbeats();
|
||||
|
||||
// No block movement will be scheduled as there is no target node available
|
||||
// with the required storage type.
|
||||
waitForAttemptedItems(1, 30000);
|
||||
waitExpectedStorageType(file, StorageType.DISK, 3, 30000);
|
||||
// Since there is no target node the item will get timed out and then
|
||||
// re-attempted.
|
||||
waitForAttemptedItems(1, 30000);
|
||||
} finally {
|
||||
hdfsCluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
private void waitForAttemptedItems(long expectedBlkMovAttemptedCount,
|
||||
int timeout) throws TimeoutException, InterruptedException {
|
||||
BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
|
||||
final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}",
|
||||
expectedBlkMovAttemptedCount,
|
||||
sps.getAttemptedItemsMonitor().getAttemptedItemsCount());
|
||||
return sps.getAttemptedItemsMonitor()
|
||||
.getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
|
||||
}
|
||||
}, 100, timeout);
|
||||
}
|
||||
|
||||
private void waitForBlocksMovementResult(long expectedBlkMovResultsCount,
|
||||
int timeout) throws TimeoutException, InterruptedException {
|
||||
BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
|
||||
|
|
Loading…
Reference in New Issue