From c83d862620d2895e28179161b456235fd28a3a2b Mon Sep 17 00:00:00 2001 From: Ryan Rawson Date: Tue, 18 May 2010 21:37:46 +0000 Subject: [PATCH] HBASE-2466 Improving filter API to allow for modification of keyvalue list by filter git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@945902 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 2 + .../org/apache/hadoop/hbase/client/Scan.java | 6 + .../hbase/filter/ColumnCountGetFilter.java | 19 +- .../hbase/filter/ColumnPaginationFilter.java | 25 +- .../hadoop/hbase/filter/CompareFilter.java | 26 +- .../hbase/filter/DependentColumnFilter.java | 163 ++++++++++++ .../apache/hadoop/hbase/filter/Filter.java | 24 ++ .../hadoop/hbase/filter/FilterBase.java | 113 ++++++++ .../hadoop/hbase/filter/FilterList.java | 22 ++ .../hbase/filter/FirstKeyOnlyFilter.java | 15 +- .../hbase/filter/InclusiveStopFilter.java | 16 +- .../filter/IncompatibleFilterException.java | 21 ++ .../hadoop/hbase/filter/PageFilter.java | 21 +- .../hadoop/hbase/filter/PrefixFilter.java | 15 +- .../apache/hadoop/hbase/filter/RowFilter.java | 2 + .../hbase/filter/SingleColumnValueFilter.java | 13 +- .../hadoop/hbase/filter/SkipFilter.java | 11 +- .../hadoop/hbase/filter/WhileMatchFilter.java | 3 +- .../hadoop/hbase/io/HbaseObjectWritable.java | 1 + .../hadoop/hbase/regionserver/HRegion.java | 50 ++-- .../filter/TestDependentColumnFilter.java | 245 ++++++++++++++++++ 21 files changed, 671 insertions(+), 142 deletions(-) create mode 100644 core/src/main/java/org/apache/hadoop/hbase/filter/DependentColumnFilter.java create mode 100644 core/src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java create mode 100644 core/src/main/java/org/apache/hadoop/hbase/filter/IncompatibleFilterException.java create mode 100644 core/src/test/java/org/apache/hadoop/hbase/filter/TestDependentColumnFilter.java diff --git a/CHANGES.txt b/CHANGES.txt index c1aabbbda7e..e9f86dc8419 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -602,6 +602,8 @@ Release 0.21.0 - Unreleased HBASE-2520 Cleanup arrays vs Lists of scanners (Todd Lipcon via Stack) HBASE-2551 Forward port fixes that are in branch but not in trunk (part of the merge of old 0.20 into TRUNK task) + HBASE-2466 Improving filter API to allow for modification of keyvalue list + by filter (Juhani Connolly via Ryan) NEW FEATURES diff --git a/core/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/core/src/main/java/org/apache/hadoop/hbase/client/Scan.java index cd74348af3c..29b3cb00844 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/core/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.IncompatibleFilterException; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; @@ -280,6 +281,11 @@ public class Scan implements Writable { * @param batch the maximum number of values */ public void setBatch(int batch) { + if(this.hasFilter() && this.filter.hasFilterRow()) { + throw new IncompatibleFilterException( + "Cannot set batch on a scan using a filter" + + " that returns true for filter.hasFilterRow"); + } this.batch = batch; } diff --git a/core/src/main/java/org/apache/hadoop/hbase/filter/ColumnCountGetFilter.java b/core/src/main/java/org/apache/hadoop/hbase/filter/ColumnCountGetFilter.java index 
e70fe6a3fa3..306ed2160aa 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/filter/ColumnCountGetFilter.java +++ b/core/src/main/java/org/apache/hadoop/hbase/filter/ColumnCountGetFilter.java @@ -1,4 +1,4 @@ -/** +/* * Copyright 2010 The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one @@ -32,7 +32,7 @@ import java.io.IOException; * its quota of columns, {@link #filterAllRemaining()} returns true. This * makes this filter unsuitable as a Scan filter. */ -public class ColumnCountGetFilter implements Filter { +public class ColumnCountGetFilter extends FilterBase { private int limit = 0; private int count = 0; @@ -52,31 +52,28 @@ public class ColumnCountGetFilter implements Filter { return limit; } + @Override public boolean filterAllRemaining() { return this.count > this.limit; } + @Override public ReturnCode filterKeyValue(KeyValue v) { this.count++; - return filterAllRemaining()? ReturnCode.SKIP: ReturnCode.INCLUDE; - } - - public boolean filterRow() { - return false; - } - - public boolean filterRowKey(byte[] buffer, int offset, int length) { - return false; + return filterAllRemaining() ? ReturnCode.SKIP: ReturnCode.INCLUDE; } + @Override public void reset() { this.count = 0; } + @Override public void readFields(DataInput in) throws IOException { this.limit = in.readInt(); } + @Override public void write(DataOutput out) throws IOException { out.writeInt(this.limit); } diff --git a/core/src/main/java/org/apache/hadoop/hbase/filter/ColumnPaginationFilter.java b/core/src/main/java/org/apache/hadoop/hbase/filter/ColumnPaginationFilter.java index d050bf7586b..aa30bbffb9b 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/filter/ColumnPaginationFilter.java +++ b/core/src/main/java/org/apache/hadoop/hbase/filter/ColumnPaginationFilter.java @@ -1,5 +1,5 @@ -/** - * Copyright 2007 The Apache Software Foundation +/* + * Copyright 2010 The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.filter; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.List; import org.apache.hadoop.hbase.KeyValue; @@ -30,7 +31,7 @@ import org.apache.hadoop.hbase.KeyValue; * This filter can be used for row-based indexing, where references to other tables are stored across many columns, * in order to efficient lookups and paginated results for end users. 
*/ -public class ColumnPaginationFilter implements Filter +public class ColumnPaginationFilter extends FilterBase { private int limit = 0; private int offset = 0; @@ -50,11 +51,7 @@ public class ColumnPaginationFilter implements Filter this.offset = offset; } - public boolean filterAllRemaining() - { - return false; - } - + @Override public ReturnCode filterKeyValue(KeyValue v) { if(count >= offset + limit) @@ -67,17 +64,7 @@ public class ColumnPaginationFilter implements Filter return code; } - public boolean filterRow() - { - this.count = 0; - return false; - } - - public boolean filterRowKey(byte[] buffer, int offset, int length) - { - return false; - } - + @Override public void reset() { this.count = 0; diff --git a/core/src/main/java/org/apache/hadoop/hbase/filter/CompareFilter.java b/core/src/main/java/org/apache/hadoop/hbase/filter/CompareFilter.java index 46cf8225110..6d734396fe2 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/filter/CompareFilter.java +++ b/core/src/main/java/org/apache/hadoop/hbase/filter/CompareFilter.java @@ -43,7 +43,7 @@ import java.util.Arrays; *

* Multiple filters can be combined using {@link FilterList}. */ -public abstract class CompareFilter implements Filter { +public abstract class CompareFilter extends FilterBase { /** Comparison operators. */ public enum CompareOp { @@ -59,6 +59,8 @@ public abstract class CompareFilter implements Filter { GREATER_OR_EQUAL, /** greater than */ GREATER, + /** no operation */ + NO_OP, } protected CompareOp compareOp; @@ -95,28 +97,12 @@ public abstract class CompareFilter implements Filter { return comparator; } - public void reset() { - } - - public ReturnCode filterKeyValue(KeyValue v) { - return ReturnCode.INCLUDE; - } - - public boolean filterRowKey(byte[] data, int offset, int length) { - return false; - } - - public boolean filterRow() { - return false; - } - - public boolean filterAllRemaining() { - return false; - } - protected boolean doCompare(final CompareOp compareOp, final WritableByteArrayComparable comparator, final byte [] data, final int offset, final int length) { + if (compareOp == CompareOp.NO_OP) { + return true; + } int compareResult = comparator.compareTo(Arrays.copyOfRange(data, offset, offset + length)); diff --git a/core/src/main/java/org/apache/hadoop/hbase/filter/DependentColumnFilter.java b/core/src/main/java/org/apache/hadoop/hbase/filter/DependentColumnFilter.java new file mode 100644 index 00000000000..feba2ac2325 --- /dev/null +++ b/core/src/main/java/org/apache/hadoop/hbase/filter/DependentColumnFilter.java @@ -0,0 +1,163 @@ +package org.apache.hadoop.hbase.filter; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * A filter for adding inter-column timestamp matching. + * Only cells with a correspondingly timestamped entry in + * the target column will be retained. + * Not compatible with Scan.setBatch as operations need + * full rows for correct filtering. + */ +public class DependentColumnFilter extends CompareFilter { + + protected byte[] columnFamily; + protected byte[] columnQualifier; + protected boolean dropDependentColumn; + + protected Set stampSet = new HashSet(); + + /** + * Should only be used for Writable deserialization. + */ + public DependentColumnFilter() { + } + + /** + * Build a dependent column filter with value checking. + * Dependent column values will be compared using the supplied + * compareOp and comparator; for their usage + * refer to {@link CompareFilter}. + * + * @param family dependent column family + * @param qualifier dependent column qualifier + * @param dropDependentColumn whether the dependent column should be discarded after matching + * @param valueCompareOp comparison op + * @param valueComparator comparator + */ + public DependentColumnFilter(final byte [] family, final byte[] qualifier, + final boolean dropDependentColumn, final CompareOp valueCompareOp, + final WritableByteArrayComparable valueComparator) { + // set up the comparator + super(valueCompareOp, valueComparator); + this.columnFamily = family; + this.columnQualifier = qualifier; + this.dropDependentColumn = dropDependentColumn; + } + + /** + * Constructor for DependentColumn filter. + * Keyvalues where a keyvalue from target column + * with the same timestamp do not exist will be dropped. 
+ * + * @param family name of target column family + * @param qualifier name of column qualifier + */ + public DependentColumnFilter(final byte [] family, final byte [] qualifier) { + this(family, qualifier, false); + } + + /** + * Constructor for DependentColumn filter. + * Keyvalues where a keyvalue from target column + * with the same timestamp do not exist will be dropped. + * + * @param family name of dependent column family + * @param qualifier name of dependent qualifier + * @param dropDependentColumn whether the dependent columns keyvalues should be discarded + */ + public DependentColumnFilter(final byte [] family, final byte [] qualifier, + final boolean dropDependentColumn) { + this(family, qualifier, dropDependentColumn, CompareOp.NO_OP, null); + } + + + @Override + public boolean filterAllRemaining() { + return false; + } + + @Override + public ReturnCode filterKeyValue(KeyValue v) { + // Check if the column and qualifier match + if (!v.matchingColumn(this.columnFamily, this.columnQualifier)) { + // include non-matches for the time being, they'll be discarded afterwards + return ReturnCode.INCLUDE; + } + // If it doesn't pass the op, skip it + if(comparator != null && doCompare(compareOp, comparator, v.getValue(), 0, v.getValueLength())) + return ReturnCode.SKIP; + + stampSet.add(v.getTimestamp()); + if(dropDependentColumn) { + return ReturnCode.SKIP; + } + return ReturnCode.INCLUDE; + } + + @Override + public void filterRow(List kvs) { + Iterator it = kvs.iterator(); + KeyValue kv; + while(it.hasNext()) { + kv = it.next(); + if(!stampSet.contains(kv.getTimestamp())) { + it.remove(); + } + } + } + + @Override + public boolean hasFilterRow() { + return true; + } + + @Override + public boolean filterRow() { + return false; + } + + @Override + public boolean filterRowKey(byte[] buffer, int offset, int length) { + return false; + } + + @Override + public void reset() { + stampSet.clear(); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + this.columnFamily = Bytes.readByteArray(in); + if(this.columnFamily.length == 0) { + this.columnFamily = null; + } + + this.columnQualifier = Bytes.readByteArray(in); + if(this.columnQualifier.length == 0) { + this.columnQualifier = null; + } + + this.dropDependentColumn = in.readBoolean(); + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + Bytes.writeByteArray(out, this.columnFamily); + Bytes.writeByteArray(out, this.columnQualifier); + out.writeBoolean(this.dropDependentColumn); + } + +} diff --git a/core/src/main/java/org/apache/hadoop/hbase/filter/Filter.java b/core/src/main/java/org/apache/hadoop/hbase/filter/Filter.java index c5cc7db39e8..f66514d67f8 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/filter/Filter.java +++ b/core/src/main/java/org/apache/hadoop/hbase/filter/Filter.java @@ -23,6 +23,8 @@ package org.apache.hadoop.hbase.filter; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.io.Writable; +import java.util.List; + /** * Interface for row and column filters directly applied within the regionserver. * A filter can expect the following call sequence: @@ -32,6 +34,7 @@ import org.apache.hadoop.io.Writable; *

  • {@link #filterRowKey(byte[],int,int)} -> true to drop this row, * if false, we will also call
  • *
  • {@link #filterKeyValue(KeyValue)} -> true to drop this key/value
  • + *
  • {@link #filterRow(List)} -> allows direct modification of the final list to be submitted *
  • {@link #filterRow()} -> last chance to drop entire row based on the sequence of * filterValue() calls. Eg: filter a row if it doesn't contain a specified column. *
  • @@ -39,6 +42,11 @@ import org.apache.hadoop.io.Writable; * * Filter instances are created one per region/scan. This interface replaces * the old RowFilterInterface. + * + * When implementing your own filters, consider inheriting {@link FilterBase} to help + * you reduce boilerplate. + * + * @see FilterBase */ public interface Filter extends Writable { /** @@ -100,6 +108,21 @@ public interface Filter extends Writable { NEXT_ROW, } + /** + * Chance to alter the list of keyvalues to be submitted. + * Modifications to the list will carry on + * @param kvs the list of keyvalues to be filtered + */ + public void filterRow(List kvs); + + /** + * Return whether or not this filter actively uses filterRow(List) + * Primarily used to check for conflicts with scans(such as scans + * that do not read a full row at a time) + * @return + */ + public boolean hasFilterRow(); + /** * Last chance to veto row based on previous {@link #filterKeyValue(KeyValue)} * calls. The filter needs to retain state then return a particular value for @@ -108,4 +131,5 @@ public interface Filter extends Writable { * @return true to exclude row, false to include row. */ public boolean filterRow(); + } \ No newline at end of file diff --git a/core/src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java b/core/src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java new file mode 100644 index 00000000000..4df8123820b --- /dev/null +++ b/core/src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java @@ -0,0 +1,113 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.filter; + +import org.apache.hadoop.hbase.KeyValue; + +import java.util.List; + +/** + * Abstract base class to help you implement new Filters. Common "ignore" or NOOP type + * methods can go here, helping to reduce boiler plate in an ever-expanding filter + * library. + * + * If you could instantiate FilterBase, it would end up being a "null" filter - + * that is one that never filters anything. + * + * @inheritDoc + */ +public abstract class FilterBase implements Filter { + + /** + * Filters that are purely stateless and do nothing in their reset() methods can inherit + * this null/empty implementation. + * + * @inheritDoc + */ + @Override + public void reset() { + } + + /** + * Filters that do not filter by row key can inherit this implementation that + * never filters anything. (ie: returns false). + * + * @inheritDoc + */ + @Override + public boolean filterRowKey(byte [] buffer, int offset, int length) { + return false; + } + + /** + * Filters that never filter all remaining can inherit this implementation that + * never stops the filter early. 
+ * + * @inheritDoc + */ + @Override + public boolean filterAllRemaining() { + return false; + } + + /** + * Filters that don't filter by key value can inherit this implementation that + * includes all KeyValues. + * + * @inheritDoc + */ + @Override + public ReturnCode filterKeyValue(KeyValue ignored) { + return ReturnCode.INCLUDE; + } + + /** + * Filters that never filter by modifying the returned List of KeyValues can + * inherit this implementation that does nothing. + * + * @inheritDoc + */ + @Override + public void filterRow(List ignored) { + } + + /** + * Filters that never filter by modifying the returned List of KeyValues can + * inherit this implementation that returns false. + * + * @inheritDoc + */ + @Override + public boolean hasFilterRow() { + return false; + } + + /** + * Filters that never filter by rows based on previously gathered state from + * {@link #filterKeyValue(KeyValue)} can inherit this implementation that + * never filters a row. + * + * @inheritDoc + */ + @Override + public boolean filterRow() { + return false; + } +} diff --git a/core/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java b/core/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java index 6905e95f08b..e88901072c4 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java +++ b/core/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java @@ -118,12 +118,14 @@ public class FilterList implements Filter { this.filters.add(filter); } + @Override public void reset() { for (Filter filter : filters) { filter.reset(); } } + @Override public boolean filterRowKey(byte[] rowKey, int offset, int length) { for (Filter filter : filters) { if (this.operator == Operator.MUST_PASS_ALL) { @@ -141,6 +143,7 @@ public class FilterList implements Filter { return this.operator == Operator.MUST_PASS_ONE; } + @Override public boolean filterAllRemaining() { for (Filter filter : filters) { if (filter.filterAllRemaining()) { @@ -156,6 +159,7 @@ public class FilterList implements Filter { return operator == Operator.MUST_PASS_ONE; } + @Override public ReturnCode filterKeyValue(KeyValue v) { for (Filter filter : filters) { if (operator == Operator.MUST_PASS_ALL) { @@ -187,6 +191,24 @@ ReturnCode.SKIP: ReturnCode.INCLUDE; } + @Override + public void filterRow(List kvs) { + for (Filter filter : filters) { + filter.filterRow(kvs); + } + } + + @Override + public boolean hasFilterRow() { + for (Filter filter : filters) { + if(filter.hasFilterRow()) { + return true; + } + } + return false; + } + + @Override public boolean filterRow() { for (Filter filter : filters) { if (operator == Operator.MUST_PASS_ALL) { diff --git a/core/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java b/core/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java index ddbe7c3006e..36170bfa360 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java +++ b/core/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java @@ -24,13 +24,14 @@ import org.apache.hadoop.hbase.KeyValue; import java.io.DataOutput; import java.io.IOException; import java.io.DataInput; +import java.util.List; /** * A filter that will only return the first KV from each row. *

    * This filter can be used to more efficiently perform row count operations. */ -public class FirstKeyOnlyFilter implements Filter { +public class FirstKeyOnlyFilter extends FilterBase { private boolean foundKV = false; public FirstKeyOnlyFilter() { @@ -40,24 +41,12 @@ public class FirstKeyOnlyFilter implements Filter { foundKV = false; } - public boolean filterRowKey(byte[] buffer, int offset, int length) { - return false; - } - - public boolean filterAllRemaining() { - return false; - } - public ReturnCode filterKeyValue(KeyValue v) { if(foundKV) return ReturnCode.NEXT_ROW; foundKV = true; return ReturnCode.INCLUDE; } - public boolean filterRow() { - return false; - } - public void write(DataOutput out) throws IOException { } diff --git a/core/src/main/java/org/apache/hadoop/hbase/filter/InclusiveStopFilter.java b/core/src/main/java/org/apache/hadoop/hbase/filter/InclusiveStopFilter.java index 7e7fccef6bd..6148f3538cd 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/filter/InclusiveStopFilter.java +++ b/core/src/main/java/org/apache/hadoop/hbase/filter/InclusiveStopFilter.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.util.Bytes; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.List; /** * A Filter that stops after the given row. There is no "RowStopFilter" because @@ -33,7 +34,7 @@ import java.io.IOException; * * Use this filter to include the stop row, eg: [A,Z]. */ -public class InclusiveStopFilter implements Filter { +public class InclusiveStopFilter extends FilterBase { private byte [] stopRowKey; private boolean done = false; @@ -49,10 +50,6 @@ public class InclusiveStopFilter implements Filter { return this.stopRowKey; } - public void reset() { - // noop, no state - } - public boolean filterRowKey(byte[] buffer, int offset, int length) { if (buffer == null) { //noinspection RedundantIfStatement @@ -75,15 +72,6 @@ public class InclusiveStopFilter implements Filter { return done; } - public ReturnCode filterKeyValue(KeyValue v) { - // include everything. 
- return ReturnCode.INCLUDE; - } - - public boolean filterRow() { - return false; - } - public void write(DataOutput out) throws IOException { Bytes.writeByteArray(out, this.stopRowKey); } diff --git a/core/src/main/java/org/apache/hadoop/hbase/filter/IncompatibleFilterException.java b/core/src/main/java/org/apache/hadoop/hbase/filter/IncompatibleFilterException.java new file mode 100644 index 00000000000..84c14dad9f0 --- /dev/null +++ b/core/src/main/java/org/apache/hadoop/hbase/filter/IncompatibleFilterException.java @@ -0,0 +1,21 @@ +package org.apache.hadoop.hbase.filter; + +/** + * Used to indicate a filter incompatibility + */ +public class IncompatibleFilterException extends RuntimeException { + private static final long serialVersionUID = 3236763276623198231L; + +/** constructor */ + public IncompatibleFilterException() { + super(); + } + + /** + * constructor + * @param s message + */ + public IncompatibleFilterException(String s) { + super(s); + } +} diff --git a/core/src/main/java/org/apache/hadoop/hbase/filter/PageFilter.java b/core/src/main/java/org/apache/hadoop/hbase/filter/PageFilter.java index d10bd4f0de3..b5e4dd34677 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/filter/PageFilter.java +++ b/core/src/main/java/org/apache/hadoop/hbase/filter/PageFilter.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.KeyValue; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.List; /** * Implementation of Filter interface that limits results to a specific page @@ -36,7 +37,7 @@ import java.io.IOException; * individual HRegions by making sure that the page size is never exceeded * locally. */ -public class PageFilter implements Filter { +public class PageFilter extends FilterBase { private long pageSize = Long.MAX_VALUE; private int rowsAccepted = 0; @@ -61,16 +62,13 @@ public class PageFilter implements Filter { return pageSize; } - public void reset() { - // noop - } - public boolean filterAllRemaining() { return this.rowsAccepted >= this.pageSize; } - public boolean filterRowKey(byte[] rowKey, int offset, int length) { - return false; + public boolean filterRow() { + this.rowsAccepted++; + return this.rowsAccepted > this.pageSize; } public void readFields(final DataInput in) throws IOException { @@ -80,13 +78,4 @@ public class PageFilter implements Filter { public void write(final DataOutput out) throws IOException { out.writeLong(pageSize); } - - public ReturnCode filterKeyValue(KeyValue v) { - return ReturnCode.INCLUDE; - } - - public boolean filterRow() { - this.rowsAccepted++; - return this.rowsAccepted > this.pageSize; - } } \ No newline at end of file diff --git a/core/src/main/java/org/apache/hadoop/hbase/filter/PrefixFilter.java b/core/src/main/java/org/apache/hadoop/hbase/filter/PrefixFilter.java index d233bf33694..063d06825a3 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/filter/PrefixFilter.java +++ b/core/src/main/java/org/apache/hadoop/hbase/filter/PrefixFilter.java @@ -26,11 +26,12 @@ import org.apache.hadoop.hbase.util.Bytes; import java.io.DataOutput; import java.io.IOException; import java.io.DataInput; +import java.util.List; /** * Pass results that have same row prefix. 
*/ -public class PrefixFilter implements Filter { +public class PrefixFilter extends FilterBase { protected byte [] prefix = null; protected boolean passedPrefix = false; @@ -46,10 +47,6 @@ public class PrefixFilter implements Filter { return prefix; } - public void reset() { - // Noop - } - public boolean filterRowKey(byte[] buffer, int offset, int length) { if (buffer == null || this.prefix == null) return true; @@ -70,14 +67,6 @@ public class PrefixFilter implements Filter { return passedPrefix; } - public ReturnCode filterKeyValue(KeyValue v) { - return ReturnCode.INCLUDE; - } - - public boolean filterRow() { - return false; - } - public void write(DataOutput out) throws IOException { Bytes.writeByteArray(out, this.prefix); } diff --git a/core/src/main/java/org/apache/hadoop/hbase/filter/RowFilter.java b/core/src/main/java/org/apache/hadoop/hbase/filter/RowFilter.java index d1f8ba0b9d7..9d9d0a4dbf5 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/filter/RowFilter.java +++ b/core/src/main/java/org/apache/hadoop/hbase/filter/RowFilter.java @@ -23,6 +23,8 @@ package org.apache.hadoop.hbase.filter; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Scan; +import java.util.List; + /** * This filter is used to filter based on the key. It takes an operator * (equal, greater, not equal, etc) and a byte [] comparator for the row, diff --git a/core/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java b/core/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java index fd0b28e3d6f..24ea37f8eea 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java +++ b/core/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java @@ -32,6 +32,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Arrays; +import java.util.List; /** * This filter is used to filter cells based on value. It takes a {@link CompareFilter.CompareOp} @@ -60,7 +61,7 @@ import java.util.Arrays; *

    * To filter based on the value of all scanned columns, use {@link ValueFilter}. */ -public class SingleColumnValueFilter implements Filter { +public class SingleColumnValueFilter extends FilterBase { static final Log LOG = LogFactory.getLog(SingleColumnValueFilter.class); protected byte [] columnFamily; @@ -144,12 +145,6 @@ public class SingleColumnValueFilter implements Filter { return columnQualifier; } - public boolean filterRowKey(byte[] rowKey, int offset, int length) { - // We don't filter on the row key... we filter later on column value so - // always return false. - return false; - } - public ReturnCode filterKeyValue(KeyValue keyValue) { // System.out.println("REMOVE KEY=" + keyValue.toString() + ", value=" + Bytes.toString(keyValue.getValue())); if (this.matchedColumn) { @@ -195,10 +190,6 @@ public class SingleColumnValueFilter implements Filter { } } - public boolean filterAllRemaining() { - return false; - } - public boolean filterRow() { // If column was found, return false if it was matched, true if it was not // If column not found, return true if we filter if missing, false if not diff --git a/core/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java b/core/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java index e46b09bacc6..72a91bbf97a 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java +++ b/core/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.KeyValue; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.List; /** * A wrapper filter that filters an entire row if any of the KeyValue checks do @@ -44,7 +45,7 @@ import java.io.IOException; * Without this filter, the other non-zero valued columns in the row would still * be emitted. */ -public class SkipFilter implements Filter { +public class SkipFilter extends FilterBase { private boolean filterRow = false; private Filter filter; @@ -69,14 +70,6 @@ public class SkipFilter implements Filter { filterRow = filterRow || value; } - public boolean filterRowKey(byte[] buffer, int offset, int length) { - return false; - } - - public boolean filterAllRemaining() { - return false; - } - public ReturnCode filterKeyValue(KeyValue v) { ReturnCode c = filter.filterKeyValue(v); changeFR(c != ReturnCode.INCLUDE); diff --git a/core/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java b/core/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java index 7c1f404bbd6..bad3c68ad9b 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java +++ b/core/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.KeyValue; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.List; /** * A wrapper filter that returns true from {@link #filterAllRemaining()} as soon @@ -34,7 +35,7 @@ import java.io.IOException; * {@link org.apache.hadoop.hbase.filter.Filter#filterAllRemaining()} methods * returns true. 
*/ -public class WhileMatchFilter implements Filter { +public class WhileMatchFilter extends FilterBase { private boolean filterAllRemaining = false; private Filter filter; diff --git a/core/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java b/core/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java index 3cc5e0ff9e3..85fde3a0596 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java +++ b/core/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java @@ -158,6 +158,7 @@ public class HbaseObjectWritable implements Writable, Configurable { addToMap(SkipFilter.class, code++); addToMap(WritableByteArrayComparable.class, code++); addToMap(FirstKeyOnlyFilter.class, code++); + addToMap(DependentColumnFilter.class, code++); addToMap(Delete [].class, code++); diff --git a/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 00fafc99d60..038a3357395 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.IncompatibleFilterException; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.Reference.Range; import org.apache.hadoop.hbase.io.hfile.BlockCache; @@ -304,7 +305,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ /** * Initialize this region and get it ready to roll. * Called after construction. - * + * * @param initialFiles path * @param reporter progressable * @throws IOException e @@ -448,7 +449,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ public ReadWriteConsistencyControl getRWCC() { return rwcc; } - + /** * Close down this HRegion. Flush the cache, shut down each HStore, don't * service any more calls. @@ -459,7 +460,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * @return Vector of all the storage files that the HRegion's component * HStores make use of. It's a list of all HStoreFile objects. Returns empty * vector if already closed and null if judged that it should not close. - * + * * @throws IOException e */ public List close() throws IOException { @@ -477,7 +478,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * @return Vector of all the storage files that the HRegion's component * HStores make use of. It's a list of HStoreFile objects. Can be null if * we are not to close at this time or we are already closed. - * + * * @throws IOException e */ public List close(final boolean abort) throws IOException { @@ -875,7 +876,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * time-sensitive thread. * * @return true if cache was flushed - * + * * @throws IOException general io exceptions * @throws DroppedSnapshotException Thrown when replay of hlog is required * because a Snapshot was not properly persisted. @@ -941,7 +942,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ *

    This method may block for some time. * * @return true if the region needs compacting - * + * * @throws IOException general io exceptions * @throws DroppedSnapshotException Thrown when replay of hlog is required * because a Snapshot was not properly persisted. @@ -1051,7 +1052,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ } storeFlushers.clear(); - + // Set down the memstore size by amount of flush. this.memstoreSize.addAndGet(-currentMemStoreSize); } catch (Throwable t) { @@ -1660,7 +1661,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ this.log.append(regionInfo, regionInfo.getTableDesc().getName(), walEdit, now); } - + long size = 0; w = rwcc.beginMemstoreInsert(); @@ -1996,9 +1997,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ results.clear(); boolean returnResult = nextInternal(limit); - if (!returnResult && filter != null && filter.filterRow()) { - results.clear(); - } + outResults.addAll(results); resetFilters(); if (isFilterDone()) { @@ -2024,6 +2023,13 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ while (true) { byte [] currentRow = peekRow(); if (isStopRow(currentRow)) { + if (filter != null && filter.hasFilterRow()) { + filter.filterRow(results); + } + if (filter != null && filter.filterRow()) { + results.clear(); + } + return false; } else if (filterRowKey(currentRow)) { nextRow(currentRow); @@ -2032,19 +2038,33 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ do { this.storeHeap.next(results, limit); if (limit > 0 && results.size() == limit) { - return true; + if (this.filter != null && filter.hasFilterRow()) throw new IncompatibleFilterException( + "Filter with filterRow(List) incompatible with scan with limit!"); + return true; // we are expecting more yes, but also limited to how many we can return. } } while (Bytes.equals(currentRow, nextRow = peekRow())); final boolean stopRow = isStopRow(nextRow); - if (!stopRow && (results.isEmpty() || filterRow())) { + + // now that we have an entire row, lets process with a filters: + + // first filter with the filterRow(List) + if (filter != null && filter.hasFilterRow()) { + filter.filterRow(results); + } + + if (results.isEmpty() || filterRow()) { // this seems like a redundant step - we already consumed the row // there're no left overs. // the reasons for calling this method are: // 1. reset the filters. // 2. provide a hook to fast forward the row (used by subclasses) nextRow(currentRow); - continue; + + // This row was totally filtered out, if this is NOT the last row, + // we should continue on. 
+ + if (!stopRow) continue; } return !stopRow; } @@ -2694,7 +2714,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ public static final long FIXED_OVERHEAD = ClassSize.align( (5 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN + (21 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT); - + public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + ClassSize.OBJECT + (2 * ClassSize.ATOMIC_BOOLEAN) + ClassSize.ATOMIC_LONG + ClassSize.ATOMIC_INTEGER + diff --git a/core/src/test/java/org/apache/hadoop/hbase/filter/TestDependentColumnFilter.java b/core/src/test/java/org/apache/hadoop/hbase/filter/TestDependentColumnFilter.java new file mode 100644 index 00000000000..04705c397e0 --- /dev/null +++ b/core/src/test/java/org/apache/hadoop/hbase/filter/TestDependentColumnFilter.java @@ -0,0 +1,245 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.filter; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.Filter.ReturnCode; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.util.Bytes; + +import junit.framework.TestCase; + +public class TestDependentColumnFilter extends TestCase { + private final Log LOG = LogFactory.getLog(this.getClass()); + private static final byte[][] ROWS = { + Bytes.toBytes("test1"),Bytes.toBytes("test2") + }; + private static final byte[][] FAMILIES = { + Bytes.toBytes("familyOne"),Bytes.toBytes("familyTwo") + }; + private static final long STAMP_BASE = System.currentTimeMillis(); + private static final long[] STAMPS = { + STAMP_BASE-100, STAMP_BASE-200, STAMP_BASE-300 + }; + private static final byte[] QUALIFIER = Bytes.toBytes("qualifier"); + private static final byte[][] BAD_VALS = { + Bytes.toBytes("bad1"), Bytes.toBytes("bad2"), Bytes.toBytes("bad3") + }; + private static final byte[] MATCH_VAL = Bytes.toBytes("match"); + private HBaseTestingUtility testUtil; + + List testVals; + private HRegion region; + + @Override + protected void setUp() throws Exception { + super.setUp(); + + testUtil = new HBaseTestingUtility(); + + 
testVals = makeTestVals(); + + HTableDescriptor htd = new HTableDescriptor(getName()); + htd.addFamily(new HColumnDescriptor(FAMILIES[0])); + htd.addFamily(new HColumnDescriptor(FAMILIES[1])); + HRegionInfo info = new HRegionInfo(htd, null, null, false); + this.region = HRegion.createHRegion(info, testUtil.getTestDir(), testUtil.getConfiguration()); + addData(); + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + this.region.close(); + } + + private void addData() throws IOException { + Put put = new Put(ROWS[0]); + // add in an entry for each stamp, with 2 as a "good" value + put.add(FAMILIES[0], QUALIFIER, STAMPS[0], BAD_VALS[0]); + put.add(FAMILIES[0], QUALIFIER, STAMPS[1], BAD_VALS[1]); + put.add(FAMILIES[0], QUALIFIER, STAMPS[2], MATCH_VAL); + // add in entries for stamps 0 and 2. + // without a value check both will be "accepted" + // with one 2 will be accepted(since the corresponding ts entry + // has a matching value + put.add(FAMILIES[1], QUALIFIER, STAMPS[0], BAD_VALS[0]); + put.add(FAMILIES[1], QUALIFIER, STAMPS[2], BAD_VALS[2]); + + this.region.put(put); + + put = new Put(ROWS[1]); + put.add(FAMILIES[0], QUALIFIER, STAMPS[0], BAD_VALS[0]); + // there is no corresponding timestamp for this so it should never pass + put.add(FAMILIES[0], QUALIFIER, STAMPS[2], MATCH_VAL); + // if we reverse the qualifiers this one should pass + put.add(FAMILIES[1], QUALIFIER, STAMPS[0], MATCH_VAL); + // should pass + put.add(FAMILIES[1], QUALIFIER, STAMPS[1], BAD_VALS[2]); + + this.region.put(put); + } + + private List makeTestVals() { + List testVals = new ArrayList(); + testVals.add(new KeyValue(ROWS[0], FAMILIES[0], QUALIFIER, STAMPS[0], BAD_VALS[0])); + testVals.add(new KeyValue(ROWS[0], FAMILIES[0], QUALIFIER, STAMPS[1], BAD_VALS[1])); + testVals.add(new KeyValue(ROWS[0], FAMILIES[1], QUALIFIER, STAMPS[1], BAD_VALS[2])); + testVals.add(new KeyValue(ROWS[0], FAMILIES[1], QUALIFIER, STAMPS[0], MATCH_VAL)); + testVals.add(new KeyValue(ROWS[0], FAMILIES[1], QUALIFIER, STAMPS[2], BAD_VALS[2])); + + return testVals; + } + + /** + * This shouldn't be confused with TestFilter#verifyScan + * as expectedKeys is not the per row total, but the scan total + * + * @param s + * @param expectedRows + * @param expectedCells + * @throws IOException + */ + private void verifyScan(Scan s, long expectedRows, long expectedCells) + throws IOException { + InternalScanner scanner = this.region.getScanner(s); + List results = new ArrayList(); + int i = 0; + int cells = 0; + for (boolean done = true; done; i++) { + done = scanner.next(results); + Arrays.sort(results.toArray(new KeyValue[results.size()]), + KeyValue.COMPARATOR); + LOG.info("counter=" + i + ", " + results); + if (results.isEmpty()) break; + cells += results.size(); + assertTrue("Scanned too many rows! 
Only expected " + expectedRows + + " total but already scanned " + (i+1), expectedRows > i); + assertTrue("Expected " + expectedCells + " cells total but " + + "already scanned " + cells, expectedCells >= cells); + results.clear(); + } + assertEquals("Expected " + expectedRows + " rows but scanned " + i + + " rows", expectedRows, i); + assertEquals("Expected " + expectedCells + " cells but scanned " + cells + + " cells", expectedCells, cells); + } + + /** + * Test scans using a DependentColumnFilter + */ + public void testScans() throws Exception { + Filter filter = new DependentColumnFilter(FAMILIES[0], QUALIFIER); + + Scan scan = new Scan(); + scan.setFilter(filter); + scan.setMaxVersions(Integer.MAX_VALUE); + + verifyScan(scan, 2, 8); + + // drop the filtering cells + filter = new DependentColumnFilter(FAMILIES[0], QUALIFIER, true); + scan = new Scan(); + scan.setFilter(filter); + scan.setMaxVersions(Integer.MAX_VALUE); + + verifyScan(scan, 2, 3); + + // include a comparator operation + filter = new DependentColumnFilter(FAMILIES[0], QUALIFIER, false, + CompareOp.EQUAL, new BinaryComparator(MATCH_VAL)); + scan = new Scan(); + scan.setFilter(filter); + scan.setMaxVersions(Integer.MAX_VALUE); + + /* + * expecting to get the following 3 cells + * row 0 + * put.add(FAMILIES[0], QUALIFIER, STAMPS[2], MATCH_VAL); + * put.add(FAMILIES[1], QUALIFIER, STAMPS[2], BAD_VALS[2]); + * row 1 + * put.add(FAMILIES[0], QUALIFIER, STAMPS[2], MATCH_VAL); + */ + verifyScan(scan, 2, 3); + + // include a comparator operation and drop comparator + filter = new DependentColumnFilter(FAMILIES[0], QUALIFIER, true, + CompareOp.EQUAL, new BinaryComparator(MATCH_VAL)); + scan = new Scan(); + scan.setFilter(filter); + scan.setMaxVersions(Integer.MAX_VALUE); + + /* + * expecting to get the following 1 cell + * row 0 + * put.add(FAMILIES[1], QUALIFIER, STAMPS[2], BAD_VALS[2]); + */ + verifyScan(scan, 1, 1); + + } + + /** + * Test that the filter correctly drops rows without a corresponding timestamp + * + * @throws Exception + */ + public void testFilterDropping() throws Exception { + Filter filter = new DependentColumnFilter(FAMILIES[0], QUALIFIER); + List accepted = new ArrayList(); + for(KeyValue val : testVals) { + if(filter.filterKeyValue(val) == ReturnCode.INCLUDE) { + accepted.add(val); + } + } + assertEquals("check all values accepted from filterKeyValue", 5, accepted.size()); + + filter.filterRow(accepted); + assertEquals("check filterRow(List) dropped cell without corresponding column entry", 4, accepted.size()); + + // start do it again with dependent column dropping on + filter = new DependentColumnFilter(FAMILIES[1], QUALIFIER, true); + accepted.clear(); + for(KeyValue val : testVals) { + if(filter.filterKeyValue(val) == ReturnCode.INCLUDE) { + accepted.add(val); + } + } + assertEquals("check the filtering column cells got dropped", 2, accepted.size()); + + filter.filterRow(accepted); + assertEquals("check cell retention", 2, accepted.size()); + } +}
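
Illustrative sketch (not part of the patch): a minimal client-defined filter built on the new FilterBase, filterRow(List) and hasFilterRow() hooks introduced by this change. The class name FirstNCellsFilter and its field are hypothetical; raw List/Iterator types are used to match the method signatures as they appear in the listing above, and like any custom filter the class would have to be available on the region server classpath.

// Hypothetical example only -- not part of HBASE-2466.
package org.example.hbase.filter;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.hbase.filter.FilterBase;

/** Keeps only the first n KeyValues of each row by trimming the rest in filterRow(List). */
public class FirstNCellsFilter extends FilterBase {
  private int n;

  /** Required for Writable deserialization. */
  public FirstNCellsFilter() {
  }

  public FirstNCellsFilter(int n) {
    this.n = n;
  }

  @Override
  public void filterRow(List kvs) {
    // The region server hands us the accumulated row; drop everything past the first n cells.
    Iterator it = kvs.iterator();
    int kept = 0;
    while (it.hasNext()) {
      it.next();
      if (++kept > n) {
        it.remove();
      }
    }
  }

  @Override
  public boolean hasFilterRow() {
    // Declares that this filter needs whole rows; with this patch, Scan.setBatch()
    // rejects such filters by throwing IncompatibleFilterException.
    return true;
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    this.n = in.readInt();
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeInt(this.n);
  }
}

A scan would use it as usual, e.g. scan.setFilter(new FirstNCellsFilter(3)); calling scan.setBatch(...) afterwards would throw the new IncompatibleFilterException because hasFilterRow() returns true.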