From 43841e340e81c87787448253e821d563a8e9ebd6 Mon Sep 17 00:00:00 2001 From: rajeshbabu Date: Sun, 15 Dec 2013 02:02:29 +0000 Subject: [PATCH] HBASE-9261 Add cp hooks after {start|close}RegionOperation git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1550990 13f79535-47bb-0310-9956-ffa450edef68 --- .../hbase/client/ClientSideRegionScanner.java | 2 +- .../hbase/coprocessor/BaseRegionObserver.java | 16 ++++ .../hbase/coprocessor/RegionObserver.java | 31 ++++++++ .../hadoop/hbase/regionserver/HRegion.java | 74 ++++++++++++------- .../regionserver/RegionCoprocessorHost.java | 55 ++++++++++++++ .../coprocessor/SimpleRegionObserver.java | 46 +++++++++++- .../TestRegionObserverInterface.java | 20 +++-- 7 files changed, 206 insertions(+), 38 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java index 52b1e3167e6..a1e2fd9a510 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java @@ -104,8 +104,8 @@ public class ClientSideRegionScanner extends AbstractClientScanner { } } if (this.region != null) { - this.region.closeRegionOperation(); try { + this.region.closeRegionOperation(); this.region.close(true); this.region = null; } catch (IOException ex) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java index 9aada51a63d..cffb9e09854 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegion.Operation; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; @@ -321,6 +322,11 @@ public abstract class BaseRegionObserver implements RegionObserver { final MiniBatchOperationInProgress miniBatchOp) throws IOException { } + @Override + public void postBatchMutateIndispensably(final ObserverContext ctx, + MiniBatchOperationInProgress miniBatchOp, final boolean success) throws IOException { + } + @Override public boolean preCheckAndPut(final ObserverContext e, final byte [] row, final byte [] family, final byte [] qualifier, @@ -481,4 +487,14 @@ public abstract class BaseRegionObserver implements RegionObserver { MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException { return newCell; } + + @Override + public void postStartRegionOperation(final ObserverContext ctx, + Operation op) throws IOException { + } + + @Override + public void postCloseRegionOperation(final ObserverContext ctx, + Operation op) throws IOException { + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index c6b5b1d4c39..fba1225e35b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegion.Operation; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; @@ -618,6 +619,36 @@ public interface RegionObserver extends Coprocessor { void postBatchMutate(final ObserverContext c, final MiniBatchOperationInProgress miniBatchOp) throws IOException; + /** + * This will be called for region operations where read lock is acquired in + * {@link HRegion#startRegionOperation()}. + * @param ctx + * @param operation The operation is about to be taken on the region + * @throws IOException + */ + void postStartRegionOperation(final ObserverContext ctx, + Operation operation) throws IOException; + + /** + * Called after releasing read lock in {@link HRegion#closeRegionOperation(Operation)}. + * @param ctx + * @param operation + * @throws IOException + */ + void postCloseRegionOperation(final ObserverContext ctx, + Operation operation) throws IOException; + + /** + * Called after the completion of batch put/delete and will be called even if the batch operation + * fails + * @param ctx + * @param miniBatchOp + * @param success true if batch operation is successful otherwise false. + * @throws IOException + */ + void postBatchMutateIndispensably(final ObserverContext ctx, + MiniBatchOperationInProgress miniBatchOp, final boolean success) throws IOException; + /** * Called before checkAndPut *

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index ddfecef1cbc..d7705551863 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -225,7 +225,7 @@ public class HRegion implements HeapSize { // , Writable{ * operations have to be defined here. It's only needed when a special check is need in * startRegionOperation */ - protected enum Operation { + public enum Operation { ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE, REPLAY_BATCH_MUTATE, COMPACT_REGION } @@ -1738,7 +1738,7 @@ public class HRegion implements HeapSize { // , Writable{ } return result; } finally { - closeRegionOperation(); + closeRegionOperation(Operation.GET); } } @@ -1778,7 +1778,7 @@ public class HRegion implements HeapSize { // , Writable{ } return instantiateRegionScanner(scan, additionalScanners); } finally { - closeRegionOperation(); + closeRegionOperation(Operation.SCAN); } } @@ -1831,7 +1831,7 @@ public class HRegion implements HeapSize { // , Writable{ // All edits for the given row (across all column families) must happen atomically. doBatchMutate(delete); } finally { - closeRegionOperation(); + closeRegionOperation(Operation.DELETE); } } @@ -1927,7 +1927,7 @@ public class HRegion implements HeapSize { // , Writable{ // All edits for the given row (across all column families) must happen atomically. doBatchMutate(put); } finally { - closeRegionOperation(); + closeRegionOperation(Operation.PUT); } } @@ -2077,11 +2077,9 @@ public class HRegion implements HeapSize { // , Writable{ checkResources(); long newSize; - if (batchOp.isInReplay()) { - startRegionOperation(Operation.REPLAY_BATCH_MUTATE); - } else { - startRegionOperation(Operation.BATCH_MUTATE); - } + Operation op = Operation.BATCH_MUTATE; + if (batchOp.isInReplay()) op = Operation.REPLAY_BATCH_MUTATE; + startRegionOperation(op); try { if (!initialized) { @@ -2094,7 +2092,7 @@ public class HRegion implements HeapSize { // , Writable{ long addedSize = doMiniBatchMutation(batchOp); newSize = this.addAndGetGlobalMemstoreSize(addedSize); } finally { - closeRegionOperation(); + closeRegionOperation(op); } if (isFlushSize(newSize)) { requestFlush(); @@ -2476,6 +2474,16 @@ public class HRegion implements HeapSize { // , Writable{ } } } + if (coprocessorHost != null && !batchOp.isInReplay()) { + // call the coprocessor hook to do any finalization steps + // after the put is done + MiniBatchOperationInProgress miniBatchOp = + new MiniBatchOperationInProgress(batchOp.getMutationsForCoprocs(), + batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, + lastIndexExclusive); + coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success); + } + batchOp.nextIndexToProcess = lastIndexExclusive; } } @@ -3655,7 +3663,7 @@ public class HRegion implements HeapSize { // , Writable{ try { return nextRaw(outResults, limit); } finally { - closeRegionOperation(); + closeRegionOperation(Operation.SCAN); } } @@ -5015,7 +5023,7 @@ public class HRegion implements HeapSize { // , Writable{ if (w != null) { mvcc.completeMemstoreInsert(w); } - closeRegionOperation(); + closeRegionOperation(Operation.APPEND); } if (this.metricsRegion != null) { @@ -5193,7 +5201,7 @@ public class HRegion implements HeapSize { // , Writable{ if (w != null) { mvcc.completeMemstoreInsert(w); } - closeRegionOperation(); + closeRegionOperation(Operation.INCREMENT); if (this.metricsRegion != null) { this.metricsRegion.updateIncrement(); } @@ -5515,23 +5523,17 @@ public class HRegion implements HeapSize { // , Writable{ * modifies data. It has to be called just before a try. * #closeRegionOperation needs to be called in the try's finally block * Acquires a read lock and checks if the region is closing or closed. - * @throws NotServingRegionException when the region is closing or closed - * @throws RegionTooBusyException if failed to get the lock in time - * @throws InterruptedIOException if interrupted while waiting for a lock + * @throws IOException */ - public void startRegionOperation() - throws NotServingRegionException, RegionTooBusyException, InterruptedIOException { + public void startRegionOperation() throws IOException { startRegionOperation(Operation.ANY); } /** * @param op The operation is about to be taken on the region - * @throws NotServingRegionException - * @throws RegionTooBusyException - * @throws InterruptedIOException + * @throws IOException */ - protected void startRegionOperation(Operation op) throws NotServingRegionException, - RegionTooBusyException, InterruptedIOException { + protected void startRegionOperation(Operation op) throws IOException { switch (op) { case INCREMENT: case APPEND: @@ -5566,14 +5568,36 @@ public class HRegion implements HeapSize { // , Writable{ lock.readLock().unlock(); throw new NotServingRegionException(getRegionNameAsString() + " is closed"); } + try { + if (coprocessorHost != null) { + coprocessorHost.postStartRegionOperation(op); + } + } catch (Exception e) { + lock.readLock().unlock(); + throw new IOException(e); + } } /** * Closes the lock. This needs to be called in the finally block corresponding * to the try block of #startRegionOperation + * @throws IOException */ - public void closeRegionOperation() { + public void closeRegionOperation() throws IOException { + closeRegionOperation(Operation.ANY); + } + + /** + * Closes the lock. This needs to be called in the finally block corresponding + * to the try block of {@link #startRegionOperation(Operation)} + * @param operation + * @throws IOException + */ + public void closeRegionOperation(Operation operation) throws IOException { lock.readLock().unlock(); + if (coprocessorHost != null) { + coprocessorHost.postCloseRegionOperation(operation); + } } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 62b637d97e1..a05e0a89eb7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.regionserver.HRegion.Operation; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -1077,6 +1078,26 @@ public class RegionCoprocessorHost } } + public void postBatchMutateIndispensably( + final MiniBatchOperationInProgress miniBatchOp, final boolean success) + throws IOException { + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + ((RegionObserver) env.getInstance()).postBatchMutateIndispensably(ctx, miniBatchOp, + success); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + if (ctx.shouldComplete()) { + break; + } + } + } + } + /** * @param row row to check * @param family column family @@ -1641,6 +1662,40 @@ public class RegionCoprocessorHost return hasLoaded; } + public void postStartRegionOperation(Operation op) throws IOException { + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + ((RegionObserver) env.getInstance()).postStartRegionOperation(ctx, op); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + if (ctx.shouldComplete()) { + break; + } + } + } + } + + public void postCloseRegionOperation(Operation op) throws IOException { + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + ((RegionObserver) env.getInstance()).postCloseRegionOperation(ctx, op); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + if (ctx.shouldComplete()) { + break; + } + } + } + } + /** * @param fs fileystem to read from * @param p path to the file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java index d4830f00c04..4a8f2646f06 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegion.Operation; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.Leases; @@ -119,6 +120,9 @@ public class SimpleRegionObserver extends BaseRegionObserver { final AtomicInteger ctPreSplitAfterPONR = new AtomicInteger(0); final AtomicInteger ctPreStoreFileReaderOpen = new AtomicInteger(0); final AtomicInteger ctPostStoreFileReaderOpen = new AtomicInteger(0); + final AtomicInteger ctPostBatchMutateIndispensably = new AtomicInteger(0); + final AtomicInteger ctPostStartRegionOperation = new AtomicInteger(0); + final AtomicInteger ctPostCloseRegionOperation = new AtomicInteger(0); final AtomicBoolean throwOnPostFlush = new AtomicBoolean(false); static final String TABLE_SKIPPED = "SKIPPED_BY_PREWALRESTORE"; @@ -463,6 +467,26 @@ public class SimpleRegionObserver extends BaseRegionObserver { ctPostBatchMutate.incrementAndGet(); } + @Override + public void postStartRegionOperation(final ObserverContext ctx, + Operation op) throws IOException { + ctPostStartRegionOperation.incrementAndGet(); + } + + @Override + public void postCloseRegionOperation(final ObserverContext ctx, + Operation op) throws IOException { + if (ctPostStartRegionOperation.get() > 0) { + ctPostCloseRegionOperation.incrementAndGet(); + } + } + + @Override + public void postBatchMutateIndispensably(final ObserverContext ctx, + MiniBatchOperationInProgress miniBatchOp, final boolean success) throws IOException { + ctPostBatchMutateIndispensably.incrementAndGet(); + } + @Override public void preGetClosestRowBefore(final ObserverContext c, final byte[] row, final byte[] family, final Result result) @@ -597,11 +621,31 @@ public class SimpleRegionObserver extends BaseRegionObserver { public boolean hadPostBatchMutate() { return ctPostBatchMutate.get() > 0; } - + + public boolean hadPostBatchMutateIndispensably() { + return ctPostBatchMutateIndispensably.get() > 0; + } + + public boolean hadPostStartRegionOperation() { + return ctPostStartRegionOperation.get() > 0; + } + + public boolean hadPostCloseRegionOperation() { + return ctPostCloseRegionOperation.get() > 0; + } + public boolean hadDelete() { return !(ctBeforeDelete.get() > 0); } + public int getCtPostStartRegionOperation() { + return ctPostStartRegionOperation.get(); + } + + public int getCtPostCloseRegionOperation() { + return ctPostCloseRegionOperation.get(); + } + public boolean hadPreIncrement() { return ctPreIncrement.get() > 0; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java index 9f3e8b638b7..f0eeeea4178 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java @@ -115,11 +115,10 @@ public class TestRegionObserverInterface { // coprocessor. HTable table = util.createTable(tableName, new byte[][] {A, B, C}); try { - verifyMethodResult(SimpleRegionObserver.class, - new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", - "hadDelete"}, - tableName, - new Boolean[] {false, false, false, false, false}); + verifyMethodResult(SimpleRegionObserver.class, new String[] { "hadPreGet", "hadPostGet", + "hadPrePut", "hadPostPut", "hadDelete", "hadPostStartRegionOperation", + "hadPostCloseRegionOperation", "hadPostBatchMutateIndispensably" }, tableName, + new Boolean[] { false, false, false, false, false, false, false, false }); Put put = new Put(ROW); put.add(A, A, A); @@ -127,12 +126,11 @@ public class TestRegionObserverInterface { put.add(C, C, C); table.put(put); - verifyMethodResult(SimpleRegionObserver.class, - new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", - "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"}, - tableName, - new Boolean[] {false, false, true, true, true, true, false} - ); + verifyMethodResult(SimpleRegionObserver.class, new String[] { "hadPreGet", "hadPostGet", + "hadPrePut", "hadPostPut", "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete", + "hadPostStartRegionOperation", "hadPostCloseRegionOperation", + "hadPostBatchMutateIndispensably" }, TEST_TABLE, new Boolean[] { false, false, true, + true, true, true, false, true, true, true }); verifyMethodResult(SimpleRegionObserver.class, new String[] {"getCtPreOpen", "getCtPostOpen", "getCtPreClose", "getCtPostClose"},