HBASE-2466 Improving filter API to allow for modification of keyvalue list by filter

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@945902 13f79535-47bb-0310-9956-ffa450edef68
Ryan Rawson 2010-05-18 21:37:46 +00:00
parent c4eafa2412
commit c83d862620
21 changed files with 671 additions and 142 deletions

View File: CHANGES.txt

@@ -602,6 +602,8 @@ Release 0.21.0 - Unreleased
   HBASE-2520 Cleanup arrays vs Lists of scanners (Todd Lipcon via Stack)
   HBASE-2551 Forward port fixes that are in branch but not in trunk (part
              of the merge of old 0.20 into TRUNK task)
+  HBASE-2466 Improving filter API to allow for modification of keyvalue list
+             by filter (Juhani Connolly via Ryan)
 
 NEW FEATURES

View File: Scan.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
@@ -280,6 +281,11 @@ public class Scan implements Writable {
    * @param batch the maximum number of values
    */
   public void setBatch(int batch) {
+    if(this.hasFilter() && this.filter.hasFilterRow()) {
+      throw new IncompatibleFilterException(
+        "Cannot set batch on a scan using a filter" +
+        " that returns true for filter.hasFilterRow");
+    }
     this.batch = batch;
   }
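The practical effect of the new guard, sketched from the client side (illustrative code, not part of this patch; the column family "f" and qualifier "q" are made up): a scan cannot combine setBatch() with a filter that works on whole rows.

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.DependentColumnFilter;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.util.Bytes;

public class BatchVersusRowFilterExample {
  public static void main(String[] args) {
    Scan scan = new Scan();
    // DependentColumnFilter needs the whole row, so hasFilterRow() returns true.
    scan.setFilter(new DependentColumnFilter(Bytes.toBytes("f"), Bytes.toBytes("q")));
    try {
      scan.setBatch(10); // asks the server for partial rows
    } catch (IncompatibleFilterException e) {
      // The new check in Scan.setBatch rejects the combination up front.
      System.err.println("Rejected: " + e.getMessage());
    }
  }
}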

View File: ColumnCountGetFilter.java

@@ -1,4 +1,4 @@
-/**
+/*
  * Copyright 2010 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -32,7 +32,7 @@ import java.io.IOException;
  * its quota of columns, {@link #filterAllRemaining()} returns true. This
  * makes this filter unsuitable as a Scan filter.
  */
-public class ColumnCountGetFilter implements Filter {
+public class ColumnCountGetFilter extends FilterBase {
   private int limit = 0;
   private int count = 0;
@@ -52,31 +52,28 @@ public class ColumnCountGetFilter implements Filter {
     return limit;
   }
 
+  @Override
   public boolean filterAllRemaining() {
     return this.count > this.limit;
   }
 
+  @Override
   public ReturnCode filterKeyValue(KeyValue v) {
     this.count++;
-    return filterAllRemaining()? ReturnCode.SKIP: ReturnCode.INCLUDE;
+    return filterAllRemaining() ? ReturnCode.SKIP: ReturnCode.INCLUDE;
   }
 
-  public boolean filterRow() {
-    return false;
-  }
-
-  public boolean filterRowKey(byte[] buffer, int offset, int length) {
-    return false;
-  }
-
+  @Override
   public void reset() {
     this.count = 0;
   }
 
+  @Override
   public void readFields(DataInput in) throws IOException {
     this.limit = in.readInt();
  }
 
+  @Override
   public void write(DataOutput out) throws IOException {
     out.writeInt(this.limit);
   }

View File: ColumnPaginationFilter.java

@@ -1,5 +1,5 @@
-/**
- * Copyright 2007 The Apache Software Foundation
+/*
+ * Copyright 2010 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.filter;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.hbase.KeyValue;
@@ -30,7 +31,7 @@ import org.apache.hadoop.hbase.KeyValue;
  * This filter can be used for row-based indexing, where references to other tables are stored across many columns,
  * in order to efficient lookups and paginated results for end users.
  */
-public class ColumnPaginationFilter implements Filter
+public class ColumnPaginationFilter extends FilterBase
 {
   private int limit = 0;
   private int offset = 0;
@@ -50,11 +51,7 @@ public class ColumnPaginationFilter implements Filter
     this.offset = offset;
   }
 
-  public boolean filterAllRemaining()
-  {
-    return false;
-  }
-
+  @Override
   public ReturnCode filterKeyValue(KeyValue v)
   {
     if(count >= offset + limit)
@@ -67,17 +64,7 @@ public class ColumnPaginationFilter implements Filter
     return code;
   }
 
-  public boolean filterRow()
-  {
-    this.count = 0;
-    return false;
-  }
-
-  public boolean filterRowKey(byte[] buffer, int offset, int length)
-  {
-    return false;
-  }
-
+  @Override
   public void reset()
   {
     this.count = 0;

View File: CompareFilter.java

@@ -43,7 +43,7 @@ import java.util.Arrays;
  * <p>
  * Multiple filters can be combined using {@link FilterList}.
  */
-public abstract class CompareFilter implements Filter {
+public abstract class CompareFilter extends FilterBase {
 
   /** Comparison operators. */
   public enum CompareOp {
@@ -59,6 +59,8 @@ public abstract class CompareFilter implements Filter {
     GREATER_OR_EQUAL,
     /** greater than */
     GREATER,
+    /** no operation */
+    NO_OP,
   }
 
   protected CompareOp compareOp;
@@ -95,28 +97,12 @@ public abstract class CompareFilter implements Filter {
     return comparator;
   }
 
-  public void reset() {
-  }
-
-  public ReturnCode filterKeyValue(KeyValue v) {
-    return ReturnCode.INCLUDE;
-  }
-
-  public boolean filterRowKey(byte[] data, int offset, int length) {
-    return false;
-  }
-
-  public boolean filterRow() {
-    return false;
-  }
-
-  public boolean filterAllRemaining() {
-    return false;
-  }
-
   protected boolean doCompare(final CompareOp compareOp,
       final WritableByteArrayComparable comparator, final byte [] data,
       final int offset, final int length) {
+    if (compareOp == CompareOp.NO_OP) {
+      return true;
+    }
     int compareResult =
       comparator.compareTo(Arrays.copyOfRange(data, offset,
         offset + length));

View File: DependentColumnFilter.java (new file)

@@ -0,0 +1,163 @@
package org.apache.hadoop.hbase.filter;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;
/**
* A filter for adding inter-column timestamp matching:
* only cells with a correspondingly timestamped entry in
* the target column will be retained.
* Not compatible with Scan.setBatch as operations need
* full rows for correct filtering.
*/
public class DependentColumnFilter extends CompareFilter {
protected byte[] columnFamily;
protected byte[] columnQualifier;
protected boolean dropDependentColumn;
protected Set<Long> stampSet = new HashSet<Long>();
/**
* Should only be used for writable
*/
public DependentColumnFilter() {
}
/**
* Build a dependent column filter with value checking.
* The dependent column values will be compared using the supplied
* compareOp and comparator; for usage of these
* refer to {@link CompareFilter}
*
* @param family dependent column family
* @param qualifier dependent column qualifier
* @param dropDependentColumn whether the column should be discarded after
* @param valueCompareOp comparison op
* @param valueComparator comparator
*/
public DependentColumnFilter(final byte [] family, final byte[] qualifier,
final boolean dropDependentColumn, final CompareOp valueCompareOp,
final WritableByteArrayComparable valueComparator) {
// set up the comparator
super(valueCompareOp, valueComparator);
this.columnFamily = family;
this.columnQualifier = qualifier;
this.dropDependentColumn = dropDependentColumn;
}
/**
* Constructor for DependentColumn filter.
* KeyValues for which no KeyValue from the target column
* with the same timestamp exists will be dropped.
*
* @param family name of target column family
* @param qualifier name of column qualifier
*/
public DependentColumnFilter(final byte [] family, final byte [] qualifier) {
this(family, qualifier, false);
}
/**
* Constructor for DependentColumn filter.
* KeyValues for which no KeyValue from the dependent column
* with the same timestamp exists will be dropped.
*
* @param family name of dependent column family
* @param qualifier name of dependent qualifier
* @param dropDependentColumn whether the dependent columns keyvalues should be discarded
*/
public DependentColumnFilter(final byte [] family, final byte [] qualifier,
final boolean dropDependentColumn) {
this(family, qualifier, dropDependentColumn, CompareOp.NO_OP, null);
}
@Override
public boolean filterAllRemaining() {
return false;
}
@Override
public ReturnCode filterKeyValue(KeyValue v) {
// Check if the column and qualifier match
if (!v.matchingColumn(this.columnFamily, this.columnQualifier)) {
// include non-matches for the time being, they'll be discarded afterwards
return ReturnCode.INCLUDE;
}
// If it doesn't pass the op, skip it
if(comparator != null && doCompare(compareOp, comparator, v.getValue(), 0, v.getValueLength()))
return ReturnCode.SKIP;
stampSet.add(v.getTimestamp());
if(dropDependentColumn) {
return ReturnCode.SKIP;
}
return ReturnCode.INCLUDE;
}
@Override
public void filterRow(List<KeyValue> kvs) {
Iterator<KeyValue> it = kvs.iterator();
KeyValue kv;
while(it.hasNext()) {
kv = it.next();
if(!stampSet.contains(kv.getTimestamp())) {
it.remove();
}
}
}
@Override
public boolean hasFilterRow() {
return true;
}
@Override
public boolean filterRow() {
return false;
}
@Override
public boolean filterRowKey(byte[] buffer, int offset, int length) {
return false;
}
@Override
public void reset() {
stampSet.clear();
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
this.columnFamily = Bytes.readByteArray(in);
if(this.columnFamily.length == 0) {
this.columnFamily = null;
}
this.columnQualifier = Bytes.readByteArray(in);
if(this.columnQualifier.length == 0) {
this.columnQualifier = null;
}
this.dropDependentColumn = in.readBoolean();
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
Bytes.writeByteArray(out, this.columnFamily);
Bytes.writeByteArray(out, this.columnQualifier);
out.writeBoolean(this.dropDependentColumn);
}
}
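A usage sketch for the new filter (illustrative only; the "meta"/"valid"/"yes" names are invented): keep a cell only when the dependent column holds a cell with the same timestamp whose value matches, and drop the dependent column cells themselves.

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.DependentColumnFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class DependentColumnFilterExample {
  public static Scan buildScan() {
    // Keep a cell in any column only if meta:valid has a cell with the same
    // timestamp whose value equals "yes"; drop the meta:valid cells themselves.
    DependentColumnFilter filter = new DependentColumnFilter(
        Bytes.toBytes("meta"), Bytes.toBytes("valid"),
        true,                                  // dropDependentColumn
        CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("yes")));
    Scan scan = new Scan();
    scan.setFilter(filter);
    scan.setMaxVersions(Integer.MAX_VALUE);    // timestamps matter, keep all versions
    // Note: no scan.setBatch() here; the filter needs full rows.
    return scan;
  }
}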

View File: Filter.java

@@ -23,6 +23,8 @@ package org.apache.hadoop.hbase.filter;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.io.Writable;
 
+import java.util.List;
+
 /**
  * Interface for row and column filters directly applied within the regionserver.
  * A filter can expect the following call sequence:
@@ -32,6 +34,7 @@ import org.apache.hadoop.io.Writable;
  * <li>{@link #filterRowKey(byte[],int,int)} -> true to drop this row,
  * if false, we will also call</li>
  * <li>{@link #filterKeyValue(KeyValue)} -> true to drop this key/value</li>
+ * <li>{@link #filterRow(List)} -> allows direct modification of the final list to be submitted</li>
  * <li>{@link #filterRow()} -> last chance to drop entire row based on the sequence of
  * filterValue() calls. Eg: filter a row if it doesn't contain a specified column.
  * </li>
@@ -39,6 +42,11 @@ import org.apache.hadoop.io.Writable;
  *
  * Filter instances are created one per region/scan. This interface replaces
  * the old RowFilterInterface.
+ *
+ * When implementing your own filters, consider inheriting {@link FilterBase} to help
+ * you reduce boilerplate.
+ *
+ * @see FilterBase
  */
public interface Filter extends Writable { public interface Filter extends Writable {
/** /**
@@ -100,6 +108,21 @@ public interface Filter extends Writable {
     NEXT_ROW,
   }
 
+  /**
+   * Chance to alter the list of keyvalues to be submitted.
+   * Modifications to the list will be carried through to the returned result.
+   * @param kvs the list of keyvalues to be filtered
+   */
+  public void filterRow(List<KeyValue> kvs);
+
+  /**
+   * Return whether or not this filter actively uses filterRow(List).
+   * Primarily used to check for conflicts with scans (such as scans
+   * that do not read a full row at a time).
+   * @return true if this filter actively uses filterRow(List)
+   */
+  public boolean hasFilterRow();
+
   /**
    * Last chance to veto row based on previous {@link #filterKeyValue(KeyValue)}
    * calls. The filter needs to retain state then return a particular value for
@@ -108,4 +131,5 @@ public interface Filter extends Writable {
    * @return true to exclude row, false to include row.
    */
   public boolean filterRow();
+
 }

View File: FilterBase.java (new file)

@@ -0,0 +1,113 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.filter;
import org.apache.hadoop.hbase.KeyValue;
import java.util.List;
/**
* Abstract base class to help you implement new Filters. Common "ignore" or NOOP type
* methods can go here, helping to reduce boiler plate in an ever-expanding filter
* library.
*
* If you could instantiate FilterBase, it would end up being a "null" filter -
* that is one that never filters anything.
*
* @inheritDoc
*/
public abstract class FilterBase implements Filter {
/**
* Filters that are purely stateless and do nothing in their reset() methods can inherit
* this null/empty implementation.
*
* @inheritDoc
*/
@Override
public void reset() {
}
/**
* Filters that do not filter by row key can inherit this implementation that
* never filters anything. (ie: returns false).
*
* @inheritDoc
*/
@Override
public boolean filterRowKey(byte [] buffer, int offset, int length) {
return false;
}
/**
* Filters that never filter all remaining can inherit this implementation that
* never stops the filter early.
*
* @inheritDoc
*/
@Override
public boolean filterAllRemaining() {
return false;
}
/**
* Filters that don't filter by key value can inherit this implementation that
* includes all KeyValues.
*
* @inheritDoc
*/
@Override
public ReturnCode filterKeyValue(KeyValue ignored) {
return ReturnCode.INCLUDE;
}
/**
* Filters that never filter by modifying the returned List of KeyValues can
* inherit this implementation that does nothing.
*
* @inheritDoc
*/
@Override
public void filterRow(List<KeyValue> ignored) {
}
/**
* Filters that never filter by modifying the returned List of KeyValues can
* inherit this implementation that does nothing.
*
* @inheritDoc
*/
@Override
public boolean hasFilterRow() {
return false;
}
/**
* Filters that never filter by rows based on previously gathered state from
* @{link #filterKeyValue(KeyValue)} can inherit this implementation that
* never filters a row.
*
* @inheritDoc
*/
@Override
public boolean filterRow() {
return false;
}
}
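As an illustration of the boilerplate this removes (a hypothetical filter, not part of this commit), only the behavior that differs from the FilterBase defaults plus the Writable methods still has to be written.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.util.Bytes;

/** Illustrative only: includes cells whose qualifier starts with a prefix. */
public class QualifierPrefixFilter extends FilterBase {
  private byte[] prefix;

  public QualifierPrefixFilter() {}          // required for Writable

  public QualifierPrefixFilter(byte[] prefix) {
    this.prefix = prefix;
  }

  @Override
  public ReturnCode filterKeyValue(KeyValue kv) {
    byte[] qualifier = kv.getQualifier();
    if (qualifier.length >= prefix.length &&
        Bytes.compareTo(qualifier, 0, prefix.length, prefix, 0, prefix.length) == 0) {
      return ReturnCode.INCLUDE;
    }
    return ReturnCode.SKIP;
  }

  // Everything else (reset, filterRowKey, filterRow, hasFilterRow, ...) comes
  // from FilterBase; only serialization remains.
  public void write(DataOutput out) throws IOException {
    Bytes.writeByteArray(out, this.prefix);
  }

  public void readFields(DataInput in) throws IOException {
    this.prefix = Bytes.readByteArray(in);
  }
}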

View File: FilterList.java

@@ -118,12 +118,14 @@ public class FilterList implements Filter {
     this.filters.add(filter);
   }
 
+  @Override
   public void reset() {
     for (Filter filter : filters) {
       filter.reset();
     }
   }
 
+  @Override
   public boolean filterRowKey(byte[] rowKey, int offset, int length) {
     for (Filter filter : filters) {
       if (this.operator == Operator.MUST_PASS_ALL) {
@@ -141,6 +143,7 @@ public class FilterList implements Filter {
     return this.operator == Operator.MUST_PASS_ONE;
   }
 
+  @Override
   public boolean filterAllRemaining() {
     for (Filter filter : filters) {
       if (filter.filterAllRemaining()) {
@@ -156,6 +159,7 @@ public class FilterList implements Filter {
     return operator == Operator.MUST_PASS_ONE;
   }
 
+  @Override
   public ReturnCode filterKeyValue(KeyValue v) {
     for (Filter filter : filters) {
       if (operator == Operator.MUST_PASS_ALL) {
@@ -187,6 +191,24 @@ public class FilterList implements Filter {
       ReturnCode.SKIP: ReturnCode.INCLUDE;
   }
 
+  @Override
+  public void filterRow(List<KeyValue> kvs) {
+    for (Filter filter : filters) {
+      filter.filterRow(kvs);
+    }
+  }
+
+  @Override
+  public boolean hasFilterRow() {
+    for (Filter filter : filters) {
+      if(filter.hasFilterRow()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
   public boolean filterRow() {
     for (Filter filter : filters) {
       if (operator == Operator.MUST_PASS_ALL) {
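A brief composition sketch (illustrative family/qualifier names): if any wrapped filter needs whole rows, the new FilterList.hasFilterRow() reports true for the composite, so the Scan.setBatch() guard applies to the list as well.

import java.util.Arrays;

import org.apache.hadoop.hbase.filter.DependentColumnFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class FilterListRowFilterExample {
  public static Filter build() {
    FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ALL,
        Arrays.asList(new Filter[] {
            new PrefixFilter(Bytes.toBytes("user-")),
            new DependentColumnFilter(Bytes.toBytes("f"), Bytes.toBytes("ts"))
        }));
    // Only DependentColumnFilter overrides filterRow(List), but the composite
    // reports true from hasFilterRow(), so Scan.setBatch() on a scan using this
    // list will be rejected with IncompatibleFilterException.
    return list;
  }
}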

View File: FirstKeyOnlyFilter.java

@@ -24,13 +24,14 @@ import org.apache.hadoop.hbase.KeyValue;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.DataInput;
+import java.util.List;
 
 /**
  * A filter that will only return the first KV from each row.
  * <p>
  * This filter can be used to more efficiently perform row count operations.
  */
-public class FirstKeyOnlyFilter implements Filter {
+public class FirstKeyOnlyFilter extends FilterBase {
   private boolean foundKV = false;
 
   public FirstKeyOnlyFilter() {
@@ -40,24 +41,12 @@ public class FirstKeyOnlyFilter implements Filter {
     foundKV = false;
   }
 
-  public boolean filterRowKey(byte[] buffer, int offset, int length) {
-    return false;
-  }
-
-  public boolean filterAllRemaining() {
-    return false;
-  }
-
   public ReturnCode filterKeyValue(KeyValue v) {
     if(foundKV) return ReturnCode.NEXT_ROW;
     foundKV = true;
     return ReturnCode.INCLUDE;
   }
 
-  public boolean filterRow() {
-    return false;
-  }
-
   public void write(DataOutput out) throws IOException {
   }

View File: InclusiveStopFilter.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.List;
 
 /**
  * A Filter that stops after the given row. There is no "RowStopFilter" because
@@ -33,7 +34,7 @@ import java.io.IOException;
  *
  * Use this filter to include the stop row, eg: [A,Z].
  */
-public class InclusiveStopFilter implements Filter {
+public class InclusiveStopFilter extends FilterBase {
   private byte [] stopRowKey;
   private boolean done = false;
@@ -49,10 +50,6 @@ public class InclusiveStopFilter implements Filter {
     return this.stopRowKey;
   }
 
-  public void reset() {
-    // noop, no state
-  }
-
   public boolean filterRowKey(byte[] buffer, int offset, int length) {
     if (buffer == null) {
       //noinspection RedundantIfStatement
@@ -75,15 +72,6 @@ public class InclusiveStopFilter implements Filter {
     return done;
   }
 
-  public ReturnCode filterKeyValue(KeyValue v) {
-    // include everything.
-    return ReturnCode.INCLUDE;
-  }
-
-  public boolean filterRow() {
-    return false;
-  }
-
   public void write(DataOutput out) throws IOException {
     Bytes.writeByteArray(out, this.stopRowKey);
   }

View File: IncompatibleFilterException.java (new file)

@@ -0,0 +1,21 @@
package org.apache.hadoop.hbase.filter;
/**
* Used to indicate a filter incompatibility
*/
public class IncompatibleFilterException extends RuntimeException {
private static final long serialVersionUID = 3236763276623198231L;
/** constructor */
public IncompatibleFilterException() {
super();
}
/**
* constructor
* @param s message
*/
public IncompatibleFilterException(String s) {
super(s);
}
}

View File: PageFilter.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.List;
 
 /**
  * Implementation of Filter interface that limits results to a specific page
@@ -36,7 +37,7 @@ import java.io.IOException;
  * individual HRegions by making sure that the page size is never exceeded
  * locally.
  */
-public class PageFilter implements Filter {
+public class PageFilter extends FilterBase {
   private long pageSize = Long.MAX_VALUE;
   private int rowsAccepted = 0;
@@ -61,16 +62,13 @@ public class PageFilter implements Filter {
     return pageSize;
   }
 
-  public void reset() {
-    // noop
-  }
-
   public boolean filterAllRemaining() {
     return this.rowsAccepted >= this.pageSize;
   }
 
-  public boolean filterRowKey(byte[] rowKey, int offset, int length) {
-    return false;
+  public boolean filterRow() {
+    this.rowsAccepted++;
+    return this.rowsAccepted > this.pageSize;
   }
 
   public void readFields(final DataInput in) throws IOException {
@@ -80,13 +78,4 @@ public class PageFilter implements Filter {
   public void write(final DataOutput out) throws IOException {
     out.writeLong(pageSize);
   }
-
-  public ReturnCode filterKeyValue(KeyValue v) {
-    return ReturnCode.INCLUDE;
-  }
-
-  public boolean filterRow() {
-    this.rowsAccepted++;
-    return this.rowsAccepted > this.pageSize;
-  }
 }

View File: PrefixFilter.java

@@ -26,11 +26,12 @@ import org.apache.hadoop.hbase.util.Bytes;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.DataInput;
+import java.util.List;
 
 /**
  * Pass results that have same row prefix.
  */
-public class PrefixFilter implements Filter {
+public class PrefixFilter extends FilterBase {
   protected byte [] prefix = null;
   protected boolean passedPrefix = false;
@@ -46,10 +47,6 @@
     return prefix;
   }
 
-  public void reset() {
-    // Noop
-  }
-
   public boolean filterRowKey(byte[] buffer, int offset, int length) {
     if (buffer == null || this.prefix == null)
       return true;
@@ -70,14 +67,6 @@
     return passedPrefix;
   }
 
-  public ReturnCode filterKeyValue(KeyValue v) {
-    return ReturnCode.INCLUDE;
-  }
-
-  public boolean filterRow() {
-    return false;
-  }
-
   public void write(DataOutput out) throws IOException {
     Bytes.writeByteArray(out, this.prefix);
   }

View File: RowFilter.java

@@ -23,6 +23,8 @@ package org.apache.hadoop.hbase.filter;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
 
+import java.util.List;
+
 /**
  * This filter is used to filter based on the key. It takes an operator
  * (equal, greater, not equal, etc) and a byte [] comparator for the row,

View File: SingleColumnValueFilter.java

@@ -32,6 +32,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
 
 /**
  * This filter is used to filter cells based on value. It takes a {@link CompareFilter.CompareOp}
@@ -60,7 +61,7 @@ import java.util.Arrays;
  * <p>
  * To filter based on the value of all scanned columns, use {@link ValueFilter}.
  */
-public class SingleColumnValueFilter implements Filter {
+public class SingleColumnValueFilter extends FilterBase {
   static final Log LOG = LogFactory.getLog(SingleColumnValueFilter.class);
 
   protected byte [] columnFamily;
@@ -144,12 +145,6 @@ public class SingleColumnValueFilter implements Filter {
     return columnQualifier;
   }
 
-  public boolean filterRowKey(byte[] rowKey, int offset, int length) {
-    // We don't filter on the row key... we filter later on column value so
-    // always return false.
-    return false;
-  }
-
   public ReturnCode filterKeyValue(KeyValue keyValue) {
     // System.out.println("REMOVE KEY=" + keyValue.toString() + ", value=" + Bytes.toString(keyValue.getValue()));
     if (this.matchedColumn) {
@@ -195,10 +190,6 @@ public class SingleColumnValueFilter implements Filter {
     }
   }
 
-  public boolean filterAllRemaining() {
-    return false;
-  }
-
   public boolean filterRow() {
     // If column was found, return false if it was matched, true if it was not
     // If column not found, return true if we filter if missing, false if not

View File: SkipFilter.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.List;
 
 /**
  * A wrapper filter that filters an entire row if any of the KeyValue checks do
@@ -44,7 +45,7 @@ import java.io.IOException;
  * Without this filter, the other non-zero valued columns in the row would still
  * be emitted.
  */
-public class SkipFilter implements Filter {
+public class SkipFilter extends FilterBase {
   private boolean filterRow = false;
   private Filter filter;
@@ -69,14 +70,6 @@ public class SkipFilter implements Filter {
     filterRow = filterRow || value;
   }
 
-  public boolean filterRowKey(byte[] buffer, int offset, int length) {
-    return false;
-  }
-
-  public boolean filterAllRemaining() {
-    return false;
-  }
-
   public ReturnCode filterKeyValue(KeyValue v) {
     ReturnCode c = filter.filterKeyValue(v);
     changeFR(c != ReturnCode.INCLUDE);

View File: WhileMatchFilter.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.List;
 
 /**
  * A wrapper filter that returns true from {@link #filterAllRemaining()} as soon
@@ -34,7 +35,7 @@ import java.io.IOException;
  * {@link org.apache.hadoop.hbase.filter.Filter#filterAllRemaining()} methods
  * returns true.
  */
-public class WhileMatchFilter implements Filter {
+public class WhileMatchFilter extends FilterBase {
   private boolean filterAllRemaining = false;
   private Filter filter;

View File: HbaseObjectWritable.java

@@ -158,6 +158,7 @@ public class HbaseObjectWritable implements Writable, Configurable {
     addToMap(SkipFilter.class, code++);
     addToMap(WritableByteArrayComparable.class, code++);
     addToMap(FirstKeyOnlyFilter.class, code++);
+    addToMap(DependentColumnFilter.class, code++);
 
     addToMap(Delete [].class, code++);

View File: HRegion.java

@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.Reference.Range;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
@@ -1996,9 +1997,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
       results.clear();
       boolean returnResult = nextInternal(limit);
 
-      if (!returnResult && filter != null && filter.filterRow()) {
-        results.clear();
-      }
       outResults.addAll(results);
       resetFilters();
       if (isFilterDone()) {
@@ -2024,6 +2023,13 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
       while (true) {
         byte [] currentRow = peekRow();
         if (isStopRow(currentRow)) {
+          if (filter != null && filter.hasFilterRow()) {
+            filter.filterRow(results);
+          }
+          if (filter != null && filter.filterRow()) {
+            results.clear();
+          }
+
           return false;
         } else if (filterRowKey(currentRow)) {
           nextRow(currentRow);
@@ -2032,19 +2038,33 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
         do {
           this.storeHeap.next(results, limit);
           if (limit > 0 && results.size() == limit) {
-            return true;
+            if (this.filter != null && filter.hasFilterRow()) throw new IncompatibleFilterException(
+              "Filter with filterRow(List<KeyValue>) incompatible with scan with limit!");
+            return true; // we are expecting more yes, but also limited to how many we can return.
           }
         } while (Bytes.equals(currentRow, nextRow = peekRow()));
 
         final boolean stopRow = isStopRow(nextRow);
-        if (!stopRow && (results.isEmpty() || filterRow())) {
+
+        // now that we have an entire row, lets process with a filters:
+
+        // first filter with the filterRow(List)
+        if (filter != null && filter.hasFilterRow()) {
+          filter.filterRow(results);
+        }
+
+        if (results.isEmpty() || filterRow()) {
           // this seems like a redundant step - we already consumed the row
           // there're no left overs.
           // the reasons for calling this method are:
           // 1. reset the filters.
           // 2. provide a hook to fast forward the row (used by subclasses)
           nextRow(currentRow);
-          continue;
+
+          // This row was totally filtered out, if this is NOT the last row,
+          // we should continue on.
+          if (!stopRow) continue;
         }
         return !stopRow;
       }
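Seen from the client, the net effect of the region-side changes is that rows arrive already trimmed by the filter. A hedged end-to-end sketch follows; the table name "events" and the column names are hypothetical, and the client bootstrap calls are assumed from the contemporary API.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.DependentColumnFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class WholeRowFilteringExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "events");        // hypothetical table
    Scan scan = new Scan();
    scan.setFilter(new DependentColumnFilter(Bytes.toBytes("f"), Bytes.toBytes("anchor")));
    // No setBatch(): the region buffers each full row, lets the filter prune it
    // via filterRow(List), and only then returns it.
    ResultScanner scanner = table.getScanner(scan);
    for (Result row : scanner) {
      System.out.println(row);
    }
    scanner.close();
  }
}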

View File: TestDependentColumnFilter.java (new file)

@@ -0,0 +1,245 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.filter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;
import junit.framework.TestCase;
public class TestDependentColumnFilter extends TestCase {
private final Log LOG = LogFactory.getLog(this.getClass());
private static final byte[][] ROWS = {
Bytes.toBytes("test1"),Bytes.toBytes("test2")
};
private static final byte[][] FAMILIES = {
Bytes.toBytes("familyOne"),Bytes.toBytes("familyTwo")
};
private static final long STAMP_BASE = System.currentTimeMillis();
private static final long[] STAMPS = {
STAMP_BASE-100, STAMP_BASE-200, STAMP_BASE-300
};
private static final byte[] QUALIFIER = Bytes.toBytes("qualifier");
private static final byte[][] BAD_VALS = {
Bytes.toBytes("bad1"), Bytes.toBytes("bad2"), Bytes.toBytes("bad3")
};
private static final byte[] MATCH_VAL = Bytes.toBytes("match");
private HBaseTestingUtility testUtil;
List<KeyValue> testVals;
private HRegion region;
@Override
protected void setUp() throws Exception {
super.setUp();
testUtil = new HBaseTestingUtility();
testVals = makeTestVals();
HTableDescriptor htd = new HTableDescriptor(getName());
htd.addFamily(new HColumnDescriptor(FAMILIES[0]));
htd.addFamily(new HColumnDescriptor(FAMILIES[1]));
HRegionInfo info = new HRegionInfo(htd, null, null, false);
this.region = HRegion.createHRegion(info, testUtil.getTestDir(), testUtil.getConfiguration());
addData();
}
@Override
protected void tearDown() throws Exception {
super.tearDown();
this.region.close();
}
private void addData() throws IOException {
Put put = new Put(ROWS[0]);
// add in an entry for each stamp, with 2 as a "good" value
put.add(FAMILIES[0], QUALIFIER, STAMPS[0], BAD_VALS[0]);
put.add(FAMILIES[0], QUALIFIER, STAMPS[1], BAD_VALS[1]);
put.add(FAMILIES[0], QUALIFIER, STAMPS[2], MATCH_VAL);
// add in entries for stamps 0 and 2.
// without a value check both will be "accepted";
// with a value check only stamp 2 will be accepted (since the corresponding
// ts entry in the dependent column has a matching value)
put.add(FAMILIES[1], QUALIFIER, STAMPS[0], BAD_VALS[0]);
put.add(FAMILIES[1], QUALIFIER, STAMPS[2], BAD_VALS[2]);
this.region.put(put);
put = new Put(ROWS[1]);
put.add(FAMILIES[0], QUALIFIER, STAMPS[0], BAD_VALS[0]);
// there is no corresponding timestamp for this so it should never pass
put.add(FAMILIES[0], QUALIFIER, STAMPS[2], MATCH_VAL);
// if we reverse the qualifiers this one should pass
put.add(FAMILIES[1], QUALIFIER, STAMPS[0], MATCH_VAL);
// should pass
put.add(FAMILIES[1], QUALIFIER, STAMPS[1], BAD_VALS[2]);
this.region.put(put);
}
private List<KeyValue> makeTestVals() {
List<KeyValue> testVals = new ArrayList<KeyValue>();
testVals.add(new KeyValue(ROWS[0], FAMILIES[0], QUALIFIER, STAMPS[0], BAD_VALS[0]));
testVals.add(new KeyValue(ROWS[0], FAMILIES[0], QUALIFIER, STAMPS[1], BAD_VALS[1]));
testVals.add(new KeyValue(ROWS[0], FAMILIES[1], QUALIFIER, STAMPS[1], BAD_VALS[2]));
testVals.add(new KeyValue(ROWS[0], FAMILIES[1], QUALIFIER, STAMPS[0], MATCH_VAL));
testVals.add(new KeyValue(ROWS[0], FAMILIES[1], QUALIFIER, STAMPS[2], BAD_VALS[2]));
return testVals;
}
/**
* This shouldn't be confused with TestFilter#verifyScan
* as expectedKeys is not the per row total, but the scan total
*
* @param s
* @param expectedRows
* @param expectedCells
* @throws IOException
*/
private void verifyScan(Scan s, long expectedRows, long expectedCells)
throws IOException {
InternalScanner scanner = this.region.getScanner(s);
List<KeyValue> results = new ArrayList<KeyValue>();
int i = 0;
int cells = 0;
for (boolean done = true; done; i++) {
done = scanner.next(results);
Arrays.sort(results.toArray(new KeyValue[results.size()]),
KeyValue.COMPARATOR);
LOG.info("counter=" + i + ", " + results);
if (results.isEmpty()) break;
cells += results.size();
assertTrue("Scanned too many rows! Only expected " + expectedRows +
" total but already scanned " + (i+1), expectedRows > i);
assertTrue("Expected " + expectedCells + " cells total but " +
"already scanned " + cells, expectedCells >= cells);
results.clear();
}
assertEquals("Expected " + expectedRows + " rows but scanned " + i +
" rows", expectedRows, i);
assertEquals("Expected " + expectedCells + " cells but scanned " + cells +
" cells", expectedCells, cells);
}
/**
* Test scans using a DependentColumnFilter
*/
public void testScans() throws Exception {
Filter filter = new DependentColumnFilter(FAMILIES[0], QUALIFIER);
Scan scan = new Scan();
scan.setFilter(filter);
scan.setMaxVersions(Integer.MAX_VALUE);
verifyScan(scan, 2, 8);
// drop the filtering cells
filter = new DependentColumnFilter(FAMILIES[0], QUALIFIER, true);
scan = new Scan();
scan.setFilter(filter);
scan.setMaxVersions(Integer.MAX_VALUE);
verifyScan(scan, 2, 3);
// include a comparator operation
filter = new DependentColumnFilter(FAMILIES[0], QUALIFIER, false,
CompareOp.EQUAL, new BinaryComparator(MATCH_VAL));
scan = new Scan();
scan.setFilter(filter);
scan.setMaxVersions(Integer.MAX_VALUE);
/*
* expecting to get the following 3 cells
* row 0
* put.add(FAMILIES[0], QUALIFIER, STAMPS[2], MATCH_VAL);
* put.add(FAMILIES[1], QUALIFIER, STAMPS[2], BAD_VALS[2]);
* row 1
* put.add(FAMILIES[0], QUALIFIER, STAMPS[2], MATCH_VAL);
*/
verifyScan(scan, 2, 3);
// include a comparator operation and drop comparator
filter = new DependentColumnFilter(FAMILIES[0], QUALIFIER, true,
CompareOp.EQUAL, new BinaryComparator(MATCH_VAL));
scan = new Scan();
scan.setFilter(filter);
scan.setMaxVersions(Integer.MAX_VALUE);
/*
* expecting to get the following 1 cell
* row 0
* put.add(FAMILIES[1], QUALIFIER, STAMPS[2], BAD_VALS[2]);
*/
verifyScan(scan, 1, 1);
}
/**
* Test that the filter correctly drops rows without a corresponding timestamp
*
* @throws Exception
*/
public void testFilterDropping() throws Exception {
Filter filter = new DependentColumnFilter(FAMILIES[0], QUALIFIER);
List<KeyValue> accepted = new ArrayList<KeyValue>();
for(KeyValue val : testVals) {
if(filter.filterKeyValue(val) == ReturnCode.INCLUDE) {
accepted.add(val);
}
}
assertEquals("check all values accepted from filterKeyValue", 5, accepted.size());
filter.filterRow(accepted);
assertEquals("check filterRow(List<KeyValue>) dropped cell without corresponding column entry", 4, accepted.size());
// start do it again with dependent column dropping on
filter = new DependentColumnFilter(FAMILIES[1], QUALIFIER, true);
accepted.clear();
for(KeyValue val : testVals) {
if(filter.filterKeyValue(val) == ReturnCode.INCLUDE) {
accepted.add(val);
}
}
assertEquals("check the filtering column cells got dropped", 2, accepted.size());
filter.filterRow(accepted);
assertEquals("check cell retention", 2, accepted.size());
}
}