HBASE-9261 Add cp hooks after {start|close}RegionOperation

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1550990 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
rajeshbabu 2013-12-15 02:02:29 +00:00
parent b5199318fa
commit 43841e340e
7 changed files with 206 additions and 38 deletions

View File

@ -104,8 +104,8 @@ public class ClientSideRegionScanner extends AbstractClientScanner {
} }
} }
if (this.region != null) { if (this.region != null) {
this.region.closeRegionOperation();
try { try {
this.region.closeRegionOperation();
this.region.close(true); this.region.close(true);
this.region = null; this.region = null;
} catch (IOException ex) { } catch (IOException ex) {

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
@ -321,6 +322,11 @@ public abstract class BaseRegionObserver implements RegionObserver {
final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
} }
@Override
public void postBatchMutateIndispensably(final ObserverContext<RegionCoprocessorEnvironment> ctx,
MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException {
}
@Override @Override
public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> e, public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> e,
final byte [] row, final byte [] family, final byte [] qualifier, final byte [] row, final byte [] family, final byte [] qualifier,
@ -481,4 +487,14 @@ public abstract class BaseRegionObserver implements RegionObserver {
MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException { MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
return newCell; return newCell;
} }
@Override
public void postStartRegionOperation(final ObserverContext<RegionCoprocessorEnvironment> ctx,
Operation op) throws IOException {
}
@Override
public void postCloseRegionOperation(final ObserverContext<RegionCoprocessorEnvironment> ctx,
Operation op) throws IOException {
}
} }

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
@ -618,6 +619,36 @@ public interface RegionObserver extends Coprocessor {
void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c, void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException; final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException;
/**
* This will be called for region operations where read lock is acquired in
* {@link HRegion#startRegionOperation()}.
* @param ctx
* @param operation The operation is about to be taken on the region
* @throws IOException
*/
void postStartRegionOperation(final ObserverContext<RegionCoprocessorEnvironment> ctx,
Operation operation) throws IOException;
/**
* Called after releasing read lock in {@link HRegion#closeRegionOperation(Operation)}.
* @param ctx
* @param operation
* @throws IOException
*/
void postCloseRegionOperation(final ObserverContext<RegionCoprocessorEnvironment> ctx,
Operation operation) throws IOException;
/**
* Called after the completion of batch put/delete and will be called even if the batch operation
* fails
* @param ctx
* @param miniBatchOp
* @param success true if batch operation is successful otherwise false.
* @throws IOException
*/
void postBatchMutateIndispensably(final ObserverContext<RegionCoprocessorEnvironment> ctx,
MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException;
/** /**
* Called before checkAndPut * Called before checkAndPut
* <p> * <p>

View File

@ -225,7 +225,7 @@ public class HRegion implements HeapSize { // , Writable{
* operations have to be defined here. It's only needed when a special check is need in * operations have to be defined here. It's only needed when a special check is need in
* startRegionOperation * startRegionOperation
*/ */
protected enum Operation { public enum Operation {
ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE, ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE,
REPLAY_BATCH_MUTATE, COMPACT_REGION REPLAY_BATCH_MUTATE, COMPACT_REGION
} }
@ -1738,7 +1738,7 @@ public class HRegion implements HeapSize { // , Writable{
} }
return result; return result;
} finally { } finally {
closeRegionOperation(); closeRegionOperation(Operation.GET);
} }
} }
@ -1778,7 +1778,7 @@ public class HRegion implements HeapSize { // , Writable{
} }
return instantiateRegionScanner(scan, additionalScanners); return instantiateRegionScanner(scan, additionalScanners);
} finally { } finally {
closeRegionOperation(); closeRegionOperation(Operation.SCAN);
} }
} }
@ -1831,7 +1831,7 @@ public class HRegion implements HeapSize { // , Writable{
// All edits for the given row (across all column families) must happen atomically. // All edits for the given row (across all column families) must happen atomically.
doBatchMutate(delete); doBatchMutate(delete);
} finally { } finally {
closeRegionOperation(); closeRegionOperation(Operation.DELETE);
} }
} }
@ -1927,7 +1927,7 @@ public class HRegion implements HeapSize { // , Writable{
// All edits for the given row (across all column families) must happen atomically. // All edits for the given row (across all column families) must happen atomically.
doBatchMutate(put); doBatchMutate(put);
} finally { } finally {
closeRegionOperation(); closeRegionOperation(Operation.PUT);
} }
} }
@ -2077,11 +2077,9 @@ public class HRegion implements HeapSize { // , Writable{
checkResources(); checkResources();
long newSize; long newSize;
if (batchOp.isInReplay()) { Operation op = Operation.BATCH_MUTATE;
startRegionOperation(Operation.REPLAY_BATCH_MUTATE); if (batchOp.isInReplay()) op = Operation.REPLAY_BATCH_MUTATE;
} else { startRegionOperation(op);
startRegionOperation(Operation.BATCH_MUTATE);
}
try { try {
if (!initialized) { if (!initialized) {
@ -2094,7 +2092,7 @@ public class HRegion implements HeapSize { // , Writable{
long addedSize = doMiniBatchMutation(batchOp); long addedSize = doMiniBatchMutation(batchOp);
newSize = this.addAndGetGlobalMemstoreSize(addedSize); newSize = this.addAndGetGlobalMemstoreSize(addedSize);
} finally { } finally {
closeRegionOperation(); closeRegionOperation(op);
} }
if (isFlushSize(newSize)) { if (isFlushSize(newSize)) {
requestFlush(); requestFlush();
@ -2476,6 +2474,16 @@ public class HRegion implements HeapSize { // , Writable{
} }
} }
} }
if (coprocessorHost != null && !batchOp.isInReplay()) {
// call the coprocessor hook to do any finalization steps
// after the put is done
MiniBatchOperationInProgress<Mutation> miniBatchOp =
new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex,
lastIndexExclusive);
coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success);
}
batchOp.nextIndexToProcess = lastIndexExclusive; batchOp.nextIndexToProcess = lastIndexExclusive;
} }
} }
@ -3655,7 +3663,7 @@ public class HRegion implements HeapSize { // , Writable{
try { try {
return nextRaw(outResults, limit); return nextRaw(outResults, limit);
} finally { } finally {
closeRegionOperation(); closeRegionOperation(Operation.SCAN);
} }
} }
@ -5015,7 +5023,7 @@ public class HRegion implements HeapSize { // , Writable{
if (w != null) { if (w != null) {
mvcc.completeMemstoreInsert(w); mvcc.completeMemstoreInsert(w);
} }
closeRegionOperation(); closeRegionOperation(Operation.APPEND);
} }
if (this.metricsRegion != null) { if (this.metricsRegion != null) {
@ -5193,7 +5201,7 @@ public class HRegion implements HeapSize { // , Writable{
if (w != null) { if (w != null) {
mvcc.completeMemstoreInsert(w); mvcc.completeMemstoreInsert(w);
} }
closeRegionOperation(); closeRegionOperation(Operation.INCREMENT);
if (this.metricsRegion != null) { if (this.metricsRegion != null) {
this.metricsRegion.updateIncrement(); this.metricsRegion.updateIncrement();
} }
@ -5515,23 +5523,17 @@ public class HRegion implements HeapSize { // , Writable{
* modifies data. It has to be called just before a try. * modifies data. It has to be called just before a try.
* #closeRegionOperation needs to be called in the try's finally block * #closeRegionOperation needs to be called in the try's finally block
* Acquires a read lock and checks if the region is closing or closed. * Acquires a read lock and checks if the region is closing or closed.
* @throws NotServingRegionException when the region is closing or closed * @throws IOException
* @throws RegionTooBusyException if failed to get the lock in time
* @throws InterruptedIOException if interrupted while waiting for a lock
*/ */
public void startRegionOperation() public void startRegionOperation() throws IOException {
throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
startRegionOperation(Operation.ANY); startRegionOperation(Operation.ANY);
} }
/** /**
* @param op The operation is about to be taken on the region * @param op The operation is about to be taken on the region
* @throws NotServingRegionException * @throws IOException
* @throws RegionTooBusyException
* @throws InterruptedIOException
*/ */
protected void startRegionOperation(Operation op) throws NotServingRegionException, protected void startRegionOperation(Operation op) throws IOException {
RegionTooBusyException, InterruptedIOException {
switch (op) { switch (op) {
case INCREMENT: case INCREMENT:
case APPEND: case APPEND:
@ -5566,14 +5568,36 @@ public class HRegion implements HeapSize { // , Writable{
lock.readLock().unlock(); lock.readLock().unlock();
throw new NotServingRegionException(getRegionNameAsString() + " is closed"); throw new NotServingRegionException(getRegionNameAsString() + " is closed");
} }
try {
if (coprocessorHost != null) {
coprocessorHost.postStartRegionOperation(op);
}
} catch (Exception e) {
lock.readLock().unlock();
throw new IOException(e);
}
} }
/** /**
* Closes the lock. This needs to be called in the finally block corresponding * Closes the lock. This needs to be called in the finally block corresponding
* to the try block of #startRegionOperation * to the try block of #startRegionOperation
* @throws IOException
*/ */
public void closeRegionOperation() { public void closeRegionOperation() throws IOException {
closeRegionOperation(Operation.ANY);
}
/**
* Closes the lock. This needs to be called in the finally block corresponding
* to the try block of {@link #startRegionOperation(Operation)}
* @param operation
* @throws IOException
*/
public void closeRegionOperation(Operation operation) throws IOException {
lock.readLock().unlock(); lock.readLock().unlock();
if (coprocessorHost != null) {
coprocessorHost.postCloseRegionOperation(operation);
}
} }
/** /**

View File

@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@ -1077,6 +1078,26 @@ public class RegionCoprocessorHost
} }
} }
public void postBatchMutateIndispensably(
final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success)
throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env : coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
try {
((RegionObserver) env.getInstance()).postBatchMutateIndispensably(ctx, miniBatchOp,
success);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
}
if (ctx.shouldComplete()) {
break;
}
}
}
}
/** /**
* @param row row to check * @param row row to check
* @param family column family * @param family column family
@ -1641,6 +1662,40 @@ public class RegionCoprocessorHost
return hasLoaded; return hasLoaded;
} }
public void postStartRegionOperation(Operation op) throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env : coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
try {
((RegionObserver) env.getInstance()).postStartRegionOperation(ctx, op);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
}
if (ctx.shouldComplete()) {
break;
}
}
}
}
public void postCloseRegionOperation(Operation op) throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env : coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
try {
((RegionObserver) env.getInstance()).postCloseRegionOperation(ctx, op);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
}
if (ctx.shouldComplete()) {
break;
}
}
}
}
/** /**
* @param fs fileystem to read from * @param fs fileystem to read from
* @param p path to the file * @param p path to the file

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.Leases; import org.apache.hadoop.hbase.regionserver.Leases;
@ -119,6 +120,9 @@ public class SimpleRegionObserver extends BaseRegionObserver {
final AtomicInteger ctPreSplitAfterPONR = new AtomicInteger(0); final AtomicInteger ctPreSplitAfterPONR = new AtomicInteger(0);
final AtomicInteger ctPreStoreFileReaderOpen = new AtomicInteger(0); final AtomicInteger ctPreStoreFileReaderOpen = new AtomicInteger(0);
final AtomicInteger ctPostStoreFileReaderOpen = new AtomicInteger(0); final AtomicInteger ctPostStoreFileReaderOpen = new AtomicInteger(0);
final AtomicInteger ctPostBatchMutateIndispensably = new AtomicInteger(0);
final AtomicInteger ctPostStartRegionOperation = new AtomicInteger(0);
final AtomicInteger ctPostCloseRegionOperation = new AtomicInteger(0);
final AtomicBoolean throwOnPostFlush = new AtomicBoolean(false); final AtomicBoolean throwOnPostFlush = new AtomicBoolean(false);
static final String TABLE_SKIPPED = "SKIPPED_BY_PREWALRESTORE"; static final String TABLE_SKIPPED = "SKIPPED_BY_PREWALRESTORE";
@ -463,6 +467,26 @@ public class SimpleRegionObserver extends BaseRegionObserver {
ctPostBatchMutate.incrementAndGet(); ctPostBatchMutate.incrementAndGet();
} }
@Override
public void postStartRegionOperation(final ObserverContext<RegionCoprocessorEnvironment> ctx,
Operation op) throws IOException {
ctPostStartRegionOperation.incrementAndGet();
}
@Override
public void postCloseRegionOperation(final ObserverContext<RegionCoprocessorEnvironment> ctx,
Operation op) throws IOException {
if (ctPostStartRegionOperation.get() > 0) {
ctPostCloseRegionOperation.incrementAndGet();
}
}
@Override
public void postBatchMutateIndispensably(final ObserverContext<RegionCoprocessorEnvironment> ctx,
MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException {
ctPostBatchMutateIndispensably.incrementAndGet();
}
@Override @Override
public void preGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> c, public void preGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> c,
final byte[] row, final byte[] family, final Result result) final byte[] row, final byte[] family, final Result result)
@ -597,11 +621,31 @@ public class SimpleRegionObserver extends BaseRegionObserver {
public boolean hadPostBatchMutate() { public boolean hadPostBatchMutate() {
return ctPostBatchMutate.get() > 0; return ctPostBatchMutate.get() > 0;
} }
public boolean hadPostBatchMutateIndispensably() {
return ctPostBatchMutateIndispensably.get() > 0;
}
public boolean hadPostStartRegionOperation() {
return ctPostStartRegionOperation.get() > 0;
}
public boolean hadPostCloseRegionOperation() {
return ctPostCloseRegionOperation.get() > 0;
}
public boolean hadDelete() { public boolean hadDelete() {
return !(ctBeforeDelete.get() > 0); return !(ctBeforeDelete.get() > 0);
} }
public int getCtPostStartRegionOperation() {
return ctPostStartRegionOperation.get();
}
public int getCtPostCloseRegionOperation() {
return ctPostCloseRegionOperation.get();
}
public boolean hadPreIncrement() { public boolean hadPreIncrement() {
return ctPreIncrement.get() > 0; return ctPreIncrement.get() > 0;
} }

View File

@ -115,11 +115,10 @@ public class TestRegionObserverInterface {
// coprocessor. // coprocessor.
HTable table = util.createTable(tableName, new byte[][] {A, B, C}); HTable table = util.createTable(tableName, new byte[][] {A, B, C});
try { try {
verifyMethodResult(SimpleRegionObserver.class, verifyMethodResult(SimpleRegionObserver.class, new String[] { "hadPreGet", "hadPostGet",
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadPrePut", "hadPostPut", "hadDelete", "hadPostStartRegionOperation",
"hadDelete"}, "hadPostCloseRegionOperation", "hadPostBatchMutateIndispensably" }, tableName,
tableName, new Boolean[] { false, false, false, false, false, false, false, false });
new Boolean[] {false, false, false, false, false});
Put put = new Put(ROW); Put put = new Put(ROW);
put.add(A, A, A); put.add(A, A, A);
@ -127,12 +126,11 @@ public class TestRegionObserverInterface {
put.add(C, C, C); put.add(C, C, C);
table.put(put); table.put(put);
verifyMethodResult(SimpleRegionObserver.class, verifyMethodResult(SimpleRegionObserver.class, new String[] { "hadPreGet", "hadPostGet",
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadPrePut", "hadPostPut", "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete",
"hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"}, "hadPostStartRegionOperation", "hadPostCloseRegionOperation",
tableName, "hadPostBatchMutateIndispensably" }, TEST_TABLE, new Boolean[] { false, false, true,
new Boolean[] {false, false, true, true, true, true, false} true, true, true, false, true, true, true });
);
verifyMethodResult(SimpleRegionObserver.class, verifyMethodResult(SimpleRegionObserver.class,
new String[] {"getCtPreOpen", "getCtPostOpen", "getCtPreClose", "getCtPostClose"}, new String[] {"getCtPreOpen", "getCtPostOpen", "getCtPreClose", "getCtPostClose"},