HBASE-5515 Add a processRow API that supports atomic multiple reads and writes on a row (Scott Chen)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1298533 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
990de61c9b
commit
c4426b5b79
|
@ -0,0 +1,85 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.coprocessor;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||||
|
import org.apache.hadoop.io.Writable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Defines the procedure to atomically perform multiple scans and mutations
|
||||||
|
* on one single row. The generic type parameter T is the return type of
|
||||||
|
* RowProcessor.getResult().
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
public interface RowProcessor<T> extends Writable {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Which row to perform the read-write
|
||||||
|
*/
|
||||||
|
byte[] getRow();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Obtain the processing result
|
||||||
|
*/
|
||||||
|
T getResult();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Is this operation read only? If this is true, process() should not add
|
||||||
|
* any mutations or it throws IOException.
|
||||||
|
* @return ture if read only operation
|
||||||
|
*/
|
||||||
|
boolean readOnly();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* HRegion calls this to process a row. You should override this to create
|
||||||
|
* your own RowProcessor.
|
||||||
|
*
|
||||||
|
* @param now the current system millisecond
|
||||||
|
* @param scanner the call back object the can be used to scan the row
|
||||||
|
* @param mutations the mutations for HRegion to do
|
||||||
|
* @param walEdit the wal edit here allows inject some other meta data
|
||||||
|
*/
|
||||||
|
void process(long now,
|
||||||
|
RowProcessor.RowScanner scanner,
|
||||||
|
List<KeyValue> mutations,
|
||||||
|
WALEdit walEdit) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The call back provided by HRegion to perform the scans on the row
|
||||||
|
*/
|
||||||
|
public interface RowScanner {
|
||||||
|
/**
|
||||||
|
* @param scan The object defines what to read
|
||||||
|
* @param result The scan results will be added here
|
||||||
|
*/
|
||||||
|
void doScan(Scan scan, List<KeyValue> result) throws IOException;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return The replication cluster id.
|
||||||
|
*/
|
||||||
|
UUID getClusterId();
|
||||||
|
|
||||||
|
}
|
|
@ -49,9 +49,11 @@ import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorCompletionService;
|
import java.util.concurrent.ExecutorCompletionService;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.FutureTask;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
@ -78,7 +80,6 @@ import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||||
import org.apache.hadoop.hbase.UnknownScannerException;
|
import org.apache.hadoop.hbase.UnknownScannerException;
|
||||||
import org.apache.hadoop.hbase.client.Append;
|
import org.apache.hadoop.hbase.client.Append;
|
||||||
import org.apache.hadoop.hbase.client.RowMutations;
|
|
||||||
import org.apache.hadoop.hbase.client.Delete;
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
import org.apache.hadoop.hbase.client.Increment;
|
import org.apache.hadoop.hbase.client.Increment;
|
||||||
|
@ -88,13 +89,14 @@ import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.Row;
|
import org.apache.hadoop.hbase.client.Row;
|
||||||
import org.apache.hadoop.hbase.client.RowLock;
|
import org.apache.hadoop.hbase.client.RowLock;
|
||||||
|
import org.apache.hadoop.hbase.client.RowMutations;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.coprocessor.Exec;
|
import org.apache.hadoop.hbase.client.coprocessor.Exec;
|
||||||
import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
|
import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.RowProcessor;
|
||||||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||||
import org.apache.hadoop.hbase.filter.Filter;
|
import org.apache.hadoop.hbase.filter.Filter;
|
||||||
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
|
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
|
||||||
import org.apache.hadoop.hbase.filter.NullComparator;
|
|
||||||
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
|
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
|
||||||
import org.apache.hadoop.hbase.io.HeapSize;
|
import org.apache.hadoop.hbase.io.HeapSize;
|
||||||
import org.apache.hadoop.hbase.io.TimeRange;
|
import org.apache.hadoop.hbase.io.TimeRange;
|
||||||
|
@ -228,13 +230,14 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
final Configuration conf;
|
final Configuration conf;
|
||||||
final int rowLockWaitDuration;
|
final int rowLockWaitDuration;
|
||||||
static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
|
static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
|
||||||
|
static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 10 * 1000L;
|
||||||
final HRegionInfo regionInfo;
|
final HRegionInfo regionInfo;
|
||||||
final Path regiondir;
|
final Path regiondir;
|
||||||
KeyValue.KVComparator comparator;
|
KeyValue.KVComparator comparator;
|
||||||
|
|
||||||
private ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
|
private ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
|
||||||
|
|
||||||
/*
|
/**
|
||||||
* @return The smallest mvcc readPoint across all the scanners in this
|
* @return The smallest mvcc readPoint across all the scanners in this
|
||||||
* region. Writes older than this readPoint, are included in every
|
* region. Writes older than this readPoint, are included in every
|
||||||
* read operation.
|
* read operation.
|
||||||
|
@ -297,6 +300,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
|
|
||||||
long memstoreFlushSize;
|
long memstoreFlushSize;
|
||||||
final long timestampSlop;
|
final long timestampSlop;
|
||||||
|
final long rowProcessorTimeout;
|
||||||
private volatile long lastFlushTime;
|
private volatile long lastFlushTime;
|
||||||
final RegionServerServices rsServices;
|
final RegionServerServices rsServices;
|
||||||
private List<Pair<Long, Long>> recentFlushes = new ArrayList<Pair<Long,Long>>();
|
private List<Pair<Long, Long>> recentFlushes = new ArrayList<Pair<Long,Long>>();
|
||||||
|
@ -413,6 +417,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
this.rsServices = null;
|
this.rsServices = null;
|
||||||
this.fs = null;
|
this.fs = null;
|
||||||
this.timestampSlop = HConstants.LATEST_TIMESTAMP;
|
this.timestampSlop = HConstants.LATEST_TIMESTAMP;
|
||||||
|
this.rowProcessorTimeout = DEFAULT_ROW_PROCESSOR_TIMEOUT;
|
||||||
this.memstoreFlushSize = 0L;
|
this.memstoreFlushSize = 0L;
|
||||||
this.log = null;
|
this.log = null;
|
||||||
this.regiondir = null;
|
this.regiondir = null;
|
||||||
|
@ -476,6 +481,9 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
"hbase.hregion.keyvalue.timestamp.slop.millisecs",
|
"hbase.hregion.keyvalue.timestamp.slop.millisecs",
|
||||||
HConstants.LATEST_TIMESTAMP);
|
HConstants.LATEST_TIMESTAMP);
|
||||||
|
|
||||||
|
this.rowProcessorTimeout = conf.getLong(
|
||||||
|
"hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
|
||||||
|
|
||||||
// don't initialize coprocessors if not running within a regionserver
|
// don't initialize coprocessors if not running within a regionserver
|
||||||
// TODO: revisit if coprocessors should load in other cases
|
// TODO: revisit if coprocessors should load in other cases
|
||||||
if (rsServices != null) {
|
if (rsServices != null) {
|
||||||
|
@ -4288,6 +4296,155 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Performs atomic multiple reads and writes on a given row.
|
||||||
|
* @param processor The object defines the reads and writes to a row.
|
||||||
|
*/
|
||||||
|
public void processRow(RowProcessor<?> processor)
|
||||||
|
throws IOException {
|
||||||
|
byte[] row = processor.getRow();
|
||||||
|
checkRow(row, "processRow");
|
||||||
|
if (!processor.readOnly()) {
|
||||||
|
checkReadOnly();
|
||||||
|
}
|
||||||
|
checkResources();
|
||||||
|
|
||||||
|
MultiVersionConsistencyControl.WriteEntry writeEntry = null;
|
||||||
|
|
||||||
|
startRegionOperation();
|
||||||
|
|
||||||
|
boolean locked = false;
|
||||||
|
boolean walSyncSuccessful = false;
|
||||||
|
Integer rowLockID = null;
|
||||||
|
long addedSize = 0;
|
||||||
|
List<KeyValue> mutations = new ArrayList<KeyValue>();
|
||||||
|
try {
|
||||||
|
// 1. Row lock
|
||||||
|
rowLockID = getLock(null, row, true);
|
||||||
|
|
||||||
|
// 2. Region lock
|
||||||
|
this.updatesLock.readLock().lock();
|
||||||
|
locked = true;
|
||||||
|
long now = EnvironmentEdgeManager.currentTimeMillis();
|
||||||
|
try {
|
||||||
|
// 3. Let the processor scan the row and generate mutations
|
||||||
|
WALEdit walEdits = new WALEdit();
|
||||||
|
doProcessRowWithTimeout(processor, now, rowScanner, mutations,
|
||||||
|
walEdits, rowProcessorTimeout);
|
||||||
|
if (processor.readOnly() && !mutations.isEmpty()) {
|
||||||
|
throw new IOException(
|
||||||
|
"Processor is readOnly but generating mutations on row:" +
|
||||||
|
Bytes.toStringBinary(row));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!mutations.isEmpty()) {
|
||||||
|
// 4. Get a mvcc write number
|
||||||
|
writeEntry = mvcc.beginMemstoreInsert();
|
||||||
|
// 5. Apply to memstore and a WALEdit
|
||||||
|
for (KeyValue kv : mutations) {
|
||||||
|
kv.setMemstoreTS(writeEntry.getWriteNumber());
|
||||||
|
walEdits.add(kv);
|
||||||
|
addedSize += stores.get(kv.getFamily()).add(kv);
|
||||||
|
}
|
||||||
|
|
||||||
|
long txid = 0;
|
||||||
|
// 6. Append no sync
|
||||||
|
if (!walEdits.isEmpty()) {
|
||||||
|
txid = this.log.appendNoSync(this.regionInfo,
|
||||||
|
this.htableDescriptor.getName(), walEdits,
|
||||||
|
processor.getClusterId(), now, this.htableDescriptor);
|
||||||
|
}
|
||||||
|
// 7. Release region lock
|
||||||
|
if (locked) {
|
||||||
|
this.updatesLock.readLock().unlock();
|
||||||
|
locked = false;
|
||||||
|
}
|
||||||
|
// 8. Release row lock
|
||||||
|
if (rowLockID != null) {
|
||||||
|
releaseRowLock(rowLockID);
|
||||||
|
rowLockID = null;
|
||||||
|
}
|
||||||
|
// 9. Sync edit log
|
||||||
|
if (txid != 0) {
|
||||||
|
this.log.sync(txid);
|
||||||
|
}
|
||||||
|
walSyncSuccessful = true;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (!mutations.isEmpty() && !walSyncSuccessful) {
|
||||||
|
LOG.warn("Wal sync failed. Roll back " + mutations.size() +
|
||||||
|
" memstore keyvalues for row:" + processor.getRow());
|
||||||
|
for (KeyValue kv : mutations) {
|
||||||
|
stores.get(kv.getFamily()).rollback(kv);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// 10. Roll mvcc forward
|
||||||
|
if (writeEntry != null) {
|
||||||
|
mvcc.completeMemstoreInsert(writeEntry);
|
||||||
|
writeEntry = null;
|
||||||
|
}
|
||||||
|
if (locked) {
|
||||||
|
this.updatesLock.readLock().unlock();
|
||||||
|
locked = false;
|
||||||
|
}
|
||||||
|
if (rowLockID != null) {
|
||||||
|
releaseRowLock(rowLockID);
|
||||||
|
rowLockID = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
closeRegionOperation();
|
||||||
|
if (!mutations.isEmpty() &&
|
||||||
|
isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
|
||||||
|
requestFlush();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doProcessRowWithTimeout(final RowProcessor<?> processor,
|
||||||
|
final long now,
|
||||||
|
final RowProcessor.RowScanner scanner,
|
||||||
|
final List<KeyValue> mutations,
|
||||||
|
final WALEdit walEdits,
|
||||||
|
final long timeout) throws IOException {
|
||||||
|
FutureTask<Void> task =
|
||||||
|
new FutureTask<Void>(new Callable<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void call() throws IOException {
|
||||||
|
processor.process(now, scanner, mutations, walEdits);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Thread t = new Thread(task);
|
||||||
|
t.setDaemon(true);
|
||||||
|
t.start();
|
||||||
|
try {
|
||||||
|
task.get(timeout, TimeUnit.MILLISECONDS);
|
||||||
|
} catch (TimeoutException te) {
|
||||||
|
LOG.error("RowProcessor timeout on row:" +
|
||||||
|
Bytes.toStringBinary(processor.getRow()) + " timeout:" + timeout, te);
|
||||||
|
throw new IOException(te);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final private RowProcessor.RowScanner rowScanner =
|
||||||
|
new RowProcessor.RowScanner() {
|
||||||
|
@Override
|
||||||
|
public void doScan(Scan scan, List<KeyValue> result) throws IOException {
|
||||||
|
InternalScanner scanner = null;
|
||||||
|
try {
|
||||||
|
scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
|
||||||
|
scanner = HRegion.this.getScanner(scan);
|
||||||
|
result.clear();
|
||||||
|
scanner.next(result);
|
||||||
|
} finally {
|
||||||
|
if (scanner != null) scanner.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// TODO: There's a lot of boiler plate code identical
|
// TODO: There's a lot of boiler plate code identical
|
||||||
// to increment... See how to better unify that.
|
// to increment... See how to better unify that.
|
||||||
/**
|
/**
|
||||||
|
@ -4660,8 +4817,8 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
public static final long FIXED_OVERHEAD = ClassSize.align(
|
public static final long FIXED_OVERHEAD = ClassSize.align(
|
||||||
ClassSize.OBJECT +
|
ClassSize.OBJECT +
|
||||||
ClassSize.ARRAY +
|
ClassSize.ARRAY +
|
||||||
30 * ClassSize.REFERENCE + Bytes.SIZEOF_INT +
|
31 * ClassSize.REFERENCE + Bytes.SIZEOF_INT +
|
||||||
(5 * Bytes.SIZEOF_LONG) +
|
(6 * Bytes.SIZEOF_LONG) +
|
||||||
Bytes.SIZEOF_BOOLEAN);
|
Bytes.SIZEOF_BOOLEAN);
|
||||||
|
|
||||||
public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
|
public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
|
||||||
|
|
|
@ -0,0 +1,321 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.coprocessor;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import java.io.DataInput;
|
||||||
|
import java.io.DataOutput;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.SmallTests;
|
||||||
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
import com.sun.org.apache.commons.logging.Log;
|
||||||
|
import com.sun.org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verifies ProcessRowEndpoint works.
|
||||||
|
* The tested RowProcessor performs two scans and a read-modify-write.
|
||||||
|
*/
|
||||||
|
@Category(SmallTests.class)
|
||||||
|
public class TestProcessRowEndpoint {
|
||||||
|
|
||||||
|
static final Log LOG = LogFactory.getLog(TestProcessRowEndpoint.class);
|
||||||
|
|
||||||
|
private static final byte[] TABLE = Bytes.toBytes("testtable");
|
||||||
|
private static final byte[] TABLE2 = Bytes.toBytes("testtable2");
|
||||||
|
private final static byte[] ROW = Bytes.toBytes("testrow");
|
||||||
|
private final static byte[] FAM = Bytes.toBytes("friendlist");
|
||||||
|
|
||||||
|
// Column names
|
||||||
|
private final static byte[] A = Bytes.toBytes("a");
|
||||||
|
private final static byte[] B = Bytes.toBytes("b");
|
||||||
|
private final static byte[] C = Bytes.toBytes("c");
|
||||||
|
private final static byte[] D = Bytes.toBytes("d");
|
||||||
|
private final static byte[] E = Bytes.toBytes("e");
|
||||||
|
private final static byte[] F = Bytes.toBytes("f");
|
||||||
|
private final static byte[] G = Bytes.toBytes("g");
|
||||||
|
private final static byte[] REQUESTS = Bytes.toBytes("requests");
|
||||||
|
|
||||||
|
private static HBaseTestingUtility util = new HBaseTestingUtility();
|
||||||
|
private volatile int numRequests;
|
||||||
|
|
||||||
|
private CountDownLatch startSignal;
|
||||||
|
private CountDownLatch doneSignal;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setupBeforeClass() throws Exception {
|
||||||
|
Configuration conf = util.getConfiguration();
|
||||||
|
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
|
||||||
|
FriendsOfFriendsEndpoint.class.getName());
|
||||||
|
util.startMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownAfterClass() throws Exception {
|
||||||
|
util.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSingle() throws Throwable {
|
||||||
|
HTable table = prepareTestData(TABLE, util);
|
||||||
|
verifyProcessRow(table);
|
||||||
|
assertEquals(1, numRequests);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyProcessRow(HTable table) throws Throwable {
|
||||||
|
|
||||||
|
FriendsOfFriendsProtocol processor =
|
||||||
|
table.coprocessorProxy(FriendsOfFriendsProtocol.class, ROW);
|
||||||
|
Result result = processor.query(ROW, A);
|
||||||
|
|
||||||
|
Set<String> friendsOfFriends = new HashSet<String>();
|
||||||
|
for (KeyValue kv : result.raw()) {
|
||||||
|
if (Bytes.equals(kv.getQualifier(), REQUESTS)) {
|
||||||
|
numRequests = Bytes.toInt(kv.getValue());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
for (byte val : kv.getValue()) {
|
||||||
|
friendsOfFriends.add((char)val + "");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Set<String> expected =
|
||||||
|
new HashSet<String>(Arrays.asList(new String[]{"d", "e", "f", "g"}));
|
||||||
|
assertEquals(expected, friendsOfFriends);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testThreads() throws Exception {
|
||||||
|
HTable table = prepareTestData(TABLE2, util);
|
||||||
|
int numThreads = 1000;
|
||||||
|
startSignal = new CountDownLatch(numThreads);
|
||||||
|
doneSignal = new CountDownLatch(numThreads);
|
||||||
|
for (int i = 0; i < numThreads; ++i) {
|
||||||
|
new Thread(new QueryRunner(table)).start();
|
||||||
|
startSignal.countDown();
|
||||||
|
}
|
||||||
|
doneSignal.await();
|
||||||
|
Get get = new Get(ROW);
|
||||||
|
LOG.debug("row keyvalues:" + stringifyKvs(table.get(get).list()));
|
||||||
|
assertEquals(numThreads, numRequests);
|
||||||
|
}
|
||||||
|
|
||||||
|
class QueryRunner implements Runnable {
|
||||||
|
final HTable table;
|
||||||
|
QueryRunner(final HTable table) {
|
||||||
|
this.table = table;
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
startSignal.await();
|
||||||
|
verifyProcessRow(table);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
doneSignal.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static HTable prepareTestData(byte[] tableName, HBaseTestingUtility util)
|
||||||
|
throws Exception {
|
||||||
|
HTable table = util.createTable(tableName, FAM);
|
||||||
|
Put put = new Put(ROW);
|
||||||
|
put.add(FAM, A, Bytes.add(B, C)); // B, C are friends of A
|
||||||
|
put.add(FAM, B, Bytes.add(D, E, F)); // D, E, F are friends of B
|
||||||
|
put.add(FAM, C, G); // G is a friend of C
|
||||||
|
table.put(put);
|
||||||
|
return table;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Coprocessor protocol that finds friends of friends of a person and
|
||||||
|
* update the number of requests.
|
||||||
|
*/
|
||||||
|
public static interface FriendsOfFriendsProtocol extends CoprocessorProtocol {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Query a person's friends of friends
|
||||||
|
*/
|
||||||
|
Result query(byte[] row, byte[] person) throws IOException;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Finds friends of friends of a person and update the number of requests.
|
||||||
|
*/
|
||||||
|
public static class FriendsOfFriendsEndpoint extends BaseEndpointCoprocessor
|
||||||
|
implements FriendsOfFriendsProtocol, RowProcessor<Result> {
|
||||||
|
byte[] row = null;
|
||||||
|
byte[] person = null;
|
||||||
|
Result result = null;
|
||||||
|
|
||||||
|
//
|
||||||
|
// FriendsOfFriendsProtocol method
|
||||||
|
//
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Result query(byte[] row, byte[] person) throws IOException {
|
||||||
|
this.row = row;
|
||||||
|
this.person = person;
|
||||||
|
HRegion region =
|
||||||
|
((RegionCoprocessorEnvironment) getEnvironment()).getRegion();
|
||||||
|
region.processRow(this);
|
||||||
|
return this.getResult();
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// RowProcessor methods
|
||||||
|
//
|
||||||
|
|
||||||
|
FriendsOfFriendsEndpoint() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public byte[] getRow() {
|
||||||
|
return row;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Result getResult() {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean readOnly() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void process(long now, RowProcessor.RowScanner scanner,
|
||||||
|
List<KeyValue> mutations, WALEdit walEdit) throws IOException {
|
||||||
|
List<KeyValue> kvs = new ArrayList<KeyValue>();
|
||||||
|
{ // First scan to get friends of the person and numRequests
|
||||||
|
Scan scan = new Scan(row, row);
|
||||||
|
scan.addColumn(FAM, person);
|
||||||
|
scan.addColumn(FAM, REQUESTS);
|
||||||
|
scanner.doScan(scan, kvs);
|
||||||
|
}
|
||||||
|
LOG.debug("first scan:" + stringifyKvs(kvs));
|
||||||
|
int numRequests = 0;
|
||||||
|
// Second scan to get friends of friends
|
||||||
|
Scan scan = new Scan(row, row);
|
||||||
|
for (KeyValue kv : kvs) {
|
||||||
|
if (Bytes.equals(kv.getQualifier(), REQUESTS)) {
|
||||||
|
numRequests = Bytes.toInt(kv.getValue());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
byte[] friends = kv.getValue();
|
||||||
|
for (byte f : friends) {
|
||||||
|
scan.addColumn(FAM, new byte[]{f});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
scanner.doScan(scan, kvs);
|
||||||
|
|
||||||
|
LOG.debug("second scan:" + stringifyKvs(kvs));
|
||||||
|
numRequests += 1;
|
||||||
|
// Construct mutations and Result
|
||||||
|
KeyValue kv = new KeyValue(
|
||||||
|
row, FAM, REQUESTS, now, Bytes.toBytes(numRequests));
|
||||||
|
mutations.clear();
|
||||||
|
mutations.add(kv);
|
||||||
|
kvs.add(kv);
|
||||||
|
LOG.debug("final result:" + stringifyKvs(kvs) +
|
||||||
|
" mutations:" + stringifyKvs(mutations));
|
||||||
|
result = new Result(kvs);
|
||||||
|
// Inject some meta data to the walEdit
|
||||||
|
KeyValue metaKv = new KeyValue(
|
||||||
|
getRow(), HLog.METAFAMILY,
|
||||||
|
Bytes.toBytes("FriendsOfFriends query"),
|
||||||
|
person);
|
||||||
|
walEdit.add(metaKv);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void readFields(DataInput in) throws IOException {
|
||||||
|
this.person = Bytes.readByteArray(in);
|
||||||
|
this.row = Bytes.readByteArray(in);
|
||||||
|
this.result = new Result();
|
||||||
|
result.readFields(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(DataOutput out) throws IOException {
|
||||||
|
Bytes.writeByteArray(out, person);
|
||||||
|
Bytes.writeByteArray(out, row);
|
||||||
|
if (result == null) {
|
||||||
|
new Result().write(out);
|
||||||
|
} else {
|
||||||
|
result.write(out);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public UUID getClusterId() {
|
||||||
|
return HConstants.DEFAULT_CLUSTER_ID;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static String stringifyKvs(Collection<KeyValue> kvs) {
|
||||||
|
StringBuilder out = new StringBuilder();
|
||||||
|
out.append("[");
|
||||||
|
for (KeyValue kv : kvs) {
|
||||||
|
byte[] col = kv.getQualifier();
|
||||||
|
byte[] val = kv.getValue();
|
||||||
|
if (Bytes.equals(col, REQUESTS)) {
|
||||||
|
out.append(Bytes.toStringBinary(col) + ":" +
|
||||||
|
Bytes.toInt(val) + " ");
|
||||||
|
} else {
|
||||||
|
out.append(Bytes.toStringBinary(col) + ":" +
|
||||||
|
Bytes.toStringBinary(val) + " ");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
out.append("]");
|
||||||
|
return out.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
@org.junit.Rule
|
||||||
|
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
|
||||||
|
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
|
||||||
|
}
|
Loading…
Reference in New Issue