HADOOP-1637 ] Fix to HScanner to Support Filters, Add Filter Tests to
TestScanner2 git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@558897 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
377bf72458
commit
43e253359a
|
@ -71,3 +71,5 @@ Trunk (unreleased changes)
|
|||
(Izaak Rubin via Stack)
|
||||
46. HADOOP-1579 Add new WhileMatchRowFilter and StopRowFilter filters
|
||||
(Izaak Rubin via Stack)
|
||||
47. HADOOP-1637 Fix to HScanner to Support Filters, Add Filter Tests to
|
||||
TestScanner2 (Izaak Rubin via Stack)
|
||||
|
|
|
@ -1339,34 +1339,34 @@ public class HRegion implements HConstants {
|
|||
try {
|
||||
HInternalScannerInterface scanner =
|
||||
memcache.getScanner(timestamp, cols, firstRow);
|
||||
if(scanner.isWildcardScanner()) {
|
||||
if (scanner.isWildcardScanner()) {
|
||||
this.wildcardMatch = true;
|
||||
}
|
||||
if(scanner.isMultipleMatchScanner()) {
|
||||
if (scanner.isMultipleMatchScanner()) {
|
||||
this.multipleMatchers = true;
|
||||
}
|
||||
scanners[0] = scanner;
|
||||
|
||||
for(int i = 0; i < stores.length; i++) {
|
||||
for (int i = 0; i < stores.length; i++) {
|
||||
scanner = stores[i].getScanner(timestamp, cols, firstRow);
|
||||
if(scanner.isWildcardScanner()) {
|
||||
if (scanner.isWildcardScanner()) {
|
||||
this.wildcardMatch = true;
|
||||
}
|
||||
if(scanner.isMultipleMatchScanner()) {
|
||||
if (scanner.isMultipleMatchScanner()) {
|
||||
this.multipleMatchers = true;
|
||||
}
|
||||
scanners[i + 1] = scanner;
|
||||
}
|
||||
|
||||
} catch(IOException e) {
|
||||
for(int i = 0; i < this.scanners.length; i++) {
|
||||
for (int i = 0; i < this.scanners.length; i++) {
|
||||
if(scanners[i] != null) {
|
||||
closeScanner(i);
|
||||
}
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
for(int i = 0; i < scanners.length; i++) {
|
||||
for (int i = 0; i < scanners.length; i++) {
|
||||
keys[i] = new HStoreKey();
|
||||
resultSets[i] = new TreeMap<Text, byte []>();
|
||||
if(scanners[i] != null && !scanners[i].next(keys[i], resultSets[i])) {
|
||||
|
@ -1428,9 +1428,8 @@ public class HRegion implements HConstants {
|
|||
&& moreToFollow)
|
||||
&& (keys[i].getRow().compareTo(chosenRow) == 0)) {
|
||||
// If we are doing a wild card match or there are multiple
|
||||
// matchers
|
||||
// per column, we need to scan all the older versions of this row
|
||||
// to pick up the rest of the family members
|
||||
// matchers per column, we need to scan all the older versions of
|
||||
// this row to pick up the rest of the family members
|
||||
|
||||
if (!wildcardMatch
|
||||
&& !multipleMatchers
|
||||
|
@ -1469,19 +1468,21 @@ public class HRegion implements HConstants {
|
|||
closeScanner(i);
|
||||
}
|
||||
}
|
||||
|
||||
// If the current scanner is non-null AND has a lower-or-equal
|
||||
// row label, then its timestamp is bad. We need to advance it.
|
||||
while ((scanners[i] != null) &&
|
||||
(keys[i].getRow().compareTo(chosenRow) <= 0)) {
|
||||
resultSets[i].clear();
|
||||
if (!scanners[i].next(keys[i], resultSets[i])) {
|
||||
closeScanner(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < scanners.length; i++) {
|
||||
// If the current scanner is non-null AND has a lower-or-equal
|
||||
// row label, then its timestamp is bad. We need to advance it.
|
||||
while ((scanners[i] != null) &&
|
||||
(keys[i].getRow().compareTo(chosenRow) <= 0)) {
|
||||
resultSets[i].clear();
|
||||
if (!scanners[i].next(keys[i], resultSets[i])) {
|
||||
closeScanner(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
moreToFollow = chosenTimestamp > 0;
|
||||
|
||||
if (dataFilter != null) {
|
||||
|
@ -1492,7 +1493,10 @@ public class HRegion implements HConstants {
|
|||
moreToFollow = false;
|
||||
LOG.debug("page limit");
|
||||
}
|
||||
}
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("ROWKEY = " + chosenRow + ", FILTERED = " + filtered);
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure scanners closed if no more results
|
||||
|
@ -1507,7 +1511,7 @@ public class HRegion implements HConstants {
|
|||
return moreToFollow;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/** Shut down a single scanner */
|
||||
void closeScanner(int i) {
|
||||
try {
|
||||
|
|
|
@ -21,12 +21,21 @@ package org.apache.hadoop.hbase;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.filter.RegExpRowFilter;
|
||||
import org.apache.hadoop.hbase.filter.RowFilterInterface;
|
||||
import org.apache.hadoop.hbase.filter.RowFilterSet;
|
||||
import org.apache.hadoop.hbase.filter.StopRowFilter;
|
||||
import org.apache.hadoop.hbase.filter.WhileMatchRowFilter;
|
||||
import org.apache.hadoop.hbase.io.KeyedData;
|
||||
import org.apache.hadoop.io.Text;
|
||||
|
||||
|
@ -39,6 +48,109 @@ import org.apache.hadoop.io.Text;
|
|||
public class TestScanner2 extends HBaseClusterTestCase {
|
||||
final Log LOG = LogFactory.getLog(this.getClass().getName());
|
||||
|
||||
final char FIRST_ROWKEY = 'a';
|
||||
final char FIRST_BAD_RANGE_ROWKEY = 'j';
|
||||
final char LAST_BAD_RANGE_ROWKEY = 'q';
|
||||
final char LAST_ROWKEY = 'z';
|
||||
final char FIRST_COLKEY = '0';
|
||||
final char LAST_COLKEY = '3';
|
||||
final byte[] GOOD_BYTES = "goodstuff".getBytes();
|
||||
final byte[] BAD_BYTES = "badstuff".getBytes();
|
||||
|
||||
/**
|
||||
* Test the scanner's handling of various filters.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
public void testScannerFilter() throws Exception {
|
||||
// Setup HClient, ensure that it is running correctly
|
||||
HClient client = new HClient(this.conf);
|
||||
|
||||
// Setup colkeys to be inserted
|
||||
HTableDescriptor htd = new HTableDescriptor(getName());
|
||||
Text tableName = new Text(getName());
|
||||
Text[] colKeys = new Text[(int)(LAST_COLKEY - FIRST_COLKEY) + 1];
|
||||
for (char i = 0; i < colKeys.length; i++) {
|
||||
colKeys[i] = new Text(new String(new char[] {
|
||||
(char)(FIRST_COLKEY + i), ':' }));
|
||||
htd.addFamily(new HColumnDescriptor(colKeys[i].toString()));
|
||||
}
|
||||
client.createTable(htd);
|
||||
assertTrue("Table with name " + tableName + " created successfully.",
|
||||
client.tableExists(tableName));
|
||||
assertTrue("Master is running.", client.isMasterRunning());
|
||||
|
||||
// Enter data
|
||||
client.openTable(tableName);
|
||||
for (char i = FIRST_ROWKEY; i <= LAST_ROWKEY; i++) {
|
||||
Text rowKey = new Text(new String(new char[] { i }));
|
||||
long lockID = client.startUpdate(rowKey);
|
||||
for (char j = 0; j < colKeys.length; j++) {
|
||||
client.put(lockID, colKeys[j], (i >= FIRST_BAD_RANGE_ROWKEY &&
|
||||
i <= LAST_BAD_RANGE_ROWKEY)? BAD_BYTES : GOOD_BYTES);
|
||||
}
|
||||
client.commit(lockID);
|
||||
}
|
||||
|
||||
regExpFilterTest(client, colKeys);
|
||||
rowFilterSetTest(client, colKeys);
|
||||
}
|
||||
|
||||
private void regExpFilterTest(HClient client, Text[] colKeys)
|
||||
throws Exception {
|
||||
// Get the filter. The RegExpRowFilter used should filter out vowels.
|
||||
Map<Text, byte[]> colCriteria = new TreeMap<Text, byte[]>();
|
||||
for (int i = 0; i < colKeys.length; i++) {
|
||||
colCriteria.put(colKeys[i], GOOD_BYTES);
|
||||
}
|
||||
RowFilterInterface filter = new RegExpRowFilter("[^aeiou]", colCriteria);
|
||||
|
||||
// Create the scanner from the filter.
|
||||
HScannerInterface scanner = client.obtainScanner(colKeys, new Text(new
|
||||
String(new char[] { FIRST_ROWKEY })), filter);
|
||||
|
||||
// Iterate over the scanner, ensuring that results match the passed regex.
|
||||
iterateOnScanner(scanner, "[^aei-qu]");
|
||||
}
|
||||
|
||||
private void rowFilterSetTest(HClient client, Text[] colKeys)
|
||||
throws Exception {
|
||||
// Get the filter. The RegExpRowFilter used should filter out vowels and
|
||||
// the WhileMatchRowFilter(StopRowFilter) should filter out all rows
|
||||
// greater than or equal to 'r'.
|
||||
Set<RowFilterInterface> filterSet = new HashSet<RowFilterInterface>();
|
||||
filterSet.add(new RegExpRowFilter("[^aeiou]"));
|
||||
filterSet.add(new WhileMatchRowFilter(new StopRowFilter(new Text("r"))));
|
||||
RowFilterInterface filter =
|
||||
new RowFilterSet(RowFilterSet.Operator.MUST_PASS_ALL, filterSet);
|
||||
|
||||
// Create the scanner from the filter.
|
||||
HScannerInterface scanner = client.obtainScanner(colKeys, new Text(new
|
||||
String(new char[] { FIRST_ROWKEY })), filter);
|
||||
|
||||
// Iterate over the scanner, ensuring that results match the passed regex.
|
||||
iterateOnScanner(scanner, "[^aeior-z]");
|
||||
}
|
||||
|
||||
private void iterateOnScanner(HScannerInterface scanner, String regexToMatch)
|
||||
throws Exception {
|
||||
// A pattern that will only match rows that should not have been filtered.
|
||||
Pattern p = Pattern.compile(regexToMatch);
|
||||
|
||||
try {
|
||||
// Use the scanner to ensure all results match the above pattern.
|
||||
HStoreKey rowKey = new HStoreKey();
|
||||
TreeMap<Text, byte[]> columns = new TreeMap<Text, byte[]>();
|
||||
while (scanner.next(rowKey, columns)) {
|
||||
String key = rowKey.getRow().toString();
|
||||
assertTrue("Shouldn't have extracted '" + key + "'",
|
||||
p.matcher(key).matches());
|
||||
}
|
||||
} finally {
|
||||
scanner.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test scanning of META table around split.
|
||||
* There was a problem where only one of the splits showed in a scan.
|
||||
|
|
Loading…
Reference in New Issue