HBASE-2863 HBASE-2553 removed an important edge case
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@966906 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
07d085c2e5
commit
65b09b7cd5
|
@ -448,6 +448,7 @@ Release 0.21.0 - Unreleased
|
|||
HBASE-2854 broken tests on trunk
|
||||
HBASE-2859 Cleanup deprecated stuff in TestHLog (Alex Newman via Stack)
|
||||
HBASE-2858 TestReplication.queueFailover fails half the time
|
||||
HBASE-2863 HBASE-2553 removed an important edge case
|
||||
|
||||
IMPROVEMENTS
|
||||
HBASE-1760 Cleanup TODOs in HTable
|
||||
|
|
|
@ -1047,12 +1047,20 @@ public class KeyValue implements Writable, HeapSize {
|
|||
* @return True if matching families.
|
||||
*/
|
||||
public boolean matchingFamily(final byte [] family) {
|
||||
return matchingFamily(family, 0, family.length);
|
||||
}
|
||||
|
||||
public boolean matchingFamily(final byte[] family, int offset, int length) {
|
||||
if (this.length == 0 || this.bytes.length == 0) {
|
||||
return false;
|
||||
}
|
||||
int o = getFamilyOffset();
|
||||
int l = getFamilyLength(o);
|
||||
return Bytes.compareTo(family, 0, family.length, this.bytes, o, l) == 0;
|
||||
return Bytes.compareTo(family, offset, length,
|
||||
this.bytes, getFamilyOffset(), getFamilyLength()) == 0;
|
||||
}
|
||||
|
||||
public boolean matchingFamily(final KeyValue other) {
|
||||
return matchingFamily(other.getBuffer(), other.getFamilyOffset(),
|
||||
other.getFamilyLength());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1060,10 +1068,31 @@ public class KeyValue implements Writable, HeapSize {
|
|||
* @return True if matching qualifiers.
|
||||
*/
|
||||
public boolean matchingQualifier(final byte [] qualifier) {
|
||||
int o = getQualifierOffset();
|
||||
int l = getQualifierLength();
|
||||
return Bytes.compareTo(qualifier, 0, qualifier.length,
|
||||
this.bytes, o, l) == 0;
|
||||
return matchingQualifier(qualifier, 0, qualifier.length);
|
||||
}
|
||||
|
||||
public boolean matchingQualifier(final byte [] qualifier, int offset, int length) {
|
||||
return Bytes.compareTo(qualifier, offset, length,
|
||||
this.bytes, getQualifierOffset(), getQualifierLength()) == 0;
|
||||
}
|
||||
|
||||
public boolean matchingQualifier(final KeyValue other) {
|
||||
return matchingQualifier(other.getBuffer(), other.getQualifierOffset(),
|
||||
other.getQualifierLength());
|
||||
}
|
||||
|
||||
public boolean matchingRow(final byte [] row) {
|
||||
return matchingRow(row, 0, row.length);
|
||||
}
|
||||
|
||||
public boolean matchingRow(final byte[] row, int offset, int length) {
|
||||
return Bytes.compareTo(row, offset, length,
|
||||
this.bytes, getRowOffset(), getRowLength()) == 0;
|
||||
}
|
||||
|
||||
public boolean matchingRow(KeyValue other) {
|
||||
return matchingRow(other.getBuffer(), other.getRowOffset(),
|
||||
other.getRowLength());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1089,12 +1118,12 @@ public class KeyValue implements Writable, HeapSize {
|
|||
int o = getFamilyOffset(rl);
|
||||
int fl = getFamilyLength(o);
|
||||
int ql = getQualifierLength(rl,fl);
|
||||
if(Bytes.compareTo(family, 0, family.length, this.bytes, o, family.length)
|
||||
if (Bytes.compareTo(family, 0, family.length, this.bytes, o, family.length)
|
||||
!= 0) {
|
||||
return false;
|
||||
}
|
||||
if(qualifier == null || qualifier.length == 0) {
|
||||
if(ql == 0) {
|
||||
if (qualifier == null || qualifier.length == 0) {
|
||||
if (ql == 0) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
|
|
@ -369,16 +369,59 @@ public class MemStore implements HeapSize {
|
|||
long now) {
|
||||
this.lock.readLock().lock();
|
||||
try {
|
||||
KeyValue firstKv = KeyValue.createFirstOnRow(
|
||||
row, family, qualifier);
|
||||
// create a new KeyValue with 'now' and a 0 memstoreTS == immediately visible
|
||||
KeyValue newKv = new KeyValue(row, family, qualifier,
|
||||
KeyValue newKv;
|
||||
// Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
|
||||
SortedSet<KeyValue> snSs = snapshot.tailSet(firstKv);
|
||||
if (!snSs.isEmpty()) {
|
||||
KeyValue snKv = snSs.first();
|
||||
// is there a matching KV in the snapshot?
|
||||
if (snKv.matchingRow(firstKv) && snKv.matchingQualifier(firstKv)) {
|
||||
if (snKv.getTimestamp() == now) {
|
||||
// poop,
|
||||
now += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
|
||||
// But the timestamp should also be max(now, mostRecentTsInMemstore)
|
||||
|
||||
// so we cant add the new KV w/o knowing what's there already, but we also
|
||||
// want to take this chance to delete some kvs. So two loops (sad)
|
||||
|
||||
SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
|
||||
Iterator<KeyValue> it = ss.iterator();
|
||||
while ( it.hasNext() ) {
|
||||
KeyValue kv = it.next();
|
||||
|
||||
// if this isnt the row we are interested in, then bail:
|
||||
if (!firstKv.matchingRow(kv)) {
|
||||
break; // rows dont match, bail.
|
||||
}
|
||||
|
||||
// if the qualifier matches and it's a put, just RM it out of the kvset.
|
||||
if (firstKv.matchingQualifier(kv)) {
|
||||
// to be extra safe we only remove Puts that have a memstoreTS==0
|
||||
if (kv.getType() == KeyValue.Type.Put.getCode()) {
|
||||
now = Math.max(now, kv.getTimestamp());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// add the new value now. this might have the same TS as an existing KV, thus confusing
|
||||
// readers slightly for a MOMENT until we erase the old one (and thus old value).
|
||||
newKv = new KeyValue(row, family, qualifier,
|
||||
now,
|
||||
Bytes.toBytes(newValue));
|
||||
|
||||
long addedSize = add(newKv);
|
||||
|
||||
// now find and RM the old one(s) to prevent version explosion:
|
||||
SortedSet<KeyValue> ss = kvset.tailSet(newKv);
|
||||
Iterator<KeyValue> it = ss.iterator();
|
||||
// remove extra versions.
|
||||
ss = kvset.tailSet(firstKv);
|
||||
it = ss.iterator();
|
||||
while ( it.hasNext() ) {
|
||||
KeyValue kv = it.next();
|
||||
|
||||
|
@ -386,21 +429,16 @@ public class MemStore implements HeapSize {
|
|||
// ignore the one i just put in (heh)
|
||||
continue;
|
||||
}
|
||||
|
||||
// if this isnt the row we are interested in, then bail:
|
||||
if (0 != Bytes.compareTo(
|
||||
newKv.getBuffer(), newKv.getRowOffset(), newKv.getRowLength(),
|
||||
kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())) {
|
||||
if (!firstKv.matchingRow(kv)) {
|
||||
break; // rows dont match, bail.
|
||||
}
|
||||
|
||||
// if the qualifier matches and it's a put, just RM it out of the kvset.
|
||||
if (0 == Bytes.compareTo(
|
||||
newKv.getBuffer(), newKv.getQualifierOffset(), newKv.getQualifierLength(),
|
||||
kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength())) {
|
||||
|
||||
if (firstKv.matchingQualifier(kv)) {
|
||||
// to be extra safe we only remove Puts that have a memstoreTS==0
|
||||
if (kv.getType() == KeyValue.Type.Put.getCode() &&
|
||||
kv.getMemstoreTS() == 0) {
|
||||
if (kv.getType() == KeyValue.Type.Put.getCode()) {
|
||||
// false means there was a change, so give us the size.
|
||||
addedSize -= heapSizeChange(kv, false);
|
||||
|
||||
|
|
|
@ -52,6 +52,8 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
|
|||
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdge;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
@ -1301,7 +1303,7 @@ public class Store implements HeapSize {
|
|||
|
||||
this.lock.readLock().lock();
|
||||
try {
|
||||
long now = System.currentTimeMillis();
|
||||
long now = EnvironmentEdgeManager.currentTimeMillis();
|
||||
|
||||
return this.memstore.updateColumnValue(row,
|
||||
f,
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
/**
|
||||
* An environment edge that uses a manually set value. This is useful for testing events that are supposed to
|
||||
* happen in the same millisecond.
|
||||
*/
|
||||
public class ManualEnvironmentEdge implements EnvironmentEdge {
|
||||
|
||||
// Sometimes 0 ts might have a special value, so lets start with 1
|
||||
protected long value = 1L;
|
||||
|
||||
public void setValue(long newValue) {
|
||||
value = newValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long currentTimeMillis() {
|
||||
return this.value;
|
||||
}
|
||||
}
|
|
@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
|||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
|
||||
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
|
||||
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
|
||||
|
@ -1859,14 +1860,16 @@ public class TestHRegion extends HBaseTestCase {
|
|||
assertEquals(value+amount, result);
|
||||
|
||||
Store store = region.getStore(fam1);
|
||||
// we will have the original Put, and also the ICV'ed Put as well.
|
||||
assertEquals(2, store.memstore.kvset.size());
|
||||
// ICV removes any extra values floating around in there.
|
||||
assertEquals(1, store.memstore.kvset.size());
|
||||
assertTrue(store.memstore.snapshot.isEmpty());
|
||||
|
||||
assertICV(row, fam1, qual1, value+amount);
|
||||
}
|
||||
|
||||
public void testIncrementColumnValue_BumpSnapshot() throws IOException {
|
||||
ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
|
||||
EnvironmentEdgeManagerTestHelper.injectEdge(mee);
|
||||
initHRegion(tableName, getName(), fam1);
|
||||
|
||||
long value = 42L;
|
||||
|
|
|
@ -27,7 +27,6 @@ import java.util.Collections;
|
|||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
@ -49,12 +48,12 @@ import org.apache.hadoop.hbase.HConstants;
|
|||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
|
||||
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
|
||||
import org.apache.hadoop.security.UnixUserGroupInformation;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
|
@ -303,9 +302,6 @@ public class TestStore extends TestCase {
|
|||
System.currentTimeMillis(),
|
||||
Bytes.toBytes(oldValue)));
|
||||
|
||||
// sleep 2 ms to space out the increments.
|
||||
Thread.sleep(2);
|
||||
|
||||
// update during the snapshot.
|
||||
long ret = this.store.updateColumnValue(row, family, qf1, newValue);
|
||||
|
||||
|
@ -324,9 +320,6 @@ public class TestStore extends TestCase {
|
|||
get.setMaxVersions(); // all versions.
|
||||
List<KeyValue> results = new ArrayList<KeyValue>();
|
||||
|
||||
NavigableSet<byte[]> cols = new TreeSet<byte[]>();
|
||||
cols.add(qf1);
|
||||
|
||||
results = HBaseTestingUtility.getFromStoreFile(store, get);
|
||||
assertEquals(2, results.size());
|
||||
|
||||
|
@ -337,7 +330,73 @@ public class TestStore extends TestCase {
|
|||
|
||||
assertEquals(newValue, Bytes.toLong(results.get(0).getValue()));
|
||||
assertEquals(oldValue, Bytes.toLong(results.get(1).getValue()));
|
||||
}
|
||||
|
||||
public void testIncrementColumnValue_SnapshotFlushCombo() throws Exception {
|
||||
ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
|
||||
EnvironmentEdgeManagerTestHelper.injectEdge(mee);
|
||||
init(this.getName());
|
||||
|
||||
long oldValue = 1L;
|
||||
long newValue = 3L;
|
||||
this.store.add(new KeyValue(row, family, qf1,
|
||||
EnvironmentEdgeManager.currentTimeMillis(),
|
||||
Bytes.toBytes(oldValue)));
|
||||
|
||||
// snapshot the store.
|
||||
this.store.snapshot();
|
||||
|
||||
// update during the snapshot, the exact same TS as the Put (lololol)
|
||||
long ret = this.store.updateColumnValue(row, family, qf1, newValue);
|
||||
|
||||
// memstore should have grown by some amount.
|
||||
assertTrue(ret > 0);
|
||||
|
||||
// then flush.
|
||||
flushStore(store, id++);
|
||||
assertEquals(1, this.store.getStorefiles().size());
|
||||
assertEquals(1, this.store.memstore.kvset.size());
|
||||
|
||||
// now increment again:
|
||||
newValue += 1;
|
||||
this.store.updateColumnValue(row, family, qf1, newValue);
|
||||
|
||||
// at this point we have a TS=1 in snapshot, and a TS=2 in kvset, so increment again:
|
||||
newValue += 1;
|
||||
this.store.updateColumnValue(row, family, qf1, newValue);
|
||||
|
||||
// the second TS should be TS=2 or higher., even though 'time=1' right now.
|
||||
|
||||
|
||||
// how many key/values for this row are there?
|
||||
Get get = new Get(row);
|
||||
get.addColumn(family, qf1);
|
||||
get.setMaxVersions(); // all versions.
|
||||
List<KeyValue> results = new ArrayList<KeyValue>();
|
||||
|
||||
results = HBaseTestingUtility.getFromStoreFile(store, get);
|
||||
assertEquals(2, results.size());
|
||||
|
||||
long ts1 = results.get(0).getTimestamp();
|
||||
long ts2 = results.get(1).getTimestamp();
|
||||
|
||||
assertTrue(ts1 > ts2);
|
||||
assertEquals(newValue, Bytes.toLong(results.get(0).getValue()));
|
||||
assertEquals(oldValue, Bytes.toLong(results.get(1).getValue()));
|
||||
|
||||
mee.setValue(2); // time goes up slightly
|
||||
newValue += 1;
|
||||
this.store.updateColumnValue(row, family, qf1, newValue);
|
||||
|
||||
results = HBaseTestingUtility.getFromStoreFile(store, get);
|
||||
assertEquals(2, results.size());
|
||||
|
||||
ts1 = results.get(0).getTimestamp();
|
||||
ts2 = results.get(1).getTimestamp();
|
||||
|
||||
assertTrue(ts1 > ts2);
|
||||
assertEquals(newValue, Bytes.toLong(results.get(0).getValue()));
|
||||
assertEquals(oldValue, Bytes.toLong(results.get(1).getValue()));
|
||||
}
|
||||
|
||||
public void testHandleErrorsInFlush() throws Exception {
|
||||
|
|
Loading…
Reference in New Issue