HBASE-6912 Filters are not properly applied in certain cases (revert HBASE-6562)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1393853 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
larsh 2012-10-04 00:38:45 +00:00
parent ba92b2f2a1
commit ec4cb63a09
3 changed files with 28 additions and 84 deletions

View File

@ -3483,15 +3483,14 @@ public class HRegion implements HeapSize { // , Writable{
rpcCall.throwExceptionIfCallerDisconnected(); rpcCall.throwExceptionIfCallerDisconnected();
} }
KeyValue kv = this.storeHeap.peek(); byte [] currentRow = peekRow();
byte [] currentRow = kv == null ? null : kv.getRow();
if (isStopRow(currentRow)) { if (isStopRow(currentRow)) {
if (filter != null && filter.hasFilterRow()) { if (filter != null && filter.hasFilterRow()) {
filter.filterRow(results); filter.filterRow(results);
} }
return false; return false;
} else if (kv != null && !kv.isInternal() && filterRowKey(currentRow)) { } else if (filterRowKey(currentRow)) {
nextRow(currentRow); nextRow(currentRow);
} else { } else {
byte [] nextRow; byte [] nextRow;

View File

@ -1,81 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.util.ArrayList;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.*;
import org.junit.experimental.categories.Category;
/**
* Make sure the fake KVs created internally are never user visible
* (not even to filters)
*/
@Category(SmallTests.class)
public class TestFakeKeyInFilter extends BinaryComparator {
protected static HBaseTestingUtility UTIL = new HBaseTestingUtility();
public TestFakeKeyInFilter() {
super(Bytes.toBytes("foo"));
}
@Override
public int compareTo(byte[] value, int offset, int length) {
if (value.length == 0) {
throw new RuntimeException("Found mysterious empty row");
}
return 0;
}
/**
* Simple way to verify the scenario.
* There are no KVs with an empty row key in the
* table, yet such a KV is presented to the filter.
*/
@Test
public void testForEmptyRowKey() throws Exception {
byte[] table = Bytes.toBytes("testForEmptyRowKey");
byte[] row = Bytes.toBytes("myRow");
byte[] cf = Bytes.toBytes("myFamily");
byte[] cq = Bytes.toBytes("myColumn");
HTableDescriptor desc = new HTableDescriptor(table);
desc.addFamily(new HColumnDescriptor(cf));
HRegionInfo hri = new HRegionInfo(desc.getName(), null, null);
HRegion region = HRegion.createHRegion(hri, FSUtils.getRootDir(UTIL.getConfiguration()), UTIL.getConfiguration(), desc);
Put put = new Put(row);
put.add(cf, cq, cq);
region.put(put);
region.flushcache();
Scan scan = new Scan();
scan.addColumn(cf, cq);
ByteArrayComparable comparable = new TestFakeKeyInFilter();
Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, comparable);
scan.setFilter(filter);
RegionScanner scanner = region.getScanner(scan);
scanner.next(new ArrayList<KeyValue>());
scanner.close();
region.close();
}
}

View File

@ -4899,5 +4899,31 @@ public class TestFromClientSide {
assertEquals(1, regionsList.size()); assertEquals(1, regionsList.size());
} }
@Test
public void testJira6912() throws Exception {
byte [] TABLE = Bytes.toBytes("testJira6912");
HTable foo = TEST_UTIL.createTable(TABLE, new byte[][] {FAMILY}, 10);
List<Put> puts = new ArrayList<Put>();
for (int i=0;i !=100; i++){
Put put = new Put(Bytes.toBytes(i));
put.add(FAMILY, FAMILY, Bytes.toBytes(i));
puts.add(put);
}
foo.put(puts);
// If i comment this out it works
TEST_UTIL.flush();
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes(1));
scan.setStopRow(Bytes.toBytes(3));
scan.addColumn(FAMILY, FAMILY);
scan.setFilter(new RowFilter(CompareFilter.CompareOp.NOT_EQUAL, new BinaryComparator(Bytes.toBytes(1))));
ResultScanner scanner = foo.getScanner(scan);
Result[] bar = scanner.next(100);
assertEquals(1, bar.length);
}
} }