From e124054e4e8a505034802f255832d65eb3827156 Mon Sep 17 00:00:00 2001
From: Zhihong Yu
Date: Thu, 3 Apr 2014 10:13:32 +0000
Subject: [PATCH] HBASE-10850 essential column family optimization is broken

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1584335 13f79535-47bb-0310-9956-ffa450edef68
---
 .../hadoop/hbase/filter/FilterWrapper.java    |  19 +-
 .../hadoop/hbase/regionserver/HRegion.java    |   9 +-
 .../regionserver/TestSCVFWithMiniCluster.java | 242 ++++++++++++++++++
 3 files changed, 263 insertions(+), 7 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSCVFWithMiniCluster.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FilterWrapper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FilterWrapper.java
index ffc56ae3d26..a69bc42969f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FilterWrapper.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FilterWrapper.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
-import org.apache.zookeeper.KeeperException.UnimplementedException;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 
@@ -154,15 +153,29 @@ final public class FilterWrapper extends Filter {
 
   @Override
   public void filterRowCells(List<Cell> kvs) throws IOException {
+    filterRowCellsWithRet(kvs);
+  }
+
+  public enum FilterRowRetCode {
+    NOT_CALLED,
+    INCLUDE,  // corresponds to filter.filterRow() returning false
+    EXCLUDE   // corresponds to filter.filterRow() returning true
+  }
+
+  public FilterRowRetCode filterRowCellsWithRet(List<Cell> kvs) throws IOException {
     //To fix HBASE-6429,
     //Filter with filterRow() returning true is incompatible with scan with limit
     //1. hasFilterRow() returns true, if either filterRow() or filterRow(kvs) is implemented.
     //2. filterRow() is merged with filterRow(kvs),
     //so that to make all those row related filtering stuff in the same function.
     this.filter.filterRowCells(kvs);
-    if (!kvs.isEmpty() && this.filter.filterRow()) {
-      kvs.clear();
+    if (!kvs.isEmpty()) {
+      if (this.filter.filterRow()) {
+        kvs.clear();
+        return FilterRowRetCode.EXCLUDE;
+      }
+      return FilterRowRetCode.INCLUDE;
     }
+    return FilterRowRetCode.NOT_CALLED;
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 14ab2cb407b..c2c2813caa8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -3636,7 +3636,7 @@ public class HRegion implements HeapSize { // , Writable{
   // KeyValue indicating that limit is reached when scanning
   private final KeyValue KV_LIMIT = new KeyValue();
   protected final byte[] stopRow;
-  private final Filter filter;
+  private final FilterWrapper filter;
   private int batch;
   protected int isScan;
   private boolean filterClosed = false;
@@ -3916,14 +3916,15 @@ public class HRegion implements HeapSize { // , Writable{
             isStopRow(nextKv.getRowArray(), nextKv.getRowOffset(), nextKv.getRowLength());
         // save that the row was empty before filters applied to it.
         final boolean isEmptyRow = results.isEmpty();
-
+
         // We have the part of the row necessary for filtering (all of it, usually).
         // First filter with the filterRow(List).
+        FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
         if (filter != null && filter.hasFilterRow()) {
-          filter.filterRowCells(results);
+          ret = filter.filterRowCellsWithRet(results);
         }
 
-        if (isEmptyRow || filterRow()) {
+        if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) {
           results.clear();
           boolean moreRows = nextRow(currentRow, offset, length);
           if (!moreRows) return false;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSCVFWithMiniCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSCVFWithMiniCluster.java
new file mode 100644
index 00000000000..9dde1660b3e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSCVFWithMiniCluster.java
@@ -0,0 +1,242 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static junit.framework.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+/*
+ * This test verifies that the scenarios illustrated by HBASE-10850 work
+ * w.r.t. essential column family optimization
+ */
+public class TestSCVFWithMiniCluster {
+  private static final String HBASE_TABLE_NAME = "TestSCVFWithMiniCluster";
+
+  private static final byte[] FAMILY_A = Bytes.toBytes("a");
+  private static final byte[] FAMILY_B = Bytes.toBytes("b");
+
+  private static final byte[] QUALIFIER_FOO = Bytes.toBytes("foo");
+  private static final byte[] QUALIFIER_BAR = Bytes.toBytes("bar");
+
+  private HTable htable;
+
+  private Filter scanFilter;
+
+  private int expected = 1;
+
+  @Before
+  public void setUp() throws Exception {
+    HBaseTestingUtility util = new HBaseTestingUtility();
+
+    util.startMiniCluster(1);
+
+    HBaseAdmin admin = util.getHBaseAdmin();
+    destroy(admin, HBASE_TABLE_NAME);
+    create(admin, HBASE_TABLE_NAME, FAMILY_A, FAMILY_B);
+    admin.close();
+    htable = new HTable(util.getConfiguration(), HBASE_TABLE_NAME);
+
+    /* Add some values */
+    List<Put> puts = new ArrayList<Put>();
+
+    /* Add a row with 'a:foo' = false */
+    Put put = new Put(Bytes.toBytes("1"));
+    put.setDurability(Durability.SKIP_WAL);
+    put.add(FAMILY_A, QUALIFIER_FOO, Bytes.toBytes("false"));
+    put.add(FAMILY_A, QUALIFIER_BAR, Bytes.toBytes("_flag_"));
+    put.add(FAMILY_B, QUALIFIER_FOO, Bytes.toBytes("_flag_"));
+    put.add(FAMILY_B, QUALIFIER_BAR, Bytes.toBytes("_flag_"));
+    puts.add(put);
+
+    /* Add a row with 'a:foo' = true */
+    put = new Put(Bytes.toBytes("2"));
+    put.setDurability(Durability.SKIP_WAL);
+    put.add(FAMILY_A, QUALIFIER_FOO, Bytes.toBytes("true"));
+    put.add(FAMILY_A, QUALIFIER_BAR, Bytes.toBytes("_flag_"));
+    put.add(FAMILY_B, QUALIFIER_FOO, Bytes.toBytes("_flag_"));
+    put.add(FAMILY_B, QUALIFIER_BAR, Bytes.toBytes("_flag_"));
+    puts.add(put);
+
+    /* Add a row with 'a:foo' qualifier not set */
+    put = new Put(Bytes.toBytes("3"));
+    put.setDurability(Durability.SKIP_WAL);
+    put.add(FAMILY_A, QUALIFIER_BAR, Bytes.toBytes("_flag_"));
+    put.add(FAMILY_B, QUALIFIER_FOO, Bytes.toBytes("_flag_"));
+    put.add(FAMILY_B, QUALIFIER_BAR, Bytes.toBytes("_flag_"));
+    puts.add(put);
+
+    htable.put(puts);
+    /*
+     * We want to filter out from the scan all rows that do not have the column 'a:foo' with value
+     * 'false'. Only row with key '1' should be returned in the scan.
+     */
+    scanFilter = new SingleColumnValueFilter(FAMILY_A, QUALIFIER_FOO, CompareOp.EQUAL,
+      new BinaryComparator(Bytes.toBytes("false")));
+    ((SingleColumnValueFilter) scanFilter).setFilterIfMissing(true);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    htable.close();
+  }
+
+  private void verify(Scan scan) throws IOException {
+    ResultScanner scanner = htable.getScanner(scan);
+    Iterator<Result> it = scanner.iterator();
+
+    /* Then */
+    int count = 0;
+    try {
+      while (it.hasNext()) {
+        it.next();
+        count++;
+      }
+    } finally {
+      scanner.close();
+    }
+    assertEquals(expected, count);
+  }
+
+  /**
+   * Test the filter by adding all columns of family A in the scan. (OK)
+   */
+  @Test
+  public void scanWithAllQualifiersOfFamiliyA() throws IOException {
+    /* Given */
+    Scan scan = new Scan();
+    scan.addFamily(FAMILY_A);
+    scan.setFilter(scanFilter);
+
+    verify(scan);
+  }
+
+  /**
+   * Test the filter by adding all columns of family A and B in the scan. (KO: row '3' without
+   * 'a:foo' qualifier is returned)
+   */
+  @Test
+  public void scanWithAllQualifiersOfBothFamilies() throws IOException {
+    /* When */
+    Scan scan = new Scan();
+    scan.setFilter(scanFilter);
+
+    verify(scan);
+  }
+
+  /**
+   * Test the filter by adding 2 columns of family A and 1 column of family B in the scan. (KO: row
+   * '3' without 'a:foo' qualifier is returned)
+   */
+  @Test
+  public void scanWithSpecificQualifiers1() throws IOException {
+    /* When */
+    Scan scan = new Scan();
+    scan.addColumn(FAMILY_A, QUALIFIER_FOO);
+    scan.addColumn(FAMILY_A, QUALIFIER_BAR);
+    scan.addColumn(FAMILY_B, QUALIFIER_BAR);
+    scan.addColumn(FAMILY_B, QUALIFIER_FOO);
+    scan.setFilter(scanFilter);
+
+    verify(scan);
+  }
+
+  /**
+   * Test the filter by adding 1 column of family A (the one used in the filter) and 1 column of
+   * family B in the scan. (OK)
+   */
+  @Test
+  public void scanWithSpecificQualifiers2() throws IOException {
+    /* When */
+    Scan scan = new Scan();
+    scan.addColumn(FAMILY_A, QUALIFIER_FOO);
+    scan.addColumn(FAMILY_B, QUALIFIER_BAR);
+    scan.setFilter(scanFilter);
+
+    verify(scan);
+  }
+
+  /**
+   * Test the filter by adding 2 columns of family A in the scan. (OK)
+   */
+  @Test
+  public void scanWithSpecificQualifiers3() throws IOException {
+    /* When */
+    Scan scan = new Scan();
+    scan.addColumn(FAMILY_A, QUALIFIER_FOO);
+    scan.addColumn(FAMILY_A, QUALIFIER_BAR);
+    scan.setFilter(scanFilter);
+
+    verify(scan);
+  }
+
+  private static void create(HBaseAdmin admin, String tableName, byte[]... families)
+    throws IOException {
+    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+    for (byte[] family : families) {
+      HColumnDescriptor colDesc = new HColumnDescriptor(family);
+      colDesc.setMaxVersions(1);
+      colDesc.setCompressionType(Algorithm.GZ);
+      desc.addFamily(colDesc);
+    }
+    try {
+      admin.createTable(desc);
+    } catch (TableExistsException tee) {
+      /* Ignore */
+    }
+  }
+
+  private static void destroy(HBaseAdmin admin, String tableName) throws IOException {
+    try {
+      admin.disableTable(tableName);
+      admin.deleteTable(tableName);
+    } catch (TableNotFoundException tnfe) {
+      /* Ignore */
+    }
+  }
+}
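
Context for the change, in brief: before this patch, FilterWrapper.filterRowCells gave the scanner no way to tell whether the row-level filter had actually excluded a row or whether the cell list was simply empty. With the essential column family optimization, the HRegion hunk above suggests that rows excluded while scanning the essential family could still be repopulated when the remaining families were joined in, which is why the test's "KO" scenarios returned row '3'. The new FilterRowRetCode (NOT_CALLED / INCLUDE / EXCLUDE) makes that outcome explicit.

Below is a minimal client-side sketch of the scenario the new test exercises, not part of the patch. The class name ScvfEssentialFamilyExample is made up for illustration, and the sketch assumes a running cluster that already contains the table and rows that TestSCVFWithMiniCluster#setUp creates.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class ScvfEssentialFamilyExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // Hypothetical setup: table and data as created by TestSCVFWithMiniCluster#setUp.
    HTable table = new HTable(conf, "TestSCVFWithMiniCluster");
    try {
      // Keep only rows where 'a:foo' equals "false"; rows missing 'a:foo'
      // entirely (like row '3') must be filtered out as well.
      SingleColumnValueFilter filter = new SingleColumnValueFilter(
          Bytes.toBytes("a"), Bytes.toBytes("foo"),
          CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("false")));
      filter.setFilterIfMissing(true);

      // Selecting columns from both families makes family 'a' the essential
      // family for the filter; the other family is only needed for rows the
      // filter lets through. This mirrors scanWithSpecificQualifiers2 above.
      Scan scan = new Scan();
      scan.addColumn(Bytes.toBytes("a"), Bytes.toBytes("foo"));
      scan.addColumn(Bytes.toBytes("b"), Bytes.toBytes("bar"));
      scan.setFilter(filter);

      ResultScanner scanner = table.getScanner(scan);
      try {
        for (Result result : scanner) {
          // With the fix applied, only row '1' should be printed.
          System.out.println(Bytes.toString(result.getRow()));
        }
      } finally {
        scanner.close();
      }
    } finally {
      table.close();
    }
  }
}

Against a cluster without the fix, the same scan could also emit rows such as '3' that lack the 'a:foo' qualifier, despite setFilterIfMissing(true).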