From a93a8878fea49224310e9e51ac929c33ae6aa41f Mon Sep 17 00:00:00 2001
From: tedyu
Date: Mon, 4 Apr 2016 12:52:24 -0700
Subject: [PATCH] HBASE-15485 Filter.reset() should not be called between
 batches (Phil Yang)

---
 .../hadoop/hbase/regionserver/HRegion.java   |   4 +-
 .../hbase/regionserver/ScannerContext.java   |   9 +
 .../filter/TestFilterFromRegionSide.java     | 183 ++++++++++++++++++
 3 files changed, 194 insertions(+), 2 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterFromRegionSide.java

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 4da0f130413..acaecf1dd2a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5652,7 +5652,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           // partial Result means that we should not reset the filters; filters
           // should only be reset in
           // between rows
-          if (!scannerContext.partialResultFormed()) resetFilters();
+          if (!scannerContext.midRowResultFormed()) resetFilters();
 
           if (isFilterDoneInternal()) {
             moreValues = false;
@@ -5727,7 +5727,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         nextKv = heap.peek();
         moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
         if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext);
-        if (scannerContext.checkBatchLimit(limitScope)) {
+        if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
           return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
         } else if (scannerContext.checkSizeLimit(limitScope)) {
           ScannerContext.NextState state =
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
index 6674443aeaa..de4647d8f67 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
@@ -229,6 +229,15 @@ public class ScannerContext {
         || scannerState == NextState.TIME_LIMIT_REACHED_MID_ROW;
   }
 
+  /**
+   * @return true when a mid-row result is formed.
+   */
+  boolean midRowResultFormed() {
+    return scannerState == NextState.SIZE_LIMIT_REACHED_MID_ROW
+        || scannerState == NextState.TIME_LIMIT_REACHED_MID_ROW
+        || scannerState == NextState.BATCH_LIMIT_REACHED;
+  }
+
   /**
    * @param checkerScope
    * @return true if the batch limit can be enforced in the checker's scope
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterFromRegionSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterFromRegionSide.java
new file mode 100644
index 00000000000..0a287ce8186
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterFromRegionSide.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.filter;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HTestConst;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test the behavior of filters from the region (server) side.
+ */
+@Category(SmallTests.class)
+public class TestFilterFromRegionSide {
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static HRegion REGION;
+
+  private static TableName TABLE_NAME = TableName.valueOf("TestFilterFromRegionSide");
+
+  private static int NUM_ROWS = 5;
+  private static byte[] ROW = Bytes.toBytes("testRow");
+  private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS);
+
+  // Should keep this value below 10 to keep generation of expected kv's simple. If above 10 then
+  // table/row/cf1/... will be followed by table/row/cf10/... instead of table/row/cf2/... which
+  // breaks the simple generation of expected kv's
+  private static int NUM_FAMILIES = 5;
+  private static byte[] FAMILY = Bytes.toBytes("testFamily");
+  private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES);
+
+  private static int NUM_QUALIFIERS = 5;
+  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
+  private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS);
+
+  private static int VALUE_SIZE = 1024;
+  private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE);
+
+  private static int NUM_COLS = NUM_FAMILIES * NUM_QUALIFIERS;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
+    for (byte[] family : FAMILIES) {
+      HColumnDescriptor hcd = new HColumnDescriptor(family);
+      htd.addFamily(hcd);
+    }
+    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
+    REGION = HBaseTestingUtility
+        .createRegionAndWAL(info, TEST_UTIL.getDataTestDir(), TEST_UTIL.getConfiguration(), htd);
+    for (Put put : createPuts(ROWS, FAMILIES, QUALIFIERS, VALUE)) {
+      REGION.put(put);
+    }
+  }
+
+  private static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers,
+      byte[] value) throws IOException {
+    Put put;
+    ArrayList<Put> puts = new ArrayList<>();
+
+    for (byte[] row1 : rows) {
+      put = new Put(row1);
+      for (byte[] family : families) {
+        for (int qual = 0; qual < qualifiers.length; qual++) {
+          KeyValue kv = new KeyValue(row1, family, qualifiers[qual], qual, value);
+          put.add(kv);
+        }
+      }
+      puts.add(put);
+    }
+
+    return puts;
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    REGION.close();
+  }
+
+  @Test
+  public void testFirstKeyOnlyFilterAndBatch() throws IOException {
+    Scan scan = new Scan();
+    scan.setFilter(new FirstKeyOnlyFilter());
+    scan.setBatch(1);
+    InternalScanner scanner = REGION.getScanner(scan);
+    List<Cell> results = new ArrayList<>();
+    for (int i = 0; i < NUM_ROWS; i++) {
+      results.clear();
+      scanner.next(results);
+      assertEquals(1, results.size());
+      Cell cell = results.get(0);
+      assertArrayEquals(ROWS[i],
+          Bytes.copy(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+    }
+    assertFalse(scanner.next(results));
+    scanner.close();
+  }
+
+  public static class FirstSeveralCellsFilter extends FilterBase {
+    private int count = 0;
+
+    public void reset() {
+      count = 0;
+    }
+
+    @Override
+    public boolean filterRowKey(Cell cell) throws IOException {
+      return false;
+    }
+
+    @Override
+    public ReturnCode filterKeyValue(Cell v) {
+      if (count++ < NUM_COLS) {
+        return ReturnCode.INCLUDE;
+      }
+      return ReturnCode.SKIP;
+    }
+
+    public static Filter parseFrom(final byte [] pbBytes) {
+      return new FirstSeveralCellsFilter();
+    }
+  }
+
+  @Test
+  public void testFirstSeveralCellsFilterAndBatch() throws IOException {
+    Scan scan = new Scan();
+    scan.setFilter(new FirstSeveralCellsFilter());
+    scan.setBatch(NUM_COLS);
+    InternalScanner scanner = REGION.getScanner(scan);
+    List<Cell> results = new ArrayList<>();
+    for (int i = 0; i < NUM_ROWS; i++) {
+      results.clear();
+      scanner.next(results);
+      assertEquals(NUM_COLS, results.size());
+      Cell cell = results.get(0);
+      assertArrayEquals(ROWS[i],
+          Bytes.copy(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+      assertArrayEquals(FAMILIES[0],
+          Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()));
+      assertArrayEquals(QUALIFIERS[0], Bytes.copy(cell.getQualifierArray(),
+          cell.getQualifierOffset(), cell.getQualifierLength()));
+    }
+    assertFalse(scanner.next(results));
+    scanner.close();
+  }
+}
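
For context, a minimal client-side sketch (not part of the patch) of the situation the new test
and this fix cover: a filter that keeps per-row state combined with Scan#setBatch(). Before this
change, HRegion reset the filter whenever a batch limit was reached mid-row, so FirstKeyOnlyFilter
with setBatch(1) kept matching the first cell of every batch instead of the first cell of every
row. The table name "t" and the standalone connection setup below are illustrative assumptions
only, not part of the change.

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;

public class FirstKeyOnlyWithBatchExample {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection();
         Table table = conn.getTable(TableName.valueOf("t"))) {
      Scan scan = new Scan();
      scan.setFilter(new FirstKeyOnlyFilter()); // per-row state must survive batches within a row
      scan.setBatch(1);                         // forces mid-row (batch-limit) results
      int cells = 0;
      try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result r : scanner) {
          cells += r.rawCells().length;
        }
      }
      // With the fix, the filter is reset only between rows, so the count equals the
      // number of rows; before the fix it grew toward the total number of cells.
      System.out.println("cells returned: " + cells);
    }
  }
}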