diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java index e6888b9d40b..6e1e02ccdbe 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java @@ -64,7 +64,6 @@ import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; import com.google.common.annotations.VisibleForTesting; -import static java.lang.Math.min; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.time.DurationFormatUtils; import org.slf4j.Logger; @@ -102,6 +101,8 @@ public final class RandomKeyGenerator implements Callable { private static final int QUANTILES = 10; + private static final int CHECK_INTERVAL_MILLIS = 5000; + private byte[] keyValueBuffer = null; private static final String DIGEST_ALGORITHM = "MD5"; @@ -180,7 +181,7 @@ public final class RandomKeyGenerator implements Callable { private OzoneClient ozoneClient; private ObjectStore objectStore; - private ExecutorService processor; + private ExecutorService executor; private long startTime; private long jobStartTime; @@ -259,9 +260,8 @@ public final class RandomKeyGenerator implements Callable { } LOG.info("Number of Threads: " + numOfThreads); - threadPoolSize = - min(numOfVolumes, numOfThreads); - processor = Executors.newFixedThreadPool(threadPoolSize); + threadPoolSize = numOfThreads; + executor = Executors.newFixedThreadPool(threadPoolSize); addShutdownHook(); LOG.info("Number of Volumes: {}.", numOfVolumes); @@ -270,9 +270,8 @@ public final class RandomKeyGenerator implements Callable { LOG.info("Key size: {} bytes", keySize); LOG.info("Buffer size: {} bytes", bufferSize); for (int i = 0; i < numOfVolumes; i++) { - String volume = "vol-" + i + "-" + - RandomStringUtils.randomNumeric(5); - processor.submit(new OfflineProcessor(volume)); + String volumeName = "vol-" + i + "-" + RandomStringUtils.randomNumeric(5); + executor.submit(new VolumeProcessor(volumeName)); } Thread validator = null; @@ -301,8 +300,17 @@ public final class RandomKeyGenerator implements Callable { progressbar.start(); - processor.shutdown(); - processor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); + // wait until all keys are added or exception occurred. + while ((numberOfKeysAdded.get() != numOfVolumes * numOfBuckets * numOfKeys) + && exception == null) { + try { + Thread.sleep(CHECK_INTERVAL_MILLIS); + } catch (InterruptedException e) { + throw e; + } + } + executor.shutdown(); + executor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); completed = true; if (exception != null) { @@ -571,15 +579,10 @@ public final class RandomKeyGenerator implements Callable { } } - private class OfflineProcessor implements Runnable { - - private int totalBuckets; - private int totalKeys; + private class VolumeProcessor implements Runnable { private String volumeName; - OfflineProcessor(String volumeName) { - this.totalBuckets = numOfBuckets; - this.totalKeys = numOfKeys; + VolumeProcessor(String volumeName) { this.volumeName = volumeName; } @@ -604,88 +607,118 @@ public final class RandomKeyGenerator implements Callable { return; } - Long threadKeyWriteTime = 0L; - for (int j = 0; j < totalBuckets; j++) { - String bucketName = "bucket-" + j + "-" + + for (int i = 0; i < numOfBuckets; i++) { + String bucketName = "bucket-" + i + "-" + RandomStringUtils.randomNumeric(5); - try { - LOG.trace("Creating bucket: {} in volume: {}", - bucketName, volume.getName()); - start = System.nanoTime(); - try (Scope scope = GlobalTracer.get().buildSpan("createBucket") - .startActive(true)) { - volume.createBucket(bucketName); - long bucketCreationDuration = System.nanoTime() - start; - histograms.get(FreonOps.BUCKET_CREATE.ordinal()) - .update(bucketCreationDuration); - bucketCreationTime.getAndAdd(bucketCreationDuration); - numberOfBucketsCreated.getAndIncrement(); - } - OzoneBucket bucket = volume.getBucket(bucketName); - for (int k = 0; k < totalKeys; k++) { - String key = "key-" + k + "-" + - RandomStringUtils.randomNumeric(5); - byte[] randomValue = - DFSUtil.string2Bytes(UUID.randomUUID().toString()); - try { - LOG.trace("Adding key: {} in bucket: {} of volume: {}", - key, bucket, volume); - long keyCreateStart = System.nanoTime(); - try (Scope scope = GlobalTracer.get().buildSpan("createKey") - .startActive(true)) { - OzoneOutputStream os = - bucket - .createKey(key, keySize, type, factor, new HashMap<>()); - long keyCreationDuration = System.nanoTime() - keyCreateStart; - histograms.get(FreonOps.KEY_CREATE.ordinal()) - .update(keyCreationDuration); - keyCreationTime.getAndAdd(keyCreationDuration); - long keyWriteStart = System.nanoTime(); - try (Scope writeScope = GlobalTracer.get() - .buildSpan("writeKeyData") - .startActive(true)) { - for (long nrRemaining = keySize - randomValue.length; - nrRemaining > 0; nrRemaining -= bufferSize) { - int curSize = (int)Math.min(bufferSize, nrRemaining); - os.write(keyValueBuffer, 0, curSize); - } - os.write(randomValue); - os.close(); - } - - long keyWriteDuration = System.nanoTime() - keyWriteStart; - - threadKeyWriteTime += keyWriteDuration; - histograms.get(FreonOps.KEY_WRITE.ordinal()) - .update(keyWriteDuration); - totalBytesWritten.getAndAdd(keySize); - numberOfKeysAdded.getAndIncrement(); - } - if (validateWrites) { - MessageDigest tmpMD = (MessageDigest)commonInitialMD.clone(); - tmpMD.update(randomValue); - boolean validate = validationQueue.offer( - new KeyValidate(bucket, key, tmpMD.digest())); - if (validate) { - LOG.trace("Key {}, is queued for validation.", key); - } - } - } catch (Exception e) { - exception = e; - LOG.error("Exception while adding key: {} in bucket: {}" + - " of volume: {}.", key, bucket, volume, e); - } - } - } catch (Exception e) { - exception = e; - LOG.error("Exception while creating bucket: {}" + - " in volume: {}.", bucketName, volume, e); - } + BucketProcessor bp = new BucketProcessor(volume, bucketName); + executor.submit(bp); } + } + } - keyWriteTime.getAndAdd(threadKeyWriteTime); + private class BucketProcessor implements Runnable { + private OzoneVolume volume; + private String bucketName; + + BucketProcessor(OzoneVolume volume, String bucketName) { + this.volume = volume; + this.bucketName = bucketName; } + @Override + @SuppressFBWarnings("REC_CATCH_EXCEPTION") + public void run() { + LOG.trace("Creating bucket: {} in volume: {}", + bucketName, volume.getName()); + long start = System.nanoTime(); + OzoneBucket bucket; + try (Scope scope = GlobalTracer.get().buildSpan("createBucket") + .startActive(true)) { + volume.createBucket(bucketName); + long bucketCreationDuration = System.nanoTime() - start; + histograms.get(FreonOps.BUCKET_CREATE.ordinal()) + .update(bucketCreationDuration); + bucketCreationTime.getAndAdd(bucketCreationDuration); + numberOfBucketsCreated.getAndIncrement(); + + bucket = volume.getBucket(bucketName); + } catch (IOException e) { + exception = e; + LOG.error("Could not create bucket ", e); + return; + } + + for (int i = 0; i < numOfKeys; i++) { + String keyName = "key-" + i + "-" + RandomStringUtils.randomNumeric(5); + KeyProcessor kp = new KeyProcessor(bucket, keyName); + executor.submit(kp); + } + } + } + + private class KeyProcessor implements Runnable { + private OzoneBucket bucket; + private String keyName; + + KeyProcessor(OzoneBucket bucket, String keyName) { + this.bucket = bucket; + this.keyName = keyName; + } + + @Override + @SuppressFBWarnings("REC_CATCH_EXCEPTION") + public void run() { + String bucketName = bucket.getName(); + String volumeName = bucket.getVolumeName(); + LOG.trace("Adding key: {} in bucket: {} of volume: {}", + keyName, bucketName, volumeName); + byte[] randomValue = DFSUtil.string2Bytes(UUID.randomUUID().toString()); + try { + long keyCreateStart = System.nanoTime(); + try (Scope scope = GlobalTracer.get().buildSpan("createKey") + .startActive(true)) { + OzoneOutputStream os = bucket.createKey(keyName, keySize, type, + factor, new HashMap<>()); + long keyCreationDuration = System.nanoTime() - keyCreateStart; + histograms.get(FreonOps.KEY_CREATE.ordinal()) + .update(keyCreationDuration); + keyCreationTime.getAndAdd(keyCreationDuration); + + long keyWriteStart = System.nanoTime(); + try (Scope writeScope = GlobalTracer.get().buildSpan("writeKeyData") + .startActive(true)) { + for (long nrRemaining = keySize - randomValue.length; + nrRemaining > 0; nrRemaining -= bufferSize) { + int curSize = (int)Math.min(bufferSize, nrRemaining); + os.write(keyValueBuffer, 0, curSize); + } + os.write(randomValue); + os.close(); + + long keyWriteDuration = System.nanoTime() - keyWriteStart; + histograms.get(FreonOps.KEY_WRITE.ordinal()) + .update(keyWriteDuration); + keyWriteTime.getAndAdd(keyWriteDuration); + totalBytesWritten.getAndAdd(keySize); + numberOfKeysAdded.getAndIncrement(); + } + } + + if (validateWrites) { + MessageDigest tmpMD = (MessageDigest)commonInitialMD.clone(); + tmpMD.update(randomValue); + boolean validate = validationQueue.offer( + new KeyValidate(bucket, keyName, tmpMD.digest())); + if (validate) { + LOG.trace("Key {}, is queued for validation.", keyName); + } + } + } catch (Exception e) { + exception = e; + LOG.error("Exception while adding key: {} in bucket: {}" + + " of volume: {}.", keyName, bucketName, volumeName, e); + } + } } private final class FreonJobInfo { @@ -1028,4 +1061,9 @@ public final class RandomKeyGenerator implements Callable { public void setValidateWrites(boolean validateWrites) { this.validateWrites = validateWrites; } + + @VisibleForTesting + public int getThreadPoolSize() { + return threadPoolSize; + } } diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java index c0873d2df61..748972eafe5 100644 --- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java +++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java @@ -127,4 +127,19 @@ public class TestRandomKeyGenerator { Assert.assertEquals(1, randomKeyGenerator.getNumberOfKeysAdded()); Assert.assertEquals(1, randomKeyGenerator.getSuccessfulValidationCount()); } + + @Test + public void testThreadPoolSize() throws Exception { + RandomKeyGenerator randomKeyGenerator = + new RandomKeyGenerator((OzoneConfiguration) cluster.getConf()); + randomKeyGenerator.setNumOfVolumes(1); + randomKeyGenerator.setNumOfBuckets(1); + randomKeyGenerator.setNumOfKeys(1); + randomKeyGenerator.setFactor(ReplicationFactor.THREE); + randomKeyGenerator.setType(ReplicationType.RATIS); + randomKeyGenerator.setNumOfThreads(10); + randomKeyGenerator.call(); + Assert.assertEquals(10, randomKeyGenerator.getThreadPoolSize()); + Assert.assertEquals(1, randomKeyGenerator.getNumberOfKeysAdded()); + } }