HBASE-2517 During reads when passed the specified time range, seek to next column (Pranav via jgray)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@964496 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e58e1f82fb
commit
9a4a3f512c
|
@ -766,6 +766,8 @@ Release 0.21.0 - Unreleased
|
|||
(Pranav via Ryan)
|
||||
HBASE-2836 Speed mvn site building by removing generation of useless reports
|
||||
HBASE-2808 Document the implementation of replication
|
||||
HBASE-2517 During reads when passed the specified time range, seek to
|
||||
next column (Pranav via jgray)
|
||||
|
||||
NEW FEATURES
|
||||
HBASE-1961 HBase EC2 scripts
|
||||
|
|
|
@ -147,6 +147,23 @@ public class TimeRange implements Writable {
|
|||
return (timestamp >= minStamp);
|
||||
}
|
||||
|
||||
/**
|
||||
* Compare the timestamp to timerange
|
||||
* @param timestamp
|
||||
* @return -1 if timestamp is less than timerange,
|
||||
* 0 if timestamp is within timerange,
|
||||
* 1 if timestamp is greater than timerange
|
||||
*/
|
||||
public int compare(long timestamp) {
|
||||
if (timestamp < minStamp) {
|
||||
return -1;
|
||||
} else if (timestamp >= maxStamp) {
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
|
|
|
@ -163,4 +163,39 @@ public class ExplicitColumnTracker implements ColumnTracker {
|
|||
col.setCount(this.maxVersions);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is used to inform the column tracker that we are done with
|
||||
* this column. We may get this information from external filters or
|
||||
* timestamp range and we then need to indicate this information to
|
||||
* tracker. It is required only in case of ExplicitColumnTracker.
|
||||
* @param bytes
|
||||
* @param offset
|
||||
* @param length
|
||||
*/
|
||||
public void doneWithColumn(byte [] bytes, int offset, int length) {
|
||||
while (this.column != null) {
|
||||
int compare = Bytes.compareTo(column.getBuffer(), column.getOffset(),
|
||||
column.getLength(), bytes, offset, length);
|
||||
if (compare == 0) {
|
||||
this.columns.remove(this.index);
|
||||
if (this.columns.size() == this.index) {
|
||||
// Will not hit any more columns in this storefile
|
||||
this.column = null;
|
||||
} else {
|
||||
this.column = this.columns.get(this.index);
|
||||
}
|
||||
return;
|
||||
} else if ( compare <= -1) {
|
||||
if(++this.index != this.columns.size()) {
|
||||
this.column = this.columns.get(this.index);
|
||||
} else {
|
||||
this.column = null;
|
||||
}
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -111,9 +111,16 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner {
|
|||
return false;
|
||||
}
|
||||
InternalScanner currentAsInternal = (InternalScanner)this.current;
|
||||
currentAsInternal.next(result, limit);
|
||||
boolean mayContainsMoreRows = currentAsInternal.next(result, limit);
|
||||
KeyValue pee = this.current.peek();
|
||||
if (pee == null) {
|
||||
/*
|
||||
* By definition, any InternalScanner must return false only when it has no
|
||||
* further rows to be fetched. So, we can close a scanner if it returns
|
||||
* false. All existing implementations seem to be fine with this. It is much
|
||||
* more efficient to close scanners which are not needed than keep them in
|
||||
* the heap. This is also required for certain optimizations.
|
||||
*/
|
||||
if (pee == null || !mayContainsMoreRows) {
|
||||
this.current.close();
|
||||
} else {
|
||||
this.heap.add(this.current);
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
|
||||
|
@ -34,6 +35,7 @@ public class ScanQueryMatcher extends QueryMatcher {
|
|||
// Optimization so we can skip lots of compares when we decide to skip
|
||||
// to the next row.
|
||||
private boolean stickyNextRow;
|
||||
private byte[] stopRow;
|
||||
|
||||
/**
|
||||
* Constructs a QueryMatcher for a Scan.
|
||||
|
@ -50,6 +52,7 @@ public class ScanQueryMatcher extends QueryMatcher {
|
|||
this.oldestStamp = System.currentTimeMillis() - ttl;
|
||||
this.rowComparator = rowComparator;
|
||||
this.deletes = new ScanDeleteTracker();
|
||||
this.stopRow = scan.getStopRow();
|
||||
this.startKey = KeyValue.createFirstOnRow(scan.getStartRow());
|
||||
this.filter = scan.getFilter();
|
||||
|
||||
|
@ -140,17 +143,37 @@ public class ScanQueryMatcher extends QueryMatcher {
|
|||
return MatchCode.SKIP;
|
||||
}
|
||||
|
||||
if (!tr.withinTimeRange(timestamp)) {
|
||||
return MatchCode.SKIP;
|
||||
}
|
||||
|
||||
if (!this.deletes.isEmpty() &&
|
||||
deletes.isDeleted(bytes, offset, qualLength, timestamp)) {
|
||||
return MatchCode.SKIP;
|
||||
}
|
||||
|
||||
MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength);
|
||||
int timestampComparison = tr.compare(timestamp);
|
||||
if (timestampComparison >= 1) {
|
||||
return MatchCode.SKIP;
|
||||
} else if (timestampComparison <= -1) {
|
||||
return getNextRowOrNextColumn(bytes, offset, qualLength);
|
||||
}
|
||||
|
||||
/**
|
||||
* Filters should be checked before checking column trackers. If we do
|
||||
* otherwise, as was previously being done, ColumnTracker may increment its
|
||||
* counter for even that KV which may be discarded later on by Filter. This
|
||||
* would lead to incorrect results in certain cases.
|
||||
*/
|
||||
if (filter != null) {
|
||||
ReturnCode filterResponse = filter.filterKeyValue(kv);
|
||||
if (filterResponse == ReturnCode.SKIP) {
|
||||
return MatchCode.SKIP;
|
||||
} else if (filterResponse == ReturnCode.NEXT_COL) {
|
||||
return getNextRowOrNextColumn(bytes, offset, qualLength);
|
||||
} else if (filterResponse == ReturnCode.NEXT_ROW) {
|
||||
stickyNextRow = true;
|
||||
return MatchCode.SEEK_NEXT_ROW;
|
||||
}
|
||||
}
|
||||
|
||||
MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength);
|
||||
// if SKIP -> SEEK_NEXT_COL
|
||||
// if (NEXT,DONE) -> SEEK_NEXT_ROW
|
||||
// if (INCLUDE) -> INCLUDE
|
||||
|
@ -161,24 +184,35 @@ public class ScanQueryMatcher extends QueryMatcher {
|
|||
return MatchCode.SEEK_NEXT_ROW;
|
||||
}
|
||||
|
||||
// else INCLUDE
|
||||
// if (colChecker == MatchCode.INCLUDE)
|
||||
// give the filter a chance to run.
|
||||
if (filter == null)
|
||||
return MatchCode.INCLUDE;
|
||||
return MatchCode.INCLUDE;
|
||||
|
||||
ReturnCode filterResponse = filter.filterKeyValue(kv);
|
||||
if (filterResponse == ReturnCode.INCLUDE)
|
||||
return MatchCode.INCLUDE;
|
||||
}
|
||||
|
||||
if (filterResponse == ReturnCode.SKIP)
|
||||
return MatchCode.SKIP;
|
||||
else if (filterResponse == ReturnCode.NEXT_COL)
|
||||
public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset,
|
||||
int qualLength) {
|
||||
if (columns instanceof ExplicitColumnTracker) {
|
||||
//We only come here when we know that columns is an instance of
|
||||
//ExplicitColumnTracker so we should never have a cast exception
|
||||
((ExplicitColumnTracker)columns).doneWithColumn(bytes, offset,
|
||||
qualLength);
|
||||
if (columns.getColumnHint() == null) {
|
||||
return MatchCode.SEEK_NEXT_ROW;
|
||||
} else {
|
||||
return MatchCode.SEEK_NEXT_COL;
|
||||
}
|
||||
} else {
|
||||
return MatchCode.SEEK_NEXT_COL;
|
||||
// else if (filterResponse == ReturnCode.NEXT_ROW)
|
||||
}
|
||||
}
|
||||
|
||||
stickyNextRow = true;
|
||||
return MatchCode.SEEK_NEXT_ROW;
|
||||
public boolean moreRowsMayExistAfter(KeyValue kv) {
|
||||
if (!Bytes.equals(stopRow , HConstants.EMPTY_END_ROW) &&
|
||||
rowComparator.compareRows(kv.getBuffer(),kv.getRowOffset(),
|
||||
kv.getRowLength(), stopRow, 0, stopRow.length) >= 0) {
|
||||
return false;
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -256,6 +256,10 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
|
|||
return false;
|
||||
|
||||
case SEEK_NEXT_ROW:
|
||||
if (!matcher.moreRowsMayExistAfter(kv)) {
|
||||
outResult.addAll(results);
|
||||
return false;
|
||||
}
|
||||
heap.next();
|
||||
break;
|
||||
|
||||
|
|
|
@ -78,6 +78,180 @@ public class TestMultipleTimestamps {
|
|||
// Nothing to do.
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReseeksWithOneColumnMiltipleTimestamp() throws IOException {
|
||||
byte [] TABLE = Bytes.toBytes("testReseeksWithOne" +
|
||||
"ColumnMiltipleTimestamps");
|
||||
byte [] FAMILY = Bytes.toBytes("event_log");
|
||||
byte [][] FAMILIES = new byte[][] { FAMILY };
|
||||
|
||||
// create table; set versions to max...
|
||||
HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
|
||||
|
||||
Integer[] putRows = new Integer[] {1, 3, 5, 7};
|
||||
Integer[] putColumns = new Integer[] { 1, 3, 5};
|
||||
Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
|
||||
|
||||
Integer[] scanRows = new Integer[] {3, 5};
|
||||
Integer[] scanColumns = new Integer[] {3};
|
||||
Long[] scanTimestamps = new Long[] {3L, 4L};
|
||||
int scanMaxVersions = 2;
|
||||
|
||||
put(ht, FAMILY, putRows, putColumns, putTimestamps);
|
||||
|
||||
flush();
|
||||
|
||||
ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
|
||||
scanTimestamps, scanMaxVersions);
|
||||
|
||||
KeyValue[] kvs;
|
||||
|
||||
kvs = scanner.next().raw();
|
||||
assertEquals(2, kvs.length);
|
||||
checkOneCell(kvs[0], FAMILY, 3, 3, 4);
|
||||
checkOneCell(kvs[1], FAMILY, 3, 3, 3);
|
||||
kvs = scanner.next().raw();
|
||||
assertEquals(2, kvs.length);
|
||||
checkOneCell(kvs[0], FAMILY, 5, 3, 4);
|
||||
checkOneCell(kvs[1], FAMILY, 5, 3, 3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReseeksWithMultipleColumnOneTimestamp() throws IOException {
|
||||
byte [] TABLE = Bytes.toBytes("testReseeksWithOne" +
|
||||
"ColumnMiltipleTimestamps");
|
||||
byte [] FAMILY = Bytes.toBytes("event_log");
|
||||
byte [][] FAMILIES = new byte[][] { FAMILY };
|
||||
|
||||
// create table; set versions to max...
|
||||
HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
|
||||
|
||||
Integer[] putRows = new Integer[] {1, 3, 5, 7};
|
||||
Integer[] putColumns = new Integer[] { 1, 3, 5};
|
||||
Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
|
||||
|
||||
Integer[] scanRows = new Integer[] {3, 5};
|
||||
Integer[] scanColumns = new Integer[] {3,4};
|
||||
Long[] scanTimestamps = new Long[] {3L};
|
||||
int scanMaxVersions = 2;
|
||||
|
||||
put(ht, FAMILY, putRows, putColumns, putTimestamps);
|
||||
|
||||
flush();
|
||||
|
||||
ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
|
||||
scanTimestamps, scanMaxVersions);
|
||||
|
||||
KeyValue[] kvs;
|
||||
|
||||
kvs = scanner.next().raw();
|
||||
assertEquals(1, kvs.length);
|
||||
checkOneCell(kvs[0], FAMILY, 3, 3, 3);
|
||||
kvs = scanner.next().raw();
|
||||
assertEquals(1, kvs.length);
|
||||
checkOneCell(kvs[0], FAMILY, 5, 3, 3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReseeksWithMultipleColumnMultipleTimestamp() throws
|
||||
IOException {
|
||||
byte [] TABLE = Bytes.toBytes("testReseeksWithOne" +
|
||||
"ColumnMiltipleTimestamps");
|
||||
byte [] FAMILY = Bytes.toBytes("event_log");
|
||||
byte [][] FAMILIES = new byte[][] { FAMILY };
|
||||
|
||||
// create table; set versions to max...
|
||||
HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
|
||||
|
||||
Integer[] putRows = new Integer[] {1, 3, 5, 7};
|
||||
Integer[] putColumns = new Integer[] { 1, 3, 5};
|
||||
Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
|
||||
|
||||
Integer[] scanRows = new Integer[] {5, 7};
|
||||
Integer[] scanColumns = new Integer[] {3, 4, 5};
|
||||
Long[] scanTimestamps = new Long[] {2l, 3L};
|
||||
int scanMaxVersions = 2;
|
||||
|
||||
put(ht, FAMILY, putRows, putColumns, putTimestamps);
|
||||
|
||||
flush();
|
||||
|
||||
ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
|
||||
scanTimestamps, scanMaxVersions);
|
||||
|
||||
KeyValue[] kvs;
|
||||
|
||||
kvs = scanner.next().raw();
|
||||
assertEquals(4, kvs.length);
|
||||
checkOneCell(kvs[0], FAMILY, 5, 3, 3);
|
||||
checkOneCell(kvs[1], FAMILY, 5, 3, 2);
|
||||
checkOneCell(kvs[2], FAMILY, 5, 5, 3);
|
||||
checkOneCell(kvs[3], FAMILY, 5, 5, 2);
|
||||
kvs = scanner.next().raw();
|
||||
assertEquals(4, kvs.length);
|
||||
checkOneCell(kvs[0], FAMILY, 7, 3, 3);
|
||||
checkOneCell(kvs[1], FAMILY, 7, 3, 2);
|
||||
checkOneCell(kvs[2], FAMILY, 7, 5, 3);
|
||||
checkOneCell(kvs[3], FAMILY, 7, 5, 2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReseeksWithMultipleFiles() throws IOException {
|
||||
byte [] TABLE = Bytes.toBytes("testReseeksWithOne" +
|
||||
"ColumnMiltipleTimestamps");
|
||||
byte [] FAMILY = Bytes.toBytes("event_log");
|
||||
byte [][] FAMILIES = new byte[][] { FAMILY };
|
||||
|
||||
// create table; set versions to max...
|
||||
HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
|
||||
|
||||
Integer[] putRows1 = new Integer[] {1, 2, 3};
|
||||
Integer[] putColumns1 = new Integer[] { 2, 5, 6};
|
||||
Long[] putTimestamps1 = new Long[] {1L, 2L, 5L};
|
||||
|
||||
Integer[] putRows2 = new Integer[] {6, 7};
|
||||
Integer[] putColumns2 = new Integer[] {3, 6};
|
||||
Long[] putTimestamps2 = new Long[] {4L, 5L};
|
||||
|
||||
Integer[] putRows3 = new Integer[] {2, 3, 5};
|
||||
Integer[] putColumns3 = new Integer[] {1, 2, 3};
|
||||
Long[] putTimestamps3 = new Long[] {4L,8L};
|
||||
|
||||
|
||||
Integer[] scanRows = new Integer[] {3, 5, 7};
|
||||
Integer[] scanColumns = new Integer[] {3, 4, 5};
|
||||
Long[] scanTimestamps = new Long[] {2l, 4L};
|
||||
int scanMaxVersions = 5;
|
||||
|
||||
put(ht, FAMILY, putRows1, putColumns1, putTimestamps1);
|
||||
flush();
|
||||
put(ht, FAMILY, putRows2, putColumns2, putTimestamps2);
|
||||
flush();
|
||||
put(ht, FAMILY, putRows3, putColumns3, putTimestamps3);
|
||||
|
||||
ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
|
||||
scanTimestamps, scanMaxVersions);
|
||||
|
||||
KeyValue[] kvs;
|
||||
|
||||
kvs = scanner.next().raw();
|
||||
assertEquals(2, kvs.length);
|
||||
checkOneCell(kvs[0], FAMILY, 3, 3, 4);
|
||||
checkOneCell(kvs[1], FAMILY, 3, 5, 2);
|
||||
|
||||
kvs = scanner.next().raw();
|
||||
assertEquals(1, kvs.length);
|
||||
checkOneCell(kvs[0], FAMILY, 5, 3, 4);
|
||||
|
||||
kvs = scanner.next().raw();
|
||||
assertEquals(1, kvs.length);
|
||||
checkOneCell(kvs[0], FAMILY, 6, 3, 4);
|
||||
|
||||
kvs = scanner.next().raw();
|
||||
assertEquals(1, kvs.length);
|
||||
checkOneCell(kvs[0], FAMILY, 7, 3, 4);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithVersionDeletes() throws Exception {
|
||||
|
||||
|
@ -109,7 +283,8 @@ public class TestMultipleTimestamps {
|
|||
|
||||
// request a bunch of versions including the deleted version. We should
|
||||
// only get back entries for the versions that exist.
|
||||
KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L, 4L, 5L));
|
||||
KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0,
|
||||
Arrays.asList(2L, 3L, 4L, 5L));
|
||||
assertEquals(3, kvs.length);
|
||||
checkOneCell(kvs[0], FAMILY, 0, 0, 5);
|
||||
checkOneCell(kvs[1], FAMILY, 0, 0, 3);
|
||||
|
@ -240,6 +415,44 @@ public class TestMultipleTimestamps {
|
|||
return result.raw();
|
||||
}
|
||||
|
||||
private ResultScanner scan(HTable ht, byte[] cf,
|
||||
Integer[] rowIndexes, Integer[] columnIndexes,
|
||||
Long[] versions, int maxVersions)
|
||||
throws IOException {
|
||||
Arrays.asList(rowIndexes);
|
||||
byte startRow[] = Bytes.toBytes("row:" +
|
||||
Collections.min( Arrays.asList(rowIndexes)));
|
||||
byte endRow[] = Bytes.toBytes("row:" +
|
||||
Collections.max( Arrays.asList(rowIndexes))+1);
|
||||
Scan scan = new Scan(startRow, endRow);
|
||||
for (Integer colIdx: columnIndexes) {
|
||||
byte column[] = Bytes.toBytes("column:" + colIdx);
|
||||
scan.addColumn(cf, column);
|
||||
}
|
||||
scan.setMaxVersions(maxVersions);
|
||||
scan.setTimeRange(Collections.min(Arrays.asList(versions)),
|
||||
Collections.max(Arrays.asList(versions))+1);
|
||||
ResultScanner scanner = ht.getScanner(scan);
|
||||
return scanner;
|
||||
}
|
||||
|
||||
private void put(HTable ht, byte[] cf, Integer[] rowIndexes,
|
||||
Integer[] columnIndexes, Long[] versions)
|
||||
throws IOException {
|
||||
for (int rowIdx: rowIndexes) {
|
||||
byte row[] = Bytes.toBytes("row:" + rowIdx);
|
||||
Put put = new Put(row);
|
||||
for(int colIdx: columnIndexes) {
|
||||
byte column[] = Bytes.toBytes("column:" + colIdx);
|
||||
for (long version: versions) {
|
||||
put.add(cf, column, version, Bytes.toBytes("value-version-" +
|
||||
version));
|
||||
}
|
||||
}
|
||||
ht.put(put);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert in specific row/column versions with timestamps
|
||||
* versionStart..versionEnd.
|
||||
|
|
Loading…
Reference in New Issue