HBASE-10929-Change ScanQueryMatcher to use Cells instead of KeyValue.(Ram)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1589018 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
ramkrishna 2014-04-22 03:58:41 +00:00
parent 57f8591001
commit 2513370c3e
3 changed files with 60 additions and 75 deletions

View File

@ -94,7 +94,7 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner {
* Do a backwardSeek in a reversed StoreScanner(scan backward) * Do a backwardSeek in a reversed StoreScanner(scan backward)
*/ */
@Override @Override
protected boolean seekAsDirection(KeyValue kv) throws IOException { protected boolean seekAsDirection(Cell kv) throws IOException {
return backwardSeek(kv); return backwardSeek(kv);
} }

View File

@ -24,6 +24,7 @@ import java.util.NavigableSet;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.KeyValueUtil;
@ -80,7 +81,7 @@ public class ScanQueryMatcher {
private final ColumnTracker columns; private final ColumnTracker columns;
/** Key to seek to in memstore and StoreFiles */ /** Key to seek to in memstore and StoreFiles */
private final KeyValue startKey; private final Cell startKey;
/** Row comparator for the region this query is for */ /** Row comparator for the region this query is for */
private final KeyValue.KVComparator rowComparator; private final KeyValue.KVComparator rowComparator;
@ -240,29 +241,17 @@ public class ScanQueryMatcher {
* - ignore the current KeyValue (MatchCode.SKIP) * - ignore the current KeyValue (MatchCode.SKIP)
* - got to the next row (MatchCode.DONE) * - got to the next row (MatchCode.DONE)
* *
* @param kv KeyValue to check * @param cell KeyValue to check
* @return The match code instance. * @return The match code instance.
* @throws IOException in case there is an internal consistency problem * @throws IOException in case there is an internal consistency problem
* caused by a data corruption. * caused by a data corruption.
*/ */
public MatchCode match(KeyValue kv) throws IOException { public MatchCode match(Cell cell) throws IOException {
if (filter != null && filter.filterAllRemaining()) { if (filter != null && filter.filterAllRemaining()) {
return MatchCode.DONE_SCAN; return MatchCode.DONE_SCAN;
} }
byte [] bytes = kv.getBuffer();
int offset = kv.getOffset();
int keyLength = Bytes.toInt(bytes, offset, Bytes.SIZEOF_INT);
offset += KeyValue.ROW_OFFSET;
int initialOffset = offset;
short rowLength = Bytes.toShort(bytes, offset, Bytes.SIZEOF_SHORT);
offset += Bytes.SIZEOF_SHORT;
int ret = this.rowComparator.compareRows(row, this.rowOffset, this.rowLength, int ret = this.rowComparator.compareRows(row, this.rowOffset, this.rowLength,
bytes, offset, rowLength); cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
if (!this.isReversed) { if (!this.isReversed) {
if (ret <= -1) { if (ret <= -1) {
return MatchCode.DONE; return MatchCode.DONE;
@ -280,30 +269,19 @@ public class ScanQueryMatcher {
} }
} }
// optimize case. // optimize case.
if (this.stickyNextRow) if (this.stickyNextRow)
return MatchCode.SEEK_NEXT_ROW; return MatchCode.SEEK_NEXT_ROW;
if (this.columns.done()) { if (this.columns.done()) {
stickyNextRow = true; stickyNextRow = true;
return MatchCode.SEEK_NEXT_ROW; return MatchCode.SEEK_NEXT_ROW;
} }
//Passing rowLength
offset += rowLength;
//Skipping family
byte familyLength = bytes [offset];
offset += familyLength + 1;
int qualLength = keyLength -
(offset - initialOffset) - KeyValue.TIMESTAMP_TYPE_SIZE;
long timestamp = Bytes.toLong(bytes, initialOffset + keyLength - KeyValue.TIMESTAMP_TYPE_SIZE);
// check for early out based on timestamp alone // check for early out based on timestamp alone
if (columns.isDone(timestamp)) { if (columns.isDone(cell.getTimestamp())) {
return columns.getNextRowOrNextColumn(bytes, offset, qualLength); return columns.getNextRowOrNextColumn(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength());
} }
/* /*
@ -319,8 +297,7 @@ public class ScanQueryMatcher {
* 7. Delete marker need to be version counted together with puts * 7. Delete marker need to be version counted together with puts
* they affect * they affect
*/ */
byte type = bytes[initialOffset + keyLength - 1]; if (CellUtil.isDelete(cell)) {
if (kv.isDelete()) {
if (!keepDeletedCells) { if (!keepDeletedCells) {
// first ignore delete markers if the scanner can do so, and the // first ignore delete markers if the scanner can do so, and the
// range does not include the marker // range does not include the marker
@ -329,20 +306,22 @@ public class ScanQueryMatcher {
// than the readpoint of any open scanner, this prevents deleted // than the readpoint of any open scanner, this prevents deleted
// rows that could still be seen by a scanner from being collected // rows that could still be seen by a scanner from being collected
boolean includeDeleteMarker = seePastDeleteMarkers ? boolean includeDeleteMarker = seePastDeleteMarkers ?
tr.withinTimeRange(timestamp) : tr.withinTimeRange(cell.getTimestamp()) :
tr.withinOrAfterTimeRange(timestamp); tr.withinOrAfterTimeRange(cell.getTimestamp());
if (includeDeleteMarker if (includeDeleteMarker
&& kv.getMvccVersion() <= maxReadPointToTrackVersions) { && cell.getMvccVersion() <= maxReadPointToTrackVersions) {
this.deletes.add(bytes, offset, qualLength, timestamp, type); this.deletes.add(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength(), cell.getTimestamp(), cell.getTypeByte());
} }
// Can't early out now, because DelFam come before any other keys // Can't early out now, because DelFam come before any other keys
} }
if ((!isUserScan) if ((!isUserScan)
&& timeToPurgeDeletes > 0 && timeToPurgeDeletes > 0
&& (EnvironmentEdgeManager.currentTimeMillis() - timestamp) <= timeToPurgeDeletes) { && (EnvironmentEdgeManager.currentTimeMillis() - cell.getTimestamp())
<= timeToPurgeDeletes) {
return MatchCode.INCLUDE; return MatchCode.INCLUDE;
} else if (retainDeletesInOutput || kv.getMvccVersion() > maxReadPointToTrackVersions) { } else if (retainDeletesInOutput || cell.getMvccVersion() > maxReadPointToTrackVersions) {
// always include or it is not time yet to check whether it is OK // always include or it is not time yet to check whether it is OK
// to purge deltes or not // to purge deltes or not
if (!isUserScan) { if (!isUserScan) {
@ -351,10 +330,11 @@ public class ScanQueryMatcher {
return MatchCode.INCLUDE; return MatchCode.INCLUDE;
} }
} else if (keepDeletedCells) { } else if (keepDeletedCells) {
if (timestamp < earliestPutTs) { if (cell.getTimestamp() < earliestPutTs) {
// keeping delete rows, but there are no puts older than // keeping delete rows, but there are no puts older than
// this delete in the store files. // this delete in the store files.
return columns.getNextRowOrNextColumn(bytes, offset, qualLength); return columns.getNextRowOrNextColumn(cell.getQualifierArray(),
cell.getQualifierOffset(), cell.getQualifierLength());
} }
// else: fall through and do version counting on the // else: fall through and do version counting on the
// delete markers // delete markers
@ -364,12 +344,13 @@ public class ScanQueryMatcher {
// note the following next else if... // note the following next else if...
// delete marker are not subject to other delete markers // delete marker are not subject to other delete markers
} else if (!this.deletes.isEmpty()) { } else if (!this.deletes.isEmpty()) {
DeleteResult deleteResult = deletes.isDeleted(bytes, offset, qualLength, DeleteResult deleteResult = deletes.isDeleted(cell.getQualifierArray(),
timestamp); cell.getQualifierOffset(), cell.getQualifierLength(), cell.getTimestamp());
switch (deleteResult) { switch (deleteResult) {
case FAMILY_DELETED: case FAMILY_DELETED:
case COLUMN_DELETED: case COLUMN_DELETED:
return columns.getNextRowOrNextColumn(bytes, offset, qualLength); return columns.getNextRowOrNextColumn(cell.getQualifierArray(),
cell.getQualifierOffset(), cell.getQualifierLength());
case VERSION_DELETED: case VERSION_DELETED:
case FAMILY_VERSION_DELETED: case FAMILY_VERSION_DELETED:
return MatchCode.SKIP; return MatchCode.SKIP;
@ -380,26 +361,29 @@ public class ScanQueryMatcher {
} }
} }
int timestampComparison = tr.compare(timestamp); int timestampComparison = tr.compare(cell.getTimestamp());
if (timestampComparison >= 1) { if (timestampComparison >= 1) {
return MatchCode.SKIP; return MatchCode.SKIP;
} else if (timestampComparison <= -1) { } else if (timestampComparison <= -1) {
return columns.getNextRowOrNextColumn(bytes, offset, qualLength); return columns.getNextRowOrNextColumn(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength());
} }
// STEP 1: Check if the column is part of the requested columns // STEP 1: Check if the column is part of the requested columns
MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength, type); MatchCode colChecker = columns.checkColumn(cell.getQualifierArray(),
cell.getQualifierOffset(), cell.getQualifierLength(), cell.getTypeByte());
if (colChecker == MatchCode.INCLUDE) { if (colChecker == MatchCode.INCLUDE) {
ReturnCode filterResponse = ReturnCode.SKIP; ReturnCode filterResponse = ReturnCode.SKIP;
// STEP 2: Yes, the column is part of the requested columns. Check if filter is present // STEP 2: Yes, the column is part of the requested columns. Check if filter is present
if (filter != null) { if (filter != null) {
// STEP 3: Filter the key value and return if it filters out // STEP 3: Filter the key value and return if it filters out
filterResponse = filter.filterKeyValue(kv); filterResponse = filter.filterKeyValue(cell);
switch (filterResponse) { switch (filterResponse) {
case SKIP: case SKIP:
return MatchCode.SKIP; return MatchCode.SKIP;
case NEXT_COL: case NEXT_COL:
return columns.getNextRowOrNextColumn(bytes, offset, qualLength); return columns.getNextRowOrNextColumn(cell.getQualifierArray(),
cell.getQualifierOffset(), cell.getQualifierLength());
case NEXT_ROW: case NEXT_ROW:
stickyNextRow = true; stickyNextRow = true;
return MatchCode.SEEK_NEXT_ROW; return MatchCode.SEEK_NEXT_ROW;
@ -430,8 +414,9 @@ public class ScanQueryMatcher {
* FilterResponse (INCLUDE_AND_SEEK_NEXT_COL) and ColumnChecker(INCLUDE) * FilterResponse (INCLUDE_AND_SEEK_NEXT_COL) and ColumnChecker(INCLUDE)
*/ */
colChecker = colChecker =
columns.checkVersions(bytes, offset, qualLength, timestamp, type, columns.checkVersions(cell.getQualifierArray(), cell.getQualifierOffset(),
kv.getMvccVersion() > maxReadPointToTrackVersions); cell.getQualifierLength(), cell.getTimestamp(), cell.getTypeByte(),
cell.getMvccVersion() > maxReadPointToTrackVersions);
//Optimize with stickyNextRow //Optimize with stickyNextRow
stickyNextRow = colChecker == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW ? true : stickyNextRow; stickyNextRow = colChecker == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW ? true : stickyNextRow;
return (filterResponse == ReturnCode.INCLUDE_AND_NEXT_COL && return (filterResponse == ReturnCode.INCLUDE_AND_NEXT_COL &&
@ -470,7 +455,7 @@ public class ScanQueryMatcher {
} }
} }
public boolean moreRowsMayExistAfter(KeyValue kv) { public boolean moreRowsMayExistAfter(Cell kv) {
if (this.isReversed) { if (this.isReversed) {
if (rowComparator.compareRows(kv.getRowArray(), kv.getRowOffset(), if (rowComparator.compareRows(kv.getRowArray(), kv.getRowOffset(),
kv.getRowLength(), stopRow, 0, stopRow.length) <= 0) { kv.getRowLength(), stopRow, 0, stopRow.length) <= 0) {
@ -513,7 +498,7 @@ public class ScanQueryMatcher {
* *
* @return the start key * @return the start key
*/ */
public KeyValue getStartKey() { public Cell getStartKey() {
return this.startKey; return this.startKey;
} }
@ -533,7 +518,7 @@ public class ScanQueryMatcher {
} }
} }
public KeyValue getKeyForNextColumn(KeyValue kv) { public Cell getKeyForNextColumn(Cell kv) {
ColumnCount nextColumn = columns.getColumnHint(); ColumnCount nextColumn = columns.getColumnHint();
if (nextColumn == null) { if (nextColumn == null) {
return KeyValueUtil.createLastOnRow( return KeyValueUtil.createLastOnRow(
@ -548,7 +533,7 @@ public class ScanQueryMatcher {
} }
} }
public KeyValue getKeyForNextRow(KeyValue kv) { public Cell getKeyForNextRow(Cell kv) {
return KeyValueUtil.createLastOnRow( return KeyValueUtil.createLastOnRow(
kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
null, 0, 0, null, 0, 0,

View File

@ -82,7 +82,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
* KVs skipped via seeking to next row/column. TODO: estimate them? * KVs skipped via seeking to next row/column. TODO: estimate them?
*/ */
private long kvsScanned = 0; private long kvsScanned = 0;
private Cell prevKV = null; private Cell prevCell = null;
/** We don't ever expect to change this, the constant is just for clarity. */ /** We don't ever expect to change this, the constant is just for clarity. */
static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true; static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
@ -454,19 +454,19 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
matcher.setRow(row, offset, length); matcher.setRow(row, offset, length);
} }
Cell kv; Cell cell;
// Only do a sanity-check if store and comparator are available. // Only do a sanity-check if store and comparator are available.
KeyValue.KVComparator comparator = KeyValue.KVComparator comparator =
store != null ? store.getComparator() : null; store != null ? store.getComparator() : null;
int count = 0; int count = 0;
LOOP: while((kv = this.heap.peek()) != null) { LOOP: while((cell = this.heap.peek()) != null) {
if (prevKV != kv) ++kvsScanned; // Do object compare - we set prevKV from the same heap. if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
checkScanOrder(prevKV, kv, comparator); checkScanOrder(prevCell, cell, comparator);
prevKV = kv; prevCell = cell;
ScanQueryMatcher.MatchCode qcode = matcher.match(KeyValueUtil.ensureKeyValue(kv)); ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
switch(qcode) { switch(qcode) {
case INCLUDE: case INCLUDE:
case INCLUDE_AND_SEEK_NEXT_ROW: case INCLUDE_AND_SEEK_NEXT_ROW:
@ -475,34 +475,34 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
Filter f = matcher.getFilter(); Filter f = matcher.getFilter();
if (f != null) { if (f != null) {
// TODO convert Scan Query Matcher to be Cell instead of KV based ? // TODO convert Scan Query Matcher to be Cell instead of KV based ?
kv = KeyValueUtil.ensureKeyValue(f.transformCell(kv)); cell = f.transformCell(cell);
} }
this.countPerRow++; this.countPerRow++;
if (storeLimit > -1 && if (storeLimit > -1 &&
this.countPerRow > (storeLimit + storeOffset)) { this.countPerRow > (storeLimit + storeOffset)) {
// do what SEEK_NEXT_ROW does. // do what SEEK_NEXT_ROW does.
if (!matcher.moreRowsMayExistAfter(KeyValueUtil.ensureKeyValue(kv))) { if (!matcher.moreRowsMayExistAfter(cell)) {
return false; return false;
} }
seekToNextRow(kv); seekToNextRow(cell);
break LOOP; break LOOP;
} }
// add to results only if we have skipped #storeOffset kvs // add to results only if we have skipped #storeOffset kvs
// also update metric accordingly // also update metric accordingly
if (this.countPerRow > storeOffset) { if (this.countPerRow > storeOffset) {
outResult.add(kv); outResult.add(cell);
count++; count++;
} }
if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
if (!matcher.moreRowsMayExistAfter(KeyValueUtil.ensureKeyValue(kv))) { if (!matcher.moreRowsMayExistAfter(cell)) {
return false; return false;
} }
seekToNextRow(kv); seekToNextRow(cell);
} else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
seekAsDirection(matcher.getKeyForNextColumn(KeyValueUtil.ensureKeyValue(kv))); seekAsDirection(matcher.getKeyForNextColumn(cell));
} else { } else {
this.heap.next(); this.heap.next();
} }
@ -522,15 +522,15 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
case SEEK_NEXT_ROW: case SEEK_NEXT_ROW:
// This is just a relatively simple end of scan fix, to short-cut end // This is just a relatively simple end of scan fix, to short-cut end
// us if there is an endKey in the scan. // us if there is an endKey in the scan.
if (!matcher.moreRowsMayExistAfter(KeyValueUtil.ensureKeyValue(kv))) { if (!matcher.moreRowsMayExistAfter(cell)) {
return false; return false;
} }
seekToNextRow(kv); seekToNextRow(cell);
break; break;
case SEEK_NEXT_COL: case SEEK_NEXT_COL:
seekAsDirection(matcher.getKeyForNextColumn(KeyValueUtil.ensureKeyValue(kv))); seekAsDirection(matcher.getKeyForNextColumn(cell));
break; break;
case SKIP: case SKIP:
@ -539,7 +539,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
case SEEK_NEXT_USING_HINT: case SEEK_NEXT_USING_HINT:
// TODO convert resee to Cell? // TODO convert resee to Cell?
KeyValue nextKV = KeyValueUtil.ensureKeyValue(matcher.getNextKeyHint(kv)); Cell nextKV = matcher.getNextKeyHint(cell);
if (nextKV != null) { if (nextKV != null) {
seekAsDirection(nextKV); seekAsDirection(nextKV);
} else { } else {
@ -678,7 +678,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
* @return true if scanner has values left, false if end of scanner * @return true if scanner has values left, false if end of scanner
* @throws IOException * @throws IOException
*/ */
protected boolean seekAsDirection(KeyValue kv) protected boolean seekAsDirection(Cell kv)
throws IOException { throws IOException {
return reseek(kv); return reseek(kv);
} }