HBASE-5229 Provide basic building blocks for 'multi-row' local transactions.

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1242037 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
larsh 2012-02-08 18:30:37 +00:00
parent ec7daf9413
commit ff1f0decc4
6 changed files with 332 additions and 67 deletions

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.WrongRegionException;
import org.apache.hadoop.hbase.util.Bytes;
/**
* This class demonstrates how to implement atomic multi row transactions using
* {@link HRegion#mutateRowsWithLocks(java.util.Collection, java.util.Collection)}
* and Coprocessor endpoints.
*/
public class MultiRowMutationEndpoint extends BaseEndpointCoprocessor implements
MultiRowMutationProtocol {
@Override
public void mutateRows(List<Mutation> mutations) throws IOException {
// get the coprocessor environment
RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) getEnvironment();
// set of rows to lock, sorted to avoid deadlocks
SortedSet<byte[]> rowsToLock = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
HRegionInfo regionInfo = env.getRegion().getRegionInfo();
for (Mutation m : mutations) {
// check whether rows are in range for this region
if (!HRegion.rowIsInRange(regionInfo, m.getRow())) {
String msg = "Requested row out of range '"
+ Bytes.toStringBinary(m.getRow()) + "'";
if (rowsToLock.isEmpty()) {
// if this is the first row, region might have moved,
// allow client to retry
throw new WrongRegionException(msg);
} else {
// rows are split between regions, do not retry
throw new DoNotRetryIOException(msg);
}
}
rowsToLock.add(m.getRow());
}
// call utility method on region
env.getRegion().mutateRowsWithLocks(mutations, rowsToLock);
}
}

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.regionserver.HRegion;
/**
* Defines a protocol to perform multi row transactions.
* See {@link MultiRowMutationEndpoint} for the implementation.
* </br>
* See
* {@link HRegion#mutateRowsWithLocks(java.util.Collection, java.util.Collection)}
* for details and limitations.
* </br>
* Example:
* <code><pre>
* List<Mutation> mutations = ...;
* Put p1 = new Put(row1);
* Put p2 = new Put(row2);
* ...
* mutations.add(p1);
* mutations.add(p2);
* MultiRowMutationProtocol mrOp = t.coprocessorProxy(
* MultiRowMutationProtocol.class, row1);
* mrOp.mutateRows(mutations);
* </pre></code>
*/
public interface MultiRowMutationProtocol extends CoprocessorProtocol {
public void mutateRows(List<Mutation> mutations) throws IOException;
}

View File

