diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index adbb133a6cb..7337aa28cef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -3109,6 +3109,10 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     }
   }
 
+  public boolean isStoragePolicySatisfierRunning() throws IOException {
+    return namenode.isStoragePolicySatisfierRunning();
+  }
+
   Tracer getTracer() {
     return tracer;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index 1031ef66d68..4b95c3dfe3d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -1756,4 +1756,12 @@ public interface ClientProtocol {
    */
   @Idempotent
   void satisfyStoragePolicy(String path) throws IOException;
+
+  /**
+   * Check if StoragePolicySatisfier is running.
+   * @return true if StoragePolicySatisfier is running
+   * @throws IOException if an I/O error occurs
+   */
+  @Idempotent
+  boolean isStoragePolicySatisfierRunning() throws IOException;
 }
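(A caller-side sketch, not part of the patch: how a tool could probe the new client API before scheduling block moves. The SpsProbe class name and the NameNode URI are illustrative assumptions; getClient() and isStoragePolicySatisfierRunning() are the methods introduced above.)

    import java.io.IOException;
    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class SpsProbe {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // hdfs://nn-host:9820 is a placeholder for the active NameNode URI.
        FileSystem fs = FileSystem.get(URI.create("hdfs://nn-host:9820"), conf);
        DistributedFileSystem dfs = (DistributedFileSystem) fs;
        // New RPC from this patch; returns false when SPS is deactivated
        // or was stopped because a Mover ID file exists.
        boolean running = dfs.getClient().isStoragePolicySatisfierRunning();
        System.out.println("StoragePolicySatisfier running: " + running);
      }
    }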
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 54597d6bc0d..d7c32bce198 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -147,6 +147,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSto
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsStoragePolicySatisfierRunningRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsStoragePolicySatisfierRunningResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto;
@@ -295,6 +297,10 @@ public class ClientNamenodeProtocolTranslatorPB implements
   private final static GetErasureCodingCodecsRequestProto
       VOID_GET_EC_CODEC_REQUEST = GetErasureCodingCodecsRequestProto
       .newBuilder().build();
+  private final static IsStoragePolicySatisfierRunningRequestProto
+      VOID_IS_SPS_RUNNING_REQUEST = IsStoragePolicySatisfierRunningRequestProto
+      .newBuilder().build();
+
 
   public ClientNamenodeProtocolTranslatorPB(ClientNamenodeProtocolPB proxy) {
     rpcProxy = proxy;
@@ -1901,6 +1907,18 @@ public class ClientNamenodeProtocolTranslatorPB implements
     }
   }
 
+  @Override
+  public boolean isStoragePolicySatisfierRunning() throws IOException {
+    try {
+      IsStoragePolicySatisfierRunningResponseProto rep =
+          rpcProxy.isStoragePolicySatisfierRunning(null,
+              VOID_IS_SPS_RUNNING_REQUEST);
+      return rep.getRunning();
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
   @Override
   public QuotaUsage getQuotaUsage(String path) throws IOException {
     GetQuotaUsageRequestProto req =
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
index 37541538609..519bb01a81c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
@@ -839,6 +839,13 @@ message SatisfyStoragePolicyResponseProto {
 
 }
 
+message IsStoragePolicySatisfierRunningRequestProto {  // no parameters
+}
+
+message IsStoragePolicySatisfierRunningResponseProto {
+  required bool running = 1;
+}
+
 service ClientNamenodeProtocol {
   rpc getBlockLocations(GetBlockLocationsRequestProto)
       returns(GetBlockLocationsResponseProto);
@@ -1027,4 +1034,6 @@ service ClientNamenodeProtocol {
       returns(ListOpenFilesResponseProto);
   rpc satisfyStoragePolicy(SatisfyStoragePolicyRequestProto)
       returns(SatisfyStoragePolicyResponseProto);
+  rpc isStoragePolicySatisfierRunning(IsStoragePolicySatisfierRunningRequestProto)
+      returns(IsStoragePolicySatisfierRunningResponseProto);
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 55085eb2278..e2b931ec1b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -613,6 +613,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_MOVER_MAX_NO_MOVE_INTERVAL_KEY =
       "dfs.mover.max-no-move-interval";
   public static final int    DFS_MOVER_MAX_NO_MOVE_INTERVAL_DEFAULT = 60*1000; // One minute
 
+  public static final String DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY =
+      "dfs.storage.policy.satisfier.activate";
+  public static final boolean DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_DEFAULT =
+      true;
+
   public static final String  DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";
   public static final int     DFS_DATANODE_DEFAULT_PORT = 9866;
   public static final String  DFS_DATANODE_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_DEFAULT_PORT;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index 8ffcf40bcf5..f338d4edccd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@ -159,6 +159,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFile
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpgradeStatusRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpgradeStatusResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsStoragePolicySatisfierRunningRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsStoragePolicySatisfierRunningResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto;
@@ -1859,6 +1861,22 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
     }
   }
 
+  @Override
+  public IsStoragePolicySatisfierRunningResponseProto
+      isStoragePolicySatisfierRunning(RpcController controller,
+      IsStoragePolicySatisfierRunningRequestProto req)
+      throws ServiceException {
+    try {
+      boolean ret = server.isStoragePolicySatisfierRunning();
+      IsStoragePolicySatisfierRunningResponseProto.Builder builder =
+          IsStoragePolicySatisfierRunningResponseProto.newBuilder();
+      builder.setRunning(ret);
+      return builder.build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
   @Override
   public GetQuotaUsageResponseProto getQuotaUsage(
       RpcController controller, GetQuotaUsageRequestProto req)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java
index 6bf298640f8..5cf4204940d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java
@@ -30,7 +30,8 @@ public enum ExitStatus {
   IO_EXCEPTION(-4),
   ILLEGAL_ARGUMENTS(-5),
   INTERRUPTED(-6),
-  UNFINALIZED_UPGRADE(-7);
+  UNFINALIZED_UPGRADE(-7),
+  SKIPPED_DUE_TO_SPS(-8);
 
   private final int code;
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 53fb0fbeaef..ad444eb321e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -471,7 +471,24 @@ public class BlockManager implements BlockStatsMXBean {
         DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT)
         * 1000L);
 
-    sps = new StoragePolicySatisfier(namesystem, storageMovementNeeded, this);
+    final boolean storagePolicyEnabled =
+        conf.getBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY,
+            DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT);
+    final boolean spsEnabled =
+        conf.getBoolean(
+            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY,
+            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_DEFAULT);
+    if (storagePolicyEnabled && spsEnabled) {
+      sps = new StoragePolicySatisfier(namesystem,
+          storageMovementNeeded, this);
+    } else {
+      sps = null;
+      LOG.warn(
+          "Failed to start StoragePolicySatisfier"
+              + " since {} is set to {} and {} is set to {}.",
+          DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, storagePolicyEnabled,
+          DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, spsEnabled);
+    }
 
     blockTokenSecretManager = createBlockTokenSecretManager(conf);
     providedStorageMap = new ProvidedStorageMap(namesystem, this, conf);
@@ -696,11 +713,15 @@ public class BlockManager implements BlockStatsMXBean {
     this.blockReportThread.start();
     mxBeanName = MBeans.register("NameNode", "BlockStats", this);
     bmSafeMode.activate(blockTotal);
-    sps.start();
+    if (sps != null) {
+      sps.start();
+    }
   }
 
   public void close() {
-    sps.stop();
+    if (sps != null) {
+      sps.stop();
+    }
     bmSafeMode.close();
     try {
       redundancyThread.interrupt();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
index c3098f3cff0..e486317017e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
@@ -24,6 +24,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeLayoutVersion;
@@ -364,6 +365,8 @@ public interface HdfsServerConstants {
   String XATTR_ERASURECODING_POLICY =
       "system.hdfs.erasurecoding.policy";
 
+  Path MOVER_ID_PATH = new Path("/system/mover.id");
+
   long BLOCK_GROUP_INDEX_MASK = 15;
   byte MAX_BLOCKS_IN_GROUP = 16;
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index c5d14d25dae..ce78bde3ae9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -41,11 +41,14 @@ import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
 import org.apache.hadoop.hdfs.server.balancer.Matcher;
 import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.SecurityUtil;
@@ -70,8 +73,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class Mover {
   static final Log LOG = LogFactory.getLog(Mover.class);
 
-  static final Path MOVER_ID_PATH = new Path("/system/mover.id");
-
   private static class StorageMap {
     private final StorageGroupMap<Source> sources
         = new StorageGroupMap<Source>();
@@ -645,7 +646,7 @@ public class Mover {
       List<NameNodeConnector> connectors = Collections.emptyList();
       try {
         connectors = NameNodeConnector.newNameNodeConnectors(namenodes,
-            Mover.class.getSimpleName(), MOVER_ID_PATH, conf,
+            Mover.class.getSimpleName(), HdfsServerConstants.MOVER_ID_PATH, conf,
             NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
 
         while (connectors.size() > 0) {
@@ -655,6 +656,22 @@ public class Mover {
           NameNodeConnector nnc = iter.next();
           final Mover m = new Mover(nnc, conf, retryCount,
               excludedPinnedBlocks);
+
+          boolean spsRunning;
+          try {
+            spsRunning = nnc.getDistributedFileSystem().getClient()
+                .isStoragePolicySatisfierRunning();
+          } catch (StandbyException e) {
+            System.err.println("Skip Standby Namenode. " + nnc.toString());
+            continue;
+          }
+          if (spsRunning) {
+            System.err.println("Mover failed because StoragePolicySatisfier"
+                + " is running. Exiting with status "
+                + ExitStatus.SKIPPED_DUE_TO_SPS + "... ");
+            return ExitStatus.SKIPPED_DUE_TO_SPS.getExitCode();
+          }
+
           final ExitStatus r = m.run();
 
           if (r == ExitStatus.SUCCESS) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index b8ad1dff700..abe9849ba59 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -3897,8 +3897,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
       // TODO: Handle blocks movement results send by the coordinator datanode.
       // This has to be revisited as part of HDFS-11029.
-      blockManager.getStoragePolicySatisfier()
-          .handleBlocksStorageMovementResults(blksMovementResults);
+      if (blockManager.getStoragePolicySatisfier() != null) {
+        blockManager.getStoragePolicySatisfier()
+            .handleBlocksStorageMovementResults(blksMovementResults);
+      }
 
       //create ha status
       final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index dcad3e19d07..2dbba7facf6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -2521,4 +2521,15 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     namesystem.logAuditEvent(true, operationName, null);
     return result;
   }
+
+  @Override
+  public boolean isStoragePolicySatisfierRunning() throws IOException {
+    checkNNStartup();
+    if (nn.isStandbyState()) {
+      throw new StandbyException("Not supported by Standby Namenode.");
+    }
+    StoragePolicySatisfier sps = namesystem.getBlockManager()
+        .getStoragePolicySatisfier();
+    return sps != null && sps.isRunning();
+  }
 }
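(Another sketch, not part of the patch: the Mover change above skips Namenodes that answer with StandbyException and aborts if any active Namenode reports SPS as running. The SpsCheck helper below restates that calling pattern on its own; the class and method names are hypothetical, while the APIs are the ones the patch itself uses.)

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
    import org.apache.hadoop.ipc.StandbyException;

    final class SpsCheck {
      private SpsCheck() {}

      // True if any active Namenode reports a running StoragePolicySatisfier.
      static boolean anySpsRunning(List<NameNodeConnector> connectors)
          throws IOException {
        for (NameNodeConnector nnc : connectors) {
          try {
            if (nnc.getDistributedFileSystem().getClient()
                .isStoragePolicySatisfierRunning()) {
              return true;
            }
          } catch (StandbyException e) {
            // Standby Namenodes reject the probe; skip them, as Mover does.
          }
        }
        return false;
      }
    }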
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index 617ab2c7917..cc2ca7dbe49 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -38,6 +39,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -70,6 +72,7 @@ public class StoragePolicySatisfier implements Runnable {
   private final BlockManager blockManager;
   private final BlockStorageMovementNeeded storageMovementNeeded;
   private final BlockStorageMovementAttemptedItems storageMovementsMonitor;
+  private volatile boolean isRunning = false;
 
   public StoragePolicySatisfier(final Namesystem namesystem,
       final BlockStorageMovementNeeded storageMovementNeeded,
@@ -99,6 +102,7 @@ public class StoragePolicySatisfier implements Runnable {
    * Stop storage policy satisfier demon thread.
    */
   public void stop() {
+    isRunning = false;
     if (storagePolicySatisfierThread == null) {
       return;
     }
@@ -110,8 +114,40 @@
     this.storageMovementsMonitor.stop();
   }
 
+  /**
+   * Check whether StoragePolicySatisfier is running.
+   * @return true if running
+   */
+  public boolean isRunning() {
+    return isRunning;
+  }
+
+  // Return true if a Mover instance is running
+  private boolean checkIfMoverRunning() {
+    boolean ret = false;
+    try {
+      String moverId = HdfsServerConstants.MOVER_ID_PATH.toString();
+      INode inode = namesystem.getFSDirectory().getINode(
+          moverId, FSDirectory.DirOp.READ);
+      if (inode != null) {
+        ret = true;
+      }
+    } catch (IOException e) {
+      LOG.info("StoragePolicySatisfier is enabled as no Mover ID file found.");
+      ret = false;
+    }
+    return ret;
+  }
+
   @Override
   public void run() {
+    isRunning = !checkIfMoverRunning();
+    if (!isRunning) {
+      LOG.error("StoragePolicySatisfier thread stopped "
+          + "as Mover ID file " + HdfsServerConstants.MOVER_ID_PATH.toString()
+          + " exists");
+      return;
+    }
     while (namesystem.isRunning()) {
       try {
         Long blockCollectionID = storageMovementNeeded.get();
@@ -123,6 +159,7 @@
         // we want to check block movements.
         Thread.sleep(3000);
       } catch (Throwable t) {
+        isRunning = false;
         if (!namesystem.isRunning()) {
           LOG.info("Stopping StoragePolicySatisfier.");
           if (!(t instanceof InterruptedException)) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 8eaf2a4057e..10321418bd5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4495,6 +4495,15 @@
 </property>
 
+<property>
+  <name>dfs.storage.policy.satisfier.activate</name>
+  <value>true</value>
+  <description>
+    If true, activate StoragePolicySatisfier.
+    By default, StoragePolicySatisfier is activated.
+  </description>
+</property>
+
 <property>
   <name>dfs.pipeline.ecn</name>
   <value>false</value>
   <description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
index 1eb44e057e7..8e02d4106b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
@@ -67,6 +67,8 @@ public class TestStoragePolicySatisfyWorker {
     conf.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
         1L);
     conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
+    conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY,
+        true);
   }
 
   @Before
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
index 62c91bf9e33..c396387e63c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -78,6 +78,7 @@ import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
 import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.mover.Mover.MLocation;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.http.HttpConfig;
@@ -113,6 +114,8 @@ public class TestMover {
     conf.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
         1L);
     conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
+    conf.setBoolean(
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, false);
   }
 
   static Mover newMover(Configuration conf) throws IOException {
@@ -124,7 +127,7 @@ public class TestMover {
     }
 
     final List<NameNodeConnector> nncs = NameNodeConnector.newNameNodeConnectors(
-        nnMap, Mover.class.getSimpleName(), Mover.MOVER_ID_PATH, conf,
+        nnMap, Mover.class.getSimpleName(), HdfsServerConstants.MOVER_ID_PATH, conf,
         NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
     return new Mover(nncs.get(0), conf, new AtomicInteger(0), new HashMap<>());
   }
@@ -132,6 +135,8 @@ public class TestMover {
   @Test
   public void testScheduleSameBlock() throws IOException {
     final Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, false);
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(4).build();
     try {
@@ -454,8 +459,11 @@ public class TestMover {
    */
   @Test
   public void testMoverCli() throws Exception {
+    final Configuration clusterConf = new HdfsConfiguration();
+    clusterConf.setBoolean(
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, false);
     final MiniDFSCluster cluster = new MiniDFSCluster
-        .Builder(new HdfsConfiguration()).numDataNodes(0).build();
+        .Builder(clusterConf).numDataNodes(0).build();
     try {
       final Configuration conf = cluster.getConfiguration(0);
       try {
@@ -487,8 +495,10 @@ public class TestMover {
   @Test
   public void testMoverCliWithHAConf() throws Exception {
     final Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, false);
     final MiniDFSCluster cluster = new MiniDFSCluster
-        .Builder(new HdfsConfiguration())
+        .Builder(conf)
         .nnTopology(MiniDFSNNTopology.simpleHATopology())
         .numDataNodes(0).build();
     HATestUtil.setFailoverConfigurations(cluster, conf, "MyCluster");
@@ -509,11 +519,16 @@ public class TestMover {
 
   @Test
   public void testMoverCliWithFederation() throws Exception {
+    final Configuration clusterConf = new HdfsConfiguration();
+    clusterConf.setBoolean(
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, false);
     final MiniDFSCluster cluster = new MiniDFSCluster
-        .Builder(new HdfsConfiguration())
+        .Builder(clusterConf)
         .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(3))
         .numDataNodes(0).build();
     final Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, false);
     DFSTestUtil.setFederatedConfiguration(cluster, conf);
     try {
       Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
@@ -557,11 +572,16 @@ public class TestMover {
 
   @Test
   public void testMoverCliWithFederationHA() throws Exception {
+    final Configuration clusterConf = new HdfsConfiguration();
+    clusterConf.setBoolean(
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, false);
     final MiniDFSCluster cluster = new MiniDFSCluster
-        .Builder(new HdfsConfiguration())
+        .Builder(clusterConf)
         .nnTopology(MiniDFSNNTopology.simpleHAFederatedTopology(3))
         .numDataNodes(0).build();
     final Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, false);
     DFSTestUtil.setFederatedHAConfiguration(cluster, conf);
     try {
       Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
@@ -625,6 +645,8 @@ public class TestMover {
   public void testMoveWhenStoragePolicyNotSatisfying() throws Exception {
     // HDFS-8147
     final Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, false);
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(3)
         .storageTypes(
@@ -650,6 +672,36 @@ public class TestMover {
     }
   }
 
+  @Test(timeout = 300000)
+  public void testMoveWhenStoragePolicySatisfierIsRunning() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, true);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(3)
+        .storageTypes(
+            new StorageType[][] {{StorageType.DISK}, {StorageType.DISK},
+                {StorageType.DISK}}).build();
+    try {
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      final String file = "/testMoveWhenStoragePolicySatisfierIsRunning";
+      // write to DISK
+      final FSDataOutputStream out = dfs.create(new Path(file));
+      out.writeChars("testMoveWhenStoragePolicySatisfierIsRunning");
+      out.close();
+
+      // move to ARCHIVE
+      dfs.setStoragePolicy(new Path(file), "COLD");
+      int rc = ToolRunner.run(conf, new Mover.Cli(),
+          new String[] {"-p", file});
+      int exitcode = ExitStatus.SKIPPED_DUE_TO_SPS.getExitCode();
+      Assert.assertEquals("Exit code should be " + exitcode, exitcode, rc);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   @Test
   public void testMoverFailedRetry() throws Exception {
     // HDFS-8147
@@ -746,6 +798,8 @@
         1L);
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
         false);
+    conf.setBoolean(
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, false);
   }
 
   @Test(timeout = 300000)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
index 356ae3a8b2b..d54864929f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
@@ -96,6 +96,8 @@ public class TestStorageMover {
     DEFAULT_CONF.setLong(
         DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 2L);
     DEFAULT_CONF.setLong(DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY, 2000L);
+    DEFAULT_CONF.setBoolean(
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, false);
 
     DEFAULT_POLICIES = BlockStoragePolicySuite.createDefaultSuite();
     HOT = DEFAULT_POLICIES.getPolicy(HdfsConstants.HOT_STORAGE_POLICY_NAME);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index 499fe3c56f2..fe23f3e4586 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -31,12 +31,14 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Before;
@@ -442,6 +444,27 @@ public class TestStoragePolicySatisfier {
     }
   }
 
+  /**
+   * Tests to verify that SPS should not start when a Mover instance
+   * is running.
+   */
+  @Test(timeout = 300000)
+  public void testWhenMoverIsAlreadyRunningBeforeStoragePolicySatisfier()
+      throws IOException {
+    try {
+      // Simulate Mover by creating MOVER_ID file
+      DFSTestUtil.createFile(hdfsCluster.getFileSystem(),
+          HdfsServerConstants.MOVER_ID_PATH, 0, (short) 1, 0);
+      hdfsCluster.restartNameNode(true);
+      boolean running = hdfsCluster.getFileSystem()
+          .getClient().isStoragePolicySatisfierRunning();
+      Assert.assertFalse("SPS should not start "
+          + "when a Mover instance is running", running);
+    } finally {
+      hdfsCluster.shutdown();
+    }
+  }
+
   private void waitForAttemptedItems(long expectedBlkMovAttemptedCount,
       int timeout) throws TimeoutException, InterruptedException {
     BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();