HubSpot Backport: HBASE-27053 IOException during caching of uncompressed block to the block cache (#4610)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
Reviewed-by: wenwj0 <wenweijian2@huawei.com>

commit e4ce632d59
parent 7023542cce
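
The failure mode behind this fix, shown as a standalone sketch in plain java.nio (toy code under assumed names such as StaleChecksumTailSketch and unpackInto; this is not HBase's HFileBlock or ByteBuffAllocator API): when the buffer receiving an unpacked block reserves a checksum tail that decompression never writes, a dirty pooled buffer leaves stale bytes in that tail, so two unpacks of the same block are not byte-identical and block cache validation fails. The hunks below drop the checksum right after verification and size the unpack buffer to exactly header + uncompressed data.

import java.nio.ByteBuffer;
import java.util.Arrays;

// Toy illustration of the bug pattern: an output buffer that spans a tail region the
// producer never writes keeps whatever bytes the previous pool user left behind.
public class StaleChecksumTailSketch {

  // "unpack" a payload into a pooled buffer, but expose payload + reservedTail bytes,
  // the way the old allocateBuffer() reserved room for checksums it never filled
  static byte[] unpackInto(ByteBuffer pooled, byte[] payload, int reservedTail) {
    pooled.clear();
    pooled.put(payload);                         // only the payload is written
    pooled.position(0);
    pooled.limit(payload.length + reservedTail); // but the exposed block spans the tail too
    byte[] out = new byte[pooled.remaining()];
    pooled.get(out);
    return out;
  }

  public static void main(String[] args) {
    byte[] payload = { 1, 2, 3, 4 };
    int tail = 4; // stands in for the checksum bytes that unpack never produces

    ByteBuffer pooled = ByteBuffer.allocate(payload.length + tail);
    pooled.put(new byte[] { 9, 9, 9, 9, 9, 9, 9, 9 }); // a previous user dirtied the buffer

    byte[] first = unpackInto(pooled, payload, tail);
    pooled.put(payload.length, (byte) 42);             // pool reuse scribbles on the tail
    byte[] second = unpackInto(pooled, payload, tail);

    System.out.println(Arrays.equals(first, second));  // false: the stale tails differ
    System.out.println(Arrays.equals(                  // true once the tail is excluded,
      Arrays.copyOf(first, payload.length),            // which is what this patch does
      Arrays.copyOf(second, payload.length)));
  }
}

The new TestHFileBlockUnpack test at the bottom of this diff exercises the real code path in the same spirit: it dirties the allocator's pooled buffers, reads the same block twice, and asserts the two unpacked buffers are identical.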
HFileBlock.java

@@ -489,18 +489,8 @@ public class HFileBlock implements Cacheable {
    * @return the buffer with header skipped and checksum omitted.
    */
   public ByteBuff getBufferWithoutHeader() {
-    return this.getBufferWithoutHeader(false);
-  }
-
-  /**
-   * Returns a buffer that does not include the header or checksum.
-   * @param withChecksum to indicate whether include the checksum or not.
-   * @return the buffer with header skipped and checksum omitted.
-   */
-  public ByteBuff getBufferWithoutHeader(boolean withChecksum) {
     ByteBuff dup = getBufferReadOnly();
-    int delta = withChecksum ? 0 : totalChecksumBytes();
-    return dup.position(headerSize()).limit(buf.limit() - delta).slice();
+    return dup.position(headerSize()).slice();
   }

   /**

@@ -566,19 +556,21 @@ public class HFileBlock implements Cacheable {
       sanityCheckAssertion(dup.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
     }

-    int cksumBytes = totalChecksumBytes();
-    int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes;
-    if (dup.limit() != expectedBufLimit) {
-      throw new AssertionError("Expected limit " + expectedBufLimit + ", got " + dup.limit());
+    if (dup.limit() != onDiskDataSizeWithHeader) {
+      throw new AssertionError(
+        "Expected limit " + onDiskDataSizeWithHeader + ", got " + dup.limit());
     }

     // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
     // block's header, so there are two sensible values for buffer capacity.
     int hdrSize = headerSize();
     dup.rewind();
-    if (dup.remaining() != expectedBufLimit && dup.remaining() != expectedBufLimit + hdrSize) {
-      throw new AssertionError("Invalid buffer capacity: " + dup.remaining() +
-        ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
+    if (
+      dup.remaining() != onDiskDataSizeWithHeader
+        && dup.remaining() != onDiskDataSizeWithHeader + hdrSize
+    ) {
+      throw new AssertionError("Invalid buffer capacity: " + dup.remaining() + ", expected "
+        + onDiskDataSizeWithHeader + " or " + (onDiskDataSizeWithHeader + hdrSize));
     }
   }

@@ -636,19 +628,20 @@ public class HFileBlock implements Cacheable {
       return this;
     }

-    ByteBuff newBuf = allocateBuffer(); // allocates space for the decompressed block
+    ByteBuff newBuf = allocateBufferForUnpacking(); // allocates space for the decompressed block
     HFileBlock unpacked = shallowClone(this, newBuf);
     boolean succ = false;
     try {
       HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA
         ? reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
       // Create a duplicated buffer without the header part.
+      int headerSize = this.headerSize();
       ByteBuff dup = this.buf.duplicate();
-      dup.position(this.headerSize());
+      dup.position(headerSize);
       dup = dup.slice();
       // Decode the dup into unpacked#buf
-      ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
-        unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(true), dup);
+      ctx.prepareDecoding(unpacked.getOnDiskDataSizeWithHeader() - headerSize,
+        unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(), dup);
       succ = true;
       return unpacked;
     } finally {

@@ -663,10 +656,9 @@ public class HFileBlock implements Cacheable {
    * from the existing buffer. Does not change header fields.
    * Reserve room to keep checksum bytes too.
    */
-  private ByteBuff allocateBuffer() {
-    int cksumBytes = totalChecksumBytes();
+  private ByteBuff allocateBufferForUnpacking() {
     int headerSize = headerSize();
-    int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
+    int capacityNeeded = headerSize + uncompressedSizeWithoutHeader;

     ByteBuff source = buf.duplicate();
     ByteBuff newBuf = allocator.allocate(capacityNeeded);

@@ -685,9 +677,8 @@ public class HFileBlock implements Cacheable {
    * calculated heuristic, not tracked attribute of the block.
    */
   public boolean isUnpacked() {
-    final int cksumBytes = totalChecksumBytes();
     final int headerSize = headerSize();
-    final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
+    final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader;
     final int bufCapacity = buf.remaining();
     return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
   }

@@ -1755,6 +1746,9 @@ public class HFileBlock implements Cacheable {
       if (verifyChecksum && !validateChecksum(offset, curBlock, hdrSize)) {
         return null;
       }
+      // remove checksum from buffer now that it's verified
+      int sizeWithoutChecksum = curBlock.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
+      curBlock.limit(sizeWithoutChecksum);
       long duration = System.currentTimeMillis() - startTime;
       if (updateMetrics) {
         HFile.updateReadLatency(duration, pread);
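
For orientation before the test changes, the size bookkeeping the hunks above rely on, restated as a toy java.nio sketch (field names mirror the diff; the class name TrimChecksumSketch and the concrete sizes are made up for illustration): onDiskDataSizeWithHeader counts header + data but not the checksum tail, so trimming the buffer limit to it right after checksum validation is what removes the checksums, and every later capacity check can drop the totalChecksumBytes() term.

import java.nio.ByteBuffer;

// Toy model of the trimming added in the reader path above (curBlock.limit(sizeWithoutChecksum)).
public class TrimChecksumSketch {
  public static void main(String[] args) {
    int headerSize = 33;       // illustrative sizes only
    int dataSize = 400;
    int checksumBytes = 16;
    int onDiskDataSizeWithHeader = headerSize + dataSize;  // checksum tail not included

    // buffer as read from disk: header + data + checksums
    ByteBuffer curBlock = ByteBuffer.allocate(onDiskDataSizeWithHeader + checksumBytes);

    // ... checksum validation succeeds, then:
    // "remove checksum from buffer now that it's verified"
    curBlock.limit(onDiskDataSizeWithHeader);

    // from here on, the readable window is header + data only, so the unpack buffer can be
    // sized headerSize + uncompressedSizeWithoutHeader with no checksum term
    System.out.println(curBlock.limit() == headerSize + dataSize); // prints true
  }
}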
TestChecksum.java

@@ -162,11 +162,13 @@ public class TestChecksum {
       HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
       assertTrue(!b.isSharedMem());

+      ByteBuff bufferWithChecksum = getBufferWithChecksum(b);
+
       // verify SingleByteBuff checksum.
-      verifySBBCheckSum(b.getBufferReadOnly());
+      verifySBBCheckSum(bufferWithChecksum);

       // verify MultiByteBuff checksum.
-      verifyMBBCheckSum(b.getBufferReadOnly());
+      verifyMBBCheckSum(bufferWithChecksum);

       ByteBuff data = b.getBufferWithoutHeader();
       for (int i = 0; i < intCount; i++) {

@@ -183,8 +185,29 @@ public class TestChecksum {
   }

   /**
-   * Introduce checksum failures and check that we can still read
-   * the data
+   * HFileBlock buffer does not include checksum because it is discarded after verifying upon
+   * reading from disk. We artificially add a checksum onto the buffer for use in testing that
+   * ChecksumUtil.validateChecksum works for SingleByteBuff and MultiByteBuff in
+   * {@link #verifySBBCheckSum(ByteBuff)} and {@link #verifyMBBCheckSum(ByteBuff)}
    */
+  private ByteBuff getBufferWithChecksum(HFileBlock block) throws IOException {
+    ByteBuff buf = block.getBufferReadOnly();
+
+    int numBytes =
+      (int) ChecksumUtil.numBytes(buf.remaining(), block.getHFileContext().getBytesPerChecksum());
+    byte[] checksum = new byte[numBytes];
+    ChecksumUtil.generateChecksums(buf.array(), 0, buf.limit(), checksum, 0,
+      block.getHFileContext().getChecksumType(), block.getBytesPerChecksum());
+
+    ByteBuff bufWithChecksum = ByteBuffAllocator.HEAP.allocate(buf.limit() + numBytes);
+    bufWithChecksum.put(buf.array(), 0, buf.limit());
+    bufWithChecksum.put(checksum);
+
+    return bufWithChecksum.rewind();
+  }
+
+  /**
+   * Introduce checksum failures and check that we can still read the data
+   */
   @Test
   public void testChecksumCorruption() throws IOException {
TestHFileBlock.java

@@ -687,14 +687,13 @@ public class TestHFileBlock {
         // verifies that the unpacked value read back off disk matches the unpacked value
         // generated before writing to disk.
         HFileBlock newBlock = b.unpack(meta, hbr);
-        // b's buffer has header + data + checksum while
-        // expectedContents have header + data only
+        // neither b's unpacked nor the expectedContents have checksum.
+        // they should be identical
         ByteBuff bufRead = newBlock.getBufferReadOnly();
         ByteBuffer bufExpected = expectedContents.get(i);
-        byte[] tmp = new byte[bufRead.limit() - newBlock.totalChecksumBytes()];
-        bufRead.get(tmp, 0, tmp.length);
-        boolean bytesAreCorrect = Bytes.compareTo(tmp, 0, tmp.length, bufExpected.array(),
-          bufExpected.arrayOffset(), bufExpected.limit()) == 0;
+        byte[] bytesRead = bufRead.toBytes();
+        boolean bytesAreCorrect = Bytes.compareTo(bytesRead, 0, bytesRead.length,
+          bufExpected.array(), bufExpected.arrayOffset(), bufExpected.limit()) == 0;
         String wrongBytesMsg = "";

         if (!bytesAreCorrect) {

@@ -704,10 +703,8 @@ public class TestHFileBlock {
             + algo + ", pread=" + pread
             + ", cacheOnWrite=" + cacheOnWrite + "):\n";
           wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(),
-            bufExpected.arrayOffset(), Math.min(32 + 10, bufExpected.limit()))
-            + ", actual:\n"
-            + Bytes.toStringBinary(bufRead.array(),
-              bufRead.arrayOffset(), Math.min(32 + 10, bufRead.limit()));
+            bufExpected.arrayOffset(), Math.min(32 + 10, bufExpected.limit())) + ", actual:\n"
+            + Bytes.toStringBinary(bytesRead, 0, Math.min(32 + 10, bytesRead.length));
           if (detailedLogging) {
             LOG.warn("expected header" +
               HFileBlock.toStringHeader(new SingleByteBuff(bufExpected)) +
TestHFileBlockUnpack.java (new file)

@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({ IOTests.class, MediumTests.class })
+public class TestHFileBlockUnpack {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestHFileBlockUnpack.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  // repetition gives us some chance to get a good compression ratio
+  private static float CHANCE_TO_REPEAT = 0.6f;
+
+  private static final int MIN_ALLOCATION_SIZE = 10 * 1024;
+
+  ByteBuffAllocator allocator;
+
+  @Rule
+  public TestName name = new TestName();
+  private FileSystem fs;
+
+  @Before
+  public void setUp() throws Exception {
+    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
+    Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
+    conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, MIN_ALLOCATION_SIZE);
+    allocator = ByteBuffAllocator.create(conf, true);
+  }
+
+  /**
+   * It's important that if you read and unpack the same HFileBlock twice, it results in an
+   * identical buffer each time. Otherwise we end up with validation failures in block cache, since
+   * contents may not match if the same block is cached twice. See
+   * https://issues.apache.org/jira/browse/HBASE-27053
+   */
+  @Test
+  public void itUnpacksIdenticallyEachTime() throws IOException {
+    Path path = new Path(TEST_UTIL.getDataTestDir(), name.getMethodName());
+    int totalSize = createTestBlock(path);
+
+    // Allocate a bunch of random buffers, so we can be sure that unpack will only have "dirty"
+    // buffers to choose from when allocating itself.
+    Random random = new Random();
+    byte[] temp = new byte[HConstants.DEFAULT_BLOCKSIZE];
+    List<ByteBuff> buffs = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      ByteBuff buff = allocator.allocate(HConstants.DEFAULT_BLOCKSIZE);
+      random.nextBytes(temp);
+      buff.put(temp);
+      buffs.add(buff);
+    }
+
+    buffs.forEach(ByteBuff::release);
+
+    // read the same block twice. we should expect the underlying buffer below to
+    // be identical each time
+    HFileBlockWrapper blockOne = readBlock(path, totalSize);
+    HFileBlockWrapper blockTwo = readBlock(path, totalSize);
+
+    // first check size fields
+    assertEquals(blockOne.original.getOnDiskSizeWithHeader(),
+      blockTwo.original.getOnDiskSizeWithHeader());
+    assertEquals(blockOne.original.getUncompressedSizeWithoutHeader(),
+      blockTwo.original.getUncompressedSizeWithoutHeader());
+
+    // next check packed buffers
+    assertBuffersEqual(blockOne.original.getBufferWithoutHeader(),
+      blockTwo.original.getBufferWithoutHeader(),
+      blockOne.original.getOnDiskDataSizeWithHeader() - blockOne.original.headerSize());
+
+    // now check unpacked buffers. prior to HBASE-27053, this would fail because
+    // the unpacked buffer would include extra space for checksums at the end that was not written.
+    // so the checksum space would be filled with random junk when re-using pooled buffers.
+    assertBuffersEqual(blockOne.unpacked.getBufferWithoutHeader(),
+      blockTwo.unpacked.getBufferWithoutHeader(),
+      blockOne.original.getUncompressedSizeWithoutHeader());
+  }
+
+  private void assertBuffersEqual(ByteBuff bufferOne, ByteBuff bufferTwo, int expectedSize) {
+    assertEquals(expectedSize, bufferOne.limit());
+    assertEquals(expectedSize, bufferTwo.limit());
+    assertEquals(0,
+      ByteBuff.compareTo(bufferOne, 0, bufferOne.limit(), bufferTwo, 0, bufferTwo.limit()));
+  }
+
+  /**
+   * If the block on disk size is less than {@link ByteBuffAllocator}'s min allocation size, that
+   * block will be allocated to heap regardless of desire for off-heap. After de-compressing the
+   * block, the new size may now exceed the min allocation size. This test ensures that those
+   * de-compressed blocks, which will be allocated off-heap, are properly marked as
+   * {@link HFileBlock#isSharedMem()} == true See https://issues.apache.org/jira/browse/HBASE-27170
+   */
+  @Test
+  public void itUsesSharedMemoryIfUnpackedBlockExceedsMinAllocationSize() throws IOException {
+    Path path = new Path(TEST_UTIL.getDataTestDir(), name.getMethodName());
+    int totalSize = createTestBlock(path);
+    HFileBlockWrapper blockFromHFile = readBlock(path, totalSize);
+
+    assertFalse("expected hfile block to NOT be unpacked", blockFromHFile.original.isUnpacked());
+    assertFalse("expected hfile block to NOT use shared memory",
+      blockFromHFile.original.isSharedMem());
+
+    assertTrue(
+      "expected generated block size " + blockFromHFile.original.getOnDiskSizeWithHeader()
+        + " to be less than " + MIN_ALLOCATION_SIZE,
+      blockFromHFile.original.getOnDiskSizeWithHeader() < MIN_ALLOCATION_SIZE);
+    assertTrue(
+      "expected generated block uncompressed size "
+        + blockFromHFile.original.getUncompressedSizeWithoutHeader() + " to be more than "
+        + MIN_ALLOCATION_SIZE,
+      blockFromHFile.original.getUncompressedSizeWithoutHeader() > MIN_ALLOCATION_SIZE);
+
+    assertTrue("expected unpacked block to be unpacked", blockFromHFile.unpacked.isUnpacked());
+    assertTrue("expected unpacked block to use shared memory",
+      blockFromHFile.unpacked.isSharedMem());
+  }
+
+  private final static class HFileBlockWrapper {
+    private final HFileBlock original;
+    private final HFileBlock unpacked;
+
+    private HFileBlockWrapper(HFileBlock original, HFileBlock unpacked) {
+      this.original = original;
+      this.unpacked = unpacked;
+    }
+  }
+
+  private HFileBlockWrapper readBlock(Path path, int totalSize) throws IOException {
+    try (FSDataInputStream is = fs.open(path)) {
+      HFileContext meta =
+        new HFileContextBuilder().withHBaseCheckSum(true).withCompression(Compression.Algorithm.GZ)
+          .withIncludesMvcc(false).withIncludesTags(false).build();
+      ReaderContext context =
+        new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
+          .withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build();
+      HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(context, meta, allocator);
+      hbr.setDataBlockEncoder(NoOpDataBlockEncoder.INSTANCE);
+      hbr.setIncludesMemStoreTS(false);
+      HFileBlock blockFromHFile = hbr.readBlockData(0, -1, false, false, false);
+      blockFromHFile.sanityCheck();
+      return new HFileBlockWrapper(blockFromHFile, blockFromHFile.unpack(meta, hbr));
+    }
+  }
+
+  private int createTestBlock(Path path) throws IOException {
+    HFileContext meta =
+      new HFileContextBuilder().withCompression(Compression.Algorithm.GZ).withIncludesMvcc(false)
+        .withIncludesTags(false).withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build();
+
+    int totalSize;
+    try (FSDataOutputStream os = fs.create(path)) {
+      HFileBlock.Writer hbw = new HFileBlock.Writer(NoOpDataBlockEncoder.INSTANCE, meta, allocator);
+      hbw.startWriting(BlockType.DATA);
+      writeTestKeyValues(hbw, MIN_ALLOCATION_SIZE - 1);
+      hbw.writeHeaderAndData(os);
+      totalSize = hbw.getOnDiskSizeWithHeader();
+      assertTrue(
+        "expected generated block size " + totalSize + " to be less than " + MIN_ALLOCATION_SIZE,
+        totalSize < MIN_ALLOCATION_SIZE);
+    }
+    return totalSize;
+  }
+
+  static int writeTestKeyValues(HFileBlock.Writer hbw, int desiredSize) throws IOException {
+    Random random = new Random(42);
+
+    byte[] family = new byte[] { 1 };
+    int rowKey = 0;
+    int qualifier = 0;
+    int value = 0;
+    long timestamp = 0;
+
+    int totalSize = 0;
+
+    // go until just up to the limit. compression should bring the total on-disk size under
+    while (totalSize < desiredSize) {
+      rowKey = maybeIncrement(random, rowKey);
+      qualifier = maybeIncrement(random, qualifier);
+      value = maybeIncrement(random, value);
+      timestamp = maybeIncrement(random, (int) timestamp);
+
+      KeyValue keyValue = new KeyValue(Bytes.toBytes(rowKey), family, Bytes.toBytes(qualifier),
+        timestamp, Bytes.toBytes(value));
+      hbw.write(keyValue);
+      totalSize += keyValue.getLength();
+    }
+
+    return totalSize;
+  }
+
+  private static int maybeIncrement(Random random, int value) {
+    if (random.nextFloat() < CHANCE_TO_REPEAT) {
+      return value;
+    }
+    return value + 1;
+  }
+
+}