HDFS-13110: [SPS]: Reduce the number of APIs in NamenodeProtocol used by external satisfier. Contributed by Rakesh R.

This commit is contained in:
Rakesh Radhakrishnan 2018-02-16 17:01:38 +05:30 committed by Uma Maheswara Rao Gangumalla
parent 4402f3f855
commit 8467ec24fb
25 changed files with 515 additions and 522 deletions

View File

@ -35,16 +35,12 @@ import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksReq
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetFilePathRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetFilePathResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.HasLowRedundancyBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.HasLowRedundancyBlocksResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsUpgradeFinalizedRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsUpgradeFinalizedRequestProto;
@ -267,32 +263,21 @@ public class NamenodeProtocolServerSideTranslatorPB implements
} }
@Override @Override
public GetNextSPSPathIdResponseProto getNextSPSPathId( public GetNextSPSPathResponseProto getNextSPSPath(
RpcController controller, GetNextSPSPathIdRequestProto request) RpcController controller, GetNextSPSPathRequestProto request)
throws ServiceException { throws ServiceException {
try { try {
Long nextSPSPathId = impl.getNextSPSPathId(); String nextSPSPath = impl.getNextSPSPath();
if (nextSPSPathId == null) { if (nextSPSPath == null) {
return GetNextSPSPathIdResponseProto.newBuilder().build(); return GetNextSPSPathResponseProto.newBuilder().build();
} }
return GetNextSPSPathIdResponseProto.newBuilder().setFileId(nextSPSPathId) return GetNextSPSPathResponseProto.newBuilder().setSpsPath(nextSPSPath)
.build(); .build();
} catch (IOException e) { } catch (IOException e) {
throw new ServiceException(e); throw new ServiceException(e);
} }
} }
@Override
public GetFilePathResponseProto getFilePath(RpcController controller,
GetFilePathRequestProto request) throws ServiceException {
try {
return GetFilePathResponseProto.newBuilder()
.setSrcPath(impl.getFilePath(request.getFileId())).build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override @Override
public CheckDNSpaceResponseProto checkDNSpaceForScheduling( public CheckDNSpaceResponseProto checkDNSpaceForScheduling(
RpcController controller, CheckDNSpaceRequestProto request) RpcController controller, CheckDNSpaceRequestProto request)
@ -309,19 +294,4 @@ public class NamenodeProtocolServerSideTranslatorPB implements
throw new ServiceException(e); throw new ServiceException(e);
} }
} }
@Override
public HasLowRedundancyBlocksResponseProto hasLowRedundancyBlocks(
RpcController controller, HasLowRedundancyBlocksRequestProto request)
throws ServiceException {
try {
return HasLowRedundancyBlocksResponseProto.newBuilder()
.setHasLowRedundancyBlocks(
impl.hasLowRedundancyBlocks(request.getInodeId()))
.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
} }

View File

@ -34,12 +34,10 @@ import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeys
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetFilePathRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.HasLowRedundancyBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsUpgradeFinalizedRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsUpgradeFinalizedRequestProto;
@ -271,24 +269,13 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
} }
@Override @Override
public Long getNextSPSPathId() throws IOException { public String getNextSPSPath() throws IOException {
GetNextSPSPathIdRequestProto req = GetNextSPSPathRequestProto req =
GetNextSPSPathIdRequestProto.newBuilder().build(); GetNextSPSPathRequestProto.newBuilder().build();
try { try {
GetNextSPSPathIdResponseProto nextSPSPathId = GetNextSPSPathResponseProto nextSPSPath =
rpcProxy.getNextSPSPathId(NULL_CONTROLLER, req); rpcProxy.getNextSPSPath(NULL_CONTROLLER, req);
return nextSPSPathId.hasFileId() ? nextSPSPathId.getFileId() : null; return nextSPSPath.hasSpsPath() ? nextSPSPath.getSpsPath() : null;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public String getFilePath(Long inodeId) throws IOException {
GetFilePathRequestProto req =
GetFilePathRequestProto.newBuilder().setFileId(inodeId).build();
try {
return rpcProxy.getFilePath(NULL_CONTROLLER, req).getSrcPath();
} catch (ServiceException e) { } catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e); throw ProtobufHelper.getRemoteException(e);
} }
@ -308,17 +295,4 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
throw ProtobufHelper.getRemoteException(e); throw ProtobufHelper.getRemoteException(e);
} }
} }
@Override
public boolean hasLowRedundancyBlocks(long inodeId) throws IOException {
HasLowRedundancyBlocksRequestProto req = HasLowRedundancyBlocksRequestProto
.newBuilder().setInodeId(inodeId).build();
try {
return rpcProxy.hasLowRedundancyBlocks(NULL_CONTROLLER, req)
.getHasLowRedundancyBlocks();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
} }

View File

