MAPREDUCE-6227. DFSIO for truncate. (shv via yliu)
This commit is contained in:
parent
c5f18ba65b
commit
6933975143
|
@ -42,6 +42,8 @@ Release 2.7.0 - UNRELEASED
|
||||||
MAPREDUCE-5800. Use Job#getInstance instead of deprecated constructors
|
MAPREDUCE-5800. Use Job#getInstance instead of deprecated constructors
|
||||||
(aajisaka)
|
(aajisaka)
|
||||||
|
|
||||||
|
MAPREDUCE-6227. DFSIO for truncate. (shv via yliu)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
MAPREDUCE-6169. MergeQueue should release reference to the current item
|
MAPREDUCE-6169. MergeQueue should release reference to the current item
|
||||||
|
|
|
@ -31,7 +31,6 @@ import java.io.PrintStream;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.StringTokenizer;
|
import java.util.StringTokenizer;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -92,13 +91,13 @@ public class TestDFSIO implements Tool {
|
||||||
private static final String BASE_FILE_NAME = "test_io_";
|
private static final String BASE_FILE_NAME = "test_io_";
|
||||||
private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log";
|
private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log";
|
||||||
private static final long MEGA = ByteMultiple.MB.value();
|
private static final long MEGA = ByteMultiple.MB.value();
|
||||||
private static final int DEFAULT_NR_BYTES = 1;
|
private static final int DEFAULT_NR_BYTES = 128;
|
||||||
private static final int DEFAULT_NR_FILES = 4;
|
private static final int DEFAULT_NR_FILES = 4;
|
||||||
private static final String USAGE =
|
private static final String USAGE =
|
||||||
"Usage: " + TestDFSIO.class.getSimpleName() +
|
"Usage: " + TestDFSIO.class.getSimpleName() +
|
||||||
" [genericOptions]" +
|
" [genericOptions]" +
|
||||||
" -read [-random | -backward | -skip [-skipSize Size]] |" +
|
" -read [-random | -backward | -skip [-skipSize Size]] |" +
|
||||||
" -write | -append | -clean" +
|
" -write | -append | -truncate | -clean" +
|
||||||
" [-compression codecClassName]" +
|
" [-compression codecClassName]" +
|
||||||
" [-nrFiles N]" +
|
" [-nrFiles N]" +
|
||||||
" [-size Size[B|KB|MB|GB|TB]]" +
|
" [-size Size[B|KB|MB|GB|TB]]" +
|
||||||
|
@ -121,7 +120,8 @@ public class TestDFSIO implements Tool {
|
||||||
TEST_TYPE_APPEND("append"),
|
TEST_TYPE_APPEND("append"),
|
||||||
TEST_TYPE_READ_RANDOM("random read"),
|
TEST_TYPE_READ_RANDOM("random read"),
|
||||||
TEST_TYPE_READ_BACKWARD("backward read"),
|
TEST_TYPE_READ_BACKWARD("backward read"),
|
||||||
TEST_TYPE_READ_SKIP("skip read");
|
TEST_TYPE_READ_SKIP("skip read"),
|
||||||
|
TEST_TYPE_TRUNCATE("truncate");
|
||||||
|
|
||||||
private String type;
|
private String type;
|
||||||
|
|
||||||
|
@ -192,6 +192,9 @@ public class TestDFSIO implements Tool {
|
||||||
private static Path getRandomReadDir(Configuration conf) {
|
private static Path getRandomReadDir(Configuration conf) {
|
||||||
return new Path(getBaseDir(conf), "io_random_read");
|
return new Path(getBaseDir(conf), "io_random_read");
|
||||||
}
|
}
|
||||||
|
private static Path getTruncateDir(Configuration conf) {
|
||||||
|
return new Path(getBaseDir(conf), "io_truncate");
|
||||||
|
}
|
||||||
private static Path getDataDir(Configuration conf) {
|
private static Path getDataDir(Configuration conf) {
|
||||||
return new Path(getBaseDir(conf), "io_data");
|
return new Path(getBaseDir(conf), "io_data");
|
||||||
}
|
}
|
||||||
|
@ -203,6 +206,7 @@ public class TestDFSIO implements Tool {
|
||||||
public static void beforeClass() throws Exception {
|
public static void beforeClass() throws Exception {
|
||||||
bench = new TestDFSIO();
|
bench = new TestDFSIO();
|
||||||
bench.getConf().setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
|
bench.getConf().setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
|
||||||
|
bench.getConf().setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
||||||
cluster = new MiniDFSCluster.Builder(bench.getConf())
|
cluster = new MiniDFSCluster.Builder(bench.getConf())
|
||||||
.numDataNodes(2)
|
.numDataNodes(2)
|
||||||
.format(true)
|
.format(true)
|
||||||
|
@ -279,6 +283,16 @@ public class TestDFSIO implements Tool {
|
||||||
bench.analyzeResult(fs, TestType.TEST_TYPE_APPEND, execTime);
|
bench.analyzeResult(fs, TestType.TEST_TYPE_APPEND, execTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test (timeout = 60000)
|
||||||
|
public void testTruncate() throws Exception {
|
||||||
|
FileSystem fs = cluster.getFileSystem();
|
||||||
|
bench.createControlFile(fs, DEFAULT_NR_BYTES / 2, DEFAULT_NR_FILES);
|
||||||
|
long tStart = System.currentTimeMillis();
|
||||||
|
bench.truncateTest(fs);
|
||||||
|
long execTime = System.currentTimeMillis() - tStart;
|
||||||
|
bench.analyzeResult(fs, TestType.TEST_TYPE_TRUNCATE, execTime);
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
private void createControlFile(FileSystem fs,
|
private void createControlFile(FileSystem fs,
|
||||||
long nrBytes, // in bytes
|
long nrBytes, // in bytes
|
||||||
|
@ -301,9 +315,9 @@ public class TestDFSIO implements Tool {
|
||||||
} catch(Exception e) {
|
} catch(Exception e) {
|
||||||
throw new IOException(e.getLocalizedMessage());
|
throw new IOException(e.getLocalizedMessage());
|
||||||
} finally {
|
} finally {
|
||||||
if (writer != null)
|
if (writer != null)
|
||||||
writer.close();
|
writer.close();
|
||||||
writer = null;
|
writer = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LOG.info("created control files for: "+nrFiles+" files");
|
LOG.info("created control files for: "+nrFiles+" files");
|
||||||
|
@ -613,6 +627,51 @@ public class TestDFSIO implements Tool {
|
||||||
runIOTest(RandomReadMapper.class, readDir);
|
runIOTest(RandomReadMapper.class, readDir);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Truncate mapper class.
|
||||||
|
* The mapper truncates given file to the newLength, specified by -size.
|
||||||
|
*/
|
||||||
|
public static class TruncateMapper extends IOStatMapper {
|
||||||
|
private static final long DELAY = 100L;
|
||||||
|
|
||||||
|
private Path filePath;
|
||||||
|
private long fileSize;
|
||||||
|
|
||||||
|
@Override // IOMapperBase
|
||||||
|
public Closeable getIOStream(String name) throws IOException {
|
||||||
|
filePath = new Path(getDataDir(getConf()), name);
|
||||||
|
fileSize = fs.getFileStatus(filePath).getLen();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override // IOMapperBase
|
||||||
|
public Long doIO(Reporter reporter,
|
||||||
|
String name,
|
||||||
|
long newLength // in bytes
|
||||||
|
) throws IOException {
|
||||||
|
boolean isClosed = fs.truncate(filePath, newLength);
|
||||||
|
reporter.setStatus("truncating " + name + " to newLength " +
|
||||||
|
newLength + " ::host = " + hostName);
|
||||||
|
for(int i = 0; !isClosed; i++) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(DELAY);
|
||||||
|
} catch (InterruptedException ignored) {}
|
||||||
|
FileStatus status = fs.getFileStatus(filePath);
|
||||||
|
assert status != null : "status is null";
|
||||||
|
isClosed = (status.getLen() == newLength);
|
||||||
|
reporter.setStatus("truncate recover for " + name + " to newLength " +
|
||||||
|
newLength + " attempt " + i + " ::host = " + hostName);
|
||||||
|
}
|
||||||
|
return Long.valueOf(fileSize - newLength);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void truncateTest(FileSystem fs) throws IOException {
|
||||||
|
Path TruncateDir = getTruncateDir(config);
|
||||||
|
fs.delete(TruncateDir, true);
|
||||||
|
runIOTest(TruncateMapper.class, TruncateDir);
|
||||||
|
}
|
||||||
|
|
||||||
private void sequentialTest(FileSystem fs,
|
private void sequentialTest(FileSystem fs,
|
||||||
TestType testType,
|
TestType testType,
|
||||||
long fileSize, // in bytes
|
long fileSize, // in bytes
|
||||||
|
@ -634,6 +693,9 @@ public class TestDFSIO implements Tool {
|
||||||
case TEST_TYPE_READ_SKIP:
|
case TEST_TYPE_READ_SKIP:
|
||||||
ioer = new RandomReadMapper();
|
ioer = new RandomReadMapper();
|
||||||
break;
|
break;
|
||||||
|
case TEST_TYPE_TRUNCATE:
|
||||||
|
ioer = new TruncateMapper();
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -667,7 +729,7 @@ public class TestDFSIO implements Tool {
|
||||||
String resFileName = DEFAULT_RES_FILE_NAME;
|
String resFileName = DEFAULT_RES_FILE_NAME;
|
||||||
String compressionClass = null;
|
String compressionClass = null;
|
||||||
boolean isSequential = false;
|
boolean isSequential = false;
|
||||||
String version = TestDFSIO.class.getSimpleName() + ".1.7";
|
String version = TestDFSIO.class.getSimpleName() + ".1.8";
|
||||||
|
|
||||||
LOG.info(version);
|
LOG.info(version);
|
||||||
if (args.length == 0) {
|
if (args.length == 0) {
|
||||||
|
@ -691,6 +753,8 @@ public class TestDFSIO implements Tool {
|
||||||
} else if (args[i].equals("-skip")) {
|
} else if (args[i].equals("-skip")) {
|
||||||
if(testType != TestType.TEST_TYPE_READ) return -1;
|
if(testType != TestType.TEST_TYPE_READ) return -1;
|
||||||
testType = TestType.TEST_TYPE_READ_SKIP;
|
testType = TestType.TEST_TYPE_READ_SKIP;
|
||||||
|
} else if (args[i].equalsIgnoreCase("-truncate")) {
|
||||||
|
testType = TestType.TEST_TYPE_TRUNCATE;
|
||||||
} else if (args[i].equals("-clean")) {
|
} else if (args[i].equals("-clean")) {
|
||||||
testType = TestType.TEST_TYPE_CLEANUP;
|
testType = TestType.TEST_TYPE_CLEANUP;
|
||||||
} else if (args[i].startsWith("-seq")) {
|
} else if (args[i].startsWith("-seq")) {
|
||||||
|
@ -764,6 +828,11 @@ public class TestDFSIO implements Tool {
|
||||||
case TEST_TYPE_READ_BACKWARD:
|
case TEST_TYPE_READ_BACKWARD:
|
||||||
case TEST_TYPE_READ_SKIP:
|
case TEST_TYPE_READ_SKIP:
|
||||||
randomReadTest(fs);
|
randomReadTest(fs);
|
||||||
|
break;
|
||||||
|
case TEST_TYPE_TRUNCATE:
|
||||||
|
truncateTest(fs);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
}
|
}
|
||||||
long execTime = System.currentTimeMillis() - tStart;
|
long execTime = System.currentTimeMillis() - tStart;
|
||||||
|
|
||||||
|
@ -799,7 +868,7 @@ public class TestDFSIO implements Tool {
|
||||||
return ((float)bytes)/MEGA;
|
return ((float)bytes)/MEGA;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void analyzeResult( FileSystem fs,
|
private void analyzeResult( FileSystem fs,
|
||||||
TestType testType,
|
TestType testType,
|
||||||
long execTime,
|
long execTime,
|
||||||
String resFileName
|
String resFileName
|
||||||
|
@ -872,13 +941,17 @@ public class TestDFSIO implements Tool {
|
||||||
case TEST_TYPE_READ_BACKWARD:
|
case TEST_TYPE_READ_BACKWARD:
|
||||||
case TEST_TYPE_READ_SKIP:
|
case TEST_TYPE_READ_SKIP:
|
||||||
return new Path(getRandomReadDir(config), "part-00000");
|
return new Path(getRandomReadDir(config), "part-00000");
|
||||||
|
case TEST_TYPE_TRUNCATE:
|
||||||
|
return new Path(getTruncateDir(config), "part-00000");
|
||||||
|
default:
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void analyzeResult(FileSystem fs, TestType testType, long execTime)
|
private void analyzeResult(FileSystem fs, TestType testType, long execTime)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
analyzeResult(fs, testType, execTime, DEFAULT_RES_FILE_NAME);
|
String dir = System.getProperty("test.build.dir", "target/test-dir");
|
||||||
|
analyzeResult(fs, testType, execTime, dir + "/" + DEFAULT_RES_FILE_NAME);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void cleanup(FileSystem fs)
|
private void cleanup(FileSystem fs)
|
||||||
|
|
Loading…
Reference in New Issue