HDFS-13097: [SPS]: Fix the branch review comments(Part1). Contributed by Surendra Singh.

This commit is contained in:
Uma Maheswara Rao G 2018-02-07 02:28:23 -08:00 committed by Uma Maheswara Rao Gangumalla
parent d3de4fb2a0
commit 4402f3f855
33 changed files with 665 additions and 604 deletions

View File

@ -3110,8 +3110,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
}
public boolean isStoragePolicySatisfierRunning() throws IOException {
return namenode.isStoragePolicySatisfierRunning();
public boolean isInternalSatisfierRunning() throws IOException {
return namenode.isInternalSatisfierRunning();
}
Tracer getTracer() {

View File

@ -1759,12 +1759,12 @@ public interface ClientProtocol {
void satisfyStoragePolicy(String path) throws IOException;
/**
* Check if StoragePolicySatisfier is running.
* @return true if StoragePolicySatisfier is running
* Check if internal StoragePolicySatisfier is running.
* @return true if internal StoragePolicySatisfier is running
* @throws IOException
*/
@Idempotent
boolean isStoragePolicySatisfierRunning() throws IOException;
boolean isInternalSatisfierRunning() throws IOException;
/**
* Check the storage policy satisfy status of the path for which

View File

@ -150,8 +150,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSto
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsStoragePolicySatisfierRunningRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsStoragePolicySatisfierRunningResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsInternalSatisfierRunningRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsInternalSatisfierRunningResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto;
@ -301,8 +301,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
private final static GetErasureCodingCodecsRequestProto
VOID_GET_EC_CODEC_REQUEST = GetErasureCodingCodecsRequestProto
.newBuilder().build();
private final static IsStoragePolicySatisfierRunningRequestProto
VOID_IS_SPS_RUNNING_REQUEST = IsStoragePolicySatisfierRunningRequestProto
private final static IsInternalSatisfierRunningRequestProto
VOID_IS_SPS_RUNNING_REQUEST = IsInternalSatisfierRunningRequestProto
.newBuilder().build();
@ -1912,10 +1912,10 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
@Override
public boolean isStoragePolicySatisfierRunning() throws IOException {
public boolean isInternalSatisfierRunning() throws IOException {
try {
IsStoragePolicySatisfierRunningResponseProto rep =
rpcProxy.isStoragePolicySatisfierRunning(null,
IsInternalSatisfierRunningResponseProto rep =
rpcProxy.isInternalSatisfierRunning(null,
VOID_IS_SPS_RUNNING_REQUEST);
return rep.getRunning();
} catch (ServiceException e) {

View File

@ -838,10 +838,10 @@ message SatisfyStoragePolicyResponseProto {
}
message IsStoragePolicySatisfierRunningRequestProto { // no parameters
message IsInternalSatisfierRunningRequestProto { // no parameters
}
message IsStoragePolicySatisfierRunningResponseProto {
message IsInternalSatisfierRunningResponseProto {
required bool running = 1;
}
@ -1048,8 +1048,8 @@ service ClientNamenodeProtocol {
returns(ListOpenFilesResponseProto);
rpc satisfyStoragePolicy(SatisfyStoragePolicyRequestProto)
returns(SatisfyStoragePolicyResponseProto);
rpc isStoragePolicySatisfierRunning(IsStoragePolicySatisfierRunningRequestProto)
returns(IsStoragePolicySatisfierRunningResponseProto);
rpc isInternalSatisfierRunning(IsInternalSatisfierRunningRequestProto)
returns(IsInternalSatisfierRunningResponseProto);
rpc checkStoragePolicySatisfyPathStatus(CheckStoragePolicySatisfyPathStatusRequestProto)
returns(CheckStoragePolicySatisfyPathStatusResponseProto);
}

View File

@ -2498,7 +2498,7 @@ public class RouterRpcServer extends AbstractService
}
@Override
public boolean isStoragePolicySatisfierRunning() throws IOException {
public boolean isInternalSatisfierRunning() throws IOException {
checkOperation(OperationCategory.READ, false);
return false;
}

View File

@ -53,7 +53,6 @@ import java.util.Collection;
import java.util.Comparator;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -74,7 +73,6 @@ import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@ -1458,26 +1456,6 @@ public class DFSUtil {
return blocksReplWorkMultiplier;
}
/**
* Get DFS_SPS_WORK_MULTIPLIER_PER_ITERATION from
* configuration.
*
* @param conf Configuration
* @return Value of DFS_SPS_WORK_MULTIPLIER_PER_ITERATION
*/
public static int getSPSWorkMultiplier(Configuration conf) {
int spsWorkMultiplier = conf
.getInt(
DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION,
DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION_DEFAULT);
Preconditions.checkArgument(
(spsWorkMultiplier > 0),
DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION +
" = '" + spsWorkMultiplier + "' is invalid. " +
"It should be a positive, non-zero integer value.");
return spsWorkMultiplier;
}
/**
* Get SPNEGO keytab Key from configuration
*
@ -1738,43 +1716,4 @@ public class DFSUtil {
}
return id;
}
/**
* Remove the overlap between the expected types and the existing types.
*
* @param expected
* - Expected storage types list.
* @param existing
* - Existing storage types list.
* @param ignoreNonMovable
* ignore non-movable storage types by removing them from both
* expected and existing storage type list to prevent non-movable
* storage from being moved.
* @returns if the existing types or the expected types is empty after
* removing the overlap.
*/
public static boolean removeOverlapBetweenStorageTypes(
List<StorageType> expected,
List<StorageType> existing, boolean ignoreNonMovable) {
for (Iterator<StorageType> i = existing.iterator(); i.hasNext();) {
final StorageType t = i.next();
if (expected.remove(t)) {
i.remove();
}
}
if (ignoreNonMovable) {
removeNonMovable(existing);
removeNonMovable(expected);
}
return expected.isEmpty() || existing.isEmpty();
}
private static void removeNonMovable(List<StorageType> types) {
for (Iterator<StorageType> i = types.iterator(); i.hasNext();) {
final StorageType t = i.next();
if (!t.isMovable()) {
i.remove();
}
}
}
}

View File

@ -162,8 +162,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFile
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpgradeStatusRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpgradeStatusResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsStoragePolicySatisfierRunningRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsStoragePolicySatisfierRunningResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsInternalSatisfierRunningRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsInternalSatisfierRunningResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto;
@ -1865,14 +1865,14 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
}
@Override
public IsStoragePolicySatisfierRunningResponseProto
isStoragePolicySatisfierRunning(RpcController controller,
IsStoragePolicySatisfierRunningRequestProto req)
public IsInternalSatisfierRunningResponseProto
isInternalSatisfierRunning(RpcController controller,
IsInternalSatisfierRunningRequestProto req)
throws ServiceException {
try {
boolean ret = server.isStoragePolicySatisfierRunning();
IsStoragePolicySatisfierRunningResponseProto.Builder builder =
IsStoragePolicySatisfierRunningResponseProto.newBuilder();
boolean ret = server.isInternalSatisfierRunning();
IsInternalSatisfierRunningResponseProto.Builder builder =
IsInternalSatisfierRunningResponseProto.newBuilder();
builder.setRunning(ret);
return builder.build();
} catch (IOException e) {

View File

@ -69,8 +69,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
@ -94,12 +92,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeBlockMoveTaskHandler;
import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext;
import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeFileIdCollector;
import org.apache.hadoop.hdfs.server.namenode.sps.SPSPathIds;
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
@ -435,11 +428,7 @@ public class BlockManager implements BlockStatsMXBean {
private final BlockIdManager blockIdManager;
/** For satisfying block storage policies. */
private final StoragePolicySatisfier sps;
private final boolean storagePolicyEnabled;
private StoragePolicySatisfierMode spsMode;
private SPSPathIds spsPaths;
private final int spsOutstandingPathsLimit;
private final StoragePolicySatisfyManager spsManager;
/** Minimum live replicas needed for the datanode to be transitioned
* from ENTERING_MAINTENANCE to IN_MAINTENANCE.
@ -479,19 +468,10 @@ public class BlockManager implements BlockStatsMXBean {
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT)
* 1000L);
// StoragePolicySatisfier(SPS) configs
storagePolicyEnabled =
conf.getBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY,
DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT);
String spsModeVal = conf.get(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT);
spsOutstandingPathsLimit = conf.getInt(
DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_KEY,
DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_DEFAULT);
spsMode = StoragePolicySatisfierMode.fromString(spsModeVal);
spsPaths = new SPSPathIds();
sps = new StoragePolicySatisfier(conf);
// sps manager manages the user invoked sps paths and does the movement.
spsManager = new StoragePolicySatisfyManager(conf, namesystem, this);
blockTokenSecretManager = createBlockTokenSecretManager(conf);
providedStorageMap = new ProvidedStorageMap(namesystem, this, conf);
@ -719,7 +699,7 @@ public class BlockManager implements BlockStatsMXBean {
}
public void close() {
stopSPS(false);
getSPSManager().stop();
bmSafeMode.close();
try {
redundancyThread.interrupt();
@ -733,7 +713,7 @@ public class BlockManager implements BlockStatsMXBean {
datanodeManager.close();
pendingReconstruction.stop();
blocksMap.close();
stopSPSGracefully();
getSPSManager().stopGracefully();
}
/** @return the datanodeManager */
@ -5046,222 +5026,9 @@ public class BlockManager implements BlockStatsMXBean {
}
/**
* Gets the storage policy satisfier instance.
*
* @return sps
* @return sps manager.
*/
public StoragePolicySatisfier getStoragePolicySatisfier() {
return sps;
}
/**
* Start storage policy satisfier service.
*/
public void startSPS() {
if (!(storagePolicyEnabled && spsMode != StoragePolicySatisfierMode.NONE)) {
LOG.info(
"Failed to start StoragePolicySatisfier "
+ " as {} set to {} and {} set to {}.",
DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, storagePolicyEnabled,
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY, spsMode);
return;
} else if (sps.isRunning()) {
LOG.info("Storage policy satisfier is already running"
+ " as internal service.");
return;
}
// starting internal SPS service
if (spsMode == StoragePolicySatisfierMode.INTERNAL) {
sps.start(false, spsMode);
}
}
/**
* Stop storage policy satisfier service.
*
* @param forceStop
* true represents that it should stop SPS service by clearing all
* pending SPS work
*/
public void stopSPS(boolean forceStop) {
if (!(storagePolicyEnabled
&& (spsMode != StoragePolicySatisfierMode.NONE))) {
LOG.info("Storage policy satisfier is not enabled.");
return;
} else if (!sps.isRunning()) {
removeAllSPSPathIds();
LOG.info("Storage policy satisfier is not running.");
return;
}
sps.disable(forceStop);
}
/**
* Enable storage policy satisfier by starting its service.
*/
public void enableInternalSPS() {
if (!storagePolicyEnabled){
LOG.info("Failed to start StoragePolicySatisfier as {} set to {}.",
DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, storagePolicyEnabled);
return;
}
if (sps.isRunning()) {
LOG.info("Storage policy satisfier is already running as SPS mode:{}.",
spsMode);
return;
}
updateSPSMode(StoragePolicySatisfierMode.INTERNAL);
sps.init(new IntraSPSNameNodeContext(this.namesystem, this, sps),
new IntraSPSNameNodeFileIdCollector(this.namesystem.getFSDirectory(),
sps),
new IntraSPSNameNodeBlockMoveTaskHandler(this, this.namesystem), null);
sps.start(true, spsMode);
}
/**
* Enable storage policy satisfier by starting its service.
*/
public void enableExternalSPS() {
if (!storagePolicyEnabled){
LOG.info("Failed to start StoragePolicySatisfier as {} set to {}.",
DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, storagePolicyEnabled);
return;
}
if (spsMode == StoragePolicySatisfierMode.EXTERNAL) {
LOG.info("Storage policy satisfier is already enabled as SPS mode:{}.",
spsMode);
return;
}
updateSPSMode(StoragePolicySatisfierMode.EXTERNAL);
sps.stopGracefully();
}
private void updateSPSMode(StoragePolicySatisfierMode newSpsMode) {
LOG.debug("Updating SPS service status, current mode:{}, new mode:{}",
spsMode, newSpsMode);
spsMode = newSpsMode;
}
/**
* Disable the storage policy satisfier by stopping its services.
*/
public void disableSPS() {
switch (spsMode) {
case NONE:
break;
case INTERNAL:
case EXTERNAL:
if (!sps.isRunning()) {
LOG.info("Storage policy satisfier is already stopped.");
} else {
LOG.info("Stopping StoragePolicySatisfier mode {}, as admin "
+ "requested to stop it.", spsMode);
sps.disable(true);
}
removeAllSPSPathIds();
break;
default:
// nothing
break;
}
updateSPSMode(StoragePolicySatisfierMode.NONE);
}
/**
* Timed wait to stop storage policy satisfier daemon threads.
*/
public void stopSPSGracefully() {
removeAllSPSPathIds();
sps.stopGracefully();
}
/**
* @return True if storage policy satisfier running.
*/
public boolean isStoragePolicySatisfierRunning() {
return sps.isRunning();
}
/**
* @return status
* Storage policy satisfy status of the path.
* @throws IOException
*/
public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
String path) throws IOException {
if (spsMode != StoragePolicySatisfierMode.INTERNAL) {
LOG.debug("Satisfier is not running inside namenode, so status "
+ "can't be returned.");
throw new IOException("Satisfier is not running inside namenode, "
+ "so status can't be returned.");
}
return sps.checkStoragePolicySatisfyPathStatus(path);
}
/**
* @return SPS service instance.
*/
public SPSService getSPSService() {
return this.sps;
}
/**
* @return the next SPS path id, on which path users has invoked to satisfy
* storages.
*/
public Long getNextSPSPathId() {
return spsPaths.pollNext();
}
/**
* Verify that satisfier queue limit exceeds allowed outstanding limit.
*/
public void verifyOutstandingSPSPathQLimit() throws IOException {
long size = spsPaths.size();
// Checking that the SPS call Q exceeds the allowed limit.
if (spsOutstandingPathsLimit - size <= 0) {
LOG.debug("Satisifer Q - outstanding limit:{}, current size:{}",
spsOutstandingPathsLimit, size);
throw new IOException("Outstanding satisfier queue limit: "
+ spsOutstandingPathsLimit + " exceeded, try later!");
}
}
/**
* Removes the SPS path id from the list of sps paths.
*/
public void removeSPSPathId(long trackId) {
spsPaths.remove(trackId);
}
/**
* Clean up all sps path ids.
*/
public void removeAllSPSPathIds() {
spsPaths.clear();
}
/**
* Adds the sps path to SPSPathIds list.
*/
public void addSPSPathId(long id) {
spsPaths.add(id);
}
/**
* @return true if sps is running as an internal service or external service.
*/
public boolean isSPSEnabled() {
return spsMode == StoragePolicySatisfierMode.INTERNAL
|| spsMode == StoragePolicySatisfierMode.EXTERNAL;
}
/**
* @return sps service mode.
*/
public StoragePolicySatisfierMode getSPSMode() {
return spsMode;
public StoragePolicySatisfyManager getSPSManager() {
return spsManager;
}
}

View File

@ -211,8 +211,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
* A queue of blocks corresponding to trackID for moving its storage
* placements by this datanode.
*/
private final Queue<BlockMovingInfo> storageMovementBlocks =
new LinkedList<>();
private final BlockQueue<BlockMovingInfo> storageMovementBlocks =
new BlockQueue<>();
private volatile boolean dropSPSWork = false;
/* Variables for maintaining number of blocks scheduled to be written to
@ -369,6 +369,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
this.pendingCached.clear();
this.cached.clear();
this.pendingUncached.clear();
this.storageMovementBlocks.clear();
}
public int numBlocks() {
@ -1082,9 +1083,10 @@ public class DatanodeDescriptor extends DatanodeInfo {
* - storage mismatched block info
*/
public void addBlocksToMoveStorage(BlockMovingInfo blkMovingInfo) {
synchronized (storageMovementBlocks) {
storageMovementBlocks.offer(blkMovingInfo);
}
storageMovementBlocks.offer(blkMovingInfo);
BlockManager.LOG
.debug("Adding block move task " + blkMovingInfo + " to " + getName()
+ ", current queue size is " + storageMovementBlocks.size());
}
/**
@ -1101,23 +1103,18 @@ public class DatanodeDescriptor extends DatanodeInfo {
* total number of blocks which will be send to this datanode for
* block movement.
*
* @return block infos which needs to move its storage locations.
* @return block infos which needs to move its storage locations or null if
* there is no block infos to move.
*/
public BlockMovingInfo[] getBlocksToMoveStorages(int numBlocksToMoveTasks) {
synchronized (storageMovementBlocks) {
List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
for (; !storageMovementBlocks.isEmpty()
&& numBlocksToMoveTasks > 0; numBlocksToMoveTasks--) {
blockMovingInfos.add(storageMovementBlocks.poll());
}
BlockMovingInfo[] blkMoveArray = new BlockMovingInfo[blockMovingInfos
.size()];
blkMoveArray = blockMovingInfos.toArray(blkMoveArray);
if (blkMoveArray.length > 0) {
return blkMoveArray;
}
List<BlockMovingInfo> blockMovingInfos = storageMovementBlocks
.poll(numBlocksToMoveTasks);
if (blockMovingInfos == null || blockMovingInfos.size() <= 0) {
return null;
}
BlockMovingInfo[] blkMoveArray = new BlockMovingInfo[blockMovingInfos
.size()];
return blockMovingInfos.toArray(blkMoveArray);
}
/**

View File

@ -365,7 +365,7 @@ public interface HdfsServerConstants {
String XATTR_ERASURECODING_POLICY =
"system.hdfs.erasurecoding.policy";
String XATTR_SATISFY_STORAGE_POLICY = "user.hdfs.sps.xattr";
String XATTR_SATISFY_STORAGE_POLICY = "user.hdfs.sps";
Path MOVER_ID_PATH = new Path("/system/mover.id");

View File

@ -75,9 +75,8 @@ public class StoragePolicySatisfyWorker {
public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) {
this.datanode = datanode;
moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
// Defaulting to 10. This is to minimise the number of move ops.
moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, 10);
moveExecutor = initializeBlockMoverThreadPool(moverThreads);
moverCompletionService = new ExecutorCompletionService<>(moveExecutor);
handler = new BlocksMovementsStatusHandler();
@ -127,21 +126,13 @@ public class StoragePolicySatisfyWorker {
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new Daemon.DaemonFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = super.newThread(r);
t.setName("BlockMoverTask-" + threadIndex.getAndIncrement());
return t;
}
}, new ThreadPoolExecutor.CallerRunsPolicy() {
@Override
public void rejectedExecution(Runnable runnable,
ThreadPoolExecutor e) {
LOG.info("Execution for block movement to satisfy storage policy"
+ " got rejected, Executing in current thread");
// will run in the current thread.
super.rejectedExecution(runnable, e);
}
});
moverThreadPool.allowCoreThreadTimeOut(true);

View File

@ -661,7 +661,7 @@ public class Mover {
boolean spsRunning;
try {
spsRunning = nnc.getDistributedFileSystem().getClient()
.isStoragePolicySatisfierRunning();
.isInternalSatisfierRunning();
} catch (RemoteException e) {
IOException cause = e.unwrapRemoteException();
if (cause instanceof StandbyException) {

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
import java.io.IOException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
@ -75,24 +76,33 @@ final class FSDirSatisfyStoragePolicyOp {
fsd.checkPathAccess(pc, iip, FsAction.WRITE);
}
INode inode = FSDirectory.resolveLastINode(iip);
if (inodeHasSatisfyXAttr(inode)) {
throw new IOException(
"Cannot request to call satisfy storage policy on path "
if (inode.isFile() && inode.asFile().numBlocks() == 0) {
if (NameNode.LOG.isInfoEnabled()) {
NameNode.LOG.info(
"Skipping satisfy storage policy on path:{} as "
+ "this file doesn't have any blocks!",
inode.getFullPathName());
}
} else if (inodeHasSatisfyXAttr(inode)) {
NameNode.LOG
.warn("Cannot request to call satisfy storage policy on path: "
+ inode.getFullPathName()
+ ", as this file/dir was already called for satisfying "
+ "storage policy.");
}
if (unprotectedSatisfyStoragePolicy(inode, fsd)) {
} else {
XAttr satisfyXAttr = XAttrHelper
.buildXAttr(XATTR_SATISFY_STORAGE_POLICY);
List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
xAttrs.add(satisfyXAttr);
List<XAttr> xAttrs = Arrays.asList(satisfyXAttr);
List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
List<XAttr> newXAttrs = FSDirXAttrOp.setINodeXAttrs(fsd, existingXAttrs,
xAttrs, EnumSet.of(XAttrSetFlag.CREATE));
XAttrStorage.updateINodeXAttrs(inode, newXAttrs,
iip.getLatestSnapshotId());
fsd.getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
// Adding directory in the pending queue, so FileInodeIdCollector
// process directory child in batch and recursively
fsd.getBlockManager().getSPSManager().addPathId(inode.getId());
}
} finally {
fsd.writeUnlock();
@ -106,7 +116,7 @@ final class FSDirSatisfyStoragePolicyOp {
} else {
// Adding directory in the pending queue, so FileInodeIdCollector process
// directory child in batch and recursively
fsd.getBlockManager().addSPSPathId(inode.getId());
fsd.getBlockManager().getSPSManager().addPathId(inode.getId());
return true;
}
}

View File

@ -90,7 +90,6 @@ class FSDirStatAndListingOp {
* @param srcArg The string representation of the path to the file
* @param resolveLink whether to throw UnresolvedLinkException
* if src refers to a symlink
* @param needLocation if blockLocations need to be returned
*
* @param needLocation Include {@link LocatedBlocks} in result.
* @param needBlockToken Include block tokens in {@link LocatedBlocks}.

View File

@ -209,7 +209,7 @@ class FSDirXAttrOp {
for (XAttr xattr : toRemove) {
if (XATTR_SATISFY_STORAGE_POLICY
.equals(XAttrHelper.getPrefixedName(xattr))) {
fsd.getBlockManager().getStoragePolicySatisfier()
fsd.getBlockManager().getSPSManager().getInternalSPSService()
.clearQueue(inode.getId());
break;
}

View File

@ -1401,7 +1401,7 @@ public class FSDirectory implements Closeable {
if (!inode.isSymlink()) {
final XAttrFeature xaf = inode.getXAttrFeature();
addEncryptionZone((INodeWithAdditionalFields) inode, xaf);
if (namesystem.getBlockManager().isSPSEnabled()) {
if (namesystem.getBlockManager().getSPSManager().isEnabled()) {
addStoragePolicySatisfier((INodeWithAdditionalFields) inode, xaf);
}
}

View File

@ -259,10 +259,7 @@ import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeBlockMoveTaskHandler;
import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext;
import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeFileIdCollector;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
@ -1295,13 +1292,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
FSDirEncryptionZoneOp.warmUpEdekCache(edekCacheLoader, dir,
edekCacheLoaderDelay, edekCacheLoaderInterval);
}
blockManager.getSPSService().init(
new IntraSPSNameNodeContext(this, blockManager,
blockManager.getSPSService()),
new IntraSPSNameNodeFileIdCollector(getFSDirectory(),
blockManager.getSPSService()),
new IntraSPSNameNodeBlockMoveTaskHandler(blockManager, this), null);
blockManager.startSPS();
blockManager.getSPSManager().start();
} finally {
startingActiveService = false;
blockManager.checkSafeMode();
@ -1332,7 +1323,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
writeLock();
try {
if (blockManager != null) {
blockManager.stopSPS(false);
blockManager.getSPSManager().stop();
}
stopSecretManager();
leaseManager.stopMonitor();
@ -1372,7 +1363,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// Don't want to keep replication queues when not in Active.
blockManager.clearQueues();
blockManager.setInitializedReplQueues(false);
blockManager.stopSPSGracefully();
blockManager.getSPSManager().stopGracefully();
}
} finally {
writeUnlock("stopActiveServices");
@ -2281,17 +2272,18 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
DFS_STORAGE_POLICY_ENABLED_KEY));
}
// checks sps status
if (!blockManager.isSPSEnabled()
|| (blockManager.getSPSMode() == StoragePolicySatisfierMode.INTERNAL
&& !blockManager.getStoragePolicySatisfier().isRunning())) {
if (!blockManager.getSPSManager().isEnabled() || (blockManager
.getSPSManager().getMode() == StoragePolicySatisfierMode.INTERNAL
&& !blockManager.getSPSManager().isInternalSatisfierRunning())) {
throw new UnsupportedActionException(
"Cannot request to satisfy storage policy "
+ "when storage policy satisfier feature has been disabled"
+ " by admin. Seek for an admin help to enable it "
+ "or use Mover tool.");
}
// checks SPS Q has many outstanding requests.
blockManager.verifyOutstandingSPSPathQLimit();
// checks SPS Q has many outstanding requests. It will throw IOException if
// the limit exceeds.
blockManager.getSPSManager().verifyOutstandingPathQLimit();
}
/**
@ -3996,17 +3988,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
// Handle blocks movement results sent by the coordinator datanode.
StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
if (sps != null) {
if (!sps.isRunning()) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Storage policy satisfier is not running. So, ignoring storage"
+ " movement attempt finished block info sent by DN");
}
} else {
sps.notifyStorageMovementAttemptFinishedBlks(blksMovementsFinished);
SPSService sps = blockManager.getSPSManager().getInternalSPSService();
if (!sps.isRunning()) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Storage policy satisfier is not running. So, ignoring storage"
+ " movement attempt finished block info sent by DN");
}
} else {
sps.notifyStorageMovementAttemptFinishedBlks(blksMovementsFinished);
}
//create ha status

View File

@ -2043,7 +2043,7 @@ public class NameNode extends ReconfigurableBase implements
} else if (property.equals(ipcClientRPCBackoffEnable)) {
return reconfigureIPCBackoffEnabled(newVal);
} else if (property.equals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY)) {
return reconfigureSPSEnabled(newVal, property);
return reconfigureSPSModeEvent(newVal, property);
} else {
throw new ReconfigurationException(property, newVal, getConf().get(
property));
@ -2127,39 +2127,27 @@ public class NameNode extends ReconfigurableBase implements
return Boolean.toString(clientBackoffEnabled);
}
String reconfigureSPSEnabled(String newVal, String property)
String reconfigureSPSModeEvent(String newVal, String property)
throws ReconfigurationException {
if (newVal == null
|| StoragePolicySatisfierMode.fromString(newVal) == null) {
throw new ReconfigurationException(property, newVal,
getConf().get(property),
new HadoopIllegalArgumentException(
"For enabling or disabling storage policy satisfier, we must "
+ "pass either none/internal/external string value only"));
"For enabling or disabling storage policy satisfier, must "
+ "pass either internal/external/none string value only"));
}
if (!isActiveState()) {
throw new ReconfigurationException(property, newVal,
getConf().get(property), new HadoopIllegalArgumentException(
"Enabling or disabling storage policy satisfier service on "
+ state + " NameNode is not allowed"));
getConf().get(property),
new HadoopIllegalArgumentException(
"Enabling or disabling storage policy satisfier service on "
+ state + " NameNode is not allowed"));
}
StoragePolicySatisfierMode mode = StoragePolicySatisfierMode
.fromString(newVal);
switch(mode){
case NONE:
namesystem.getBlockManager().disableSPS();
break;
case INTERNAL:
namesystem.getBlockManager().enableInternalSPS();
break;
case EXTERNAL:
namesystem.getBlockManager().enableExternalSPS();
break;
default:
// nothing
break;
}
namesystem.getBlockManager().getSPSManager().changeModeEvent(mode);
return newVal;
}

View File

@ -2536,15 +2536,15 @@ public class NameNodeRpcServer implements NamenodeProtocols {
}
@Override
public boolean isStoragePolicySatisfierRunning() throws IOException {
public boolean isInternalSatisfierRunning() throws IOException {
checkNNStartup();
String operationName = "isStoragePolicySatisfierRunning";
String operationName = "isInternalSatisfierRunning";
namesystem.checkSuperuserPrivilege(operationName);
if (nn.isStandbyState()) {
throw new StandbyException("Not supported by Standby Namenode.");
}
boolean isSPSRunning =
namesystem.getBlockManager().isStoragePolicySatisfierRunning();
boolean isSPSRunning = namesystem.getBlockManager().getSPSManager()
.isInternalSatisfierRunning();
namesystem.logAuditEvent(true, operationName, null);
return isSPSRunning;
}
@ -2556,8 +2556,8 @@ public class NameNodeRpcServer implements NamenodeProtocols {
if (nn.isStandbyState()) {
throw new StandbyException("Not supported by Standby Namenode.");
}
return namesystem.getBlockManager().checkStoragePolicySatisfyPathStatus(
path);
return namesystem.getBlockManager().getSPSManager()
.checkStoragePolicySatisfyPathStatus(path);
}
@Override
@ -2579,17 +2579,16 @@ public class NameNodeRpcServer implements NamenodeProtocols {
if (nn.isStandbyState()) {
throw new StandbyException("Not supported by Standby Namenode.");
}
// Check that internal SPS service is running
if (namesystem.getBlockManager()
.getSPSMode() == StoragePolicySatisfierMode.INTERNAL
&& namesystem.getBlockManager().getSPSService().isRunning()) {
// Check that SPS daemon service is running inside namenode
if (namesystem.getBlockManager().getSPSManager()
.getMode() == StoragePolicySatisfierMode.INTERNAL) {
LOG.debug("SPS service is internally enabled and running inside "
+ "namenode, so external SPS is not allowed to fetch the path Ids");
throw new IOException("SPS service is internally enabled and running"
+ " inside namenode, so external SPS is not allowed to fetch"
+ " the path Ids");
}
return namesystem.getBlockManager().getNextSPSPathId();
return namesystem.getBlockManager().getSPSManager().getNextPathId();
}
@Override

View File

@ -311,7 +311,7 @@ public class BlockStorageMovementNeeded {
if (Time.monotonicNow()
- lastStatusCleanTime > statusClearanceElapsedTimeMs) {
lastStatusCleanTime = Time.monotonicNow();
cleanSpsStatus();
cleanSPSStatus();
}
startINodeId = null; // Current inode id successfully scanned.
}
@ -333,7 +333,7 @@ public class BlockStorageMovementNeeded {
}
}
private synchronized void cleanSpsStatus() {
private synchronized void cleanSPSStatus() {
for (Iterator<Entry<Long, StoragePolicySatisfyPathStatusInfo>> it =
spsStatus.entrySet().iterator(); it.hasNext();) {
Entry<Long, StoragePolicySatisfyPathStatusInfo> entry = it.next();

View File

@ -178,17 +178,17 @@ public class IntraSPSNameNodeContext implements Context {
@Override
public Long getNextSPSPathId() {
return blockManager.getNextSPSPathId();
return blockManager.getSPSManager().getNextPathId();
}
@Override
public void removeSPSPathId(long trackId) {
blockManager.removeSPSPathId(trackId);
blockManager.getSPSManager().removePathId(trackId);
}
@Override
public void removeAllSPSPathIds() {
blockManager.removeAllSPSPathIds();
blockManager.getSPSManager().removeAllPathIds();
}
@Override

View File

@ -1,70 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.sps;
import java.util.LinkedList;
import java.util.Queue;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* A class which holds the SPS invoked path ids.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class SPSPathIds {
// List of pending dir to satisfy the policy
private final Queue<Long> spsDirsToBeTraveresed = new LinkedList<Long>();
/**
* Add the path id to queue.
*/
public synchronized void add(long pathId) {
spsDirsToBeTraveresed.add(pathId);
}
/**
* Removes the path id.
*/
public synchronized void remove(long pathId) {
spsDirsToBeTraveresed.remove(pathId);
}
/**
* Clears all path ids.
*/
public synchronized void clear() {
spsDirsToBeTraveresed.clear();
}
/**
* @return next path id available in queue.
*/
public synchronized Long pollNext() {
return spsDirsToBeTraveresed.poll();
}
/**
* @return the size of the queue.
*/
public synchronized long size() {
return spsDirsToBeTraveresed.size();
}
}

View File

@ -67,11 +67,12 @@ public interface SPSService {
void stopGracefully();
/**
* Disable the SPS service.
* Stops the SPS service.
*
* @param forceStop
* true represents to clear all the sps path's hint, false otherwise.
*/
void disable(boolean forceStop);
void stop(boolean forceStop);
/**
* Check whether StoragePolicySatisfier is running.
@ -105,6 +106,11 @@ public interface SPSService {
*/
int processingQueueSize();
/**
* Clear inodeId present in the processing queue.
*/
void clearQueue(long inodeId);
/**
* @return the configuration.
*/

View File

@ -32,7 +32,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -60,6 +59,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
* Setting storagePolicy on a file after the file write will only update the new
@ -145,7 +145,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
new BlockStorageMovementAttemptedItems(this,
storageMovementNeeded, blockMovementListener);
this.blockMoveTaskHandler = blockMovementTaskHandler;
this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(getConf());
this.spsWorkMultiplier = getSPSWorkMultiplier(getConf());
this.blockMovementMaxRetry = getConf().getInt(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY,
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT);
@ -163,8 +163,6 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
serviceMode);
return;
}
isRunning = true;
this.spsMode = serviceMode;
if (spsMode == StoragePolicySatisfierMode.INTERNAL
&& ctxt.isMoverRunning()) {
isRunning = false;
@ -182,6 +180,8 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
StringUtils.toLowerCase(spsMode.toString()));
}
isRunning = true;
this.spsMode = serviceMode;
// Ensure that all the previously submitted block movements(if any) have to
// be stopped in all datanodes.
addDropSPSWorkCommandsToAllDNs();
@ -193,7 +193,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
}
@Override
public synchronized void disable(boolean forceStop) {
public synchronized void stop(boolean forceStop) {
isRunning = false;
if (storagePolicySatisfierThread == null) {
return;
@ -214,19 +214,22 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
@Override
public synchronized void stopGracefully() {
if (isRunning) {
disable(true);
stop(false);
}
if (this.storageMovementsMonitor != null) {
this.storageMovementsMonitor.stopGracefully();
}
if (storagePolicySatisfierThread == null) {
return;
}
try {
storagePolicySatisfierThread.join(3000);
} catch (InterruptedException ie) {
if (storagePolicySatisfierThread != null) {
try {
storagePolicySatisfierThread.join(3000);
} catch (InterruptedException ie) {
if (LOG.isDebugEnabled()) {
LOG.debug("Interrupted Exception while waiting to join sps thread,"
+ " ignoring it", ie);
}
}
}
}
@ -351,32 +354,26 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
Thread.sleep(3000);
blockCount = 0L;
}
} catch (IOException e) {
LOG.error("Exception during StoragePolicySatisfier execution - "
+ "will continue next cycle", e);
} catch (Throwable t) {
handleException(t);
}
}
}
private void handleException(Throwable t) {
// double check to avoid entering into synchronized block.
if (isRunning) {
synchronized (this) {
if (isRunning) {
if (t instanceof InterruptedException) {
synchronized (this) {
if (isRunning) {
isRunning = false;
LOG.info("Stopping StoragePolicySatisfier.");
if (t instanceof InterruptedException) {
LOG.info("Stopping StoragePolicySatisfier.", t);
} else {
LOG.error("StoragePolicySatisfier thread received "
+ "runtime exception.", t);
}
// Stopping monitor thread and clearing queues as well
this.clearQueues();
this.storageMovementsMonitor.stopGracefully();
} else {
LOG.error(
"StoragePolicySatisfier thread received runtime exception, "
+ "ignoring", t);
}
}
}
}
return;
}
private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN(
@ -434,7 +431,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
List<StorageType> existing = new LinkedList<StorageType>(
Arrays.asList(blockInfo.getStorageTypes()));
if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
if (!removeOverlapBetweenStorageTypes(expectedStorageTypes,
existing, true)) {
boolean blocksPaired = computeBlockMovingInfos(blockMovingInfos,
blockInfo, expectedStorageTypes, existing, blockInfo.getLocations(),
@ -499,7 +496,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
DatanodeInfo[] storages, DatanodeStorageReport[] liveDns,
ErasureCodingPolicy ecPolicy) {
boolean foundMatchingTargetNodesForBlock = true;
if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
if (!removeOverlapBetweenStorageTypes(expectedStorageTypes,
existing, true)) {
List<StorageTypeNodePair> sourceWithStorageMap =
new ArrayList<StorageTypeNodePair>();
@ -880,21 +877,6 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
storageMovementNeeded.clearAll();
}
/**
* Set file inode in queue for which storage movement needed for its blocks.
*
* @param inodeId
* - file inode/blockcollection id.
*/
public void satisfyStoragePolicy(Long inodeId) {
//For file startId and trackId is same
storageMovementNeeded.add(new ItemInfo(inodeId, inodeId));
if (LOG.isDebugEnabled()) {
LOG.debug("Added track info for inode {} to block "
+ "storageMovementNeeded queue", inodeId);
}
}
/**
* Clear queues for given track id.
*/
@ -958,6 +940,10 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
@Override
public void addFileIdToProcess(ItemInfo trackInfo, boolean scanCompleted) {
storageMovementNeeded.add(trackInfo, scanCompleted);
if (LOG.isDebugEnabled()) {
LOG.debug("Added track info for inode {} to block "
+ "storageMovementNeeded queue", trackInfo.getFileId());
}
}
@Override
@ -993,4 +979,63 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
//TODO Add join here on SPS rpc server also
storagePolicySatisfierThread.join();
}
/**
* Remove the overlap between the expected types and the existing types.
*
* @param expected
* - Expected storage types list.
* @param existing
* - Existing storage types list.
* @param ignoreNonMovable
* ignore non-movable storage types by removing them from both
* expected and existing storage type list to prevent non-movable
* storage from being moved.
* @returns if the existing types or the expected types is empty after
* removing the overlap.
*/
private static boolean removeOverlapBetweenStorageTypes(
List<StorageType> expected,
List<StorageType> existing, boolean ignoreNonMovable) {
for (Iterator<StorageType> i = existing.iterator(); i.hasNext();) {
final StorageType t = i.next();
if (expected.remove(t)) {
i.remove();
}
}
if (ignoreNonMovable) {
removeNonMovable(existing);
removeNonMovable(expected);
}
return expected.isEmpty() || existing.isEmpty();
}
private static void removeNonMovable(List<StorageType> types) {
for (Iterator<StorageType> i = types.iterator(); i.hasNext();) {
final StorageType t = i.next();
if (!t.isMovable()) {
i.remove();
}
}
}
/**
* Get DFS_SPS_WORK_MULTIPLIER_PER_ITERATION from
* configuration.
*
* @param conf Configuration
* @return Value of DFS_SPS_WORK_MULTIPLIER_PER_ITERATION
*/
private static int getSPSWorkMultiplier(Configuration conf) {
int spsWorkMultiplier = conf
.getInt(
DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION,
DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION_DEFAULT);
Preconditions.checkArgument(
(spsWorkMultiplier > 0),
DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION +
" = '" + spsWorkMultiplier + "' is invalid. " +
"It should be a positive, non-zero integer value.");
return spsWorkMultiplier;
}
}

View File

@ -0,0 +1,399 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.sps;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Queue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.sps.ExternalStoragePolicySatisfier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This manages satisfy storage policy invoked path ids and expose methods to
* process these path ids. It maintains sps mode(INTERNAL/EXTERNAL/NONE)
* configured by the administrator.
*
* <p>
* If the configured mode is {@link StoragePolicySatisfierMode.INTERNAL}, then
* it will start internal sps daemon service inside namenode and process sps
* invoked path ids to satisfy the storage policy.
*
* <p>
* If the configured mode is {@link StoragePolicySatisfierMode.EXTERNAL}, then
* it won't do anything, just maintains the sps invoked path ids. Administrator
* requires to start external sps service explicitly, to fetch the sps invoked
* path ids from namenode, then do necessary computations and block movement in
* order to satisfy the storage policy. Please refer
* {@link ExternalStoragePolicySatisfier} class to understand more about the
* external sps service functionality.
*
* <p>
* If the configured mode is {@link StoragePolicySatisfierMode.NONE}, then it
* will disable the sps feature completely by clearing all queued up sps path's
* hint.
*
* This class is instantiated by the BlockManager.
*/
public class StoragePolicySatisfyManager {
private static final Logger LOG = LoggerFactory
.getLogger(StoragePolicySatisfyManager.class);
private final StoragePolicySatisfier spsService;
private final boolean storagePolicyEnabled;
private volatile StoragePolicySatisfierMode mode;
private final Queue<Long> pathsToBeTraveresed;
private final int outstandingPathsLimit;
private final Namesystem namesystem;
private final BlockManager blkMgr;
public StoragePolicySatisfyManager(Configuration conf, Namesystem namesystem,
BlockManager blkMgr) {
// StoragePolicySatisfier(SPS) configs
storagePolicyEnabled = conf.getBoolean(
DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY,
DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT);
String modeVal = conf.get(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT);
outstandingPathsLimit = conf.getInt(
DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_KEY,
DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_DEFAULT);
mode = StoragePolicySatisfierMode.fromString(modeVal);
pathsToBeTraveresed = new LinkedList<Long>();
// instantiate SPS service by just keeps config reference and not starting
// any supporting threads.
spsService = new StoragePolicySatisfier(conf);
this.namesystem = namesystem;
this.blkMgr = blkMgr;
}
/**
* This function will do following logic based on the configured sps mode:
*
* <p>
* If the configured mode is {@link StoragePolicySatisfierMode.INTERNAL}, then
* starts internal daemon service inside namenode.
*
* <p>
* If the configured mode is {@link StoragePolicySatisfierMode.EXTERNAL}, then
* it won't do anything. Administrator requires to start external sps service
* explicitly.
*
* <p>
* If the configured mode is {@link StoragePolicySatisfierMode.NONE}, then the
* service is disabled and won't do any action.
*/
public void start() {
if (!storagePolicyEnabled) {
LOG.info("Disabling StoragePolicySatisfier service as {} set to {}.",
DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, storagePolicyEnabled);
return;
}
switch (mode) {
case INTERNAL:
if (spsService.isRunning()) {
LOG.info("Storage policy satisfier is already running"
+ " as internal daemon service inside namenode.");
return;
}
// starts internal daemon service inside namenode
spsService.init(
new IntraSPSNameNodeContext(namesystem, blkMgr, spsService),
new IntraSPSNameNodeFileIdCollector(namesystem.getFSDirectory(),
spsService),
new IntraSPSNameNodeBlockMoveTaskHandler(blkMgr, namesystem), null);
spsService.start(false, mode);
break;
case EXTERNAL:
LOG.info("Storage policy satisfier is configured as external, "
+ "please start external sps service explicitly to satisfy policy");
break;
case NONE:
LOG.info("Storage policy satisfier is disabled");
break;
default:
LOG.info("Given mode: {} is invalid", mode);
break;
}
}
/**
* This function will do following logic based on the configured sps mode:
*
* <p>
* If the configured mode is {@link StoragePolicySatisfierMode.INTERNAL}, then
* stops internal daemon service inside namenode.
*
* <p>
* If the configured mode is {@link StoragePolicySatisfierMode.EXTERNAL}, then
* it won't do anything. Administrator requires to stop external sps service
* explicitly, if needed.
*
* <p>
* If the configured mode is {@link StoragePolicySatisfierMode.NONE}, then the
* service is disabled and won't do any action.
*/
public void stop() {
if (!storagePolicyEnabled) {
if (LOG.isDebugEnabled()) {
LOG.debug("Storage policy is not enabled, ignoring");
}
return;
}
switch (mode) {
case INTERNAL:
removeAllPathIds();
if (!spsService.isRunning()) {
LOG.info("Internal storage policy satisfier daemon service"
+ " is not running");
return;
}
// stops internal daemon service running inside namenode
spsService.stop(false);
break;
case EXTERNAL:
removeAllPathIds();
if (LOG.isDebugEnabled()) {
LOG.debug(
"Storage policy satisfier service is running outside namenode"
+ ", ignoring");
}
break;
case NONE:
if (LOG.isDebugEnabled()) {
LOG.debug("Storage policy satisfier is not enabled, ignoring");
}
break;
default:
if (LOG.isDebugEnabled()) {
LOG.debug("Invalid mode:{}, ignoring", mode);
}
break;
}
}
/**
* Sets new sps mode. If the new mode is internal, then it will start internal
* sps service inside namenode. If the new mode is external, then stops
* internal sps service running(if any) inside namenode. If the new mode is
* none, then it will disable the sps feature completely by clearing all
* queued up sps path's hint.
*/
public void changeModeEvent(StoragePolicySatisfierMode newMode) {
if (!storagePolicyEnabled) {
LOG.info("Failed to change storage policy satisfier as {} set to {}.",
DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, storagePolicyEnabled);
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Updating SPS service status, current mode:{}, new mode:{}",
mode, newMode);
}
switch (newMode) {
case INTERNAL:
if (spsService.isRunning()) {
LOG.info("Storage policy satisfier is already running as {} mode.",
mode);
return;
}
spsService.init(
new IntraSPSNameNodeContext(this.namesystem, this.blkMgr, spsService),
new IntraSPSNameNodeFileIdCollector(this.namesystem.getFSDirectory(),
spsService),
new IntraSPSNameNodeBlockMoveTaskHandler(this.blkMgr,
this.namesystem),
null);
spsService.start(true, newMode);
break;
case EXTERNAL:
if (mode == newMode) {
LOG.info("Storage policy satisfier is already in mode:{},"
+ " so ignoring change mode event.", newMode);
return;
}
spsService.stopGracefully();
break;
case NONE:
if (mode == newMode) {
LOG.info("Storage policy satisfier is already disabled, mode:{}"
+ " so ignoring change mode event.", newMode);
return;
}
LOG.info("Disabling StoragePolicySatisfier, mode:{}", newMode);
spsService.stop(true);
removeAllPathIds();
break;
default:
if (LOG.isDebugEnabled()) {
LOG.debug("Given mode: {} is invalid", newMode);
}
break;
}
// update sps mode
mode = newMode;
}
/**
* This function will do following logic based on the configured sps mode:
*
* <p>
* If the configured mode is {@link StoragePolicySatisfierMode.INTERNAL}, then
* timed wait to stop internal storage policy satisfier daemon threads.
*
* <p>
* If the configured mode is {@link StoragePolicySatisfierMode.EXTERNAL}, then
* it won't do anything, just ignore it.
*
* <p>
* If the configured mode is {@link StoragePolicySatisfierMode.NONE}, then the
* service is disabled. It won't do any action, just ignore it.
*/
public void stopGracefully() {
switch (mode) {
case INTERNAL:
spsService.stopGracefully();
break;
case EXTERNAL:
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring, StoragePolicySatisfier feature is running"
+ " outside namenode");
}
break;
case NONE:
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring, StoragePolicySatisfier feature is disabled");
}
break;
default:
if (LOG.isDebugEnabled()) {
LOG.debug("Invalid mode:{}", mode);
}
break;
}
}
/**
* @return true if the internal storage policy satisfier daemon is running,
* false otherwise.
*/
public boolean isInternalSatisfierRunning() {
return spsService.isRunning();
}
/**
* @return internal SPS service instance.
*/
public SPSService getInternalSPSService() {
return this.spsService;
}
/**
* @return status Storage policy satisfy status of the path. It is supported
* only for the internal sps daemon service.
* @throws IOException
* if the Satisfier is not running inside namenode.
*/
public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
String path) throws IOException {
if (mode != StoragePolicySatisfierMode.INTERNAL) {
LOG.debug("Satisfier is not running inside namenode, so status "
+ "can't be returned.");
throw new IOException("Satisfier is not running inside namenode, "
+ "so status can't be returned.");
}
return spsService.checkStoragePolicySatisfyPathStatus(path);
}
/**
* @return the next SPS path id, on which path users has invoked to satisfy
* storages.
*/
public Long getNextPathId() {
synchronized (pathsToBeTraveresed) {
return pathsToBeTraveresed.poll();
}
}
/**
* Verify that satisfier queue limit exceeds allowed outstanding limit.
*/
public void verifyOutstandingPathQLimit() throws IOException {
long size = pathsToBeTraveresed.size();
// Checking that the SPS call Q exceeds the allowed limit.
if (outstandingPathsLimit - size <= 0) {
LOG.debug("Satisifer Q - outstanding limit:{}, current size:{}",
outstandingPathsLimit, size);
throw new IOException("Outstanding satisfier queue limit: "
+ outstandingPathsLimit + " exceeded, try later!");
}
}
/**
* Removes the SPS path id from the list of sps paths.
*/
public void removePathId(long trackId) {
synchronized (pathsToBeTraveresed) {
pathsToBeTraveresed.remove(trackId);
}
}
/**
* Clean up all sps path ids.
*/
public void removeAllPathIds() {
synchronized (pathsToBeTraveresed) {
pathsToBeTraveresed.clear();
}
}
/**
* Adds the sps path to SPSPathIds list.
*/
public void addPathId(long id) {
synchronized (pathsToBeTraveresed) {
pathsToBeTraveresed.add(id);
}
}
/**
* @return true if sps is configured as an internal service or external
* service, false otherwise.
*/
public boolean isEnabled() {
return mode == StoragePolicySatisfierMode.INTERNAL
|| mode == StoragePolicySatisfierMode.EXTERNAL;
}
/**
* @return sps service mode.
*/
public StoragePolicySatisfierMode getMode() {
return mode;
}
}

