Make TestHRegion pass by porting the RegionScanner implementation from 0.20
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@944534 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ef4d353ab5
commit
1e39459111
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
|
@ -54,27 +54,25 @@ import org.apache.hadoop.hbase.util.Writables;
|
|||
import org.apache.hadoop.util.Progressable;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.util.AbstractList;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.TreeMap;
|
||||
import java.util.TreeSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.util.AbstractList;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
/**
|
||||
* HRegion stores data for a certain region of a table. It stores all columns
|
||||
|
@ -1937,7 +1935,6 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
|||
//DebugPrint.println("HRegionScanner.<init>");
|
||||
this.filter = scan.getFilter();
|
||||
this.batch = scan.getBatch();
|
||||
|
||||
if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
|
||||
this.stopRow = null;
|
||||
} else {
|
||||
|
@ -1969,7 +1966,6 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
|||
new KeyValueHeap(scanners.toArray(new KeyValueScanner[0]), comparator);
|
||||
}
|
||||
|
||||
|
||||
private void resetFilters() {
|
||||
if (filter != null) {
|
||||
filter.reset();
|
||||
|
@ -2025,70 +2021,64 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
|||
return this.filter != null && this.filter.filterAllRemaining();
|
||||
}
|
||||
|
||||
/*
|
||||
* @return true if there are more rows, false if scanner is done
|
||||
* @throws IOException
|
||||
*/
|
||||
private boolean nextInternal(int limit) throws IOException {
|
||||
byte [] currentRow = null;
|
||||
boolean filterCurrentRow = false;
|
||||
while (true) {
|
||||
KeyValue kv = this.storeHeap.peek();
|
||||
if (kv == null) return false;
|
||||
byte [] row = kv.getRow();
|
||||
boolean samerow = Bytes.equals(currentRow, row);
|
||||
if (samerow && filterCurrentRow) {
|
||||
// Filter all columns until row changes
|
||||
readAndDumpCurrentResult();
|
||||
continue;
|
||||
}
|
||||
if (!samerow) {
|
||||
// Continue on the next row:
|
||||
currentRow = row;
|
||||
filterCurrentRow = false;
|
||||
// See if we passed stopRow
|
||||
if (this.stopRow != null &&
|
||||
comparator.compareRows(this.stopRow, 0, this.stopRow.length,
|
||||
currentRow, 0, currentRow.length) <= this.isScan) {
|
||||
return false;
|
||||
byte [] currentRow = peekRow();
|
||||
if (isStopRow(currentRow)) {
|
||||
return false;
|
||||
} else if (filterRowKey(currentRow)) {
|
||||
nextRow(currentRow);
|
||||
} else {
|
||||
byte [] nextRow;
|
||||
do {
|
||||
this.storeHeap.next(results, limit);
|
||||
if (limit > 0 && results.size() == limit) {
|
||||
return true;
|
||||
}
|
||||
} while (Bytes.equals(currentRow, nextRow = peekRow()));
|
||||
|
||||
final boolean stopRow = isStopRow(nextRow);
|
||||
if (!stopRow && (results.isEmpty() || filterRow())) {
|
||||
// this seems like a redundant step - we already consumed the row
|
||||
// there're no left overs.
|
||||
// the reasons for calling this method are:
|
||||
// 1. reset the filters.
|
||||
// 2. provide a hook to fast forward the row (used by subclasses)
|
||||
nextRow(currentRow);
|
||||
continue;
|
||||
}
|
||||
if (hasResults()) return true;
|
||||
}
|
||||
// See if current row should be filtered based on row key
|
||||
if (this.filter != null && this.filter.filterRowKey(row, 0, row.length)) {
|
||||
readAndDumpCurrentResult();
|
||||
resetFilters();
|
||||
filterCurrentRow = true;
|
||||
currentRow = row;
|
||||
continue;
|
||||
}
|
||||
this.storeHeap.next(results, limit);
|
||||
if (limit > 0 && results.size() == limit) {
|
||||
return true;
|
||||
return !stopRow;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void readAndDumpCurrentResult() throws IOException {
|
||||
this.storeHeap.next(this.results);
|
||||
this.results.clear();
|
||||
private boolean filterRow() {
|
||||
return filter != null
|
||||
&& filter.filterRow();
|
||||
}
|
||||
private boolean filterRowKey(byte[] row) {
|
||||
return filter != null
|
||||
&& filter.filterRowKey(row, 0, row.length);
|
||||
}
|
||||
|
||||
/*
|
||||
* Do we have results to return or should we continue. Call when we get to
|
||||
* the end of a row. Does house cleaning -- clearing results and resetting
|
||||
* filters -- if we are to continue.
|
||||
* @return True if we should return else false if need to keep going.
|
||||
*/
|
||||
private boolean hasResults() {
|
||||
if (this.results.isEmpty() ||
|
||||
this.filter != null && this.filter.filterRow()) {
|
||||
// Make sure results is empty, reset filters
|
||||
this.results.clear();
|
||||
resetFilters();
|
||||
return false;
|
||||
protected void nextRow(byte [] currentRow) throws IOException {
|
||||
while (Bytes.equals(currentRow, peekRow())) {
|
||||
this.storeHeap.next(MOCKED_LIST);
|
||||
}
|
||||
return true;
|
||||
results.clear();
|
||||
resetFilters();
|
||||
}
|
||||
|
||||
private byte[] peekRow() {
|
||||
KeyValue kv = this.storeHeap.peek();
|
||||
return kv == null ? null : kv.getRow();
|
||||
}
|
||||
|
||||
private boolean isStopRow(byte [] currentRow) {
|
||||
return currentRow == null ||
|
||||
(stopRow != null &&
|
||||
comparator.compareRows(stopRow, 0, stopRow.length,
|
||||
currentRow, 0, currentRow.length) <= isScan);
|
||||
}
|
||||
|
||||
public synchronized void close() {
|
||||
|
@ -2096,14 +2086,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
|||
storeHeap.close();
|
||||
storeHeap = null;
|
||||
}
|
||||
this.filterClosed = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the current storeHeap
|
||||
*/
|
||||
public synchronized KeyValueHeap getStoreHeap() {
|
||||
return this.storeHeap;
|
||||
this.filterClosed = true;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2828,6 +2811,33 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
|||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* A mocked list implementaion - discards all updates.
|
||||
*/
|
||||
private static final List<KeyValue> MOCKED_LIST = new AbstractList<KeyValue>() {
|
||||
|
||||
@Override
|
||||
public void add(int index, KeyValue element) {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean addAll(int index, Collection<? extends KeyValue> c) {
|
||||
return false; // this list is never changed as a result of an update
|
||||
}
|
||||
|
||||
@Override
|
||||
public KeyValue get(int index) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* Facility for dumping and compacting catalog tables.
|
||||
* Only does catalog tables since these are only tables we for sure know
|
||||
|
|
Loading…
Reference in New Issue