HBASE-1831 Scanning API must be reworked to allow for fully functional Filters client-side

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@822111 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2009-10-06 03:25:07 +00:00
parent db8adc7b78
commit e5c8531001
12 changed files with 295 additions and 77 deletions

View File

@ -58,6 +58,8 @@ Release 0.21.0 - Unreleased
doReconstructionLog (Clint Morgan via Stack)
HBASE-1878 BaseScanner results can't be trusted at all (Related to
hbase-1784)
HBASE-1831 Scanning API must be reworked to allow for fully functional
Filters client-side
IMPROVEMENTS
HBASE-1760 Cleanup TODOs in HTable

View File

@ -238,7 +238,7 @@ public class HBaseAdmin {
} catch (RemoteException e) {
throw RemoteExceptionHandler.decodeRemoteException(e);
}
final int batchCount = this.conf.getInt("hbase.admin.scanner.caching", 10);
// Wait until first region is deleted
HRegionInterface server =
connection.getHRegionConnection(firstMetaServer.getServerAddress());
@ -247,31 +247,34 @@ public class HBaseAdmin {
long scannerId = -1L;
try {
Scan scan = new Scan().addColumn(HConstants.CATALOG_FAMILY,
HConstants.REGIONINFO_QUALIFIER);
HConstants.REGIONINFO_QUALIFIER);
scannerId = server.openScanner(
firstMetaServer.getRegionInfo().getRegionName(),
scan);
Result values = server.next(scannerId);
if (values == null || values.size() == 0) {
firstMetaServer.getRegionInfo().getRegionName(), scan);
// Get a batch at a time.
Result [] values = server.next(scannerId, batchCount);
if (values == null || values.length == 0) {
break;
}
boolean found = false;
NavigableMap<byte[], byte[]> infoValues = values.getFamilyMap(HConstants.CATALOG_FAMILY);
for (Map.Entry<byte [], byte []> e: infoValues.entrySet()) {
if (Bytes.equals(e.getKey(), HConstants.REGIONINFO_QUALIFIER)) {
info = (HRegionInfo) Writables.getWritable(
e.getValue(), info);
if (Bytes.equals(info.getTableDesc().getName(), tableName)) {
found = true;
for (int i = 0; i < values.length; i++) {
Result r = values[i];
NavigableMap<byte[], byte[]> infoValues =
r.getFamilyMap(HConstants.CATALOG_FAMILY);
for (Map.Entry<byte[], byte[]> e : infoValues.entrySet()) {
if (Bytes.equals(e.getKey(), HConstants.REGIONINFO_QUALIFIER)) {
info = (HRegionInfo) Writables.getWritable(e.getValue(), info);
if (Bytes.equals(info.getTableDesc().getName(), tableName)) {
found = true;
} else {
found = false;
break;
}
}
}
}
if (!found) {
break;
}
} catch (IOException ex) {
if(tries == numRetries - 1) { // no more tries left
if (ex instanceof RemoteException) {
@ -279,7 +282,6 @@ public class HBaseAdmin {
}
throw ex;
}
} finally {
if (scannerId != -1L) {
try {
@ -289,7 +291,6 @@ public class HBaseAdmin {
}
}
}
try {
Thread.sleep(getPauseTime(tries));
} catch (InterruptedException e) {
@ -301,6 +302,8 @@ public class HBaseAdmin {
LOG.info("Deleted " + Bytes.toString(tableName));
}
/**
* Brings a table on-line (enables it).
* Synchronous operation.

View File

@ -483,11 +483,7 @@ public class HConnectionManager implements HConstants {
currentRegion = s.getHRegionInfo();
Result r = null;
Result [] rrs = null;
do {
rrs = getRegionServerWithRetries(s);
if (rrs == null || rrs.length == 0 || rrs[0].size() == 0) {
break; //exit completely
}
while ((rrs = getRegionServerWithRetries(s)) != null && rrs.length > 0) {
r = rrs[0];
byte [] value = r.getValue(HConstants.CATALOG_FAMILY,
HConstants.REGIONINFO_QUALIFIER);
@ -500,7 +496,7 @@ public class HConnectionManager implements HConstants {
}
}
}
} while(true);
}
endKey = currentRegion.getEndKey();
} while (!(endKey == null ||
Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY)));

View File

@ -736,7 +736,7 @@ public class HTable implements HTableInterface {
}
public void initialize() throws IOException {
nextScanner(this.caching);
nextScanner(this.caching, false);
}
protected Scan getScan() {
@ -747,13 +747,36 @@ public class HTable implements HTableInterface {
return lastNext;
}
/**
 * Checks whether the scan's stop row sorts at or before the passed region
 * end key.
 * @param endKey End key of the region to compare the scan's stop row against.
 * @return True if the scan has a non-empty stop row and that stop row is
 * less than or equal to <code>endKey</code>; false if no stop row is set or
 * the stop row sorts after <code>endKey</code>.
 */
private boolean checkScanStopRow(final byte [] endKey) {
  final byte [] stop = this.scan.getStopRow();
  if (stop.length == 0) {
    // No stop row set on the scan, so nothing to stop on here.
    return false;
  }
  // stop <= endKey means the stop row is at or before this region's end.
  return Bytes.compareTo(stop, 0, stop.length,
      endKey, 0, endKey.length) <= 0;
}
/*
* Gets a scanner for the next region. If this.currentRegion != null, then
* we will move to the endrow of this.currentRegion. Else we will get
* scanner at the scan.getStartRow().
* scanner at the scan.getStartRow(). We will go no further, just tidy
* up outstanding scanners, if <code>currentRegion != null</code> and
* <code>done</code> is true.
* @param nbRows
* @param done Server-side says we're done scanning.
*/
private boolean nextScanner(int nbRows) throws IOException {
private boolean nextScanner(int nbRows, final boolean done)
throws IOException {
// Close the previous scanner if it's open
if (this.callable != null) {
this.callable.setClose();
@ -764,20 +787,23 @@ public class HTable implements HTableInterface {
// Where to start the next scanner
byte [] localStartKey = null;
// if we're at the end of the table, then close and return false
// to stop iterating
// if we're at end of table, close and return false to stop iterating
if (this.currentRegion != null) {
if (CLIENT_LOG.isDebugEnabled()) {
CLIENT_LOG.debug("Finished with region " + this.currentRegion);
}
byte [] endKey = this.currentRegion.getEndKey();
if (endKey == null ||
Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
filterSaysStop(endKey)) {
checkScanStopRow(endKey) ||
done) {
close();
if (CLIENT_LOG.isDebugEnabled()) {
CLIENT_LOG.debug("Finished with scanning at " + this.currentRegion);
}
return false;
}
localStartKey = endKey;
if (CLIENT_LOG.isDebugEnabled()) {
CLIENT_LOG.debug("Finished with region " + this.currentRegion);
}
} else {
localStartKey = this.scan.getStartRow();
}
@ -855,6 +881,9 @@ public class HTable implements HTableInterface {
boolean skipFirst = false;
do {
try {
// Server returns null if scanning is to stop. Else, it returns an
// empty array if scanning is to go on and we've just exhausted the
// current region.
values = getConnection().getRegionServerWithRetries(callable);
if (skipFirst) {
skipFirst = false;
@ -892,7 +921,8 @@ public class HTable implements HTableInterface {
this.lastResult = rs;
}
}
} while (countdown > 0 && nextScanner(countdown));
// Values == null means server-side filter has determined we must STOP
} while (countdown > 0 && nextScanner(countdown, values == null));
}
if (cache.size() > 0) {

View File

@ -77,7 +77,7 @@ public class ScannerCallable extends ServerCallable<Result[]> {
try {
rrs = server.next(scannerId, caching);
} catch (IOException e) {
IOException ioe = null;
IOException ioe = null;
if (e instanceof RemoteException) {
ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
}
@ -88,7 +88,7 @@ public class ScannerCallable extends ServerCallable<Result[]> {
throw new DoNotRetryIOException("Reset scanner", ioe);
}
}
return rrs == null || rrs.length == 0? null: rrs;
return rrs;
}
return null;
}

View File

@ -62,4 +62,4 @@ public class BinaryComparator implements WritableByteArrayComparable {
public int compareTo(byte [] value) {
return Bytes.compareTo(this.value, value);
}
}
}

View File

@ -25,5 +25,9 @@ Filters run the extent of a table unless you wrap your filter in a
{@link org.apache.hadoop.hbase.filter.WhileMatchFilter}.
The latter returns as soon as the filter stops matching.
</p>
<p>Do not rely on filters carrying state across rows; it's not reliable in current
hbase as we have no handlers in place for when regions split or close, or when
servers crash.
</p>
*/
package org.apache.hadoop.hbase.filter;

View File

@ -186,7 +186,7 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion {
/**
* Get the next set of values
* @param scannerId clientId passed to openScanner
* @return map of values
* @return map of values; returns null if no results.
* @throws IOException
*/
public Result next(long scannerId) throws IOException;
@ -195,7 +195,9 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion {
* Get the next set of values
* @param scannerId clientId passed to openScanner
* @param numberOfRows the number of rows to fetch
* @return map of values
* @return Array of Results (map of values); array is empty if done with this
* region and null if we are NOT to go to the next region (happens when a
* filter rules that the scan is done).
* @throws IOException
*/
public Result [] next(long scannerId, int numberOfRows) throws IOException;

View File

@ -1713,13 +1713,6 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
}
}
/**
* Get the next row of results from this region.
* @param results list to append results to
* @return true if there are more rows, false if scanner is done
* @throws NotServerRegionException If this region is closing or closed
*/
@Override
public boolean next(List<KeyValue> outResults) throws IOException {
if (closing.get() || closed.get()) {
close();
@ -1733,12 +1726,23 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
}
outResults.addAll(results);
resetFilters();
if(filter != null && filter.filterAllRemaining()) {
if (isFilterDone()) {
return false;
}
return returnResult;
}
/*
 * Reports whether this scanner's filter, if one is set, says all remaining
 * rows are to be filtered out.
 * @return True if a filter is present and its filterAllRemaining() returns
 * true; false otherwise (including when there is no filter).
 */
boolean isFilterDone() {
  if (this.filter == null) {
    return false;
  }
  return this.filter.filterAllRemaining();
}
/*
* @return true if there are more rows, false if scanner is done
* @throws IOException
*/
private boolean nextInternal() throws IOException {
// This method should probably be reorganized a bit... has gotten messy
KeyValue kv;

View File

@ -1946,7 +1946,14 @@ public class HRegionServer implements HConstants, HRegionInterface,
break;
}
}
return results.toArray(new Result[0]);
// Below is an ugly hack where we cast the InternalScanner to be a
// HRegion.RegionScanner. The alternative is to change the InternalScanner
// interface, but it's used everywhere whereas we just need one bit of info
// from HRegion.RegionScanner: whether its filter, if any, is done with the
// scan and wants to tell the client to stop the scan. This is done by
// passing a null result.
return ((HRegion.RegionScanner)s).isFilterDone() && results.isEmpty()?
null: results.toArray(new Result[0]);
} catch (Throwable t) {
if (t instanceof NotServingRegionException) {
this.scanners.remove(scannerId);
@ -2527,9 +2534,10 @@ public class HRegionServer implements HConstants, HRegionInterface,
public static void main(String [] args) {
Configuration conf = new HBaseConfiguration();
@SuppressWarnings("unchecked")
Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
.getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
Class<? extends HRegionServer> regionServerClass =
(Class<? extends HRegionServer>) conf.getClass(HConstants.REGION_SERVER_IMPL,
HRegionServer.class);
doMain(args, regionServerClass);
}
}
}

View File

@ -22,20 +22,32 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseClusterTestCase;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Tests from client-side of a cluster.
*/
public class TestClient extends HBaseClusterTestCase {
final Log LOG = LogFactory.getLog(getClass());
private static byte [] ROW = Bytes.toBytes("testRow");
private static byte [] FAMILY = Bytes.toBytes("testFamily");
private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
@ -49,9 +61,182 @@ public class TestClient extends HBaseClusterTestCase {
public TestClient() {
super();
}
/**
* Test filters when a table has multiple regions. Loads a table, splits it,
* then checks row counts returned by scans that carry a WhileMatchFilter-wrapped
* RowFilter. The counts alone cannot prove the scan stayed out of later
* regions; eyeball the logs to ensure that we're not scanning more regions
* than we're supposed to.
* Related to the TestFilterAcrossRegions over in the o.a.h.h.filter package.
* @throws IOException
*/
public void testFilterAcrossMutlipleRegions() throws IOException {
byte [] name = Bytes.toBytes(getName());
HTable t = createTable(name, FAMILY);
int rowCount = loadTable(t);
assertRowCount(t, rowCount);
// Split the table. Should split on a reasonable key; 'lqj'
Map<HRegionInfo, HServerAddress> regions = splitTable(t);
// Splitting must not change the number of rows a full scan returns.
assertRowCount(t, rowCount);
// Get end key of the first region returned by the map's iterator.
// NOTE(review): presumably the map iterates in region order so this is the
// table's first region -- confirm against HTable#getRegionsInfo.
byte [] endKey = regions.keySet().iterator().next().getEndKey();
// Count rows with a filter that stops us before the passed 'endKey'.
// Should be the count of rows in the first region.
int endKeyCount = countRows(t, createScanWithRowFilter(endKey));
assertTrue(endKeyCount < rowCount);
// How do I know I did not go into the second region? That's tough. Can't
// really do that in a client-side region test. I verified by tracing in a
// debugger. I changed the messages that come out when set to DEBUG so should
// see when the scanner is done. Says "Finished with scanning..." with region
// name. Check that it's finished in the right region.
// New test. Make it so scan goes into next region by one and then two.
// Make sure count comes out right.
// NOTE(review): assumes endKey has at least three bytes and that bumping the
// third byte lands on rows that exist -- confirm against the split key.
byte [] key = new byte [] {endKey[0], endKey[1], (byte)(endKey[2] + 1)};
int plusOneCount = countRows(t, createScanWithRowFilter(key));
assertEquals(endKeyCount + 1, plusOneCount);
key = new byte [] {endKey[0], endKey[1], (byte)(endKey[2] + 2)};
int plusTwoCount = countRows(t, createScanWithRowFilter(key));
assertEquals(endKeyCount + 2, plusTwoCount);
// New test. Make it so I scan one less than endkey.
key = new byte [] {endKey[0], endKey[1], (byte)(endKey[2] - 1)};
int minusOneCount = countRows(t, createScanWithRowFilter(key));
assertEquals(endKeyCount - 1, minusOneCount);
// For above test... study logs. Make sure we do "Finished with scanning.."
// in first region and that we do not fall into the next region.
// Despite its name, countBBB counts rows EQUAL to 'aaa' (the first row
// loaded); exactly one row is expected.
key = new byte [] {'a', 'a', 'a'};
int countBBB = countRows(t,
createScanWithRowFilter(key, null, CompareFilter.CompareOp.EQUAL));
assertEquals(1, countBBB);
int countGreater = countRows(t, createScanWithRowFilter(endKey, null,
CompareFilter.CompareOp.GREATER_OR_EQUAL));
// Zero because the scan started at the start of the table: the RowFilter is
// wrapped in a WhileMatchFilter, so the scan ends as soon as a row fails the
// GREATER_OR_EQUAL test.
assertEquals(0, countGreater);
// Same filter, but start the scan at endKey so rows match from the outset.
countGreater = countRows(t, createScanWithRowFilter(endKey, endKey,
CompareFilter.CompareOp.GREATER_OR_EQUAL));
assertEquals(rowCount - endKeyCount, countGreater);
}
/**
 * Load table with one row per three-byte key, from 'aaa' up to and including
 * 'yyy' (each of the three loops stops short of 'z'). Each row gets a single
 * cell in FAMILY with an empty qualifier whose value is the row key.
 * @param t Table to load.
 * @return Count of rows loaded.
 * @throws IOException
 */
private int loadTable(final HTable t) throws IOException {
  int rowCount = 0;
  for (byte b1 = 'a'; b1 < 'z'; b1++) {
    for (byte b2 = 'a'; b2 < 'z'; b2++) {
      for (byte b3 = 'a'; b3 < 'z'; b3++) {
        // Use a fresh array per row so no Put shares its key bytes with a
        // later row's key.
        byte [] k = new byte [] {b1, b2, b3};
        Put put = new Put(k);
        put.add(FAMILY, new byte[0], k);
        t.put(put);
        rowCount++;
      }
    }
  }
  return rowCount;
}
/*
 * Convenience overload: no explicit start row, comparison operator LESS.
 * @param key Row key the RowFilter compares against.
 * @return Scan with RowFilter that passes rows LESS than the passed key.
 */
private Scan createScanWithRowFilter(final byte [] key) {
  final CompareFilter.CompareOp lessThan = CompareFilter.CompareOp.LESS;
  return createScanWithRowFilter(key, null, lessThan);
}
/*
 * Builds a Scan whose filter is a RowFilter on the passed key, wrapped in a
 * WhileMatchFilter.
 * @param key Row key the RowFilter compares against; asserted to be non-null,
 * non-empty and to sort at or after 'aaa'.
 * @param startRow Row to start the scan at, or null for a Scan built with
 * the no-argument constructor.
 * @param op Comparison the RowFilter applies between each row and the key.
 * @return Scan with RowFilter that does CompareOp op on passed key.
 */
private Scan createScanWithRowFilter(final byte [] key,
    final byte [] startRow, CompareFilter.CompareOp op) {
  // Make sure key is of some substance: non-null, non-empty and not sorting
  // before the first key.
  final byte [] firstKey = new byte [] {'a', 'a', 'a'};
  boolean substantial = key != null && key.length > 0 &&
    Bytes.BYTES_COMPARATOR.compare(key, firstKey) >= 0;
  assertTrue(substantial);
  LOG.info("Key=" + Bytes.toString(key));
  Scan scan;
  if (startRow == null) {
    scan = new Scan();
  } else {
    scan = new Scan(startRow);
  }
  Filter rowFilter = new RowFilter(op, new BinaryComparator(key));
  scan.setFilter(new WhileMatchFilter(rowFilter));
  return scan;
}
/*
 * Runs the passed scan against the table and counts the rows it returns,
 * asserting along the way that every returned row is non-empty. The scanner
 * is always closed before returning.
 * @param t Table to scan.
 * @param s Scan to run.
 * @return Count of rows the scan returned.
 * @throws IOException
 */
private int countRows(final HTable t, final Scan s)
throws IOException {
  ResultScanner scanner = t.getScanner(s);
  int count = 0;
  try {
    for (Result result: scanner) {
      count++;
      assertTrue(result.size() > 0);
    }
  } finally {
    // Release the scanner even if an assertion or the scan itself fails.
    scanner.close();
  }
  return count;
}
/*
 * Asserts that an unfiltered scan of the table returns the expected number
 * of rows.
 * @param t Table to scan.
 * @param expected Row count the scan must return.
 * @throws IOException
 */
private void assertRowCount(final HTable t, final int expected)
throws IOException {
  final int actual = countRows(t, new Scan());
  assertEquals(expected, actual);
}
/*
 * Asks the cluster to split the table, waits on the split, and asserts the
 * table then has more than one region.
 * @param t Table to split.
 * @return Map of regions to servers.
 * @throws IOException
 */
private Map<HRegionInfo, HServerAddress> splitTable(final HTable t)
throws IOException {
  // Request the split through the admin interface.
  new HBaseAdmin(this.conf).split(t.getTableName());
  final Map<HRegionInfo, HServerAddress> regionsAfterSplit = waitOnSplit(t);
  assertTrue(regionsAfterSplit.size() > 1);
  return regionsAfterSplit;
}
/*
 * Wait on table split by polling the table's region map until it holds more
 * regions than it did on entry. May return because we waited long enough on
 * the split and it didn't happen, or because this thread was interrupted
 * while waiting. Caller should check.
 * @param t Table being split.
 * @return Map of table regions; caller needs to check table actually split.
 * @throws IOException
 */
private Map<HRegionInfo, HServerAddress> waitOnSplit(final HTable t)
throws IOException {
  Map<HRegionInfo, HServerAddress> regions = t.getRegionsInfo();
  final int originalCount = regions.size();
  // Read the polling parameters once rather than on every iteration.
  final int retries = this.conf.getInt("hbase.test.retries", 30);
  final int pause =
    this.conf.getInt("hbase.server.thread.wakefrequency", 1000);
  for (int i = 0; i < retries; i++) {
    try {
      Thread.sleep(pause);
    } catch (InterruptedException e) {
      // Restore the interrupt status for whoever runs this test and stop
      // waiting; the caller checks whether the split actually happened.
      Thread.currentThread().interrupt();
      break;
    }
    regions = t.getRegionsInfo();
    if (regions.size() > originalCount) {
      break;
    }
  }
  return regions;
}
public void testSuperSimple() throws Exception {
byte [] TABLE = Bytes.toBytes("testSuperSimple");
byte [] TABLE = Bytes.toBytes(getName());
HTable ht = createTable(TABLE, FAMILY);
Put put = new Put(ROW);
put.add(FAMILY, QUALIFIER, VALUE);
@ -64,7 +249,6 @@ public class TestClient extends HBaseClusterTestCase {
scanner.close();
System.out.println("Done.");
}
public void testFilters() throws Exception {
byte [] TABLE = Bytes.toBytes("testFilters");
HTable ht = createTable(TABLE, FAMILY);
@ -98,7 +282,7 @@ public class TestClient extends HBaseClusterTestCase {
assertEquals(expectedIndex, 6);
scanner.close();
}
/**
* Test simple table and non-existent row cases.
*/

View File

@ -65,8 +65,6 @@ public class TestHRegion extends HBaseTestCase {
private final byte[] value1 = Bytes.toBytes("value1");
private final byte[] value2 = Bytes.toBytes("value2");
private final byte [] row = Bytes.toBytes("rowA");
private final byte [] row2 = Bytes.toBytes("rowB");
private final byte [] row3 = Bytes.toBytes("rowC");
/**
* @see org.apache.hadoop.hbase.HBaseTestCase#setUp()
@ -208,11 +206,11 @@ public class TestHRegion extends HBaseTestCase {
//checkAndPut with wrong value
Store store = region.getStore(fam1);
int size = store.memstore.kvset.size();
store.memstore.kvset.size();
boolean res = region.checkAndPut(row1, fam1, qf1, val1, put, lockId, true);
assertEquals(true, res);
size = store.memstore.kvset.size();
store.memstore.kvset.size();
Get get = new Get(row1);
get.addColumn(fam2, qf1);
@ -811,7 +809,7 @@ public class TestHRegion extends HBaseTestCase {
scan.addFamily(fam1);
scan.addFamily(fam2);
try {
InternalScanner is = region.getScanner(scan);
region.getScanner(scan);
} catch (Exception e) {
assertTrue("Families could not be found in Region", false);
}
@ -832,7 +830,7 @@ public class TestHRegion extends HBaseTestCase {
scan.addFamily(fam2);
boolean ok = false;
try {
InternalScanner is = region.getScanner(scan);
region.getScanner(scan);
} catch (Exception e) {
ok = true;
}
@ -1775,21 +1773,10 @@ public class TestHRegion extends HBaseTestCase {
conf.setLong("hbase.hregion.max.filesize", 1024 * 128);
return conf;
}
//////////////////////////////////////////////////////////////////////////////
// Helpers
//////////////////////////////////////////////////////////////////////////////
private HBaseConfiguration initHRegion() {
HBaseConfiguration conf = new HBaseConfiguration();
conf.set("hbase.hstore.compactionThreshold", "2");
conf.setLong("hbase.hregion.max.filesize", 65536);
return conf;
}
private void initHRegion (byte [] tableName, String callingMethod,
byte[] ... families) throws IOException{
byte[] ... families)
throws IOException {
initHRegion(tableName, callingMethod, new HBaseConfiguration(), families);
}
@ -1797,10 +1784,8 @@ public class TestHRegion extends HBaseTestCase {
HBaseConfiguration conf, byte [] ... families) throws IOException{
HTableDescriptor htd = new HTableDescriptor(tableName);
for(byte [] family : families) {
HColumnDescriptor hcd = new HColumnDescriptor(family);
htd.addFamily(new HColumnDescriptor(family));
}
HRegionInfo info = new HRegionInfo(htd, null, null, false);
Path path = new Path(DIR + callingMethod);
region = HRegion.createHRegion(info, path, conf);