diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java index c11d4e6aabe..99cae9aed29 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs; import java.util.Arrays; +import java.util.ArrayList; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; @@ -31,18 +32,34 @@ import org.apache.hadoop.classification.InterfaceStability; @InterfaceAudience.Public @InterfaceStability.Unstable public enum StorageType { - DISK, - SSD, - ARCHIVE, - RAM_DISK; + DISK(false), + SSD(false), + ARCHIVE(false), + RAM_DISK(true); + + private final boolean isTransient; public static final StorageType DEFAULT = DISK; - + public static final StorageType[] EMPTY_ARRAY = {}; - + private static final StorageType[] VALUES = values(); - + + StorageType(boolean isTransient) { this.isTransient = isTransient; } + + public boolean isMovable() { return isTransient == false; } + public static List asList() { return Arrays.asList(VALUES); } + + public static List getMovableTypes() { + List movableTypes = new ArrayList(); + for (StorageType t : VALUES) { + if ( t.isTransient == false ) { + movableTypes.add(t); + } + } + return movableTypes; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index d77da7cc1e3..87348b36096 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -272,7 +272,7 @@ public class Balancer { long overLoadedBytes = 0L, underLoadedBytes = 0L; for(DatanodeStorageReport r : reports) { final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo()); - for(StorageType t : StorageType.asList()) { + for(StorageType t : StorageType.getMovableTypes()) { final Double utilization = policy.getUtilization(r, t); if (utilization == null) { // datanode does not have such storage type continue; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java index 04133bd23cc..59814af5512 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java @@ -69,7 +69,7 @@ public class Mover { = new EnumMap>(StorageType.class); private StorageMap() { - for(StorageType t : StorageType.asList()) { + for(StorageType t : StorageType.getMovableTypes()) { targetStorageTypeMap.put(t, new LinkedList()); } } @@ -130,7 +130,7 @@ public class Mover { final List reports = dispatcher.init(); for(DatanodeStorageReport r : reports) { final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo()); - for(StorageType t : StorageType.asList()) { + for(StorageType t : StorageType.getMovableTypes()) { final Source source = dn.addSource(t, Long.MAX_VALUE, dispatcher); final long maxRemaining = getMaxRemaining(r, t); final StorageGroup target = maxRemaining > 0L ? dn.addTarget(t, @@ -348,7 +348,7 @@ public class Mover { LocatedBlock lb = lbs.get(i); final StorageTypeDiff diff = new StorageTypeDiff(types, lb.getStorageTypes()); - if (!diff.removeOverlap()) { + if (!diff.removeOverlap(true)) { if (scheduleMoves4Block(diff, lb)) { hasRemaining |= (diff.existing.size() > 1 && diff.expected.size() > 1); @@ -452,22 +452,38 @@ public class Mover { this.expected = new LinkedList(expected); this.existing = new LinkedList(Arrays.asList(existing)); } - + /** * Remove the overlap between the expected types and the existing types. - * @return if the existing types or the expected types is empty after + * @param ignoreNonMovable ignore non-movable storage types + * by removing them from both expected and existing storage type list + * to prevent non-movable storage from being moved. + * @returns if the existing types or the expected types is empty after * removing the overlap. */ - boolean removeOverlap() { + boolean removeOverlap(boolean ignoreNonMovable) { for(Iterator i = existing.iterator(); i.hasNext(); ) { final StorageType t = i.next(); if (expected.remove(t)) { i.remove(); } } + if (ignoreNonMovable) { + removeNonMovable(existing); + removeNonMovable(expected); + } return expected.isEmpty() || existing.isEmpty(); } - + + void removeNonMovable(List types) { + for (Iterator i = types.iterator(); i.hasNext(); ) { + final StorageType t = i.next(); + if (!t.isMovable()) { + i.remove(); + } + } + } + @Override public String toString() { return getClass().getSimpleName() + "{expected=" + expected diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 820b812f20e..c728b2ba846 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -1456,6 +1456,39 @@ public class DFSTestUtil { } } + /** + * Helper function that verified blocks of a file are placed on the + * expected storage type. + * + * @param fs The file system containing the the file. + * @param client The DFS client used to access the file + * @param path name to the file to verify + * @param storageType expected storage type + * @returns true if file exists and its blocks are located on the expected + * storage type. + * false otherwise. + */ + public static boolean verifyFileReplicasOnStorageType(FileSystem fs, + DFSClient client, Path path, StorageType storageType) throws IOException { + if (!fs.exists(path)) { + LOG.info("verifyFileReplicasOnStorageType: file " + path + "does not exist"); + return false; + } + long fileLength = client.getFileInfo(path.toString()).getLen(); + LocatedBlocks locatedBlocks = + client.getLocatedBlocks(path.toString(), 0, fileLength); + for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) { + if (locatedBlock.getStorageTypes()[0] != storageType) { + LOG.info("verifyFileReplicasOnStorageType: for file " + path + + ". Expect blk" + locatedBlock + + " on Type: " + storageType + ". Actual Type: " + + locatedBlock.getStorageTypes()[0]); + return false; + } + } + return true; + } + /** * Helper function to create a key in the Key Provider. Defaults * to the first indexed NameNode's Key Provider. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 703471537b8..bd8e390e5a9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -1394,7 +1394,8 @@ public class MiniDFSCluster { // Set up datanode address setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig); if (manageDfsDirs) { - String dirs = makeDataNodeDirs(i, storageTypes == null ? null : storageTypes[i]); + String dirs = makeDataNodeDirs(i, storageTypes == null ? + null : storageTypes[i - curDatanodesNum]); dnConf.set(DFS_DATANODE_DATA_DIR_KEY, dirs); conf.set(DFS_DATANODE_DATA_DIR_KEY, dirs); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index 1df31acb6bf..831e2c639ea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hdfs.server.balancer; +import static org.apache.hadoop.hdfs.DFSConfigKeys.*; +import static org.apache.hadoop.hdfs.StorageType.DEFAULT; +import static org.apache.hadoop.hdfs.StorageType.RAM_DISK; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -41,12 +44,7 @@ import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DFSTestUtil; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.NameNodeProxies; +import org.apache.hadoop.hdfs.*; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -57,6 +55,7 @@ import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli; import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Tool; import org.apache.log4j.Level; @@ -86,6 +85,7 @@ public class TestBalancer { static final double CAPACITY_ALLOWED_VARIANCE = 0.005; // 0.5% static final double BALANCE_ALLOWED_VARIANCE = 0.11; // 10%+delta static final int DEFAULT_BLOCK_SIZE = 100; + static final int DEFAULT_RAM_DISK_BLOCK_SIZE = 5 * 1024 * 1024; private static final Random r = new Random(); static { @@ -108,6 +108,15 @@ public class TestBalancer { conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L); } + static void initConfWithRamDisk(Configuration conf) { + conf.setLong(DFS_BLOCK_SIZE_KEY, DEFAULT_RAM_DISK_BLOCK_SIZE); + conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC, 3); + conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1); + conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500); + conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1); + conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS, 1); + } + /* create a file with a length of fileLen */ static void createFile(MiniDFSCluster cluster, Path filePath, long fileLen, short replicationFactor, int nnIndex) @@ -1098,6 +1107,81 @@ public class TestBalancer { CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, true); } + /* + * Test Balancer with Ram_Disk configured + * One DN has two files on RAM_DISK, other DN has no files on RAM_DISK. + * Then verify that the balancer does not migrate files on RAM_DISK across DN. + */ + @Test(timeout=300000) + public void testBalancerWithRamDisk() throws Exception { + final int SEED = 0xFADED; + final short REPL_FACT = 1; + Configuration conf = new Configuration(); + initConfWithRamDisk(conf); + + final int defaultRamDiskCapacity = 10; + final int defaultDiskCapacity = 100; + final long ramDiskStorageLimit = + ((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) + + (DEFAULT_RAM_DISK_BLOCK_SIZE - 1); + final long diskStorageLimit = + ((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) + + (DEFAULT_RAM_DISK_BLOCK_SIZE - 1); + + cluster = new MiniDFSCluster + .Builder(conf) + .numDataNodes(1) + .storageCapacities(new long[] { ramDiskStorageLimit, diskStorageLimit }) + .storageTypes(new StorageType[] { RAM_DISK, DEFAULT }) + .build(); + + try { + cluster.waitActive(); + // Create few files on RAM_DISK + final String METHOD_NAME = GenericTestUtils.getMethodName(); + final Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); + final Path path2 = new Path("/" + METHOD_NAME + ".02.dat"); + + DistributedFileSystem fs = cluster.getFileSystem(); + DFSClient client = fs.getClient(); + DFSTestUtil.createFile(fs, path1, true, + DEFAULT_RAM_DISK_BLOCK_SIZE, 4 * DEFAULT_RAM_DISK_BLOCK_SIZE, + DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true); + DFSTestUtil.createFile(fs, path1, true, + DEFAULT_RAM_DISK_BLOCK_SIZE, 1 * DEFAULT_RAM_DISK_BLOCK_SIZE, + DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true); + + // Sleep for a short time to allow the lazy writer thread to do its job + Thread.sleep(6 * 1000); + + // Add another fresh DN with the same type/capacity without files on RAM_DISK + StorageType[][] storageTypes = new StorageType[][] {{RAM_DISK, DEFAULT}}; + long[][] storageCapacities = new long[][]{{ramDiskStorageLimit, diskStorageLimit}}; + cluster.startDataNodes(conf, REPL_FACT, storageTypes, true, null, + null, null, storageCapacities, null, false, false, false, null); + + cluster.triggerHeartbeats(); + Collection namenodes = DFSUtil.getNsServiceRpcUris(conf); + + // Run Balancer + Balancer.Parameters p = new Balancer.Parameters( + Parameters.DEFAULT.policy, + Parameters.DEFAULT.threshold, + Parameters.DEFAULT.nodesToBeExcluded, + Parameters.DEFAULT.nodesToBeIncluded); + final int r = Balancer.run(namenodes, p, conf); + + // Validate no RAM_DISK block should be moved + assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r); + + // Verify files are still on RAM_DISK + DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path1, RAM_DISK); + DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path2, RAM_DISK); + } finally { + cluster.shutdown(); + } + } + /** * @param args */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java index 2484c43a390..b690165f6bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java @@ -32,6 +32,7 @@ import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -67,6 +68,8 @@ import org.junit.Test; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC; + /** * Test the data migration tool (for Archival Storage) */ @@ -326,10 +329,10 @@ public class TestStorageMover { Assert.assertTrue(fileStatus.getFullName(parent.toString()) + " with policy " + policy + " has non-empty overlap: " + diff + ", the corresponding block is " + lb.getBlock().getLocalBlock(), - diff.removeOverlap()); + diff.removeOverlap(true)); } } - + Replication getReplication(Path file) throws IOException { return getOrVerifyReplication(file, null); } @@ -397,17 +400,29 @@ public class TestStorageMover { } private static StorageType[][] genStorageTypes(int numDataNodes) { - return genStorageTypes(numDataNodes, 0, 0); + return genStorageTypes(numDataNodes, 0, 0, 0); } private static StorageType[][] genStorageTypes(int numDataNodes, int numAllDisk, int numAllArchive) { + return genStorageTypes(numDataNodes, numAllDisk, numAllArchive, 0); + } + + private static StorageType[][] genStorageTypes(int numDataNodes, + int numAllDisk, int numAllArchive, int numRamDisk) { + Preconditions.checkArgument( + (numAllDisk + numAllArchive + numRamDisk) <= numDataNodes); + StorageType[][] types = new StorageType[numDataNodes][]; int i = 0; - for (; i < numAllDisk; i++) { + for (; i < numRamDisk; i++) + { + types[i] = new StorageType[]{StorageType.RAM_DISK, StorageType.DISK}; + } + for (; i < numRamDisk + numAllDisk; i++) { types[i] = new StorageType[]{StorageType.DISK, StorageType.DISK}; } - for (; i < numAllDisk + numAllArchive; i++) { + for (; i < numRamDisk + numAllDisk + numAllArchive; i++) { types[i] = new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE}; } for (; i < types.length; i++) { @@ -416,6 +431,26 @@ public class TestStorageMover { return types; } + private static long[][] genCapacities(int nDatanodes, int numAllDisk, + int numAllArchive, int numRamDisk, long diskCapacity, + long archiveCapacity, long ramDiskCapacity) { + final long[][] capacities = new long[nDatanodes][]; + int i = 0; + for (; i < numRamDisk; i++) { + capacities[i] = new long[]{ramDiskCapacity, diskCapacity}; + } + for (; i < numRamDisk + numAllDisk; i++) { + capacities[i] = new long[]{diskCapacity, diskCapacity}; + } + for (; i < numRamDisk + numAllDisk + numAllArchive; i++) { + capacities[i] = new long[]{archiveCapacity, archiveCapacity}; + } + for(; i < capacities.length; i++) { + capacities[i] = new long[]{diskCapacity, archiveCapacity}; + } + return capacities; + } + private static class PathPolicyMap { final Map map = Maps.newHashMap(); final Path hot = new Path("/hot"); @@ -748,4 +783,56 @@ public class TestStorageMover { test.shutdownCluster(); } } + + /** + * Test blocks of lazy_persist file on RAM_DISK will not be moved to other + * storage types by the Storage Mover. + */ + @Test + public void testRamDiskNotMoved() throws Exception { + LOG.info("testRamDiskNotMoved"); + final PathPolicyMap pathPolicyMap = new PathPolicyMap(0); + final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme(); + + final long diskCapacity = 100 * BLOCK_SIZE; + final long archiveCapacity = (6 + HdfsConstants.MIN_BLOCKS_FOR_WRITE) + * BLOCK_SIZE; + final long ramDiskCapacity = 10 * BLOCK_SIZE; + final long[][] capacities = genCapacities(1, 0, 0, 1, + diskCapacity, archiveCapacity, ramDiskCapacity); + final int LAZY_WRITER_INTERVAL_SEC = 1; + final ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF, + 1, (short)1, genStorageTypes(1, 0, 0, 1), capacities); + clusterScheme.conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, + LAZY_WRITER_INTERVAL_SEC); + final MigrationTest test = new MigrationTest(clusterScheme, nsScheme); + + try { + test.runBasicTest(false); + + // test creating a hot RAM_DISK file + final int SEED = 0xFADED; + final Path foo_hot = new Path(pathPolicyMap.hot, "foo_hot"); + DFSTestUtil.createFile(test.dfs, foo_hot, true, BLOCK_SIZE, BLOCK_SIZE, + BLOCK_SIZE, (short) 1, SEED, true); + Assert.assertTrue(DFSTestUtil.verifyFileReplicasOnStorageType(test.dfs, + test.dfs.getClient(), foo_hot, StorageType.RAM_DISK)); + + // Sleep for a short time to allow the lazy writer thread to do its job + Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000); + + // Verify policy related name change is allowed + final Path foo_hot_new = new Path(pathPolicyMap.warm, "foo_hot"); + test.dfs.rename(foo_hot, pathPolicyMap.warm); + Assert.assertTrue(test.dfs.exists(foo_hot_new)); + + // Verify blocks on ram disk will not be moved to other storage types by + // policy based Storage Mover. + test.migrate(); + Assert.assertTrue(DFSTestUtil.verifyFileReplicasOnStorageType(test.dfs, + test.dfs.getClient(), foo_hot_new, StorageType.RAM_DISK)); + } finally { + test.shutdownCluster(); + } + } }