HDFS-6944. Archival Storage: add retry and termination logic for Mover. Contributed by Jing Zhao.

This commit is contained in:
Jing Zhao 2014-08-27 14:20:54 -07:00
parent b7ded466b0
commit a26aa6bd07
4 changed files with 381 additions and 78 deletions

View File

@ -43,6 +43,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -166,6 +167,10 @@ public class Dispatcher {
void clear() {
map.clear();
}
public Collection<G> values() {
return map.values();
}
}
/** This class keeps track of a scheduled block move */
@ -306,6 +311,7 @@ public class Dispatcher {
LOG.info("Successfully moved " + this);
} catch (IOException e) {
LOG.warn("Failed to move " + this + ": " + e.getMessage());
target.getDDatanode().setHasFailure();
// Proxy or target may have some issues, delay before using these nodes
// further in order to avoid a potential storm of "threads quota
// exceeded" warnings when the dispatcher gets out of sync with work
@ -366,6 +372,19 @@ public class Dispatcher {
public DBlock(Block block) {
super(block);
}
@Override
public synchronized boolean isLocatedOn(StorageGroup loc) {
// currently we only check if replicas are located on the same DataNodes
// since we do not have the capability to store two replicas in the same
// DataNode even though they are on two different storage types
for (StorageGroup existing : locations) {
if (existing.getDatanodeInfo().equals(loc.getDatanodeInfo())) {
return true;
}
}
return false;
}
}
/** The class represents a desired move. */
@ -469,6 +488,7 @@ public class Dispatcher {
protected long delayUntil = 0L;
/** blocks being moved but not confirmed yet */
private final List<PendingMove> pendings;
private volatile boolean hasFailure = false;
private final int maxConcurrentMoves;
@Override
@ -538,6 +558,10 @@ public class Dispatcher {
synchronized boolean removePendingBlock(PendingMove pendingBlock) {
return pendings.remove(pendingBlock);
}
void setHasFailure() {
this.hasFailure = true;
}
}
/** A node that can be the sources of a block move */
@ -884,7 +908,7 @@ public class Dispatcher {
}
// wait for all block moving to be done
waitForMoveCompletion();
waitForMoveCompletion(targets);
return bytesMoved.get() - bytesLastMoved;
}
@ -892,23 +916,25 @@ public class Dispatcher {
/** The sleeping period before checking if block move is completed again */
static private long blockMoveWaitTime = 30000L;
/** set the sleeping period for block move completion check */
static void setBlockMoveWaitTime(long time) {
blockMoveWaitTime = time;
}
/** Wait for all block move confirmations. */
private void waitForMoveCompletion() {
/**
* Wait for all block move confirmations.
* @return true if there is failed move execution
*/
public static boolean waitForMoveCompletion(
Iterable<? extends StorageGroup> targets) {
boolean hasFailure = false;
for(;;) {
boolean empty = true;
for (StorageGroup t : targets) {
if (!t.getDDatanode().isPendingQEmpty()) {
empty = false;
break;
} else {
hasFailure |= t.getDDatanode().hasFailure;
}
}
if (empty) {
return; //all pending queues are empty
return hasFailure; // all pending queues are empty
}
try {
Thread.sleep(blockMoveWaitTime);
@ -1011,9 +1037,17 @@ public class Dispatcher {
movedBlocks.cleanup();
}
/** set the sleeping period for block move completion check */
@VisibleForTesting
public static void setBlockMoveWaitTime(long time) {
blockMoveWaitTime = time;
}
/** shutdown thread pools */
public void shutdownNow() {
if (dispatchExecutor != null) {
dispatchExecutor.shutdownNow();
}
moveExecutor.shutdownNow();
}

View File

@ -40,7 +40,7 @@ public class MovedBlocks<L> {
public static class Locations<L> {
private final Block block; // the block
/** The locations of the replicas of the block. */
private final List<L> locations = new ArrayList<L>(3);
protected final List<L> locations = new ArrayList<L>(3);
public Locations(Block block) {
this.block = block;

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.mover;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -130,9 +131,8 @@ public class Mover {
private ExitStatus run() {
try {
init();
new Processor().processNamespace();
return ExitStatus.IN_PROGRESS;
boolean hasRemaining = new Processor().processNamespace();
return hasRemaining ? ExitStatus.IN_PROGRESS : ExitStatus.SUCCESS;
} catch (IllegalArgumentException e) {
System.out.println(e + ". Exiting ...");
return ExitStatus.ILLEGAL_ARGUMENTS;
@ -223,16 +223,29 @@ public class Mover {
}
}
private void processNamespace() {
/**
* @return whether there is still remaining migration work for the next
* round
*/
private boolean processNamespace() {
getSnapshottableDirs();
boolean hasRemaining = true;
try {
processDirRecursively("", dfs.getFileInfo("/"));
hasRemaining = processDirRecursively("", dfs.getFileInfo("/"));
} catch (IOException e) {
LOG.warn("Failed to get root directory status. Ignore and continue.", e);
}
// wait for pending move to finish and retry the failed migration
hasRemaining |= Dispatcher.waitForMoveCompletion(storages.targets.values());
return hasRemaining;
}
private void processChildrenList(String fullPath) {
/**
* @return whether there is still remaing migration work for the next
* round
*/
private boolean processChildrenList(String fullPath) {
boolean hasRemaining = false;
for (byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) {
final DirectoryListing children;
try {
@ -240,124 +253,128 @@ public class Mover {
} catch(IOException e) {
LOG.warn("Failed to list directory " + fullPath
+ ". Ignore the directory and continue.", e);
return;
return hasRemaining;
}
if (children == null) {
return;
return hasRemaining;
}
for (HdfsFileStatus child : children.getPartialListing()) {
processDirRecursively(fullPath, child);
hasRemaining |= processDirRecursively(fullPath, child);
}
if (!children.hasMore()) {
if (children.hasMore()) {
lastReturnedName = children.getLastName();
} else {
return;
return hasRemaining;
}
}
}
private void processDirRecursively(String parent, HdfsFileStatus status) {
/** @return whether the migration requires next round */
private boolean processDirRecursively(String parent,
HdfsFileStatus status) {
String fullPath = status.getFullName(parent);
if (status.isSymlink()) {
return; //ignore symlinks
} else if (status.isDir()) {
boolean hasRemaining = false;
if (status.isDir()) {
if (!fullPath.endsWith(Path.SEPARATOR)) {
fullPath = fullPath + Path.SEPARATOR;
}
processChildrenList(fullPath);
hasRemaining = processChildrenList(fullPath);
// process snapshots if this is a snapshottable directory
if (snapshottableDirs.contains(fullPath)) {
final String dirSnapshot = fullPath + HdfsConstants.DOT_SNAPSHOT_DIR;
processChildrenList(dirSnapshot);
hasRemaining |= processChildrenList(dirSnapshot);
}
} else { // file
} else if (!status.isSymlink()) { // file
try {
if (isSnapshotPathInCurrent(fullPath)) {
if (!isSnapshotPathInCurrent(fullPath)) {
// the full path is a snapshot path but it is also included in the
// current directory tree, thus ignore it.
return;
hasRemaining = processFile((HdfsLocatedFileStatus)status);
}
} catch (IOException e) {
LOG.warn("Failed to check the status of " + parent
+ ". Ignore it and continue.", e);
return;
return false;
}
processFile(parent, (HdfsLocatedFileStatus)status);
}
return hasRemaining;
}
private void processFile(String parent, HdfsLocatedFileStatus status) {
/** @return true if it is necessary to run another round of migration */
private boolean processFile(HdfsLocatedFileStatus status) {
final BlockStoragePolicy policy = blockStoragePolicies.getPolicy(
status.getStoragePolicy());
final List<StorageType> types = policy.chooseStorageTypes(
status.getReplication());
final LocatedBlocks locations = status.getBlockLocations();
for(LocatedBlock lb : locations.getLocatedBlocks()) {
final StorageTypeDiff diff = new StorageTypeDiff(types, lb.getStorageTypes());
final LocatedBlocks locatedBlocks = status.getBlockLocations();
boolean hasRemaining = false;
for(LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
final StorageTypeDiff diff = new StorageTypeDiff(types,
lb.getStorageTypes());
if (!diff.removeOverlap()) {
scheduleMoves4Block(diff, lb);
if (scheduleMoves4Block(diff, lb)) {
hasRemaining |= (diff.existing.size() > 1 &&
diff.expected.size() > 1);
} else {
hasRemaining = true;
}
}
}
return hasRemaining;
}
void scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) {
boolean scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) {
final List<MLocation> locations = MLocation.toLocations(lb);
Collections.shuffle(locations);
final DBlock db = newDBlock(lb.getBlock().getLocalBlock(), locations);
for(final Iterator<StorageType> i = diff.existing.iterator(); i.hasNext(); ) {
final StorageType t = i.next();
for(final Iterator<MLocation> j = locations.iterator(); j.hasNext(); ) {
final MLocation ml = j.next();
for (final StorageType t : diff.existing) {
for (final MLocation ml : locations) {
final Source source = storages.getSource(ml);
if (ml.storageType == t) {
// try to schedule replica move.
if (scheduleMoveReplica(db, ml, source, diff.expected)) {
i.remove();
j.remove();
return;
// try to schedule one replica move.
if (scheduleMoveReplica(db, source, diff.expected)) {
return true;
}
}
}
}
return false;
}
@VisibleForTesting
boolean scheduleMoveReplica(DBlock db, MLocation ml,
List<StorageType> targetTypes) {
return scheduleMoveReplica(db, ml, storages.getSource(ml), targetTypes);
return scheduleMoveReplica(db, storages.getSource(ml), targetTypes);
}
boolean scheduleMoveReplica(DBlock db, MLocation ml, Source source,
boolean scheduleMoveReplica(DBlock db, Source source,
List<StorageType> targetTypes) {
if (dispatcher.getCluster().isNodeGroupAware()) {
if (chooseTarget(db, ml, source, targetTypes, Matcher.SAME_NODE_GROUP)) {
if (chooseTarget(db, source, targetTypes, Matcher.SAME_NODE_GROUP)) {
return true;
}
}
// Then, match nodes on the same rack
if (chooseTarget(db, ml, source, targetTypes, Matcher.SAME_RACK)) {
if (chooseTarget(db, source, targetTypes, Matcher.SAME_RACK)) {
return true;
}
// At last, match all remaining nodes
if (chooseTarget(db, ml, source, targetTypes, Matcher.ANY_OTHER)) {
return true;
}
return false;
return chooseTarget(db, source, targetTypes, Matcher.ANY_OTHER);
}
boolean chooseTarget(DBlock db, MLocation ml, Source source,
boolean chooseTarget(DBlock db, Source source,
List<StorageType> targetTypes, Matcher matcher) {
final NetworkTopology cluster = dispatcher.getCluster();
for(final Iterator<StorageType> i = targetTypes.iterator(); i.hasNext(); ) {
final StorageType t = i.next();
for (StorageType t : targetTypes) {
for(StorageGroup target : storages.getTargetStorages(t)) {
if (matcher.match(cluster, ml.datanode, target.getDatanodeInfo())) {
if (matcher.match(cluster, source.getDatanodeInfo(),
target.getDatanodeInfo())) {
final PendingMove pm = source.addPendingMove(db, target);
if (pm != null) {
i.remove();
dispatcher.executePendingMove(pm);
return true;
}
@ -368,7 +385,6 @@ public class Mover {
}
}
static class MLocation {
final DatanodeInfo datanode;
final StorageType storageType;
@ -392,7 +408,8 @@ public class Mover {
}
}
private static class StorageTypeDiff {
@VisibleForTesting
static class StorageTypeDiff {
final List<StorageType> expected;
final List<StorageType> existing;
@ -403,7 +420,8 @@ public class Mover {
/**
* Remove the overlap between the expected types and the existing types.
* @return if the existing types is empty after removed the overlap.
* @return if the existing types or the expected types is empty after
* removing the overlap.
*/
boolean removeOverlap() {
for(Iterator<StorageType> i = existing.iterator(); i.hasNext(); ) {
@ -412,13 +430,13 @@ public class Mover {
i.remove();
}
}
return existing.isEmpty();
return expected.isEmpty() || existing.isEmpty();
}
}
static int run(Collection<URI> namenodes, Configuration conf)
throws IOException, InterruptedException {
final long sleeptime = 2000*conf.getLong(
final long sleeptime = 2000 * conf.getLong(
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
LOG.info("namenodes = " + namenodes);
@ -428,22 +446,26 @@ public class Mover {
connectors = NameNodeConnector.newNameNodeConnectors(namenodes,
Mover.class.getSimpleName(), MOVER_ID_PATH, conf);
while (true) {
while (connectors.size() > 0) {
Collections.shuffle(connectors);
for(NameNodeConnector nnc : connectors) {
Iterator<NameNodeConnector> iter = connectors.iterator();
while (iter.hasNext()) {
NameNodeConnector nnc = iter.next();
final Mover m = new Mover(nnc, conf);
final ExitStatus r = m.run();
if (r != ExitStatus.IN_PROGRESS) {
//must be an error statue, return.
if (r == ExitStatus.SUCCESS) {
iter.remove();
} else if (r != ExitStatus.IN_PROGRESS) {
// must be an error statue, return
return r.getExitCode();
}
}
Thread.sleep(sleeptime);
}
return ExitStatus.SUCCESS.getExitCode();
} finally {
for(NameNodeConnector nnc : connectors) {
for (NameNodeConnector nnc : connectors) {
IOUtils.cleanup(LOG, nnc);
}
}

View File

@ -0,0 +1,247 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.mover;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher;
import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
import org.apache.hadoop.io.IOUtils;
import org.junit.Assert;
import org.junit.Test;
import java.net.URI;
import java.util.*;
/**
* Test the data migration tool (for Archival Storage)
*/
public class TestStorageMover {
private static final long BLOCK_SIZE = 1024;
private static final short REPL = 3;
private static final int NUM_DATANODES = 6;
private static final Configuration DEFAULT_CONF = new HdfsConfiguration();
private static final BlockStoragePolicy.Suite DEFAULT_POLICIES;
private static final BlockStoragePolicy HOT;
private static final BlockStoragePolicy WARM;
private static final BlockStoragePolicy COLD;
static {
DEFAULT_POLICIES = BlockStoragePolicy.readBlockStorageSuite(new
HdfsConfiguration());
HOT = DEFAULT_POLICIES.getPolicy("HOT");
WARM = DEFAULT_POLICIES.getPolicy("WARM");
COLD = DEFAULT_POLICIES.getPolicy("COLD");
Dispatcher.setBlockMoveWaitTime(10 * 1000);
}
/**
* This scheme defines files/directories and their block storage policies. It
* also defines snapshots.
*/
static class NamespaceScheme {
final List<Path> files;
final Map<Path, List<String>> snapshotMap;
final Map<Path, BlockStoragePolicy> policyMap;
NamespaceScheme(List<Path> files, Map<Path,List<String>> snapshotMap,
Map<Path, BlockStoragePolicy> policyMap) {
this.files = files;
this.snapshotMap = snapshotMap == null ?
new HashMap<Path, List<String>>() : snapshotMap;
this.policyMap = policyMap;
}
}
/**
* This scheme defines DataNodes and their storage, including storage types
* and remaining capacities.
*/
static class ClusterScheme {
final Configuration conf;
final int numDataNodes;
final short repl;
final StorageType[][] storageTypes;
final long[][] storageCapacities;
ClusterScheme(Configuration conf, int numDataNodes, short repl,
StorageType[][] types, long[][] capacities) {
Preconditions.checkArgument(types == null || types.length == numDataNodes);
Preconditions.checkArgument(capacities == null || capacities.length ==
numDataNodes);
this.conf = conf;
this.numDataNodes = numDataNodes;
this.repl = repl;
this.storageTypes = types;
this.storageCapacities = capacities;
}
}
class MigrationTest {
private final ClusterScheme clusterScheme;
private final NamespaceScheme nsScheme;
private final Configuration conf;
private MiniDFSCluster cluster;
private DistributedFileSystem dfs;
private final BlockStoragePolicy.Suite policies;
MigrationTest(ClusterScheme cScheme, NamespaceScheme nsScheme) {
this.clusterScheme = cScheme;
this.nsScheme = nsScheme;
this.conf = clusterScheme.conf;
this.policies = BlockStoragePolicy.readBlockStorageSuite(conf);
}
/**
* Set up the cluster and start NameNode and DataNodes according to the
* corresponding scheme.
*/
void setupCluster() throws Exception {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(clusterScheme
.numDataNodes).storageTypes(clusterScheme.storageTypes)
.storageCapacities(clusterScheme.storageCapacities).build();
cluster.waitActive();
dfs = cluster.getFileSystem();
}
void shutdownCluster() throws Exception {
IOUtils.cleanup(null, dfs);
if (cluster != null) {
cluster.shutdown();
}
}
/**
* Create files/directories and set their storage policies according to the
* corresponding scheme.
*/
void prepareNamespace() throws Exception {
for (Path file : nsScheme.files) {
DFSTestUtil.createFile(dfs, file, BLOCK_SIZE * 2, clusterScheme.repl,
0L);
}
for (Map.Entry<Path, List<String>> entry : nsScheme.snapshotMap.entrySet()) {
for (String snapshot : entry.getValue()) {
SnapshotTestHelper.createSnapshot(dfs, entry.getKey(), snapshot);
}
}
for (Map.Entry<Path, BlockStoragePolicy> entry : nsScheme.policyMap.entrySet()) {
dfs.setStoragePolicy(entry.getKey(), entry.getValue().getName());
}
}
/**
* Run the migration tool.
*/
void migrate(String... args) throws Exception {
runMover();
}
/**
* Verify block locations after running the migration tool.
*/
void verify(boolean verifyAll) throws Exception {
if (verifyAll) {
verifyNamespace();
} else {
// TODO verify according to the given path list
}
}
private void runMover() throws Exception {
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
int result = Mover.run(namenodes, conf);
Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), result);
}
private void verifyNamespace() throws Exception {
HdfsFileStatus status = dfs.getClient().getFileInfo("/");
verifyRecursively(null, status);
}
private void verifyRecursively(final Path parent,
final HdfsFileStatus status) throws Exception {
if (status.isDir()) {
Path fullPath = parent == null ?
new Path("/") : status.getFullPath(parent);
DirectoryListing children = dfs.getClient().listPaths(
fullPath.toString(), HdfsFileStatus.EMPTY_NAME, true);
for (HdfsFileStatus child : children.getPartialListing()) {
verifyRecursively(fullPath, child);
}
} else if (!status.isSymlink()) { // is file
HdfsLocatedFileStatus fileStatus = (HdfsLocatedFileStatus) status;
byte policyId = fileStatus.getStoragePolicy();
BlockStoragePolicy policy = policies.getPolicy(policyId);
final List<StorageType> types = policy.chooseStorageTypes(
status.getReplication());
for(LocatedBlock lb : fileStatus.getBlockLocations().getLocatedBlocks()) {
final Mover.StorageTypeDiff diff = new Mover.StorageTypeDiff(types,
lb.getStorageTypes());
Assert.assertTrue(diff.removeOverlap());
}
}
}
}
private static StorageType[][] genStorageTypes(int numDataNodes) {
StorageType[][] types = new StorageType[numDataNodes][];
for (int i = 0; i < types.length; i++) {
types[i] = new StorageType[]{StorageType.DISK, StorageType.ARCHIVE};
}
return types;
}
private void runTest(MigrationTest test) throws Exception {
test.setupCluster();
try {
test.prepareNamespace();
test.migrate();
Thread.sleep(5000); // let the NN finish deletion
test.verify(true);
} finally {
test.shutdownCluster();
}
}
/**
* A normal case for Mover: move a file into archival storage
*/
@Test
public void testMigrateFileToArchival() throws Exception {
final Path foo = new Path("/foo");
Map<Path, BlockStoragePolicy> policyMap = Maps.newHashMap();
policyMap.put(foo, COLD);
NamespaceScheme nsScheme = new NamespaceScheme(Arrays.asList(foo), null,
policyMap);
ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,
NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null);
MigrationTest test = new MigrationTest(clusterScheme, nsScheme);
runTest(test);
}
}