HDFS-6875. Archival Storage: support migration for a list of specified paths. Contributed by Jing Zhao.

This commit is contained in:
Jing Zhao 2014-09-08 14:10:00 -07:00
parent f1432e2424
commit 2b5c528a73
5 changed files with 332 additions and 24 deletions

View File

@ -23,9 +23,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -63,7 +61,22 @@ public class NameNodeConnector implements Closeable {
final List<NameNodeConnector> connectors = new ArrayList<NameNodeConnector>(
namenodes.size());
for (URI uri : namenodes) {
NameNodeConnector nnc = new NameNodeConnector(name, uri, idPath, conf);
NameNodeConnector nnc = new NameNodeConnector(name, uri, idPath,
null, conf);
nnc.getKeyManager().startBlockKeyUpdater();
connectors.add(nnc);
}
return connectors;
}
public static List<NameNodeConnector> newNameNodeConnectors(
Map<URI, List<Path>> namenodes, String name, Path idPath,
Configuration conf) throws IOException {
final List<NameNodeConnector> connectors = new ArrayList<NameNodeConnector>(
namenodes.size());
for (Map.Entry<URI, List<Path>> entry : namenodes.entrySet()) {
NameNodeConnector nnc = new NameNodeConnector(name, entry.getKey(),
idPath, entry.getValue(), conf);
nnc.getKeyManager().startBlockKeyUpdater();
connectors.add(nnc);
}
@ -80,13 +93,17 @@ public class NameNodeConnector implements Closeable {
private final DistributedFileSystem fs;
private final Path idPath;
private final OutputStream out;
private final List<Path> targetPaths;
private int notChangedIterations = 0;
public NameNodeConnector(String name, URI nameNodeUri, Path idPath,
Configuration conf) throws IOException {
List<Path> targetPaths, Configuration conf)
throws IOException {
this.nameNodeUri = nameNodeUri;
this.idPath = idPath;
this.targetPaths = targetPaths == null || targetPaths.isEmpty() ? Arrays
.asList(new Path("/")) : targetPaths;
this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
NamenodeProtocol.class).getProxy();
@ -133,6 +150,11 @@ public class NameNodeConnector implements Closeable {
return keyManager;
}
/** @return the list of paths to scan/migrate */
public List<Path> getTargetPaths() {
return targetPaths;
}
/** Should the instance continue running? */
public boolean shouldContinue(long dispatchBlockMoveBytes) {
if (dispatchBlockMoveBytes > 0) {

View File

@ -19,6 +19,9 @@ package org.apache.hadoop.hdfs.server.mover;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.cli.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -43,6 +46,8 @@ import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.text.DateFormat;
@ -93,6 +98,7 @@ public class Mover {
private final Dispatcher dispatcher;
private final StorageMap storages;
private final List<Path> targetPaths;
private final BlockStoragePolicy.Suite blockStoragePolicies;
@ -112,6 +118,7 @@ public class Mover {
maxConcurrentMovesPerNode, conf);
this.storages = new StorageMap();
this.blockStoragePolicies = BlockStoragePolicy.readBlockStorageSuite(conf);
this.targetPaths = nnc.getTargetPaths();
}
void init() throws IOException {
@ -232,7 +239,10 @@ public class Mover {
getSnapshottableDirs();
boolean hasRemaining = true;
try {
hasRemaining = processDirRecursively("", dfs.getFileInfo("/"));
for (Path target : targetPaths) {
hasRemaining = processDirRecursively("", dfs.getFileInfo(target
.toUri().getPath()));
}
} catch (IOException e) {
LOG.warn("Failed to get root directory status. Ignore and continue.", e);
}
@ -441,7 +451,7 @@ public class Mover {
}
}
static int run(Collection<URI> namenodes, Configuration conf)
static int run(Map<URI, List<Path>> namenodes, Configuration conf)
throws IOException, InterruptedException {
final long sleeptime = 2000 * conf.getLong(
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
@ -480,7 +490,103 @@ public class Mover {
static class Cli extends Configured implements Tool {
private static final String USAGE = "Usage: java "
+ Mover.class.getSimpleName();
+ Mover.class.getSimpleName()
+ " [-p <space separated files/dirs> specify a list of files/dirs to migrate]"
+ " [-f <local file name> specify a local file containing files/dirs to migrate]";
private static Options buildCliOptions() {
Options opts = new Options();
Option file = OptionBuilder.withArgName("pathsFile").hasArg()
.withDescription("a local file containing files/dirs to migrate")
.create("f");
Option paths = OptionBuilder.withArgName("paths").hasArgs()
.withDescription("specify space separated files/dirs to migrate")
.create("p");
OptionGroup group = new OptionGroup();
group.addOption(file);
group.addOption(paths);
opts.addOptionGroup(group);
return opts;
}
private static String[] readPathFile(String file) throws IOException {
List<String> list = Lists.newArrayList();
BufferedReader reader = new BufferedReader(new FileReader(file));
try {
String line;
while ((line = reader.readLine()) != null) {
if (!line.trim().isEmpty()) {
list.add(line);
}
}
} finally {
IOUtils.cleanup(LOG, reader);
}
return list.toArray(new String[list.size()]);
}
private static Map<URI, List<Path>> getNameNodePaths(CommandLine line,
Configuration conf) throws Exception {
Map<URI, List<Path>> map = Maps.newHashMap();
String[] paths = null;
if (line.hasOption("f")) {
paths = readPathFile(line.getOptionValue("f"));
} else if (line.hasOption("p")) {
paths = line.getOptionValues("p");
}
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
if (paths == null || paths.length == 0) {
for (URI namenode : namenodes) {
map.put(namenode, null);
}
return map;
}
final URI singleNs = namenodes.size() == 1 ?
namenodes.iterator().next() : null;
for (String path : paths) {
Path target = new Path(path);
if (!target.isUriPathAbsolute()) {
throw new IllegalArgumentException("The path " + target
+ " is not absolute");
}
URI targetUri = target.toUri();
if ((targetUri.getAuthority() == null || targetUri.getScheme() ==
null) && singleNs == null) {
// each path must contains both scheme and authority information
// unless there is only one name service specified in the
// configuration
throw new IllegalArgumentException("The path " + target
+ " does not contain scheme and authority thus cannot identify"
+ " its name service");
}
URI key = singleNs;
if (singleNs == null) {
key = new URI(targetUri.getScheme(), targetUri.getAuthority(),
null, null, null);
if (!namenodes.contains(key)) {
throw new IllegalArgumentException("Cannot resolve the path " +
target + ". The namenode services specified in the " +
"configuration: " + namenodes);
}
}
List<Path> targets = map.get(key);
if (targets == null) {
targets = Lists.newArrayList();
map.put(key, targets);
}
targets.add(Path.getPathWithoutSchemeAndAuthority(target));
}
return map;
}
@VisibleForTesting
static Map<URI, List<Path>> getNameNodePathsToMove(Configuration conf,
String... args) throws Exception {
final Options opts = buildCliOptions();
CommandLineParser parser = new GnuParser();
CommandLine commandLine = parser.parse(opts, args, true);
return getNameNodePaths(commandLine, conf);
}
@Override
public int run(String[] args) throws Exception {
@ -488,14 +594,20 @@ public class Mover {
final Configuration conf = getConf();
try {
final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
return Mover.run(namenodes, conf);
final Map<URI, List<Path>> map = getNameNodePathsToMove(conf, args);
return Mover.run(map, conf);
} catch (IOException e) {
System.out.println(e + ". Exiting ...");
return ExitStatus.IO_EXCEPTION.getExitCode();
} catch (InterruptedException e) {
System.out.println(e + ". Exiting ...");
return ExitStatus.INTERRUPTED.getExitCode();
} catch (ParseException e) {
System.out.println(e + ". Exiting ...");
return ExitStatus.ILLEGAL_ARGUMENTS.getExitCode();
} catch (IllegalArgumentException e) {
System.out.println(e + ". Exiting ...");
return ExitStatus.ILLEGAL_ARGUMENTS.getExitCode();
} finally {
System.out.format("%-24s ", DateFormat.getDateTimeInstance().format(new Date()));
System.out.println("Mover took " + StringUtils.formatTime(Time.monotonicNow()-startTime));

View File

@ -20,9 +20,12 @@ package org.apache.hadoop.hdfs;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -87,8 +90,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.*;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -872,6 +874,37 @@ public class DFSTestUtil {
.join(nameservices));
}
public static void setFederatedHAConfiguration(MiniDFSCluster cluster,
Configuration conf) {
Map<String, List<String>> nameservices = Maps.newHashMap();
for (NameNodeInfo info : cluster.getNameNodeInfos()) {
Preconditions.checkState(info.nameserviceId != null);
List<String> nns = nameservices.get(info.nameserviceId);
if (nns == null) {
nns = Lists.newArrayList();
nameservices.put(info.nameserviceId, nns);
}
nns.add(info.nnId);
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
info.nameserviceId, info.nnId),
DFSUtil.createUri(HdfsConstants.HDFS_URI_SCHEME,
info.nameNode.getNameNodeAddress()).toString());
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
info.nameserviceId, info.nnId),
DFSUtil.createUri(HdfsConstants.HDFS_URI_SCHEME,
info.nameNode.getNameNodeAddress()).toString());
}
for (Map.Entry<String, List<String>> entry : nameservices.entrySet()) {
conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX,
entry.getKey()), Joiner.on(",").join(entry.getValue()));
conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + entry
.getKey(), ConfiguredFailoverProxyProvider.class.getName());
}
conf.set(DFSConfigKeys.DFS_NAMESERVICES, Joiner.on(",")
.join(nameservices.keySet()));
}
private static DatanodeID getDatanodeID(String ipAddr) {
return new DatanodeID(ipAddr, "localhost",
UUID.randomUUID().toString(),

View File

@ -19,23 +19,19 @@ package org.apache.hadoop.hdfs.server.mover;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DBlock;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.mover.Mover.MLocation;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.GenericOptionsParser;
import org.junit.Assert;
import org.junit.Test;
@ -82,4 +78,145 @@ public class TestMover {
cluster.shutdown();
}
}
private void checkMovePaths(List<Path> actual, Path... expected) {
Assert.assertEquals(expected.length, actual.size());
for (Path p : expected) {
Assert.assertTrue(actual.contains(p));
}
}
/**
* Test Mover Cli by specifying a list of files/directories using option "-p".
* There is only one namenode (and hence name service) specified in the conf.
*/
@Test
public void testMoverCli() throws Exception {
final MiniDFSCluster cluster = new MiniDFSCluster
.Builder(new HdfsConfiguration()).numDataNodes(0).build();
try {
final Configuration conf = cluster.getConfiguration(0);
try {
Mover.Cli.getNameNodePathsToMove(conf, "-p", "/foo", "bar");
Assert.fail("Expected exception for illegal path bar");
} catch (IllegalArgumentException e) {
GenericTestUtils.assertExceptionContains("bar is not absolute", e);
}
Map<URI, List<Path>> movePaths = Mover.Cli.getNameNodePathsToMove(conf);
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
Assert.assertEquals(1, namenodes.size());
Assert.assertEquals(1, movePaths.size());
URI nn = namenodes.iterator().next();
Assert.assertTrue(movePaths.containsKey(nn));
Assert.assertNull(movePaths.get(nn));
movePaths = Mover.Cli.getNameNodePathsToMove(conf, "-p", "/foo", "/bar");
namenodes = DFSUtil.getNsServiceRpcUris(conf);
Assert.assertEquals(1, movePaths.size());
nn = namenodes.iterator().next();
Assert.assertTrue(movePaths.containsKey(nn));
checkMovePaths(movePaths.get(nn), new Path("/foo"), new Path("/bar"));
} finally {
cluster.shutdown();
}
}
@Test
public void testMoverCliWithHAConf() throws Exception {
final Configuration conf = new HdfsConfiguration();
final MiniDFSCluster cluster = new MiniDFSCluster
.Builder(new HdfsConfiguration())
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(0).build();
HATestUtil.setFailoverConfigurations(cluster, conf, "MyCluster");
try {
Map<URI, List<Path>> movePaths = Mover.Cli.getNameNodePathsToMove(conf,
"-p", "/foo", "/bar");
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
Assert.assertEquals(1, namenodes.size());
Assert.assertEquals(1, movePaths.size());
URI nn = namenodes.iterator().next();
Assert.assertEquals(new URI("hdfs://MyCluster"), nn);
Assert.assertTrue(movePaths.containsKey(nn));
checkMovePaths(movePaths.get(nn), new Path("/foo"), new Path("/bar"));
} finally {
cluster.shutdown();
}
}
@Test
public void testMoverCliWithFederation() throws Exception {
final MiniDFSCluster cluster = new MiniDFSCluster
.Builder(new HdfsConfiguration())
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(3))
.numDataNodes(0).build();
final Configuration conf = new HdfsConfiguration();
DFSTestUtil.setFederatedConfiguration(cluster, conf);
try {
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
Assert.assertEquals(3, namenodes.size());
try {
Mover.Cli.getNameNodePathsToMove(conf, "-p", "/foo");
Assert.fail("Expect exception for missing authority information");
} catch (IllegalArgumentException e) {
GenericTestUtils.assertExceptionContains(
"does not contain scheme and authority", e);
}
try {
Mover.Cli.getNameNodePathsToMove(conf, "-p", "hdfs:///foo");
Assert.fail("Expect exception for missing authority information");
} catch (IllegalArgumentException e) {
GenericTestUtils.assertExceptionContains(
"does not contain scheme and authority", e);
}
try {
Mover.Cli.getNameNodePathsToMove(conf, "-p", "wrong-hdfs://ns1/foo");
Assert.fail("Expect exception for wrong scheme");
} catch (IllegalArgumentException e) {
GenericTestUtils.assertExceptionContains("Cannot resolve the path", e);
}
Iterator<URI> iter = namenodes.iterator();
URI nn1 = iter.next();
URI nn2 = iter.next();
Map<URI, List<Path>> movePaths = Mover.Cli.getNameNodePathsToMove(conf,
"-p", nn1 + "/foo", nn1 + "/bar", nn2 + "/foo/bar");
Assert.assertEquals(2, movePaths.size());
checkMovePaths(movePaths.get(nn1), new Path("/foo"), new Path("/bar"));
checkMovePaths(movePaths.get(nn2), new Path("/foo/bar"));
} finally {
cluster.shutdown();
}
}
@Test
public void testMoverCliWithFederationHA() throws Exception {
final MiniDFSCluster cluster = new MiniDFSCluster
.Builder(new HdfsConfiguration())
.nnTopology(MiniDFSNNTopology.simpleHAFederatedTopology(3))
.numDataNodes(0).build();
final Configuration conf = new HdfsConfiguration();
DFSTestUtil.setFederatedHAConfiguration(cluster, conf);
try {
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
Assert.assertEquals(3, namenodes.size());
Iterator<URI> iter = namenodes.iterator();
URI nn1 = iter.next();
URI nn2 = iter.next();
URI nn3 = iter.next();
Map<URI, List<Path>> movePaths = Mover.Cli.getNameNodePathsToMove(conf,
"-p", nn1 + "/foo", nn1 + "/bar", nn2 + "/foo/bar", nn3 + "/foobar");
Assert.assertEquals(3, movePaths.size());
checkMovePaths(movePaths.get(nn1), new Path("/foo"), new Path("/bar"));
checkMovePaths(movePaths.get(nn2), new Path("/foo/bar"));
checkMovePaths(movePaths.get(nn3), new Path("/foobar"));
} finally {
cluster.shutdown();
}
}
}

View File

@ -252,7 +252,11 @@ public class TestStorageMover {
private void runMover() throws Exception {
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
int result = Mover.run(namenodes, conf);
Map<URI, List<Path>> nnMap = Maps.newHashMap();
for (URI nn : namenodes) {
nnMap.put(nn, null);
}
int result = Mover.run(nnMap, conf);
Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), result);
}