diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index ac013487a9b..74cd90700e8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -54,6 +54,8 @@
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowPeerReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
@@ -98,6 +100,8 @@
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
@@ -469,6 +473,8 @@ public static DatanodeCommand convert(DatanodeCommandProto proto) {
       return PBHelper.convert(proto.getBlkIdCmd());
     case BlockECReconstructionCommand:
       return PBHelper.convert(proto.getBlkECReconstructionCmd());
+    case BlockStorageMovementCommand:
+      return PBHelper.convert(proto.getBlkStorageMovementCmd());
     default:
       return null;
     }
@@ -603,6 +609,11 @@ public static DatanodeCommandProto convert(DatanodeCommand datanodeCommand) {
           .setBlkECReconstructionCmd(
               convert((BlockECReconstructionCommand) datanodeCommand));
       break;
+    case DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT:
+      builder.setCmdType(DatanodeCommandProto.Type.BlockStorageMovementCommand)
+          .setBlkStorageMovementCmd(
+              convert((BlockStorageMovementCommand) datanodeCommand));
+      break;
     case DatanodeProtocol.DNA_UNKNOWN: //Not expected
     default:
       builder.setCmdType(DatanodeCommandProto.Type.NullDatanodeCommand);
@@ -1124,4 +1135,83 @@ public static KeyValueProto convert(FileRegion fileRegion) {
 
     return new FileRegion(block, providedStorageLocation);
   }
+
+  private static BlockStorageMovementCommandProto convert(
+      BlockStorageMovementCommand blkStorageMovementCmd) {
+    BlockStorageMovementCommandProto.Builder builder =
+        BlockStorageMovementCommandProto.newBuilder();
+
+    builder.setTrackID(blkStorageMovementCmd.getTrackID());
+    builder.setBlockPoolId(blkStorageMovementCmd.getBlockPoolId());
+    Collection<BlockMovingInfo> blockMovingInfos = blkStorageMovementCmd
+        .getBlockMovingTasks();
+    for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
+      builder.addBlockStorageMovement(
+          convertBlockMovingInfo(blkMovingInfo));
+    }
+    return builder.build();
+  }
+
+  private static BlockStorageMovementProto convertBlockMovingInfo(
+      BlockMovingInfo blkMovingInfo) {
+    BlockStorageMovementProto.Builder builder = BlockStorageMovementProto
+        .newBuilder();
+    builder.setBlock(PBHelperClient.convert(blkMovingInfo.getBlock()));
+
+    DatanodeInfo[] sourceDnInfos = blkMovingInfo.getSources();
+    builder.setSourceDnInfos(convertToDnInfosProto(sourceDnInfos));
+
+    DatanodeInfo[] targetDnInfos = blkMovingInfo.getTargets();
+    builder.setTargetDnInfos(convertToDnInfosProto(targetDnInfos));
+
+    StorageType[] sourceStorageTypes = blkMovingInfo.getSourceStorageTypes();
+    builder.setSourceStorageTypes(convertStorageTypesProto(sourceStorageTypes));
+
+    StorageType[] targetStorageTypes = blkMovingInfo.getTargetStorageTypes();
+    builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes));
+
+    return builder.build();
+  }
+
+  private static DatanodeCommand convert(
+      BlockStorageMovementCommandProto blkStorageMovementCmdProto) {
+    Collection<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
+    List<BlockStorageMovementProto> blkSPSatisfyList =
+        blkStorageMovementCmdProto.getBlockStorageMovementList();
+    for (BlockStorageMovementProto blkSPSatisfy : blkSPSatisfyList) {
+      blockMovingInfos.add(convertBlockMovingInfo(blkSPSatisfy));
+    }
+    return new BlockStorageMovementCommand(
+        DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT,
+        blkStorageMovementCmdProto.getTrackID(),
+        blkStorageMovementCmdProto.getBlockPoolId(), blockMovingInfos);
+  }
+
+  private static BlockMovingInfo convertBlockMovingInfo(
+      BlockStorageMovementProto blockStoragePolicySatisfyProto) {
+    BlockProto blockProto = blockStoragePolicySatisfyProto.getBlock();
+    Block block = PBHelperClient.convert(blockProto);
+
+    DatanodeInfosProto sourceDnInfosProto = blockStoragePolicySatisfyProto
+        .getSourceDnInfos();
+    DatanodeInfo[] sourceDnInfos = PBHelperClient.convert(sourceDnInfosProto);
+
+    DatanodeInfosProto targetDnInfosProto = blockStoragePolicySatisfyProto
+        .getTargetDnInfos();
+    DatanodeInfo[] targetDnInfos = PBHelperClient.convert(targetDnInfosProto);
+
+    StorageTypesProto srcStorageTypesProto = blockStoragePolicySatisfyProto
+        .getSourceStorageTypes();
+    StorageType[] srcStorageTypes = PBHelperClient.convertStorageTypes(
+        srcStorageTypesProto.getStorageTypesList(),
+        srcStorageTypesProto.getStorageTypesList().size());
+
+    StorageTypesProto targetStorageTypesProto = blockStoragePolicySatisfyProto
+        .getTargetStorageTypes();
+    StorageType[] targetStorageTypes = PBHelperClient.convertStorageTypes(
+        targetStorageTypesProto.getStorageTypesList(),
+        targetStorageTypesProto.getStorageTypesList().size());
+    return new BlockMovingInfo(block, sourceDnInfos, targetDnInfos,
+        srcStorageTypes, targetStorageTypes);
+  }
 }
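For illustration only (not part of the patch): a minimal, untested sketch of the round trip the new helpers enable, using the two public PBHelper.convert overloads touched above. The block values, pool id "BP-1", and trackID -1 are made up, and the datanode arrays are left empty for brevity (the real command carries populated source/target lists).

import java.util.Arrays;

import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;

public class BlockStorageMovementRoundTripSketch {
  public static void main(String[] args) {
    // One block to be moved from DISK to ARCHIVE; datanode arrays omitted.
    BlockMovingInfo info = new BlockMovingInfo(
        new Block(1024L, 100L, 1L), // blockId, numBytes, genStamp
        new DatanodeInfo[0], new DatanodeInfo[0],
        new StorageType[] {StorageType.DISK},
        new StorageType[] {StorageType.ARCHIVE});

    BlockStorageMovementCommand cmd = new BlockStorageMovementCommand(
        DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT, -1L, "BP-1",
        Arrays.asList(info));

    // NameNode side: command -> wire form; DataNode side: wire form -> command.
    DatanodeCommandProto proto = PBHelper.convert(cmd);
    BlockStorageMovementCommand decoded =
        (BlockStorageMovementCommand) PBHelper.convert(proto);

    // Sanity checks (run with -ea).
    assert decoded.getTrackID() == cmd.getTrackID();
    assert decoded.getBlockMovingTasks().size() == 1;
  }
}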
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index bff74240b54..951837e4e49 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -1089,19 +1089,4 @@ public void addBlocksToMoveStorage(
   public List<BlockMovingInfo> getBlocksToMoveStorages() {
     return storageMovementBlocks.poll();
   }
-
-  // TODO: we will remove this method once DN side handling integrated. We can
-  // convert the test to check real block movements instead of this ds.
-  @VisibleForTesting
-  public List<BlockMovingInfo> getStorageMovementPendingItems() {
-    List<BlockMovingInfo> flatList = new ArrayList<>();
-    Iterator<List<BlockMovingInfo>> iterator = storageMovementBlocks
-        .iterator();
-    while (iterator.hasNext()) {
-      List<BlockMovingInfo> next = iterator.next();
-      flatList.addAll(next);
-    }
-    return flatList;
-  }
 }
-
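For illustration only (not part of the patch): the producer/consumer hand-off behind addBlocksToMoveStorage() and getBlocksToMoveStorages(), reduced to a self-contained sketch. The element type is simplified to String here (the real queue holds List<BlockMovingInfo> batches), and ConcurrentLinkedQueue stands in for whatever concrete queue DatanodeDescriptor uses.

import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class MovementQueueSketch {
  // Mirrors DatanodeDescriptor#storageMovementBlocks: one entry per batch of
  // block moves assigned to this datanode.
  private final Queue<List<String>> storageMovementBlocks =
      new ConcurrentLinkedQueue<>();

  // SPS side: enqueue a batch of moves for this datanode.
  public void addBlocksToMoveStorage(List<String> blockMovingInfos) {
    storageMovementBlocks.offer(blockMovingInfos);
  }

  // Heartbeat side: drain one batch, or null when nothing is pending
  // (DatanodeManager#handleHeartbeat only builds a command on non-null).
  public List<String> getBlocksToMoveStorages() {
    return storageMovementBlocks.poll();
  }

  public static void main(String[] args) {
    MovementQueueSketch q = new MovementQueueSketch();
    q.addBlocksToMoveStorage(Arrays.asList("blk_1 -> ARCHIVE"));
    System.out.println(q.getBlocksToMoveStorages()); // [blk_1 -> ARCHIVE]
    System.out.println(q.getBlocksToMoveStorages()); // null, queue drained
  }
}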
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 698205ac472..65c5d6e5655 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1739,6 +1739,19 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
       }
     }
 
+    // Check pending block storage movement tasks.
+    List<BlockMovingInfo> pendingBlockMovementList = nodeinfo
+        .getBlocksToMoveStorages();
+    if (pendingBlockMovementList != null) {
+      // TODO: trackID is used to track the block movements sent to the
+      // coordinator datanode. The tracking logic is yet to be implemented;
+      // temporarily using the constant value -1.
+      long trackID = -1;
+      cmds.add(new BlockStorageMovementCommand(
+          DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT, trackID, blockPoolId,
+          pendingBlockMovementList));
+    }
+
     if (!cmds.isEmpty()) {
       return cmds.toArray(new DatanodeCommand[cmds.size()]);
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index a25f6a92d82..22d88b59476 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -795,6 +795,13 @@ assert getBlockPoolId().equals(bp) :
           ((BlockECReconstructionCommand) cmd).getECTasks();
       dn.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
       break;
+    case DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT:
+      LOG.info("DatanodeCommand action: DNA_BLOCK_STORAGE_MOVEMENT");
+      BlockStorageMovementCommand blkSPSCmd = (BlockStorageMovementCommand) cmd;
+      dn.getStoragePolicySatisfyWorker().processBlockMovingTasks(
+          blkSPSCmd.getTrackID(), blkSPSCmd.getBlockPoolId(),
+          blkSPSCmd.getBlockMovingTasks());
+      break;
     default:
       LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
     }
@@ -825,6 +832,7 @@ private boolean processCommandFromStandby(DatanodeCommand cmd,
     case DatanodeProtocol.DNA_CACHE:
     case DatanodeProtocol.DNA_UNCACHE:
     case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION:
+    case DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT:
      LOG.warn("Got a command from standby NN - ignoring command:" + cmd.getAction());
      break;
    default:
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 1e9c57a2cbf..5bcc934a328 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -386,6 +386,7 @@ public static InetSocketAddress createSocketAddr(String target) {
   private String dnUserName = null;
   private BlockRecoveryWorker blockRecoveryWorker;
   private ErasureCodingWorker ecWorker;
+  private StoragePolicySatisfyWorker storagePolicySatisfyWorker;
   private final Tracer tracer;
   private final TracerConfigurationManager tracerConfigurationManager;
   private static final int NUM_CORES = Runtime.getRuntime()
@@ -1425,6 +1426,8 @@ void startDataNode(List<StorageLocation> dataDirectories,
 
     ecWorker = new ErasureCodingWorker(getConf(), this);
     blockRecoveryWorker = new BlockRecoveryWorker(this);
+    storagePolicySatisfyWorker =
+        new StoragePolicySatisfyWorker(getConf(), this);
 
     blockPoolManager = new BlockPoolManager(this);
     blockPoolManager.refreshNamenodes(getConf());
@@ -3617,3 +3620,7 @@ private DiskBalancer getDiskBalancer() throws IOException {
     return this.diskBalancer;
   }
+
+  StoragePolicySatisfyWorker getStoragePolicySatisfyWorker() {
+    return storagePolicySatisfyWorker;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index fa408f608a0..2c999633e11 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -28,6 +28,7 @@
 import java.io.OutputStream;
 import java.net.Socket;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -126,8 +127,25 @@ public void rejectedExecution(Runnable runnable,
     return moverThreadPool;
   }
 
+  /**
+   * Handles the given set of block movement tasks. This will iterate over the
+   * block movement list and submit each block movement task asynchronously in
+   * a separate thread. Each task moves the block replica to the target node
+   * and waits for completion.
+   *
+   * TODO: Presently this function is a blocking call; this has to be refined
+   * by moving the tracking logic to a separate tracker thread. HDFS-10884
+   * tracks this.
+   *
+   * @param trackID
+   *          unique tracking identifier
+   * @param blockPoolID
+   *          block pool ID
+   * @param blockMovingInfos
+   *          list of blocks to be moved
+   */
   public void processBlockMovingTasks(long trackID, String blockPoolID,
-      List<BlockMovingInfo> blockMovingInfos) {
+      Collection<BlockMovingInfo> blockMovingInfos) {
     Future<Void> moveCallable = null;
     for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
       assert blkMovingInfo
@@ -143,8 +161,6 @@ public void processBlockMovingTasks(long trackID, String blockPoolID,
       }
     }
 
-    // TODO: Presently this function act as a blocking call, this has to be
-    // refined by moving the tracking logic to another tracker thread.
     for (int i = 0; i < moverTaskFutures.size(); i++) {
       try {
         moveCallable = moverExecutorCompletionService.take();
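For illustration only (not part of the patch): the submit-then-drain pattern the worker's javadoc describes, reduced to plain java.util.concurrent types. The pool size and "blk_N" task names are made up; in the real worker the submitted callables perform the replica copy.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SubmitThenDrainSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService moverThreadPool = Executors.newFixedThreadPool(4);
    CompletionService<Void> completionService =
        new ExecutorCompletionService<>(moverThreadPool);

    // Submit one mover task per block, asynchronously.
    List<String> moves = Arrays.asList("blk_1", "blk_2", "blk_3");
    for (String blk : moves) {
      completionService.submit(() -> {
        System.out.println("moving " + blk); // stand-in for the replica copy
        return null;
      });
    }

    // Blocking drain: take() returns as each move finishes; get() rethrows
    // any task failure as an ExecutionException. This is why the current
    // call blocks until every submitted move has completed.
    for (int i = 0; i < moves.size(); i++) {
      completionService.take().get();
    }
    moverThreadPool.shutdown();
  }
}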
+   *
+   * @param block
+   *          block to be moved
+   * @param sourceDnInfos
+   *          nodes that can be the sources of the block move
+   * @param targetDnInfos
+   *          target datanode info
+   * @param srcStorageTypes
+   *          type of source storage media
+   * @param targetStorageTypes
+   *          type of destination storage media
+   */
     public BlockMovingInfo(Block block, DatanodeInfo[] sourceDnInfos,
         DatanodeInfo[] targetDnInfos, StorageType[] srcStorageTypes,
         StorageType[] targetStorageTypes) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index 1f55100af98..283f367032b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -79,6 +79,7 @@ public interface DatanodeProtocol {
   final static int DNA_CACHE = 9;      // cache blocks
   final static int DNA_UNCACHE = 10;   // uncache blocks
   final static int DNA_ERASURE_CODING_RECONSTRUCTION = 11; // erasure coding reconstruction command
+  final static int DNA_BLOCK_STORAGE_MOVEMENT = 12; // block storage movement command
 
   /**
    * Register Datanode.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index bf0df5bf144..8e198094844 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -60,6 +60,7 @@ message DatanodeCommandProto {
     NullDatanodeCommand = 7;
     BlockIdCommand = 8;
     BlockECReconstructionCommand = 9;
+    BlockStorageMovementCommand = 10;
   }
 
   required Type cmdType = 1;    // Type of the command
@@ -74,6 +75,7 @@
   optional RegisterCommandProto registerCmd = 7;
   optional BlockIdCommandProto blkIdCmd = 8;
   optional BlockECReconstructionCommandProto blkECReconstructionCmd = 9;
+  optional BlockStorageMovementCommandProto blkStorageMovementCmd = 10;
 }
 
 /**
@@ -154,6 +156,26 @@ message BlockECReconstructionCommandProto {
   repeated BlockECReconstructionInfoProto blockECReconstructioninfo = 1;
 }
 
+/**
+ * Block storage movement command
+ */
+message BlockStorageMovementCommandProto {
+  required uint64 trackID = 1;
+  required string blockPoolId = 2;
+  repeated BlockStorageMovementProto blockStorageMovement = 3;
+}
+
+/**
+ * Block storage movement information
+ */
+message BlockStorageMovementProto {
+  required BlockProto block = 1;
+  required DatanodeInfosProto sourceDnInfos = 2;
+  required DatanodeInfosProto targetDnInfos = 3;
+  required StorageTypesProto sourceStorageTypes = 4;
+  required StorageTypesProto targetStorageTypes = 5;
+}
+
 /**
  * registration - Information of the datanode registering with the namenode
  */
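For illustration only (not part of the patch): how the two new messages nest inside a heartbeat DatanodeCommandProto, using the builder API that protoc generates from the definitions above (the setter/adder names match the ones the PBHelper changes call). The class name, the -1 trackID placeholder, and "BP-1" are illustrative.

import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfosProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypesProto;

final class WireFormatSketch {
  // Assemble one move task plus the enclosing heartbeat command.
  static DatanodeCommandProto buildCommand(BlockProto block,
      DatanodeInfosProto sources, DatanodeInfosProto targets,
      StorageTypesProto srcTypes, StorageTypesProto tgtTypes) {
    BlockStorageMovementProto move = BlockStorageMovementProto.newBuilder()
        .setBlock(block)                    // all five fields are required
        .setSourceDnInfos(sources)
        .setTargetDnInfos(targets)
        .setSourceStorageTypes(srcTypes)
        .setTargetStorageTypes(tgtTypes)
        .build();
    return DatanodeCommandProto.newBuilder()
        .setCmdType(DatanodeCommandProto.Type.BlockStorageMovementCommand)
        .setBlkStorageMovementCmd(BlockStorageMovementCommandProto.newBuilder()
            .setTrackID(-1L)                // placeholder trackID, as in the patch
            .setBlockPoolId("BP-1")         // illustrative block pool id
            .addBlockStorageMovement(move)) // repeated: one entry per block
        .build();
  }
}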
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index b61814dbbf1..37664b5f6d3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -18,9 +18,6 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -29,8 +26,7 @@
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
-import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -74,9 +70,6 @@ public void testWhenStoragePolicySetToCOLD()
     try {
       // Change policy to COLD
       distributedFS.setStoragePolicy(new Path(file), "COLD");
-      Set<DatanodeDescriptor> previousNodes =
-          hdfsCluster.getNameNode().getNamesystem().getBlockManager()
-              .getDatanodeManager().getDatanodes();
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
       INode inode = namesystem.getFSDirectory().getINode(file);
 
@@ -91,8 +84,8 @@ public void testWhenStoragePolicySetToCOLD()
       hdfsCluster.triggerHeartbeats();
 
       // Wait till the namenode is notified about the block location details
-      waitExpectedStorageType(StorageType.ARCHIVE, distributedFS, previousNodes,
-          6, 30000);
+      waitExpectedStorageType(file, StorageType.ARCHIVE, distributedFS, 3,
+          30000);
     } finally {
       hdfsCluster.shutdown();
     }
@@ -104,9 +97,6 @@ public void testWhenStoragePolicySetToALLSSD()
     try {
       // Change policy to ALL_SSD
      distributedFS.setStoragePolicy(new Path(file), "ALL_SSD");
-      Set<DatanodeDescriptor> previousNodes =
-          hdfsCluster.getNameNode().getNamesystem().getBlockManager()
-              .getDatanodeManager().getDatanodes();
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
       INode inode = namesystem.getFSDirectory().getINode(file);
 
@@ -123,8 +113,34 @@ public void testWhenStoragePolicySetToALLSSD()
       hdfsCluster.triggerHeartbeats();
       // Wait till the StoragePolicySatisfier identifies blocks to move to
       // SSD areas
-      waitExpectedStorageType(StorageType.SSD, distributedFS, previousNodes, 6,
-          30000);
+      waitExpectedStorageType(file, StorageType.SSD, distributedFS, 3, 30000);
+    } finally {
+      hdfsCluster.shutdown();
+    }
+  }
+
+  @Test(timeout = 300000)
+  public void testWhenStoragePolicySetToONESSD()
+      throws Exception {
+    try {
+      // Change policy to ONE_SSD
+      distributedFS.setStoragePolicy(new Path(file), "ONE_SSD");
+      FSNamesystem namesystem = hdfsCluster.getNamesystem();
+      INode inode = namesystem.getFSDirectory().getINode(file);
+
+      StorageType[][] newtypes =
+          new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
+
+      // Make sure SSD based datanodes are added to the cluster.
+      startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
+          storagesPerDatanode, capacity, hdfsCluster);
+      namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+      hdfsCluster.triggerHeartbeats();
+      // Wait till the StoragePolicySatisfier identifies blocks to move to
+      // SSD areas
+      waitExpectedStorageType(file, StorageType.SSD, distributedFS, 1, 30000);
+      waitExpectedStorageType(file, StorageType.DISK, distributedFS, 2, 30000);
     } finally {
       hdfsCluster.shutdown();
     }
@@ -174,35 +190,31 @@ private MiniDFSCluster startCluster(final Configuration conf,
     return cluster;
   }
 
-  // TODO: this assertion can be changed to end to end based assertion later
-  // when DN side processing work integrated to this work.
-  private void waitExpectedStorageType(final StorageType expectedStorageType,
-      final DistributedFileSystem dfs,
-      final Set<DatanodeDescriptor> previousNodes, int expectedArchiveCount,
-      int timeout) throws Exception {
+  // Check whether the block movement has successfully completed to satisfy
+  // the storage policy for the given file.
+  private void waitExpectedStorageType(final String fileName,
+      final StorageType expectedStorageType, final DistributedFileSystem dfs,
+      int expectedStorageCount, int timeout) throws Exception {
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
-        Iterator<DatanodeDescriptor> iterator = previousNodes.iterator();
-        int archiveCount = 0;
-        while (iterator.hasNext()) {
-          DatanodeDescriptor dn = iterator.next();
-          List<BlockMovingInfo> pendingItemsToMove =
-              dn.getStorageMovementPendingItems();
-          for (BlockMovingInfo blkInfoToMoveStorage : pendingItemsToMove) {
-            StorageType[] targetStorageTypes =
-                blkInfoToMoveStorage.getTargetStorageTypes();
-            for (StorageType storageType : targetStorageTypes) {
-              if (storageType == expectedStorageType) {
-                archiveCount++;
-              }
-            }
+        LocatedBlock lb = null;
+        try {
+          lb = dfs.getClient().getLocatedBlocks(fileName, 0).get(0);
+        } catch (IOException e) {
+          LOG.error("Exception while getting located blocks", e);
+          return false;
+        }
+        int actualStorageCount = 0;
+        for (StorageType storageType : lb.getStorageTypes()) {
+          if (expectedStorageType == storageType) {
+            actualStorageCount++;
           }
         }
         LOG.info(
             expectedStorageType + " replica count, expected={} and actual={}",
-            expectedArchiveCount, archiveCount);
-        return expectedArchiveCount == archiveCount;
+            expectedStorageCount, actualStorageCount);
+        return expectedStorageCount == actualStorageCount;
       }
     }, 100, timeout);
   }