HDFS-13076: [SPS]: Cleanup work for HDFS-10285 merge. Contributed by Rakesh R.

This commit is contained in:
Uma Maheswara Rao G 2018-07-23 16:05:35 -07:00 committed by Uma Maheswara Rao Gangumalla
parent dfcb331ba3
commit 39ed3a66db
48 changed files with 1517 additions and 4278 deletions

View File

@ -123,7 +123,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
@ -3110,10 +3109,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
}
public boolean isInternalSatisfierRunning() throws IOException {
return namenode.isInternalSatisfierRunning();
}
Tracer getTracer() {
return tracer;
}
@ -3170,25 +3165,4 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
checkOpen();
return new OpenFilesIterator(namenode, tracer, openFilesTypes, path);
}
/**
* Check the storage policy satisfy status of the path for which
* {@link DFSClient#satisfyStoragePolicy(String)} is called.
*
* @return Storage policy satisfy status.
* <ul>
* <li>PENDING if path is in queue and not processed for satisfying
* the policy.</li>
* <li>IN_PROGRESS if satisfying the storage policy for path.</li>
* <li>SUCCESS if storage policy satisfied for the path.</li>
* <li>NOT_AVAILABLE if
* {@link DFSClient#satisfyStoragePolicy(String)} not called for
* path or SPS work is already finished.</li>
* </ul>
* @throws IOException
*/
public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
String path) throws IOException {
return namenode.checkStoragePolicySatisfyPathStatus(path);
}
}

View File

@ -45,7 +45,6 @@ import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
@ -1757,32 +1756,4 @@ public interface ClientProtocol {
*/
@AtMostOnce
void satisfyStoragePolicy(String path) throws IOException;
/**
* Check if internal StoragePolicySatisfier is running.
* @return true if internal StoragePolicySatisfier is running
* @throws IOException
*/
@Idempotent
boolean isInternalSatisfierRunning() throws IOException;
/**
* Check the storage policy satisfy status of the path for which
* {@link ClientProtocol#satisfyStoragePolicy(String)} is called.
*
* @return Storage policy satisfy status.
* <ul>
* <li>PENDING if path is in queue and not processed for satisfying
* the policy.</li>
* <li>IN_PROGRESS if satisfying the storage policy for path.</li>
* <li>SUCCESS if storage policy satisfied for the path.</li>
* <li>NOT_AVAILABLE if
* {@link ClientProtocol#satisfyStoragePolicy(String)} not called for
* path or SPS work is already finished.</li>
* </ul>
* @throws IOException
*/
@Idempotent
StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
String path) throws IOException;
}

View File

