EC benchmark add options. buffer size and random read

This commit is contained in:
Zhuobin Zheng 2021-12-17 16:34:48 +08:00
parent 07141426e0
commit 10732107c8
1 changed files with 62 additions and 26 deletions

View File

@ -42,6 +42,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService; import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
@ -69,7 +70,8 @@ import java.util.concurrent.TimeUnit;
public class ErasureCodeBenchmarkThroughput public class ErasureCodeBenchmarkThroughput
extends Configured implements Tool { extends Configured implements Tool {
private static final int BUFFER_SIZE_MB = 128; private static final int BUFFER_SIZE_MB = Integer.valueOf(System.getProperty(
"test.benchmark.buffer.mb", "128"));
private static final String DFS_TMP_DIR = System.getProperty( private static final String DFS_TMP_DIR = System.getProperty(
"test.benchmark.data", "/tmp/benchmark/data"); "test.benchmark.data", "/tmp/benchmark/data");
public static final String REP_DIR = DFS_TMP_DIR + "/replica"; public static final String REP_DIR = DFS_TMP_DIR + "/replica";
@ -101,6 +103,10 @@ public class ErasureCodeBenchmarkThroughput
READ, WRITE, GEN, CLEAN; READ, WRITE, GEN, CLEAN;
} }
enum ReadType {
STATEFUL, POSITIONAL, RANDOM;
}
public static String getFilePath(int dataSizeMB, boolean isEc) { public static String getFilePath(int dataSizeMB, boolean isEc) {
String parent = isEc ? EC_DIR : REP_DIR; String parent = isEc ? EC_DIR : REP_DIR;
String file = isEc ? EC_FILE_BASE : REP_FILE_BASE; String file = isEc ? EC_FILE_BASE : REP_FILE_BASE;
@ -113,19 +119,20 @@ public class ErasureCodeBenchmarkThroughput
} }
System.err.println("Usage: ErasureCodeBenchmarkThroughput " + System.err.println("Usage: ErasureCodeBenchmarkThroughput " +
"<read|write|gen|clean> <size in MB> " + "<read|write|gen|clean> <size in MB> " +
"<ec|rep> [num clients] [stf|pos]\n" + "<ec|rep> [num clients] [stf|pos|rdm]\n" +
"Stateful and positional option is only available for read."); "Stateful and positional and random option is only available for read."
);
System.exit(1); System.exit(1);
} }
private List<Long> doBenchmark(boolean isRead, int dataSizeMB, private List<Long> doBenchmark(boolean isRead, int dataSizeMB,
int numClients, boolean isEc, boolean statefulRead, boolean isGen) int numClients, boolean isEc, ReadType readType, boolean isGen)
throws Exception { throws Exception {
CompletionService<Long> cs = new ExecutorCompletionService<Long>( CompletionService<Long> cs = new ExecutorCompletionService<Long>(
Executors.newFixedThreadPool(numClients)); Executors.newFixedThreadPool(numClients));
for (int i = 0; i < numClients; i++) { for (int i = 0; i < numClients; i++) {
cs.submit(isRead ? cs.submit(isRead ?
new ReadCallable(dataSizeMB, isEc, i, statefulRead) : new ReadCallable(dataSizeMB, isEc, i, readType) :
new WriteCallable(dataSizeMB, isEc, i, isGen)); new WriteCallable(dataSizeMB, isEc, i, isGen));
} }
List<Long> results = new ArrayList<>(numClients); List<Long> results = new ArrayList<>(numClients);
@ -146,21 +153,21 @@ public class ErasureCodeBenchmarkThroughput
} }
private void benchmark(OpType type, int dataSizeMB, private void benchmark(OpType type, int dataSizeMB,
int numClients, boolean isEc, boolean statefulRead) throws Exception { int numClients, boolean isEc, ReadType readType) throws Exception {
List<Long> sizes = null; List<Long> sizes = null;
StopWatch sw = new StopWatch().start(); StopWatch sw = new StopWatch().start();
switch (type) { switch (type) {
case READ: case READ:
sizes = doBenchmark(true, dataSizeMB, numClients, isEc, sizes = doBenchmark(true, dataSizeMB, numClients, isEc,
statefulRead, false); readType, false);
break; break;
case WRITE: case WRITE:
sizes = doBenchmark( sizes = doBenchmark(
false, dataSizeMB, numClients, isEc, statefulRead, false); false, dataSizeMB, numClients, isEc, readType, false);
break; break;
case GEN: case GEN:
sizes = doBenchmark(false, dataSizeMB, numClients, isEc, sizes = doBenchmark(false, dataSizeMB, numClients, isEc,
statefulRead, true); readType, true);
} }
long elapsedSec = sw.now(TimeUnit.SECONDS); long elapsedSec = sw.now(TimeUnit.SECONDS);
double totalDataSizeMB = 0; double totalDataSizeMB = 0;
@ -204,7 +211,7 @@ public class ErasureCodeBenchmarkThroughput
int dataSizeMB = 0; int dataSizeMB = 0;
boolean isEc = true; boolean isEc = true;
int numClients = 1; int numClients = 1;
boolean statefulRead = true; ReadType readType = ReadType.STATEFUL;
if (args.length >= 3) { if (args.length >= 3) {
if (args[0].equals("read")) { if (args[0].equals("read")) {
type = OpType.READ; type = OpType.READ;
@ -243,8 +250,11 @@ public class ErasureCodeBenchmarkThroughput
} }
} }
if (args.length >= 5 && type == OpType.READ) { if (args.length >= 5 && type == OpType.READ) {
statefulRead = args[4].equals("stf"); switch (args[4]) {
if (!statefulRead && !args[4].equals("pos")) { case "stf": readType = ReadType.STATEFUL; break;
case "pos": readType = ReadType.POSITIONAL; break;
case "rdm": readType = ReadType.RANDOM; break;
default:
printUsage("Unknown read mode: " + args[4]); printUsage("Unknown read mode: " + args[4]);
} }
} }
@ -256,7 +266,7 @@ public class ErasureCodeBenchmarkThroughput
if (type == OpType.READ && isEc) { if (type == OpType.READ && isEc) {
setReadThreadPoolSize(numClients); setReadThreadPoolSize(numClients);
} }
benchmark(type, dataSizeMB, numClients, isEc, statefulRead); benchmark(type, dataSizeMB, numClients, isEc, readType);
} }
return 0; return 0;
} }
@ -345,12 +355,12 @@ public class ErasureCodeBenchmarkThroughput
} }
private class ReadCallable extends CallableBase { private class ReadCallable extends CallableBase {
private final boolean statefulRead; private final ReadType readType;
public ReadCallable(int dataSizeMB, boolean isEc, int id, public ReadCallable(int dataSizeMB, boolean isEc, int id,
boolean statefulRead) throws IOException { ReadType readType) throws IOException {
super(dataSizeMB, isEc, id); super(dataSizeMB, isEc, id);
this.statefulRead = statefulRead; this.readType = readType;
} }
private long doStateful(FSDataInputStream inputStream) throws IOException { private long doStateful(FSDataInputStream inputStream) throws IOException {
@ -383,17 +393,36 @@ public class ErasureCodeBenchmarkThroughput
return count; return count;
} }
private long doRandom(FSDataInputStream inputStream) throws IOException {
ThreadLocalRandom random = ThreadLocalRandom.current();
long dataSize = dataSizeMB * 1024 * 1024L;
long count = 0;
long bytesRead;
byte buf[] = new byte[BUFFER_SIZE_MB * 1024 * 1024];
while (true) {
bytesRead = inputStream.read(random.nextLong(dataSize), buf, 0, buf.length);
count += bytesRead;
if (count >= dataSize) {
break;
}
}
return count;
}
private long readFile(Path path) throws IOException { private long readFile(Path path) throws IOException {
try (FSDataInputStream inputStream = fs.open(path)) { try (FSDataInputStream inputStream = fs.open(path)) {
StopWatch sw = new StopWatch().start(); StopWatch sw = new StopWatch().start();
System.out.println((statefulRead ? "Stateful reading " :
"Positional reading ") + path); System.out.println(readType + " reading " + path);
long totalRead = statefulRead ? doStateful(inputStream) : long totalRead = 0;
doPositional(inputStream); switch (readType) {
System.out.println( case STATEFUL: totalRead = doStateful(inputStream); break;
(statefulRead ? "Finished stateful read " : case POSITIONAL: totalRead = doPositional(inputStream); break;
"Finished positional read ") + path + ". Time taken: " + case RANDOM: totalRead = doRandom(inputStream); break;
sw.now(TimeUnit.SECONDS) + " s."); }
System.out.println("Finished " + readType + " read " + path
+ ". Time taken: " + sw.now(TimeUnit.SECONDS) + " s.");
return totalRead; return totalRead;
} }
} }
@ -408,8 +437,15 @@ public class ErasureCodeBenchmarkThroughput
} }
long bytesRead = readFile(path); long bytesRead = readFile(path);
long dataSize = dataSizeMB * 1024 * 1024L; long dataSize = dataSizeMB * 1024 * 1024L;
Preconditions.checkArgument(bytesRead == dataSize, if (ReadType.RANDOM.equals(readType)) {
"Specified data size: " + dataSize + ", actually read " + bytesRead); Preconditions.checkArgument(bytesRead >= dataSize,
"Expect read more than size: " + dataSize +
", actually read " + bytesRead);
} else {
Preconditions.checkArgument(bytesRead == dataSize,
"Specified data size: " + dataSize + ", actually read "
+ bytesRead);
}
return bytesRead; return bytesRead;
} }
} }