HDFS-13166: [SPS]: Implement caching mechanism to keep LIVE datanodes to minimize costly getLiveDatanodeStorageReport() calls. Contributed by Rakesh R.

This commit is contained in:
Surendra Singh Lilhore 2018-03-01 00:08:37 +05:30 committed by Uma Maheswara Rao Gangumalla
parent 8467ec24fb
commit 75ccc1396b
19 changed files with 432 additions and 261 deletions
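At its core, the change swaps a per-file getLiveDatanodeStorageReport() RPC for a locally cached snapshot that is refreshed only after a configurable interval. Below is a minimal, generic sketch of that interval-based refresh pattern; it is an illustration only (class and field names are invented here), not the DatanodeCacheManager added by this commit, which appears in full further down.

import java.util.function.Supplier;

/** Illustration only: refresh an expensive value at most once per interval. */
class IntervalCache<V> {
  private final long refreshIntervalMs;
  private final Supplier<V> loader;   // stands in for the costly namenode RPC
  private long lastRefreshMs;         // monotonic milliseconds
  private V cached;

  IntervalCache(long refreshIntervalMs, Supplier<V> loader) {
    this.refreshIntervalMs = refreshIntervalMs;
    this.loader = loader;
  }

  synchronized V get() {
    long now = System.nanoTime() / 1_000_000L;  // monotonic clock
    if (cached == null || now - lastRefreshMs >= refreshIntervalMs) {
      cached = loader.get();                    // fetch the latest snapshot
      lastRefreshMs = now;
    }
    return cached;
  }
}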

View File

@ -646,6 +646,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_SPS_MAX_OUTSTANDING_PATHS_KEY =
"dfs.storage.policy.satisfier.max.outstanding.paths";
public static final int DFS_SPS_MAX_OUTSTANDING_PATHS_DEFAULT = 10000;
// SPS datanode cache config, defaulting to 5 minutes.
public static final String DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS =
"dfs.storage.policy.satisfier.datanode.cache.refresh.interval.ms";
public static final long DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS_DEFAULT =
300000L;
// SPS keytab configurations, by default it is disabled.
public static final String DFS_SPS_ADDRESS_KEY =

View File

@ -23,8 +23,6 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.CheckDNSpaceRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.CheckDNSpaceResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportRequestProto;
@ -277,21 +275,4 @@ public GetNextSPSPathResponseProto getNextSPSPath(
throw new ServiceException(e);
}
}
@Override
public CheckDNSpaceResponseProto checkDNSpaceForScheduling(
RpcController controller, CheckDNSpaceRequestProto request)
throws ServiceException {
try {
CheckDNSpaceResponseProto build = CheckDNSpaceResponseProto.newBuilder()
.setIsGoodDatanodeWithSpace(impl.checkDNSpaceForScheduling(
PBHelperClient.convert(request.getDnInfo()),
PBHelperClient.convertStorageType(request.getStorageType()),
request.getEstimatedSize()))
.build();
return build;
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -22,12 +22,10 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamenodeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.CheckDNSpaceRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysRequestProto;
@ -280,19 +278,4 @@ public String getNextSPSPath() throws IOException {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
long estimatedSize) throws IOException {
CheckDNSpaceRequestProto req = CheckDNSpaceRequestProto.newBuilder()
.setDnInfo(PBHelperClient.convert(dn))
.setStorageType(PBHelperClient.convertStorageType(type))
.setEstimatedSize(estimatedSize).build();
try {
return rpcProxy.checkDNSpaceForScheduling(NULL_CONTROLLER, req)
.getIsGoodDatanodeWithSpace();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
}

View File

@ -2583,17 +2583,4 @@ public String getNextSPSPath() throws IOException {
}
return namesystem.getFilePath(pathId);
}
@Override
public boolean checkDNSpaceForScheduling(DatanodeInfo dn,
StorageType type, long estimatedSize) throws IOException {
checkNNStartup();
String operationName = "checkDNSpaceForScheduling";
namesystem.checkSuperuserPrivilege(operationName);
if (nn.isStandbyState()) {
throw new StandbyException("Not supported by Standby Namenode.");
}
return namesystem.getBlockManager().getDatanodeManager()
.verifyTargetDatanodeHasSpaceForScheduling(dn, type, estimatedSize);
}
}

View File

@ -23,11 +23,10 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeMap;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.AccessControlException;
@ -72,9 +71,12 @@ long getFileID(String path) throws UnresolvedLinkException,
/**
* Gets the network topology.
*
* @param datanodeMap
* target datanodes
*
* @return network topology
*/
NetworkTopology getNetworkTopology();
NetworkTopology getNetworkTopology(DatanodeMap datanodeMap);
/**
* Returns true if the give file exists in the Namespace.
@ -132,22 +134,6 @@ long getFileID(String path) throws UnresolvedLinkException,
DatanodeStorageReport[] getLiveDatanodeStorageReport()
throws IOException;
/**
* Checks whether the given datanode has sufficient space to occupy the given
* blockSize data.
*
* @param dn
* datanode info
* @param type
* storage type
* @param blockSize
* blockSize to be scheduled
* @return true if the given datanode has sufficient space to occupy blockSize
* data, false otherwise.
*/
boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
long blockSize);
/**
* @return next SPS path info to process.
*/

View File

@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.sps;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeMap;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The DatanodeCacheManager handles caching of {@link DatanodeStorageReport}.
*
* This class is instantiated by the StoragePolicySatisfier. It maintains the
* array of datanode storage reports and has a configurable refresh interval;
* it periodically refreshes the cache by fetching the latest
* {@link Context#getLiveDatanodeStorageReport()} once that interval elapses.
*/
@InterfaceAudience.Private
public class DatanodeCacheManager<T> {
private static final Logger LOG = LoggerFactory
.getLogger(DatanodeCacheManager.class);
private final DatanodeMap datanodeMap;
private NetworkTopology cluster;
/**
* Interval between scans in milliseconds.
*/
private final long refreshIntervalMs;
private long lastAccessedTime;
public DatanodeCacheManager(Configuration conf) {
refreshIntervalMs = conf.getLong(
DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS,
DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS_DEFAULT);
LOG.info("DatanodeCacheManager refresh interval is {} milliseconds",
refreshIntervalMs);
datanodeMap = new DatanodeMap();
}
/**
* Returns the live datanodes and their storage details, considering only
* storages with available space (> 0) for scheduling block moves. The result
* is served from a local cache with a configurable refresh interval in
* millis; the cache is refreshed by fetching the latest
* {@link Context#getLiveDatanodeStorageReport()} once that interval has
* elapsed.
*
* @throws IOException
*/
public DatanodeMap getLiveDatanodeStorageReport(
Context<T> spsContext) throws IOException {
long now = Time.monotonicNow();
long elapsedTimeMs = now - lastAccessedTime;
boolean refreshNeeded = elapsedTimeMs >= refreshIntervalMs;
lastAccessedTime = now;
if (refreshNeeded) {
if (LOG.isDebugEnabled()) {
LOG.debug("elapsedTimeMs > refreshIntervalMs : {} > {},"
+ " so refreshing cache", elapsedTimeMs, refreshIntervalMs);
}
datanodeMap.reset(); // clear all previously cached items.
// Fetch live datanodes from namenode and prepare DatanodeMap.
DatanodeStorageReport[] liveDns = spsContext
.getLiveDatanodeStorageReport();
for (DatanodeStorageReport storage : liveDns) {
StorageReport[] storageReports = storage.getStorageReports();
List<StorageType> storageTypes = new ArrayList<>();
List<Long> remainingSizeList = new ArrayList<>();
for (StorageReport t : storageReports) {
if (t.getRemaining() > 0) {
storageTypes.add(t.getStorage().getStorageType());
remainingSizeList.add(t.getRemaining());
}
}
datanodeMap.addTarget(storage.getDatanodeInfo(), storageTypes,
remainingSizeList);
}
if (LOG.isDebugEnabled()) {
LOG.debug("LIVE datanodes: {}", datanodeMap);
}
// get network topology
cluster = spsContext.getNetworkTopology(datanodeMap);
}
return datanodeMap;
}
NetworkTopology getCluster() {
return cluster;
}
}
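A hedged usage sketch of the class above, as a caller in the same package might drive it (the wrapper class name is invented, imports are omitted, and spsContext is assumed to be an existing Context<Long> implementation such as the namenode-side context in the next file; getCluster() is package-private, so this only compiles alongside DatanodeCacheManager):

// Sketch only, not part of the commit.
class CachedTargetLookup {
  private final DatanodeCacheManager<Long> dnCacheMgr;
  private final Context<Long> spsContext;

  CachedTargetLookup(Configuration conf, Context<Long> spsContext) {
    this.dnCacheMgr = new DatanodeCacheManager<>(conf);
    this.spsContext = spsContext;
  }

  DatanodeMap liveTargets() throws IOException {
    // The first call populates the cache; later calls inside the configured
    // refresh interval reuse the cached DatanodeMap without a namenode RPC.
    DatanodeMap liveDns = dnCacheMgr.getLiveDatanodeStorageReport(spsContext);
    NetworkTopology cluster = dnCacheMgr.getCluster(); // rebuilt on refresh
    return liveDns;
  }
}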

View File

@ -24,18 +24,16 @@
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeMap;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.AccessControlException;
import org.slf4j.Logger;
@ -136,7 +134,7 @@ public BlockStoragePolicy getStoragePolicy(byte policyID) {
}
@Override
public NetworkTopology getNetworkTopology() {
public NetworkTopology getNetworkTopology(DatanodeMap datanodeMap) {
return blockManager.getDatanodeManager().getNetworkTopology();
}
@ -152,23 +150,6 @@ public long getFileID(String path) throws UnresolvedLinkException,
}
}
@Override
public boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
long blockSize) {
namesystem.readLock();
try {
DatanodeDescriptor datanode = blockManager.getDatanodeManager()
.getDatanode(dn.getDatanodeUuid());
if (datanode == null) {
LOG.debug("Target datanode: " + dn + " doesn't exists");
return false;
}
return null != datanode.chooseStorage4Block(type, blockSize);
} finally {
namesystem.readUnlock();
}
}
@Override
public Long getNextSPSPath() {
return blockManager.getSPSManager().getNextPathId();

View File

@ -27,6 +27,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@ -50,8 +51,6 @@
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
@ -92,10 +91,12 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
private Context<T> ctxt;
private BlockMoveTaskHandler blockMoveTaskHandler;
private final Configuration conf;
private DatanodeCacheManager<T> dnCacheMgr;
public StoragePolicySatisfier(Configuration conf) {
this.conf = conf;
}
/**
* Represents the collective analysis status for all blocks.
*/
@ -190,6 +191,7 @@ public synchronized void start(boolean reconfigStart,
storagePolicySatisfierThread.start();
this.storageMovementsMonitor.start();
this.storageMovementNeeded.activate();
dnCacheMgr = new DatanodeCacheManager<T>(conf);
}
@Override
@ -271,7 +273,6 @@ public void run() {
}
T trackId = itemInfo.getFile();
BlocksMovingAnalysis status = null;
DatanodeStorageReport[] liveDnReports;
BlockStoragePolicy existingStoragePolicy;
// TODO: presently, context internally acquire the lock
// and returns the result. Need to discuss to move the lock outside?
@ -282,14 +283,13 @@ public void run() {
// just remove trackId from the queue
storageMovementNeeded.removeItemTrackInfo(itemInfo, true);
} else {
liveDnReports = ctxt.getLiveDatanodeStorageReport();
byte existingStoragePolicyID = fileStatus.getStoragePolicy();
existingStoragePolicy = ctxt
.getStoragePolicy(existingStoragePolicyID);
HdfsLocatedFileStatus file = (HdfsLocatedFileStatus) fileStatus;
status = analyseBlocksStorageMovementsAndAssignToDN(file,
existingStoragePolicy, liveDnReports);
existingStoragePolicy);
switch (status.status) {
// Just add to monitor, so it will be retried after timeout
case ANALYSIS_SKIPPED_FOR_RETRY:
@ -380,8 +380,7 @@ public void run() {
private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN(
HdfsLocatedFileStatus fileInfo,
BlockStoragePolicy existingStoragePolicy,
DatanodeStorageReport[] liveDns) {
BlockStoragePolicy existingStoragePolicy) throws IOException {
BlocksMovingAnalysis.Status status =
BlocksMovingAnalysis.Status.BLOCKS_ALREADY_SATISFIED;
final ErasureCodingPolicy ecPolicy = fileInfo.getErasureCodingPolicy();
@ -407,6 +406,7 @@ private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN(
List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>();
boolean hasLowRedundancyBlocks = false;
int replication = fileInfo.getReplication();
DatanodeMap liveDns = dnCacheMgr.getLiveDatanodeStorageReport(ctxt);
for (int i = 0; i < blocks.size(); i++) {
LocatedBlock blockInfo = blocks.get(i);
@ -462,7 +462,7 @@ private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN(
// If there is no block paired and few blocks are low redundant, so marking
// the status as FEW_LOW_REDUNDANCY_BLOCKS.
if (hasLowRedundancyBlocks
&& status == BlocksMovingAnalysis.Status.NO_BLOCKS_TARGETS_PAIRED) {
&& status != BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED) {
status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS;
}
List<Block> assignedBlockIds = new ArrayList<Block>();
@ -526,13 +526,17 @@ private boolean isLowRedundancyBlock(LocatedBlock blockInfo, int replication,
* - list to get existing storage types
* @param storages
* - available storages
* @param liveDns
* - live datanodes which can be used as targets
* @param ecPolicy
* - ec policy of sps invoked file
* @return false if some of the block locations failed to find target node to
* satisfy the storage policy, true otherwise
*/
private boolean computeBlockMovingInfos(
List<BlockMovingInfo> blockMovingInfos, LocatedBlock blockInfo,
List<StorageType> expectedStorageTypes, List<StorageType> existing,
DatanodeInfo[] storages, DatanodeStorageReport[] liveDns,
DatanodeInfo[] storages, DatanodeMap liveDns,
ErasureCodingPolicy ecPolicy) {
boolean foundMatchingTargetNodesForBlock = true;
if (!removeOverlapBetweenStorageTypes(expectedStorageTypes,
@ -572,12 +576,12 @@ private boolean computeBlockMovingInfos(
}
}
StorageTypeNodeMap locsForExpectedStorageTypes =
EnumMap<StorageType, List<DatanodeWithStorage.StorageDetails>> targetDns =
findTargetsForExpectedStorageTypes(expectedStorageTypes, liveDns);
foundMatchingTargetNodesForBlock |= findSourceAndTargetToMove(
blockMovingInfos, blockInfo, sourceWithStorageMap,
expectedStorageTypes, locsForExpectedStorageTypes,
expectedStorageTypes, targetDns,
ecPolicy);
}
return foundMatchingTargetNodesForBlock;
@ -593,9 +597,9 @@ private boolean computeBlockMovingInfos(
* - Block
* @param sourceWithStorageList
* - Source Datanode with storages list
* @param expected
* @param expectedTypes
* - Expecting storages to move
* @param locsForExpectedStorageTypes
* @param targetDns
* - Available DNs for expected storage types
* @return false if some of the block locations failed to find target node to
* satisfy the storage policy
@ -603,8 +607,8 @@ private boolean computeBlockMovingInfos(
private boolean findSourceAndTargetToMove(
List<BlockMovingInfo> blockMovingInfos, LocatedBlock blockInfo,
List<StorageTypeNodePair> sourceWithStorageList,
List<StorageType> expected,
StorageTypeNodeMap locsForExpectedStorageTypes,
List<StorageType> expectedTypes,
EnumMap<StorageType, List<DatanodeWithStorage.StorageDetails>> targetDns,
ErasureCodingPolicy ecPolicy) {
boolean foundMatchingTargetNodesForBlock = true;
List<DatanodeInfo> excludeNodes = new ArrayList<>();
@ -617,9 +621,9 @@ private boolean findSourceAndTargetToMove(
// Check whether the block replica is already placed in the expected
// storage type in this source datanode.
if (!expected.contains(existingTypeNodePair.storageType)) {
if (!expectedTypes.contains(existingTypeNodePair.storageType)) {
StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode(blockInfo,
existingTypeNodePair.dn, expected);
existingTypeNodePair.dn, targetDns, expectedTypes);
if (chosenTarget != null) {
if (blockInfo.isStriped()) {
buildStripedBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
@ -631,7 +635,7 @@ private boolean findSourceAndTargetToMove(
existingTypeNodePair.storageType, chosenTarget.dn,
chosenTarget.storageType, blockMovingInfos);
}
expected.remove(chosenTarget.storageType);
expectedTypes.remove(chosenTarget.storageType);
}
}
// To avoid choosing this excludeNodes as targets later
@ -648,24 +652,23 @@ private boolean findSourceAndTargetToMove(
if (checkIfAlreadyChosen(blockMovingInfos, existingTypeNodePair.dn)) {
continue;
}
if (chosenTarget == null && ctxt
.getNetworkTopology().isNodeGroupAware()) {
if (chosenTarget == null && dnCacheMgr.getCluster().isNodeGroupAware()) {
chosenTarget = chooseTarget(blockInfo, existingTypeNodePair.dn,
expected, Matcher.SAME_NODE_GROUP, locsForExpectedStorageTypes,
expectedTypes, Matcher.SAME_NODE_GROUP, targetDns,
excludeNodes);
}
// Then, match nodes on the same rack
if (chosenTarget == null) {
chosenTarget =
chooseTarget(blockInfo, existingTypeNodePair.dn, expected,
Matcher.SAME_RACK, locsForExpectedStorageTypes, excludeNodes);
chooseTarget(blockInfo, existingTypeNodePair.dn, expectedTypes,
Matcher.SAME_RACK, targetDns, excludeNodes);
}
if (chosenTarget == null) {
chosenTarget =
chooseTarget(blockInfo, existingTypeNodePair.dn, expected,
Matcher.ANY_OTHER, locsForExpectedStorageTypes, excludeNodes);
chooseTarget(blockInfo, existingTypeNodePair.dn, expectedTypes,
Matcher.ANY_OTHER, targetDns, excludeNodes);
}
if (null != chosenTarget) {
if (blockInfo.isStriped()) {
@ -678,17 +681,17 @@ private boolean findSourceAndTargetToMove(
chosenTarget.storageType, blockMovingInfos);
}
expected.remove(chosenTarget.storageType);
expectedTypes.remove(chosenTarget.storageType);
excludeNodes.add(chosenTarget.dn);
} else {
LOG.warn(
"Failed to choose target datanode for the required"
+ " storage types {}, block:{}, existing storage type:{}",
expected, blockInfo, existingTypeNodePair.storageType);
expectedTypes, blockInfo, existingTypeNodePair.storageType);
}
}
if (expected.size() > 0) {
if (expectedTypes.size() > 0) {
foundMatchingTargetNodesForBlock = false;
}
@ -750,20 +753,38 @@ private void buildStripedBlockMovingInfos(LocatedBlock blockInfo,
/**
* Choose the target storage within same datanode if possible.
*
* @param block
* @param blockInfo
* - block info
* @param source
* - source datanode
* @param targetDns
- target datanodes grouped by their respective storage types
* @param targetTypes
* - list of target storage types
*/
private StorageTypeNodePair chooseTargetTypeInSameNode(LocatedBlock blockInfo,
DatanodeInfo source, List<StorageType> targetTypes) {
DatanodeInfo source,
EnumMap<StorageType, List<DatanodeWithStorage.StorageDetails>> targetDns,
List<StorageType> targetTypes) {
for (StorageType t : targetTypes) {
boolean goodTargetDn =
ctxt.checkDNSpaceForScheduling(source, t, blockInfo.getBlockSize());
if (goodTargetDn) {
return new StorageTypeNodePair(t, source);
List<DatanodeWithStorage.StorageDetails> targetNodeStorages =
targetDns.get(t);
if (targetNodeStorages == null) {
continue;
}
for (DatanodeWithStorage.StorageDetails targetNode : targetNodeStorages) {
if (targetNode.getDatanodeInfo().equals(source)) {
// Good target with enough space to write the given block size.
if (targetNode.hasSpaceForScheduling(blockInfo.getBlockSize())) {
targetNode.incScheduledSize(blockInfo.getBlockSize());
return new StorageTypeNodePair(t, source);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Datanode:{} storage type:{} doesn't have sufficient "
+ "space:{} to move the target block size:{}",
source, t, targetNode, blockInfo.getBlockSize());
}
}
}
}
return null;
@ -771,84 +792,105 @@ private StorageTypeNodePair chooseTargetTypeInSameNode(LocatedBlock blockInfo,
private StorageTypeNodePair chooseTarget(LocatedBlock block,
DatanodeInfo source, List<StorageType> targetTypes, Matcher matcher,
StorageTypeNodeMap locsForExpectedStorageTypes,
List<DatanodeInfo> excludeNodes) {
EnumMap<StorageType, List<DatanodeWithStorage.StorageDetails>>
locsForExpectedStorageTypes, List<DatanodeInfo> excludeNodes) {
for (StorageType t : targetTypes) {
List<DatanodeInfo> nodesWithStorages =
locsForExpectedStorageTypes.getNodesWithStorages(t);
List<DatanodeWithStorage.StorageDetails> nodesWithStorages =
locsForExpectedStorageTypes.get(t);
if (nodesWithStorages == null || nodesWithStorages.isEmpty()) {
continue; // no target nodes with the required storage type.
}
Collections.shuffle(nodesWithStorages);
for (DatanodeInfo target : nodesWithStorages) {
for (DatanodeWithStorage.StorageDetails targetNode : nodesWithStorages) {
DatanodeInfo target = targetNode.getDatanodeInfo();
if (!excludeNodes.contains(target)
&& matcher.match(ctxt.getNetworkTopology(), source, target)) {
boolean goodTargetDn =
ctxt.checkDNSpaceForScheduling(target, t, block.getBlockSize());
if (goodTargetDn) {
&& matcher.match(dnCacheMgr.getCluster(), source, target)) {
// Good target with enough space to write the given block size.
if (targetNode.hasSpaceForScheduling(block.getBlockSize())) {
targetNode.incScheduledSize(block.getBlockSize());
return new StorageTypeNodePair(t, target);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Datanode:{} storage type:{} doesn't have sufficient "
+ "space:{} to move the target block size:{}",
target, t, targetNode, block.getBlockSize());
}
}
}
}
return null;
}
private static class StorageTypeNodePair {
private StorageType storageType = null;
private DatanodeInfo dn = null;
/**
* Keeps a datanode with its respective storage type.
*/
private static final class StorageTypeNodePair {
private final StorageType storageType;
private final DatanodeInfo dn;
StorageTypeNodePair(StorageType storageType, DatanodeInfo dn) {
private StorageTypeNodePair(StorageType storageType, DatanodeInfo dn) {
this.storageType = storageType;
this.dn = dn;
}
}
private StorageTypeNodeMap findTargetsForExpectedStorageTypes(
List<StorageType> expected, DatanodeStorageReport[] liveDns) {
StorageTypeNodeMap targetMap = new StorageTypeNodeMap();
for (DatanodeStorageReport dn : liveDns) {
StorageReport[] storageReports = dn.getStorageReports();
for (StorageReport storageReport : storageReports) {
StorageType t = storageReport.getStorage().getStorageType();
if (expected.contains(t)) {
final long maxRemaining = getMaxRemaining(dn.getStorageReports(), t);
if (maxRemaining > 0L) {
targetMap.add(t, dn.getDatanodeInfo());
}
private EnumMap<StorageType, List<DatanodeWithStorage.StorageDetails>>
findTargetsForExpectedStorageTypes(List<StorageType> expected,
DatanodeMap liveDns) {
EnumMap<StorageType, List<DatanodeWithStorage.StorageDetails>> targetsMap =
new EnumMap<StorageType, List<DatanodeWithStorage.StorageDetails>>(
StorageType.class);
for (StorageType storageType : expected) {
List<DatanodeWithStorage> nodes = liveDns.getTarget(storageType);
if (nodes == null) {
return targetsMap;
}
List<DatanodeWithStorage.StorageDetails> listNodes = targetsMap
.get(storageType);
if (listNodes == null) {
listNodes = new ArrayList<>();
targetsMap.put(storageType, listNodes);
}
for (DatanodeWithStorage n : nodes) {
final DatanodeWithStorage.StorageDetails node = getMaxRemaining(n,
storageType);
if (node != null) {
listNodes.add(node);
}
}
}
return targetMap;
return targetsMap;
}
private static long getMaxRemaining(StorageReport[] storageReports,
StorageType t) {
private static DatanodeWithStorage.StorageDetails getMaxRemaining(
DatanodeWithStorage node, StorageType storageType) {
long max = 0L;
for (StorageReport r : storageReports) {
if (r.getStorage().getStorageType() == t) {
if (r.getRemaining() > max) {
max = r.getRemaining();
}
DatanodeWithStorage.StorageDetails nodeInfo = null;
List<DatanodeWithStorage.StorageDetails> storages = node
.getNodesWithStorages(storageType);
for (DatanodeWithStorage.StorageDetails n : storages) {
if (n.availableSizeToMove() > max) {
max = n.availableSizeToMove();
nodeInfo = n;
}
}
return max;
return nodeInfo;
}
private boolean checkSourceAndTargetTypeExists(DatanodeInfo dn,
List<StorageType> existing, List<StorageType> expectedStorageTypes,
DatanodeStorageReport[] liveDns) {
List<StorageType> existingStorageTypes,
List<StorageType> expectedStorageTypes, DatanodeMap liveDns) {
boolean isExpectedTypeAvailable = false;
boolean isExistingTypeAvailable = false;
for (DatanodeStorageReport liveDn : liveDns) {
if (dn.equals(liveDn.getDatanodeInfo())) {
StorageReport[] storageReports = liveDn.getStorageReports();
for (StorageReport eachStorage : storageReports) {
StorageType storageType = eachStorage.getStorage().getStorageType();
if (existing.contains(storageType)) {
for (DatanodeWithStorage liveDn : liveDns.getTargets()) {
if (dn.equals(liveDn.datanode)) {
for (StorageType eachType : liveDn.getStorageTypes()) {
if (existingStorageTypes.contains(eachType)) {
isExistingTypeAvailable = true;
}
if (expectedStorageTypes.contains(storageType)) {
if (expectedStorageTypes.contains(eachType)) {
isExpectedTypeAvailable = true;
}
if (isExistingTypeAvailable && isExpectedTypeAvailable) {
@ -860,29 +902,143 @@ private boolean checkSourceAndTargetTypeExists(DatanodeInfo dn,
return isExistingTypeAvailable && isExpectedTypeAvailable;
}
private static class StorageTypeNodeMap {
private final EnumMap<StorageType, List<DatanodeInfo>> typeNodeMap =
new EnumMap<StorageType, List<DatanodeInfo>>(StorageType.class);
/**
* Maintains a map from storage type to the available datanodes in the cluster.
*/
public static class DatanodeMap {
private final EnumMap<StorageType, List<DatanodeWithStorage>> targetsMap =
new EnumMap<StorageType, List<DatanodeWithStorage>>(StorageType.class);
private void add(StorageType t, DatanodeInfo dn) {
List<DatanodeInfo> nodesWithStorages = getNodesWithStorages(t);
LinkedList<DatanodeInfo> value = null;
if (nodesWithStorages == null) {
value = new LinkedList<DatanodeInfo>();
value.add(dn);
typeNodeMap.put(t, value);
} else {
nodesWithStorages.add(dn);
private List<DatanodeWithStorage> targets = new ArrayList<>();
/**
* Build datanode map with the available storage types.
*
* @param node
* datanode
* @param storageTypes
* list of available storage types in the given datanode
* @param maxSize2Move
* available space which can be used for scheduling block move
*/
void addTarget(DatanodeInfo node, List<StorageType> storageTypes,
List<Long> maxSize2Move) {
DatanodeWithStorage nodeStorage = new DatanodeWithStorage(node);
targets.add(nodeStorage);
for (int i = 0; i < storageTypes.size(); i++) {
StorageType type = storageTypes.get(i);
List<DatanodeWithStorage> nodeStorages = targetsMap.get(type);
nodeStorage.addStorageType(type, maxSize2Move.get(i));
if (nodeStorages == null) {
nodeStorages = new LinkedList<>();
targetsMap.put(type, nodeStorages);
}
nodeStorages.add(nodeStorage);
}
}
List<DatanodeWithStorage> getTarget(StorageType storageType) {
return targetsMap.get(storageType);
}
public List<DatanodeWithStorage> getTargets() {
return targets;
}
void reset() {
targetsMap.clear();
}
}
/**
* Pairs a datanode with its set of supported storage types. It holds the
* available space of each volume and is used while choosing the
* target datanodes.
*/
public static final class DatanodeWithStorage {
private final EnumMap<StorageType, List<StorageDetails>> storageMap =
new EnumMap<StorageType, List<StorageDetails>>(StorageType.class);
private final DatanodeInfo datanode;
private DatanodeWithStorage(DatanodeInfo datanode) {
this.datanode = datanode;
}
public DatanodeInfo getDatanodeInfo() {
return datanode;
}
Set<StorageType> getStorageTypes() {
return storageMap.keySet();
}
private void addStorageType(StorageType t, long maxSize2Move) {
List<StorageDetails> nodesWithStorages = getNodesWithStorages(t);
if (nodesWithStorages == null) {
nodesWithStorages = new LinkedList<StorageDetails>();
storageMap.put(t, nodesWithStorages);
}
nodesWithStorages.add(new StorageDetails(maxSize2Move));
}
/**
* Returns datanode storages which has the given storage type.
*
* @param type
* - Storage type
* @return datanodes which has the given storage type
* - storage type
* @return datanodes for the given storage type
*/
private List<DatanodeInfo> getNodesWithStorages(StorageType type) {
return typeNodeMap.get(type);
private List<StorageDetails> getNodesWithStorages(StorageType type) {
return storageMap.get(type);
}
@Override
public String toString() {
return new StringBuilder().append("DatanodeWithStorageInfo(\n ")
.append("Datanode: ").append(datanode).append(" StorageTypeNodeMap: ")
.append(storageMap).append(")").toString();
}
/** Storage details in a datanode storage type. */
final class StorageDetails {
private final long maxSize2Move;
private long scheduledSize = 0L;
private StorageDetails(long maxSize2Move) {
this.maxSize2Move = maxSize2Move;
}
private DatanodeInfo getDatanodeInfo() {
return DatanodeWithStorage.this.datanode;
}
/**
* Checks whether this datanode storage has sufficient space to occupy the
* given block size.
*/
private synchronized boolean hasSpaceForScheduling(long size) {
return availableSizeToMove() > size;
}
/**
* @return the number of bytes still available for scheduling block moves.
*/
private synchronized long availableSizeToMove() {
return maxSize2Move - scheduledSize;
}
/** Increment scheduled size. */
private synchronized void incScheduledSize(long size) {
scheduledSize += size;
}
@Override
public String toString() {
return new StringBuilder().append("StorageDetails(\n ")
.append("maxSize2Move: ").append(maxSize2Move)
.append(" scheduledSize: ").append(scheduledSize).append(")")
.toString();
}
}
}
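The DatanodeMap / DatanodeWithStorage / StorageDetails structures above replace the per-block checkDNSpaceForScheduling() RPC: free space is captured once per cache refresh and then decremented locally as block moves are scheduled. The following is a hedged, self-contained sketch of that bookkeeping pattern, written as a simplified stand-in for illustration rather than the package-private HDFS classes:

import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;

enum StorageKind { DISK, SSD, ARCHIVE }

/** Simplified stand-in for DatanodeWithStorage.StorageDetails. */
class StorageSlot {
  final String datanode;
  final long maxSize2Move;   // free-space snapshot taken at cache refresh
  long scheduledSize;        // bytes already promised to block moves

  StorageSlot(String datanode, long maxSize2Move) {
    this.datanode = datanode;
    this.maxSize2Move = maxSize2Move;
  }

  boolean hasSpaceForScheduling(long blockSize) {
    return maxSize2Move - scheduledSize > blockSize;
  }
}

/** Simplified stand-in for DatanodeMap: storage type to candidate slots. */
class TargetPicker {
  private final EnumMap<StorageKind, List<StorageSlot>> targets =
      new EnumMap<>(StorageKind.class);

  void addTarget(String dn, StorageKind type, long remaining) {
    targets.computeIfAbsent(type, t -> new ArrayList<>())
        .add(new StorageSlot(dn, remaining));
  }

  /** Pick a target purely from the local snapshot; no namenode round trip. */
  String chooseTarget(StorageKind type, long blockSize) {
    for (StorageSlot slot : targets.getOrDefault(type,
        Collections.emptyList())) {
      if (slot.hasSpaceForScheduling(blockSize)) {
        slot.scheduledSize += blockSize;  // mirrors incScheduledSize()
        return slot.datanode;
      }
    }
    return null;  // no candidate with enough remaining space
  }
}

In the real code, chooseTarget() and chooseTargetTypeInSameNode() in StoragePolicySatisfier perform this check through hasSpaceForScheduling() and incScheduledSize() on the cached StorageDetails, as shown in the diff above.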

View File

@ -21,7 +21,6 @@
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@ -211,20 +210,5 @@ public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
*/
@AtMostOnce
String getNextSPSPath() throws IOException;
/**
* Verifies whether the given Datanode has the enough estimated size with
* given storage type for scheduling the block. This API used by External SPS.
*
* @param dn
* - datanode
* @param type
* - storage type
* @param estimatedSize
* - size
*/
@Idempotent
boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
long estimatedSize) throws IOException;
}

View File

@ -20,15 +20,14 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
@ -37,6 +36,8 @@
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.sps.Context;
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeMap;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeWithStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.AccessControlException;
@ -107,8 +108,14 @@ public long getFileID(String path) throws UnresolvedLinkException,
}
@Override
public NetworkTopology getNetworkTopology() {
return NetworkTopology.getInstance(service.getConf());
public NetworkTopology getNetworkTopology(DatanodeMap datanodeMap) {
// create network topology.
NetworkTopology cluster = NetworkTopology.getInstance(service.getConf());
List<DatanodeWithStorage> targets = datanodeMap.getTargets();
for (DatanodeWithStorage node : targets) {
cluster.add(node.getDatanodeInfo());
}
return cluster;
}
@Override
@ -167,23 +174,6 @@ public DatanodeStorageReport[] getLiveDatanodeStorageReport()
return nnc.getLiveDatanodeStorageReport();
}
@Override
public boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
long estimatedSize) {
// TODO: Instead of calling namenode for checking the available space, it
// can be optimized by maintaining local cache of datanode storage report
// and do the computations. This local cache can be refreshed per file or
// periodic fashion.
try {
return nnc.getNNProtocolConnection().checkDNSpaceForScheduling(dn, type,
estimatedSize);
} catch (IOException e) {
LOG.warn("Verify the given datanode:{} is good and has "
+ "estimated space in it.", dn, e);
return false;
}
}
@Override
public String getNextSPSPath() {
try {

View File

@ -221,24 +221,6 @@ message GetNextSPSPathResponseProto {
optional string spsPath = 1;
}
message CheckDNSpaceRequestProto {
required DatanodeInfoProto dnInfo = 1;
required StorageTypeProto storageType = 2;
required uint64 estimatedSize = 3;
}
message CheckDNSpaceResponseProto {
required bool isGoodDatanodeWithSpace = 1;
}
message HasLowRedundancyBlocksRequestProto {
required uint64 inodeId = 1;
}
message HasLowRedundancyBlocksResponseProto {
required bool hasLowRedundancyBlocks = 1;
}
/**
* Protocol used by the sub-ordinate namenode to send requests
* the active/primary namenode.
@ -326,11 +308,4 @@ service NamenodeProtocolService {
*/
rpc getNextSPSPath(GetNextSPSPathRequestProto)
returns (GetNextSPSPathResponseProto);
/**
* Verifies whether the given Datanode has the enough estimated size with
* given storage type for scheduling the block movement.
*/
rpc checkDNSpaceForScheduling(CheckDNSpaceRequestProto)
returns (CheckDNSpaceResponseProto);
}

View File

@ -4580,6 +4580,17 @@
</description>
</property>
<property>
<name>dfs.storage.policy.satisfier.datanode.cache.refresh.interval.ms</name>
<value>300000</value>
<description>
How often to refresh the datanode storage cache, in milliseconds. This cache
keeps the live datanode storage reports fetched from the namenode. Once the
interval elapses, the latest datanode storage reports are fetched again.
By default, this parameter is set to 5 minutes.
</description>
</property>
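The tests touched by this commit shorten this interval programmatically (to 1000 ms or 3000 ms) so that restarted datanodes show up quickly; a similar override can be applied to any configuration. A small sketch, where the 60-second value is only an example:

// Mirrors the test setup shown further down in this commit.
Configuration conf = new HdfsConfiguration();
// Refresh the SPS datanode cache every 60 seconds instead of the 5-minute default.
conf.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS,
    60 * 1000L);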
<property>
<name>dfs.pipeline.ecn</name>
<value>false</value>

View File

@ -243,7 +243,7 @@ If administrator wants to switch modes of SPS feature while Namenode is running,
hdfs dfsadmin -reconfig namenode <host:ipc_port> start
### Start External SPS Service.
If administrator wants to start external sps, first he/she needs to configure property `dfs.storage.policy.satisfier.mode` with `external` value in configuration file (`hdfs-site.xml`) and then run Namenode reconfig command. After this start external sps service using following command
If administrator wants to start external sps, first he/she needs to configure property `dfs.storage.policy.satisfier.mode` with `external` value in configuration file (`hdfs-site.xml`) and then run Namenode reconfig command. Please ensure that the network topology configuration in the configuration file is the same as the Namenode's, since this topology is used for matching target nodes. After this, start the external sps service using the following command
* Command:

View File

@ -69,6 +69,9 @@ private static void initConf(Configuration conf) {
conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.INTERNAL.toString());
// Reduced refresh cycle to update latest datanodes.
conf.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS,
1000);
}
@Before

View File

@ -105,6 +105,9 @@ private void clusterSetUp(boolean isHAEnabled, Configuration newConf)
"3000");
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.INTERNAL.toString());
// Reduced refresh cycle to update latest datanodes.
conf.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS,
1000);
final int dnNumber = storageTypes.length;
final short replication = 3;
MiniDFSCluster.Builder clusterBuilder = new MiniDFSCluster.Builder(conf)
@ -295,6 +298,9 @@ public void testWithFederationHA() throws Exception {
conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.INTERNAL.toString());
// Reduced refresh cycle to update latest datanodes.
conf.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS,
1000);
haCluster = new MiniDFSCluster
.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHAFederatedTopology(2))

View File

@ -68,6 +68,9 @@ private void createCluster() throws IOException {
config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
config.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.INTERNAL.toString());
// Reduced refresh cycle to update latest datanodes.
config.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS,
1000);
startCluster(config, allDiskTypes, numOfDatanodes, storagesPerDatanode,
capacity);
dfs = cluster.getFileSystem(nnIndex);

View File

@ -159,6 +159,10 @@ public void setUp() {
config = new HdfsConfiguration();
config.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.INTERNAL.toString());
// Most of the tests are restarting DNs and NN. So, reduced refresh cycle to
// update latest datanodes.
config.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS,
3000);
}
@Test(timeout = 300000)

View File

@ -69,6 +69,7 @@ public class TestStoragePolicySatisfierWithStripedFile {
private int parityBlocks;
private int cellSize;
private int defaultStripeBlockSize;
private Configuration conf;
private ErasureCodingPolicy getEcPolicy() {
return StripedFileTestUtil.getDefaultECPolicy();
@ -84,6 +85,13 @@ public void init(){
parityBlocks = ecPolicy.getNumParityUnits();
cellSize = ecPolicy.getCellSize();
defaultStripeBlockSize = cellSize * stripesPerBlock;
conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.INTERNAL.toString());
// Reduced refresh cycle to update latest datanodes.
conf.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS,
1000);
initConfWithStripe(conf, defaultStripeBlockSize);
}
/**
@ -103,10 +111,6 @@ public void testMoverWithFullStripe() throws Exception {
}
}
final Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.INTERNAL.toString());
initConfWithStripe(conf, defaultStripeBlockSize);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numOfDatanodes)
.storagesPerDatanode(storagesPerDatanode)
@ -216,10 +220,6 @@ public void testWhenOnlyFewTargetNodesAreAvailableToSatisfyStoragePolicy()
}
}
final Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.INTERNAL.toString());
initConfWithStripe(conf, defaultStripeBlockSize);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numOfDatanodes)
.storagesPerDatanode(storagesPerDatanode)
@ -325,13 +325,9 @@ public void testSPSWhenFileHasLowRedundancyBlocks() throws Exception {
}
}
final Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys
.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
"3000");
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.INTERNAL.toString());
initConfWithStripe(conf, defaultStripeBlockSize);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numOfDatanodes)
.storagesPerDatanode(storagesPerDatanode)
@ -420,10 +416,6 @@ public void testWhenNoTargetDatanodeToSatisfyStoragePolicy()
}
}
final Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.INTERNAL.toString());
initConfWithStripe(conf, defaultStripeBlockSize);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numOfDatanodes)
.storagesPerDatanode(storagesPerDatanode)

View File

@ -49,6 +49,9 @@ public void clusterSetUp() throws IOException, URISyntaxException {
conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.INTERNAL.toString());
// Reduced refresh cycle to update latest datanodes.
conf.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS,
1000);
StorageType[][] newtypes = new StorageType[][] {
{StorageType.ARCHIVE, StorageType.DISK}};
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPL)