diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/RowProcessor.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/RowProcessor.java new file mode 100644 index 00000000000..75c7768a37f --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/RowProcessor.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor; + +import java.io.IOException; +import java.util.List; +import java.util.UUID; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.io.Writable; + +/** + * Defines the procedure to atomically perform multiple scans and mutations + * on one single row. The generic type parameter T is the return type of + * RowProcessor.getResult(). + */ +@InterfaceAudience.Public +public interface RowProcessor extends Writable { + + /** + * Which row to perform the read-write + */ + byte[] getRow(); + + /** + * Obtain the processing result + */ + T getResult(); + + /** + * Is this operation read only? If this is true, process() should not add + * any mutations or it throws IOException. + * @return ture if read only operation + */ + boolean readOnly(); + + /** + * HRegion calls this to process a row. You should override this to create + * your own RowProcessor. + * + * @param now the current system millisecond + * @param scanner the call back object the can be used to scan the row + * @param mutations the mutations for HRegion to do + * @param walEdit the wal edit here allows inject some other meta data + */ + void process(long now, + RowProcessor.RowScanner scanner, + List mutations, + WALEdit walEdit) throws IOException; + + /** + * The call back provided by HRegion to perform the scans on the row + */ + public interface RowScanner { + /** + * @param scan The object defines what to read + * @param result The scan results will be added here + */ + void doScan(Scan scan, List result) throws IOException; + } + + /** + * @return The replication cluster id. + */ + UUID getClusterId(); + +} diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 76ff42253ba..d422cf25abc 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -49,9 +49,11 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -78,7 +80,6 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.client.Append; -import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; @@ -88,13 +89,14 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.RowLock; +import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.coprocessor.Exec; import org.apache.hadoop.hbase.client.coprocessor.ExecResult; +import org.apache.hadoop.hbase.coprocessor.RowProcessor; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.IncompatibleFilterException; -import org.apache.hadoop.hbase.filter.NullComparator; import org.apache.hadoop.hbase.filter.WritableByteArrayComparable; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.TimeRange; @@ -228,13 +230,14 @@ public class HRegion implements HeapSize { // , Writable{ final Configuration conf; final int rowLockWaitDuration; static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000; + static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 10 * 1000L; final HRegionInfo regionInfo; final Path regiondir; KeyValue.KVComparator comparator; private ConcurrentHashMap scannerReadPoints; - /* + /** * @return The smallest mvcc readPoint across all the scanners in this * region. Writes older than this readPoint, are included in every * read operation. @@ -297,6 +300,7 @@ public class HRegion implements HeapSize { // , Writable{ long memstoreFlushSize; final long timestampSlop; + final long rowProcessorTimeout; private volatile long lastFlushTime; final RegionServerServices rsServices; private List> recentFlushes = new ArrayList>(); @@ -413,6 +417,7 @@ public class HRegion implements HeapSize { // , Writable{ this.rsServices = null; this.fs = null; this.timestampSlop = HConstants.LATEST_TIMESTAMP; + this.rowProcessorTimeout = DEFAULT_ROW_PROCESSOR_TIMEOUT; this.memstoreFlushSize = 0L; this.log = null; this.regiondir = null; @@ -476,6 +481,9 @@ public class HRegion implements HeapSize { // , Writable{ "hbase.hregion.keyvalue.timestamp.slop.millisecs", HConstants.LATEST_TIMESTAMP); + this.rowProcessorTimeout = conf.getLong( + "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT); + // don't initialize coprocessors if not running within a regionserver // TODO: revisit if coprocessors should load in other cases if (rsServices != null) { @@ -4288,6 +4296,155 @@ public class HRegion implements HeapSize { // , Writable{ } } + /** + * Performs atomic multiple reads and writes on a given row. + * @param processor The object defines the reads and writes to a row. + */ + public void processRow(RowProcessor processor) + throws IOException { + byte[] row = processor.getRow(); + checkRow(row, "processRow"); + if (!processor.readOnly()) { + checkReadOnly(); + } + checkResources(); + + MultiVersionConsistencyControl.WriteEntry writeEntry = null; + + startRegionOperation(); + + boolean locked = false; + boolean walSyncSuccessful = false; + Integer rowLockID = null; + long addedSize = 0; + List mutations = new ArrayList(); + try { + // 1. Row lock + rowLockID = getLock(null, row, true); + + // 2. Region lock + this.updatesLock.readLock().lock(); + locked = true; + long now = EnvironmentEdgeManager.currentTimeMillis(); + try { + // 3. Let the processor scan the row and generate mutations + WALEdit walEdits = new WALEdit(); + doProcessRowWithTimeout(processor, now, rowScanner, mutations, + walEdits, rowProcessorTimeout); + if (processor.readOnly() && !mutations.isEmpty()) { + throw new IOException( + "Processor is readOnly but generating mutations on row:" + + Bytes.toStringBinary(row)); + } + + if (!mutations.isEmpty()) { + // 4. Get a mvcc write number + writeEntry = mvcc.beginMemstoreInsert(); + // 5. Apply to memstore and a WALEdit + for (KeyValue kv : mutations) { + kv.setMemstoreTS(writeEntry.getWriteNumber()); + walEdits.add(kv); + addedSize += stores.get(kv.getFamily()).add(kv); + } + + long txid = 0; + // 6. Append no sync + if (!walEdits.isEmpty()) { + txid = this.log.appendNoSync(this.regionInfo, + this.htableDescriptor.getName(), walEdits, + processor.getClusterId(), now, this.htableDescriptor); + } + // 7. Release region lock + if (locked) { + this.updatesLock.readLock().unlock(); + locked = false; + } + // 8. Release row lock + if (rowLockID != null) { + releaseRowLock(rowLockID); + rowLockID = null; + } + // 9. Sync edit log + if (txid != 0) { + this.log.sync(txid); + } + walSyncSuccessful = true; + } + } finally { + if (!mutations.isEmpty() && !walSyncSuccessful) { + LOG.warn("Wal sync failed. Roll back " + mutations.size() + + " memstore keyvalues for row:" + processor.getRow()); + for (KeyValue kv : mutations) { + stores.get(kv.getFamily()).rollback(kv); + } + } + // 10. Roll mvcc forward + if (writeEntry != null) { + mvcc.completeMemstoreInsert(writeEntry); + writeEntry = null; + } + if (locked) { + this.updatesLock.readLock().unlock(); + locked = false; + } + if (rowLockID != null) { + releaseRowLock(rowLockID); + rowLockID = null; + } + } + } finally { + closeRegionOperation(); + if (!mutations.isEmpty() && + isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) { + requestFlush(); + } + } + } + + private void doProcessRowWithTimeout(final RowProcessor processor, + final long now, + final RowProcessor.RowScanner scanner, + final List mutations, + final WALEdit walEdits, + final long timeout) throws IOException { + FutureTask task = + new FutureTask(new Callable() { + @Override + public Void call() throws IOException { + processor.process(now, scanner, mutations, walEdits); + return null; + } + }); + Thread t = new Thread(task); + t.setDaemon(true); + t.start(); + try { + task.get(timeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException te) { + LOG.error("RowProcessor timeout on row:" + + Bytes.toStringBinary(processor.getRow()) + " timeout:" + timeout, te); + throw new IOException(te); + } catch (Exception e) { + throw new IOException(e); + } + } + + final private RowProcessor.RowScanner rowScanner = + new RowProcessor.RowScanner() { + @Override + public void doScan(Scan scan, List result) throws IOException { + InternalScanner scanner = null; + try { + scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); + scanner = HRegion.this.getScanner(scan); + result.clear(); + scanner.next(result); + } finally { + if (scanner != null) scanner.close(); + } + } + }; + // TODO: There's a lot of boiler plate code identical // to increment... See how to better unify that. /** @@ -4660,8 +4817,8 @@ public class HRegion implements HeapSize { // , Writable{ public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 30 * ClassSize.REFERENCE + Bytes.SIZEOF_INT + - (5 * Bytes.SIZEOF_LONG) + + 31 * ClassSize.REFERENCE + Bytes.SIZEOF_INT + + (6 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN); public static final long DEEP_OVERHEAD = FIXED_OVERHEAD + diff --git a/src/test/java/org/apache/hadoop/hbase/coprocessor/TestProcessRowEndpoint.java b/src/test/java/org/apache/hadoop/hbase/coprocessor/TestProcessRowEndpoint.java new file mode 100644 index 00000000000..a4e495e5ecb --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/coprocessor/TestProcessRowEndpoint.java @@ -0,0 +1,321 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import static org.junit.Assert.assertEquals; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.sun.org.apache.commons.logging.Log; +import com.sun.org.apache.commons.logging.LogFactory; + +/** + * Verifies ProcessRowEndpoint works. + * The tested RowProcessor performs two scans and a read-modify-write. + */ +@Category(SmallTests.class) +public class TestProcessRowEndpoint { + + static final Log LOG = LogFactory.getLog(TestProcessRowEndpoint.class); + + private static final byte[] TABLE = Bytes.toBytes("testtable"); + private static final byte[] TABLE2 = Bytes.toBytes("testtable2"); + private final static byte[] ROW = Bytes.toBytes("testrow"); + private final static byte[] FAM = Bytes.toBytes("friendlist"); + + // Column names + private final static byte[] A = Bytes.toBytes("a"); + private final static byte[] B = Bytes.toBytes("b"); + private final static byte[] C = Bytes.toBytes("c"); + private final static byte[] D = Bytes.toBytes("d"); + private final static byte[] E = Bytes.toBytes("e"); + private final static byte[] F = Bytes.toBytes("f"); + private final static byte[] G = Bytes.toBytes("g"); + private final static byte[] REQUESTS = Bytes.toBytes("requests"); + + private static HBaseTestingUtility util = new HBaseTestingUtility(); + private volatile int numRequests; + + private CountDownLatch startSignal; + private CountDownLatch doneSignal; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + Configuration conf = util.getConfiguration(); + conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + FriendsOfFriendsEndpoint.class.getName()); + util.startMiniCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + util.shutdownMiniCluster(); + } + + @Test + public void testSingle() throws Throwable { + HTable table = prepareTestData(TABLE, util); + verifyProcessRow(table); + assertEquals(1, numRequests); + } + + private void verifyProcessRow(HTable table) throws Throwable { + + FriendsOfFriendsProtocol processor = + table.coprocessorProxy(FriendsOfFriendsProtocol.class, ROW); + Result result = processor.query(ROW, A); + + Set friendsOfFriends = new HashSet(); + for (KeyValue kv : result.raw()) { + if (Bytes.equals(kv.getQualifier(), REQUESTS)) { + numRequests = Bytes.toInt(kv.getValue()); + continue; + } + for (byte val : kv.getValue()) { + friendsOfFriends.add((char)val + ""); + } + } + Set expected = + new HashSet(Arrays.asList(new String[]{"d", "e", "f", "g"})); + assertEquals(expected, friendsOfFriends); + } + + @Test + public void testThreads() throws Exception { + HTable table = prepareTestData(TABLE2, util); + int numThreads = 1000; + startSignal = new CountDownLatch(numThreads); + doneSignal = new CountDownLatch(numThreads); + for (int i = 0; i < numThreads; ++i) { + new Thread(new QueryRunner(table)).start(); + startSignal.countDown(); + } + doneSignal.await(); + Get get = new Get(ROW); + LOG.debug("row keyvalues:" + stringifyKvs(table.get(get).list())); + assertEquals(numThreads, numRequests); + } + + class QueryRunner implements Runnable { + final HTable table; + QueryRunner(final HTable table) { + this.table = table; + } + @Override + public void run() { + try { + startSignal.await(); + verifyProcessRow(table); + } catch (Throwable e) { + e.printStackTrace(); + } + doneSignal.countDown(); + } + } + + static HTable prepareTestData(byte[] tableName, HBaseTestingUtility util) + throws Exception { + HTable table = util.createTable(tableName, FAM); + Put put = new Put(ROW); + put.add(FAM, A, Bytes.add(B, C)); // B, C are friends of A + put.add(FAM, B, Bytes.add(D, E, F)); // D, E, F are friends of B + put.add(FAM, C, G); // G is a friend of C + table.put(put); + return table; + } + + /** + * Coprocessor protocol that finds friends of friends of a person and + * update the number of requests. + */ + public static interface FriendsOfFriendsProtocol extends CoprocessorProtocol { + + /** + * Query a person's friends of friends + */ + Result query(byte[] row, byte[] person) throws IOException; + } + + /** + * Finds friends of friends of a person and update the number of requests. + */ + public static class FriendsOfFriendsEndpoint extends BaseEndpointCoprocessor + implements FriendsOfFriendsProtocol, RowProcessor { + byte[] row = null; + byte[] person = null; + Result result = null; + + // + // FriendsOfFriendsProtocol method + // + + @Override + public Result query(byte[] row, byte[] person) throws IOException { + this.row = row; + this.person = person; + HRegion region = + ((RegionCoprocessorEnvironment) getEnvironment()).getRegion(); + region.processRow(this); + return this.getResult(); + } + + // + // RowProcessor methods + // + + FriendsOfFriendsEndpoint() { + } + + @Override + public byte[] getRow() { + return row; + } + + @Override + public Result getResult() { + return result; + } + + @Override + public boolean readOnly() { + return false; + } + + @Override + public void process(long now, RowProcessor.RowScanner scanner, + List mutations, WALEdit walEdit) throws IOException { + List kvs = new ArrayList(); + { // First scan to get friends of the person and numRequests + Scan scan = new Scan(row, row); + scan.addColumn(FAM, person); + scan.addColumn(FAM, REQUESTS); + scanner.doScan(scan, kvs); + } + LOG.debug("first scan:" + stringifyKvs(kvs)); + int numRequests = 0; + // Second scan to get friends of friends + Scan scan = new Scan(row, row); + for (KeyValue kv : kvs) { + if (Bytes.equals(kv.getQualifier(), REQUESTS)) { + numRequests = Bytes.toInt(kv.getValue()); + continue; + } + byte[] friends = kv.getValue(); + for (byte f : friends) { + scan.addColumn(FAM, new byte[]{f}); + } + } + scanner.doScan(scan, kvs); + + LOG.debug("second scan:" + stringifyKvs(kvs)); + numRequests += 1; + // Construct mutations and Result + KeyValue kv = new KeyValue( + row, FAM, REQUESTS, now, Bytes.toBytes(numRequests)); + mutations.clear(); + mutations.add(kv); + kvs.add(kv); + LOG.debug("final result:" + stringifyKvs(kvs) + + " mutations:" + stringifyKvs(mutations)); + result = new Result(kvs); + // Inject some meta data to the walEdit + KeyValue metaKv = new KeyValue( + getRow(), HLog.METAFAMILY, + Bytes.toBytes("FriendsOfFriends query"), + person); + walEdit.add(metaKv); + } + + @Override + public void readFields(DataInput in) throws IOException { + this.person = Bytes.readByteArray(in); + this.row = Bytes.readByteArray(in); + this.result = new Result(); + result.readFields(in); + } + + @Override + public void write(DataOutput out) throws IOException { + Bytes.writeByteArray(out, person); + Bytes.writeByteArray(out, row); + if (result == null) { + new Result().write(out); + } else { + result.write(out); + } + } + + @Override + public UUID getClusterId() { + return HConstants.DEFAULT_CLUSTER_ID; + } + } + + static String stringifyKvs(Collection kvs) { + StringBuilder out = new StringBuilder(); + out.append("["); + for (KeyValue kv : kvs) { + byte[] col = kv.getQualifier(); + byte[] val = kv.getValue(); + if (Bytes.equals(col, REQUESTS)) { + out.append(Bytes.toStringBinary(col) + ":" + + Bytes.toInt(val) + " "); + } else { + out.append(Bytes.toStringBinary(col) + ":" + + Bytes.toStringBinary(val) + " "); + } + } + out.append("]"); + return out.toString(); + } + + @org.junit.Rule + public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = + new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); +}