HBASE-8317 Seek returns wrong result with PREFIX_TREE Encoding

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1470123 13f79535-47bb-0310-9956-ffa450edef68

commit 4370fd18e4 (parent f22a9ce1ac)
@@ -126,7 +126,11 @@ public class PrefixTreeArraySearcher extends PrefixTreeArrayReversibleScanner im
       //detect dead end (no fan to descend into)
       if(!currentRowNode.hasFan()){
         if(hasOccurrences()){
-          populateFirstNonRowFields();
+          if (rowLength < key.getRowLength()) {
+            nextRow();
+          } else {
+            populateFirstNonRowFields();
+          }
           return CellScannerPosition.AFTER;
         }else{
           //TODO i don't think this case is exercised by any tests
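What the fix does: positionAtOrAfter can dead-end on a trie row node that holds cells but has no fan to descend into, i.e. a nub row such as rowB when the block also contains rowBB. The old code always positioned at the first cell of that nub and reported AFTER, which is wrong whenever the nub's row is a strict prefix of the sought key's row, because every cell in the nub sorts before the sought key. The new rowLength < key.getRowLength() check advances to the next row in that case. Below is a minimal, self-contained sketch of the decision, using plain Java strings rather than the HBase API; the class and variable names are invented for illustration:

import java.util.TreeSet;

// Plain-Java illustration of the dead-end ("nub") case fixed above.
public class NubSeekSketch {
  public static void main(String[] args) {
    TreeSet<String> rows = new TreeSet<>();
    rows.add("rowB");   // a nub: its row is a strict prefix of the next row
    rows.add("rowBB");

    String soughtRow = "rowBB"; // the trie walk dead-ends on the "rowB" node
    String deadEndRow = "rowB";

    // Old behavior: stay on "rowB" and report AFTER, even though every "rowB"
    // cell sorts BEFORE anything in "rowBB", so the reported position is wrong.
    // Fixed behavior: the dead-end row is shorter than (a strict prefix of)
    // the sought row, so skip to the next row before reporting AFTER.
    String positionedRow = deadEndRow.length() < soughtRow.length()
        ? rows.higher(deadEndRow)  // nextRow()                    -> "rowBB"
        : deadEndRow;              // populateFirstNonRowFields()  -> "rowB"
    System.out.println("positioned at row: " + positionedRow);
  }
}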
@@ -126,50 +126,55 @@ public class TestPrefixTreeSearcher {
     }
   }
 
-  /**
-   * very hard to test nubs with this thing since the a nextRowKey function will usually skip them
-   */
   @Test
   public void testRandomSeekMisses() throws IOException {
     CellSearcher searcher = null;
     List<Integer> rowStartIndexes = rows.getRowStartIndexes();
     try {
       searcher = DecoderFactory.checkOut(block, true);
-      for (int i=0; i < rows.getInputs().size(); ++i) {
-        KeyValue kv = rows.getInputs().get(i);
-
-        //nextRow
-        KeyValue inputNextRow = KeyValueUtil.createFirstKeyInNextRow(kv);
-
-        CellScannerPosition position = searcher.positionAtOrBefore(inputNextRow);
-        boolean isFirstInRow = rowStartIndexes.contains(i);
-        if(isFirstInRow){
-          int rowIndex = rowStartIndexes.indexOf(i);
-          if(rowIndex < rowStartIndexes.size() - 1){
-//            int lastKvInRowI = rowStartIndexes.get(rowIndex + 1) - 1;
-            Assert.assertEquals(CellScannerPosition.BEFORE, position);
-            /*
-             * Can't get this to work between nubs like rowB\x00 <-> rowBB
-             *
-             * No reason to doubt that it works, but will have to come up with a smarter test.
-             */
-//            Assert.assertEquals(rows.getInputs().get(lastKvInRowI), searcher.getCurrentCell());
-          }
-        }
-
-        //previous KV
-        KeyValue inputPreviousKv = KeyValueUtil.previousKey(kv);
-        boolean hit = searcher.positionAt(inputPreviousKv);
-        Assert.assertFalse(hit);
-        position = searcher.positionAtOrAfter(inputPreviousKv);
-        if(CollectionUtils.isLastIndex(rows.getInputs(), i)){
-          Assert.assertTrue(CellScannerPosition.AFTER_LAST == position);
-        }else{
-          Assert.assertTrue(CellScannerPosition.AFTER == position);
-          /*
-           * TODO: why i+1 instead of i?
-           */
-          Assert.assertEquals(rows.getInputs().get(i+1), searcher.current());
-        }
-      }
+
+      //test both the positionAtOrBefore and positionAtOrAfter methods
+      for(boolean beforeVsAfterOnMiss : new boolean[]{true, false}){
+        for (int i=0; i < rows.getInputs().size(); ++i) {
+          KeyValue kv = rows.getInputs().get(i);
+
+          //nextRow
+          KeyValue inputNextRow = KeyValueUtil.createFirstKeyInNextRow(kv);
+
+          CellScannerPosition position = beforeVsAfterOnMiss
+              ? searcher.positionAtOrBefore(inputNextRow)
+              : searcher.positionAtOrAfter(inputNextRow);
+          boolean isFirstInRow = rowStartIndexes.contains(i);
+          if(isFirstInRow){
+            int rowIndex = rowStartIndexes.indexOf(i);
+            if(rowIndex < rowStartIndexes.size() - 1){
+              if(beforeVsAfterOnMiss){
+                Assert.assertEquals(CellScannerPosition.BEFORE, position);
+              }else{
+                Assert.assertEquals(CellScannerPosition.AFTER, position);
+              }
+
+              int expectedInputIndex = beforeVsAfterOnMiss
+                  ? rowStartIndexes.get(rowIndex + 1) - 1
+                  : rowStartIndexes.get(rowIndex + 1);
+              Assert.assertEquals(rows.getInputs().get(expectedInputIndex), searcher.current());
+            }
+          }
+
+          //previous KV
+          KeyValue inputPreviousKv = KeyValueUtil.previousKey(kv);
+          boolean hit = searcher.positionAt(inputPreviousKv);
+          Assert.assertFalse(hit);
+          position = searcher.positionAtOrAfter(inputPreviousKv);
+          if(CollectionUtils.isLastIndex(rows.getInputs(), i)){
+            Assert.assertTrue(CellScannerPosition.AFTER_LAST == position);
+          }else{
+            Assert.assertTrue(CellScannerPosition.AFTER == position);
+            /*
+             * TODO: why i+1 instead of i?
+             */
+            Assert.assertEquals(rows.getInputs().get(i+1), searcher.current());
+          }
+        }
+      }
     } finally {
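The reworked test drives each seek miss through both positionAtOrBefore and positionAtOrAfter instead of only the former. createFirstKeyInNextRow fabricates a key sitting just past the last cell of the current row, so the two flavors must land on opposite sides of that row boundary: the last input of the current row versus the first input of the following row. A small worked example of that index arithmetic follows; it is a self-contained sketch with invented data, not part of the test:

import java.util.Arrays;
import java.util.List;

// Worked example of the expectedInputIndex arithmetic used in the test.
public class SeekMissExpectation {
  public static void main(String[] args) {
    // Suppose the block holds inputs 0..6 and rows begin at inputs 0, 3, 5.
    List<Integer> rowStartIndexes = Arrays.asList(0, 3, 5);

    int i = 0;                                 // a cell that is first in its row
    int rowIndex = rowStartIndexes.indexOf(i); // 0
    // The missed key sits just past the end of cell i's row, so the two seek
    // flavors must land on opposite sides of that row boundary:
    int expectedBefore = rowStartIndexes.get(rowIndex + 1) - 1; // 2, last cell of row 0
    int expectedAfter  = rowStartIndexes.get(rowIndex + 1);     // 3, first cell of row 1

    System.out.println("positionAtOrBefore -> input " + expectedBefore);
    System.out.println("positionAtOrAfter  -> input " + expectedAfter);
  }
}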
New test file TestPrefixTreeEncoding.java:

@@ -0,0 +1,219 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeCodec;
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder.EncodedSeeker;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests scanning/seeking data with PrefixTree Encoding.
+ */
+@Category(SmallTests.class)
+public class TestPrefixTreeEncoding {
+  private static final Log LOG = LogFactory
+      .getLog(TestPrefixTreeEncoding.class);
+  static final String CF = "EncodingTestCF";
+  static final byte[] CF_BYTES = Bytes.toBytes(CF);
+  private static final int NUM_ROWS_PER_BATCH = 50;
+  private static final int NUM_COLS_PER_ROW = 20;
+
+  private int numBatchesWritten = 0;
+  private ConcurrentSkipListSet<KeyValue> kvset = new ConcurrentSkipListSet<KeyValue>(
+      KeyValue.COMPARATOR);
+
+  @Before
+  public void setUp() throws Exception {
+    kvset.clear();
+  }
+
+  @Test
+  public void testScanWithRandomData() throws Exception {
+    PrefixTreeCodec encoder = new PrefixTreeCodec();
+    ByteBuffer dataBuffer = generateRandomTestData(kvset, numBatchesWritten++);
+    HFileBlockEncodingContext blkEncodingCtx = new HFileBlockDefaultEncodingContext(
+        Algorithm.NONE, DataBlockEncoding.PREFIX_TREE, new byte[0]);
+    encoder.encodeKeyValues(dataBuffer, false, blkEncodingCtx);
+    EncodedSeeker seeker = encoder.createSeeker(KeyValue.KEY_COMPARATOR, false);
+    byte[] onDiskBytes=blkEncodingCtx.getOnDiskBytesWithHeader();
+    ByteBuffer readBuffer = ByteBuffer.wrap(onDiskBytes,
+        DataBlockEncoding.ID_SIZE, onDiskBytes.length
+            - DataBlockEncoding.ID_SIZE);
+    seeker.setCurrentBuffer(readBuffer);
+    KeyValue previousKV = null;
+    do{
+      KeyValue currentKV = seeker.getKeyValue();
+      if (previousKV != null && KeyValue.COMPARATOR.compare(currentKV, previousKV) < 0) {
+        dumpInputKVSet();
+        fail("Current kv " + currentKV + " is smaller than previous keyvalue "
+            + previousKV);
+      }
+      previousKV = currentKV;
+    } while (seeker.next());
+  }
+
+  @Test
+  public void testSeekWithRandomData() throws Exception {
+    PrefixTreeCodec encoder = new PrefixTreeCodec();
+    int batchId = numBatchesWritten++;
+    ByteBuffer dataBuffer = generateRandomTestData(kvset, batchId);
+    HFileBlockEncodingContext blkEncodingCtx = new HFileBlockDefaultEncodingContext(
+        Algorithm.NONE, DataBlockEncoding.PREFIX_TREE, new byte[0]);
+    encoder.encodeKeyValues(dataBuffer, false, blkEncodingCtx);
+    EncodedSeeker seeker = encoder.createSeeker(KeyValue.KEY_COMPARATOR, false);
+    byte[] onDiskBytes = blkEncodingCtx.getOnDiskBytesWithHeader();
+    ByteBuffer readBuffer = ByteBuffer.wrap(onDiskBytes,
+        DataBlockEncoding.ID_SIZE, onDiskBytes.length
+            - DataBlockEncoding.ID_SIZE);
+    verifySeeking(seeker, readBuffer, batchId);
+  }
+
+  @Test
+  public void testSeekWithFixedData() throws Exception {
+    PrefixTreeCodec encoder = new PrefixTreeCodec();
+    int batchId = numBatchesWritten++;
+    ByteBuffer dataBuffer = generateFixedTestData(kvset, batchId);
+    HFileBlockEncodingContext blkEncodingCtx = new HFileBlockDefaultEncodingContext(
+        Algorithm.NONE, DataBlockEncoding.PREFIX_TREE, new byte[0]);
+    encoder.encodeKeyValues(dataBuffer, false, blkEncodingCtx);
+    EncodedSeeker seeker = encoder.createSeeker(KeyValue.KEY_COMPARATOR,
+        false);
+    byte[] onDiskBytes = blkEncodingCtx.getOnDiskBytesWithHeader();
+    ByteBuffer readBuffer = ByteBuffer.wrap(onDiskBytes,
+        DataBlockEncoding.ID_SIZE, onDiskBytes.length
+            - DataBlockEncoding.ID_SIZE);
+    verifySeeking(seeker, readBuffer, batchId);
+  }
+
+  private void verifySeeking(EncodedSeeker encodeSeeker,
+      ByteBuffer encodedData, int batchId) {
+    List<KeyValue> kvList = new ArrayList<KeyValue>();
+    for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
+      kvList.clear();
+      encodeSeeker.setCurrentBuffer(encodedData);
+      KeyValue firstOnRow = KeyValue.createFirstOnRow(getRowKey(batchId, i));
+      encodeSeeker.seekToKeyInBlock(firstOnRow.getBuffer(),
+          firstOnRow.getKeyOffset(), firstOnRow.getKeyLength(), false);
+      boolean hasMoreOfEncodeScanner = encodeSeeker.next();
+      CollectionBackedScanner collectionScanner = new CollectionBackedScanner(
+          this.kvset);
+      boolean hasMoreOfCollectionScanner = collectionScanner.seek(firstOnRow);
+      if (hasMoreOfEncodeScanner != hasMoreOfCollectionScanner) {
+        dumpInputKVSet();
+        fail("Get error result after seeking " + firstOnRow);
+      }
+      if (hasMoreOfEncodeScanner) {
+        if (KeyValue.COMPARATOR.compare(encodeSeeker.getKeyValue(),
+            collectionScanner.peek()) != 0) {
+          dumpInputKVSet();
+          fail("Expected " + collectionScanner.peek() + " actual "
+              + encodeSeeker.getKeyValue() + ", after seeking " + firstOnRow);
+        }
+      }
+    }
+  }
+
+  private void dumpInputKVSet() {
+    LOG.info("Dumping input keyvalue set in error case:");
+    for (KeyValue kv : kvset) {
+      System.out.println(kv);
+    }
+  }
+
+  private static ByteBuffer generateFixedTestData(
+      ConcurrentSkipListSet<KeyValue> kvset, int batchId) throws Exception {
+    ByteArrayOutputStream baosInMemory = new ByteArrayOutputStream();
+    DataOutputStream userDataStream = new DataOutputStream(baosInMemory);
+    for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
+      if (i / 10 % 2 == 1) continue;
+      for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
+        KeyValue kv = new KeyValue(getRowKey(batchId, i), CF_BYTES,
+            getQualifier(j), getValue(batchId, i, j));
+        kvset.add(kv);
+      }
+    }
+    for (KeyValue kv : kvset) {
+      userDataStream.writeInt(kv.getKeyLength());
+      userDataStream.writeInt(kv.getValueLength());
+      userDataStream
+          .write(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
+      userDataStream.write(kv.getBuffer(), kv.getValueOffset(),
+          kv.getValueLength());
+    }
+    return ByteBuffer.wrap(baosInMemory.toByteArray());
+  }
+
+  private static ByteBuffer generateRandomTestData(
+      ConcurrentSkipListSet<KeyValue> kvset, int batchId) throws Exception {
+    ByteArrayOutputStream baosInMemory = new ByteArrayOutputStream();
+    DataOutputStream userDataStream = new DataOutputStream(baosInMemory);
+    Random random = new Random();
+    for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
+      if (random.nextInt(100) < 50) continue;
+      for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
+        if (random.nextInt(100) < 50) continue;
+        KeyValue kv = new KeyValue(getRowKey(batchId, i), CF_BYTES,
+            getQualifier(j), getValue(batchId, i, j));
+        kvset.add(kv);
+      }
+    }
+    for (KeyValue kv : kvset) {
+      userDataStream.writeInt(kv.getKeyLength());
+      userDataStream.writeInt(kv.getValueLength());
+      userDataStream
+          .write(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
+      userDataStream.write(kv.getBuffer(), kv.getValueOffset(),
+          kv.getValueLength());
+    }
+    return ByteBuffer.wrap(baosInMemory.toByteArray());
+  }
+
+  private static byte[] getRowKey(int batchId, int i) {
+    return Bytes.toBytes("batch" + batchId + "_row" + i);
+  }
+
+  private static byte[] getQualifier(int j) {
+    return Bytes.toBytes("col" + j);
+  }
+
+  private static byte[] getValue(int batchId, int i, int j) {
+    return Bytes.toBytes("value_for_" + Bytes.toString(getRowKey(batchId, i))
+        + "_col" + j);
+  }
+
+}
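The new TestPrefixTreeEncoding cross-checks the PREFIX_TREE seeker against a CollectionBackedScanner built from the same KeyValue set, so any disagreement after a seek flags an encoder or seeker bug. Here is that differential-testing pattern in miniature, a sketch using plain Java collections to stand in for both scanners rather than the HBase API:

import java.util.NavigableSet;
import java.util.TreeSet;

// Sketch of the differential check in verifySeeking(): seek the implementation
// under test and a trusted reference to the same key, then compare results.
public class DifferentialSeekCheck {
  public static void main(String[] args) {
    NavigableSet<String> reference = new TreeSet<>(); // CollectionBackedScanner role
    NavigableSet<String> underTest = new TreeSet<>(); // EncodedSeeker role
    for (String key : new String[] {"batch0_row0", "batch0_row2", "batch0_row5"}) {
      reference.add(key);
      underTest.add(key);
    }

    String seekKey = "batch0_row1";               // row absent from the block
    String expected = reference.ceiling(seekKey); // batch0_row2
    String actual = underTest.ceiling(seekKey);   // must agree with the reference
    if (expected == null ? actual != null : !expected.equals(actual)) {
      throw new AssertionError("Expected " + expected + " actual " + actual
          + ", after seeking " + seekKey);
    }
    System.out.println("seek(" + seekKey + ") -> " + actual);
  }
}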