HBASE-11126-Add RegionObserver pre hooks that operate under row lock (Ram)

This commit is contained in:
Ramkrishna 2014-06-03 17:54:59 +05:30
parent 369141b795
commit 6a2467bbf2
7 changed files with 644 additions and 42 deletions

View File

@ -310,6 +310,12 @@ public abstract class BaseRegionObserver implements RegionObserver {
final WALEdit edit, final Durability durability) throws IOException { final WALEdit edit, final Durability durability) throws IOException {
} }
@Override
public void prePrepareTimeStampForDeleteVersion(
final ObserverContext<RegionCoprocessorEnvironment> e, final Mutation delete,
final Cell cell, final byte[] byteNow, final Get get) throws IOException {
}
@Override @Override
public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> e, public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> e,
final Delete delete, final WALEdit edit, final Durability durability) final Delete delete, final WALEdit edit, final Durability durability)
@ -339,6 +345,15 @@ public abstract class BaseRegionObserver implements RegionObserver {
return result; return result;
} }
@Override
public boolean preCheckAndPutAfterRowLock(
final ObserverContext<RegionCoprocessorEnvironment> e,
final byte[] row, final byte[] family, final byte[] qualifier, final CompareOp compareOp,
final ByteArrayComparable comparator, final Put put,
final boolean result) throws IOException {
return result;
}
@Override @Override
public boolean postCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> e, public boolean postCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> e,
final byte [] row, final byte [] family, final byte [] qualifier, final byte [] row, final byte [] family, final byte [] qualifier,
@ -355,6 +370,15 @@ public abstract class BaseRegionObserver implements RegionObserver {
return result; return result;
} }
@Override
public boolean preCheckAndDeleteAfterRowLock(
final ObserverContext<RegionCoprocessorEnvironment> e,
final byte[] row, final byte[] family, final byte[] qualifier, final CompareOp compareOp,
final ByteArrayComparable comparator, final Delete delete,
final boolean result) throws IOException {
return result;
}
@Override @Override
public boolean postCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> e, public boolean postCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> e,
final byte [] row, final byte [] family, final byte [] qualifier, final byte [] row, final byte [] family, final byte [] qualifier,
@ -369,6 +393,12 @@ public abstract class BaseRegionObserver implements RegionObserver {
return null; return null;
} }
@Override
public Result preAppendAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> e,
final Append append) throws IOException {
return null;
}
@Override @Override
public Result postAppend(final ObserverContext<RegionCoprocessorEnvironment> e, public Result postAppend(final ObserverContext<RegionCoprocessorEnvironment> e,
final Append append, final Result result) throws IOException { final Append append, final Result result) throws IOException {
@ -396,6 +426,12 @@ public abstract class BaseRegionObserver implements RegionObserver {
return null; return null;
} }
@Override
public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> e,
final Increment increment) throws IOException {
return null;
}
@Override @Override
public Result postIncrement(final ObserverContext<RegionCoprocessorEnvironment> e, public Result postIncrement(final ObserverContext<RegionCoprocessorEnvironment> e,
final Increment increment, final Result result) throws IOException { final Increment increment, final Result result) throws IOException {

View File

@ -587,6 +587,24 @@ public interface RegionObserver extends Coprocessor {
void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c, void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
final Delete delete, final WALEdit edit, final Durability durability) final Delete delete, final WALEdit edit, final Durability durability)
throws IOException; throws IOException;
/**
* Called before the server updates the timestamp for version delete with latest timestamp.
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
* coprocessors
* @param c the environment provided by the region server
* @param mutation - the parent mutation associated with this delete cell
* @param cell - The deleteColumn with latest version cell
* @param byteNow - timestamp bytes
* @param get - the get formed using the current cell's row.
* Note that the get does not specify the family and qualifier
* @throws IOException
*/
void prePrepareTimeStampForDeleteVersion(final ObserverContext<RegionCoprocessorEnvironment> c,
final Mutation mutation, final Cell cell, final byte[] byteNow,
final Get get) throws IOException;
/** /**
* Called after the client deletes a value. * Called after the client deletes a value.
@ -657,7 +675,7 @@ public interface RegionObserver extends Coprocessor {
MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException; MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException;
/** /**
* Called before checkAndPut * Called before checkAndPut.
* <p> * <p>
* Call CoprocessorEnvironment#bypass to skip default actions * Call CoprocessorEnvironment#bypass to skip default actions
* <p> * <p>
@ -681,6 +699,34 @@ public interface RegionObserver extends Coprocessor {
final Put put, final boolean result) final Put put, final boolean result)
throws IOException; throws IOException;
/**
* Called before checkAndPut but after acquiring rowlock.
* <p>
* <b>Note:</b> Caution to be taken for not doing any long time operation in this hook.
* Row will be locked for longer time. Trying to acquire lock on another row, within this,
* can lead to potential deadlock.
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
* coprocessors
* @param c the environment provided by the region server
* @param row row to check
* @param family column family
* @param qualifier column qualifier
* @param compareOp the comparison operation
* @param comparator the comparator
* @param put data to put if check succeeds
* @param result
* @return the return value to return to client if bypassing default
* processing
* @throws IOException if an error occurred on the coprocessor
*/
boolean preCheckAndPutAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
final byte[] row, final byte[] family, final byte[] qualifier, final CompareOp compareOp,
final ByteArrayComparable comparator, final Put put,
final boolean result) throws IOException;
/** /**
* Called after checkAndPut * Called after checkAndPut
* <p> * <p>
@ -704,7 +750,7 @@ public interface RegionObserver extends Coprocessor {
throws IOException; throws IOException;
/** /**
* Called before checkAndDelete * Called before checkAndDelete.
* <p> * <p>
* Call CoprocessorEnvironment#bypass to skip default actions * Call CoprocessorEnvironment#bypass to skip default actions
* <p> * <p>
@ -727,6 +773,33 @@ public interface RegionObserver extends Coprocessor {
final Delete delete, final boolean result) final Delete delete, final boolean result)
throws IOException; throws IOException;
/**
* Called before checkAndDelete but after acquiring rowock.
* <p>
* <b>Note:</b> Caution to be taken for not doing any long time operation in this hook.
* Row will be locked for longer time. Trying to acquire lock on another row, within this,
* can lead to potential deadlock.
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
* coprocessors
* @param c the environment provided by the region server
* @param row row to check
* @param family column family
* @param qualifier column qualifier
* @param compareOp the comparison operation
* @param comparator the comparator
* @param delete delete to commit if check succeeds
* @param result
* @return the value to return to client if bypassing default processing
* @throws IOException if an error occurred on the coprocessor
*/
boolean preCheckAndDeleteAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
final byte[] row, final byte[] family, final byte[] qualifier, final CompareOp compareOp,
final ByteArrayComparable comparator, final Delete delete,
final boolean result) throws IOException;
/** /**
* Called after checkAndDelete * Called after checkAndDelete
* <p> * <p>
@ -795,7 +868,7 @@ public interface RegionObserver extends Coprocessor {
throws IOException; throws IOException;
/** /**
* Called before Append * Called before Append.
* <p> * <p>
* Call CoprocessorEnvironment#bypass to skip default actions * Call CoprocessorEnvironment#bypass to skip default actions
* <p> * <p>
@ -810,6 +883,25 @@ public interface RegionObserver extends Coprocessor {
final Append append) final Append append)
throws IOException; throws IOException;
/**
* Called before Append but after acquiring rowlock.
* <p>
* <b>Note:</b> Caution to be taken for not doing any long time operation in this hook.
* Row will be locked for longer time. Trying to acquire lock on another row, within this,
* can lead to potential deadlock.
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
* coprocessors
* @param c the environment provided by the region server
* @param append Append object
* @return result to return to the client if bypassing default processing
* @throws IOException if an error occurred on the coprocessor
*/
Result preAppendAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
final Append append) throws IOException;
/** /**
* Called after Append * Called after Append
* <p> * <p>
@ -826,7 +918,7 @@ public interface RegionObserver extends Coprocessor {
throws IOException; throws IOException;
/** /**
* Called before Increment * Called before Increment.
* <p> * <p>
* Call CoprocessorEnvironment#bypass to skip default actions * Call CoprocessorEnvironment#bypass to skip default actions
* <p> * <p>
@ -841,6 +933,28 @@ public interface RegionObserver extends Coprocessor {
final Increment increment) final Increment increment)
throws IOException; throws IOException;
/**
* Called before Increment but after acquiring rowlock.
* <p>
* <b>Note:</b> Caution to be taken for not doing any long time operation in this hook.
* Row will be locked for longer time. Trying to acquire lock on another row, within this,
* can lead to potential deadlock.
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained coprocessors
*
* @param c
* the environment provided by the region server
* @param increment
* increment object
* @return result to return to the client if bypassing default processing
* @throws IOException
* if an error occurred on the coprocessor
*/
Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
final Increment increment) throws IOException;
/** /**
* Called after increment * Called after increment
* <p> * <p>

View File

@ -2030,10 +2030,13 @@ public class HRegion implements HeapSize { // , Writable{
/** /**
* Setup correct timestamps in the KVs in Delete object. * Setup correct timestamps in the KVs in Delete object.
* Caller should have the row and region locks. * Caller should have the row and region locks.
* @param mutation
* @param familyMap
* @param byteNow
* @throws IOException * @throws IOException
*/ */
void prepareDeleteTimestamps(Map<byte[], List<Cell>> familyMap, byte[] byteNow) void prepareDeleteTimestamps(Mutation mutation, Map<byte[], List<Cell>> familyMap,
throws IOException { byte[] byteNow) throws IOException {
for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) { for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
byte[] family = e.getKey(); byte[] family = e.getKey();
@ -2059,20 +2062,14 @@ public class HRegion implements HeapSize { // , Writable{
Get get = new Get(CellUtil.cloneRow(kv)); Get get = new Get(CellUtil.cloneRow(kv));
get.setMaxVersions(count); get.setMaxVersions(count);
get.addColumn(family, qual); get.addColumn(family, qual);
if (coprocessorHost != null) {
List<Cell> result = get(get, false); if (!coprocessorHost.prePrepareTimeStampForDeleteVersion(mutation, cell, byteNow,
get)) {
if (result.size() < count) { updateDeleteLatestVersionTimeStamp(kv, get, count, byteNow);
// Nothing to delete }
kv.updateLatestStamp(byteNow); } else {
continue; updateDeleteLatestVersionTimeStamp(kv, get, count, byteNow);
} }
if (result.size() > count) {
throw new RuntimeException("Unexpected size: " + result.size());
}
KeyValue getkv = KeyValueUtil.ensureKeyValue(result.get(count - 1));
Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
} else { } else {
kv.updateLatestStamp(byteNow); kv.updateLatestStamp(byteNow);
} }
@ -2080,6 +2077,23 @@ public class HRegion implements HeapSize { // , Writable{
} }
} }
void updateDeleteLatestVersionTimeStamp(KeyValue kv, Get get, int count, byte[] byteNow)
throws IOException {
List<Cell> result = get(get, false);
if (result.size() < count) {
// Nothing to delete
kv.updateLatestStamp(byteNow);
return;
}
if (result.size() > count) {
throw new RuntimeException("Unexpected size: " + result.size());
}
KeyValue getkv = KeyValueUtil.ensureKeyValue(result.get(count - 1));
Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(), getkv.getBuffer(),
getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
}
/** /**
* @throws IOException * @throws IOException
*/ */
@ -2452,7 +2466,9 @@ public class HRegion implements HeapSize { // , Writable{
updateKVTimestamps(familyMaps[i].values(), byteNow); updateKVTimestamps(familyMaps[i].values(), byteNow);
noOfPuts++; noOfPuts++;
} else { } else {
prepareDeleteTimestamps(familyMaps[i], byteNow); if (!isInReplay) {
prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
}
noOfDeletes++; noOfDeletes++;
} }
} }
@ -2712,9 +2728,21 @@ public class HRegion implements HeapSize { // , Writable{
RowLock rowLock = getRowLock(get.getRow()); RowLock rowLock = getRowLock(get.getRow());
// wait for all previous transactions to complete (with lock held) // wait for all previous transactions to complete (with lock held)
mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
List<Cell> result;
try { try {
result = get(get, false); if (this.getCoprocessorHost() != null) {
Boolean processed = null;
if (w instanceof Put) {
processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family,
qualifier, compareOp, comparator, (Put) w);
} else if (w instanceof Delete) {
processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family,
qualifier, compareOp, comparator, (Delete) w);
}
if (processed != null) {
return processed;
}
}
List<Cell> result = get(get, false);
boolean valueIsNull = comparator.getValue() == null || boolean valueIsNull = comparator.getValue() == null ||
comparator.getValue().length == 0; comparator.getValue().length == 0;
@ -5033,12 +5061,18 @@ public class HRegion implements HeapSize { // , Writable{
rowLock = getRowLock(row); rowLock = getRowLock(row);
try { try {
lock(this.updatesLock.readLock()); lock(this.updatesLock.readLock());
// wait for all prior MVCC transactions to finish - while we hold the row lock
// (so that we are guaranteed to see the latest state)
mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
// now start my own transaction
w = mvcc.beginMemstoreInsert();
try { try {
// wait for all prior MVCC transactions to finish - while we hold the row lock
// (so that we are guaranteed to see the latest state)
mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
if (this.coprocessorHost != null) {
Result r = this.coprocessorHost.preAppendAfterRowLock(append);
if(r!= null) {
return r;
}
}
// now start my own transaction
w = mvcc.beginMemstoreInsert();
long now = EnvironmentEdgeManager.currentTimeMillis(); long now = EnvironmentEdgeManager.currentTimeMillis();
// Process each family // Process each family
for (Map.Entry<byte[], List<Cell>> family : append.getFamilyCellMap().entrySet()) { for (Map.Entry<byte[], List<Cell>> family : append.getFamilyCellMap().entrySet()) {
@ -5221,12 +5255,18 @@ public class HRegion implements HeapSize { // , Writable{
RowLock rowLock = getRowLock(row); RowLock rowLock = getRowLock(row);
try { try {
lock(this.updatesLock.readLock()); lock(this.updatesLock.readLock());
// wait for all prior MVCC transactions to finish - while we hold the row lock
// (so that we are guaranteed to see the latest state)
mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
// now start my own transaction
w = mvcc.beginMemstoreInsert();
try { try {
// wait for all prior MVCC transactions to finish - while we hold the row lock
// (so that we are guaranteed to see the latest state)
mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
if (this.coprocessorHost != null) {
Result r = this.coprocessorHost.preIncrementAfterRowLock(increment);
if (r != null) {
return r;
}
}
// now start my own transaction
w = mvcc.beginMemstoreInsert();
long now = EnvironmentEdgeManager.currentTimeMillis(); long now = EnvironmentEdgeManager.currentTimeMillis();
// Process each family // Process each family
for (Map.Entry<byte [], List<Cell>> family: for (Map.Entry<byte [], List<Cell>> family:

View File

@ -27,9 +27,9 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationProcessorRequest; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationProcessorRequest;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationProcessorResponse; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationProcessorResponse;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@ -80,7 +80,7 @@ MultiRowMutationProcessorResponse> {
} else if (m instanceof Delete) { } else if (m instanceof Delete) {
Delete d = (Delete) m; Delete d = (Delete) m;
region.prepareDelete(d); region.prepareDelete(d);
region.prepareDeleteTimestamps(d.getFamilyCellMap(), byteNow); region.prepareDeleteTimestamps(d, d.getFamilyCellMap(), byteNow);
} else { } else {
throw new DoNotRetryIOException( throw new DoNotRetryIOException(
"Action must be Put or Delete. But was: " "Action must be Put or Delete. But was: "

View File

@ -1134,6 +1134,44 @@ public class RegionCoprocessorHost
return bypass; return bypass;
} }
/**
* @param mutation - the current mutation
* @param kv - the current cell
* @param byteNow - current timestamp in bytes
* @param get - the get that could be used
* Note that the get only does not specify the family and qualifier that should be used
* @return true if default processing should be bypassed
* @exception IOException
* Exception
*/
public boolean prePrepareTimeStampForDeleteVersion(Mutation mutation,
Cell kv, byte[] byteNow, Get get) throws IOException {
boolean bypass = false;
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env : coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
Thread currentThread = Thread.currentThread();
ClassLoader cl = currentThread.getContextClassLoader();
try {
currentThread.setContextClassLoader(env.getClassLoader());
((RegionObserver) env.getInstance())
.prePrepareTimeStampForDeleteVersion(ctx, mutation, kv,
byteNow, get);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
} finally {
currentThread.setContextClassLoader(cl);
}
bypass |= ctx.shouldBypass();
if (ctx.shouldComplete()) {
break;
}
}
}
return bypass;
}
/** /**
* @param put The Put object * @param put The Put object
* @param edit The WALEdit object. * @param edit The WALEdit object.
@ -1349,6 +1387,46 @@ public class RegionCoprocessorHost
return bypass ? result : null; return bypass ? result : null;
} }
/**
* @param row row to check
* @param family column family
* @param qualifier column qualifier
* @param compareOp the comparison operation
* @param comparator the comparator
* @param put data to put if check succeeds
* @return true or false to return to client if default processing should
* be bypassed, or null otherwise
* @throws IOException e
*/
public Boolean preCheckAndPutAfterRowLock(final byte[] row, final byte[] family,
final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
final Put put) throws IOException {
boolean bypass = false;
boolean result = false;
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env : coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
Thread currentThread = Thread.currentThread();
ClassLoader cl = currentThread.getContextClassLoader();
try {
currentThread.setContextClassLoader(env.getClassLoader());
result = ((RegionObserver) env.getInstance()).preCheckAndPutAfterRowLock(ctx, row,
family, qualifier, compareOp, comparator, put, result);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
} finally {
currentThread.setContextClassLoader(cl);
}
bypass |= ctx.shouldBypass();
if (ctx.shouldComplete()) {
break;
}
}
}
return bypass ? result : null;
}
/** /**
* @param row row to check * @param row row to check
* @param family column family * @param family column family
@ -1427,6 +1505,46 @@ public class RegionCoprocessorHost
return bypass ? result : null; return bypass ? result : null;
} }
/**
* @param row row to check
* @param family column family
* @param qualifier column qualifier
* @param compareOp the comparison operation
* @param comparator the comparator
* @param delete delete to commit if check succeeds
* @return true or false to return to client if default processing should
* be bypassed, or null otherwise
* @throws IOException e
*/
public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family,
final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
final Delete delete) throws IOException {
boolean bypass = false;
boolean result = false;
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env : coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
Thread currentThread = Thread.currentThread();
ClassLoader cl = currentThread.getContextClassLoader();
try {
currentThread.setContextClassLoader(env.getClassLoader());
result = ((RegionObserver) env.getInstance()).preCheckAndDeleteAfterRowLock(ctx, row,
family, qualifier, compareOp, comparator, delete, result);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
} finally {
currentThread.setContextClassLoader(cl);
}
bypass |= ctx.shouldBypass();
if (ctx.shouldComplete()) {
break;
}
}
}
return bypass ? result : null;
}
/** /**
* @param row row to check * @param row row to check
* @param family column family * @param family column family
@ -1495,6 +1613,38 @@ public class RegionCoprocessorHost
return bypass ? result : null; return bypass ? result : null;
} }
/**
* @param append append object
* @return result to return to client if default operation should be
* bypassed, null otherwise
* @throws IOException if an error occurred on the coprocessor
*/
public Result preAppendAfterRowLock(final Append append) throws IOException {
boolean bypass = false;
Result result = null;
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env : coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
Thread currentThread = Thread.currentThread();
ClassLoader cl = currentThread.getContextClassLoader();
try {
currentThread.setContextClassLoader(env.getClassLoader());
result = ((RegionObserver) env.getInstance()).preAppendAfterRowLock(ctx, append);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
} finally {
currentThread.setContextClassLoader(cl);
}
bypass |= ctx.shouldBypass();
if (ctx.shouldComplete()) {
break;
}
}
}
return bypass ? result : null;
}
/** /**
* @param increment increment object * @param increment increment object
* @return result to return to client if default operation should be * @return result to return to client if default operation should be
@ -1527,6 +1677,38 @@ public class RegionCoprocessorHost
return bypass ? result : null; return bypass ? result : null;
} }
/**
* @param increment increment object
* @return result to return to client if default operation should be
* bypassed, null otherwise
* @throws IOException if an error occurred on the coprocessor
*/
public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
boolean bypass = false;
Result result = null;
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env : coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
Thread currentThread = Thread.currentThread();
ClassLoader cl = currentThread.getContextClassLoader();
try {
currentThread.setContextClassLoader(env.getClassLoader());
result = ((RegionObserver) env.getInstance()).preIncrementAfterRowLock(ctx, increment);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
} finally {
currentThread.setContextClassLoader(cl);
}
bypass |= ctx.shouldBypass();
if (ctx.shouldComplete()) {
break;
}
}
}
return bypass ? result : null;
}
/** /**
* @param append Append object * @param append Append object
* @param result the result returned by the append * @param result the result returned by the append

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
@ -48,6 +49,8 @@ import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@ -96,11 +99,22 @@ public class SimpleRegionObserver extends BaseRegionObserver {
final AtomicInteger ctPrePut = new AtomicInteger(0); final AtomicInteger ctPrePut = new AtomicInteger(0);
final AtomicInteger ctPostPut = new AtomicInteger(0); final AtomicInteger ctPostPut = new AtomicInteger(0);
final AtomicInteger ctPreDeleted = new AtomicInteger(0); final AtomicInteger ctPreDeleted = new AtomicInteger(0);
final AtomicInteger ctPrePrepareDeleteTS = new AtomicInteger(0);
final AtomicInteger ctPostDeleted = new AtomicInteger(0); final AtomicInteger ctPostDeleted = new AtomicInteger(0);
final AtomicInteger ctPreGetClosestRowBefore = new AtomicInteger(0); final AtomicInteger ctPreGetClosestRowBefore = new AtomicInteger(0);
final AtomicInteger ctPostGetClosestRowBefore = new AtomicInteger(0); final AtomicInteger ctPostGetClosestRowBefore = new AtomicInteger(0);
final AtomicInteger ctPreIncrement = new AtomicInteger(0); final AtomicInteger ctPreIncrement = new AtomicInteger(0);
final AtomicInteger ctPreIncrementAfterRowLock = new AtomicInteger(0);
final AtomicInteger ctPreAppend = new AtomicInteger(0);
final AtomicInteger ctPreAppendAfterRowLock = new AtomicInteger(0);
final AtomicInteger ctPostIncrement = new AtomicInteger(0); final AtomicInteger ctPostIncrement = new AtomicInteger(0);
final AtomicInteger ctPostAppend = new AtomicInteger(0);
final AtomicInteger ctPreCheckAndPut = new AtomicInteger(0);
final AtomicInteger ctPreCheckAndPutAfterRowLock = new AtomicInteger(0);
final AtomicInteger ctPostCheckAndPut = new AtomicInteger(0);
final AtomicInteger ctPreCheckAndDelete = new AtomicInteger(0);
final AtomicInteger ctPreCheckAndDeleteAfterRowLock = new AtomicInteger(0);
final AtomicInteger ctPostCheckAndDelete = new AtomicInteger(0);
final AtomicInteger ctPreWALRestored = new AtomicInteger(0); final AtomicInteger ctPreWALRestored = new AtomicInteger(0);
final AtomicInteger ctPostWALRestored = new AtomicInteger(0); final AtomicInteger ctPostWALRestored = new AtomicInteger(0);
final AtomicInteger ctPreScannerNext = new AtomicInteger(0); final AtomicInteger ctPreScannerNext = new AtomicInteger(0);
@ -434,6 +448,12 @@ public class SimpleRegionObserver extends BaseRegionObserver {
} }
} }
@Override
public void prePrepareTimeStampForDeleteVersion(ObserverContext<RegionCoprocessorEnvironment> e,
Mutation delete, Cell cell, byte[] byteNow, Get get) throws IOException {
ctPrePrepareDeleteTS.incrementAndGet();
}
@Override @Override
public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c, public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
final Delete delete, final WALEdit edit, final Delete delete, final WALEdit edit,
@ -520,6 +540,13 @@ public class SimpleRegionObserver extends BaseRegionObserver {
return null; return null;
} }
@Override
public Result preIncrementAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> e,
Increment increment) throws IOException {
ctPreIncrementAfterRowLock.incrementAndGet();
return null;
}
@Override @Override
public Result postIncrement(final ObserverContext<RegionCoprocessorEnvironment> c, public Result postIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
final Increment increment, final Result result) throws IOException { final Increment increment, final Result result) throws IOException {
@ -527,6 +554,75 @@ public class SimpleRegionObserver extends BaseRegionObserver {
return result; return result;
} }
@Override
public boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> e, byte[] row,
byte[] family, byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator,
Put put, boolean result) throws IOException {
ctPreCheckAndPut.incrementAndGet();
return true;
}
@Override
public boolean preCheckAndPutAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> e,
byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp,
ByteArrayComparable comparator, Put put, boolean result) throws IOException {
ctPreCheckAndPutAfterRowLock.incrementAndGet();
return true;
}
@Override
public boolean postCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> e, byte[] row,
byte[] family, byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator,
Put put, boolean result) throws IOException {
ctPostCheckAndPut.incrementAndGet();
return true;
}
@Override
public boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> e, byte[] row,
byte[] family, byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator,
Delete delete, boolean result) throws IOException {
ctPreCheckAndDelete.incrementAndGet();
return true;
}
@Override
public boolean preCheckAndDeleteAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> e,
byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp,
ByteArrayComparable comparator, Delete delete, boolean result) throws IOException {
ctPreCheckAndDeleteAfterRowLock.incrementAndGet();
return true;
}
@Override
public boolean postCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> e, byte[] row,
byte[] family, byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator,
Delete delete, boolean result) throws IOException {
ctPostCheckAndDelete.incrementAndGet();
return true;
}
@Override
public Result preAppendAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> e,
Append append) throws IOException {
ctPreAppendAfterRowLock.incrementAndGet();
return null;
}
@Override
public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> e, Append append)
throws IOException {
ctPreAppend.incrementAndGet();
return null;
}
@Override
public Result postAppend(ObserverContext<RegionCoprocessorEnvironment> e, Append append,
Result result) throws IOException {
ctPostAppend.incrementAndGet();
return null;
}
@Override @Override
public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx, public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
List<Pair<byte[], String>> familyPaths) throws IOException { List<Pair<byte[], String>> familyPaths) throws IOException {
@ -646,14 +742,58 @@ public class SimpleRegionObserver extends BaseRegionObserver {
return ctPostCloseRegionOperation.get(); return ctPostCloseRegionOperation.get();
} }
public boolean hadPreCheckAndPut() {
return ctPreCheckAndPut.get() > 0;
}
public boolean hadPreCheckAndPutAfterRowLock() {
return ctPreCheckAndPutAfterRowLock.get() > 0;
}
public boolean hadPostCheckAndPut() {
return ctPostCheckAndPut.get() > 0;
}
public boolean hadPreCheckAndDelete() {
return ctPreCheckAndDelete.get() > 0;
}
public boolean hadPreCheckAndDeleteAfterRowLock() {
return ctPreCheckAndDeleteAfterRowLock.get() > 0;
}
public boolean hadPostCheckAndDelete() {
return ctPostCheckAndDelete.get() > 0;
}
public boolean hadPreIncrement() { public boolean hadPreIncrement() {
return ctPreIncrement.get() > 0; return ctPreIncrement.get() > 0;
} }
public boolean hadPreIncrementAfterRowLock() {
return ctPreIncrementAfterRowLock.get() > 0;
}
public boolean hadPostIncrement() { public boolean hadPostIncrement() {
return ctPostIncrement.get() > 0; return ctPostIncrement.get() > 0;
} }
public boolean hadPreAppend() {
return ctPreAppend.get() > 0;
}
public boolean hadPreAppendAfterRowLock() {
return ctPreAppendAfterRowLock.get() > 0;
}
public boolean hadPostAppend() {
return ctPostAppend.get() > 0;
}
public boolean hadPrePreparedDeleteTS() {
return ctPrePrepareDeleteTS.get() > 0;
}
public boolean hadPreWALRestored() { public boolean hadPreWALRestored() {
return ctPreWALRestored.get() > 0; return ctPreWALRestored.get() > 0;
} }

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
@ -145,9 +146,9 @@ public class TestRegionObserverInterface {
verifyMethodResult(SimpleRegionObserver.class, verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
"hadDelete"}, "hadDelete", "hadPrePreparedDeleteTS"},
tableName, tableName,
new Boolean[] {true, true, true, true, false} new Boolean[] {true, true, true, true, false, false}
); );
Delete delete = new Delete(ROW); Delete delete = new Delete(ROW);
@ -158,9 +159,9 @@ public class TestRegionObserverInterface {
verifyMethodResult(SimpleRegionObserver.class, verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
"hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"}, "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete", "hadPrePreparedDeleteTS"},
tableName, tableName,
new Boolean[] {true, true, true, true, true, true, true} new Boolean[] {true, true, true, true, true, true, true, true}
); );
} finally { } finally {
util.deleteTable(tableName); util.deleteTable(tableName);
@ -218,17 +219,106 @@ public class TestRegionObserverInterface {
inc.addColumn(A, A, 1); inc.addColumn(A, A, 1);
verifyMethodResult(SimpleRegionObserver.class, verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreIncrement", "hadPostIncrement"}, new String[] {"hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock"},
tableName, tableName,
new Boolean[] {false, false} new Boolean[] {false, false, false}
); );
table.increment(inc); table.increment(inc);
verifyMethodResult(SimpleRegionObserver.class, verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreIncrement", "hadPostIncrement"}, new String[] {"hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock"},
tableName, tableName,
new Boolean[] {true, true} new Boolean[] {true, true, true}
);
} finally {
util.deleteTable(tableName);
table.close();
}
}
@Test
public void testCheckAndPutHooks() throws IOException {
TableName tableName =
TableName.valueOf(TEST_TABLE.getNameAsString() + ".testCheckAndPutHooks");
HTable table = util.createTable(tableName, new byte[][] {A, B, C});
try {
Put p = new Put(Bytes.toBytes(0));
p.add(A, A, A);
table.put(p);
table.flushCommits();
p = new Put(Bytes.toBytes(0));
p.add(A, A, A);
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreCheckAndPut",
"hadPreCheckAndPutAfterRowLock", "hadPostCheckAndPut"},
tableName,
new Boolean[] {false, false, false}
);
table.checkAndPut(Bytes.toBytes(0), A, A, A, p);
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreCheckAndPut",
"hadPreCheckAndPutAfterRowLock", "hadPostCheckAndPut"},
tableName,
new Boolean[] {true, true, true}
);
} finally {
util.deleteTable(tableName);
table.close();
}
}
@Test
public void testCheckAndDeleteHooks() throws IOException {
TableName tableName =
TableName.valueOf(TEST_TABLE.getNameAsString() + ".testCheckAndDeleteHooks");
HTable table = util.createTable(tableName, new byte[][] {A, B, C});
try {
Put p = new Put(Bytes.toBytes(0));
p.add(A, A, A);
table.put(p);
table.flushCommits();
Delete d = new Delete(Bytes.toBytes(0));
table.delete(d);
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreCheckAndDelete",
"hadPreCheckAndDeleteAfterRowLock", "hadPostCheckAndDelete"},
tableName,
new Boolean[] {false, false, false}
);
table.checkAndDelete(Bytes.toBytes(0), A, A, A, d);
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreCheckAndDelete",
"hadPreCheckAndDeleteAfterRowLock", "hadPostCheckAndDelete"},
tableName,
new Boolean[] {true, true, true}
);
} finally {
util.deleteTable(tableName);
table.close();
}
}
@Test
public void testAppendHook() throws IOException {
TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".testAppendHook");
HTable table = util.createTable(tableName, new byte[][] {A, B, C});
try {
Append app = new Append(Bytes.toBytes(0));
app.add(A, A, A);
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreAppend", "hadPostAppend", "hadPreAppendAfterRowLock"},
tableName,
new Boolean[] {false, false, false}
);
table.append(app);
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreAppend", "hadPostAppend", "hadPreAppendAfterRowLock"},
tableName,
new Boolean[] {true, true, true}
); );
} finally { } finally {
util.deleteTable(tableName); util.deleteTable(tableName);