HBASE-2579 Add atomic checkAndDelete support
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@950743 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0a7e9913a6
commit
94c6f4c1b8
|
@ -661,6 +661,8 @@ Release 0.21.0 - Unreleased
|
|||
HBASE-2636 Upgrade Jetty to 6.1.24
|
||||
HBASE-2437 Refactor HLog splitLog (Cosmin Lehene via Stack)
|
||||
HBASE-2638 Speed up REST tests
|
||||
HBASE-2653 Remove unused DynamicBloomFilter (especially as its tests are
|
||||
failing hudson on occasion)
|
||||
|
||||
NEW FEATURES
|
||||
HBASE-1961 HBase EC2 scripts
|
||||
|
@ -699,8 +701,7 @@ Release 0.21.0 - Unreleased
|
|||
HBASE-1200 Add bloomfilters (Nicolas Spiegelberg via Stack)
|
||||
HBASE-2588 Add easier way to ship HBase dependencies to MR cluster within Job
|
||||
HBASE-1923 Bulk incremental load into an existing table
|
||||
HBASE-2653 Remove unused DynamicBloomFilter (especially as its tests are
|
||||
failing hudson on occasion)
|
||||
HBASE-2579 Add atomic checkAndDelete support (Michael Dalton via Stack)
|
||||
|
||||
OPTIMIZATIONS
|
||||
HBASE-410 [testing] Speed up the test suite
|
||||
|
|
|
@ -513,6 +513,35 @@ public class HTable implements HTableInterface {
|
|||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically checks if a row/family/qualifier value match the expectedValue.
|
||||
* If it does, it adds the delete. If value == null, checks for non-existence
|
||||
* of the value.
|
||||
*
|
||||
* @param row to check
|
||||
* @param family column family
|
||||
* @param qualifier column qualifier
|
||||
* @param value the expected value
|
||||
* @param delete delete to execute if value matches.
|
||||
* @throws IOException
|
||||
* @return true if the new delete was executed, false otherwise
|
||||
*/
|
||||
public boolean checkAndDelete(final byte [] row,
|
||||
final byte [] family, final byte [] qualifier, final byte [] value,
|
||||
final Delete delete)
|
||||
throws IOException {
|
||||
return connection.getRegionServerWithRetries(
|
||||
new ServerCallable<Boolean>(connection, tableName, row) {
|
||||
public Boolean call() throws IOException {
|
||||
return server.checkAndDelete(
|
||||
location.getRegionInfo().getRegionName(),
|
||||
row, family, qualifier, value, delete)
|
||||
? Boolean.TRUE : Boolean.FALSE;
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test for the existence of columns in the table, as specified in the Get.<p>
|
||||
*
|
||||
|
|
|
@ -189,6 +189,22 @@ public interface HTableInterface {
|
|||
*/
|
||||
void delete(List<Delete> deletes) throws IOException;
|
||||
|
||||
/**
|
||||
* Atomically checks if a row/family/qualifier value matches the expected
|
||||
* value. If it does, it adds the delete. If the passed value is null, the
|
||||
* check is for the lack of column (ie: non-existance)
|
||||
*
|
||||
* @param row to check
|
||||
* @param family column family to check
|
||||
* @param qualifier column qualifier to check
|
||||
* @param value the expected value
|
||||
* @param delete data to delete if check succeeds
|
||||
* @throws IOException e
|
||||
* @return true if the new delete was executed, false otherwise
|
||||
*/
|
||||
boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
|
||||
byte[] value, Delete delete) throws IOException;
|
||||
|
||||
/**
|
||||
* Atomically increments a column value.
|
||||
* <p>
|
||||
|
|
|
@ -152,6 +152,26 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion {
|
|||
final Put put)
|
||||
throws IOException;
|
||||
|
||||
|
||||
/**
|
||||
* Atomically checks if a row/family/qualifier value match the expectedValue.
|
||||
* If it does, it adds the delete. If passed expected value is null, then the
|
||||
* check is for non-existance of the row/column.
|
||||
*
|
||||
* @param regionName region name
|
||||
* @param row row to check
|
||||
* @param family column family
|
||||
* @param qualifier column qualifier
|
||||
* @param value the expected value
|
||||
* @param delete data to delete if check succeeds
|
||||
* @throws IOException e
|
||||
* @return true if the new delete was execute, false otherwise
|
||||
*/
|
||||
public boolean checkAndDelete(final byte[] regionName, final byte [] row,
|
||||
final byte [] family, final byte [] qualifier, final byte [] value,
|
||||
final Delete delete)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Atomically increments a column value. If the column value isn't long-like,
|
||||
* this could throw an exception. If passed expected value is null, then the
|
||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.Delete;
|
|||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.RowLock;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
|
||||
|
@ -53,6 +54,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
|
@ -1208,6 +1210,26 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
|||
return new RegionScanner(scan, additionalScanners);
|
||||
}
|
||||
|
||||
/*
|
||||
* @param delete The passed delete is modified by this method. WARNING!
|
||||
*/
|
||||
private void prepareDelete(Delete delete) throws IOException {
|
||||
// Check to see if this is a deleteRow insert
|
||||
if(delete.getFamilyMap().isEmpty()){
|
||||
for(byte [] family : regionInfo.getTableDesc().getFamiliesKeys()){
|
||||
// Don't eat the timestamp
|
||||
delete.deleteFamily(family, delete.getTimeStamp());
|
||||
}
|
||||
} else {
|
||||
for(byte [] family : delete.getFamilyMap().keySet()) {
|
||||
if(family == null) {
|
||||
throw new NoSuchColumnFamilyException("Empty family is invalid");
|
||||
}
|
||||
checkFamily(family);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
// set() methods for client use.
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -1228,22 +1250,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
|||
// If we did not pass an existing row lock, obtain a new one
|
||||
lid = getLock(lockid, row);
|
||||
|
||||
// Check to see if this is a deleteRow insert
|
||||
if(delete.getFamilyMap().isEmpty()){
|
||||
for(byte [] family : regionInfo.getTableDesc().getFamiliesKeys()){
|
||||
// Don't eat the timestamp
|
||||
delete.deleteFamily(family, delete.getTimeStamp());
|
||||
}
|
||||
} else {
|
||||
for(byte [] family : delete.getFamilyMap().keySet()) {
|
||||
if(family == null) {
|
||||
throw new NoSuchColumnFamilyException("Empty family is invalid");
|
||||
}
|
||||
checkFamily(family);
|
||||
}
|
||||
}
|
||||
|
||||
// All edits for the given row (across all column families) must happen atomically.
|
||||
prepareDelete(delete);
|
||||
delete(delete.getFamilyMap(), writeToWAL);
|
||||
|
||||
} finally {
|
||||
|
@ -1436,7 +1444,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
|||
|
||||
//TODO, Think that gets/puts and deletes should be refactored a bit so that
|
||||
//the getting of the lock happens before, so that you would just pass it into
|
||||
//the methods. So in the case of checkAndPut you could just do lockRow,
|
||||
//the methods. So in the case of checkAndMutate you could just do lockRow,
|
||||
//get, put, unlockRow or something
|
||||
/**
|
||||
*
|
||||
|
@ -1450,16 +1458,21 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
|||
* @throws IOException
|
||||
* @return true if the new put was execute, false otherwise
|
||||
*/
|
||||
public boolean checkAndPut(byte [] row, byte [] family, byte [] qualifier,
|
||||
byte [] expectedValue, Put put, Integer lockId, boolean writeToWAL)
|
||||
public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
|
||||
byte [] expectedValue, Writable w, Integer lockId, boolean writeToWAL)
|
||||
throws IOException{
|
||||
checkReadOnly();
|
||||
//TODO, add check for value length or maybe even better move this to the
|
||||
//client if this becomes a global setting
|
||||
checkResources();
|
||||
boolean isPut = w instanceof Put;
|
||||
if (!isPut && !(w instanceof Delete))
|
||||
throw new IOException("Action must be Put or Delete");
|
||||
|
||||
splitsAndClosesLock.readLock().lock();
|
||||
try {
|
||||
Get get = new Get(row, put.getRowLock());
|
||||
RowLock lock = isPut ? ((Put)w).getRowLock() : ((Delete)w).getRowLock();
|
||||
Get get = new Get(row, lock);
|
||||
checkFamily(family);
|
||||
get.addColumn(family, qualifier);
|
||||
|
||||
|
@ -1479,10 +1492,13 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
|||
byte [] actualValue = result.get(0).getValue();
|
||||
matches = Bytes.equals(expectedValue, actualValue);
|
||||
}
|
||||
//If matches put the new put
|
||||
//If matches put the new put or delete the new delete
|
||||
if (matches) {
|
||||
// All edits for the given row (across all column families) must happen atomically.
|
||||
put(put.getFamilyMap(), writeToWAL);
|
||||
if (isPut)
|
||||
put(((Put)w).getFamilyMap(), writeToWAL);
|
||||
else
|
||||
delete(prepareDelete((Delete)w).getFamilyMap(), writeToWAL);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
|
|
@ -1676,6 +1676,24 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
|||
return -1;
|
||||
}
|
||||
|
||||
private boolean checkAndMutate(final byte[] regionName, final byte [] row,
|
||||
final byte [] family, final byte [] qualifier, final byte [] value,
|
||||
final Writable w, Integer lock) throws IOException {
|
||||
checkOpen();
|
||||
this.requestCount.incrementAndGet();
|
||||
HRegion region = getRegion(regionName);
|
||||
try {
|
||||
if (!region.getRegionInfo().isMetaTable()) {
|
||||
this.cacheFlusher.reclaimMemStoreMemory();
|
||||
}
|
||||
return region.checkAndMutate(row, family, qualifier, value, w, lock,
|
||||
true);
|
||||
} catch (Throwable t) {
|
||||
throw convertThrowableToIOE(cleanup(t));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
* @param regionName
|
||||
|
@ -1690,23 +1708,26 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
|||
public boolean checkAndPut(final byte[] regionName, final byte [] row,
|
||||
final byte [] family, final byte [] qualifier, final byte [] value,
|
||||
final Put put) throws IOException{
|
||||
//Getting actual value
|
||||
Get get = new Get(row);
|
||||
get.addColumn(family, qualifier);
|
||||
return checkAndMutate(regionName, row, family, qualifier, value, put,
|
||||
getLockFromId(put.getLockId()));
|
||||
}
|
||||
|
||||
checkOpen();
|
||||
this.requestCount.incrementAndGet();
|
||||
HRegion region = getRegion(regionName);
|
||||
try {
|
||||
if (!region.getRegionInfo().isMetaTable()) {
|
||||
this.cacheFlusher.reclaimMemStoreMemory();
|
||||
}
|
||||
boolean retval = region.checkAndPut(row, family, qualifier, value, put,
|
||||
getLockFromId(put.getLockId()), true);
|
||||
return retval;
|
||||
} catch (Throwable t) {
|
||||
throw convertThrowableToIOE(cleanup(t));
|
||||
}
|
||||
/**
|
||||
*
|
||||
* @param regionName
|
||||
* @param row
|
||||
* @param family
|
||||
* @param qualifier
|
||||
* @param value the expected value
|
||||
* @param delete
|
||||
* @throws IOException
|
||||
* @return true if the new put was execute, false otherwise
|
||||
*/
|
||||
public boolean checkAndDelete(final byte[] regionName, final byte [] row,
|
||||
final byte [] family, final byte [] qualifier, final byte [] value,
|
||||
final Delete delete) throws IOException{
|
||||
return checkAndMutate(regionName, row, family, qualifier, value, delete,
|
||||
getLockFromId(delete.getLockId()));
|
||||
}
|
||||
|
||||
//
|
||||
|
|
|
@ -582,6 +582,12 @@ public class RemoteHTable implements HTableInterface {
|
|||
throw new IOException("checkAndPut not supported");
|
||||
}
|
||||
|
||||
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
|
||||
byte[] value, Delete delete) throws IOException {
|
||||
throw new IOException("checkAndDelete not supported");
|
||||
}
|
||||
|
||||
|
||||
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
|
||||
long amount) throws IOException {
|
||||
throw new IOException("incrementColumnValue not supported");
|
||||
|
|
|
@ -320,9 +320,9 @@ public class TestHRegion extends HBaseTestCase {
|
|||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
// checkAndPut tests
|
||||
// checkAndMutate tests
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
public void testCheckAndPut_WithEmptyRowValue() throws IOException {
|
||||
public void testCheckAndMutate_WithEmptyRowValue() throws IOException {
|
||||
byte [] tableName = Bytes.toBytes("testtable");
|
||||
byte [] row1 = Bytes.toBytes("row1");
|
||||
byte [] fam1 = Bytes.toBytes("fam1");
|
||||
|
@ -338,24 +338,42 @@ public class TestHRegion extends HBaseTestCase {
|
|||
//Putting data in key
|
||||
Put put = new Put(row1);
|
||||
put.add(fam1, qf1, val1);
|
||||
|
||||
|
||||
//checkAndPut with correct value
|
||||
boolean res = region.checkAndPut(row1, fam1, qf1, emptyVal, put, lockId,
|
||||
boolean res = region.checkAndMutate(row1, fam1, qf1, emptyVal, put, lockId,
|
||||
true);
|
||||
assertTrue(res);
|
||||
|
||||
// not empty anymore
|
||||
res = region.checkAndPut(row1, fam1, qf1, emptyVal, put, lockId, true);
|
||||
res = region.checkAndMutate(row1, fam1, qf1, emptyVal, put, lockId, true);
|
||||
assertFalse(res);
|
||||
|
||||
Delete delete = new Delete(row1);
|
||||
delete.deleteColumn(fam1, qf1);
|
||||
res = region.checkAndMutate(row1, fam1, qf1, emptyVal, delete, lockId,
|
||||
true);
|
||||
assertFalse(res);
|
||||
|
||||
put = new Put(row1);
|
||||
put.add(fam1, qf1, val2);
|
||||
//checkAndPut with correct value
|
||||
res = region.checkAndPut(row1, fam1, qf1, val1, put, lockId, true);
|
||||
res = region.checkAndMutate(row1, fam1, qf1, val1, put, lockId, true);
|
||||
assertTrue(res);
|
||||
|
||||
//checkAndDelete with correct value
|
||||
delete = new Delete(row1);
|
||||
delete.deleteColumn(fam1, qf1);
|
||||
delete.deleteColumn(fam1, qf1);
|
||||
res = region.checkAndMutate(row1, fam1, qf1, val2, delete, lockId, true);
|
||||
assertTrue(res);
|
||||
|
||||
delete = new Delete(row1);
|
||||
res = region.checkAndMutate(row1, fam1, qf1, emptyVal, delete, lockId,
|
||||
true);
|
||||
assertTrue(res);
|
||||
}
|
||||
|
||||
public void testCheckAndPut_WithWrongValue() throws IOException{
|
||||
public void testCheckAndMutate_WithWrongValue() throws IOException{
|
||||
byte [] tableName = Bytes.toBytes("testtable");
|
||||
byte [] row1 = Bytes.toBytes("row1");
|
||||
byte [] fam1 = Bytes.toBytes("fam1");
|
||||
|
@ -374,11 +392,17 @@ public class TestHRegion extends HBaseTestCase {
|
|||
region.put(put);
|
||||
|
||||
//checkAndPut with wrong value
|
||||
boolean res = region.checkAndPut(row1, fam1, qf1, val2, put, lockId, true);
|
||||
boolean res = region.checkAndMutate(row1, fam1, qf1, val2, put, lockId, true);
|
||||
assertEquals(false, res);
|
||||
|
||||
//checkAndDelete with wrong value
|
||||
Delete delete = new Delete(row1);
|
||||
delete.deleteFamily(fam1);
|
||||
res = region.checkAndMutate(row1, fam1, qf1, val2, delete, lockId, true);
|
||||
assertEquals(false, res);
|
||||
}
|
||||
|
||||
public void testCheckAndPut_WithCorrectValue() throws IOException{
|
||||
public void testCheckAndMutate_WithCorrectValue() throws IOException{
|
||||
byte [] tableName = Bytes.toBytes("testtable");
|
||||
byte [] row1 = Bytes.toBytes("row1");
|
||||
byte [] fam1 = Bytes.toBytes("fam1");
|
||||
|
@ -396,7 +420,13 @@ public class TestHRegion extends HBaseTestCase {
|
|||
region.put(put);
|
||||
|
||||
//checkAndPut with correct value
|
||||
boolean res = region.checkAndPut(row1, fam1, qf1, val1, put, lockId, true);
|
||||
boolean res = region.checkAndMutate(row1, fam1, qf1, val1, put, lockId, true);
|
||||
assertEquals(true, res);
|
||||
|
||||
//checkAndDelete with correct value
|
||||
Delete delete = new Delete(row1);
|
||||
delete.deleteColumn(fam1, qf1);
|
||||
res = region.checkAndMutate(row1, fam1, qf1, val1, put, lockId, true);
|
||||
assertEquals(true, res);
|
||||
}
|
||||
|
||||
|
@ -431,7 +461,7 @@ public class TestHRegion extends HBaseTestCase {
|
|||
Store store = region.getStore(fam1);
|
||||
store.memstore.kvset.size();
|
||||
|
||||
boolean res = region.checkAndPut(row1, fam1, qf1, val1, put, lockId, true);
|
||||
boolean res = region.checkAndMutate(row1, fam1, qf1, val1, put, lockId, true);
|
||||
assertEquals(true, res);
|
||||
store.memstore.kvset.size();
|
||||
|
||||
|
@ -448,6 +478,79 @@ public class TestHRegion extends HBaseTestCase {
|
|||
|
||||
}
|
||||
|
||||
public void testCheckAndDelete_ThatDeleteWasWritten() throws IOException{
|
||||
byte [] tableName = Bytes.toBytes("testtable");
|
||||
byte [] row1 = Bytes.toBytes("row1");
|
||||
byte [] fam1 = Bytes.toBytes("fam1");
|
||||
byte [] fam2 = Bytes.toBytes("fam2");
|
||||
byte [] qf1 = Bytes.toBytes("qualifier1");
|
||||
byte [] qf2 = Bytes.toBytes("qualifier2");
|
||||
byte [] qf3 = Bytes.toBytes("qualifier3");
|
||||
byte [] val1 = Bytes.toBytes("value1");
|
||||
byte [] val2 = Bytes.toBytes("value2");
|
||||
byte [] val3 = Bytes.toBytes("value3");
|
||||
byte[] emptyVal = new byte[] { };
|
||||
Integer lockId = null;
|
||||
|
||||
byte [][] families = {fam1, fam2};
|
||||
|
||||
//Setting up region
|
||||
String method = this.getName();
|
||||
initHRegion(tableName, method, families);
|
||||
|
||||
//Put content
|
||||
Put put = new Put(row1);
|
||||
put.add(fam1, qf1, val1);
|
||||
region.put(put);
|
||||
|
||||
put = new Put(row1);
|
||||
put.add(fam1, qf1, val2);
|
||||
put.add(fam2, qf1, val3);
|
||||
put.add(fam2, qf2, val2);
|
||||
put.add(fam2, qf3, val1);
|
||||
put.add(fam1, qf3, val1);
|
||||
region.put(put);
|
||||
|
||||
//Multi-column delete
|
||||
Delete delete = new Delete(row1);
|
||||
delete.deleteColumn(fam1, qf1);
|
||||
delete.deleteColumn(fam2, qf1);
|
||||
delete.deleteColumn(fam1, qf3);
|
||||
boolean res = region.checkAndMutate(row1, fam1, qf1, val2, delete, lockId,
|
||||
true);
|
||||
assertEquals(true, res);
|
||||
|
||||
Get get = new Get(row1);
|
||||
get.addColumn(fam1, qf1);
|
||||
get.addColumn(fam1, qf3);
|
||||
get.addColumn(fam2, qf2);
|
||||
Result r = region.get(get, null);
|
||||
assertEquals(2, r.size());
|
||||
assertEquals(val1, r.getValue(fam1, qf1));
|
||||
assertEquals(val2, r.getValue(fam2, qf2));
|
||||
|
||||
//Family delete
|
||||
delete = new Delete(row1);
|
||||
delete.deleteFamily(fam2);
|
||||
res = region.checkAndMutate(row1, fam2, qf1, emptyVal, delete, lockId,
|
||||
true);
|
||||
assertEquals(true, res);
|
||||
|
||||
get = new Get(row1);
|
||||
r = region.get(get, null);
|
||||
assertEquals(1, r.size());
|
||||
assertEquals(val1, r.getValue(fam1, qf1));
|
||||
|
||||
//Row delete
|
||||
delete = new Delete(row1);
|
||||
res = region.checkAndMutate(row1, fam1, qf1, val1, delete, lockId,
|
||||
true);
|
||||
assertEquals(true, res);
|
||||
get = new Get(row1);
|
||||
r = region.get(get, null);
|
||||
assertEquals(0, r.size());
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
// Delete tests
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
|
Loading…
Reference in New Issue