HBASE-3443 ICV optimization to look in memstore first and then store files (HBASE-3082) does not work when deletes are in the mix
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1325406 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
36324e341d
commit
9a0673f8c6
|
@ -33,7 +33,6 @@ import java.util.Arrays;
|
|||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableMap;
|
||||
|
@ -4134,89 +4133,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
return new Result(results);
|
||||
}
|
||||
|
||||
/**
|
||||
* An optimized version of {@link #get(Get)} that checks MemStore first for
|
||||
* the specified query.
|
||||
* <p>
|
||||
* This is intended for use by increment operations where we have the
|
||||
* guarantee that versions are never inserted out-of-order so if a value
|
||||
* exists in MemStore it is the latest value.
|
||||
* <p>
|
||||
* It only makes sense to use this method without a TimeRange and maxVersions
|
||||
* equal to 1.
|
||||
* @param get
|
||||
* @return result
|
||||
* @throws IOException
|
||||
*/
|
||||
private List<KeyValue> getLastIncrement(final Get get) throws IOException {
|
||||
InternalScan iscan = new InternalScan(get);
|
||||
|
||||
List<KeyValue> results = new ArrayList<KeyValue>();
|
||||
|
||||
// memstore scan
|
||||
iscan.checkOnlyMemStore();
|
||||
RegionScanner scanner = null;
|
||||
try {
|
||||
scanner = getScanner(iscan);
|
||||
scanner.next(results);
|
||||
} finally {
|
||||
if (scanner != null)
|
||||
scanner.close();
|
||||
}
|
||||
|
||||
// count how many columns we're looking for
|
||||
int expected = 0;
|
||||
Map<byte[], NavigableSet<byte[]>> familyMap = get.getFamilyMap();
|
||||
for (NavigableSet<byte[]> qfs : familyMap.values()) {
|
||||
expected += qfs.size();
|
||||
}
|
||||
|
||||
// found everything we were looking for, done
|
||||
if (results.size() == expected) {
|
||||
return results;
|
||||
}
|
||||
|
||||
// still have more columns to find
|
||||
if (results != null && !results.isEmpty()) {
|
||||
// subtract what was found in memstore
|
||||
for (KeyValue kv : results) {
|
||||
byte [] family = kv.getFamily();
|
||||
NavigableSet<byte[]> qfs = familyMap.get(family);
|
||||
qfs.remove(kv.getQualifier());
|
||||
if (qfs.isEmpty()) familyMap.remove(family);
|
||||
expected--;
|
||||
}
|
||||
// make a new get for just what is left
|
||||
Get newGet = new Get(get.getRow());
|
||||
for (Map.Entry<byte[], NavigableSet<byte[]>> f : familyMap.entrySet()) {
|
||||
byte [] family = f.getKey();
|
||||
for (byte [] qualifier : f.getValue()) {
|
||||
newGet.addColumn(family, qualifier);
|
||||
}
|
||||
}
|
||||
newGet.setTimeRange(get.getTimeRange().getMin(),
|
||||
get.getTimeRange().getMax());
|
||||
iscan = new InternalScan(newGet);
|
||||
}
|
||||
|
||||
// check store files for what is left
|
||||
List<KeyValue> fileResults = new ArrayList<KeyValue>();
|
||||
iscan.checkOnlyStoreFiles();
|
||||
scanner = null;
|
||||
try {
|
||||
scanner = getScanner(iscan);
|
||||
scanner.next(fileResults);
|
||||
} finally {
|
||||
if (scanner != null)
|
||||
scanner.close();
|
||||
}
|
||||
|
||||
// combine and return
|
||||
results.addAll(fileResults);
|
||||
Collections.sort(results, KeyValue.COMPARATOR);
|
||||
return results;
|
||||
}
|
||||
|
||||
/*
|
||||
* Do a get based on the get parameter.
|
||||
* @param withCoprocessor invoke coprocessor or not. We don't want to
|
||||
|
@ -4713,7 +4629,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
get.addColumn(family.getKey(), column.getKey());
|
||||
}
|
||||
get.setTimeRange(tr.getMin(), tr.getMax());
|
||||
List<KeyValue> results = getLastIncrement(get);
|
||||
List<KeyValue> results = get(get, false);
|
||||
|
||||
// Iterate the input columns and update existing values if they were
|
||||
// found, otherwise add new column initialized to the increment amount
|
||||
|
@ -4813,7 +4729,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
|
||||
// we don't want to invoke coprocessor in this case; ICV is wrapped
|
||||
// in HRegionServer, so we leave getLastIncrement alone
|
||||
List<KeyValue> results = getLastIncrement(get);
|
||||
List<KeyValue> results = get(get, false);
|
||||
|
||||
if (!results.isEmpty()) {
|
||||
KeyValue kv = results.get(0);
|
||||
|
|
|
@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.filter.RegexStringComparator;
|
|||
import org.apache.hadoop.hbase.filter.RowFilter;
|
||||
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
|
||||
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
|
||||
import org.apache.hadoop.hbase.generated.master.table_jsp;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockCache;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
|
@ -4186,6 +4187,27 @@ public class TestFromClientSide {
|
|||
assertEquals(0, Bytes.compareTo(Bytes.add(v2,v1), r.getValue(FAMILY, QUALIFIERS[1])));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementWithDeletes() throws Exception {
|
||||
LOG.info("Starting testIncrement");
|
||||
final byte [] TABLENAME = Bytes.toBytes("testIncrementWithDeletes");
|
||||
HTable ht = TEST_UTIL.createTable(TABLENAME, FAMILY);
|
||||
final byte[] COLUMN = Bytes.toBytes("column");
|
||||
|
||||
ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5);
|
||||
TEST_UTIL.flush(TABLENAME);
|
||||
|
||||
Delete del = new Delete(ROW);
|
||||
ht.delete(del);
|
||||
|
||||
ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5);
|
||||
|
||||
Get get = new Get(ROW);
|
||||
Result r = ht.get(get);
|
||||
assertEquals(1, r.size());
|
||||
assertEquals(5, Bytes.toLong(r.getValue(FAMILY, COLUMN)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrement() throws Exception {
|
||||
LOG.info("Starting testIncrement");
|
||||
|
|
Loading…
Reference in New Issue