diff --git a/CHANGES.txt b/CHANGES.txt
index ab2dba39d79..61eb60b2e07 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -448,6 +448,7 @@ Release 0.21.0 - Unreleased
HBASE-2854 broken tests on trunk
HBASE-2859 Cleanup deprecated stuff in TestHLog (Alex Newman via Stack)
HBASE-2858 TestReplication.queueFailover fails half the time
+ HBASE-2863 HBASE-2553 removed an important edge case
IMPROVEMENTS
HBASE-1760 Cleanup TODOs in HTable
diff --git a/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index e32d6838f4a..cba0da83396 100644
--- a/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ b/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -218,7 +218,7 @@ public class KeyValue implements Writable, HeapSize {
/** Dragon time over, return to normal business */
-
+
/** Writable Constructor -- DO NOT USE */
public KeyValue() {}
@@ -965,7 +965,7 @@ public class KeyValue implements Writable, HeapSize {
System.arraycopy(this.bytes, o, result, 0, l);
return result;
}
-
+
//---------------------------------------------------------------------------
//
// KeyValue splitter
@@ -1047,12 +1047,20 @@ public class KeyValue implements Writable, HeapSize {
* @return True if matching families.
*/
public boolean matchingFamily(final byte [] family) {
+ return matchingFamily(family, 0, family.length);
+ }
+
+ public boolean matchingFamily(final byte[] family, int offset, int length) {
if (this.length == 0 || this.bytes.length == 0) {
return false;
}
- int o = getFamilyOffset();
- int l = getFamilyLength(o);
- return Bytes.compareTo(family, 0, family.length, this.bytes, o, l) == 0;
+ return Bytes.compareTo(family, offset, length,
+ this.bytes, getFamilyOffset(), getFamilyLength()) == 0;
+ }
+
+ public boolean matchingFamily(final KeyValue other) {
+ return matchingFamily(other.getBuffer(), other.getFamilyOffset(),
+ other.getFamilyLength());
}
/**
@@ -1060,10 +1068,31 @@ public class KeyValue implements Writable, HeapSize {
* @return True if matching qualifiers.
*/
public boolean matchingQualifier(final byte [] qualifier) {
- int o = getQualifierOffset();
- int l = getQualifierLength();
- return Bytes.compareTo(qualifier, 0, qualifier.length,
- this.bytes, o, l) == 0;
+ return matchingQualifier(qualifier, 0, qualifier.length);
+ }
+
+ public boolean matchingQualifier(final byte [] qualifier, int offset, int length) {
+ return Bytes.compareTo(qualifier, offset, length,
+ this.bytes, getQualifierOffset(), getQualifierLength()) == 0;
+ }
+
+ public boolean matchingQualifier(final KeyValue other) {
+ return matchingQualifier(other.getBuffer(), other.getQualifierOffset(),
+ other.getQualifierLength());
+ }
+
+ public boolean matchingRow(final byte [] row) {
+ return matchingRow(row, 0, row.length);
+ }
+
+ public boolean matchingRow(final byte[] row, int offset, int length) {
+ return Bytes.compareTo(row, offset, length,
+ this.bytes, getRowOffset(), getRowLength()) == 0;
+ }
+
+ public boolean matchingRow(KeyValue other) {
+ return matchingRow(other.getBuffer(), other.getRowOffset(),
+ other.getRowLength());
}
/**
@@ -1089,12 +1118,12 @@ public class KeyValue implements Writable, HeapSize {
int o = getFamilyOffset(rl);
int fl = getFamilyLength(o);
int ql = getQualifierLength(rl,fl);
- if(Bytes.compareTo(family, 0, family.length, this.bytes, o, family.length)
+ if (Bytes.compareTo(family, 0, family.length, this.bytes, o, family.length)
!= 0) {
return false;
}
- if(qualifier == null || qualifier.length == 0) {
- if(ql == 0) {
+ if (qualifier == null || qualifier.length == 0) {
+ if (ql == 0) {
return true;
}
return false;
@@ -1520,7 +1549,7 @@ public class KeyValue implements Writable, HeapSize {
* Create a KeyValue that is smaller than all other possible KeyValues
* for the given row. That is any (valid) KeyValue on 'row' would sort
* _after_ the result.
- *
+ *
* @param row - row key (arbitrary byte array)
* @return First possible KeyValue on passed row
*/
@@ -1760,7 +1789,7 @@ public class KeyValue implements Writable, HeapSize {
// if row matches, and no column in the 'left' AND put type is 'minimum',
// then return that left is larger than right.
-
+
// This supports 'last key on a row' - the magic is if there is no column in the
// left operand, and the left operand has a type of '0' - magical value,
// then we say the left is bigger. This will let us seek to the last key in
@@ -1835,8 +1864,8 @@ public class KeyValue implements Writable, HeapSize {
// HeapSize
public long heapSize() {
- return ClassSize.align(ClassSize.OBJECT + ClassSize.REFERENCE +
- ClassSize.align(ClassSize.ARRAY + length) +
+ return ClassSize.align(ClassSize.OBJECT + ClassSize.REFERENCE +
+ ClassSize.align(ClassSize.ARRAY + length) +
(2 * Bytes.SIZEOF_INT) +
Bytes.SIZEOF_LONG);
}
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index 44fa0c39e88..01820a38d5d 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -369,16 +369,59 @@ public class MemStore implements HeapSize {
long now) {
this.lock.readLock().lock();
try {
+ KeyValue firstKv = KeyValue.createFirstOnRow(
+ row, family, qualifier);
// create a new KeyValue with 'now' and a 0 memstoreTS == immediately visible
- KeyValue newKv = new KeyValue(row, family, qualifier,
+ KeyValue newKv;
+ // Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
+ SortedSet snSs = snapshot.tailSet(firstKv);
+ if (!snSs.isEmpty()) {
+ KeyValue snKv = snSs.first();
+ // is there a matching KV in the snapshot?
+ if (snKv.matchingRow(firstKv) && snKv.matchingQualifier(firstKv)) {
+ if (snKv.getTimestamp() == now) {
+ // poop,
+ now += 1;
+ }
+ }
+ }
+
+ // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
+ // But the timestamp should also be max(now, mostRecentTsInMemstore)
+
+ // so we cant add the new KV w/o knowing what's there already, but we also
+ // want to take this chance to delete some kvs. So two loops (sad)
+
+ SortedSet ss = kvset.tailSet(firstKv);
+ Iterator it = ss.iterator();
+ while ( it.hasNext() ) {
+ KeyValue kv = it.next();
+
+ // if this isnt the row we are interested in, then bail:
+ if (!firstKv.matchingRow(kv)) {
+ break; // rows dont match, bail.
+ }
+
+ // if the qualifier matches and it's a put, just RM it out of the kvset.
+ if (firstKv.matchingQualifier(kv)) {
+ // to be extra safe we only remove Puts that have a memstoreTS==0
+ if (kv.getType() == KeyValue.Type.Put.getCode()) {
+ now = Math.max(now, kv.getTimestamp());
+ }
+ }
+ }
+
+
+ // add the new value now. this might have the same TS as an existing KV, thus confusing
+ // readers slightly for a MOMENT until we erase the old one (and thus old value).
+ newKv = new KeyValue(row, family, qualifier,
now,
Bytes.toBytes(newValue));
-
long addedSize = add(newKv);
- // now find and RM the old one(s) to prevent version explosion:
- SortedSet ss = kvset.tailSet(newKv);
- Iterator it = ss.iterator();
+ // remove extra versions.
+ ss = kvset.tailSet(firstKv);
+ it = ss.iterator();
while ( it.hasNext() ) {
KeyValue kv = it.next();
@@ -386,21 +429,16 @@ public class MemStore implements HeapSize {
// ignore the one i just put in (heh)
continue;
}
+
// if this isnt the row we are interested in, then bail:
- if (0 != Bytes.compareTo(
- newKv.getBuffer(), newKv.getRowOffset(), newKv.getRowLength(),
- kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())) {
+ if (!firstKv.matchingRow(kv)) {
break; // rows dont match, bail.
}
// if the qualifier matches and it's a put, just RM it out of the kvset.
- if (0 == Bytes.compareTo(
- newKv.getBuffer(), newKv.getQualifierOffset(), newKv.getQualifierLength(),
- kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength())) {
-
+ if (firstKv.matchingQualifier(kv)) {
// to be extra safe we only remove Puts that have a memstoreTS==0
- if (kv.getType() == KeyValue.Type.Put.getCode() &&
- kv.getMemstoreTS() == 0) {
+ if (kv.getType() == KeyValue.Type.Put.getCode()) {
// false means there was a change, so give us the size.
addedSize -= heapSizeChange(kv, false);
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index c1ff9f29747..21d355f0f86 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -52,6 +52,8 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;
import com.google.common.collect.ImmutableList;
@@ -1301,7 +1303,7 @@ public class Store implements HeapSize {
this.lock.readLock().lock();
try {
- long now = System.currentTimeMillis();
+ long now = EnvironmentEdgeManager.currentTimeMillis();
return this.memstore.updateColumnValue(row,
f,
diff --git a/src/main/java/org/apache/hadoop/hbase/util/ManualEnvironmentEdge.java b/src/main/java/org/apache/hadoop/hbase/util/ManualEnvironmentEdge.java
new file mode 100644
index 00000000000..d698df17ea9
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/util/ManualEnvironmentEdge.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+/**
+ * An environment edge that uses a manually set value. This is useful for testing events that are supposed to
+ * happen in the same millisecond.
+ */
+public class ManualEnvironmentEdge implements EnvironmentEdge {
+
+ // Sometimes 0 ts might have a special value, so lets start with 1
+ protected long value = 1L;
+
+ public void setValue(long newValue) {
+ value = newValue;
+ }
+
+ @Override
+ public long currentTimeMillis() {
+ return this.value;
+ }
+}
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 4ead02d9837..0a6713812a4 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
@@ -1859,14 +1860,16 @@ public class TestHRegion extends HBaseTestCase {
assertEquals(value+amount, result);
Store store = region.getStore(fam1);
- // we will have the original Put, and also the ICV'ed Put as well.
- assertEquals(2, store.memstore.kvset.size());
+ // ICV removes any extra values floating around in there.
+ assertEquals(1, store.memstore.kvset.size());
assertTrue(store.memstore.snapshot.isEmpty());
assertICV(row, fam1, qual1, value+amount);
}
public void testIncrementColumnValue_BumpSnapshot() throws IOException {
+ ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
+ EnvironmentEdgeManagerTestHelper.injectEdge(mee);
initHRegion(tableName, getName(), fam1);
long value = 42L;
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index a32eed68ef5..0b47975b399 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -27,7 +27,6 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
-import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListSet;
import junit.framework.TestCase;
@@ -49,12 +48,12 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.security.UnixUserGroupInformation;
import com.google.common.base.Joiner;
@@ -303,9 +302,6 @@ public class TestStore extends TestCase {
System.currentTimeMillis(),
Bytes.toBytes(oldValue)));
- // sleep 2 ms to space out the increments.
- Thread.sleep(2);
-
// update during the snapshot.
long ret = this.store.updateColumnValue(row, family, qf1, newValue);
@@ -324,9 +320,6 @@ public class TestStore extends TestCase {
get.setMaxVersions(); // all versions.
List results = new ArrayList();
- NavigableSet cols = new TreeSet();
- cols.add(qf1);
-
results = HBaseTestingUtility.getFromStoreFile(store, get);
assertEquals(2, results.size());
@@ -337,7 +330,73 @@ public class TestStore extends TestCase {
assertEquals(newValue, Bytes.toLong(results.get(0).getValue()));
assertEquals(oldValue, Bytes.toLong(results.get(1).getValue()));
+ }
+ public void testIncrementColumnValue_SnapshotFlushCombo() throws Exception {
+ ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
+ EnvironmentEdgeManagerTestHelper.injectEdge(mee);
+ init(this.getName());
+
+ long oldValue = 1L;
+ long newValue = 3L;
+ this.store.add(new KeyValue(row, family, qf1,
+ EnvironmentEdgeManager.currentTimeMillis(),
+ Bytes.toBytes(oldValue)));
+
+ // snapshot the store.
+ this.store.snapshot();
+
+ // update during the snapshot, the exact same TS as the Put (lololol)
+ long ret = this.store.updateColumnValue(row, family, qf1, newValue);
+
+ // memstore should have grown by some amount.
+ assertTrue(ret > 0);
+
+ // then flush.
+ flushStore(store, id++);
+ assertEquals(1, this.store.getStorefiles().size());
+ assertEquals(1, this.store.memstore.kvset.size());
+
+ // now increment again:
+ newValue += 1;
+ this.store.updateColumnValue(row, family, qf1, newValue);
+
+ // at this point we have a TS=1 in snapshot, and a TS=2 in kvset, so increment again:
+ newValue += 1;
+ this.store.updateColumnValue(row, family, qf1, newValue);
+
+ // the second TS should be TS=2 or higher., even though 'time=1' right now.
+
+
+ // how many key/values for this row are there?
+ Get get = new Get(row);
+ get.addColumn(family, qf1);
+ get.setMaxVersions(); // all versions.
+ List results = new ArrayList();
+
+ results = HBaseTestingUtility.getFromStoreFile(store, get);
+ assertEquals(2, results.size());
+
+ long ts1 = results.get(0).getTimestamp();
+ long ts2 = results.get(1).getTimestamp();
+
+ assertTrue(ts1 > ts2);
+ assertEquals(newValue, Bytes.toLong(results.get(0).getValue()));
+ assertEquals(oldValue, Bytes.toLong(results.get(1).getValue()));
+
+ mee.setValue(2); // time goes up slightly
+ newValue += 1;
+ this.store.updateColumnValue(row, family, qf1, newValue);
+
+ results = HBaseTestingUtility.getFromStoreFile(store, get);
+ assertEquals(2, results.size());
+
+ ts1 = results.get(0).getTimestamp();
+ ts2 = results.get(1).getTimestamp();
+
+ assertTrue(ts1 > ts2);
+ assertEquals(newValue, Bytes.toLong(results.get(0).getValue()));
+ assertEquals(oldValue, Bytes.toLong(results.get(1).getValue()));
}
public void testHandleErrorsInFlush() throws Exception {
@@ -508,4 +567,4 @@ public class TestStore extends TestCase {
result = HBaseTestingUtility.getFromStoreFile(store, get);
assertTrue(result.size()==0);
}
-}
\ No newline at end of file
+}