HBASE-13871 Change RegionScannerImpl to deal with Cell instead of byte[], int, int.
This commit is contained in:
parent
4713fc6407
commit
bf3924ed05
|
@ -18,6 +18,8 @@
|
|||
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
|
||||
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
@ -26,7 +28,6 @@ import java.util.List;
|
|||
import java.util.Map.Entry;
|
||||
import java.util.NavigableMap;
|
||||
|
||||
import org.apache.hadoop.hbase.CellComparator.MetaCellComparator;
|
||||
import org.apache.hadoop.hbase.KeyValue.Type;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
|
@ -1038,4 +1039,166 @@ public final class CellUtil {
|
|||
}
|
||||
return matchingColumn(left, right);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a Cell that is smaller than all other possible Cells for the given Cell's row.
|
||||
*
|
||||
* @param cell
|
||||
* @return First possible Cell on passed Cell's row.
|
||||
*/
|
||||
public static Cell createFirstOnRow(final Cell cell) {
|
||||
return new FirstOnRowFakeCell(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
|
||||
}
|
||||
|
||||
@InterfaceAudience.Private
|
||||
private static abstract class FakeCell implements Cell {
|
||||
|
||||
@Override
|
||||
public byte[] getRowArray() {
|
||||
return EMPTY_BYTE_ARRAY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getRowOffset() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public short getRowLength() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getFamilyArray() {
|
||||
return EMPTY_BYTE_ARRAY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getFamilyOffset() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte getFamilyLength() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getQualifierArray() {
|
||||
return EMPTY_BYTE_ARRAY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getQualifierOffset() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getQualifierLength() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMvccVersion() {
|
||||
return getSequenceId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSequenceId() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getValueArray() {
|
||||
return EMPTY_BYTE_ARRAY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getValueOffset() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getValueLength() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getTagsArray() {
|
||||
return EMPTY_BYTE_ARRAY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getTagsOffset() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getTagsLength() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getValue() {
|
||||
return EMPTY_BYTE_ARRAY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getFamily() {
|
||||
return EMPTY_BYTE_ARRAY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getQualifier() {
|
||||
return EMPTY_BYTE_ARRAY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getRow() {
|
||||
return EMPTY_BYTE_ARRAY;
|
||||
}
|
||||
}
|
||||
|
||||
@InterfaceAudience.Private
|
||||
private static class FirstOnRowFakeCell extends FakeCell {
|
||||
private final byte[] rowArray;
|
||||
private final int roffest;
|
||||
private final short rlength;
|
||||
|
||||
public FirstOnRowFakeCell(final byte[] row, int roffset, short rlength) {
|
||||
this.rowArray = row;
|
||||
this.roffest = roffset;
|
||||
this.rlength = rlength;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getRowArray() {
|
||||
return this.rowArray;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getRowOffset() {
|
||||
return this.roffest;
|
||||
}
|
||||
|
||||
@Override
|
||||
public short getRowLength() {
|
||||
return this.rlength;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getTimestamp() {
|
||||
return HConstants.LATEST_TIMESTAMP;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte getTypeByte() {
|
||||
return Type.Maximum.getCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getRow() {
|
||||
return Bytes.copy(this.rowArray, this.roffest, this.rlength);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5247,9 +5247,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
} else {
|
||||
this.stopRow = scan.getStopRow();
|
||||
}
|
||||
// If we are doing a get, we want to be [startRow,endRow] normally
|
||||
// If we are doing a get, we want to be [startRow,endRow]. Normally
|
||||
// it is [startRow,endRow) and if startRow=endRow we get nothing.
|
||||
this.isScan = scan.isGetScan() ? -1 : 0;
|
||||
this.isScan = scan.isGetScan() ? 1 : 0;
|
||||
|
||||
// synchronize on scannerReadPoints so that nobody calculates
|
||||
// getSmallestReadPoint, before scannerReadPoints is updated.
|
||||
|
@ -5384,10 +5384,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
private boolean populateFromJoinedHeap(List<Cell> results, ScannerContext scannerContext)
|
||||
throws IOException {
|
||||
assert joinedContinuationRow != null;
|
||||
boolean moreValues =
|
||||
populateResult(results, this.joinedHeap, scannerContext,
|
||||
joinedContinuationRow.getRowArray(), joinedContinuationRow.getRowOffset(),
|
||||
joinedContinuationRow.getRowLength());
|
||||
boolean moreValues = populateResult(results, this.joinedHeap, scannerContext,
|
||||
joinedContinuationRow);
|
||||
|
||||
if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
|
||||
// We are done with this row, reset the continuation.
|
||||
|
@ -5404,14 +5402,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
* reached, or remainingResultSize (if not -1) is reached
|
||||
* @param heap KeyValueHeap to fetch data from. It must be positioned on correct row before call.
|
||||
* @param scannerContext
|
||||
* @param currentRow Byte array with key we are fetching.
|
||||
* @param offset offset for currentRow
|
||||
* @param length length for currentRow
|
||||
* @param currentRowCell
|
||||
* @return state of last call to {@link KeyValueHeap#next()}
|
||||
*/
|
||||
private boolean populateResult(List<Cell> results, KeyValueHeap heap,
|
||||
ScannerContext scannerContext, byte[] currentRow, int offset, short length)
|
||||
throws IOException {
|
||||
ScannerContext scannerContext, Cell currentRowCell) throws IOException {
|
||||
Cell nextKv;
|
||||
boolean moreCellsInRow = false;
|
||||
boolean tmpKeepProgress = scannerContext.getKeepProgress();
|
||||
|
@ -5427,7 +5422,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
scannerContext.setKeepProgress(tmpKeepProgress);
|
||||
|
||||
nextKv = heap.peek();
|
||||
moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);
|
||||
moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
|
||||
if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext);
|
||||
if (scannerContext.checkBatchLimit(limitScope)) {
|
||||
return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
|
||||
|
@ -5452,14 +5447,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
* cells to be read in the heap. If the row of the nextKv in the heap matches the current row
|
||||
* then there are more cells to be read in the row.
|
||||
* @param nextKv
|
||||
* @param currentRow
|
||||
* @param offset
|
||||
* @param length
|
||||
* @param currentRowCell
|
||||
* @return true When there are more cells in the row to be read
|
||||
*/
|
||||
private boolean moreCellsInRow(final Cell nextKv, byte[] currentRow, int offset,
|
||||
short length) {
|
||||
return nextKv != null && CellUtil.matchingRow(nextKv, currentRow, offset, length);
|
||||
private boolean moreCellsInRow(final Cell nextKv, Cell currentRowCell) {
|
||||
return nextKv != null && CellUtil.matchingRow(nextKv, currentRowCell);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -5524,16 +5516,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
// Let's see what we have in the storeHeap.
|
||||
Cell current = this.storeHeap.peek();
|
||||
|
||||
byte[] currentRow = null;
|
||||
int offset = 0;
|
||||
short length = 0;
|
||||
if (current != null) {
|
||||
currentRow = current.getRowArray();
|
||||
offset = current.getRowOffset();
|
||||
length = current.getRowLength();
|
||||
}
|
||||
|
||||
boolean stopRow = isStopRow(currentRow, offset, length);
|
||||
boolean stopRow = isStopRow(current);
|
||||
// When hasFilterRow is true it means that all the cells for a particular row must be
|
||||
// read before a filtering decision can be made. This means that filters where hasFilterRow
|
||||
// run the risk of encountering out of memory errors in the case that they are applied to a
|
||||
|
@ -5581,7 +5564,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
|
||||
// Ok, we are good, let's try to get some results from the main heap.
|
||||
populateResult(results, this.storeHeap, scannerContext, currentRow, offset, length);
|
||||
populateResult(results, this.storeHeap, scannerContext, current);
|
||||
|
||||
if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
|
||||
if (hasFilterRow) {
|
||||
|
@ -5593,8 +5576,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
|
||||
Cell nextKv = this.storeHeap.peek();
|
||||
stopRow = nextKv == null ||
|
||||
isStopRow(nextKv.getRowArray(), nextKv.getRowOffset(), nextKv.getRowLength());
|
||||
stopRow = nextKv == null || isStopRow(nextKv);
|
||||
// save that the row was empty before filters applied to it.
|
||||
final boolean isEmptyRow = results.isEmpty();
|
||||
|
||||
|
@ -5640,7 +5622,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
// These values are not needed for filter to work, so we postpone their
|
||||
// fetch to (possibly) reduce amount of data loads from disk.
|
||||
if (this.joinedHeap != null) {
|
||||
boolean mayHaveData = joinedHeapMayHaveData(currentRow, offset, length);
|
||||
boolean mayHaveData = joinedHeapMayHaveData(current);
|
||||
if (mayHaveData) {
|
||||
joinedContinuationRow = current;
|
||||
populateFromJoinedHeap(results, scannerContext);
|
||||
|
@ -5696,27 +5678,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
|
||||
/**
|
||||
* @param currentRow
|
||||
* @param offset
|
||||
* @param length
|
||||
* @param currentRowCell
|
||||
* @return true when the joined heap may have data for the current row
|
||||
* @throws IOException
|
||||
*/
|
||||
private boolean joinedHeapMayHaveData(byte[] currentRow, int offset, short length)
|
||||
private boolean joinedHeapMayHaveData(Cell currentRowCell)
|
||||
throws IOException {
|
||||
Cell nextJoinedKv = joinedHeap.peek();
|
||||
boolean matchCurrentRow =
|
||||
nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv, currentRow, offset, length);
|
||||
nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv, currentRowCell);
|
||||
boolean matchAfterSeek = false;
|
||||
|
||||
// If the next value in the joined heap does not match the current row, try to seek to the
|
||||
// correct row
|
||||
if (!matchCurrentRow) {
|
||||
Cell firstOnCurrentRow = KeyValueUtil.createFirstOnRow(currentRow, offset, length);
|
||||
Cell firstOnCurrentRow = CellUtil.createFirstOnRow(currentRowCell);
|
||||
boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true);
|
||||
matchAfterSeek =
|
||||
seekSuccessful && joinedHeap.peek() != null
|
||||
&& CellUtil.matchingRow(joinedHeap.peek(), currentRow, offset, length);
|
||||
&& CellUtil.matchingRow(joinedHeap.peek(), currentRowCell);
|
||||
}
|
||||
|
||||
return matchCurrentRow || matchAfterSeek;
|
||||
|
@ -5755,12 +5735,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
.postScannerFilterRow(this, curRowCell);
|
||||
}
|
||||
|
||||
protected boolean isStopRow(byte[] currentRow, int offset, short length) {
|
||||
return currentRow == null ||
|
||||
(stopRow != null &&
|
||||
// TODO : currentRow can be tracked as cell rather than byte[]
|
||||
comparator.compareRows(stopRow, 0, stopRow.length,
|
||||
currentRow, offset, length) <= isScan);
|
||||
protected boolean isStopRow(Cell currentRowCell) {
|
||||
return currentRowCell == null
|
||||
|| (stopRow != null && comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length) >= isScan);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -58,10 +58,10 @@ class ReversedRegionScannerImpl extends RegionScannerImpl {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected boolean isStopRow(byte[] currentRow, int offset, short length) {
|
||||
return currentRow == null
|
||||
|| (super.stopRow != null && comparator.compareRows(
|
||||
stopRow, 0, stopRow.length, currentRow, offset, length) >= super.isScan);
|
||||
protected boolean isStopRow(Cell currentRowCell) {
|
||||
return currentRowCell == null
|
||||
|| (super.stopRow != null && comparator.compareRows(currentRowCell, stopRow, 0,
|
||||
stopRow.length) <= super.isScan);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue