HBASE-1485 Wrong or indeterminate behavior when there are duplicate versions of a column (pranav via jgray)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@995163 13f79535-47bb-0310-9956-ffa450edef68
Jonathan Gray 2010-09-08 17:22:31 +00:00
parent b89eb144d1
commit 4481fafee4
19 changed files with 679 additions and 65 deletions

CHANGES.txt

@@ -507,6 +507,8 @@ Release 0.21.0 - Unreleased
HBaseConfiguration is changed
(Robert Mahfoud via Stack)
HBASE-2964 Deadlock when RS tries to RPC to itself inside SplitTransaction
HBASE-1485 Wrong or indeterminate behavior when there are duplicate
versions of a column (pranav via jgray)
IMPROVEMENTS
HBASE-1760 Cleanup TODOs in HTable

ColumnTracker.java

@@ -42,9 +42,11 @@ public interface ColumnTracker {
* @param bytes
* @param offset
* @param length
* @param timestamp
* @return The match code instance.
*/
public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset, int length);
public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset,
int length, long timestamp);
/**
* Updates internal variables in between files

ExplicitColumnTracker.java

@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -51,6 +52,9 @@ public class ExplicitColumnTracker implements ColumnTracker {
private final List<ColumnCount> columnsToReuse;
private int index;
private ColumnCount column;
/** Keeps track of the latest timestamp included for current column.
* Used to eliminate duplicates. */
private long latestTSOfCurrentColumn;
/**
* Default constructor.
@@ -84,51 +88,63 @@ public class ExplicitColumnTracker implements ColumnTracker {
* @param bytes KeyValue buffer
* @param offset offset to the start of the qualifier
* @param length length of the qualifier
* @param timestamp timestamp of the key being checked
* @return MatchCode telling ScanQueryMatcher what action to take
*/
public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset, int length) {
public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset,
int length, long timestamp) {
do {
// No more columns left, we are done with this query
if(this.columns.size() == 0) {
return ScanQueryMatcher.MatchCode.DONE; // done_row
return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row
}
// No more columns to match against, done with storefile
if(this.column == null) {
return ScanQueryMatcher.MatchCode.NEXT; // done_row
return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row
}
// Compare specific column to current column
int ret = Bytes.compareTo(column.getBuffer(), column.getOffset(),
column.getLength(), bytes, offset, length);
// Matches, decrement versions left and include
// Column Matches. If it is not a duplicate key, decrement versions left
// and include.
if(ret == 0) {
//If column matches, check if it is a duplicate timestamp
if (sameAsPreviousTS(timestamp)) {
//If duplicate, skip this Key
return ScanQueryMatcher.MatchCode.SKIP;
}
if(this.column.decrement() == 0) {
// Done with versions for this column
this.columns.remove(this.index);
resetTS();
if(this.columns.size() == this.index) {
// Will not hit any more columns in this storefile
this.column = null;
} else {
this.column = this.columns.get(this.index);
}
} else {
setTS(timestamp);
}
return ScanQueryMatcher.MatchCode.INCLUDE;
}
resetTS();
if (ret > 0) {
// Specified column is smaller than the current, skip to next column.
return ScanQueryMatcher.MatchCode.SKIP;
return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL;
}
// Specified column is bigger than current column
// Move down current column and check again
if(ret <= -1) {
if(++this.index == this.columns.size()) {
if(++this.index >= this.columns.size()) {
// No more to match, do not include, done with storefile
return ScanQueryMatcher.MatchCode.NEXT; // done_row
return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row
}
// This is the recursive case.
this.column = this.columns.get(this.index);
@@ -154,6 +170,19 @@ public class ExplicitColumnTracker implements ColumnTracker {
buildColumnList();
this.index = 0;
this.column = this.columns.get(this.index);
resetTS();
}
private void resetTS() {
latestTSOfCurrentColumn = HConstants.LATEST_TIMESTAMP;
}
private void setTS(long timestamp) {
latestTSOfCurrentColumn = timestamp;
}
private boolean sameAsPreviousTS(long timestamp) {
return timestamp == latestTSOfCurrentColumn;
}
private void buildColumnList() {

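The latestTSOfCurrentColumn bookkeeping above is the heart of the fix: when the same row/column/timestamp exists in more than one store file, only the first (newest) copy is included and every later copy is skipped. A minimal standalone sketch of the pattern, assuming versions for one column arrive newest-first as they do from the scan heap; class and method names here are illustrative, not part of the patch:

// Sketch of the duplicate-suppression idea used by both column trackers.
import java.util.ArrayList;
import java.util.List;

public class DuplicateVersionFilter {
  // Stand-in for HConstants.LATEST_TIMESTAMP in this sketch.
  private static final long LATEST = Long.MAX_VALUE;
  private long latestIncludedTs = LATEST;

  /** Returns true if a version at this timestamp should be included. */
  public boolean include(long timestamp) {
    if (timestamp == latestIncludedTs) {
      return false;                 // same key seen again from an older file: skip it
    }
    latestIncludedTs = timestamp;   // remember it so later duplicates are skipped
    return true;
  }

  /** Called when the tracker moves on to a new column. */
  public void reset() {
    latestIncludedTs = LATEST;
  }

  public static void main(String[] args) {
    DuplicateVersionFilter f = new DuplicateVersionFilter();
    long[] timestamps = {3L, 2L, 2L, 1L};   // ts=2 appears in two store files
    List<Long> kept = new ArrayList<Long>();
    for (long ts : timestamps) {
      if (f.include(ts)) kept.add(ts);
    }
    System.out.println(kept); // [3, 2, 1] -- the duplicate at ts=2 is dropped
  }
}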
KeyValueHeap.java

@@ -87,7 +87,7 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner {
} else {
KeyValueScanner topScanner = this.heap.peek();
if (topScanner == null ||
this.comparator.compare(kvNext, topScanner.peek()) > 0) {
this.comparator.compare(kvNext, topScanner.peek()) >= 0) {
this.heap.add(this.current);
this.current = this.heap.poll();
}
@@ -153,7 +153,22 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner {
this.kvComparator = kvComparator;
}
public int compare(KeyValueScanner left, KeyValueScanner right) {
return compare(left.peek(), right.peek());
int comparison = compare(left.peek(), right.peek());
if (comparison != 0) {
return comparison;
} else {
// Since both the keys are exactly the same, we break the tie in favor
// of the key which came latest.
long leftSequenceID = left.getSequenceID();
long rightSequenceID = right.getSequenceID();
if (leftSequenceID > rightSequenceID) {
return -1;
} else if (leftSequenceID < rightSequenceID) {
return 1;
} else {
return 0;
}
}
}
/**
* Compares two KeyValue
@@ -253,4 +268,9 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner {
public PriorityQueue<KeyValueScanner> getHeap() {
return this.heap;
}
@Override
public long getSequenceID() {
return 0;
}
}

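The comparator change is the other half of the fix: when two scanners sit at byte-identical keys, the one backed by the newer data (higher sequence id; Long.MAX_VALUE for the memstore, per the MemStore hunk below) must surface from the heap first, so the column tracker includes the fresh copy and then skips the stale duplicate. A hedged sketch of the tie-break rule, using a stand-in KVScanner interface rather than the real KeyValueScanner:

// Illustrative tie-break comparator; only the rule mirrors the patch.
import java.util.Comparator;

interface KVScanner {
  byte[] peekKey();     // current key bytes
  long getSequenceID(); // larger = more recently written data
}

class NewestFirstTieBreak implements Comparator<KVScanner> {
  private final Comparator<byte[]> keyComparator;

  NewestFirstTieBreak(Comparator<byte[]> keyComparator) {
    this.keyComparator = keyComparator;
  }

  @Override
  public int compare(KVScanner left, KVScanner right) {
    int cmp = keyComparator.compare(left.peekKey(), right.peekKey());
    if (cmp != 0) {
      return cmp;   // different keys: normal ordering applies
    }
    // Identical keys: the scanner with the higher sequence id sorts
    // first, so the newest copy of the key is the one the scan sees.
    return Long.compare(right.getSequenceID(), left.getSequenceID());
  }
}

With this ordering the heap always hands back the memstore's copy of a duplicated key before any store file's copy, and newer store files before older ones.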
KeyValueScanner.java

@@ -56,6 +56,14 @@ public interface KeyValueScanner {
*/
public boolean reseek(KeyValue key) throws IOException;
/**
* Get the sequence id associated with this KeyValueScanner. This is required
* for comparing multiple files to find out which one has the latest data.
* The default implementation for this would be to return 0. A file having
* lower sequence id will be considered to be the older one.
*/
public long getSequenceID();
/**
* Close the KeyValue scanner.
*/

MemStore.java

@@ -668,6 +668,15 @@ public class MemStore implements HeapSize {
this.kvsetIt = null;
this.snapshotIt = null;
}
/**
* MemStoreScanner returns max value as sequence id because it will
* always have the latest data among all files.
*/
@Override
public long getSequenceID() {
return Long.MAX_VALUE;
}
}
public final static long FIXED_OVERHEAD = ClassSize.align(

MinorCompactingStoreScanner.java

@@ -130,4 +130,9 @@ public class MinorCompactingStoreScanner implements KeyValueScanner, InternalScanner {
public void close() {
heap.close();
}
@Override
public long getSequenceID() {
return 0;
}
}

ScanQueryMatcher.java

@@ -199,18 +199,16 @@ public class ScanQueryMatcher {
}
}
MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength);
// if SKIP -> SEEK_NEXT_COL
// if (NEXT,DONE) -> SEEK_NEXT_ROW
// if (INCLUDE) -> INCLUDE
if (colChecker == MatchCode.SKIP) {
return MatchCode.SEEK_NEXT_COL;
} else if (colChecker == MatchCode.NEXT || colChecker == MatchCode.DONE) {
MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength, timestamp);
/*
* According to current implementation, colChecker can only be
* SEEK_NEXT_COL, SEEK_NEXT_ROW, SKIP or INCLUDE. Therefore, always return
* the MatchCode. If it is SEEK_NEXT_ROW, also set stickyNextRow.
*/
if (colChecker == MatchCode.SEEK_NEXT_ROW) {
stickyNextRow = true;
return MatchCode.SEEK_NEXT_ROW;
}
return MatchCode.INCLUDE;
return colChecker;
}

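For context, the MatchCodes differ in how much work they save: SKIP advances one KeyValue at a time, while SEEK_NEXT_COL and SEEK_NEXT_ROW let the scanner reseek past all remaining versions of a column, or the rest of a row, in one step. A simplified sketch of how a scan loop might dispatch on them; the enum values mirror ScanQueryMatcher.MatchCode, but the loop and interfaces are illustrative, not the real StoreScanner logic:

import java.util.ArrayList;
import java.util.List;

public class MatchCodeLoop {
  enum MatchCode { INCLUDE, SKIP, SEEK_NEXT_COL, SEEK_NEXT_ROW, DONE }

  interface SimpleScanner {
    String peek();              // current cell, null when exhausted
    void next();                // advance one cell
    void reseekPastColumn();    // jump over remaining versions of the column
    void reseekPastRow();       // jump over the rest of the row
  }

  interface Matcher { MatchCode match(String cell); }

  static List<String> scan(SimpleScanner scanner, Matcher matcher) {
    List<String> results = new ArrayList<String>();
    while (scanner.peek() != null) {
      switch (matcher.match(scanner.peek())) {
        case INCLUDE:       results.add(scanner.peek()); scanner.next(); break;
        case SKIP:          scanner.next();              break; // one cell at a time
        case SEEK_NEXT_COL: scanner.reseekPastColumn();  break; // one seek, many cells
        case SEEK_NEXT_ROW: scanner.reseekPastRow();     break;
        case DONE:          return results;
      }
    }
    return results;
  }
}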
ScanWildcardColumnTracker.java

@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.regionserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
import org.apache.hadoop.hbase.util.Bytes;
@@ -36,6 +37,9 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
private int columnLength = 0;
private int currentCount = 0;
private int maxVersions;
/* Keeps track of the latest timestamp included for current column.
* Used to eliminate duplicates. */
private long latestTSOfCurrentColumn;
/**
* Return maxVersions of every row.
@@ -53,10 +57,12 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
* @param bytes
* @param offset
* @param length
* @param timestamp
* @return The match code instance.
*/
@Override
public MatchCode checkColumn(byte[] bytes, int offset, int length) {
public MatchCode checkColumn(byte[] bytes, int offset, int length,
long timestamp) {
if (columnBuffer == null) {
// first iteration.
columnBuffer = bytes;
@@ -64,18 +70,28 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
columnLength = length;
currentCount = 0;
if (++currentCount > maxVersions)
return ScanQueryMatcher.MatchCode.SKIP;
if (++currentCount > maxVersions) {
return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL;
}
setTS(timestamp);
return ScanQueryMatcher.MatchCode.INCLUDE;
}
int cmp = Bytes.compareTo(bytes, offset, length,
columnBuffer, columnOffset, columnLength);
if (cmp == 0) {
if (++currentCount > maxVersions)
return ScanQueryMatcher.MatchCode.SKIP; // skip to next col
//If column matches, check if it is a duplicate timestamp
if (sameAsPreviousTS(timestamp)) {
return ScanQueryMatcher.MatchCode.SKIP;
}
if (++currentCount > maxVersions) {
return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL; // skip to next col
}
setTS(timestamp);
return ScanQueryMatcher.MatchCode.INCLUDE;
}
resetTS();
// new col > old col
if (cmp > 0) {
// switched columns, let's do something.
@@ -84,7 +100,8 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
columnLength = length;
currentCount = 0;
if (++currentCount > maxVersions)
return ScanQueryMatcher.MatchCode.SKIP;
return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL;
setTS(timestamp);
return ScanQueryMatcher.MatchCode.INCLUDE;
}
@@ -101,8 +118,10 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
columnOffset = offset;
columnLength = length;
currentCount = 0;
if (++currentCount > maxVersions)
return ScanQueryMatcher.MatchCode.SKIP;
if (++currentCount > maxVersions) {
return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL;
}
setTS(timestamp);
return ScanQueryMatcher.MatchCode.INCLUDE;
}
@@ -116,6 +135,19 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
@Override
public void reset() {
columnBuffer = null;
resetTS();
}
private void resetTS() {
latestTSOfCurrentColumn = HConstants.LATEST_TIMESTAMP;
}
private void setTS(long timestamp) {
latestTSOfCurrentColumn = timestamp;
}
private boolean sameAsPreviousTS(long timestamp) {
return timestamp == latestTSOfCurrentColumn;
}
/**

Store.java

@@ -184,7 +184,7 @@ public class Store implements HeapSize {
}
this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
this.storefiles = ImmutableList.copyOf(loadStoreFiles());
this.storefiles = sortAndClone(loadStoreFiles());
}
public HColumnDescriptor getFamily() {
@@ -219,7 +219,7 @@
}
/*
* Creates a series of StoreFile loaded from the given directory.
* Creates an unsorted list of StoreFile loaded from the given directory.
* @throws IOException
*/
private List<StoreFile> loadStoreFiles()
@@ -256,7 +256,6 @@
}
results.add(curfile);
}
Collections.sort(results, StoreFile.Comparators.FLUSH_TIME);
return results;
}
@@ -357,7 +356,7 @@
try {
ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(storefiles);
newFiles.add(sf);
this.storefiles = ImmutableList.copyOf(newFiles);
this.storefiles = sortAndClone(newFiles);
notifyChangedReadersObservers();
} finally {
this.lock.writeLock().unlock();
@@ -511,7 +510,7 @@
try {
ArrayList<StoreFile> newList = new ArrayList<StoreFile>(storefiles);
newList.add(sf);
storefiles = ImmutableList.copyOf(newList);
storefiles = sortAndClone(newList);
this.memstore.clearSnapshot(set);
// Tell listeners of the change in readers.
@@ -900,7 +899,7 @@
newStoreFiles.add(result);
}
this.storefiles = ImmutableList.copyOf(newStoreFiles);
this.storefiles = sortAndClone(newStoreFiles);
// Tell observers that list of StoreFiles has changed.
notifyChangedReadersObservers();
@@ -931,6 +930,12 @@
return result;
}
public ImmutableList<StoreFile> sortAndClone(List<StoreFile> storeFiles) {
Collections.sort(storeFiles, StoreFile.Comparators.FLUSH_TIME);
ImmutableList<StoreFile> newList = ImmutableList.copyOf(storeFiles);
return newList;
}
// ////////////////////////////////////////////////////////////////////////////
// Accessors.
// (This is the only section that is directly useful!)

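sortAndClone centralizes an invariant that previously held only at load time: the published store-file list stays sorted by flush time whenever a flush or compaction changes it, which the sequence-id tie-break above depends on. A sketch of the sort-before-publish pattern under illustrative names; StoreFileStub and its sequenceId field are stand-ins for the real StoreFile, and the comparator direction (oldest first) is an assumption of this sketch:

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class SortedFileList {
  static class StoreFileStub {
    final long sequenceId; // larger means flushed later
    StoreFileStub(long sequenceId) { this.sequenceId = sequenceId; }
  }

  // Plays the role of StoreFile.Comparators.FLUSH_TIME in this sketch:
  // older files sort first.
  private static final Comparator<StoreFileStub> FLUSH_TIME =
      new Comparator<StoreFileStub>() {
        public int compare(StoreFileStub a, StoreFileStub b) {
          return a.sequenceId < b.sequenceId ? -1
               : a.sequenceId > b.sequenceId ? 1 : 0;
        }
      };

  private volatile List<StoreFileStub> files =
      Collections.<StoreFileStub>emptyList();

  // Mirrors the sortAndClone pattern: sort a working copy, then publish
  // an immutable snapshot so concurrent readers never see a partial list.
  synchronized void add(StoreFileStub sf) {
    List<StoreFileStub> updated = new ArrayList<StoreFileStub>(files);
    updated.add(sf);
    Collections.sort(updated, FLUSH_TIME);
    files = Collections.unmodifiableList(updated);
  }
}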
StoreFile.java

@@ -394,6 +394,7 @@ public class StoreFile {
}
}
}
this.reader.setSequenceID(this.sequenceid);
b = metadataMap.get(MAJOR_COMPACTION_KEY);
if (b != null) {
@@ -866,6 +867,7 @@
protected BloomType bloomFilterType;
private final HFile.Reader reader;
protected TimeRangeTracker timeRangeTracker = null;
protected long sequenceID = -1;
public Reader(FileSystem fs, Path path, BlockCache blockCache, boolean inMemory)
throws IOException {
@@ -1048,6 +1050,14 @@
public BloomType getBloomFilterType() {
return this.bloomFilterType;
}
public long getSequenceID() {
return sequenceID;
}
public void setSequenceID(long sequenceID) {
this.sequenceID = sequenceID;
}
}
/**

StoreFileScanner.java

@@ -163,4 +163,9 @@ class StoreFileScanner implements KeyValueScanner {
public boolean shouldSeek(Scan scan, final SortedSet<byte[]> columns) {
return reader.shouldSeek(scan, columns);
}
@Override
public long getSequenceID() {
return reader.getSequenceID();
}
}

StoreScanner.java

@@ -368,4 +368,9 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
//guarantees that heap will never be null before this call.
return this.heap.reseek(kv);
}
@Override
public long getSequenceID() {
return 0;
}
}

TestFromClientSide.java

@@ -35,6 +35,7 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.UUID;
import org.apache.commons.logging.Log;
@@ -2844,7 +2845,7 @@ public class TestFromClientSide {
return Bytes.equals(left, right);
}
@Ignore @Test
@Test
public void testDuplicateVersions() throws Exception {
byte [] TABLE = Bytes.toBytes("testDuplicateVersions");
@@ -3044,18 +3045,198 @@
get.setMaxVersions(Integer.MAX_VALUE);
result = ht.get(get);
assertNResult(result, ROW, FAMILY, QUALIFIER,
new long [] {STAMPS[3], STAMPS[4], STAMPS[5], STAMPS[6], STAMPS[8], STAMPS[9], STAMPS[13], STAMPS[15]},
new byte[][] {VALUES[3], VALUES[14], VALUES[5], VALUES[6], VALUES[8], VALUES[9], VALUES[13], VALUES[15]},
0, 7);
new long [] {STAMPS[1], STAMPS[2], STAMPS[3], STAMPS[4], STAMPS[5], STAMPS[6], STAMPS[8], STAMPS[9], STAMPS[13], STAMPS[15]},
new byte[][] {VALUES[1], VALUES[2], VALUES[3], VALUES[14], VALUES[5], VALUES[6], VALUES[8], VALUES[9], VALUES[13], VALUES[15]},
0, 9);
scan = new Scan(ROW);
scan.addColumn(FAMILY, QUALIFIER);
scan.setMaxVersions(Integer.MAX_VALUE);
result = getSingleScanResult(ht, scan);
assertNResult(result, ROW, FAMILY, QUALIFIER,
new long [] {STAMPS[3], STAMPS[4], STAMPS[5], STAMPS[6], STAMPS[8], STAMPS[9], STAMPS[13], STAMPS[15]},
new byte[][] {VALUES[3], VALUES[14], VALUES[5], VALUES[6], VALUES[8], VALUES[9], VALUES[13], VALUES[15]},
0, 7);
new long [] {STAMPS[1], STAMPS[2], STAMPS[3], STAMPS[4], STAMPS[5], STAMPS[6], STAMPS[8], STAMPS[9], STAMPS[13], STAMPS[15]},
new byte[][] {VALUES[1], VALUES[2], VALUES[3], VALUES[14], VALUES[5], VALUES[6], VALUES[8], VALUES[9], VALUES[13], VALUES[15]},
0, 9);
}
@Test
public void testUpdates() throws Exception {
byte [] TABLE = Bytes.toBytes("testUpdates");
HTable hTable = TEST_UTIL.createTable(TABLE, FAMILY, 10);
// Write a column with values at timestamp 1, 2 and 3
byte[] row = Bytes.toBytes("row1");
byte[] qualifier = Bytes.toBytes("myCol");
Put put = new Put(row);
put.add(FAMILY, qualifier, 1L, Bytes.toBytes("AAA"));
hTable.put(put);
put = new Put(row);
put.add(FAMILY, qualifier, 2L, Bytes.toBytes("BBB"));
hTable.put(put);
put = new Put(row);
put.add(FAMILY, qualifier, 3L, Bytes.toBytes("EEE"));
hTable.put(put);
Get get = new Get(row);
get.addColumn(FAMILY, qualifier);
get.setMaxVersions();
// Check that the column indeed has the right values at timestamps 1 and
// 2
Result result = hTable.get(get);
NavigableMap<Long, byte[]> navigableMap =
result.getMap().get(FAMILY).get(qualifier);
assertEquals("AAA", Bytes.toString(navigableMap.get(1L)));
assertEquals("BBB", Bytes.toString(navigableMap.get(2L)));
// Update the value at timestamp 1
put = new Put(row);
put.add(FAMILY, qualifier, 1L, Bytes.toBytes("CCC"));
hTable.put(put);
// Update the value at timestamp 2
put = new Put(row);
put.add(FAMILY, qualifier, 2L, Bytes.toBytes("DDD"));
hTable.put(put);
// Check that the values at timestamp 2 and 1 got updated
result = hTable.get(get);
navigableMap = result.getMap().get(FAMILY).get(qualifier);
assertEquals("CCC", Bytes.toString(navigableMap.get(1L)));
assertEquals("DDD", Bytes.toString(navigableMap.get(2L)));
}
@Test
public void testUpdatesWithMajorCompaction() throws Exception {
String tableName = "testUpdatesWithMajorCompaction";
byte [] TABLE = Bytes.toBytes(tableName);
HTable hTable = TEST_UTIL.createTable(TABLE, FAMILY, 10);
HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
// Write a column with values at timestamp 1, 2 and 3
byte[] row = Bytes.toBytes("row2");
byte[] qualifier = Bytes.toBytes("myCol");
Put put = new Put(row);
put.add(FAMILY, qualifier, 1L, Bytes.toBytes("AAA"));
hTable.put(put);
put = new Put(row);
put.add(FAMILY, qualifier, 2L, Bytes.toBytes("BBB"));
hTable.put(put);
put = new Put(row);
put.add(FAMILY, qualifier, 3L, Bytes.toBytes("EEE"));
hTable.put(put);
Get get = new Get(row);
get.addColumn(FAMILY, qualifier);
get.setMaxVersions();
// Check that the column indeed has the right values at timestamps 1 and
// 2
Result result = hTable.get(get);
NavigableMap<Long, byte[]> navigableMap =
result.getMap().get(FAMILY).get(qualifier);
assertEquals("AAA", Bytes.toString(navigableMap.get(1L)));
assertEquals("BBB", Bytes.toString(navigableMap.get(2L)));
// Trigger a major compaction
admin.flush(tableName);
admin.majorCompact(tableName);
Thread.sleep(6000);
// Update the value at timestamp 1
put = new Put(row);
put.add(FAMILY, qualifier, 1L, Bytes.toBytes("CCC"));
hTable.put(put);
// Update the value at timestamp 2
put = new Put(row);
put.add(FAMILY, qualifier, 2L, Bytes.toBytes("DDD"));
hTable.put(put);
// Trigger a major compaction
admin.flush(tableName);
admin.majorCompact(tableName);
Thread.sleep(6000);
// Check that the values at timestamp 2 and 1 got updated
result = hTable.get(get);
navigableMap = result.getMap().get(FAMILY).get(qualifier);
assertEquals("CCC", Bytes.toString(navigableMap.get(1L)));
assertEquals("DDD", Bytes.toString(navigableMap.get(2L)));
}
@Test
public void testMajorCompactionBetweenTwoUpdates() throws Exception {
String tableName = "testMajorCompactionBetweenTwoUpdates";
byte [] TABLE = Bytes.toBytes(tableName);
HTable hTable = TEST_UTIL.createTable(TABLE, FAMILY, 10);
HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
// Write a column with values at timestamp 1, 2 and 3
byte[] row = Bytes.toBytes("row3");
byte[] qualifier = Bytes.toBytes("myCol");
Put put = new Put(row);
put.add(FAMILY, qualifier, 1L, Bytes.toBytes("AAA"));
hTable.put(put);
put = new Put(row);
put.add(FAMILY, qualifier, 2L, Bytes.toBytes("BBB"));
hTable.put(put);
put = new Put(row);
put.add(FAMILY, qualifier, 3L, Bytes.toBytes("EEE"));
hTable.put(put);
Get get = new Get(row);
get.addColumn(FAMILY, qualifier);
get.setMaxVersions();
// Check that the column indeed has the right values at timestamps 1 and
// 2
Result result = hTable.get(get);
NavigableMap<Long, byte[]> navigableMap =
result.getMap().get(FAMILY).get(qualifier);
assertEquals("AAA", Bytes.toString(navigableMap.get(1L)));
assertEquals("BBB", Bytes.toString(navigableMap.get(2L)));
// Trigger a major compaction
admin.flush(tableName);
admin.majorCompact(tableName);
Thread.sleep(6000);
// Update the value at timestamp 1
put = new Put(row);
put.add(FAMILY, qualifier, 1L, Bytes.toBytes("CCC"));
hTable.put(put);
// Trigger a major compaction
admin.flush(tableName);
admin.majorCompact(tableName);
Thread.sleep(6000);
// Update the value at timestamp 2
put = new Put(row);
put.add(FAMILY, qualifier, 2L, Bytes.toBytes("DDD"));
hTable.put(put);
// Trigger a major compaction
admin.flush(tableName);
admin.majorCompact(tableName);
Thread.sleep(6000);
// Check that the values at timestamp 2 and 1 got updated
result = hTable.get(get);
navigableMap = result.getMap().get(FAMILY).get(qualifier);
assertEquals("CCC", Bytes.toString(navigableMap.get(1L)));
assertEquals("DDD", Bytes.toString(navigableMap.get(2L)));
}
@Test

KeyValueScanFixture.java

@@ -103,4 +103,9 @@ public class KeyValueScanFixture implements KeyValueScanner {
public void close() {
// noop.
}
@Override
public long getSequenceID() {
return 0;
}
}

TestColumnSeeking.java

@@ -0,0 +1,289 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
public class TestColumnSeeking {
private final static HBaseTestingUtility TEST_UTIL =
new HBaseTestingUtility();
static final Log LOG = LogFactory.getLog(TestColumnSeeking.class);
@SuppressWarnings("unchecked")
@Test
public void testDuplicateVersions() throws IOException {
String family = "Family";
byte[] familyBytes = Bytes.toBytes("Family");
String table = "TestDuplicateVersions";
HColumnDescriptor hcd =
new HColumnDescriptor(familyBytes, 1000,
HColumnDescriptor.DEFAULT_COMPRESSION,
HColumnDescriptor.DEFAULT_IN_MEMORY,
HColumnDescriptor.DEFAULT_BLOCKCACHE,
HColumnDescriptor.DEFAULT_TTL,
HColumnDescriptor.DEFAULT_BLOOMFILTER);
HTableDescriptor htd = new HTableDescriptor(table);
htd.addFamily(hcd);
HRegionInfo info = new HRegionInfo(htd, null, null, false);
HRegion region =
HRegion.createHRegion(info, HBaseTestingUtility.getTestDir(), TEST_UTIL
.getConfiguration());
List<String> rows = generateRandomWords(10, "row");
List<String> allColumns = generateRandomWords(10, "column");
List<String> values = generateRandomWords(100, "value");
long maxTimestamp = 2;
double selectPercent = 0.5;
int numberOfTests = 5;
double flushPercentage = 0.2;
double minorPercentage = 0.2;
double majorPercentage = 0.2;
double putPercentage = 0.2;
HashMap<String, KeyValue> allKVMap = new HashMap<String, KeyValue>();
HashMap<String, KeyValue>[] kvMaps = new HashMap[numberOfTests];
ArrayList<String>[] columnLists = new ArrayList[numberOfTests];
for (int i = 0; i < numberOfTests; i++) {
kvMaps[i] = new HashMap<String, KeyValue>();
columnLists[i] = new ArrayList<String>();
for (String column : allColumns) {
if (Math.random() < selectPercent) {
columnLists[i].add(column);
}
}
}
for (String value : values) {
for (String row : rows) {
Put p = new Put(Bytes.toBytes(row));
for (String column : allColumns) {
for (long timestamp = 1; timestamp <= maxTimestamp; timestamp++) {
KeyValue kv =
KeyValueTestUtil.create(row, family, column, timestamp, value);
if (Math.random() < putPercentage) {
p.add(kv);
allKVMap.put(kv.getKeyString(), kv);
for (int i = 0; i < numberOfTests; i++) {
if (columnLists[i].contains(column)) {
kvMaps[i].put(kv.getKeyString(), kv);
}
}
}
}
}
region.put(p);
if (Math.random() < flushPercentage) {
LOG.info("Flushing... ");
region.flushcache();
}
if (Math.random() < minorPercentage) {
LOG.info("Minor compacting... ");
region.compactStores(false);
}
if (Math.random() < majorPercentage) {
LOG.info("Major compacting... ");
region.compactStores(true);
}
}
}
for (int i = 0; i < numberOfTests + 1; i++) {
Collection<KeyValue> kvSet;
Scan scan = new Scan();
scan.setMaxVersions();
if (i < numberOfTests) {
kvSet = kvMaps[i].values();
for (String column : columnLists[i]) {
scan.addColumn(familyBytes, Bytes.toBytes(column));
}
LOG.info("ExplicitColumns scanner");
LOG.info("Columns: " + columnLists[i].size() + " Keys: "
+ kvSet.size());
} else {
kvSet = allKVMap.values();
LOG.info("Wildcard scanner");
LOG.info("Columns: " + allColumns.size() + " Keys: " + kvSet.size());
}
InternalScanner scanner = region.getScanner(scan);
List<KeyValue> results = new ArrayList<KeyValue>();
while (scanner.next(results))
;
assertEquals(kvSet.size(), results.size());
assertTrue(results.containsAll(kvSet));
}
}
@SuppressWarnings("unchecked")
@Test
public void testReseeking() throws IOException {
String family = "Family";
byte[] familyBytes = Bytes.toBytes("Family");
String table = "TestSingleVersions";
HTableDescriptor htd = new HTableDescriptor(table);
htd.addFamily(new HColumnDescriptor(family));
HRegionInfo info = new HRegionInfo(htd, null, null, false);
HRegion region =
HRegion.createHRegion(info, HBaseTestingUtility.getTestDir(), TEST_UTIL
.getConfiguration());
List<String> rows = generateRandomWords(10, "row");
List<String> allColumns = generateRandomWords(100, "column");
long maxTimestamp = 2;
double selectPercent = 0.5;
int numberOfTests = 5;
double flushPercentage = 0.2;
double minorPercentage = 0.2;
double majorPercentage = 0.2;
double putPercentage = 0.2;
HashMap<String, KeyValue> allKVMap = new HashMap<String, KeyValue>();
HashMap<String, KeyValue>[] kvMaps = new HashMap[numberOfTests];
ArrayList<String>[] columnLists = new ArrayList[numberOfTests];
String valueString = "Value";
for (int i = 0; i < numberOfTests; i++) {
kvMaps[i] = new HashMap<String, KeyValue>();
columnLists[i] = new ArrayList<String>();
for (String column : allColumns) {
if (Math.random() < selectPercent) {
columnLists[i].add(column);
}
}
}
for (String row : rows) {
Put p = new Put(Bytes.toBytes(row));
for (String column : allColumns) {
for (long timestamp = 1; timestamp <= maxTimestamp; timestamp++) {
KeyValue kv =
KeyValueTestUtil.create(row, family, column, timestamp,
valueString);
if (Math.random() < putPercentage) {
p.add(kv);
allKVMap.put(kv.getKeyString(), kv);
for (int i = 0; i < numberOfTests; i++) {
if (columnLists[i].contains(column)) {
kvMaps[i].put(kv.getKeyString(), kv);
}
}
}
}
}
region.put(p);
if (Math.random() < flushPercentage) {
LOG.info("Flushing... ");
region.flushcache();
}
if (Math.random() < minorPercentage) {
LOG.info("Minor compacting... ");
region.compactStores(false);
}
if (Math.random() < majorPercentage) {
LOG.info("Major compacting... ");
region.compactStores(true);
}
}
for (int i = 0; i < numberOfTests + 1; i++) {
Collection<KeyValue> kvSet;
Scan scan = new Scan();
scan.setMaxVersions();
if (i < numberOfTests) {
kvSet = kvMaps[i].values();
for (String column : columnLists[i]) {
scan.addColumn(familyBytes, Bytes.toBytes(column));
}
LOG.info("ExplicitColumns scanner");
LOG.info("Columns: " + columnLists[i].size() + " Keys: "
+ kvSet.size());
} else {
kvSet = allKVMap.values();
LOG.info("Wildcard scanner");
LOG.info("Columns: " + allColumns.size() + " Keys: " + kvSet.size());
}
InternalScanner scanner = region.getScanner(scan);
List<KeyValue> results = new ArrayList<KeyValue>();
while (scanner.next(results))
;
assertEquals(kvSet.size(), results.size());
assertTrue(results.containsAll(kvSet));
}
}
List<String> generateRandomWords(int numberOfWords, String suffix) {
Set<String> wordSet = new HashSet<String>();
for (int i = 0; i < numberOfWords; i++) {
int lengthOfWords = (int) (Math.random() * 5) + 1;
char[] wordChar = new char[lengthOfWords];
for (int j = 0; j < wordChar.length; j++) {
wordChar[j] = (char) (Math.random() * 26 + 97);
}
String word;
if (suffix == null) {
word = new String(wordChar);
} else {
word = new String(wordChar) + suffix;
}
wordSet.add(word);
}
List<String> wordList = new ArrayList<String>(wordSet);
return wordList;
}
}

TestExplicitColumnTracker.java

@@ -50,9 +50,10 @@ public class TestExplicitColumnTracker extends HBaseTestCase {
//Initialize result
List<ScanQueryMatcher.MatchCode> result = new ArrayList<ScanQueryMatcher.MatchCode>();
long timestamp = 0;
//"Match"
for(byte [] col : scannerColumns){
result.add(exp.checkColumn(col, 0, col.length));
result.add(exp.checkColumn(col, 0, col.length, ++timestamp));
}
assertEquals(expected.size(), result.size());
@@ -76,11 +77,11 @@
columns.add(col2);
columns.add(col4);
List<MatchCode> expected = new ArrayList<ScanQueryMatcher.MatchCode>();
expected.add(ScanQueryMatcher.MatchCode.SKIP);
expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
expected.add(ScanQueryMatcher.MatchCode.SKIP);
expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
expected.add(ScanQueryMatcher.MatchCode.DONE);
expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW);
int maxVersions = 1;
//Create "Scanner"
@@ -106,25 +107,25 @@
columns.add(col4);
List<ScanQueryMatcher.MatchCode> expected = new ArrayList<ScanQueryMatcher.MatchCode>();
expected.add(ScanQueryMatcher.MatchCode.SKIP);
expected.add(ScanQueryMatcher.MatchCode.SKIP);
expected.add(ScanQueryMatcher.MatchCode.SKIP);
expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
expected.add(ScanQueryMatcher.MatchCode.SKIP);
expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
expected.add(ScanQueryMatcher.MatchCode.SKIP);
expected.add(ScanQueryMatcher.MatchCode.SKIP);
expected.add(ScanQueryMatcher.MatchCode.SKIP);
expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
expected.add(ScanQueryMatcher.MatchCode.DONE);
expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW);
expected.add(ScanQueryMatcher.MatchCode.DONE);
expected.add(ScanQueryMatcher.MatchCode.DONE);
expected.add(ScanQueryMatcher.MatchCode.DONE);
expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW);
expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW);
expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW);
int maxVersions = 2;
//Create "Scanner"
@@ -163,13 +164,13 @@
ColumnTracker explicit = new ExplicitColumnTracker(columns, maxVersions);
for (int i = 0; i < 100000; i+=2) {
byte [] col = Bytes.toBytes("col"+i);
explicit.checkColumn(col, 0, col.length);
explicit.checkColumn(col, 0, col.length, 1);
}
explicit.update();
for (int i = 1; i < 100000; i+=2) {
byte [] col = Bytes.toBytes("col"+i);
explicit.checkColumn(col, 0, col.length);
explicit.checkColumn(col, 0, col.length, 1);
}
}
@@ -184,8 +185,8 @@
new byte[][] { col1, col4 });
List<ScanQueryMatcher.MatchCode> expected = Arrays.<ScanQueryMatcher.MatchCode>asList(
new ScanQueryMatcher.MatchCode[] {
ScanQueryMatcher.MatchCode.SKIP,
ScanQueryMatcher.MatchCode.SKIP });
ScanQueryMatcher.MatchCode.SEEK_NEXT_COL,
ScanQueryMatcher.MatchCode.SEEK_NEXT_COL });
runTest(1, columns, scanner, expected);
}
}

TestKeyValueHeap.java

@@ -259,6 +259,11 @@ public class TestKeyValueHeap extends HBaseTestCase {
public boolean reseek(KeyValue key) throws IOException {
return seek(key);
}
@Override
public long getSequenceID() {
return 0;
}
}
}

TestScanWildcardColumnTracker.java

@@ -52,7 +52,8 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase {
List<ScanQueryMatcher.MatchCode> actual = new ArrayList<MatchCode>();
for(byte [] qualifier : qualifiers) {
ScanQueryMatcher.MatchCode mc = tracker.checkColumn(qualifier, 0, qualifier.length);
ScanQueryMatcher.MatchCode mc = tracker.checkColumn(qualifier, 0,
qualifier.length, 1);
actual.add(mc);
}
@@ -77,13 +78,15 @@
List<ScanQueryMatcher.MatchCode> expected = new ArrayList<MatchCode>();
expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
expected.add(ScanQueryMatcher.MatchCode.SKIP);
expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
List<MatchCode> actual = new ArrayList<ScanQueryMatcher.MatchCode>();
long timestamp = 0;
for(byte [] qualifier : qualifiers) {
MatchCode mc = tracker.checkColumn(qualifier, 0, qualifier.length);
MatchCode mc = tracker.checkColumn(qualifier, 0, qualifier.length,
++timestamp);
actual.add(mc);
}
@@ -106,7 +109,7 @@
try {
for(byte [] qualifier : qualifiers) {
tracker.checkColumn(qualifier, 0, qualifier.length);
tracker.checkColumn(qualifier, 0, qualifier.length, 1);
}
} catch (Exception e) {
ok = true;