HDFS-12537. Ozone: Reduce key creation overhead in Corona. Contributed by Lokesh Jain.

This commit is contained in:
Nandakumar 2017-10-07 15:04:10 +05:30 committed by Owen O'Malley
parent 9630621be7
commit 3504af9411
3 changed files with 392 additions and 92 deletions

View File

@ -36,7 +36,7 @@ public class OzoneQuota {
private int size; private int size;
/** Quota Units.*/ /** Quota Units.*/
public enum Units {UNDEFINED, BYTES, MB, GB, TB} public enum Units {UNDEFINED, BYTES, KB, MB, GB, TB}
/** /**
* Returns size. * Returns size.

View File

@ -17,33 +17,37 @@
package org.apache.hadoop.ozone.tools; package org.apache.hadoop.ozone.tools;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option; import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options; import org.apache.commons.cli.Options;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.time.DurationFormatUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.conf.OzoneConfiguration; import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.*;
import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.util.*;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.PrintStream; import java.io.PrintStream;
import java.util.Arrays; import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -53,6 +57,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier; import java.util.function.Supplier;
import static java.lang.Math.max;
import static java.lang.Math.min;
/** /**
* Corona - A tool to populate ozone with data for testing.<br> * Corona - A tool to populate ozone with data for testing.<br>
* This is not a map-reduce program and this is not for benchmarking * This is not a map-reduce program and this is not for benchmarking
@ -90,6 +97,7 @@ public final class Corona extends Configured implements Tool {
private static final String MODE = "mode"; private static final String MODE = "mode";
private static final String SOURCE = "source"; private static final String SOURCE = "source";
private static final String VALIDATE_WRITE = "validateWrites"; private static final String VALIDATE_WRITE = "validateWrites";
private static final String JSON_WRITE_DIRECTORY = "jsonDir";
private static final String NUM_OF_THREADS = "numOfThreads"; private static final String NUM_OF_THREADS = "numOfThreads";
private static final String NUM_OF_VOLUMES = "numOfVolumes"; private static final String NUM_OF_VOLUMES = "numOfVolumes";
private static final String NUM_OF_BUCKETS = "numOfBuckets"; private static final String NUM_OF_BUCKETS = "numOfBuckets";
@ -105,6 +113,7 @@ public final class Corona extends Configured implements Tool {
private static final String NUM_OF_VOLUMES_DEFAULT = "10"; private static final String NUM_OF_VOLUMES_DEFAULT = "10";
private static final String NUM_OF_BUCKETS_DEFAULT = "1000"; private static final String NUM_OF_BUCKETS_DEFAULT = "1000";
private static final String NUM_OF_KEYS_DEFAULT = "500000"; private static final String NUM_OF_KEYS_DEFAULT = "500000";
private static final String DURATION_FORMAT = "HH:mm:ss,SSS";
private static final int KEY_SIZE_DEFAULT = 10240; private static final int KEY_SIZE_DEFAULT = 10240;
@ -121,10 +130,12 @@ public final class Corona extends Configured implements Tool {
private String numOfVolumes; private String numOfVolumes;
private String numOfBuckets; private String numOfBuckets;
private String numOfKeys; private String numOfKeys;
private String jsonDir;
private boolean useRatis; private boolean useRatis;
private int replicationFactor = 0; private int replicationFactor = 0;
private int keySize; private int keySize;
private byte[] keyValue = null;
private boolean validateWrites; private boolean validateWrites;
@ -133,6 +144,7 @@ public final class Corona extends Configured implements Tool {
private ExecutorService processor; private ExecutorService processor;
private long startTime; private long startTime;
private long jobStartTime;
private AtomicLong volumeCreationTime; private AtomicLong volumeCreationTime;
private AtomicLong bucketCreationTime; private AtomicLong bucketCreationTime;
@ -150,10 +162,12 @@ public final class Corona extends Configured implements Tool {
private Long writeValidationFailureCount; private Long writeValidationFailureCount;
private BlockingQueue<KeyValue> validationQueue; private BlockingQueue<KeyValue> validationQueue;
private List<Double> threadThroughput;
@VisibleForTesting @VisibleForTesting
Corona(Configuration conf) throws IOException { Corona(Configuration conf) throws IOException {
startTime = System.nanoTime(); startTime = System.nanoTime();
jobStartTime = System.currentTimeMillis();
volumeCreationTime = new AtomicLong(); volumeCreationTime = new AtomicLong();
bucketCreationTime = new AtomicLong(); bucketCreationTime = new AtomicLong();
keyCreationTime = new AtomicLong(); keyCreationTime = new AtomicLong();
@ -185,6 +199,12 @@ public final class Corona extends Configured implements Tool {
usage(); usage();
return 0; return 0;
} }
threadThroughput = Collections.synchronizedList(new ArrayList<Double>());
keyValue =
DFSUtil.string2Bytes(RandomStringUtils.randomAscii(keySize - 36));
LOG.info("Number of Threads: " + numOfThreads); LOG.info("Number of Threads: " + numOfThreads);
processor = Executors.newFixedThreadPool(Integer.parseInt(numOfThreads)); processor = Executors.newFixedThreadPool(Integer.parseInt(numOfThreads));
addShutdownHook(); addShutdownHook();
@ -251,6 +271,11 @@ public final class Corona extends Configured implements Tool {
"data written into ozone, only subset of data is validated."); "data written into ozone, only subset of data is validated.");
Option optValidateWrite = OptionBuilder.create(VALIDATE_WRITE); Option optValidateWrite = OptionBuilder.create(VALIDATE_WRITE);
OptionBuilder.withDescription("directory where json is created");
OptionBuilder.hasArg();
Option optJsonDir = OptionBuilder.create(JSON_WRITE_DIRECTORY);
OptionBuilder.withArgName("value"); OptionBuilder.withArgName("value");
OptionBuilder.hasArg(); OptionBuilder.hasArg();
OptionBuilder.withDescription("number of threads to be launched " + OptionBuilder.withDescription("number of threads to be launched " +
@ -291,6 +316,7 @@ public final class Corona extends Configured implements Tool {
options.addOption(optMode); options.addOption(optMode);
options.addOption(optSource); options.addOption(optSource);
options.addOption(optValidateWrite); options.addOption(optValidateWrite);
options.addOption(optJsonDir);
options.addOption(optNumOfThreads); options.addOption(optNumOfThreads);
options.addOption(optNumOfVolumes); options.addOption(optNumOfVolumes);
options.addOption(optNumOfBuckets); options.addOption(optNumOfBuckets);
@ -303,33 +329,37 @@ public final class Corona extends Configured implements Tool {
private void parseOptions(CommandLine cmdLine) { private void parseOptions(CommandLine cmdLine) {
printUsage = cmdLine.hasOption(HELP); printUsage = cmdLine.hasOption(HELP);
mode = cmdLine.hasOption(MODE) ? mode = cmdLine.getOptionValue(MODE, MODE_DEFAULT);
cmdLine.getOptionValue(MODE) : MODE_DEFAULT;
source = cmdLine.hasOption(SOURCE) ? source = cmdLine.getOptionValue(SOURCE, SOURCE_DEFAULT);
cmdLine.getOptionValue(SOURCE) : SOURCE_DEFAULT;
numOfThreads = cmdLine.hasOption(NUM_OF_THREADS) ? numOfThreads =
cmdLine.getOptionValue(NUM_OF_THREADS) : NUM_OF_THREADS_DEFAULT; cmdLine.getOptionValue(NUM_OF_THREADS, NUM_OF_THREADS_DEFAULT);
validateWrites = cmdLine.hasOption(VALIDATE_WRITE); validateWrites = cmdLine.hasOption(VALIDATE_WRITE);
numOfVolumes = cmdLine.hasOption(NUM_OF_VOLUMES) ? jsonDir = cmdLine.getOptionValue(JSON_WRITE_DIRECTORY);
cmdLine.getOptionValue(NUM_OF_VOLUMES) : NUM_OF_VOLUMES_DEFAULT;
numOfBuckets = cmdLine.hasOption(NUM_OF_BUCKETS) ? numOfVolumes =
cmdLine.getOptionValue(NUM_OF_BUCKETS) : NUM_OF_BUCKETS_DEFAULT; cmdLine.getOptionValue(NUM_OF_VOLUMES, NUM_OF_VOLUMES_DEFAULT);
numOfKeys = cmdLine.hasOption(NUM_OF_KEYS) ? numOfBuckets =
cmdLine.getOptionValue(NUM_OF_KEYS) : NUM_OF_KEYS_DEFAULT; cmdLine.getOptionValue(NUM_OF_BUCKETS, NUM_OF_BUCKETS_DEFAULT);
numOfKeys = cmdLine.getOptionValue(NUM_OF_KEYS, NUM_OF_KEYS_DEFAULT);
keySize = cmdLine.hasOption(KEY_SIZE) ? keySize = cmdLine.hasOption(KEY_SIZE) ?
Integer.parseInt(cmdLine.getOptionValue(KEY_SIZE)) : KEY_SIZE_DEFAULT; Integer.parseInt(cmdLine.getOptionValue(KEY_SIZE)) : KEY_SIZE_DEFAULT;
if (keySize < 1024) {
throw new IllegalArgumentException(
"keySize can not be less than 1024 bytes");
}
useRatis = cmdLine.hasOption(RATIS); useRatis = cmdLine.hasOption(RATIS);
//To-do if replication factor is not mentioned throw an exception //To-do if replication factor is not mentioned throw an exception
replicationFactor = useRatis ? replicationFactor =
Integer.parseInt(cmdLine.getOptionValue(RATIS)) : 0; useRatis ? Integer.parseInt(cmdLine.getOptionValue(RATIS)) : 0;
} }
private void usage() { private void usage() {
@ -339,6 +369,8 @@ public final class Corona extends Configured implements Tool {
System.out.println("-validateWrites " System.out.println("-validateWrites "
+ "do random validation of data written into ozone, " + + "do random validation of data written into ozone, " +
"only subset of data is validated."); "only subset of data is validated.");
System.out.println("-jsonDir "
+ "directory where json is created.");
System.out.println("-mode [online | offline] " System.out.println("-mode [online | offline] "
+ "specifies the mode in which Corona should run."); + "specifies the mode in which Corona should run.");
System.out.println("-source <url> " System.out.println("-source <url> "
@ -394,76 +426,60 @@ public final class Corona extends Configured implements Tool {
int threadCount = Integer.parseInt(numOfThreads); int threadCount = Integer.parseInt(numOfThreads);
long endTime = System.nanoTime() - startTime; long endTime = System.nanoTime() - startTime;
String execTime = String.format("%02d:%02d:%02d", String execTime = DurationFormatUtils
TimeUnit.NANOSECONDS.toHours(endTime), .formatDuration(TimeUnit.NANOSECONDS.toMillis(endTime),
TimeUnit.NANOSECONDS.toMinutes(endTime) - DURATION_FORMAT);
TimeUnit.HOURS.toMinutes( String prettyTotalVolumeTime = DurationFormatUtils
TimeUnit.NANOSECONDS.toHours(endTime)), .formatDuration(TimeUnit.NANOSECONDS.toMillis(volumeCreationTime.get()),
TimeUnit.NANOSECONDS.toSeconds(endTime) - DURATION_FORMAT);
TimeUnit.MINUTES.toSeconds( String prettyTotalBucketTime = DurationFormatUtils
TimeUnit.NANOSECONDS.toMinutes(endTime))); .formatDuration(TimeUnit.NANOSECONDS.toMillis(bucketCreationTime.get()),
DURATION_FORMAT);
String prettyTotalKeyCreationTime = DurationFormatUtils
.formatDuration(TimeUnit.NANOSECONDS.toMillis(keyCreationTime.get()),
DURATION_FORMAT);
String prettyTotalKeyWriteTime = DurationFormatUtils
.formatDuration(TimeUnit.NANOSECONDS.toMillis(keyWriteTime.get()),
DURATION_FORMAT);
long volumeTime = volumeCreationTime.longValue(); long volumeTime =
String prettyVolumeTime = String.format("%02d:%02d:%02d:%02d", TimeUnit.NANOSECONDS.toMillis(volumeCreationTime.get()) / threadCount;
TimeUnit.NANOSECONDS.toHours(volumeTime), String prettyAverageVolumeTime =
TimeUnit.NANOSECONDS.toMinutes(volumeTime) - DurationFormatUtils.formatDuration(volumeTime, DURATION_FORMAT);
TimeUnit.HOURS.toMinutes(
TimeUnit.NANOSECONDS.toHours(volumeTime)),
TimeUnit.NANOSECONDS.toSeconds(volumeTime) -
TimeUnit.MINUTES.toSeconds(
TimeUnit.NANOSECONDS.toMinutes(volumeTime)),
TimeUnit.NANOSECONDS.toMillis(volumeTime) -
TimeUnit.SECONDS.toMillis(
TimeUnit.NANOSECONDS.toSeconds(volumeTime)));
long bucketTime = bucketCreationTime.longValue() / threadCount; long bucketTime =
String prettyBucketTime = String.format("%02d:%02d:%02d:%02d", TimeUnit.NANOSECONDS.toMillis(bucketCreationTime.get()) / threadCount;
TimeUnit.NANOSECONDS.toHours(bucketTime), String prettyAverageBucketTime =
TimeUnit.NANOSECONDS.toMinutes(bucketTime) - DurationFormatUtils.formatDuration(bucketTime, DURATION_FORMAT);
TimeUnit.HOURS.toMinutes(
TimeUnit.NANOSECONDS.toHours(bucketTime)),
TimeUnit.NANOSECONDS.toSeconds(bucketTime) -
TimeUnit.MINUTES.toSeconds(
TimeUnit.NANOSECONDS.toMinutes(bucketTime)),
TimeUnit.NANOSECONDS.toMillis(bucketTime) -
TimeUnit.SECONDS.toMillis(
TimeUnit.NANOSECONDS.toSeconds(bucketTime)));
long totalKeyCreationTime = keyCreationTime.longValue() / threadCount; long averageKeyCreationTime =
String prettyKeyCreationTime = String.format("%02d:%02d:%02d:%02d", TimeUnit.NANOSECONDS.toMillis(keyCreationTime.get()) / threadCount;
TimeUnit.NANOSECONDS.toHours(totalKeyCreationTime), String prettyAverageKeyCreationTime = DurationFormatUtils
TimeUnit.NANOSECONDS.toMinutes(totalKeyCreationTime) - .formatDuration(averageKeyCreationTime, DURATION_FORMAT);
TimeUnit.HOURS.toMinutes(
TimeUnit.NANOSECONDS.toHours(totalKeyCreationTime)),
TimeUnit.NANOSECONDS.toSeconds(totalKeyCreationTime) -
TimeUnit.MINUTES.toSeconds(
TimeUnit.NANOSECONDS.toMinutes(totalKeyCreationTime)),
TimeUnit.NANOSECONDS.toMillis(totalKeyCreationTime) -
TimeUnit.SECONDS.toMillis(
TimeUnit.NANOSECONDS.toSeconds(totalKeyCreationTime)));
long totalKeyWriteTime = keyWriteTime.longValue() / threadCount; long averageKeyWriteTime =
String prettyKeyWriteTime = String.format("%02d:%02d:%02d:%02d", TimeUnit.NANOSECONDS.toMillis(keyWriteTime.get()) / threadCount;
TimeUnit.NANOSECONDS.toHours(totalKeyWriteTime), String prettyAverageKeyWriteTime = DurationFormatUtils
TimeUnit.NANOSECONDS.toMinutes(totalKeyWriteTime) - .formatDuration(averageKeyWriteTime, DURATION_FORMAT);
TimeUnit.HOURS.toMinutes(
TimeUnit.NANOSECONDS.toHours(totalKeyWriteTime)),
TimeUnit.NANOSECONDS.toSeconds(totalKeyWriteTime) -
TimeUnit.MINUTES.toSeconds(
TimeUnit.NANOSECONDS.toMinutes(totalKeyWriteTime)),
TimeUnit.NANOSECONDS.toMillis(totalKeyWriteTime) -
TimeUnit.SECONDS.toMillis(
TimeUnit.NANOSECONDS.toSeconds(totalKeyWriteTime)));
out.println(); out.println();
out.println("***************************************************"); out.println("***************************************************");
out.println("Git Base Revision: " + VersionInfo.getRevision());
out.println("Number of Volumes created: " + numberOfVolumesCreated); out.println("Number of Volumes created: " + numberOfVolumesCreated);
out.println("Number of Buckets created: " + numberOfBucketsCreated); out.println("Number of Buckets created: " + numberOfBucketsCreated);
out.println("Number of Keys added: " + numberOfKeysAdded); out.println("Number of Keys added: " + numberOfKeysAdded);
out.println("Time spent in volume creation: " + prettyVolumeTime); out.println("Time spent in volume creation: " + prettyTotalVolumeTime);
out.println("Time spent in bucket creation: " + prettyBucketTime); out.println("Time spent in bucket creation: " + prettyTotalBucketTime);
out.println("Time spent in key creation: " + prettyKeyCreationTime); out.println("Time spent in key creation: " + prettyTotalKeyCreationTime);
out.println("Time spent in writing keys: " + prettyKeyWriteTime); out.println("Time spent in key write: " + prettyTotalKeyWriteTime);
out.println(
"Average Time spent in volume creation: " + prettyAverageVolumeTime);
out.println(
"Average Time spent in bucket creation: " + prettyAverageBucketTime);
out.println(
"Average Time spent in key creation: " + prettyAverageKeyCreationTime);
out.println(
"Average Time spent in key write: " + prettyAverageKeyWriteTime);
out.println("Total bytes written: " + totalBytesWritten); out.println("Total bytes written: " + totalBytesWritten);
if (validateWrites) { if (validateWrites) {
out.println("Total number of writes validated: " + out.println("Total number of writes validated: " +
@ -478,6 +494,46 @@ public final class Corona extends Configured implements Tool {
} }
out.println("Total Execution time: " + execTime); out.println("Total Execution time: " + execTime);
out.println("***************************************************"); out.println("***************************************************");
if (jsonDir != null) {
CoronaJobInfo jobInfo = new CoronaJobInfo()
.setExecTime(execTime)
.setGitBaseRevision(VersionInfo.getRevision())
.setAverageVolumeCreationTime(prettyAverageVolumeTime)
.setAverageBucketCreationTime(prettyAverageBucketTime)
.setAverageKeyCreationTime(prettyAverageKeyCreationTime)
.setAverageKeyWriteTime(prettyAverageKeyWriteTime)
.setTotalVolumeCreationTime(prettyTotalVolumeTime)
.setTotalBucketCreationTime(prettyTotalBucketTime)
.setTotalKeyCreationTime(prettyTotalKeyCreationTime)
.setTotalKeyWriteTime(prettyTotalKeyWriteTime);
String jsonName =
new SimpleDateFormat("yyyyMMddHHmmss").format(Time.now()) + ".json";
String jsonPath = jsonDir + "/" + jsonName;
FileOutputStream os = null;
try {
os = new FileOutputStream(jsonPath);
ObjectMapper mapper = new ObjectMapper();
mapper.setVisibility(PropertyAccessor.FIELD,
JsonAutoDetect.Visibility.ANY);
ObjectWriter writer = mapper.writerWithDefaultPrettyPrinter();
writer.writeValue(os, jobInfo);
} catch (FileNotFoundException e) {
out.println("Json File could not be created for the path: " + jsonPath);
out.println(e);
} catch (IOException e) {
out.println("Json object could not be created");
out.println(e);
} finally {
try {
if (os != null) {
os.close();
}
} catch (IOException e) {
LOG.warn("Could not close the output stream for json", e);
}
}
}
} }
/** /**
@ -543,6 +599,15 @@ public final class Corona extends Configured implements Tool {
return writeValidationFailureCount; return writeValidationFailureCount;
} }
/**
* Returns the length of the common key value initialized.
* @return key value length initialized.
*/
@VisibleForTesting
long getKeyValueLength(){
return keyValue.length;
}
/** /**
* Wrapper to hold ozone key-value pair. * Wrapper to hold ozone key-value pair.
*/ */
@ -602,8 +667,9 @@ public final class Corona extends Configured implements Tool {
factor = replicationFactor != 0 ? factor = replicationFactor != 0 ?
OzoneProtos.ReplicationFactor.valueOf(replicationFactor) : OzoneProtos.ReplicationFactor.valueOf(replicationFactor) :
OzoneProtos.ReplicationFactor.THREE; OzoneProtos.ReplicationFactor.THREE;
} }
Long threadKeyWriteTime = 0L;
for (int j = 0; j < totalBuckets; j++) { for (int j = 0; j < totalBuckets; j++) {
String bucketName = "bucket-" + j + "-" + String bucketName = "bucket-" + j + "-" +
RandomStringUtils.randomNumeric(5); RandomStringUtils.randomNumeric(5);
@ -618,22 +684,24 @@ public final class Corona extends Configured implements Tool {
for (int k = 0; k < totalKeys; k++) { for (int k = 0; k < totalKeys; k++) {
String key = "key-" + k + "-" + String key = "key-" + k + "-" +
RandomStringUtils.randomNumeric(5); RandomStringUtils.randomNumeric(5);
byte[] value = DFSUtil.string2Bytes( byte[] randomValue =
RandomStringUtils.randomAscii(keySize)); DFSUtil.string2Bytes(UUID.randomUUID().toString());
try { try {
LOG.trace("Adding key: {} in bucket: {} of volume: {}", LOG.trace("Adding key: {} in bucket: {} of volume: {}",
key, bucket, volume); key, bucket, volume);
long keyCreateStart = System.nanoTime(); long keyCreateStart = System.nanoTime();
OzoneOutputStream os = bucket.createKey(key, value.length, OzoneOutputStream os =
type, factor); bucket.createKey(key, keySize, type, factor);
keyCreationTime.getAndAdd(System.nanoTime() - keyCreateStart); keyCreationTime.getAndAdd(System.nanoTime() - keyCreateStart);
long keyWriteStart = System.nanoTime(); long keyWriteStart = System.nanoTime();
os.write(value); os.write(keyValue);
os.write(randomValue);
os.close(); os.close();
keyWriteTime.getAndAdd(System.nanoTime() - keyWriteStart); threadKeyWriteTime += System.nanoTime() - keyWriteStart;
totalBytesWritten.getAndAdd(value.length); totalBytesWritten.getAndAdd(keySize);
numberOfKeysAdded.getAndIncrement(); numberOfKeysAdded.getAndIncrement();
if (validateWrites) { if (validateWrites) {
byte[] value = ArrayUtils.addAll(keyValue, randomValue);
boolean validate = validationQueue.offer( boolean validate = validationQueue.offer(
new KeyValue(bucket, key, value)); new KeyValue(bucket, key, value));
if (validate) { if (validate) {
@ -652,7 +720,238 @@ public final class Corona extends Configured implements Tool {
" in volume: {}.", bucketName, volume, e); " in volume: {}.", bucketName, volume, e);
} }
} }
keyWriteTime.getAndAdd(threadKeyWriteTime);
boolean success = threadThroughput.add(
(totalBuckets * totalKeys * keySize * 1.0) / TimeUnit.NANOSECONDS
.toSeconds(threadKeyWriteTime));
if (!success) {
LOG.warn("Throughput could not be added for thread id: {}",
Thread.currentThread().getId());
}
} }
}
private final class CoronaJobInfo {
private String gitBaseRevision;
private String jobStartTime;
private String numOfVolumes;
private String numOfBuckets;
private String numOfKeys;
private String numOfThreads;
private String mode;
private String totalBucketCreationTime;
private String totalVolumeCreationTime;
private String totalKeyCreationTime;
private String totalKeyWriteTime;
private String averageBucketCreationTime;
private String averageVolumeCreationTime;
private String averageKeyCreationTime;
private String averageKeyWriteTime;
private String dataWritten;
private String execTime;
private int keySize;
private String[] threadThroughputPerSecond;
private String minThreadThroughputPerSecond;
private String maxThreadThroughputPerSecond;
private String totalThroughputPerSecond;
private CoronaJobInfo() {
this.numOfVolumes = Corona.this.numOfVolumes;
this.numOfBuckets = Corona.this.numOfBuckets;
this.numOfKeys = Corona.this.numOfKeys;
this.numOfThreads = Corona.this.numOfThreads;
this.keySize = Corona.this.keySize;
this.mode = Corona.this.mode;
this.jobStartTime = Time.formatTime(Corona.this.jobStartTime);
long totalBytes =
Long.parseLong(numOfVolumes) * Long.parseLong(numOfBuckets) * Long
.parseLong(numOfKeys) * keySize;
this.dataWritten = getInStorageUnits((double) totalBytes);
threadThroughputPerSecond = new String[Integer.parseInt(numOfThreads)];
double minThreadThroughput = Double.MAX_VALUE, maxThreadThroughput = 0.0,
totalThroughput = 0.0;
int i = 0;
for (Double throughput : Corona.this.threadThroughput) {
minThreadThroughput = min(throughput, minThreadThroughput);
maxThreadThroughput = max(throughput, maxThreadThroughput);
totalThroughput += throughput;
threadThroughputPerSecond[i++] = getInStorageUnits(throughput);
}
minThreadThroughputPerSecond = getInStorageUnits(minThreadThroughput);
maxThreadThroughputPerSecond = getInStorageUnits(maxThreadThroughput);
totalThroughputPerSecond = getInStorageUnits(totalThroughput);
}
private String getInStorageUnits(Double value) {
double size;
OzoneQuota.Units unit;
if ((long) (value / OzoneConsts.KB) == 0) {
size = value / OzoneConsts.KB;
unit = OzoneQuota.Units.KB;
} else if ((long) (value / OzoneConsts.MB) == 0) {
size = value / OzoneConsts.MB;
unit = OzoneQuota.Units.MB;
} else if ((long) (value / OzoneConsts.GB) == 0) {
size = value / OzoneConsts.GB;
unit = OzoneQuota.Units.GB;
} else if ((long) (value / OzoneConsts.TB) == 0) {
size = value / OzoneConsts.TB;
unit = OzoneQuota.Units.TB;
} else {
size = value;
unit = OzoneQuota.Units.BYTES;
}
return size + " " + unit;
}
public CoronaJobInfo setGitBaseRevision(String gitBaseRevisionVal) {
gitBaseRevision = gitBaseRevisionVal;
return this;
}
public CoronaJobInfo setTotalBucketCreationTime(
String totalBucketCreationTimeVal) {
totalBucketCreationTime = totalBucketCreationTimeVal;
return this;
}
public CoronaJobInfo setTotalVolumeCreationTime(
String totalVolumeCreationTimeVal) {
totalVolumeCreationTime = totalVolumeCreationTimeVal;
return this;
}
public CoronaJobInfo setTotalKeyCreationTime(
String totalKeyCreationTimeVal) {
totalKeyCreationTime = totalKeyCreationTimeVal;
return this;
}
public CoronaJobInfo setTotalKeyWriteTime(String totalKeyWriteTimeVal) {
totalKeyWriteTime = totalKeyWriteTimeVal;
return this;
}
public CoronaJobInfo setAverageBucketCreationTime(
String averageBucketCreationTimeVal) {
averageBucketCreationTime = averageBucketCreationTimeVal;
return this;
}
public CoronaJobInfo setAverageVolumeCreationTime(
String averageVolumeCreationTimeVal) {
averageVolumeCreationTime = averageVolumeCreationTimeVal;
return this;
}
public CoronaJobInfo setAverageKeyCreationTime(
String averageKeyCreationTimeVal) {
averageKeyCreationTime = averageKeyCreationTimeVal;
return this;
}
public CoronaJobInfo setAverageKeyWriteTime(
String averageKeyWriteTimeVal) {
averageKeyWriteTime = averageKeyWriteTimeVal;
return this;
}
public CoronaJobInfo setExecTime(String execTimeVal) {
execTime = execTimeVal;
return this;
}
public String getJobStartTime() {
return jobStartTime;
}
public String getNumOfVolumes() {
return numOfVolumes;
}
public String getNumOfBuckets() {
return numOfBuckets;
}
public String getNumOfKeys() {
return numOfKeys;
}
public String getNumOfThreads() {
return numOfThreads;
}
public String getMode() {
return mode;
}
public String getTotalBucketCreationTime() {
return totalBucketCreationTime;
}
public String getTotalVolumeCreationTime() {
return totalVolumeCreationTime;
}
public String getTotalKeyCreationTime() {
return totalKeyCreationTime;
}
public String getAverageBucketCreationTime() {
return averageBucketCreationTime;
}
public String getTotalKeyWriteTime() {
return totalKeyWriteTime;
}
public String getAverageKeyWriteTime() {
return averageKeyWriteTime;
}
public String getAverageVolumeCreationTime() {
return averageVolumeCreationTime;
}
public String getAverageKeyCreationTime() {
return averageKeyCreationTime;
}
public String getExecTime() {
return execTime;
}
public int getKeySize() {
return keySize;
}
public String getGitBaseRevision() {
return gitBaseRevision;
}
public String getMinThreadThroughputPerSecond() {
return minThreadThroughputPerSecond;
}
public String getMaxThreadThroughputPerSecond() {
return maxThreadThroughputPerSecond;
}
public String getDataWritten() {
return dataWritten;
}
public String getTotalThroughput() {
return totalThroughputPerSecond;
}
} }
private class ProgressBar implements Runnable { private class ProgressBar implements Runnable {

View File

@ -84,6 +84,7 @@ public class TestCorona {
Assert.assertEquals(2, corona.getNumberOfVolumesCreated()); Assert.assertEquals(2, corona.getNumberOfVolumesCreated());
Assert.assertEquals(10, corona.getNumberOfBucketsCreated()); Assert.assertEquals(10, corona.getNumberOfBucketsCreated());
Assert.assertEquals(99, corona.getNumberOfKeysAdded()); Assert.assertEquals(99, corona.getNumberOfKeysAdded());
Assert.assertEquals(10240 - 36, corona.getKeyValueLength());
Assert.assertEquals(0, res); Assert.assertEquals(0, res);
} }