HBASE-1090 Atomic Check And Save in HTable
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@732094 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1224440de8
commit
18815c8879
|
@ -209,6 +209,7 @@ Release 0.19.0 - Unreleased
|
||||||
HBASE-1106 Expose getClosestRowBefore in HTable
|
HBASE-1106 Expose getClosestRowBefore in HTable
|
||||||
(Michael Gottesman via Stack)
|
(Michael Gottesman via Stack)
|
||||||
HBASE-1082 Administrative functions for table/region maintenance
|
HBASE-1082 Administrative functions for table/region maintenance
|
||||||
|
HBASE-1090 Atomic Check And Save in HTable (Michael Gottesman via Stack)
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
HBASE-875 Use MurmurHash instead of JenkinsHash [in bloomfilters]
|
HBASE-875 Use MurmurHash instead of JenkinsHash [in bloomfilters]
|
||||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.io.BatchOperation;
|
||||||
import org.apache.hadoop.hbase.io.BatchUpdate;
|
import org.apache.hadoop.hbase.io.BatchUpdate;
|
||||||
import org.apache.hadoop.hbase.io.Cell;
|
import org.apache.hadoop.hbase.io.Cell;
|
||||||
import org.apache.hadoop.hbase.io.RowResult;
|
import org.apache.hadoop.hbase.io.RowResult;
|
||||||
|
import org.apache.hadoop.hbase.io.HbaseMapWritable;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.Writables;
|
import org.apache.hadoop.hbase.util.Writables;
|
||||||
|
|
||||||
|
@ -1334,6 +1335,33 @@ public class HTable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Atomically checks if a row's values match
|
||||||
|
* the expectedValues. If it does, it uses the
|
||||||
|
* batchUpdate to update the row.
|
||||||
|
* @param batchUpdate batchupdate to apply if check is successful
|
||||||
|
* @param expectedValues values to check
|
||||||
|
* @param rl rowlock
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public synchronized boolean checkAndSave(final BatchUpdate batchUpdate,
|
||||||
|
final HbaseMapWritable<byte[],byte[]> expectedValues, final RowLock rl)
|
||||||
|
throws IOException {
|
||||||
|
checkRowAndColumns(batchUpdate);
|
||||||
|
if(rl != null) {
|
||||||
|
batchUpdate.setRowLock(rl.getLockId());
|
||||||
|
}
|
||||||
|
return connection.getRegionServerWithRetries(
|
||||||
|
new ServerCallable<Boolean>(connection, tableName, batchUpdate.getRow()) {
|
||||||
|
public Boolean call() throws IOException {
|
||||||
|
return server.checkAndSave(location.getRegionInfo().getRegionName(),
|
||||||
|
batchUpdate, expectedValues)?
|
||||||
|
Boolean.TRUE: Boolean.FALSE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
).booleanValue();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Commit to the table the buffer of BatchUpdate.
|
* Commit to the table the buffer of BatchUpdate.
|
||||||
* Called automaticaly in the commit methods when autoFlush is true.
|
* Called automaticaly in the commit methods when autoFlush is true.
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.filter.RowFilterInterface;
|
||||||
import org.apache.hadoop.hbase.io.BatchUpdate;
|
import org.apache.hadoop.hbase.io.BatchUpdate;
|
||||||
import org.apache.hadoop.hbase.io.Cell;
|
import org.apache.hadoop.hbase.io.Cell;
|
||||||
import org.apache.hadoop.hbase.io.RowResult;
|
import org.apache.hadoop.hbase.io.RowResult;
|
||||||
|
import org.apache.hadoop.hbase.io.HbaseMapWritable;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||||
|
@ -117,6 +118,21 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion {
|
||||||
public int batchUpdates(final byte[] regionName, final BatchUpdate[] b)
|
public int batchUpdates(final byte[] regionName, final BatchUpdate[] b)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Applies a batch of updates to one row atomically via one RPC
|
||||||
|
* if the columns specified in expectedValues match
|
||||||
|
* the given values in expectedValues
|
||||||
|
*
|
||||||
|
* @param regionName name of the region to update
|
||||||
|
* @param b BatchUpdate
|
||||||
|
* @param expectedValues map of column names to expected data values.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public boolean checkAndSave(final byte [] regionName, final BatchUpdate b,
|
||||||
|
final HbaseMapWritable<byte[],byte[]> expectedValues)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Delete all cells that match the passed row and column and whose timestamp
|
* Delete all cells that match the passed row and column and whose timestamp
|
||||||
* is equal-to or older than the passed timestamp.
|
* is equal-to or older than the passed timestamp.
|
||||||
|
|
|
@ -1291,6 +1291,96 @@ public class HRegion implements HConstants {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Performs an atomic check and save operation. Checks if
|
||||||
|
* the specified expected values have changed, and if not
|
||||||
|
* applies the update.
|
||||||
|
*
|
||||||
|
* @param b the update to apply
|
||||||
|
* @param expectedValues the expected values to check
|
||||||
|
* @param writeToWAL whether or not to write to the write ahead log
|
||||||
|
*/
|
||||||
|
public boolean checkAndSave(BatchUpdate b,
|
||||||
|
HbaseMapWritable<byte[], byte[]> expectedValues, Integer lockid,
|
||||||
|
boolean writeToWAL)
|
||||||
|
throws IOException {
|
||||||
|
// This is basically a copy of batchUpdate with the atomic check and save
|
||||||
|
// added in. So you should read this method with batchUpdate. I will
|
||||||
|
// comment the areas that I have changed where I have not changed, you
|
||||||
|
// should read the comments from the batchUpdate method
|
||||||
|
boolean success = true;
|
||||||
|
checkReadOnly();
|
||||||
|
checkResources();
|
||||||
|
splitsAndClosesLock.readLock().lock();
|
||||||
|
try {
|
||||||
|
byte[] row = b.getRow();
|
||||||
|
Integer lid = getLock(lockid,row);
|
||||||
|
try {
|
||||||
|
Set<byte[]> keySet = expectedValues.keySet();
|
||||||
|
Map<byte[],Cell> actualValues = this.getFull(row,keySet,
|
||||||
|
HConstants.LATEST_TIMESTAMP, 1,lid);
|
||||||
|
for (byte[] key : keySet) {
|
||||||
|
// If test fails exit
|
||||||
|
if(!Bytes.equals(actualValues.get(key).getValue(),
|
||||||
|
expectedValues.get(key))) {
|
||||||
|
success = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (success) {
|
||||||
|
long commitTime = (b.getTimestamp() == LATEST_TIMESTAMP)?
|
||||||
|
System.currentTimeMillis(): b.getTimestamp();
|
||||||
|
List<byte []> deletes = null;
|
||||||
|
for (BatchOperation op: b) {
|
||||||
|
HStoreKey key = new HStoreKey(row, op.getColumn(), commitTime,
|
||||||
|
this.regionInfo);
|
||||||
|
byte[] val = null;
|
||||||
|
if (op.isPut()) {
|
||||||
|
val = op.getValue();
|
||||||
|
if (HLogEdit.isDeleted(val)) {
|
||||||
|
throw new IOException("Cannot insert value: " + val);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (b.getTimestamp() == LATEST_TIMESTAMP) {
|
||||||
|
// Save off these deletes
|
||||||
|
if (deletes == null) {
|
||||||
|
deletes = new ArrayList<byte []>();
|
||||||
|
}
|
||||||
|
deletes.add(op.getColumn());
|
||||||
|
} else {
|
||||||
|
val = HLogEdit.deleteBytes.get();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (val != null) {
|
||||||
|
localput(lid, key, val);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
TreeMap<HStoreKey, byte[]> edits =
|
||||||
|
this.targetColumns.remove(lid);
|
||||||
|
if (edits != null && edits.size() > 0) {
|
||||||
|
update(edits, writeToWAL);
|
||||||
|
}
|
||||||
|
if (deletes != null && deletes.size() > 0) {
|
||||||
|
// We have some LATEST_TIMESTAMP deletes to run.
|
||||||
|
for (byte [] column: deletes) {
|
||||||
|
deleteMultiple(row, column, LATEST_TIMESTAMP, 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
this.targetColumns.remove(Long.valueOf(lid));
|
||||||
|
throw e;
|
||||||
|
} finally {
|
||||||
|
if(lockid == null) releaseRowLock(lid);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
splitsAndClosesLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Check if resources to support an update.
|
* Check if resources to support an update.
|
||||||
*
|
*
|
||||||
|
|
|
@ -1626,6 +1626,26 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean checkAndSave(final byte [] regionName, final BatchUpdate b,
|
||||||
|
final HbaseMapWritable<byte[],byte[]> expectedValues)
|
||||||
|
throws IOException {
|
||||||
|
if (b.getRow() == null)
|
||||||
|
throw new IllegalArgumentException("update has null row");
|
||||||
|
checkOpen();
|
||||||
|
this.requestCount.incrementAndGet();
|
||||||
|
HRegion region = getRegion(regionName);
|
||||||
|
validateValuesLength(b, region);
|
||||||
|
try {
|
||||||
|
cacheFlusher.reclaimMemcacheMemory();
|
||||||
|
boolean result = region.checkAndSave(b,
|
||||||
|
expectedValues,getLockFromId(b.getRowLock()), false);
|
||||||
|
return result;
|
||||||
|
} catch (Throwable t) {
|
||||||
|
throw convertThrowableToIOE(cleanup(t));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Utility method to verify values length
|
* Utility method to verify values length
|
||||||
* @param batchUpdate The update to verify
|
* @param batchUpdate The update to verify
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.TableNotFoundException;
|
||||||
import org.apache.hadoop.hbase.io.BatchUpdate;
|
import org.apache.hadoop.hbase.io.BatchUpdate;
|
||||||
import org.apache.hadoop.hbase.io.Cell;
|
import org.apache.hadoop.hbase.io.Cell;
|
||||||
import org.apache.hadoop.hbase.io.RowResult;
|
import org.apache.hadoop.hbase.io.RowResult;
|
||||||
|
import org.apache.hadoop.hbase.io.HbaseMapWritable;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -48,6 +49,67 @@ public class TestHTable extends HBaseClusterTestCase implements HConstants {
|
||||||
private static final byte [] attrName = Bytes.toBytes("TESTATTR");
|
private static final byte [] attrName = Bytes.toBytes("TESTATTR");
|
||||||
private static final byte [] attrValue = Bytes.toBytes("somevalue");
|
private static final byte [] attrValue = Bytes.toBytes("somevalue");
|
||||||
|
|
||||||
|
public void testCheckAndSave() throws IOException {
|
||||||
|
HTable table = null;
|
||||||
|
HColumnDescriptor column2 =
|
||||||
|
new HColumnDescriptor(Bytes.toBytes("info2:"));
|
||||||
|
HBaseAdmin admin = new HBaseAdmin(conf);
|
||||||
|
HTableDescriptor testTableADesc =
|
||||||
|
new HTableDescriptor(tableAname);
|
||||||
|
testTableADesc.addFamily(column);
|
||||||
|
testTableADesc.addFamily(column2);
|
||||||
|
admin.createTable(testTableADesc);
|
||||||
|
|
||||||
|
table = new HTable(conf, tableAname);
|
||||||
|
BatchUpdate batchUpdate = new BatchUpdate(row);
|
||||||
|
BatchUpdate batchUpdate2 = new BatchUpdate(row);
|
||||||
|
BatchUpdate batchUpdate3 = new BatchUpdate(row);
|
||||||
|
|
||||||
|
HbaseMapWritable<byte[],byte[]> expectedValues =
|
||||||
|
new HbaseMapWritable<byte[],byte[]>();
|
||||||
|
HbaseMapWritable<byte[],byte[]> badExpectedValues =
|
||||||
|
new HbaseMapWritable<byte[],byte[]>();
|
||||||
|
|
||||||
|
for(int i = 0; i < 5; i++) {
|
||||||
|
// This batchupdate is our initial batch update,
|
||||||
|
// As such we also set our expected values to the same values
|
||||||
|
// since we will be comparing the two
|
||||||
|
batchUpdate.put(COLUMN_FAMILY_STR+i, Bytes.toBytes(i));
|
||||||
|
expectedValues.put(Bytes.toBytes(COLUMN_FAMILY_STR+i), Bytes.toBytes(i));
|
||||||
|
|
||||||
|
badExpectedValues.put(Bytes.toBytes(COLUMN_FAMILY_STR+i),
|
||||||
|
Bytes.toBytes(500));
|
||||||
|
|
||||||
|
// This is our second batchupdate that we will use to update the initial
|
||||||
|
// batchupdate
|
||||||
|
batchUpdate2.put(COLUMN_FAMILY_STR+i, Bytes.toBytes(i+1));
|
||||||
|
|
||||||
|
// This final batch update is to check that our expected values (which
|
||||||
|
// are now wrong)
|
||||||
|
batchUpdate3.put(COLUMN_FAMILY_STR+i, Bytes.toBytes(i+2));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize rows
|
||||||
|
table.commit(batchUpdate);
|
||||||
|
|
||||||
|
// check if incorrect values are returned false
|
||||||
|
assertFalse(table.checkAndSave(batchUpdate2,badExpectedValues,null));
|
||||||
|
|
||||||
|
// make sure first expected values are correct
|
||||||
|
assertTrue(table.checkAndSave(batchUpdate2, expectedValues,null));
|
||||||
|
|
||||||
|
// make sure check and save truly saves the data after checking the expected
|
||||||
|
// values
|
||||||
|
RowResult r = table.getRow(row);
|
||||||
|
byte[][] columns = batchUpdate2.getColumns();
|
||||||
|
for(int i = 0;i < columns.length;i++) {
|
||||||
|
assertTrue(Bytes.equals(r.get(columns[i]).getValue(),batchUpdate2.get(columns[i])));
|
||||||
|
}
|
||||||
|
|
||||||
|
// make sure that the old expected values fail
|
||||||
|
assertFalse(table.checkAndSave(batchUpdate3, expectedValues,null));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* the test
|
* the test
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
|
|
Loading…
Reference in New Issue