@ -310,7 +310,7 @@ public abstract class FSTreeTraverser {
* @throws IOException * @throws IOException
* @throws InterruptedException * @throws InterruptedException
*/ */
protected abstract void submitCurrentBatch(long startId) protected abstract void submitCurrentBatch(Long startId)
throws IOException, InterruptedException; throws IOException, InterruptedException;
/** /**

View File

@ -2561,20 +2561,9 @@ public class NameNodeRpcServer implements NamenodeProtocols {
} }
@Override @Override
public String getFilePath(Long inodeId) throws IOException { public String getNextSPSPath() throws IOException {
checkNNStartup(); checkNNStartup();
String operationName = "getFilePath"; String operationName = "getNextSPSPath";
namesystem.checkSuperuserPrivilege(operationName);
if (nn.isStandbyState()) {
throw new StandbyException("Not supported by Standby Namenode.");
}
return namesystem.getFilePath(inodeId);
}
@Override
public Long getNextSPSPathId() throws IOException {
checkNNStartup();
String operationName = "getNextSPSPathId";
namesystem.checkSuperuserPrivilege(operationName); namesystem.checkSuperuserPrivilege(operationName);
if (nn.isStandbyState()) { if (nn.isStandbyState()) {
throw new StandbyException("Not supported by Standby Namenode."); throw new StandbyException("Not supported by Standby Namenode.");
@ -2588,7 +2577,11 @@ public class NameNodeRpcServer implements NamenodeProtocols {
+ " inside namenode, so external SPS is not allowed to fetch" + " inside namenode, so external SPS is not allowed to fetch"
+ " the path Ids"); + " the path Ids");
} }
return namesystem.getBlockManager().getSPSManager().getNextPathId(); Long pathId = namesystem.getBlockManager().getSPSManager().getNextPathId();
if (pathId == null) {
return null;
}
return namesystem.getFilePath(pathId);
} }
@Override @Override
@ -2603,15 +2596,4 @@ public class NameNodeRpcServer implements NamenodeProtocols {
return namesystem.getBlockManager().getDatanodeManager() return namesystem.getBlockManager().getDatanodeManager()
.verifyTargetDatanodeHasSpaceForScheduling(dn, type, estimatedSize); .verifyTargetDatanodeHasSpaceForScheduling(dn, type, estimatedSize);
} }
@Override
public boolean hasLowRedundancyBlocks(long inodeId) throws IOException {
checkNNStartup();
String operationName = "hasLowRedundancyBlocks";
namesystem.checkSuperuserPrivilege(operationName);
if (nn.isStandbyState()) {
throw new StandbyException("Not supported by Standby Namenode.");
}
return namesystem.getBlockManager().hasLowRedundancyBlocks(inodeId);
}
} }

View File

@ -702,7 +702,7 @@ public class ReencryptionHandler implements Runnable {
* @throws InterruptedException * @throws InterruptedException
*/ */
@Override @Override
protected void submitCurrentBatch(final long zoneId) throws IOException, protected void submitCurrentBatch(final Long zoneId) throws IOException,
InterruptedException { InterruptedException {
if (currentBatch.isEmpty()) { if (currentBatch.isEmpty()) {
return; return;

View File

@ -45,8 +45,13 @@ import com.google.common.annotations.VisibleForTesting;
* entries from tracking. If there is no DN reports about movement attempt * entries from tracking. If there is no DN reports about movement attempt
* finished for a longer time period, then such items will retries automatically * finished for a longer time period, then such items will retries automatically
* after timeout. The default timeout would be 5 minutes. * after timeout. The default timeout would be 5 minutes.
*
* @param <T>
* is identifier of inode or full path name of inode. Internal sps will
* use the file inodeId for the block movement. External sps will use
* file string path representation for the block movement.
*/ */
public class BlockStorageMovementAttemptedItems{ public class BlockStorageMovementAttemptedItems<T> {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class); LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class);
@ -54,7 +59,7 @@ public class BlockStorageMovementAttemptedItems{
* A map holds the items which are already taken for blocks movements * A map holds the items which are already taken for blocks movements
* processing and sent to DNs. * processing and sent to DNs.
*/ */
private final List<AttemptedItemInfo> storageMovementAttemptedItems; private final List<AttemptedItemInfo<T>> storageMovementAttemptedItems;
private final List<Block> movementFinishedBlocks; private final List<Block> movementFinishedBlocks;
private volatile boolean monitorRunning = true; private volatile boolean monitorRunning = true;
private Daemon timerThread = null; private Daemon timerThread = null;
@ -70,11 +75,11 @@ public class BlockStorageMovementAttemptedItems{
// a request is timed out. // a request is timed out.
// //
private long minCheckTimeout = 1 * 60 * 1000; // minimum value private long minCheckTimeout = 1 * 60 * 1000; // minimum value
private BlockStorageMovementNeeded blockStorageMovementNeeded; private BlockStorageMovementNeeded<T> blockStorageMovementNeeded;
private final SPSService service; private final SPSService<T> service;
public BlockStorageMovementAttemptedItems(SPSService service, public BlockStorageMovementAttemptedItems(SPSService<T> service,
BlockStorageMovementNeeded unsatisfiedStorageMovementFiles, BlockStorageMovementNeeded<T> unsatisfiedStorageMovementFiles,
BlockMovementListener blockMovementListener) { BlockMovementListener blockMovementListener) {
this.service = service; this.service = service;
long recheckTimeout = this.service.getConf().getLong( long recheckTimeout = this.service.getConf().getLong(
@ -100,7 +105,7 @@ public class BlockStorageMovementAttemptedItems{
* @param itemInfo * @param itemInfo
* - tracking info * - tracking info
*/ */
public void add(AttemptedItemInfo itemInfo) { public void add(AttemptedItemInfo<T> itemInfo) {
synchronized (storageMovementAttemptedItems) { synchronized (storageMovementAttemptedItems) {
storageMovementAttemptedItems.add(itemInfo); storageMovementAttemptedItems.add(itemInfo);
} }
@ -190,25 +195,24 @@ public class BlockStorageMovementAttemptedItems{
@VisibleForTesting @VisibleForTesting
void blocksStorageMovementUnReportedItemsCheck() { void blocksStorageMovementUnReportedItemsCheck() {
synchronized (storageMovementAttemptedItems) { synchronized (storageMovementAttemptedItems) {
Iterator<AttemptedItemInfo> iter = storageMovementAttemptedItems Iterator<AttemptedItemInfo<T>> iter = storageMovementAttemptedItems
.iterator(); .iterator();
long now = monotonicNow(); long now = monotonicNow();
while (iter.hasNext()) { while (iter.hasNext()) {
AttemptedItemInfo itemInfo = iter.next(); AttemptedItemInfo<T> itemInfo = iter.next();
if (now > itemInfo.getLastAttemptedOrReportedTime() if (now > itemInfo.getLastAttemptedOrReportedTime()
+ selfRetryTimeout) { + selfRetryTimeout) {
Long blockCollectionID = itemInfo.getFileId(); T file = itemInfo.getFile();
synchronized (movementFinishedBlocks) { synchronized (movementFinishedBlocks) {
ItemInfo candidate = new ItemInfo(itemInfo.getStartId(), ItemInfo<T> candidate = new ItemInfo<T>(itemInfo.getStartPath(),
blockCollectionID, itemInfo.getRetryCount() + 1); file, itemInfo.getRetryCount() + 1);
blockStorageMovementNeeded.add(candidate); blockStorageMovementNeeded.add(candidate);
iter.remove(); iter.remove();
LOG.info("TrackID: {} becomes timed out and moved to needed " LOG.info("TrackID: {} becomes timed out and moved to needed "
+ "retries queue for next iteration.", blockCollectionID); + "retries queue for next iteration.", file);
} }
} }
} }
} }
} }
@ -219,17 +223,17 @@ public class BlockStorageMovementAttemptedItems{
while (finishedBlksIter.hasNext()) { while (finishedBlksIter.hasNext()) {
Block blk = finishedBlksIter.next(); Block blk = finishedBlksIter.next();
synchronized (storageMovementAttemptedItems) { synchronized (storageMovementAttemptedItems) {
Iterator<AttemptedItemInfo> iterator = storageMovementAttemptedItems Iterator<AttemptedItemInfo<T>> iterator =
.iterator(); storageMovementAttemptedItems.iterator();
while (iterator.hasNext()) { while (iterator.hasNext()) {
AttemptedItemInfo attemptedItemInfo = iterator.next(); AttemptedItemInfo<T> attemptedItemInfo = iterator.next();
attemptedItemInfo.getBlocks().remove(blk); attemptedItemInfo.getBlocks().remove(blk);
if (attemptedItemInfo.getBlocks().isEmpty()) { if (attemptedItemInfo.getBlocks().isEmpty()) {
// TODO: try add this at front of the Queue, so that this element // TODO: try add this at front of the Queue, so that this element
// gets the chance first and can be cleaned from queue quickly as // gets the chance first and can be cleaned from queue quickly as
// all movements already done. // all movements already done.
blockStorageMovementNeeded.add(new ItemInfo(attemptedItemInfo blockStorageMovementNeeded.add(new ItemInfo<T>(attemptedItemInfo
.getStartId(), attemptedItemInfo.getFileId(), .getStartPath(), attemptedItemInfo.getFile(),
attemptedItemInfo.getRetryCount() + 1)); attemptedItemInfo.getRetryCount() + 1));
iterator.remove(); iterator.remove();
} }

View File

@ -43,31 +43,36 @@ import com.google.common.annotations.VisibleForTesting;
* schedule the block collection IDs for movement. It track the info of * schedule the block collection IDs for movement. It track the info of
* scheduled items and remove the SPS xAttr from the file/Directory once * scheduled items and remove the SPS xAttr from the file/Directory once
* movement is success. * movement is success.
*
* @param <T>
* is identifier of inode or full path name of inode. Internal sps will
* use the file inodeId for the block movement. External sps will use
* file string path representation for the block movement.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class BlockStorageMovementNeeded { public class BlockStorageMovementNeeded<T> {
public static final Logger LOG = public static final Logger LOG =
LoggerFactory.getLogger(BlockStorageMovementNeeded.class); LoggerFactory.getLogger(BlockStorageMovementNeeded.class);
private final Queue<ItemInfo> storageMovementNeeded = private final Queue<ItemInfo<T>> storageMovementNeeded =
new LinkedList<ItemInfo>(); new LinkedList<ItemInfo<T>>();
/** /**
* Map of startId and number of child's. Number of child's indicate the * Map of startPath and number of child's. Number of child's indicate the
* number of files pending to satisfy the policy. * number of files pending to satisfy the policy.
*/ */
private final Map<Long, DirPendingWorkInfo> pendingWorkForDirectory = private final Map<T, DirPendingWorkInfo> pendingWorkForDirectory =
new HashMap<Long, DirPendingWorkInfo>(); new HashMap<>();
private final Map<Long, StoragePolicySatisfyPathStatusInfo> spsStatus = private final Map<T, StoragePolicySatisfyPathStatusInfo> spsStatus =
new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
private final Context ctxt; private final Context<T> ctxt;
private Daemon pathIdCollector; private Daemon pathIdCollector;
private FileIdCollector fileIDCollector; private FileCollector<T> fileCollector;
private SPSPathIdProcessor pathIDProcessor; private SPSPathIdProcessor pathIDProcessor;
@ -75,10 +80,10 @@ public class BlockStorageMovementNeeded {
// NOT_AVAILABLE. // NOT_AVAILABLE.
private static long statusClearanceElapsedTimeMs = 300000; private static long statusClearanceElapsedTimeMs = 300000;
public BlockStorageMovementNeeded(Context context, public BlockStorageMovementNeeded(Context<T> context,
FileIdCollector fileIDCollector) { FileCollector<T> fileCollector) {
this.ctxt = context; this.ctxt = context;
this.fileIDCollector = fileIDCollector; this.fileCollector = fileCollector;
pathIDProcessor = new SPSPathIdProcessor(); pathIDProcessor = new SPSPathIdProcessor();
} }
@ -89,8 +94,8 @@ public class BlockStorageMovementNeeded {
* @param trackInfo * @param trackInfo
* - track info for satisfy the policy * - track info for satisfy the policy
*/ */
public synchronized void add(ItemInfo trackInfo) { public synchronized void add(ItemInfo<T> trackInfo) {
spsStatus.put(trackInfo.getStartId(), spsStatus.put(trackInfo.getFile(),
new StoragePolicySatisfyPathStatusInfo( new StoragePolicySatisfyPathStatusInfo(
StoragePolicySatisfyPathStatus.IN_PROGRESS)); StoragePolicySatisfyPathStatus.IN_PROGRESS));
storageMovementNeeded.add(trackInfo); storageMovementNeeded.add(trackInfo);
@ -100,8 +105,8 @@ public class BlockStorageMovementNeeded {
* Add the itemInfo list to tracking list for which storage movement expected * Add the itemInfo list to tracking list for which storage movement expected
* if necessary. * if necessary.
* *
* @param startId * @param startPath
* - start id * - start path
* @param itemInfoList * @param itemInfoList
* - List of child in the directory * - List of child in the directory
* @param scanCompleted * @param scanCompleted
@ -109,10 +114,10 @@ public class BlockStorageMovementNeeded {
* scan. * scan.
*/ */
@VisibleForTesting @VisibleForTesting
public synchronized void addAll(long startId, List<ItemInfo> itemInfoList, public synchronized void addAll(T startPath, List<ItemInfo<T>> itemInfoList,
boolean scanCompleted) { boolean scanCompleted) {
storageMovementNeeded.addAll(itemInfoList); storageMovementNeeded.addAll(itemInfoList);
updatePendingDirScanStats(startId, itemInfoList.size(), scanCompleted); updatePendingDirScanStats(startPath, itemInfoList.size(), scanCompleted);
} }
/** /**
@ -126,22 +131,22 @@ public class BlockStorageMovementNeeded {
* elements to scan. * elements to scan.
*/ */
@VisibleForTesting @VisibleForTesting
public synchronized void add(ItemInfo itemInfo, boolean scanCompleted) { public synchronized void add(ItemInfo<T> itemInfo, boolean scanCompleted) {
storageMovementNeeded.add(itemInfo); storageMovementNeeded.add(itemInfo);
// This represents sps start id is file, so no need to update pending dir // This represents sps start id is file, so no need to update pending dir
// stats. // stats.
if (itemInfo.getStartId() == itemInfo.getFileId()) { if (itemInfo.getStartPath() == itemInfo.getFile()) {
return; return;
} }
updatePendingDirScanStats(itemInfo.getStartId(), 1, scanCompleted); updatePendingDirScanStats(itemInfo.getFile(), 1, scanCompleted);
} }
private void updatePendingDirScanStats(long startId, int numScannedFiles, private void updatePendingDirScanStats(T startPath, int numScannedFiles,
boolean scanCompleted) { boolean scanCompleted) {
DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId); DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startPath);
if (pendingWork == null) { if (pendingWork == null) {
pendingWork = new DirPendingWorkInfo(); pendingWork = new DirPendingWorkInfo();
pendingWorkForDirectory.put(startId, pendingWork); pendingWorkForDirectory.put(startPath, pendingWork);
} }
pendingWork.addPendingWorkCount(numScannedFiles); pendingWork.addPendingWorkCount(numScannedFiles);
if (scanCompleted) { if (scanCompleted) {
@ -150,12 +155,12 @@ public class BlockStorageMovementNeeded {
} }
/** /**
* Gets the block collection id for which storage movements check necessary * Gets the satisfier files for which block storage movements check necessary
* and make the movement if required. * and make the movement if required.
* *
* @return block collection ID * @return satisfier files
*/ */
public synchronized ItemInfo get() { public synchronized ItemInfo<T> get() {
return storageMovementNeeded.poll(); return storageMovementNeeded.poll();
} }
@ -176,12 +181,12 @@ public class BlockStorageMovementNeeded {
* Decrease the pending child count for directory once one file blocks moved * Decrease the pending child count for directory once one file blocks moved
* successfully. Remove the SPS xAttr if pending child count is zero. * successfully. Remove the SPS xAttr if pending child count is zero.
*/ */
public synchronized void removeItemTrackInfo(ItemInfo trackInfo, public synchronized void removeItemTrackInfo(ItemInfo<T> trackInfo,
boolean isSuccess) throws IOException { boolean isSuccess) throws IOException {
if (trackInfo.isDir()) { if (trackInfo.isDir()) {
// If track is part of some start inode then reduce the pending // If track is part of some start inode then reduce the pending
// directory work count. // directory work count.
long startId = trackInfo.getStartId(); T startId = trackInfo.getStartPath();
if (!ctxt.isFileExist(startId)) { if (!ctxt.isFileExist(startId)) {
// directory deleted just remove it. // directory deleted just remove it.
this.pendingWorkForDirectory.remove(startId); this.pendingWorkForDirectory.remove(startId);
@ -202,17 +207,17 @@ public class BlockStorageMovementNeeded {
} else { } else {
// Remove xAttr if trackID doesn't exist in // Remove xAttr if trackID doesn't exist in
// storageMovementAttemptedItems or file policy satisfied. // storageMovementAttemptedItems or file policy satisfied.
ctxt.removeSPSHint(trackInfo.getFileId()); ctxt.removeSPSHint(trackInfo.getFile());
updateStatus(trackInfo.getStartId(), isSuccess); updateStatus(trackInfo.getFile(), isSuccess);
} }
} }
public synchronized void clearQueue(long trackId) { public synchronized void clearQueue(T trackId) {
ctxt.removeSPSPathId(trackId); ctxt.removeSPSPathId(trackId);
Iterator<ItemInfo> iterator = storageMovementNeeded.iterator(); Iterator<ItemInfo<T>> iterator = storageMovementNeeded.iterator();
while (iterator.hasNext()) { while (iterator.hasNext()) {
ItemInfo next = iterator.next(); ItemInfo<T> next = iterator.next();
if (next.getStartId() == trackId) { if (next.getFile() == trackId) {
iterator.remove(); iterator.remove();
} }
} }
@ -222,7 +227,7 @@ public class BlockStorageMovementNeeded {
/** /**
* Mark inode status as SUCCESS in map. * Mark inode status as SUCCESS in map.
*/ */
private void updateStatus(long startId, boolean isSuccess){ private void updateStatus(T startId, boolean isSuccess){
StoragePolicySatisfyPathStatusInfo spsStatusInfo = StoragePolicySatisfyPathStatusInfo spsStatusInfo =
spsStatus.get(startId); spsStatus.get(startId);
if (spsStatusInfo == null) { if (spsStatusInfo == null) {
@ -244,8 +249,8 @@ public class BlockStorageMovementNeeded {
*/ */
public synchronized void clearQueuesWithNotification() { public synchronized void clearQueuesWithNotification() {
// Remove xAttr from directories // Remove xAttr from directories
Long trackId; T trackId;
while ((trackId = ctxt.getNextSPSPathId()) != null) { while ((trackId = ctxt.getNextSPSPath()) != null) {
try { try {
// Remove xAttr for file // Remove xAttr for file
ctxt.removeSPSHint(trackId); ctxt.removeSPSHint(trackId);
@ -256,17 +261,17 @@ public class BlockStorageMovementNeeded {
// File's directly added to storageMovementNeeded, So try to remove // File's directly added to storageMovementNeeded, So try to remove
// xAttr for file // xAttr for file
ItemInfo itemInfo; ItemInfo<T> itemInfo;
while ((itemInfo = get()) != null) { while ((itemInfo = get()) != null) {
try { try {
// Remove xAttr for file // Remove xAttr for file
if (!itemInfo.isDir()) { if (!itemInfo.isDir()) {
ctxt.removeSPSHint(itemInfo.getFileId()); ctxt.removeSPSHint(itemInfo.getFile());
} }
} catch (IOException ie) { } catch (IOException ie) {
LOG.warn( LOG.warn(
"Failed to remove SPS xattr for track id " "Failed to remove SPS xattr for track id "
+ itemInfo.getFileId(), ie); + itemInfo.getFile(), ie);
} }
} }
this.clearAll(); this.clearAll();
@ -282,29 +287,29 @@ public class BlockStorageMovementNeeded {
public void run() { public void run() {
LOG.info("Starting SPSPathIdProcessor!."); LOG.info("Starting SPSPathIdProcessor!.");
long lastStatusCleanTime = 0; long lastStatusCleanTime = 0;
Long startINodeId = null; T startINode = null;
while (ctxt.isRunning()) { while (ctxt.isRunning()) {
try { try {
if (!ctxt.isInSafeMode()) { if (!ctxt.isInSafeMode()) {
if (startINodeId == null) { if (startINode == null) {
startINodeId = ctxt.getNextSPSPathId(); startINode = ctxt.getNextSPSPath();
} // else same id will be retried } // else same id will be retried
if (startINodeId == null) { if (startINode == null) {
// Waiting for SPS path // Waiting for SPS path
Thread.sleep(3000); Thread.sleep(3000);
} else { } else {
spsStatus.put(startINodeId, spsStatus.put(startINode,
new StoragePolicySatisfyPathStatusInfo( new StoragePolicySatisfyPathStatusInfo(
StoragePolicySatisfyPathStatus.IN_PROGRESS)); StoragePolicySatisfyPathStatus.IN_PROGRESS));
fileIDCollector.scanAndCollectFileIds(startINodeId); fileCollector.scanAndCollectFiles(startINode);
// check if directory was empty and no child added to queue // check if directory was empty and no child added to queue
DirPendingWorkInfo dirPendingWorkInfo = DirPendingWorkInfo dirPendingWorkInfo =
pendingWorkForDirectory.get(startINodeId); pendingWorkForDirectory.get(startINode);
if (dirPendingWorkInfo != null if (dirPendingWorkInfo != null
&& dirPendingWorkInfo.isDirWorkDone()) { && dirPendingWorkInfo.isDirWorkDone()) {
ctxt.removeSPSHint(startINodeId); ctxt.removeSPSHint(startINode);
pendingWorkForDirectory.remove(startINodeId); pendingWorkForDirectory.remove(startINode);
updateStatus(startINodeId, true); updateStatus(startINode, true);
} }
} }
//Clear the SPS status if status is in SUCCESS more than 5 min. //Clear the SPS status if status is in SUCCESS more than 5 min.
@ -313,7 +318,7 @@ public class BlockStorageMovementNeeded {
lastStatusCleanTime = Time.monotonicNow(); lastStatusCleanTime = Time.monotonicNow();
cleanSPSStatus(); cleanSPSStatus();
} }
startINodeId = null; // Current inode id successfully scanned. startINode = null; // Current inode successfully scanned.
} }
} catch (Throwable t) { } catch (Throwable t) {
String reClass = t.getClass().getName(); String reClass = t.getClass().getName();
@ -334,9 +339,9 @@ public class BlockStorageMovementNeeded {
} }
private synchronized void cleanSPSStatus() { private synchronized void cleanSPSStatus() {
for (Iterator<Entry<Long, StoragePolicySatisfyPathStatusInfo>> it = for (Iterator<Entry<T, StoragePolicySatisfyPathStatusInfo>> it = spsStatus
spsStatus.entrySet().iterator(); it.hasNext();) { .entrySet().iterator(); it.hasNext();) {
Entry<Long, StoragePolicySatisfyPathStatusInfo> entry = it.next(); Entry<T, StoragePolicySatisfyPathStatusInfo> entry = it.next();
if (entry.getValue().canRemove()) { if (entry.getValue().canRemove()) {
it.remove(); it.remove();
} }
@ -472,8 +477,8 @@ public class BlockStorageMovementNeeded {
return statusClearanceElapsedTimeMs; return statusClearanceElapsedTimeMs;
} }
public void markScanCompletedForDir(Long inodeId) { public void markScanCompletedForDir(T inode) {
DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(inodeId); DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(inode);
if (pendingWork != null) { if (pendingWork != null) {
pendingWork.markScanCompleted(); pendingWork.markScanCompleted();
} }

View File

@ -33,11 +33,16 @@ import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
/** /**
* An interface for the communication between NameNode and SPS module. * An interface for the communication between SPS and Namenode module.
*
* @param <T>
* is identifier of inode or full path name of inode. Internal sps will
* use the file inodeId for the block movement. External sps will use
* file string path representation for the block movement.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public interface Context { public interface Context<T> {
/** /**
* Returns true if the SPS is running, false otherwise. * Returns true if the SPS is running, false otherwise.
@ -72,13 +77,13 @@ public interface Context {
NetworkTopology getNetworkTopology(); NetworkTopology getNetworkTopology();
/** /**
* Returns true if the give Inode exists in the Namespace. * Returns true if the give file exists in the Namespace.
* *
* @param inodeId * @param filePath
* - Inode ID * - file info
* @return true if Inode exists, false otherwise. * @return true if the given file exists, false otherwise.
*/ */
boolean isFileExist(long inodeId); boolean isFileExist(T filePath);
/** /**
* Gets the storage policy details for the given policy ID. * Gets the storage policy details for the given policy ID.
@ -97,11 +102,11 @@ public interface Context {
/** /**
* Remove the hint which was added to track SPS call. * Remove the hint which was added to track SPS call.
* *
* @param inodeId * @param spsPath
* - Inode ID * - user invoked satisfier path
* @throws IOException * @throws IOException
*/ */
void removeSPSHint(long inodeId) throws IOException; void removeSPSHint(T spsPath) throws IOException;
/** /**
* Gets the number of live datanodes in the cluster. * Gets the number of live datanodes in the cluster.
@ -113,11 +118,11 @@ public interface Context {
/** /**
* Get the file info for a specific file. * Get the file info for a specific file.
* *
* @param inodeID * @param file
* inode identifier * file path
* @return file status metadata information * @return file status metadata information
*/ */
HdfsFileStatus getFileInfo(long inodeID) throws IOException; HdfsFileStatus getFileInfo(T file) throws IOException;
/** /**
* Returns all the live datanodes and its storage details. * Returns all the live datanodes and its storage details.
@ -127,15 +132,6 @@ public interface Context {
DatanodeStorageReport[] getLiveDatanodeStorageReport() DatanodeStorageReport[] getLiveDatanodeStorageReport()
throws IOException; throws IOException;
/**
* Returns true if the given inode file has low redundancy blocks.
*
* @param inodeID
* inode identifier
* @return true if block collection has low redundancy blocks
*/
boolean hasLowRedundancyBlocks(long inodeID);
/** /**
* Checks whether the given datanode has sufficient space to occupy the given * Checks whether the given datanode has sufficient space to occupy the given
* blockSize data. * blockSize data.
@ -153,26 +149,17 @@ public interface Context {
long blockSize); long blockSize);
/** /**
* @return next SPS path id to process. * @return next SPS path info to process.
*/ */
Long getNextSPSPathId(); T getNextSPSPath();
/** /**
* Removes the SPS path id. * Removes the SPS path id.
*/ */
void removeSPSPathId(long pathId); void removeSPSPathId(T pathId);
/** /**
* Removes all SPS path ids. * Removes all SPS path ids.
*/ */
void removeAllSPSPathIds(); void removeAllSPSPathIds();
/**
* Gets the file path for a given inode id.
*
* @param inodeId
* - path inode id.
*/
String getFilePath(Long inodeId);
} }

View File

@ -24,20 +24,25 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
/** /**
* An interface for scanning the directory recursively and collect file ids * An interface for scanning the directory recursively and collect files
* under the given directory. * under the given directory.
*
* @param <T>
* is identifier of inode or full path name of inode. Internal sps will
* use the file inodeId for the block movement. External sps will use
* file string path representation for the block movement.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public interface FileIdCollector { public interface FileCollector<T> {
/** /**
* Scans the given inode directory and collects the file ids under that * This method can be used to scan and collects the files under that
* directory and adds to the given BlockStorageMovementNeeded. * directory and adds to the given BlockStorageMovementNeeded.
* *
* @param inodeID * @param filePath
* - The directory ID * - file path
*/ */
void scanAndCollectFileIds(Long inodeId) void scanAndCollectFiles(T filePath)
throws IOException, InterruptedException; throws IOException, InterruptedException;
} }

View File

@ -47,17 +47,17 @@ import org.slf4j.LoggerFactory;
* movements to satisfy the storage policy. * movements to satisfy the storage policy.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class IntraSPSNameNodeContext implements Context { public class IntraSPSNameNodeContext implements Context<Long> {
private static final Logger LOG = LoggerFactory private static final Logger LOG = LoggerFactory
.getLogger(IntraSPSNameNodeContext.class); .getLogger(IntraSPSNameNodeContext.class);
private final Namesystem namesystem; private final Namesystem namesystem;
private final BlockManager blockManager; private final BlockManager blockManager;
private SPSService service; private SPSService<Long> service;
public IntraSPSNameNodeContext(Namesystem namesystem, public IntraSPSNameNodeContext(Namesystem namesystem,
BlockManager blockManager, SPSService service) { BlockManager blockManager, SPSService<Long> service) {
this.namesystem = namesystem; this.namesystem = namesystem;
this.blockManager = blockManager; this.blockManager = blockManager;
this.service = service; this.service = service;
@ -68,20 +68,18 @@ public class IntraSPSNameNodeContext implements Context {
return blockManager.getDatanodeManager().getNumLiveDataNodes(); return blockManager.getDatanodeManager().getNumLiveDataNodes();
} }
/**
* @return object containing information regarding the file or null if file
* not found.
*/
@Override @Override
public HdfsFileStatus getFileInfo(long inodeID) throws IOException { public HdfsFileStatus getFileInfo(Long inodeID) throws IOException {
String filePath = namesystem.getFilePath(inodeID); String filePath = namesystem.getFilePath(inodeID);
if (StringUtils.isBlank(filePath)) { if (StringUtils.isBlank(filePath)) {
LOG.debug("File with inodeID:{} doesn't exists!", inodeID); LOG.debug("File with inodeID:{} doesn't exists!", inodeID);
return null; return null;
} }
HdfsFileStatus fileInfo = null; return namesystem.getFileInfo(filePath, true, true);
try {
fileInfo = namesystem.getFileInfo(filePath, true, true);
} catch (IOException e) {
LOG.debug("File path:{} doesn't exists!", filePath);
}
return fileInfo;
} }
@Override @Override
@ -97,17 +95,12 @@ public class IntraSPSNameNodeContext implements Context {
} }
@Override @Override
public boolean hasLowRedundancyBlocks(long inodeId) { public boolean isFileExist(Long inodeId) {
return blockManager.hasLowRedundancyBlocks(inodeId);
}
@Override
public boolean isFileExist(long inodeId) {
return namesystem.getFSDirectory().getInode(inodeId) != null; return namesystem.getFSDirectory().getInode(inodeId) != null;
} }
@Override @Override
public void removeSPSHint(long inodeId) throws IOException { public void removeSPSHint(Long inodeId) throws IOException {
this.namesystem.removeXattr(inodeId, XATTR_SATISFY_STORAGE_POLICY); this.namesystem.removeXattr(inodeId, XATTR_SATISFY_STORAGE_POLICY);
} }
@ -177,12 +170,12 @@ public class IntraSPSNameNodeContext implements Context {
} }
@Override @Override
public Long getNextSPSPathId() { public Long getNextSPSPath() {
return blockManager.getSPSManager().getNextPathId(); return blockManager.getSPSManager().getNextPathId();
} }
@Override @Override
public void removeSPSPathId(long trackId) { public void removeSPSPathId(Long trackId) {
blockManager.getSPSManager().removePathId(trackId); blockManager.getSPSManager().removePathId(trackId);
} }
@ -190,10 +183,4 @@ public class IntraSPSNameNodeContext implements Context {
public void removeAllSPSPathIds() { public void removeAllSPSPathIds() {
blockManager.getSPSManager().removeAllPathIds(); blockManager.getSPSManager().removeAllPathIds();
} }
@Override
public String getFilePath(Long inodeId) {
return namesystem.getFilePath(inodeId);
}
} }

View File

@ -35,15 +35,16 @@ import org.apache.hadoop.hdfs.server.namenode.INode;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
implements FileIdCollector { implements FileCollector<Long> {
private int maxQueueLimitToScan; private int maxQueueLimitToScan;
private final SPSService service; private final SPSService <Long> service;
private int remainingCapacity = 0; private int remainingCapacity = 0;
private List<ItemInfo> currentBatch; private List<ItemInfo<Long>> currentBatch;
public IntraSPSNameNodeFileIdCollector(FSDirectory dir, SPSService service) { public IntraSPSNameNodeFileIdCollector(FSDirectory dir,
SPSService<Long> service) {
super(dir); super(dir);
this.service = service; this.service = service;
this.maxQueueLimitToScan = service.getConf().getInt( this.maxQueueLimitToScan = service.getConf().getInt(
@ -63,7 +64,7 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
return false; return false;
} }
if (inode.isFile() && inode.asFile().numBlocks() != 0) { if (inode.isFile() && inode.asFile().numBlocks() != 0) {
currentBatch.add(new ItemInfo( currentBatch.add(new ItemInfo<Long>(
((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId())); ((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId()));
remainingCapacity--; remainingCapacity--;
} }
@ -83,10 +84,10 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
} }
@Override @Override
protected void submitCurrentBatch(long startId) protected void submitCurrentBatch(Long startId)
throws IOException, InterruptedException { throws IOException, InterruptedException {
// Add current child's to queue // Add current child's to queue
service.addAllFileIdsToProcess(startId, service.addAllFilesToProcess(startId,
currentBatch, false); currentBatch, false);
currentBatch.clear(); currentBatch.clear();
} }
@ -119,7 +120,7 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
} }
@Override @Override
public void scanAndCollectFileIds(final Long startINodeId) public void scanAndCollectFiles(final Long startINodeId)
throws IOException, InterruptedException { throws IOException, InterruptedException {
FSDirectory fsd = getFSDirectory(); FSDirectory fsd = getFSDirectory();
INode startInode = fsd.getInode(startINodeId); INode startInode = fsd.getInode(startINodeId);
@ -129,9 +130,9 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
throttle(); throttle();
} }
if (startInode.isFile()) { if (startInode.isFile()) {
currentBatch.add(new ItemInfo(startInode.getId(), startInode.getId())); currentBatch
.add(new ItemInfo<Long>(startInode.getId(), startInode.getId()));
} else { } else {
readLock(); readLock();
// NOTE: this lock will not be held for full directory scanning. It is // NOTE: this lock will not be held for full directory scanning. It is
// basically a sliced locking. Once it collects a batch size( at max the // basically a sliced locking. Once it collects a batch size( at max the
@ -148,7 +149,7 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
} }
} }
// Mark startInode traverse is done, this is last-batch // Mark startInode traverse is done, this is last-batch
service.addAllFileIdsToProcess(startInode.getId(), currentBatch, true); service.addAllFilesToProcess(startInode.getId(), currentBatch, true);
currentBatch.clear(); currentBatch.clear();
} }
} }

View File

@ -21,48 +21,51 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
/** /**
* ItemInfo is a file info object for which need to satisfy the policy. * ItemInfo is a file info object for which need to satisfy the policy. For
* internal satisfier service, it uses inode id which is Long datatype. For the
* external satisfier service, it uses the full string representation of the
* path.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class ItemInfo { public class ItemInfo<T> {
private long startId; private T startPath;
private long fileId; private T file;
private int retryCount; private int retryCount;
public ItemInfo(long startId, long fileId) { public ItemInfo(T startPath, T file) {
this.startId = startId; this.startPath = startPath;
this.fileId = fileId; this.file = file;
// set 0 when item is getting added first time in queue. // set 0 when item is getting added first time in queue.
this.retryCount = 0; this.retryCount = 0;
} }
public ItemInfo(final long startId, final long fileId, final int retryCount) { public ItemInfo(final T startPath, final T file, final int retryCount) {
this.startId = startId; this.startPath = startPath;
this.fileId = fileId; this.file = file;
this.retryCount = retryCount; this.retryCount = retryCount;
} }
/** /**
* Return the start inode id of the current track Id. This indicates that SPS * Returns the start path of the current file. This indicates that SPS
* was invoked on this inode id. * was invoked on this path.
*/ */
public long getStartId() { public T getStartPath() {
return startId; return startPath;
} }
/** /**
* Return the File inode Id for which needs to satisfy the policy. * Returns the file for which needs to satisfy the policy.
*/ */
public long getFileId() { public T getFile() {
return fileId; return file;
} }
/** /**
* Returns true if the tracking path is a directory, false otherwise. * Returns true if the tracking path is a directory, false otherwise.
*/ */
public boolean isDir() { public boolean isDir() {
return (startId != fileId); return !startPath.equals(file);
} }
/** /**

View File

@ -27,10 +27,15 @@ import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
/** /**
* An interface for SPSService, which exposes life cycle and processing APIs. * An interface for SPSService, which exposes life cycle and processing APIs.
*
* @param <T>
* is identifier of inode or full path name of inode. Internal sps will
* use the file inodeId for the block movement. External sps will use
* file string path representation for the block movement.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public interface SPSService { public interface SPSService<T> {
/** /**
* Initializes the helper services. * Initializes the helper services.
@ -38,7 +43,7 @@ public interface SPSService {
* @param ctxt * @param ctxt
* - context is an helper service to provide communication channel * - context is an helper service to provide communication channel
* between NN and SPS * between NN and SPS
* @param fileIDCollector * @param fileCollector
* - a helper service for scanning the files under a given directory * - a helper service for scanning the files under a given directory
* id * id
* @param handler * @param handler
@ -46,7 +51,7 @@ public interface SPSService {
* @param blkMovementListener * @param blkMovementListener
* - listener to know about block movement attempt completion * - listener to know about block movement attempt completion
*/ */
void init(Context ctxt, FileIdCollector fileIDCollector, void init(Context<T> ctxt, FileCollector<T> fileCollector,
BlockMoveTaskHandler handler, BlockMovementListener blkMovementListener); BlockMoveTaskHandler handler, BlockMovementListener blkMovementListener);
/** /**
@ -82,23 +87,24 @@ public interface SPSService {
boolean isRunning(); boolean isRunning();
/** /**
* Adds the Item information(file id etc) to processing queue. * Adds the Item information(file etc) to processing queue.
* *
* @param itemInfo * @param itemInfo
* file info object for which need to satisfy the policy
*/ */
void addFileIdToProcess(ItemInfo itemInfo, boolean scanCompleted); void addFileToProcess(ItemInfo<T> itemInfo, boolean scanCompleted);
/** /**
* Adds all the Item information(file id etc) to processing queue. * Adds all the Item information(file etc) to processing queue.
* *
* @param startId * @param startPath
* - directory/file id, on which SPS was called. * - directory/file, on which SPS was called.
* @param itemInfoList * @param itemInfoList
* - list of item infos * - list of item infos
* @param scanCompleted * @param scanCompleted
* - whether the scanning of directory fully done with itemInfoList * - whether the scanning of directory fully done with itemInfoList
*/ */
void addAllFileIdsToProcess(long startId, List<ItemInfo> itemInfoList, void addAllFilesToProcess(T startPath, List<ItemInfo<T>> itemInfoList,
boolean scanCompleted); boolean scanCompleted);
/** /**
@ -109,7 +115,7 @@ public interface SPSService {
/** /**
* Clear inodeId present in the processing queue. * Clear inodeId present in the processing queue.
*/ */
void clearQueue(long inodeId); void clearQueue(T spsPath);
/** /**
* @return the configuration. * @return the configuration.
@ -119,10 +125,10 @@ public interface SPSService {
/** /**
* Marks the scanning of directory if finished. * Marks the scanning of directory if finished.
* *
* @param inodeId * @param spsPath
* - directory inode id. * - satisfier path
*/ */
void markScanCompletedForPath(Long inodeId); void markScanCompletedForPath(T spsPath);
/** /**
* Notify the details of storage movement attempt finished blocks. * Notify the details of storage movement attempt finished blocks.

View File

@ -66,7 +66,7 @@ import com.google.common.base.Preconditions;
* storage policy type in Namespace, but physical block storage movement will * storage policy type in Namespace, but physical block storage movement will
* not happen until user runs "Mover Tool" explicitly for such files. The * not happen until user runs "Mover Tool" explicitly for such files. The
* StoragePolicySatisfier Daemon thread implemented for addressing the case * StoragePolicySatisfier Daemon thread implemented for addressing the case
* where users may want to physically move the blocks by a dedidated daemon (can * where users may want to physically move the blocks by a dedicated daemon (can
* run inside Namenode or stand alone) instead of running mover tool explicitly. * run inside Namenode or stand alone) instead of running mover tool explicitly.
* Just calling client API to satisfyStoragePolicy on a file/dir will * Just calling client API to satisfyStoragePolicy on a file/dir will
* automatically trigger to move its physical storage locations as expected in * automatically trigger to move its physical storage locations as expected in
@ -77,19 +77,19 @@ import com.google.common.base.Preconditions;
* physical block movements. * physical block movements.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class StoragePolicySatisfier implements SPSService, Runnable { public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
public static final Logger LOG = public static final Logger LOG =
LoggerFactory.getLogger(StoragePolicySatisfier.class); LoggerFactory.getLogger(StoragePolicySatisfier.class);
private Daemon storagePolicySatisfierThread; private Daemon storagePolicySatisfierThread;
private BlockStorageMovementNeeded storageMovementNeeded; private BlockStorageMovementNeeded<T> storageMovementNeeded;
private BlockStorageMovementAttemptedItems storageMovementsMonitor; private BlockStorageMovementAttemptedItems<T> storageMovementsMonitor;
private volatile boolean isRunning = false; private volatile boolean isRunning = false;
private volatile StoragePolicySatisfierMode spsMode = private volatile StoragePolicySatisfierMode spsMode =
StoragePolicySatisfierMode.NONE; StoragePolicySatisfierMode.NONE;
private int spsWorkMultiplier; private int spsWorkMultiplier;
private long blockCount = 0L; private long blockCount = 0L;
private int blockMovementMaxRetry; private int blockMovementMaxRetry;
private Context ctxt; private Context<T> ctxt;
private BlockMoveTaskHandler blockMoveTaskHandler; private BlockMoveTaskHandler blockMoveTaskHandler;
private final Configuration conf; private final Configuration conf;
@ -135,15 +135,15 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
} }
} }
public void init(final Context context, final FileIdCollector fileIDCollector, public void init(final Context<T> context,
final FileCollector<T> fileIDCollector,
final BlockMoveTaskHandler blockMovementTaskHandler, final BlockMoveTaskHandler blockMovementTaskHandler,
final BlockMovementListener blockMovementListener) { final BlockMovementListener blockMovementListener) {
this.ctxt = context; this.ctxt = context;
this.storageMovementNeeded = this.storageMovementNeeded = new BlockStorageMovementNeeded<T>(context,
new BlockStorageMovementNeeded(context, fileIDCollector); fileIDCollector);
this.storageMovementsMonitor = this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems<T>(
new BlockStorageMovementAttemptedItems(this, this, storageMovementNeeded, blockMovementListener);
storageMovementNeeded, blockMovementListener);
this.blockMoveTaskHandler = blockMovementTaskHandler; this.blockMoveTaskHandler = blockMovementTaskHandler;
this.spsWorkMultiplier = getSPSWorkMultiplier(getConf()); this.spsWorkMultiplier = getSPSWorkMultiplier(getConf());
this.blockMovementMaxRetry = getConf().getInt( this.blockMovementMaxRetry = getConf().getInt(
@ -257,24 +257,24 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
continue; continue;
} }
try { try {
ItemInfo<T> itemInfo = null;
boolean retryItem = false;
if (!ctxt.isInSafeMode()) { if (!ctxt.isInSafeMode()) {
ItemInfo itemInfo = storageMovementNeeded.get(); itemInfo = storageMovementNeeded.get();
if (itemInfo != null) { if (itemInfo != null) {
if(itemInfo.getRetryCount() >= blockMovementMaxRetry){ if(itemInfo.getRetryCount() >= blockMovementMaxRetry){
LOG.info("Failed to satisfy the policy after " LOG.info("Failed to satisfy the policy after "
+ blockMovementMaxRetry + " retries. Removing inode " + blockMovementMaxRetry + " retries. Removing inode "
+ itemInfo.getFileId() + " from the queue"); + itemInfo.getFile() + " from the queue");
storageMovementNeeded.removeItemTrackInfo(itemInfo, false); storageMovementNeeded.removeItemTrackInfo(itemInfo, false);
continue; continue;
} }
long trackId = itemInfo.getFileId(); T trackId = itemInfo.getFile();
BlocksMovingAnalysis status = null; BlocksMovingAnalysis status = null;
DatanodeStorageReport[] liveDnReports; DatanodeStorageReport[] liveDnReports;
BlockStoragePolicy existingStoragePolicy; BlockStoragePolicy existingStoragePolicy;
// TODO: presently, context internally acquire the lock // TODO: presently, context internally acquire the lock
// and returns the result. Need to discuss to move the lock outside? // and returns the result. Need to discuss to move the lock outside?
boolean hasLowRedundancyBlocks = ctxt
.hasLowRedundancyBlocks(trackId);
HdfsFileStatus fileStatus = ctxt.getFileInfo(trackId); HdfsFileStatus fileStatus = ctxt.getFileInfo(trackId);
// Check path existence. // Check path existence.
if (fileStatus == null || fileStatus.isDir()) { if (fileStatus == null || fileStatus.isDir()) {
@ -289,7 +289,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
HdfsLocatedFileStatus file = (HdfsLocatedFileStatus) fileStatus; HdfsLocatedFileStatus file = (HdfsLocatedFileStatus) fileStatus;
status = analyseBlocksStorageMovementsAndAssignToDN(file, status = analyseBlocksStorageMovementsAndAssignToDN(file,
hasLowRedundancyBlocks, existingStoragePolicy, liveDnReports); existingStoragePolicy, liveDnReports);
switch (status.status) { switch (status.status) {
// Just add to monitor, so it will be retried after timeout // Just add to monitor, so it will be retried after timeout
case ANALYSIS_SKIPPED_FOR_RETRY: case ANALYSIS_SKIPPED_FOR_RETRY:
@ -302,8 +302,8 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
+ "movement attempt finished report", + "movement attempt finished report",
status.status, fileStatus.getPath()); status.status, fileStatus.getPath());
} }
this.storageMovementsMonitor.add(new AttemptedItemInfo(itemInfo this.storageMovementsMonitor.add(new AttemptedItemInfo<T>(
.getStartId(), itemInfo.getFileId(), monotonicNow(), itemInfo.getStartPath(), itemInfo.getFile(), monotonicNow(),
status.assignedBlocks, itemInfo.getRetryCount())); status.assignedBlocks, itemInfo.getRetryCount()));
break; break;
case NO_BLOCKS_TARGETS_PAIRED: case NO_BLOCKS_TARGETS_PAIRED:
@ -312,8 +312,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
+ " retry queue as none of the blocks found its eligible" + " retry queue as none of the blocks found its eligible"
+ " targets.", trackId, fileStatus.getPath()); + " targets.", trackId, fileStatus.getPath());
} }
itemInfo.increRetryCount(); retryItem = true;
this.storageMovementNeeded.add(itemInfo);
break; break;
case FEW_LOW_REDUNDANCY_BLOCKS: case FEW_LOW_REDUNDANCY_BLOCKS:
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -321,8 +320,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
+ "retry queue as some of the blocks are low redundant.", + "retry queue as some of the blocks are low redundant.",
trackId, fileStatus.getPath()); trackId, fileStatus.getPath());
} }
itemInfo.increRetryCount(); retryItem = true;
this.storageMovementNeeded.add(itemInfo);
break; break;
case BLOCKS_FAILED_TO_MOVE: case BLOCKS_FAILED_TO_MOVE:
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -330,7 +328,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
+ "retry queue as some of the blocks movement failed.", + "retry queue as some of the blocks movement failed.",
trackId, fileStatus.getPath()); trackId, fileStatus.getPath());
} }
this.storageMovementNeeded.add(itemInfo); retryItem = true;
break; break;
// Just clean Xattrs // Just clean Xattrs
case BLOCKS_TARGET_PAIRING_SKIPPED: case BLOCKS_TARGET_PAIRING_SKIPPED:
@ -354,6 +352,10 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
Thread.sleep(3000); Thread.sleep(3000);
blockCount = 0L; blockCount = 0L;
} }
if (retryItem) {
itemInfo.increRetryCount();
this.storageMovementNeeded.add(itemInfo);
}
} catch (IOException e) { } catch (IOException e) {
LOG.error("Exception during StoragePolicySatisfier execution - " LOG.error("Exception during StoragePolicySatisfier execution - "
+ "will continue next cycle", e); + "will continue next cycle", e);
@ -377,7 +379,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
} }
private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN( private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN(
HdfsLocatedFileStatus fileInfo, boolean hasLowRedundancyBlocks, HdfsLocatedFileStatus fileInfo,
BlockStoragePolicy existingStoragePolicy, BlockStoragePolicy existingStoragePolicy,
DatanodeStorageReport[] liveDns) { DatanodeStorageReport[] liveDns) {
BlocksMovingAnalysis.Status status = BlocksMovingAnalysis.Status status =
@ -403,9 +405,17 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
new ArrayList<>()); new ArrayList<>());
} }
List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>(); List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>();
boolean hasLowRedundancyBlocks = false;
int replication = fileInfo.getReplication();
for (int i = 0; i < blocks.size(); i++) { for (int i = 0; i < blocks.size(); i++) {
LocatedBlock blockInfo = blocks.get(i); LocatedBlock blockInfo = blocks.get(i);
// Block is considered as low redundancy when the block locations array
// length is less than expected replication factor. If any of the block is
// low redundant, then hasLowRedundancyBlocks will be marked as true.
hasLowRedundancyBlocks |= isLowRedundancyBlock(blockInfo, replication,
ecPolicy);
List<StorageType> expectedStorageTypes; List<StorageType> expectedStorageTypes;
if (blockInfo.isStriped()) { if (blockInfo.isStriped()) {
if (ErasureCodingPolicyManager if (ErasureCodingPolicyManager
@ -446,13 +456,15 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
// policy. // policy.
status = BlocksMovingAnalysis.Status.NO_BLOCKS_TARGETS_PAIRED; status = BlocksMovingAnalysis.Status.NO_BLOCKS_TARGETS_PAIRED;
} }
} else if (hasLowRedundancyBlocks
&& status != BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED) {
// Check if the previous block was successfully paired.
status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS;
} }
} }
// If there is no block paired and few blocks are low redundant, so marking
// the status as FEW_LOW_REDUNDANCY_BLOCKS.
if (hasLowRedundancyBlocks
&& status == BlocksMovingAnalysis.Status.NO_BLOCKS_TARGETS_PAIRED) {
status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS;
}
List<Block> assignedBlockIds = new ArrayList<Block>(); List<Block> assignedBlockIds = new ArrayList<Block>();
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) { for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
// Check for at least one block storage movement has been chosen // Check for at least one block storage movement has been chosen
@ -470,6 +482,33 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
return new BlocksMovingAnalysis(status, assignedBlockIds); return new BlocksMovingAnalysis(status, assignedBlockIds);
} }
/**
* The given block is considered as low redundancy when the block locations
* length is less than expected replication factor. For EC blocks, redundancy
* is the summation of data + parity blocks.
*
* @param blockInfo
* block
* @param replication
* replication factor of the given file block
* @param ecPolicy
* erasure coding policy of the given file block
* @return true if the given block is low redundant.
*/
private boolean isLowRedundancyBlock(LocatedBlock blockInfo, int replication,
ErasureCodingPolicy ecPolicy) {
boolean hasLowRedundancyBlock = false;
if (blockInfo.isStriped()) {
// For EC blocks, redundancy is the summation of data + parity blocks.
replication = ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits();
}
// block is considered as low redundancy when the block locations length is
// less than expected replication factor.
hasLowRedundancyBlock = blockInfo.getLocations().length < replication ? true
: false;
return hasLowRedundancyBlock;
}
/** /**
* Compute the list of block moving information corresponding to the given * Compute the list of block moving information corresponding to the given
* blockId. This will check that each block location of the given block is * blockId. This will check that each block location of the given block is
@ -863,7 +902,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
} }
@VisibleForTesting @VisibleForTesting
BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() { public BlockStorageMovementAttemptedItems<T> getAttemptedItemsMonitor() {
return storageMovementsMonitor; return storageMovementsMonitor;
} }
@ -880,7 +919,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
/** /**
* Clear queues for given track id. * Clear queues for given track id.
*/ */
public void clearQueue(long trackId) { public void clearQueue(T trackId) {
storageMovementNeeded.clearQueue(trackId); storageMovementNeeded.clearQueue(trackId);
} }
@ -889,7 +928,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
* attempted or reported time stamp. This is used by * attempted or reported time stamp. This is used by
* {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}. * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
*/ */
final static class AttemptedItemInfo extends ItemInfo { final static class AttemptedItemInfo<T> extends ItemInfo<T> {
private long lastAttemptedOrReportedTime; private long lastAttemptedOrReportedTime;
private final List<Block> blocks; private final List<Block> blocks;
@ -903,7 +942,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
* @param lastAttemptedOrReportedTime * @param lastAttemptedOrReportedTime
* last attempted or reported time * last attempted or reported time
*/ */
AttemptedItemInfo(long rootId, long trackId, AttemptedItemInfo(T rootId, T trackId,
long lastAttemptedOrReportedTime, long lastAttemptedOrReportedTime,
List<Block> blocks, int retryCount) { List<Block> blocks, int retryCount) {
super(rootId, trackId, retryCount); super(rootId, trackId, retryCount);
@ -932,24 +971,33 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
} }
/**
* Returns sps invoked path status. This method is used by internal satisfy
* storage policy service.
*
* @param path
* sps path
* @return storage policy satisfy path status
* @throws IOException
*/
public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus( public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
String path) throws IOException { String path) throws IOException {
return storageMovementNeeded.getStatus(ctxt.getFileID(path)); return storageMovementNeeded.getStatus(ctxt.getFileID(path));
} }
@Override @Override
public void addFileIdToProcess(ItemInfo trackInfo, boolean scanCompleted) { public void addFileToProcess(ItemInfo<T> trackInfo, boolean scanCompleted) {
storageMovementNeeded.add(trackInfo, scanCompleted); storageMovementNeeded.add(trackInfo, scanCompleted);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Added track info for inode {} to block " LOG.debug("Added track info for inode {} to block "
+ "storageMovementNeeded queue", trackInfo.getFileId()); + "storageMovementNeeded queue", trackInfo.getFile());
} }
} }
@Override @Override
public void addAllFileIdsToProcess(long startId, List<ItemInfo> itemInfoList, public void addAllFilesToProcess(T startPath, List<ItemInfo<T>> itemInfoList,
boolean scanCompleted) { boolean scanCompleted) {
getStorageMovementQueue().addAll(startId, itemInfoList, scanCompleted); getStorageMovementQueue().addAll(startPath, itemInfoList, scanCompleted);
} }
@Override @Override
@ -963,12 +1011,12 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
} }
@VisibleForTesting @VisibleForTesting
public BlockStorageMovementNeeded getStorageMovementQueue() { public BlockStorageMovementNeeded<T> getStorageMovementQueue() {
return storageMovementNeeded; return storageMovementNeeded;
} }
@Override @Override
public void markScanCompletedForPath(Long inodeId) { public void markScanCompletedForPath(T inodeId) {
getStorageMovementQueue().markScanCompletedForDir(inodeId); getStorageMovementQueue().markScanCompletedForDir(inodeId);
} }
@ -976,7 +1024,6 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
* Join main SPS thread. * Join main SPS thread.
*/ */
public void join() throws InterruptedException { public void join() throws InterruptedException {
//TODO Add join here on SPS rpc server also
storagePolicySatisfierThread.join(); storagePolicySatisfierThread.join();
} }

View File

@ -60,7 +60,7 @@ import org.slf4j.LoggerFactory;
public class StoragePolicySatisfyManager { public class StoragePolicySatisfyManager {
private static final Logger LOG = LoggerFactory private static final Logger LOG = LoggerFactory
.getLogger(StoragePolicySatisfyManager.class); .getLogger(StoragePolicySatisfyManager.class);
private final StoragePolicySatisfier spsService; private final StoragePolicySatisfier<Long> spsService;
private final boolean storagePolicyEnabled; private final boolean storagePolicyEnabled;
private volatile StoragePolicySatisfierMode mode; private volatile StoragePolicySatisfierMode mode;
private final Queue<Long> pathsToBeTraveresed; private final Queue<Long> pathsToBeTraveresed;
@ -84,7 +84,7 @@ public class StoragePolicySatisfyManager {
pathsToBeTraveresed = new LinkedList<Long>(); pathsToBeTraveresed = new LinkedList<Long>();
// instantiate SPS service by just keeps config reference and not starting // instantiate SPS service by just keeps config reference and not starting
// any supporting threads. // any supporting threads.
spsService = new StoragePolicySatisfier(conf); spsService = new StoragePolicySatisfier<Long>(conf);
this.namesystem = namesystem; this.namesystem = namesystem;
this.blkMgr = blkMgr; this.blkMgr = blkMgr;
} }
@ -309,7 +309,7 @@ public class StoragePolicySatisfyManager {
/** /**
* @return internal SPS service instance. * @return internal SPS service instance.
*/ */
public SPSService getInternalSPSService() { public SPSService<Long> getInternalSPSService() {
return this.spsService; return this.spsService;
} }

View File

@ -206,21 +206,11 @@ public interface NamenodeProtocol {
boolean isRollingUpgrade() throws IOException; boolean isRollingUpgrade() throws IOException;
/** /**
* Gets the file path for the given file id. This API used by External SPS. * @return Gets the next available sps path, otherwise null. This API used
*
* @param inodeId
* - file inode id.
* @return path
*/
@Idempotent
String getFilePath(Long inodeId) throws IOException;
/**
* @return Gets the next available sps path id, otherwise null. This API used
* by External SPS. * by External SPS.
*/ */
@AtMostOnce @AtMostOnce
Long getNextSPSPathId() throws IOException; String getNextSPSPath() throws IOException;
/** /**
* Verifies whether the given Datanode has the enough estimated size with * Verifies whether the given Datanode has the enough estimated size with
@ -236,15 +226,5 @@ public interface NamenodeProtocol {
@Idempotent @Idempotent
boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type, boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
long estimatedSize) throws IOException; long estimatedSize) throws IOException;
/**
* Check if any low redundancy blocks for given file id. This API used by
* External SPS.
*
* @param inodeID
* - inode id.
*/
@Idempotent
boolean hasLowRedundancyBlocks(long inodeID) throws IOException;
} }

View File

@ -81,11 +81,11 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler {
private final SaslDataTransferClient saslClient; private final SaslDataTransferClient saslClient;
private final BlockStorageMovementTracker blkMovementTracker; private final BlockStorageMovementTracker blkMovementTracker;
private Daemon movementTrackerThread; private Daemon movementTrackerThread;
private final SPSService service; private final SPSService<String> service;
private final BlockDispatcher blkDispatcher; private final BlockDispatcher blkDispatcher;
public ExternalSPSBlockMoveTaskHandler(Configuration conf, public ExternalSPSBlockMoveTaskHandler(Configuration conf,
NameNodeConnector nnc, SPSService spsService) { NameNodeConnector nnc, SPSService<String> spsService) {
int moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, int moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT); DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
moveExecutor = initializeBlockMoverThreadPool(moverThreads); moveExecutor = initializeBlockMoverThreadPool(moverThreads);

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.sps; package org.apache.hadoop.hdfs.server.sps;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -30,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector; import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@ -46,15 +48,15 @@ import org.slf4j.LoggerFactory;
* SPS from Namenode state. * SPS from Namenode state.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class ExternalSPSContext implements Context { public class ExternalSPSContext implements Context<String> {
public static final Logger LOG = public static final Logger LOG =
LoggerFactory.getLogger(ExternalSPSContext.class); LoggerFactory.getLogger(ExternalSPSContext.class);
private SPSService service; private SPSService<String> service;
private NameNodeConnector nnc = null; private NameNodeConnector nnc = null;
private BlockStoragePolicySuite createDefaultSuite = private BlockStoragePolicySuite createDefaultSuite =
BlockStoragePolicySuite.createDefaultSuite(); BlockStoragePolicySuite.createDefaultSuite();
public ExternalSPSContext(SPSService service, NameNodeConnector nnc) { public ExternalSPSContext(SPSService<String> service, NameNodeConnector nnc) {
this.service = service; this.service = service;
this.nnc = nnc; this.nnc = nnc;
} }
@ -110,14 +112,12 @@ public class ExternalSPSContext implements Context {
} }
@Override @Override
public boolean isFileExist(long inodeId) { public boolean isFileExist(String filePath) {
String filePath = null;
try { try {
filePath = getFilePath(inodeId);
return nnc.getDistributedFileSystem().exists(new Path(filePath)); return nnc.getDistributedFileSystem().exists(new Path(filePath));
} catch (IllegalArgumentException | IOException e) { } catch (IllegalArgumentException | IOException e) {
LOG.warn("Exception while getting file is for the given path:{} " LOG.warn("Exception while getting file is for the given path:{}",
+ "and fileId:{}", filePath, inodeId, e); filePath, e);
} }
return false; return false;
} }
@ -133,8 +133,8 @@ public class ExternalSPSContext implements Context {
} }
@Override @Override
public void removeSPSHint(long inodeId) throws IOException { public void removeSPSHint(String inodeId) throws IOException {
nnc.getDistributedFileSystem().removeXAttr(new Path(getFilePath(inodeId)), nnc.getDistributedFileSystem().removeXAttr(new Path(inodeId),
HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY); HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY);
} }
@ -150,9 +150,15 @@ public class ExternalSPSContext implements Context {
} }
@Override @Override
public HdfsFileStatus getFileInfo(long inodeID) throws IOException { public HdfsFileStatus getFileInfo(String path) throws IOException {
return nnc.getDistributedFileSystem().getClient() HdfsLocatedFileStatus fileInfo = null;
.getLocatedFileInfo(getFilePath(inodeID), false); try {
fileInfo = nnc.getDistributedFileSystem().getClient()
.getLocatedFileInfo(path, false);
} catch (FileNotFoundException e) {
LOG.debug("Path:{} doesn't exists!", path, e);
}
return fileInfo;
} }
@Override @Override
@ -161,17 +167,6 @@ public class ExternalSPSContext implements Context {
return nnc.getLiveDatanodeStorageReport(); return nnc.getLiveDatanodeStorageReport();
} }
@Override
public boolean hasLowRedundancyBlocks(long inodeID) {
try {
return nnc.getNNProtocolConnection().hasLowRedundancyBlocks(inodeID);
} catch (IOException e) {
LOG.warn("Failed to check whether fileid:{} has low redundancy blocks.",
inodeID, e);
return false;
}
}
@Override @Override
public boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type, public boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
long estimatedSize) { long estimatedSize) {
@ -190,9 +185,9 @@ public class ExternalSPSContext implements Context {
} }
@Override @Override
public Long getNextSPSPathId() { public String getNextSPSPath() {
try { try {
return nnc.getNNProtocolConnection().getNextSPSPathId(); return nnc.getNNProtocolConnection().getNextSPSPath();
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Exception while getting next sps path id from Namenode.", e); LOG.warn("Exception while getting next sps path id from Namenode.", e);
return null; return null;
@ -200,7 +195,7 @@ public class ExternalSPSContext implements Context {
} }
@Override @Override
public void removeSPSPathId(long pathId) { public void removeSPSPathId(String pathId) {
// We need not specifically implement for external. // We need not specifically implement for external.
} }
@ -208,15 +203,4 @@ public class ExternalSPSContext implements Context {
public void removeAllSPSPathIds() { public void removeAllSPSPathIds() {
// We need not specifically implement for external. // We need not specifically implement for external.
} }
@Override
public String getFilePath(Long inodeId) {
try {
return nnc.getNNProtocolConnection().getFilePath(inodeId);
} catch (IOException e) {
LOG.warn("Exception while getting file path id:{} from Namenode.",
inodeId, e);
return null;
}
}
} }

View File

@ -28,8 +28,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.sps.Context; import org.apache.hadoop.hdfs.server.namenode.sps.FileCollector;
import org.apache.hadoop.hdfs.server.namenode.sps.FileIdCollector;
import org.apache.hadoop.hdfs.server.namenode.sps.ItemInfo; import org.apache.hadoop.hdfs.server.namenode.sps.ItemInfo;
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService; import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -38,19 +37,18 @@ import org.slf4j.LoggerFactory;
/** /**
* This class is to scan the paths recursively. If file is directory, then it * This class is to scan the paths recursively. If file is directory, then it
* will scan for files recursively. If the file is non directory, then it will * will scan for files recursively. If the file is non directory, then it will
* just submit the same file to process. * just submit the same file to process. This will use file string path
* representation.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class ExternalSPSFileIDCollector implements FileIdCollector { public class ExternalSPSFilePathCollector implements FileCollector <String>{
public static final Logger LOG = public static final Logger LOG =
LoggerFactory.getLogger(ExternalSPSFileIDCollector.class); LoggerFactory.getLogger(ExternalSPSFilePathCollector.class);
private Context cxt;
private DistributedFileSystem dfs; private DistributedFileSystem dfs;
private SPSService service; private SPSService<String> service;
private int maxQueueLimitToScan; private int maxQueueLimitToScan;
public ExternalSPSFileIDCollector(Context cxt, SPSService service) { public ExternalSPSFilePathCollector(SPSService<String> service) {
this.cxt = cxt;
this.service = service; this.service = service;
this.maxQueueLimitToScan = service.getConf().getInt( this.maxQueueLimitToScan = service.getConf().getInt(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
@ -74,38 +72,39 @@ public class ExternalSPSFileIDCollector implements FileIdCollector {
* Recursively scan the given path and add the file info to SPS service for * Recursively scan the given path and add the file info to SPS service for
* processing. * processing.
*/ */
private long processPath(long startID, String fullPath) { private long processPath(String startID, String childPath) {
long pendingWorkCount = 0; // to be satisfied file counter long pendingWorkCount = 0; // to be satisfied file counter
for (byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) { for (byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) {
final DirectoryListing children; final DirectoryListing children;
try { try {
children = dfs.getClient().listPaths(fullPath, lastReturnedName, false); children = dfs.getClient().listPaths(childPath, lastReturnedName,
false);
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Failed to list directory " + fullPath LOG.warn("Failed to list directory " + childPath
+ ". Ignore the directory and continue.", e); + ". Ignore the directory and continue.", e);
return pendingWorkCount; return pendingWorkCount;
} }
if (children == null) { if (children == null) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("The scanning start dir/sub dir " + fullPath LOG.debug("The scanning start dir/sub dir " + childPath
+ " does not have childrens."); + " does not have childrens.");
} }
return pendingWorkCount; return pendingWorkCount;
} }
for (HdfsFileStatus child : children.getPartialListing()) { for (HdfsFileStatus child : children.getPartialListing()) {
String childFullPath = child.getFullName(childPath);
if (child.isFile()) { if (child.isFile()) {
service.addFileIdToProcess(new ItemInfo(startID, child.getFileId()), service.addFileToProcess(
false); new ItemInfo<String>(startID, childFullPath), false);
checkProcessingQueuesFree(); checkProcessingQueuesFree();
pendingWorkCount++; // increment to be satisfied file count pendingWorkCount++; // increment to be satisfied file count
} else { } else {
String fullPathStr = child.getFullName(fullPath);
if (child.isDirectory()) { if (child.isDirectory()) {
if (!fullPathStr.endsWith(Path.SEPARATOR)) { if (!childFullPath.endsWith(Path.SEPARATOR)) {
fullPathStr = fullPathStr + Path.SEPARATOR; childFullPath = childFullPath + Path.SEPARATOR;
} }
pendingWorkCount += processPath(startID, fullPathStr); pendingWorkCount += processPath(startID, childFullPath);
} }
} }
} }
@ -151,12 +150,11 @@ public class ExternalSPSFileIDCollector implements FileIdCollector {
} }
@Override @Override
public void scanAndCollectFileIds(Long inodeId) throws IOException { public void scanAndCollectFiles(String path) throws IOException {
if (dfs == null) { if (dfs == null) {
dfs = getFS(service.getConf()); dfs = getFS(service.getConf());
} }
long pendingSatisfyItemsCount = processPath(inodeId, long pendingSatisfyItemsCount = processPath(path, path);
cxt.getFilePath(inodeId));
// Check whether the given path contains any item to be tracked // Check whether the given path contains any item to be tracked
// or the no to be satisfied paths. In case of empty list, add the given // or the no to be satisfied paths. In case of empty list, add the given
// inodeId to the 'pendingWorkForDirectory' with empty list so that later // inodeId to the 'pendingWorkForDirectory' with empty list so that later
@ -164,10 +162,10 @@ public class ExternalSPSFileIDCollector implements FileIdCollector {
// this path is already satisfied the storage policy. // this path is already satisfied the storage policy.
if (pendingSatisfyItemsCount <= 0) { if (pendingSatisfyItemsCount <= 0) {
LOG.debug("There is no pending items to satisfy the given path " LOG.debug("There is no pending items to satisfy the given path "
+ "inodeId:{}", inodeId); + "inodeId:{}", path);
service.addAllFileIdsToProcess(inodeId, new ArrayList<>(), true); service.addAllFilesToProcess(path, new ArrayList<>(), true);
} else { } else {
service.markScanCompletedForPath(inodeId); service.markScanCompletedForPath(path);
} }
} }

View File

@ -68,7 +68,8 @@ public final class ExternalStoragePolicySatisfier {
HdfsConfiguration spsConf = new HdfsConfiguration(); HdfsConfiguration spsConf = new HdfsConfiguration();
// login with SPS keytab // login with SPS keytab
secureLogin(spsConf); secureLogin(spsConf);
StoragePolicySatisfier sps = new StoragePolicySatisfier(spsConf); StoragePolicySatisfier<String> sps = new StoragePolicySatisfier<String>(
spsConf);
nnc = getNameNodeConnector(spsConf); nnc = getNameNodeConnector(spsConf);
boolean spsRunning; boolean spsRunning;
@ -86,8 +87,8 @@ public final class ExternalStoragePolicySatisfier {
ExternalSPSBlockMoveTaskHandler externalHandler = ExternalSPSBlockMoveTaskHandler externalHandler =
new ExternalSPSBlockMoveTaskHandler(spsConf, nnc, sps); new ExternalSPSBlockMoveTaskHandler(spsConf, nnc, sps);
externalHandler.init(); externalHandler.init();
sps.init(context, new ExternalSPSFileIDCollector(context, sps), sps.init(context, new ExternalSPSFilePathCollector(sps), externalHandler,
externalHandler, blkMoveListener); blkMoveListener);
sps.start(true, StoragePolicySatisfierMode.EXTERNAL); sps.start(true, StoragePolicySatisfierMode.EXTERNAL);
if (sps != null) { if (sps != null) {
sps.join(); sps.join();

View File

@ -214,11 +214,11 @@ message GetFilePathResponseProto {
required string srcPath = 1; required string srcPath = 1;
} }
message GetNextSPSPathIdRequestProto { message GetNextSPSPathRequestProto {
} }
message GetNextSPSPathIdResponseProto { message GetNextSPSPathResponseProto {
optional uint64 fileId = 1; optional string spsPath = 1;
} }
message CheckDNSpaceRequestProto { message CheckDNSpaceRequestProto {
@ -322,26 +322,15 @@ service NamenodeProtocolService {
returns (IsRollingUpgradeResponseProto); returns (IsRollingUpgradeResponseProto);
/** /**
* Return the corresponding file path for give file id * Return the sps path from namenode
*/ */
rpc getFilePath(GetFilePathRequestProto) rpc getNextSPSPath(GetNextSPSPathRequestProto)
returns (GetFilePathResponseProto); returns (GetNextSPSPathResponseProto);
/** /**
* Return the sps path id from namenode * Verifies whether the given Datanode has the enough estimated size with
*/ * given storage type for scheduling the block movement.
rpc getNextSPSPathId(GetNextSPSPathIdRequestProto)
returns (GetNextSPSPathIdResponseProto);
/**
* Return the sps path id from namenode
*/ */
rpc checkDNSpaceForScheduling(CheckDNSpaceRequestProto) rpc checkDNSpaceForScheduling(CheckDNSpaceRequestProto)
returns (CheckDNSpaceResponseProto); returns (CheckDNSpaceResponseProto);
/**
* check whether given file id has low redundancy blocks.
*/
rpc hasLowRedundancyBlocks(HasLowRedundancyBlocksRequestProto)
returns (HasLowRedundancyBlocksResponseProto);
} }

View File

@ -40,22 +40,21 @@ import org.mockito.Mockito;
*/ */
public class TestBlockStorageMovementAttemptedItems { public class TestBlockStorageMovementAttemptedItems {
private BlockStorageMovementAttemptedItems bsmAttemptedItems = null; private BlockStorageMovementAttemptedItems<Long> bsmAttemptedItems;
private BlockStorageMovementNeeded unsatisfiedStorageMovementFiles = null; private BlockStorageMovementNeeded<Long> unsatisfiedStorageMovementFiles;
private final int selfRetryTimeout = 500; private final int selfRetryTimeout = 500;
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
Configuration config = new HdfsConfiguration(); Configuration config = new HdfsConfiguration();
Context ctxt = Mockito.mock(Context.class); Context<Long> ctxt = Mockito.mock(IntraSPSNameNodeContext.class);
SPSService sps = Mockito.mock(StoragePolicySatisfier.class); SPSService<Long> sps = new StoragePolicySatisfier<Long>(config);
Mockito.when(sps.getConf()).thenReturn(config);
Mockito.when(ctxt.isRunning()).thenReturn(true); Mockito.when(ctxt.isRunning()).thenReturn(true);
Mockito.when(ctxt.isInSafeMode()).thenReturn(false); Mockito.when(ctxt.isInSafeMode()).thenReturn(false);
Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true); Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true);
unsatisfiedStorageMovementFiles = unsatisfiedStorageMovementFiles =
new BlockStorageMovementNeeded(ctxt, null); new BlockStorageMovementNeeded<Long>(ctxt, null);
bsmAttemptedItems = new BlockStorageMovementAttemptedItems(sps, bsmAttemptedItems = new BlockStorageMovementAttemptedItems<Long>(sps,
unsatisfiedStorageMovementFiles, null); unsatisfiedStorageMovementFiles, null);
} }
@ -72,9 +71,9 @@ public class TestBlockStorageMovementAttemptedItems {
long stopTime = monotonicNow() + (retryTimeout * 2); long stopTime = monotonicNow() + (retryTimeout * 2);
boolean isItemFound = false; boolean isItemFound = false;
while (monotonicNow() < (stopTime)) { while (monotonicNow() < (stopTime)) {
ItemInfo ele = null; ItemInfo<Long> ele = null;
while ((ele = unsatisfiedStorageMovementFiles.get()) != null) { while ((ele = unsatisfiedStorageMovementFiles.get()) != null) {
if (item == ele.getFileId()) { if (item == ele.getFile()) {
isItemFound = true; isItemFound = true;
break; break;
} }
@ -97,7 +96,7 @@ public class TestBlockStorageMovementAttemptedItems {
Long item = new Long(1234); Long item = new Long(1234);
List<Block> blocks = new ArrayList<Block>(); List<Block> blocks = new ArrayList<Block>();
blocks.add(new Block(item)); blocks.add(new Block(item));
bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks, 0)); bsmAttemptedItems.add(new AttemptedItemInfo<Long>(0L, 0L, 0L, blocks, 0));
Block[] blockArray = new Block[blocks.size()]; Block[] blockArray = new Block[blocks.size()];
blocks.toArray(blockArray); blocks.toArray(blockArray);
bsmAttemptedItems.notifyMovementTriedBlocks(blockArray); bsmAttemptedItems.notifyMovementTriedBlocks(blockArray);
@ -114,7 +113,7 @@ public class TestBlockStorageMovementAttemptedItems {
Long item = new Long(1234); Long item = new Long(1234);
List<Block> blocks = new ArrayList<>(); List<Block> blocks = new ArrayList<>();
blocks.add(new Block(item)); blocks.add(new Block(item));
bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks, 0)); bsmAttemptedItems.add(new AttemptedItemInfo<Long>(0L, 0L, 0L, blocks, 0));
assertEquals("Shouldn't receive result", 0, assertEquals("Shouldn't receive result", 0,
bsmAttemptedItems.getMovementFinishedBlocksCount()); bsmAttemptedItems.getMovementFinishedBlocksCount());
assertEquals("Item doesn't exist in the attempted list", 1, assertEquals("Item doesn't exist in the attempted list", 1,
@ -135,7 +134,7 @@ public class TestBlockStorageMovementAttemptedItems {
blocks.add(new Block(5678L)); blocks.add(new Block(5678L));
Long trackID = 0L; Long trackID = 0L;
bsmAttemptedItems bsmAttemptedItems
.add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0)); .add(new AttemptedItemInfo<Long>(trackID, trackID, 0L, blocks, 0));
Block[] blksMovementReport = new Block[1]; Block[] blksMovementReport = new Block[1];
blksMovementReport[0] = new Block(item); blksMovementReport[0] = new Block(item);
bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport); bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport);
@ -160,7 +159,7 @@ public class TestBlockStorageMovementAttemptedItems {
List<Block> blocks = new ArrayList<>(); List<Block> blocks = new ArrayList<>();
blocks.add(new Block(item)); blocks.add(new Block(item));
bsmAttemptedItems bsmAttemptedItems
.add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0)); .add(new AttemptedItemInfo<Long>(trackID, trackID, 0L, blocks, 0));
Block[] blksMovementReport = new Block[1]; Block[] blksMovementReport = new Block[1];
blksMovementReport[0] = new Block(item); blksMovementReport[0] = new Block(item);
bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport); bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport);
@ -188,7 +187,7 @@ public class TestBlockStorageMovementAttemptedItems {
List<Block> blocks = new ArrayList<>(); List<Block> blocks = new ArrayList<>();
blocks.add(new Block(item)); blocks.add(new Block(item));
bsmAttemptedItems bsmAttemptedItems
.add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0)); .add(new AttemptedItemInfo<Long>(trackID, trackID, 0L, blocks, 0));
Block[] blksMovementReport = new Block[1]; Block[] blksMovementReport = new Block[1];
blksMovementReport[0] = new Block(item); blksMovementReport[0] = new Block(item);
bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport); bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport);

View File

@ -105,7 +105,7 @@ public class TestStoragePolicySatisfier {
public static final int NUM_OF_DATANODES = 3; public static final int NUM_OF_DATANODES = 3;
public static final int STORAGES_PER_DATANODE = 2; public static final int STORAGES_PER_DATANODE = 2;
public static final long CAPACITY = 2 * 256 * 1024 * 1024; public static final long CAPACITY = 2 * 256 * 1024 * 1024;
public static final String FILE = "/testMoveWhenStoragePolicyNotSatisfying"; public static final String FILE = "/testMoveToSatisfyStoragePolicy";
public static final int DEFAULT_BLOCK_SIZE = 1024; public static final int DEFAULT_BLOCK_SIZE = 1024;
/** /**
@ -1269,8 +1269,9 @@ public class TestStoragePolicySatisfier {
//Queue limit can control the traverse logic to wait for some free //Queue limit can control the traverse logic to wait for some free
//entry in queue. After 10 files, traverse control will be on U. //entry in queue. After 10 files, traverse control will be on U.
StoragePolicySatisfier sps = new StoragePolicySatisfier(config); StoragePolicySatisfier<Long> sps = new StoragePolicySatisfier<Long>(config);
Context ctxt = new IntraSPSNameNodeContext(hdfsCluster.getNamesystem(), Context<Long> ctxt = new IntraSPSNameNodeContext(
hdfsCluster.getNamesystem(),
hdfsCluster.getNamesystem().getBlockManager(), sps) { hdfsCluster.getNamesystem().getBlockManager(), sps) {
@Override @Override
public boolean isInSafeMode() { public boolean isInSafeMode() {
@ -1283,7 +1284,7 @@ public class TestStoragePolicySatisfier {
} }
}; };
FileIdCollector fileIDCollector = createFileIdCollector(sps, ctxt); FileCollector<Long> fileIDCollector = createFileIdCollector(sps, ctxt);
sps.init(ctxt, fileIDCollector, null, null); sps.init(ctxt, fileIDCollector, null, null);
sps.getStorageMovementQueue().activate(); sps.getStorageMovementQueue().activate();
@ -1300,9 +1301,9 @@ public class TestStoragePolicySatisfier {
dfs.delete(new Path("/root"), true); dfs.delete(new Path("/root"), true);
} }
public FileIdCollector createFileIdCollector(StoragePolicySatisfier sps, public FileCollector<Long> createFileIdCollector(
Context ctxt) { StoragePolicySatisfier<Long> sps, Context<Long> ctxt) {
FileIdCollector fileIDCollector = new IntraSPSNameNodeFileIdCollector( FileCollector<Long> fileIDCollector = new IntraSPSNameNodeFileIdCollector(
hdfsCluster.getNamesystem().getFSDirectory(), sps); hdfsCluster.getNamesystem().getFSDirectory(), sps);
return fileIDCollector; return fileIDCollector;
} }
@ -1337,8 +1338,9 @@ public class TestStoragePolicySatisfier {
// Queue limit can control the traverse logic to wait for some free // Queue limit can control the traverse logic to wait for some free
// entry in queue. After 10 files, traverse control will be on U. // entry in queue. After 10 files, traverse control will be on U.
StoragePolicySatisfier sps = new StoragePolicySatisfier(config); StoragePolicySatisfier<Long> sps = new StoragePolicySatisfier<Long>(config);
Context ctxt = new IntraSPSNameNodeContext(hdfsCluster.getNamesystem(), Context<Long> ctxt = new IntraSPSNameNodeContext(
hdfsCluster.getNamesystem(),
hdfsCluster.getNamesystem().getBlockManager(), sps) { hdfsCluster.getNamesystem().getBlockManager(), sps) {
@Override @Override
public boolean isInSafeMode() { public boolean isInSafeMode() {
@ -1350,7 +1352,7 @@ public class TestStoragePolicySatisfier {
return true; return true;
} }
}; };
FileIdCollector fileIDCollector = createFileIdCollector(sps, ctxt); FileCollector<Long> fileIDCollector = createFileIdCollector(sps, ctxt);
sps.init(ctxt, fileIDCollector, null, null); sps.init(ctxt, fileIDCollector, null, null);
sps.getStorageMovementQueue().activate(); sps.getStorageMovementQueue().activate();
@ -1368,16 +1370,16 @@ public class TestStoragePolicySatisfier {
} }
private void assertTraversal(List<String> expectedTraverseOrder, private void assertTraversal(List<String> expectedTraverseOrder,
FSDirectory fsDir, StoragePolicySatisfier sps) FSDirectory fsDir, StoragePolicySatisfier<Long> sps)
throws InterruptedException { throws InterruptedException {
// Remove 10 element and make queue free, So other traversing will start. // Remove 10 element and make queue free, So other traversing will start.
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
String path = expectedTraverseOrder.remove(0); String path = expectedTraverseOrder.remove(0);
ItemInfo itemInfo = sps.getStorageMovementQueue().get(); ItemInfo<Long> itemInfo = sps.getStorageMovementQueue().get();
if (itemInfo == null) { if (itemInfo == null) {
continue; continue;
} }
long trackId = itemInfo.getFileId(); Long trackId = itemInfo.getFile();
INode inode = fsDir.getInode(trackId); INode inode = fsDir.getInode(trackId);
assertTrue("Failed to traverse tree, expected " + path + " but got " assertTrue("Failed to traverse tree, expected " + path + " but got "
+ inode.getFullPathName(), path.equals(inode.getFullPathName())); + inode.getFullPathName(), path.equals(inode.getFullPathName()));
@ -1388,11 +1390,11 @@ public class TestStoragePolicySatisfier {
// Check other element traversed in order and E, M, U, R, S should not be // Check other element traversed in order and E, M, U, R, S should not be
// added in queue which we already removed from expected list // added in queue which we already removed from expected list
for (String path : expectedTraverseOrder) { for (String path : expectedTraverseOrder) {
ItemInfo itemInfo = sps.getStorageMovementQueue().get(); ItemInfo<Long> itemInfo = sps.getStorageMovementQueue().get();
if (itemInfo == null) { if (itemInfo == null) {
continue; continue;
} }
long trackId = itemInfo.getFileId(); Long trackId = itemInfo.getFile();
INode inode = fsDir.getInode(trackId); INode inode = fsDir.getInode(trackId);
assertTrue("Failed to traverse tree, expected " + path + " but got " assertTrue("Failed to traverse tree, expected " + path + " but got "
+ inode.getFullPathName(), path.equals(inode.getFullPathName())); + inode.getFullPathName(), path.equals(inode.getFullPathName()));
@ -1696,39 +1698,41 @@ public class TestStoragePolicySatisfier {
return file1; return file1;
} }
private void waitForAttemptedItems(long expectedBlkMovAttemptedCount, public void waitForAttemptedItems(long expectedBlkMovAttemptedCount,
int timeout) throws TimeoutException, InterruptedException { int timeout) throws TimeoutException, InterruptedException {
BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager(); BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
final StoragePolicySatisfier sps = (StoragePolicySatisfier) blockManager final StoragePolicySatisfier<Long> sps =
.getSPSManager().getInternalSPSService(); (StoragePolicySatisfier<Long>) blockManager.getSPSManager()
.getInternalSPSService();
GenericTestUtils.waitFor(new Supplier<Boolean>() { GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override @Override
public Boolean get() { public Boolean get() {
LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}", LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}",
expectedBlkMovAttemptedCount, expectedBlkMovAttemptedCount,
((BlockStorageMovementAttemptedItems) (sps ((BlockStorageMovementAttemptedItems<Long>) (sps
.getAttemptedItemsMonitor())).getAttemptedItemsCount()); .getAttemptedItemsMonitor())).getAttemptedItemsCount());
return ((BlockStorageMovementAttemptedItems) (sps return ((BlockStorageMovementAttemptedItems<Long>) (sps
.getAttemptedItemsMonitor())) .getAttemptedItemsMonitor()))
.getAttemptedItemsCount() == expectedBlkMovAttemptedCount; .getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
} }
}, 100, timeout); }, 100, timeout);
} }
private void waitForBlocksMovementAttemptReport( public void waitForBlocksMovementAttemptReport(
long expectedMovementFinishedBlocksCount, int timeout) long expectedMovementFinishedBlocksCount, int timeout)
throws TimeoutException, InterruptedException { throws TimeoutException, InterruptedException {
BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager(); BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
final StoragePolicySatisfier sps = (StoragePolicySatisfier) blockManager final StoragePolicySatisfier<Long> sps =
(StoragePolicySatisfier<Long>) blockManager
.getSPSManager().getInternalSPSService(); .getSPSManager().getInternalSPSService();
GenericTestUtils.waitFor(new Supplier<Boolean>() { GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override @Override
public Boolean get() { public Boolean get() {
LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}", LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
expectedMovementFinishedBlocksCount, expectedMovementFinishedBlocksCount,
((BlockStorageMovementAttemptedItems) (sps ((BlockStorageMovementAttemptedItems<Long>) (sps
.getAttemptedItemsMonitor())).getMovementFinishedBlocksCount()); .getAttemptedItemsMonitor())).getMovementFinishedBlocksCount());
return ((BlockStorageMovementAttemptedItems) (sps return ((BlockStorageMovementAttemptedItems<Long>) (sps
.getAttemptedItemsMonitor())) .getAttemptedItemsMonitor()))
.getMovementFinishedBlocksCount() .getMovementFinishedBlocksCount()
>= expectedMovementFinishedBlocksCount; >= expectedMovementFinishedBlocksCount;

View File

@ -495,16 +495,17 @@ public class TestStoragePolicySatisfierWithStripedFile {
long expectedBlkMovAttemptedCount, int timeout) long expectedBlkMovAttemptedCount, int timeout)
throws TimeoutException, InterruptedException { throws TimeoutException, InterruptedException {
BlockManager blockManager = cluster.getNamesystem().getBlockManager(); BlockManager blockManager = cluster.getNamesystem().getBlockManager();
final StoragePolicySatisfier sps = (StoragePolicySatisfier) blockManager final StoragePolicySatisfier<Long> sps =
(StoragePolicySatisfier<Long>) blockManager
.getSPSManager().getInternalSPSService(); .getSPSManager().getInternalSPSService();
GenericTestUtils.waitFor(new Supplier<Boolean>() { GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override @Override
public Boolean get() { public Boolean get() {
LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}", LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}",
expectedBlkMovAttemptedCount, expectedBlkMovAttemptedCount,
((BlockStorageMovementAttemptedItems) sps ((BlockStorageMovementAttemptedItems<Long>) sps
.getAttemptedItemsMonitor()).getAttemptedItemsCount()); .getAttemptedItemsMonitor()).getAttemptedItemsCount());
return ((BlockStorageMovementAttemptedItems) sps return ((BlockStorageMovementAttemptedItems<Long>) sps
.getAttemptedItemsMonitor()) .getAttemptedItemsMonitor())
.getAttemptedItemsCount() == expectedBlkMovAttemptedCount; .getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
} }
@ -567,7 +568,8 @@ public class TestStoragePolicySatisfierWithStripedFile {
long expectedMoveFinishedBlks, int timeout) long expectedMoveFinishedBlks, int timeout)
throws TimeoutException, InterruptedException { throws TimeoutException, InterruptedException {
BlockManager blockManager = cluster.getNamesystem().getBlockManager(); BlockManager blockManager = cluster.getNamesystem().getBlockManager();
final StoragePolicySatisfier sps = (StoragePolicySatisfier) blockManager final StoragePolicySatisfier<Long> sps =
(StoragePolicySatisfier<Long>) blockManager
.getSPSManager().getInternalSPSService(); .getSPSManager().getInternalSPSService();
Assert.assertNotNull("Failed to get SPS object reference!", sps); Assert.assertNotNull("Failed to get SPS object reference!", sps);
@ -575,9 +577,10 @@ public class TestStoragePolicySatisfierWithStripedFile {
@Override @Override
public Boolean get() { public Boolean get() {
LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}", LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
expectedMoveFinishedBlks, ((BlockStorageMovementAttemptedItems) sps expectedMoveFinishedBlks,
((BlockStorageMovementAttemptedItems<Long>) sps
.getAttemptedItemsMonitor()).getMovementFinishedBlocksCount()); .getAttemptedItemsMonitor()).getMovementFinishedBlocksCount());
return ((BlockStorageMovementAttemptedItems) sps return ((BlockStorageMovementAttemptedItems<Long>) sps
.getAttemptedItemsMonitor()) .getAttemptedItemsMonitor())
.getMovementFinishedBlocksCount() >= expectedMoveFinishedBlks; .getMovementFinishedBlocksCount() >= expectedMoveFinishedBlks;
} }

View File

@ -43,23 +43,23 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode; import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector; import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener; import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
import org.apache.hadoop.hdfs.server.namenode.sps.Context; import org.apache.hadoop.hdfs.server.namenode.sps.BlockStorageMovementAttemptedItems;
import org.apache.hadoop.hdfs.server.namenode.sps.FileIdCollector;
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier; import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier;
import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.http.HttpConfig;
@ -74,6 +74,8 @@ import org.junit.Assert;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import com.google.common.base.Supplier;
/** /**
* Tests the external sps service plugins. * Tests the external sps service plugins.
*/ */
@ -88,6 +90,8 @@ public class TestExternalStoragePolicySatisfier
private String principal; private String principal;
private MiniKdc kdc; private MiniKdc kdc;
private File baseDir; private File baseDir;
private StoragePolicySatisfier<String> externalSps;
private ExternalSPSContext externalCtxt;
@After @After
public void destroy() throws Exception { public void destroy() throws Exception {
@ -97,6 +101,14 @@ public class TestExternalStoragePolicySatisfier
} }
} }
@Override
public void shutdownCluster() {
if (externalSps != null) {
externalSps.stopGracefully();
}
super.shutdownCluster();
}
@Override @Override
public void setUp() { public void setUp() {
super.setUp(); super.setUp();
@ -131,60 +143,44 @@ public class TestExternalStoragePolicySatisfier
nnc = getNameNodeConnector(getConf()); nnc = getNameNodeConnector(getConf());
BlockManager blkMgr = cluster.getNameNode().getNamesystem() externalSps = new StoragePolicySatisfier<String>(getConf());
.getBlockManager(); externalCtxt = new ExternalSPSContext(externalSps,
SPSService spsService = blkMgr.getSPSManager().getInternalSPSService();
spsService.stopGracefully();
ExternalSPSContext context = new ExternalSPSContext(spsService,
getNameNodeConnector(conf)); getNameNodeConnector(conf));
ExternalBlockMovementListener blkMoveListener = ExternalBlockMovementListener blkMoveListener =
new ExternalBlockMovementListener(); new ExternalBlockMovementListener();
ExternalSPSBlockMoveTaskHandler externalHandler = ExternalSPSBlockMoveTaskHandler externalHandler =
new ExternalSPSBlockMoveTaskHandler(conf, nnc, new ExternalSPSBlockMoveTaskHandler(conf, nnc,
blkMgr.getSPSManager().getInternalSPSService()); externalSps);
externalHandler.init(); externalHandler.init();
spsService.init(context, externalSps.init(externalCtxt,
new ExternalSPSFileIDCollector(context, new ExternalSPSFilePathCollector(externalSps), externalHandler,
blkMgr.getSPSManager().getInternalSPSService()), blkMoveListener);
externalHandler, blkMoveListener); externalSps.start(true, StoragePolicySatisfierMode.EXTERNAL);
spsService.start(true, StoragePolicySatisfierMode.EXTERNAL);
return cluster; return cluster;
} }
public void restartNamenode() throws IOException{ public void restartNamenode() throws IOException{
BlockManager blkMgr = getCluster().getNameNode().getNamesystem() if (externalSps != null) {
.getBlockManager(); externalSps.stopGracefully();
SPSService spsService = blkMgr.getSPSManager().getInternalSPSService(); }
spsService.stopGracefully();
getCluster().restartNameNodes(); getCluster().restartNameNodes();
getCluster().waitActive(); getCluster().waitActive();
blkMgr = getCluster().getNameNode().getNamesystem() externalSps = new StoragePolicySatisfier<>(getConf());
.getBlockManager();
spsService = blkMgr.getSPSManager().getInternalSPSService();
spsService.stopGracefully();
ExternalSPSContext context = new ExternalSPSContext(spsService, externalCtxt = new ExternalSPSContext(externalSps,
getNameNodeConnector(getConf())); getNameNodeConnector(getConf()));
ExternalBlockMovementListener blkMoveListener = ExternalBlockMovementListener blkMoveListener =
new ExternalBlockMovementListener(); new ExternalBlockMovementListener();
ExternalSPSBlockMoveTaskHandler externalHandler = ExternalSPSBlockMoveTaskHandler externalHandler =
new ExternalSPSBlockMoveTaskHandler(getConf(), nnc, new ExternalSPSBlockMoveTaskHandler(getConf(), nnc,
blkMgr.getSPSManager().getInternalSPSService()); externalSps);
externalHandler.init(); externalHandler.init();
spsService.init(context, externalSps.init(externalCtxt,
new ExternalSPSFileIDCollector(context, new ExternalSPSFilePathCollector(externalSps), externalHandler,
blkMgr.getSPSManager().getInternalSPSService()), blkMoveListener);
externalHandler, blkMoveListener); externalSps.start(true, StoragePolicySatisfierMode.EXTERNAL);
spsService.start(true, StoragePolicySatisfierMode.EXTERNAL);
}
@Override
public FileIdCollector createFileIdCollector(StoragePolicySatisfier sps,
Context ctxt) {
return new ExternalSPSFileIDCollector(ctxt, sps);
} }
private class ExternalBlockMovementListener implements BlockMovementListener { private class ExternalBlockMovementListener implements BlockMovementListener {
@ -204,7 +200,7 @@ public class TestExternalStoragePolicySatisfier
throws IOException { throws IOException {
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf); final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Assert.assertEquals(1, namenodes.size()); Assert.assertEquals(1, namenodes.size());
final Path externalSPSPathId = new Path("/system/tmp.id"); final Path externalSPSPathId = HdfsServerConstants.MOVER_ID_PATH;
NameNodeConnector.checkOtherInstanceRunning(false); NameNodeConnector.checkOtherInstanceRunning(false);
while (true) { while (true) {
try { try {
@ -222,6 +218,40 @@ public class TestExternalStoragePolicySatisfier
} }
} }
public void waitForAttemptedItems(long expectedBlkMovAttemptedCount,
int timeout) throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}",
expectedBlkMovAttemptedCount,
((BlockStorageMovementAttemptedItems<String>) (externalSps
.getAttemptedItemsMonitor())).getAttemptedItemsCount());
return ((BlockStorageMovementAttemptedItems<String>) (externalSps
.getAttemptedItemsMonitor()))
.getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
}
}, 100, timeout);
}
public void waitForBlocksMovementAttemptReport(
long expectedMovementFinishedBlocksCount, int timeout)
throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
expectedMovementFinishedBlocksCount,
((BlockStorageMovementAttemptedItems<String>) (externalSps
.getAttemptedItemsMonitor())).getMovementFinishedBlocksCount());
return ((BlockStorageMovementAttemptedItems<String>) (externalSps
.getAttemptedItemsMonitor()))
.getMovementFinishedBlocksCount()
>= expectedMovementFinishedBlocksCount;
}
}, 100, timeout);
}
private void initSecureConf(Configuration conf) throws Exception { private void initSecureConf(Configuration conf) throws Exception {
String username = "externalSPS"; String username = "externalSPS";
baseDir = GenericTestUtils baseDir = GenericTestUtils
@ -321,10 +351,6 @@ public class TestExternalStoragePolicySatisfier
List<String> files = new ArrayList<>(); List<String> files = new ArrayList<>();
files.add(FILE); files.add(FILE);
DistributedFileSystem fs = getFS(); DistributedFileSystem fs = getFS();
BlockManager blkMgr = getCluster().getNameNode().getNamesystem()
.getBlockManager();
SPSService spsService = blkMgr.getSPSManager().getInternalSPSService();
spsService.stopGracefully(); // stops SPS
// Creates 4 more files. Send all of them for satisfying the storage // Creates 4 more files. Send all of them for satisfying the storage
// policy together. // policy together.
@ -366,6 +392,28 @@ public class TestExternalStoragePolicySatisfier
} }
} }
/**
* Tests to verify that SPS should be able to start when the Mover ID file
* is not being hold by a Mover. This can be the case when Mover exits
* ungracefully without deleting the ID file from HDFS.
*/
@Test(timeout = 300000)
public void testWhenMoverExitsWithoutDeleteMoverIDFile()
throws IOException {
try {
createCluster();
// Simulate the case by creating MOVER_ID file
DFSTestUtil.createFile(getCluster().getFileSystem(),
HdfsServerConstants.MOVER_ID_PATH, 0, (short) 1, 0);
restartNamenode();
boolean running = externalCtxt.isRunning();
Assert.assertTrue("SPS should be running as "
+ "no Mover really running", running);
} finally {
shutdownCluster();
}
}
/** /**
* This test need not run as external scan is not a batch based scanning right * This test need not run as external scan is not a batch based scanning right
* now. * now.
@ -389,4 +437,20 @@ public class TestExternalStoragePolicySatisfier
@Ignore("Status is not supported for external SPS. So, ignoring it.") @Ignore("Status is not supported for external SPS. So, ignoring it.")
public void testMaxRetryForFailedBlock() throws Exception { public void testMaxRetryForFailedBlock() throws Exception {
} }
/**
* This test is specific to internal SPS. So, ignoring it.
*/
@Ignore("This test is specific to internal SPS. So, ignoring it.")
@Override
public void testTraverseWhenParentDeleted() throws Exception {
}
/**
* This test is specific to internal SPS. So, ignoring it.
*/
@Ignore("This test is specific to internal SPS. So, ignoring it.")
@Override
public void testTraverseWhenRootParentDeleted() throws Exception {
}
} }