HBASE-3583 Coprocessors: scannerNext and scannerClose hooks are called when HRegionInterface#get is invoked

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1084854 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Kyle Purtell 2011-03-24 06:27:23 +00:00
parent c63fe3174a
commit a17043496b
8 changed files with 253 additions and 142 deletions

View File

@ -46,6 +46,9 @@ Release 0.91.0 - Unreleased
HBASE-3532 HRegion#equals is broken (Ted Yu via Stack)
HBASE-3697 Admin actions that use MetaReader to iterate regions need to
skip offline ones
HBASE-3583 Coprocessors: scannerNext and scannerClose hooks are called
when HRegionInterface#get is invoked (Mingjie Lai via
Andrew Purtell)
IMPROVEMENTS
HBASE-3290 Max Compaction Size (Nicolas Spiegelberg via Stack)

View File

@ -206,14 +206,14 @@ public abstract class BaseRegionObserverCoprocessor implements RegionObserver {
@Override
public boolean preScannerNext(final RegionCoprocessorEnvironment e,
final InternalScanner s, final List<KeyValue> results,
final InternalScanner s, final List<Result> results,
final int limit, final boolean hasMore) throws IOException {
return hasMore;
}
@Override
public boolean postScannerNext(final RegionCoprocessorEnvironment e,
final InternalScanner s, final List<KeyValue> results, final int limit,
final InternalScanner s, final List<Result> results, final int limit,
final boolean hasMore) throws IOException {
return hasMore;
}

View File

@ -492,7 +492,7 @@ public interface RegionObserver extends Coprocessor {
* @throws IOException if an error occurred on the coprocessor
*/
public boolean preScannerNext(final RegionCoprocessorEnvironment e,
final InternalScanner s, final List<KeyValue> result,
final InternalScanner s, final List<Result> result,
final int limit, final boolean hasNext)
throws IOException;
@ -510,7 +510,7 @@ public interface RegionObserver extends Coprocessor {
* @throws IOException if an error occurred on the coprocessor
*/
public boolean postScannerNext(final RegionCoprocessorEnvironment e,
final InternalScanner s, final List<KeyValue> result, final int limit,
final InternalScanner s, final List<Result> result, final int limit,
final boolean hasNext)
throws IOException;

View File

@ -2461,14 +2461,6 @@ public class HRegion implements HeapSize { // , Writable{
public synchronized boolean next(List<KeyValue> outResults, int limit)
throws IOException {
if (coprocessorHost != null) {
Boolean result = coprocessorHost.preScannerNext((InternalScanner)this,
outResults, limit);
if (result != null) {
return result.booleanValue();
}
}
if (this.filterClosed) {
throw new UnknownScannerException("Scanner was closed (timed out?) " +
"after we renewed it. Could be caused by a very slow scanner " +
@ -2484,11 +2476,6 @@ public class HRegion implements HeapSize { // , Writable{
boolean returnResult = nextInternal(limit);
if (coprocessorHost != null) {
returnResult = coprocessorHost.postScannerNext((InternalScanner)this,
results, limit, returnResult);
}
outResults.addAll(results);
resetFilters();
if (isFilterDone()) {
@ -2596,20 +2583,12 @@ public class HRegion implements HeapSize { // , Writable{
currentRow, 0, currentRow.length) <= isScan);
}
public synchronized void close() throws IOException {
if (coprocessorHost != null) {
if (coprocessorHost.preScannerClose((InternalScanner)this)) {
return;
}
}
public synchronized void close() {
if (storeHeap != null) {
storeHeap.close();
storeHeap = null;
}
this.filterClosed = true;
if (coprocessorHost != null) {
coprocessorHost.postScannerClose((InternalScanner)this);
}
}
}

View File

@ -1929,6 +1929,32 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
List<Result> results = new ArrayList<Result>(nbRows);
long currentScanResultSize = 0;
List<KeyValue> values = new ArrayList<KeyValue>();
// Call coprocessor. Get region info from scanner.
HRegion region = null;
if (s instanceof HRegion.RegionScanner) {
HRegion.RegionScanner rs = (HRegion.RegionScanner) s;
region = getRegion(rs.getRegionName().getRegionName());
} else {
throw new IOException("InternalScanner implementation is expected " +
"to be HRegion.RegionScanner.");
}
if (region != null && region.getCoprocessorHost() != null) {
Boolean bypass = region.getCoprocessorHost().preScannerNext(s,
results, nbRows);
if (!results.isEmpty()) {
for (Result r : results) {
for (KeyValue kv : r.raw()) {
currentScanResultSize += kv.heapSize();
}
}
}
if (bypass != null) {
return ((HRegion.RegionScanner) s).isFilterDone() && results.isEmpty() ? null
: results.toArray(new Result[0]);
}
}
for (int i = 0; i < nbRows
&& currentScanResultSize < maxScannerResultSize; i++) {
requestCount.incrementAndGet();
@ -1945,6 +1971,12 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
}
values.clear();
}
// coprocessor postNext hook
if (region != null && region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postScannerNext(s, results, nbRows, true);
}
// Below is an ugly hack where we cast the InternalScanner to be a
// HRegion.RegionScanner. The alternative is to change InternalScanner
// interface but its used everywhere whereas we just need a bit of info
@ -1967,10 +1999,33 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
checkOpen();
requestCount.incrementAndGet();
String scannerName = String.valueOf(scannerId);
InternalScanner s = scanners.remove(scannerName);
InternalScanner s = scanners.get(scannerName);
HRegion region = null;
if (s != null) {
// call coprocessor.
if (s instanceof HRegion.RegionScanner) {
HRegion.RegionScanner rs = (HRegion.RegionScanner) s;
region = getRegion(rs.getRegionName().getRegionName());
} else {
throw new IOException("InternalScanner implementation is expected " +
"to be HRegion.RegionScanner.");
}
if (region != null && region.getCoprocessorHost() != null) {
if (region.getCoprocessorHost().preScannerClose(s)) {
return; // bypass
}
}
}
s = scanners.remove(scannerName);
if (s != null) {
s.close();
this.leases.cancelLease(scannerName);
if (region != null && region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postScannerClose(s);
}
}
} catch (Throwable t) {
throw convertThrowableToIOE(cleanup(t));

View File

@ -911,7 +911,7 @@ public class RegionCoprocessorHost
* @exception IOException Exception
*/
public Boolean preScannerNext(final InternalScanner s,
final List<KeyValue> results, int limit) throws IOException {
final List<Result> results, int limit) throws IOException {
try {
boolean bypass = false;
boolean hasNext = false;
@ -941,7 +941,7 @@ public class RegionCoprocessorHost
* @exception IOException Exception
*/
public boolean postScannerNext(final InternalScanner s,
final List<KeyValue> results, final int limit, boolean hasMore)
final List<Result> results, final int limit, boolean hasMore)
throws IOException {
try {
coprocessorLock.readLock().lock();

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;
/**
@ -68,6 +69,10 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
boolean hadPostIncrement = false;
boolean hadPreWALRestored = false;
boolean hadPostWALRestored = false;
boolean hadPreScannerNext = false;
boolean hadPostScannerNext = false;
boolean hadPreScannerClose = false;
boolean hadPostScannerClose = false;
@Override
public void preOpen(RegionCoprocessorEnvironment e) {
@ -135,6 +140,35 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
hadPostCompact = true;
}
@Override
public boolean preScannerNext(final RegionCoprocessorEnvironment e,
final InternalScanner s, final List<Result> results,
final int limit, final boolean hasMore) throws IOException {
hadPreScannerNext = true;
return hasMore;
}
@Override
public boolean postScannerNext(final RegionCoprocessorEnvironment e,
final InternalScanner s, final List<Result> results, final int limit,
final boolean hasMore) throws IOException {
hadPostScannerNext = true;
return hasMore;
}
@Override
public void preScannerClose(final RegionCoprocessorEnvironment e,
final InternalScanner s) throws IOException {
hadPreScannerClose = true;
}
@Override
public void postScannerClose(final RegionCoprocessorEnvironment e,
final InternalScanner s) throws IOException {
hadPostScannerClose = true;
}
public boolean wasCompacted() {
return hadPreCompact && hadPostCompact;
}
@ -146,11 +180,8 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
assertNotNull(e.getRegion());
assertNotNull(get);
assertNotNull(results);
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE)) {
hadPreGet = true;
}
}
@Override
public void postGet(final RegionCoprocessorEnvironment e, final Get get,
@ -178,8 +209,8 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
assertTrue(foundA);
assertTrue(foundB);
assertTrue(foundC);
hadPostGet = true;
}
hadPostGet = true;
}
@Override
@ -205,8 +236,8 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
assertNotNull(kvs.get(0));
assertTrue(Bytes.equals(kvs.get(0).getQualifier(),
TestRegionObserverInterface.C));
hadPrePut = true;
}
hadPrePut = true;
}
@Override
@ -232,8 +263,8 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
assertNotNull(kvs.get(0));
assertTrue(Bytes.equals(kvs.get(0).getQualifier(),
TestRegionObserverInterface.C));
hadPostPut = true;
}
hadPostPut = true;
}
@Override
@ -242,8 +273,7 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
assertNotNull(e);
assertNotNull(e.getRegion());
assertNotNull(familyMap);
if (beforeDelete && e.getRegion().getTableDesc().getName().equals(
TestRegionObserverInterface.TEST_TABLE)) {
if (beforeDelete) {
hadPreDeleted = true;
}
}
@ -254,12 +284,9 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
assertNotNull(e);
assertNotNull(e.getRegion());
assertNotNull(familyMap);
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE)) {
beforeDelete = false;
hadPostDeleted = true;
}
}
@Override
public void preGetClosestRowBefore(final RegionCoprocessorEnvironment e,
@ -269,8 +296,7 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
assertNotNull(e.getRegion());
assertNotNull(row);
assertNotNull(result);
if (beforeDelete && e.getRegion().getTableDesc().getName().equals(
TestRegionObserverInterface.TEST_TABLE)) {
if (beforeDelete) {
hadPreGetClosestRowBefore = true;
}
}
@ -283,63 +309,59 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
assertNotNull(e.getRegion());
assertNotNull(row);
assertNotNull(result);
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE)) {
hadPostGetClosestRowBefore = true;
}
}
@Override
public void preIncrement(final RegionCoprocessorEnvironment e,
final Increment increment, final Result result) throws IOException {
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE_2)) {
hadPreIncrement = true;
}
}
@Override
public void postIncrement(final RegionCoprocessorEnvironment e,
final Increment increment, final Result result) throws IOException {
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE_2)) {
hadPostIncrement = true;
}
}
boolean hadPreGet() {
public boolean hadPreGet() {
return hadPreGet;
}
boolean hadPostGet() {
public boolean hadPostGet() {
return hadPostGet;
}
boolean hadPrePut() {
public boolean hadPrePut() {
return hadPrePut;
}
boolean hadPostPut() {
public boolean hadPostPut() {
return hadPostPut;
}
boolean hadDelete() {
public boolean hadDelete() {
return !beforeDelete;
}
boolean hadPreIncrement() {
public boolean hadPreIncrement() {
return hadPreIncrement;
}
boolean hadPostIncrement() {
public boolean hadPostIncrement() {
return hadPostIncrement;
}
boolean hadPreWALRestored() {
public boolean hadPreWALRestored() {
return hadPreWALRestored;
}
boolean hadPostWALRestored() {
public boolean hadPostWALRestored() {
return hadPostWALRestored;
}
public boolean wasScannerNextCalled() {
return hadPreScannerNext && hadPostScannerNext;
}
public boolean wasScannerCloseCalled() {
return hadPreScannerClose && hadPostScannerClose;
}
}

View File

@ -21,6 +21,7 @@
package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Arrays;
import org.apache.commons.logging.Log;
@ -32,11 +33,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.coprocessor.Coprocessor.Priority;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.HRegion;
@ -47,6 +44,8 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import javax.el.MethodNotFoundException;
import static org.junit.Assert.*;
public class TestRegionObserverInterface {
@ -54,16 +53,10 @@ public class TestRegionObserverInterface {
static final String DIR = "test/build/data/TestRegionObserver/";
public static final byte[] TEST_TABLE = Bytes.toBytes("TestTable");
public static final byte[] TEST_TABLE_2 = Bytes.toBytes("TestTable2");
public static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
public static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
public final static byte[] A = Bytes.toBytes("a");
public final static byte[] B = Bytes.toBytes("b");
public final static byte[] C = Bytes.toBytes("c");
public final static byte[] ROW = Bytes.toBytes("testrow");
public final static byte[] ROW1 = Bytes.toBytes("testrow1");
public final static byte[] ROW2 = Bytes.toBytes("testrow2");
private static final int ROWSIZE = 20;
private static byte [][] ROWS = makeN(ROW, ROWSIZE);
@ -80,17 +73,6 @@ public class TestRegionObserverInterface {
util.startMiniCluster(2);
cluster = util.getMiniHBaseCluster();
HTable table = util.createTable(TEST_TABLE_2, TEST_FAMILY);
for(int i = 0; i < ROWSIZE; i++) {
Put put = new Put(ROWS[i]);
put.add(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
table.put(put);
}
// sleep here is an ugly hack to allow region transitions to finish
Thread.sleep(5000);
}
@AfterClass
@ -98,89 +80,159 @@ public class TestRegionObserverInterface {
util.shutdownMiniCluster();
}
HRegion initHRegion (byte [] tableName, String callingMethod,
Configuration conf, Class<?> implClass, byte [] ... families)
throws IOException{
HTableDescriptor htd = new HTableDescriptor(tableName);
for(byte [] family : families) {
htd.addFamily(new HColumnDescriptor(family));
}
HRegionInfo info = new HRegionInfo(htd, null, null, false);
Path path = new Path(DIR + callingMethod);
// this following piece is a hack. currently a coprocessorHost
// is secretly loaded at OpenRegionHandler. we don't really
// start a region server here, so just manually create cphost
// and set it to region.
HRegion r = HRegion.createHRegion(info, path, conf);
RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf);
r.setCoprocessorHost(host);
host.load(implClass, Priority.USER);
return r;
}
@Test
public void testRegionObserver() throws IOException {
byte[] TABLE = Bytes.toBytes(getClass().getName());
byte[][] FAMILIES = new byte[][] { A, B, C } ;
byte[] tableName = TEST_TABLE;
// recreate table every time in order to reset the status of the
// coproccessor.
HTable table = util.createTable(tableName, new byte[][] {A, B, C});
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
"hadDelete"},
TEST_TABLE,
new Boolean[] {false, false, false, false, false});
Put put = new Put(ROW);
put.add(A, A, A);
put.add(B, B, B);
put.add(C, C, C);
table.put(put);
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
"hadDelete"},
TEST_TABLE,
new Boolean[] {false, false, true, true, false}
);
Get get = new Get(ROW);
get.addColumn(A, A);
get.addColumn(B, B);
get.addColumn(C, C);
table.get(get);
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
"hadDelete"},
TEST_TABLE,
new Boolean[] {true, true, true, true, false}
);
Delete delete = new Delete(ROW);
delete.deleteColumn(A, A);
delete.deleteColumn(B, B);
delete.deleteColumn(C, C);
table.delete(delete);
for (JVMClusterUtil.RegionServerThread t : cluster.getRegionServerThreads()) {
for (HRegionInfo r : t.getRegionServer().getOnlineRegions()) {
if (!Arrays.equals(r.getTableDesc().getName(), TEST_TABLE)) {
continue;
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
"hadDelete"},
TEST_TABLE,
new Boolean[] {true, true, true, true, true}
);
util.deleteTable(tableName);
}
RegionCoprocessorHost cph = t.getRegionServer().getOnlineRegion(r.getRegionName()).
getCoprocessorHost();
Coprocessor c = cph.findCoprocessor(SimpleRegionObserver.class.getName());
assertNotNull(c);
assertTrue(((SimpleRegionObserver)c).hadPreGet());
assertTrue(((SimpleRegionObserver)c).hadPostGet());
assertTrue(((SimpleRegionObserver)c).hadPrePut());
assertTrue(((SimpleRegionObserver)c).hadPostPut());
assertTrue(((SimpleRegionObserver)c).hadDelete());
}
}
}
// TODO: add tests for other methods which need to be tested
// at region servers.
@Test
public void testIncrementHook() throws IOException {
HTable table = new HTable(util.getConfiguration(), TEST_TABLE_2);
byte[] tableName = TEST_TABLE;
HTable table = util.createTable(tableName, new byte[][] {A, B, C});
Increment inc = new Increment(Bytes.toBytes(0));
inc.addColumn(TEST_FAMILY, TEST_QUALIFIER, 1);
inc.addColumn(A, A, 1);
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreIncrement", "hadPostIncrement"},
tableName,
new Boolean[] {false, false}
);
table.increment(inc);
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreIncrement", "hadPostIncrement"},
tableName,
new Boolean[] {true, true}
);
util.deleteTable(tableName);
}
@Test
// HBase-3583
public void testHBase3583() throws IOException {
byte[] tableName = Bytes.toBytes("testHBase3583");
util.createTable(tableName, new byte[][] {A, B, C});
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "wasScannerNextCalled",
"wasScannerCloseCalled"},
tableName,
new Boolean[] {false, false, false, false}
);
HTable table = new HTable(util.getConfiguration(), tableName);
Put put = new Put(ROW);
put.add(A, A, A);
table.put(put);
Get get = new Get(ROW);
get.addColumn(A, A);
table.get(get);
// verify that scannerNext and scannerClose upcalls won't be invoked
// when we perform get().
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "wasScannerNextCalled",
"wasScannerCloseCalled"},
tableName,
new Boolean[] {true, true, false, false}
);
Scan s = new Scan();
ResultScanner scanner = table.getScanner(s);
try {
for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
}
} finally {
scanner.close();
}
// now scanner hooks should be invoked.
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"wasScannerNextCalled", "wasScannerCloseCalled"},
tableName,
new Boolean[] {true, true}
);
util.deleteTable(tableName);
}
// check each region whether the coprocessor upcalls are called or not.
private void verifyMethodResult(Class c, String methodName[], byte[] tableName,
Object value[]) throws IOException {
try {
for (JVMClusterUtil.RegionServerThread t : cluster.getRegionServerThreads()) {
for (HRegionInfo r : t.getRegionServer().getOnlineRegions()) {
if (!Arrays.equals(r.getTableDesc().getName(), TEST_TABLE_2)) {
if (!Arrays.equals(r.getTableDesc().getName(), tableName)) {
continue;
}
RegionCoprocessorHost cph = t.getRegionServer().getOnlineRegion(r.getRegionName()).
getCoprocessorHost();
Coprocessor c = cph.findCoprocessor(SimpleRegionObserver.class.getName());
assertTrue(((SimpleRegionObserver)c).hadPreIncrement());
assertTrue(((SimpleRegionObserver)c).hadPostIncrement());
Coprocessor cp = cph.findCoprocessor(c.getName());
assertNotNull(cp);
for (int i = 0; i < methodName.length; ++i) {
Method m = c.getMethod(methodName[i]);
Object o = m.invoke(cp);
assertTrue("Result of " + c.getName() + "." + methodName[i]
+ " is expected to be " + value[i].toString()
+ ", while we get " + o.toString(), o.equals(value[i]));
}
}
}
} catch (Exception e) {
throw new IOException(e.toString());
}
}
private static byte [][] makeN(byte [] base, int n) {
byte [][] ret = new byte[n][];