HDFS-10885. [SPS]: Mover tool should not be allowed to run when Storage Policy Satisfier is on. Contributed by Wei Zhou
This commit is contained in:
parent
b07291e176
commit
cd5262aba0
|
@ -3109,6 +3109,10 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|||
}
|
||||
}
|
||||
|
||||
public boolean isStoragePolicySatisfierRunning() throws IOException {
|
||||
return namenode.isStoragePolicySatisfierRunning();
|
||||
}
|
||||
|
||||
Tracer getTracer() {
|
||||
return tracer;
|
||||
}
|
||||
|
|
|
@ -1756,4 +1756,12 @@ public interface ClientProtocol {
|
|||
*/
|
||||
@Idempotent
|
||||
void satisfyStoragePolicy(String path) throws IOException;
|
||||
|
||||
/**
|
||||
* Check if StoragePolicySatisfier is running.
|
||||
* @return true if StoragePolicySatisfier is running
|
||||
* @throws IOException
|
||||
*/
|
||||
@Idempotent
|
||||
boolean isStoragePolicySatisfierRunning() throws IOException;
|
||||
}
|
||||
|
|
|
@ -147,6 +147,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSto
|
|||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsStoragePolicySatisfierRunningRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsStoragePolicySatisfierRunningResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto;
|
||||
|
@ -295,6 +297,10 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
|||
private final static GetErasureCodingCodecsRequestProto
|
||||
VOID_GET_EC_CODEC_REQUEST = GetErasureCodingCodecsRequestProto
|
||||
.newBuilder().build();
|
||||
private final static IsStoragePolicySatisfierRunningRequestProto
|
||||
VOID_IS_SPS_RUNNING_REQUEST = IsStoragePolicySatisfierRunningRequestProto
|
||||
.newBuilder().build();
|
||||
|
||||
|
||||
public ClientNamenodeProtocolTranslatorPB(ClientNamenodeProtocolPB proxy) {
|
||||
rpcProxy = proxy;
|
||||
|
@ -1901,6 +1907,18 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isStoragePolicySatisfierRunning() throws IOException {
|
||||
try {
|
||||
IsStoragePolicySatisfierRunningResponseProto rep =
|
||||
rpcProxy.isStoragePolicySatisfierRunning(null,
|
||||
VOID_IS_SPS_RUNNING_REQUEST);
|
||||
return rep.getRunning();
|
||||
} catch (ServiceException e) {
|
||||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public QuotaUsage getQuotaUsage(String path) throws IOException {
|
||||
GetQuotaUsageRequestProto req =
|
||||
|
|
|
@ -839,6 +839,13 @@ message SatisfyStoragePolicyResponseProto {
|
|||
|
||||
}
|
||||
|
||||
message IsStoragePolicySatisfierRunningRequestProto { // no parameters
|
||||
}
|
||||
|
||||
message IsStoragePolicySatisfierRunningResponseProto {
|
||||
required bool running = 1;
|
||||
}
|
||||
|
||||
service ClientNamenodeProtocol {
|
||||
rpc getBlockLocations(GetBlockLocationsRequestProto)
|
||||
returns(GetBlockLocationsResponseProto);
|
||||
|
@ -1027,4 +1034,6 @@ service ClientNamenodeProtocol {
|
|||
returns(ListOpenFilesResponseProto);
|
||||
rpc satisfyStoragePolicy(SatisfyStoragePolicyRequestProto)
|
||||
returns(SatisfyStoragePolicyResponseProto);
|
||||
rpc isStoragePolicySatisfierRunning(IsStoragePolicySatisfierRunningRequestProto)
|
||||
returns(IsStoragePolicySatisfierRunningResponseProto);
|
||||
}
|
||||
|
|
|
@ -613,6 +613,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
public static final String DFS_MOVER_MAX_NO_MOVE_INTERVAL_KEY = "dfs.mover.max-no-move-interval";
|
||||
public static final int DFS_MOVER_MAX_NO_MOVE_INTERVAL_DEFAULT = 60*1000; // One minute
|
||||
|
||||
public static final String DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY =
|
||||
"dfs.storage.policy.satisfier.activate";
|
||||
public static final boolean DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_DEFAULT =
|
||||
true;
|
||||
|
||||
public static final String DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";
|
||||
public static final int DFS_DATANODE_DEFAULT_PORT = 9866;
|
||||
public static final String DFS_DATANODE_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_DEFAULT_PORT;
|
||||
|
|
|
@ -159,6 +159,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFile
|
|||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpgradeStatusRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpgradeStatusResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsStoragePolicySatisfierRunningRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsStoragePolicySatisfierRunningResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto;
|
||||
|
@ -1859,6 +1861,22 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IsStoragePolicySatisfierRunningResponseProto
|
||||
isStoragePolicySatisfierRunning(RpcController controller,
|
||||
IsStoragePolicySatisfierRunningRequestProto req)
|
||||
throws ServiceException {
|
||||
try {
|
||||
boolean ret = server.isStoragePolicySatisfierRunning();
|
||||
IsStoragePolicySatisfierRunningResponseProto.Builder builder =
|
||||
IsStoragePolicySatisfierRunningResponseProto.newBuilder();
|
||||
builder.setRunning(ret);
|
||||
return builder.build();
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetQuotaUsageResponseProto getQuotaUsage(
|
||||
RpcController controller, GetQuotaUsageRequestProto req)
|
||||
|
|
|
@ -30,7 +30,8 @@ public enum ExitStatus {
|
|||
IO_EXCEPTION(-4),
|
||||
ILLEGAL_ARGUMENTS(-5),
|
||||
INTERRUPTED(-6),
|
||||
UNFINALIZED_UPGRADE(-7);
|
||||
UNFINALIZED_UPGRADE(-7),
|
||||
SKIPPED_DUE_TO_SPS(-8);
|
||||
|
||||
private final int code;
|
||||
|
||||
|
|
|
@ -471,7 +471,24 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT)
|
||||
* 1000L);
|
||||
|
||||
sps = new StoragePolicySatisfier(namesystem, storageMovementNeeded, this);
|
||||
final boolean storagePolicyEnabled =
|
||||
conf.getBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY,
|
||||
DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT);
|
||||
final boolean spsEnabled =
|
||||
conf.getBoolean(
|
||||
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY,
|
||||
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_DEFAULT);
|
||||
if (storagePolicyEnabled && spsEnabled) {
|
||||
sps = new StoragePolicySatisfier(namesystem,
|
||||
storageMovementNeeded, this);
|
||||
} else {
|
||||
sps = null;
|
||||
LOG.warn(
|
||||
"Failed to start StoragePolicySatisfier"
|
||||
+ " since {} set to {} and {} set to {}.",
|
||||
DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, storagePolicyEnabled,
|
||||
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, spsEnabled);
|
||||
}
|
||||
blockTokenSecretManager = createBlockTokenSecretManager(conf);
|
||||
|
||||
providedStorageMap = new ProvidedStorageMap(namesystem, this, conf);
|
||||
|
@ -696,11 +713,15 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
this.blockReportThread.start();
|
||||
mxBeanName = MBeans.register("NameNode", "BlockStats", this);
|
||||
bmSafeMode.activate(blockTotal);
|
||||
sps.start();
|
||||
if (sps != null) {
|
||||
sps.start();
|
||||
}
|
||||
}
|
||||
|
||||
public void close() {
|
||||
sps.stop();
|
||||
if (sps != null) {
|
||||
sps.stop();
|
||||
}
|
||||
bmSafeMode.close();
|
||||
try {
|
||||
redundancyThread.interrupt();
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.util.regex.Matcher;
|
|||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeLayoutVersion;
|
||||
|
@ -364,6 +365,8 @@ public interface HdfsServerConstants {
|
|||
String XATTR_ERASURECODING_POLICY =
|
||||
"system.hdfs.erasurecoding.policy";
|
||||
|
||||
Path MOVER_ID_PATH = new Path("/system/mover.id");
|
||||
|
||||
long BLOCK_GROUP_INDEX_MASK = 15;
|
||||
byte MAX_BLOCKS_IN_GROUP = 16;
|
||||
}
|
||||
|
|
|
@ -41,11 +41,14 @@ import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
|
|||
import org.apache.hadoop.hdfs.server.balancer.Matcher;
|
||||
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INode;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||
import org.apache.hadoop.ipc.StandbyException;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.net.NetworkTopology;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
|
@ -70,8 +73,6 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
public class Mover {
|
||||
static final Log LOG = LogFactory.getLog(Mover.class);
|
||||
|
||||
static final Path MOVER_ID_PATH = new Path("/system/mover.id");
|
||||
|
||||
private static class StorageMap {
|
||||
private final StorageGroupMap<Source> sources
|
||||
= new StorageGroupMap<Source>();
|
||||
|
@ -645,7 +646,7 @@ public class Mover {
|
|||
List<NameNodeConnector> connectors = Collections.emptyList();
|
||||
try {
|
||||
connectors = NameNodeConnector.newNameNodeConnectors(namenodes,
|
||||
Mover.class.getSimpleName(), MOVER_ID_PATH, conf,
|
||||
Mover.class.getSimpleName(), HdfsServerConstants.MOVER_ID_PATH, conf,
|
||||
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
|
||||
|
||||
while (connectors.size() > 0) {
|
||||
|
@ -655,6 +656,22 @@ public class Mover {
|
|||
NameNodeConnector nnc = iter.next();
|
||||
final Mover m = new Mover(nnc, conf, retryCount,
|
||||
excludedPinnedBlocks);
|
||||
|
||||
boolean spsRunning;
|
||||
try {
|
||||
spsRunning = nnc.getDistributedFileSystem().getClient()
|
||||
.isStoragePolicySatisfierRunning();
|
||||
} catch (StandbyException e) {
|
||||
System.err.println("Skip Standby Namenode. " + nnc.toString());
|
||||
continue;
|
||||
}
|
||||
if (spsRunning) {
|
||||
System.err.println("Mover failed due to StoragePolicySatisfier"
|
||||
+ " is running. Exiting with status "
|
||||
+ ExitStatus.SKIPPED_DUE_TO_SPS + "... ");
|
||||
return ExitStatus.SKIPPED_DUE_TO_SPS.getExitCode();
|
||||
}
|
||||
|
||||
final ExitStatus r = m.run();
|
||||
|
||||
if (r == ExitStatus.SUCCESS) {
|
||||
|
|
|
@ -3897,8 +3897,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
|
||||
// TODO: Handle blocks movement results send by the coordinator datanode.
|
||||
// This has to be revisited as part of HDFS-11029.
|
||||
blockManager.getStoragePolicySatisfier()
|
||||
.handleBlocksStorageMovementResults(blksMovementResults);
|
||||
if (blockManager.getStoragePolicySatisfier() != null) {
|
||||
blockManager.getStoragePolicySatisfier()
|
||||
.handleBlocksStorageMovementResults(blksMovementResults);
|
||||
}
|
||||
|
||||
//create ha status
|
||||
final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
|
||||
|
|
|
@ -2521,4 +2521,15 @@ public class NameNodeRpcServer implements NamenodeProtocols {
|
|||
namesystem.logAuditEvent(true, operationName, null);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isStoragePolicySatisfierRunning() throws IOException {
|
||||
checkNNStartup();
|
||||
if (nn.isStandbyState()) {
|
||||
throw new StandbyException("Not supported by Standby Namenode.");
|
||||
}
|
||||
StoragePolicySatisfier sps = namesystem.getBlockManager()
|
||||
.getStoragePolicySatisfier();
|
||||
return sps != null && sps.isRunning();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
@ -38,6 +39,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
|||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
|
@ -70,6 +72,7 @@ public class StoragePolicySatisfier implements Runnable {
|
|||
private final BlockManager blockManager;
|
||||
private final BlockStorageMovementNeeded storageMovementNeeded;
|
||||
private final BlockStorageMovementAttemptedItems storageMovementsMonitor;
|
||||
private volatile boolean isRunning = false;
|
||||
|
||||
public StoragePolicySatisfier(final Namesystem namesystem,
|
||||
final BlockStorageMovementNeeded storageMovementNeeded,
|
||||
|
@ -99,6 +102,7 @@ public class StoragePolicySatisfier implements Runnable {
|
|||
* Stop storage policy satisfier demon thread.
|
||||
*/
|
||||
public void stop() {
|
||||
isRunning = false;
|
||||
if (storagePolicySatisfierThread == null) {
|
||||
return;
|
||||
}
|
||||
|
@ -110,8 +114,40 @@ public class StoragePolicySatisfier implements Runnable {
|
|||
this.storageMovementsMonitor.stop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether StoragePolicySatisfier is running.
|
||||
* @return true if running
|
||||
*/
|
||||
public boolean isRunning() {
|
||||
return isRunning;
|
||||
}
|
||||
|
||||
// Return true if a Mover instance is running
|
||||
private boolean checkIfMoverRunning() {
|
||||
boolean ret = false;
|
||||
try {
|
||||
String moverId = HdfsServerConstants.MOVER_ID_PATH.toString();
|
||||
INode inode = namesystem.getFSDirectory().getINode(
|
||||
moverId, FSDirectory.DirOp.READ);
|
||||
if (inode != null) {
|
||||
ret = true;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.info("StoragePolicySatisfier is enabled as no Mover ID file found.");
|
||||
ret = false;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
isRunning = !checkIfMoverRunning();
|
||||
if (!isRunning) {
|
||||
LOG.error("StoragePolicySatisfier thread stopped "
|
||||
+ "as Mover ID file " + HdfsServerConstants.MOVER_ID_PATH.toString()
|
||||
+ " exists");
|
||||
return;
|
||||
}
|
||||
while (namesystem.isRunning()) {
|
||||
try {
|
||||
Long blockCollectionID = storageMovementNeeded.get();
|
||||
|
@ -123,6 +159,7 @@ public class StoragePolicySatisfier implements Runnable {
|
|||
// we want to check block movements.
|
||||
Thread.sleep(3000);
|
||||
} catch (Throwable t) {
|
||||
isRunning = false;
|
||||
if (!namesystem.isRunning()) {
|
||||
LOG.info("Stopping StoragePolicySatisfier.");
|
||||
if (!(t instanceof InterruptedException)) {
|
||||
|
|
|
@ -4495,6 +4495,15 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.storage.policy.satisfier.activate</name>
|
||||
<value>true</value>
|
||||
<description>
|
||||
If true, activate StoragePolicySatisfier.
|
||||
By default, StoragePolicySatisfier is activated.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.pipeline.ecn</name>
|
||||
<value>false</value>
|
||||
|
|
|
@ -67,6 +67,8 @@ public class TestStoragePolicySatisfyWorker {
|
|||
conf.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
|
||||
1L);
|
||||
conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY,
|
||||
true);
|
||||
}
|
||||
|
||||
@Before
|
||||
|
|
|
@ -78,6 +78,7 @@ import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
|
|||
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.mover.Mover.MLocation;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
||||
import org.apache.hadoop.http.HttpConfig;
|
||||
|
@ -113,6 +114,8 @@ public class TestMover {
|
|||
conf.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
|
||||
1L);
|
||||
conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
|
||||
conf.setBoolean(
|
||||
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, false);
|
||||
}
|
||||
|
||||
static Mover newMover(Configuration conf) throws IOException {
|
||||
|
@ -124,7 +127,7 @@ public class TestMover {
|
|||
}
|
||||
|
||||
final List<NameNodeConnector> nncs = NameNodeConnector.newNameNodeConnectors(
|
||||
nnMap, Mover.class.getSimpleName(), Mover.MOVER_ID_PATH, conf,
|
||||
nnMap, Mover.class.getSimpleName(), HdfsServerConstants.MOVER_ID_PATH, conf,
|
||||
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
|
||||
return new Mover(nncs.get(0), conf, new AtomicInteger(0), new HashMap<>());
|
||||
}
|
||||
|
@ -132,6 +135,8 @@ public class TestMover {
|
|||
@Test
|
||||
public void testScheduleSameBlock() throws IOException {
|
||||
final Configuration conf = new HdfsConfiguration();
|
||||
conf.setBoolean(
|
||||
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, false);
|
||||
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(4).build();
|
||||
try {
|
||||
|
@ -454,8 +459,11 @@ public class TestMover {
|
|||
*/
|
||||
@Test
|
||||
public void testMoverCli() throws Exception {
|
||||
final Configuration clusterConf = new HdfsConfiguration();
|
||||
clusterConf.setBoolean(
|
||||
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, false);
|
||||
final MiniDFSCluster cluster = new MiniDFSCluster
|
||||
.Builder(new HdfsConfiguration()).numDataNodes(0).build();
|
||||
.Builder(clusterConf).numDataNodes(0).build();
|
||||
try {
|
||||
final Configuration conf = cluster.getConfiguration(0);
|
||||
try {
|
||||
|
@ -487,8 +495,10 @@ public class TestMover {
|
|||
@Test
|
||||
public void testMoverCliWithHAConf() throws Exception {
|
||||
final Configuration conf = new HdfsConfiguration();
|
||||
conf.setBoolean(
|
||||
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, false);
|
||||
final MiniDFSCluster cluster = new MiniDFSCluster
|
||||
.Builder(new HdfsConfiguration())
|
||||
.Builder(conf)
|
||||
.nnTopology(MiniDFSNNTopology.simpleHATopology())
|
||||
.numDataNodes(0).build();
|
||||
HATestUtil.setFailoverConfigurations(cluster, conf, "MyCluster");
|
||||
|
@ -509,11 +519,16 @@ public class TestMover {
|
|||
|
||||
@Test
|
||||
public void testMoverCliWithFederation() throws Exception {
|
||||
final Configuration clusterConf = new HdfsConfiguration();
|
||||
clusterConf.setBoolean(
|
||||
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, false);
|
||||
final MiniDFSCluster cluster = new MiniDFSCluster
|
||||
.Builder(new HdfsConfiguration())
|
||||
.Builder(clusterConf)
|
||||
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(3))
|
||||
.numDataNodes(0).build();
|
||||
final Configuration conf = new HdfsConfiguration();
|
||||
conf.setBoolean(
|
||||
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, false);
|
||||
DFSTestUtil.setFederatedConfiguration(cluster, conf);
|
||||
try {
|
||||
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
||||
|
@ -557,11 +572,16 @@ public class TestMover {
|
|||
|
||||
@Test
|
||||
public void testMoverCliWithFederationHA() throws Exception {
|
||||
final Configuration clusterConf = new HdfsConfiguration();
|
||||
clusterConf.setBoolean(
|
||||
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, false);
|
||||
final MiniDFSCluster cluster = new MiniDFSCluster
|
||||
.Builder(new HdfsConfiguration())
|
||||
.Builder(clusterConf)
|
||||
.nnTopology(MiniDFSNNTopology.simpleHAFederatedTopology(3))
|
||||
.numDataNodes(0).build();
|
||||
final Configuration conf = new HdfsConfiguration();
|
||||
conf.setBoolean(
|
||||
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, false);
|
||||
DFSTestUtil.setFederatedHAConfiguration(cluster, conf);
|
||||
try {
|
||||
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
||||
|
@ -625,6 +645,8 @@ public class TestMover {
|
|||
public void testMoveWhenStoragePolicyNotSatisfying() throws Exception {
|
||||
// HDFS-8147
|
||||
final Configuration conf = new HdfsConfiguration();
|
||||
conf.setBoolean(
|
||||
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, false);
|
||||
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(3)
|
||||
.storageTypes(
|
||||
|
@ -650,6 +672,36 @@ public class TestMover {
|
|||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300000)
|
||||
public void testMoveWhenStoragePolicySatisfierIsRunning() throws Exception {
|
||||
final Configuration conf = new HdfsConfiguration();
|
||||
conf.setBoolean(
|
||||
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, true);
|
||||
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(3)
|
||||
.storageTypes(
|
||||
new StorageType[][] {{StorageType.DISK}, {StorageType.DISK},
|
||||
{StorageType.DISK}}).build();
|
||||
try {
|
||||
cluster.waitActive();
|
||||
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||
final String file = "/testMoveWhenStoragePolicySatisfierIsRunning";
|
||||
// write to DISK
|
||||
final FSDataOutputStream out = dfs.create(new Path(file));
|
||||
out.writeChars("testMoveWhenStoragePolicySatisfierIsRunning");
|
||||
out.close();
|
||||
|
||||
// move to ARCHIVE
|
||||
dfs.setStoragePolicy(new Path(file), "COLD");
|
||||
int rc = ToolRunner.run(conf, new Mover.Cli(),
|
||||
new String[] {"-p", file.toString()});
|
||||
int exitcode = ExitStatus.SKIPPED_DUE_TO_SPS.getExitCode();
|
||||
Assert.assertEquals("Exit code should be " + exitcode, exitcode, rc);
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMoverFailedRetry() throws Exception {
|
||||
// HDFS-8147
|
||||
|
@ -746,6 +798,8 @@ public class TestMover {
|
|||
1L);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
|
||||
false);
|
||||
conf.setBoolean(
|
||||
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, false);
|
||||
}
|
||||
|
||||
@Test(timeout = 300000)
|
||||
|
|
|
@ -96,6 +96,8 @@ public class TestStorageMover {
|
|||
DEFAULT_CONF.setLong(
|
||||
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 2L);
|
||||
DEFAULT_CONF.setLong(DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY, 2000L);
|
||||
DEFAULT_CONF.setBoolean(
|
||||
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, false);
|
||||
|
||||
DEFAULT_POLICIES = BlockStoragePolicySuite.createDefaultSuite();
|
||||
HOT = DEFAULT_POLICIES.getPolicy(HdfsConstants.HOT_STORAGE_POLICY_NAME);
|
||||
|
|
|
@ -31,12 +31,14 @@ import org.apache.hadoop.fs.FileSystem;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.client.HdfsAdmin;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
@ -442,6 +444,27 @@ public class TestStoragePolicySatisfier {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests to verify that SPS should not start when a Mover instance
|
||||
* is running.
|
||||
*/
|
||||
@Test(timeout = 300000)
|
||||
public void testWhenMoverIsAlreadyRunningBeforeStoragePolicySatisfier()
|
||||
throws IOException {
|
||||
try {
|
||||
// Simulate Mover by creating MOVER_ID file
|
||||
DFSTestUtil.createFile(hdfsCluster.getFileSystem(),
|
||||
HdfsServerConstants.MOVER_ID_PATH, 0, (short) 1, 0);
|
||||
hdfsCluster.restartNameNode(true);
|
||||
boolean running = hdfsCluster.getFileSystem()
|
||||
.getClient().isStoragePolicySatisfierRunning();
|
||||
Assert.assertFalse("SPS should not start "
|
||||
+ "when a Mover instance is running", running);
|
||||
} finally {
|
||||
hdfsCluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
private void waitForAttemptedItems(long expectedBlkMovAttemptedCount,
|
||||
int timeout) throws TimeoutException, InterruptedException {
|
||||
BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
|
||||
|
|
Loading…
Reference in New Issue