From 18815c8879f8482fc4fb1780eafbf0c62294c161 Mon Sep 17 00:00:00 2001
From: Michael Stack
Date: Tue, 6 Jan 2009 20:21:17 +0000
Subject: [PATCH] HBASE-1090 Atomic Check And Save in HTable

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@732094 13f79535-47bb-0310-9956-ffa450edef68
---
 CHANGES.txt                                  |  1 +
 .../apache/hadoop/hbase/client/HTable.java   | 28 ++++++
 .../hadoop/hbase/ipc/HRegionInterface.java   | 16 ++++
 .../hadoop/hbase/regionserver/HRegion.java   | 90 +++++++++++++++++++
 .../hbase/regionserver/HRegionServer.java    | 20 +++++
 .../hadoop/hbase/client/TestHTable.java      | 62 +++++++++++++
 6 files changed, 217 insertions(+)

diff --git a/CHANGES.txt b/CHANGES.txt
index 3a574f0c557..f089c59856c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -209,6 +209,7 @@ Release 0.19.0 - Unreleased
    HBASE-1106 Expose getClosestRowBefore in HTable
               (Michael Gottesman via Stack)
    HBASE-1082 Administrative functions for table/region maintenance
+   HBASE-1090 Atomic Check And Save in HTable (Michael Gottesman via Stack)
 
   NEW FEATURES
    HBASE-875 Use MurmurHash instead of JenkinsHash [in bloomfilters]
diff --git a/src/java/org/apache/hadoop/hbase/client/HTable.java b/src/java/org/apache/hadoop/hbase/client/HTable.java
index 07ca111e694..09e12760231 100644
--- a/src/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/src/java/org/apache/hadoop/hbase/client/HTable.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.io.BatchOperation;
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.io.HbaseMapWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
 
@@ -1334,6 +1335,33 @@ public class HTable {
     }
   }
 
+  /**
+   * Atomically checks if a row's current values match the expectedValues.
+   * If they do, applies the batchUpdate to the row.
+   * @param batchUpdate batchupdate to apply if the check succeeds
+   * @param expectedValues map of columns to the values they must hold
+   * @param rl row lock to use, or null to have the region lock internally
+   * @return true if the check passed and the update was applied
+   * @throws IOException
+   */
+  public synchronized boolean checkAndSave(final BatchUpdate batchUpdate,
+    final HbaseMapWritable<byte[], byte[]> expectedValues, final RowLock rl)
+  throws IOException {
+    checkRowAndColumns(batchUpdate);
+    if (rl != null) {
+      batchUpdate.setRowLock(rl.getLockId());
+    }
+    return connection.getRegionServerWithRetries(
+      new ServerCallable<Boolean>(connection, tableName, batchUpdate.getRow()) {
+        public Boolean call() throws IOException {
+          return server.checkAndSave(location.getRegionInfo().getRegionName(),
+            batchUpdate, expectedValues)?
+              Boolean.TRUE: Boolean.FALSE;
+        }
+      }
+    ).booleanValue();
+  }
+
   /**
    * Commit to the table the buffer of BatchUpdate.
    * Called automaticaly in the commit methods when autoFlush is true.
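The HTable.checkAndSave method added above is the client-facing half of the feature. For orientation, a minimal usage sketch follows. It is not part of the patch: the table name, column name, and values are illustrative, and conf is assumed to be an existing HBaseConfiguration.

// Sketch only: compare-and-set a single cell, assuming a table "tableA"
// with family "info:" already exists. All names here are examples.
HTable table = new HTable(conf, "tableA");
byte [] row = Bytes.toBytes("row1");
byte [] column = Bytes.toBytes("info:count");

// The check: the update is applied only if info:count still holds 1.
HbaseMapWritable<byte[], byte[]> expected =
  new HbaseMapWritable<byte[], byte[]>();
expected.put(column, Bytes.toBytes(1));

// The save: applied atomically with the check, under the row lock.
BatchUpdate update = new BatchUpdate(row);
update.put(column, Bytes.toBytes(2));

// Passing null for the RowLock lets the region take its own lock.
if (!table.checkAndSave(update, expected, null)) {
  // Another writer changed the row first; re-read and retry if needed.
}

The server-side pieces below wire this call through the RPC interface to the region.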
diff --git a/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java b/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
index 33abc621d12..2366192a013 100644
--- a/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
+++ b/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.io.HbaseMapWritable;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.NotServingRegionException;
 
@@ -117,6 +118,21 @@
   public int batchUpdates(final byte[] regionName, final BatchUpdate[] b)
     throws IOException;
 
+  /**
+   * Applies a batch of updates to one row atomically via one RPC,
+   * but only if the row's current values for the columns named in
+   * expectedValues match the supplied expected values
+   *
+   * @param regionName name of the region to update
+   * @param b BatchUpdate
+   * @param expectedValues map of column names to expected data values.
+   * @return true if the check passed and the update was applied
+   * @throws IOException
+   */
+  public boolean checkAndSave(final byte [] regionName, final BatchUpdate b,
+    final HbaseMapWritable<byte[], byte[]> expectedValues)
+  throws IOException;
+
   /**
    * Delete all cells that match the passed row and column and whose timestamp
    * is equal-to or older than the passed timestamp.
diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 04bdd2fee31..dffb64e3cea 100644
--- a/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1291,6 +1291,96 @@ public class HRegion implements HConstants {
     }
   }
 
+  /**
+   * Performs an atomic check and save operation. If the specified
+   * expected values are unchanged, applies the update.
+   *
+   * @param b the update to apply
+   * @param expectedValues the expected values to check
+   * @param lockid lock id from the client, or null to lock internally
+   * @param writeToWAL whether or not to write to the write ahead log
+   * @return true if the check passed and the update was applied
+   */
+  public boolean checkAndSave(BatchUpdate b,
+    HbaseMapWritable<byte[], byte[]> expectedValues, Integer lockid,
+    boolean writeToWAL)
+  throws IOException {
+    // This is basically a copy of batchUpdate with the atomic check and
+    // save added in, so read this method alongside batchUpdate. The areas
+    // that differ are commented below; where nothing has changed, the
+    // comments in batchUpdate apply.
+    boolean success = true;
+    checkReadOnly();
+    checkResources();
+    splitsAndClosesLock.readLock().lock();
+    try {
+      byte[] row = b.getRow();
+      Integer lid = getLock(lockid, row);
+      try {
+        Set<byte[]> keySet = expectedValues.keySet();
+        Map<byte[], Cell> actualValues = this.getFull(row, keySet,
+          HConstants.LATEST_TIMESTAMP, 1, lid);
+        for (byte[] key : keySet) {
+          // If the test fails, exit.
+          if (!Bytes.equals(actualValues.get(key).getValue(),
+              expectedValues.get(key))) {
+            success = false;
+            break;
+          }
+        }
+
+        if (success) {
+          long commitTime = (b.getTimestamp() == LATEST_TIMESTAMP)?
+            System.currentTimeMillis(): b.getTimestamp();
+          List<byte[]> deletes = null;
+          for (BatchOperation op: b) {
+            HStoreKey key = new HStoreKey(row, op.getColumn(), commitTime,
+              this.regionInfo);
+            byte[] val = null;
+            if (op.isPut()) {
+              val = op.getValue();
+              if (HLogEdit.isDeleted(val)) {
+                throw new IOException("Cannot insert value: " +
+                  Bytes.toString(val));
+              }
+            } else {
+              if (b.getTimestamp() == LATEST_TIMESTAMP) {
+                // Save off these deletes
+                if (deletes == null) {
+                  deletes = new ArrayList<byte[]>();
+                }
+                deletes.add(op.getColumn());
+              } else {
+                val = HLogEdit.deleteBytes.get();
+              }
+            }
+            if (val != null) {
+              localput(lid, key, val);
+            }
+          }
+          TreeMap<HStoreKey, byte[]> edits =
+            this.targetColumns.remove(lid);
+          if (edits != null && edits.size() > 0) {
+            update(edits, writeToWAL);
+          }
+          if (deletes != null && deletes.size() > 0) {
+            // We have some LATEST_TIMESTAMP deletes to run.
+            for (byte [] column: deletes) {
+              deleteMultiple(row, column, LATEST_TIMESTAMP, 1);
+            }
+          }
+        }
+      } catch (IOException e) {
+        this.targetColumns.remove(lid);
+        throw e;
+      } finally {
+        if (lockid == null) releaseRowLock(lid);
+      }
+    } finally {
+      splitsAndClosesLock.readLock().unlock();
+    }
+    return success;
+  }
+
   /*
    * Check if resources to support an update.
    *
diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 3f79640e315..6ce7cda6b5f 100644
--- a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1626,6 +1626,26 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     return -1;
   }
 
+  public boolean checkAndSave(final byte [] regionName, final BatchUpdate b,
+    final HbaseMapWritable<byte[], byte[]> expectedValues)
+  throws IOException {
+    if (b.getRow() == null)
+      throw new IllegalArgumentException("update has null row");
+    checkOpen();
+    this.requestCount.incrementAndGet();
+    HRegion region = getRegion(regionName);
+    validateValuesLength(b, region);
+    try {
+      cacheFlusher.reclaimMemcacheMemory();
+      boolean result = region.checkAndSave(b,
+        expectedValues, getLockFromId(b.getRowLock()), false);
+      return result;
+    } catch (Throwable t) {
+      throw convertThrowableToIOE(cleanup(t));
+    }
+  }
+
   /**
    * Utility method to verify values length
    * @param batchUpdate The update to verify
diff --git a/src/test/org/apache/hadoop/hbase/client/TestHTable.java b/src/test/org/apache/hadoop/hbase/client/TestHTable.java
index 2c8953038b1..0c4a2fd30f6 100644
--- a/src/test/org/apache/hadoop/hbase/client/TestHTable.java
+++ b/src/test/org/apache/hadoop/hbase/client/TestHTable.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.io.HbaseMapWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -48,6 +49,67 @@ public class TestHTable extends HBaseClusterTestCase implements HConstants {
   private static final byte [] attrName = Bytes.toBytes("TESTATTR");
   private static final byte [] attrValue = Bytes.toBytes("somevalue");
 
+  public void testCheckAndSave() throws IOException {
+    HTable table = null;
+    HColumnDescriptor column2 =
+      new HColumnDescriptor(Bytes.toBytes("info2:"));
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    HTableDescriptor testTableADesc =
+      new HTableDescriptor(tableAname);
+    testTableADesc.addFamily(column);
+    testTableADesc.addFamily(column2);
+    admin.createTable(testTableADesc);
+
+    table = new HTable(conf, tableAname);
+    BatchUpdate batchUpdate = new BatchUpdate(row);
+    BatchUpdate batchUpdate2 = new BatchUpdate(row);
+    BatchUpdate batchUpdate3 = new BatchUpdate(row);
+
+    HbaseMapWritable<byte[], byte[]> expectedValues =
+      new HbaseMapWritable<byte[], byte[]>();
+    HbaseMapWritable<byte[], byte[]> badExpectedValues =
+      new HbaseMapWritable<byte[], byte[]>();
+
+    for (int i = 0; i < 5; i++) {
+      // This is the initial batch update. We set the expected values to
+      // the same values, since we will be comparing the two.
+      batchUpdate.put(COLUMN_FAMILY_STR+i, Bytes.toBytes(i));
+      expectedValues.put(Bytes.toBytes(COLUMN_FAMILY_STR+i), Bytes.toBytes(i));
+
+      badExpectedValues.put(Bytes.toBytes(COLUMN_FAMILY_STR+i),
+        Bytes.toBytes(500));
+
+      // This is the second batch update, used to update the values written
+      // by the initial batch update.
+      batchUpdate2.put(COLUMN_FAMILY_STR+i, Bytes.toBytes(i+1));
+
+      // This final batch update is used to check that expected values that
+      // have since gone stale are rejected.
+      batchUpdate3.put(COLUMN_FAMILY_STR+i, Bytes.toBytes(i+2));
+    }
+
+    // Initialize rows
+    table.commit(batchUpdate);
+
+    // Check that incorrect expected values make checkAndSave return false.
+    assertFalse(table.checkAndSave(batchUpdate2, badExpectedValues, null));
+
+    // Make sure the correct expected values pass the check.
+    assertTrue(table.checkAndSave(batchUpdate2, expectedValues, null));
+
+    // Make sure checkAndSave truly saves the data after checking the
+    // expected values.
+    RowResult r = table.getRow(row);
+    byte[][] columns = batchUpdate2.getColumns();
+    for (int i = 0; i < columns.length; i++) {
+      assertTrue(Bytes.equals(r.get(columns[i]).getValue(),
+        batchUpdate2.get(columns[i])));
+    }
+
+    // Make sure that the now-stale expected values fail.
+    assertFalse(table.checkAndSave(batchUpdate3, expectedValues, null));
+  }
+
   /**
    * the test
    * @throws IOException
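A closing note on usage: checkAndSave returns false rather than blocking or retrying, so read-modify-write callers are expected to loop, re-reading the row and trying again. Below is a sketch of one attempt of that pattern built only on the API this patch adds; the helper name incrementOnce and the assumption that the cell already exists and holds an int written with Bytes.toBytes(int) are illustrative, not part of the patch.

// Sketch only: one optimistic increment attempt built on checkAndSave.
private boolean incrementOnce(HTable table, byte [] row, byte [] column)
throws IOException {
  // Read the current value; it is both the expectation and the base.
  int current = Bytes.toInt(table.get(row, column).getValue());

  HbaseMapWritable<byte[], byte[]> expected =
    new HbaseMapWritable<byte[], byte[]>();
  expected.put(column, Bytes.toBytes(current));

  BatchUpdate update = new BatchUpdate(row);
  update.put(column, Bytes.toBytes(current + 1));

  // False means a concurrent writer changed the cell between our read
  // and the region's check; the caller should re-read and retry.
  return table.checkAndSave(update, expected, null);
}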