diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index af836532bce..956900d32fe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -2930,7 +2930,7 @@ public class BlockManager { // Decrement number of blocks scheduled to this datanode. // for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with // RECEIVED_BLOCK), we currently also decrease the approximate number. - node.decrementBlocksScheduled(); + node.decrementBlocksScheduled(storageInfo.getStorageType()); // get the deletion hint node DatanodeDescriptor delHintNode = null; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index 593ea90e70e..a0e67013649 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -76,12 +76,6 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { */ protected int tolerateHeartbeatMultiplier; - protected BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats, - NetworkTopology clusterMap, - Host2NodesMap host2datanodeMap) { - initialize(conf, stats, clusterMap, host2datanodeMap); - } - protected BlockPlacementPolicyDefault() { } @@ -174,6 +168,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { return getPipeline(writer, results.toArray(new DatanodeStorageInfo[results.size()])); } catch (NotEnoughReplicasException nr) { + if (LOG.isDebugEnabled()) { + LOG.debug("Failed to choose with favored nodes (=" + favoredNodes + + "), disregard favored nodes hint and retry.", nr); + } // Fall back to regular block placement disregarding favored nodes hint return chooseTarget(src, numOfReplicas, writer, new ArrayList(numOfReplicas), false, @@ -291,6 +289,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { unavailableStorages, newBlock); final EnumMap storageTypes = getRequiredStorageTypes(requiredStorageTypes); + if (LOG.isTraceEnabled()) { + LOG.trace("storageTypes=" + storageTypes); + } try { if ((numOfReplicas = requiredStorageTypes.size()) == 0) { @@ -337,7 +338,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { } catch (NotEnoughReplicasException e) { final String message = "Failed to place enough replicas, still in need of " + (totalReplicasExpected - results.size()) + " to reach " - + totalReplicasExpected + "."; + + totalReplicasExpected + + " (unavailableStorages=" + unavailableStorages + + ", storagePolicy=" + storagePolicy + + ", newBlock=" + newBlock + ")"; + if (LOG.isTraceEnabled()) { LOG.trace(message, e); } else { @@ -466,39 +471,59 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes); } + final String localRack = localMachine.getNetworkLocation(); - 
// choose one from the local rack
     try {
-      return chooseRandom(localMachine.getNetworkLocation(), excludedNodes,
+      // choose one from the local rack
+      return chooseRandom(localRack, excludedNodes,
           blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);
-    } catch (NotEnoughReplicasException e1) {
-      // find the second replica
-      DatanodeDescriptor newLocal=null;
+    } catch (NotEnoughReplicasException e) {
+      // find the next replica and retry with its rack
       for(DatanodeStorageInfo resultStorage : results) {
         DatanodeDescriptor nextNode = resultStorage.getDatanodeDescriptor();
         if (nextNode != localMachine) {
-          newLocal = nextNode;
-          break;
-        }
-      }
-      if (newLocal != null) {
-        try {
-          return chooseRandom(newLocal.getNetworkLocation(), excludedNodes,
-              blocksize, maxNodesPerRack, results, avoidStaleNodes,
-              storageTypes);
-        } catch(NotEnoughReplicasException e2) {
-          //otherwise randomly choose one from the network
-          return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Failed to choose from local rack (location = " + localRack
+                + "), retry with the rack of the next replica (location = "
+                + nextNode.getNetworkLocation() + ")", e);
+          }
+          return chooseFromNextRack(nextNode, excludedNodes, blocksize,
               maxNodesPerRack, results, avoidStaleNodes, storageTypes);
         }
-      } else {
-        //otherwise randomly choose one from the network
-        return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes, storageTypes);
       }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Failed to choose from local rack (location = " + localRack
+            + "); the second replica is not found, retry choosing randomly", e);
+      }
+      //the second replica is not found, randomly choose one from the network
+      return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     }
   }
-  
+
+  private DatanodeStorageInfo chooseFromNextRack(Node next,
+      Set<Node> excludedNodes,
+      long blocksize,
+      int maxNodesPerRack,
+      List<DatanodeStorageInfo> results,
+      boolean avoidStaleNodes,
+      EnumMap<StorageType, Integer> storageTypes) throws NotEnoughReplicasException {
+    final String nextRack = next.getNetworkLocation();
+    try {
+      return chooseRandom(nextRack, excludedNodes, blocksize, maxNodesPerRack,
+          results, avoidStaleNodes, storageTypes);
+    } catch(NotEnoughReplicasException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Failed to choose from the next rack (location = " + nextRack
+            + "), retry choosing randomly", e);
+      }
+      //otherwise randomly choose one from the network
+      return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
+    }
+  }
+
   /**
    * Choose numOfReplicas nodes from the racks
    * that localMachine is NOT on.
@@ -522,6 +547,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { excludedNodes, blocksize, maxReplicasPerRack, results, avoidStaleNodes, storageTypes); } catch (NotEnoughReplicasException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Failed to choose remote rack (location = ~" + + localMachine.getNetworkLocation() + "), fallback to local rack", e); + } chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas), localMachine.getNetworkLocation(), excludedNodes, blocksize, maxReplicasPerRack, results, avoidStaleNodes, storageTypes); @@ -572,6 +601,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { DatanodeDescriptor chosenNode = (DatanodeDescriptor)clusterMap.chooseRandom(scope); if (excludedNodes.add(chosenNode)) { //was not in the excluded list + if (LOG.isDebugEnabled()) { + builder.append("\nNode ").append(NodeBase.getPath(chosenNode)).append(" ["); + } numOfAvailableNodes--; final DatanodeStorageInfo[] storages = DFSUtil.shuffle( @@ -603,6 +635,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { } } } + if (LOG.isDebugEnabled()) { + builder.append("\n]"); + } // If no candidate storage was found on this DN then set badTarget. badTarget = (i == storages.length); @@ -613,9 +648,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { String detail = enableDebugLogging; if (LOG.isDebugEnabled()) { if (badTarget && builder != null) { - detail = builder.append("]").toString(); + detail = builder.toString(); builder.setLength(0); - } else detail = ""; + } else { + detail = ""; + } } throw new NotEnoughReplicasException(detail); } @@ -649,14 +686,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { private static void logNodeIsNotChosen(DatanodeStorageInfo storage, String reason) { if (LOG.isDebugEnabled()) { - final DatanodeDescriptor node = storage.getDatanodeDescriptor(); // build the error message for later use. 
debugLoggingBuilder.get() - .append(node).append(": ") - .append("Storage ").append(storage) - .append("at node ").append(NodeBase.getPath(node)) - .append(" is not chosen because ") - .append(reason); + .append("\n Storage ").append(storage) + .append(" is not chosen since ").append(reason).append("."); } } @@ -681,11 +714,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { boolean considerLoad, List results, boolean avoidStaleNodes, - StorageType storageType) { - if (storage.getStorageType() != storageType) { - logNodeIsNotChosen(storage, - "storage types do not match, where the expected storage type is " - + storageType); + StorageType requiredStorageType) { + if (storage.getStorageType() != requiredStorageType) { + logNodeIsNotChosen(storage, "storage types do not match," + + " where the required storage type is " + requiredStorageType); return false; } if (storage.getState() == State.READ_ONLY_SHARED) { @@ -707,9 +739,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { } final long requiredSize = blockSize * HdfsConstants.MIN_BLOCKS_FOR_WRITE; - final long scheduledSize = blockSize * node.getBlocksScheduled(); - if (requiredSize > node.getRemaining() - scheduledSize) { - logNodeIsNotChosen(storage, "the node does not have enough space "); + final long scheduledSize = blockSize * node.getBlocksScheduled(storage.getStorageType()); + final long remaining = node.getRemaining(storage.getStorageType()); + if (requiredSize > remaining - scheduledSize) { + logNodeIsNotChosen(storage, "the node does not have enough " + + storage.getStorageType() + " space" + + " (required=" + requiredSize + + ", scheduled=" + scheduledSize + + ", remaining=" + remaining + ")"); return false; } @@ -718,8 +755,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { final double maxLoad = 2.0 * stats.getInServiceXceiverAverage(); final int nodeLoad = node.getXceiverCount(); if (nodeLoad > maxLoad) { - logNodeIsNotChosen(storage, - "the node is too busy (load:"+nodeLoad+" > "+maxLoad+") "); + logNodeIsNotChosen(storage, "the node is too busy (load: " + nodeLoad + + " > " + maxLoad + ") "); return false; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index 34be727ff02..55599f7d3ae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -28,16 +28,19 @@ import java.util.Map; import java.util.Queue; import com.google.common.annotations.VisibleForTesting; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.namenode.CachedBlock; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.StorageReport; +import org.apache.hadoop.hdfs.util.EnumCounters; import org.apache.hadoop.hdfs.util.LightWeightHashSet; import 
org.apache.hadoop.util.IntrusiveCollection;
 import org.apache.hadoop.util.Time;
@@ -202,8 +205,10 @@ public class DatanodeDescriptor extends DatanodeInfo {
    * in case of errors (e.g. datanode does not report if an error occurs
    * while writing the block).
    */
-  private int currApproxBlocksScheduled = 0;
-  private int prevApproxBlocksScheduled = 0;
+  private EnumCounters<StorageType> currApproxBlocksScheduled
+      = new EnumCounters<StorageType>(StorageType.class);
+  private EnumCounters<StorageType> prevApproxBlocksScheduled
+      = new EnumCounters<StorageType>(StorageType.class);
   private long lastBlocksScheduledRollTime = 0;
   private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
   private int volumeFailures = 0;
@@ -474,25 +479,48 @@ public class DatanodeDescriptor extends DatanodeInfo {
     }
   }
 
+  /**
+   * @return the remaining space of the given storage type on this datanode.
+   */
+  public long getRemaining(StorageType t) {
+    long remaining = 0;
+    for(DatanodeStorageInfo s : getStorageInfos()) {
+      if (s.getStorageType() == t) {
+        remaining += s.getRemaining();
+      }
+    }
+    return remaining;
+  }
+
+  /**
+   * @return Approximate number of blocks currently scheduled to be written
+   * to the given storage type of this datanode.
+   */
+  public int getBlocksScheduled(StorageType t) {
+    return (int)(currApproxBlocksScheduled.get(t)
+        + prevApproxBlocksScheduled.get(t));
+  }
+
   /**
    * @return Approximate number of blocks currently scheduled to be written
    * to this datanode.
    */
   public int getBlocksScheduled() {
-    return currApproxBlocksScheduled + prevApproxBlocksScheduled;
+    return (int)(currApproxBlocksScheduled.sum()
+        + prevApproxBlocksScheduled.sum());
   }
 
   /** Increment the number of blocks scheduled. */
-  void incrementBlocksScheduled() {
-    currApproxBlocksScheduled++;
+  void incrementBlocksScheduled(StorageType t) {
+    currApproxBlocksScheduled.add(t, 1);
   }
 
   /** Decrement the number of blocks scheduled. */
-  void decrementBlocksScheduled() {
-    if (prevApproxBlocksScheduled > 0) {
-      prevApproxBlocksScheduled--;
-    } else if (currApproxBlocksScheduled > 0) {
-      currApproxBlocksScheduled--;
+  void decrementBlocksScheduled(StorageType t) {
+    if (prevApproxBlocksScheduled.get(t) > 0) {
+      prevApproxBlocksScheduled.subtract(t, 1);
+    } else if (currApproxBlocksScheduled.get(t) > 0) {
+      currApproxBlocksScheduled.subtract(t, 1);
     }
     // its ok if both counters are zero.
   }
@@ -500,8 +528,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
   /** Adjusts curr and prev number of blocks scheduled every few minutes.
*/ private void rollBlocksScheduled(long now) { if (now - lastBlocksScheduledRollTime > BLOCKS_SCHEDULED_ROLL_INTERVAL) { - prevApproxBlocksScheduled = currApproxBlocksScheduled; - currApproxBlocksScheduled = 0; + prevApproxBlocksScheduled.set(currApproxBlocksScheduled); + currApproxBlocksScheduled.reset(); lastBlocksScheduledRollTime = now; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java index 9f07a38cf84..04f17acca4c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java @@ -283,7 +283,7 @@ public class DatanodeStorageInfo { /** Increment the number of blocks scheduled for each given storage */ public static void incrementBlocksScheduled(DatanodeStorageInfo... storages) { for (DatanodeStorageInfo s : storages) { - s.getDatanodeDescriptor().incrementBlocksScheduled(); + s.getDatanodeDescriptor().incrementBlocksScheduled(s.getStorageType()); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java index 2bb1317ad5b..57ad6aa8bad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.mover; import com.google.common.annotations.VisibleForTesting; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -432,6 +433,12 @@ public class Mover { } return expected.isEmpty() || existing.isEmpty(); } + + @Override + public String toString() { + return getClass().getSimpleName() + "{expected=" + expected + + ", existing=" + existing + "}"; + } } static int run(Collection namenodes, Configuration conf) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumCounters.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumCounters.java index 8bdea1fd59e..8a8e61fe31d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumCounters.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumCounters.java @@ -105,6 +105,15 @@ public class EnumCounters> { this.counters[i] -= that.counters[i]; } } + + /** @return the sum of all counters. 
*/ + public final long sum() { + long sum = 0; + for(int i = 0; i < counters.length; i++) { + sum += counters[i]; + } + return sum; + } @Override public boolean equals(Object obj) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 0e49cfec053..0512b7f4365 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -1474,19 +1474,21 @@ public class MiniDFSCluster { secureResources, dn.getIpcPort())); dns[i - curDatanodesNum] = dn; } - curDatanodesNum += numDataNodes; this.numDataNodes += numDataNodes; waitActive(); - + if (storageCapacities != null) { for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; ++i) { - List volumes = dns[i].getFSDataset().getVolumes(); - assert storageCapacities[i].length == storagesPerDatanode; + final int index = i - curDatanodesNum; + List volumes = dns[index].getFSDataset().getVolumes(); + assert storageCapacities[index].length == storagesPerDatanode; assert volumes.size() == storagesPerDatanode; for (int j = 0; j < volumes.size(); ++j) { FsVolumeImpl volume = (FsVolumeImpl) volumes.get(j); - volume.setCapacityForTesting(storageCapacities[i][j]); + LOG.info("setCapacityForTesting " + storageCapacities[index][j] + + " for [" + volume.getStorageType() + "]" + volume.getStorageID()); + volume.setCapacityForTesting(storageCapacities[index][j]); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java index d2a7fcc3543..88b399277c2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java @@ -17,30 +17,58 @@ */ package org.apache.hadoop.hdfs.server.mover; -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.*; +import org.apache.hadoop.hdfs.BlockStoragePolicy; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.balancer.Dispatcher; import org.apache.hadoop.hdfs.server.balancer.ExitStatus; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; 
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper; import org.apache.hadoop.io.IOUtils; +import org.apache.log4j.Level; import org.junit.Assert; import org.junit.Test; -import java.net.URI; -import java.util.*; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; /** * Test the data migration tool (for Archival Storage) */ public class TestStorageMover { - private static final long BLOCK_SIZE = 1024; + static final Log LOG = LogFactory.getLog(TestStorageMover.class); + static { + ((Log4JLogger)LogFactory.getLog(BlockPlacementPolicy.class) + ).getLogger().setLevel(Level.ALL); + ((Log4JLogger)LogFactory.getLog(Dispatcher.class) + ).getLogger().setLevel(Level.ALL); + } + + private static final int BLOCK_SIZE = 1024; private static final short REPL = 3; private static final int NUM_DATANODES = 6; private static final Configuration DEFAULT_CONF = new HdfsConfiguration(); @@ -50,12 +78,15 @@ public class TestStorageMover { private static final BlockStoragePolicy COLD; static { - DEFAULT_POLICIES = BlockStoragePolicy.readBlockStorageSuite(new - HdfsConfiguration()); + DEFAULT_CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + DEFAULT_CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); + DEFAULT_CONF.setLong(DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY, 2000L); + + DEFAULT_POLICIES = BlockStoragePolicy.readBlockStorageSuite(DEFAULT_CONF); HOT = DEFAULT_POLICIES.getPolicy("HOT"); WARM = DEFAULT_POLICIES.getPolicy("WARM"); COLD = DEFAULT_POLICIES.getPolicy("COLD"); - Dispatcher.setBlockMoveWaitTime(10 * 1000); + Dispatcher.setBlockMoveWaitTime(1000L); } /** @@ -63,17 +94,48 @@ public class TestStorageMover { * also defines snapshots. */ static class NamespaceScheme { + final List dirs; final List files; + final long fileSize; final Map> snapshotMap; final Map policyMap; - NamespaceScheme(List files, Map> snapshotMap, + NamespaceScheme(List dirs, List files, long fileSize, + Map> snapshotMap, Map policyMap) { - this.files = files; + this.dirs = dirs == null? Collections.emptyList(): dirs; + this.files = files == null? Collections.emptyList(): files; + this.fileSize = fileSize; this.snapshotMap = snapshotMap == null ? - new HashMap>() : snapshotMap; + Collections.>emptyMap() : snapshotMap; this.policyMap = policyMap; } + + /** + * Create files/directories/snapshots. + */ + void prepare(DistributedFileSystem dfs, short repl) throws Exception { + for (Path d : dirs) { + dfs.mkdirs(d); + } + for (Path file : files) { + DFSTestUtil.createFile(dfs, file, fileSize, repl, 0L); + } + for (Map.Entry> entry : snapshotMap.entrySet()) { + for (String snapshot : entry.getValue()) { + SnapshotTestHelper.createSnapshot(dfs, entry.getKey(), snapshot); + } + } + } + + /** + * Set storage policies according to the corresponding scheme. 
+     */
+    void setStoragePolicy(DistributedFileSystem dfs) throws Exception {
+      for (Map.Entry<Path, BlockStoragePolicy> entry : policyMap.entrySet()) {
+        dfs.setStoragePolicy(entry.getKey(), entry.getValue().getName());
+      }
+    }
   }
 
   /**
@@ -87,6 +149,11 @@ public class TestStorageMover {
     final StorageType[][] storageTypes;
     final long[][] storageCapacities;
 
+    ClusterScheme() {
+      this(DEFAULT_CONF, NUM_DATANODES, REPL,
+          genStorageTypes(NUM_DATANODES, 1, 1), null);
+    }
+
     ClusterScheme(Configuration conf, int numDataNodes, short repl,
         StorageType[][] types, long[][] capacities) {
       Preconditions.checkArgument(types == null || types.length == numDataNodes);
@@ -128,6 +195,22 @@ public class TestStorageMover {
       dfs = cluster.getFileSystem();
     }
 
+    private void runBasicTest(boolean shutdown) throws Exception {
+      setupCluster();
+      try {
+        prepareNamespace();
+        verify(true);
+
+        setStoragePolicy();
+        migrate();
+        verify(true);
+      } finally {
+        if (shutdown) {
+          shutdownCluster();
+        }
+      }
+    }
+
     void shutdownCluster() throws Exception {
       IOUtils.cleanup(null, dfs);
       if (cluster != null) {
@@ -140,18 +223,11 @@ public class TestStorageMover {
      * corresponding scheme.
      */
     void prepareNamespace() throws Exception {
-      for (Path file : nsScheme.files) {
-        DFSTestUtil.createFile(dfs, file, BLOCK_SIZE * 2, clusterScheme.repl,
-            0L);
-      }
-      for (Map.Entry<Path, List<String>> entry : nsScheme.snapshotMap.entrySet()) {
-        for (String snapshot : entry.getValue()) {
-          SnapshotTestHelper.createSnapshot(dfs, entry.getKey(), snapshot);
-        }
-      }
-      for (Map.Entry<Path, BlockStoragePolicy> entry : nsScheme.policyMap.entrySet()) {
-        dfs.setStoragePolicy(entry.getKey(), entry.getValue().getName());
-      }
+      nsScheme.prepare(dfs, clusterScheme.repl);
+    }
+
+    void setStoragePolicy() throws Exception {
+      nsScheme.setStoragePolicy(dfs);
     }
 
     /**
@@ -159,6 +235,7 @@ public class TestStorageMover {
      */
     void migrate(String...
args) throws Exception { runMover(); + Thread.sleep(5000); // let the NN finish deletion } /** @@ -195,38 +272,128 @@ public class TestStorageMover { verifyRecursively(fullPath, child); } } else if (!status.isSymlink()) { // is file - HdfsLocatedFileStatus fileStatus = (HdfsLocatedFileStatus) status; - byte policyId = fileStatus.getStoragePolicy(); - BlockStoragePolicy policy = policies.getPolicy(policyId); - final List types = policy.chooseStorageTypes( - status.getReplication()); - for(LocatedBlock lb : fileStatus.getBlockLocations().getLocatedBlocks()) { - final Mover.StorageTypeDiff diff = new Mover.StorageTypeDiff(types, - lb.getStorageTypes()); - Assert.assertTrue(diff.removeOverlap()); + verifyFile(parent, status, null); + } + } + + private void verifyFile(final Path parent, final HdfsFileStatus status, + final Byte expectedPolicyId) throws Exception { + HdfsLocatedFileStatus fileStatus = (HdfsLocatedFileStatus) status; + byte policyId = fileStatus.getStoragePolicy(); + BlockStoragePolicy policy = policies.getPolicy(policyId); + if (expectedPolicyId != null) { + Assert.assertEquals(expectedPolicyId, policy); + } + final List types = policy.chooseStorageTypes( + status.getReplication()); + for(LocatedBlock lb : fileStatus.getBlockLocations().getLocatedBlocks()) { + final Mover.StorageTypeDiff diff = new Mover.StorageTypeDiff(types, + lb.getStorageTypes()); + Assert.assertTrue(fileStatus.getFullName(parent.toString()) + + " with policy " + policy + " has non-empty overlap: " + diff, + diff.removeOverlap()); + } + } + + Replication getReplication(Path file) throws IOException { + return getOrVerifyReplication(file, null); + } + + Replication verifyReplication(Path file, int expectedDiskCount, + int expectedArchiveCount) throws IOException { + final Replication r = new Replication(); + r.disk = expectedDiskCount; + r.archive = expectedArchiveCount; + return getOrVerifyReplication(file, r); + } + + private Replication getOrVerifyReplication(Path file, Replication expected) + throws IOException { + final List lbs = dfs.getClient().getLocatedBlocks( + file.toString(), 0).getLocatedBlocks(); + Assert.assertEquals(1, lbs.size()); + + LocatedBlock lb = lbs.get(0); + StringBuilder types = new StringBuilder(); + final Replication r = new Replication(); + for(StorageType t : lb.getStorageTypes()) { + types.append(t).append(", "); + if (t == StorageType.DISK) { + r.disk++; + } else if (t == StorageType.ARCHIVE) { + r.archive++; + } else { + Assert.fail("Unexpected storage type " + t); } } + + if (expected != null) { + final String s = "file = " + file + "\n types = [" + types + "]"; + Assert.assertEquals(s, expected, r); + } + return r; } } + static class Replication { + int disk; + int archive; + + @Override + public int hashCode() { + return disk ^ archive; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } else if (obj == null || !(obj instanceof Replication)) { + return false; + } + final Replication that = (Replication)obj; + return this.disk == that.disk && this.archive == that.archive; + } + + @Override + public String toString() { + return "[disk=" + disk + ", archive=" + archive + "]"; + } + } private static StorageType[][] genStorageTypes(int numDataNodes) { + return genStorageTypes(numDataNodes, 0, 0); + } + + private static StorageType[][] genStorageTypes(int numDataNodes, + int numAllDisk, int numAllArchive) { StorageType[][] types = new StorageType[numDataNodes][]; - for (int i = 0; i < types.length; i++) { + int i = 0; + for (; i < 
numAllDisk; i++) { + types[i] = new StorageType[]{StorageType.DISK, StorageType.DISK}; + } + for (; i < numAllDisk + numAllArchive; i++) { + types[i] = new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE}; + } + for (; i < types.length; i++) { types[i] = new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}; } return types; } - - private void runTest(MigrationTest test) throws Exception { - test.setupCluster(); - try { - test.prepareNamespace(); - test.migrate(); - Thread.sleep(5000); // let the NN finish deletion - test.verify(true); - } finally { - test.shutdownCluster(); + + private static long[][] genCapacities(int nDatanodes, int numAllDisk, + int numAllArchive, long diskCapacity, long archiveCapacity) { + final long[][] capacities = new long[nDatanodes][]; + int i = 0; + for (; i < numAllDisk; i++) { + capacities[i] = new long[]{diskCapacity, diskCapacity}; } + for (; i < numAllDisk + numAllArchive; i++) { + capacities[i] = new long[]{archiveCapacity, archiveCapacity}; + } + for(; i < capacities.length; i++) { + capacities[i] = new long[]{diskCapacity, archiveCapacity}; + } + return capacities; } /** @@ -237,11 +404,227 @@ public class TestStorageMover { final Path foo = new Path("/foo"); Map policyMap = Maps.newHashMap(); policyMap.put(foo, COLD); - NamespaceScheme nsScheme = new NamespaceScheme(Arrays.asList(foo), null, - policyMap); + NamespaceScheme nsScheme = new NamespaceScheme(null, Arrays.asList(foo), + 2*BLOCK_SIZE, null, policyMap); ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF, NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null); + new MigrationTest(clusterScheme, nsScheme).runBasicTest(true); + } + + private static class PathPolicyMap { + final Map map = Maps.newHashMap(); + final Path hot = new Path("/hot"); + final Path warm = new Path("/warm"); + final Path cold = new Path("/cold"); + final List files; + + PathPolicyMap(int filesPerDir){ + map.put(hot, HOT); + map.put(warm, WARM); + map.put(cold, COLD); + files = new ArrayList(); + for(Path dir : map.keySet()) { + for(int i = 0; i < filesPerDir; i++) { + files.add(new Path(dir, "file" + i)); + } + } + } + + NamespaceScheme newNamespaceScheme() { + return new NamespaceScheme(Arrays.asList(hot, warm, cold), + files, BLOCK_SIZE/2, null, map); + } + + /** + * Move hot files to warm and cold, warm files to hot and cold, + * and cold files to hot and warm. + */ + void moveAround(DistributedFileSystem dfs) throws Exception { + for(Path srcDir : map.keySet()) { + int i = 0; + for(Path dstDir : map.keySet()) { + if (!srcDir.equals(dstDir)) { + final Path src = new Path(srcDir, "file" + i++); + final Path dst = new Path(dstDir, srcDir.getName() + "2" + dstDir.getName()); + LOG.info("rename " + src + " to " + dst); + dfs.rename(src, dst); + } + } + } + } + } + + /** + * Test directories with Hot, Warm and Cold polices. + */ + @Test + public void testHotWarmColdDirs() throws Exception { + PathPolicyMap pathPolicyMap = new PathPolicyMap(3); + NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme(); + ClusterScheme clusterScheme = new ClusterScheme(); MigrationTest test = new MigrationTest(clusterScheme, nsScheme); - runTest(test); + + test.runBasicTest(false); + + pathPolicyMap.moveAround(test.dfs); + test.migrate(); + test.verify(true); + test.shutdownCluster(); + } + + /** + * Test DISK is running out of spaces. 
+ */ + @Test + public void testNoSpaceDisk() throws Exception { + final PathPolicyMap pathPolicyMap = new PathPolicyMap(0); + final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme(); + + final long diskCapacity = (3 + HdfsConstants.MIN_BLOCKS_FOR_WRITE)*BLOCK_SIZE; + final long archiveCapacity = 100*BLOCK_SIZE; + final long[][] capacities = genCapacities(NUM_DATANODES, 1, 1, + diskCapacity, archiveCapacity); + final ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF, + NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES, 1, 1), capacities); + final MigrationTest test = new MigrationTest(clusterScheme, nsScheme); + + test.runBasicTest(false); + + // create hot files with replication 3 until not more spaces. + final short replication = 3; + { + int hotFileCount = 0; + try { + for(; ; hotFileCount++) { + final Path p = new Path(pathPolicyMap.hot, "file" + hotFileCount); + DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L); + } + } catch(IOException e) { + LOG.info("Expected: hotFileCount=" + hotFileCount, e); + } + Assert.assertTrue(hotFileCount >= 2); + } + + // create hot files with replication 1 to use up all remaining spaces. + { + int hotFileCount_r1 = 0; + try { + for(; ; hotFileCount_r1++) { + final Path p = new Path(pathPolicyMap.hot, "file_r1_" + hotFileCount_r1); + DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short)1, 0L); + } + } catch(IOException e) { + LOG.info("Expected: hotFileCount_r1=" + hotFileCount_r1, e); + } + } + + { // test increasing replication. Since DISK is full, + // new replicas should be stored in ARCHIVE as a fallback storage. + final Path file0 = new Path(pathPolicyMap.hot, "file0"); + final Replication r = test.getReplication(file0); + LOG.info("XXX " + file0 + ": replication=" + r); + final short newReplication = (short)5; + test.dfs.setReplication(file0, newReplication); +// DFSTestUtil.waitReplication(test.dfs, file0, newReplication); + Thread.sleep(10000); + test.verifyReplication(file0, r.disk, newReplication - r.disk); + } + + { // test creating a cold file and then increase replication + final Path p = new Path(pathPolicyMap.cold, "foo"); + DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L); + test.verifyReplication(p, 0, replication); + + final short newReplication = 5; + test.dfs.setReplication(p, newReplication); +// DFSTestUtil.waitReplication(test.dfs, p, newReplication); + Thread.sleep(10000); + test.verifyReplication(p, 0, newReplication); + } + + { //test move a hot file to warm + //TODO: fix Mover not terminate in the test below +// final Path file1 = new Path(pathPolicyMap.hot, "file1"); +// test.dfs.rename(file1, pathPolicyMap.warm); +// test.migrate(); +// test.verify(true); + } + + test.shutdownCluster(); + } + + /** + * Test ARCHIVE is running out of spaces. + */ + @Test + public void testNoSpaceArchive() throws Exception { + final PathPolicyMap pathPolicyMap = new PathPolicyMap(0); + final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme(); + + final long diskCapacity = 100*BLOCK_SIZE; + final long archiveCapacity = (2 + HdfsConstants.MIN_BLOCKS_FOR_WRITE)*BLOCK_SIZE; + final long[][] capacities = genCapacities(NUM_DATANODES, 1, 1, + diskCapacity, archiveCapacity); + final ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF, + NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES, 1, 1), capacities); + final MigrationTest test = new MigrationTest(clusterScheme, nsScheme); + + test.runBasicTest(false); + + // create cold files with replication 3 until not more spaces. 
+ final short replication = 3; + { + int coldFileCount = 0; + try { + for(; ; coldFileCount++) { + final Path p = new Path(pathPolicyMap.cold, "file" + coldFileCount); + DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L); + } + } catch(IOException e) { + LOG.info("Expected: coldFileCount=" + coldFileCount, e); + } + Assert.assertTrue(coldFileCount >= 2); + } + + // create cold files with replication 1 to use up all remaining spaces. + { + int coldFileCount_r1 = 0; + try { + for(; ; coldFileCount_r1++) { + final Path p = new Path(pathPolicyMap.cold, "file_r1_" + coldFileCount_r1); + DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short)1, 0L); + } + } catch(IOException e) { + LOG.info("Expected: coldFileCount_r1=" + coldFileCount_r1, e); + } + } + + { // test increasing replication but new replicas cannot be created + // since no more ARCHIVE space. + final Path file0 = new Path(pathPolicyMap.cold, "file0"); + final Replication r = test.getReplication(file0); + LOG.info("XXX " + file0 + ": replication=" + r); + Assert.assertEquals(0, r.disk); + + final short newReplication = (short)5; + test.dfs.setReplication(file0, newReplication); + Thread.sleep(10000); + + test.verifyReplication(file0, 0, r.archive); + } + + { // test creating a hot file + final Path p = new Path(pathPolicyMap.hot, "foo"); + DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short)3, 0L); + } + + { //test move a cold file to warm + final Path file1 = new Path(pathPolicyMap.hot, "file1"); + test.dfs.rename(file1, pathPolicyMap.warm); + test.migrate(); + test.verify(true); + } + + test.shutdownCluster(); } }
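For illustration, the per-storage-type accounting added to DatanodeDescriptor above boils down to two rolling windows of counters keyed by StorageType: increments go to the current window, decrements drain the previous window first, and the windows roll every ten minutes so stale entries eventually expire. The standalone sketch below mirrors that logic; it is not part of the patch, it substitutes java.util.EnumMap for Hadoop's EnumCounters and uses a local StorageType enum so it compiles without Hadoop on the classpath, and the class/method names (BlocksScheduledSketch, roll, scheduled) are illustrative only.

import java.util.EnumMap;
import java.util.Map;

public class BlocksScheduledSketch {
  // Local stand-in for org.apache.hadoop.hdfs.StorageType.
  enum StorageType { DISK, SSD, ARCHIVE }

  // Same roll interval as BLOCKS_SCHEDULED_ROLL_INTERVAL in the patch (10 min).
  static final long ROLL_INTERVAL_MS = 600 * 1000;

  private final Map<StorageType, Long> curr = new EnumMap<>(StorageType.class);
  private final Map<StorageType, Long> prev = new EnumMap<>(StorageType.class);
  private long lastRollTime = 0;

  private static long get(Map<StorageType, Long> m, StorageType t) {
    return m.getOrDefault(t, 0L);
  }

  /** Called when a block write is scheduled to a storage of type t. */
  void increment(StorageType t) {
    curr.merge(t, 1L, Long::sum);
  }

  /** Called when the datanode reports the block as received (or deleted). */
  void decrement(StorageType t) {
    if (get(prev, t) > 0) {
      prev.merge(t, -1L, Long::sum);
    } else if (get(curr, t) > 0) {
      curr.merge(t, -1L, Long::sum);
    }
    // ok if both are zero: the report may arrive after the counters rolled twice
  }

  /** Approximate number of blocks currently scheduled for storage type t. */
  long scheduled(StorageType t) {
    return get(curr, t) + get(prev, t);
  }

  /** Shift curr into prev periodically so counters for lost reports expire. */
  void roll(long now) {
    if (now - lastRollTime > ROLL_INTERVAL_MS) {
      prev.clear();
      prev.putAll(curr);
      curr.clear();
      lastRollTime = now;
    }
  }

  public static void main(String[] args) {
    BlocksScheduledSketch s = new BlocksScheduledSketch();
    s.increment(StorageType.DISK);
    s.increment(StorageType.ARCHIVE);
    s.roll(System.currentTimeMillis() + ROLL_INTERVAL_MS + 1);
    s.decrement(StorageType.DISK);                         // drains the rolled-over count
    System.out.println(s.scheduled(StorageType.DISK));     // 0
    System.out.println(s.scheduled(StorageType.ARCHIVE));  // 1
  }
}

In the patch itself, BlockPlacementPolicyDefault#isGoodTarget uses exactly this per-type figure: a target storage is rejected when requiredSize exceeds the type-specific remaining space minus blockSize times the type-specific scheduled count, which is what allows the "no space on DISK, fall back to ARCHIVE" behavior exercised by testNoSpaceDisk and testNoSpaceArchive.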