HDFS-12678. Ozone: Corona: Add statistical information to json output. Contributed by Lokesh Jain.

This commit is contained in:
Nanda kumar 2017-10-27 00:28:50 +05:30 committed by Owen O'Malley
parent 1b56a73a4f
commit 80357c0eee

View File

@ -17,6 +17,9 @@
package org.apache.hadoop.ozone.tools;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.UniformReservoir;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -57,7 +60,6 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import static java.lang.Math.max;
import static java.lang.Math.min;
/**
@ -93,6 +95,13 @@
*/
public final class Corona extends Configured implements Tool {
enum CoronaOps {
VOLUME_CREATE,
BUCKET_CREATE,
KEY_CREATE,
KEY_WRITE
}
private static final String HELP = "help";
private static final String MODE = "mode";
private static final String SOURCE = "source";
@ -116,6 +125,7 @@ public final class Corona extends Configured implements Tool {
private static final String DURATION_FORMAT = "HH:mm:ss,SSS";
private static final int KEY_SIZE_DEFAULT = 10240;
private static final int QUANTILES = 10;
private static final Logger LOG =
LoggerFactory.getLogger(Corona.class);
@ -135,6 +145,7 @@ public final class Corona extends Configured implements Tool {
private OzoneProtos.ReplicationType type;
private OzoneProtos.ReplicationFactor factor;
private int threadPoolSize;
private int keySize;
private byte[] keyValue = null;
@ -163,7 +174,7 @@ public final class Corona extends Configured implements Tool {
private Long writeValidationFailureCount;
private BlockingQueue<KeyValue> validationQueue;
private List<Double> threadThroughput;
private ArrayList<Histogram> histograms = new ArrayList<>();
@VisibleForTesting
Corona(Configuration conf) throws IOException {
@ -180,6 +191,9 @@ public final class Corona extends Configured implements Tool {
OzoneClientFactory.setConfiguration(conf);
ozoneClient = OzoneClientFactory.getClient();
objectStore = ozoneClient.getObjectStore();
for (CoronaOps ops : CoronaOps.values()) {
histograms.add(ops.ordinal(), new Histogram(new UniformReservoir()));
}
}
/**
@ -201,13 +215,13 @@ public int run(String[] args) throws Exception {
return 0;
}
threadThroughput = Collections.synchronizedList(new ArrayList<Double>());
keyValue =
DFSUtil.string2Bytes(RandomStringUtils.randomAscii(keySize - 36));
LOG.info("Number of Threads: " + numOfThreads);
processor = Executors.newFixedThreadPool(Integer.parseInt(numOfThreads));
threadPoolSize =
min(Integer.parseInt(numOfVolumes), Integer.parseInt(numOfThreads));
processor = Executors.newFixedThreadPool(threadPoolSize);
addShutdownHook();
if (mode.equals("online")) {
LOG.info("Mode: online");
@ -439,42 +453,29 @@ private Thread getProgressBarThread() {
* @param out PrintStream
*/
private void printStats(PrintStream out) {
int threadCount = Integer.parseInt(numOfThreads);
long endTime = System.nanoTime() - startTime;
String execTime = DurationFormatUtils
.formatDuration(TimeUnit.NANOSECONDS.toMillis(endTime),
DURATION_FORMAT);
String prettyTotalVolumeTime = DurationFormatUtils
.formatDuration(TimeUnit.NANOSECONDS.toMillis(volumeCreationTime.get()),
DURATION_FORMAT);
String prettyTotalBucketTime = DurationFormatUtils
.formatDuration(TimeUnit.NANOSECONDS.toMillis(bucketCreationTime.get()),
DURATION_FORMAT);
String prettyTotalKeyCreationTime = DurationFormatUtils
.formatDuration(TimeUnit.NANOSECONDS.toMillis(keyCreationTime.get()),
DURATION_FORMAT);
String prettyTotalKeyWriteTime = DurationFormatUtils
.formatDuration(TimeUnit.NANOSECONDS.toMillis(keyWriteTime.get()),
DURATION_FORMAT);
long volumeTime =
TimeUnit.NANOSECONDS.toMillis(volumeCreationTime.get()) / threadCount;
long volumeTime = TimeUnit.NANOSECONDS.toMillis(volumeCreationTime.get())
/ threadPoolSize;
String prettyAverageVolumeTime =
DurationFormatUtils.formatDuration(volumeTime, DURATION_FORMAT);
long bucketTime =
TimeUnit.NANOSECONDS.toMillis(bucketCreationTime.get()) / threadCount;
long bucketTime = TimeUnit.NANOSECONDS.toMillis(bucketCreationTime.get())
/ threadPoolSize;
String prettyAverageBucketTime =
DurationFormatUtils.formatDuration(bucketTime, DURATION_FORMAT);
long averageKeyCreationTime =
TimeUnit.NANOSECONDS.toMillis(keyCreationTime.get()) / threadCount;
TimeUnit.NANOSECONDS.toMillis(keyCreationTime.get())
/ threadPoolSize;
String prettyAverageKeyCreationTime = DurationFormatUtils
.formatDuration(averageKeyCreationTime, DURATION_FORMAT);
long averageKeyWriteTime =
TimeUnit.NANOSECONDS.toMillis(keyWriteTime.get()) / threadCount;
TimeUnit.NANOSECONDS.toMillis(keyWriteTime.get()) / threadPoolSize;
String prettyAverageKeyWriteTime = DurationFormatUtils
.formatDuration(averageKeyWriteTime, DURATION_FORMAT);
@ -487,10 +488,6 @@ private void printStats(PrintStream out) {
out.println("Number of Keys added: " + numberOfKeysAdded);
out.println("Ratis replication factor: " + factor.name());
out.println("Ratis replication type: " + type.name());
out.println("Time spent in volume creation: " + prettyTotalVolumeTime);
out.println("Time spent in bucket creation: " + prettyTotalBucketTime);
out.println("Time spent in key creation: " + prettyTotalKeyCreationTime);
out.println("Time spent in key write: " + prettyTotalKeyWriteTime);
out.println(
"Average Time spent in volume creation: " + prettyAverageVolumeTime);
out.println(
@ -515,17 +512,47 @@ private void printStats(PrintStream out) {
out.println("***************************************************");
if (jsonDir != null) {
CoronaJobInfo jobInfo = new CoronaJobInfo()
.setExecTime(execTime)
String[][] quantileTime =
new String[CoronaOps.values().length][QUANTILES + 1];
String[] deviations = new String[CoronaOps.values().length];
String[] means = new String[CoronaOps.values().length];
for (CoronaOps ops : CoronaOps.values()) {
Snapshot snapshot = histograms.get(ops.ordinal()).getSnapshot();
for (int i = 0; i <= QUANTILES; i++) {
quantileTime[ops.ordinal()][i] = DurationFormatUtils.formatDuration(
TimeUnit.NANOSECONDS
.toMillis((long) snapshot.getValue((1.0 / QUANTILES) * i)),
DURATION_FORMAT);
}
deviations[ops.ordinal()] = DurationFormatUtils.formatDuration(
TimeUnit.NANOSECONDS.toMillis((long) snapshot.getStdDev()),
DURATION_FORMAT);
means[ops.ordinal()] = DurationFormatUtils.formatDuration(
TimeUnit.NANOSECONDS.toMillis((long) snapshot.getMean()),
DURATION_FORMAT);
}
CoronaJobInfo jobInfo = new CoronaJobInfo().setExecTime(execTime)
.setGitBaseRevision(VersionInfo.getRevision())
.setAverageVolumeCreationTime(prettyAverageVolumeTime)
.setAverageBucketCreationTime(prettyAverageBucketTime)
.setAverageKeyCreationTime(prettyAverageKeyCreationTime)
.setAverageKeyWriteTime(prettyAverageKeyWriteTime)
.setTotalVolumeCreationTime(prettyTotalVolumeTime)
.setTotalBucketCreationTime(prettyTotalBucketTime)
.setTotalKeyCreationTime(prettyTotalKeyCreationTime)
.setTotalKeyWriteTime(prettyTotalKeyWriteTime);
.setMeanVolumeCreateTime(means[CoronaOps.VOLUME_CREATE.ordinal()])
.setDeviationVolumeCreateTime(
deviations[CoronaOps.VOLUME_CREATE.ordinal()])
.setTenQuantileVolumeCreateTime(
quantileTime[CoronaOps.VOLUME_CREATE.ordinal()])
.setMeanBucketCreateTime(means[CoronaOps.BUCKET_CREATE.ordinal()])
.setDeviationBucketCreateTime(
deviations[CoronaOps.BUCKET_CREATE.ordinal()])
.setTenQuantileBucketCreateTime(
quantileTime[CoronaOps.BUCKET_CREATE.ordinal()])
.setMeanKeyCreateTime(means[CoronaOps.KEY_CREATE.ordinal()])
.setDeviationKeyCreateTime(deviations[CoronaOps.KEY_CREATE.ordinal()])
.setTenQuantileKeyCreateTime(
quantileTime[CoronaOps.KEY_CREATE.ordinal()])
.setMeanKeyWriteTime(means[CoronaOps.KEY_WRITE.ordinal()])
.setDeviationKeyWriteTime(deviations[CoronaOps.KEY_WRITE.ordinal()])
.setTenQuantileKeyWriteTime(
quantileTime[CoronaOps.KEY_WRITE.ordinal()]);
String jsonName =
new SimpleDateFormat("yyyyMMddHHmmss").format(Time.now()) + ".json";
String jsonPath = jsonDir + "/" + jsonName;
@ -662,21 +689,33 @@ private class OfflineProcessor implements Runnable {
private int totalBuckets;
private int totalKeys;
private OzoneVolume volume;
private String volumeName;
OfflineProcessor(String volumeName) throws Exception {
OfflineProcessor(String volumeName) {
this.totalBuckets = Integer.parseInt(numOfBuckets);
this.totalKeys = Integer.parseInt(numOfKeys);
LOG.trace("Creating volume: {}", volumeName);
long start = System.nanoTime();
objectStore.createVolume(volumeName);
volumeCreationTime.getAndAdd(System.nanoTime() - start);
numberOfVolumesCreated.getAndIncrement();
volume = objectStore.getVolume(volumeName);
this.volumeName = volumeName;
}
@Override
public void run() {
LOG.trace("Creating volume: {}", volumeName);
long start = System.nanoTime();
OzoneVolume volume;
try {
objectStore.createVolume(volumeName);
long volumeCreationDuration = System.nanoTime() - start;
volumeCreationTime.getAndAdd(volumeCreationDuration);
histograms.get(CoronaOps.VOLUME_CREATE.ordinal())
.update(volumeCreationDuration);
numberOfVolumesCreated.getAndIncrement();
volume = objectStore.getVolume(volumeName);
} catch (IOException e) {
exception = true;
LOG.error("Could not create volume", e);
return;
}
Long threadKeyWriteTime = 0L;
for (int j = 0; j < totalBuckets; j++) {
String bucketName = "bucket-" + j + "-" +
@ -684,9 +723,12 @@ public void run() {
try {
LOG.trace("Creating bucket: {} in volume: {}",
bucketName, volume.getName());
long start = System.nanoTime();
start = System.nanoTime();
volume.createBucket(bucketName);
bucketCreationTime.getAndAdd(System.nanoTime() - start);
long bucketCreationDuration = System.nanoTime() - start;
histograms.get(CoronaOps.BUCKET_CREATE.ordinal())
.update(bucketCreationDuration);
bucketCreationTime.getAndAdd(bucketCreationDuration);
numberOfBucketsCreated.getAndIncrement();
OzoneBucket bucket = volume.getBucket(bucketName);
for (int k = 0; k < totalKeys; k++) {
@ -700,12 +742,18 @@ public void run() {
long keyCreateStart = System.nanoTime();
OzoneOutputStream os =
bucket.createKey(key, keySize, type, factor);
keyCreationTime.getAndAdd(System.nanoTime() - keyCreateStart);
long keyCreationDuration = System.nanoTime() - keyCreateStart;
histograms.get(CoronaOps.KEY_CREATE.ordinal())
.update(keyCreationDuration);
keyCreationTime.getAndAdd(keyCreationDuration);
long keyWriteStart = System.nanoTime();
os.write(keyValue);
os.write(randomValue);
os.close();
threadKeyWriteTime += System.nanoTime() - keyWriteStart;
long keyWriteDuration = System.nanoTime() - keyWriteStart;
threadKeyWriteTime += keyWriteDuration;
histograms.get(CoronaOps.KEY_WRITE.ordinal())
.update(keyWriteDuration);
totalBytesWritten.getAndAdd(keySize);
numberOfKeysAdded.getAndIncrement();
if (validateWrites) {
@ -730,13 +778,6 @@ public void run() {
}
keyWriteTime.getAndAdd(threadKeyWriteTime);
boolean success = threadThroughput.add(
(totalBuckets * totalKeys * keySize * 1.0) / TimeUnit.NANOSECONDS
.toSeconds(threadKeyWriteTime));
if (!success) {
LOG.warn("Throughput could not be added for thread id: {}",
Thread.currentThread().getId());
}
}
}
@ -751,14 +792,6 @@ private final class CoronaJobInfo {
private String numOfKeys;
private String numOfThreads;
private String mode;
private String totalBucketCreationTime;
private String totalVolumeCreationTime;
private String totalKeyCreationTime;
private String totalKeyWriteTime;
private String averageBucketCreationTime;
private String averageVolumeCreationTime;
private String averageKeyCreationTime;
private String averageKeyWriteTime;
private String dataWritten;
private String execTime;
private String replicationFactor;
@ -766,11 +799,24 @@ private final class CoronaJobInfo {
private int keySize;
private String[] threadThroughputPerSecond;
private String minThreadThroughputPerSecond;
private String maxThreadThroughputPerSecond;
private String totalThroughputPerSecond;
private String meanVolumeCreateTime;
private String deviationVolumeCreateTime;
private String[] tenQuantileVolumeCreateTime;
private String meanBucketCreateTime;
private String deviationBucketCreateTime;
private String[] tenQuantileBucketCreateTime;
private String meanKeyCreateTime;
private String deviationKeyCreateTime;
private String[] tenQuantileKeyCreateTime;
private String meanKeyWriteTime;
private String deviationKeyWriteTime;
private String[] tenQuantileKeyWriteTime;
private CoronaJobInfo() {
this.status = exception ? "Failed" : "Success";
this.numOfVolumes = Corona.this.numOfVolumes;
@ -787,37 +833,26 @@ private CoronaJobInfo() {
Long.parseLong(numOfVolumes) * Long.parseLong(numOfBuckets) * Long
.parseLong(numOfKeys) * keySize;
this.dataWritten = getInStorageUnits((double) totalBytes);
threadThroughputPerSecond = new String[Integer.parseInt(numOfThreads)];
double minThreadThroughput = Double.MAX_VALUE, maxThreadThroughput = 0.0,
totalThroughput = 0.0;
int i = 0;
for (Double throughput : Corona.this.threadThroughput) {
minThreadThroughput = min(throughput, minThreadThroughput);
maxThreadThroughput = max(throughput, maxThreadThroughput);
totalThroughput += throughput;
threadThroughputPerSecond[i++] = getInStorageUnits(throughput);
}
minThreadThroughputPerSecond = getInStorageUnits(minThreadThroughput);
maxThreadThroughputPerSecond = getInStorageUnits(maxThreadThroughput);
totalThroughputPerSecond = getInStorageUnits(totalThroughput);
this.totalThroughputPerSecond = getInStorageUnits(
(totalBytes * 1.0) / TimeUnit.NANOSECONDS
.toSeconds(Corona.this.keyWriteTime.get() / threadPoolSize));
}
private String getInStorageUnits(Double value) {
double size;
OzoneQuota.Units unit;
if ((long) (value / OzoneConsts.KB) == 0) {
size = value / OzoneConsts.KB;
unit = OzoneQuota.Units.KB;
} else if ((long) (value / OzoneConsts.MB) == 0) {
size = value / OzoneConsts.MB;
unit = OzoneQuota.Units.MB;
} else if ((long) (value / OzoneConsts.GB) == 0) {
size = value / OzoneConsts.GB;
unit = OzoneQuota.Units.GB;
} else if ((long) (value / OzoneConsts.TB) == 0) {
if ((long) (value / OzoneConsts.TB) != 0) {
size = value / OzoneConsts.TB;
unit = OzoneQuota.Units.TB;
} else if ((long) (value / OzoneConsts.GB) != 0) {
size = value / OzoneConsts.GB;
unit = OzoneQuota.Units.GB;
} else if ((long) (value / OzoneConsts.MB) != 0) {
size = value / OzoneConsts.MB;
unit = OzoneQuota.Units.MB;
} else if ((long) (value / OzoneConsts.KB) != 0) {
size = value / OzoneConsts.KB;
unit = OzoneQuota.Units.KB;
} else {
size = value;
unit = OzoneQuota.Units.BYTES;
@ -830,58 +865,81 @@ public CoronaJobInfo setGitBaseRevision(String gitBaseRevisionVal) {
return this;
}
public CoronaJobInfo setTotalBucketCreationTime(
String totalBucketCreationTimeVal) {
totalBucketCreationTime = totalBucketCreationTimeVal;
return this;
}
public CoronaJobInfo setTotalVolumeCreationTime(
String totalVolumeCreationTimeVal) {
totalVolumeCreationTime = totalVolumeCreationTimeVal;
return this;
}
public CoronaJobInfo setTotalKeyCreationTime(
String totalKeyCreationTimeVal) {
totalKeyCreationTime = totalKeyCreationTimeVal;
return this;
}
public CoronaJobInfo setTotalKeyWriteTime(String totalKeyWriteTimeVal) {
totalKeyWriteTime = totalKeyWriteTimeVal;
return this;
}
public CoronaJobInfo setAverageBucketCreationTime(
String averageBucketCreationTimeVal) {
averageBucketCreationTime = averageBucketCreationTimeVal;
return this;
}
public CoronaJobInfo setAverageVolumeCreationTime(
String averageVolumeCreationTimeVal) {
averageVolumeCreationTime = averageVolumeCreationTimeVal;
return this;
}
public CoronaJobInfo setAverageKeyCreationTime(
String averageKeyCreationTimeVal) {
averageKeyCreationTime = averageKeyCreationTimeVal;
return this;
}
public CoronaJobInfo setAverageKeyWriteTime(
String averageKeyWriteTimeVal) {
averageKeyWriteTime = averageKeyWriteTimeVal;
return this;
}
public CoronaJobInfo setExecTime(String execTimeVal) {
execTime = execTimeVal;
return this;
}
public CoronaJobInfo setMeanKeyWriteTime(String deviationKeyWriteTimeVal) {
this.meanKeyWriteTime = deviationKeyWriteTimeVal;
return this;
}
public CoronaJobInfo setDeviationKeyWriteTime(
String deviationKeyWriteTimeVal) {
this.deviationKeyWriteTime = deviationKeyWriteTimeVal;
return this;
}
public CoronaJobInfo setTenQuantileKeyWriteTime(
String[] tenQuantileKeyWriteTimeVal) {
this.tenQuantileKeyWriteTime = tenQuantileKeyWriteTimeVal;
return this;
}
public CoronaJobInfo setMeanKeyCreateTime(String deviationKeyWriteTimeVal) {
this.meanKeyCreateTime = deviationKeyWriteTimeVal;
return this;
}
public CoronaJobInfo setDeviationKeyCreateTime(
String deviationKeyCreateTimeVal) {
this.deviationKeyCreateTime = deviationKeyCreateTimeVal;
return this;
}
public CoronaJobInfo setTenQuantileKeyCreateTime(
String[] tenQuantileKeyCreateTimeVal) {
this.tenQuantileKeyCreateTime = tenQuantileKeyCreateTimeVal;
return this;
}
public CoronaJobInfo setMeanBucketCreateTime(
String deviationKeyWriteTimeVal) {
this.meanBucketCreateTime = deviationKeyWriteTimeVal;
return this;
}
public CoronaJobInfo setDeviationBucketCreateTime(
String deviationBucketCreateTimeVal) {
this.deviationBucketCreateTime = deviationBucketCreateTimeVal;
return this;
}
public CoronaJobInfo setTenQuantileBucketCreateTime(
String[] tenQuantileBucketCreateTimeVal) {
this.tenQuantileBucketCreateTime = tenQuantileBucketCreateTimeVal;
return this;
}
public CoronaJobInfo setMeanVolumeCreateTime(
String deviationKeyWriteTimeVal) {
this.meanVolumeCreateTime = deviationKeyWriteTimeVal;
return this;
}
public CoronaJobInfo setDeviationVolumeCreateTime(
String deviationVolumeCreateTimeVal) {
this.deviationVolumeCreateTime = deviationVolumeCreateTimeVal;
return this;
}
public CoronaJobInfo setTenQuantileVolumeCreateTime(
String[] tenQuantileVolumeCreateTimeVal) {
this.tenQuantileVolumeCreateTime = tenQuantileVolumeCreateTimeVal;
return this;
}
public String getJobStartTime() {
return jobStartTime;
}
@ -906,38 +964,6 @@ public String getMode() {
return mode;
}
public String getTotalBucketCreationTime() {
return totalBucketCreationTime;
}
public String getTotalVolumeCreationTime() {
return totalVolumeCreationTime;
}
public String getTotalKeyCreationTime() {
return totalKeyCreationTime;
}
public String getAverageBucketCreationTime() {
return averageBucketCreationTime;
}
public String getTotalKeyWriteTime() {
return totalKeyWriteTime;
}
public String getAverageKeyWriteTime() {
return averageKeyWriteTime;
}
public String getAverageVolumeCreationTime() {
return averageVolumeCreationTime;
}
public String getAverageKeyCreationTime() {
return averageKeyCreationTime;
}
public String getExecTime() {
return execTime;
}
@ -962,22 +988,61 @@ public String getGitBaseRevision() {
return gitBaseRevision;
}
public String getMinThreadThroughputPerSecond() {
return minThreadThroughputPerSecond;
}
public String getMaxThreadThroughputPerSecond() {
return maxThreadThroughputPerSecond;
}
public String getDataWritten() {
return dataWritten;
}
public String getTotalThroughput() {
public String getTotalThroughputPerSecond() {
return totalThroughputPerSecond;
}
public String getMeanVolumeCreateTime() {
return meanVolumeCreateTime;
}
public String getDeviationVolumeCreateTime() {
return deviationVolumeCreateTime;
}
public String[] getTenQuantileVolumeCreateTime() {
return tenQuantileVolumeCreateTime;
}
public String getMeanBucketCreateTime() {
return meanBucketCreateTime;
}
public String getDeviationBucketCreateTime() {
return deviationBucketCreateTime;
}
public String[] getTenQuantileBucketCreateTime() {
return tenQuantileBucketCreateTime;
}
public String getMeanKeyCreateTime() {
return meanKeyCreateTime;
}
public String getDeviationKeyCreateTime() {
return deviationKeyCreateTime;
}
public String[] getTenQuantileKeyCreateTime() {
return tenQuantileKeyCreateTime;
}
public String getMeanKeyWriteTime() {
return meanKeyWriteTime;
}
public String getDeviationKeyWriteTime() {
return deviationKeyWriteTime;
}
public String[] getTenQuantileKeyWriteTime() {
return tenQuantileKeyWriteTime;
}
}
private class ProgressBar implements Runnable {