View File

@ -73,7 +73,7 @@ public final class ExternalStoragePolicySatisfier {
boolean spsRunning;
spsRunning = nnc.getDistributedFileSystem().getClient()
.isStoragePolicySatisfierRunning();
.isInternalSatisfierRunning();
if (spsRunning) {
throw new RuntimeException(
"Startup failed due to StoragePolicySatisfier"

View File

@ -374,7 +374,7 @@ public class StoragePolicyAdmin extends Configured implements Tool {
}
final DistributedFileSystem dfs = AdminHelper.getDFS(conf);
try {
if(dfs.getClient().isStoragePolicySatisfierRunning()){
if(dfs.getClient().isInternalSatisfierRunning()){
System.out.println("yes");
}else{
System.out.println("no");

View File

@ -252,8 +252,8 @@ public class TestNameNodeReconfigure {
// Since DFS_STORAGE_POLICY_ENABLED_KEY is disabled, SPS can't be enabled.
assertEquals("SPS shouldn't start as "
+ DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY + " is disabled", false,
nameNode.getNamesystem().getBlockManager()
.isStoragePolicySatisfierRunning());
nameNode.getNamesystem().getBlockManager().getSPSManager()
.isInternalSatisfierRunning());
verifySPSEnabled(nameNode, DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.INTERNAL, false);
@ -280,8 +280,8 @@ public class TestNameNodeReconfigure {
fail("ReconfigurationException expected");
} catch (ReconfigurationException e) {
GenericTestUtils.assertExceptionContains(
"For enabling or disabling storage policy satisfier, we must "
+ "pass either none/internal/external string value only",
"For enabling or disabling storage policy satisfier, must "
+ "pass either internal/external/none string value only",
e.getCause());
}
@ -301,8 +301,8 @@ public class TestNameNodeReconfigure {
nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.EXTERNAL.toString());
assertEquals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY + " has wrong value",
false, nameNode.getNamesystem().getBlockManager()
.isStoragePolicySatisfierRunning());
false, nameNode.getNamesystem().getBlockManager().getSPSManager()
.isInternalSatisfierRunning());
assertEquals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY + " has wrong value",
StoragePolicySatisfierMode.EXTERNAL.toString(),
nameNode.getConf().get(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
@ -342,8 +342,8 @@ public class TestNameNodeReconfigure {
nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.INTERNAL.toString());
assertEquals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY + " has wrong value",
true, nameNode.getNamesystem().getBlockManager()
.isStoragePolicySatisfierRunning());
true, nameNode.getNamesystem().getBlockManager().getSPSManager()
.isInternalSatisfierRunning());
assertEquals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY + " has wrong value",
StoragePolicySatisfierMode.INTERNAL.toString(),
nameNode.getConf().get(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
@ -353,7 +353,8 @@ public class TestNameNodeReconfigure {
void verifySPSEnabled(final NameNode nameNode, String property,
StoragePolicySatisfierMode expected, boolean isSatisfierRunning) {
assertEquals(property + " has wrong value", isSatisfierRunning, nameNode
.getNamesystem().getBlockManager().isStoragePolicySatisfierRunning());
.getNamesystem().getBlockManager().getSPSManager()
.isInternalSatisfierRunning());
String actual = nameNode.getConf().get(property,
DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT);
assertEquals(property + " has wrong value", expected,

View File

@ -389,7 +389,8 @@ public class TestPersistentStoragePolicySatisfier {
fs.setStoragePolicy(testFile, ONE_SSD);
fs.satisfyStoragePolicy(testFile);
cluster.getNamesystem().getBlockManager().disableSPS();
cluster.getNamesystem().getBlockManager().getSPSManager()
.changeModeEvent(StoragePolicySatisfierMode.NONE);
// Make sure satisfy xattr has been removed.
DFSTestUtil.waitForXattrRemoved(testFileName,

View File

@ -103,7 +103,7 @@ public class TestStoragePolicySatisfierWithHA {
dfs = cluster.getFileSystem(1);
try {
dfs.getClient().isStoragePolicySatisfierRunning();
dfs.getClient().isInternalSatisfierRunning();
Assert.fail("Call this function to Standby NN should "
+ "raise an exception.");
} catch (RemoteException e) {
@ -115,14 +115,14 @@ public class TestStoragePolicySatisfierWithHA {
cluster.transitionToActive(0);
dfs = cluster.getFileSystem(0);
running = dfs.getClient().isStoragePolicySatisfierRunning();
running = dfs.getClient().isInternalSatisfierRunning();
Assert.assertTrue("StoragePolicySatisfier should be active "
+ "when NN transits from Standby to Active mode.", running);
// NN transits from Active to Standby
cluster.transitionToStandby(0);
try {
dfs.getClient().isStoragePolicySatisfierRunning();
dfs.getClient().isInternalSatisfierRunning();
Assert.fail("NN in Standby again, call this function should "
+ "raise an exception.");
} catch (RemoteException e) {

View File

@ -445,15 +445,10 @@ public class TestStoragePolicySatisfier {
try {
hdfsAdmin.satisfyStoragePolicy(new Path(FILE));
hdfsAdmin.satisfyStoragePolicy(new Path(FILE));
Assert.fail(String.format("Should failed to satisfy storage policy "
+ "for %s ,since it has been " + "added to satisfy movement queue.",
FILE));
} catch (IOException e) {
GenericTestUtils.assertExceptionContains(
String.format("Cannot request to call satisfy storage policy "
+ "on path %s, as this file/dir was already called for "
+ "satisfying storage policy.", FILE),
e);
} catch (Exception e) {
Assert.fail(String.format("Allow to invoke mutlipe times "
+ "#satisfyStoragePolicy() api for a path %s , internally just "
+ "skipping addtion to satisfy movement queue.", FILE));
}
} finally {
shutdownCluster();
@ -563,7 +558,7 @@ public class TestStoragePolicySatisfier {
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.NONE.toString());
running = hdfsCluster.getFileSystem()
.getClient().isStoragePolicySatisfierRunning();
.getClient().isInternalSatisfierRunning();
Assert.assertFalse("SPS should stopped as configured.", running);
// Simulate the case by creating MOVER_ID file
@ -576,7 +571,7 @@ public class TestStoragePolicySatisfier {
StoragePolicySatisfierMode.INTERNAL.toString());
running = hdfsCluster.getFileSystem()
.getClient().isStoragePolicySatisfierRunning();
.getClient().isInternalSatisfierRunning();
Assert.assertFalse("SPS should not be able to run as file "
+ HdfsServerConstants.MOVER_ID_PATH + " is being hold.", running);
@ -591,7 +586,7 @@ public class TestStoragePolicySatisfier {
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.INTERNAL.toString());
running = hdfsCluster.getFileSystem()
.getClient().isStoragePolicySatisfierRunning();
.getClient().isInternalSatisfierRunning();
Assert.assertTrue("SPS should be running as "
+ "Mover already exited", running);
@ -623,7 +618,7 @@ public class TestStoragePolicySatisfier {
HdfsServerConstants.MOVER_ID_PATH, 0, (short) 1, 0);
restartNamenode();
boolean running = hdfsCluster.getFileSystem()
.getClient().isStoragePolicySatisfierRunning();
.getClient().isInternalSatisfierRunning();
Assert.assertTrue("SPS should be running as "
+ "no Mover really running", running);
} finally {
@ -1293,8 +1288,8 @@ public class TestStoragePolicySatisfier {
sps.getStorageMovementQueue().activate();
INode rootINode = fsDir.getINode("/root");
hdfsCluster.getNamesystem().getBlockManager()
.addSPSPathId(rootINode.getId());
hdfsCluster.getNamesystem().getBlockManager().getSPSManager()
.addPathId(rootINode.getId());
//Wait for thread to reach U.
Thread.sleep(1000);
@ -1360,8 +1355,8 @@ public class TestStoragePolicySatisfier {
sps.getStorageMovementQueue().activate();
INode rootINode = fsDir.getINode("/root");
hdfsCluster.getNamesystem().getBlockManager()
.addSPSPathId(rootINode.getId());
hdfsCluster.getNamesystem().getBlockManager().getSPSManager()
.addPathId(rootINode.getId());
// Wait for thread to reach U.
Thread.sleep(1000);
@ -1704,7 +1699,8 @@ public class TestStoragePolicySatisfier {
private void waitForAttemptedItems(long expectedBlkMovAttemptedCount,
int timeout) throws TimeoutException, InterruptedException {
BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
final StoragePolicySatisfier sps = (StoragePolicySatisfier) blockManager
.getSPSManager().getInternalSPSService();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
@ -1723,7 +1719,8 @@ public class TestStoragePolicySatisfier {
long expectedMovementFinishedBlocksCount, int timeout)
throws TimeoutException, InterruptedException {
BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
final StoragePolicySatisfier sps = (StoragePolicySatisfier) blockManager
.getSPSManager().getInternalSPSService();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {

View File

@ -495,7 +495,8 @@ public class TestStoragePolicySatisfierWithStripedFile {
long expectedBlkMovAttemptedCount, int timeout)
throws TimeoutException, InterruptedException {
BlockManager blockManager = cluster.getNamesystem().getBlockManager();
final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
final StoragePolicySatisfier sps = (StoragePolicySatisfier) blockManager
.getSPSManager().getInternalSPSService();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
@ -566,7 +567,8 @@ public class TestStoragePolicySatisfierWithStripedFile {
long expectedMoveFinishedBlks, int timeout)
throws TimeoutException, InterruptedException {
BlockManager blockManager = cluster.getNamesystem().getBlockManager();
final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
final StoragePolicySatisfier sps = (StoragePolicySatisfier) blockManager
.getSPSManager().getInternalSPSService();
Assert.assertNotNull("Failed to get SPS object reference!", sps);
GenericTestUtils.waitFor(new Supplier<Boolean>() {

View File

@ -133,7 +133,7 @@ public class TestExternalStoragePolicySatisfier
BlockManager blkMgr = cluster.getNameNode().getNamesystem()
.getBlockManager();
SPSService spsService = blkMgr.getSPSService();
SPSService spsService = blkMgr.getSPSManager().getInternalSPSService();
spsService.stopGracefully();
ExternalSPSContext context = new ExternalSPSContext(spsService,
@ -143,12 +143,12 @@ public class TestExternalStoragePolicySatisfier
new ExternalBlockMovementListener();
ExternalSPSBlockMoveTaskHandler externalHandler =
new ExternalSPSBlockMoveTaskHandler(conf, nnc,
blkMgr.getSPSService());
blkMgr.getSPSManager().getInternalSPSService());
externalHandler.init();
spsService.init(context,
new ExternalSPSFileIDCollector(context, blkMgr.getSPSService()),
externalHandler,
blkMoveListener);
new ExternalSPSFileIDCollector(context,
blkMgr.getSPSManager().getInternalSPSService()),
externalHandler, blkMoveListener);
spsService.start(true, StoragePolicySatisfierMode.EXTERNAL);
return cluster;
}
@ -156,14 +156,14 @@ public class TestExternalStoragePolicySatisfier
public void restartNamenode() throws IOException{
BlockManager blkMgr = getCluster().getNameNode().getNamesystem()
.getBlockManager();
SPSService spsService = blkMgr.getSPSService();
SPSService spsService = blkMgr.getSPSManager().getInternalSPSService();
spsService.stopGracefully();
getCluster().restartNameNodes();
getCluster().waitActive();
blkMgr = getCluster().getNameNode().getNamesystem()
.getBlockManager();
spsService = blkMgr.getSPSService();
spsService = blkMgr.getSPSManager().getInternalSPSService();
spsService.stopGracefully();
ExternalSPSContext context = new ExternalSPSContext(spsService,
@ -172,12 +172,12 @@ public class TestExternalStoragePolicySatisfier
new ExternalBlockMovementListener();
ExternalSPSBlockMoveTaskHandler externalHandler =
new ExternalSPSBlockMoveTaskHandler(getConf(), nnc,
blkMgr.getSPSService());
blkMgr.getSPSManager().getInternalSPSService());
externalHandler.init();
spsService.init(context,
new ExternalSPSFileIDCollector(context, blkMgr.getSPSService()),
externalHandler,
blkMoveListener);
new ExternalSPSFileIDCollector(context,
blkMgr.getSPSManager().getInternalSPSService()),
externalHandler, blkMoveListener);
spsService.start(true, StoragePolicySatisfierMode.EXTERNAL);
}
@ -323,7 +323,7 @@ public class TestExternalStoragePolicySatisfier
DistributedFileSystem fs = getFS();
BlockManager blkMgr = getCluster().getNameNode().getNamesystem()
.getBlockManager();
SPSService spsService = blkMgr.getSPSService();
SPSService spsService = blkMgr.getSPSManager().getInternalSPSService();
spsService.stopGracefully(); // stops SPS
// Creates 4 more files. Send all of them for satisfying the storage