HBASE-7032 Remove old IncrementColumnValue code.
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1401447 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1725705a47
commit
2f06bfd297
|
@ -4820,110 +4820,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
return new Result(allKVs);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param row
|
||||
* @param family
|
||||
* @param qualifier
|
||||
* @param amount
|
||||
* @param writeToWAL
|
||||
* @return The new value.
|
||||
* @throws IOException
|
||||
* @deprecated use {@link #increment(Increment, Integer, boolean)}
|
||||
*/
|
||||
@Deprecated
|
||||
public long incrementColumnValue(byte [] row, byte [] family,
|
||||
byte [] qualifier, long amount, boolean writeToWAL)
|
||||
throws IOException {
|
||||
// to be used for metrics
|
||||
long before = EnvironmentEdgeManager.currentTimeMillis();
|
||||
|
||||
checkRow(row, "increment");
|
||||
boolean flush = false;
|
||||
boolean wrongLength = false;
|
||||
long txid = 0;
|
||||
// Lock row
|
||||
long result = amount;
|
||||
startRegionOperation();
|
||||
this.writeRequestsCount.increment();
|
||||
try {
|
||||
Integer lid = obtainRowLock(row);
|
||||
this.updatesLock.readLock().lock();
|
||||
try {
|
||||
Store store = stores.get(family);
|
||||
|
||||
// Get the old value:
|
||||
Get get = new Get(row);
|
||||
get.addColumn(family, qualifier);
|
||||
|
||||
// we don't want to invoke coprocessor in this case; ICV is wrapped
|
||||
// in HRegionServer, so we leave getLastIncrement alone
|
||||
List<KeyValue> results = get(get, false);
|
||||
|
||||
if (!results.isEmpty()) {
|
||||
KeyValue kv = results.get(0);
|
||||
if(kv.getValueLength() == Bytes.SIZEOF_LONG){
|
||||
byte [] buffer = kv.getBuffer();
|
||||
int valueOffset = kv.getValueOffset();
|
||||
result += Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG);
|
||||
}
|
||||
else{
|
||||
wrongLength = true;
|
||||
}
|
||||
}
|
||||
if(!wrongLength){
|
||||
// build the KeyValue now:
|
||||
KeyValue newKv = new KeyValue(row, family,
|
||||
qualifier, EnvironmentEdgeManager.currentTimeMillis(),
|
||||
Bytes.toBytes(result));
|
||||
|
||||
// now log it:
|
||||
if (writeToWAL) {
|
||||
long now = EnvironmentEdgeManager.currentTimeMillis();
|
||||
WALEdit walEdit = new WALEdit();
|
||||
walEdit.add(newKv);
|
||||
// Using default cluster id, as this can only happen in the
|
||||
// orginating cluster. A slave cluster receives the final value (not
|
||||
// the delta) as a Put.
|
||||
txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(),
|
||||
walEdit, HConstants.DEFAULT_CLUSTER_ID, now,
|
||||
this.htableDescriptor);
|
||||
}
|
||||
|
||||
// Now request the ICV to the store, this will set the timestamp
|
||||
// appropriately depending on if there is a value in memcache or not.
|
||||
// returns the change in the size of the memstore from operation
|
||||
long size = store.updateColumnValue(row, family, qualifier, result);
|
||||
|
||||
size = this.addAndGetGlobalMemstoreSize(size);
|
||||
flush = isFlushSize(size);
|
||||
}
|
||||
} finally {
|
||||
this.updatesLock.readLock().unlock();
|
||||
releaseRowLock(lid);
|
||||
}
|
||||
if (writeToWAL) {
|
||||
syncOrDefer(txid); // sync the transaction log outside the rowlock
|
||||
}
|
||||
} finally {
|
||||
closeRegionOperation();
|
||||
}
|
||||
|
||||
// do after lock
|
||||
long after = EnvironmentEdgeManager.currentTimeMillis();
|
||||
this.opMetrics.updateIncrementColumnValueMetrics(family, after - before);
|
||||
|
||||
if (flush) {
|
||||
// Request a cache flush. Do it outside update lock.
|
||||
requestFlush();
|
||||
}
|
||||
if (wrongLength) {
|
||||
throw new DoNotRetryIOException(
|
||||
"Attempted to increment field that isn't 64 bits wide");
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
//
|
||||
// New HBASE-880 Helpers
|
||||
//
|
||||
|
|
|
@ -31,14 +31,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.*;
|
||||
import org.apache.hadoop.hbase.client.Append;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.client.RowMutations;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.*;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
@ -109,32 +102,6 @@ public class TestAtomicOperation extends HBaseTestCase {
|
|||
assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2+v1), result.getValue(fam1, qual2)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test one increment command.
|
||||
*/
|
||||
public void testIncrementColumnValue() throws IOException {
|
||||
LOG.info("Starting test testIncrementColumnValue");
|
||||
initHRegion(tableName, getName(), fam1);
|
||||
|
||||
long value = 1L;
|
||||
long amount = 3L;
|
||||
|
||||
Put put = new Put(row);
|
||||
put.add(fam1, qual1, Bytes.toBytes(value));
|
||||
region.put(put);
|
||||
|
||||
long result = region.incrementColumnValue(row, fam1, qual1, amount, true);
|
||||
|
||||
assertEquals(value+amount, result);
|
||||
|
||||
HStore store = (HStore) region.getStore(fam1);
|
||||
// ICV removes any extra values floating around in there.
|
||||
assertEquals(1, store.memstore.kvset.size());
|
||||
assertTrue(store.memstore.snapshot.isEmpty());
|
||||
|
||||
assertICV(row, fam1, qual1, value+amount);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test multi-threaded increments.
|
||||
*/
|
||||
|
@ -223,7 +190,7 @@ public class TestAtomicOperation extends HBaseTestCase {
|
|||
|
||||
private int count;
|
||||
|
||||
public Incrementer(HRegion region,
|
||||
public Incrementer(HRegion region,
|
||||
int threadNumber, int amount, int numIncrements) {
|
||||
this.region = region;
|
||||
this.threadNumber = threadNumber;
|
||||
|
@ -237,7 +204,9 @@ public class TestAtomicOperation extends HBaseTestCase {
|
|||
public void run() {
|
||||
for (int i=0; i<numIncrements; i++) {
|
||||
try {
|
||||
long result = region.incrementColumnValue(row, fam1, qual1, amount, true);
|
||||
Increment inc = new Increment(row);
|
||||
inc.addColumn(fam1, qual1, amount);
|
||||
Result result = region.increment(inc, null, true);
|
||||
// LOG.info("thread:" + threadNumber + " iter:" + i);
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
|
|
|
@ -2389,351 +2389,6 @@ public class TestHRegion extends HBaseTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testIncrementColumnValue_UpdatingInPlace() throws IOException {
|
||||
this.region = initHRegion(tableName, getName(), fam1);
|
||||
try {
|
||||
long value = 1L;
|
||||
long amount = 3L;
|
||||
|
||||
Put put = new Put(row);
|
||||
put.add(fam1, qual1, Bytes.toBytes(value));
|
||||
region.put(put);
|
||||
|
||||
long result = region.incrementColumnValue(row, fam1, qual1, amount, true);
|
||||
|
||||
assertEquals(value+amount, result);
|
||||
|
||||
HStore store = (HStore) region.getStore(fam1);
|
||||
// ICV removes any extra values floating around in there.
|
||||
assertEquals(1, store.memstore.kvset.size());
|
||||
assertTrue(store.memstore.snapshot.isEmpty());
|
||||
|
||||
assertICV(row, fam1, qual1, value+amount);
|
||||
} finally {
|
||||
HRegion.closeHRegion(this.region);
|
||||
this.region = null;
|
||||
}
|
||||
}
|
||||
|
||||
public void testIncrementColumnValue_BumpSnapshot() throws IOException {
|
||||
ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
|
||||
EnvironmentEdgeManagerTestHelper.injectEdge(mee);
|
||||
this.region = initHRegion(tableName, getName(), fam1);
|
||||
try {
|
||||
long value = 42L;
|
||||
long incr = 44L;
|
||||
|
||||
// first put something in kvset, then snapshot it.
|
||||
Put put = new Put(row);
|
||||
put.add(fam1, qual1, Bytes.toBytes(value));
|
||||
region.put(put);
|
||||
|
||||
// get the store in question:
|
||||
HStore s = (HStore) region.getStore(fam1);
|
||||
s.snapshot(); //bam
|
||||
|
||||
// now increment:
|
||||
long newVal = region.incrementColumnValue(row, fam1, qual1,
|
||||
incr, false);
|
||||
|
||||
assertEquals(value+incr, newVal);
|
||||
|
||||
// get both versions:
|
||||
Get get = new Get(row);
|
||||
get.setMaxVersions();
|
||||
get.addColumn(fam1,qual1);
|
||||
|
||||
Result r = region.get(get, null);
|
||||
assertEquals(2, r.size());
|
||||
KeyValue first = r.raw()[0];
|
||||
KeyValue second = r.raw()[1];
|
||||
|
||||
assertTrue("ICV failed to upgrade timestamp",
|
||||
first.getTimestamp() != second.getTimestamp());
|
||||
} finally {
|
||||
HRegion.closeHRegion(this.region);
|
||||
this.region = null;
|
||||
}
|
||||
}
|
||||
|
||||
public void testIncrementColumnValue_ConcurrentFlush() throws IOException {
|
||||
this.region = initHRegion(tableName, getName(), fam1);
|
||||
try {
|
||||
long value = 1L;
|
||||
long amount = 3L;
|
||||
|
||||
Put put = new Put(row);
|
||||
put.add(fam1, qual1, Bytes.toBytes(value));
|
||||
region.put(put);
|
||||
|
||||
// now increment during a flush
|
||||
Thread t = new Thread() {
|
||||
public void run() {
|
||||
try {
|
||||
region.flushcache();
|
||||
} catch (IOException e) {
|
||||
LOG.info("test ICV, got IOE during flushcache()");
|
||||
}
|
||||
}
|
||||
};
|
||||
t.start();
|
||||
long r = region.incrementColumnValue(row, fam1, qual1, amount, true);
|
||||
assertEquals(value+amount, r);
|
||||
|
||||
// this also asserts there is only 1 KeyValue in the set.
|
||||
assertICV(row, fam1, qual1, value+amount);
|
||||
} finally {
|
||||
HRegion.closeHRegion(this.region);
|
||||
this.region = null;
|
||||
}
|
||||
}
|
||||
|
||||
public void testIncrementColumnValue_heapSize() throws IOException {
|
||||
EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
|
||||
|
||||
this.region = initHRegion(tableName, getName(), fam1);
|
||||
try {
|
||||
long byAmount = 1L;
|
||||
long size;
|
||||
|
||||
for( int i = 0; i < 1000 ; i++) {
|
||||
region.incrementColumnValue(row, fam1, qual1, byAmount, true);
|
||||
|
||||
size = region.memstoreSize.get();
|
||||
assertTrue("memstore size: " + size, size >= 0);
|
||||
}
|
||||
} finally {
|
||||
HRegion.closeHRegion(this.region);
|
||||
this.region = null;
|
||||
}
|
||||
}
|
||||
|
||||
public void testIncrementColumnValue_UpdatingInPlace_Negative()
|
||||
throws IOException {
|
||||
this.region = initHRegion(tableName, getName(), fam1);
|
||||
try {
|
||||
long value = 3L;
|
||||
long amount = -1L;
|
||||
|
||||
Put put = new Put(row);
|
||||
put.add(fam1, qual1, Bytes.toBytes(value));
|
||||
region.put(put);
|
||||
|
||||
long result = region.incrementColumnValue(row, fam1, qual1, amount, true);
|
||||
assertEquals(value+amount, result);
|
||||
|
||||
assertICV(row, fam1, qual1, value+amount);
|
||||
} finally {
|
||||
HRegion.closeHRegion(this.region);
|
||||
this.region = null;
|
||||
}
|
||||
}
|
||||
|
||||
public void testIncrementColumnValue_AddingNew()
|
||||
throws IOException {
|
||||
this.region = initHRegion(tableName, getName(), fam1);
|
||||
try {
|
||||
long value = 1L;
|
||||
long amount = 3L;
|
||||
|
||||
Put put = new Put(row);
|
||||
put.add(fam1, qual1, Bytes.toBytes(value));
|
||||
put.add(fam1, qual2, Bytes.toBytes(value));
|
||||
region.put(put);
|
||||
|
||||
long result = region.incrementColumnValue(row, fam1, qual3, amount, true);
|
||||
assertEquals(amount, result);
|
||||
|
||||
Get get = new Get(row);
|
||||
get.addColumn(fam1, qual3);
|
||||
Result rr = region.get(get, null);
|
||||
assertEquals(1, rr.size());
|
||||
|
||||
// ensure none of the other cols were incremented.
|
||||
assertICV(row, fam1, qual1, value);
|
||||
assertICV(row, fam1, qual2, value);
|
||||
assertICV(row, fam1, qual3, amount);
|
||||
} finally {
|
||||
HRegion.closeHRegion(this.region);
|
||||
this.region = null;
|
||||
}
|
||||
}
|
||||
|
||||
public void testIncrementColumnValue_UpdatingFromSF() throws IOException {
|
||||
this.region = initHRegion(tableName, getName(), fam1);
|
||||
try {
|
||||
long value = 1L;
|
||||
long amount = 3L;
|
||||
|
||||
Put put = new Put(row);
|
||||
put.add(fam1, qual1, Bytes.toBytes(value));
|
||||
put.add(fam1, qual2, Bytes.toBytes(value));
|
||||
region.put(put);
|
||||
|
||||
// flush to disk.
|
||||
region.flushcache();
|
||||
|
||||
HStore store = (HStore) region.getStore(fam1);
|
||||
assertEquals(0, store.memstore.kvset.size());
|
||||
|
||||
long r = region.incrementColumnValue(row, fam1, qual1, amount, true);
|
||||
assertEquals(value+amount, r);
|
||||
|
||||
assertICV(row, fam1, qual1, value+amount);
|
||||
} finally {
|
||||
HRegion.closeHRegion(this.region);
|
||||
this.region = null;
|
||||
}
|
||||
}
|
||||
|
||||
public void testIncrementColumnValue_AddingNewAfterSFCheck()
|
||||
throws IOException {
|
||||
this.region = initHRegion(tableName, getName(), fam1);
|
||||
try {
|
||||
long value = 1L;
|
||||
long amount = 3L;
|
||||
|
||||
Put put = new Put(row);
|
||||
put.add(fam1, qual1, Bytes.toBytes(value));
|
||||
put.add(fam1, qual2, Bytes.toBytes(value));
|
||||
region.put(put);
|
||||
region.flushcache();
|
||||
|
||||
HStore store = (HStore) region.getStore(fam1);
|
||||
assertEquals(0, store.memstore.kvset.size());
|
||||
|
||||
long r = region.incrementColumnValue(row, fam1, qual3, amount, true);
|
||||
assertEquals(amount, r);
|
||||
|
||||
assertICV(row, fam1, qual3, amount);
|
||||
|
||||
region.flushcache();
|
||||
|
||||
// ensure that this gets to disk.
|
||||
assertICV(row, fam1, qual3, amount);
|
||||
} finally {
|
||||
HRegion.closeHRegion(this.region);
|
||||
this.region = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Added for HBASE-3235.
|
||||
*
|
||||
* When the initial put and an ICV update were arriving with the same timestamp,
|
||||
* the initial Put KV was being skipped during {@link MemStore#upsert(KeyValue)}
|
||||
* causing the iteration for matching KVs, causing the update-in-place to not
|
||||
* happen and the ICV put to effectively disappear.
|
||||
* @throws IOException
|
||||
*/
|
||||
public void testIncrementColumnValue_UpdatingInPlace_TimestampClobber() throws IOException {
|
||||
this.region = initHRegion(tableName, getName(), fam1);
|
||||
try {
|
||||
long value = 1L;
|
||||
long amount = 3L;
|
||||
long now = EnvironmentEdgeManager.currentTimeMillis();
|
||||
ManualEnvironmentEdge mock = new ManualEnvironmentEdge();
|
||||
mock.setValue(now);
|
||||
EnvironmentEdgeManagerTestHelper.injectEdge(mock);
|
||||
|
||||
// verify we catch an ICV on a put with the same timestamp
|
||||
Put put = new Put(row);
|
||||
put.add(fam1, qual1, now, Bytes.toBytes(value));
|
||||
region.put(put);
|
||||
|
||||
long result = region.incrementColumnValue(row, fam1, qual1, amount, true);
|
||||
|
||||
assertEquals(value+amount, result);
|
||||
|
||||
HStore store = (HStore) region.getStore(fam1);
|
||||
// ICV should update the existing Put with the same timestamp
|
||||
assertEquals(1, store.memstore.kvset.size());
|
||||
assertTrue(store.memstore.snapshot.isEmpty());
|
||||
|
||||
assertICV(row, fam1, qual1, value+amount);
|
||||
|
||||
// verify we catch an ICV even when the put ts > now
|
||||
put = new Put(row);
|
||||
put.add(fam1, qual2, now+1, Bytes.toBytes(value));
|
||||
region.put(put);
|
||||
|
||||
result = region.incrementColumnValue(row, fam1, qual2, amount, true);
|
||||
|
||||
assertEquals(value+amount, result);
|
||||
|
||||
store = (HStore) region.getStore(fam1);
|
||||
// ICV should update the existing Put with the same timestamp
|
||||
assertEquals(2, store.memstore.kvset.size());
|
||||
assertTrue(store.memstore.snapshot.isEmpty());
|
||||
|
||||
assertICV(row, fam1, qual2, value+amount);
|
||||
EnvironmentEdgeManagerTestHelper.reset();
|
||||
} finally {
|
||||
HRegion.closeHRegion(this.region);
|
||||
this.region = null;
|
||||
}
|
||||
}
|
||||
|
||||
public void testIncrementColumnValue_WrongInitialSize() throws IOException {
|
||||
this.region = initHRegion(tableName, getName(), fam1);
|
||||
try {
|
||||
byte[] row1 = Bytes.add(Bytes.toBytes("1234"), Bytes.toBytes(0L));
|
||||
int row1Field1 = 0;
|
||||
int row1Field2 = 1;
|
||||
Put put1 = new Put(row1);
|
||||
put1.add(fam1, qual1, Bytes.toBytes(row1Field1));
|
||||
put1.add(fam1, qual2, Bytes.toBytes(row1Field2));
|
||||
region.put(put1);
|
||||
|
||||
long result;
|
||||
try {
|
||||
result = region.incrementColumnValue(row1, fam1, qual1, 1, true);
|
||||
fail("Expected to fail here");
|
||||
} catch (Exception exception) {
|
||||
// Expected.
|
||||
}
|
||||
|
||||
|
||||
assertICV(row1, fam1, qual1, row1Field1);
|
||||
assertICV(row1, fam1, qual2, row1Field2);
|
||||
} finally {
|
||||
HRegion.closeHRegion(this.region);
|
||||
this.region = null;
|
||||
}
|
||||
}
|
||||
|
||||
public void testIncrement_WrongInitialSize() throws IOException {
|
||||
this.region = initHRegion(tableName, getName(), fam1);
|
||||
try {
|
||||
byte[] row1 = Bytes.add(Bytes.toBytes("1234"), Bytes.toBytes(0L));
|
||||
long row1Field1 = 0;
|
||||
int row1Field2 = 1;
|
||||
Put put1 = new Put(row1);
|
||||
put1.add(fam1, qual1, Bytes.toBytes(row1Field1));
|
||||
put1.add(fam1, qual2, Bytes.toBytes(row1Field2));
|
||||
region.put(put1);
|
||||
Increment increment = new Increment(row1);
|
||||
increment.addColumn(fam1, qual1, 1);
|
||||
|
||||
//here we should be successful as normal
|
||||
region.increment(increment, null, true);
|
||||
assertICV(row1, fam1, qual1, row1Field1 + 1);
|
||||
|
||||
//failed to increment
|
||||
increment = new Increment(row1);
|
||||
increment.addColumn(fam1, qual2, 1);
|
||||
try {
|
||||
region.increment(increment, null, true);
|
||||
fail("Expected to fail here");
|
||||
} catch (Exception exception) {
|
||||
// Expected.
|
||||
}
|
||||
assertICV(row1, fam1, qual2, row1Field2);
|
||||
} finally {
|
||||
HRegion.closeHRegion(this.region);
|
||||
this.region = null;
|
||||
}
|
||||
}
|
||||
private void assertICV(byte [] row,
|
||||
byte [] familiy,
|
||||
byte[] qualifier,
|
||||
|
|
Loading…
Reference in New Issue