@ -4148,12 +4148,26 @@ public class HRegion implements HeapSize { // , Writable{
return results;
}
public void mutateRow(RowMutation rm,
Integer lockid) throws IOException {
public void mutateRow(RowMutation rm) throws IOException {
mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
}
/**
* Perform atomic mutations within the region.
* @param mutations The list of mutations to perform.
* <code>mutations</code> can contain operations for multiple rows.
* Caller has to ensure that all rows are contained in this region.
* @param rowsToLock Rows to lock
* If multiple rows are locked care should be taken that
* <code>rowsToLock</code> is sorted in order to avoid deadlocks.
* @throws IOException
*/
public void mutateRowsWithLocks(Collection<Mutation> mutations,
Collection<byte[]> rowsToLock) throws IOException {
boolean flush = false;
startRegionOperation();
Integer lid = null;
List<Integer> acquiredLocks = null;
try {
// 1. run all pre-hooks before the atomic operation
// if any pre hook indicates "bypass", bypass the entire operation
@ -4161,7 +4175,7 @@ public class HRegion implements HeapSize { // , Writable{
// one WALEdit is used for all edits.
WALEdit walEdit = new WALEdit();
if (coprocessorHost != null) {
for (Mutation m : rm.getMutations()) {
for (Mutation m : mutations) {
if (m instanceof Put) {
if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) {
// by pass everything
@ -4178,8 +4192,17 @@ public class HRegion implements HeapSize { // , Writable{
}
}
// 2. acquire the row lock
lid = getLock(lockid, rm.getRow(), true);
// 2. acquire the row lock(s)
acquiredLocks = new ArrayList<Integer>(rowsToLock.size());
for (byte[] row : rowsToLock) {
// attempt to lock all involved rows, fail if one lock times out
Integer lid = getLock(null, row, true);
if (lid == null) {
throw new IOException("Failed to acquire lock on "
+ Bytes.toStringBinary(row));
}
acquiredLocks.add(lid);
}
// 3. acquire the region lock
this.updatesLock.readLock().lock();
@ -4191,7 +4214,7 @@ public class HRegion implements HeapSize { // , Writable{
byte[] byteNow = Bytes.toBytes(now);
try {
// 5. Check mutations and apply edits to a single WALEdit
for (Mutation m : rm.getMutations()) {
for (Mutation m : mutations) {
if (m instanceof Put) {
Map<byte[], List<KeyValue>> familyMap = m.getFamilyMap();
checkFamilies(familyMap.keySet());
@ -4218,7 +4241,7 @@ public class HRegion implements HeapSize { // , Writable{
// 7. apply to memstore
long addedSize = 0;
for (Mutation m : rm.getMutations()) {
for (Mutation m : mutations) {
addedSize += applyFamilyMapToMemstore(m.getFamilyMap(), w);
}
flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
@ -4231,7 +4254,7 @@ public class HRegion implements HeapSize { // , Writable{
}
// 10. run all coprocessor post hooks, after region lock is released
if (coprocessorHost != null) {
for (Mutation m : rm.getMutations()) {
for (Mutation m : mutations) {
if (m instanceof Put) {
coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL());
} else if (m instanceof Delete) {
@ -4240,9 +4263,11 @@ public class HRegion implements HeapSize { // , Writable{
}
}
} finally {
if (lid != null) {
if (acquiredLocks != null) {
// 11. release the row lock
releaseRowLock(lid);
for (Integer lid : acquiredLocks) {
releaseRowLock(lid);
}
}
if (flush) {
// 12. Flush cache if needed. Do it outside update lock.

View File

@ -3160,7 +3160,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
throws IOException {
checkOpen();
if (regionName == null) {
throw new IOException("Invalid arguments to atomicMutation " +
throw new IOException("Invalid arguments to mutateRow " +
"regionName is null");
}
requestCount.incrementAndGet();
@ -3169,7 +3169,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
if (!region.getRegionInfo().isMetaTable()) {
this.cacheFlusher.reclaimMemStoreMemory();
}
region.mutateRow(rm, null);
region.mutateRow(rm);
} catch (IOException e) {
checkFileSystem();
throw e;

View File

@ -36,7 +36,6 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@ -65,6 +64,9 @@ import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HTable.DaemonThreadFactory;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationProtocol;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@ -112,6 +114,9 @@ public class TestFromClientSide {
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
MultiRowMutationEndpoint.class.getName());
// We need more than one region server in this test
TEST_UTIL.startMiniCluster(SLAVES);
}
@ -4040,6 +4045,31 @@ public class TestFromClientSide {
assertTrue(scan.getFamilyMap().containsKey(FAMILY));
}
@Test
public void testMultiRowMutation() throws Exception {
LOG.info("Starting testMultiRowMutation");
final byte [] TABLENAME = Bytes.toBytes("testMultiRowMutation");
final byte [] ROW1 = Bytes.toBytes("testRow1");
HTable t = TEST_UTIL.createTable(TABLENAME, FAMILY);
List<Mutation> mrm = new ArrayList<Mutation>();
Put p = new Put(ROW);
p.add(FAMILY, QUALIFIER, VALUE);
mrm.add(p);
p = new Put(ROW1);
p.add(FAMILY, QUALIFIER, VALUE);
mrm.add(p);
MultiRowMutationProtocol mr = t.coprocessorProxy(
MultiRowMutationProtocol.class, ROW);
mr.mutateRows(mrm);
Get g = new Get(ROW);
Result r = t.get(g);
assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIER)));
g = new Get(ROW1);
r = t.get(g);
assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIER)));
}
@Test
public void testRowMutation() throws Exception {
LOG.info("Starting testRowMutation");

View File

@ -19,6 +19,9 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -29,11 +32,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RowMutation;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
@ -246,11 +251,11 @@ public class TestAtomicOperation extends HBaseTestCase {
}
/**
* Test multi-threaded increments.
* Test multi-threaded row mutations.
*/
public void testRowMutationMultiThreads() throws IOException {
LOG.info("Starting test testMutationMultiThreads");
LOG.info("Starting test testRowMutationMultiThreads");
initHRegion(tableName, getName(), fam1);
// create 100 threads, each will alternate between adding and
@ -263,7 +268,52 @@ public class TestAtomicOperation extends HBaseTestCase {
AtomicInteger failures = new AtomicInteger(0);
// create all threads
for (int i = 0; i < numThreads; i++) {
all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures);
all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
@Override
public void run() {
boolean op = true;
for (int i=0; i<numOps; i++) {
try {
// throw in some flushes
if (r.nextFloat() < 0.001) {
LOG.debug("flushing");
region.flushcache();
}
long ts = timeStamps.incrementAndGet();
RowMutation rm = new RowMutation(row);
if (op) {
Put p = new Put(row, ts);
p.add(fam1, qual1, value1);
rm.add(p);
Delete d = new Delete(row);
d.deleteColumns(fam1, qual2, ts);
rm.add(d);
} else {
Delete d = new Delete(row);
d.deleteColumns(fam1, qual1, ts);
rm.add(d);
Put p = new Put(row, ts);
p.add(fam1, qual2, value2);
rm.add(p);
}
region.mutateRow(rm);
op ^= true;
// check: should always see exactly one column
Get g = new Get(row);
Result r = region.get(g, null);
if (r.size() != 1) {
LOG.debug(r);
failures.incrementAndGet();
fail();
}
} catch (IOException e) {
e.printStackTrace();
failures.incrementAndGet();
fail();
}
}
}
};
}
// run all threads
@ -282,62 +332,104 @@ public class TestAtomicOperation extends HBaseTestCase {
}
/**
* Test multi-threaded region mutations.
*/
public void testMultiRowMutationMultiThreads() throws IOException {
LOG.info("Starting test testMultiRowMutationMultiThreads");
initHRegion(tableName, getName(), fam1);
// create 100 threads, each will alternate between adding and
// removing a column
int numThreads = 100;
int opsPerThread = 1000;
AtomicOperation[] all = new AtomicOperation[numThreads];
AtomicLong timeStamps = new AtomicLong(0);
AtomicInteger failures = new AtomicInteger(0);
final List<byte[]> rowsToLock = Arrays.asList(row, row2);
// create all threads
for (int i = 0; i < numThreads; i++) {
all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
@Override
public void run() {
boolean op = true;
for (int i=0; i<numOps; i++) {
try {
// throw in some flushes
if (r.nextFloat() < 0.001) {
LOG.debug("flushing");
region.flushcache();
}
long ts = timeStamps.incrementAndGet();
List<Mutation> mrm = new ArrayList<Mutation>();
if (op) {
Put p = new Put(row2, ts);
p.add(fam1, qual1, value1);
mrm.add(p);
Delete d = new Delete(row);
d.deleteColumns(fam1, qual1, ts);
mrm.add(d);
} else {
Delete d = new Delete(row2);
d.deleteColumns(fam1, qual1, ts);
mrm.add(d);
Put p = new Put(row, ts);
p.add(fam1, qual1, value2);
mrm.add(p);
}
region.mutateRowsWithLocks(mrm, rowsToLock);
op ^= true;
// check: should always see exactly one column
Scan s = new Scan(row);
RegionScanner rs = region.getScanner(s);
List<KeyValue> r = new ArrayList<KeyValue>();
while(rs.next(r));
if (r.size() != 1) {
LOG.debug(r);
failures.incrementAndGet();
fail();
}
} catch (IOException e) {
e.printStackTrace();
failures.incrementAndGet();
fail();
}
}
}
};
}
// run all threads
for (int i = 0; i < numThreads; i++) {
all[i].start();
}
// wait for all threads to finish
for (int i = 0; i < numThreads; i++) {
try {
all[i].join();
} catch (InterruptedException e) {
}
}
assertEquals(0, failures.get());
}
public static class AtomicOperation extends Thread {
private final HRegion region;
private final int numOps;
private final AtomicLong timeStamps;
private final AtomicInteger failures;
private final Random r = new Random();
public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps, AtomicInteger failures) {
protected final HRegion region;
protected final int numOps;
protected final AtomicLong timeStamps;
protected final AtomicInteger failures;
protected final Random r = new Random();
public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps,
AtomicInteger failures) {
this.region = region;
this.numOps = numOps;
this.timeStamps = timeStamps;
this.failures = failures;
}
@Override
public void run() {
boolean op = true;
for (int i=0; i<numOps; i++) {
try {
// throw in some flushes
if (r.nextFloat() < 0.001) {
LOG.debug("flushing");
region.flushcache();
}
long ts = timeStamps.incrementAndGet();
RowMutation arm = new RowMutation(row);
if (op) {
Put p = new Put(row, ts);
p.add(fam1, qual1, value1);
arm.add(p);
Delete d = new Delete(row);
d.deleteColumns(fam1, qual2, ts);
arm.add(d);
} else {
Delete d = new Delete(row);
d.deleteColumns(fam1, qual1, ts);
arm.add(d);
Put p = new Put(row, ts);
p.add(fam1, qual2, value2);
arm.add(p);
}
region.mutateRow(arm, null);
op ^= true;
// check: should always see exactly one column
Get g = new Get(row);
Result r = region.get(g, null);
if (r.size() != 1) {
LOG.debug(r);
failures.incrementAndGet();
fail();
}
} catch (IOException e) {
e.printStackTrace();
failures.incrementAndGet();
fail();
}
}
}
}
@org.junit.Rule