HBASE-1563 incrementColumnValue does not write to WAL
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@788529 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6ab29a6c47
commit
79f31a2f0f
|
@ -223,6 +223,7 @@ Release 0.20.0 - Unreleased
|
|||
HBASE-1567 cant serialize new filters
|
||||
HBASE-1585 More binary key/value log output cleanup
|
||||
(Lars George via Stack)
|
||||
HBASE-1563 incrementColumnValue does not write to WAL (Jon Gray via Stack)
|
||||
|
||||
IMPROVEMENTS
|
||||
HBASE-1089 Add count of regions on filesystem to master UI; add percentage
|
||||
|
|
|
@ -234,7 +234,7 @@ public class KeyValue implements Writable, HeapSize {
|
|||
public KeyValue(final byte [] bytes) {
|
||||
this(bytes, 0);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Creates a KeyValue from the specified byte array and offset.
|
||||
* Presumes <code>bytes</code> content starting at <code>offset</code> is
|
||||
|
@ -565,6 +565,16 @@ public class KeyValue implements Writable, HeapSize {
|
|||
//
|
||||
//---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Clones a KeyValue. This creates a copy, re-allocating the buffer.
|
||||
* @return Fully copied clone of this KeyValue
|
||||
*/
|
||||
public KeyValue clone() {
|
||||
byte [] bytes = new byte[this.length];
|
||||
System.arraycopy(this.bytes, this.offset, bytes, 0, this.length);
|
||||
return new KeyValue(bytes, 0, bytes.length);
|
||||
}
|
||||
|
||||
/**
|
||||
* Clones a row.
|
||||
*
|
||||
|
|
|
@ -472,8 +472,8 @@ public class HTable {
|
|||
}
|
||||
|
||||
/**
|
||||
* Atomically increments a column value. If the column value isn't long-like,
|
||||
* this could throw an exception.
|
||||
* Atomically increments a column value. If the column value already exists
|
||||
* and is not a big-endian long, this could throw an exception.<p>
|
||||
*
|
||||
* @param row
|
||||
* @param family
|
||||
|
@ -484,6 +484,26 @@ public class HTable {
|
|||
*/
|
||||
public long incrementColumnValue(final byte [] row, final byte [] family,
|
||||
final byte [] qualifier, final long amount)
|
||||
throws IOException {
|
||||
return incrementColumnValue(row, family, qualifier, amount, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically increments a column value. If the column value already exists
|
||||
* and is not a big-endian long, this could throw an exception.<p>
|
||||
*
|
||||
* Setting writeToWAL to false means that in a fail scenario, you will lose
|
||||
* any increments that have not been flushed.
|
||||
* @param row
|
||||
* @param family
|
||||
* @param qualifier
|
||||
* @param amount
|
||||
* @param writeToWAL true if increment should be applied to WAL, false if not
|
||||
* @return The new value.
|
||||
* @throws IOException
|
||||
*/
|
||||
public long incrementColumnValue(final byte [] row, final byte [] family,
|
||||
final byte [] qualifier, final long amount, final boolean writeToWAL)
|
||||
throws IOException {
|
||||
NullPointerException npe = null;
|
||||
if (row == null) {
|
||||
|
@ -499,11 +519,9 @@ public class HTable {
|
|||
return connection.getRegionServerWithRetries(
|
||||
new ServerCallable<Long>(connection, tableName, row) {
|
||||
public Long call() throws IOException {
|
||||
Get get = new Get(row);
|
||||
get.addColumn(family, qualifier);
|
||||
return server.incrementColumnValue(
|
||||
location.getRegionInfo().getRegionName(), row, family,
|
||||
qualifier, amount);
|
||||
qualifier, amount, writeToWAL);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
|
|
@ -147,11 +147,12 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion {
|
|||
* @param family
|
||||
* @param qualifier
|
||||
* @param amount
|
||||
* @param writeToWAL whether to write the increment to the WAL
|
||||
* @return new incremented column value
|
||||
* @throws IOException
|
||||
*/
|
||||
public long incrementColumnValue(byte [] regionName, byte [] row,
|
||||
byte [] family, byte [] qualifier, long amount)
|
||||
byte [] family, byte [] qualifier, long amount, boolean writeToWAL)
|
||||
throws IOException;
|
||||
|
||||
|
||||
|
|
|
@ -2256,18 +2256,29 @@ public class HRegion implements HConstants { // , Writable{
|
|||
* @throws IOException
|
||||
*/
|
||||
public long incrementColumnValue(byte [] row, byte [] family,
|
||||
byte [] qualifier, long amount)
|
||||
byte [] qualifier, long amount, boolean writeToWAL)
|
||||
throws IOException {
|
||||
checkRow(row);
|
||||
|
||||
boolean flush = false;
|
||||
// Lock row
|
||||
Integer lid = obtainRowLock(row);
|
||||
long result = 0L;
|
||||
try {
|
||||
Store store = stores.get(family);
|
||||
Store.ValueAndSize vas =
|
||||
// Determine what to do and perform increment on returned KV, no insertion
|
||||
Store.ICVResult vas =
|
||||
store.incrementColumnValue(row, family, qualifier, amount);
|
||||
// Write incremented value to WAL before inserting
|
||||
if (writeToWAL) {
|
||||
long now = System.currentTimeMillis();
|
||||
List<KeyValue> edits = new ArrayList<KeyValue>(1);
|
||||
edits.add(vas.kv);
|
||||
this.log.append(regionInfo.getRegionName(),
|
||||
regionInfo.getTableDesc().getName(), edits,
|
||||
(regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now);
|
||||
}
|
||||
// Insert to the Store
|
||||
store.add(vas.kv);
|
||||
result = vas.value;
|
||||
long size = this.memstoreSize.addAndGet(vas.sizeAdded);
|
||||
flush = isFlushSize(size);
|
||||
|
|
|
@ -2392,7 +2392,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
|||
|
||||
/** {@inheritDoc} */
|
||||
public long incrementColumnValue(byte [] regionName, byte [] row,
|
||||
byte [] family, byte [] qualifier, long amount)
|
||||
byte [] family, byte [] qualifier, long amount, boolean writeToWAL)
|
||||
throws IOException {
|
||||
checkOpen();
|
||||
|
||||
|
@ -2403,7 +2403,8 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
|||
requestCount.incrementAndGet();
|
||||
try {
|
||||
HRegion region = getRegion(regionName);
|
||||
return region.incrementColumnValue(row, family, qualifier, amount);
|
||||
return region.incrementColumnValue(row, family, qualifier, amount,
|
||||
writeToWAL);
|
||||
} catch (IOException e) {
|
||||
checkFileSystem();
|
||||
throw e;
|
||||
|
|
|
@ -1493,15 +1493,21 @@ public class Store implements HConstants {
|
|||
scanner.get(result);
|
||||
}
|
||||
|
||||
public static class ValueAndSize {
|
||||
public long value;
|
||||
public long sizeAdded;
|
||||
public ValueAndSize(long value, long sizeAdded) {
|
||||
/*
|
||||
* Data structure to hold incrementColumnValue result.
|
||||
*/
|
||||
static class ICVResult {
|
||||
final long value;
|
||||
final long sizeAdded;
|
||||
final KeyValue kv;
|
||||
|
||||
ICVResult(long value, long sizeAdded, KeyValue kv) {
|
||||
this.value = value;
|
||||
this.sizeAdded = sizeAdded;
|
||||
this.kv = kv;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Increments the value for the given row/family/qualifier
|
||||
* @param row
|
||||
|
@ -1511,8 +1517,9 @@ public class Store implements HConstants {
|
|||
* @return The new value.
|
||||
* @throws IOException
|
||||
*/
|
||||
public ValueAndSize incrementColumnValue(byte [] row, byte [] f,
|
||||
byte [] qualifier, long amount) throws IOException {
|
||||
public ICVResult incrementColumnValue(byte [] row, byte [] f,
|
||||
byte [] qualifier, long amount)
|
||||
throws IOException {
|
||||
long value = 0;
|
||||
List<KeyValue> result = new ArrayList<KeyValue>();
|
||||
KeyComparator keyComparator = this.comparator.getRawComparator();
|
||||
|
@ -1526,19 +1533,20 @@ public class Store implements HConstants {
|
|||
keyComparator, 1);
|
||||
|
||||
// Read from memstore
|
||||
if(this.memstore.get(matcher, result)) {
|
||||
if (this.memstore.get(matcher, result)) {
|
||||
// Received early-out from memstore
|
||||
KeyValue kv = result.get(0);
|
||||
// Make a copy of the KV and increment it
|
||||
KeyValue kv = result.get(0).clone();
|
||||
byte [] buffer = kv.getBuffer();
|
||||
int valueOffset = kv.getValueOffset();
|
||||
value = Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG) + amount;
|
||||
Bytes.putBytes(buffer, valueOffset, Bytes.toBytes(value), 0,
|
||||
Bytes.SIZEOF_LONG);
|
||||
return new ValueAndSize(value, 0);
|
||||
Bytes.putBytes(buffer, valueOffset, Bytes.toBytes(value), 0,
|
||||
Bytes.SIZEOF_LONG);
|
||||
return new ICVResult(value, 0, kv);
|
||||
}
|
||||
// Check if we even have storefiles
|
||||
if(this.storefiles.isEmpty()) {
|
||||
return addNewKeyValue(row, f, qualifier, value, amount);
|
||||
return createNewKeyValue(row, f, qualifier, value, amount);
|
||||
}
|
||||
|
||||
// Get storefiles for this store
|
||||
|
@ -1555,16 +1563,15 @@ public class Store implements HConstants {
|
|||
if(result.size() > 0) {
|
||||
value = Bytes.toLong(result.get(0).getValue());
|
||||
}
|
||||
return addNewKeyValue(row, f, qualifier, value, amount);
|
||||
return createNewKeyValue(row, f, qualifier, value, amount);
|
||||
}
|
||||
|
||||
private ValueAndSize addNewKeyValue(byte [] row, byte [] f, byte [] qualifier,
|
||||
long value, long amount) {
|
||||
private ICVResult createNewKeyValue(byte [] row, byte [] f,
|
||||
byte [] qualifier, long value, long amount) {
|
||||
long newValue = value + amount;
|
||||
KeyValue newKv = new KeyValue(row, f, qualifier,
|
||||
System.currentTimeMillis(),
|
||||
Bytes.toBytes(newValue));
|
||||
add(newKv);
|
||||
return new ValueAndSize(newValue, newKv.heapSize());
|
||||
return new ICVResult(newValue, newKv.heapSize(), newKv);
|
||||
}
|
||||
}
|
|
@ -208,7 +208,9 @@ public class TestStore extends TestCase {
|
|||
long amount = 3L;
|
||||
this.store.add(new KeyValue(row, family, qf1, Bytes.toBytes(value)));
|
||||
|
||||
this.store.incrementColumnValue(row, family, qf1, amount);
|
||||
Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf1, amount);
|
||||
assertEquals(vas.value, value+amount);
|
||||
store.add(vas.kv);
|
||||
Get get = new Get(row);
|
||||
get.addColumn(family, qf1);
|
||||
NavigableSet<byte[]> qualifiers =
|
||||
|
@ -232,7 +234,9 @@ public class TestStore extends TestCase {
|
|||
long amount = -1L;
|
||||
this.store.add(new KeyValue(row, family, qf1, Bytes.toBytes(value)));
|
||||
|
||||
this.store.incrementColumnValue(row, family, qf1, amount);
|
||||
Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf1, amount);
|
||||
assertEquals(vas.value, value+amount);
|
||||
store.add(vas.kv);
|
||||
Get get = new Get(row);
|
||||
get.addColumn(family, qf1);
|
||||
NavigableSet<byte[]> qualifiers =
|
||||
|
@ -256,7 +260,8 @@ public class TestStore extends TestCase {
|
|||
this.store.add(new KeyValue(row, family, qf1, Bytes.toBytes(value)));
|
||||
this.store.add(new KeyValue(row, family, qf2, Bytes.toBytes(value)));
|
||||
|
||||
this.store.incrementColumnValue(row, family, qf3, amount);
|
||||
Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf3, amount);
|
||||
store.add(vas.kv);
|
||||
Get get = new Get(row);
|
||||
get.addColumn(family, qf3);
|
||||
NavigableSet<byte[]> qualifiers =
|
||||
|
@ -283,7 +288,8 @@ public class TestStore extends TestCase {
|
|||
|
||||
flush(1);
|
||||
|
||||
this.store.incrementColumnValue(row, family, qf1, amount);
|
||||
Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf1, amount);
|
||||
store.add(vas.kv);
|
||||
Get get = new Get(row);
|
||||
get.addColumn(family, qf1);
|
||||
NavigableSet<byte[]> qualifiers =
|
||||
|
@ -311,7 +317,8 @@ public class TestStore extends TestCase {
|
|||
|
||||
flush(1);
|
||||
|
||||
this.store.incrementColumnValue(row, family, qf3, amount);
|
||||
Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf3, amount);
|
||||
store.add(vas.kv);
|
||||
Get get = new Get(row);
|
||||
get.addColumn(family, qf3);
|
||||
NavigableSet<byte[]> qualifiers =
|
||||
|
|
Loading…
Reference in New Issue