From 73183e6b975ea3817fad83a4cf779c6c545f652d Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Thu, 5 Jul 2007 19:50:04 +0000 Subject: [PATCH] HADOOP-1531 Add RowFilter to HRegion.HScanner. Adds a row/column filter interface and two implementations: A pager and a row/column-value regex filter.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java (openScanner): Add override that specifies a row filter. M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java (obtainScanner): Add override that specifies a row filter. (ColumnScanner): Add filter parameter to constructor. M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (getScanner): Add override with filter parameter. (next): Add handling of filtering. A src/contrib/hbase/src/java/org/apache/hadoop/hbase/filter/InvalidRowFilterException.java A src/contrib/hbase/src/java/org/apache/hadoop/hbase/filter/RegExpRowFilter.java A src/contrib/hbase/src/java/org/apache/hadoop/hbase/filter/RowFilterSet.java A src/contrib/hbase/src/java/org/apache/hadoop/hbase/filter/PageRowFilter.java A src/contrib/hbase/src/java/org/apache/hadoop/hbase/filter/RowFilterInterface.java Row-filter interface, exception and implementations. A src/contrib/hbase/src/test/org/apache/hadoop/hbase/filter/TestRegExpRowFilter.java A src/contrib/hbase/src/test/org/apache/hadoop/hbase/filter/TestPageRowFilter.java Simple pager and regex filter tests.
git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@553620 13f79535-47bb-0310-9956-ffa450edef68
--- CHANGES.txt | 8 +- src/java/org/apache/hadoop/hbase/HClient.java | 51 ++- src/java/org/apache/hadoop/hbase/HRegion.java | 213 +++++++++---- .../apache/hadoop/hbase/HRegionInterface.java | 15 + .../apache/hadoop/hbase/HRegionServer.java | 19 +- .../filter/InvalidRowFilterException.java | 31 ++ .../hadoop/hbase/filter/PageRowFilter.java | 137 ++++++++ .../hadoop/hbase/filter/RegExpRowFilter.java | 300 ++++++++++++++++++ .../hbase/filter/RowFilterInterface.java | 106 +++++++ .../hadoop/hbase/filter/RowFilterSet.java | 230 ++++++++++++++ .../hbase/filter/TestPageRowFilter.java | 48 +++ .../hbase/filter/TestRegExpRowFilter.java | 81 +++++ 12 files changed, 1154 insertions(+), 85 deletions(-) create mode 100644 src/java/org/apache/hadoop/hbase/filter/InvalidRowFilterException.java create mode 100644 src/java/org/apache/hadoop/hbase/filter/PageRowFilter.java create mode 100644 src/java/org/apache/hadoop/hbase/filter/RegExpRowFilter.java create mode 100644 src/java/org/apache/hadoop/hbase/filter/RowFilterInterface.java create mode 100644 src/java/org/apache/hadoop/hbase/filter/RowFilterSet.java create mode 100644 src/test/org/apache/hadoop/hbase/filter/TestPageRowFilter.java create mode 100644 src/test/org/apache/hadoop/hbase/filter/TestRegExpRowFilter.java
diff --git a/CHANGES.txt b/CHANGES.txt index b1fc395ae15..7b36af7921b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -38,11 +38,15 @@ Trunk (unreleased changes) 23. HADOOP-1509. Made methods/inner classes in HRegionServer and HClient protected instead of private for easier extension. Also made HRegion and HRegionInfo public too. Added an hbase-default.xml property for specifying what HRegionInterface extension to use - for proxy server connection. + for proxy server connection. (James Kennedy via Jim Kellerman) 24. HADOOP-1534. [hbase] Memcache scanner fails if start key not present 25. HADOOP-1537.
Catch exceptions in testCleanRegionServerExit so we can see what is failing. 26. HADOOP-1543 [hbase] Add HClient.tableExists - 27. HADOOP-1519 [hbase] map/reduce interface for HBase + 27. HADOOP-1519 [hbase] map/reduce interface for HBase. (Vuk Ercegovac and + Jim Kellerman) 28. HADOOP-1523 Hung region server waiting on write locks 29. HADOOP-1560 NPE in MiniHBaseCluster on Windows + 30. HADOOP-1531 Add RowFilter to HRegion.HScanner + Adds a row filtering interface and two implemenentations: A page scanner, + and a regex row/column-data matcher. (James Kennedy via Stack) diff --git a/src/java/org/apache/hadoop/hbase/HClient.java b/src/java/org/apache/hadoop/hbase/HClient.java index 8b142c2da94..13a533775b0 100644 --- a/src/java/org/apache/hadoop/hbase/HClient.java +++ b/src/java/org/apache/hadoop/hbase/HClient.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.filter.RowFilterInterface; import org.apache.hadoop.hbase.io.KeyedData; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.Text; @@ -1173,18 +1174,34 @@ public class HClient implements HConstants { * Get a scanner on the current table starting at the specified row. * Return the specified columns. * - * @param columns - array of columns to return - * @param startRow - starting row in table to scan - * @return - scanner + * @param columns array of columns to return + * @param startRow starting row in table to scan + * @return scanner * @throws IOException */ public synchronized HScannerInterface obtainScanner(Text[] columns, - Text startRow) throws IOException { - + Text startRow) + throws IOException { + return obtainScanner(columns, startRow, null); + } + + /** + * Get a scanner on the current table starting at the specified row. + * Return the specified columns. + * + * @param columns array of columns to return + * @param startRow starting row in table to scan + * @param filter a row filter using row-key regexp and/or column data filter. + * @return scanner + * @throws IOException + */ + public synchronized HScannerInterface obtainScanner(Text[] columns, + Text startRow, RowFilterInterface filter) + throws IOException { if(this.tableServers == null) { throw new IllegalStateException("Must open table first"); } - return new ClientScanner(columns, startRow); + return new ClientScanner(columns, startRow, filter); } /* @@ -1388,6 +1405,7 @@ public class HClient implements HConstants { private int currentRegion; private HRegionInterface server; private long scannerId; + private RowFilterInterface filter; private void loadRegions() { Text firstServer = null; @@ -1404,11 +1422,15 @@ public class HClient implements HConstants { this.regions = info.toArray(new RegionLocation[info.size()]); } - ClientScanner(Text[] columns, Text startRow) throws IOException { + ClientScanner(Text[] columns, Text startRow, RowFilterInterface filter) + throws IOException { this.columns = columns; this.startRow = startRow; this.closed = false; - + this.filter = filter; + if (filter != null) { + filter.validate(columns); + } loadRegions(); this.currentRegion = -1; this.server = null; @@ -1437,9 +1459,16 @@ public class HClient implements HConstants { RegionLocation info = this.regions[currentRegion]; try { - this.scannerId = this.server.openScanner(info.regionInfo.regionName, - this.columns, currentRegion == 0 ? 
this.startRow : EMPTY_START_ROW); - + if (this.filter == null) { + this.scannerId = this.server.openScanner(info.regionInfo.regionName, + this.columns, currentRegion == 0 ? this.startRow + : EMPTY_START_ROW); + } else { + this.scannerId = this.server.openScanner(info.regionInfo.regionName, + this.columns, currentRegion == 0 ? this.startRow + : EMPTY_START_ROW, filter); + } + break; } catch(NotServingRegionException e) { diff --git a/src/java/org/apache/hadoop/hbase/HRegion.java b/src/java/org/apache/hadoop/hbase/HRegion.java index 8f6e45dda9e..88e3bd60109 100644 --- a/src/java/org/apache/hadoop/hbase/HRegion.java +++ b/src/java/org/apache/hadoop/hbase/HRegion.java @@ -15,14 +15,26 @@ */ package org.apache.hadoop.hbase; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Random; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.Vector; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.conf.*; - -import java.io.*; -import java.util.*; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.filter.RowFilterInterface; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.Text; /** * HRegion stores data for a certain region of a table. It stores all columns @@ -974,6 +986,15 @@ public class HRegion implements HConstants { */ public HInternalScannerInterface getScanner(Text[] cols, Text firstRow) throws IOException { + return getScanner(cols, firstRow, null); + } + + /** + * Return an iterator that scans over the HRegion, returning the indicated + * columns for only the rows that match the data filter. This Iterator must be closed by the caller. + */ + public HInternalScannerInterface getScanner(Text[] cols, Text firstRow, RowFilterInterface filter) + throws IOException { lock.obtainReadLock(); try { TreeSet families = new TreeSet(); @@ -986,7 +1007,7 @@ public class HRegion implements HConstants { for (Text family: families) { storelist[i++] = stores.get(family); } - return new HScanner(cols, firstRow, memcache, storelist); + return new HScanner(cols, firstRow, memcache, storelist, filter); } finally { lock.releaseReadLock(); } @@ -1262,12 +1283,17 @@ public class HRegion implements HConstants { private HStoreKey[] keys; private boolean wildcardMatch; private boolean multipleMatchers; + private RowFilterInterface dataFilter; /** Create an HScanner with a handle on many HStores. */ @SuppressWarnings("unchecked") - HScanner(Text[] cols, Text firstRow, HMemcache memcache, HStore[] stores) + HScanner(Text[] cols, Text firstRow, HMemcache memcache, HStore[] stores, RowFilterInterface filter) throws IOException { long scanTime = System.currentTimeMillis(); + this.dataFilter = filter; + if (null != dataFilter) { + dataFilter.reset(); + } this.scanners = new HInternalScannerInterface[stores.length + 1]; for(int i = 0; i < this.scanners.length; i++) { this.scanners[i] = null; @@ -1335,84 +1361,135 @@ public class HRegion implements HConstants { return multipleMatchers; } - /* (non-Javadoc) + /* + * (non-Javadoc) * - * Grab the next row's worth of values. The HScanner will return the most + * Grab the next row's worth of values. 
The HScanner will return the most * recent data value for each row that is not newer than the target time. - * - * @see org.apache.hadoop.hbase.HInternalScannerInterface#next(org.apache.hadoop.hbase.HStoreKey, java.util.TreeMap) + * + * If a dataFilter is defined, it will be used to skip rows that do not + * match its criteria. It may cause the scanner to stop prematurely if it + * knows that it will no longer accept the remaining results. + * + * @see org.apache.hadoop.hbase.HInternalScannerInterface#next(org.apache.hadoop.hbase.HStoreKey, + * java.util.TreeMap) */ - public boolean next(HStoreKey key, TreeMap results) + public boolean next(HStoreKey key, TreeMap results) throws IOException { - // Find the lowest-possible key. - Text chosenRow = null; - long chosenTimestamp = -1; - for(int i = 0; i < this.keys.length; i++) { - if(scanners[i] != null - && (chosenRow == null - || (keys[i].getRow().compareTo(chosenRow) < 0) - || ((keys[i].getRow().compareTo(chosenRow) == 0) - && (keys[i].getTimestamp() > chosenTimestamp)))) { - - chosenRow = new Text(keys[i].getRow()); - chosenTimestamp = keys[i].getTimestamp(); + boolean filtered = true; + boolean moreToFollow = true; + while (filtered && moreToFollow) { + // Find the lowest-possible key. + Text chosenRow = null; + long chosenTimestamp = -1; + for (int i = 0; i < this.keys.length; i++) { + if (scanners[i] != null && + (chosenRow == null || + (keys[i].getRow().compareTo(chosenRow) < 0) || + ((keys[i].getRow().compareTo(chosenRow) == 0) && + (keys[i].getTimestamp() > chosenTimestamp)))) { + chosenRow = new Text(keys[i].getRow()); + chosenTimestamp = keys[i].getTimestamp(); + } } - } - // Store the key and results for each sub-scanner. Merge them as appropriate. - boolean insertedItem = false; - if(chosenTimestamp > 0) { - key.setRow(chosenRow); - key.setVersion(chosenTimestamp); - key.setColumn(new Text("")); + // Filter whole row by row key? + filtered = dataFilter != null? dataFilter.filter(chosenRow) : false; - for(int i = 0; i < scanners.length; i++) { - while((scanners[i] != null) - && (keys[i].getRow().compareTo(chosenRow) == 0)) { - // If we are doing a wild card match or there are multiple matchers - // per column, we need to scan all the older versions of this row - // to pick up the rest of the family members - - if(!wildcardMatch - && !multipleMatchers - && (keys[i].getTimestamp() != chosenTimestamp)) { - break; - } + // Store the key and results for each sub-scanner. Merge them as + // appropriate. + if (chosenTimestamp > 0 && !filtered) { + key.setRow(chosenRow); + key.setVersion(chosenTimestamp); + key.setColumn(new Text("")); - // NOTE: We used to do results.putAll(resultSets[i]); - // but this had the effect of overwriting newer - // values with older ones. So now we only insert - // a result if the map does not contain the key. 
- - for(Map.Entry e: resultSets[i].entrySet()) { - if(!results.containsKey(e.getKey())) { - results.put(e.getKey(), e.getValue()); - insertedItem = true; + for (int i = 0; i < scanners.length && !filtered; i++) { + + while ((scanners[i] != null + && !filtered + && moreToFollow) + && (keys[i].getRow().compareTo(chosenRow) == 0)) { + // If we are doing a wild card match or there are multiple + // matchers + // per column, we need to scan all the older versions of this row + // to pick up the rest of the family members + + if (!wildcardMatch + && !multipleMatchers + && (keys[i].getTimestamp() != chosenTimestamp)) { + break; + } + + // Filter out null criteria columns that are not null + if (dataFilter != null) { + filtered = dataFilter.filterNotNull(resultSets[i]); + } + + // NOTE: We used to do results.putAll(resultSets[i]); + // but this had the effect of overwriting newer + // values with older ones. So now we only insert + // a result if the map does not contain the key. + + for (Map.Entry e : resultSets[i].entrySet()) { + if (!filtered && moreToFollow && + !results.containsKey(e.getKey())) { + if (dataFilter != null) { + // Filter whole row by column data? + filtered = + dataFilter.filter(chosenRow, e.getKey(), e.getValue()); + if (filtered) { + results.clear(); + break; + } + } + results.put(e.getKey(), e.getValue()); + } + } + + resultSets[i].clear(); + if (!scanners[i].next(keys[i], resultSets[i])) { + closeScanner(i); } } - resultSets[i].clear(); - if(! scanners[i].next(keys[i], resultSets[i])) { - closeScanner(i); + // If the current scanner is non-null AND has a lower-or-equal + // row label, then its timestamp is bad. We need to advance it. + while ((scanners[i] != null) && + (keys[i].getRow().compareTo(chosenRow) <= 0)) { + resultSets[i].clear(); + if (!scanners[i].next(keys[i], resultSets[i])) { + closeScanner(i); + } } + } + } + + moreToFollow = chosenTimestamp > 0; + + if (dataFilter != null) { + if (moreToFollow && !filtered) { + dataFilter.acceptedRow(chosenRow); } - - // If the current scanner is non-null AND has a lower-or-equal - // row label, then its timestamp is bad. We need to advance it. - - while((scanners[i] != null) - && (keys[i].getRow().compareTo(chosenRow) <= 0)) { - - resultSets[i].clear(); - if(! scanners[i].next(keys[i], resultSets[i])) { - closeScanner(i); - } + if (dataFilter.filterAllRemaining()) { + moreToFollow = false; + LOG.debug("page limit"); + } + } + } + + // Make sure scanners closed if no more results + if (!moreToFollow) { + for (int i = 0; i < scanners.length; i++) { + if (null != scanners[i]) { + closeScanner(i); } } } - return insertedItem; + + return moreToFollow; } + /** Shut down a single scanner */ void closeScanner(int i) { try { diff --git a/src/java/org/apache/hadoop/hbase/HRegionInterface.java b/src/java/org/apache/hadoop/hbase/HRegionInterface.java index 3b47896d765..eafaa861c78 100644 --- a/src/java/org/apache/hadoop/hbase/HRegionInterface.java +++ b/src/java/org/apache/hadoop/hbase/HRegionInterface.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hbase; import java.io.IOException; +import org.apache.hadoop.hbase.filter.RowFilterInterface; import org.apache.hadoop.hbase.io.KeyedData; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.VersionedProtocol; @@ -201,6 +202,20 @@ public interface HRegionInterface extends VersionedProtocol { */ public long openScanner(Text regionName, Text[] columns, Text startRow) throws IOException; + + /** + * Opens a remote scanner with a RowFilter. 
+ * + * @param regionName name of region to scan + * @param columns columns to scan + * @param startRow starting row to scan + * @param filter RowFilter for filtering results at the row-level. + * + * @return scannerId scanner identifier used in other calls + * @throws IOException + */ + public long openScanner(Text regionName, Text[] columns, Text startRow, RowFilterInterface filter) + throws IOException; /** * Get the next set of values diff --git a/src/java/org/apache/hadoop/hbase/HRegionServer.java b/src/java/org/apache/hadoop/hbase/HRegionServer.java index 6a6cdea0649..329e30d7ca6 100644 --- a/src/java/org/apache/hadoop/hbase/HRegionServer.java +++ b/src/java/org/apache/hadoop/hbase/HRegionServer.java @@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.filter.RowFilterInterface; import org.apache.hadoop.hbase.io.KeyedData; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.retry.RetryProxy; @@ -1150,15 +1151,25 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { } } - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.HRegionInterface#openScanner(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text[], org.apache.hadoop.io.Text) + /** + * {@inheritDoc} */ - public long openScanner(Text regionName, Text[] cols, Text firstRow) + public long openScanner(final Text regionName, final Text[] cols, + final Text firstRow) + throws IOException{ + return openScanner(regionName, cols, firstRow, null); + } + + /** + * {@inheritDoc} + */ + public long openScanner(Text regionName, Text[] cols, Text firstRow, + final RowFilterInterface filter) throws IOException { HRegion r = getRegion(regionName); long scannerId = -1L; try { - HInternalScannerInterface s = r.getScanner(cols, firstRow); + HInternalScannerInterface s = r.getScanner(cols, firstRow, filter); scannerId = rand.nextLong(); String scannerName = String.valueOf(scannerId); synchronized(scanners) { diff --git a/src/java/org/apache/hadoop/hbase/filter/InvalidRowFilterException.java b/src/java/org/apache/hadoop/hbase/filter/InvalidRowFilterException.java new file mode 100644 index 00000000000..f45483d42a7 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/filter/InvalidRowFilterException.java @@ -0,0 +1,31 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.filter; + +/** + * Used to indicate an invalid RowFilter. 
+ */ +public class InvalidRowFilterException extends RuntimeException { + private static final long serialVersionUID = 2667894046345657865L; + + public InvalidRowFilterException() { + super(); + } + + public InvalidRowFilterException(String s) { + super(s); + } +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/filter/PageRowFilter.java b/src/java/org/apache/hadoop/hbase/filter/PageRowFilter.java new file mode 100644 index 00000000000..2136bbfbd39 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/filter/PageRowFilter.java @@ -0,0 +1,137 @@ +/** + * Copyright 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.filter; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.TreeMap; + +import org.apache.hadoop.io.Text; + +/** + * Implementation of RowFilterInterface that limits results to a specific page + * size. It terminates scanning once the number of filter-passed results is >= + * the given page size. + * + *

+ * Note that this filter cannot guarantee that the number of results returned + * to a client are <= page size. This is because the filter is applied + * separately on different region servers. It does however optimize the scan of + * individual HRegions by making sure that the page size is never exceeded + * locally. + *

+ */ +public class PageRowFilter implements RowFilterInterface { + + private long pageSize = Long.MAX_VALUE; + private int rowsAccepted = 0; + + /** + * Default constructor, filters nothing. Required though for RPC + * deserialization. + */ + public PageRowFilter() { + super(); + } + + /** + * Constructor that takes a maximum page size. + * + * @param pageSize Maximum result size. + */ + public PageRowFilter(final long pageSize) { + this.pageSize = pageSize; + } + + /** + * + * {@inheritDoc} + */ + public void validate(@SuppressWarnings("unused") final Text[] columns) { + // Doesn't filter columns + } + + /** + * + * {@inheritDoc} + */ + public void reset() { + rowsAccepted = 0; + } + + /** + * + * {@inheritDoc} + */ + public void acceptedRow(@SuppressWarnings("unused") final Text key) { + rowsAccepted++; + } + + /** + * + * {@inheritDoc} + */ + public boolean filterAllRemaining() { + if (this.rowsAccepted > this.pageSize) { + return true; + } + return false; + } + + /** + * + * {@inheritDoc} + */ + public boolean filter(@SuppressWarnings("unused") final Text rowKey) { + return filterAllRemaining(); + } + + /** + * + * {@inheritDoc} + */ + public boolean filter(@SuppressWarnings("unused") final Text rowKey, + @SuppressWarnings("unused") final Text colKey, + @SuppressWarnings("unused") final byte[] data) { + return filterAllRemaining(); + } + + /** + * + * {@inheritDoc} + */ + public boolean filterNotNull(@SuppressWarnings("unused") + final TreeMap columns) { + return filterAllRemaining(); + } + + /** + * + * {@inheritDoc} + */ + public void readFields(final DataInput in) throws IOException { + this.pageSize = in.readLong(); + } + + /** + * + * {@inheritDoc} + */ + public void write(final DataOutput out) throws IOException { + out.writeLong(pageSize); + } +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/filter/RegExpRowFilter.java b/src/java/org/apache/hadoop/hbase/filter/RegExpRowFilter.java new file mode 100644 index 00000000000..3dace94e7c9 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/filter/RegExpRowFilter.java @@ -0,0 +1,300 @@ +/** + * Copyright 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.filter; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.Map.Entry; +import java.util.regex.Pattern; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.io.Text; + +/** + * Implementation of RowFilterInterface that can filter by rowkey regular + * expression and/or individual column values (equals comparison only). + * Multiple column filters imply an implicit conjunction of filter criteria. 
+ */ +public class RegExpRowFilter implements RowFilterInterface { + + private Pattern rowKeyPattern = null; + private String rowKeyRegExp = null; + + private Map equalsMap = new HashMap(); + private Set nullColumns = new HashSet(); + + /** + * Default constructor, filters nothing. Required though for RPC + * deserialization. + */ + public RegExpRowFilter() { + super(); + } + + /** + * Constructor that takes a row key regular expression to filter on. + * + * @param rowKeyRegExp + */ + public RegExpRowFilter(final String rowKeyRegExp) { + this.rowKeyRegExp = rowKeyRegExp; + } + + /** + * Constructor that takes a row key regular expression to filter on. + * + * @param rowKeyRegExp + * @param columnFilter + */ + public RegExpRowFilter(final String rowKeyRegExp, + final Map columnFilter) { + this.rowKeyRegExp = rowKeyRegExp; + this.setColumnFilters(columnFilter); + } + + /** + * + * {@inheritDoc} + */ + public void acceptedRow(@SuppressWarnings("unused") final Text key) { + //doesn't care + } + + /** + * Specify a value that must be matched for the given column. + * + * @param colKey + * the column to match on + * @param value + * the value that must equal the stored value. + */ + public void setColumnFilter(final Text colKey, final byte[] value) { + if (null == value) { + nullColumns.add(colKey); + } else { + equalsMap.put(colKey, value); + } + } + + /** + * Set column filters for a number of columns. + * + * @param columnFilter + * Map of columns with value criteria. + */ + public void setColumnFilters(final Map columnFilter) { + if (null == columnFilter) { + nullColumns.clear(); + equalsMap.clear(); + } else { + for (Entry entry : columnFilter.entrySet()) { + setColumnFilter(entry.getKey(), entry.getValue()); + } + } + } + + /** + * + * {@inheritDoc} + */ + public void reset() { + // Nothing to reset + } + + /** + * + * {@inheritDoc} + */ + public boolean filterAllRemaining() { + return false; + } + + /** + * + * {@inheritDoc} + */ + public boolean filter(final Text rowKey) { + if (filtersByRowKey() && rowKey != null) { + return !getRowKeyPattern().matcher(rowKey.toString()).matches(); + } + return false; + } + + /** + * + * {@inheritDoc} + */ + public boolean filter(final Text rowKey, final Text colKey, + final byte[] data) { + if (filter(rowKey)) { + return true; + } + if (filtersByColumnValue()) { + byte[] filterValue = equalsMap.get(colKey); + if (null != filterValue) { + return !Arrays.equals(filterValue, data); + } + } + if (nullColumns.contains(colKey)) { + if (data != null && !Arrays.equals(HConstants.DELETE_BYTES.get(), data)) { + return true; + } + } + return false; + } + + /** + * + * {@inheritDoc} + */ + public boolean filterNotNull(final TreeMap columns) { + for (Entry col : columns.entrySet()) { + if (nullColumns.contains(col.getKey()) + && !Arrays.equals(HConstants.DELETE_BYTES.get(), col.getValue())) { + return true; + } + } + for (Text col : equalsMap.keySet()) { + if (!columns.containsKey(col)) { + return true; + } + } + return false; + } + + private boolean filtersByColumnValue() { + return equalsMap != null && equalsMap.size() > 0; + } + + private boolean filtersByRowKey() { + return null != rowKeyPattern || null != rowKeyRegExp; + } + + private String getRowKeyRegExp() { + if (null == rowKeyRegExp && rowKeyPattern != null) { + rowKeyRegExp = rowKeyPattern.toString(); + } + return rowKeyRegExp; + } + + private Pattern getRowKeyPattern() { + if (rowKeyPattern == null && rowKeyRegExp != null) { + rowKeyPattern = Pattern.compile(rowKeyRegExp); + } + return rowKeyPattern; + 
} + + /** + * + * {@inheritDoc} + */ + public void readFields(final DataInput in) throws IOException { + boolean hasRowKeyPattern = in.readBoolean(); + if (hasRowKeyPattern) { + rowKeyRegExp = in.readLine(); + } + // equals map + equalsMap.clear(); + int size = in.readInt(); + for (int i = 0; i < size; i++) { + Text key = new Text(); + key.readFields(in); + int len = in.readInt(); + byte[] value = null; + if (len >= 0) { + value = new byte[len]; + in.readFully(value); + } + setColumnFilter(key, value); + } + // nullColumns + nullColumns.clear(); + size = in.readInt(); + for (int i = 0; i < size; i++) { + Text key = new Text(); + key.readFields(in); + setColumnFilter(key, null); + } + } + + /** + * + * {@inheritDoc} + */ + public void validate(final Text[] columns) { + Set invalids = new HashSet(); + for (Text colKey : getFilterColumns()) { + boolean found = false; + for (Text col : columns) { + if (col.equals(colKey)) { + found = true; + break; + } + } + if (!found) { + invalids.add(colKey); + } + } + + if (invalids.size() > 0) { + throw new InvalidRowFilterException(String.format( + "RowFilter contains criteria on columns %s not in %s", invalids, + Arrays.toString(columns))); + } + } + + private Set getFilterColumns() { + Set cols = new HashSet(); + cols.addAll(equalsMap.keySet()); + cols.addAll(nullColumns); + return cols; + } + + /** + * + * {@inheritDoc} + */ + public void write(final DataOutput out) throws IOException { + if (!filtersByRowKey()) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeChars(getRowKeyRegExp()); + } + + // equalsMap + out.writeInt(equalsMap.size()); + for (Entry entry : equalsMap.entrySet()) { + entry.getKey().write(out); + byte[] value = entry.getValue(); + out.writeInt(value.length); + out.write(value); + } + + // null columns + out.writeInt(nullColumns.size()); + for (Text col : nullColumns) { + col.write(out); + } + } +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/filter/RowFilterInterface.java b/src/java/org/apache/hadoop/hbase/filter/RowFilterInterface.java new file mode 100644 index 00000000000..b09f507f80b --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/filter/RowFilterInterface.java @@ -0,0 +1,106 @@ +/** + * Copyright 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.filter; + +import java.util.TreeMap; + +import org.apache.hadoop.hbase.HRegion; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +/** + * + * Interface used for row-level filters applied to HRegion.HScanner scan + * results during calls to next(). + */ +public interface RowFilterInterface extends Writable { + + /** + * Resets the state of the filter. Used prior to the start of a Region scan. + * + */ + void reset(); + + /** + * Called to let filter know that the specified row has been included in the + * results (passed all filtering). 
Without HScanner calling this, the filter + * does not know whether a row passed overall filtering, even if this filter + * itself passed the row, because other filters may have failed the row. E.g. when this filter is a + * member of a RowFilterSet with an OR operator. + * + * @see RowFilterSet + * @param key + */ + void acceptedRow(final Text key); + + /** + * Determines if the filter has decided that all remaining results should be + * filtered (skipped). This is used to prevent the scanner from scanning + * the rest of the HRegion when it is certain that the filter will exclude all + * remaining rows. + * + * @return true if the filter intends to filter all remaining rows. + */ + boolean filterAllRemaining(); + + /** + * Filters on just a row key. + * + * @param rowKey + * @return true if the given row key is filtered and the row should not be processed. + */ + boolean filter(final Text rowKey); + + /** + * Filters on row key and/or a column key. + * + * @param rowKey + * row key to filter on. May be null for no filtering of row key. + * @param colKey + * column whose data will be filtered + * @param data + * column value + * @return true if row filtered and should not be processed. + */ + boolean filter(final Text rowKey, final Text colKey, final byte[] data); + + /** + * Filters a row if any of the given columns carrying a null criterion are + * non-null, or if there exist criteria on columns not included in the column + * set. A column is considered null if it: + * <ul> + * <li>Is not included in the given columns.</li> + * <li>Has a value of HConstants.DELETE_BYTES.</li> + * </ul>
+ * + * @param columns + * @return true if null/non-null criteria not met. + */ + boolean filterNotNull(final TreeMap columns); + + /** + * Validates that this filter applies only to a subset of the given columns. + * This check is done prior to opening of scanner due to the limitation that + * filtering of columns is dependent on the retrieval of those columns within + * the HRegion. Criteria on columns that are not part of a scanner's column + * list will be ignored. In the case of null value filters, all rows will pass + * the filter. This behavior should be 'undefined' for the user and therefore + * not permitted. + * + * @param columns + */ + void validate(final Text[] columns); +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/filter/RowFilterSet.java b/src/java/org/apache/hadoop/hbase/filter/RowFilterSet.java new file mode 100644 index 00000000000..9677576368b --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/filter/RowFilterSet.java @@ -0,0 +1,230 @@ +/** + * Copyright 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.filter; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.TreeMap; + +import org.apache.hadoop.io.Text; + +/** + * Implementation of RowFilterInterface that represents a set of RowFilters + * which will be evaluated with a specified boolean operator AND/OR. Since you + * can use RowFilterSets as children of RowFilterSet, you can create a + * hierarchy of filters to be evaluated. + */ +public class RowFilterSet implements RowFilterInterface { + + enum Operator { + AND, OR + } + + private Operator operator = Operator.AND; + private Set filters = new HashSet(); + + /** + * Default constructor, filters nothing. Required though for RPC + * deserialization. + */ + public RowFilterSet() { + super(); + } + + /** + * Constructor that takes a set of RowFilters. The default operator AND is + * assumed. + * + * @param rowFilters + */ + public RowFilterSet(final Set rowFilters) { + this.filters = rowFilters; + } + + /** + * Constructor that takes a set of RowFilters and an operator. + * + * @param operator Operator to process filter set with. + * @param rowFilters Set of row filters. 
+ */ + public RowFilterSet(final Operator operator, + final Set rowFilters) { + this.filters = rowFilters; + this.operator = operator; + } + + /** + * + * {@inheritDoc} + */ + public void validate(final Text[] columns) { + for (RowFilterInterface filter : filters) { + filter.validate(columns); + } + } + + /** + * + * {@inheritDoc} + */ + public void reset() { + for (RowFilterInterface filter : filters) { + filter.reset(); + } + } + + /** + * + * {@inheritDoc} + */ + public void acceptedRow(final Text key) { + for (RowFilterInterface filter : filters) { + filter.acceptedRow(key); + } + } + + /** + * + * {@inheritDoc} + */ + public boolean filterAllRemaining() { + boolean result = operator == Operator.OR; + for (RowFilterInterface filter : filters) { + if (operator == Operator.AND) { + if (filter.filterAllRemaining()) { + return true; + } + } else if (operator == Operator.OR) { + if (!filter.filterAllRemaining()) { + return false; + } + } + } + return result; + } + + /** + * + * {@inheritDoc} + */ + public boolean filter(final Text rowKey) { + boolean result = operator == Operator.OR; + for (RowFilterInterface filter : filters) { + if (operator == Operator.AND) { + if (filter.filterAllRemaining() || filter.filter(rowKey)) { + return true; + } + } else if (operator == Operator.OR) { + if (!filter.filterAllRemaining() && !filter.filter(rowKey)) { + return false; + } + } + } + return result; + + } + + /** + * + * {@inheritDoc} + */ + public boolean filter(final Text rowKey, final Text colKey, final byte[] data) { + boolean result = operator == Operator.OR; + for (RowFilterInterface filter : filters) { + if (operator == Operator.AND) { + if (filter.filterAllRemaining() || filter.filter(rowKey, colKey, data)) { + return true; + } + } else if (operator == Operator.OR) { + if (!filter.filterAllRemaining() + && !filter.filter(rowKey, colKey, data)) { + return false; + } + } + } + return result; + } + + /** + * + * {@inheritDoc} + */ + public boolean filterNotNull(final TreeMap columns) { + boolean result = operator == Operator.OR; + for (RowFilterInterface filter : filters) { + if (operator == Operator.AND) { + if (filter.filterAllRemaining() || filter.filterNotNull(columns)) { + return true; + } + } else if (operator == Operator.OR) { + if (!filter.filterAllRemaining() && !filter.filterNotNull(columns)) { + return false; + } + } + } + return result; + } + + /** + * + * {@inheritDoc} + */ + public void readFields(final DataInput in) throws IOException { + byte opByte = in.readByte(); + operator = Operator.values()[opByte]; + int size = in.readInt(); + if (size > 0) { + filters = new HashSet(); + try { + for (int i = 0; i < size; i++) { + String className = in.readUTF(); + Class clazz = Class.forName(className); + RowFilterInterface filter; + filter = (RowFilterInterface) clazz.newInstance(); + filter.readFields(in); + filters.add(filter); + } + } catch (InstantiationException e) { + throw new RuntimeException("Failed to deserialize RowFilterInterface.", + e); + } catch (IllegalAccessException e) { + throw new RuntimeException("Failed to deserialize RowFilterInterface.", + e); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Failed to deserialize RowFilterInterface.", + e); + } + } + + } + + /** + * + * {@inheritDoc} + */ + public void write(final DataOutput out) throws IOException { + out.writeByte(operator.ordinal()); + out.writeInt(filters.size()); + for (RowFilterInterface filter : filters) { + out.writeUTF(filter.getClass().getName()); + filter.write(out); + } + } + +} diff 
--git a/src/test/org/apache/hadoop/hbase/filter/TestPageRowFilter.java b/src/test/org/apache/hadoop/hbase/filter/TestPageRowFilter.java new file mode 100644 index 00000000000..68aecf7bd1f --- /dev/null +++ b/src/test/org/apache/hadoop/hbase/filter/TestPageRowFilter.java @@ -0,0 +1,48 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.filter; + +import org.apache.hadoop.io.Text; + +import junit.framework.TestCase; + +public class TestPageRowFilter extends TestCase { + public void testPageSize() throws Exception { + final int pageSize = 3; + RowFilterInterface filter = new PageRowFilter(pageSize); + testFiltersBeyondPageSize(filter, pageSize); + // Test reset works by going in again. + filter.reset(); + testFiltersBeyondPageSize(filter, pageSize); + } + + private void testFiltersBeyondPageSize(final RowFilterInterface filter, + final int pageSize) { + for (int i = 0; i < (pageSize * 2); i++) { + Text row = new Text(Integer.toString(i)); + boolean filterOut = filter.filter(row); + if (!filterOut) { + assertFalse("Disagrees with 'filter'", filter.filterAllRemaining()); + filter.acceptedRow(row); + } else { + // Once we have all for a page, calls to filterAllRemaining should + // stay true. + assertTrue("Disagrees with 'filter'", filter.filterAllRemaining()); + assertTrue(i >= pageSize); + } + } + } +} diff --git a/src/test/org/apache/hadoop/hbase/filter/TestRegExpRowFilter.java b/src/test/org/apache/hadoop/hbase/filter/TestRegExpRowFilter.java new file mode 100644 index 00000000000..ebf19de7eae --- /dev/null +++ b/src/test/org/apache/hadoop/hbase/filter/TestRegExpRowFilter.java @@ -0,0 +1,81 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ +package org.apache.hadoop.hbase.filter; + +import java.util.Map; +import java.util.TreeMap; + +import junit.framework.TestCase; + +import org.apache.hadoop.io.Text; + +public class TestRegExpRowFilter extends TestCase { + TreeMap<Text, byte []> colvalues; + RowFilterInterface filter; + final char FIRST_CHAR = 'a'; + final char LAST_CHAR = 'e'; + byte [] GOOD_BYTES = "abc".getBytes(); + final String HOST_PREFIX = "org.apache.site-"; + + @Override + protected void setUp() throws Exception { + super.setUp(); + this.colvalues = new TreeMap<Text, byte []>(); + for (char c = FIRST_CHAR; c < LAST_CHAR; c++) { + colvalues.put(new Text(new String(new char [] {c})), GOOD_BYTES); + } + this.filter = new RegExpRowFilter(HOST_PREFIX + ".*", colvalues); + } + + public void testRegexOnRow() throws Exception { + for (char c = FIRST_CHAR; c <= LAST_CHAR; c++) { + Text t = createRow(c); + assertFalse("Failed with character " + c, filter.filter(t)); + } + String yahooSite = "com.yahoo.www"; + assertTrue("Failed with row " + + yahooSite, filter.filter(new Text(yahooSite))); + } + + public void testRegexOnRowAndColumn() throws Exception { + for (char c = FIRST_CHAR; c <= LAST_CHAR; c++) { + Text t = createRow(c); + for (Map.Entry<Text, byte []> e: this.colvalues.entrySet()) { + assertFalse("Failed on " + c, + this.filter.filter(t, e.getKey(), e.getValue())); + } + } + // Try a row and column I know will pass. + char c = 'c'; + Text r = createRow(c); + Text col = new Text(Character.toString(c)); + assertFalse("Failed with character " + c, + filter.filter(r, col, GOOD_BYTES)); + // Do same but with bad bytes. + assertTrue("Failed with character " + c, + filter.filter(r, col, "badbytes".getBytes())); + // Do with good bytes but bad column name. Should not filter out. + assertFalse("Failed with character " + c, + filter.filter(r, new Text("badcolumn"), GOOD_BYTES)); + // Good bytes but bad row; filtered regardless of column. + assertTrue("Failed with character " + c, + filter.filter(new Text("bad row"), new Text("badcolumn"), GOOD_BYTES)); + } + + private Text createRow(final char c) { + return new Text(HOST_PREFIX + Character.toString(c)); + } +}
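
A few usage sketches follow; none of them are part of the patch itself. First, client-side paging: a minimal sketch of how the new three-argument HClient.obtainScanner might be driven with a PageRowFilter. The surrounding client calls (an HClient that has already had openTable called on it, next()/close() on HScannerInterface, the HStoreKey holder) are the pre-existing client API as assumed here, not something this patch adds, and the table and column names are invented for illustration.

import java.io.IOException;
import java.util.TreeMap;

import org.apache.hadoop.hbase.HClient;
import org.apache.hadoop.hbase.HScannerInterface;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.filter.PageRowFilter;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.io.Text;

public class PagedScanSketch {
  /**
   * Scans at most pageSize rows from a client that is assumed to have
   * already opened the target table.
   */
  static void pagedScan(final HClient client, final int pageSize)
  throws IOException {
    // The pager is applied independently inside each region, so it bounds
    // the work per region server but not the global row count; the client
    // still caps the total itself, as the PageRowFilter javadoc advises.
    RowFilterInterface filter = new PageRowFilter(pageSize);
    Text[] columns = { new Text("info:") };
    // Empty Text means start at the beginning of the table.
    HScannerInterface scanner =
      client.obtainScanner(columns, new Text(""), filter);
    try {
      HStoreKey key = new HStoreKey();
      TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
      int printed = 0;
      while (printed < pageSize && scanner.next(key, results)) {
        System.out.println(key.getRow() + ": " + results.keySet());
        printed++;
        results.clear();
      }
    } finally {
      scanner.close();
    }
  }
}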
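
Next, configuring a RegExpRowFilter and honoring its validate() contract. Everything used here is defined by the patch; the row-key pattern, column names, and values are made up for the example.

import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.hbase.filter.RegExpRowFilter;
import org.apache.hadoop.io.Text;

public class RegExpFilterSketch {
  public static void main(String[] args) {
    // Pass only rows whose key starts with "org.apache." and whose
    // "info:status" column equals "live".  Multiple column criteria are
    // ANDed together, per the class javadoc.
    Map<Text, byte[]> criteria = new TreeMap<Text, byte[]>();
    criteria.put(new Text("info:status"), "live".getBytes());
    RegExpRowFilter filter = new RegExpRowFilter("org\\.apache\\..*", criteria);

    // The scanner must actually fetch every column the filter inspects;
    // validate() throws InvalidRowFilterException if the scan's column
    // list is missing one of them.
    filter.validate(new Text[] { new Text("info:status") });

    // filter(...) answers "should this be skipped?"
    Text row = new Text("org.apache.hadoop");
    System.out.println(filter.filter(new Text("com.example.www")));      // true: key does not match
    System.out.println(
      filter.filter(row, new Text("info:status"), "dead".getBytes()));   // true: value differs
    System.out.println(
      filter.filter(row, new Text("info:status"), "live".getBytes()));   // false: row and value pass
  }
}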
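
A null column criterion means the column must be absent or carry HConstants.DELETE_BYTES; filterNotNull() is the hook HScanner uses to apply that to a whole row's worth of values. A small sketch, again using only what the patch defines:

import java.util.TreeMap;

import org.apache.hadoop.hbase.filter.RegExpRowFilter;
import org.apache.hadoop.io.Text;

public class NullColumnCriterionSketch {
  public static void main(String[] args) {
    // Criterion: the "info:banned" column must be null for a row to pass.
    RegExpRowFilter filter = new RegExpRowFilter();
    filter.setColumnFilter(new Text("info:banned"), null);

    // A row carrying a real value in that column is rejected...
    TreeMap<Text, byte[]> row = new TreeMap<Text, byte[]>();
    row.put(new Text("info:banned"), "yes".getBytes());
    System.out.println(filter.filterNotNull(row));                        // true: filter it out

    // ...while a row without the column passes.
    System.out.println(filter.filterNotNull(new TreeMap<Text, byte[]>())); // false
  }
}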
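
Filters compose through RowFilterSet. The sketch below ANDs a key regexp with a pager; AND is the default operator, and since the Operator enum is package-private as written, code outside org.apache.hadoop.hbase.filter would stick to the one-argument constructor as done here. Reporting accepted rows back through acceptedRow() is what keeps the stateful pager counting correctly.

import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.hbase.filter.PageRowFilter;
import org.apache.hadoop.hbase.filter.RegExpRowFilter;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.filter.RowFilterSet;
import org.apache.hadoop.io.Text;

public class FilterSetSketch {
  public static void main(String[] args) {
    Set<RowFilterInterface> filters = new HashSet<RowFilterInterface>();
    filters.add(new RegExpRowFilter("org\\.apache\\..*"));
    filters.add(new PageRowFilter(100));

    // Default operator is AND: a row passes only if every member passes.
    RowFilterInterface set = new RowFilterSet(filters);

    Text row = new Text("org.apache.hbase");
    if (!set.filter(row)) {
      // Tell every member the row was kept so the pager can count it.
      set.acceptedRow(row);
    }
  }
}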
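
The filter travels with the openScanner/obtainScanner call as a Writable, so it has to survive a write()/readFields() round trip starting from the no-argument constructor. A self-contained sketch of that round trip, using plain java.io streams rather than the actual RPC machinery:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.filter.PageRowFilter;

public class FilterSerializationSketch {
  public static void main(String[] args) throws IOException {
    // Client side: serialize the filter through its Writable write() method.
    PageRowFilter sent = new PageRowFilter(25);
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    sent.write(new DataOutputStream(bytes));

    // Server side: the region server starts from a default-constructed
    // instance and restores its state with readFields().
    PageRowFilter received = new PageRowFilter();
    received.readFields(
      new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    // 'received' now carries the same page size as 'sent'.
  }
}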
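
Finally, the RowFilterInterface contract itself: reset/acceptedRow for per-scan state, filterAllRemaining for early termination, the two filter() overloads, filterNotNull for column criteria, validate() against the scan's column list, plus Writable for the RPC hop. A hypothetical prefix filter, not part of the patch and with the interface's generic parameters written out as assumed, showing the minimum each method has to do:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.TreeMap;

import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.io.Text;

/** Hypothetical filter passing only rows whose key starts with a prefix. */
public class PrefixRowFilter implements RowFilterInterface {
  private Text prefix = new Text();

  /** Required for RPC deserialization. */
  public PrefixRowFilter() {
    super();
  }

  public PrefixRowFilter(final Text prefix) {
    this.prefix = prefix;
  }

  public void reset() {
    // Stateless between rows; nothing to reset.
  }

  public void acceptedRow(final Text key) {
    // Does not need to track accepted rows.
  }

  public boolean filterAllRemaining() {
    // Never tells the scanner it can stop early.
    return false;
  }

  public boolean filter(final Text rowKey) {
    return rowKey != null &&
      !rowKey.toString().startsWith(prefix.toString());
  }

  public boolean filter(final Text rowKey, final Text colKey,
      final byte[] data) {
    // Decision depends only on the row key.
    return filter(rowKey);
  }

  public boolean filterNotNull(final TreeMap<Text, byte[]> columns) {
    // No column criteria, so no row is rejected on column grounds.
    return false;
  }

  public void validate(final Text[] columns) {
    // No column criteria to check against the scanner's column list.
  }

  public void readFields(final DataInput in) throws IOException {
    prefix.readFields(in);
  }

  public void write(final DataOutput out) throws IOException {
    prefix.write(out);
  }
}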