HDFS-6997: add more tests for data migration and replication.

Author: Tsz-Wo Nicholas Sze  2014-09-07 07:44:28 +08:00
parent ba4fc93177
commit 22a41dce4a
8 changed files with 579 additions and 113 deletions

File: BlockManager.java

@ -2930,7 +2930,7 @@ public class BlockManager {
// Decrement number of blocks scheduled to this datanode.
// for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with
// RECEIVED_BLOCK), we currently also decrease the approximate number.
node.decrementBlocksScheduled();
node.decrementBlocksScheduled(storageInfo.getStorageType());
// get the deletion hint node
DatanodeDescriptor delHintNode = null;

File: BlockPlacementPolicyDefault.java

@ -76,12 +76,6 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
*/
protected int tolerateHeartbeatMultiplier;
protected BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats,
NetworkTopology clusterMap,
Host2NodesMap host2datanodeMap) {
initialize(conf, stats, clusterMap, host2datanodeMap);
}
protected BlockPlacementPolicyDefault() {
}
@ -174,6 +168,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
return getPipeline(writer,
results.toArray(new DatanodeStorageInfo[results.size()]));
} catch (NotEnoughReplicasException nr) {
if (LOG.isDebugEnabled()) {
LOG.debug("Failed to choose with favored nodes (=" + favoredNodes
+ "), disregard favored nodes hint and retry.", nr);
}
// Fall back to regular block placement disregarding favored nodes hint
return chooseTarget(src, numOfReplicas, writer,
new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
@ -291,6 +289,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
unavailableStorages, newBlock);
final EnumMap<StorageType, Integer> storageTypes =
getRequiredStorageTypes(requiredStorageTypes);
if (LOG.isTraceEnabled()) {
LOG.trace("storageTypes=" + storageTypes);
}
try {
if ((numOfReplicas = requiredStorageTypes.size()) == 0) {
@ -337,7 +338,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
} catch (NotEnoughReplicasException e) {
final String message = "Failed to place enough replicas, still in need of "
+ (totalReplicasExpected - results.size()) + " to reach "
+ totalReplicasExpected + ".";
+ totalReplicasExpected
+ " (unavailableStorages=" + unavailableStorages
+ ", storagePolicy=" + storagePolicy
+ ", newBlock=" + newBlock + ")";
if (LOG.isTraceEnabled()) {
LOG.trace(message, e);
} else {
@ -466,37 +471,57 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes, storageTypes);
}
final String localRack = localMachine.getNetworkLocation();
// choose one from the local rack
try {
return chooseRandom(localMachine.getNetworkLocation(), excludedNodes,
// choose one from the local rack
return chooseRandom(localRack, excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);
} catch (NotEnoughReplicasException e1) {
// find the second replica
DatanodeDescriptor newLocal=null;
} catch (NotEnoughReplicasException e) {
// find the next replica and retry with its rack
for(DatanodeStorageInfo resultStorage : results) {
DatanodeDescriptor nextNode = resultStorage.getDatanodeDescriptor();
if (nextNode != localMachine) {
newLocal = nextNode;
break;
if (LOG.isDebugEnabled()) {
LOG.debug("Failed to choose from local rack (location = " + localRack
+ "), retry with the rack of the next replica (location = "
+ nextNode.getNetworkLocation() + ")", e);
}
return chooseFromNextRack(nextNode, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes, storageTypes);
}
}
if (newLocal != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Failed to choose from local rack (location = " + localRack
+ "); the second replica is not found, retry choosing randomly", e);
}
//the second replica is not found, randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes, storageTypes);
}
}
private DatanodeStorageInfo chooseFromNextRack(Node next,
Set<Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeStorageInfo> results,
boolean avoidStaleNodes,
EnumMap<StorageType, Integer> storageTypes) throws NotEnoughReplicasException {
final String nextRack = next.getNetworkLocation();
try {
return chooseRandom(newLocal.getNetworkLocation(), excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes,
storageTypes);
} catch(NotEnoughReplicasException e2) {
return chooseRandom(nextRack, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageTypes);
} catch(NotEnoughReplicasException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Failed to choose from the next rack (location = " + nextRack
+ "), retry choosing randomly", e);
}
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes, storageTypes);
}
} else {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes, storageTypes);
}
}
}
/**
@ -522,6 +547,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
excludedNodes, blocksize, maxReplicasPerRack, results,
avoidStaleNodes, storageTypes);
} catch (NotEnoughReplicasException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Failed to choose remote rack (location = ~"
+ localMachine.getNetworkLocation() + "), fallback to local rack", e);
}
chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
localMachine.getNetworkLocation(), excludedNodes, blocksize,
maxReplicasPerRack, results, avoidStaleNodes, storageTypes);
@ -572,6 +601,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
DatanodeDescriptor chosenNode =
(DatanodeDescriptor)clusterMap.chooseRandom(scope);
if (excludedNodes.add(chosenNode)) { //was not in the excluded list
if (LOG.isDebugEnabled()) {
builder.append("\nNode ").append(NodeBase.getPath(chosenNode)).append(" [");
}
numOfAvailableNodes--;
final DatanodeStorageInfo[] storages = DFSUtil.shuffle(
@ -603,6 +635,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
}
}
}
if (LOG.isDebugEnabled()) {
builder.append("\n]");
}
// If no candidate storage was found on this DN then set badTarget.
badTarget = (i == storages.length);
@ -613,9 +648,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
String detail = enableDebugLogging;
if (LOG.isDebugEnabled()) {
if (badTarget && builder != null) {
detail = builder.append("]").toString();
detail = builder.toString();
builder.setLength(0);
} else detail = "";
} else {
detail = "";
}
}
throw new NotEnoughReplicasException(detail);
}
@ -649,14 +686,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
private static void logNodeIsNotChosen(DatanodeStorageInfo storage, String reason) {
if (LOG.isDebugEnabled()) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
// build the error message for later use.
debugLoggingBuilder.get()
.append(node).append(": ")
.append("Storage ").append(storage)
.append("at node ").append(NodeBase.getPath(node))
.append(" is not chosen because ")
.append(reason);
.append("\n Storage ").append(storage)
.append(" is not chosen since ").append(reason).append(".");
}
}
@ -681,11 +714,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
boolean considerLoad,
List<DatanodeStorageInfo> results,
boolean avoidStaleNodes,
StorageType storageType) {
if (storage.getStorageType() != storageType) {
logNodeIsNotChosen(storage,
"storage types do not match, where the expected storage type is "
+ storageType);
StorageType requiredStorageType) {
if (storage.getStorageType() != requiredStorageType) {
logNodeIsNotChosen(storage, "storage types do not match,"
+ " where the required storage type is " + requiredStorageType);
return false;
}
if (storage.getState() == State.READ_ONLY_SHARED) {
@ -707,9 +739,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
}
final long requiredSize = blockSize * HdfsConstants.MIN_BLOCKS_FOR_WRITE;
final long scheduledSize = blockSize * node.getBlocksScheduled();
if (requiredSize > node.getRemaining() - scheduledSize) {
logNodeIsNotChosen(storage, "the node does not have enough space ");
final long scheduledSize = blockSize * node.getBlocksScheduled(storage.getStorageType());
final long remaining = node.getRemaining(storage.getStorageType());
if (requiredSize > remaining - scheduledSize) {
logNodeIsNotChosen(storage, "the node does not have enough "
+ storage.getStorageType() + " space"
+ " (required=" + requiredSize
+ ", scheduled=" + scheduledSize
+ ", remaining=" + remaining + ")");
return false;
}
@ -718,8 +755,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
final double maxLoad = 2.0 * stats.getInServiceXceiverAverage();
final int nodeLoad = node.getXceiverCount();
if (nodeLoad > maxLoad) {
logNodeIsNotChosen(storage,
"the node is too busy (load:"+nodeLoad+" > "+maxLoad+") ");
logNodeIsNotChosen(storage, "the node is too busy (load: " + nodeLoad
+ " > " + maxLoad + ") ");
return false;
}
}
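
The isGoodTarget hunk above replaces the node-wide free-space test with a per-storage-type one: required, scheduled, and remaining sizes are all computed for the storage type being considered. Below is a minimal, self-contained sketch of that arithmetic; the class and helper names and the sample numbers are illustrative assumptions, and MIN_BLOCKS_FOR_WRITE is taken here to be 1 (the HdfsConstants value). Only the formula (reject when required > remaining - scheduled) comes from the hunk.

// Sketch of the per-storage-type space check introduced above.
public class SpaceCheckSketch {
  // stand-in for HdfsConstants.MIN_BLOCKS_FOR_WRITE (assumed value)
  static final int MIN_BLOCKS_FOR_WRITE = 1;

  static boolean hasEnoughSpace(long blockSize, int blocksScheduledForType,
      long remainingForType) {
    final long requiredSize = blockSize * MIN_BLOCKS_FOR_WRITE;
    final long scheduledSize = blockSize * blocksScheduledForType;
    return requiredSize <= remainingForType - scheduledSize;
  }

  public static void main(String[] args) {
    final long blockSize = 1024;
    // 3 blocks already scheduled to this storage type, 4 blocks' worth remaining: accepted
    System.out.println(hasEnoughSpace(blockSize, 3, 4 * 1024)); // true
    // 4 blocks already scheduled, 4 blocks' worth remaining: rejected
    System.out.println(hasEnoughSpace(blockSize, 4, 4 * 1024)); // false
  }
}

With the per-type accounting, a node can be rejected for one storage type (say ARCHIVE) while still being a valid target for another, which is what the new log message spells out.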

File: DatanodeDescriptor.java

@ -28,16 +28,19 @@ import java.util.Map;
import java.util.Queue;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.util.EnumCounters;
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.apache.hadoop.util.IntrusiveCollection;
import org.apache.hadoop.util.Time;
@ -202,8 +205,10 @@ public class DatanodeDescriptor extends DatanodeInfo {
* in case of errors (e.g. datanode does not report if an error occurs
* while writing the block).
*/
private int currApproxBlocksScheduled = 0;
private int prevApproxBlocksScheduled = 0;
private EnumCounters<StorageType> currApproxBlocksScheduled
= new EnumCounters<StorageType>(StorageType.class);
private EnumCounters<StorageType> prevApproxBlocksScheduled
= new EnumCounters<StorageType>(StorageType.class);
private long lastBlocksScheduledRollTime = 0;
private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
private int volumeFailures = 0;
@ -474,25 +479,48 @@ public class DatanodeDescriptor extends DatanodeInfo {
}
}
/**
* @return the total remaining space of the given storage type on this datanode.
*/
public long getRemaining(StorageType t) {
long remaining = 0;
for(DatanodeStorageInfo s : getStorageInfos()) {
if (s.getStorageType() == t) {
remaining += s.getRemaining();
}
}
return remaining;
}
/**
* @return Approximate number of blocks currently scheduled to be written
* to the given storage type of this datanode.
*/
public int getBlocksScheduled(StorageType t) {
return (int)(currApproxBlocksScheduled.get(t)
+ prevApproxBlocksScheduled.get(t));
}
/**
* @return Approximate number of blocks currently scheduled to be written
* to this datanode.
*/
public int getBlocksScheduled() {
return currApproxBlocksScheduled + prevApproxBlocksScheduled;
return (int)(currApproxBlocksScheduled.sum()
+ prevApproxBlocksScheduled.sum());
}
/** Increment the number of blocks scheduled. */
void incrementBlocksScheduled() {
currApproxBlocksScheduled++;
void incrementBlocksScheduled(StorageType t) {
currApproxBlocksScheduled.add(t, 1);
}
/** Decrement the number of blocks scheduled. */
void decrementBlocksScheduled() {
if (prevApproxBlocksScheduled > 0) {
prevApproxBlocksScheduled--;
} else if (currApproxBlocksScheduled > 0) {
currApproxBlocksScheduled--;
void decrementBlocksScheduled(StorageType t) {
if (prevApproxBlocksScheduled.get(t) > 0) {
prevApproxBlocksScheduled.subtract(t, 1);
} else if (currApproxBlocksScheduled.get(t) > 0) {
currApproxBlocksScheduled.subtract(t, 1);
}
// it's OK if both counters are zero.
}
@ -500,8 +528,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
/** Adjusts curr and prev number of blocks scheduled every few minutes. */
private void rollBlocksScheduled(long now) {
if (now - lastBlocksScheduledRollTime > BLOCKS_SCHEDULED_ROLL_INTERVAL) {
prevApproxBlocksScheduled = currApproxBlocksScheduled;
currApproxBlocksScheduled = 0;
prevApproxBlocksScheduled.set(currApproxBlocksScheduled);
currApproxBlocksScheduled.reset();
lastBlocksScheduledRollTime = now;
}
}
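
The hunks above replace the two plain int counters with EnumCounters&lt;StorageType&gt;, so the scheduled-block estimate can be queried per storage type as well as in total. A small sketch of the same increment/decrement/roll sequence, assuming the org.apache.hadoop.hdfs classes from this patch are on the classpath (the class name and the sample sequence are illustrative):

import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.util.EnumCounters;

// Exercises the EnumCounters calls used by incrementBlocksScheduled,
// decrementBlocksScheduled and rollBlocksScheduled above.
public class ScheduledCountersSketch {
  public static void main(String[] args) {
    EnumCounters<StorageType> curr = new EnumCounters<StorageType>(StorageType.class);
    EnumCounters<StorageType> prev = new EnumCounters<StorageType>(StorageType.class);

    curr.add(StorageType.DISK, 1);      // a DISK replica is scheduled
    curr.add(StorageType.ARCHIVE, 1);   // an ARCHIVE replica is scheduled

    // decrement prefers prev, then curr, as in decrementBlocksScheduled
    if (prev.get(StorageType.DISK) > 0) {
      prev.subtract(StorageType.DISK, 1);
    } else if (curr.get(StorageType.DISK) > 0) {
      curr.subtract(StorageType.DISK, 1);
    }

    // roll: prev takes over curr's values, curr starts over
    prev.set(curr);
    curr.reset();

    System.out.println("scheduled(ARCHIVE) = "
        + (curr.get(StorageType.ARCHIVE) + prev.get(StorageType.ARCHIVE))); // 1
  }
}

BlockManager's block-received path (first hunk) and DatanodeStorageInfo.incrementBlocksScheduled now pass the replica's storage type through to these counters.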

File: DatanodeStorageInfo.java

@ -283,7 +283,7 @@ public class DatanodeStorageInfo {
/** Increment the number of blocks scheduled for each given storage */
public static void incrementBlocksScheduled(DatanodeStorageInfo... storages) {
for (DatanodeStorageInfo s : storages) {
s.getDatanodeDescriptor().incrementBlocksScheduled();
s.getDatanodeDescriptor().incrementBlocksScheduled(s.getStorageType());
}
}

File: Mover.java

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.mover;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -432,6 +433,12 @@ public class Mover {
}
return expected.isEmpty() || existing.isEmpty();
}
@Override
public String toString() {
return getClass().getSimpleName() + "{expected=" + expected
+ ", existing=" + existing + "}";
}
}
static int run(Collection<URI> namenodes, Configuration conf)

File: EnumCounters.java

@ -106,6 +106,15 @@ public class EnumCounters<E extends Enum<E>> {
}
}
/** @return the sum of all counters. */
public final long sum() {
long sum = 0;
for(int i = 0; i < counters.length; i++) {
sum += counters[i];
}
return sum;
}
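
The new sum() simply totals the counters across all enum values; it is what the type-less getBlocksScheduled() above relies on. A tiny illustration, again assuming the Hadoop classes are available (the wrapper class is a throwaway name):

import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.util.EnumCounters;

public class SumSketch {
  public static void main(String[] args) {
    EnumCounters<StorageType> c = new EnumCounters<StorageType>(StorageType.class);
    c.add(StorageType.DISK, 2);
    c.add(StorageType.ARCHIVE, 3);
    System.out.println(c.get(StorageType.DISK)); // 2
    System.out.println(c.sum());                 // 5
  }
}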
@Override
public boolean equals(Object obj) {
if (obj == this) {

File: MiniDFSCluster.java

@ -1474,19 +1474,21 @@ public class MiniDFSCluster {
secureResources, dn.getIpcPort()));
dns[i - curDatanodesNum] = dn;
}
curDatanodesNum += numDataNodes;
this.numDataNodes += numDataNodes;
waitActive();
if (storageCapacities != null) {
for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; ++i) {
List<? extends FsVolumeSpi> volumes = dns[i].getFSDataset().getVolumes();
assert storageCapacities[i].length == storagesPerDatanode;
final int index = i - curDatanodesNum;
List<? extends FsVolumeSpi> volumes = dns[index].getFSDataset().getVolumes();
assert storageCapacities[index].length == storagesPerDatanode;
assert volumes.size() == storagesPerDatanode;
for (int j = 0; j < volumes.size(); ++j) {
FsVolumeImpl volume = (FsVolumeImpl) volumes.get(j);
volume.setCapacityForTesting(storageCapacities[i][j]);
LOG.info("setCapacityForTesting " + storageCapacities[index][j]
+ " for [" + volume.getStorageType() + "]" + volume.getStorageID());
volume.setCapacityForTesting(storageCapacities[index][j]);
}
}
}
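
The MiniDFSCluster change above fixes how storage capacities are applied when datanodes are added to an already-running cluster: dns[] and the storageCapacities parameter only hold entries for the newly added nodes, so the loop index (which starts at curDatanodesNum) has to be rebased with index = i - curDatanodesNum. A stripped-down illustration of the indexing, with made-up array sizes and values:

// Illustrative only: shows why the fixed code subtracts curDatanodesNum
// before indexing into the per-new-datanode arrays.
public class IndexFixSketch {
  public static void main(String[] args) {
    int curDatanodesNum = 4;           // datanodes already in the cluster
    int numDataNodes = 2;              // datanodes being added now
    long[][] storageCapacities = {     // one row per *new* datanode
        {1000L, 2000L},
        {3000L, 4000L},
    };
    for (int i = curDatanodesNum; i < curDatanodesNum + numDataNodes; ++i) {
      final int index = i - curDatanodesNum; // 0, 1 -- valid rows
      // storageCapacities[i] would be out of bounds here (i = 4, 5)
      System.out.println("datanode " + i + " -> capacities "
          + java.util.Arrays.toString(storageCapacities[index]));
    }
  }
}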

File: TestStorageMover.java

@ -17,30 +17,58 @@
*/
package org.apache.hadoop.hdfs.server.mover;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher;
import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
import java.net.URI;
import java.util.*;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
/**
* Test the data migration tool (for Archival Storage)
*/
public class TestStorageMover {
private static final long BLOCK_SIZE = 1024;
static final Log LOG = LogFactory.getLog(TestStorageMover.class);
static {
((Log4JLogger)LogFactory.getLog(BlockPlacementPolicy.class)
).getLogger().setLevel(Level.ALL);
((Log4JLogger)LogFactory.getLog(Dispatcher.class)
).getLogger().setLevel(Level.ALL);
}
private static final int BLOCK_SIZE = 1024;
private static final short REPL = 3;
private static final int NUM_DATANODES = 6;
private static final Configuration DEFAULT_CONF = new HdfsConfiguration();
@ -50,12 +78,15 @@ public class TestStorageMover {
private static final BlockStoragePolicy COLD;
static {
DEFAULT_POLICIES = BlockStoragePolicy.readBlockStorageSuite(new
HdfsConfiguration());
DEFAULT_CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
DEFAULT_CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
DEFAULT_CONF.setLong(DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY, 2000L);
DEFAULT_POLICIES = BlockStoragePolicy.readBlockStorageSuite(DEFAULT_CONF);
HOT = DEFAULT_POLICIES.getPolicy("HOT");
WARM = DEFAULT_POLICIES.getPolicy("WARM");
COLD = DEFAULT_POLICIES.getPolicy("COLD");
Dispatcher.setBlockMoveWaitTime(10 * 1000);
Dispatcher.setBlockMoveWaitTime(1000L);
}
/**
@ -63,17 +94,48 @@ public class TestStorageMover {
* also defines snapshots.
*/
static class NamespaceScheme {
final List<Path> dirs;
final List<Path> files;
final long fileSize;
final Map<Path, List<String>> snapshotMap;
final Map<Path, BlockStoragePolicy> policyMap;
NamespaceScheme(List<Path> files, Map<Path,List<String>> snapshotMap,
NamespaceScheme(List<Path> dirs, List<Path> files, long fileSize,
Map<Path,List<String>> snapshotMap,
Map<Path, BlockStoragePolicy> policyMap) {
this.files = files;
this.dirs = dirs == null? Collections.<Path>emptyList(): dirs;
this.files = files == null? Collections.<Path>emptyList(): files;
this.fileSize = fileSize;
this.snapshotMap = snapshotMap == null ?
new HashMap<Path, List<String>>() : snapshotMap;
Collections.<Path, List<String>>emptyMap() : snapshotMap;
this.policyMap = policyMap;
}
/**
* Create files/directories/snapshots.
*/
void prepare(DistributedFileSystem dfs, short repl) throws Exception {
for (Path d : dirs) {
dfs.mkdirs(d);
}
for (Path file : files) {
DFSTestUtil.createFile(dfs, file, fileSize, repl, 0L);
}
for (Map.Entry<Path, List<String>> entry : snapshotMap.entrySet()) {
for (String snapshot : entry.getValue()) {
SnapshotTestHelper.createSnapshot(dfs, entry.getKey(), snapshot);
}
}
}
/**
* Set storage policies according to the corresponding scheme.
*/
void setStoragePolicy(DistributedFileSystem dfs) throws Exception {
for (Map.Entry<Path, BlockStoragePolicy> entry : policyMap.entrySet()) {
dfs.setStoragePolicy(entry.getKey(), entry.getValue().getName());
}
}
}
/**
@ -87,6 +149,11 @@ public class TestStorageMover {
final StorageType[][] storageTypes;
final long[][] storageCapacities;
ClusterScheme() {
this(DEFAULT_CONF, NUM_DATANODES, REPL,
genStorageTypes(NUM_DATANODES, 1, 1), null);
}
ClusterScheme(Configuration conf, int numDataNodes, short repl,
StorageType[][] types, long[][] capacities) {
Preconditions.checkArgument(types == null || types.length == numDataNodes);
@ -128,6 +195,22 @@ public class TestStorageMover {
dfs = cluster.getFileSystem();
}
private void runBasicTest(boolean shutdown) throws Exception {
setupCluster();
try {
prepareNamespace();
verify(true);
setStoragePolicy();
migrate();
verify(true);
} finally {
if (shutdown) {
shutdownCluster();
}
}
}
void shutdownCluster() throws Exception {
IOUtils.cleanup(null, dfs);
if (cluster != null) {
@ -140,18 +223,11 @@ public class TestStorageMover {
* corresponding scheme.
*/
void prepareNamespace() throws Exception {
for (Path file : nsScheme.files) {
DFSTestUtil.createFile(dfs, file, BLOCK_SIZE * 2, clusterScheme.repl,
0L);
}
for (Map.Entry<Path, List<String>> entry : nsScheme.snapshotMap.entrySet()) {
for (String snapshot : entry.getValue()) {
SnapshotTestHelper.createSnapshot(dfs, entry.getKey(), snapshot);
}
}
for (Map.Entry<Path, BlockStoragePolicy> entry : nsScheme.policyMap.entrySet()) {
dfs.setStoragePolicy(entry.getKey(), entry.getValue().getName());
nsScheme.prepare(dfs, clusterScheme.repl);
}
void setStoragePolicy() throws Exception {
nsScheme.setStoragePolicy(dfs);
}
/**
@ -159,6 +235,7 @@ public class TestStorageMover {
*/
void migrate(String... args) throws Exception {
runMover();
Thread.sleep(5000); // let the NN finish deletion
}
/**
@ -195,38 +272,128 @@ public class TestStorageMover {
verifyRecursively(fullPath, child);
}
} else if (!status.isSymlink()) { // is file
verifyFile(parent, status, null);
}
}
private void verifyFile(final Path parent, final HdfsFileStatus status,
final Byte expectedPolicyId) throws Exception {
HdfsLocatedFileStatus fileStatus = (HdfsLocatedFileStatus) status;
byte policyId = fileStatus.getStoragePolicy();
BlockStoragePolicy policy = policies.getPolicy(policyId);
if (expectedPolicyId != null) {
Assert.assertEquals(expectedPolicyId.byteValue(), policyId);
}
final List<StorageType> types = policy.chooseStorageTypes(
status.getReplication());
for(LocatedBlock lb : fileStatus.getBlockLocations().getLocatedBlocks()) {
final Mover.StorageTypeDiff diff = new Mover.StorageTypeDiff(types,
lb.getStorageTypes());
Assert.assertTrue(diff.removeOverlap());
}
}
Assert.assertTrue(fileStatus.getFullName(parent.toString())
+ " with policy " + policy + " has non-empty overlap: " + diff,
diff.removeOverlap());
}
}
Replication getReplication(Path file) throws IOException {
return getOrVerifyReplication(file, null);
}
Replication verifyReplication(Path file, int expectedDiskCount,
int expectedArchiveCount) throws IOException {
final Replication r = new Replication();
r.disk = expectedDiskCount;
r.archive = expectedArchiveCount;
return getOrVerifyReplication(file, r);
}
private Replication getOrVerifyReplication(Path file, Replication expected)
throws IOException {
final List<LocatedBlock> lbs = dfs.getClient().getLocatedBlocks(
file.toString(), 0).getLocatedBlocks();
Assert.assertEquals(1, lbs.size());
LocatedBlock lb = lbs.get(0);
StringBuilder types = new StringBuilder();
final Replication r = new Replication();
for(StorageType t : lb.getStorageTypes()) {
types.append(t).append(", ");
if (t == StorageType.DISK) {
r.disk++;
} else if (t == StorageType.ARCHIVE) {
r.archive++;
} else {
Assert.fail("Unexpected storage type " + t);
}
}
if (expected != null) {
final String s = "file = " + file + "\n types = [" + types + "]";
Assert.assertEquals(s, expected, r);
}
return r;
}
}
static class Replication {
int disk;
int archive;
@Override
public int hashCode() {
return disk ^ archive;
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
} else if (obj == null || !(obj instanceof Replication)) {
return false;
}
final Replication that = (Replication)obj;
return this.disk == that.disk && this.archive == that.archive;
}
@Override
public String toString() {
return "[disk=" + disk + ", archive=" + archive + "]";
}
}
private static StorageType[][] genStorageTypes(int numDataNodes) {
return genStorageTypes(numDataNodes, 0, 0);
}
private static StorageType[][] genStorageTypes(int numDataNodes,
int numAllDisk, int numAllArchive) {
StorageType[][] types = new StorageType[numDataNodes][];
for (int i = 0; i < types.length; i++) {
int i = 0;
for (; i < numAllDisk; i++) {
types[i] = new StorageType[]{StorageType.DISK, StorageType.DISK};
}
for (; i < numAllDisk + numAllArchive; i++) {
types[i] = new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE};
}
for (; i < types.length; i++) {
types[i] = new StorageType[]{StorageType.DISK, StorageType.ARCHIVE};
}
return types;
}
private void runTest(MigrationTest test) throws Exception {
test.setupCluster();
try {
test.prepareNamespace();
test.migrate();
Thread.sleep(5000); // let the NN finish deletion
test.verify(true);
} finally {
test.shutdownCluster();
private static long[][] genCapacities(int nDatanodes, int numAllDisk,
int numAllArchive, long diskCapacity, long archiveCapacity) {
final long[][] capacities = new long[nDatanodes][];
int i = 0;
for (; i < numAllDisk; i++) {
capacities[i] = new long[]{diskCapacity, diskCapacity};
}
for (; i < numAllDisk + numAllArchive; i++) {
capacities[i] = new long[]{archiveCapacity, archiveCapacity};
}
for(; i < capacities.length; i++) {
capacities[i] = new long[]{diskCapacity, archiveCapacity};
}
return capacities;
}
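
For the schemes used by testNoSpaceDisk and testNoSpaceArchive, genStorageTypes(NUM_DATANODES, 1, 1) lays out one all-DISK node, one all-ARCHIVE node, and mixed DISK+ARCHIVE nodes for the rest, with genCapacities following the same pattern. A small driver that reuses the helper body above to print that layout; the wrapper class and main method are added only for illustration:

import java.util.Arrays;
import org.apache.hadoop.hdfs.StorageType;

public class GenLayoutSketch {
  // copied from the test helper above
  static StorageType[][] genStorageTypes(int numDataNodes, int numAllDisk,
      int numAllArchive) {
    StorageType[][] types = new StorageType[numDataNodes][];
    int i = 0;
    for (; i < numAllDisk; i++) {
      types[i] = new StorageType[]{StorageType.DISK, StorageType.DISK};
    }
    for (; i < numAllDisk + numAllArchive; i++) {
      types[i] = new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE};
    }
    for (; i < types.length; i++) {
      types[i] = new StorageType[]{StorageType.DISK, StorageType.ARCHIVE};
    }
    return types;
  }

  public static void main(String[] args) {
    // prints [DISK, DISK], [ARCHIVE, ARCHIVE], then four [DISK, ARCHIVE] rows
    for (StorageType[] node : genStorageTypes(6, 1, 1)) {
      System.out.println(Arrays.toString(node));
    }
  }
}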
/**
@ -237,11 +404,227 @@ public class TestStorageMover {
final Path foo = new Path("/foo");
Map<Path, BlockStoragePolicy> policyMap = Maps.newHashMap();
policyMap.put(foo, COLD);
NamespaceScheme nsScheme = new NamespaceScheme(Arrays.asList(foo), null,
policyMap);
NamespaceScheme nsScheme = new NamespaceScheme(null, Arrays.asList(foo),
2*BLOCK_SIZE, null, policyMap);
ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,
NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null);
new MigrationTest(clusterScheme, nsScheme).runBasicTest(true);
}
private static class PathPolicyMap {
final Map<Path, BlockStoragePolicy> map = Maps.newHashMap();
final Path hot = new Path("/hot");
final Path warm = new Path("/warm");
final Path cold = new Path("/cold");
final List<Path> files;
PathPolicyMap(int filesPerDir){
map.put(hot, HOT);
map.put(warm, WARM);
map.put(cold, COLD);
files = new ArrayList<Path>();
for(Path dir : map.keySet()) {
for(int i = 0; i < filesPerDir; i++) {
files.add(new Path(dir, "file" + i));
}
}
}
NamespaceScheme newNamespaceScheme() {
return new NamespaceScheme(Arrays.asList(hot, warm, cold),
files, BLOCK_SIZE/2, null, map);
}
/**
* Move hot files to warm and cold, warm files to hot and cold,
* and cold files to hot and warm.
*/
void moveAround(DistributedFileSystem dfs) throws Exception {
for(Path srcDir : map.keySet()) {
int i = 0;
for(Path dstDir : map.keySet()) {
if (!srcDir.equals(dstDir)) {
final Path src = new Path(srcDir, "file" + i++);
final Path dst = new Path(dstDir, srcDir.getName() + "2" + dstDir.getName());
LOG.info("rename " + src + " to " + dst);
dfs.rename(src, dst);
}
}
}
}
}
/**
* Test directories with Hot, Warm and Cold policies.
*/
@Test
public void testHotWarmColdDirs() throws Exception {
PathPolicyMap pathPolicyMap = new PathPolicyMap(3);
NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme();
ClusterScheme clusterScheme = new ClusterScheme();
MigrationTest test = new MigrationTest(clusterScheme, nsScheme);
runTest(test);
test.runBasicTest(false);
pathPolicyMap.moveAround(test.dfs);
test.migrate();
test.verify(true);
test.shutdownCluster();
}
/**
* Test DISK running out of space.
*/
@Test
public void testNoSpaceDisk() throws Exception {
final PathPolicyMap pathPolicyMap = new PathPolicyMap(0);
final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme();
final long diskCapacity = (3 + HdfsConstants.MIN_BLOCKS_FOR_WRITE)*BLOCK_SIZE;
final long archiveCapacity = 100*BLOCK_SIZE;
final long[][] capacities = genCapacities(NUM_DATANODES, 1, 1,
diskCapacity, archiveCapacity);
final ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,
NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES, 1, 1), capacities);
final MigrationTest test = new MigrationTest(clusterScheme, nsScheme);
test.runBasicTest(false);
// create hot files with replication 3 until there is no more space.
final short replication = 3;
{
int hotFileCount = 0;
try {
for(; ; hotFileCount++) {
final Path p = new Path(pathPolicyMap.hot, "file" + hotFileCount);
DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L);
}
} catch(IOException e) {
LOG.info("Expected: hotFileCount=" + hotFileCount, e);
}
Assert.assertTrue(hotFileCount >= 2);
}
// create hot files with replication 1 to use up all remaining space.
{
int hotFileCount_r1 = 0;
try {
for(; ; hotFileCount_r1++) {
final Path p = new Path(pathPolicyMap.hot, "file_r1_" + hotFileCount_r1);
DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short)1, 0L);
}
} catch(IOException e) {
LOG.info("Expected: hotFileCount_r1=" + hotFileCount_r1, e);
}
}
{ // test increasing replication. Since DISK is full,
// new replicas should be stored in ARCHIVE as a fallback storage.
final Path file0 = new Path(pathPolicyMap.hot, "file0");
final Replication r = test.getReplication(file0);
LOG.info("XXX " + file0 + ": replication=" + r);
final short newReplication = (short)5;
test.dfs.setReplication(file0, newReplication);
// DFSTestUtil.waitReplication(test.dfs, file0, newReplication);
Thread.sleep(10000);
test.verifyReplication(file0, r.disk, newReplication - r.disk);
}
{ // test creating a cold file and then increase replication
final Path p = new Path(pathPolicyMap.cold, "foo");
DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L);
test.verifyReplication(p, 0, replication);
final short newReplication = 5;
test.dfs.setReplication(p, newReplication);
// DFSTestUtil.waitReplication(test.dfs, p, newReplication);
Thread.sleep(10000);
test.verifyReplication(p, 0, newReplication);
}
{ //test move a hot file to warm
//TODO: fix Mover not terminating in the test below
// final Path file1 = new Path(pathPolicyMap.hot, "file1");
// test.dfs.rename(file1, pathPolicyMap.warm);
// test.migrate();
// test.verify(true);
}
test.shutdownCluster();
}
/**
* Test ARCHIVE running out of space.
*/
@Test
public void testNoSpaceArchive() throws Exception {
final PathPolicyMap pathPolicyMap = new PathPolicyMap(0);
final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme();
final long diskCapacity = 100*BLOCK_SIZE;
final long archiveCapacity = (2 + HdfsConstants.MIN_BLOCKS_FOR_WRITE)*BLOCK_SIZE;
final long[][] capacities = genCapacities(NUM_DATANODES, 1, 1,
diskCapacity, archiveCapacity);
final ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,
NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES, 1, 1), capacities);
final MigrationTest test = new MigrationTest(clusterScheme, nsScheme);
test.runBasicTest(false);
// create cold files with replication 3 until there is no more space.
final short replication = 3;
{
int coldFileCount = 0;
try {
for(; ; coldFileCount++) {
final Path p = new Path(pathPolicyMap.cold, "file" + coldFileCount);
DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L);
}
} catch(IOException e) {
LOG.info("Expected: coldFileCount=" + coldFileCount, e);
}
Assert.assertTrue(coldFileCount >= 2);
}
// create cold files with replication 1 to use up all remaining space.
{
int coldFileCount_r1 = 0;
try {
for(; ; coldFileCount_r1++) {
final Path p = new Path(pathPolicyMap.cold, "file_r1_" + coldFileCount_r1);
DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short)1, 0L);
}
} catch(IOException e) {
LOG.info("Expected: coldFileCount_r1=" + coldFileCount_r1, e);
}
}
{ // test increasing replication, but new replicas cannot be created
// since there is no more ARCHIVE space.
final Path file0 = new Path(pathPolicyMap.cold, "file0");
final Replication r = test.getReplication(file0);
LOG.info("XXX " + file0 + ": replication=" + r);
Assert.assertEquals(0, r.disk);
final short newReplication = (short)5;
test.dfs.setReplication(file0, newReplication);
Thread.sleep(10000);
test.verifyReplication(file0, 0, r.archive);
}
{ // test creating a hot file
final Path p = new Path(pathPolicyMap.hot, "foo");
DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short)3, 0L);
}
{ //test move a cold file to warm
final Path file1 = new Path(pathPolicyMap.cold, "file1");
test.dfs.rename(file1, pathPolicyMap.warm);
test.migrate();
test.verify(true);
}
test.shutdownCluster();
}
}