HBASE-4016 HRegion.incrementColumnValue() doesn't have a consistent behavior when the field that we are incrementing is less than 8 bytes long
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1140907 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8ffed175ef
commit
95dc02182e
|
@ -138,6 +138,9 @@ Release 0.91.0 - Unreleased
|
|||
all tests
|
||||
HBASE-4024 Major compaction may not be triggered, even though region
|
||||
server log says it is triggered (Ted Yu)
|
||||
HBASE-4016 HRegion.incrementColumnValue() doesn't have a consistent
|
||||
behavior when the field that we are incrementing is less
|
||||
than 8 bytes long (Li Pi)
|
||||
|
||||
IMPROVEMENTS
|
||||
HBASE-3290 Max Compaction Size (Nicolas Spiegelberg via Stack)
|
||||
|
|
|
@ -3515,6 +3515,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
throws IOException {
|
||||
checkRow(row);
|
||||
boolean flush = false;
|
||||
boolean wrongLength = false;
|
||||
// Lock row
|
||||
long result = amount;
|
||||
startRegionOperation();
|
||||
|
@ -3535,32 +3536,38 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
|
||||
if (!results.isEmpty()) {
|
||||
KeyValue kv = results.get(0);
|
||||
byte [] buffer = kv.getBuffer();
|
||||
int valueOffset = kv.getValueOffset();
|
||||
result += Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG);
|
||||
if(kv.getValueLength() == 8){
|
||||
byte [] buffer = kv.getBuffer();
|
||||
int valueOffset = kv.getValueOffset();
|
||||
result += Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG);
|
||||
}
|
||||
else{
|
||||
wrongLength = true;
|
||||
}
|
||||
}
|
||||
|
||||
// build the KeyValue now:
|
||||
KeyValue newKv = new KeyValue(row, family,
|
||||
if(!wrongLength){
|
||||
// build the KeyValue now:
|
||||
KeyValue newKv = new KeyValue(row, family,
|
||||
qualifier, EnvironmentEdgeManager.currentTimeMillis(),
|
||||
Bytes.toBytes(result));
|
||||
|
||||
// now log it:
|
||||
if (writeToWAL) {
|
||||
long now = EnvironmentEdgeManager.currentTimeMillis();
|
||||
WALEdit walEdit = new WALEdit();
|
||||
walEdit.add(newKv);
|
||||
this.log.append(regionInfo, this.htableDescriptor.getName(),
|
||||
walEdit, now, this.htableDescriptor);
|
||||
// now log it:
|
||||
if (writeToWAL) {
|
||||
long now = EnvironmentEdgeManager.currentTimeMillis();
|
||||
WALEdit walEdit = new WALEdit();
|
||||
walEdit.add(newKv);
|
||||
this.log.append(regionInfo, this.htableDescriptor.getName(),
|
||||
walEdit, now, this.htableDescriptor);
|
||||
}
|
||||
|
||||
// Now request the ICV to the store, this will set the timestamp
|
||||
// appropriately depending on if there is a value in memcache or not.
|
||||
// returns the change in the size of the memstore from operation
|
||||
long size = store.updateColumnValue(row, family, qualifier, result);
|
||||
|
||||
size = this.addAndGetGlobalMemstoreSize(size);
|
||||
flush = isFlushSize(size);
|
||||
}
|
||||
|
||||
// Now request the ICV to the store, this will set the timestamp
|
||||
// appropriately depending on if there is a value in memcache or not.
|
||||
// returns the change in the size of the memstore from operation
|
||||
long size = store.updateColumnValue(row, family, qualifier, result);
|
||||
|
||||
size = this.addAndGetGlobalMemstoreSize(size);
|
||||
flush = isFlushSize(size);
|
||||
} finally {
|
||||
this.updatesLock.readLock().unlock();
|
||||
releaseRowLock(lid);
|
||||
|
@ -3573,7 +3580,9 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
// Request a cache flush. Do it outside update lock.
|
||||
requestFlush();
|
||||
}
|
||||
|
||||
if(wrongLength){
|
||||
throw new IOException("Attempted to increment field that isn't 64 bits wide");
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
|
@ -279,7 +279,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
|||
private Replication replicationHandler;
|
||||
|
||||
private final RegionServerAccounting regionServerAccounting;
|
||||
|
||||
|
||||
/**
|
||||
* The server name the Master sees us as. Its made from the hostname the
|
||||
* master passes us, port, and server startcode. Gets set after registration
|
||||
|
|
|
@ -2146,6 +2146,30 @@ public class TestHRegion extends HBaseTestCase {
|
|||
EnvironmentEdgeManagerTestHelper.reset();
|
||||
}
|
||||
|
||||
public void testIncrementColumnValue_WrongInitialSize() throws IOException {
|
||||
initHRegion(tableName, getName(), fam1);
|
||||
|
||||
byte[] row1 = Bytes.add(Bytes.toBytes("1234"), Bytes.toBytes(0L));
|
||||
int row1Field1 = 0;
|
||||
int row1Field2 = 1;
|
||||
Put put1 = new Put(row1);
|
||||
put1.add(fam1, qual1, Bytes.toBytes(row1Field1));
|
||||
put1.add(fam1, qual2, Bytes.toBytes(row1Field2));
|
||||
region.put(put1);
|
||||
|
||||
long result;
|
||||
try {
|
||||
result = region.incrementColumnValue(row1, fam1, qual1, 1, true);
|
||||
fail("Expected to fail here");
|
||||
} catch (Exception exception) {
|
||||
// Expected.
|
||||
}
|
||||
|
||||
|
||||
assertICV(row1, fam1, qual1, row1Field1);
|
||||
assertICV(row1, fam1, qual2, row1Field2);
|
||||
}
|
||||
|
||||
private void assertICV(byte [] row,
|
||||
byte [] familiy,
|
||||
byte[] qualifier,
|
||||
|
@ -2161,7 +2185,20 @@ public class TestHRegion extends HBaseTestCase {
|
|||
assertEquals(amount, r);
|
||||
}
|
||||
|
||||
private void assertICV(byte [] row,
|
||||
byte [] familiy,
|
||||
byte[] qualifier,
|
||||
int amount) throws IOException {
|
||||
// run a get and see?
|
||||
Get get = new Get(row);
|
||||
get.addColumn(familiy, qualifier);
|
||||
Result result = region.get(get, null);
|
||||
assertEquals(1, result.size());
|
||||
|
||||
KeyValue kv = result.raw()[0];
|
||||
int r = Bytes.toInt(kv.getValue());
|
||||
assertEquals(amount, r);
|
||||
}
|
||||
|
||||
public void testScanner_Wildcard_FromMemStoreAndFiles_EnforceVersions()
|
||||
throws IOException {
|
||||
|
|
Loading…
Reference in New Issue