diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java index b0461cb1084..e6888b9d40b 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java @@ -24,10 +24,8 @@ import java.io.IOException; import java.io.PrintStream; import java.text.SimpleDateFormat; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.UUID; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -67,7 +65,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; import com.google.common.annotations.VisibleForTesting; import static java.lang.Math.min; -import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.time.DurationFormatUtils; import org.slf4j.Logger; @@ -75,6 +72,8 @@ import org.slf4j.LoggerFactory; import picocli.CommandLine.Command; import picocli.CommandLine.Option; import picocli.CommandLine.ParentCommand; +import java.util.concurrent.LinkedBlockingQueue; +import java.security.MessageDigest; /** * Data generator tool to generate as much keys as possible. @@ -103,6 +102,12 @@ public final class RandomKeyGenerator implements Callable { private static final int QUANTILES = 10; + private byte[] keyValueBuffer = null; + + private static final String DIGEST_ALGORITHM = "MD5"; + // A common initial MesssageDigest for each key without its UUID + private MessageDigest commonInitialMD = null; + private static final Logger LOG = LoggerFactory.getLogger(RandomKeyGenerator.class); @@ -136,7 +141,20 @@ public final class RandomKeyGenerator implements Callable { description = "Specifies the size of Key in bytes to be created", defaultValue = "10240" ) - private int keySize = 10240; + private long keySize = 10240; + + @Option( + names = "--validateWrites", + description = "Specifies whether to validate keys after writing" + ) + private boolean validateWrites = false; + + @Option( + names = "--bufferSize", + description = "Specifies the buffer size while writing", + defaultValue = "4096" + ) + private int bufferSize = 4096; @Option( names = "--json", @@ -159,9 +177,6 @@ public final class RandomKeyGenerator implements Callable { private ReplicationFactor factor = ReplicationFactor.ONE; private int threadPoolSize; - private byte[] keyValue = null; - - private boolean validateWrites; private OzoneClient ozoneClient; private ObjectStore objectStore; @@ -185,7 +200,7 @@ public final class RandomKeyGenerator implements Callable { private Long writeValidationSuccessCount; private Long writeValidationFailureCount; - private BlockingQueue validationQueue; + private BlockingQueue validationQueue; private ArrayList histograms = new ArrayList<>(); private OzoneConfiguration ozoneConfiguration; @@ -228,8 +243,20 @@ public final class RandomKeyGenerator implements Callable { init(freon.createOzoneConfiguration()); } - keyValue = - DFSUtil.string2Bytes(RandomStringUtils.randomAscii(keySize - 36)); + keyValueBuffer = DFSUtil.string2Bytes( + RandomStringUtils.randomAscii(bufferSize)); + + // Compute the common initial digest for all keys without their UUID + if (validateWrites) { + commonInitialMD = DigestUtils.getDigest(DIGEST_ALGORITHM); + int uuidLength = UUID.randomUUID().toString().length(); + keySize = Math.max(uuidLength, keySize); + for (long nrRemaining = keySize - uuidLength; nrRemaining > 0; + nrRemaining -= bufferSize) { + int curSize = (int)Math.min(bufferSize, nrRemaining); + commonInitialMD.update(keyValueBuffer, 0, curSize); + } + } LOG.info("Number of Threads: " + numOfThreads); threadPoolSize = @@ -241,6 +268,7 @@ public final class RandomKeyGenerator implements Callable { LOG.info("Number of Buckets per Volume: {}.", numOfBuckets); LOG.info("Number of Keys per Bucket: {}.", numOfKeys); LOG.info("Key size: {} bytes", keySize); + LOG.info("Buffer size: {} bytes", bufferSize); for (int i = 0; i < numOfVolumes; i++) { String volume = "vol-" + i + "-" + RandomStringUtils.randomNumeric(5); @@ -253,8 +281,7 @@ public final class RandomKeyGenerator implements Callable { writeValidationSuccessCount = 0L; writeValidationFailureCount = 0L; - validationQueue = - new ArrayBlockingQueue<>(numOfThreads); + validationQueue = new LinkedBlockingQueue<>(); validator = new Thread(new Validator()); validator.start(); LOG.info("Data validation is enabled."); @@ -512,43 +539,35 @@ public final class RandomKeyGenerator implements Callable { } /** - * Returns the length of the common key value initialized. - * - * @return key value length initialized. + * Wrapper to hold ozone keyValidate entry. */ - @VisibleForTesting - long getKeyValueLength() { - return keyValue.length; - } - - /** - * Wrapper to hold ozone key-value pair. - */ - private static class KeyValue { - + private static class KeyValidate { /** - * Bucket name associated with the key-value. + * Bucket name. */ private OzoneBucket bucket; - /** - * Key name associated with the key-value. - */ - private String key; - /** - * Value associated with the key-value. - */ - private byte[] value; /** - * Constructs a new ozone key-value pair. - * - * @param key key part - * @param value value part + * Key name. */ - KeyValue(OzoneBucket bucket, String key, byte[] value) { + private String keyName; + + /** + * Digest of this key's full value. + */ + private byte[] digest; + + /** + * Constructs a new ozone keyValidate. + * + * @param bucket bucket part + * @param keyName key part + * @param keyName digest of this key's full value + */ + KeyValidate(OzoneBucket bucket, String keyName, byte[] digest) { this.bucket = bucket; - this.key = key; - this.value = value; + this.keyName = keyName; + this.digest = digest; } } @@ -625,7 +644,11 @@ public final class RandomKeyGenerator implements Callable { try (Scope writeScope = GlobalTracer.get() .buildSpan("writeKeyData") .startActive(true)) { - os.write(keyValue); + for (long nrRemaining = keySize - randomValue.length; + nrRemaining > 0; nrRemaining -= bufferSize) { + int curSize = (int)Math.min(bufferSize, nrRemaining); + os.write(keyValueBuffer, 0, curSize); + } os.write(randomValue); os.close(); } @@ -639,9 +662,10 @@ public final class RandomKeyGenerator implements Callable { numberOfKeysAdded.getAndIncrement(); } if (validateWrites) { - byte[] value = ArrayUtils.addAll(keyValue, randomValue); + MessageDigest tmpMD = (MessageDigest)commonInitialMD.clone(); + tmpMD.update(randomValue); boolean validate = validationQueue.offer( - new KeyValue(bucket, key, value)); + new KeyValidate(bucket, key, tmpMD.digest())); if (validate) { LOG.trace("Key {}, is queued for validation.", key); } @@ -678,7 +702,8 @@ public final class RandomKeyGenerator implements Callable { private String replicationFactor; private String replicationType; - private int keySize; + private long keySize; + private int bufferSize; private String totalThroughputPerSecond; @@ -705,6 +730,7 @@ public final class RandomKeyGenerator implements Callable { this.numOfKeys = RandomKeyGenerator.this.numOfKeys; this.numOfThreads = RandomKeyGenerator.this.numOfThreads; this.keySize = RandomKeyGenerator.this.keySize; + this.bufferSize = RandomKeyGenerator.this.bufferSize; this.jobStartTime = Time.formatTime(RandomKeyGenerator.this.jobStartTime); this.replicationFactor = RandomKeyGenerator.this.factor.name(); this.replicationType = RandomKeyGenerator.this.type.name(); @@ -856,10 +882,14 @@ public final class RandomKeyGenerator implements Callable { return status; } - public int getKeySize() { + public long getKeySize() { return keySize; } + public int getBufferSize() { + return bufferSize; + } + public String getGitBaseRevision() { return gitBaseRevision; } @@ -925,28 +955,32 @@ public final class RandomKeyGenerator implements Callable { * Validates the write done in ozone cluster. */ private class Validator implements Runnable { - @Override public void run() { - while (!completed) { - try { - KeyValue kv = validationQueue.poll(5, TimeUnit.SECONDS); - if (kv != null) { + DigestUtils dig = new DigestUtils(DIGEST_ALGORITHM); - OzoneInputStream is = kv.bucket.readKey(kv.key); - byte[] value = new byte[kv.value.length]; - int length = is.read(value); + while (true) { + if (completed && validationQueue.isEmpty()) { + return; + } + + try { + KeyValidate kv = validationQueue.poll(5, TimeUnit.SECONDS); + if (kv != null) { + OzoneInputStream is = kv.bucket.readKey(kv.keyName); + dig.getMessageDigest().reset(); + byte[] curDigest = dig.digest(is); totalWritesValidated++; - if (length == kv.value.length && Arrays.equals(value, kv.value)) { + if (MessageDigest.isEqual(kv.digest, curDigest)) { writeValidationSuccessCount++; } else { writeValidationFailureCount++; LOG.warn("Data validation error for key {}/{}/{}", - kv.bucket.getVolumeName(), kv.bucket, kv.key); + kv.bucket.getVolumeName(), kv.bucket, kv.keyName); LOG.warn("Expected checksum: {}, Actual checksum: {}", - DigestUtils.md5Hex(kv.value), - DigestUtils.md5Hex(value)); + kv.digest, curDigest); } + is.close(); } } catch (IOException | InterruptedException ex) { LOG.error("Exception while validating write: " + ex.getMessage()); @@ -976,7 +1010,7 @@ public final class RandomKeyGenerator implements Callable { } @VisibleForTesting - public void setKeySize(int keySize) { + public void setKeySize(long keySize) { this.keySize = keySize; } diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java index e5bb8ae80f6..c0873d2df61 100644 --- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java +++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java @@ -73,7 +73,6 @@ public class TestRandomKeyGenerator { Assert.assertEquals(2, randomKeyGenerator.getNumberOfVolumesCreated()); Assert.assertEquals(10, randomKeyGenerator.getNumberOfBucketsCreated()); Assert.assertEquals(100, randomKeyGenerator.getNumberOfKeysAdded()); - Assert.assertEquals(10240 - 36, randomKeyGenerator.getKeyValueLength()); } @Test @@ -109,4 +108,23 @@ public class TestRandomKeyGenerator { Assert.assertEquals(10, randomKeyGenerator.getNumberOfBucketsCreated()); Assert.assertEquals(100, randomKeyGenerator.getNumberOfKeysAdded()); } + + @Test + public void bigFileThan2GB() throws Exception { + RandomKeyGenerator randomKeyGenerator = + new RandomKeyGenerator((OzoneConfiguration) cluster.getConf()); + randomKeyGenerator.setNumOfVolumes(1); + randomKeyGenerator.setNumOfBuckets(1); + randomKeyGenerator.setNumOfKeys(1); + randomKeyGenerator.setNumOfThreads(1); + randomKeyGenerator.setKeySize(10L + Integer.MAX_VALUE); + randomKeyGenerator.setFactor(ReplicationFactor.THREE); + randomKeyGenerator.setType(ReplicationType.RATIS); + randomKeyGenerator.setValidateWrites(true); + randomKeyGenerator.call(); + Assert.assertEquals(1, randomKeyGenerator.getNumberOfVolumesCreated()); + Assert.assertEquals(1, randomKeyGenerator.getNumberOfBucketsCreated()); + Assert.assertEquals(1, randomKeyGenerator.getNumberOfKeysAdded()); + Assert.assertEquals(1, randomKeyGenerator.getSuccessfulValidationCount()); + } }