From ca3adf588ceef53de340f03cbfbd45db8d25a408 Mon Sep 17 00:00:00 2001 From: Anu Engineer Date: Fri, 7 Sep 2018 13:27:15 -0700 Subject: [PATCH] HDDS-398. Support multiple tests in freon. Contributed by Elek, Marton. --- .../src/test/acceptance/basic/basic.robot | 2 +- .../hadoop/ozone/freon/TestDataValidate.java | 119 +- .../apache/hadoop/ozone/freon/TestFreon.java | 129 -- .../ozone/freon/TestRandomKeyGenerator.java | 106 ++ .../org/apache/hadoop/ozone/freon/Freon.java | 1136 +---------------- .../ozone/freon/RandomKeyGenerator.java | 1038 +++++++++++++++ 6 files changed, 1205 insertions(+), 1325 deletions(-) delete mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestFreon.java create mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java create mode 100644 hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java diff --git a/hadoop-ozone/acceptance-test/src/test/acceptance/basic/basic.robot b/hadoop-ozone/acceptance-test/src/test/acceptance/basic/basic.robot index 6d6fea02733..71d6e4ca164 100644 --- a/hadoop-ozone/acceptance-test/src/test/acceptance/basic/basic.robot +++ b/hadoop-ozone/acceptance-test/src/test/acceptance/basic/basic.robot @@ -45,6 +45,6 @@ Check webui static resources Should contain ${result} 200 Start freon testing - ${result} = Execute on ozoneManager ozone freon -numOfVolumes 5 -numOfBuckets 5 -numOfKeys 5 -numOfThreads 10 + ${result} = Execute on ozoneManager ozone freon randomkeys --numOfVolumes 5 --numOfBuckets 5 --numOfKeys 5 --numOfThreads 10 Wait Until Keyword Succeeds 3min 10sec Should contain ${result} Number of Keys added: 125 Should Not Contain ${result} ERROR diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java index 0a3c3918d49..fdce73600e1 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,20 +18,15 @@ package org.apache.hadoop.ozone.freon; +import org.apache.hadoop.hdds.client.ReplicationFactor; +import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.util.ToolRunner; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.PrintStream; -import java.util.ArrayList; -import java.util.List; - /** * Tests Freon, with MiniOzoneCluster and validate data. */ @@ -45,7 +40,6 @@ public class TestDataValidate { *

* Ozone is made active by setting OZONE_ENABLED = true * - * @throws IOException */ @BeforeClass public static void init() throws Exception { @@ -67,74 +61,55 @@ public class TestDataValidate { @Test public void ratisTestLargeKey() throws Exception { - List args = new ArrayList<>(); - args.add("-validateWrites"); - args.add("-numOfVolumes"); - args.add("1"); - args.add("-numOfBuckets"); - args.add("1"); - args.add("-numOfKeys"); - args.add("1"); - args.add("-ratis"); - args.add("3"); - args.add("-keySize"); - args.add("104857600"); - Freon freon = new Freon(conf); - int res = ToolRunner.run(conf, freon, - args.toArray(new String[0])); - Assert.assertEquals(1, freon.getNumberOfVolumesCreated()); - Assert.assertEquals(1, freon.getNumberOfBucketsCreated()); - Assert.assertEquals(1, freon.getNumberOfKeysAdded()); - Assert.assertEquals(0, freon.getUnsuccessfulValidationCount()); - Assert.assertEquals(0, res); + RandomKeyGenerator randomKeyGenerator = + new RandomKeyGenerator((OzoneConfiguration) cluster.getConf()); + randomKeyGenerator.setNumOfVolumes(1); + randomKeyGenerator.setNumOfBuckets(1); + randomKeyGenerator.setNumOfKeys(1); + randomKeyGenerator.setType(ReplicationType.RATIS); + randomKeyGenerator.setFactor(ReplicationFactor.THREE); + randomKeyGenerator.setKeySize(104857600); + randomKeyGenerator.setValidateWrites(true); + randomKeyGenerator.call(); + Assert.assertEquals(1, randomKeyGenerator.getNumberOfVolumesCreated()); + Assert.assertEquals(1, randomKeyGenerator.getNumberOfBucketsCreated()); + Assert.assertEquals(1, randomKeyGenerator.getNumberOfKeysAdded()); + Assert.assertEquals(0, randomKeyGenerator.getUnsuccessfulValidationCount()); } @Test public void standaloneTestLargeKey() throws Exception { - List args = new ArrayList<>(); - args.add("-validateWrites"); - args.add("-numOfVolumes"); - args.add("1"); - args.add("-numOfBuckets"); - args.add("1"); - args.add("-numOfKeys"); - args.add("1"); - args.add("-keySize"); - args.add("104857600"); - Freon freon = new Freon(conf); - int res = ToolRunner.run(conf, freon, - args.toArray(new String[0])); - Assert.assertEquals(1, freon.getNumberOfVolumesCreated()); - Assert.assertEquals(1, freon.getNumberOfBucketsCreated()); - Assert.assertEquals(1, freon.getNumberOfKeysAdded()); - Assert.assertEquals(0, freon.getUnsuccessfulValidationCount()); - Assert.assertEquals(0, res); + RandomKeyGenerator randomKeyGenerator = + new RandomKeyGenerator((OzoneConfiguration) cluster.getConf()); + randomKeyGenerator.setNumOfVolumes(1); + randomKeyGenerator.setNumOfBuckets(1); + randomKeyGenerator.setNumOfKeys(1); + randomKeyGenerator.setKeySize(104857600); + randomKeyGenerator.setValidateWrites(true); + randomKeyGenerator.call(); + Assert.assertEquals(1, randomKeyGenerator.getNumberOfVolumesCreated()); + Assert.assertEquals(1, randomKeyGenerator.getNumberOfBucketsCreated()); + Assert.assertEquals(1, randomKeyGenerator.getNumberOfKeysAdded()); + Assert.assertEquals(0, randomKeyGenerator.getUnsuccessfulValidationCount()); } @Test public void validateWriteTest() throws Exception { - PrintStream originalStream = System.out; - ByteArrayOutputStream outStream = new ByteArrayOutputStream(); - System.setOut(new PrintStream(outStream)); - List args = new ArrayList<>(); - args.add("-validateWrites"); - args.add("-numOfVolumes"); - args.add("2"); - args.add("-numOfBuckets"); - args.add("5"); - args.add("-numOfKeys"); - args.add("10"); - Freon freon = new Freon(conf); - int res = ToolRunner.run(conf, freon, - args.toArray(new String[0])); - Assert.assertEquals(0, res); - Assert.assertEquals(2, freon.getNumberOfVolumesCreated()); - Assert.assertEquals(10, freon.getNumberOfBucketsCreated()); - Assert.assertEquals(100, freon.getNumberOfKeysAdded()); - Assert.assertTrue(freon.getValidateWrites()); - Assert.assertNotEquals(0, freon.getTotalKeysValidated()); - Assert.assertNotEquals(0, freon.getSuccessfulValidationCount()); - Assert.assertEquals(0, freon.getUnsuccessfulValidationCount()); - System.setOut(originalStream); + RandomKeyGenerator randomKeyGenerator = + new RandomKeyGenerator((OzoneConfiguration) cluster.getConf()); + randomKeyGenerator.setNumOfVolumes(2); + randomKeyGenerator.setNumOfBuckets(5); + randomKeyGenerator.setNumOfKeys(10); + randomKeyGenerator.setValidateWrites(true); + randomKeyGenerator.call(); + Assert.assertEquals(2, randomKeyGenerator.getNumberOfVolumesCreated()); + Assert.assertEquals(10, randomKeyGenerator.getNumberOfBucketsCreated()); + Assert.assertEquals(100, randomKeyGenerator.getNumberOfKeysAdded()); + Assert.assertTrue(randomKeyGenerator.getValidateWrites()); + Assert.assertNotEquals(0, randomKeyGenerator.getTotalKeysValidated()); + Assert.assertNotEquals(0, randomKeyGenerator + .getSuccessfulValidationCount()); + Assert.assertEquals(0, randomKeyGenerator + .getUnsuccessfulValidationCount()); } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestFreon.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestFreon.java deleted file mode 100644 index 022d6b590c3..00000000000 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestFreon.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.freon; - -import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.util.ToolRunner; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -/** - * Tests Freon, with MiniOzoneCluster. - */ -public class TestFreon { - - private static MiniOzoneCluster cluster; - private static OzoneConfiguration conf; - - /** - * Create a MiniDFSCluster for testing. - *

- * Ozone is made active by setting OZONE_ENABLED = true - * - * @throws IOException - */ - @BeforeClass - public static void init() throws Exception { - conf = new OzoneConfiguration(); - cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(5).build(); - cluster.waitForClusterToBeReady(); - } - - /** - * Shutdown MiniDFSCluster. - */ - @AfterClass - public static void shutdown() { - if (cluster != null) { - cluster.shutdown(); - } - } - - @Test - public void defaultTest() throws Exception { - List args = new ArrayList<>(); - args.add("-numOfVolumes"); - args.add("2"); - args.add("-numOfBuckets"); - args.add("5"); - args.add("-numOfKeys"); - args.add("10"); - Freon freon = new Freon(conf); - int res = ToolRunner.run(conf, freon, - args.toArray(new String[0])); - Assert.assertEquals(2, freon.getNumberOfVolumesCreated()); - Assert.assertEquals(10, freon.getNumberOfBucketsCreated()); - Assert.assertEquals(100, freon.getNumberOfKeysAdded()); - Assert.assertEquals(10240 - 36, freon.getKeyValueLength()); - Assert.assertEquals(0, res); - } - - @Test - public void multiThread() throws Exception { - List args = new ArrayList<>(); - args.add("-numOfVolumes"); - args.add("10"); - args.add("-numOfBuckets"); - args.add("1"); - args.add("-numOfKeys"); - args.add("10"); - args.add("-numOfThread"); - args.add("10"); - args.add("-keySize"); - args.add("10240"); - Freon freon = new Freon(conf); - int res = ToolRunner.run(conf, freon, - args.toArray(new String[0])); - Assert.assertEquals(10, freon.getNumberOfVolumesCreated()); - Assert.assertEquals(10, freon.getNumberOfBucketsCreated()); - Assert.assertEquals(100, freon.getNumberOfKeysAdded()); - Assert.assertEquals(0, res); - } - - @Test - public void ratisTest3() throws Exception { - List args = new ArrayList<>(); - args.add("-numOfVolumes"); - args.add("10"); - args.add("-numOfBuckets"); - args.add("1"); - args.add("-numOfKeys"); - args.add("10"); - args.add("-ratis"); - args.add("3"); - args.add("-numOfThread"); - args.add("10"); - args.add("-keySize"); - args.add("10240"); - Freon freon = new Freon(conf); - int res = ToolRunner.run(conf, freon, - args.toArray(new String[0])); - Assert.assertEquals(10, freon.getNumberOfVolumesCreated()); - Assert.assertEquals(10, freon.getNumberOfBucketsCreated()); - Assert.assertEquals(100, freon.getNumberOfKeysAdded()); - Assert.assertEquals(0, res); - } -} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java new file mode 100644 index 00000000000..d21d399941c --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.freon; + +import org.apache.hadoop.hdds.client.ReplicationFactor; +import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Tests Freon, with MiniOzoneCluster. + */ +public class TestRandomKeyGenerator { + + private static MiniOzoneCluster cluster; + private static OzoneConfiguration conf; + + /** + * Create a MiniDFSCluster for testing. + *

+ * Ozone is made active by setting OZONE_ENABLED = true + * + */ + @BeforeClass + public static void init() throws Exception { + conf = new OzoneConfiguration(); + cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(5).build(); + cluster.waitForClusterToBeReady(); + } + + /** + * Shutdown MiniDFSCluster. + */ + @AfterClass + public static void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void defaultTest() throws Exception { + RandomKeyGenerator randomKeyGenerator = + new RandomKeyGenerator((OzoneConfiguration) cluster.getConf()); + randomKeyGenerator.setNumOfVolumes(2); + randomKeyGenerator.setNumOfBuckets(5); + randomKeyGenerator.setNumOfKeys(10); + randomKeyGenerator.call(); + Assert.assertEquals(2, randomKeyGenerator.getNumberOfVolumesCreated()); + Assert.assertEquals(10, randomKeyGenerator.getNumberOfBucketsCreated()); + Assert.assertEquals(100, randomKeyGenerator.getNumberOfKeysAdded()); + Assert.assertEquals(10240 - 36, randomKeyGenerator.getKeyValueLength()); + } + + @Test + public void multiThread() throws Exception { + RandomKeyGenerator randomKeyGenerator = + new RandomKeyGenerator((OzoneConfiguration) cluster.getConf()); + randomKeyGenerator.setNumOfVolumes(10); + randomKeyGenerator.setNumOfBuckets(1); + randomKeyGenerator.setNumOfKeys(10); + randomKeyGenerator.setNumOfThreads(10); + randomKeyGenerator.setKeySize(10240); + randomKeyGenerator.call(); + Assert.assertEquals(10, randomKeyGenerator.getNumberOfVolumesCreated()); + Assert.assertEquals(10, randomKeyGenerator.getNumberOfBucketsCreated()); + Assert.assertEquals(100, randomKeyGenerator.getNumberOfKeysAdded()); + } + + @Test + public void ratisTest3() throws Exception { + RandomKeyGenerator randomKeyGenerator = + new RandomKeyGenerator((OzoneConfiguration) cluster.getConf()); + randomKeyGenerator.setNumOfVolumes(10); + randomKeyGenerator.setNumOfBuckets(1); + randomKeyGenerator.setNumOfKeys(10); + randomKeyGenerator.setNumOfThreads(10); + randomKeyGenerator.setKeySize(10240); + randomKeyGenerator.setFactor(ReplicationFactor.THREE); + randomKeyGenerator.setType(ReplicationType.RATIS); + randomKeyGenerator.call(); + Assert.assertEquals(10, randomKeyGenerator.getNumberOfVolumesCreated()); + Assert.assertEquals(10, randomKeyGenerator.getNumberOfBucketsCreated()); + Assert.assertEquals(100, randomKeyGenerator.getNumberOfKeysAdded()); + } +} diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java index ab52b86c23a..f9e8c9b97ce 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java @@ -14,1136 +14,26 @@ * License for the specific language governing permissions and limitations under * the License. */ - package org.apache.hadoop.ozone.freon; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Snapshot; -import com.codahale.metrics.UniformReservoir; -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.PropertyAccessor; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectWriter; -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.cli.Options; -import org.apache.commons.lang3.ArrayUtils; -import org.apache.commons.lang3.RandomStringUtils; -import org.apache.commons.lang3.time.DurationFormatUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.hdds.client.OzoneQuota; -import org.apache.hadoop.hdds.client.ReplicationFactor; -import org.apache.hadoop.hdds.client.ReplicationType; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.client.*; -import org.apache.hadoop.ozone.client.io.OzoneInputStream; -import org.apache.hadoop.ozone.client.io.OzoneOutputStream; -import org.apache.hadoop.util.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.hadoop.hdds.cli.GenericCli; +import org.apache.hadoop.hdds.cli.HddsVersionProvider; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.PrintStream; -import java.text.SimpleDateFormat; -import java.util.*; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Supplier; - -import static java.lang.Math.min; +import picocli.CommandLine.Command; /** - * Freon - A tool to populate ozone with data for testing.
- * This is not a map-reduce program and this is not for benchmarking - * Ozone write throughput.
- * It supports both online and offline modes. Default mode is offline, - * -mode can be used to change the mode. - *

- * In online mode, active internet connection is required, - * common crawl data from AWS will be used.
- * Default source is:
- * https://commoncrawl.s3.amazonaws.com/crawl-data/ - * CC-MAIN-2017-17/warc.paths.gz
- * (it contains the path to actual data segment)
- * User can override this using -source. - * The following values are derived from URL of Common Crawl data - *

- * In offline mode, the data will be random bytes and - * size of data will be 10 KB.
- * + * Ozone data generator and performance test tool. */ -public final class Freon extends Configured implements Tool { +@Command( + name = "ozone freon", + description = "Load generator and tester tool for ozone", + subcommands = RandomKeyGenerator.class, + versionProvider = HddsVersionProvider.class, + mixinStandardHelpOptions = true) +public class Freon extends GenericCli { - enum FreonOps { - VOLUME_CREATE, - BUCKET_CREATE, - KEY_CREATE, - KEY_WRITE + public static void main(String[] args) { + new Freon().run(args); } - private static final String HELP = "help"; - private static final String MODE = "mode"; - private static final String SOURCE = "source"; - private static final String VALIDATE_WRITE = "validateWrites"; - private static final String JSON_WRITE_DIRECTORY = "jsonDir"; - private static final String NUM_OF_THREADS = "numOfThreads"; - private static final String NUM_OF_VOLUMES = "numOfVolumes"; - private static final String NUM_OF_BUCKETS = "numOfBuckets"; - private static final String NUM_OF_KEYS = "numOfKeys"; - private static final String KEY_SIZE = "keySize"; - private static final String RATIS = "ratis"; - - private static final String MODE_DEFAULT = "offline"; - private static final String SOURCE_DEFAULT = - "https://commoncrawl.s3.amazonaws.com/" + - "crawl-data/CC-MAIN-2017-17/warc.paths.gz"; - private static final String NUM_OF_THREADS_DEFAULT = "10"; - private static final String NUM_OF_VOLUMES_DEFAULT = "10"; - private static final String NUM_OF_BUCKETS_DEFAULT = "1000"; - private static final String NUM_OF_KEYS_DEFAULT = "500000"; - private static final String DURATION_FORMAT = "HH:mm:ss,SSS"; - - private static final int KEY_SIZE_DEFAULT = 10240; - private static final int QUANTILES = 10; - - private static final Logger LOG = - LoggerFactory.getLogger(Freon.class); - - private boolean printUsage = false; - private boolean completed = false; - private boolean exception = false; - - private String mode; - private String source; - private String numOfThreads; - private String numOfVolumes; - private String numOfBuckets; - private String numOfKeys; - private String jsonDir; - private boolean useRatis; - private ReplicationType type; - private ReplicationFactor factor; - - private int threadPoolSize; - private int keySize; - private byte[] keyValue = null; - - private boolean validateWrites; - - private OzoneClient ozoneClient; - private ObjectStore objectStore; - private ExecutorService processor; - - private long startTime; - private long jobStartTime; - - private AtomicLong volumeCreationTime; - private AtomicLong bucketCreationTime; - private AtomicLong keyCreationTime; - private AtomicLong keyWriteTime; - - private AtomicLong totalBytesWritten; - - private AtomicInteger numberOfVolumesCreated; - private AtomicInteger numberOfBucketsCreated; - private AtomicLong numberOfKeysAdded; - - private Long totalWritesValidated; - private Long writeValidationSuccessCount; - private Long writeValidationFailureCount; - - private BlockingQueue validationQueue; - private ArrayList histograms = new ArrayList<>(); - - @VisibleForTesting - Freon(Configuration conf) throws IOException { - startTime = System.nanoTime(); - jobStartTime = System.currentTimeMillis(); - volumeCreationTime = new AtomicLong(); - bucketCreationTime = new AtomicLong(); - keyCreationTime = new AtomicLong(); - keyWriteTime = new AtomicLong(); - totalBytesWritten = new AtomicLong(); - numberOfVolumesCreated = new AtomicInteger(); - numberOfBucketsCreated = new AtomicInteger(); - numberOfKeysAdded = new AtomicLong(); - ozoneClient = OzoneClientFactory.getClient(conf); - objectStore = ozoneClient.getObjectStore(); - for (FreonOps ops : FreonOps.values()) { - histograms.add(ops.ordinal(), new Histogram(new UniformReservoir())); - } - } - - /** - * @param args arguments - */ - public static void main(String[] args) throws Exception { - Configuration conf = new OzoneConfiguration(); - int res = ToolRunner.run(conf, new Freon(conf), args); - System.exit(res); - } - - @Override - public int run(String[] args) throws Exception { - GenericOptionsParser parser = new GenericOptionsParser(getConf(), - getOptions(), args); - parseOptions(parser.getCommandLine()); - if (printUsage) { - usage(); - return 0; - } - - keyValue = - DFSUtil.string2Bytes(RandomStringUtils.randomAscii(keySize - 36)); - - LOG.info("Number of Threads: " + numOfThreads); - threadPoolSize = - min(Integer.parseInt(numOfVolumes), Integer.parseInt(numOfThreads)); - processor = Executors.newFixedThreadPool(threadPoolSize); - addShutdownHook(); - if (mode.equals("online")) { - LOG.info("Mode: online"); - throw new UnsupportedOperationException("Not yet implemented."); - } else { - LOG.info("Mode: offline"); - LOG.info("Number of Volumes: {}.", numOfVolumes); - LOG.info("Number of Buckets per Volume: {}.", numOfBuckets); - LOG.info("Number of Keys per Bucket: {}.", numOfKeys); - LOG.info("Key size: {} bytes", keySize); - for (int i = 0; i < Integer.parseInt(numOfVolumes); i++) { - String volume = "vol-" + i + "-" + - RandomStringUtils.randomNumeric(5); - processor.submit(new OfflineProcessor(volume)); - } - } - Thread validator = null; - if (validateWrites) { - totalWritesValidated = 0L; - writeValidationSuccessCount = 0L; - writeValidationFailureCount = 0L; - - validationQueue = - new ArrayBlockingQueue<>(Integer.parseInt(numOfThreads)); - validator = new Thread(new Validator()); - validator.start(); - LOG.info("Data validation is enabled."); - } - Thread progressbar = getProgressBarThread(); - LOG.info("Starting progress bar Thread."); - progressbar.start(); - processor.shutdown(); - processor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); - completed = true; - progressbar.join(); - if (validateWrites) { - validator.join(); - } - ozoneClient.close(); - return 0; - } - - private Options getOptions() { - Options options = new Options(); - - OptionBuilder.withDescription("prints usage."); - Option optHelp = OptionBuilder.create(HELP); - - OptionBuilder.withArgName("online | offline"); - OptionBuilder.hasArg(); - OptionBuilder.withDescription("specifies the mode of " + - "Freon run."); - Option optMode = OptionBuilder.create(MODE); - - OptionBuilder.withArgName("source url"); - OptionBuilder.hasArg(); - OptionBuilder.withDescription("specifies the URL of s3 " + - "commoncrawl warc file to be used when the mode is online."); - Option optSource = OptionBuilder.create(SOURCE); - - OptionBuilder.withDescription("do random validation of " + - "data written into ozone, only subset of data is validated."); - Option optValidateWrite = OptionBuilder.create(VALIDATE_WRITE); - - - OptionBuilder.withDescription("directory where json is created"); - OptionBuilder.hasArg(); - Option optJsonDir = OptionBuilder.create(JSON_WRITE_DIRECTORY); - - OptionBuilder.withArgName("value"); - OptionBuilder.hasArg(); - OptionBuilder.withDescription("number of threads to be launched " + - "for the run"); - Option optNumOfThreads = OptionBuilder.create(NUM_OF_THREADS); - - OptionBuilder.withArgName("value"); - OptionBuilder.hasArg(); - OptionBuilder.withDescription("specifies number of Volumes to be " + - "created in offline mode"); - Option optNumOfVolumes = OptionBuilder.create(NUM_OF_VOLUMES); - - OptionBuilder.withArgName("value"); - OptionBuilder.hasArg(); - OptionBuilder.withDescription("specifies number of Buckets to be " + - "created per Volume in offline mode"); - Option optNumOfBuckets = OptionBuilder.create(NUM_OF_BUCKETS); - - OptionBuilder.withArgName("value"); - OptionBuilder.hasArg(); - OptionBuilder.withDescription("specifies number of Keys to be " + - "created per Bucket in offline mode"); - Option optNumOfKeys = OptionBuilder.create(NUM_OF_KEYS); - - OptionBuilder.withArgName("value"); - OptionBuilder.hasArg(); - OptionBuilder.withDescription("specifies the size of Key in bytes to be " + - "created in offline mode"); - Option optKeySize = OptionBuilder.create(KEY_SIZE); - - OptionBuilder.withArgName(RATIS); - OptionBuilder.hasArg(); - OptionBuilder.withDescription("Use Ratis as the default replication " + - "strategy"); - Option optRatis = OptionBuilder.create(RATIS); - - options.addOption(optHelp); - options.addOption(optMode); - options.addOption(optSource); - options.addOption(optValidateWrite); - options.addOption(optJsonDir); - options.addOption(optNumOfThreads); - options.addOption(optNumOfVolumes); - options.addOption(optNumOfBuckets); - options.addOption(optNumOfKeys); - options.addOption(optKeySize); - options.addOption(optRatis); - return options; - } - - private void parseOptions(CommandLine cmdLine) { - printUsage = cmdLine.hasOption(HELP); - - mode = cmdLine.getOptionValue(MODE, MODE_DEFAULT); - - source = cmdLine.getOptionValue(SOURCE, SOURCE_DEFAULT); - - numOfThreads = - cmdLine.getOptionValue(NUM_OF_THREADS, NUM_OF_THREADS_DEFAULT); - - validateWrites = cmdLine.hasOption(VALIDATE_WRITE); - - jsonDir = cmdLine.getOptionValue(JSON_WRITE_DIRECTORY); - - numOfVolumes = - cmdLine.getOptionValue(NUM_OF_VOLUMES, NUM_OF_VOLUMES_DEFAULT); - - numOfBuckets = - cmdLine.getOptionValue(NUM_OF_BUCKETS, NUM_OF_BUCKETS_DEFAULT); - - numOfKeys = cmdLine.getOptionValue(NUM_OF_KEYS, NUM_OF_KEYS_DEFAULT); - - keySize = cmdLine.hasOption(KEY_SIZE) ? - Integer.parseInt(cmdLine.getOptionValue(KEY_SIZE)) : KEY_SIZE_DEFAULT; - if (keySize < 1024) { - throw new IllegalArgumentException( - "keySize can not be less than 1024 bytes"); - } - - useRatis = cmdLine.hasOption(RATIS); - - type = ReplicationType.STAND_ALONE; - factor = ReplicationFactor.ONE; - - if (useRatis) { - type = ReplicationType.RATIS; - int replicationFactor = Integer.parseInt(cmdLine.getOptionValue(RATIS)); - switch (replicationFactor) { - case 1: - factor = ReplicationFactor.ONE; - break; - case 3: - factor = ReplicationFactor.THREE; - break; - default: - throw new IllegalArgumentException("Illegal replication factor:" - + replicationFactor); - } - } - } - - private void usage() { - System.out.println("Options supported are:"); - System.out.println("-numOfThreads " - + "number of threads to be launched for the run."); - System.out.println("-validateWrites " - + "do random validation of data written into ozone, " + - "only subset of data is validated."); - System.out.println("-jsonDir " - + "directory where json is created."); - System.out.println("-mode [online | offline] " - + "specifies the mode in which Freon should run."); - System.out.println("-source " - + "specifies the URL of s3 commoncrawl warc file to " + - "be used when the mode is online."); - System.out.println("-numOfVolumes " - + "specifies number of Volumes to be created in offline mode"); - System.out.println("-numOfBuckets " - + "specifies number of Buckets to be created per Volume " + - "in offline mode"); - System.out.println("-numOfKeys " - + "specifies number of Keys to be created per Bucket " + - "in offline mode"); - System.out.println("-keySize " - + "specifies the size of Key in bytes to be created in offline mode"); - System.out.println("-help " - + "prints usage."); - System.out.println(); - } - - /** - * Adds ShutdownHook to print statistics. - */ - private void addShutdownHook() { - Runtime.getRuntime().addShutdownHook( - new Thread(() -> printStats(System.out))); - } - - private Thread getProgressBarThread() { - Supplier currentValue; - long maxValue; - - if (mode.equals("online")) { - throw new UnsupportedOperationException("Not yet implemented."); - } else { - currentValue = () -> numberOfKeysAdded.get(); - maxValue = Long.parseLong(numOfVolumes) * - Long.parseLong(numOfBuckets) * - Long.parseLong(numOfKeys); - } - Thread progressBarThread = new Thread( - new ProgressBar(System.out, currentValue, maxValue)); - progressBarThread.setName("ProgressBar"); - return progressBarThread; - } - - /** - * Prints stats of {@link Freon} run to the PrintStream. - * - * @param out PrintStream - */ - private void printStats(PrintStream out) { - long endTime = System.nanoTime() - startTime; - String execTime = DurationFormatUtils - .formatDuration(TimeUnit.NANOSECONDS.toMillis(endTime), - DURATION_FORMAT); - - long volumeTime = TimeUnit.NANOSECONDS.toMillis(volumeCreationTime.get()) - / threadPoolSize; - String prettyAverageVolumeTime = - DurationFormatUtils.formatDuration(volumeTime, DURATION_FORMAT); - - long bucketTime = TimeUnit.NANOSECONDS.toMillis(bucketCreationTime.get()) - / threadPoolSize; - String prettyAverageBucketTime = - DurationFormatUtils.formatDuration(bucketTime, DURATION_FORMAT); - - long averageKeyCreationTime = - TimeUnit.NANOSECONDS.toMillis(keyCreationTime.get()) - / threadPoolSize; - String prettyAverageKeyCreationTime = DurationFormatUtils - .formatDuration(averageKeyCreationTime, DURATION_FORMAT); - - long averageKeyWriteTime = - TimeUnit.NANOSECONDS.toMillis(keyWriteTime.get()) / threadPoolSize; - String prettyAverageKeyWriteTime = DurationFormatUtils - .formatDuration(averageKeyWriteTime, DURATION_FORMAT); - - out.println(); - out.println("***************************************************"); - out.println("Status: " + (exception ? "Failed" : "Success")); - out.println("Git Base Revision: " + VersionInfo.getRevision()); - out.println("Number of Volumes created: " + numberOfVolumesCreated); - out.println("Number of Buckets created: " + numberOfBucketsCreated); - out.println("Number of Keys added: " + numberOfKeysAdded); - out.println("Ratis replication factor: " + factor.name()); - out.println("Ratis replication type: " + type.name()); - out.println( - "Average Time spent in volume creation: " + prettyAverageVolumeTime); - out.println( - "Average Time spent in bucket creation: " + prettyAverageBucketTime); - out.println( - "Average Time spent in key creation: " + prettyAverageKeyCreationTime); - out.println( - "Average Time spent in key write: " + prettyAverageKeyWriteTime); - out.println("Total bytes written: " + totalBytesWritten); - if (validateWrites) { - out.println("Total number of writes validated: " + - totalWritesValidated); - out.println("Writes validated: " + - (100.0 * totalWritesValidated / numberOfKeysAdded.get()) - + " %"); - out.println("Successful validation: " + - writeValidationSuccessCount); - out.println("Unsuccessful validation: " + - writeValidationFailureCount); - } - out.println("Total Execution time: " + execTime); - out.println("***************************************************"); - - if (jsonDir != null) { - - String[][] quantileTime = - new String[FreonOps.values().length][QUANTILES + 1]; - String[] deviations = new String[FreonOps.values().length]; - String[] means = new String[FreonOps.values().length]; - for (FreonOps ops : FreonOps.values()) { - Snapshot snapshot = histograms.get(ops.ordinal()).getSnapshot(); - for (int i = 0; i <= QUANTILES; i++) { - quantileTime[ops.ordinal()][i] = DurationFormatUtils.formatDuration( - TimeUnit.NANOSECONDS - .toMillis((long) snapshot.getValue((1.0 / QUANTILES) * i)), - DURATION_FORMAT); - } - deviations[ops.ordinal()] = DurationFormatUtils.formatDuration( - TimeUnit.NANOSECONDS.toMillis((long) snapshot.getStdDev()), - DURATION_FORMAT); - means[ops.ordinal()] = DurationFormatUtils.formatDuration( - TimeUnit.NANOSECONDS.toMillis((long) snapshot.getMean()), - DURATION_FORMAT); - } - - FreonJobInfo jobInfo = new FreonJobInfo().setExecTime(execTime) - .setGitBaseRevision(VersionInfo.getRevision()) - .setMeanVolumeCreateTime(means[FreonOps.VOLUME_CREATE.ordinal()]) - .setDeviationVolumeCreateTime( - deviations[FreonOps.VOLUME_CREATE.ordinal()]) - .setTenQuantileVolumeCreateTime( - quantileTime[FreonOps.VOLUME_CREATE.ordinal()]) - .setMeanBucketCreateTime(means[FreonOps.BUCKET_CREATE.ordinal()]) - .setDeviationBucketCreateTime( - deviations[FreonOps.BUCKET_CREATE.ordinal()]) - .setTenQuantileBucketCreateTime( - quantileTime[FreonOps.BUCKET_CREATE.ordinal()]) - .setMeanKeyCreateTime(means[FreonOps.KEY_CREATE.ordinal()]) - .setDeviationKeyCreateTime(deviations[FreonOps.KEY_CREATE.ordinal()]) - .setTenQuantileKeyCreateTime( - quantileTime[FreonOps.KEY_CREATE.ordinal()]) - .setMeanKeyWriteTime(means[FreonOps.KEY_WRITE.ordinal()]) - .setDeviationKeyWriteTime(deviations[FreonOps.KEY_WRITE.ordinal()]) - .setTenQuantileKeyWriteTime( - quantileTime[FreonOps.KEY_WRITE.ordinal()]); - String jsonName = - new SimpleDateFormat("yyyyMMddHHmmss").format(Time.now()) + ".json"; - String jsonPath = jsonDir + "/" + jsonName; - FileOutputStream os = null; - try { - os = new FileOutputStream(jsonPath); - ObjectMapper mapper = new ObjectMapper(); - mapper.setVisibility(PropertyAccessor.FIELD, - JsonAutoDetect.Visibility.ANY); - ObjectWriter writer = mapper.writerWithDefaultPrettyPrinter(); - writer.writeValue(os, jobInfo); - } catch (FileNotFoundException e) { - out.println("Json File could not be created for the path: " + jsonPath); - out.println(e); - } catch (IOException e) { - out.println("Json object could not be created"); - out.println(e); - } finally { - try { - if (os != null) { - os.close(); - } - } catch (IOException e) { - LOG.warn("Could not close the output stream for json", e); - } - } - } - } - - /** - * Returns the number of volumes created. - * @return volume count. - */ - @VisibleForTesting - int getNumberOfVolumesCreated() { - return numberOfVolumesCreated.get(); - } - - /** - * Returns the number of buckets created. - * @return bucket count. - */ - @VisibleForTesting - int getNumberOfBucketsCreated() { - return numberOfBucketsCreated.get(); - } - - /** - * Returns the number of keys added. - * @return keys count. - */ - @VisibleForTesting - long getNumberOfKeysAdded() { - return numberOfKeysAdded.get(); - } - - /** - * Returns true if random validation of write is enabled. - * @return validateWrites - */ - @VisibleForTesting - boolean getValidateWrites() { - return validateWrites; - } - - /** - * Returns the number of keys validated. - * @return validated key count. - */ - @VisibleForTesting - long getTotalKeysValidated() { - return totalWritesValidated; - } - - /** - * Returns the number of successful validation. - * @return successful validation count. - */ - @VisibleForTesting - long getSuccessfulValidationCount() { - return writeValidationSuccessCount; - } - - /** - * Returns the number of unsuccessful validation. - * @return unsuccessful validation count. - */ - @VisibleForTesting - long getUnsuccessfulValidationCount() { - return writeValidationFailureCount; - } - - /** - * Returns the length of the common key value initialized. - * @return key value length initialized. - */ - @VisibleForTesting - long getKeyValueLength(){ - return keyValue.length; - } - - /** - * Wrapper to hold ozone key-value pair. - */ - private static class KeyValue { - - /** - * Bucket name associated with the key-value. - */ - private OzoneBucket bucket; - /** - * Key name associated with the key-value. - */ - private String key; - /** - * Value associated with the key-value. - */ - private byte[] value; - - /** - * Constructs a new ozone key-value pair. - * - * @param key key part - * @param value value part - */ - KeyValue(OzoneBucket bucket, String key, byte[] value) { - this.bucket = bucket; - this.key = key; - this.value = value; - } - } - - private class OfflineProcessor implements Runnable { - - private int totalBuckets; - private int totalKeys; - private String volumeName; - - OfflineProcessor(String volumeName) { - this.totalBuckets = Integer.parseInt(numOfBuckets); - this.totalKeys = Integer.parseInt(numOfKeys); - this.volumeName = volumeName; - } - - @Override - public void run() { - LOG.trace("Creating volume: {}", volumeName); - long start = System.nanoTime(); - OzoneVolume volume; - try { - objectStore.createVolume(volumeName); - long volumeCreationDuration = System.nanoTime() - start; - volumeCreationTime.getAndAdd(volumeCreationDuration); - histograms.get(FreonOps.VOLUME_CREATE.ordinal()) - .update(volumeCreationDuration); - numberOfVolumesCreated.getAndIncrement(); - volume = objectStore.getVolume(volumeName); - } catch (IOException e) { - exception = true; - LOG.error("Could not create volume", e); - return; - } - - Long threadKeyWriteTime = 0L; - for (int j = 0; j < totalBuckets; j++) { - String bucketName = "bucket-" + j + "-" + - RandomStringUtils.randomNumeric(5); - try { - LOG.trace("Creating bucket: {} in volume: {}", - bucketName, volume.getName()); - start = System.nanoTime(); - volume.createBucket(bucketName); - long bucketCreationDuration = System.nanoTime() - start; - histograms.get(FreonOps.BUCKET_CREATE.ordinal()) - .update(bucketCreationDuration); - bucketCreationTime.getAndAdd(bucketCreationDuration); - numberOfBucketsCreated.getAndIncrement(); - OzoneBucket bucket = volume.getBucket(bucketName); - for (int k = 0; k < totalKeys; k++) { - String key = "key-" + k + "-" + - RandomStringUtils.randomNumeric(5); - byte[] randomValue = - DFSUtil.string2Bytes(UUID.randomUUID().toString()); - try { - LOG.trace("Adding key: {} in bucket: {} of volume: {}", - key, bucket, volume); - long keyCreateStart = System.nanoTime(); - OzoneOutputStream os = - bucket.createKey(key, keySize, type, factor); - long keyCreationDuration = System.nanoTime() - keyCreateStart; - histograms.get(FreonOps.KEY_CREATE.ordinal()) - .update(keyCreationDuration); - keyCreationTime.getAndAdd(keyCreationDuration); - long keyWriteStart = System.nanoTime(); - os.write(keyValue); - os.write(randomValue); - os.close(); - long keyWriteDuration = System.nanoTime() - keyWriteStart; - threadKeyWriteTime += keyWriteDuration; - histograms.get(FreonOps.KEY_WRITE.ordinal()) - .update(keyWriteDuration); - totalBytesWritten.getAndAdd(keySize); - numberOfKeysAdded.getAndIncrement(); - if (validateWrites) { - byte[] value = ArrayUtils.addAll(keyValue, randomValue); - boolean validate = validationQueue.offer( - new KeyValue(bucket, key, value)); - if (validate) { - LOG.trace("Key {}, is queued for validation.", key); - } - } - } catch (Exception e) { - exception = true; - LOG.error("Exception while adding key: {} in bucket: {}" + - " of volume: {}.", key, bucket, volume, e); - } - } - } catch (Exception e) { - exception = true; - LOG.error("Exception while creating bucket: {}" + - " in volume: {}.", bucketName, volume, e); - } - } - - keyWriteTime.getAndAdd(threadKeyWriteTime); - } - - } - - private final class FreonJobInfo { - - private String status; - private String gitBaseRevision; - private String jobStartTime; - private String numOfVolumes; - private String numOfBuckets; - private String numOfKeys; - private String numOfThreads; - private String mode; - private String dataWritten; - private String execTime; - private String replicationFactor; - private String replicationType; - - private int keySize; - - private String totalThroughputPerSecond; - - private String meanVolumeCreateTime; - private String deviationVolumeCreateTime; - private String[] tenQuantileVolumeCreateTime; - - private String meanBucketCreateTime; - private String deviationBucketCreateTime; - private String[] tenQuantileBucketCreateTime; - - private String meanKeyCreateTime; - private String deviationKeyCreateTime; - private String[] tenQuantileKeyCreateTime; - - private String meanKeyWriteTime; - private String deviationKeyWriteTime; - private String[] tenQuantileKeyWriteTime; - - private FreonJobInfo() { - this.status = exception ? "Failed" : "Success"; - this.numOfVolumes = Freon.this.numOfVolumes; - this.numOfBuckets = Freon.this.numOfBuckets; - this.numOfKeys = Freon.this.numOfKeys; - this.numOfThreads = Freon.this.numOfThreads; - this.keySize = Freon.this.keySize; - this.mode = Freon.this.mode; - this.jobStartTime = Time.formatTime(Freon.this.jobStartTime); - this.replicationFactor = Freon.this.factor.name(); - this.replicationType = Freon.this.type.name(); - - long totalBytes = - Long.parseLong(numOfVolumes) * Long.parseLong(numOfBuckets) * Long - .parseLong(numOfKeys) * keySize; - this.dataWritten = getInStorageUnits((double) totalBytes); - this.totalThroughputPerSecond = getInStorageUnits( - (totalBytes * 1.0) / TimeUnit.NANOSECONDS - .toSeconds(Freon.this.keyWriteTime.get() / threadPoolSize)); - } - - private String getInStorageUnits(Double value) { - double size; - OzoneQuota.Units unit; - if ((long) (value / OzoneConsts.TB) != 0) { - size = value / OzoneConsts.TB; - unit = OzoneQuota.Units.TB; - } else if ((long) (value / OzoneConsts.GB) != 0) { - size = value / OzoneConsts.GB; - unit = OzoneQuota.Units.GB; - } else if ((long) (value / OzoneConsts.MB) != 0) { - size = value / OzoneConsts.MB; - unit = OzoneQuota.Units.MB; - } else if ((long) (value / OzoneConsts.KB) != 0) { - size = value / OzoneConsts.KB; - unit = OzoneQuota.Units.KB; - } else { - size = value; - unit = OzoneQuota.Units.BYTES; - } - return size + " " + unit; - } - - public FreonJobInfo setGitBaseRevision(String gitBaseRevisionVal) { - gitBaseRevision = gitBaseRevisionVal; - return this; - } - - public FreonJobInfo setExecTime(String execTimeVal) { - execTime = execTimeVal; - return this; - } - - public FreonJobInfo setMeanKeyWriteTime(String deviationKeyWriteTimeVal) { - this.meanKeyWriteTime = deviationKeyWriteTimeVal; - return this; - } - - public FreonJobInfo setDeviationKeyWriteTime( - String deviationKeyWriteTimeVal) { - this.deviationKeyWriteTime = deviationKeyWriteTimeVal; - return this; - } - - public FreonJobInfo setTenQuantileKeyWriteTime( - String[] tenQuantileKeyWriteTimeVal) { - this.tenQuantileKeyWriteTime = tenQuantileKeyWriteTimeVal; - return this; - } - - public FreonJobInfo setMeanKeyCreateTime(String deviationKeyWriteTimeVal) { - this.meanKeyCreateTime = deviationKeyWriteTimeVal; - return this; - } - - public FreonJobInfo setDeviationKeyCreateTime( - String deviationKeyCreateTimeVal) { - this.deviationKeyCreateTime = deviationKeyCreateTimeVal; - return this; - } - - public FreonJobInfo setTenQuantileKeyCreateTime( - String[] tenQuantileKeyCreateTimeVal) { - this.tenQuantileKeyCreateTime = tenQuantileKeyCreateTimeVal; - return this; - } - - public FreonJobInfo setMeanBucketCreateTime( - String deviationKeyWriteTimeVal) { - this.meanBucketCreateTime = deviationKeyWriteTimeVal; - return this; - } - - public FreonJobInfo setDeviationBucketCreateTime( - String deviationBucketCreateTimeVal) { - this.deviationBucketCreateTime = deviationBucketCreateTimeVal; - return this; - } - - public FreonJobInfo setTenQuantileBucketCreateTime( - String[] tenQuantileBucketCreateTimeVal) { - this.tenQuantileBucketCreateTime = tenQuantileBucketCreateTimeVal; - return this; - } - - public FreonJobInfo setMeanVolumeCreateTime( - String deviationKeyWriteTimeVal) { - this.meanVolumeCreateTime = deviationKeyWriteTimeVal; - return this; - } - - public FreonJobInfo setDeviationVolumeCreateTime( - String deviationVolumeCreateTimeVal) { - this.deviationVolumeCreateTime = deviationVolumeCreateTimeVal; - return this; - } - - public FreonJobInfo setTenQuantileVolumeCreateTime( - String[] tenQuantileVolumeCreateTimeVal) { - this.tenQuantileVolumeCreateTime = tenQuantileVolumeCreateTimeVal; - return this; - } - - public String getJobStartTime() { - return jobStartTime; - } - - public String getNumOfVolumes() { - return numOfVolumes; - } - - public String getNumOfBuckets() { - return numOfBuckets; - } - - public String getNumOfKeys() { - return numOfKeys; - } - - public String getNumOfThreads() { - return numOfThreads; - } - - public String getMode() { - return mode; - } - - public String getExecTime() { - return execTime; - } - - public String getReplicationFactor() { - return replicationFactor; - } - - public String getReplicationType() { - return replicationType; - } - - public String getStatus() { - return status; - } - - public int getKeySize() { - return keySize; - } - - public String getGitBaseRevision() { - return gitBaseRevision; - } - - public String getDataWritten() { - return dataWritten; - } - - public String getTotalThroughputPerSecond() { - return totalThroughputPerSecond; - } - - public String getMeanVolumeCreateTime() { - return meanVolumeCreateTime; - } - - public String getDeviationVolumeCreateTime() { - return deviationVolumeCreateTime; - } - - public String[] getTenQuantileVolumeCreateTime() { - return tenQuantileVolumeCreateTime; - } - - public String getMeanBucketCreateTime() { - return meanBucketCreateTime; - } - - public String getDeviationBucketCreateTime() { - return deviationBucketCreateTime; - } - - public String[] getTenQuantileBucketCreateTime() { - return tenQuantileBucketCreateTime; - } - - public String getMeanKeyCreateTime() { - return meanKeyCreateTime; - } - - public String getDeviationKeyCreateTime() { - return deviationKeyCreateTime; - } - - public String[] getTenQuantileKeyCreateTime() { - return tenQuantileKeyCreateTime; - } - - public String getMeanKeyWriteTime() { - return meanKeyWriteTime; - } - - public String getDeviationKeyWriteTime() { - return deviationKeyWriteTime; - } - - public String[] getTenQuantileKeyWriteTime() { - return tenQuantileKeyWriteTime; - } - } - - private class ProgressBar implements Runnable { - - private static final long REFRESH_INTERVAL = 1000L; - - private PrintStream stream; - private Supplier currentValue; - private long maxValue; - - ProgressBar(PrintStream stream, Supplier currentValue, - long maxValue) { - this.stream = stream; - this.currentValue = currentValue; - this.maxValue = maxValue; - } - - @Override - public void run() { - try { - stream.println(); - long value; - while ((value = currentValue.get()) < maxValue) { - print(value); - if (completed) { - break; - } - Thread.sleep(REFRESH_INTERVAL); - } - if (exception) { - stream.println(); - stream.println("Incomplete termination, " + - "check log for exception."); - } else { - print(maxValue); - } - stream.println(); - } catch (InterruptedException e) { - } - } - - /** - * Given current value prints the progress bar. - * - * @param value - */ - private void print(long value) { - stream.print('\r'); - double percent = 100.0 * value / maxValue; - StringBuilder sb = new StringBuilder(); - sb.append(" " + String.format("%.2f", percent) + "% |"); - - for (int i = 0; i <= percent; i++) { - sb.append('█'); - } - for (int j = 0; j < 100 - percent; j++) { - sb.append(' '); - } - sb.append("| "); - sb.append(value + "/" + maxValue); - long timeInSec = TimeUnit.SECONDS.convert( - System.nanoTime() - startTime, TimeUnit.NANOSECONDS); - String timeToPrint = String.format("%d:%02d:%02d", timeInSec / 3600, - (timeInSec % 3600) / 60, timeInSec % 60); - sb.append(" Time: " + timeToPrint); - stream.print(sb); - } - } - - /** - * Validates the write done in ozone cluster. - */ - private class Validator implements Runnable { - - @Override - public void run() { - while (!completed) { - try { - KeyValue kv = validationQueue.poll(5, TimeUnit.SECONDS); - if (kv != null) { - - OzoneInputStream is = kv.bucket.readKey(kv.key); - byte[] value = new byte[kv.value.length]; - int length = is.read(value); - totalWritesValidated++; - if (length == kv.value.length && Arrays.equals(value, kv.value)) { - writeValidationSuccessCount++; - } else { - writeValidationFailureCount++; - LOG.warn("Data validation error for key {}/{}/{}", - kv.bucket.getVolumeName(), kv.bucket, kv.key); - LOG.warn("Expected: {}, Actual: {}", - DFSUtil.bytes2String(kv.value), - DFSUtil.bytes2String(value)); - } - } - } catch (IOException | InterruptedException ex) { - LOG.error("Exception while validating write: " + ex.getMessage()); - } - } - } - } } diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java new file mode 100644 index 00000000000..ee4cc87818c --- /dev/null +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java @@ -0,0 +1,1038 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.freon; + +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; + +import org.apache.hadoop.hdds.cli.HddsVersionProvider; +import org.apache.hadoop.hdds.client.OzoneQuota; +import org.apache.hadoop.hdds.client.ReplicationFactor; +import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.OzoneVolume; +import org.apache.hadoop.ozone.client.io.OzoneInputStream; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.VersionInfo; + +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Snapshot; +import com.codahale.metrics.UniformReservoir; +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; +import com.google.common.annotations.VisibleForTesting; +import static java.lang.Math.min; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.time.DurationFormatUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import picocli.CommandLine.Command; +import picocli.CommandLine.Option; +import picocli.CommandLine.ParentCommand; + +/** + * Data generator tool to generate as much keys as possible. + */ +@Command(name = "randomkeys", + aliases = "rk", + description = "Generate volumes/buckets and put generated keys.", + versionProvider = HddsVersionProvider.class, + mixinStandardHelpOptions = true, + showDefaultValues = true) +public final class RandomKeyGenerator implements Callable { + + @ParentCommand + private Freon freon; + + enum FreonOps { + VOLUME_CREATE, + BUCKET_CREATE, + KEY_CREATE, + KEY_WRITE + } + + private static final String RATIS = "ratis"; + + private static final String DURATION_FORMAT = "HH:mm:ss,SSS"; + + private static final int QUANTILES = 10; + + private static final Logger LOG = + LoggerFactory.getLogger(RandomKeyGenerator.class); + + private boolean completed = false; + private boolean exception = false; + + @Option(names = "--numOfThreads", + description = "number of threads to be launched for the run", + defaultValue = "10") + private int numOfThreads = 10; + + @Option(names = "--numOfVolumes", + description = "specifies number of Volumes to be created in offline mode", + defaultValue = "10") + private int numOfVolumes = 10; + + @Option(names = "--numOfBuckets", + description = "specifies number of Buckets to be created per Volume", + defaultValue = "1000") + private int numOfBuckets = 1000; + + @Option( + names = "--numOfKeys", + description = "specifies number of Keys to be created per Bucket", + defaultValue = "500000" + ) + private int numOfKeys = 500000; + + @Option( + names = "--keySize", + description = "Specifies the size of Key in bytes to be created", + defaultValue = "10240" + ) + private int keySize = 10240; + + @Option( + names = "--json", + description = "directory where json is created." + ) + private String jsonDir; + + @Option( + names = "--replicationType", + description = "Replication type (STAND_ALONE, RATIS)", + defaultValue = "STAND_ALONE" + ) + private ReplicationType type = ReplicationType.STAND_ALONE; + + @Option( + names = "--factor", + description = "Replication factor (ONE, THREE)", + defaultValue = "ONE" + ) + private ReplicationFactor factor = ReplicationFactor.ONE; + + private int threadPoolSize; + private byte[] keyValue = null; + + private boolean validateWrites; + + private OzoneClient ozoneClient; + private ObjectStore objectStore; + private ExecutorService processor; + + private long startTime; + private long jobStartTime; + + private AtomicLong volumeCreationTime; + private AtomicLong bucketCreationTime; + private AtomicLong keyCreationTime; + private AtomicLong keyWriteTime; + + private AtomicLong totalBytesWritten; + + private AtomicInteger numberOfVolumesCreated; + private AtomicInteger numberOfBucketsCreated; + private AtomicLong numberOfKeysAdded; + + private Long totalWritesValidated; + private Long writeValidationSuccessCount; + private Long writeValidationFailureCount; + + private BlockingQueue validationQueue; + private ArrayList histograms = new ArrayList<>(); + + private OzoneConfiguration ozoneConfiguration; + + RandomKeyGenerator() { + } + + @VisibleForTesting + RandomKeyGenerator(OzoneConfiguration ozoneConfiguration) { + this.ozoneConfiguration = ozoneConfiguration; + } + + public void init(OzoneConfiguration configuration) throws IOException { + startTime = System.nanoTime(); + jobStartTime = System.currentTimeMillis(); + volumeCreationTime = new AtomicLong(); + bucketCreationTime = new AtomicLong(); + keyCreationTime = new AtomicLong(); + keyWriteTime = new AtomicLong(); + totalBytesWritten = new AtomicLong(); + numberOfVolumesCreated = new AtomicInteger(); + numberOfBucketsCreated = new AtomicInteger(); + numberOfKeysAdded = new AtomicLong(); + ozoneClient = OzoneClientFactory.getClient(configuration); + objectStore = ozoneClient.getObjectStore(); + for (FreonOps ops : FreonOps.values()) { + histograms.add(ops.ordinal(), new Histogram(new UniformReservoir())); + } + } + + @Override + public Void call() throws Exception { + if (ozoneConfiguration != null) { + init(ozoneConfiguration); + } else { + init(freon.createOzoneConfiguration()); + } + + keyValue = + DFSUtil.string2Bytes(RandomStringUtils.randomAscii(keySize - 36)); + + LOG.info("Number of Threads: " + numOfThreads); + threadPoolSize = + min(numOfVolumes, numOfThreads); + processor = Executors.newFixedThreadPool(threadPoolSize); + addShutdownHook(); + + LOG.info("Number of Volumes: {}.", numOfVolumes); + LOG.info("Number of Buckets per Volume: {}.", numOfBuckets); + LOG.info("Number of Keys per Bucket: {}.", numOfKeys); + LOG.info("Key size: {} bytes", keySize); + for (int i = 0; i < numOfVolumes; i++) { + String volume = "vol-" + i + "-" + + RandomStringUtils.randomNumeric(5); + processor.submit(new OfflineProcessor(volume)); + } + + Thread validator = null; + if (validateWrites) { + totalWritesValidated = 0L; + writeValidationSuccessCount = 0L; + writeValidationFailureCount = 0L; + + validationQueue = + new ArrayBlockingQueue<>(numOfThreads); + validator = new Thread(new Validator()); + validator.start(); + LOG.info("Data validation is enabled."); + } + Thread progressbar = getProgressBarThread(); + LOG.info("Starting progress bar Thread."); + progressbar.start(); + processor.shutdown(); + processor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); + completed = true; + progressbar.join(); + if (validateWrites) { + validator.join(); + } + ozoneClient.close(); + return null; + } + + private void parseOptions(CommandLine cmdLine) { + if (keySize < 1024) { + throw new IllegalArgumentException( + "keySize can not be less than 1024 bytes"); + } + + } + + /** + * Adds ShutdownHook to print statistics. + */ + private void addShutdownHook() { + Runtime.getRuntime().addShutdownHook( + new Thread(() -> printStats(System.out))); + } + + private Thread getProgressBarThread() { + Supplier currentValue; + long maxValue; + + currentValue = () -> numberOfKeysAdded.get(); + maxValue = numOfVolumes * + numOfBuckets * + numOfKeys; + + Thread progressBarThread = new Thread( + new ProgressBar(System.out, currentValue, maxValue)); + progressBarThread.setName("ProgressBar"); + return progressBarThread; + } + + /** + * Prints stats of {@link Freon} run to the PrintStream. + * + * @param out PrintStream + */ + private void printStats(PrintStream out) { + long endTime = System.nanoTime() - startTime; + String execTime = DurationFormatUtils + .formatDuration(TimeUnit.NANOSECONDS.toMillis(endTime), + DURATION_FORMAT); + + long volumeTime = TimeUnit.NANOSECONDS.toMillis(volumeCreationTime.get()) + / threadPoolSize; + String prettyAverageVolumeTime = + DurationFormatUtils.formatDuration(volumeTime, DURATION_FORMAT); + + long bucketTime = TimeUnit.NANOSECONDS.toMillis(bucketCreationTime.get()) + / threadPoolSize; + String prettyAverageBucketTime = + DurationFormatUtils.formatDuration(bucketTime, DURATION_FORMAT); + + long averageKeyCreationTime = + TimeUnit.NANOSECONDS.toMillis(keyCreationTime.get()) + / threadPoolSize; + String prettyAverageKeyCreationTime = DurationFormatUtils + .formatDuration(averageKeyCreationTime, DURATION_FORMAT); + + long averageKeyWriteTime = + TimeUnit.NANOSECONDS.toMillis(keyWriteTime.get()) / threadPoolSize; + String prettyAverageKeyWriteTime = DurationFormatUtils + .formatDuration(averageKeyWriteTime, DURATION_FORMAT); + + out.println(); + out.println("***************************************************"); + out.println("Status: " + (exception ? "Failed" : "Success")); + out.println("Git Base Revision: " + VersionInfo.getRevision()); + out.println("Number of Volumes created: " + numberOfVolumesCreated); + out.println("Number of Buckets created: " + numberOfBucketsCreated); + out.println("Number of Keys added: " + numberOfKeysAdded); + out.println("Ratis replication factor: " + factor.name()); + out.println("Ratis replication type: " + type.name()); + out.println( + "Average Time spent in volume creation: " + prettyAverageVolumeTime); + out.println( + "Average Time spent in bucket creation: " + prettyAverageBucketTime); + out.println( + "Average Time spent in key creation: " + prettyAverageKeyCreationTime); + out.println( + "Average Time spent in key write: " + prettyAverageKeyWriteTime); + out.println("Total bytes written: " + totalBytesWritten); + if (validateWrites) { + out.println("Total number of writes validated: " + + totalWritesValidated); + out.println("Writes validated: " + + (100.0 * totalWritesValidated / numberOfKeysAdded.get()) + + " %"); + out.println("Successful validation: " + + writeValidationSuccessCount); + out.println("Unsuccessful validation: " + + writeValidationFailureCount); + } + out.println("Total Execution time: " + execTime); + out.println("***************************************************"); + + if (jsonDir != null) { + + String[][] quantileTime = + new String[FreonOps.values().length][QUANTILES + 1]; + String[] deviations = new String[FreonOps.values().length]; + String[] means = new String[FreonOps.values().length]; + for (FreonOps ops : FreonOps.values()) { + Snapshot snapshot = histograms.get(ops.ordinal()).getSnapshot(); + for (int i = 0; i <= QUANTILES; i++) { + quantileTime[ops.ordinal()][i] = DurationFormatUtils.formatDuration( + TimeUnit.NANOSECONDS + .toMillis((long) snapshot.getValue((1.0 / QUANTILES) * i)), + DURATION_FORMAT); + } + deviations[ops.ordinal()] = DurationFormatUtils.formatDuration( + TimeUnit.NANOSECONDS.toMillis((long) snapshot.getStdDev()), + DURATION_FORMAT); + means[ops.ordinal()] = DurationFormatUtils.formatDuration( + TimeUnit.NANOSECONDS.toMillis((long) snapshot.getMean()), + DURATION_FORMAT); + } + + FreonJobInfo jobInfo = new FreonJobInfo().setExecTime(execTime) + .setGitBaseRevision(VersionInfo.getRevision()) + .setMeanVolumeCreateTime(means[FreonOps.VOLUME_CREATE.ordinal()]) + .setDeviationVolumeCreateTime( + deviations[FreonOps.VOLUME_CREATE.ordinal()]) + .setTenQuantileVolumeCreateTime( + quantileTime[FreonOps.VOLUME_CREATE.ordinal()]) + .setMeanBucketCreateTime(means[FreonOps.BUCKET_CREATE.ordinal()]) + .setDeviationBucketCreateTime( + deviations[FreonOps.BUCKET_CREATE.ordinal()]) + .setTenQuantileBucketCreateTime( + quantileTime[FreonOps.BUCKET_CREATE.ordinal()]) + .setMeanKeyCreateTime(means[FreonOps.KEY_CREATE.ordinal()]) + .setDeviationKeyCreateTime(deviations[FreonOps.KEY_CREATE.ordinal()]) + .setTenQuantileKeyCreateTime( + quantileTime[FreonOps.KEY_CREATE.ordinal()]) + .setMeanKeyWriteTime(means[FreonOps.KEY_WRITE.ordinal()]) + .setDeviationKeyWriteTime(deviations[FreonOps.KEY_WRITE.ordinal()]) + .setTenQuantileKeyWriteTime( + quantileTime[FreonOps.KEY_WRITE.ordinal()]); + String jsonName = + new SimpleDateFormat("yyyyMMddHHmmss").format(Time.now()) + ".json"; + String jsonPath = jsonDir + "/" + jsonName; + FileOutputStream os = null; + try { + os = new FileOutputStream(jsonPath); + ObjectMapper mapper = new ObjectMapper(); + mapper.setVisibility(PropertyAccessor.FIELD, + JsonAutoDetect.Visibility.ANY); + ObjectWriter writer = mapper.writerWithDefaultPrettyPrinter(); + writer.writeValue(os, jobInfo); + } catch (FileNotFoundException e) { + out.println("Json File could not be created for the path: " + jsonPath); + out.println(e); + } catch (IOException e) { + out.println("Json object could not be created"); + out.println(e); + } finally { + try { + if (os != null) { + os.close(); + } + } catch (IOException e) { + LOG.warn("Could not close the output stream for json", e); + } + } + } + } + + /** + * Returns the number of volumes created. + * + * @return volume count. + */ + @VisibleForTesting + int getNumberOfVolumesCreated() { + return numberOfVolumesCreated.get(); + } + + /** + * Returns the number of buckets created. + * + * @return bucket count. + */ + @VisibleForTesting + int getNumberOfBucketsCreated() { + return numberOfBucketsCreated.get(); + } + + /** + * Returns the number of keys added. + * + * @return keys count. + */ + @VisibleForTesting + long getNumberOfKeysAdded() { + return numberOfKeysAdded.get(); + } + + /** + * Returns true if random validation of write is enabled. + * + * @return validateWrites + */ + @VisibleForTesting + boolean getValidateWrites() { + return validateWrites; + } + + /** + * Returns the number of keys validated. + * + * @return validated key count. + */ + @VisibleForTesting + long getTotalKeysValidated() { + return totalWritesValidated; + } + + /** + * Returns the number of successful validation. + * + * @return successful validation count. + */ + @VisibleForTesting + long getSuccessfulValidationCount() { + return writeValidationSuccessCount; + } + + /** + * Returns the number of unsuccessful validation. + * + * @return unsuccessful validation count. + */ + @VisibleForTesting + long getUnsuccessfulValidationCount() { + return writeValidationFailureCount; + } + + /** + * Returns the length of the common key value initialized. + * + * @return key value length initialized. + */ + @VisibleForTesting + long getKeyValueLength() { + return keyValue.length; + } + + /** + * Wrapper to hold ozone key-value pair. + */ + private static class KeyValue { + + /** + * Bucket name associated with the key-value. + */ + private OzoneBucket bucket; + /** + * Key name associated with the key-value. + */ + private String key; + /** + * Value associated with the key-value. + */ + private byte[] value; + + /** + * Constructs a new ozone key-value pair. + * + * @param key key part + * @param value value part + */ + KeyValue(OzoneBucket bucket, String key, byte[] value) { + this.bucket = bucket; + this.key = key; + this.value = value; + } + } + + private class OfflineProcessor implements Runnable { + + private int totalBuckets; + private int totalKeys; + private String volumeName; + + OfflineProcessor(String volumeName) { + this.totalBuckets = numOfBuckets; + this.totalKeys = numOfKeys; + this.volumeName = volumeName; + } + + @Override + public void run() { + LOG.trace("Creating volume: {}", volumeName); + long start = System.nanoTime(); + OzoneVolume volume; + try { + objectStore.createVolume(volumeName); + long volumeCreationDuration = System.nanoTime() - start; + volumeCreationTime.getAndAdd(volumeCreationDuration); + histograms.get(FreonOps.VOLUME_CREATE.ordinal()) + .update(volumeCreationDuration); + numberOfVolumesCreated.getAndIncrement(); + volume = objectStore.getVolume(volumeName); + } catch (IOException e) { + exception = true; + LOG.error("Could not create volume", e); + return; + } + + Long threadKeyWriteTime = 0L; + for (int j = 0; j < totalBuckets; j++) { + String bucketName = "bucket-" + j + "-" + + RandomStringUtils.randomNumeric(5); + try { + LOG.trace("Creating bucket: {} in volume: {}", + bucketName, volume.getName()); + start = System.nanoTime(); + volume.createBucket(bucketName); + long bucketCreationDuration = System.nanoTime() - start; + histograms.get(FreonOps.BUCKET_CREATE.ordinal()) + .update(bucketCreationDuration); + bucketCreationTime.getAndAdd(bucketCreationDuration); + numberOfBucketsCreated.getAndIncrement(); + OzoneBucket bucket = volume.getBucket(bucketName); + for (int k = 0; k < totalKeys; k++) { + String key = "key-" + k + "-" + + RandomStringUtils.randomNumeric(5); + byte[] randomValue = + DFSUtil.string2Bytes(UUID.randomUUID().toString()); + try { + LOG.trace("Adding key: {} in bucket: {} of volume: {}", + key, bucket, volume); + long keyCreateStart = System.nanoTime(); + OzoneOutputStream os = + bucket.createKey(key, keySize, type, factor); + long keyCreationDuration = System.nanoTime() - keyCreateStart; + histograms.get(FreonOps.KEY_CREATE.ordinal()) + .update(keyCreationDuration); + keyCreationTime.getAndAdd(keyCreationDuration); + long keyWriteStart = System.nanoTime(); + os.write(keyValue); + os.write(randomValue); + os.close(); + long keyWriteDuration = System.nanoTime() - keyWriteStart; + threadKeyWriteTime += keyWriteDuration; + histograms.get(FreonOps.KEY_WRITE.ordinal()) + .update(keyWriteDuration); + totalBytesWritten.getAndAdd(keySize); + numberOfKeysAdded.getAndIncrement(); + if (validateWrites) { + byte[] value = ArrayUtils.addAll(keyValue, randomValue); + boolean validate = validationQueue.offer( + new KeyValue(bucket, key, value)); + if (validate) { + LOG.trace("Key {}, is queued for validation.", key); + } + } + } catch (Exception e) { + exception = true; + LOG.error("Exception while adding key: {} in bucket: {}" + + " of volume: {}.", key, bucket, volume, e); + } + } + } catch (Exception e) { + exception = true; + LOG.error("Exception while creating bucket: {}" + + " in volume: {}.", bucketName, volume, e); + } + } + + keyWriteTime.getAndAdd(threadKeyWriteTime); + } + + } + + private final class FreonJobInfo { + + private String status; + private String gitBaseRevision; + private String jobStartTime; + private int numOfVolumes; + private int numOfBuckets; + private int numOfKeys; + private int numOfThreads; + private String dataWritten; + private String execTime; + private String replicationFactor; + private String replicationType; + + private int keySize; + + private String totalThroughputPerSecond; + + private String meanVolumeCreateTime; + private String deviationVolumeCreateTime; + private String[] tenQuantileVolumeCreateTime; + + private String meanBucketCreateTime; + private String deviationBucketCreateTime; + private String[] tenQuantileBucketCreateTime; + + private String meanKeyCreateTime; + private String deviationKeyCreateTime; + private String[] tenQuantileKeyCreateTime; + + private String meanKeyWriteTime; + private String deviationKeyWriteTime; + private String[] tenQuantileKeyWriteTime; + + private FreonJobInfo() { + this.status = exception ? "Failed" : "Success"; + this.numOfVolumes = RandomKeyGenerator.this.numOfVolumes; + this.numOfBuckets = RandomKeyGenerator.this.numOfBuckets; + this.numOfKeys = RandomKeyGenerator.this.numOfKeys; + this.numOfThreads = RandomKeyGenerator.this.numOfThreads; + this.keySize = RandomKeyGenerator.this.keySize; + this.jobStartTime = Time.formatTime(RandomKeyGenerator.this.jobStartTime); + this.replicationFactor = RandomKeyGenerator.this.factor.name(); + this.replicationType = RandomKeyGenerator.this.type.name(); + + long totalBytes = + (long) numOfVolumes * numOfBuckets * numOfKeys * keySize; + this.dataWritten = getInStorageUnits((double) totalBytes); + this.totalThroughputPerSecond = getInStorageUnits( + (totalBytes * 1.0) / TimeUnit.NANOSECONDS + .toSeconds( + RandomKeyGenerator.this.keyWriteTime.get() / threadPoolSize)); + } + + private String getInStorageUnits(Double value) { + double size; + OzoneQuota.Units unit; + if ((long) (value / OzoneConsts.TB) != 0) { + size = value / OzoneConsts.TB; + unit = OzoneQuota.Units.TB; + } else if ((long) (value / OzoneConsts.GB) != 0) { + size = value / OzoneConsts.GB; + unit = OzoneQuota.Units.GB; + } else if ((long) (value / OzoneConsts.MB) != 0) { + size = value / OzoneConsts.MB; + unit = OzoneQuota.Units.MB; + } else if ((long) (value / OzoneConsts.KB) != 0) { + size = value / OzoneConsts.KB; + unit = OzoneQuota.Units.KB; + } else { + size = value; + unit = OzoneQuota.Units.BYTES; + } + return size + " " + unit; + } + + public FreonJobInfo setGitBaseRevision(String gitBaseRevisionVal) { + gitBaseRevision = gitBaseRevisionVal; + return this; + } + + public FreonJobInfo setExecTime(String execTimeVal) { + execTime = execTimeVal; + return this; + } + + public FreonJobInfo setMeanKeyWriteTime(String deviationKeyWriteTimeVal) { + this.meanKeyWriteTime = deviationKeyWriteTimeVal; + return this; + } + + public FreonJobInfo setDeviationKeyWriteTime( + String deviationKeyWriteTimeVal) { + this.deviationKeyWriteTime = deviationKeyWriteTimeVal; + return this; + } + + public FreonJobInfo setTenQuantileKeyWriteTime( + String[] tenQuantileKeyWriteTimeVal) { + this.tenQuantileKeyWriteTime = tenQuantileKeyWriteTimeVal; + return this; + } + + public FreonJobInfo setMeanKeyCreateTime(String deviationKeyWriteTimeVal) { + this.meanKeyCreateTime = deviationKeyWriteTimeVal; + return this; + } + + public FreonJobInfo setDeviationKeyCreateTime( + String deviationKeyCreateTimeVal) { + this.deviationKeyCreateTime = deviationKeyCreateTimeVal; + return this; + } + + public FreonJobInfo setTenQuantileKeyCreateTime( + String[] tenQuantileKeyCreateTimeVal) { + this.tenQuantileKeyCreateTime = tenQuantileKeyCreateTimeVal; + return this; + } + + public FreonJobInfo setMeanBucketCreateTime( + String deviationKeyWriteTimeVal) { + this.meanBucketCreateTime = deviationKeyWriteTimeVal; + return this; + } + + public FreonJobInfo setDeviationBucketCreateTime( + String deviationBucketCreateTimeVal) { + this.deviationBucketCreateTime = deviationBucketCreateTimeVal; + return this; + } + + public FreonJobInfo setTenQuantileBucketCreateTime( + String[] tenQuantileBucketCreateTimeVal) { + this.tenQuantileBucketCreateTime = tenQuantileBucketCreateTimeVal; + return this; + } + + public FreonJobInfo setMeanVolumeCreateTime( + String deviationKeyWriteTimeVal) { + this.meanVolumeCreateTime = deviationKeyWriteTimeVal; + return this; + } + + public FreonJobInfo setDeviationVolumeCreateTime( + String deviationVolumeCreateTimeVal) { + this.deviationVolumeCreateTime = deviationVolumeCreateTimeVal; + return this; + } + + public FreonJobInfo setTenQuantileVolumeCreateTime( + String[] tenQuantileVolumeCreateTimeVal) { + this.tenQuantileVolumeCreateTime = tenQuantileVolumeCreateTimeVal; + return this; + } + + public String getJobStartTime() { + return jobStartTime; + } + + public int getNumOfVolumes() { + return numOfVolumes; + } + + public int getNumOfBuckets() { + return numOfBuckets; + } + + public int getNumOfKeys() { + return numOfKeys; + } + + public int getNumOfThreads() { + return numOfThreads; + } + + public String getExecTime() { + return execTime; + } + + public String getReplicationFactor() { + return replicationFactor; + } + + public String getReplicationType() { + return replicationType; + } + + public String getStatus() { + return status; + } + + public int getKeySize() { + return keySize; + } + + public String getGitBaseRevision() { + return gitBaseRevision; + } + + public String getDataWritten() { + return dataWritten; + } + + public String getTotalThroughputPerSecond() { + return totalThroughputPerSecond; + } + + public String getMeanVolumeCreateTime() { + return meanVolumeCreateTime; + } + + public String getDeviationVolumeCreateTime() { + return deviationVolumeCreateTime; + } + + public String[] getTenQuantileVolumeCreateTime() { + return tenQuantileVolumeCreateTime; + } + + public String getMeanBucketCreateTime() { + return meanBucketCreateTime; + } + + public String getDeviationBucketCreateTime() { + return deviationBucketCreateTime; + } + + public String[] getTenQuantileBucketCreateTime() { + return tenQuantileBucketCreateTime; + } + + public String getMeanKeyCreateTime() { + return meanKeyCreateTime; + } + + public String getDeviationKeyCreateTime() { + return deviationKeyCreateTime; + } + + public String[] getTenQuantileKeyCreateTime() { + return tenQuantileKeyCreateTime; + } + + public String getMeanKeyWriteTime() { + return meanKeyWriteTime; + } + + public String getDeviationKeyWriteTime() { + return deviationKeyWriteTime; + } + + public String[] getTenQuantileKeyWriteTime() { + return tenQuantileKeyWriteTime; + } + } + + private class ProgressBar implements Runnable { + + private static final long REFRESH_INTERVAL = 1000L; + + private PrintStream stream; + private Supplier currentValue; + private long maxValue; + + ProgressBar(PrintStream stream, Supplier currentValue, + long maxValue) { + this.stream = stream; + this.currentValue = currentValue; + this.maxValue = maxValue; + } + + @Override + public void run() { + try { + stream.println(); + long value; + while ((value = currentValue.get()) < maxValue) { + print(value); + if (completed) { + break; + } + Thread.sleep(REFRESH_INTERVAL); + } + if (exception) { + stream.println(); + stream.println("Incomplete termination, " + + "check log for exception."); + } else { + print(maxValue); + } + stream.println(); + } catch (InterruptedException e) { + } + } + + /** + * Given current value prints the progress bar. + * + * @param value + */ + private void print(long value) { + stream.print('\r'); + double percent = 100.0 * value / maxValue; + StringBuilder sb = new StringBuilder(); + sb.append(" " + String.format("%.2f", percent) + "% |"); + + for (int i = 0; i <= percent; i++) { + sb.append('█'); + } + for (int j = 0; j < 100 - percent; j++) { + sb.append(' '); + } + sb.append("| "); + sb.append(value + "/" + maxValue); + long timeInSec = TimeUnit.SECONDS.convert( + System.nanoTime() - startTime, TimeUnit.NANOSECONDS); + String timeToPrint = String.format("%d:%02d:%02d", timeInSec / 3600, + (timeInSec % 3600) / 60, timeInSec % 60); + sb.append(" Time: " + timeToPrint); + stream.print(sb); + } + } + + /** + * Validates the write done in ozone cluster. + */ + private class Validator implements Runnable { + + @Override + public void run() { + while (!completed) { + try { + KeyValue kv = validationQueue.poll(5, TimeUnit.SECONDS); + if (kv != null) { + + OzoneInputStream is = kv.bucket.readKey(kv.key); + byte[] value = new byte[kv.value.length]; + int length = is.read(value); + totalWritesValidated++; + if (length == kv.value.length && Arrays.equals(value, kv.value)) { + writeValidationSuccessCount++; + } else { + writeValidationFailureCount++; + LOG.warn("Data validation error for key {}/{}/{}", + kv.bucket.getVolumeName(), kv.bucket, kv.key); + LOG.warn("Expected: {}, Actual: {}", + DFSUtil.bytes2String(kv.value), + DFSUtil.bytes2String(value)); + } + } + } catch (IOException | InterruptedException ex) { + LOG.error("Exception while validating write: " + ex.getMessage()); + } + } + } + } + + @VisibleForTesting + public void setNumOfVolumes(int numOfVolumes) { + this.numOfVolumes = numOfVolumes; + } + + @VisibleForTesting + public void setNumOfBuckets(int numOfBuckets) { + this.numOfBuckets = numOfBuckets; + } + + @VisibleForTesting + public void setNumOfKeys(int numOfKeys) { + this.numOfKeys = numOfKeys; + } + + @VisibleForTesting + public void setNumOfThreads(int numOfThreads) { + this.numOfThreads = numOfThreads; + } + + @VisibleForTesting + public void setKeySize(int keySize) { + this.keySize = keySize; + } + + @VisibleForTesting + public void setType(ReplicationType type) { + this.type = type; + } + + @VisibleForTesting + public void setFactor(ReplicationFactor factor) { + this.factor = factor; + } + + @VisibleForTesting + public void setValidateWrites(boolean validateWrites) { + this.validateWrites = validateWrites; + } +}