@ -133,12 +133,6 @@ public final class HdfsConstants {
*/
public enum StoragePolicySatisfierMode {
/**
* This mode represents that SPS service is running inside Namenode and can
* accept any SPS call request.
*/
INTERNAL,
/**
* This mode represents that SPS service is running outside Namenode as an
* external service and can accept any SPS call request.
@ -166,40 +160,6 @@ public final class HdfsConstants {
}
}
/**
* Storage policy satisfy path status.
*/
public enum StoragePolicySatisfyPathStatus {
/**
* Scheduled but not yet processed. This will come only in case of
* directory. Directory will be added first in "pendingWorkForDirectory"
* queue and then later it is processed recursively.
*/
PENDING,
/**
* Satisfying the storage policy for path.
*/
IN_PROGRESS,
/**
* Storage policy satisfied for the path.
*/
SUCCESS,
/**
* Few blocks failed to move and the path is still not
* fully satisfied the storage policy.
*/
FAILURE,
/**
* Status not available.
*/
NOT_AVAILABLE
}
public enum RollingUpgradeAction {
QUERY, PREPARE, FINALIZE;

View File

@ -70,7 +70,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
@ -101,8 +100,6 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Append
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolEntryProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckStoragePolicySatisfyPathStatusRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckStoragePolicySatisfyPathStatusResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateRequestProto;
@ -150,8 +147,6 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSto
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsInternalSatisfierRunningRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsInternalSatisfierRunningResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto;
@ -301,9 +296,6 @@ public class ClientNamenodeProtocolTranslatorPB implements
private final static GetErasureCodingCodecsRequestProto
VOID_GET_EC_CODEC_REQUEST = GetErasureCodingCodecsRequestProto
.newBuilder().build();
private final static IsInternalSatisfierRunningRequestProto
VOID_IS_SPS_RUNNING_REQUEST = IsInternalSatisfierRunningRequestProto
.newBuilder().build();
public ClientNamenodeProtocolTranslatorPB(ClientNamenodeProtocolPB proxy) {
@ -1911,18 +1903,6 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
}
@Override
public boolean isInternalSatisfierRunning() throws IOException {
try {
IsInternalSatisfierRunningResponseProto rep =
rpcProxy.isInternalSatisfierRunning(null,
VOID_IS_SPS_RUNNING_REQUEST);
return rep.getRunning();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public QuotaUsage getQuotaUsage(String path) throws IOException {
GetQuotaUsageRequestProto req =
@ -1977,20 +1957,4 @@ public class ClientNamenodeProtocolTranslatorPB implements
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
String path) throws IOException {
try {
CheckStoragePolicySatisfyPathStatusRequestProto request =
CheckStoragePolicySatisfyPathStatusRequestProto.newBuilder()
.setSrc(path)
.build();
CheckStoragePolicySatisfyPathStatusResponseProto response = rpcProxy
.checkStoragePolicySatisfyPathStatus(null, request);
return PBHelperClient.convert(response.getStatus());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
}

View File

@ -130,7 +130,6 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheF
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolEntryProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolStatsProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckStoragePolicySatisfyPathStatusResponseProto.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateFlagProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeReportTypeProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeStorageReportProto;
@ -3399,40 +3398,4 @@ public class PBHelperClient {
}
return typeProtos;
}
public static StoragePolicySatisfyPathStatus convert(
HdfsConstants.StoragePolicySatisfyPathStatus status) {
switch (status) {
case PENDING:
return StoragePolicySatisfyPathStatus.PENDING;
case IN_PROGRESS:
return StoragePolicySatisfyPathStatus.IN_PROGRESS;
case SUCCESS:
return StoragePolicySatisfyPathStatus.SUCCESS;
case FAILURE:
return StoragePolicySatisfyPathStatus.FAILURE;
case NOT_AVAILABLE:
return StoragePolicySatisfyPathStatus.NOT_AVAILABLE;
default:
throw new IllegalArgumentException("Unexpected SPSStatus :" + status);
}
}
public static HdfsConstants.StoragePolicySatisfyPathStatus convert(
StoragePolicySatisfyPathStatus status) {
switch (status) {
case PENDING:
return HdfsConstants.StoragePolicySatisfyPathStatus.PENDING;
case IN_PROGRESS:
return HdfsConstants.StoragePolicySatisfyPathStatus.IN_PROGRESS;
case SUCCESS:
return HdfsConstants.StoragePolicySatisfyPathStatus.SUCCESS;
case FAILURE:
return HdfsConstants.StoragePolicySatisfyPathStatus.FAILURE;
case NOT_AVAILABLE:
return HdfsConstants.StoragePolicySatisfyPathStatus.NOT_AVAILABLE;
default:
throw new IllegalArgumentException("Unexpected SPSStatus :" + status);
}
}
}

View File

@ -838,28 +838,6 @@ message SatisfyStoragePolicyResponseProto {
}
message IsInternalSatisfierRunningRequestProto { // no parameters
}
message IsInternalSatisfierRunningResponseProto {
required bool running = 1;
}
message CheckStoragePolicySatisfyPathStatusRequestProto { // no parameters
required string src = 1;
}
message CheckStoragePolicySatisfyPathStatusResponseProto {
enum StoragePolicySatisfyPathStatus {
PENDING = 0;
IN_PROGRESS = 1;
SUCCESS = 2;
FAILURE = 3;
NOT_AVAILABLE = 4;
}
required StoragePolicySatisfyPathStatus status = 1;
}
service ClientNamenodeProtocol {
rpc getBlockLocations(GetBlockLocationsRequestProto)
returns(GetBlockLocationsResponseProto);
@ -1048,8 +1026,4 @@ service ClientNamenodeProtocol {
returns(ListOpenFilesResponseProto);
rpc satisfyStoragePolicy(SatisfyStoragePolicyRequestProto)
returns(SatisfyStoragePolicyResponseProto);
rpc isInternalSatisfierRunning(IsInternalSatisfierRunningRequestProto)
returns(IsInternalSatisfierRunningResponseProto);
rpc checkStoragePolicySatisfyPathStatus(CheckStoragePolicySatisfyPathStatusRequestProto)
returns(CheckStoragePolicySatisfyPathStatusResponseProto);
}

View File

@ -87,7 +87,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
@ -2497,19 +2496,6 @@ public class RouterRpcServer extends AbstractService
checkOperation(OperationCategory.WRITE, false);
}
@Override
public boolean isInternalSatisfierRunning() throws IOException {
checkOperation(OperationCategory.READ, false);
return false;
}
@Override
public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
String path) throws IOException {
checkOperation(OperationCategory.READ, false);
return StoragePolicySatisfyPathStatus.NOT_AVAILABLE;
}
@Override
public Long getNextSPSPath() throws IOException {
checkOperation(OperationCategory.READ, false);

View File

@ -639,10 +639,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.storage.policy.satisfier.retry.max.attempts";
public static final int DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT =
3;
public static final String DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_KEY =
"dfs.storage.policy.satisfier.low.max-streams.preference";
public static final boolean DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_DEFAULT =
true;
public static final String DFS_SPS_MAX_OUTSTANDING_PATHS_KEY =
"dfs.storage.policy.satisfier.max.outstanding.paths";
public static final int DFS_SPS_MAX_OUTSTANDING_PATHS_DEFAULT = 10000;

View File

@ -48,7 +48,6 @@ import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@ -86,8 +85,6 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Append
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckStoragePolicySatisfyPathStatusRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckStoragePolicySatisfyPathStatusResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto;
@ -162,8 +159,6 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFile
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpgradeStatusRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpgradeStatusResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsInternalSatisfierRunningRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsInternalSatisfierRunningResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto;
@ -1864,22 +1859,6 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
}
}
@Override
public IsInternalSatisfierRunningResponseProto
isInternalSatisfierRunning(RpcController controller,
IsInternalSatisfierRunningRequestProto req)
throws ServiceException {
try {
boolean ret = server.isInternalSatisfierRunning();
IsInternalSatisfierRunningResponseProto.Builder builder =
IsInternalSatisfierRunningResponseProto.newBuilder();
builder.setRunning(ret);
return builder.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public GetQuotaUsageResponseProto getQuotaUsage(
RpcController controller, GetQuotaUsageRequestProto req)
@ -1925,22 +1904,4 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
}
return VOID_SATISFYSTORAGEPOLICY_RESPONSE;
}
@Override
public CheckStoragePolicySatisfyPathStatusResponseProto
checkStoragePolicySatisfyPathStatus(RpcController controller,
CheckStoragePolicySatisfyPathStatusRequestProto request)
throws ServiceException {
try {
StoragePolicySatisfyPathStatus status = server
.checkStoragePolicySatisfyPathStatus(request.getSrc());
CheckStoragePolicySatisfyPathStatusResponseProto.Builder builder =
CheckStoragePolicySatisfyPathStatusResponseProto
.newBuilder();
builder.setStatus(PBHelperClient.convert(status));
return builder.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -42,11 +42,9 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBand
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECReconstructionCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockMovingInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DropSPSWorkCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.FinalizeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.KeyUpdateCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
@ -56,11 +54,9 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowPeerReportProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ProvidedStorageLocationProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
@ -102,8 +98,6 @@ import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.Block
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
@ -111,7 +105,6 @@ import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DropSPSWorkCommand;
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
@ -143,10 +136,6 @@ public class PBHelper {
private static final RegisterCommandProto REG_CMD_PROTO =
RegisterCommandProto.newBuilder().build();
private static final RegisterCommand REG_CMD = new RegisterCommand();
private static final DropSPSWorkCommandProto DROP_SPS_WORK_CMD_PROTO =
DropSPSWorkCommandProto.newBuilder().build();
private static final DropSPSWorkCommand DROP_SPS_WORK_CMD =
new DropSPSWorkCommand();
private PBHelper() {
/** Hidden constructor */
@ -480,10 +469,6 @@ public class PBHelper {
return PBHelper.convert(proto.getBlkIdCmd());
case BlockECReconstructionCommand:
return PBHelper.convert(proto.getBlkECReconstructionCmd());
case BlockStorageMovementCommand:
return PBHelper.convert(proto.getBlkStorageMovementCmd());
case DropSPSWorkCommand:
return DROP_SPS_WORK_CMD;
default:
return null;
}
@ -618,15 +603,6 @@ public class PBHelper {
.setBlkECReconstructionCmd(
convert((BlockECReconstructionCommand) datanodeCommand));
break;
case DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT:
builder.setCmdType(DatanodeCommandProto.Type.BlockStorageMovementCommand)
.setBlkStorageMovementCmd(
convert((BlockStorageMovementCommand) datanodeCommand));
break;
case DatanodeProtocol.DNA_DROP_SPS_WORK_COMMAND:
builder.setCmdType(DatanodeCommandProto.Type.DropSPSWorkCommand)
.setDropSPSWorkCmd(DROP_SPS_WORK_CMD_PROTO);
break;
case DatanodeProtocol.DNA_UNKNOWN: //Not expected
default:
builder.setCmdType(DatanodeCommandProto.Type.NullDatanodeCommand);
@ -1148,79 +1124,4 @@ public class PBHelper {
return new FileRegion(block, providedStorageLocation);
}
private static BlockStorageMovementCommandProto convert(
BlockStorageMovementCommand blkStorageMovementCmd) {
BlockStorageMovementCommandProto.Builder builder =
BlockStorageMovementCommandProto.newBuilder();
builder.setBlockPoolId(blkStorageMovementCmd.getBlockPoolId());
Collection<BlockMovingInfo> blockMovingInfos = blkStorageMovementCmd
.getBlockMovingTasks();
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
builder.addBlockMovingInfo(convertBlockMovingInfo(blkMovingInfo));
}
return builder.build();
}
private static BlockMovingInfoProto convertBlockMovingInfo(
BlockMovingInfo blkMovingInfo) {
BlockMovingInfoProto.Builder builder = BlockMovingInfoProto
.newBuilder();
builder.setBlock(PBHelperClient.convert(blkMovingInfo.getBlock()));
DatanodeInfo sourceDnInfo = blkMovingInfo.getSource();
builder.setSourceDnInfo(PBHelperClient.convert(sourceDnInfo));
DatanodeInfo targetDnInfo = blkMovingInfo.getTarget();
builder.setTargetDnInfo(PBHelperClient.convert(targetDnInfo));
StorageType sourceStorageType = blkMovingInfo.getSourceStorageType();
builder.setSourceStorageType(
PBHelperClient.convertStorageType(sourceStorageType));
StorageType targetStorageType = blkMovingInfo.getTargetStorageType();
builder.setTargetStorageType(
PBHelperClient.convertStorageType(targetStorageType));
return builder.build();
}
private static DatanodeCommand convert(
BlockStorageMovementCommandProto blkStorageMovementCmdProto) {
Collection<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
List<BlockMovingInfoProto> blkSPSatisfyList =
blkStorageMovementCmdProto.getBlockMovingInfoList();
for (BlockMovingInfoProto blkSPSatisfy : blkSPSatisfyList) {
blockMovingInfos.add(convertBlockMovingInfo(blkSPSatisfy));
}
return new BlockStorageMovementCommand(
DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT,
blkStorageMovementCmdProto.getBlockPoolId(), blockMovingInfos);
}
private static BlockMovingInfo convertBlockMovingInfo(
BlockMovingInfoProto blockStorageMovingInfoProto) {
BlockProto blockProto = blockStorageMovingInfoProto.getBlock();
Block block = PBHelperClient.convert(blockProto);
DatanodeInfoProto sourceDnInfoProto = blockStorageMovingInfoProto
.getSourceDnInfo();
DatanodeInfo sourceDnInfo = PBHelperClient.convert(sourceDnInfoProto);
DatanodeInfoProto targetDnInfoProto = blockStorageMovingInfoProto
.getTargetDnInfo();
DatanodeInfo targetDnInfo = PBHelperClient.convert(targetDnInfoProto);
StorageTypeProto srcStorageTypeProto = blockStorageMovingInfoProto
.getSourceStorageType();
StorageType srcStorageType = PBHelperClient
.convertStorageType(srcStorageTypeProto);
StorageTypeProto targetStorageTypeProto = blockStorageMovingInfoProto
.getTargetStorageType();
StorageType targetStorageType = PBHelperClient
.convertStorageType(targetStorageTypeProto);
return new BlockMovingInfo(block, sourceDnInfo, targetDnInfo,
srcStorageType, targetStorageType);
}
}

View File

@ -30,8 +30,7 @@ public enum ExitStatus {
IO_EXCEPTION(-4),
ILLEGAL_ARGUMENTS(-5),
INTERRUPTED(-6),
UNFINALIZED_UPGRADE(-7),
SKIPPED_DUE_TO_SPS(-8);
UNFINALIZED_UPGRADE(-7);
private final int code;

View File

@ -93,7 +93,6 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
@ -719,9 +718,6 @@ public class BlockManager implements BlockStatsMXBean {
datanodeManager.close();
pendingReconstruction.stop();
blocksMap.close();
if (getSPSManager() != null) {
getSPSManager().stopGracefully();
}
}
/** @return the datanodeManager */
@ -3889,21 +3885,6 @@ public class BlockManager implements BlockStatsMXBean {
}
processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
delHintNode);
// notify SPS about the reported block
notifyStorageMovementAttemptFinishedBlk(storageInfo, block);
}
private void notifyStorageMovementAttemptFinishedBlk(
DatanodeStorageInfo storageInfo, Block block) {
if (getSPSManager() != null) {
SPSService sps = getSPSManager().getInternalSPSService();
if (sps.isRunning()) {
sps.notifyStorageMovementAttemptFinishedBlk(
storageInfo.getDatanodeDescriptor(), storageInfo.getStorageType(),
block);
}
}
}
private void processAndHandleReportedBlock(
@ -5088,7 +5069,7 @@ public class BlockManager implements BlockStatsMXBean {
LOG.info("Storage policy satisfier is disabled");
return false;
}
spsManager = new StoragePolicySatisfyManager(conf, namesystem, this);
spsManager = new StoragePolicySatisfyManager(conf, namesystem);
return true;
}

View File

@ -43,7 +43,6 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@ -207,14 +206,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
private final LightWeightHashSet<Block> invalidateBlocks =
new LightWeightHashSet<>();
/**
* A queue of blocks corresponding to trackID for moving its storage
* placements by this datanode.
*/
private final BlockQueue<BlockMovingInfo> storageMovementBlocks =
new BlockQueue<>();
private volatile boolean dropSPSWork = false;
/* Variables for maintaining number of blocks scheduled to be written to
* this storage. This count is approximate and might be slightly bigger
* in case of errors (e.g. datanode does not report if an error occurs
@ -369,7 +360,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
this.pendingCached.clear();
this.cached.clear();
this.pendingUncached.clear();
this.storageMovementBlocks.clear();
}
public int numBlocks() {
@ -1075,62 +1065,4 @@ public class DatanodeDescriptor extends DatanodeInfo {
}
return false;
}
/**
* Add the block infos which needs to move its storage locations.
*
* @param blkMovingInfo
* - storage mismatched block info
*/
public void addBlocksToMoveStorage(BlockMovingInfo blkMovingInfo) {
storageMovementBlocks.offer(blkMovingInfo);
BlockManager.LOG
.debug("Adding block move task " + blkMovingInfo + " to " + getName()
+ ", current queue size is " + storageMovementBlocks.size());
}
/**
* Return the number of blocks queued up for movement.
*/
public int getNumberOfBlocksToMoveStorages() {
return storageMovementBlocks.size();
}
/**
* Get the blocks to move to satisfy the storage media type.
*
* @param numBlocksToMoveTasks
* total number of blocks which will be send to this datanode for
* block movement.
*
* @return block infos which needs to move its storage locations or null if
* there is no block infos to move.
*/
public BlockMovingInfo[] getBlocksToMoveStorages(int numBlocksToMoveTasks) {
List<BlockMovingInfo> blockMovingInfos = storageMovementBlocks
.poll(numBlocksToMoveTasks);
if (blockMovingInfos == null || blockMovingInfos.size() <= 0) {
return null;
}
BlockMovingInfo[] blkMoveArray = new BlockMovingInfo[blockMovingInfos
.size()];
return blockMovingInfos.toArray(blkMoveArray);
}
/**
* Set whether to drop SPS related queues at DN side.
*
* @param dropSPSWork
* - true if need to drop SPS queues, otherwise false.
*/
public synchronized void setDropSPSWork(boolean dropSPSWork) {
this.dropSPSWork = dropSPSWork;
}
/**
* @return true if need to drop SPS queues at DN.
*/
public synchronized boolean shouldDropSPSWork() {
return this.dropSPSWork;
}
}

View File

@ -49,7 +49,6 @@ import org.apache.hadoop.hdfs.server.protocol.*;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.*;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
@ -210,8 +209,6 @@ public class DatanodeManager {
*/
private final long timeBetweenResendingCachingDirectivesMs;
private final boolean blocksToMoveLowPriority;
DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
final Configuration conf) throws IOException {
this.namesystem = namesystem;
@ -336,12 +333,6 @@ public class DatanodeManager {
this.blocksPerPostponedMisreplicatedBlocksRescan = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY,
DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY_DEFAULT);
// SPS configuration to decide blocks to move can share equal ratio of
// maxtransfers with pending replica and erasure-coded reconstruction tasks
blocksToMoveLowPriority = conf.getBoolean(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_KEY,
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_DEFAULT);
}
private static long getStaleIntervalFromConf(Configuration conf,
@ -1101,19 +1092,6 @@ public class DatanodeManager {
nodeS.setSoftwareVersion(nodeReg.getSoftwareVersion());
nodeS.setDisallowed(false); // Node is in the include list
// Sets dropSPSWork flag to true, to ensure that
// DNA_DROP_SPS_WORK_COMMAND will send to datanode via next heartbeat
// response immediately after the node registration. This is
// to avoid a situation, where multiple block attempt finished
// responses coming from different datanodes. After SPS monitor time
// out, it will retry the files which were scheduled to the
// disconnected(for long time more than heartbeat expiry) DN, by
// finding new datanode. Now, if the expired datanode reconnects back
// after SPS reschedules, it leads to get different movement attempt
// finished report from reconnected and newly datanode which is
// attempting the block movement.
nodeS.setDropSPSWork(true);
// resolve network location
if(this.rejectUnresolvedTopologyDN) {
nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));
@ -1691,47 +1669,18 @@ public class DatanodeManager {
final List<DatanodeCommand> cmds = new ArrayList<>();
// Allocate _approximately_ maxTransfers pending tasks to DataNode.
// NN chooses pending tasks based on the ratio between the lengths of
// replication, erasure-coded block queues and block storage movement
// queues.
// replication and erasure-coded block queues.
int totalReplicateBlocks = nodeinfo.getNumberOfReplicateBlocks();
int totalECBlocks = nodeinfo.getNumberOfBlocksToBeErasureCoded();
int totalBlocksToMove = nodeinfo.getNumberOfBlocksToMoveStorages();
int totalBlocks = totalReplicateBlocks + totalECBlocks;
if (totalBlocks > 0 || totalBlocksToMove > 0) {
int numReplicationTasks = 0;
int numECTasks = 0;
int numBlocksToMoveTasks = 0;
// Check blocksToMoveLowPriority configuration is true/false. If false,
// then equally sharing the max transfer. Otherwise gives high priority to
// the pending_replica/erasure-coded tasks and only the delta streams will
// be used for blocks to move tasks.
if (!blocksToMoveLowPriority) {
// add blocksToMove count to total blocks so that will get equal share
totalBlocks = totalBlocks + totalBlocksToMove;
numReplicationTasks = (int) Math
.ceil((double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
numECTasks = (int) Math
.ceil((double) (totalECBlocks * maxTransfers) / totalBlocks);
numBlocksToMoveTasks = (int) Math
.ceil((double) (totalBlocksToMove * maxTransfers) / totalBlocks);
} else {
// Calculate the replica and ec tasks, then pick blocksToMove if there
// is any streams available.
numReplicationTasks = (int) Math
.ceil((double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
numECTasks = (int) Math
.ceil((double) (totalECBlocks * maxTransfers) / totalBlocks);
int numTasks = numReplicationTasks + numECTasks;
if (numTasks < maxTransfers) {
int remainingMaxTransfers = maxTransfers - numTasks;
numBlocksToMoveTasks = Math.min(totalBlocksToMove,
remainingMaxTransfers);
}
}
if (totalBlocks > 0) {
int numReplicationTasks = (int) Math.ceil(
(double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
int numECTasks = (int) Math.ceil(
(double) (totalECBlocks * maxTransfers) / totalBlocks);
if (LOG.isDebugEnabled()) {
LOG.debug("Pending replication tasks: " + numReplicationTasks
+ " erasure-coded tasks: " + numECTasks + " blocks to move tasks: "
+ numBlocksToMoveTasks);
+ " erasure-coded tasks: " + numECTasks);
}
// check pending replication tasks
List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
@ -1747,23 +1696,6 @@ public class DatanodeManager {
cmds.add(new BlockECReconstructionCommand(
DNA_ERASURE_CODING_RECONSTRUCTION, pendingECList));
}
// check pending block storage movement tasks
if (nodeinfo.shouldDropSPSWork()) {
cmds.add(DropSPSWorkCommand.DNA_DROP_SPS_WORK_COMMAND);
// Set back to false to indicate that the new value has been sent to the
// datanode.
nodeinfo.setDropSPSWork(false);
} else {
// Get pending block storage movement tasks
BlockMovingInfo[] blkStorageMovementInfos = nodeinfo
.getBlocksToMoveStorages(numBlocksToMoveTasks);
if (blkStorageMovementInfos != null) {
cmds.add(new BlockStorageMovementCommand(
DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT, blockPoolId,
Arrays.asList(blkStorageMovementInfos)));
}
}
}
// check block invalidation
@ -2036,18 +1968,6 @@ public class DatanodeManager {
slowDiskTracker.getSlowDiskReportAsJsonString() : null;
}
/**
* Mark all DNs to drop SPS queues. A DNA_DROP_SPS_WORK_COMMAND will be added
* in heartbeat response, which will indicate DN to drop SPS queues
*/
public void addDropSPSWorkCommandsToAllDNs() {
synchronized (this) {
for (DatanodeDescriptor dn : datanodeMap.values()) {
dn.setDropSPSWork(true);
}
}
}
/**
* Generates datanode reports for the given report type.
*

View File

@ -795,16 +795,6 @@ class BPOfferService {
((BlockECReconstructionCommand) cmd).getECTasks();
dn.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
break;
case DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT:
LOG.info("DatanodeCommand action: DNA_BLOCK_STORAGE_MOVEMENT");
BlockStorageMovementCommand blkSPSCmd = (BlockStorageMovementCommand) cmd;
dn.getStoragePolicySatisfyWorker().processBlockMovingTasks(
blkSPSCmd.getBlockPoolId(), blkSPSCmd.getBlockMovingTasks());
break;
case DatanodeProtocol.DNA_DROP_SPS_WORK_COMMAND:
LOG.info("DatanodeCommand action: DNA_DROP_SPS_WORK_COMMAND");
dn.getStoragePolicySatisfyWorker().dropSPSWork();
break;
default:
LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
}
@ -835,8 +825,6 @@ class BPOfferService {
case DatanodeProtocol.DNA_CACHE:
case DatanodeProtocol.DNA_UNCACHE:
case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION:
case DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT:
case DatanodeProtocol.DNA_DROP_SPS_WORK_COMMAND:
LOG.warn("Got a command from standby NN - ignoring command:" + cmd.getAction());
break;
default:

View File

@ -386,7 +386,6 @@ public class DataNode extends ReconfigurableBase
private String dnUserName = null;
private BlockRecoveryWorker blockRecoveryWorker;
private ErasureCodingWorker ecWorker;
private StoragePolicySatisfyWorker storagePolicySatisfyWorker;
private final Tracer tracer;
private final TracerConfigurationManager tracerConfigurationManager;
private static final int NUM_CORES = Runtime.getRuntime()
@ -1426,9 +1425,6 @@ public class DataNode extends ReconfigurableBase
ecWorker = new ErasureCodingWorker(getConf(), this);
blockRecoveryWorker = new BlockRecoveryWorker(this);
storagePolicySatisfyWorker =
new StoragePolicySatisfyWorker(getConf(), this, null);
storagePolicySatisfyWorker.start();
blockPoolManager = new BlockPoolManager(this);
blockPoolManager.refreshNamenodes(getConf());
@ -1981,10 +1977,6 @@ public class DataNode extends ReconfigurableBase
}
}
// stop storagePolicySatisfyWorker
if (storagePolicySatisfyWorker != null) {
storagePolicySatisfyWorker.stop();
}
List<BPOfferService> bposArray = (this.blockPoolManager == null)
? new ArrayList<BPOfferService>()
: this.blockPoolManager.getAllNamenodeThreads();
@ -3624,8 +3616,4 @@ public class DataNode extends ReconfigurableBase
}
return this.diskBalancer;
}
StoragePolicySatisfyWorker getStoragePolicySatisfyWorker() {
return storagePolicySatisfyWorker;
}
}

View File

@ -1,217 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher;
import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished;
import org.apache.hadoop.hdfs.server.common.sps.BlockMovementStatus;
import org.apache.hadoop.hdfs.server.common.sps.BlockStorageMovementTracker;
import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* StoragePolicySatisfyWorker handles the storage policy satisfier commands.
* These commands would be issued from NameNode as part of Datanode's heart beat
* response. BPOfferService delegates the work to this class for handling
* BlockStorageMovement commands.
*/
@InterfaceAudience.Private
public class StoragePolicySatisfyWorker {
private static final Logger LOG = LoggerFactory
.getLogger(StoragePolicySatisfyWorker.class);
private final DataNode datanode;
private final int moverThreads;
private final ExecutorService moveExecutor;
private final CompletionService<BlockMovementAttemptFinished>
moverCompletionService;
private final BlockStorageMovementTracker movementTracker;
private Daemon movementTrackerThread;
private final BlockDispatcher blkDispatcher;
public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode,
BlocksMovementsStatusHandler handler) {
this.datanode = datanode;
// Defaulting to 10. This is to minimize the number of move ops.
moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, 10);
moveExecutor = initializeBlockMoverThreadPool(moverThreads);
moverCompletionService = new ExecutorCompletionService<>(moveExecutor);
movementTracker = new BlockStorageMovementTracker(moverCompletionService,
handler);
movementTrackerThread = new Daemon(movementTracker);
movementTrackerThread.setName("BlockStorageMovementTracker");
DNConf dnConf = datanode.getDnConf();
int ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
blkDispatcher = new BlockDispatcher(dnConf.getSocketTimeout(),
ioFileBufferSize, dnConf.getConnectToDnViaHostname());
}
/**
* Start StoragePolicySatisfyWorker, which will start block movement tracker
* thread to track the completion of block movements.
*/
void start() {
movementTrackerThread.start();
}
/**
* Stop StoragePolicySatisfyWorker, which will terminate executor service and
* stop block movement tracker thread.
*/
void stop() {
movementTracker.stopTracking();
movementTrackerThread.interrupt();
moveExecutor.shutdown();
try {
moveExecutor.awaitTermination(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.error("Interrupted while waiting for mover thread to terminate", e);
}
}
private ThreadPoolExecutor initializeBlockMoverThreadPool(int num) {
LOG.debug("Block mover to satisfy storage policy; pool threads={}", num);
ThreadPoolExecutor moverThreadPool = new ThreadPoolExecutor(1, num, 60,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new Daemon.DaemonFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = super.newThread(r);
t.setName("BlockMoverTask-" + threadIndex.getAndIncrement());
return t;
}
});
moverThreadPool.allowCoreThreadTimeOut(true);
return moverThreadPool;
}
/**
* Handles the given set of block movement tasks. This will iterate over the
* block movement list and submit each block movement task asynchronously in a
* separate thread. Each task will move the block replica to the target node &
* wait for the completion.
*
* @param blockPoolID block pool identifier
*
* @param blockMovingInfos
* list of blocks to be moved
*/
public void processBlockMovingTasks(final String blockPoolID,
final Collection<BlockMovingInfo> blockMovingInfos) {
LOG.debug("Received BlockMovingTasks {}", blockMovingInfos);
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
StorageType sourceStorageType = blkMovingInfo.getSourceStorageType();
StorageType targetStorageType = blkMovingInfo.getTargetStorageType();
assert sourceStorageType != targetStorageType
: "Source and Target storage type shouldn't be same!";
BlockMovingTask blockMovingTask = new BlockMovingTask(blockPoolID,
blkMovingInfo);
moverCompletionService.submit(blockMovingTask);
}
}
/**
* This class encapsulates the process of moving the block replica to the
* given target and wait for the response.
*/
private class BlockMovingTask implements
Callable<BlockMovementAttemptFinished> {
private final String blockPoolID;
private final BlockMovingInfo blkMovingInfo;
BlockMovingTask(String blockPoolID, BlockMovingInfo blkMovInfo) {
this.blockPoolID = blockPoolID;
this.blkMovingInfo = blkMovInfo;
}
@Override
public BlockMovementAttemptFinished call() {
BlockMovementStatus status = moveBlock();
return new BlockMovementAttemptFinished(blkMovingInfo.getBlock(),
blkMovingInfo.getSource(), blkMovingInfo.getTarget(),
blkMovingInfo.getTargetStorageType(), status);
}
private BlockMovementStatus moveBlock() {
datanode.incrementXmitsInProgress();
ExtendedBlock eb = new ExtendedBlock(blockPoolID,
blkMovingInfo.getBlock());
try {
Token<BlockTokenIdentifier> accessToken = datanode.getBlockAccessToken(
eb, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
new StorageType[]{blkMovingInfo.getTargetStorageType()},
new String[0]);
DataEncryptionKeyFactory keyFactory = datanode
.getDataEncryptionKeyFactoryForBlock(eb);
return blkDispatcher.moveBlock(blkMovingInfo,
datanode.getSaslClient(), eb, datanode.newSocket(),
keyFactory, accessToken);
} catch (IOException e) {
// TODO: handle failure retries
LOG.warn(
"Failed to move block:{} from src:{} to destin:{} to satisfy "
+ "storageType:{}",
blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType(), e);
return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
} finally {
datanode.decrementXmitsInProgress();
}
}
}
/**
* Drop the in-progress SPS work queues.
*/
public void dropSPSWork() {
LOG.info("Received request to drop StoragePolicySatisfierWorker queues. "
+ "So, none of the SPS Worker queued block movements will"
+ " be scheduled.");
}
}

View File

@ -48,8 +48,6 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.SecurityUtil;
@ -658,25 +656,6 @@ public class Mover {
final Mover m = new Mover(nnc, conf, retryCount,
excludedPinnedBlocks);
boolean spsRunning;
try {
spsRunning = nnc.getDistributedFileSystem().getClient()
.isInternalSatisfierRunning();
} catch (RemoteException e) {
IOException cause = e.unwrapRemoteException();
if (cause instanceof StandbyException) {
System.err.println("Skip Standby Namenode. " + nnc.toString());
continue;
}
throw e;
}
if (spsRunning) {
System.err.println("Mover failed due to StoragePolicySatisfier"
+ " service running inside namenode. Exiting with status "
+ ExitStatus.SKIPPED_DUE_TO_SPS + "... ");
return ExitStatus.SKIPPED_DUE_TO_SPS.getExitCode();
}
final ExitStatus r = m.run();
if (r == ExitStatus.SUCCESS) {

View File

@ -32,7 +32,6 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReencryptionInfoProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
import org.apache.hadoop.security.AccessControlException;
import java.io.FileNotFoundException;
@ -207,17 +206,6 @@ class FSDirXAttrOp {
List<XAttr> newXAttrs = filterINodeXAttrs(existingXAttrs, toRemove,
removedXAttrs);
if (existingXAttrs.size() != newXAttrs.size()) {
for (XAttr xattr : toRemove) {
if (XATTR_SATISFY_STORAGE_POLICY
.equals(XAttrHelper.getPrefixedName(xattr))) {
StoragePolicySatisfyManager spsManager =
fsd.getBlockManager().getSPSManager();
if (spsManager != null) {
spsManager.getInternalSPSService().clearQueue(inode.getId());
}
break;
}
}
XAttrStorage.updateINodeXAttrs(inode, newXAttrs, snapshotId);
return removedXAttrs;
}

View File

@ -209,7 +209,6 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@ -1363,9 +1362,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// Don't want to keep replication queues when not in Active.
blockManager.clearQueues();
blockManager.setInitializedReplQueues(false);
if (blockManager.getSPSManager() != null) {
blockManager.getSPSManager().stopGracefully();
}
}
} finally {
writeUnlock("stopActiveServices");
@ -2275,9 +2271,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
// checks sps status
boolean disabled = (blockManager.getSPSManager() == null);
if (disabled || (blockManager
.getSPSManager().getMode() == StoragePolicySatisfierMode.INTERNAL
&& !blockManager.getSPSManager().isInternalSatisfierRunning())) {
if (disabled) {
throw new UnsupportedActionException(
"Cannot request to satisfy storage policy "
+ "when storage policy satisfier feature has been disabled"

View File

@ -111,7 +111,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@ -2533,41 +2532,6 @@ public class NameNodeRpcServer implements NamenodeProtocols {
return result;
}
@Override
public boolean isInternalSatisfierRunning() throws IOException {
checkNNStartup();
String operationName = "isInternalSatisfierRunning";
namesystem.checkSuperuserPrivilege(operationName);
if (nn.isStandbyState()) {
throw new StandbyException("Not supported by Standby Namenode.");
}
StoragePolicySatisfyManager spsMgr =
namesystem.getBlockManager().getSPSManager();
boolean isInternalSatisfierRunning = (spsMgr != null
? spsMgr.isInternalSatisfierRunning() : false);
namesystem.logAuditEvent(true, operationName, null);
return isInternalSatisfierRunning;
}
@Override
public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
String path) throws IOException {
checkNNStartup();
if (nn.isStandbyState()) {
throw new StandbyException("Not supported by Standby Namenode.");
}
if (namesystem.getBlockManager().getSPSManager() == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Satisfier is not running inside namenode, so status "
+ "can't be returned.");
}
throw new IOException("Satisfier is not running inside namenode, "
+ "so status can't be returned.");
}
return namesystem.getBlockManager().getSPSManager()
.checkStoragePolicySatisfyPathStatus(path);
}
@Override
public Long getNextSPSPath() throws IOException {
checkNNStartup();

View File

@ -23,14 +23,10 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -60,9 +56,6 @@ public class BlockStorageMovementNeeded {
private final Map<Long, DirPendingWorkInfo> pendingWorkForDirectory =
new HashMap<>();
private final Map<Long, StoragePolicySatisfyPathStatusInfo> spsStatus =
new ConcurrentHashMap<>();
private final Context ctxt;
private Daemon pathIdCollector;
@ -86,9 +79,6 @@ public class BlockStorageMovementNeeded {
* - track info for satisfy the policy
*/
public synchronized void add(ItemInfo trackInfo) {
spsStatus.put(trackInfo.getFile(),
new StoragePolicySatisfyPathStatusInfo(
StoragePolicySatisfyPathStatus.IN_PROGRESS));
storageMovementNeeded.add(trackInfo);
}
@ -129,7 +119,7 @@ public class BlockStorageMovementNeeded {
if (itemInfo.getStartPath() == itemInfo.getFile()) {
return;
}
updatePendingDirScanStats(itemInfo.getFile(), 1, scanCompleted);
updatePendingDirScanStats(itemInfo.getStartPath(), 1, scanCompleted);
}
private void updatePendingDirScanStats(long startPath, int numScannedFiles,
@ -181,7 +171,6 @@ public class BlockStorageMovementNeeded {
if (!ctxt.isFileExist(startId)) {
// directory deleted just remove it.
this.pendingWorkForDirectory.remove(startId);
updateStatus(startId, isSuccess);
} else {
DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId);
if (pendingWork != null) {
@ -189,17 +178,13 @@ public class BlockStorageMovementNeeded {
if (pendingWork.isDirWorkDone()) {
ctxt.removeSPSHint(startId);
pendingWorkForDirectory.remove(startId);
pendingWork.setFailure(!isSuccess);
updateStatus(startId, pendingWork.isPolicySatisfied());
}
pendingWork.setFailure(isSuccess);
}
}
} else {
// Remove xAttr if trackID doesn't exist in
// storageMovementAttemptedItems or file policy satisfied.
ctxt.removeSPSHint(trackInfo.getFile());
updateStatus(trackInfo.getFile(), isSuccess);
}
}
@ -215,24 +200,6 @@ public class BlockStorageMovementNeeded {
pendingWorkForDirectory.remove(trackId);
}
/**
* Mark inode status as SUCCESS in map.
*/
private void updateStatus(long startId, boolean isSuccess){
StoragePolicySatisfyPathStatusInfo spsStatusInfo =
spsStatus.get(startId);
if (spsStatusInfo == null) {
spsStatusInfo = new StoragePolicySatisfyPathStatusInfo();
spsStatus.put(startId, spsStatusInfo);
}
if (isSuccess) {
spsStatusInfo.setSuccess();
} else {
spsStatusInfo.setFailure();
}
}
/**
* Clean all the movements in spsDirsToBeTraveresed/storageMovementNeeded
* and notify to clean up required resources.
@ -277,7 +244,6 @@ public class BlockStorageMovementNeeded {
@Override
public void run() {
LOG.info("Starting SPSPathIdProcessor!.");
long lastStatusCleanTime = 0;
Long startINode = null;
while (ctxt.isRunning()) {
try {
@ -289,9 +255,6 @@ public class BlockStorageMovementNeeded {
// Waiting for SPS path
Thread.sleep(3000);
} else {
spsStatus.put(startINode,
new StoragePolicySatisfyPathStatusInfo(
StoragePolicySatisfyPathStatus.IN_PROGRESS));
ctxt.scanAndCollectFiles(startINode);
// check if directory was empty and no child added to queue
DirPendingWorkInfo dirPendingWorkInfo =
@ -300,15 +263,8 @@ public class BlockStorageMovementNeeded {
&& dirPendingWorkInfo.isDirWorkDone()) {
ctxt.removeSPSHint(startINode);
pendingWorkForDirectory.remove(startINode);
updateStatus(startINode, true);
}
}
//Clear the SPS status if status is in SUCCESS more than 5 min.
if (Time.monotonicNow()
- lastStatusCleanTime > statusClearanceElapsedTimeMs) {
lastStatusCleanTime = Time.monotonicNow();
cleanSPSStatus();
}
startINode = null; // Current inode successfully scanned.
}
} catch (Throwable t) {
@ -328,16 +284,6 @@ public class BlockStorageMovementNeeded {
}
}
}
private synchronized void cleanSPSStatus() {
for (Iterator<Entry<Long, StoragePolicySatisfyPathStatusInfo>> it =
spsStatus.entrySet().iterator(); it.hasNext();) {
Entry<Long, StoragePolicySatisfyPathStatusInfo> entry = it.next();
if (entry.getValue().canRemove()) {
it.remove();
}
}
}
}
/**
@ -347,7 +293,6 @@ public class BlockStorageMovementNeeded {
private int pendingWorkCount = 0;
private boolean fullyScanned = false;
private boolean success = true;
/**
* Increment the pending work count for directory.
@ -378,20 +323,6 @@ public class BlockStorageMovementNeeded {
public synchronized void markScanCompleted() {
this.fullyScanned = true;
}
/**
* Return true if all the files block movement is success, otherwise false.
*/
public boolean isPolicySatisfied() {
return success;
}
/**
* Set directory SPS status failed.
*/
public void setFailure(boolean failure) {
this.success = this.success || failure;
}
}
public void activate() {
@ -406,56 +337,6 @@ public class BlockStorageMovementNeeded {
}
}
/**
* Represent the file/directory block movement status.
*/
static class StoragePolicySatisfyPathStatusInfo {
private StoragePolicySatisfyPathStatus status =
StoragePolicySatisfyPathStatus.NOT_AVAILABLE;
private long lastStatusUpdateTime;
StoragePolicySatisfyPathStatusInfo() {
this.lastStatusUpdateTime = 0;
}
StoragePolicySatisfyPathStatusInfo(StoragePolicySatisfyPathStatus status) {
this.status = status;
this.lastStatusUpdateTime = 0;
}
private void setSuccess() {
this.status = StoragePolicySatisfyPathStatus.SUCCESS;
this.lastStatusUpdateTime = Time.monotonicNow();
}
private void setFailure() {
this.status = StoragePolicySatisfyPathStatus.FAILURE;
this.lastStatusUpdateTime = Time.monotonicNow();
}
private StoragePolicySatisfyPathStatus getStatus() {
return status;
}
/**
* Return true if SUCCESS status cached more then 5 min.
*/
private boolean canRemove() {
return (StoragePolicySatisfyPathStatus.SUCCESS == status
|| StoragePolicySatisfyPathStatus.FAILURE == status)
&& (Time.monotonicNow()
- lastStatusUpdateTime) > statusClearanceElapsedTimeMs;
}
}
public StoragePolicySatisfyPathStatus getStatus(long id) {
StoragePolicySatisfyPathStatusInfo spsStatusInfo = spsStatus.get(id);
if(spsStatusInfo == null){
return StoragePolicySatisfyPathStatus.NOT_AVAILABLE;
}
return spsStatusInfo.getStatus();
}
@VisibleForTesting
public static void setStatusClearanceElapsedTimeMs(
long statusClearanceElapsedTimeMs) {

View File

@ -93,11 +93,6 @@ public interface Context {
*/
BlockStoragePolicy getStoragePolicy(byte policyId);
/**
* Drop the SPS work in case if any previous work queued up.
*/
void addDropPreviousSPSWorkAtDNs();
/**
* Remove the hint which was added to track SPS call.
*

View File

@ -1,63 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.sps;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
/**
* This class handles the internal SPS block movements. This will assign block
* movement tasks to target datanode descriptors.
*/
@InterfaceAudience.Private
public class IntraSPSNameNodeBlockMoveTaskHandler
implements BlockMoveTaskHandler {
private BlockManager blockManager;
private Namesystem namesystem;
public IntraSPSNameNodeBlockMoveTaskHandler(BlockManager blockManager,
Namesystem namesytem) {
this.blockManager = blockManager;
this.namesystem = namesytem;
}
@Override
public void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException {
namesystem.readLock();
try {
DatanodeDescriptor dn = blockManager.getDatanodeManager()
.getDatanode(blkMovingInfo.getTarget().getDatanodeUuid());
if (dn == null) {
throw new IOException("Failed to schedule block movement task:"
+ blkMovingInfo + " as target datanode: "
+ blkMovingInfo.getTarget() + " doesn't exists");
}
dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType());
dn.addBlocksToMoveStorage(blkMovingInfo);
} finally {
namesystem.readUnlock();
}
}
}

View File

@ -1,189 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.sps;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeMap;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.AccessControlException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class is the Namenode implementation for analyzing the file blocks which
* are expecting to change its storages and assigning the block storage
* movements to satisfy the storage policy.
*/
@InterfaceAudience.Private
public class IntraSPSNameNodeContext implements Context {
private static final Logger LOG = LoggerFactory
.getLogger(IntraSPSNameNodeContext.class);
private final Namesystem namesystem;
private final BlockManager blockManager;
private SPSService service;
private final FileCollector fileCollector;
private final BlockMoveTaskHandler blockMoveTaskHandler;
public IntraSPSNameNodeContext(Namesystem namesystem,
BlockManager blockManager, SPSService service) {
this.namesystem = namesystem;
this.blockManager = blockManager;
this.service = service;
fileCollector = new IntraSPSNameNodeFileIdCollector(
namesystem.getFSDirectory(), service);
blockMoveTaskHandler = new IntraSPSNameNodeBlockMoveTaskHandler(
blockManager, namesystem);
}
@Override
public int getNumLiveDataNodes() {
return blockManager.getDatanodeManager().getNumLiveDataNodes();
}
/**
* @return object containing information regarding the file.
*/
@Override
public HdfsFileStatus getFileInfo(long inodeID) throws IOException {
Path filePath = DFSUtilClient.makePathFromFileId(inodeID);
return namesystem.getFileInfo(filePath.toString(), true, true);
}
@Override
public DatanodeStorageReport[] getLiveDatanodeStorageReport()
throws IOException {
namesystem.readLock();
try {
return blockManager.getDatanodeManager()
.getDatanodeStorageReport(DatanodeReportType.LIVE);
} finally {
namesystem.readUnlock();
}
}
@Override
public boolean isFileExist(long inodeId) {
return namesystem.getFSDirectory().getInode(inodeId) != null;
}
@Override
public void removeSPSHint(long inodeId) throws IOException {
this.namesystem.removeXattr(inodeId, XATTR_SATISFY_STORAGE_POLICY);
}
@Override
public boolean isRunning() {
return namesystem.isRunning() && service.isRunning();
}
@Override
public boolean isInSafeMode() {
return namesystem.isInSafeMode();
}
@Override
public boolean isMoverRunning() {
String moverId = HdfsServerConstants.MOVER_ID_PATH.toString();
return namesystem.isFileOpenedForWrite(moverId);
}
@Override
public void addDropPreviousSPSWorkAtDNs() {
namesystem.readLock();
try {
blockManager.getDatanodeManager().addDropSPSWorkCommandsToAllDNs();
} finally {
namesystem.readUnlock();
}
}
@Override
public BlockStoragePolicy getStoragePolicy(byte policyID) {
return blockManager.getStoragePolicy(policyID);
}
@Override
public NetworkTopology getNetworkTopology(DatanodeMap datanodeMap) {
return blockManager.getDatanodeManager().getNetworkTopology();
}
@Override
public long getFileID(String path) throws UnresolvedLinkException,
AccessControlException, ParentNotDirectoryException {
namesystem.readLock();
try {
INode inode = namesystem.getFSDirectory().getINode(path);
return inode == null ? -1 : inode.getId();
} finally {
namesystem.readUnlock();
}
}
@Override
public Long getNextSPSPath() {
return blockManager.getSPSManager().getNextPathId();
}
@Override
public void removeSPSPathId(long trackId) {
blockManager.getSPSManager().removePathId(trackId);
}
@Override
public void removeAllSPSPathIds() {
blockManager.getSPSManager().removeAllPathIds();
}
@Override
public void scanAndCollectFiles(long filePath)
throws IOException, InterruptedException {
fileCollector.scanAndCollectFiles(filePath);
}
@Override
public void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException {
blockMoveTaskHandler.submitMoveTask(blkMovingInfo);
}
@Override
public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
LOG.info("Movement attempted blocks: {}",
Arrays.asList(moveAttemptFinishedBlks));
}
}

View File

@ -1,185 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.sps;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser;
import org.apache.hadoop.hdfs.server.namenode.INode;
/**
* A specific implementation for scanning the directory with Namenode internal
* Inode structure and collects the file ids under the given directory ID.
*/
@InterfaceAudience.Private
public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
implements FileCollector {
private int maxQueueLimitToScan;
private final SPSService service;
private int remainingCapacity = 0;
private List<ItemInfo> currentBatch;
public IntraSPSNameNodeFileIdCollector(FSDirectory dir,
SPSService service) {
super(dir, service.getConf());
this.service = service;
this.maxQueueLimitToScan = service.getConf().getInt(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT);
currentBatch = new ArrayList<>(maxQueueLimitToScan);
}
@Override
protected boolean processFileInode(INode inode, TraverseInfo traverseInfo)
throws IOException, InterruptedException {
if (LOG.isTraceEnabled()) {
LOG.trace("Processing {} for statisy the policy",
inode.getFullPathName());
}
if (!inode.isFile()) {
return false;
}
if (inode.isFile() && inode.asFile().numBlocks() != 0) {
currentBatch.add(new ItemInfo(
((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId()));
remainingCapacity--;
}
return true;
}
@Override
protected boolean shouldSubmitCurrentBatch() {
return remainingCapacity <= 0;
}
@Override
protected void checkINodeReady(long startId) throws IOException {
// SPS work won't be scheduled if NN is in standby. So, skipping NN
// standby check.
return;
}
@Override
protected void submitCurrentBatch(Long startId)
throws IOException, InterruptedException {
// Add current child's to queue
service.addAllFilesToProcess(startId,
currentBatch, false);
currentBatch.clear();
}
@Override
protected void throttle() throws InterruptedException {
if (LOG.isDebugEnabled()) {
LOG.debug("StorageMovementNeeded queue remaining capacity is zero,"
+ " waiting for some free slots.");
}
remainingCapacity = remainingCapacity();
// wait for queue to be free
while (remainingCapacity <= 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Waiting for storageMovementNeeded queue to be free!");
}
Thread.sleep(5000);
remainingCapacity = remainingCapacity();
}
}
@Override
protected boolean canTraverseDir(INode inode) throws IOException {
return true;
}
@Override
protected void checkPauseForTesting() throws InterruptedException {
// Nothing to do
}
@Override
public void scanAndCollectFiles(final long startINodeId)
throws IOException, InterruptedException {
FSDirectory fsd = getFSDirectory();
INode startInode = fsd.getInode(startINodeId);
if (startInode != null) {
remainingCapacity = remainingCapacity();
if (remainingCapacity == 0) {
throttle();
}
if (startInode.isFile()) {
currentBatch
.add(new ItemInfo(startInode.getId(), startInode.getId()));
} else {
readLock();
// NOTE: this lock will not be held for full directory scanning. It is
// basically a sliced locking. Once it collects a batch size( at max the
// size of maxQueueLimitToScan (default 1000)) file ids, then it will
// unlock and submits the current batch to SPSService. Once
// service.processingQueueSize() shows empty slots, then lock will be
// re-acquired and scan will be resumed. This logic was re-used from
// EDEK feature.
try {
traverseDir(startInode.asDirectory(), startINodeId,
HdfsFileStatus.EMPTY_NAME, new SPSTraverseInfo(startINodeId));
} finally {
readUnlock();
}
}
// Mark startInode traverse is done, this is last-batch
service.addAllFilesToProcess(startInode.getId(), currentBatch, true);
currentBatch.clear();
}
}
/**
* Returns queue remaining capacity.
*/
public synchronized int remainingCapacity() {
int size = service.processingQueueSize();
int remainingSize = 0;
if (size < maxQueueLimitToScan) {
remainingSize = maxQueueLimitToScan - size;
}
if (LOG.isDebugEnabled()) {
LOG.debug("SPS processing Q -> maximum capacity:{}, current size:{},"
+ " remaining size:{}", maxQueueLimitToScan, size, remainingSize);
}
return remainingSize;
}
class SPSTraverseInfo extends TraverseInfo {
private long startId;
SPSTraverseInfo(long startId) {
this.startId = startId;
}
public long getStartId() {
return startId;
}
}
}

View File

@ -101,11 +101,6 @@ public interface SPSService {
*/
int processingQueueSize();
/**
* Clear inodeId present in the processing queue.
*/
void clearQueue(long spsPath);
/**
* @return the configuration.
*/

View File

@ -43,14 +43,12 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.balancer.Matcher;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
@ -159,15 +157,6 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
serviceMode);
return;
}
if (serviceMode == StoragePolicySatisfierMode.INTERNAL
&& ctxt.isMoverRunning()) {
isRunning = false;
LOG.error(
"Stopping StoragePolicySatisfier thread " + "as Mover ID file "
+ HdfsServerConstants.MOVER_ID_PATH.toString()
+ " been opened. Maybe a Mover instance is running!");
return;
}
if (reconfigStart) {
LOG.info("Starting {} StoragePolicySatisfier, as admin requested to "
+ "start it.", StringUtils.toLowerCase(serviceMode.toString()));
@ -177,9 +166,6 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
}
isRunning = true;
// Ensure that all the previously submitted block movements(if any) have to
// be stopped in all datanodes.
addDropSPSWorkCommandsToAllDNs();
storagePolicySatisfierThread = new Daemon(this);
storagePolicySatisfierThread.setName("StoragePolicySatisfier");
storagePolicySatisfierThread.start();
@ -201,7 +187,6 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
this.storageMovementsMonitor.stop();
if (forceStop) {
storageMovementNeeded.clearQueuesWithNotification();
addDropSPSWorkCommandsToAllDNs();
} else {
LOG.info("Stopping StoragePolicySatisfier.");
}
@ -234,14 +219,6 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
return isRunning;
}
/**
* Adding drop commands to all datanodes to stop performing the satisfier
* block movements, if any.
*/
private void addDropSPSWorkCommandsToAllDNs() {
ctxt.addDropPreviousSPSWorkAtDNs();
}
@Override
public void run() {
while (isRunning) {
@ -1100,13 +1077,6 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
storageMovementNeeded.clearAll();
}
/**
* Clear queues for given track id.
*/
public void clearQueue(long trackId) {
storageMovementNeeded.clearQueue(trackId);
}
/**
* This class contains information of an attempted blocks and its last
* attempted or reported time stamp. This is used by
@ -1158,20 +1128,6 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
}
}
/**
* Returns sps invoked path status. This method is used by internal satisfy
* storage policy service.
*
* @param path
* sps path
* @return storage policy satisfy path status
* @throws IOException
*/
public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
String path) throws IOException {
return storageMovementNeeded.getStatus(ctxt.getFileID(path));
}
@Override
public void addFileToProcess(ItemInfo trackInfo, boolean scanCompleted) {
storageMovementNeeded.add(trackInfo, scanCompleted);

View File

@ -18,30 +18,27 @@
package org.apache.hadoop.hdfs.server.namenode.sps;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.sps.ExternalStoragePolicySatisfier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* This manages satisfy storage policy invoked path ids and expose methods to
* process these path ids. It maintains sps mode(INTERNAL/EXTERNAL/NONE)
* process these path ids. It maintains sps mode(EXTERNAL/NONE)
* configured by the administrator.
*
* <p>
* If the configured mode is {@link StoragePolicySatisfierMode.INTERNAL}, then
* it will start internal sps daemon service inside namenode and process sps
* invoked path ids to satisfy the storage policy.
*
* <p>
* If the configured mode is {@link StoragePolicySatisfierMode.EXTERNAL}, then
* it won't do anything, just maintains the sps invoked path ids. Administrator
* requires to start external sps service explicitly, to fetch the sps invoked
@ -66,10 +63,9 @@ public class StoragePolicySatisfyManager {
private final Queue<Long> pathsToBeTraveresed;
private final int outstandingPathsLimit;
private final Namesystem namesystem;
private final BlockManager blkMgr;
public StoragePolicySatisfyManager(Configuration conf, Namesystem namesystem,
BlockManager blkMgr) {
public StoragePolicySatisfyManager(Configuration conf,
Namesystem namesystem) {
// StoragePolicySatisfier(SPS) configs
storagePolicyEnabled = conf.getBoolean(
DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY,
@ -82,21 +78,16 @@ public class StoragePolicySatisfyManager {
DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_DEFAULT);
mode = StoragePolicySatisfierMode.fromString(modeVal);
pathsToBeTraveresed = new LinkedList<Long>();
this.namesystem = namesystem;
// instantiate SPS service by just keeps config reference and not starting
// any supporting threads.
spsService = new StoragePolicySatisfier(conf);
this.namesystem = namesystem;
this.blkMgr = blkMgr;
}
/**
* This function will do following logic based on the configured sps mode:
*
* <p>
* If the configured mode is {@link StoragePolicySatisfierMode.INTERNAL}, then
* starts internal daemon service inside namenode.
*
* <p>
* If the configured mode is {@link StoragePolicySatisfierMode.EXTERNAL}, then
* it won't do anything. Administrator requires to start external sps service
* explicitly.
@ -113,17 +104,6 @@ public class StoragePolicySatisfyManager {
}
switch (mode) {
case INTERNAL:
if (spsService.isRunning()) {
LOG.info("Storage policy satisfier is already running"
+ " as internal daemon service inside namenode.");
return;
}
// starts internal daemon service inside namenode
spsService.init(
new IntraSPSNameNodeContext(namesystem, blkMgr, spsService));
spsService.start(false, mode);
break;
case EXTERNAL:
LOG.info("Storage policy satisfier is configured as external, "
+ "please start external sps service explicitly to satisfy policy");
@ -141,10 +121,6 @@ public class StoragePolicySatisfyManager {
* This function will do following logic based on the configured sps mode:
*
* <p>
* If the configured mode is {@link StoragePolicySatisfierMode.INTERNAL}, then
* stops internal daemon service inside namenode.
*
* <p>
* If the configured mode is {@link StoragePolicySatisfierMode.EXTERNAL}, then
* it won't do anything. Administrator requires to stop external sps service
* explicitly, if needed.
@ -162,16 +138,6 @@ public class StoragePolicySatisfyManager {
}
switch (mode) {
case INTERNAL:
removeAllPathIds();
if (!spsService.isRunning()) {
LOG.info("Internal storage policy satisfier daemon service"
+ " is not running");
return;
}
// stops internal daemon service running inside namenode
spsService.stop(false);
break;
case EXTERNAL:
removeAllPathIds();
if (LOG.isDebugEnabled()) {
@ -194,11 +160,8 @@ public class StoragePolicySatisfyManager {
}
/**
* Sets new sps mode. If the new mode is internal, then it will start internal
* sps service inside namenode. If the new mode is external, then stops
* internal sps service running(if any) inside namenode. If the new mode is
* none, then it will disable the sps feature completely by clearing all
* queued up sps path's hint.
* Sets new sps mode. If the new mode is none, then it will disable the sps
* feature completely by clearing all queued up sps path's hint.
*/
public void changeModeEvent(StoragePolicySatisfierMode newMode) {
if (!storagePolicyEnabled) {
@ -212,16 +175,6 @@ public class StoragePolicySatisfyManager {
}
switch (newMode) {
case INTERNAL:
if (spsService.isRunning()) {
LOG.info("Storage policy satisfier is already running as {} mode.",
mode);
return;
}
spsService.init(new IntraSPSNameNodeContext(this.namesystem, this.blkMgr,
spsService));
spsService.start(true, newMode);
break;
case EXTERNAL:
if (mode == newMode) {
LOG.info("Storage policy satisfier is already in mode:{},"
@ -238,7 +191,7 @@ public class StoragePolicySatisfyManager {
}
LOG.info("Disabling StoragePolicySatisfier, mode:{}", newMode);
spsService.stop(true);
removeAllPathIds();
clearPathIds();
break;
default:
if (LOG.isDebugEnabled()) {
@ -251,77 +204,15 @@ public class StoragePolicySatisfyManager {
mode = newMode;
}
/**
* This function will do following logic based on the configured sps mode:
*
* <p>
* If the configured mode is {@link StoragePolicySatisfierMode.INTERNAL}, then
* timed wait to stop internal storage policy satisfier daemon threads.
*
* <p>
* If the configured mode is {@link StoragePolicySatisfierMode.EXTERNAL}, then
* it won't do anything, just ignore it.
*
* <p>
* If the configured mode is {@link StoragePolicySatisfierMode.NONE}, then the
* service is disabled. It won't do any action, just ignore it.
*/
public void stopGracefully() {
switch (mode) {
case INTERNAL:
spsService.stopGracefully();
break;
case EXTERNAL:
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring, StoragePolicySatisfier feature is running"
+ " outside namenode");
}
break;
case NONE:
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring, StoragePolicySatisfier feature is disabled");
}
break;
default:
if (LOG.isDebugEnabled()) {
LOG.debug("Invalid mode:{}", mode);
}
break;
}
}
/**
* @return true if the internal storage policy satisfier daemon is running,
* false otherwise.
*/
public boolean isInternalSatisfierRunning() {
@VisibleForTesting
public boolean isSatisfierRunning() {
return spsService.isRunning();
}
/**
* @return internal SPS service instance.
*/
public SPSService getInternalSPSService() {
return this.spsService;
}
/**
* @return status Storage policy satisfy status of the path. It is supported
* only for the internal sps daemon service.
* @throws IOException
* if the Satisfier is not running inside namenode.
*/
public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
String path) throws IOException {
if (mode != StoragePolicySatisfierMode.INTERNAL) {
LOG.debug("Satisfier is not running inside namenode, so status "
+ "can't be returned.");
throw new IOException("Satisfier is not running inside namenode, "
+ "so status can't be returned.");
}
return spsService.checkStoragePolicySatisfyPathStatus(path);
}
/**
* @return the next SPS path id, on which path users has invoked to satisfy
* storages.
@ -348,10 +239,22 @@ public class StoragePolicySatisfyManager {
/**
* Removes the SPS path id from the list of sps paths.
*
* @throws IOException
*/
public void removePathId(long trackId) {
private void clearPathIds(){
synchronized (pathsToBeTraveresed) {
pathsToBeTraveresed.remove(trackId);
Iterator<Long> iterator = pathsToBeTraveresed.iterator();
while (iterator.hasNext()) {
Long trackId = iterator.next();
try {
namesystem.removeXattr(trackId,
HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY);
} catch (IOException e) {
LOG.debug("Failed to remove sps xatttr!", e);
}
iterator.remove();
}
}
}
@ -374,12 +277,11 @@ public class StoragePolicySatisfyManager {
}
/**
* @return true if sps is configured as an internal service or external
* @return true if sps is configured as an external
* service, false otherwise.
*/
public boolean isEnabled() {
return mode == StoragePolicySatisfierMode.INTERNAL
|| mode == StoragePolicySatisfierMode.EXTERNAL;
return mode == StoragePolicySatisfierMode.EXTERNAL;
}
/**

View File

@ -149,11 +149,6 @@ public class ExternalSPSContext implements Context {
return createDefaultSuite.getPolicy(policyId);
}
@Override
public void addDropPreviousSPSWorkAtDNs() {
// Nothing todo
}
@Override
public void removeSPSHint(long inodeId) throws IOException {
Path filePath = DFSUtilClient.makePathFromFileId(inodeId);

View File

@ -68,15 +68,6 @@ public final class ExternalStoragePolicySatisfier {
StoragePolicySatisfier sps = new StoragePolicySatisfier(spsConf);
nnc = getNameNodeConnector(spsConf);
boolean spsRunning;
spsRunning = nnc.getDistributedFileSystem().getClient()
.isInternalSatisfierRunning();
if (spsRunning) {
throw new RuntimeException(
"Startup failed due to StoragePolicySatisfier"
+ " running inside Namenode.");
}
ExternalSPSContext context = new ExternalSPSContext(sps, nnc);
sps.init(context);
sps.start(true, StoragePolicySatisfierMode.EXTERNAL);

View File

@ -26,7 +26,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.tools.TableListing;
import org.apache.hadoop.util.StringUtils;
@ -34,7 +33,6 @@ import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.FileNotFoundException;
import com.google.common.base.Joiner;
import java.io.IOException;
import java.util.Arrays;
@ -297,88 +295,6 @@ public class StoragePolicyAdmin extends Configured implements Tool {
dfs.satisfyStoragePolicy(new Path(path));
System.out.println("Scheduled blocks to move based on the current"
+ " storage policy on " + path);
boolean waitOpt = StringUtils.popOption("-w", args);
if (waitOpt) {
waitForSatisfyPolicy(dfs, path);
}
} catch (Exception e) {
System.err.println(AdminHelper.prettifyException(e));
return 2;
}
return 0;
}
private void waitForSatisfyPolicy(DistributedFileSystem dfs, String path)
throws IOException {
System.out.println("Waiting for satisfy the policy ...");
boolean running = true;
while (running) {
StoragePolicySatisfyPathStatus status = dfs.getClient()
.checkStoragePolicySatisfyPathStatus(path);
switch (status) {
case SUCCESS:
case FAILURE:
case NOT_AVAILABLE:
System.out.println(status);
running = false;
break;
case PENDING:
case IN_PROGRESS:
System.out.println(status);
default:
System.err.println("Unexpected storage policy satisfyer status,"
+ " Exiting");
running = false;
break;
}
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
}
}
System.out.println(" done");
}
}
/**
* Command to check storage policy satisfier status running internal(inside)
* Namenode.
*/
private static class IsInternalSatisfierRunningCommand
implements AdminHelper.Command {
@Override
public String getName() {
return "-isInternalSatisfierRunning";
}
@Override
public String getShortUsage() {
return "[" + getName() + "]\n";
}
@Override
public String getLongUsage() {
return getShortUsage() + "\n"
+ "Check the status of Storage Policy Statisfier"
+ " running inside Namenode.\n\n";
}
@Override
public int run(Configuration conf, List<String> args) throws IOException {
if (!args.isEmpty()) {
System.err.print("Can't understand arguments: "
+ Joiner.on(" ").join(args) + "\n");
System.err.println("Usage is " + getLongUsage());
return 1;
}
final DistributedFileSystem dfs = AdminHelper.getDFS(conf);
try {
if(dfs.getClient().isInternalSatisfierRunning()){
System.out.println("yes");
}else{
System.out.println("no");
}
} catch (Exception e) {
System.err.println(AdminHelper.prettifyException(e));
return 2;
@ -438,7 +354,6 @@ public class StoragePolicyAdmin extends Configured implements Tool {
new SetStoragePolicyCommand(),
new GetStoragePolicyCommand(),
new UnsetStoragePolicyCommand(),
new SatisfyStoragePolicyCommand(),
new IsInternalSatisfierRunningCommand()
new SatisfyStoragePolicyCommand()
};
}

View File

@ -60,8 +60,6 @@ message DatanodeCommandProto {
NullDatanodeCommand = 7;
BlockIdCommand = 8;
BlockECReconstructionCommand = 9;
BlockStorageMovementCommand = 10;
DropSPSWorkCommand = 11;
}
required Type cmdType = 1; // Type of the command
@ -76,8 +74,6 @@ message DatanodeCommandProto {
optional RegisterCommandProto registerCmd = 7;
optional BlockIdCommandProto blkIdCmd = 8;
optional BlockECReconstructionCommandProto blkECReconstructionCmd = 9;
optional BlockStorageMovementCommandProto blkStorageMovementCmd = 10;
optional DropSPSWorkCommandProto dropSPSWorkCmd = 11;
}
/**
@ -158,32 +154,6 @@ message BlockECReconstructionCommandProto {
repeated BlockECReconstructionInfoProto blockECReconstructioninfo = 1;
}
/**
* Block storage movement command
*/
message BlockStorageMovementCommandProto {
required string blockPoolId = 1;
repeated BlockMovingInfoProto blockMovingInfo = 2;
}
/**
* Instruct datanode to drop SPS work queues
*/
message DropSPSWorkCommandProto {
// void
}
/**
* Block storage movement information
*/
message BlockMovingInfoProto {
required BlockProto block = 1;
required DatanodeInfoProto sourceDnInfo = 2;
required DatanodeInfoProto targetDnInfo = 3;
required StorageTypeProto sourceStorageType = 4;
required StorageTypeProto targetStorageType = 5;
}
/**
* registration - Information of the datanode registering with the namenode
*/

View File

@ -4501,8 +4501,7 @@
<name>dfs.storage.policy.satisfier.mode</name>
<value>none</value>
<description>
Following values are supported - internal, external, none.
If internal, StoragePolicySatisfier will be enabled and started along with active namenode.
Following values are supported - external, none.
If external, StoragePolicySatisfier will be enabled and started as an independent service outside namenode.
If none, StoragePolicySatisfier is disabled.
By default, StoragePolicySatisfier is disabled.
@ -4560,17 +4559,6 @@
</description>
</property>
<property>
<name>dfs.storage.policy.satisfier.low.max-streams.preference</name>
<value>true</value>
<description>
If false, blocks to move tasks will share equal ratio of number of highest-priority
replication streams (dfs.namenode.replication.max-streams) with pending replica and
erasure-coded reconstruction tasks. If true, blocks to move tasks will only use
the delta number of replication streams. The default value is true.
</description>
</property>
<property>
<name>dfs.storage.policy.satisfier.retry.max.attempts</name>
<value>3</value>

View File

@ -106,9 +106,9 @@ Following 2 options will allow users to move the blocks based on new policy set.
### <u>S</u>torage <u>P</u>olicy <u>S</u>atisfier (SPS)
When user changes the storage policy on a file/directory, user can call `HdfsAdmin` API `satisfyStoragePolicy()` to move the blocks as per the new policy set.
The SPS daemon thread runs along with namenode and periodically scans for the storage mismatches between new policy set and the physical blocks placed. This will only track the files/directories for which user invoked satisfyStoragePolicy. If SPS identifies some blocks to be moved for a file, then it will schedule block movement tasks to datanodes. If there are any failures in movement, the SPS will re-attempt by sending new block movement tasks.
The SPS tool running external to namenode periodically scans for the storage mismatches between new policy set and the physical blocks placed. This will only track the files/directories for which user invoked satisfyStoragePolicy. If SPS identifies some blocks to be moved for a file, then it will schedule block movement tasks to datanodes. If there are any failures in movement, the SPS will re-attempt by sending new block movement tasks.
SPS can be enabled as internal service to Namenode or as an external service outside Namenode or disabled dynamically without restarting the Namenode.
SPS can be enabled as an external service outside Namenode or disabled dynamically without restarting the Namenode.
Detailed design documentation can be found at [Storage Policy Satisfier(SPS) (HDFS-10285)](https://issues.apache.org/jira/browse/HDFS-10285)
@ -125,8 +125,8 @@ Detailed design documentation can be found at [Storage Policy Satisfier(SPS) (HD
####Configurations:
* **dfs.storage.policy.satisfier.mode** - Used to enable(internal service inside NN or external service outside NN) or disable SPS.
Following string values are supported - `internal`, `external`, `none`. Configuring `internal` or `external` value represents SPS is enable and `none` to disable.
* **dfs.storage.policy.satisfier.mode** - Used to enable external service outside NN or disable SPS.
Following string values are supported - `external`, `none`. Configuring `external` value represents SPS is enable and `none` to disable.
The default value is `none`.
* **dfs.storage.policy.satisfier.recheck.timeout.millis** - A timeout to re-check the processed block storage movement
@ -218,25 +218,17 @@ Schedule blocks to move based on file's/directory's current storage policy.
* Command:
hdfs storagepolicies -satisfyStoragePolicy [-w] -path <path>
hdfs storagepolicies -satisfyStoragePolicy -path <path>
* Arguments:
| | |
|:---- |:---- |
| `-path <path>` | The path referring to either a directory or a file. |
| `-w` | It requests that the command wait till all the files satisfy the policy in given path. This will print the current status of the path in each 10 sec and status are:<br/>PENDING - Path is in queue and not processed for satisfying the policy.<br/>IN_PROGRESS - Satisfying the storage policy for path.<br/>SUCCESS - Storage policy satisfied for the path.<br/>FAILURE : Few blocks failed to move.<br/>NOT_AVAILABLE - Status not available. |
### SPS Running Status
Check the running status of Storage Policy Satisfier service in namenode. If it is running, return 'yes'. Otherwise return 'no'.
* Command:
hdfs storagepolicies -isInternalSatisfierRunning
### Enable(internal service inside NN or external service outside NN) or Disable SPS without restarting Namenode
If administrator wants to switch modes of SPS feature while Namenode is running, first he/she needs to update the desired value(internal or external or none) for the configuration item `dfs.storage.policy.satisfier.mode` in configuration file (`hdfs-site.xml`) and then run the following Namenode reconfig command
### Enable external service outside NN or Disable SPS without restarting Namenode
If administrator wants to switch modes of SPS feature while Namenode is running, first he/she needs to update the desired value(external or none) for the configuration item `dfs.storage.policy.satisfier.mode` in configuration file (`hdfs-site.xml`) and then run the following Namenode reconfig command
* Command:

View File

@ -59,6 +59,7 @@ import java.security.NoSuchAlgorithmException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@ -139,6 +140,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
@ -165,6 +167,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.XAttrStorage;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@ -193,6 +196,7 @@ import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.VersionInfo;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Assume;
import org.apache.hadoop.util.ToolRunner;
@ -2491,4 +2495,40 @@ public class DFSTestUtil {
}
}, 100, timeout);
}
/**
* Get namenode connector using the given configuration and file path.
*
* @param conf
* hdfs configuration
* @param filePath
* file path
* @param namenodeCount
* number of namenodes
* @param createMoverPath
* create move path flag to skip the path creation
* @return Namenode connector.
* @throws IOException
*/
public static NameNodeConnector getNameNodeConnector(Configuration conf,
Path filePath, int namenodeCount, boolean createMoverPath)
throws IOException {
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Assert.assertEquals(namenodeCount, namenodes.size());
NameNodeConnector.checkOtherInstanceRunning(createMoverPath);
while (true) {
try {
final List<NameNodeConnector> nncs = NameNodeConnector
.newNameNodeConnectors(namenodes,
StoragePolicySatisfier.class.getSimpleName(),
filePath, conf,
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
return nncs.get(0);
} catch (IOException e) {
LOG.warn("Failed to connect with namenode", e);
// Ignore
}
}
}
}

View File

@ -123,8 +123,6 @@ public class TestBPOfferService {
Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf();
Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn"))
.when(mockDn).getMetrics();
Mockito.doReturn(new StoragePolicySatisfyWorker(conf, mockDn, null))
.when(mockDn).getStoragePolicySatisfyWorker();
// Set up a simulated dataset with our fake BP
mockFSDataset = Mockito.spy(new SimulatedFSDataset(null, conf));
@ -378,8 +376,6 @@ public class TestBPOfferService {
Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf();
Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn")).
when(mockDn).getMetrics();
Mockito.doReturn(new StoragePolicySatisfyWorker(conf, mockDn, null))
.when(mockDn).getStoragePolicySatisfyWorker();
final AtomicInteger count = new AtomicInteger();
Mockito.doAnswer(new Answer<Void>() {
@Override

View File

@ -1,241 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Supplier;
/**
* This class tests the behavior of moving block replica to the given storage
* type to fulfill the storage policy requirement.
*/
public class TestStoragePolicySatisfyWorker {
private static final Logger LOG = LoggerFactory
.getLogger(TestStoragePolicySatisfyWorker.class);
private static final int DEFAULT_BLOCK_SIZE = 100;
private MiniDFSCluster cluster = null;
private final Configuration conf = new HdfsConfiguration();
private static void initConf(Configuration conf) {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
1L);
conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.INTERNAL.toString());
// Reduced refresh cycle to update latest datanodes.
conf.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS,
1000);
}
@Before
public void setUp() throws IOException {
initConf(conf);
}
@After
public void teardown() throws IOException {
if (cluster != null) {
cluster.shutdown();
}
}
/**
* Tests to verify that the block replica is moving to ARCHIVE storage type to
* fulfill the storage policy requirement.
*/
@Test(timeout = 120000)
public void testMoveSingleBlockToAnotherDatanode() throws Exception {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4)
.storageTypes(
new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.ARCHIVE}})
.build();
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
final String file = "/testMoveSingleBlockToAnotherDatanode";
// write to DISK
final FSDataOutputStream out = dfs.create(new Path(file), (short) 2);
out.writeChars("testMoveSingleBlockToAnotherDatanode");
out.close();
// verify before movement
LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
StorageType[] storageTypes = lb.getStorageTypes();
for (StorageType storageType : storageTypes) {
Assert.assertTrue(StorageType.DISK == storageType);
}
// move to ARCHIVE
dfs.setStoragePolicy(new Path(file), "COLD");
dfs.satisfyStoragePolicy(new Path(file));
cluster.triggerHeartbeats();
// Wait till NameNode notified about the block location details
waitForLocatedBlockWithArchiveStorageType(dfs, file, 2, 30000);
}
/**
* Test to verify that satisfy worker can't move blocks. If specified target
* datanode doesn't have enough space to accommodate the moving block.
*/
@Test(timeout = 120000)
public void testMoveWithNoSpaceAvailable() throws Exception {
final long capacity = 150;
final String rack0 = "/rack0";
final String rack1 = "/rack1";
long[] capacities = new long[] {capacity, capacity, capacity / 2};
String[] hosts = {"host0", "host1", "host2"};
String[] racks = {rack0, rack1, rack0};
int numOfDatanodes = capacities.length;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numOfDatanodes)
.hosts(hosts).racks(racks).simulatedCapacities(capacities)
.storageTypes(
new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.ARCHIVE}})
.build();
cluster.waitActive();
InetSocketAddress[] favoredNodes = new InetSocketAddress[3];
for (int i = 0; i < favoredNodes.length; i++) {
// DFSClient will attempt reverse lookup. In case it resolves
// "127.0.0.1" to "localhost", we manually specify the hostname.
favoredNodes[i] = cluster.getDataNodes().get(i).getXferAddress();
}
final DistributedFileSystem dfs = cluster.getFileSystem();
final String file = "/testMoveWithNoSpaceAvailable";
DFSTestUtil.createFile(dfs, new Path(file), false, 1024, 100,
DEFAULT_BLOCK_SIZE, (short) 2, 0, false, favoredNodes);
// verify before movement
LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
StorageType[] storageTypes = lb.getStorageTypes();
for (StorageType storageType : storageTypes) {
Assert.assertTrue(StorageType.DISK == storageType);
}
// move to ARCHIVE
dfs.setStoragePolicy(new Path(file), "COLD");
lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
DataNode src = cluster.getDataNodes().get(2);
DatanodeInfo targetDnInfo = DFSTestUtil
.getLocalDatanodeInfo(src.getXferPort());
SimpleBlocksMovementsStatusHandler handler =
new SimpleBlocksMovementsStatusHandler();
StoragePolicySatisfyWorker worker = new StoragePolicySatisfyWorker(conf,
src, handler);
try {
worker.start();
List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
BlockMovingInfo blockMovingInfo = prepareBlockMovingInfo(
lb.getBlock().getLocalBlock(), lb.getLocations()[0], targetDnInfo,
lb.getStorageTypes()[0], StorageType.ARCHIVE);
blockMovingInfos.add(blockMovingInfo);
worker.processBlockMovingTasks(cluster.getNamesystem().getBlockPoolId(),
blockMovingInfos);
waitForBlockMovementCompletion(handler, 1, 30000);
} finally {
worker.stop();
}
}
private void waitForBlockMovementCompletion(
final SimpleBlocksMovementsStatusHandler handler,
int expectedFinishedItemsCount, int timeout) throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
List<Block> completedBlocks = handler.getMoveAttemptFinishedBlocks();
int finishedCount = completedBlocks.size();
LOG.info("Block movement completed count={}, expected={} and actual={}",
completedBlocks.size(), expectedFinishedItemsCount, finishedCount);
return expectedFinishedItemsCount == finishedCount;
}
}, 100, timeout);
}
private void waitForLocatedBlockWithArchiveStorageType(
final DistributedFileSystem dfs, final String file,
int expectedArchiveCount, int timeout) throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
LocatedBlock lb = null;
try {
lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
} catch (IOException e) {
LOG.error("Exception while getting located blocks", e);
return false;
}
int archiveCount = 0;
for (StorageType storageType : lb.getStorageTypes()) {
if (StorageType.ARCHIVE == storageType) {
archiveCount++;
}
}
LOG.info("Archive replica count, expected={} and actual={}",
expectedArchiveCount, archiveCount);
return expectedArchiveCount == archiveCount;
}
}, 100, timeout);
}
private BlockMovingInfo prepareBlockMovingInfo(Block block,
DatanodeInfo src, DatanodeInfo destin, StorageType storageType,
StorageType targetStorageType) {
return new BlockMovingInfo(block, src, destin, storageType,
targetStorageType);
}
}

View File

@ -678,7 +678,7 @@ public class TestMover {
public void testMoveWhenStoragePolicySatisfierIsRunning() throws Exception {
final Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.INTERNAL.toString());
StoragePolicySatisfierMode.EXTERNAL.toString());
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(3)
.storageTypes(
@ -686,6 +686,9 @@ public class TestMover {
{StorageType.DISK}}).build();
try {
cluster.waitActive();
// Simulate External sps by creating #getNameNodeConnector instance.
DFSTestUtil.getNameNodeConnector(conf, HdfsServerConstants.MOVER_ID_PATH,
1, true);
final DistributedFileSystem dfs = cluster.getFileSystem();
final String file = "/testMoveWhenStoragePolicySatisfierIsRunning";
// write to DISK
@ -697,7 +700,7 @@ public class TestMover {
dfs.setStoragePolicy(new Path(file), "COLD");
int rc = ToolRunner.run(conf, new Mover.Cli(),
new String[] {"-p", file.toString()});
int exitcode = ExitStatus.SKIPPED_DUE_TO_SPS.getExitCode();
int exitcode = ExitStatus.IO_EXCEPTION.getExitCode();
Assert.assertEquals("Exit code should be " + exitcode, exitcode, rc);
} finally {
cluster.shutdown();

View File

@ -248,17 +248,17 @@ public class TestNameNodeReconfigure {
// enable SPS internally by keeping DFS_STORAGE_POLICY_ENABLED_KEY
nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.INTERNAL.toString());
StoragePolicySatisfierMode.EXTERNAL.toString());
// Since DFS_STORAGE_POLICY_ENABLED_KEY is disabled, SPS can't be enabled.
assertNull("SPS shouldn't start as "
+ DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY + " is disabled",
nameNode.getNamesystem().getBlockManager().getSPSManager());
verifySPSEnabled(nameNode, DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.INTERNAL, false);
StoragePolicySatisfierMode.EXTERNAL, false);
assertEquals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY + " has wrong value",
StoragePolicySatisfierMode.INTERNAL.toString(), nameNode.getConf()
StoragePolicySatisfierMode.EXTERNAL.toString(), nameNode.getConf()
.get(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT));
}
@ -285,12 +285,6 @@ public class TestNameNodeReconfigure {
e.getCause());
}
// enable internal SPS
nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.INTERNAL.toString());
verifySPSEnabled(nameNode, DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.INTERNAL, true);
// disable SPS
nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.NONE.toString());
@ -302,7 +296,7 @@ public class TestNameNodeReconfigure {
StoragePolicySatisfierMode.EXTERNAL.toString());
assertEquals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY + " has wrong value",
false, nameNode.getNamesystem().getBlockManager().getSPSManager()
.isInternalSatisfierRunning());
.isSatisfierRunning());
assertEquals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY + " has wrong value",
StoragePolicySatisfierMode.EXTERNAL.toString(),
nameNode.getConf().get(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
@ -337,27 +331,15 @@ public class TestNameNodeReconfigure {
+ " by admin. Seek for an admin help to enable it "
+ "or use Mover tool.", e);
}
// start internal
nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.INTERNAL.toString());
assertEquals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY + " has wrong value",
true, nameNode.getNamesystem().getBlockManager().getSPSManager()
.isInternalSatisfierRunning());
assertEquals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY + " has wrong value",
StoragePolicySatisfierMode.INTERNAL.toString(),
nameNode.getConf().get(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT));
}
void verifySPSEnabled(final NameNode nameNode, String property,
StoragePolicySatisfierMode expected, boolean isSatisfierRunning) {
StoragePolicySatisfyManager spsMgr = nameNode
.getNamesystem().getBlockManager().getSPSManager();
boolean isInternalSatisfierRunning = spsMgr != null
? spsMgr.isInternalSatisfierRunning() : false;
assertEquals(property + " has wrong value", isSatisfierRunning,
isInternalSatisfierRunning);
boolean isSPSRunning = spsMgr != null ? spsMgr.isSatisfierRunning()
: false;
assertEquals(property + " has wrong value", isSPSRunning, isSPSRunning);
String actual = nameNode.getConf().get(property,
DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT);
assertEquals(property + " has wrong value", expected,

View File

@ -29,7 +29,11 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
import org.apache.hadoop.hdfs.server.sps.ExternalSPSContext;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test;
@ -45,11 +49,13 @@ import static org.junit.Assert.*;
* Test persistence of satisfying files/directories.
*/
public class TestPersistentStoragePolicySatisfier {
private static Configuration conf;
private static MiniDFSCluster cluster;
private static DistributedFileSystem fs;
private NameNodeConnector nnc;
private StoragePolicySatisfier sps;
private ExternalSPSContext ctxt;
private static Path testFile =
new Path("/testFile");
@ -65,7 +71,6 @@ public class TestPersistentStoragePolicySatisfier {
private static final String COLD = "COLD";
private static final String WARM = "WARM";
private static final String ONE_SSD = "ONE_SSD";
private static final String ALL_SSD = "ALL_SSD";
private static StorageType[][] storageTypes = new StorageType[][] {
{StorageType.DISK, StorageType.ARCHIVE, StorageType.SSD},
@ -104,7 +109,7 @@ public class TestPersistentStoragePolicySatisfier {
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
"3000");
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.INTERNAL.toString());
StoragePolicySatisfierMode.EXTERNAL.toString());
// Reduced refresh cycle to update latest datanodes.
conf.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS,
1000);
@ -124,6 +129,14 @@ public class TestPersistentStoragePolicySatisfier {
} else {
fs = cluster.getFileSystem();
}
nnc = DFSTestUtil.getNameNodeConnector(conf,
HdfsServerConstants.MOVER_ID_PATH, 1, false);
sps = new StoragePolicySatisfier(conf);
ctxt = new ExternalSPSContext(sps, nnc);
sps.init(ctxt);
sps.start(true, StoragePolicySatisfierMode.EXTERNAL);
createTestFiles(fs, replication);
}
@ -158,6 +171,9 @@ public class TestPersistentStoragePolicySatisfier {
cluster.shutdown(true);
cluster = null;
}
if (sps != null) {
sps.stopGracefully();
}
}
/**
@ -202,49 +218,6 @@ public class TestPersistentStoragePolicySatisfier {
}
}
/**
* Tests to verify satisfier persistence working as expected
* in HA env. This test case runs as below:
* 1. setup HA cluster env with simple HA topology.
* 2. switch the active NameNode from nn0/nn1 to nn1/nn0.
* 3. make sure all the storage policies are satisfied.
* @throws Exception
*/
@Test(timeout = 300000)
public void testWithHA() throws Exception {
try {
// Enable HA env for testing.
clusterSetUp(true, new HdfsConfiguration());
fs.setStoragePolicy(testFile, ALL_SSD);
fs.satisfyStoragePolicy(testFile);
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
DFSTestUtil.waitExpectedStorageType(
testFileName, StorageType.SSD, 3, timeout, fs);
// test directory
fs.setStoragePolicy(parentDir, WARM);
fs.satisfyStoragePolicy(parentDir);
cluster.transitionToStandby(1);
cluster.transitionToActive(0);
DFSTestUtil.waitExpectedStorageType(
parentFileName, StorageType.DISK, 1, timeout, fs);
DFSTestUtil.waitExpectedStorageType(
parentFileName, StorageType.ARCHIVE, 2, timeout, fs);
DFSTestUtil.waitExpectedStorageType(
childFileName, StorageType.DISK, 1, timeout, fs);
DFSTestUtil.waitExpectedStorageType(
childFileName, StorageType.ARCHIVE, 2, timeout, fs);
} finally {
clusterShutdown();
}
}
/**
* Tests to verify satisfier persistence working well with multiple
* restarts operations. This test case runs as below:
@ -281,63 +254,6 @@ public class TestPersistentStoragePolicySatisfier {
}
}
/**
* Tests to verify satisfier persistence working well with
* federal HA env. This test case runs as below:
* 1. setup HA test environment with federal topology.
* 2. satisfy storage policy of file1.
* 3. switch active NameNode from nn0 to nn1.
* 4. switch active NameNode from nn2 to nn3.
* 5. check whether the storage policy of file1 is satisfied.
* @throws Exception
*/
@Test(timeout = 300000)
public void testWithFederationHA() throws Exception {
MiniDFSCluster haCluster = null;
try {
conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.INTERNAL.toString());
// Reduced refresh cycle to update latest datanodes.
conf.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS,
1000);
haCluster = new MiniDFSCluster
.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHAFederatedTopology(2))
.storageTypes(storageTypes)
.numDataNodes(storageTypes.length).build();
haCluster.waitActive();
haCluster.transitionToActive(1);
haCluster.transitionToActive(3);
fs = HATestUtil.configureFailoverFs(haCluster, conf);
createTestFiles(fs, (short) 3);
fs.setStoragePolicy(testFile, WARM);
fs.satisfyStoragePolicy(testFile);
haCluster.transitionToStandby(1);
haCluster.transitionToActive(0);
haCluster.transitionToStandby(3);
haCluster.transitionToActive(2);
DFSTestUtil.waitExpectedStorageType(
testFileName, StorageType.DISK, 1, timeout, fs);
DFSTestUtil.waitExpectedStorageType(
testFileName, StorageType.ARCHIVE, 2, timeout, fs);
} finally {
if(fs != null) {
fs.close();
fs = null;
}
if(haCluster != null) {
haCluster.shutdown(true);
haCluster = null;
}
}
}
/**
* Tests to verify SPS xattr will be removed if the satisfy work has
* been finished, expect that the method satisfyStoragePolicy can be
@ -388,7 +304,7 @@ public class TestPersistentStoragePolicySatisfier {
* 3. make sure sps xattr is removed.
* @throws Exception
*/
@Test(timeout = 300000)
@Test(timeout = 300000000)
public void testDropSPS() throws Exception {
try {
clusterSetUp();

View File

@ -17,11 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException;
@ -32,24 +28,15 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests that StoragePolicySatisfier is able to work with HA enabled.
*/
public class TestStoragePolicySatisfierWithHA {
private MiniDFSCluster cluster = null;
private static final Logger LOG =
LoggerFactory.getLogger(TestStoragePolicySatisfierWithHA.class);
private final Configuration config = new HdfsConfiguration();
private static final int DEFAULT_BLOCK_SIZE = 1024;
@ -67,7 +54,7 @@ public class TestStoragePolicySatisfierWithHA {
private void createCluster() throws IOException {
config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
config.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.INTERNAL.toString());
StoragePolicySatisfierMode.EXTERNAL.toString());
// Reduced refresh cycle to update latest datanodes.
config.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS,
1000);
@ -101,50 +88,19 @@ public class TestStoragePolicySatisfierWithHA {
public void testWhenNNHAStateChanges() throws IOException {
try {
createCluster();
boolean running;
dfs = cluster.getFileSystem(1);
try {
dfs.getClient().isInternalSatisfierRunning();
Assert.fail("Call this function to Standby NN should "
+ "raise an exception.");
} catch (RemoteException e) {
IOException cause = e.unwrapRemoteException();
if (!(cause instanceof StandbyException)) {
Assert.fail("Unexpected exception happened " + e);
}
}
cluster.transitionToActive(0);
dfs = cluster.getFileSystem(0);
running = dfs.getClient().isInternalSatisfierRunning();
Assert.assertTrue("StoragePolicySatisfier should be active "
+ "when NN transits from Standby to Active mode.", running);
// NN transits from Active to Standby
cluster.transitionToStandby(0);
try {
dfs.getClient().isInternalSatisfierRunning();
Assert.fail("NN in Standby again, call this function should "
+ "raise an exception.");
} catch (RemoteException e) {
IOException cause = e.unwrapRemoteException();
if (!(cause instanceof StandbyException)) {
Assert.fail("Unexpected exception happened " + e);
}
}
cluster.waitActive();
try {
cluster.getNameNode(0).reconfigurePropertyImpl(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.EXTERNAL.toString());
StoragePolicySatisfierMode.NONE.toString());
Assert.fail("It's not allowed to enable or disable"
+ " StoragePolicySatisfier on Standby NameNode");
} catch (ReconfigurationException e) {
GenericTestUtils.assertExceptionContains("Could not change property "
+ DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY
+ " from 'INTERNAL' to 'EXTERNAL'", e);
+ " from 'EXTERNAL' to 'NONE'", e);
GenericTestUtils.assertExceptionContains(
"Enabling or disabling storage policy satisfier service on "
+ "standby NameNode is not allowed", e.getCause());
@ -153,104 +109,4 @@ public class TestStoragePolicySatisfierWithHA {
cluster.shutdown();
}
}
/**
* Test to verify that during namenode switch over will add
* DNA_DROP_SPS_WORK_COMMAND to all the datanodes. Later, this will ensure to
* drop all the SPS queues at datanode.
*/
@Test(timeout = 90000)
public void testNamenodeSwitchoverShouldDropSPSWork() throws Exception {
try {
createCluster();
FSNamesystem fsn = cluster.getNamesystem(0);
ArrayList<DataNode> dataNodes = cluster.getDataNodes();
List<DatanodeDescriptor> listOfDns = new ArrayList<>();
for (DataNode dn : dataNodes) {
DatanodeDescriptor dnd = NameNodeAdapter.getDatanode(fsn,
dn.getDatanodeId());
listOfDns.add(dnd);
}
cluster.shutdownDataNodes();
cluster.transitionToStandby(0);
LOG.info("**Transition to Active**");
cluster.transitionToActive(1);
// Verify that Standby-to-Active transition should set drop SPS flag to
// true. This will ensure that DNA_DROP_SPS_WORK_COMMAND will be
// propagated to datanode during heartbeat response.
int retries = 20;
boolean dropSPSWork = false;
while (retries > 0) {
for (DatanodeDescriptor dnd : listOfDns) {
dropSPSWork = dnd.shouldDropSPSWork();
if (!dropSPSWork) {
retries--;
Thread.sleep(250);
break;
}
}
if (dropSPSWork) {
break;
}
}
Assert.assertTrue("Didn't drop SPS work", dropSPSWork);
} finally {
cluster.shutdown();
}
}
/**
* Test to verify that SPS work will be dropped once the datanode is marked as
* expired. Internally 'dropSPSWork' flag is set as true while expiration and
* at the time of reconnection, will send DNA_DROP_SPS_WORK_COMMAND to that
* datanode.
*/
@Test(timeout = 90000)
public void testDeadDatanode() throws Exception {
int heartbeatExpireInterval = 2 * 2000;
config.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
3000);
config.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1000L);
createCluster();
DataNode dn = cluster.getDataNodes().get(0);
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
FSNamesystem fsn = cluster.getNamesystem(0);
DatanodeDescriptor dnd = NameNodeAdapter.getDatanode(fsn,
dn.getDatanodeId());
boolean isDead = false;
int retries = 20;
while (retries > 0) {
isDead = dnd.getLastUpdateMonotonic() < (monotonicNow()
- heartbeatExpireInterval);
if (isDead) {
break;
}
retries--;
Thread.sleep(250);
}
Assert.assertTrue("Datanode is alive", isDead);
// Disable datanode heartbeat, so that the datanode will get expired after
// the recheck interval and become dead.
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
// Verify that datanode expiration will set drop SPS flag to
// true. This will ensure that DNA_DROP_SPS_WORK_COMMAND will be
// propagated to datanode during reconnection.
boolean dropSPSWork = false;
retries = 50;
while (retries > 0) {
dropSPSWork = dnd.shouldDropSPSWork();
if (dropSPSWork) {
break;
}
retries--;
Thread.sleep(100);
}
Assert.assertTrue("Didn't drop SPS work", dropSPSWork);
}
}

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.StorageTypeNodePair;
import org.apache.hadoop.hdfs.server.sps.ExternalSPSContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -52,7 +53,7 @@ public class TestBlockStorageMovementAttemptedItems {
@Before
public void setup() throws Exception {
Configuration config = new HdfsConfiguration();
Context ctxt = Mockito.mock(IntraSPSNameNodeContext.class);
Context ctxt = Mockito.mock(ExternalSPSContext.class);
SPSService sps = new StoragePolicySatisfier(config);
Mockito.when(ctxt.isRunning()).thenReturn(true);
Mockito.when(ctxt.isInSafeMode()).thenReturn(false);

View File

@ -42,7 +42,9 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.sps.ExternalSPSContext;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Before;
@ -70,6 +72,9 @@ public class TestStoragePolicySatisfierWithStripedFile {
private int cellSize;
private int defaultStripeBlockSize;
private Configuration conf;
private StoragePolicySatisfier sps;
private ExternalSPSContext ctxt;
private NameNodeConnector nnc;
private ErasureCodingPolicy getEcPolicy() {
return StripedFileTestUtil.getDefaultECPolicy();
@ -87,7 +92,7 @@ public class TestStoragePolicySatisfierWithStripedFile {
defaultStripeBlockSize = cellSize * stripesPerBlock;
conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.INTERNAL.toString());
StoragePolicySatisfierMode.EXTERNAL.toString());
// Reduced refresh cycle to update latest datanodes.
conf.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS,
1000);
@ -102,8 +107,8 @@ public class TestStoragePolicySatisfierWithStripedFile {
*/
@Test(timeout = 300000)
public void testMoverWithFullStripe() throws Exception {
// start 10 datanodes
int numOfDatanodes = 10;
// start 11 datanodes
int numOfDatanodes = 11;
int storagesPerDatanode = 2;
long capacity = 20 * defaultStripeBlockSize;
long[][] capacities = new long[numOfDatanodes][storagesPerDatanode];
@ -122,6 +127,7 @@ public class TestStoragePolicySatisfierWithStripedFile {
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.ARCHIVE},
@ -133,7 +139,7 @@ public class TestStoragePolicySatisfierWithStripedFile {
HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
try {
cluster.waitActive();
startSPS();
DistributedFileSystem dfs = cluster.getFileSystem();
dfs.enableErasureCodingPolicy(
StripedFileTestUtil.getDefaultECPolicy().getName());
@ -189,12 +195,12 @@ public class TestStoragePolicySatisfierWithStripedFile {
LOG.info("Sets storage policy to COLD and invoked satisfyStoragePolicy");
cluster.triggerHeartbeats();
waitForBlocksMovementAttemptReport(cluster, 9, 60000);
// verify storage types and locations
waitExpectedStorageType(cluster, fooFile, fileLen, StorageType.ARCHIVE, 9,
9, 60000);
} finally {
cluster.shutdown();
sps.stopGracefully();
}
}
@ -213,7 +219,7 @@ public class TestStoragePolicySatisfierWithStripedFile {
public void testWhenOnlyFewTargetNodesAreAvailableToSatisfyStoragePolicy()
throws Exception {
// start 10 datanodes
int numOfDatanodes = 10;
int numOfDatanodes = 11;
int storagesPerDatanode = 2;
long capacity = 20 * defaultStripeBlockSize;
long[][] capacities = new long[numOfDatanodes][storagesPerDatanode];
@ -234,6 +240,7 @@ public class TestStoragePolicySatisfierWithStripedFile {
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.ARCHIVE}})
@ -243,7 +250,7 @@ public class TestStoragePolicySatisfierWithStripedFile {
HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
try {
cluster.waitActive();
startSPS();
DistributedFileSystem dfs = cluster.getFileSystem();
dfs.enableErasureCodingPolicy(
StripedFileTestUtil.getDefaultECPolicy().getName());
@ -271,6 +278,7 @@ public class TestStoragePolicySatisfierWithStripedFile {
Assert.assertEquals(StorageType.DISK, type);
}
}
Thread.sleep(5000);
StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
dataBlocks + parityBlocks);
@ -296,13 +304,13 @@ public class TestStoragePolicySatisfierWithStripedFile {
LOG.info("Sets storage policy to COLD and invoked satisfyStoragePolicy");
cluster.triggerHeartbeats();
waitForBlocksMovementAttemptReport(cluster, 5, 60000);
waitForAttemptedItems(cluster, 1, 30000);
waitForAttemptedItems(1, 30000);
// verify storage types and locations.
waitExpectedStorageType(cluster, fooFile, fileLen, StorageType.ARCHIVE, 5,
9, 60000);
} finally {
cluster.shutdown();
sps.stopGracefully();
}
}
@ -352,6 +360,7 @@ public class TestStoragePolicySatisfierWithStripedFile {
.build();
try {
cluster.waitActive();
startSPS();
DistributedFileSystem fs = cluster.getFileSystem();
fs.enableErasureCodingPolicy(
StripedFileTestUtil.getDefaultECPolicy().getName());
@ -393,6 +402,7 @@ public class TestStoragePolicySatisfierWithStripedFile {
StorageType.ARCHIVE, 9, 9, 60000);
} finally {
cluster.shutdown();
sps.stopGracefully();
}
}
@ -444,6 +454,7 @@ public class TestStoragePolicySatisfierWithStripedFile {
HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
try {
cluster.waitActive();
startSPS();
DistributedFileSystem dfs = cluster.getFileSystem();
dfs.enableErasureCodingPolicy(
StripedFileTestUtil.getDefaultECPolicy().getName());
@ -481,35 +492,25 @@ public class TestStoragePolicySatisfierWithStripedFile {
LOG.info("Sets storage policy to COLD and invoked satisfyStoragePolicy");
cluster.triggerHeartbeats();
waitForAttemptedItems(cluster, 1, 30000);
waitForAttemptedItems(1, 30000);
// verify storage types and locations.
waitExpectedStorageType(cluster, fooFile, fileLen, StorageType.DISK, 9, 9,
60000);
waitForAttemptedItems(cluster, 1, 30000);
waitForAttemptedItems(1, 30000);
} finally {
cluster.shutdown();
sps.stopGracefully();
}
}
private void waitForAttemptedItems(MiniDFSCluster cluster,
long expectedBlkMovAttemptedCount, int timeout)
throws TimeoutException, InterruptedException {
BlockManager blockManager = cluster.getNamesystem().getBlockManager();
final StoragePolicySatisfier sps =
(StoragePolicySatisfier) blockManager
.getSPSManager().getInternalSPSService();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}",
expectedBlkMovAttemptedCount,
((BlockStorageMovementAttemptedItems) sps
.getAttemptedItemsMonitor()).getAttemptedItemsCount());
return ((BlockStorageMovementAttemptedItems) sps
.getAttemptedItemsMonitor())
.getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
}
}, 100, timeout);
private void startSPS() throws IOException {
nnc = DFSTestUtil.getNameNodeConnector(conf,
HdfsServerConstants.MOVER_ID_PATH, 1, false);
sps = new StoragePolicySatisfier(conf);
ctxt = new ExternalSPSContext(sps, nnc);
sps.init(ctxt);
sps.start(true, StoragePolicySatisfierMode.EXTERNAL);
}
private static void initConfWithStripe(Configuration conf,
@ -562,24 +563,18 @@ public class TestStoragePolicySatisfierWithStripedFile {
}, 100, timeout);
}
// Check whether the block movement attempt report has been arrived at the
// Namenode(SPS).
private void waitForBlocksMovementAttemptReport(MiniDFSCluster cluster,
long expectedMoveFinishedBlks, int timeout)
throws TimeoutException, InterruptedException {
BlockManager blockManager = cluster.getNamesystem().getBlockManager();
final StoragePolicySatisfier sps =
(StoragePolicySatisfier) blockManager.getSPSManager()
.getInternalSPSService();
private void waitForAttemptedItems(long expectedBlkMovAttemptedCount,
int timeout) throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
int actualCount = ((BlockStorageMovementAttemptedItems) (sps
.getAttemptedItemsMonitor())).getMovementFinishedBlocksCount();
LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
expectedMoveFinishedBlks,
actualCount);
return actualCount >= expectedMoveFinishedBlks;
LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}",
expectedBlkMovAttemptedCount,
((BlockStorageMovementAttemptedItems) (sps
.getAttemptedItemsMonitor())).getAttemptedItemsCount());
return ((BlockStorageMovementAttemptedItems) (sps
.getAttemptedItemsMonitor()))
.getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
}
}, 100, timeout);
}

