diff --git a/CHANGES.txt b/CHANGES.txt
index c1aabbbda7e..e9f86dc8419 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -602,6 +602,8 @@ Release 0.21.0 - Unreleased
HBASE-2520 Cleanup arrays vs Lists of scanners (Todd Lipcon via Stack)
HBASE-2551 Forward port fixes that are in branch but not in trunk (part
of the merge of old 0.20 into TRUNK task)
+ HBASE-2466 Improving filter API to allow for modification of keyvalue list
+ by filter (Juhani Connolly via Ryan)
NEW FEATURES
diff --git a/core/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/core/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index cd74348af3c..29b3cb00844 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
@@ -280,6 +281,11 @@ public class Scan implements Writable {
* @param batch the maximum number of values
*/
public void setBatch(int batch) {
+ if(this.hasFilter() && this.filter.hasFilterRow()) {
+ throw new IncompatibleFilterException(
+ "Cannot set batch on a scan using a filter" +
+ " that returns true for filter.hasFilterRow");
+ }
this.batch = batch;
}
diff --git a/core/src/main/java/org/apache/hadoop/hbase/filter/ColumnCountGetFilter.java b/core/src/main/java/org/apache/hadoop/hbase/filter/ColumnCountGetFilter.java
index e70fe6a3fa3..306ed2160aa 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/filter/ColumnCountGetFilter.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/filter/ColumnCountGetFilter.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -32,7 +32,7 @@ import java.io.IOException;
* its quota of columns, {@link #filterAllRemaining()} returns true. This
* makes this filter unsuitable as a Scan filter.
*/
-public class ColumnCountGetFilter implements Filter {
+public class ColumnCountGetFilter extends FilterBase {
private int limit = 0;
private int count = 0;
@@ -52,31 +52,28 @@ public class ColumnCountGetFilter implements Filter {
return limit;
}
+ @Override
public boolean filterAllRemaining() {
return this.count > this.limit;
}
+ @Override
public ReturnCode filterKeyValue(KeyValue v) {
this.count++;
- return filterAllRemaining()? ReturnCode.SKIP: ReturnCode.INCLUDE;
- }
-
- public boolean filterRow() {
- return false;
- }
-
- public boolean filterRowKey(byte[] buffer, int offset, int length) {
- return false;
+ return filterAllRemaining() ? ReturnCode.SKIP : ReturnCode.INCLUDE;
}
+ @Override
public void reset() {
this.count = 0;
}
+ @Override
public void readFields(DataInput in) throws IOException {
this.limit = in.readInt();
}
+ @Override
public void write(DataOutput out) throws IOException {
out.writeInt(this.limit);
}
diff --git a/core/src/main/java/org/apache/hadoop/hbase/filter/ColumnPaginationFilter.java b/core/src/main/java/org/apache/hadoop/hbase/filter/ColumnPaginationFilter.java
index d050bf7586b..aa30bbffb9b 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/filter/ColumnPaginationFilter.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/filter/ColumnPaginationFilter.java
@@ -1,5 +1,5 @@
-/**
- * Copyright 2007 The Apache Software Foundation
+/*
+ * Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.filter;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.List;
import org.apache.hadoop.hbase.KeyValue;
@@ -30,7 +31,7 @@ import org.apache.hadoop.hbase.KeyValue;
* This filter can be used for row-based indexing, where references to other tables are stored across many columns,
* in order to efficient lookups and paginated results for end users.
*/
-public class ColumnPaginationFilter implements Filter
+public class ColumnPaginationFilter extends FilterBase
{
private int limit = 0;
private int offset = 0;
@@ -50,11 +51,7 @@ public class ColumnPaginationFilter implements Filter
this.offset = offset;
}
- public boolean filterAllRemaining()
- {
- return false;
- }
-
+ @Override
public ReturnCode filterKeyValue(KeyValue v)
{
if(count >= offset + limit)
@@ -67,17 +64,7 @@ public class ColumnPaginationFilter implements Filter
return code;
}
- public boolean filterRow()
- {
- this.count = 0;
- return false;
- }
-
- public boolean filterRowKey(byte[] buffer, int offset, int length)
- {
- return false;
- }
-
+ @Override
public void reset()
{
this.count = 0;
diff --git a/core/src/main/java/org/apache/hadoop/hbase/filter/CompareFilter.java b/core/src/main/java/org/apache/hadoop/hbase/filter/CompareFilter.java
index 46cf8225110..6d734396fe2 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/filter/CompareFilter.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/filter/CompareFilter.java
@@ -43,7 +43,7 @@ import java.util.Arrays;
*
* Multiple filters can be combined using {@link FilterList}.
*/
-public abstract class CompareFilter implements Filter {
+public abstract class CompareFilter extends FilterBase {
/** Comparison operators. */
public enum CompareOp {
@@ -59,6 +59,8 @@ public abstract class CompareFilter implements Filter {
GREATER_OR_EQUAL,
/** greater than */
GREATER,
+ /** no operation */
+ NO_OP,
}
protected CompareOp compareOp;
@@ -95,28 +97,12 @@ public abstract class CompareFilter implements Filter {
return comparator;
}
- public void reset() {
- }
-
- public ReturnCode filterKeyValue(KeyValue v) {
- return ReturnCode.INCLUDE;
- }
-
- public boolean filterRowKey(byte[] data, int offset, int length) {
- return false;
- }
-
- public boolean filterRow() {
- return false;
- }
-
- public boolean filterAllRemaining() {
- return false;
- }
-
protected boolean doCompare(final CompareOp compareOp,
final WritableByteArrayComparable comparator, final byte [] data,
final int offset, final int length) {
+ if (compareOp == CompareOp.NO_OP) {
+ return true;
+ }
int compareResult =
comparator.compareTo(Arrays.copyOfRange(data, offset,
offset + length));
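For context: NO_OP short-circuits doCompare() to true before the comparator is consulted, and in the filter-out convention used by CompareFilter subclasses that means "exclude". It appears intended for subclasses such as the new DependentColumnFilter, which needs the CompareFilter plumbing but no value comparison. A minimal illustration, not from the patch (the comparator is a placeholder, never read under NO_OP):

    // A RowFilter built with NO_OP excludes every row it sees.
    Filter dropAllRows = new RowFilter(CompareFilter.CompareOp.NO_OP,
        new BinaryComparator(Bytes.toBytes("unused")));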
diff --git a/core/src/main/java/org/apache/hadoop/hbase/filter/DependentColumnFilter.java b/core/src/main/java/org/apache/hadoop/hbase/filter/DependentColumnFilter.java
new file mode 100644
index 00000000000..feba2ac2325
--- /dev/null
+++ b/core/src/main/java/org/apache/hadoop/hbase/filter/DependentColumnFilter.java
@@ -0,0 +1,163 @@
+package org.apache.hadoop.hbase.filter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * A filter for adding inter-column timestamp matching.
+ * Only cells with a correspondingly timestamped entry in
+ * the target column will be retained.
+ * Not compatible with Scan.setBatch as operations need
+ * full rows for correct filtering.
+ */
+public class DependentColumnFilter extends CompareFilter {
+
+ protected byte[] columnFamily;
+ protected byte[] columnQualifier;
+ protected boolean dropDependentColumn;
+
+ protected Set<Long> stampSet = new HashSet<Long>();
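A usage sketch for the new filter, assuming a (family, qualifier) constructor to match the fields above; the column names are invented:

    // Keep only cells whose timestamp also appears in meta:reference in the
    // same row. Batching must stay off: the filter needs whole rows.
    Scan scan = new Scan();
    scan.setFilter(new DependentColumnFilter(
        Bytes.toBytes("meta"), Bytes.toBytes("reference")));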
diff --git a/core/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java b/core/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java
* This filter can be used to more efficiently perform row count operations.
*/
-public class FirstKeyOnlyFilter implements Filter {
+public class FirstKeyOnlyFilter extends FilterBase {
private boolean foundKV = false;
public FirstKeyOnlyFilter() {
@@ -40,24 +41,12 @@ public class FirstKeyOnlyFilter implements Filter {
foundKV = false;
}
- public boolean filterRowKey(byte[] buffer, int offset, int length) {
- return false;
- }
-
- public boolean filterAllRemaining() {
- return false;
- }
-
public ReturnCode filterKeyValue(KeyValue v) {
if(foundKV) return ReturnCode.NEXT_ROW;
foundKV = true;
return ReturnCode.INCLUDE;
}
- public boolean filterRow() {
- return false;
- }
-
public void write(DataOutput out) throws IOException {
}
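The row-count use the class comment has in mind, sketched against the client API of this era (assumes an open HTable named table):

    // Each row returns only its first KeyValue, so counting results
    // counts rows while moving minimal data.
    Scan scan = new Scan();
    scan.setFilter(new FirstKeyOnlyFilter());
    ResultScanner scanner = table.getScanner(scan);
    long rows = 0;
    for (Result result : scanner) {
      rows++;
    }
    scanner.close();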
diff --git a/core/src/main/java/org/apache/hadoop/hbase/filter/InclusiveStopFilter.java b/core/src/main/java/org/apache/hadoop/hbase/filter/InclusiveStopFilter.java
index 7e7fccef6bd..6148f3538cd 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/filter/InclusiveStopFilter.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/filter/InclusiveStopFilter.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.List;
/**
* A Filter that stops after the given row. There is no "RowStopFilter" because
@@ -33,7 +34,7 @@ import java.io.IOException;
*
* Use this filter to include the stop row, eg: [A,Z].
*/
-public class InclusiveStopFilter implements Filter {
+public class InclusiveStopFilter extends FilterBase {
private byte [] stopRowKey;
private boolean done = false;
@@ -49,10 +50,6 @@ public class InclusiveStopFilter implements Filter {
return this.stopRowKey;
}
- public void reset() {
- // noop, no state
- }
-
public boolean filterRowKey(byte[] buffer, int offset, int length) {
if (buffer == null) {
//noinspection RedundantIfStatement
@@ -75,15 +72,6 @@ public class InclusiveStopFilter implements Filter {
return done;
}
- public ReturnCode filterKeyValue(KeyValue v) {
- // include everything.
- return ReturnCode.INCLUDE;
- }
-
- public boolean filterRow() {
- return false;
- }
-
public void write(DataOutput out) throws IOException {
Bytes.writeByteArray(out, this.stopRowKey);
}
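The inclusive-stop usage the javadoc describes, as a sketch:

    // Scan [A, Z] inclusive; Scan's own stopRow would exclude "Z".
    Scan scan = new Scan(Bytes.toBytes("A"));
    scan.setFilter(new InclusiveStopFilter(Bytes.toBytes("Z")));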
diff --git a/core/src/main/java/org/apache/hadoop/hbase/filter/IncompatibleFilterException.java b/core/src/main/java/org/apache/hadoop/hbase/filter/IncompatibleFilterException.java
new file mode 100644
index 00000000000..84c14dad9f0
--- /dev/null
+++ b/core/src/main/java/org/apache/hadoop/hbase/filter/IncompatibleFilterException.java
@@ -0,0 +1,21 @@
+package org.apache.hadoop.hbase.filter;
+
+/**
+ * Used to indicate a filter incompatibility
+ */
+public class IncompatibleFilterException extends RuntimeException {
+ private static final long serialVersionUID = 3236763276623198231L;
+
+/** constructor */
+ public IncompatibleFilterException() {
+ super();
+ }
+
+ /**
+ * constructor
+ * @param s message
+ */
+ public IncompatibleFilterException(String s) {
+ super(s);
+ }
+}
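How the new exception surfaces in client code, given the guard added to Scan.setBatch() earlier in this patch (rowMutatingFilter stands in for any filter whose hasFilterRow() returns true):

    Scan scan = new Scan();
    scan.setFilter(rowMutatingFilter);
    try {
      scan.setBatch(10);
    } catch (IncompatibleFilterException e) {
      // Expected: batching would hand the filter partial rows, and a
      // filterRow(List) implementation needs whole rows to be correct.
    }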
diff --git a/core/src/main/java/org/apache/hadoop/hbase/filter/PageFilter.java b/core/src/main/java/org/apache/hadoop/hbase/filter/PageFilter.java
index d10bd4f0de3..b5e4dd34677 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/filter/PageFilter.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/filter/PageFilter.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.KeyValue;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.List;
/**
* Implementation of Filter interface that limits results to a specific page
@@ -36,7 +37,7 @@ import java.io.IOException;
* individual HRegions by making sure that the page size is never exceeded
* locally.
*/
-public class PageFilter implements Filter {
+public class PageFilter extends FilterBase {
private long pageSize = Long.MAX_VALUE;
private int rowsAccepted = 0;
@@ -61,16 +62,13 @@ public class PageFilter implements Filter {
return pageSize;
}
- public void reset() {
- // noop
- }
-
public boolean filterAllRemaining() {
return this.rowsAccepted >= this.pageSize;
}
- public boolean filterRowKey(byte[] rowKey, int offset, int length) {
- return false;
+ public boolean filterRow() {
+ this.rowsAccepted++;
+ return this.rowsAccepted > this.pageSize;
}
public void readFields(final DataInput in) throws IOException {
@@ -80,13 +78,4 @@ public class PageFilter implements Filter {
public void write(final DataOutput out) throws IOException {
out.writeLong(pageSize);
}
-
- public ReturnCode filterKeyValue(KeyValue v) {
- return ReturnCode.INCLUDE;
- }
-
- public boolean filterRow() {
- this.rowsAccepted++;
- return this.rowsAccepted > this.pageSize;
- }
}
\ No newline at end of file
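Usage sketch; per the class javadoc above, each region enforces the page size only locally:

    // A client may still see up to (pageSize * number of regions) rows,
    // so trim to the final page size on the client side as well.
    Scan scan = new Scan();
    scan.setFilter(new PageFilter(25));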
diff --git a/core/src/main/java/org/apache/hadoop/hbase/filter/PrefixFilter.java b/core/src/main/java/org/apache/hadoop/hbase/filter/PrefixFilter.java
index d233bf33694..063d06825a3 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/filter/PrefixFilter.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/filter/PrefixFilter.java
@@ -26,11 +26,12 @@ import org.apache.hadoop.hbase.util.Bytes;
import java.io.DataOutput;
import java.io.IOException;
import java.io.DataInput;
+import java.util.List;
/**
* Pass results that have same row prefix.
*/
-public class PrefixFilter implements Filter {
+public class PrefixFilter extends FilterBase {
protected byte [] prefix = null;
protected boolean passedPrefix = false;
@@ -46,10 +47,6 @@ public class PrefixFilter implements Filter {
return prefix;
}
- public void reset() {
- // Noop
- }
-
public boolean filterRowKey(byte[] buffer, int offset, int length) {
if (buffer == null || this.prefix == null)
return true;
@@ -70,14 +67,6 @@ public class PrefixFilter implements Filter {
return passedPrefix;
}
- public ReturnCode filterKeyValue(KeyValue v) {
- return ReturnCode.INCLUDE;
- }
-
- public boolean filterRow() {
- return false;
- }
-
public void write(DataOutput out) throws IOException {
Bytes.writeByteArray(out, this.prefix);
}
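Usage sketch (prefix invented); starting the scan at the prefix lets passedPrefix end the scan as soon as the matching range is exhausted:

    Scan scan = new Scan(Bytes.toBytes("user_"));
    scan.setFilter(new PrefixFilter(Bytes.toBytes("user_")));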
diff --git a/core/src/main/java/org/apache/hadoop/hbase/filter/RowFilter.java b/core/src/main/java/org/apache/hadoop/hbase/filter/RowFilter.java
index d1f8ba0b9d7..9d9d0a4dbf5 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/filter/RowFilter.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/filter/RowFilter.java
@@ -23,6 +23,8 @@ package org.apache.hadoop.hbase.filter;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
+import java.util.List;
+
/**
* This filter is used to filter based on the key. It takes an operator
* (equal, greater, not equal, etc) and a byte [] comparator for the row,
diff --git a/core/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java b/core/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java
index fd0b28e3d6f..24ea37f8eea 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java
@@ -32,6 +32,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
+import java.util.List;
/**
* This filter is used to filter cells based on value. It takes a {@link CompareFilter.CompareOp}
@@ -60,7 +61,7 @@ import java.util.Arrays;
*
* To filter based on the value of all scanned columns, use {@link ValueFilter}.
*/
-public class SingleColumnValueFilter implements Filter {
+public class SingleColumnValueFilter extends FilterBase {
static final Log LOG = LogFactory.getLog(SingleColumnValueFilter.class);
protected byte [] columnFamily;
@@ -144,12 +145,6 @@ public class SingleColumnValueFilter implements Filter {
return columnQualifier;
}
- public boolean filterRowKey(byte[] rowKey, int offset, int length) {
- // We don't filter on the row key... we filter later on column value so
- // always return false.
- return false;
- }
-
public ReturnCode filterKeyValue(KeyValue keyValue) {
// System.out.println("REMOVE KEY=" + keyValue.toString() + ", value=" + Bytes.toString(keyValue.getValue()));
if (this.matchedColumn) {
@@ -195,10 +190,6 @@ public class SingleColumnValueFilter implements Filter {
}
}
- public boolean filterAllRemaining() {
- return false;
- }
-
public boolean filterRow() {
// If column was found, return false if it was matched, true if it was not
// If column not found, return true if we filter if missing, false if not
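A sketch of the filterIfMissing interplay noted in the comment above (family, qualifier, and value invented):

    // Emit a row only when info:status equals "active". Without
    // setFilterIfMissing(true), rows lacking the column pass through.
    SingleColumnValueFilter valueFilter = new SingleColumnValueFilter(
        Bytes.toBytes("info"), Bytes.toBytes("status"),
        CompareFilter.CompareOp.EQUAL, Bytes.toBytes("active"));
    valueFilter.setFilterIfMissing(true);
    scan.setFilter(valueFilter);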
diff --git a/core/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java b/core/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java
index e46b09bacc6..72a91bbf97a 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.KeyValue;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.List;
/**
* A wrapper filter that filters an entire row if any of the KeyValue checks do
@@ -44,7 +45,7 @@ import java.io.IOException;
* Without this filter, the other non-zero valued columns in the row would still
* be emitted.
*/
-public class SkipFilter implements Filter {
+public class SkipFilter extends FilterBase {
private boolean filterRow = false;
private Filter filter;
@@ -69,14 +70,6 @@ public class SkipFilter implements Filter {
filterRow = filterRow || value;
}
- public boolean filterRowKey(byte[] buffer, int offset, int length) {
- return false;
- }
-
- public boolean filterAllRemaining() {
- return false;
- }
-
public ReturnCode filterKeyValue(KeyValue v) {
ReturnCode c = filter.filterKeyValue(v);
changeFR(c != ReturnCode.INCLUDE);
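The zero-valued-column example from the class javadoc, sketched:

    // Drop an entire row as soon as any of its columns has value 0.
    scan.setFilter(new SkipFilter(new ValueFilter(
        CompareFilter.CompareOp.NOT_EQUAL,
        new BinaryComparator(Bytes.toBytes(0)))));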
diff --git a/core/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java b/core/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java
index 7c1f404bbd6..bad3c68ad9b 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.KeyValue;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.List;
/**
* A wrapper filter that returns true from {@link #filterAllRemaining()} as soon
@@ -34,7 +35,7 @@ import java.io.IOException;
* {@link org.apache.hadoop.hbase.filter.Filter#filterAllRemaining()} methods
* returns true.
*/
-public class WhileMatchFilter implements Filter {
+public class WhileMatchFilter extends FilterBase {
private boolean filterAllRemaining = false;
private Filter filter;
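Usage sketch: the wrapper turns the inner filter's first rejection into filterAllRemaining() == true, ending the whole scan; here the scan stops once the row key passes "row_99" (key invented):

    scan.setFilter(new WhileMatchFilter(new RowFilter(
        CompareFilter.CompareOp.LESS_OR_EQUAL,
        new BinaryComparator(Bytes.toBytes("row_99")))));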
diff --git a/core/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java b/core/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
index 3cc5e0ff9e3..85fde3a0596 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
@@ -158,6 +158,7 @@ public class HbaseObjectWritable implements Writable, Configurable {
addToMap(SkipFilter.class, code++);
addToMap(WritableByteArrayComparable.class, code++);
addToMap(FirstKeyOnlyFilter.class, code++);
+ addToMap(DependentColumnFilter.class, code++);
addToMap(Delete [].class, code++);
diff --git a/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 00fafc99d60..038a3357395 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.Reference.Range;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
@@ -304,7 +305,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
/**
* Initialize this region and get it ready to roll.
* Called after construction.
- *
+ *
* @param initialFiles path
* @param reporter progressable
* @throws IOException e
@@ -448,7 +449,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
public ReadWriteConsistencyControl getRWCC() {
return rwcc;
}
-
+
/**
* Close down this HRegion. Flush the cache, shut down each HStore, don't
* service any more calls.
@@ -459,7 +460,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
* @return Vector of all the storage files that the HRegion's component
* HStores make use of. It's a list of all HStoreFile objects. Returns empty
* vector if already closed and null if judged that it should not close.
- *
+ *
* @throws IOException e
*/
public List<StoreFile> close() throws IOException {
* <p>This method may block for some time.
*
* @return true if the region needs compacting
- *
+ *
* @throws IOException general io exceptions
* @throws DroppedSnapshotException Thrown when replay of hlog is required
* because a Snapshot was not properly persisted.
@@ -1051,7 +1052,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
}
storeFlushers.clear();
-
+
// Set down the memstore size by amount of flush.
this.memstoreSize.addAndGet(-currentMemStoreSize);
} catch (Throwable t) {
@@ -1660,7 +1661,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
walEdit, now);
}
-
+
long size = 0;
w = rwcc.beginMemstoreInsert();
@@ -1996,9 +1997,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
results.clear();
boolean returnResult = nextInternal(limit);
- if (!returnResult && filter != null && filter.filterRow()) {
- results.clear();
- }
+
outResults.addAll(results);
resetFilters();
if (isFilterDone()) {
@@ -2024,6 +2023,13 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
while (true) {
byte [] currentRow = peekRow();
if (isStopRow(currentRow)) {
+ if (filter != null && filter.hasFilterRow()) {
+ filter.filterRow(results);
+ }
+ if (filter != null && filter.filterRow()) {
+ results.clear();
+ }
+
return false;
} else if (filterRowKey(currentRow)) {
nextRow(currentRow);
@@ -2032,19 +2038,33 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
do {
this.storeHeap.next(results, limit);
if (limit > 0 && results.size() == limit) {
- return true;
+ if (this.filter != null && filter.hasFilterRow()) throw new IncompatibleFilterException(
+ "Filter with filterRow(List