From 2113e0a3f24c8de1a1e3d2a79b02fe5abeb6510d Mon Sep 17 00:00:00 2001
From: Tsz-Wo Nicholas Sze
Date: Wed, 13 May 2015 11:57:49 -0700
Subject: [PATCH] HDFS-8143. Mover should exit after some retry when failed to
 move blocks. Contributed by surendra singh lilhore

---
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java |  2 +
 .../hadoop/hdfs/server/mover/Mover.java       | 30 +++++++++++---
 .../hadoop/hdfs/server/mover/TestMover.java   | 39 ++++++++++++++++++-
 3 files changed, 65 insertions(+), 6 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 009f95e2375..b8d203e3cd5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -427,6 +427,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long    DFS_MOVER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;
   public static final String  DFS_MOVER_MOVERTHREADS_KEY = "dfs.mover.moverThreads";
   public static final int     DFS_MOVER_MOVERTHREADS_DEFAULT = 1000;
+  public static final String  DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY = "dfs.mover.retry.max.attempts";
+  public static final int     DFS_MOVER_RETRY_MAX_ATTEMPTS_DEFAULT = 10;
 
   public static final String  DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";
   public static final int     DFS_DATANODE_DEFAULT_PORT = 50010;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index 89487aa68fc..466b91fc650 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -59,6 +59,7 @@ import java.io.InputStreamReader;
 import java.net.URI;
 import java.text.DateFormat;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
 
 @InterfaceAudience.Private
 public class Mover {
@@ -108,10 +109,12 @@ public class Mover {
   private final Dispatcher dispatcher;
   private final StorageMap storages;
   private final List<Path> targetPaths;
+  private final int retryMaxAttempts;
+  private final AtomicInteger retryCount;
 
   private final BlockStoragePolicy[] blockStoragePolicies;
 
-  Mover(NameNodeConnector nnc, Configuration conf) {
+  Mover(NameNodeConnector nnc, Configuration conf, AtomicInteger retryCount) {
     final long movedWinWidth = conf.getLong(
         DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY,
         DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_DEFAULT);
@@ -121,7 +124,10 @@ public class Mover {
     final int maxConcurrentMovesPerNode = conf.getInt(
         DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
         DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
-
+    this.retryMaxAttempts = conf.getInt(
+        DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY,
+        DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_DEFAULT);
+    this.retryCount = retryCount;
     this.dispatcher = new Dispatcher(nnc, Collections.<String> emptySet(),
         Collections.<String> emptySet(), movedWinWidth, moverThreads, 0,
         maxConcurrentMovesPerNode, conf);
@@ -256,14 +262,27 @@ public class Mover {
    * @return whether there is still remaining migration work for the next
    *         round
    */
-  private boolean processNamespace() {
+  private boolean processNamespace() throws IOException {
     getSnapshottableDirs();
     boolean hasRemaining = false;
     for (Path target : targetPaths) {
       hasRemaining |= processPath(target.toUri().getPath());
     }
     // wait for pending move to finish and retry the failed migration
-    hasRemaining |= Dispatcher.waitForMoveCompletion(storages.targets.values());
+    boolean hasFailed = Dispatcher.waitForMoveCompletion(storages.targets
+        .values());
+    if (hasFailed) {
+      if (retryCount.get() == retryMaxAttempts) {
+        throw new IOException("Failed to move some blocks after "
+            + retryMaxAttempts + " retries.");
+      } else {
+        retryCount.incrementAndGet();
+      }
+    } else {
+      // Reset retry count if no failure.
+      retryCount.set(0);
+    }
+    hasRemaining |= hasFailed;
     return hasRemaining;
   }
 
@@ -529,6 +548,7 @@
           DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 2000 +
         conf.getLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
             DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000;
+    AtomicInteger retryCount = new AtomicInteger(0);
     LOG.info("namenodes = " + namenodes);
 
     List<NameNodeConnector> connectors = Collections.emptyList();
@@ -542,7 +562,7 @@
         Iterator<NameNodeConnector> iter = connectors.iterator();
         while (iter.hasNext()) {
           NameNodeConnector nnc = iter.next();
-          final Mover m = new Mover(nnc, conf);
+          final Mover m = new Mover(nnc, conf, retryCount);
           final ExitStatus r = m.run();
 
           if (r == ExitStatus.SUCCESS) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
index b2f9fce8c3b..f4bedabf82a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -20,12 +20,14 @@ package org.apache.hadoop.hdfs.server.mover;
 import java.io.IOException;
 import java.net.URI;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -34,6 +36,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DBlock;
+import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
 import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
 import org.apache.hadoop.hdfs.server.mover.Mover.MLocation;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
@@ -54,7 +57,7 @@ public class TestMover {
     final List<NameNodeConnector> nncs = NameNodeConnector.newNameNodeConnectors(
         nnMap, Mover.class.getSimpleName(), Mover.MOVER_ID_PATH, conf,
         NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
-    return new Mover(nncs.get(0), conf);
+    return new Mover(nncs.get(0), conf, new AtomicInteger(0));
   }
 
   @Test
@@ -324,4 +327,38 @@ public class TestMover {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testMoverFailedRetry() throws Exception {
+    // HDFS-8147
+    final Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY, "2");
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(3)
+        .storageTypes(
+            new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE},
+                {StorageType.DISK, StorageType.ARCHIVE},
+                {StorageType.DISK, StorageType.ARCHIVE}}).build();
+    try {
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      final String file = "/testMoverFailedRetry";
+      // write to DISK
+      final FSDataOutputStream out = dfs.create(new Path(file), (short) 2);
+      out.writeChars("testMoverFailedRetry");
+      out.close();
+
+      // Delete the block file so the block move will fail with FileNotFoundException
+      LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+      cluster.corruptBlockOnDataNodesByDeletingBlockFile(lb.getBlock());
+      // move to ARCHIVE
+      dfs.setStoragePolicy(new Path(file), "COLD");
+      int rc = ToolRunner.run(conf, new Mover.Cli(),
+          new String[] {"-p", file});
+      Assert.assertEquals("Movement should fail after some retries",
+          ExitStatus.IO_EXCEPTION.getExitCode(), rc);
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
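
For operators, a minimal hdfs-site.xml sketch showing how the new key could be
tuned (the key name and its default of 10 come from this patch; the value 3
below is purely illustrative):

<!-- hdfs-site.xml (illustrative): tolerate 3 consecutive failed Mover
     iterations before aborting; the patch default is 10. -->
<property>
  <name>dfs.mover.retry.max.attempts</name>
  <value>3</value>
</property>

As the processNamespace() change above shows, the retry counter is shared by
the Mover instances of all namenodes: it is incremented once per iteration in
which some scheduled moves fail, reset to zero after a fully successful
iteration, and once it reaches the configured maximum the Mover throws an
IOException and terminates with ExitStatus.IO_EXCEPTION, as asserted in
testMoverFailedRetry.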