HBASE-476 RegexpRowFilter behaves incorrectly when there are multiple store files (Clint Morgan via Jim Kellerman)
HBASE-527 RegexpRowFilter does not work when there are columns from multiple families (Clint Morgan via Jim Kellerman)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@640106 13f79535-47bb-0310-9956-ffa450edef68
parent e7b705a808
commit aabe9f09a9
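The substance of the patch, stripped of the diff noise below: filterNotNull(...) throughout the filter package now takes a SortedMap<Text, byte[]> rather than a TreeMap, HRegion's HScanner evaluates it against the row assembled from every store (re-invoking next() when the assembled row is rejected), each per-store scanner receives its own WritableUtils.clone of the filter so stateful filters such as WhileMatchRowFilter are not shared across scanners, and HStoreScanner no longer applies filterNotNull to a single store's partial results. The sketch that follows is a self-contained toy of the assemble-then-filter idea only; every name in it (FilterNotNullSketch, RowFilter, ColumnValueFilter, rowPasses) is invented for illustration and is not part of the HBase API.

import java.util.Arrays;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

/**
 * Toy model (not HBase code): each store / column family holds only its slice
 * of a row, so null-column criteria can be checked correctly only after the
 * slices have been merged into one SortedMap -- which is what the patched
 * region-level scanner does before calling filterNotNull.
 */
public class FilterNotNullSketch {

  /** Stand-in for the filterNotNull(SortedMap) callback in the patch. */
  interface RowFilter {
    /** Returns true if the assembled row should be filtered out. */
    boolean filterNotNull(SortedMap<String, byte[]> assembledRow);
  }

  /** Filters out rows whose given column is missing or carries the wrong value. */
  static class ColumnValueFilter implements RowFilter {
    private final String column;
    private final byte[] expected;

    ColumnValueFilter(String column, byte[] expected) {
      this.column = column;
      this.expected = expected;
    }

    @Override
    public boolean filterNotNull(SortedMap<String, byte[]> row) {
      byte[] actual = row.get(column);
      return actual == null || !Arrays.equals(actual, expected);
    }
  }

  /** Merge the per-store slices, then apply the filter exactly once. */
  static boolean rowPasses(List<SortedMap<String, byte[]>> storeSlices, RowFilter filter) {
    SortedMap<String, byte[]> assembled = new TreeMap<String, byte[]>();
    for (SortedMap<String, byte[]> slice : storeSlices) {
      assembled.putAll(slice);
    }
    return !filter.filterNotNull(assembled);
  }

  public static void main(String[] args) {
    SortedMap<String, byte[]> familyA = new TreeMap<String, byte[]>();
    familyA.put("A:col1", "HELLO".getBytes());
    SortedMap<String, byte[]> familyB = new TreeMap<String, byte[]>();
    familyB.put("B:col2", "00001".getBytes());

    RowFilter filter = new ColumnValueFilter("A:col1", "HELLO".getBytes());

    // Filtering family B's slice alone drops the row (the old per-store
    // behaviour); the row assembled across both families passes.
    System.out.println(rowPasses(Arrays.asList(familyB), filter));          // false
    System.out.println(rowPasses(Arrays.asList(familyA, familyB), filter)); // true
  }
}

Run standalone, the first check rejects the row because family B's slice alone lacks A:col1, which is exactly the partial-row behaviour HBASE-527 removes.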
CHANGES.txt
@@ -50,6 +50,10 @@ Hbase Change Log
    HBASE-531 Merge tool won't merge two overlapping regions (port HBASE-483 to
              trunk)
    HBASE-537 Wait for hdfs to exit safe mode
+   HBASE-476 RegexpRowFilter behaves incorectly when there are multiple store
+             files (Clint Morgan via Jim Kellerman)
+   HBASE-527 RegexpRowFilter does not work when there are columns from
+             multiple families (Clint Morgan via Jim Kellerman)
 
 IMPROVEMENTS
    HBASE-415 Rewrite leases to use DelayedBlockingQueue instead of polling
InclusiveStopRowFilter.java
@@ -19,17 +19,9 @@
  */
 package org.apache.hadoop.hbase.filter;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.TreeMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
 
-
-/*
+/**
  * Subclass of StopRowFilter that filters rows > the stop row,
  * making it include up to the last row but no further.
  */
@@ -49,6 +41,8 @@ public class InclusiveStopRowFilter extends StopRowFilter{
     super(stopRowKey);
   }
 
+  /** {@inheritDoc} */
+  @Override
   public boolean filter(final Text rowKey) {
     if (rowKey == null) {
       if (this.stopRowKey == null) {
PageRowFilter.java
@@ -22,7 +22,7 @@ package org.apache.hadoop.hbase.filter;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.TreeMap;
+import java.util.SortedMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -137,7 +137,7 @@ public class PageRowFilter implements RowFilterInterface {
    * {@inheritDoc}
    */
   public boolean filterNotNull(@SuppressWarnings("unused")
-      final TreeMap<Text, byte[]> columns) {
+      final SortedMap<Text, byte[]> columns) {
     return filterAllRemaining();
   }
 
RegExpRowFilter.java
@@ -27,7 +27,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
+import java.util.SortedMap;
 import java.util.Map.Entry;
 import java.util.regex.Pattern;
 
@@ -196,7 +196,7 @@ public class RegExpRowFilter implements RowFilterInterface {
    *
    * {@inheritDoc}
    */
-  public boolean filterNotNull(final TreeMap<Text, byte[]> columns) {
+  public boolean filterNotNull(final SortedMap<Text, byte[]> columns) {
     for (Entry<Text, byte[]> col : columns.entrySet()) {
       if (nullColumns.contains(col.getKey())
           && !HLogEdit.isDeleted(col.getValue())) {
@@ -211,7 +211,7 @@ public class RegExpRowFilter implements RowFilterInterface {
       if (!columns.containsKey(col)) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("filterNotNull returning true for colKey: " + col +
-            ", column not found in given TreeMap<Text, byte[]>.");
+            ", column not found in given SortedMap<Text, byte[]>.");
         }
         return true;
       }
RowFilterInterface.java
@@ -19,7 +19,7 @@
  */
 package org.apache.hadoop.hbase.filter;
 
-import java.util.TreeMap;
+import java.util.SortedMap;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -108,7 +108,7 @@ public interface RowFilterInterface extends Writable {
    * @param columns
    * @return true if null/non-null criteria not met.
    */
-  boolean filterNotNull(final TreeMap<Text, byte[]> columns);
+  boolean filterNotNull(final SortedMap<Text, byte[]> columns);
 
   /**
    * Validates that this filter applies only to a subset of the given columns.
RowFilterSet.java
@@ -24,7 +24,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
-import java.util.TreeMap;
+import java.util.SortedMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -232,7 +232,7 @@ public class RowFilterSet implements RowFilterInterface {
   }
 
   /** {@inheritDoc} */
-  public boolean filterNotNull(final TreeMap<Text, byte[]> columns) {
+  public boolean filterNotNull(final SortedMap<Text, byte[]> columns) {
     boolean resultFound = false;
     boolean result = operator == Operator.MUST_PASS_ONE;
     for (RowFilterInterface filter : filters) {
StopRowFilter.java
@@ -22,7 +22,7 @@ package org.apache.hadoop.hbase.filter;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.TreeMap;
+import java.util.SortedMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -132,7 +132,7 @@ public class StopRowFilter implements RowFilterInterface {
    * @param columns
    */
   public boolean filterNotNull(@SuppressWarnings("unused")
-      final TreeMap<Text, byte[]> columns) {
+      final SortedMap<Text, byte[]> columns) {
     return filterAllRemaining();
   }
 
WhileMatchRowFilter.java
@@ -22,7 +22,7 @@ package org.apache.hadoop.hbase.filter;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.TreeMap;
+import java.util.SortedMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,7 +31,7 @@ import org.apache.hadoop.io.Text;
 /**
  * WhileMatchRowFilter is a wrapper filter that filters everything after the
  * first filtered row. Once the nested filter returns true for either of it's
- * filter(..) methods or filterNotNull(TreeMap<Text, byte[]>), this wrapper's
+ * filter(..) methods or filterNotNull(SortedMap<Text, byte[]>), this wrapper's
  * filterAllRemaining() will return true. All filtering methods will
  * thereafter defer to the result of filterAllRemaining().
  */
@@ -115,7 +115,7 @@ public class WhileMatchRowFilter implements RowFilterInterface {
   }
 
   /** {@inheritDoc} */
-  public boolean filterNotNull(final TreeMap<Text, byte[]> columns) {
+  public boolean filterNotNull(final SortedMap<Text, byte[]> columns) {
     changeFAR(this.filter.filterNotNull(columns));
     boolean result = filterAllRemaining();
     if (LOG.isDebugEnabled()) {
HRegion.java
@@ -1721,12 +1721,14 @@ public class HRegion implements HConstants {
     private HInternalScannerInterface[] scanners;
     private TreeMap<Text, byte []>[] resultSets;
     private HStoreKey[] keys;
+    private RowFilterInterface filter;
 
     /** Create an HScanner with a handle on many HStores. */
     @SuppressWarnings("unchecked")
     HScanner(Text[] cols, Text firstRow, long timestamp, HStore[] stores,
       RowFilterInterface filter)
     throws IOException {
+      this.filter = filter;
       this.scanners = new HInternalScannerInterface[stores.length];
       try {
         for (int i = 0; i < stores.length; i++) {
@@ -1737,7 +1739,7 @@ public class HRegion implements HConstants {
           // At least WhileMatchRowFilter will mess up the scan if only
           // one shared across many rows. See HADOOP-2467.
           scanners[i] = stores[i].getScanner(timestamp, cols, firstRow,
-            (i > 0 && filter != null)?
+            filter != null ?
               (RowFilterInterface)WritableUtils.clone(filter, conf) : filter);
         }
       } catch(IOException e) {
@@ -1839,6 +1841,12 @@ public class HRegion implements HConstants {
           }
         }
       }
+
+      if (filter != null && filter.filterNotNull(results)) {
+        LOG.warn("Filter return true on assembled Results in hstore");
+        return moreToFollow == true && this.next(key, results);
+      }
+
       return moreToFollow;
     }
 
HStoreScanner.java
@@ -32,7 +32,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.hbase.HStoreKey;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.HConstants;
 
@@ -167,11 +166,6 @@ class HStoreScanner implements HInternalScannerInterface {
           break;
         }
 
-        // Filter out null criteria columns that are not null
-        if (dataFilter != null) {
-          filtered = dataFilter.filterNotNull(resultSets[i]);
-        }
-
         // NOTE: We used to do results.putAll(resultSets[i]);
         // but this had the effect of overwriting newer
         // values with older ones. So now we only insert
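The two new tests below target exactly these cases: TestRowFilterAfterWrite forces data out to multiple store files (small memcache flush size, raised compaction threshold) before scanning with a RegExpRowFilter (HBASE-476), while TestRowFilterOnMultipleFamilies spreads the filtered columns across two column families (HBASE-527); both assert that a filtered scan still returns every row.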
TestRowFilterAfterWrite.java (new file)
@@ -0,0 +1,225 @@
/**
 * Copyright 2008 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.filter;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

import junit.framework.Assert;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseClusterTestCase;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HScannerInterface;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.io.Text;

/** Test regexp filters HBASE-476 */
public class TestRowFilterAfterWrite extends HBaseClusterTestCase {

  @SuppressWarnings("hiding")
  private static final Log LOG = LogFactory.getLog(TestRowFilterAfterWrite.class.getName());

  static final String TABLE_NAME = "TestTable";
  static final String FAMILY = "C:";
  static final String COLUMN1 = FAMILY + "col1";
  static final Text TEXT_COLUMN1 = new Text(COLUMN1);
  static final String COLUMN2 = FAMILY + "col2";
  static final Text TEXT_COLUMN2 = new Text(COLUMN2);

  private static final Text[] columns = {
    TEXT_COLUMN1, TEXT_COLUMN2
  };

  private static final int NUM_ROWS = 10;
  private static final int VALUE_SIZE = 1000;
  private static final byte[] VALUE = new byte[VALUE_SIZE];
  private static final int COL_2_SIZE = 5;
  private static final int KEY_SIZE = 9;
  private static final int NUM_REWRITES = 10;
  static {
    Arrays.fill(VALUE, (byte) 'a');
  }

  /** constructor */
  public TestRowFilterAfterWrite() {
    super();

    // Make sure the cache gets flushed so we get multiple stores
    conf.setInt("hbase.hregion.memcache.flush.size", (NUM_ROWS * (VALUE_SIZE + COL_2_SIZE + KEY_SIZE)));
    LOG.info("memcach flush : " + conf.get("hbase.hregion.memcache.flush.size"));
    conf.setInt("hbase.regionserver.optionalcacheflushinterval", 100000000);
    // Avoid compaction to keep multiple stores
    conf.setInt("hbase.hstore.compactionThreshold", 10000);

    // Make lease timeout longer, lease checks less frequent
    conf.setInt("hbase.master.lease.period", 10 * 1000);
    conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000);

    // For debugging
    conf.setInt("hbase.regionserver.lease.period", 20 * 60 * 1000);
    conf.setInt("ipc.client.timeout", 20 * 60 * 1000);
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void setUp() throws Exception {
    // this.conf.set(HConstants.HBASE_DIR, "file:///opt/benchmark/hadoop/hbase");
    this.conf.set(HConstants.MASTER_ADDRESS, "0.0.0.0:60100");
    // Must call super.setup() after starting mini dfs cluster. Otherwise
    // we get a local file system instead of hdfs

    super.setUp();
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void tearDown() throws Exception {
    super.tearDown();
  }

  /**
   * Test hbase mapreduce jobs against single region and multi-region tables.
   *
   * @throws IOException
   * @throws InterruptedException
   */
  public void testAfterWrite() throws IOException, InterruptedException {
    singleTableTest();
  }

  /*
   * Test against a single region. @throws IOException
   */
  private void singleTableTest() throws IOException, InterruptedException {
    HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
    desc.addFamily(new HColumnDescriptor(FAMILY));

    // Create a table.
    HBaseAdmin admin = new HBaseAdmin(this.conf);
    admin.createTable(desc);

    // insert some data into the test table
    HTable table = new HTable(conf, new Text(TABLE_NAME));

    for (int i = 0; i < NUM_ROWS; i++) {
      BatchUpdate b =
        new BatchUpdate(new Text("row_" + String.format("%1$05d", i)));

      b.put(TEXT_COLUMN1, VALUE);
      b.put(TEXT_COLUMN2, String.format("%1$05d", i).getBytes());
      table.commit(b);
    }

    // LOG.info("Print table contents using scanner before map/reduce for " + TABLE_NAME);
    // scanTable(TABLE_NAME, false);
    // LOG.info("Print table contents using scanner+filter before map/reduce for " + TABLE_NAME);
    // scanTableWithRowFilter(TABLE_NAME, false);

    // Do some identity write operations on one column of the data.
    for (int n = 0; n < NUM_REWRITES; n++) {
      for (int i = 0; i < NUM_ROWS; i++) {
        BatchUpdate b =
          new BatchUpdate(new Text("row_" + String.format("%1$05d", i)));

        b.put(TEXT_COLUMN2, String.format("%1$05d", i).getBytes());
        table.commit(b);
      }
    }

    // Wait for the flush to happen
    LOG.info("Waiting, for flushes to complete");
    Thread.sleep(5 * 1000);
    // Wait for the flush to happen
    LOG.info("Done. No flush should happen after this");

    // Do another round so to populate the mem cache
    for (int i = 0; i < NUM_ROWS; i++) {
      BatchUpdate b =
        new BatchUpdate(new Text("row_" + String.format("%1$05d", i)));

      b.put(TEXT_COLUMN2, String.format("%1$05d", i).getBytes());
      table.commit(b);
    }

    LOG.info("Print table contents using scanner after map/reduce for " + TABLE_NAME);
    scanTable(TABLE_NAME, true);
    LOG.info("Print table contents using scanner+filter after map/reduce for " + TABLE_NAME);
    scanTableWithRowFilter(TABLE_NAME, true);
  }

  private void scanTable(final String tableName, final boolean printValues) throws IOException {
    HTable table = new HTable(conf, new Text(tableName));

    HScannerInterface scanner = table.obtainScanner(columns, HConstants.EMPTY_START_ROW);
    int numFound = doScan(scanner, printValues);
    Assert.assertEquals(NUM_ROWS, numFound);
  }

  private void scanTableWithRowFilter(final String tableName, final boolean printValues) throws IOException {
    HTable table = new HTable(conf, new Text(tableName));
    Map<Text, byte[]> columnMap = new HashMap<Text, byte[]>();
    columnMap.put(TEXT_COLUMN1, VALUE);
    RegExpRowFilter filter = new RegExpRowFilter(null, columnMap);
    HScannerInterface scanner = table.obtainScanner(columns, HConstants.EMPTY_START_ROW, filter);
    int numFound = doScan(scanner, printValues);
    Assert.assertEquals(NUM_ROWS, numFound);
  }

  private int doScan(final HScannerInterface scanner, final boolean printValues) throws IOException {
    {
      int count = 0;

      try {
        HStoreKey key = new HStoreKey();
        TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
        while (scanner.next(key, results)) {
          if (printValues) {
            LOG.info("row: " + key.getRow());

            for (Map.Entry<Text, byte[]> e : results.entrySet()) {
              LOG.info(" column: " + e.getKey() + " value: "
                  + new String(e.getValue(), HConstants.UTF8_ENCODING));
            }
          }
          count++;
        }

      } finally {
        scanner.close();
      }
      return count;
    }
  }
}
TestRowFilterOnMultipleFamilies.java (new file)
@@ -0,0 +1,133 @@
/**
 * Copyright 2008 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.filter;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

import junit.framework.Assert;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseClusterTestCase;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HScannerInterface;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.io.Text;

/**
 * Test for regexp filters (HBASE-527)
 */
public class TestRowFilterOnMultipleFamilies extends HBaseClusterTestCase {
  private static final Log LOG = LogFactory.getLog(TestRowFilterOnMultipleFamilies.class.getName());

  static final String TABLE_NAME = "TestTable";
  static final String COLUMN1 = "A:col1";
  static final Text TEXT_COLUMN1 = new Text(COLUMN1);
  static final String COLUMN2 = "B:col2";
  static final Text TEXT_COLUMN2 = new Text(COLUMN2);

  private static final Text[] columns = {
    TEXT_COLUMN1, TEXT_COLUMN2
  };

  private static final int NUM_ROWS = 10;
  private static final byte[] VALUE = "HELLO".getBytes();

  /** @throws IOException */
  public void testMultipleFamilies() throws IOException {
    HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
    desc.addFamily(new HColumnDescriptor("A:"));
    desc.addFamily(new HColumnDescriptor("B:"));

    // Create a table.
    HBaseAdmin admin = new HBaseAdmin(this.conf);
    admin.createTable(desc);

    // insert some data into the test table
    HTable table = new HTable(conf, new Text(TABLE_NAME));

    for (int i = 0; i < NUM_ROWS; i++) {
      BatchUpdate b =
        new BatchUpdate(new Text("row_" + String.format("%1$05d", i)));
      b.put(TEXT_COLUMN1, VALUE);
      b.put(TEXT_COLUMN2, String.format("%1$05d", i).getBytes());
      table.commit(b);
    }

    LOG.info("Print table contents using scanner before map/reduce for " + TABLE_NAME);
    scanTable(TABLE_NAME, true);
    LOG.info("Print table contents using scanner+filter before map/reduce for " + TABLE_NAME);
    scanTableWithRowFilter(TABLE_NAME, true);
  }

  private void scanTable(final String tableName, final boolean printValues) throws IOException {
    HTable table = new HTable(conf, new Text(tableName));

    HScannerInterface scanner = table.obtainScanner(columns, HConstants.EMPTY_START_ROW);
    int numFound = doScan(scanner, printValues);
    Assert.assertEquals(NUM_ROWS, numFound);
  }

  private void scanTableWithRowFilter(final String tableName, final boolean printValues) throws IOException {
    HTable table = new HTable(conf, new Text(tableName));
    Map<Text, byte[]> columnMap = new HashMap<Text, byte[]>();
    columnMap.put(TEXT_COLUMN1, VALUE);
    RegExpRowFilter filter = new RegExpRowFilter(null, columnMap);
    HScannerInterface scanner = table.obtainScanner(columns, HConstants.EMPTY_START_ROW, filter);
    int numFound = doScan(scanner, printValues);
    Assert.assertEquals(NUM_ROWS, numFound);
  }

  private int doScan(final HScannerInterface scanner, final boolean printValues) throws IOException {
    {
      int count = 0;

      try {
        HStoreKey key = new HStoreKey();
        TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
        while (scanner.next(key, results)) {
          if (printValues) {
            LOG.info("row: " + key.getRow());

            for (Map.Entry<Text, byte[]> e : results.entrySet()) {
              LOG.info(" column: " + e.getKey() + " value: "
                  + new String(e.getValue(), HConstants.UTF8_ENCODING));
            }
          }
          Assert.assertEquals(2, results.size());
          count++;
        }

      } finally {
        scanner.close();
      }
      return count;
    }
  }
}