diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java new file mode 100644 index 00000000000..c640eaea390 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor; + +import java.io.IOException; +import java.util.List; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.WrongRegionException; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * This class demonstrates how to implement atomic multi row transactions using + * {@link HRegion#mutateRowsWithLocks(java.util.Collection, java.util.Collection)} + * and Coprocessor endpoints. 
+ */ +public class MultiRowMutationEndpoint extends BaseEndpointCoprocessor implements + MultiRowMutationProtocol { + + @Override + public void mutateRows(List mutations) throws IOException { + // get the coprocessor environment + RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) getEnvironment(); + + // set of rows to lock, sorted to avoid deadlocks + SortedSet rowsToLock = new TreeSet(Bytes.BYTES_COMPARATOR); + + HRegionInfo regionInfo = env.getRegion().getRegionInfo(); + for (Mutation m : mutations) { + // check whether rows are in range for this region + if (!HRegion.rowIsInRange(regionInfo, m.getRow())) { + String msg = "Requested row out of range '" + + Bytes.toStringBinary(m.getRow()) + "'"; + if (rowsToLock.isEmpty()) { + // if this is the first row, region might have moved, + // allow client to retry + throw new WrongRegionException(msg); + } else { + // rows are split between regions, do not retry + throw new DoNotRetryIOException(msg); + } + } + rowsToLock.add(m.getRow()); + } + // call utility method on region + env.getRegion().mutateRowsWithLocks(mutations, rowsToLock); + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationProtocol.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationProtocol.java new file mode 100644 index 00000000000..e8eea9f9058 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationProtocol.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; +import org.apache.hadoop.hbase.regionserver.HRegion; + +/** + * Defines a protocol to perform multi row transactions. + * See {@link MultiRowMutationEndpoint} for the implementation. + *
+ * See + * {@link HRegion#mutateRowsWithLocks(java.util.Collection, java.util.Collection)} + * for details and limitations. + *
+ * Example: + * <pre>
+ * List&lt;Mutation&gt; mutations = ...;
+ * Put p1 = new Put(row1);
+ * Put p2 = new Put(row2);
+ * ...
+ * mutations.add(p1);
+ * mutations.add(p2);
+ * MultiRowMutationProtocol mrOp = t.coprocessorProxy(
+ *   MultiRowMutationProtocol.class, row1);
+ * mrOp.mutateRows(mutations);
+ * </pre>
+ */ +public interface MultiRowMutationProtocol extends CoprocessorProtocol { + public void mutateRows(List mutations) throws IOException; +} diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 389b985f64b..d6c9db07ae4 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -4148,12 +4148,26 @@ public class HRegion implements HeapSize { // , Writable{ return results; } - public void mutateRow(RowMutation rm, - Integer lockid) throws IOException { + public void mutateRow(RowMutation rm) throws IOException { + mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow())); + } + + /** + * Perform atomic mutations within the region. + * @param mutations The list of mutations to perform. + * mutations can contain operations for multiple rows. + * Caller has to ensure that all rows are contained in this region. + * @param rowsToLock Rows to lock + * If multiple rows are locked care should be taken that + * rowsToLock is sorted in order to avoid deadlocks. + * @throws IOException + */ + public void mutateRowsWithLocks(Collection mutations, + Collection rowsToLock) throws IOException { boolean flush = false; startRegionOperation(); - Integer lid = null; + List acquiredLocks = null; try { // 1. run all pre-hooks before the atomic operation // if any pre hook indicates "bypass", bypass the entire operation @@ -4161,7 +4175,7 @@ public class HRegion implements HeapSize { // , Writable{ // one WALEdit is used for all edits. WALEdit walEdit = new WALEdit(); if (coprocessorHost != null) { - for (Mutation m : rm.getMutations()) { + for (Mutation m : mutations) { if (m instanceof Put) { if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) { // by pass everything @@ -4178,8 +4192,17 @@ public class HRegion implements HeapSize { // , Writable{ } } - // 2. 
acquire the row lock - lid = getLock(lockid, rm.getRow(), true); + // 2. acquire the row lock(s) + acquiredLocks = new ArrayList(rowsToLock.size()); + for (byte[] row : rowsToLock) { + // attempt to lock all involved rows, fail if one lock times out + Integer lid = getLock(null, row, true); + if (lid == null) { + throw new IOException("Failed to acquire lock on " + + Bytes.toStringBinary(row)); + } + acquiredLocks.add(lid); + } // 3. acquire the region lock this.updatesLock.readLock().lock(); @@ -4191,7 +4214,7 @@ public class HRegion implements HeapSize { // , Writable{ byte[] byteNow = Bytes.toBytes(now); try { // 5. Check mutations and apply edits to a single WALEdit - for (Mutation m : rm.getMutations()) { + for (Mutation m : mutations) { if (m instanceof Put) { Map> familyMap = m.getFamilyMap(); checkFamilies(familyMap.keySet()); @@ -4218,7 +4241,7 @@ public class HRegion implements HeapSize { // , Writable{ // 7. apply to memstore long addedSize = 0; - for (Mutation m : rm.getMutations()) { + for (Mutation m : mutations) { addedSize += applyFamilyMapToMemstore(m.getFamilyMap(), w); } flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize)); @@ -4231,7 +4254,7 @@ public class HRegion implements HeapSize { // , Writable{ } // 10. run all coprocessor post hooks, after region lock is released if (coprocessorHost != null) { - for (Mutation m : rm.getMutations()) { + for (Mutation m : mutations) { if (m instanceof Put) { coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL()); } else if (m instanceof Delete) { @@ -4240,9 +4263,11 @@ public class HRegion implements HeapSize { // , Writable{ } } } finally { - if (lid != null) { + if (acquiredLocks != null) { // 11. release the row lock - releaseRowLock(lid); + for (Integer lid : acquiredLocks) { + releaseRowLock(lid); + } } if (flush) { // 12. Flush cache if needed. Do it outside update lock. 
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 7d7be3cb9c0..2afe1595587 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -3160,7 +3160,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, throws IOException { checkOpen(); if (regionName == null) { - throw new IOException("Invalid arguments to atomicMutation " + + throw new IOException("Invalid arguments to mutateRow " + "regionName is null"); } requestCount.incrementAndGet(); @@ -3169,7 +3169,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, if (!region.getRegionInfo().isMetaTable()) { this.cacheFlusher.reclaimMemStoreMemory(); } - region.mutateRow(rm, null); + region.mutateRow(rm); } catch (IOException e) { checkFileSystem(); throw e; diff --git a/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index 23b6cd7f8e0..713f2b3ec91 100644 --- a/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -36,7 +36,6 @@ import java.io.FileOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -65,6 +64,9 @@ import org.apache.hadoop.hbase.LargeTests; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.HTable.DaemonThreadFactory; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; +import org.apache.hadoop.hbase.coprocessor.MultiRowMutationProtocol; 
import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; @@ -112,6 +114,9 @@ public class TestFromClientSide { */ @BeforeClass public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + MultiRowMutationEndpoint.class.getName()); // We need more than one region server in this test TEST_UTIL.startMiniCluster(SLAVES); } @@ -4040,6 +4045,31 @@ public class TestFromClientSide { assertTrue(scan.getFamilyMap().containsKey(FAMILY)); } + @Test + public void testMultiRowMutation() throws Exception { + LOG.info("Starting testMultiRowMutation"); + final byte [] TABLENAME = Bytes.toBytes("testMultiRowMutation"); + final byte [] ROW1 = Bytes.toBytes("testRow1"); + + HTable t = TEST_UTIL.createTable(TABLENAME, FAMILY); + List mrm = new ArrayList(); + Put p = new Put(ROW); + p.add(FAMILY, QUALIFIER, VALUE); + mrm.add(p); + p = new Put(ROW1); + p.add(FAMILY, QUALIFIER, VALUE); + mrm.add(p); + MultiRowMutationProtocol mr = t.coprocessorProxy( + MultiRowMutationProtocol.class, ROW); + mr.mutateRows(mrm); + Get g = new Get(ROW); + Result r = t.get(g); + assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIER))); + g = new Get(ROW1); + r = t.get(g); + assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIER))); + } + @Test public void testRowMutation() throws Exception { LOG.info("Starting testRowMutation"); diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java index 948bfec1f29..6e81542e078 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java @@ -19,6 +19,9 @@ package org.apache.hadoop.hbase.regionserver; 
import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -29,11 +32,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.RowMutation; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; @@ -246,11 +251,11 @@ public class TestAtomicOperation extends HBaseTestCase { } /** - * Test multi-threaded increments. + * Test multi-threaded row mutations. 
*/ public void testRowMutationMultiThreads() throws IOException { - LOG.info("Starting test testMutationMultiThreads"); + LOG.info("Starting test testRowMutationMultiThreads"); initHRegion(tableName, getName(), fam1); // create 100 threads, each will alternate between adding and @@ -263,7 +268,52 @@ public class TestAtomicOperation extends HBaseTestCase { AtomicInteger failures = new AtomicInteger(0); // create all threads for (int i = 0; i < numThreads; i++) { - all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures); + all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) { + @Override + public void run() { + boolean op = true; + for (int i=0; i rowsToLock = Arrays.asList(row, row2); + // create all threads + for (int i = 0; i < numThreads; i++) { + all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) { + @Override + public void run() { + boolean op = true; + for (int i=0; i mrm = new ArrayList(); + if (op) { + Put p = new Put(row2, ts); + p.add(fam1, qual1, value1); + mrm.add(p); + Delete d = new Delete(row); + d.deleteColumns(fam1, qual1, ts); + mrm.add(d); + } else { + Delete d = new Delete(row2); + d.deleteColumns(fam1, qual1, ts); + mrm.add(d); + Put p = new Put(row, ts); + p.add(fam1, qual1, value2); + mrm.add(p); + } + region.mutateRowsWithLocks(mrm, rowsToLock); + op ^= true; + // check: should always see exactly one column + Scan s = new Scan(row); + RegionScanner rs = region.getScanner(s); + List r = new ArrayList(); + while(rs.next(r)); + if (r.size() != 1) { + LOG.debug(r); + failures.incrementAndGet(); + fail(); + } + } catch (IOException e) { + e.printStackTrace(); + failures.incrementAndGet(); + fail(); + } + } + } + }; + } + + // run all threads + for (int i = 0; i < numThreads; i++) { + all[i].start(); + } + + // wait for all threads to finish + for (int i = 0; i < numThreads; i++) { + try { + all[i].join(); + } catch (InterruptedException e) { + } + } + assertEquals(0, failures.get()); + } 
+ public static class AtomicOperation extends Thread { - private final HRegion region; - private final int numOps; - private final AtomicLong timeStamps; - private final AtomicInteger failures; - private final Random r = new Random(); - public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps, AtomicInteger failures) { + protected final HRegion region; + protected final int numOps; + protected final AtomicLong timeStamps; + protected final AtomicInteger failures; + protected final Random r = new Random(); + + public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps, + AtomicInteger failures) { this.region = region; this.numOps = numOps; this.timeStamps = timeStamps; this.failures = failures; } - @Override - public void run() { - boolean op = true; - for (int i=0; i