HDFS-7228. Add an SSD policy into the default BlockStoragePolicySuite. Contributed by Jing Zhao.
This commit is contained in:
parent
5faaba0bd0
commit
7dcad84143
|
@ -676,6 +676,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
HDFS-7195. Update user doc of secure mode about Datanodes don't require root
|
HDFS-7195. Update user doc of secure mode about Datanodes don't require root
|
||||||
or jsvc. (cnauroth)
|
or jsvc. (cnauroth)
|
||||||
|
|
||||||
|
HDFS-7228. Add an SSD policy into the default BlockStoragePolicySuite.
|
||||||
|
(jing9)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-6690. Deduplicate xattr names in memory. (wang)
|
HDFS-6690. Deduplicate xattr names in memory. (wang)
|
||||||
|
|
|
@ -455,8 +455,8 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean initLazyPersist(HdfsFileStatus stat) {
|
private boolean initLazyPersist(HdfsFileStatus stat) {
|
||||||
final BlockStoragePolicy lpPolicy =
|
final BlockStoragePolicy lpPolicy = blockStoragePolicySuite
|
||||||
blockStoragePolicySuite.getPolicy("LAZY_PERSIST");
|
.getPolicy(HdfsConstants.MEMORY_STORAGE_POLICY_NAME);
|
||||||
return lpPolicy != null &&
|
return lpPolicy != null &&
|
||||||
stat.getStoragePolicy() == lpPolicy.getId();
|
stat.getStoragePolicy() == lpPolicy.getId();
|
||||||
}
|
}
|
||||||
|
|
|
@ -164,4 +164,18 @@ public class HdfsConstants {
|
||||||
|
|
||||||
public static final String SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR
|
public static final String SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR
|
||||||
= Path.SEPARATOR + DOT_SNAPSHOT_DIR + Path.SEPARATOR;
|
= Path.SEPARATOR + DOT_SNAPSHOT_DIR + Path.SEPARATOR;
|
||||||
|
|
||||||
|
public static final String MEMORY_STORAGE_POLICY_NAME = "LAZY_PERSIST";
|
||||||
|
public static final String ALLSSD_STORAGE_POLICY_NAME = "ALL_SSD";
|
||||||
|
public static final String ONESSD_STORAGE_POLICY_NAME = "ONE_SSD";
|
||||||
|
public static final String HOT_STORAGE_POLICY_NAME = "HOT";
|
||||||
|
public static final String WARM_STORAGE_POLICY_NAME = "WARM";
|
||||||
|
public static final String COLD_STORAGE_POLICY_NAME = "COLD";
|
||||||
|
|
||||||
|
public static final byte MEMORY_STORAGE_POLICY_ID = 15;
|
||||||
|
public static final byte ALLSSD_STORAGE_POLICY_ID = 12;
|
||||||
|
public static final byte ONESSD_STORAGE_POLICY_ID = 10;
|
||||||
|
public static final byte HOT_STORAGE_POLICY_ID = 7;
|
||||||
|
public static final byte WARM_STORAGE_POLICY_ID = 5;
|
||||||
|
public static final byte COLD_STORAGE_POLICY_ID = 2;
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.hadoop.fs.XAttr;
|
||||||
import org.apache.hadoop.hdfs.StorageType;
|
import org.apache.hadoop.hdfs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.XAttrHelper;
|
import org.apache.hadoop.hdfs.XAttrHelper;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -44,23 +45,39 @@ public class BlockStoragePolicySuite {
|
||||||
public static BlockStoragePolicySuite createDefaultSuite() {
|
public static BlockStoragePolicySuite createDefaultSuite() {
|
||||||
final BlockStoragePolicy[] policies =
|
final BlockStoragePolicy[] policies =
|
||||||
new BlockStoragePolicy[1 << ID_BIT_LENGTH];
|
new BlockStoragePolicy[1 << ID_BIT_LENGTH];
|
||||||
final byte lazyPersistId = 15;
|
final byte lazyPersistId = HdfsConstants.MEMORY_STORAGE_POLICY_ID;
|
||||||
policies[lazyPersistId] = new BlockStoragePolicy(lazyPersistId, "LAZY_PERSIST",
|
policies[lazyPersistId] = new BlockStoragePolicy(lazyPersistId,
|
||||||
|
HdfsConstants.MEMORY_STORAGE_POLICY_NAME,
|
||||||
new StorageType[]{StorageType.RAM_DISK, StorageType.DISK},
|
new StorageType[]{StorageType.RAM_DISK, StorageType.DISK},
|
||||||
new StorageType[]{StorageType.DISK},
|
new StorageType[]{StorageType.DISK},
|
||||||
new StorageType[]{StorageType.DISK},
|
new StorageType[]{StorageType.DISK},
|
||||||
true); // Cannot be changed on regular files, but inherited.
|
true); // Cannot be changed on regular files, but inherited.
|
||||||
final byte hotId = 12;
|
final byte allssdId = HdfsConstants.ALLSSD_STORAGE_POLICY_ID;
|
||||||
policies[hotId] = new BlockStoragePolicy(hotId, "HOT",
|
policies[allssdId] = new BlockStoragePolicy(allssdId,
|
||||||
|
HdfsConstants.ALLSSD_STORAGE_POLICY_NAME,
|
||||||
|
new StorageType[]{StorageType.SSD},
|
||||||
|
new StorageType[]{StorageType.DISK},
|
||||||
|
new StorageType[]{StorageType.DISK});
|
||||||
|
final byte onessdId = HdfsConstants.ONESSD_STORAGE_POLICY_ID;
|
||||||
|
policies[onessdId] = new BlockStoragePolicy(onessdId,
|
||||||
|
HdfsConstants.ONESSD_STORAGE_POLICY_NAME,
|
||||||
|
new StorageType[]{StorageType.SSD, StorageType.DISK},
|
||||||
|
new StorageType[]{StorageType.SSD, StorageType.DISK},
|
||||||
|
new StorageType[]{StorageType.SSD, StorageType.DISK});
|
||||||
|
final byte hotId = HdfsConstants.HOT_STORAGE_POLICY_ID;
|
||||||
|
policies[hotId] = new BlockStoragePolicy(hotId,
|
||||||
|
HdfsConstants.HOT_STORAGE_POLICY_NAME,
|
||||||
new StorageType[]{StorageType.DISK}, StorageType.EMPTY_ARRAY,
|
new StorageType[]{StorageType.DISK}, StorageType.EMPTY_ARRAY,
|
||||||
new StorageType[]{StorageType.ARCHIVE});
|
new StorageType[]{StorageType.ARCHIVE});
|
||||||
final byte warmId = 8;
|
final byte warmId = HdfsConstants.WARM_STORAGE_POLICY_ID;
|
||||||
policies[warmId] = new BlockStoragePolicy(warmId, "WARM",
|
policies[warmId] = new BlockStoragePolicy(warmId,
|
||||||
|
HdfsConstants.WARM_STORAGE_POLICY_NAME,
|
||||||
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
|
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
|
||||||
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
|
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
|
||||||
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE});
|
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE});
|
||||||
final byte coldId = 4;
|
final byte coldId = HdfsConstants.COLD_STORAGE_POLICY_ID;
|
||||||
policies[coldId] = new BlockStoragePolicy(coldId, "COLD",
|
policies[coldId] = new BlockStoragePolicy(coldId,
|
||||||
|
HdfsConstants.COLD_STORAGE_POLICY_NAME,
|
||||||
new StorageType[]{StorageType.ARCHIVE}, StorageType.EMPTY_ARRAY,
|
new StorageType[]{StorageType.ARCHIVE}, StorageType.EMPTY_ARRAY,
|
||||||
StorageType.EMPTY_ARRAY);
|
StorageType.EMPTY_ARRAY);
|
||||||
return new BlockStoragePolicySuite(hotId, policies);
|
return new BlockStoragePolicySuite(hotId, policies);
|
||||||
|
|
|
@ -1133,7 +1133,8 @@ public class DFSTestUtil {
|
||||||
// OP_CLOSE 9
|
// OP_CLOSE 9
|
||||||
s.close();
|
s.close();
|
||||||
// OP_SET_STORAGE_POLICY 45
|
// OP_SET_STORAGE_POLICY 45
|
||||||
filesystem.setStoragePolicy(pathFileCreate, "HOT");
|
filesystem.setStoragePolicy(pathFileCreate,
|
||||||
|
HdfsConstants.HOT_STORAGE_POLICY_NAME);
|
||||||
// OP_RENAME_OLD 1
|
// OP_RENAME_OLD 1
|
||||||
final Path pathFileMoved = new Path("/file_moved");
|
final Path pathFileMoved = new Path("/file_moved");
|
||||||
filesystem.rename(pathFileCreate, pathFileMoved);
|
filesystem.rename(pathFileCreate, pathFileMoved);
|
||||||
|
|
|
@ -66,10 +66,12 @@ public class TestBlockStoragePolicy {
|
||||||
static final long FILE_LEN = 1024;
|
static final long FILE_LEN = 1024;
|
||||||
static final short REPLICATION = 3;
|
static final short REPLICATION = 3;
|
||||||
|
|
||||||
static final byte COLD = (byte) 4;
|
static final byte COLD = HdfsConstants.COLD_STORAGE_POLICY_ID;
|
||||||
static final byte WARM = (byte) 8;
|
static final byte WARM = HdfsConstants.WARM_STORAGE_POLICY_ID;
|
||||||
static final byte HOT = (byte) 12;
|
static final byte HOT = HdfsConstants.HOT_STORAGE_POLICY_ID;
|
||||||
static final byte LAZY_PERSIST = (byte) 15;
|
static final byte ONESSD = HdfsConstants.ONESSD_STORAGE_POLICY_ID;
|
||||||
|
static final byte ALLSSD = HdfsConstants.ALLSSD_STORAGE_POLICY_ID;
|
||||||
|
static final byte LAZY_PERSIST = HdfsConstants.MEMORY_STORAGE_POLICY_ID;
|
||||||
|
|
||||||
@Test (timeout=300000)
|
@Test (timeout=300000)
|
||||||
public void testConfigKeyEnabled() throws IOException {
|
public void testConfigKeyEnabled() throws IOException {
|
||||||
|
@ -79,7 +81,8 @@ public class TestBlockStoragePolicy {
|
||||||
.numDataNodes(1).build();
|
.numDataNodes(1).build();
|
||||||
try {
|
try {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
cluster.getFileSystem().setStoragePolicy(new Path("/"), "COLD");
|
cluster.getFileSystem().setStoragePolicy(new Path("/"),
|
||||||
|
HdfsConstants.COLD_STORAGE_POLICY_NAME);
|
||||||
} finally {
|
} finally {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
|
@ -98,7 +101,8 @@ public class TestBlockStoragePolicy {
|
||||||
.numDataNodes(1).build();
|
.numDataNodes(1).build();
|
||||||
try {
|
try {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
cluster.getFileSystem().setStoragePolicy(new Path("/"), "COLD");
|
cluster.getFileSystem().setStoragePolicy(new Path("/"),
|
||||||
|
HdfsConstants.COLD_STORAGE_POLICY_NAME);
|
||||||
} finally {
|
} finally {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
|
@ -108,17 +112,25 @@ public class TestBlockStoragePolicy {
|
||||||
public void testDefaultPolicies() {
|
public void testDefaultPolicies() {
|
||||||
final Map<Byte, String> expectedPolicyStrings = new HashMap<Byte, String>();
|
final Map<Byte, String> expectedPolicyStrings = new HashMap<Byte, String>();
|
||||||
expectedPolicyStrings.put(COLD,
|
expectedPolicyStrings.put(COLD,
|
||||||
"BlockStoragePolicy{COLD:4, storageTypes=[ARCHIVE], " +
|
"BlockStoragePolicy{COLD:" + COLD + ", storageTypes=[ARCHIVE], " +
|
||||||
"creationFallbacks=[], replicationFallbacks=[]}");
|
"creationFallbacks=[], replicationFallbacks=[]}");
|
||||||
expectedPolicyStrings.put(WARM,
|
expectedPolicyStrings.put(WARM,
|
||||||
"BlockStoragePolicy{WARM:8, storageTypes=[DISK, ARCHIVE], " +
|
"BlockStoragePolicy{WARM:" + WARM + ", storageTypes=[DISK, ARCHIVE], " +
|
||||||
"creationFallbacks=[DISK, ARCHIVE], replicationFallbacks=[DISK, ARCHIVE]}");
|
"creationFallbacks=[DISK, ARCHIVE], " +
|
||||||
|
"replicationFallbacks=[DISK, ARCHIVE]}");
|
||||||
expectedPolicyStrings.put(HOT,
|
expectedPolicyStrings.put(HOT,
|
||||||
"BlockStoragePolicy{HOT:12, storageTypes=[DISK], " +
|
"BlockStoragePolicy{HOT:" + HOT + ", storageTypes=[DISK], " +
|
||||||
"creationFallbacks=[], replicationFallbacks=[ARCHIVE]}");
|
"creationFallbacks=[], replicationFallbacks=[ARCHIVE]}");
|
||||||
expectedPolicyStrings.put(LAZY_PERSIST,
|
expectedPolicyStrings.put(LAZY_PERSIST,
|
||||||
"BlockStoragePolicy{LAZY_PERSIST:15, storageTypes=[RAM_DISK, DISK], " +
|
"BlockStoragePolicy{LAZY_PERSIST:" + LAZY_PERSIST +
|
||||||
|
", storageTypes=[RAM_DISK, DISK], " +
|
||||||
"creationFallbacks=[DISK], replicationFallbacks=[DISK]}");
|
"creationFallbacks=[DISK], replicationFallbacks=[DISK]}");
|
||||||
|
expectedPolicyStrings.put(ONESSD, "BlockStoragePolicy{ONE_SSD:" + ONESSD +
|
||||||
|
", storageTypes=[SSD, DISK], creationFallbacks=[SSD, DISK], " +
|
||||||
|
"replicationFallbacks=[SSD, DISK]}");
|
||||||
|
expectedPolicyStrings.put(ALLSSD, "BlockStoragePolicy{ALL_SSD:" + ALLSSD +
|
||||||
|
", storageTypes=[SSD], creationFallbacks=[DISK], " +
|
||||||
|
"replicationFallbacks=[DISK]}");
|
||||||
|
|
||||||
for(byte i = 1; i < 16; i++) {
|
for(byte i = 1; i < 16; i++) {
|
||||||
final BlockStoragePolicy policy = POLICY_SUITE.getPolicy(i);
|
final BlockStoragePolicy policy = POLICY_SUITE.getPolicy(i);
|
||||||
|
@ -845,15 +857,15 @@ public class TestBlockStoragePolicy {
|
||||||
|
|
||||||
final Path invalidPath = new Path("/invalidPath");
|
final Path invalidPath = new Path("/invalidPath");
|
||||||
try {
|
try {
|
||||||
fs.setStoragePolicy(invalidPath, "WARM");
|
fs.setStoragePolicy(invalidPath, HdfsConstants.WARM_STORAGE_POLICY_NAME);
|
||||||
Assert.fail("Should throw a FileNotFoundException");
|
Assert.fail("Should throw a FileNotFoundException");
|
||||||
} catch (FileNotFoundException e) {
|
} catch (FileNotFoundException e) {
|
||||||
GenericTestUtils.assertExceptionContains(invalidPath.toString(), e);
|
GenericTestUtils.assertExceptionContains(invalidPath.toString(), e);
|
||||||
}
|
}
|
||||||
|
|
||||||
fs.setStoragePolicy(fooFile, "COLD");
|
fs.setStoragePolicy(fooFile, HdfsConstants.COLD_STORAGE_POLICY_NAME);
|
||||||
fs.setStoragePolicy(barDir, "WARM");
|
fs.setStoragePolicy(barDir, HdfsConstants.WARM_STORAGE_POLICY_NAME);
|
||||||
fs.setStoragePolicy(barFile2, "HOT");
|
fs.setStoragePolicy(barFile2, HdfsConstants.HOT_STORAGE_POLICY_NAME);
|
||||||
|
|
||||||
dirList = fs.getClient().listPaths(dir.toString(),
|
dirList = fs.getClient().listPaths(dir.toString(),
|
||||||
HdfsFileStatus.EMPTY_NAME).getPartialListing();
|
HdfsFileStatus.EMPTY_NAME).getPartialListing();
|
||||||
|
@ -901,7 +913,7 @@ public class TestBlockStoragePolicy {
|
||||||
DFSTestUtil.createFile(fs, fooFile1, FILE_LEN, REPLICATION, 0L);
|
DFSTestUtil.createFile(fs, fooFile1, FILE_LEN, REPLICATION, 0L);
|
||||||
DFSTestUtil.createFile(fs, fooFile2, FILE_LEN, REPLICATION, 0L);
|
DFSTestUtil.createFile(fs, fooFile2, FILE_LEN, REPLICATION, 0L);
|
||||||
|
|
||||||
fs.setStoragePolicy(fooDir, "WARM");
|
fs.setStoragePolicy(fooDir, HdfsConstants.WARM_STORAGE_POLICY_NAME);
|
||||||
|
|
||||||
HdfsFileStatus[] dirList = fs.getClient().listPaths(dir.toString(),
|
HdfsFileStatus[] dirList = fs.getClient().listPaths(dir.toString(),
|
||||||
HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
|
HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
|
||||||
|
@ -913,7 +925,7 @@ public class TestBlockStoragePolicy {
|
||||||
// take snapshot
|
// take snapshot
|
||||||
SnapshotTestHelper.createSnapshot(fs, dir, "s1");
|
SnapshotTestHelper.createSnapshot(fs, dir, "s1");
|
||||||
// change the storage policy of fooFile1
|
// change the storage policy of fooFile1
|
||||||
fs.setStoragePolicy(fooFile1, "COLD");
|
fs.setStoragePolicy(fooFile1, HdfsConstants.COLD_STORAGE_POLICY_NAME);
|
||||||
|
|
||||||
fooList = fs.getClient().listPaths(fooDir.toString(),
|
fooList = fs.getClient().listPaths(fooDir.toString(),
|
||||||
HdfsFileStatus.EMPTY_NAME).getPartialListing();
|
HdfsFileStatus.EMPTY_NAME).getPartialListing();
|
||||||
|
@ -936,7 +948,7 @@ public class TestBlockStoragePolicy {
|
||||||
HdfsFileStatus.EMPTY_NAME).getPartialListing(), COLD);
|
HdfsFileStatus.EMPTY_NAME).getPartialListing(), COLD);
|
||||||
|
|
||||||
// change the storage policy of foo dir
|
// change the storage policy of foo dir
|
||||||
fs.setStoragePolicy(fooDir, "HOT");
|
fs.setStoragePolicy(fooDir, HdfsConstants.HOT_STORAGE_POLICY_NAME);
|
||||||
// /dir/foo is now hot
|
// /dir/foo is now hot
|
||||||
dirList = fs.getClient().listPaths(dir.toString(),
|
dirList = fs.getClient().listPaths(dir.toString(),
|
||||||
HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
|
HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
|
||||||
|
@ -1053,7 +1065,7 @@ public class TestBlockStoragePolicy {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testChangeHotFileRep() throws Exception {
|
public void testChangeHotFileRep() throws Exception {
|
||||||
testChangeFileRep("HOT", HOT,
|
testChangeFileRep(HdfsConstants.HOT_STORAGE_POLICY_NAME, HOT,
|
||||||
new StorageType[]{StorageType.DISK, StorageType.DISK,
|
new StorageType[]{StorageType.DISK, StorageType.DISK,
|
||||||
StorageType.DISK},
|
StorageType.DISK},
|
||||||
new StorageType[]{StorageType.DISK, StorageType.DISK, StorageType.DISK,
|
new StorageType[]{StorageType.DISK, StorageType.DISK, StorageType.DISK,
|
||||||
|
@ -1067,7 +1079,7 @@ public class TestBlockStoragePolicy {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testChangeWarmRep() throws Exception {
|
public void testChangeWarmRep() throws Exception {
|
||||||
testChangeFileRep("WARM", WARM,
|
testChangeFileRep(HdfsConstants.WARM_STORAGE_POLICY_NAME, WARM,
|
||||||
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE,
|
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE,
|
||||||
StorageType.ARCHIVE},
|
StorageType.ARCHIVE},
|
||||||
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE,
|
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE,
|
||||||
|
@ -1080,7 +1092,7 @@ public class TestBlockStoragePolicy {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testChangeColdRep() throws Exception {
|
public void testChangeColdRep() throws Exception {
|
||||||
testChangeFileRep("COLD", COLD,
|
testChangeFileRep(HdfsConstants.COLD_STORAGE_POLICY_NAME, COLD,
|
||||||
new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE,
|
new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE,
|
||||||
StorageType.ARCHIVE},
|
StorageType.ARCHIVE},
|
||||||
new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE,
|
new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE,
|
||||||
|
@ -1144,7 +1156,7 @@ public class TestBlockStoragePolicy {
|
||||||
final DistributedFileSystem fs = cluster.getFileSystem();
|
final DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
try {
|
try {
|
||||||
BlockStoragePolicy[] policies = fs.getStoragePolicies();
|
BlockStoragePolicy[] policies = fs.getStoragePolicies();
|
||||||
Assert.assertEquals(4, policies.length);
|
Assert.assertEquals(6, policies.length);
|
||||||
Assert.assertEquals(POLICY_SUITE.getPolicy(COLD).toString(),
|
Assert.assertEquals(POLICY_SUITE.getPolicy(COLD).toString(),
|
||||||
policies[0].toString());
|
policies[0].toString());
|
||||||
Assert.assertEquals(POLICY_SUITE.getPolicy(WARM).toString(),
|
Assert.assertEquals(POLICY_SUITE.getPolicy(WARM).toString(),
|
||||||
|
|
|
@ -32,7 +32,6 @@ import org.apache.commons.logging.impl.Log4JLogger;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
@ -44,6 +43,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.StorageType;
|
import org.apache.hadoop.hdfs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
|
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
@ -67,8 +67,6 @@ import org.junit.Test;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test the data migration tool (for Archival Storage)
|
* Test the data migration tool (for Archival Storage)
|
||||||
*/
|
*/
|
||||||
|
@ -100,9 +98,9 @@ public class TestStorageMover {
|
||||||
DEFAULT_CONF.setLong(DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY, 2000L);
|
DEFAULT_CONF.setLong(DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY, 2000L);
|
||||||
|
|
||||||
DEFAULT_POLICIES = BlockStoragePolicySuite.createDefaultSuite();
|
DEFAULT_POLICIES = BlockStoragePolicySuite.createDefaultSuite();
|
||||||
HOT = DEFAULT_POLICIES.getPolicy("HOT");
|
HOT = DEFAULT_POLICIES.getPolicy(HdfsConstants.HOT_STORAGE_POLICY_NAME);
|
||||||
WARM = DEFAULT_POLICIES.getPolicy("WARM");
|
WARM = DEFAULT_POLICIES.getPolicy(HdfsConstants.WARM_STORAGE_POLICY_NAME);
|
||||||
COLD = DEFAULT_POLICIES.getPolicy("COLD");
|
COLD = DEFAULT_POLICIES.getPolicy(HdfsConstants.COLD_STORAGE_POLICY_NAME);
|
||||||
TestBalancer.initTestSetup();
|
TestBalancer.initTestSetup();
|
||||||
Dispatcher.setDelayAfterErrors(1000L);
|
Dispatcher.setDelayAfterErrors(1000L);
|
||||||
}
|
}
|
||||||
|
@ -201,14 +199,6 @@ public class TestStorageMover {
|
||||||
this.policies = DEFAULT_POLICIES;
|
this.policies = DEFAULT_POLICIES;
|
||||||
}
|
}
|
||||||
|
|
||||||
MigrationTest(ClusterScheme cScheme, NamespaceScheme nsScheme,
|
|
||||||
BlockStoragePolicySuite policies) {
|
|
||||||
this.clusterScheme = cScheme;
|
|
||||||
this.nsScheme = nsScheme;
|
|
||||||
this.conf = clusterScheme.conf;
|
|
||||||
this.policies = policies;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set up the cluster and start NameNode and DataNodes according to the
|
* Set up the cluster and start NameNode and DataNodes according to the
|
||||||
* corresponding scheme.
|
* corresponding scheme.
|
||||||
|
@ -273,9 +263,6 @@ public class TestStorageMover {
|
||||||
}
|
}
|
||||||
if (verifyAll) {
|
if (verifyAll) {
|
||||||
verifyNamespace();
|
verifyNamespace();
|
||||||
} else {
|
|
||||||
// TODO verify according to the given path list
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -413,11 +400,6 @@ public class TestStorageMover {
|
||||||
return genStorageTypes(numDataNodes, 0, 0, 0);
|
return genStorageTypes(numDataNodes, 0, 0, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static StorageType[][] genStorageTypes(int numDataNodes,
|
|
||||||
int numAllDisk, int numAllArchive) {
|
|
||||||
return genStorageTypes(numDataNodes, numAllDisk, numAllArchive, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static StorageType[][] genStorageTypes(int numDataNodes,
|
private static StorageType[][] genStorageTypes(int numDataNodes,
|
||||||
int numAllDisk, int numAllArchive, int numRamDisk) {
|
int numAllDisk, int numAllArchive, int numRamDisk) {
|
||||||
Preconditions.checkArgument(
|
Preconditions.checkArgument(
|
||||||
|
@ -441,26 +423,6 @@ public class TestStorageMover {
|
||||||
return types;
|
return types;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static long[][] genCapacities(int nDatanodes, int numAllDisk,
|
|
||||||
int numAllArchive, int numRamDisk, long diskCapacity,
|
|
||||||
long archiveCapacity, long ramDiskCapacity) {
|
|
||||||
final long[][] capacities = new long[nDatanodes][];
|
|
||||||
int i = 0;
|
|
||||||
for (; i < numRamDisk; i++) {
|
|
||||||
capacities[i] = new long[]{ramDiskCapacity, diskCapacity};
|
|
||||||
}
|
|
||||||
for (; i < numRamDisk + numAllDisk; i++) {
|
|
||||||
capacities[i] = new long[]{diskCapacity, diskCapacity};
|
|
||||||
}
|
|
||||||
for (; i < numRamDisk + numAllDisk + numAllArchive; i++) {
|
|
||||||
capacities[i] = new long[]{archiveCapacity, archiveCapacity};
|
|
||||||
}
|
|
||||||
for(; i < capacities.length; i++) {
|
|
||||||
capacities[i] = new long[]{diskCapacity, archiveCapacity};
|
|
||||||
}
|
|
||||||
return capacities;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class PathPolicyMap {
|
private static class PathPolicyMap {
|
||||||
final Map<Path, BlockStoragePolicy> map = Maps.newHashMap();
|
final Map<Path, BlockStoragePolicy> map = Maps.newHashMap();
|
||||||
final Path hot = new Path("/hot");
|
final Path hot = new Path("/hot");
|
||||||
|
@ -666,8 +628,8 @@ public class TestStorageMover {
|
||||||
|
|
||||||
private void setVolumeFull(DataNode dn, StorageType type) {
|
private void setVolumeFull(DataNode dn, StorageType type) {
|
||||||
List<? extends FsVolumeSpi> volumes = dn.getFSDataset().getVolumes();
|
List<? extends FsVolumeSpi> volumes = dn.getFSDataset().getVolumes();
|
||||||
for (int j = 0; j < volumes.size(); ++j) {
|
for (FsVolumeSpi v : volumes) {
|
||||||
FsVolumeImpl volume = (FsVolumeImpl) volumes.get(j);
|
FsVolumeImpl volume = (FsVolumeImpl) v;
|
||||||
if (volume.getStorageType() == type) {
|
if (volume.getStorageType() == type) {
|
||||||
LOG.info("setCapacity to 0 for [" + volume.getStorageType() + "]"
|
LOG.info("setCapacity to 0 for [" + volume.getStorageType() + "]"
|
||||||
+ volume.getStorageID());
|
+ volume.getStorageID());
|
||||||
|
|
Loading…
Reference in New Issue