diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java index a86023e416c..79815c05a83 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java @@ -23,9 +23,7 @@ import java.io.OutputStream; import java.net.InetAddress; import java.net.URI; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; +import java.util.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -63,7 +61,22 @@ public static List<NameNodeConnector> newNameNodeConnectors( final List<NameNodeConnector> connectors = new ArrayList<NameNodeConnector>( namenodes.size()); for (URI uri : namenodes) { - NameNodeConnector nnc = new NameNodeConnector(name, uri, idPath, conf); + NameNodeConnector nnc = new NameNodeConnector(name, uri, idPath, + null, conf); + nnc.getKeyManager().startBlockKeyUpdater(); + connectors.add(nnc); + } + return connectors; + } + + public static List<NameNodeConnector> newNameNodeConnectors( + Map<URI, List<Path>> namenodes, String name, Path idPath, + Configuration conf) throws IOException { + final List<NameNodeConnector> connectors = new ArrayList<NameNodeConnector>( + namenodes.size()); + for (Map.Entry<URI, List<Path>> entry : namenodes.entrySet()) { + NameNodeConnector nnc = new NameNodeConnector(name, entry.getKey(), + idPath, entry.getValue(), conf); nnc.getKeyManager().startBlockKeyUpdater(); connectors.add(nnc); } @@ -80,14 +93,18 @@ public static List<NameNodeConnector> newNameNodeConnectors( private final DistributedFileSystem fs; private final Path idPath; private final OutputStream out; + private final List<Path> targetPaths; private int notChangedIterations = 0; public NameNodeConnector(String name, URI nameNodeUri, Path idPath, - Configuration conf) throws IOException { + List<Path> targetPaths, Configuration conf) + throws 
IOException { this.nameNodeUri = nameNodeUri; this.idPath = idPath; - + this.targetPaths = targetPaths == null || targetPaths.isEmpty() ? Arrays + .asList(new Path("/")) : targetPaths; + this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri, NamenodeProtocol.class).getProxy(); this.client = NameNodeProxies.createProxy(conf, nameNodeUri, @@ -133,6 +150,11 @@ public KeyManager getKeyManager() { return keyManager; } + /** @return the list of paths to scan/migrate */ + public List getTargetPaths() { + return targetPaths; + } + /** Should the instance continue running? */ public boolean shouldContinue(long dispatchBlockMoveBytes) { if (dispatchBlockMoveBytes > 0) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java index 57ad6aa8bad..1593a7efb4c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java @@ -19,6 +19,9 @@ import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.cli.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -43,6 +46,8 @@ import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import java.io.BufferedReader; +import java.io.FileReader; import java.io.IOException; import java.net.URI; import java.text.DateFormat; @@ -93,6 +98,7 @@ private List getTargetStorages(StorageType t) { private final Dispatcher dispatcher; private final StorageMap storages; + private final List targetPaths; private final BlockStoragePolicy.Suite blockStoragePolicies; @@ -112,6 +118,7 @@ Collections. 
emptySet(), movedWinWidth, moverThreads, 0, maxConcurrentMovesPerNode, conf); this.storages = new StorageMap(); this.blockStoragePolicies = BlockStoragePolicy.readBlockStorageSuite(conf); + this.targetPaths = nnc.getTargetPaths(); } void init() throws IOException { @@ -232,7 +239,10 @@ private boolean processNamespace() { getSnapshottableDirs(); boolean hasRemaining = true; try { - hasRemaining = processDirRecursively("", dfs.getFileInfo("/")); + for (Path target : targetPaths) { + hasRemaining = processDirRecursively("", dfs.getFileInfo(target + .toUri().getPath())); + } } catch (IOException e) { LOG.warn("Failed to get root directory status. Ignore and continue.", e); } @@ -441,7 +451,7 @@ public String toString() { } } - static int run(Collection namenodes, Configuration conf) + static int run(Map> namenodes, Configuration conf) throws IOException, InterruptedException { final long sleeptime = 2000 * conf.getLong( DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, @@ -452,7 +462,7 @@ static int run(Collection namenodes, Configuration conf) try { connectors = NameNodeConnector.newNameNodeConnectors(namenodes, Mover.class.getSimpleName(), MOVER_ID_PATH, conf); - + while (connectors.size() > 0) { Collections.shuffle(connectors); Iterator iter = connectors.iterator(); @@ -480,7 +490,103 @@ static int run(Collection namenodes, Configuration conf) static class Cli extends Configured implements Tool { private static final String USAGE = "Usage: java " - + Mover.class.getSimpleName(); + + Mover.class.getSimpleName() + + " [-p specify a list of files/dirs to migrate]" + + " [-f specify a local file containing files/dirs to migrate]"; + + private static Options buildCliOptions() { + Options opts = new Options(); + Option file = OptionBuilder.withArgName("pathsFile").hasArg() + .withDescription("a local file containing files/dirs to migrate") + .create("f"); + Option paths = OptionBuilder.withArgName("paths").hasArgs() + .withDescription("specify space separated files/dirs to 
migrate") + .create("p"); + OptionGroup group = new OptionGroup(); + group.addOption(file); + group.addOption(paths); + opts.addOptionGroup(group); + return opts; + } + + private static String[] readPathFile(String file) throws IOException { + List list = Lists.newArrayList(); + BufferedReader reader = new BufferedReader(new FileReader(file)); + try { + String line; + while ((line = reader.readLine()) != null) { + if (!line.trim().isEmpty()) { + list.add(line); + } + } + } finally { + IOUtils.cleanup(LOG, reader); + } + return list.toArray(new String[list.size()]); + } + + private static Map> getNameNodePaths(CommandLine line, + Configuration conf) throws Exception { + Map> map = Maps.newHashMap(); + String[] paths = null; + if (line.hasOption("f")) { + paths = readPathFile(line.getOptionValue("f")); + } else if (line.hasOption("p")) { + paths = line.getOptionValues("p"); + } + Collection namenodes = DFSUtil.getNsServiceRpcUris(conf); + if (paths == null || paths.length == 0) { + for (URI namenode : namenodes) { + map.put(namenode, null); + } + return map; + } + final URI singleNs = namenodes.size() == 1 ? 
+ namenodes.iterator().next() : null; + for (String path : paths) { + Path target = new Path(path); + if (!target.isUriPathAbsolute()) { + throw new IllegalArgumentException("The path " + target + + " is not absolute"); + } + URI targetUri = target.toUri(); + if ((targetUri.getAuthority() == null || targetUri.getScheme() == + null) && singleNs == null) { + // each path must contains both scheme and authority information + // unless there is only one name service specified in the + // configuration + throw new IllegalArgumentException("The path " + target + + " does not contain scheme and authority thus cannot identify" + + " its name service"); + } + URI key = singleNs; + if (singleNs == null) { + key = new URI(targetUri.getScheme(), targetUri.getAuthority(), + null, null, null); + if (!namenodes.contains(key)) { + throw new IllegalArgumentException("Cannot resolve the path " + + target + ". The namenode services specified in the " + + "configuration: " + namenodes); + } + } + List targets = map.get(key); + if (targets == null) { + targets = Lists.newArrayList(); + map.put(key, targets); + } + targets.add(Path.getPathWithoutSchemeAndAuthority(target)); + } + return map; + } + + @VisibleForTesting + static Map> getNameNodePathsToMove(Configuration conf, + String... args) throws Exception { + final Options opts = buildCliOptions(); + CommandLineParser parser = new GnuParser(); + CommandLine commandLine = parser.parse(opts, args, true); + return getNameNodePaths(commandLine, conf); + } @Override public int run(String[] args) throws Exception { @@ -488,14 +594,20 @@ public int run(String[] args) throws Exception { final Configuration conf = getConf(); try { - final Collection namenodes = DFSUtil.getNsServiceRpcUris(conf); - return Mover.run(namenodes, conf); + final Map> map = getNameNodePathsToMove(conf, args); + return Mover.run(map, conf); } catch (IOException e) { System.out.println(e + ". 
Exiting ..."); return ExitStatus.IO_EXCEPTION.getExitCode(); } catch (InterruptedException e) { System.out.println(e + ". Exiting ..."); return ExitStatus.INTERRUPTED.getExitCode(); + } catch (ParseException e) { + System.out.println(e + ". Exiting ..."); + return ExitStatus.ILLEGAL_ARGUMENTS.getExitCode(); + } catch (IllegalArgumentException e) { + System.out.println(e + ". Exiting ..."); + return ExitStatus.ILLEGAL_ARGUMENTS.getExitCode(); } finally { System.out.format("%-24s ", DateFormat.getDateTimeInstance().format(new Date())); System.out.println("Mover took " + StringUtils.formatTime(Time.monotonicNow()-startTime)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 96e50dfb553..a1aa7d64ac4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -20,9 +20,12 @@ import com.google.common.base.Charsets; import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -87,8 +90,7 @@ import java.util.*; import java.util.concurrent.TimeoutException; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.*; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -871,6 +873,37 @@ public static void setFederatedConfiguration(MiniDFSCluster cluster, 
conf.set(DFSConfigKeys.DFS_NAMESERVICES, Joiner.on(",") .join(nameservices)); } + + public static void setFederatedHAConfiguration(MiniDFSCluster cluster, + Configuration conf) { + Map> nameservices = Maps.newHashMap(); + for (NameNodeInfo info : cluster.getNameNodeInfos()) { + Preconditions.checkState(info.nameserviceId != null); + List nns = nameservices.get(info.nameserviceId); + if (nns == null) { + nns = Lists.newArrayList(); + nameservices.put(info.nameserviceId, nns); + } + nns.add(info.nnId); + + conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, + info.nameserviceId, info.nnId), + DFSUtil.createUri(HdfsConstants.HDFS_URI_SCHEME, + info.nameNode.getNameNodeAddress()).toString()); + conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, + info.nameserviceId, info.nnId), + DFSUtil.createUri(HdfsConstants.HDFS_URI_SCHEME, + info.nameNode.getNameNodeAddress()).toString()); + } + for (Map.Entry> entry : nameservices.entrySet()) { + conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, + entry.getKey()), Joiner.on(",").join(entry.getValue())); + conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." 
+ entry + .getKey(), ConfiguredFailoverProxyProvider.class.getName()); + } + conf.set(DFSConfigKeys.DFS_NAMESERVICES, Joiner.on(",") + .join(nameservices.keySet())); + } private static DatanodeID getDatanodeID(String ipAddr) { return new DatanodeID(ipAddr, "localhost", diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java index da913e70baf..5866c7f7384 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java @@ -19,23 +19,19 @@ import java.io.IOException; import java.net.URI; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; +import java.util.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.StorageType; +import org.apache.hadoop.hdfs.*; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DBlock; import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector; import org.apache.hadoop.hdfs.server.mover.Mover.MLocation; +import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.GenericOptionsParser; import org.junit.Assert; import org.junit.Test; @@ -82,4 +78,145 @@ public void testScheduleSameBlock() throws IOException { cluster.shutdown(); } } + + private void checkMovePaths(List actual, Path... 
expected) { + Assert.assertEquals(expected.length, actual.size()); + for (Path p : expected) { + Assert.assertTrue(actual.contains(p)); + } + } + + /** + * Test Mover Cli by specifying a list of files/directories using option "-p". + * There is only one namenode (and hence name service) specified in the conf. + */ + @Test + public void testMoverCli() throws Exception { + final MiniDFSCluster cluster = new MiniDFSCluster + .Builder(new HdfsConfiguration()).numDataNodes(0).build(); + try { + final Configuration conf = cluster.getConfiguration(0); + try { + Mover.Cli.getNameNodePathsToMove(conf, "-p", "/foo", "bar"); + Assert.fail("Expected exception for illegal path bar"); + } catch (IllegalArgumentException e) { + GenericTestUtils.assertExceptionContains("bar is not absolute", e); + } + + Map> movePaths = Mover.Cli.getNameNodePathsToMove(conf); + Collection namenodes = DFSUtil.getNsServiceRpcUris(conf); + Assert.assertEquals(1, namenodes.size()); + Assert.assertEquals(1, movePaths.size()); + URI nn = namenodes.iterator().next(); + Assert.assertTrue(movePaths.containsKey(nn)); + Assert.assertNull(movePaths.get(nn)); + + movePaths = Mover.Cli.getNameNodePathsToMove(conf, "-p", "/foo", "/bar"); + namenodes = DFSUtil.getNsServiceRpcUris(conf); + Assert.assertEquals(1, movePaths.size()); + nn = namenodes.iterator().next(); + Assert.assertTrue(movePaths.containsKey(nn)); + checkMovePaths(movePaths.get(nn), new Path("/foo"), new Path("/bar")); + } finally { + cluster.shutdown(); + } + } + + @Test + public void testMoverCliWithHAConf() throws Exception { + final Configuration conf = new HdfsConfiguration(); + final MiniDFSCluster cluster = new MiniDFSCluster + .Builder(new HdfsConfiguration()) + .nnTopology(MiniDFSNNTopology.simpleHATopology()) + .numDataNodes(0).build(); + HATestUtil.setFailoverConfigurations(cluster, conf, "MyCluster"); + try { + Map> movePaths = Mover.Cli.getNameNodePathsToMove(conf, + "-p", "/foo", "/bar"); + Collection namenodes = 
DFSUtil.getNsServiceRpcUris(conf); + Assert.assertEquals(1, namenodes.size()); + Assert.assertEquals(1, movePaths.size()); + URI nn = namenodes.iterator().next(); + Assert.assertEquals(new URI("hdfs://MyCluster"), nn); + Assert.assertTrue(movePaths.containsKey(nn)); + checkMovePaths(movePaths.get(nn), new Path("/foo"), new Path("/bar")); + } finally { + cluster.shutdown(); + } + } + + @Test + public void testMoverCliWithFederation() throws Exception { + final MiniDFSCluster cluster = new MiniDFSCluster + .Builder(new HdfsConfiguration()) + .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(3)) + .numDataNodes(0).build(); + final Configuration conf = new HdfsConfiguration(); + DFSTestUtil.setFederatedConfiguration(cluster, conf); + try { + Collection namenodes = DFSUtil.getNsServiceRpcUris(conf); + Assert.assertEquals(3, namenodes.size()); + + try { + Mover.Cli.getNameNodePathsToMove(conf, "-p", "/foo"); + Assert.fail("Expect exception for missing authority information"); + } catch (IllegalArgumentException e) { + GenericTestUtils.assertExceptionContains( + "does not contain scheme and authority", e); + } + + try { + Mover.Cli.getNameNodePathsToMove(conf, "-p", "hdfs:///foo"); + Assert.fail("Expect exception for missing authority information"); + } catch (IllegalArgumentException e) { + GenericTestUtils.assertExceptionContains( + "does not contain scheme and authority", e); + } + + try { + Mover.Cli.getNameNodePathsToMove(conf, "-p", "wrong-hdfs://ns1/foo"); + Assert.fail("Expect exception for wrong scheme"); + } catch (IllegalArgumentException e) { + GenericTestUtils.assertExceptionContains("Cannot resolve the path", e); + } + + Iterator iter = namenodes.iterator(); + URI nn1 = iter.next(); + URI nn2 = iter.next(); + Map> movePaths = Mover.Cli.getNameNodePathsToMove(conf, + "-p", nn1 + "/foo", nn1 + "/bar", nn2 + "/foo/bar"); + Assert.assertEquals(2, movePaths.size()); + checkMovePaths(movePaths.get(nn1), new Path("/foo"), new Path("/bar")); + 
checkMovePaths(movePaths.get(nn2), new Path("/foo/bar")); + } finally { + cluster.shutdown(); + } + } + + @Test + public void testMoverCliWithFederationHA() throws Exception { + final MiniDFSCluster cluster = new MiniDFSCluster + .Builder(new HdfsConfiguration()) + .nnTopology(MiniDFSNNTopology.simpleHAFederatedTopology(3)) + .numDataNodes(0).build(); + final Configuration conf = new HdfsConfiguration(); + DFSTestUtil.setFederatedHAConfiguration(cluster, conf); + try { + Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); + Assert.assertEquals(3, namenodes.size()); + + Iterator<URI> iter = namenodes.iterator(); + URI nn1 = iter.next(); + URI nn2 = iter.next(); + URI nn3 = iter.next(); + Map<URI, List<Path>> movePaths = Mover.Cli.getNameNodePathsToMove(conf, + "-p", nn1 + "/foo", nn1 + "/bar", nn2 + "/foo/bar", nn3 + "/foobar"); + Assert.assertEquals(3, movePaths.size()); + checkMovePaths(movePaths.get(nn1), new Path("/foo"), new Path("/bar")); + checkMovePaths(movePaths.get(nn2), new Path("/foo/bar")); + checkMovePaths(movePaths.get(nn3), new Path("/foobar")); + } finally { + cluster.shutdown(); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java index 88b399277c2..66088b68d55 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java @@ -252,7 +252,11 @@ void verify(boolean verifyAll) throws Exception { private void runMover() throws Exception { Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); - int result = Mover.run(namenodes, conf); + Map<URI, List<Path>> nnMap = Maps.newHashMap(); + for (URI nn : namenodes) { + nnMap.put(nn, null); + } + int result = Mover.run(nnMap, conf); Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), result); }