HBASE-1531 Change new Get to use new filter API

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@786672 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2009-06-19 21:37:20 +00:00
parent 558284d6d2
commit dfbcd9ea6b
16 changed files with 187 additions and 85 deletions

View File

@ -202,6 +202,7 @@ Release 0.20.0 - Unreleased
HBASE-1544 Cleanup HTable (Jonathan Gray via Stack)
HBASE-1488 After 1304 goes in, fix and reenable test of thrift, mr indexer,
and merge tool
HBASE-1531 Change new Get to use new filter API
IMPROVEMENTS
HBASE-1089 Add count of regions on filesystem to master UI; add percentage

View File

@ -29,7 +29,7 @@ import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
@ -63,7 +63,7 @@ public class Get implements Writable {
private byte [] row = null;
private long lockId = -1L;
private int maxVersions = 1;
private RowFilterInterface filter = null;
private Filter filter = null;
private TimeRange tr = new TimeRange();
private Map<byte [], NavigableSet<byte []>> familyMap =
new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
@ -204,14 +204,23 @@ public class Get implements Writable {
/**
* Apply the specified server-side filter when performing the Get.
* Note that only {@link Filter#filterKeyValue(KeyValue)} is called, and it is
* called AFTER all tests for ttl, column match, deletes and max versions have
* been run.
* @param filter filter to run on the server
*/
public Get setFilter(RowFilterInterface filter) {
public Get setFilter(Filter filter) {
this.filter = filter;
return this;
}
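For illustration, a minimal client-side sketch of the new Get filter API introduced above, assuming the usual client imports (HTable, Get, Result, Bytes, and the new ColumnCountGetFilter); the configuration, table, row, and family names are hypothetical and not part of this patch:

  HBaseConfiguration conf = new HBaseConfiguration();
  HTable table = new HTable(conf, "mytable");
  Get get = new Get(Bytes.toBytes("row1"));
  get.addFamily(Bytes.toBytes("fam1"));
  // Only Filter#filterKeyValue(KeyValue) runs on the server, after the
  // ttl, column, delete and max-versions checks.
  get.setFilter(new ColumnCountGetFilter(2));
  Result result = table.get(get);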
/** Accessors */
/* Accessors */
/**
* @return Filter
*/
public Filter getFilter() {
return this.filter;
}
/**
* Method for retrieving the get's row
@ -341,9 +350,8 @@ public class Get implements Writable {
this.lockId = in.readLong();
this.maxVersions = in.readInt();
boolean hasFilter = in.readBoolean();
if(hasFilter) {
this.filter =
(RowFilterInterface)HbaseObjectWritable.readObject(in, null);
if (hasFilter) {
this.filter = (Filter)HbaseObjectWritable.readObject(in, null);
}
this.tr = new TimeRange();
tr.readFields(in);
@ -375,8 +383,7 @@ public class Get implements Writable {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
HbaseObjectWritable.writeObject(out, this.filter,
RowFilterInterface.class, null);
HbaseObjectWritable.writeObject(out, this.filter, Filter.class, null);
}
tr.write(out);
out.writeInt(familyMap.size());
@ -395,4 +402,4 @@ public class Get implements Writable {
}
}
}
}
}

View File

@ -105,12 +105,12 @@ public class Put implements HeapSize, Writable, Comparable<Put> {
* its version to this Put operation.
* @param column Old style column name with family and qualifier put together
* with a colon.
* @param timestamp version timestamp
* @param ts version timestamp
* @param value column value
*/
public void add(byte [] column, long timestamp, byte [] value) {
public void add(byte [] column, long ts, byte [] value) {
byte [][] parts = KeyValue.parseColumn(column);
add(parts[0], parts[1], timestamp, value);
add(parts[0], parts[1], ts, value);
}
/**
@ -118,15 +118,15 @@ public class Put implements HeapSize, Writable, Comparable<Put> {
* its version to this Put operation.
* @param family family name
* @param qualifier column qualifier
* @param timestamp version timestamp
* @param ts version timestamp
* @param value column value
*/
public void add(byte [] family, byte [] qualifier, long timestamp, byte [] value) {
public void add(byte [] family, byte [] qualifier, long ts, byte [] value) {
List<KeyValue> list = familyMap.get(family);
if(list == null) {
list = new ArrayList<KeyValue>();
}
KeyValue kv = new KeyValue(this.row, family, qualifier, timestamp,
KeyValue kv = new KeyValue(this.row, family, qualifier, ts,
KeyValue.Type.Put, value);
list.add(kv);
familyMap.put(family, list);
@ -217,10 +217,10 @@ public class Put implements HeapSize, Writable, Comparable<Put> {
/**
* Set whether this Put should be written to the WAL or not.
* Not writing the WAL means you may lose edits on server crash.
* @param writeToWAL true if edits should be written to WAL, false if not
* @param write true if edits should be written to WAL, false if not
*/
public void writeToWAL(boolean writeToWAL) {
this.writeToWAL = writeToWAL;
public void writeToWAL(boolean write) {
this.writeToWAL = write;
}
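A hedged sketch of the renamed writeToWAL call in client code; the row, family, qualifier, and value below are made up for illustration:

  Put p = new Put(Bytes.toBytes("row1"));
  p.add(Bytes.toBytes("fam1"), Bytes.toBytes("qual1"), Bytes.toBytes("value1"));
  // Skip the write-ahead log for this edit: faster, but the edit may be lost
  // if the region server crashes before the memcache is flushed.
  p.writeToWAL(false);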
/**

View File

@ -0,0 +1,59 @@
package org.apache.hadoop.hbase.filter;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.hbase.KeyValue;
/**
* Simple filter that returns only the first N columns on a row.
* This filter was written to test filters in Get; as soon as it has seen its
* quota of columns, {@link #filterAllRemaining()} returns true, which makes
* this filter unsuitable as a Scan filter.
*/
public class ColumnCountGetFilter implements Filter {
private int limit = 0;
private int count = 0;
/**
* Used during serialization.
* Do not use.
*/
public ColumnCountGetFilter() {
super();
}
public ColumnCountGetFilter(final int n) {
this.limit = n;
}
public boolean filterAllRemaining() {
return this.count > this.limit;
}
public ReturnCode filterKeyValue(KeyValue v) {
this.count++;
return filterAllRemaining()? ReturnCode.SKIP: ReturnCode.INCLUDE;
}
public boolean filterRow() {
return false;
}
public boolean filterRowKey(byte[] buffer, int offset, int length) {
return false;
}
public void reset() {
this.count = 0;
}
public void readFields(DataInput in) throws IOException {
this.limit = in.readInt();
}
public void write(DataOutput out) throws IOException {
out.writeInt(this.limit);
}
}
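To make the counting behavior above concrete, a rough sketch of the filter in isolation; the KeyValue contents are made up, and the return codes follow from the code above:

  ColumnCountGetFilter f = new ColumnCountGetFilter(2);
  KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("fam"), Bytes.toBytes("qual"));
  f.filterKeyValue(kv);                    // count=1 -> ReturnCode.INCLUDE
  f.filterKeyValue(kv);                    // count=2 -> ReturnCode.INCLUDE
  f.filterKeyValue(kv);                    // count=3 -> ReturnCode.SKIP
  boolean done = f.filterAllRemaining();   // true: further KeyValues are not needed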

View File

@ -98,7 +98,7 @@ public interface Filter extends Writable {
* still be called.
*/
NEXT_ROW,
};
}
/**
* Last chance to veto row based on previous {@link #filterKeyValue(KeyValue)}

View File

@ -28,8 +28,8 @@ import java.io.IOException;
import java.io.DataInput;
/**
* A Filter that stops after the given row. There is no "RowStopFilter" because the Scan
* spec allows you to specify a stop row.
* A Filter that stops after the given row. There is no "RowStopFilter" because
* the Scan spec allows you to specify a stop row.
*
* Use this filter to include the stop row, e.g. the range [A,Z].
*/
@ -86,4 +86,4 @@ public class RowInclusiveStopFilter implements Filter {
public void readFields(DataInput in) throws IOException {
this.stopRowKey = Bytes.readByteArray(in);
}
}
}
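A hedged usage sketch of the inclusive-stop behavior described above; the start and stop rows are illustrative, and the constructor signature and Scan#setFilter(Filter) call are assumed from this code base rather than shown in this patch:

  Scan scan = new Scan(Bytes.toBytes("A"));
  // Scan's own stopRow is exclusive; the filter makes the scan cover [A,Z]
  // inclusive by stopping only after row "Z" has been returned.
  scan.setFilter(new RowInclusiveStopFilter(Bytes.toBytes("Z")));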

View File

@ -27,30 +27,35 @@ import java.io.DataOutput;
import java.io.IOException;
import java.io.DataInput;
/**
* Pass results that have same row prefix.
*/
public class RowPrefixFilter implements Filter {
protected byte [] prefix;
protected byte [] prefix = null;
public RowPrefixFilter(final byte [] prefix) {
this.prefix = prefix;
}
public RowPrefixFilter() {
super();
}
@Override
public void reset() {
// Noop
}
@Override
public boolean filterRowKey(byte[] buffer, int offset, int length) {
if (buffer == null)
if (buffer == null || this.prefix == null)
return true;
if (length < prefix.length)
return true;
// if they are equal, return false => pass row
// else return true, filter row
return Bytes.compareTo(buffer, offset, prefix.length, prefix, 0, prefix.length) != 0;
return Bytes.compareTo(buffer, offset, this.prefix.length, this.prefix, 0,
this.prefix.length) != 0;
}
@Override
@ -70,11 +75,11 @@ public class RowPrefixFilter implements Filter {
@Override
public void write(DataOutput out) throws IOException {
Bytes.writeByteArray(out, prefix);
Bytes.writeByteArray(out, this.prefix);
}
@Override
public void readFields(DataInput in) throws IOException {
prefix = Bytes.readByteArray(in);
this.prefix = Bytes.readByteArray(in);
}
}
}
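For illustration, a minimal sketch of the prefix filter on a scan; the prefix value is made up, and Scan#setFilter(Filter) is assumed as above:

  Scan scan = new Scan();
  // Pass only rows whose key starts with the given prefix; non-matching rows
  // are filtered out, but the scan itself keeps going to the end of the table.
  scan.setFilter(new RowPrefixFilter(Bytes.toBytes("com.yahoo.")));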

View File

@ -17,11 +17,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**Provides row-level filters applied to HRegion scan results during calls to {@link org.apache.hadoop.hbase.client.ResultScanner#next()}.
/**Provides row-level filters applied to HRegion scan results during calls to
* {@link org.apache.hadoop.hbase.client.ResultScanner#next()}.
<p>Use {@link org.apache.hadoop.hbase.filter.StopRowFilter} to stop the scan once rows exceed the supplied row key.
Filters will not stop the scan unless hosted inside of a {@link org.apache.hadoop.hbase.filter.WhileMatchRowFilter}.
Supply a set of filters to apply using {@link org.apache.hadoop.hbase.filter.RowFilterSet}.
<p>Since HBase 0.20.0, {@link Filter} is the new interface used for filtering.
It replaces the deprecated {@link RowFilterInterface}.
Filters run over the full extent of a table unless you wrap your filter in a
{@link RowWhileMatchFilter}. The latter returns as soon as the wrapped filter
stops matching.
</p>
*/
package org.apache.hadoop.hbase.filter;
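As the package description notes, a plain filter does not end the scan early; below is a hedged sketch of the wrapping it describes, with an illustrative prefix value:

  // The scan stops as soon as the wrapped prefix filter stops matching,
  // instead of running over the full extent of the table.
  Scan scan = new Scan();
  scan.setFilter(new RowWhileMatchFilter(new RowPrefixFilter(Bytes.toBytes("com.yahoo."))));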

View File

@ -2233,12 +2233,12 @@ public class HRegion implements HConstants { // , Writable{
*/
public Result get(final Get get, final Integer lockid) throws IOException {
// Verify families are all valid
if(get.hasFamilies()) {
for(byte [] family : get.familySet()) {
if (get.hasFamilies()) {
for (byte [] family: get.familySet()) {
checkFamily(family);
}
} else { // Adding all families to scanner
for(byte[] family: regionInfo.getTableDesc().getFamiliesKeys()){
for (byte[] family: regionInfo.getTableDesc().getFamiliesKeys()) {
get.addFamily(family);
}
}
@ -2246,7 +2246,7 @@ public class HRegion implements HConstants { // , Writable{
Integer lid = getLock(lockid, get.getRow());
List<KeyValue> result = new ArrayList<KeyValue>();
try {
for(Map.Entry<byte[],NavigableSet<byte[]>> entry:
for (Map.Entry<byte[],NavigableSet<byte[]>> entry:
get.getFamilyMap().entrySet()) {
get(this.stores.get(entry.getKey()), get, entry.getValue(), result);
}

View File

@ -21,9 +21,11 @@
package org.apache.hadoop.hbase.regionserver;
import java.util.NavigableSet;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KeyComparator;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
@ -45,7 +47,6 @@ import org.apache.hadoop.hbase.util.Bytes;
* versions,
*/
public class QueryMatcher {
/**
* {@link #match} return codes. These instruct the scanner moving through
* Memcaches and StoreFiles what to do with the current KeyValue.
@ -113,27 +114,29 @@ public class QueryMatcher {
/** Oldest allowed version stamp for TTL enforcement */
protected long oldestStamp;
protected Filter filter;
/**
* Constructs a QueryMatcher for a Get.
* @param get the Get to match against
* @param family column family to match
* @param columns set of columns to match, or null/empty for all in the family
* @param ttl time-to-live for the family, used to compute the oldest stamp
* @param rowComparator comparator used to compare row keys
* @param maxVersions maximum number of versions to return per column
*/
public QueryMatcher(Get get, byte [] row, byte [] family,
public QueryMatcher(Get get, byte [] family,
NavigableSet<byte[]> columns, long ttl, KeyComparator rowComparator,
int maxVersions) {
this.row = row;
this.row = get.getRow();
this.filter = get.getFilter();
this.tr = get.getTimeRange();
this.oldestStamp = System.currentTimeMillis() - ttl;
this.rowComparator = rowComparator;
this.deletes = new GetDeleteTracker();
this.startKey = KeyValue.createFirstOnRow(row);
// Single branch to deal with two types of Gets (columns vs all in family)
if(columns == null || columns.size() == 0) {
if (columns == null || columns.size() == 0) {
this.columns = new WildcardColumnTracker(maxVersions);
} else {
this.columns = new ExplicitColumnTracker(columns, maxVersions);
@ -142,6 +145,7 @@ public class QueryMatcher {
// For the subclasses.
protected QueryMatcher() {
super();
}
/**
@ -151,6 +155,7 @@ public class QueryMatcher {
*/
public QueryMatcher(QueryMatcher matcher, byte [] row) {
this.row = row;
this.filter = matcher.filter;
this.tr = matcher.getTimeRange();
this.oldestStamp = matcher.getOldestStamp();
this.rowComparator = matcher.getRowComparator();
@ -181,10 +186,12 @@ public class QueryMatcher {
* @return MatchCode: include, skip, next, done
*/
public MatchCode match(KeyValue kv) {
if(this.columns.done()) {
if (this.columns.done()) {
return MatchCode.DONE; // done_row
}
if (this.filter != null && this.filter.filterAllRemaining()) {
return MatchCode.DONE;
}
// Directly act on KV buffer
byte [] bytes = kv.getBuffer();
int offset = kv.getOffset();
@ -203,15 +210,14 @@ public class QueryMatcher {
*/
int ret = this.rowComparator.compareRows(row, 0, row.length,
bytes, offset, rowLength);
if(ret <= -1) {
if (ret <= -1) {
// Have reached the next row
return MatchCode.NEXT; // got_to_next_row (end)
} else if(ret >= 1) {
} else if (ret >= 1) {
// At a previous row
return MatchCode.SKIP; // skip_to_cur_row
}
offset += rowLength;
byte familyLength = bytes[offset];
offset += Bytes.SIZEOF_BYTE + familyLength;
@ -219,7 +225,7 @@ public class QueryMatcher {
(offset - kv.getOffset()) - KeyValue.TIMESTAMP_TYPE_SIZE;
int columnOffset = offset;
offset += columnLength;
/* Check TTL
* If expired, go to next KeyValue
*/
@ -229,7 +235,7 @@ public class QueryMatcher {
return MatchCode.NEXT; // done_row
}
offset += Bytes.SIZEOF_LONG;
/* Check TYPE
* If a delete within (or after) time range, add to deletes
* Move to next KeyValue
@ -237,8 +243,8 @@ public class QueryMatcher {
byte type = bytes[offset];
// if delete type == delete family, return done_row
if(isDelete(type)) {
if(tr.withinOrAfterTimeRange(timestamp)) {
if (isDelete(type)) {
if (tr.withinOrAfterTimeRange(timestamp)) {
this.deletes.add(bytes, columnOffset, columnLength, timestamp, type);
}
return MatchCode.SKIP; // skip the delete cell.
@ -247,29 +253,38 @@ public class QueryMatcher {
/* Check TimeRange
* If outside of range, move to next KeyValue
*/
if(!tr.withinTimeRange(timestamp)) {
if (!tr.withinTimeRange(timestamp)) {
return MatchCode.SKIP; // optimization chances here.
}
/* Check Deletes
* If deleted, move to next KeyValue
*/
if(!deletes.isEmpty() && deletes.isDeleted(bytes, columnOffset,
if (!deletes.isEmpty() && deletes.isDeleted(bytes, columnOffset,
columnLength, timestamp)) {
// 2 types of deletes:
// - delete of a single cell or column: just skip the matching KeyValues.
// - delete of a whole family: just skip to the next row.
return MatchCode.SKIP;
}
/* Check Column and Versions
* Returns a MatchCode directly:
* - if the column matches and does not yet have enough versions, include
* - if this column already has enough versions, or does not match, skip
* - if we have moved past the requested columns, next
* - if we have enough versions of everything, done
* TODO: No complete mapping from Filter.ReturnCode to MatchCode yet.
*/
return columns.checkColumn(bytes, columnOffset, columnLength);
MatchCode mc = columns.checkColumn(bytes, columnOffset, columnLength);
if (mc == MatchCode.INCLUDE && this.filter != null) {
switch(this.filter.filterKeyValue(kv)) {
case INCLUDE: return MatchCode.INCLUDE;
case SKIP: return MatchCode.SKIP;
default: return MatchCode.DONE;
}
}
return mc;
}
// should be in KeyValue.
@ -310,6 +325,7 @@ public class QueryMatcher {
public void reset() {
this.deletes.reset();
this.columns.reset();
if (this.filter != null) this.filter.reset();
}
/**

View File

@ -33,8 +33,6 @@ import org.apache.hadoop.hbase.util.Bytes;
* A query matcher that is specifically designed for the scan case.
*/
public class ScanQueryMatcher extends QueryMatcher {
private Filter filter;
// have to support old style filter for now.
private RowFilterInterface oldFilter;
// Optimization so we can skip lots of compares when we decide to skip

View File

@ -1468,9 +1468,8 @@ public class Store implements HConstants {
KeyComparator keyComparator = this.comparator.getRawComparator();
// Column matching and version enforcement
QueryMatcher matcher = new QueryMatcher(get, get.getRow(),
this.family.getName(), columns, this.ttl, keyComparator,
versionsToReturn(get.getMaxVersions()));
QueryMatcher matcher = new QueryMatcher(get, this.family.getName(), columns,
this.ttl, keyComparator, versionsToReturn(get.getMaxVersions()));
// Read from Memcache
if(this.memcache.get(matcher, result)) {
@ -1516,8 +1515,8 @@ public class Store implements HConstants {
NavigableSet<byte[]> qualifiers =
new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
qualifiers.add(qualifier);
QueryMatcher matcher = new QueryMatcher(get, row, family, qualifiers,
this.ttl, keyComparator, 1);
QueryMatcher matcher = new QueryMatcher(get, family, qualifiers, this.ttl,
keyComparator, 1);
// Read from Memcache
if(this.memcache.get(matcher, result)) {

View File

@ -54,6 +54,10 @@ public class TestRowPrefixFilter extends TestCase {
prefixRowTests(mainFilter);
}
public void testPrefixOnRowInsideWhileMatchRow() throws Exception {
prefixRowTests(new RowWhileMatchFilter(this.mainFilter), true);
}
public void testSerialization() throws Exception {
// Decompose mainFilter to bytes.
ByteArrayOutputStream stream = new ByteArrayOutputStream();
@ -72,20 +76,25 @@ public class TestRowPrefixFilter extends TestCase {
}
private void prefixRowTests(Filter filter) throws Exception {
prefixRowTests(filter, false);
}
private void prefixRowTests(Filter filter, boolean lastFilterAllRemaining)
throws Exception {
for (char c = FIRST_CHAR; c <= LAST_CHAR; c++) {
byte [] t = createRow(c);
assertFalse("Failed with characer " + c, filter.filterRowKey(t, 0, t.length));
assertFalse("Failed with character " + c,
filter.filterRowKey(t, 0, t.length));
assertFalse(filter.filterAllRemaining());
}
String yahooSite = "com.yahoo.www";
byte [] yahooSiteBytes = Bytes.toBytes(yahooSite);
assertTrue("Failed with character " +
yahooSite, filter.filterRowKey(yahooSiteBytes, 0, yahooSiteBytes.length));
assertEquals(filter.filterAllRemaining(), lastFilterAllRemaining);
}
private byte [] createRow(final char c) {
return Bytes.toBytes(HOST_PREFIX + Character.toString(c));
}
}
}

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
@ -71,8 +72,7 @@ public class TestHRegion extends HBaseTestCase {
// individual code pieces in the HRegion. Putting files locally in
// /tmp/testtable
//////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////
// checkAndPut tests
//////////////////////////////////////////////////////////////////////////////
@ -375,7 +375,7 @@ public class TestHRegion extends HBaseTestCase {
}
assertFalse(true);
}
public void testGet_Basic() throws IOException {
byte [] tableName = Bytes.toBytes("testtable");
byte [] row1 = Bytes.toBytes("row1");
@ -398,7 +398,7 @@ public class TestHRegion extends HBaseTestCase {
put.add(fam1, col4, null);
put.add(fam1, col5, null);
region.put(put);
Get get = new Get(row1);
get.addColumn(fam1, col2);
get.addColumn(fam1, col4);
@ -406,10 +406,9 @@ public class TestHRegion extends HBaseTestCase {
KeyValue kv1 = new KeyValue(row1, fam1, col2);
KeyValue kv2 = new KeyValue(row1, fam1, col4);
KeyValue [] expected = {kv1, kv2};
//Test
Result res = region.get(get, null);
assertEquals(expected.length, res.size());
for(int i=0; i<res.size(); i++){
assertEquals(0,
@ -420,8 +419,15 @@ public class TestHRegion extends HBaseTestCase {
Bytes.compareTo(
expected[i].getQualifier(), res.raw()[i].getQualifier()));
}
// Test using a filter on a Get
Get g = new Get(row1);
final int count = 2;
g.setFilter(new ColumnCountGetFilter(count));
res = region.get(g, null);
assertEquals(count, res.size());
}
public void testGet_Empty() throws IOException {
byte [] tableName = Bytes.toBytes("emptytable");
byte [] row = Bytes.toBytes("row");

View File

@ -296,7 +296,7 @@ public class TestMemcache extends TestCase {
long ttl = Long.MAX_VALUE;
QueryMatcher matcher =
new QueryMatcher(get, row, fam, columns, ttl, KeyValue.KEY_COMPARATOR, 1);
new QueryMatcher(get, fam, columns, ttl, KeyValue.KEY_COMPARATOR, 1);
List<KeyValue> result = new ArrayList<KeyValue>();
boolean res = memcache.get(matcher, result);
@ -324,7 +324,7 @@ public class TestMemcache extends TestCase {
long ttl = Long.MAX_VALUE;
QueryMatcher matcher =
new QueryMatcher(get, row, fam, columns, ttl, KeyValue.KEY_COMPARATOR, 1);
new QueryMatcher(get, fam, columns, ttl, KeyValue.KEY_COMPARATOR, 1);
List<KeyValue> result = new ArrayList<KeyValue>();
boolean res = memcache.get(matcher, result);
@ -349,7 +349,7 @@ public class TestMemcache extends TestCase {
long ttl = Long.MAX_VALUE;
QueryMatcher matcher =
new QueryMatcher(get, row, fam, columns, ttl, KeyValue.KEY_COMPARATOR, 1);
new QueryMatcher(get, fam, columns, ttl, KeyValue.KEY_COMPARATOR, 1);
//Setting up memcache
memcache.add(new KeyValue(row, fam ,qf1, val));
@ -390,8 +390,8 @@ public class TestMemcache extends TestCase {
columns.add(qf3);
long ttl = Long.MAX_VALUE;
QueryMatcher matcher =
new QueryMatcher(get, row, fam, columns, ttl, KeyValue.KEY_COMPARATOR, 1);
QueryMatcher matcher = new QueryMatcher(get, fam, columns, ttl,
KeyValue.KEY_COMPARATOR, 1);
//Setting up expected
List<KeyValue> expected = new ArrayList<KeyValue>();

View File

@ -97,7 +97,7 @@ implements HConstants {
expected.add(MatchCode.INCLUDE);
expected.add(MatchCode.DONE);
QueryMatcher qm = new QueryMatcher(get, get.getRow(), fam2,
QueryMatcher qm = new QueryMatcher(get, fam2,
get.getFamilyMap().get(fam2), ttl, rowComparator, 1);
List<KeyValue> memCache = new ArrayList<KeyValue>();
@ -140,8 +140,7 @@ implements HConstants {
expected.add(MatchCode.INCLUDE);
expected.add(MatchCode.NEXT);
QueryMatcher qm = new QueryMatcher(get, get.getRow(), fam2, null,
ttl, rowComparator, 1);
QueryMatcher qm = new QueryMatcher(get, fam2, null, ttl, rowComparator, 1);
List<KeyValue> memCache = new ArrayList<KeyValue>();
memCache.add(new KeyValue(row1, fam2, col1, data));