HBASE-1703 ICV across flush
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@798448 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a6c6f77612
commit
9a23b30068
|
@ -293,6 +293,8 @@ Release 0.20.0 - Unreleased
|
|||
when trying to read
|
||||
HBASE-1705 Thrift server: deletes in mutateRow/s don't delete
|
||||
(Tim Sell and Ryan Rawson via Stack)
|
||||
HBASE-1703 ICVs across /during a flush can cause multiple keys with the
|
||||
same TS (bad)
|
||||
|
||||
IMPROVEMENTS
|
||||
HBASE-1089 Add count of regions on filesystem to master UI; add percentage
|
||||
|
|
|
@ -58,7 +58,7 @@ public class MemStore implements HeapSize {
|
|||
|
||||
private final long ttl;
|
||||
|
||||
// MemStore. Use a SkipListMap rather than SkipListSet because of the
|
||||
// MemStore. Use a KeyValueSkipListSet rather than SkipListSet because of the
|
||||
// better semantics. The Map will overwrite if passed a key it already had
|
||||
// whereas the Set will not add new KV if key is same though value might be
|
||||
// different. Value is not important -- just make sure always same
|
||||
|
@ -575,7 +575,7 @@ public class MemStore implements HeapSize {
|
|||
* @return true if done with store (early-out), false if not
|
||||
* @throws IOException
|
||||
*/
|
||||
private boolean internalGet(final NavigableSet<KeyValue> set,
|
||||
boolean internalGet(final NavigableSet<KeyValue> set,
|
||||
final QueryMatcher matcher, final List<KeyValue> result)
|
||||
throws IOException {
|
||||
if(set.isEmpty()) return false;
|
||||
|
|
|
@ -279,9 +279,9 @@ public class QueryMatcher {
|
|||
MatchCode mc = columns.checkColumn(bytes, columnOffset, columnLength);
|
||||
if (mc == MatchCode.INCLUDE && this.filter != null) {
|
||||
switch(this.filter.filterKeyValue(kv)) {
|
||||
case INCLUDE: return MatchCode.INCLUDE;
|
||||
case SKIP: return MatchCode.SKIP;
|
||||
default: return MatchCode.DONE;
|
||||
case INCLUDE: return MatchCode.INCLUDE;
|
||||
case SKIP: return MatchCode.SKIP;
|
||||
default: return MatchCode.DONE;
|
||||
}
|
||||
}
|
||||
return mc;
|
||||
|
|
|
@ -1607,22 +1607,46 @@ public class Store implements HConstants, HeapSize {
|
|||
|
||||
// Setting up the QueryMatcher
|
||||
Get get = new Get(row);
|
||||
NavigableSet<byte[]> qualifiers =
|
||||
NavigableSet<byte[]> qualifiers =
|
||||
new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
|
||||
qualifiers.add(qualifier);
|
||||
QueryMatcher matcher = new QueryMatcher(get, f, qualifiers, this.ttl,
|
||||
keyComparator, 1);
|
||||
|
||||
// Read from memstore
|
||||
if (this.memstore.get(matcher, result)) {
|
||||
|
||||
boolean newTs = true;
|
||||
KeyValue kv = null;
|
||||
// Read from memstore first:
|
||||
this.memstore.internalGet(this.memstore.kvset,
|
||||
matcher, result);
|
||||
if (!result.isEmpty()) {
|
||||
kv = result.get(0).clone();
|
||||
newTs = false;
|
||||
} else {
|
||||
// try the snapshot.
|
||||
this.memstore.internalGet(this.memstore.snapshot,
|
||||
matcher, result);
|
||||
if (!result.isEmpty()) {
|
||||
kv = result.get(0).clone();
|
||||
}
|
||||
}
|
||||
|
||||
if (kv != null) {
|
||||
// Received early-out from memstore
|
||||
// Make a copy of the KV and increment it
|
||||
KeyValue kv = result.get(0).clone();
|
||||
byte [] buffer = kv.getBuffer();
|
||||
int valueOffset = kv.getValueOffset();
|
||||
value = Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG) + amount;
|
||||
Bytes.putBytes(buffer, valueOffset, Bytes.toBytes(value), 0,
|
||||
Bytes.SIZEOF_LONG);
|
||||
if (newTs) {
|
||||
long currTs = System.currentTimeMillis();
|
||||
if (currTs == kv.getTimestamp()) {
|
||||
currTs++; // just in case something wacky happens.
|
||||
}
|
||||
byte [] stampBytes = Bytes.toBytes(currTs);
|
||||
Bytes.putBytes(buffer, kv.getTimestampOffset(), stampBytes, 0,
|
||||
Bytes.SIZEOF_LONG);
|
||||
}
|
||||
return new ICVResult(value, 0, kv);
|
||||
}
|
||||
// Check if we even have storefiles
|
||||
|
|
|
@ -1,15 +1,6 @@
|
|||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
|
@ -22,6 +13,15 @@ import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
|
|||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
|
||||
/**
|
||||
* Test class fosr the Store
|
||||
*/
|
||||
|
@ -242,7 +242,7 @@ public class TestStore extends TestCase {
|
|||
this.store.add(new KeyValue(row, family, qf1, Bytes.toBytes(value)));
|
||||
|
||||
Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf1, amount);
|
||||
assertEquals(vas.value, value+amount);
|
||||
assertEquals(value+amount, vas.value);
|
||||
store.add(vas.kv);
|
||||
Get get = new Get(row);
|
||||
get.addColumn(family, qf1);
|
||||
|
@ -361,5 +361,45 @@ public class TestStore extends TestCase {
|
|||
this.store.get(get, qualifiers, result);
|
||||
assertEquals(amount, Bytes.toLong(result.get(0).getValue()));
|
||||
}
|
||||
|
||||
public void testIncrementColumnValue_ICVDuringFlush()
|
||||
throws IOException {
|
||||
init(this.getName());
|
||||
|
||||
long value = 1L;
|
||||
long amount = 3L;
|
||||
this.store.add(new KeyValue(row, family, qf1,
|
||||
System.currentTimeMillis(),
|
||||
Bytes.toBytes(value)));
|
||||
|
||||
// snapshot the store.
|
||||
this.store.snapshot();
|
||||
|
||||
// incrment during the snapshot...
|
||||
|
||||
Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf1, amount);
|
||||
|
||||
// then flush.
|
||||
this.store.flushCache(id++);
|
||||
assertEquals(1, this.store.getStorefiles().size());
|
||||
assertEquals(0, this.store.memstore.kvset.size());
|
||||
|
||||
Get get = new Get(row);
|
||||
get.addColumn(family, qf1);
|
||||
get.setMaxVersions(); // all versions.
|
||||
List<KeyValue> results = new ArrayList<KeyValue>();
|
||||
|
||||
NavigableSet<byte[]> cols = new TreeSet<byte[]>();
|
||||
cols.add(qf1);
|
||||
|
||||
this.store.get(get, cols, results);
|
||||
// only one, because Store.ICV doesnt add to memcache.
|
||||
assertEquals(1, results.size());
|
||||
|
||||
// but the timestamps should be different...
|
||||
long icvTs = vas.kv.getTimestamp();
|
||||
long storeTs = results.get(0).getTimestamp();
|
||||
assertTrue(icvTs != storeTs);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue