HDFS-6932. Balancer and Mover tools should ignore replicas on RAM_DISK. (Contributed by Xiaoyu Yao)
This commit is contained in:
parent
feda4733a8
commit
e8e7fbe81a
|
@ -74,3 +74,7 @@
|
||||||
HDFS-7143. Fix findbugs warnings in HDFS-6581 branch. (szetszwo via
|
HDFS-7143. Fix findbugs warnings in HDFS-6581 branch. (szetszwo via
|
||||||
Arpit Agarwal)
|
Arpit Agarwal)
|
||||||
|
|
||||||
|
HDFS-6932. Balancer and Mover tools should ignore replicas on RAM_DISK.
|
||||||
|
(Xiaoyu Yao via Arpit Agarwal)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
@ -31,10 +32,12 @@ import org.apache.hadoop.classification.InterfaceStability;
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
public enum StorageType {
|
public enum StorageType {
|
||||||
DISK,
|
DISK(false),
|
||||||
SSD,
|
SSD(false),
|
||||||
ARCHIVE,
|
ARCHIVE(false),
|
||||||
RAM_DISK;
|
RAM_DISK(true);
|
||||||
|
|
||||||
|
private final boolean isTransient;
|
||||||
|
|
||||||
public static final StorageType DEFAULT = DISK;
|
public static final StorageType DEFAULT = DISK;
|
||||||
|
|
||||||
|
@ -42,7 +45,21 @@ public enum StorageType {
|
||||||
|
|
||||||
private static final StorageType[] VALUES = values();
|
private static final StorageType[] VALUES = values();
|
||||||
|
|
||||||
|
StorageType(boolean isTransient) { this.isTransient = isTransient; }
|
||||||
|
|
||||||
|
public boolean isMovable() { return isTransient == false; }
|
||||||
|
|
||||||
public static List<StorageType> asList() {
|
public static List<StorageType> asList() {
|
||||||
return Arrays.asList(VALUES);
|
return Arrays.asList(VALUES);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static List<StorageType> getMovableTypes() {
|
||||||
|
List<StorageType> movableTypes = new ArrayList<StorageType>();
|
||||||
|
for (StorageType t : VALUES) {
|
||||||
|
if ( t.isTransient == false ) {
|
||||||
|
movableTypes.add(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return movableTypes;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -271,7 +271,7 @@ public class Balancer {
|
||||||
long overLoadedBytes = 0L, underLoadedBytes = 0L;
|
long overLoadedBytes = 0L, underLoadedBytes = 0L;
|
||||||
for(DatanodeStorageReport r : reports) {
|
for(DatanodeStorageReport r : reports) {
|
||||||
final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
|
final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
|
||||||
for(StorageType t : StorageType.asList()) {
|
for(StorageType t : StorageType.getMovableTypes()) {
|
||||||
final Double utilization = policy.getUtilization(r, t);
|
final Double utilization = policy.getUtilization(r, t);
|
||||||
if (utilization == null) { // datanode does not have such storage type
|
if (utilization == null) { // datanode does not have such storage type
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -69,7 +69,7 @@ public class Mover {
|
||||||
= new EnumMap<StorageType, List<StorageGroup>>(StorageType.class);
|
= new EnumMap<StorageType, List<StorageGroup>>(StorageType.class);
|
||||||
|
|
||||||
private StorageMap() {
|
private StorageMap() {
|
||||||
for(StorageType t : StorageType.asList()) {
|
for(StorageType t : StorageType.getMovableTypes()) {
|
||||||
targetStorageTypeMap.put(t, new LinkedList<StorageGroup>());
|
targetStorageTypeMap.put(t, new LinkedList<StorageGroup>());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -130,7 +130,7 @@ public class Mover {
|
||||||
final List<DatanodeStorageReport> reports = dispatcher.init();
|
final List<DatanodeStorageReport> reports = dispatcher.init();
|
||||||
for(DatanodeStorageReport r : reports) {
|
for(DatanodeStorageReport r : reports) {
|
||||||
final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
|
final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
|
||||||
for(StorageType t : StorageType.asList()) {
|
for(StorageType t : StorageType.getMovableTypes()) {
|
||||||
final Source source = dn.addSource(t, Long.MAX_VALUE, dispatcher);
|
final Source source = dn.addSource(t, Long.MAX_VALUE, dispatcher);
|
||||||
final long maxRemaining = getMaxRemaining(r, t);
|
final long maxRemaining = getMaxRemaining(r, t);
|
||||||
final StorageGroup target = maxRemaining > 0L ? dn.addTarget(t,
|
final StorageGroup target = maxRemaining > 0L ? dn.addTarget(t,
|
||||||
|
@ -354,7 +354,7 @@ public class Mover {
|
||||||
LocatedBlock lb = lbs.get(i);
|
LocatedBlock lb = lbs.get(i);
|
||||||
final StorageTypeDiff diff = new StorageTypeDiff(types,
|
final StorageTypeDiff diff = new StorageTypeDiff(types,
|
||||||
lb.getStorageTypes());
|
lb.getStorageTypes());
|
||||||
if (!diff.removeOverlap()) {
|
if (!diff.removeOverlap(true)) {
|
||||||
if (scheduleMoves4Block(diff, lb)) {
|
if (scheduleMoves4Block(diff, lb)) {
|
||||||
hasRemaining |= (diff.existing.size() > 1 &&
|
hasRemaining |= (diff.existing.size() > 1 &&
|
||||||
diff.expected.size() > 1);
|
diff.expected.size() > 1);
|
||||||
|
@ -461,19 +461,35 @@ public class Mover {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove the overlap between the expected types and the existing types.
|
* Remove the overlap between the expected types and the existing types.
|
||||||
* @return if the existing types or the expected types is empty after
|
* @param ignoreNonMovable ignore non-movable storage types
|
||||||
|
* by removing them from both expected and existing storage type list
|
||||||
|
* to prevent non-movable storage from being moved.
|
||||||
|
* @returns if the existing types or the expected types is empty after
|
||||||
* removing the overlap.
|
* removing the overlap.
|
||||||
*/
|
*/
|
||||||
boolean removeOverlap() {
|
boolean removeOverlap(boolean ignoreNonMovable) {
|
||||||
for(Iterator<StorageType> i = existing.iterator(); i.hasNext(); ) {
|
for(Iterator<StorageType> i = existing.iterator(); i.hasNext(); ) {
|
||||||
final StorageType t = i.next();
|
final StorageType t = i.next();
|
||||||
if (expected.remove(t)) {
|
if (expected.remove(t)) {
|
||||||
i.remove();
|
i.remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (ignoreNonMovable) {
|
||||||
|
removeNonMovable(existing);
|
||||||
|
removeNonMovable(expected);
|
||||||
|
}
|
||||||
return expected.isEmpty() || existing.isEmpty();
|
return expected.isEmpty() || existing.isEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void removeNonMovable(List<StorageType> types) {
|
||||||
|
for (Iterator<StorageType> i = types.iterator(); i.hasNext(); ) {
|
||||||
|
final StorageType t = i.next();
|
||||||
|
if (!t.isMovable()) {
|
||||||
|
i.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return getClass().getSimpleName() + "{expected=" + expected
|
return getClass().getSimpleName() + "{expected=" + expected
|
||||||
|
|
|
@ -1431,6 +1431,39 @@ public class DFSTestUtil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper function that verified blocks of a file are placed on the
|
||||||
|
* expected storage type.
|
||||||
|
*
|
||||||
|
* @param fs The file system containing the the file.
|
||||||
|
* @param client The DFS client used to access the file
|
||||||
|
* @param path name to the file to verify
|
||||||
|
* @param storageType expected storage type
|
||||||
|
* @returns true if file exists and its blocks are located on the expected
|
||||||
|
* storage type.
|
||||||
|
* false otherwise.
|
||||||
|
*/
|
||||||
|
public static boolean verifyFileReplicasOnStorageType(FileSystem fs,
|
||||||
|
DFSClient client, Path path, StorageType storageType) throws IOException {
|
||||||
|
if (!fs.exists(path)) {
|
||||||
|
LOG.info("verifyFileReplicasOnStorageType: file " + path + "does not exist");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
long fileLength = client.getFileInfo(path.toString()).getLen();
|
||||||
|
LocatedBlocks locatedBlocks =
|
||||||
|
client.getLocatedBlocks(path.toString(), 0, fileLength);
|
||||||
|
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
|
||||||
|
if (locatedBlock.getStorageTypes()[0] != storageType) {
|
||||||
|
LOG.info("verifyFileReplicasOnStorageType: for file " + path +
|
||||||
|
". Expect blk" + locatedBlock +
|
||||||
|
" on Type: " + storageType + ". Actual Type: " +
|
||||||
|
locatedBlock.getStorageTypes()[0]);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper function to create a key in the Key Provider. Defaults
|
* Helper function to create a key in the Key Provider. Defaults
|
||||||
* to the first indexed NameNode's Key Provider.
|
* to the first indexed NameNode's Key Provider.
|
||||||
|
|
|
@ -1391,7 +1391,8 @@ public class MiniDFSCluster {
|
||||||
// Set up datanode address
|
// Set up datanode address
|
||||||
setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig);
|
setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig);
|
||||||
if (manageDfsDirs) {
|
if (manageDfsDirs) {
|
||||||
String dirs = makeDataNodeDirs(i, storageTypes == null ? null : storageTypes[i]);
|
String dirs = makeDataNodeDirs(i, storageTypes == null ?
|
||||||
|
null : storageTypes[i - curDatanodesNum]);
|
||||||
dnConf.set(DFS_DATANODE_DATA_DIR_KEY, dirs);
|
dnConf.set(DFS_DATANODE_DATA_DIR_KEY, dirs);
|
||||||
conf.set(DFS_DATANODE_DATA_DIR_KEY, dirs);
|
conf.set(DFS_DATANODE_DATA_DIR_KEY, dirs);
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,9 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.balancer;
|
package org.apache.hadoop.hdfs.server.balancer;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
||||||
|
import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
@ -41,12 +44,7 @@ import org.apache.commons.logging.impl.Log4JLogger;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.*;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
||||||
import org.apache.hadoop.hdfs.NameNodeProxies;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
@ -57,6 +55,7 @@ import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
|
||||||
import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
|
import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.util.Tool;
|
import org.apache.hadoop.util.Tool;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
|
@ -86,6 +85,7 @@ public class TestBalancer {
|
||||||
static final double CAPACITY_ALLOWED_VARIANCE = 0.005; // 0.5%
|
static final double CAPACITY_ALLOWED_VARIANCE = 0.005; // 0.5%
|
||||||
static final double BALANCE_ALLOWED_VARIANCE = 0.11; // 10%+delta
|
static final double BALANCE_ALLOWED_VARIANCE = 0.11; // 10%+delta
|
||||||
static final int DEFAULT_BLOCK_SIZE = 100;
|
static final int DEFAULT_BLOCK_SIZE = 100;
|
||||||
|
static final int DEFAULT_RAM_DISK_BLOCK_SIZE = 5 * 1024 * 1024;
|
||||||
private static final Random r = new Random();
|
private static final Random r = new Random();
|
||||||
|
|
||||||
static {
|
static {
|
||||||
|
@ -108,6 +108,15 @@ public class TestBalancer {
|
||||||
conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
|
conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void initConfWithRamDisk(Configuration conf) {
|
||||||
|
conf.setLong(DFS_BLOCK_SIZE_KEY, DEFAULT_RAM_DISK_BLOCK_SIZE);
|
||||||
|
conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC, 3);
|
||||||
|
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
||||||
|
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
|
||||||
|
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1);
|
||||||
|
conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS, 1);
|
||||||
|
}
|
||||||
|
|
||||||
/* create a file with a length of <code>fileLen</code> */
|
/* create a file with a length of <code>fileLen</code> */
|
||||||
static void createFile(MiniDFSCluster cluster, Path filePath, long fileLen,
|
static void createFile(MiniDFSCluster cluster, Path filePath, long fileLen,
|
||||||
short replicationFactor, int nnIndex)
|
short replicationFactor, int nnIndex)
|
||||||
|
@ -1096,6 +1105,81 @@ public class TestBalancer {
|
||||||
CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, true);
|
CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Test Balancer with Ram_Disk configured
|
||||||
|
* One DN has two files on RAM_DISK, other DN has no files on RAM_DISK.
|
||||||
|
* Then verify that the balancer does not migrate files on RAM_DISK across DN.
|
||||||
|
*/
|
||||||
|
@Test(timeout=300000)
|
||||||
|
public void testBalancerWithRamDisk() throws Exception {
|
||||||
|
final int SEED = 0xFADED;
|
||||||
|
final short REPL_FACT = 1;
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
initConfWithRamDisk(conf);
|
||||||
|
|
||||||
|
final int defaultRamDiskCapacity = 10;
|
||||||
|
final int defaultDiskCapacity = 100;
|
||||||
|
final long ramDiskStorageLimit =
|
||||||
|
((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) +
|
||||||
|
(DEFAULT_RAM_DISK_BLOCK_SIZE - 1);
|
||||||
|
final long diskStorageLimit =
|
||||||
|
((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) +
|
||||||
|
(DEFAULT_RAM_DISK_BLOCK_SIZE - 1);
|
||||||
|
|
||||||
|
cluster = new MiniDFSCluster
|
||||||
|
.Builder(conf)
|
||||||
|
.numDataNodes(1)
|
||||||
|
.storageCapacities(new long[] { ramDiskStorageLimit, diskStorageLimit })
|
||||||
|
.storageTypes(new StorageType[] { RAM_DISK, DEFAULT })
|
||||||
|
.build();
|
||||||
|
|
||||||
|
try {
|
||||||
|
cluster.waitActive();
|
||||||
|
// Create few files on RAM_DISK
|
||||||
|
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||||
|
final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
|
||||||
|
final Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
|
||||||
|
|
||||||
|
DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
|
DFSClient client = fs.getClient();
|
||||||
|
DFSTestUtil.createFile(fs, path1, true,
|
||||||
|
DEFAULT_RAM_DISK_BLOCK_SIZE, 4 * DEFAULT_RAM_DISK_BLOCK_SIZE,
|
||||||
|
DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true);
|
||||||
|
DFSTestUtil.createFile(fs, path1, true,
|
||||||
|
DEFAULT_RAM_DISK_BLOCK_SIZE, 1 * DEFAULT_RAM_DISK_BLOCK_SIZE,
|
||||||
|
DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true);
|
||||||
|
|
||||||
|
// Sleep for a short time to allow the lazy writer thread to do its job
|
||||||
|
Thread.sleep(6 * 1000);
|
||||||
|
|
||||||
|
// Add another fresh DN with the same type/capacity without files on RAM_DISK
|
||||||
|
StorageType[][] storageTypes = new StorageType[][] {{RAM_DISK, DEFAULT}};
|
||||||
|
long[][] storageCapacities = new long[][]{{ramDiskStorageLimit, diskStorageLimit}};
|
||||||
|
cluster.startDataNodes(conf, REPL_FACT, storageTypes, true, null,
|
||||||
|
null, null, storageCapacities, null, false, false, false, null);
|
||||||
|
|
||||||
|
cluster.triggerHeartbeats();
|
||||||
|
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
||||||
|
|
||||||
|
// Run Balancer
|
||||||
|
Balancer.Parameters p = new Balancer.Parameters(
|
||||||
|
Parameters.DEFAULT.policy,
|
||||||
|
Parameters.DEFAULT.threshold,
|
||||||
|
Parameters.DEFAULT.nodesToBeExcluded,
|
||||||
|
Parameters.DEFAULT.nodesToBeIncluded);
|
||||||
|
final int r = Balancer.run(namenodes, p, conf);
|
||||||
|
|
||||||
|
// Validate no RAM_DISK block should be moved
|
||||||
|
assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
|
||||||
|
|
||||||
|
// Verify files are still on RAM_DISK
|
||||||
|
DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path1, RAM_DISK);
|
||||||
|
DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path2, RAM_DISK);
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param args
|
* @param args
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.conf.ReconfigurationException;
|
import org.apache.hadoop.conf.ReconfigurationException;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
@ -70,6 +71,8 @@ import org.junit.Test;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test the data migration tool (for Archival Storage)
|
* Test the data migration tool (for Archival Storage)
|
||||||
*/
|
*/
|
||||||
|
@ -340,7 +343,7 @@ public class TestStorageMover {
|
||||||
Assert.assertTrue(fileStatus.getFullName(parent.toString())
|
Assert.assertTrue(fileStatus.getFullName(parent.toString())
|
||||||
+ " with policy " + policy + " has non-empty overlap: " + diff
|
+ " with policy " + policy + " has non-empty overlap: " + diff
|
||||||
+ ", the corresponding block is " + lb.getBlock().getLocalBlock(),
|
+ ", the corresponding block is " + lb.getBlock().getLocalBlock(),
|
||||||
diff.removeOverlap());
|
diff.removeOverlap(true));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -411,17 +414,29 @@ public class TestStorageMover {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static StorageType[][] genStorageTypes(int numDataNodes) {
|
private static StorageType[][] genStorageTypes(int numDataNodes) {
|
||||||
return genStorageTypes(numDataNodes, 0, 0);
|
return genStorageTypes(numDataNodes, 0, 0, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static StorageType[][] genStorageTypes(int numDataNodes,
|
private static StorageType[][] genStorageTypes(int numDataNodes,
|
||||||
int numAllDisk, int numAllArchive) {
|
int numAllDisk, int numAllArchive) {
|
||||||
|
return genStorageTypes(numDataNodes, numAllDisk, numAllArchive, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static StorageType[][] genStorageTypes(int numDataNodes,
|
||||||
|
int numAllDisk, int numAllArchive, int numRamDisk) {
|
||||||
|
Preconditions.checkArgument(
|
||||||
|
(numAllDisk + numAllArchive + numRamDisk) <= numDataNodes);
|
||||||
|
|
||||||
StorageType[][] types = new StorageType[numDataNodes][];
|
StorageType[][] types = new StorageType[numDataNodes][];
|
||||||
int i = 0;
|
int i = 0;
|
||||||
for (; i < numAllDisk; i++) {
|
for (; i < numRamDisk; i++)
|
||||||
|
{
|
||||||
|
types[i] = new StorageType[]{StorageType.RAM_DISK, StorageType.DISK};
|
||||||
|
}
|
||||||
|
for (; i < numRamDisk + numAllDisk; i++) {
|
||||||
types[i] = new StorageType[]{StorageType.DISK, StorageType.DISK};
|
types[i] = new StorageType[]{StorageType.DISK, StorageType.DISK};
|
||||||
}
|
}
|
||||||
for (; i < numAllDisk + numAllArchive; i++) {
|
for (; i < numRamDisk + numAllDisk + numAllArchive; i++) {
|
||||||
types[i] = new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE};
|
types[i] = new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE};
|
||||||
}
|
}
|
||||||
for (; i < types.length; i++) {
|
for (; i < types.length; i++) {
|
||||||
|
@ -431,13 +446,17 @@ public class TestStorageMover {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static long[][] genCapacities(int nDatanodes, int numAllDisk,
|
private static long[][] genCapacities(int nDatanodes, int numAllDisk,
|
||||||
int numAllArchive, long diskCapacity, long archiveCapacity) {
|
int numAllArchive, int numRamDisk, long diskCapacity,
|
||||||
|
long archiveCapacity, long ramDiskCapacity) {
|
||||||
final long[][] capacities = new long[nDatanodes][];
|
final long[][] capacities = new long[nDatanodes][];
|
||||||
int i = 0;
|
int i = 0;
|
||||||
for (; i < numAllDisk; i++) {
|
for (; i < numRamDisk; i++) {
|
||||||
|
capacities[i] = new long[]{ramDiskCapacity, diskCapacity};
|
||||||
|
}
|
||||||
|
for (; i < numRamDisk + numAllDisk; i++) {
|
||||||
capacities[i] = new long[]{diskCapacity, diskCapacity};
|
capacities[i] = new long[]{diskCapacity, diskCapacity};
|
||||||
}
|
}
|
||||||
for (; i < numAllDisk + numAllArchive; i++) {
|
for (; i < numRamDisk + numAllDisk + numAllArchive; i++) {
|
||||||
capacities[i] = new long[]{archiveCapacity, archiveCapacity};
|
capacities[i] = new long[]{archiveCapacity, archiveCapacity};
|
||||||
}
|
}
|
||||||
for(; i < capacities.length; i++) {
|
for(; i < capacities.length; i++) {
|
||||||
|
@ -742,4 +761,56 @@ public class TestStorageMover {
|
||||||
test.shutdownCluster();
|
test.shutdownCluster();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test blocks of lazy_persist file on RAM_DISK will not be moved to other
|
||||||
|
* storage types by the Storage Mover.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testRamDiskNotMoved() throws Exception {
|
||||||
|
LOG.info("testRamDiskNotMoved");
|
||||||
|
final PathPolicyMap pathPolicyMap = new PathPolicyMap(0);
|
||||||
|
final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme();
|
||||||
|
|
||||||
|
final long diskCapacity = 100 * BLOCK_SIZE;
|
||||||
|
final long archiveCapacity = (6 + HdfsConstants.MIN_BLOCKS_FOR_WRITE)
|
||||||
|
* BLOCK_SIZE;
|
||||||
|
final long ramDiskCapacity = 10 * BLOCK_SIZE;
|
||||||
|
final long[][] capacities = genCapacities(1, 0, 0, 1,
|
||||||
|
diskCapacity, archiveCapacity, ramDiskCapacity);
|
||||||
|
final int LAZY_WRITER_INTERVAL_SEC = 1;
|
||||||
|
final ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,
|
||||||
|
1, (short)1, genStorageTypes(1, 0, 0, 1), capacities);
|
||||||
|
clusterScheme.conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
|
||||||
|
LAZY_WRITER_INTERVAL_SEC);
|
||||||
|
final MigrationTest test = new MigrationTest(clusterScheme, nsScheme);
|
||||||
|
|
||||||
|
try {
|
||||||
|
test.runBasicTest(false);
|
||||||
|
|
||||||
|
// test creating a hot RAM_DISK file
|
||||||
|
final int SEED = 0xFADED;
|
||||||
|
final Path foo_hot = new Path(pathPolicyMap.hot, "foo_hot");
|
||||||
|
DFSTestUtil.createFile(test.dfs, foo_hot, true, BLOCK_SIZE, BLOCK_SIZE,
|
||||||
|
BLOCK_SIZE, (short) 1, SEED, true);
|
||||||
|
Assert.assertTrue(DFSTestUtil.verifyFileReplicasOnStorageType(test.dfs,
|
||||||
|
test.dfs.getClient(), foo_hot, StorageType.RAM_DISK));
|
||||||
|
|
||||||
|
// Sleep for a short time to allow the lazy writer thread to do its job
|
||||||
|
Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
|
||||||
|
|
||||||
|
// Verify policy related name change is allowed
|
||||||
|
final Path foo_hot_new = new Path(pathPolicyMap.warm, "foo_hot");
|
||||||
|
test.dfs.rename(foo_hot, pathPolicyMap.warm);
|
||||||
|
Assert.assertTrue(test.dfs.exists(foo_hot_new));
|
||||||
|
|
||||||
|
// Verify blocks on ram disk will not be moved to other storage types by
|
||||||
|
// policy based Storage Mover.
|
||||||
|
test.migrate();
|
||||||
|
Assert.assertTrue(DFSTestUtil.verifyFileReplicasOnStorageType(test.dfs,
|
||||||
|
test.dfs.getClient(), foo_hot_new, StorageType.RAM_DISK));
|
||||||
|
} finally {
|
||||||
|
test.shutdownCluster();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue