HBASE-13679 Change ColumnTracker and SQM to deal with Cell instead of byte[], int, int.

anoopsjohn 2015-05-21 10:24:08 +05:30
parent eddabdd353
commit 5e06ede3f7
6 changed files with 82 additions and 107 deletions

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
@@ -52,40 +53,38 @@ public interface ColumnTracker {
/**
* Checks if the column is present in the list of requested columns by returning the match code
* instance. It does not check against the number of versions for the columns asked for. To do the
* version check, one has to call {@link #checkVersions(byte[], int, int, long, byte, boolean)}
* version check, one has to call {@link #checkVersions(Cell, long, byte, boolean)}
* method based on the return type (INCLUDE) of this method. The values that can be returned by
* this method are {@link MatchCode#INCLUDE}, {@link MatchCode#SEEK_NEXT_COL} and
* {@link MatchCode#SEEK_NEXT_ROW}.
* @param bytes
* @param offset
* @param length
* @param cell
* @param type The type of the KeyValue
* @return The match code instance.
* @throws IOException in case there is an internal consistency problem caused by data
* corruption.
*/
ScanQueryMatcher.MatchCode checkColumn(byte[] bytes, int offset, int length, byte type)
throws IOException;
ScanQueryMatcher.MatchCode checkColumn(Cell cell, byte type) throws IOException;
/**
* Keeps track of the number of versions for the columns asked for. It assumes that the user has
* already checked if the keyvalue needs to be included by calling the
* {@link #checkColumn(byte[], int, int, byte)} method. The enum values returned by this method
* {@link #checkColumn(Cell, byte)} method. The enum values returned by this method
* are {@link MatchCode#SKIP}, {@link MatchCode#INCLUDE},
* {@link MatchCode#INCLUDE_AND_SEEK_NEXT_COL} and {@link MatchCode#INCLUDE_AND_SEEK_NEXT_ROW}.
* Implementations which include all the columns could just return {@link MatchCode#INCLUDE} in
* the {@link #checkColumn(byte[], int, int, byte)} method and perform all the operations in this
* the {@link #checkColumn(Cell, byte)} method and perform all the operations in this
* checkVersions method.
* @param type the type of the key value (Put/Delete)
* @param cell
* @param ttl The timeToLive to enforce.
* @param type the type of the key value (Put/Delete)
* @param ignoreCount indicates if the KV needs to be excluded while counting (used during
* compactions. We only count KVs that are older than all the scanners' read points.)
* @return the scan query matcher match code instance
* @throws IOException in case there is an internal consistency problem caused by data
* corruption.
*/
ScanQueryMatcher.MatchCode checkVersions(byte[] bytes, int offset, int length, long ttl,
byte type, boolean ignoreCount) throws IOException;
ScanQueryMatcher.MatchCode checkVersions(Cell cell, long ttl, byte type, boolean ignoreCount)
throws IOException;
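For orientation, a minimal caller-side sketch of the two-step protocol after this change: checkColumn gates on the qualifier, and checkVersions then enforces the version count. Here tracker is a hypothetical, already-initialized ColumnTracker, and the timestamp argument mirrors the ScanQueryMatcher call site.

// Sketch only: tracker is an assumed, initialized ColumnTracker; cell comes from the scan.
ScanQueryMatcher.MatchCode code = tracker.checkColumn(cell, cell.getTypeByte());
if (code == ScanQueryMatcher.MatchCode.INCLUDE) {
  // Version counting is deliberately a separate step, per the javadoc above.
  code = tracker.checkVersions(cell, cell.getTimestamp(), cell.getTypeByte(), false);
}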
/**
* Resets the Matcher
*/
@@ -112,10 +111,9 @@ public interface ColumnTracker {
/**
* Retrieve the MatchCode for the next row or column
* @param cell
*/
MatchCode getNextRowOrNextColumn(
byte[] bytes, int offset, int qualLength
);
MatchCode getNextRowOrNextColumn(Cell cell);
/**
* Give the tracker a chance to declare it's done based on only the timestamp

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java

@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.NavigableSet;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
@@ -105,8 +106,7 @@ public class ExplicitColumnTracker implements ColumnTracker {
* {@inheritDoc}
*/
@Override
public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset,
int length, byte type) {
public ScanQueryMatcher.MatchCode checkColumn(Cell cell, byte type) {
// delete markers should never be passed to an
// *Explicit*ColumnTracker
assert !CellUtil.isDelete(type);
@@ -122,8 +122,9 @@ public class ExplicitColumnTracker implements ColumnTracker {
}
// Compare specific column to current column
int ret = Bytes.compareTo(column.getBuffer(), column.getOffset(),
column.getLength(), bytes, offset, length);
// TODO when cell is offheap backed, we won't use getQualifierArray()
int ret = Bytes.compareTo(column.getBuffer(), column.getOffset(), column.getLength(),
cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
// Column Matches. Return include code. The caller would call checkVersions
// to limit the number of versions.
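As a hypothetical illustration of the three outcomes of this compare (the current column hint against the cell's qualifier), using invented on-heap qualifiers:

// ret == 0: the qualifier matches the hint, so INCLUDE (versions are checked later).
// ret > 0: the scanned qualifier sorts before the hint, so SEEK_NEXT_COL skips ahead.
// ret < 0: the hint is behind the scan, so the tracker advances to its next requested column.
byte[] hint = Bytes.toBytes("b");
byte[] qual = Bytes.toBytes("a");
int ret = Bytes.compareTo(hint, 0, hint.length, qual, 0, qual.length); // positive here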
@@ -156,7 +157,7 @@ public class ExplicitColumnTracker implements ColumnTracker {
}
@Override
public ScanQueryMatcher.MatchCode checkVersions(byte[] bytes, int offset, int length,
public ScanQueryMatcher.MatchCode checkVersions(Cell cell,
long timestamp, byte type, boolean ignoreCount) throws IOException {
assert !CellUtil.isDelete(type);
if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE;
@@ -215,14 +216,12 @@ public class ExplicitColumnTracker implements ColumnTracker {
* this column. We may get this information from external filters or
* timestamp range and we then need to indicate this information to
* tracker. It is required only in case of ExplicitColumnTracker.
* @param bytes
* @param offset
* @param length
* @param cell
*/
public void doneWithColumn(byte [] bytes, int offset, int length) {
public void doneWithColumn(Cell cell) {
while (this.column != null) {
int compare = Bytes.compareTo(column.getBuffer(), column.getOffset(),
column.getLength(), bytes, offset, length);
int compare = Bytes.compareTo(column.getBuffer(), column.getOffset(), column.getLength(),
cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
resetTS();
if (compare <= 0) {
++this.index;
@@ -239,9 +238,9 @@ public class ExplicitColumnTracker implements ColumnTracker {
}
}
public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset,
int qualLength) {
doneWithColumn(bytes, offset,qualLength);
@Override
public MatchCode getNextRowOrNextColumn(Cell cell) {
doneWithColumn(cell);
if (getColumnHint() == null) {
return MatchCode.SEEK_NEXT_ROW;

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
@@ -91,9 +92,7 @@ public class ScanQueryMatcher {
/* row is not private for tests */
/** Row the query is on */
byte [] row;
int rowOffset;
short rowLength;
Cell curCell;
/**
* Oldest put in any of the involved store files
@@ -279,7 +278,7 @@ public class ScanQueryMatcher {
if (filter != null && filter.filterAllRemaining()) {
return MatchCode.DONE_SCAN;
}
int ret = -(this.rowComparator.compareRows(cell, row, this.rowOffset, this.rowLength));
int ret = this.rowComparator.compareRows(curCell, cell);
if (!this.isReversed) {
if (ret <= -1) {
return MatchCode.DONE;
@@ -306,14 +305,10 @@ public class ScanQueryMatcher {
return MatchCode.SEEK_NEXT_ROW;
}
int qualifierOffset = cell.getQualifierOffset();
int qualifierLength = cell.getQualifierLength();
long timestamp = cell.getTimestamp();
// check for early out based on timestamp alone
if (columns.isDone(timestamp)) {
return columns.getNextRowOrNextColumn(cell.getQualifierArray(), qualifierOffset,
qualifierLength);
return columns.getNextRowOrNextColumn(cell);
}
// check if the cell is expired by cell TTL
if (HStore.isCellTTLExpired(cell, this.oldestUnexpiredTS, this.now)) {
@@ -372,8 +367,7 @@ public class ScanQueryMatcher {
if (timestamp < earliestPutTs) {
// keeping delete rows, but there are no puts older than
// this delete in the store files.
return columns.getNextRowOrNextColumn(cell.getQualifierArray(),
qualifierOffset, qualifierLength);
return columns.getNextRowOrNextColumn(cell);
}
// else: fall through and do version counting on the
// delete markers
@@ -387,8 +381,7 @@ public class ScanQueryMatcher {
switch (deleteResult) {
case FAMILY_DELETED:
case COLUMN_DELETED:
return columns.getNextRowOrNextColumn(cell.getQualifierArray(),
qualifierOffset, qualifierLength);
return columns.getNextRowOrNextColumn(cell);
case VERSION_DELETED:
case FAMILY_VERSION_DELETED:
return MatchCode.SKIP;
@@ -403,13 +396,11 @@ public class ScanQueryMatcher {
if (timestampComparison >= 1) {
return MatchCode.SKIP;
} else if (timestampComparison <= -1) {
return columns.getNextRowOrNextColumn(cell.getQualifierArray(), qualifierOffset,
qualifierLength);
return columns.getNextRowOrNextColumn(cell);
}
// STEP 1: Check if the column is part of the requested columns
MatchCode colChecker = columns.checkColumn(cell.getQualifierArray(),
qualifierOffset, qualifierLength, typeByte);
MatchCode colChecker = columns.checkColumn(cell, typeByte);
if (colChecker == MatchCode.INCLUDE) {
ReturnCode filterResponse = ReturnCode.SKIP;
// STEP 2: Yes, the column is part of the requested columns. Check if filter is present
@@ -420,8 +411,7 @@ public class ScanQueryMatcher {
case SKIP:
return MatchCode.SKIP;
case NEXT_COL:
return columns.getNextRowOrNextColumn(cell.getQualifierArray(),
qualifierOffset, qualifierLength);
return columns.getNextRowOrNextColumn(cell);
case NEXT_ROW:
stickyNextRow = true;
return MatchCode.SEEK_NEXT_ROW;
@@ -451,10 +441,8 @@ public class ScanQueryMatcher {
* In all the above scenarios, we return the column checker return value except for
* FilterResponse (INCLUDE_AND_SEEK_NEXT_COL) and ColumnChecker(INCLUDE)
*/
colChecker =
columns.checkVersions(cell.getQualifierArray(), qualifierOffset,
qualifierLength, timestamp, typeByte,
mvccVersion > maxReadPointToTrackVersions);
colChecker = columns.checkVersions(cell, timestamp, typeByte,
mvccVersion > maxReadPointToTrackVersions);
//Optimize with stickyNextRow
stickyNextRow = colChecker == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW ? true : stickyNextRow;
return (filterResponse == ReturnCode.INCLUDE_AND_NEXT_COL &&
@@ -469,15 +457,15 @@ public class ScanQueryMatcher {
/** Handle partial-drop-deletes. As we match keys in order, when we have a range from which
* we can drop deletes, we can set retainDeletesInOutput to false for the duration of this
* range only, and maintain consistency. */
private void checkPartialDropDeleteRange(byte [] row, int offset, short length) {
private void checkPartialDropDeleteRange(Cell curCell) {
// If partial-drop-deletes are used, initially, dropDeletesFromRow and dropDeletesToRow
// are both set, and the matcher is set to retain deletes. We assume ordered keys. When
// dropDeletesFromRow is leq current kv, we start dropping deletes and reset
// dropDeletesFromRow; thus the 2nd "if" starts to apply.
if ((dropDeletesFromRow != null)
&& (Arrays.equals(dropDeletesFromRow, HConstants.EMPTY_START_ROW)
|| (Bytes.compareTo(row, offset, length,
dropDeletesFromRow, 0, dropDeletesFromRow.length) >= 0))) {
&& (Arrays.equals(dropDeletesFromRow, HConstants.EMPTY_START_ROW) ||
(CellComparator.COMPARATOR.compareRows(curCell, dropDeletesFromRow, 0,
dropDeletesFromRow.length) >= 0))) {
retainDeletesInOutput = false;
dropDeletesFromRow = null;
}
@@ -485,9 +473,10 @@ public class ScanQueryMatcher {
// drop-deletes range. When dropDeletesToRow is leq current kv, we stop dropping deletes,
// and reset dropDeletesToRow so that we don't do any more compares.
if ((dropDeletesFromRow == null)
&& (dropDeletesToRow != null) && !Arrays.equals(dropDeletesToRow, HConstants.EMPTY_END_ROW)
&& (Bytes.compareTo(row, offset, length,
dropDeletesToRow, 0, dropDeletesToRow.length) >= 0)) {
&& (dropDeletesToRow != null)
&& !Arrays.equals(dropDeletesToRow, HConstants.EMPTY_END_ROW)
&& (CellComparator.COMPARATOR
.compareRows(curCell, dropDeletesToRow, 0, dropDeletesToRow.length) >= 0)) {
retainDeletesInOutput = true;
dropDeletesToRow = null;
}
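A small runnable toy, with invented row keys, showing the window these two gates implement: deletes are dropped for rows in [dropDeletesFromRow, dropDeletesToRow) and retained outside it.

// Sketch only; mirrors the two row comparisons above using raw byte[] rows.
byte[] from = Bytes.toBytes("b"), to = Bytes.toBytes("d");
for (String r : new String[] { "a", "b", "c", "d" }) {
  byte[] row = Bytes.toBytes(r);
  boolean drop = Bytes.compareTo(row, from) >= 0 && Bytes.compareTo(row, to) < 0;
  System.out.println(r + " -> drop deletes? " + drop); // a:false, b:true, c:true, d:false
}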
@@ -512,14 +501,12 @@ public class ScanQueryMatcher {
}
/**
* Set current row
* @param row
* Set the row when there is a change in row
* @param curCell
*/
public void setRow(byte [] row, int offset, short length) {
checkPartialDropDeleteRange(row, offset, length);
this.row = row;
this.rowOffset = offset;
this.rowLength = length;
public void setToNewRow(Cell curCell) {
checkPartialDropDeleteRange(curCell);
this.curCell = curCell;
reset();
}
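Callers are expected to hand in the first cell of the new row and to skip the call when the row has not changed; a sketch of that pattern, mirroring the StoreScanner hunks later in this commit:

// Only reset the matcher when the row actually changes (names as in StoreScanner).
if (matcher.curCell == null || !CellUtil.matchingRows(cell, matcher.curCell)) {
  matcher.setToNewRow(cell);
}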
@@ -607,9 +594,11 @@ public class ScanQueryMatcher {
//Used only for testing purposes
static MatchCode checkColumn(ColumnTracker columnTracker, byte[] bytes, int offset,
int length, long ttl, byte type, boolean ignoreCount) throws IOException {
MatchCode matchCode = columnTracker.checkColumn(bytes, offset, length, type);
KeyValue kv = KeyValueUtil.createFirstOnRow(HConstants.EMPTY_BYTE_ARRAY, 0, 0,
HConstants.EMPTY_BYTE_ARRAY, 0, 0, bytes, offset, length);
MatchCode matchCode = columnTracker.checkColumn(kv, type);
if (matchCode == MatchCode.INCLUDE) {
return columnTracker.checkVersions(bytes, offset, length, ttl, type, ignoreCount);
return columnTracker.checkVersions(kv, ttl, type, ignoreCount);
}
return matchCode;
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java

@@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
@@ -32,9 +34,7 @@ import org.apache.hadoop.hbase.util.Bytes;
*/
@InterfaceAudience.Private
public class ScanWildcardColumnTracker implements ColumnTracker {
private byte [] columnBuffer = null;
private int columnOffset = 0;
private int columnLength = 0;
private Cell columnCell = null;
private int currentCount = 0;
private int maxVersions;
private int minVersions;
@@ -64,8 +64,7 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
* This receives puts *and* deletes.
*/
@Override
public MatchCode checkColumn(byte[] bytes, int offset, int length, byte type)
throws IOException {
public MatchCode checkColumn(Cell cell, byte type) throws IOException {
return MatchCode.INCLUDE;
}
@@ -75,18 +74,17 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
* take the version of the previous put (so eventually all but the last can be reclaimed).
*/
@Override
public ScanQueryMatcher.MatchCode checkVersions(byte[] bytes, int offset, int length,
public ScanQueryMatcher.MatchCode checkVersions(Cell cell,
long timestamp, byte type, boolean ignoreCount) throws IOException {
if (columnBuffer == null) {
if (columnCell == null) {
// first iteration.
resetBuffer(bytes, offset, length);
resetCell(cell);
if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE;
// do not count a delete marker as another version
return checkVersion(type, timestamp);
}
int cmp = Bytes.compareTo(bytes, offset, length,
columnBuffer, columnOffset, columnLength);
int cmp = CellComparator.compareQualifiers(cell, this.columnCell);
if (cmp == 0) {
if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE;
@@ -102,7 +100,7 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
// new col > old col
if (cmp > 0) {
// switched columns, let's do something.
resetBuffer(bytes, offset, length);
resetCell(cell);
if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE;
return checkVersion(type, timestamp);
}
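To make the wildcard flow concrete, a hedged fragment of the three branches around this comparison, where cell is the incoming cell and columnCell the remembered one:

// cmp == 0: another version of the current column; count it against maxVersions.
// cmp > 0: the scan moved on to a new column; resetCell(cell) restarts the count.
// cmp < 0: qualifiers went backwards, which signals out-of-order data (IOException).
int cmp = CellComparator.compareQualifiers(cell, columnCell);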
@@ -114,13 +112,11 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
throw new IOException(
"ScanWildcardColumnTracker.checkColumn ran into a column actually " +
"smaller than the previous column: " +
Bytes.toStringBinary(bytes, offset, length));
Bytes.toStringBinary(CellUtil.cloneQualifier(cell)));
}
private void resetBuffer(byte[] bytes, int offset, int length) {
columnBuffer = bytes;
columnOffset = offset;
columnLength = length;
private void resetCell(Cell columnCell) {
this.columnCell = columnCell;
currentCount = 0;
}
@@ -152,7 +148,7 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
@Override
public void reset() {
columnBuffer = null;
columnCell = null;
resetTSAndType();
}
@@ -194,8 +190,8 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
return false;
}
public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset,
int qualLength) {
@Override
public MatchCode getNextRowOrNextColumn(Cell cell) {
return MatchCode.SEEK_NEXT_COL;
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java

@@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
@@ -504,17 +503,14 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// only call setRow if the row changes; avoids confusing the query matcher
// if scanning intra-row
byte[] row = peeked.getRowArray();
int offset = peeked.getRowOffset();
short length = peeked.getRowLength();
// If no limits exist in the scope LimitScope.BETWEEN_CELLS then we are sure we are changing
// rows. Else it is possible we are still traversing the same row so we must perform the row
// comparison.
if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.row == null ||
!Bytes.equals(row, offset, length, matcher.row, matcher.rowOffset, matcher.rowLength)) {
if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.curCell == null ||
!CellUtil.matchingRow(peeked, matcher.curCell)) {
this.countPerRow = 0;
matcher.setRow(row, offset, length);
matcher.setToNewRow(peeked);
}
// Clear progress away unless invoker has indicated it should be kept.
@@ -758,18 +754,14 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// Reset the state of the Query Matcher and set to top row.
// Only reset and call setRow if the row changes; avoids confusing the
// query matcher if scanning intra-row.
Cell kv = heap.peek();
if (kv == null) {
kv = lastTopKey;
Cell cell = heap.peek();
if (cell == null) {
cell = lastTopKey;
}
byte[] row = kv.getRowArray();
int offset = kv.getRowOffset();
short length = kv.getRowLength();
if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row,
matcher.rowOffset, matcher.rowLength)) {
if ((matcher.curCell == null) || !CellUtil.matchingRows(cell, matcher.curCell)) {
this.countPerRow = 0;
matcher.reset();
matcher.setRow(row, offset, length);
matcher.setToNewRow(cell);
}
}
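Both StoreScanner paths now rely on the row-equality helpers instead of raw byte[] compares; a hypothetical demo of what those helpers check (only the row component of the two cells):

// Sketch with invented keys; family and qualifier are ignored by the check.
Cell a = new KeyValue(Bytes.toBytes("r1"), Bytes.toBytes("f"), Bytes.toBytes("q1"));
Cell b = new KeyValue(Bytes.toBytes("r1"), Bytes.toBytes("f"), Bytes.toBytes("q2"));
boolean sameRow = CellUtil.matchingRows(a, b); // true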

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -113,7 +114,7 @@ public class TestQueryMatcher extends HBaseTestCase {
List<ScanQueryMatcher.MatchCode> actual = new ArrayList<ScanQueryMatcher.MatchCode>();
KeyValue k = memstore.get(0);
qm.setRow(k.getRowArray(), k.getRowOffset(), k.getRowLength());
qm.setToNewRow(k);
for (KeyValue kv : memstore){
actual.add(qm.match(kv));
@@ -178,7 +179,7 @@ public class TestQueryMatcher extends HBaseTestCase {
List<ScanQueryMatcher.MatchCode> actual = new ArrayList<ScanQueryMatcher.MatchCode>();
KeyValue k = memstore.get(0);
qm.setRow(k.getRowArray(), k.getRowOffset(), k.getRowLength());
qm.setToNewRow(k);
for(KeyValue kv : memstore) {
actual.add(qm.match(kv));
@@ -232,7 +233,7 @@ public class TestQueryMatcher extends HBaseTestCase {
};
KeyValue k = kvs[0];
qm.setRow(k.getRowArray(), k.getRowOffset(), k.getRowLength());
qm.setToNewRow(k);
List<MatchCode> actual = new ArrayList<MatchCode>(kvs.length);
for (KeyValue kv : kvs) {
@@ -286,7 +287,7 @@ public class TestQueryMatcher extends HBaseTestCase {
new KeyValue(row2, fam1, col1, now-10, data)
};
KeyValue k = kvs[0];
qm.setRow(k.getRowArray(), k.getRowOffset(), k.getRowLength());
qm.setToNewRow(k);
List<ScanQueryMatcher.MatchCode> actual =
new ArrayList<ScanQueryMatcher.MatchCode>(kvs.length);
@@ -340,7 +341,7 @@ public class TestQueryMatcher extends HBaseTestCase {
byte[] prevRow = null;
for (byte[] row : rows) {
if (prevRow == null || !Bytes.equals(prevRow, row)) {
qm.setRow(row, 0, (short)row.length);
qm.setToNewRow(KeyValueUtil.createFirstOnRow(row));
prevRow = row;
}
actual.add(qm.match(new KeyValue(row, fam2, null, now, Type.Delete)));
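Where a test only holds a bare row byte[], the commit adapts it with a first-on-row KeyValue; a minimal sketch of that adapter, assuming an initialized matcher qm:

// createFirstOnRow builds the smallest cell for the row, which is sufficient here
// because setToNewRow only consults the row component of the cell.
byte[] row = Bytes.toBytes("row1");
qm.setToNewRow(KeyValueUtil.createFirstOnRow(row));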