View File

@ -50,7 +50,7 @@ public class TestStoragePolicyCommands {
public void clusterSetUp() throws IOException, URISyntaxException {
conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.INTERNAL.toString());
StoragePolicySatisfierMode.EXTERNAL.toString());
StorageType[][] newtypes = new StorageType[][] {
{StorageType.ARCHIVE, StorageType.DISK}};
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPL)

View File

@ -29,6 +29,11 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.sps.Context;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
import org.apache.hadoop.hdfs.server.sps.ExternalSPSContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -43,12 +48,13 @@ public class TestStoragePolicySatisfyAdminCommands {
private Configuration conf = null;
private MiniDFSCluster cluster = null;
private DistributedFileSystem dfs = null;
private StoragePolicySatisfier externalSps = null;
@Before
public void clusterSetUp() throws IOException, URISyntaxException {
conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.INTERNAL.toString());
StoragePolicySatisfierMode.EXTERNAL.toString());
// Reduced refresh cycle to update latest datanodes.
conf.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS,
1000);
@ -58,6 +64,14 @@ public class TestStoragePolicySatisfyAdminCommands {
.storageTypes(newtypes).build();
cluster.waitActive();
dfs = cluster.getFileSystem();
NameNodeConnector nnc = DFSTestUtil.getNameNodeConnector(conf,
HdfsServerConstants.MOVER_ID_PATH, 1, false);
StoragePolicySatisfier externalSps = new StoragePolicySatisfier(conf);
Context externalCtxt = new ExternalSPSContext(externalSps, nnc);
externalSps.init(externalCtxt);
externalSps.start(true, StoragePolicySatisfierMode.EXTERNAL);
}
@After
@ -70,6 +84,9 @@ public class TestStoragePolicySatisfyAdminCommands {
cluster.shutdown();
cluster = null;
}
if (externalSps != null) {
externalSps.stopGracefully();
}
}
@Test(timeout = 30000)
@ -92,41 +109,4 @@ public class TestStoragePolicySatisfyAdminCommands {
DFSTestUtil.waitExpectedStorageType(file, StorageType.ARCHIVE, 1, 30000,
dfs);
}
@Test(timeout = 30000)
public void testIsSatisfierRunningCommand() throws Exception {
final String file = "/testIsSatisfierRunningCommand";
DFSTestUtil.createFile(dfs, new Path(file), SIZE, REPL, 0);
final StoragePolicyAdmin admin = new StoragePolicyAdmin(conf);
DFSTestUtil.toolRun(admin, "-isInternalSatisfierRunning", 0, "yes");
cluster.getNameNode().reconfigureProperty(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.NONE.toString());
cluster.waitActive();
DFSTestUtil.toolRun(admin, "-isInternalSatisfierRunning", 0, "no");
// Test with unnecessary args
DFSTestUtil.toolRun(admin, "-isInternalSatisfierRunning status", 1,
"Can't understand arguments: ");
}
@Test(timeout = 90000)
public void testSatisfyStoragePolicyCommandWithWaitOption()
throws Exception {
final String file = "/testSatisfyStoragePolicyCommandWithWaitOption";
DFSTestUtil.createFile(dfs, new Path(file), SIZE, REPL, 0);
final StoragePolicyAdmin admin = new StoragePolicyAdmin(conf);
DFSTestUtil.toolRun(admin, "-setStoragePolicy -path " + file
+ " -policy COLD", 0, "Set storage policy COLD on " + file.toString());
DFSTestUtil.toolRun(admin, "-satisfyStoragePolicy -w -path " + file, 0,
"Waiting for satisfy the policy");
DFSTestUtil.waitExpectedStorageType(file, StorageType.ARCHIVE, 1, 30000,
dfs);
}
}