HDFS-13174. hdfs mover -p /path times out after 20 min. Contributed by Istvan Fajth.
(cherry picked from commitc966a3837a
) (cherry picked from commit975d4b3d60
)
This commit is contained in:
parent
fe1926a50f
commit
73badce597
|
@ -533,7 +533,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
public static final String DFS_BALANCER_BLOCK_MOVE_TIMEOUT = "dfs.balancer.block-move.timeout";
|
||||
public static final int DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT = 0;
|
||||
public static final String DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY = "dfs.balancer.max-no-move-interval";
|
||||
public static final int DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT = 60*1000; // One minute
|
||||
public static final int DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT = 60*1000; // One minute
|
||||
public static final String DFS_BALANCER_MAX_ITERATION_TIME_KEY = "dfs.balancer.max-iteration-time";
|
||||
public static final long DFS_BALANCER_MAX_ITERATION_TIME_DEFAULT = 20 * 60 * 1000L; // 20 mins
|
||||
|
||||
|
||||
public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";
|
||||
|
|
|
@ -289,13 +289,17 @@ public class Balancer {
|
|||
final int maxNoMoveInterval = conf.getInt(
|
||||
DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY,
|
||||
DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT);
|
||||
final long maxIterationTime = conf.getLong(
|
||||
DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY,
|
||||
DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_DEFAULT);
|
||||
|
||||
this.nnc = theblockpool;
|
||||
this.dispatcher =
|
||||
new Dispatcher(theblockpool, p.getIncludedNodes(),
|
||||
p.getExcludedNodes(), movedWinWidth, moverThreads,
|
||||
dispatcherThreads, maxConcurrentMovesPerNode, getBlocksSize,
|
||||
getBlocksMinBlockSize, blockMoveTimeout, maxNoMoveInterval, conf);
|
||||
getBlocksMinBlockSize, blockMoveTimeout, maxNoMoveInterval,
|
||||
maxIterationTime, conf);
|
||||
this.threshold = p.getThreshold();
|
||||
this.policy = p.getBalancingPolicy();
|
||||
this.sourceNodes = p.getSourceNodes();
|
||||
|
|
|
@ -138,6 +138,8 @@ public class Dispatcher {
|
|||
private final boolean connectToDnViaHostname;
|
||||
private BlockPlacementPolicies placementPolicies;
|
||||
|
||||
private long maxIterationTime;
|
||||
|
||||
static class Allocator {
|
||||
private final int max;
|
||||
private int count = 0;
|
||||
|
@ -346,13 +348,19 @@ public class Dispatcher {
|
|||
|
||||
/** Dispatch the move to the proxy source & wait for the response. */
|
||||
private void dispatch() {
|
||||
LOG.info("Start moving " + this);
|
||||
assert !(reportedBlock instanceof DBlockStriped);
|
||||
|
||||
Socket sock = new Socket();
|
||||
DataOutputStream out = null;
|
||||
DataInputStream in = null;
|
||||
try {
|
||||
if (source.isIterationOver()){
|
||||
LOG.info("Cancel moving " + this +
|
||||
" as iteration is already cancelled due to" +
|
||||
" dfs.balancer.max-iteration-time is passed.");
|
||||
throw new IOException("Block move cancelled.");
|
||||
}
|
||||
LOG.info("Start moving " + this);
|
||||
assert !(reportedBlock instanceof DBlockStriped);
|
||||
|
||||
sock.connect(
|
||||
NetUtils.createSocketAddr(target.getDatanodeInfo().
|
||||
getXferAddr(Dispatcher.this.connectToDnViaHostname)),
|
||||
|
@ -760,7 +768,10 @@ public class Dispatcher {
|
|||
* Check if the iteration is over
|
||||
*/
|
||||
public boolean isIterationOver() {
|
||||
return (Time.monotonicNow()-startTime > MAX_ITERATION_TIME);
|
||||
if (maxIterationTime < 0){
|
||||
return false;
|
||||
}
|
||||
return (Time.monotonicNow()-startTime > maxIterationTime);
|
||||
}
|
||||
|
||||
/** Add a task */
|
||||
|
@ -908,8 +919,6 @@ public class Dispatcher {
|
|||
return blocksToReceive > 0;
|
||||
}
|
||||
|
||||
private static final long MAX_ITERATION_TIME = 20 * 60 * 1000L; // 20 mins
|
||||
|
||||
/**
|
||||
* This method iteratively does the following: it first selects a block to
|
||||
* move, then sends a request to the proxy source to start the block move
|
||||
|
@ -990,7 +999,7 @@ public class Dispatcher {
|
|||
}
|
||||
|
||||
if (isIterationOver()) {
|
||||
LOG.info("The maximum iteration time (" + MAX_ITERATION_TIME/1000
|
||||
LOG.info("The maximum iteration time (" + maxIterationTime/1000
|
||||
+ " seconds) has been reached. Stopping " + this);
|
||||
}
|
||||
}
|
||||
|
@ -1013,14 +1022,14 @@ public class Dispatcher {
|
|||
int maxNoMoveInterval, Configuration conf) {
|
||||
this(nnc, includedNodes, excludedNodes, movedWinWidth,
|
||||
moverThreads, dispatcherThreads, maxConcurrentMovesPerNode,
|
||||
0L, 0L, 0, maxNoMoveInterval, conf);
|
||||
0L, 0L, 0, maxNoMoveInterval, -1, conf);
|
||||
}
|
||||
|
||||
Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
|
||||
Set<String> excludedNodes, long movedWinWidth, int moverThreads,
|
||||
int dispatcherThreads, int maxConcurrentMovesPerNode,
|
||||
long getBlocksSize, long getBlocksMinBlockSize,
|
||||
int blockMoveTimeout, int maxNoMoveInterval, Configuration conf) {
|
||||
long getBlocksSize, long getBlocksMinBlockSize, int blockMoveTimeout,
|
||||
int maxNoMoveInterval, long maxIterationTime, Configuration conf) {
|
||||
this.nnc = nnc;
|
||||
this.excludedNodes = excludedNodes;
|
||||
this.includedNodes = includedNodes;
|
||||
|
@ -1047,6 +1056,7 @@ public class Dispatcher {
|
|||
HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME,
|
||||
HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
|
||||
placementPolicies = new BlockPlacementPolicies(conf, null, cluster, null);
|
||||
this.maxIterationTime = maxIterationTime;
|
||||
}
|
||||
|
||||
public DistributedFileSystem getDistributedFileSystem() {
|
||||
|
|
|
@ -3537,6 +3537,16 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.balancer.max-iteration-time</name>
|
||||
<value>1200000</value>
|
||||
<description>
|
||||
Maximum amount of time while an iteration can be run by the Balancer. After
|
||||
this time the Balancer will stop the iteration, and reevaluate the work
|
||||
needs to be done to Balance the cluster. The default value is 20 minutes.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.block.invalidate.limit</name>
|
||||
<value>1000</value>
|
||||
|
|
|
@ -1580,6 +1580,85 @@ public class TestBalancer {
|
|||
CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, true);
|
||||
}
|
||||
|
||||
|
||||
@Test(timeout = 100000)
|
||||
public void testMaxIterationTime() throws Exception {
|
||||
final Configuration conf = new HdfsConfiguration();
|
||||
initConf(conf);
|
||||
int blockSize = 10*1024*1024; // 10MB block size
|
||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
|
||||
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
|
||||
// limit the worker thread count of Balancer to have only 1 queue per DN
|
||||
conf.setInt(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY, 1);
|
||||
// limit the bandwitdh to 1 packet per sec to emulate slow block moves
|
||||
conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
|
||||
64 * 1024);
|
||||
// set client socket timeout to have an IN_PROGRESS notification back from
|
||||
// the DataNode about the copy in every second.
|
||||
conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 2000L);
|
||||
// set max iteration time to 2 seconds to timeout before moving any block
|
||||
conf.setLong(DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY, 2000L);
|
||||
// setup the cluster
|
||||
final long capacity = 10L * blockSize;
|
||||
final long[] dnCapacities = new long[] {capacity, capacity};
|
||||
final short rep = 1;
|
||||
final long seed = 0xFAFAFA;
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(0)
|
||||
.build();
|
||||
try {
|
||||
cluster.getConfiguration(0).setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
|
||||
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
|
||||
cluster.startDataNodes(conf, 1, true, null, null, dnCapacities);
|
||||
cluster.waitClusterUp();
|
||||
cluster.waitActive();
|
||||
final Path path = new Path("/testMaxIterationTime.dat");
|
||||
DistributedFileSystem fs = cluster.getFileSystem();
|
||||
// fill the DN to 40%
|
||||
DFSTestUtil.createFile(fs, path, 4L * blockSize, rep, seed);
|
||||
// start a new DN
|
||||
cluster.startDataNodes(conf, 1, true, null, null, dnCapacities);
|
||||
cluster.triggerHeartbeats();
|
||||
// setup Balancer and run one iteration
|
||||
List<NameNodeConnector> connectors = Collections.emptyList();
|
||||
try {
|
||||
BalancerParameters bParams = BalancerParameters.DEFAULT;
|
||||
connectors = NameNodeConnector.newNameNodeConnectors(
|
||||
DFSUtil.getInternalNsRpcUris(conf), Balancer.class.getSimpleName(),
|
||||
Balancer.BALANCER_ID_PATH, conf, bParams.getMaxIdleIteration());
|
||||
for (NameNodeConnector nnc : connectors) {
|
||||
LOG.info("NNC to work on: " + nnc);
|
||||
Balancer b = new Balancer(nnc, bParams, conf);
|
||||
long startTime = Time.monotonicNow();
|
||||
Result r = b.runOneIteration();
|
||||
long runtime = Time.monotonicNow() - startTime;
|
||||
assertEquals("We expect ExitStatus.IN_PROGRESS to be reported.",
|
||||
ExitStatus.IN_PROGRESS, r.exitStatus);
|
||||
// accept runtime if it is under 3.5 seconds, as we need to wait for
|
||||
// IN_PROGRESS report from DN, and some spare to be able to finish.
|
||||
// NOTE: This can be a source of flaky tests, if the box is busy,
|
||||
// assertion here is based on the following: Balancer is already set
|
||||
// up, iteration gets the blocks from the NN, and makes the decision
|
||||
// to move 2 blocks. After that the PendingMoves are scheduled, and
|
||||
// DataNode heartbeats in for the Balancer every second, iteration is
|
||||
// two seconds long. This means that it will fail if the setup and the
|
||||
// heartbeat from the DataNode takes more than 500ms, as the iteration
|
||||
// should end at the 3rd second from start. As the number of
|
||||
// operations seems to be pretty low, and all comm happens locally, I
|
||||
// think the possibility of a failure due to node busyness is low.
|
||||
assertTrue("Unexpected iteration runtime: " + runtime + "ms > 3.5s",
|
||||
runtime < 3500);
|
||||
}
|
||||
} finally {
|
||||
for (NameNodeConnector nnc : connectors) {
|
||||
IOUtils.cleanupWithLogger(null, nnc);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
cluster.shutdown(true, true);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Test Balancer with Ram_Disk configured
|
||||
* One DN has two files on RAM_DISK, other DN has no files on RAM_DISK.
|
||||
|
|
|
@ -476,6 +476,52 @@ public class TestMover {
|
|||
}
|
||||
}
|
||||
|
||||
@Test(timeout=100000)
|
||||
public void testBalancerMaxIterationTimeNotAffectMover() throws Exception {
|
||||
long blockSize = 10*1024*1024;
|
||||
final Configuration conf = new HdfsConfiguration();
|
||||
initConf(conf);
|
||||
conf.setInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, 1);
|
||||
conf.setInt(
|
||||
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 1);
|
||||
// set a fairly large block size to run into the limitation
|
||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
|
||||
conf.setLong(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
|
||||
// set a somewhat grater than zero max iteration time to have the move time
|
||||
// to surely exceed it
|
||||
conf.setLong(DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY, 200L);
|
||||
conf.setInt(DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY, 1);
|
||||
// set client socket timeout to have an IN_PROGRESS notification back from
|
||||
// the DataNode about the copy in every second.
|
||||
conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 1000L);
|
||||
|
||||
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(2)
|
||||
.storageTypes(
|
||||
new StorageType[][] {{StorageType.DISK, StorageType.DISK},
|
||||
{StorageType.ARCHIVE, StorageType.ARCHIVE}})
|
||||
.build();
|
||||
try {
|
||||
cluster.waitActive();
|
||||
final DistributedFileSystem fs = cluster.getFileSystem();
|
||||
final String file = "/testMaxIterationTime.dat";
|
||||
final Path path = new Path(file);
|
||||
short rep_factor = 1;
|
||||
int seed = 0xFAFAFA;
|
||||
// write to DISK
|
||||
DFSTestUtil.createFile(fs, path, 4L * blockSize, rep_factor, seed);
|
||||
|
||||
// move to ARCHIVE
|
||||
fs.setStoragePolicy(new Path(file), "COLD");
|
||||
int rc = ToolRunner.run(conf, new Mover.Cli(),
|
||||
new String[] {"-p", file});
|
||||
Assert.assertEquals("Retcode expected to be ExitStatus.SUCCESS (0).",
|
||||
ExitStatus.SUCCESS.getExitCode(), rc);
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
private final ErasureCodingPolicy ecPolicy =
|
||||
StripedFileTestUtil.getDefaultECPolicy();
|
||||
private final int dataBlocks = ecPolicy.getNumDataUnits();
|
||||
|
|
Loading…
Reference in New Issue