HBASE-4218 Data Block Encoding of KeyValues (aka delta encoding / prefix compression) - files used for testing
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1223021 13f79535-47bb-0310-9956-ffa450edef68
parent cd47ea0562
commit 73e8383359
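The three test files below exercise the delta encoders. For context, here is a
minimal, illustrative sketch of the prefix-compression idea (not code from this
commit; the class and sample keys are made up): sorted keys tend to share long
byte prefixes, so each key can be stored as the length of the prefix it shares
with its predecessor plus the remaining suffix.

import java.util.Arrays;
import java.util.List;

public class PrefixCompressionSketch {
  // Length of the longest common prefix of two byte arrays.
  static int commonPrefix(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length), i = 0;
    while (i < n && a[i] == b[i]) {
      i++;
    }
    return i;
  }

  public static void main(String[] args) {
    List<byte[]> sortedKeys = Arrays.asList(
        "row-0001/cf:qual1".getBytes(),
        "row-0001/cf:qual2".getBytes(),
        "row-0002/cf:qual1".getBytes());
    byte[] previous = new byte[0];
    for (byte[] key : sortedKeys) {
      int shared = commonPrefix(previous, key);
      // A real encoder would write a varint prefix length plus the suffix
      // bytes to an output stream instead of printing them.
      System.out.println("prefixLength=" + shared + " suffix="
          + new String(key, shared, key.length - shared));
      previous = key;
    }
  }
}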
org/apache/hadoop/hbase/io/encoding/RedundantKVGenerator.java
@@ -0,0 +1,290 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.io.WritableUtils;

/**
 * Generates lists of KeyValues with controlled redundancy, useful for testing
 * data block encoding and compression.
 */
public class RedundantKVGenerator {
  // row settings
  static int DEFAULT_NUMBER_OF_ROW_PREFIXES = 10;
  static int DEFAULT_AVERAGE_PREFIX_LENGTH = 6;
  static int DEFAULT_PREFIX_LENGTH_VARIANCE = 3;
  static int DEFAULT_AVERAGE_SUFFIX_LENGTH = 3;
  static int DEFAULT_SUFFIX_LENGTH_VARIANCE = 3;
  static int DEFAULT_NUMBER_OF_ROW = 500;

  // qualifier
  static float DEFAULT_CHANCE_FOR_SAME_QUALIFIER = 0.5f;
  static float DEFAULT_CHANCE_FOR_SIMILIAR_QUALIFIER = 0.4f;
  static int DEFAULT_AVERAGE_QUALIFIER_LENGTH = 9;
  static int DEFAULT_QUALIFIER_LENGTH_VARIANCE = 3;

  static int DEFAULT_COLUMN_FAMILY_LENGTH = 9;
  static int DEFAULT_VALUE_LENGTH = 8;
  static float DEFAULT_CHANCE_FOR_ZERO_VALUE = 0.5f;

  static int DEFAULT_BASE_TIMESTAMP_DIVIDE = 1000000;
  static int DEFAULT_TIMESTAMP_DIFF_SIZE = 100000000;

  /**
   * Default constructor; takes all parameters from the class constants.
   */
  public RedundantKVGenerator() {
    this(new Random(42L),
        DEFAULT_NUMBER_OF_ROW_PREFIXES,
        DEFAULT_AVERAGE_PREFIX_LENGTH,
        DEFAULT_PREFIX_LENGTH_VARIANCE,
        DEFAULT_AVERAGE_SUFFIX_LENGTH,
        DEFAULT_SUFFIX_LENGTH_VARIANCE,
        DEFAULT_NUMBER_OF_ROW,

        DEFAULT_CHANCE_FOR_SAME_QUALIFIER,
        DEFAULT_CHANCE_FOR_SIMILIAR_QUALIFIER,
        DEFAULT_AVERAGE_QUALIFIER_LENGTH,
        DEFAULT_QUALIFIER_LENGTH_VARIANCE,

        DEFAULT_COLUMN_FAMILY_LENGTH,
        DEFAULT_VALUE_LENGTH,
        DEFAULT_CHANCE_FOR_ZERO_VALUE,

        DEFAULT_BASE_TIMESTAMP_DIVIDE,
        DEFAULT_TIMESTAMP_DIFF_SIZE
    );
  }

  /**
   * Constructor taking the full set of configuration options for generating
   * key values.
   * @param randomizer source of randomness for all generated data
   */
  public RedundantKVGenerator(Random randomizer,
      int numberOfRowPrefixes,
      int averagePrefixLength,
      int prefixLengthVariance,
      int averageSuffixLength,
      int suffixLengthVariance,
      int numberOfRows,

      float chanceForSameQualifier,
      float chanceForSimiliarQualifier,
      int averageQualifierLength,
      int qualifierLengthVariance,

      int columnFamilyLength,
      int valueLength,
      float chanceForZeroValue,

      int baseTimestampDivide,
      int timestampDiffSize
      ) {
    this.randomizer = randomizer;

    this.numberOfRowPrefixes = numberOfRowPrefixes;
    this.averagePrefixLength = averagePrefixLength;
    this.prefixLengthVariance = prefixLengthVariance;
    this.averageSuffixLength = averageSuffixLength;
    this.suffixLengthVariance = suffixLengthVariance;
    this.numberOfRows = numberOfRows;

    this.chanceForSameQualifier = chanceForSameQualifier;
    this.chanceForSimiliarQualifier = chanceForSimiliarQualifier;
    this.averageQualifierLength = averageQualifierLength;
    this.qualifierLengthVariance = qualifierLengthVariance;

    this.columnFamilyLength = columnFamilyLength;
    this.valueLength = valueLength;
    this.chanceForZeroValue = chanceForZeroValue;

    this.baseTimestampDivide = baseTimestampDivide;
    this.timestampDiffSize = timestampDiffSize;
  }

  /** Used to generate the dataset. */
  private Random randomizer;

  // row settings
  private int numberOfRowPrefixes;
  private int averagePrefixLength = 6;
  private int prefixLengthVariance = 3;
  private int averageSuffixLength = 3;
  private int suffixLengthVariance = 3;
  private int numberOfRows = 500;

  // qualifier
  private float chanceForSameQualifier = 0.5f;
  private float chanceForSimiliarQualifier = 0.4f;
  private int averageQualifierLength = 9;
  private int qualifierLengthVariance = 3;

  private int columnFamilyLength = 9;
  private int valueLength = 8;
  private float chanceForZeroValue = 0.5f;

  private int baseTimestampDivide = 1000000;
  private int timestampDiffSize = 100000000;

  private List<byte[]> generateRows() {
    // generate prefixes
    List<byte[]> prefixes = new ArrayList<byte[]>();
    prefixes.add(new byte[0]);
    for (int i = 1; i < numberOfRowPrefixes; ++i) {
      int prefixLength = averagePrefixLength;
      prefixLength += randomizer.nextInt(2 * prefixLengthVariance + 1) -
          prefixLengthVariance;
      byte[] newPrefix = new byte[prefixLength];
      randomizer.nextBytes(newPrefix);
      prefixes.add(newPrefix);
    }

    // generate the rest of the row
    List<byte[]> rows = new ArrayList<byte[]>();
    for (int i = 0; i < numberOfRows; ++i) {
      int suffixLength = averageSuffixLength;
      suffixLength += randomizer.nextInt(2 * suffixLengthVariance + 1) -
          suffixLengthVariance;
      int randomPrefix = randomizer.nextInt(prefixes.size());
      byte[] prefix = prefixes.get(randomPrefix);
      byte[] row = new byte[prefix.length + suffixLength];
      // Randomize the suffix, then overlay the shared prefix. Without this
      // step every row would be all zero bytes and the prefixes generated
      // above would never actually be shared between rows.
      randomizer.nextBytes(row);
      System.arraycopy(prefix, 0, row, 0, prefix.length);
      rows.add(row);
    }

    return rows;
  }

  /**
   * Generate test data useful for testing encoders.
   * @param howMany how many KeyValues to generate
   * @return sorted list of key values
   */
  public List<KeyValue> generateTestKeyValues(int howMany) {
    List<KeyValue> result = new ArrayList<KeyValue>();

    List<byte[]> rows = generateRows();
    Map<Integer, List<byte[]>> rowsToQualifier =
        new HashMap<Integer, List<byte[]>>();

    byte[] family = new byte[columnFamilyLength];
    randomizer.nextBytes(family);

    long baseTimestamp = Math.abs(randomizer.nextLong()) /
        baseTimestampDivide;

    byte[] value = new byte[valueLength];

    for (int i = 0; i < howMany; ++i) {
      long timestamp = baseTimestamp + randomizer.nextInt(
          timestampDiffSize);
      Integer rowId = randomizer.nextInt(rows.size());
      byte[] row = rows.get(rowId);

      // Generate a qualifier: sometimes the same as an earlier one for this
      // row, sometimes similar, occasionally completely different.
      byte[] qualifier;
      float qualifierChance = randomizer.nextFloat();
      if (!rowsToQualifier.containsKey(rowId) ||
          qualifierChance > chanceForSameQualifier +
          chanceForSimiliarQualifier) {
        int qualifierLength = averageQualifierLength;
        qualifierLength +=
            randomizer.nextInt(2 * qualifierLengthVariance + 1) -
            qualifierLengthVariance;
        qualifier = new byte[qualifierLength];
        randomizer.nextBytes(qualifier);

        // add it to the map
        if (!rowsToQualifier.containsKey(rowId)) {
          rowsToQualifier.put(rowId, new ArrayList<byte[]>());
        }
        rowsToQualifier.get(rowId).add(qualifier);
      } else if (qualifierChance > chanceForSameQualifier) {
        // similar qualifier
        List<byte[]> previousQualifiers = rowsToQualifier.get(rowId);
        byte[] originalQualifier = previousQualifiers.get(
            randomizer.nextInt(previousQualifiers.size()));

        qualifier = new byte[originalQualifier.length];
        int commonPrefix = randomizer.nextInt(qualifier.length);
        System.arraycopy(originalQualifier, 0, qualifier, 0, commonPrefix);
        for (int j = commonPrefix; j < qualifier.length; ++j) {
          qualifier[j] = (byte) (randomizer.nextInt() & 0xff);
        }

        rowsToQualifier.get(rowId).add(qualifier);
      } else {
        // same qualifier
        List<byte[]> previousQualifiers = rowsToQualifier.get(rowId);
        qualifier = previousQualifiers.get(
            randomizer.nextInt(previousQualifiers.size()));
      }

      if (randomizer.nextFloat() < chanceForZeroValue) {
        for (int j = 0; j < value.length; ++j) {
          value[j] = (byte) 0;
        }
      } else {
        randomizer.nextBytes(value);
      }

      result.add(new KeyValue(row, family, qualifier, timestamp, value));
    }

    Collections.sort(result, KeyValue.COMPARATOR);

    return result;
  }

  /**
   * Convert a list of KeyValues to a byte buffer.
   * @param keyValues list of KeyValues to be converted
   * @param includesMemstoreTS whether each KeyValue is followed by its
   *          memstore timestamp, written as a variable-length long
   * @return buffer with the content of the key values
   */
  public static ByteBuffer convertKvToByteBuffer(List<KeyValue> keyValues,
      boolean includesMemstoreTS) {
    int totalSize = 0;
    for (KeyValue kv : keyValues) {
      totalSize += kv.getLength();
      if (includesMemstoreTS) {
        totalSize += WritableUtils.getVIntSize(kv.getMemstoreTS());
      }
    }

    ByteBuffer result = ByteBuffer.allocate(totalSize);
    for (KeyValue kv : keyValues) {
      result.put(kv.getBuffer(), kv.getOffset(), kv.getLength());
      if (includesMemstoreTS) {
        ByteBufferUtils.writeVLong(result, kv.getMemstoreTS());
      }
    }

    return result;
  }

}
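A minimal usage sketch for the generator above; the wrapper class name is
hypothetical, and it assumes the classes added in this commit are on the
classpath.

import java.nio.ByteBuffer;
import java.util.List;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.encoding.RedundantKVGenerator;

public class RedundantKVGeneratorExample {
  public static void main(String[] args) {
    // Deterministic (seed 42) generator with the default redundancy settings.
    RedundantKVGenerator generator = new RedundantKVGenerator();
    // 1000 KeyValues, sorted by KeyValue.COMPARATOR, with repeated row
    // prefixes, qualifiers and values for the encoders to exploit.
    List<KeyValue> kvs = generator.generateTestKeyValues(1000);
    ByteBuffer block = RedundantKVGenerator.convertKvToByteBuffer(kvs, false);
    System.out.println("unencoded block size: " + block.capacity() + " bytes");
  }
}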
org/apache/hadoop/hbase/io/encoding/TestBufferedDataBlockEncoder.java
@@ -0,0 +1,43 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import static org.junit.Assert.assertEquals;

import org.apache.hadoop.hbase.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(SmallTests.class)
public class TestBufferedDataBlockEncoder {

  @Test
  public void testEnsureSpaceForKey() {
    BufferedDataBlockEncoder.SeekerState state =
        new BufferedDataBlockEncoder.SeekerState();
    for (int i = 1; i <= 65536; ++i) {
      state.keyLength = i;
      state.ensureSpaceForKey();
      state.keyBuffer[state.keyLength - 1] = (byte) ((i - 1) % 0xff);
      for (int j = 0; j < i - 1; ++j) {
        // Check that earlier bytes were preserved as the buffer grew.
        assertEquals((byte) (j % 0xff), state.keyBuffer[j]);
      }
    }
  }

}
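The test above relies on ensureSpaceForKey() growing keyBuffer while
preserving its contents. A hedged sketch of that contract, with an assumed
initial capacity and doubling growth policy; the actual
BufferedDataBlockEncoder.SeekerState implementation may differ in both.

import java.util.Arrays;

class SeekerStateSketch {
  byte[] keyBuffer = new byte[16];  // initial capacity is an assumption
  int keyLength;

  void ensureSpaceForKey() {
    if (keyLength > keyBuffer.length) {
      // Grow at least geometrically and copy the old bytes over, which is
      // exactly the property testEnsureSpaceForKey verifies.
      keyBuffer = Arrays.copyOf(keyBuffer,
          Math.max(keyLength, 2 * keyBuffer.length));
    }
  }
}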
org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java
@@ -0,0 +1,343 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

/**
 * Tests all of the data block encoding algorithms for correctness. Most of
 * the methods generate data that exercises different branches in the code.
 */
@Category(LargeTests.class)
@RunWith(Parameterized.class)
public class TestDataBlockEncoders {
  static int NUMBER_OF_KV = 10000;
  static int NUM_RANDOM_SEEKS = 10000;

  private RedundantKVGenerator generator = new RedundantKVGenerator();
  private Random randomizer = new Random(42L);

  private final boolean includesMemstoreTS;

  @Parameters
  public static Collection<Object[]> parameters() {
    return HBaseTestingUtility.BOOLEAN_PARAMETERIZED;
  }

  public TestDataBlockEncoders(boolean includesMemstoreTS) {
    this.includesMemstoreTS = includesMemstoreTS;
  }

  private void testAlgorithm(ByteBuffer dataset, DataBlockEncoder encoder)
      throws IOException {
    // encode
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream dataOut = new DataOutputStream(baos);
    encoder.compressKeyValues(dataOut, dataset, includesMemstoreTS);

    // decode
    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
    DataInputStream dis = new DataInputStream(bais);
    ByteBuffer actualDataset =
        encoder.uncompressKeyValues(dis, includesMemstoreTS);

    dataset.rewind();
    actualDataset.rewind();

    assertEquals("Encoding -> decoding gives different results for " + encoder,
        dataset, actualDataset);
  }

  /**
   * Test data block encoding of empty KeyValues.
   * @throws IOException on test failure
   */
  @Test
  public void testEmptyKeyValues() throws IOException {
    List<KeyValue> kvList = new ArrayList<KeyValue>();
    byte[] row = new byte[0];
    byte[] family = new byte[0];
    byte[] qualifier = new byte[0];
    byte[] value = new byte[0];
    kvList.add(new KeyValue(row, family, qualifier, 0L, Type.Put, value));
    kvList.add(new KeyValue(row, family, qualifier, 0L, Type.Put, value));
    testEncodersOnDataset(RedundantKVGenerator.convertKvToByteBuffer(kvList,
        includesMemstoreTS));
  }

  /**
   * Test KeyValues with negative timestamps.
   * @throws IOException on test failure
   */
  @Test
  public void testNegativeTimestamps() throws IOException {
    List<KeyValue> kvList = new ArrayList<KeyValue>();
    byte[] row = new byte[0];
    byte[] family = new byte[0];
    byte[] qualifier = new byte[0];
    byte[] value = new byte[0];
    kvList.add(new KeyValue(row, family, qualifier, -1L, Type.Put, value));
    kvList.add(new KeyValue(row, family, qualifier, -2L, Type.Put, value));
    testEncodersOnDataset(
        RedundantKVGenerator.convertKvToByteBuffer(kvList,
            includesMemstoreTS));
  }

  /**
   * Test whether encoding followed by decoding gives consistent results on a
   * pseudorandom sample.
   * @throws IOException on test failure
   */
  @Test
  public void testExecutionOnSample() throws IOException {
    testEncodersOnDataset(
        RedundantKVGenerator.convertKvToByteBuffer(
            generator.generateTestKeyValues(NUMBER_OF_KV),
            includesMemstoreTS));
  }

  /**
   * Test seeking within an encoded block.
   */
  @Test
  public void testSeekingOnSample() throws IOException {
    List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV);
    ByteBuffer originalBuffer =
        RedundantKVGenerator.convertKvToByteBuffer(sampleKv,
            includesMemstoreTS);
    List<DataBlockEncoder> dataBlockEncoders =
        DataBlockEncodings.getAllEncoders();

    // create all seekers
    List<DataBlockEncoder.EncodedSeeker> encodedSeekers =
        new ArrayList<DataBlockEncoder.EncodedSeeker>();
    for (DataBlockEncoder encoder : dataBlockEncoders) {
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      DataOutputStream dataOut = new DataOutputStream(baos);
      encoder.compressKeyValues(dataOut, originalBuffer, includesMemstoreTS);
      ByteBuffer encodedBuffer = ByteBuffer.wrap(baos.toByteArray());
      DataBlockEncoder.EncodedSeeker seeker =
          encoder.createSeeker(KeyValue.KEY_COMPARATOR, includesMemstoreTS);
      seeker.setCurrentBuffer(encodedBuffer);
      encodedSeekers.add(seeker);
    }

    // try a few random seeks
    for (boolean seekBefore : new boolean[] { false, true }) {
      for (int i = 0; i < NUM_RANDOM_SEEKS; ++i) {
        int keyValueId;
        if (!seekBefore) {
          keyValueId = randomizer.nextInt(sampleKv.size());
        } else {
          keyValueId = randomizer.nextInt(sampleKv.size() - 1) + 1;
        }

        KeyValue keyValue = sampleKv.get(keyValueId);
        checkSeekingConsistency(encodedSeekers, seekBefore, keyValue);
      }
    }

    // check edge cases
    checkSeekingConsistency(encodedSeekers, false, sampleKv.get(0));
    for (boolean seekBefore : new boolean[] { false, true }) {
      checkSeekingConsistency(encodedSeekers, seekBefore,
          sampleKv.get(sampleKv.size() - 1));
      KeyValue midKv = sampleKv.get(sampleKv.size() / 2);
      KeyValue lastMidKv = midKv.createLastOnRowCol();
      checkSeekingConsistency(encodedSeekers, seekBefore, lastMidKv);
    }
  }

  /**
   * Test iterating over encoded buffers.
   */
  @Test
  public void testNextOnSample() {
    List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV);
    ByteBuffer originalBuffer =
        RedundantKVGenerator.convertKvToByteBuffer(sampleKv,
            includesMemstoreTS);
    List<DataBlockEncoder> dataBlockEncoders =
        DataBlockEncodings.getAllEncoders();

    for (DataBlockEncoder encoder : dataBlockEncoders) {
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      DataOutputStream dataOut = new DataOutputStream(baos);
      try {
        encoder.compressKeyValues(dataOut, originalBuffer, includesMemstoreTS);
      } catch (IOException e) {
        throw new RuntimeException(String.format(
            "Bug while encoding using '%s'", encoder.toString()), e);
      }

      ByteBuffer encodedBuffer = ByteBuffer.wrap(baos.toByteArray());
      DataBlockEncoder.EncodedSeeker seeker =
          encoder.createSeeker(KeyValue.KEY_COMPARATOR, includesMemstoreTS);
      seeker.setCurrentBuffer(encodedBuffer);
      int i = 0;
      do {
        KeyValue expectedKeyValue = sampleKv.get(i);
        ByteBuffer keyValue = seeker.getKeyValue();
        if (0 != Bytes.compareTo(
            keyValue.array(), keyValue.arrayOffset(), keyValue.limit(),
            expectedKeyValue.getBuffer(), expectedKeyValue.getOffset(),
            expectedKeyValue.getLength())) {

          // On mismatch, compute the common prefix to make the failure
          // message point at the first diverging byte.
          int commonPrefix = 0;
          byte[] left = keyValue.array();
          byte[] right = expectedKeyValue.getBuffer();
          int leftOff = keyValue.arrayOffset();
          int rightOff = expectedKeyValue.getOffset();
          int length = Math.min(keyValue.limit(), expectedKeyValue.getLength());
          while (commonPrefix < length &&
              left[commonPrefix + leftOff] == right[commonPrefix + rightOff]) {
            commonPrefix++;
          }

          fail(String.format(
              "next() produces wrong results " +
              "encoder: %s i: %d commonPrefix: %d" +
              "\n expected %s\n actual %s",
              encoder.toString(), i, commonPrefix,
              Bytes.toStringBinary(expectedKeyValue.getBuffer(),
                  expectedKeyValue.getOffset(), expectedKeyValue.getLength()),
              Bytes.toStringBinary(keyValue)));
        }
        i++;
      } while (seeker.next());
    }
  }

  /**
   * Test whether decompression of the first key is implemented correctly.
   */
  @Test
  public void testFirstKeyInBlockOnSample() {
    List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV);
    ByteBuffer originalBuffer =
        RedundantKVGenerator.convertKvToByteBuffer(sampleKv,
            includesMemstoreTS);
    List<DataBlockEncoder> dataBlockEncoders =
        DataBlockEncodings.getAllEncoders();

    for (DataBlockEncoder encoder : dataBlockEncoders) {
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      DataOutputStream dataOut = new DataOutputStream(baos);
      try {
        encoder.compressKeyValues(dataOut, originalBuffer, includesMemstoreTS);
      } catch (IOException e) {
        throw new RuntimeException(String.format(
            "Bug while encoding using '%s'", encoder.toString()), e);
      }

      ByteBuffer encodedBuffer = ByteBuffer.wrap(baos.toByteArray());
      ByteBuffer keyBuffer = encoder.getFirstKeyInBlock(encodedBuffer);
      KeyValue firstKv = sampleKv.get(0);
      if (0 != Bytes.compareTo(
          keyBuffer.array(), keyBuffer.arrayOffset(), keyBuffer.limit(),
          firstKv.getBuffer(), firstKv.getKeyOffset(),
          firstKv.getKeyLength())) {

        int commonPrefix = 0;
        int length = Math.min(keyBuffer.limit(), firstKv.getKeyLength());
        while (commonPrefix < length &&
            keyBuffer.array()[keyBuffer.arrayOffset() + commonPrefix] ==
            firstKv.getBuffer()[firstKv.getKeyOffset() + commonPrefix]) {
          commonPrefix++;
        }
        fail(String.format("Bug in '%s' commonPrefix %d",
            encoder.toString(), commonPrefix));
      }
    }
  }

  private void checkSeekingConsistency(
      List<DataBlockEncoder.EncodedSeeker> encodedSeekers, boolean seekBefore,
      KeyValue keyValue) {
    // Every seeker must agree on the KeyValue, key and value it lands on.
    ByteBuffer expectedKeyValue = null;
    ByteBuffer expectedKey = null;
    ByteBuffer expectedValue = null;

    for (DataBlockEncoder.EncodedSeeker seeker : encodedSeekers) {
      seeker.blockSeekTo(keyValue.getBuffer(),
          keyValue.getKeyOffset(), keyValue.getKeyLength(), seekBefore);
      seeker.rewind();

      ByteBuffer actualKeyValue = seeker.getKeyValue();
      ByteBuffer actualKey = seeker.getKey();
      ByteBuffer actualValue = seeker.getValue();

      if (expectedKeyValue != null) {
        assertEquals(expectedKeyValue, actualKeyValue);
      } else {
        expectedKeyValue = actualKeyValue;
      }

      if (expectedKey != null) {
        assertEquals(expectedKey, actualKey);
      } else {
        expectedKey = actualKey;
      }

      if (expectedValue != null) {
        assertEquals(expectedValue, actualValue);
      } else {
        expectedValue = actualValue;
      }
    }
  }

  private void testEncodersOnDataset(ByteBuffer onDataset)
      throws IOException {
    List<DataBlockEncoder> dataBlockEncoders =
        DataBlockEncodings.getAllEncoders();
    // Work on a copy so every encoder sees identical, unmodified input.
    ByteBuffer dataset = ByteBuffer.allocate(onDataset.capacity());
    onDataset.rewind();
    dataset.put(onDataset);
    onDataset.rewind();
    dataset.flip();

    for (DataBlockEncoder encoder : dataBlockEncoders) {
      testAlgorithm(dataset, encoder);

      // ensure that the dataset is unchanged
      dataset.rewind();
      assertEquals("Input buffer was modified by the encoder",
          onDataset, dataset);
    }
  }
}
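The includesMemstoreTS parameterization above reflects the block layout these
tests build via convertKvToByteBuffer: when enabled, each serialized KeyValue
is followed by its memstore timestamp as a variable-length long. A small
self-contained sketch of that framing; the three-byte array is a placeholder
standing in for real KeyValue bytes.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.WritableUtils;

public class MemstoreTSLayoutSketch {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(baos);
    byte[] fakeKvBytes = { 1, 2, 3 };  // placeholder for serialized KeyValue
    long memstoreTS = 42L;
    out.write(fakeKvBytes);
    // vlong encoding keeps small timestamps down to a single byte.
    WritableUtils.writeVLong(out, memstoreTS);

    DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
    in.skipBytes(fakeKvBytes.length);
    System.out.println("memstoreTS = " + WritableUtils.readVLong(in));
  }
}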