diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java index e7aa66b62f0..05d4d77744e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java @@ -29,6 +29,7 @@ import java.io.InputStreamReader; import java.io.OutputStream; import java.io.PrintStream; import java.text.DecimalFormat; +import java.util.Collection; import java.util.Date; import java.util.Random; import java.util.StringTokenizer; @@ -36,7 +37,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile.CompressionType; @@ -102,7 +105,8 @@ public class TestDFSIO implements Tool { " [-compression codecClassName]" + " [-nrFiles N]" + " [-size Size[B|KB|MB|GB|TB]]" + - " [-resFile resultFileName] [-bufferSize Bytes]"; + " [-resFile resultFileName] [-bufferSize Bytes]" + + " [-storagePolicy storagePolicyName]"; private Configuration config; @@ -305,7 +309,7 @@ public class TestDFSIO implements Tool { writer = null; } } - LOG.info("created control files for: "+nrFiles+" files"); + LOG.info("created control files for: " + nrFiles + " files"); } private static String getFileName(int fIdx) { @@ -326,6 +330,7 @@ public class TestDFSIO implements Tool { */ private abstract static class IOStatMapper extends IOMapperBase { protected CompressionCodec compressionCodec; + protected String blockStoragePolicy; IOStatMapper() { } @@ -350,6 +355,8 @@ public class TestDFSIO implements Tool { compressionCodec = (CompressionCodec) ReflectionUtils.newInstance(codec, getConf()); } + + blockStoragePolicy = getConf().get("test.io.block.storage.policy", null); } @Override // IOMapperBase @@ -389,8 +396,11 @@ public class TestDFSIO implements Tool { @Override // IOMapperBase public Closeable getIOStream(String name) throws IOException { // create file - OutputStream out = - fs.create(new Path(getDataDir(getConf()), name), true, bufferSize); + Path filePath = new Path(getDataDir(getConf()), name); + OutputStream out = fs.create(filePath, true, bufferSize); + if (blockStoragePolicy != null) { + fs.setStoragePolicy(filePath, blockStoragePolicy); + } if(compressionCodec != null) out = compressionCodec.createOutputStream(out); LOG.info("out = " + out.getClass().getName()); @@ -713,8 +723,9 @@ public class TestDFSIO implements Tool { System.err.print(StringUtils.stringifyException(e)); res = -2; } - if(res == -1) - System.err.print(USAGE); + if (res == -1) { + System.err.println(USAGE); + } System.exit(res); } @@ -727,6 +738,7 @@ public class TestDFSIO implements Tool { long skipSize = 0; String resFileName = DEFAULT_RES_FILE_NAME; String compressionClass = null; + String storagePolicy = null; boolean isSequential = false; String version = TestDFSIO.class.getSimpleName() + ".1.8"; @@ -771,6 +783,8 @@ public class TestDFSIO implements Tool { bufferSize = Integer.parseInt(args[++i]); } else if (args[i].equalsIgnoreCase("-resfile")) { resFileName = args[++i]; + } else if (args[i].equalsIgnoreCase("-storagePolicy")) { + storagePolicy = args[++i]; } else { System.err.println("Illegal argument: " + args[i]); return -1; @@ -799,6 +813,33 @@ public class TestDFSIO implements Tool { config.setLong("test.io.skip.size", skipSize); FileSystem fs = FileSystem.get(config); + if (storagePolicy != null) { + boolean isValid = false; + Collection storagePolicies = + ((DistributedFileSystem) fs).getAllStoragePolicies(); + try { + for (BlockStoragePolicy policy : storagePolicies) { + if (policy.getName().equals(storagePolicy)) { + isValid = true; + break; + } + } + } catch (Exception e) { + throw new IOException("Get block storage policies error: ", e); + } + if (!isValid) { + System.out.println("Invalid block storage policy: " + storagePolicy); + System.out.println("Current supported storage policy list: "); + for (BlockStoragePolicy policy : storagePolicies) { + System.out.println(policy.getName()); + } + return -1; + } + + config.set("test.io.block.storage.policy", storagePolicy); + LOG.info("storagePolicy = " + storagePolicy); + } + if (isSequential) { long tStart = System.currentTimeMillis(); sequentialTest(fs, testType, nrBytes, nrFiles);