HDFS-8143. Mover should exit after some retry when failed to move blocks. Contributed by surendra singh lilhore

This commit is contained in:
Tsz-Wo Nicholas Sze 2015-05-13 11:57:49 -07:00
parent ef2488e7f0
commit 2113e0a3f2
3 changed files with 65 additions and 6 deletions

View File

@ -427,6 +427,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_MOVER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;
public static final String DFS_MOVER_MOVERTHREADS_KEY = "dfs.mover.moverThreads";
public static final int DFS_MOVER_MOVERTHREADS_DEFAULT = 1000;
public static final String DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY = "dfs.mover.retry.max.attempts";
public static final int DFS_MOVER_RETRY_MAX_ATTEMPTS_DEFAULT = 10;
public static final String DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";
public static final int DFS_DATANODE_DEFAULT_PORT = 50010;

View File

@ -59,6 +59,7 @@ import java.io.InputStreamReader;
import java.net.URI;
import java.text.DateFormat;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
@InterfaceAudience.Private
public class Mover {
@ -108,10 +109,12 @@ public class Mover {
private final Dispatcher dispatcher;
private final StorageMap storages;
private final List<Path> targetPaths;
private final int retryMaxAttempts;
private final AtomicInteger retryCount;
private final BlockStoragePolicy[] blockStoragePolicies;
Mover(NameNodeConnector nnc, Configuration conf) {
Mover(NameNodeConnector nnc, Configuration conf, AtomicInteger retryCount) {
final long movedWinWidth = conf.getLong(
DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY,
DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_DEFAULT);
@ -121,7 +124,10 @@ public class Mover {
final int maxConcurrentMovesPerNode = conf.getInt(
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
this.retryMaxAttempts = conf.getInt(
DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY,
DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_DEFAULT);
this.retryCount = retryCount;
this.dispatcher = new Dispatcher(nnc, Collections.<String> emptySet(),
Collections.<String> emptySet(), movedWinWidth, moverThreads, 0,
maxConcurrentMovesPerNode, conf);
@ -256,14 +262,27 @@ public class Mover {
* @return whether there is still remaining migration work for the next
* round
*/
private boolean processNamespace() {
private boolean processNamespace() throws IOException {
getSnapshottableDirs();
boolean hasRemaining = false;
for (Path target : targetPaths) {
hasRemaining |= processPath(target.toUri().getPath());
}
// wait for pending move to finish and retry the failed migration
hasRemaining |= Dispatcher.waitForMoveCompletion(storages.targets.values());
boolean hasFailed = Dispatcher.waitForMoveCompletion(storages.targets
.values());
if (hasFailed) {
if (retryCount.get() == retryMaxAttempts) {
throw new IOException("Failed to move some block's after "
+ retryMaxAttempts + " retries.");
} else {
retryCount.incrementAndGet();
}
} else {
// Reset retry count if no failure.
retryCount.set(0);
}
hasRemaining |= hasFailed;
return hasRemaining;
}
@ -529,6 +548,7 @@ public class Mover {
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 2000 +
conf.getLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000;
AtomicInteger retryCount = new AtomicInteger(0);
LOG.info("namenodes = " + namenodes);
List<NameNodeConnector> connectors = Collections.emptyList();
@ -542,7 +562,7 @@ public class Mover {
Iterator<NameNodeConnector> iter = connectors.iterator();
while (iter.hasNext()) {
NameNodeConnector nnc = iter.next();
final Mover m = new Mover(nnc, conf);
final Mover m = new Mover(nnc, conf, retryCount);
final ExitStatus r = m.run();
if (r == ExitStatus.SUCCESS) {

View File

@ -20,12 +20,14 @@ package org.apache.hadoop.hdfs.server.mover;
import java.io.IOException;
import java.net.URI;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
@ -34,6 +36,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DBlock;
import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.mover.Mover.MLocation;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
@ -54,7 +57,7 @@ public class TestMover {
final List<NameNodeConnector> nncs = NameNodeConnector.newNameNodeConnectors(
nnMap, Mover.class.getSimpleName(), Mover.MOVER_ID_PATH, conf,
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
return new Mover(nncs.get(0), conf);
return new Mover(nncs.get(0), conf, new AtomicInteger(0));
}
@Test
@ -324,4 +327,38 @@ public class TestMover {
cluster.shutdown();
}
}
@Test
public void testMoverFailedRetry() throws Exception {
// HDFS-8147
final Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY, "2");
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(3)
.storageTypes(
new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.ARCHIVE}}).build();
try {
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
final String file = "/testMoverFailedRetry";
// write to DISK
final FSDataOutputStream out = dfs.create(new Path(file), (short) 2);
out.writeChars("testMoverFailedRetry");
out.close();
// Delete block file so, block move will fail with FileNotFoundException
LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
cluster.corruptBlockOnDataNodesByDeletingBlockFile(lb.getBlock());
// move to ARCHIVE
dfs.setStoragePolicy(new Path(file), "COLD");
int rc = ToolRunner.run(conf, new Mover.Cli(),
new String[] {"-p", file.toString()});
Assert.assertEquals("Movement should fail after some retry",
ExitStatus.IO_EXCEPTION.getExitCode(), rc);
} finally {
cluster.shutdown();
}
}
}