HBASE-25574 Revisit put/delete/increment/append related RegionObserver methods (#2953)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
Toshihiro Suzuki 2021-02-22 09:37:30 +09:00 committed by GitHub
parent dc993f34ae
commit d8b86627ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 210 additions and 55 deletions

View File

@ -374,10 +374,30 @@ public interface RegionObserver {
* @param put The Put object
* @param edit The WALEdit object that will be written to the wal
* @param durability Persistence guarantee for this Put
* @deprecated since 3.0.0 and will be removed in 4.0.0. Use
* {@link #prePut(ObserverContext, Put, WALEdit)} instead.
*/
@Deprecated
default void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
Durability durability) throws IOException {}
/**
* Called before the client stores a value.
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions.
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
* <p>
* Note: Do not retain references to any Cells in 'put' beyond the life of this invocation.
* If need a Cell reference for later use, copy the cell and use that.
* @param c the environment provided by the region server
* @param put The Put object
* @param edit The WALEdit object that will be written to the wal
*/
default void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit)
throws IOException {
prePut(c, put, edit, put.getDurability());
}
/**
* Called after the client stores a value.
* <p>
@ -387,10 +407,27 @@ public interface RegionObserver {
* @param put The Put object
* @param edit The WALEdit object for the wal
* @param durability Persistence guarantee for this Put
* @deprecated since 3.0.0 and will be removed in 4.0.0. Use
* {@link #postPut(ObserverContext, Put, WALEdit)} instead.
*/
@Deprecated
default void postPut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
Durability durability) throws IOException {}
/**
* Called after the client stores a value.
* <p>
* Note: Do not retain references to any Cells in 'put' beyond the life of this invocation.
* If need a Cell reference for later use, copy the cell and use that.
* @param c the environment provided by the region server
* @param put The Put object
* @param edit The WALEdit object for the wal
*/
default void postPut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit)
throws IOException {
postPut(c, put, edit, put.getDurability());
}
/**
* Called before the client deletes a value.
* <p>
@ -403,10 +440,30 @@ public interface RegionObserver {
* @param delete The Delete object
* @param edit The WALEdit object for the wal
* @param durability Persistence guarantee for this Delete
* @deprecated since 3.0.0 and will be removed in 4.0.0. Use
* {@link #preDelete(ObserverContext, Delete, WALEdit)} instead.
*/
@Deprecated
default void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete,
WALEdit edit, Durability durability) throws IOException {}
/**
* Called before the client deletes a value.
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions.
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
* <p>
* Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation.
* If need a Cell reference for later use, copy the cell and use that.
* @param c the environment provided by the region server
* @param delete The Delete object
* @param edit The WALEdit object for the wal
*/
default void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete,
WALEdit edit) throws IOException {
preDelete(c, delete, edit, delete.getDurability());
}
/**
* Called before the server updates the timestamp for version delete with latest timestamp.
* <p>
@ -434,10 +491,27 @@ public interface RegionObserver {
* @param delete The Delete object
* @param edit The WALEdit object for the wal
* @param durability Persistence guarantee for this Delete
* @deprecated since 3.0.0 and will be removed in 4.0.0. Use
* {@link #postDelete(ObserverContext, Delete, WALEdit)} instead.
*/
@Deprecated
default void postDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete,
WALEdit edit, Durability durability) throws IOException {}
/**
* Called after the client deletes a value.
* <p>
* Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation.
* If need a Cell reference for later use, copy the cell and use that.
* @param c the environment provided by the region server
* @param delete The Delete object
* @param edit The WALEdit object for the wal
*/
default void postDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete,
WALEdit edit) throws IOException {
postDelete(c, delete, edit, delete.getDurability());
}
/**
* This will be called for every batch mutation operation happening at the server. This will be
* called after acquiring the locks on the mutating rows and after applying the proper timestamp
@ -457,10 +531,10 @@ public interface RegionObserver {
/**
* This will be called after applying a batch of Mutations on a region. The Mutations are added
* to memstore and WAL. The difference of this one with
* {@link #postPut(ObserverContext, Put, WALEdit, Durability)}
* and {@link #postDelete(ObserverContext, Delete, WALEdit, Durability)}
* and {@link #postIncrement(ObserverContext, Increment, Result)}
* and {@link #postAppend(ObserverContext, Append, Result)} is
* {@link #postPut(ObserverContext, Put, WALEdit)}
* and {@link #postDelete(ObserverContext, Delete, WALEdit)}
* and {@link #postIncrement(ObserverContext, Increment, Result, WALEdit)}
* and {@link #postAppend(ObserverContext, Append, Result, WALEdit)} is
* this hook will be executed before the mvcc transaction completion.
* <p>
* Note: Do not retain references to any Cells in Mutations beyond the life of this invocation.
@ -968,12 +1042,33 @@ public interface RegionObserver {
* @param c the environment provided by the region server
* @param append Append object
* @return result to return to the client if bypassing default processing
* @deprecated since 3.0.0 and will be removed in 4.0.0. Use
* {@link #preAppend(ObserverContext, Append, WALEdit)} instead.
*/
@Deprecated
default Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
throws IOException {
throws IOException {
return null;
}
/**
* Called before Append.
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions.
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
* <p>
* Note: Do not retain references to any Cells in 'append' beyond the life of this invocation.
* If need a Cell reference for later use, copy the cell and use that.
* @param c the environment provided by the region server
* @param append Append object
* @param edit The WALEdit object that will be written to the wal
* @return result to return to the client if bypassing default processing
*/
default Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append,
WALEdit edit) throws IOException {
return preAppend(c, append);
}
/**
* Called before Append but after acquiring rowlock.
* <p>
@ -989,9 +1084,12 @@ public interface RegionObserver {
* @param c the environment provided by the region server
* @param append Append object
* @return result to return to the client if bypassing default processing
* @deprecated since 3.0.0 and will be removed in 4.0.0. Use
* {@link #preBatchMutate(ObserverContext, MiniBatchOperationInProgress)} instead.
*/
@Deprecated
default Result preAppendAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
Append append) throws IOException {
Append append) throws IOException {
return null;
}
@ -1004,10 +1102,29 @@ public interface RegionObserver {
* @param append Append object
* @param result the result returned by increment
* @return the result to return to the client
* @deprecated since 3.0.0 and will be removed in 4.0.0. Use
* {@link #postAppend(ObserverContext, Append, Result, WALEdit)} instead.
*/
@Deprecated
default Result postAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append,
Result result) throws IOException {
return result;
}
/**
* Called after Append
* <p>
* Note: Do not retain references to any Cells in 'append' beyond the life of this invocation.
* If need a Cell reference for later use, copy the cell and use that.
* @param c the environment provided by the region server
* @param append Append object
* @param result the result returned by increment
* @param edit The WALEdit object for the wal
* @return the result to return to the client
*/
default Result postAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append,
Result result) throws IOException {
return result;
Result result, WALEdit edit) throws IOException {
return postAppend(c, append, result);
}
/**
@ -1021,12 +1138,33 @@ public interface RegionObserver {
* @param c the environment provided by the region server
* @param increment increment object
* @return result to return to the client if bypassing default processing
* @deprecated since 3.0.0 and will be removed in 4.0.0. Use
* {@link #preIncrement(ObserverContext, Increment, WALEdit)} instead.
*/
@Deprecated
default Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> c, Increment increment)
throws IOException {
throws IOException {
return null;
}
/**
* Called before Increment.
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions.
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
* <p>
* Note: Do not retain references to any Cells in 'increment' beyond the life of this invocation.
* If need a Cell reference for later use, copy the cell and use that.
* @param c the environment provided by the region server
* @param increment increment object
* @param edit The WALEdit object that will be written to the wal
* @return result to return to the client if bypassing default processing
*/
default Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> c, Increment increment,
WALEdit edit) throws IOException {
return preIncrement(c, increment);
}
/**
* Called before Increment but after acquiring rowlock.
* <p>
@ -1040,15 +1178,15 @@ public interface RegionObserver {
* Note: Do not retain references to any Cells in 'increment' beyond the life of this invocation.
* If need a Cell reference for later use, copy the cell and use that.
*
* @param c
* the environment provided by the region server
* @param increment
* increment object
* @param c the environment provided by the region server
* @param increment increment object
* @return result to return to the client if bypassing default processing
* if an error occurred on the coprocessor
* @deprecated since 3.0.0 and will be removed in 4.0.0. Use
* {@link #preBatchMutate(ObserverContext, MiniBatchOperationInProgress)} instead.
*/
@Deprecated
default Result preIncrementAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
Increment increment) throws IOException {
Increment increment) throws IOException {
return null;
}
@ -1061,10 +1199,29 @@ public interface RegionObserver {
* @param increment increment object
* @param result the result returned by increment
* @return the result to return to the client
* @deprecated since 3.0.0 and will be removed in 4.0.0. Use
* {@link #postIncrement(ObserverContext, Increment, Result, WALEdit)} instead.
*/
@Deprecated
default Result postIncrement(ObserverContext<RegionCoprocessorEnvironment> c, Increment increment,
Result result) throws IOException {
return result;
}
/**
* Called after increment
* <p>
* Note: Do not retain references to any Cells in 'increment' beyond the life of this invocation.
* If need a Cell reference for later use, copy the cell and use that.
* @param c the environment provided by the region server
* @param increment increment object
* @param result the result returned by increment
* @param edit The WALEdit object for the wal
* @return the result to return to the client
*/
default Result postIncrement(ObserverContext<RegionCoprocessorEnvironment> c, Increment increment,
Result result) throws IOException {
return result;
Result result, WALEdit edit) throws IOException {
return postIncrement(c, increment, result);
}
/**

View File

@ -4197,18 +4197,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.SUCCESS) {
Mutation m = getMutation(i);
if (m instanceof Put) {
region.coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
region.coprocessorHost.postPut((Put) m, walEdit);
} else if (m instanceof Delete) {
region.coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
region.coprocessorHost.postDelete((Delete) m, walEdit);
} else if (m instanceof Increment) {
Result result = region.getCoprocessorHost().postIncrement((Increment) m,
results[i]);
results[i], walEdit);
if (result != results[i]) {
retCodeDetails[i] =
new OperationStatus(retCodeDetails[i].getOperationStatusCode(), result);
}
} else if (m instanceof Append) {
Result result = region.getCoprocessorHost().postAppend((Append) m, results[i]);
Result result = region.getCoprocessorHost().postAppend((Append) m, results[i],
walEdit);
if (result != results[i]) {
retCodeDetails[i] =
new OperationStatus(retCodeDetails[i].getOperationStatusCode(), result);
@ -4270,7 +4271,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
throws IOException {
Mutation m = getMutation(index);
if (m instanceof Put) {
if (region.coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
if (region.coprocessorHost.prePut((Put) m, walEdit)) {
// pre hook says skip this Put
// mark as success and skip in doMiniBatchMutation
metrics[0]++;
@ -4284,7 +4285,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Can this be avoided?
region.prepareDelete(curDel);
}
if (region.coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {
if (region.coprocessorHost.preDelete(curDel, walEdit)) {
// pre hook says skip this Delete
// mark as success and skip in doMiniBatchMutation
metrics[1]++;
@ -4292,7 +4293,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
} else if (m instanceof Increment) {
Increment increment = (Increment) m;
Result result = region.coprocessorHost.preIncrement(increment);
Result result = region.coprocessorHost.preIncrement(increment, walEdit);
if (result != null) {
// pre hook says skip this Increment
// mark as success and skip in doMiniBatchMutation
@ -4301,7 +4302,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
} else if (m instanceof Append) {
Append append = (Append) m;
Result result = region.coprocessorHost.preAppend(append);
Result result = region.coprocessorHost.preAppend(append, walEdit);
if (result != null) {
// pre hook says skip this Append
// mark as success and skip in doMiniBatchMutation

View File

@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
@ -902,12 +901,10 @@ public class RegionCoprocessorHost
* Supports Coprocessor 'bypass'.
* @param put The Put object
* @param edit The WALEdit object.
* @param durability The durability used
* @return true if default processing should be bypassed
* @exception IOException Exception
*/
public boolean prePut(final Put put, final WALEdit edit, final Durability durability)
throws IOException {
public boolean prePut(final Put put, final WALEdit edit) throws IOException {
if (coprocEnvironments.isEmpty()) {
return false;
}
@ -915,7 +912,7 @@ public class RegionCoprocessorHost
return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
@Override
public void call(RegionObserver observer) throws IOException {
observer.prePut(this, put, edit, durability);
observer.prePut(this, put, edit);
}
});
}
@ -949,18 +946,16 @@ public class RegionCoprocessorHost
/**
* @param put The Put object
* @param edit The WALEdit object.
* @param durability The durability used
* @exception IOException Exception
*/
public void postPut(final Put put, final WALEdit edit, final Durability durability)
throws IOException {
public void postPut(final Put put, final WALEdit edit) throws IOException {
if (coprocEnvironments.isEmpty()) {
return;
}
execOperation(new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.postPut(this, put, edit, durability);
observer.postPut(this, put, edit);
}
});
}
@ -969,12 +964,10 @@ public class RegionCoprocessorHost
* Supports Coprocessor 'bypass'.
* @param delete The Delete object
* @param edit The WALEdit object.
* @param durability The durability used
* @return true if default processing should be bypassed
* @exception IOException Exception
*/
public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability)
throws IOException {
public boolean preDelete(final Delete delete, final WALEdit edit) throws IOException {
if (this.coprocEnvironments.isEmpty()) {
return false;
}
@ -982,7 +975,7 @@ public class RegionCoprocessorHost
return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
@Override
public void call(RegionObserver observer) throws IOException {
observer.preDelete(this, delete, edit, durability);
observer.preDelete(this, delete, edit);
}
});
}
@ -990,18 +983,16 @@ public class RegionCoprocessorHost
/**
* @param delete The Delete object
* @param edit The WALEdit object.
* @param durability The durability used
* @exception IOException Exception
*/
public void postDelete(final Delete delete, final WALEdit edit, final Durability durability)
throws IOException {
public void postDelete(final Delete delete, final WALEdit edit) throws IOException {
execOperation(coprocEnvironments.isEmpty()? null:
new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.postDelete(this, delete, edit, durability);
}
});
new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.postDelete(this, delete, edit);
}
});
}
public void preBatchMutate(
@ -1117,10 +1108,11 @@ public class RegionCoprocessorHost
/**
* Supports Coprocessor 'bypass'.
* @param append append object
* @param edit The WALEdit object.
* @return result to return to client if default operation should be bypassed, null otherwise
* @throws IOException if an error occurred on the coprocessor
*/
public Result preAppend(final Append append) throws IOException {
public Result preAppend(final Append append, final WALEdit edit) throws IOException {
boolean bypassable = true;
Result defaultResult = null;
if (this.coprocEnvironments.isEmpty()) {
@ -1131,7 +1123,7 @@ public class RegionCoprocessorHost
bypassable) {
@Override
public Result call(RegionObserver observer) throws IOException {
return observer.preAppend(this, append);
return observer.preAppend(this, append, edit);
}
});
}
@ -1161,10 +1153,11 @@ public class RegionCoprocessorHost
/**
* Supports Coprocessor 'bypass'.
* @param increment increment object
* @param edit The WALEdit object.
* @return result to return to client if default operation should be bypassed, null otherwise
* @throws IOException if an error occurred on the coprocessor
*/
public Result preIncrement(final Increment increment) throws IOException {
public Result preIncrement(final Increment increment, final WALEdit edit) throws IOException {
boolean bypassable = true;
Result defaultResult = null;
if (coprocEnvironments.isEmpty()) {
@ -1175,7 +1168,7 @@ public class RegionCoprocessorHost
bypassable) {
@Override
public Result call(RegionObserver observer) throws IOException {
return observer.preIncrement(this, increment);
return observer.preIncrement(this, increment, edit);
}
});
}
@ -1205,9 +1198,11 @@ public class RegionCoprocessorHost
/**
* @param append Append object
* @param result the result returned by the append
* @param edit The WALEdit object.
* @throws IOException if an error occurred on the coprocessor
*/
public Result postAppend(final Append append, final Result result) throws IOException {
public Result postAppend(final Append append, final Result result, final WALEdit edit)
throws IOException {
if (this.coprocEnvironments.isEmpty()) {
return result;
}
@ -1215,7 +1210,7 @@ public class RegionCoprocessorHost
new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) {
@Override
public Result call(RegionObserver observer) throws IOException {
return observer.postAppend(this, append, result);
return observer.postAppend(this, append, result, edit);
}
});
}
@ -1223,9 +1218,11 @@ public class RegionCoprocessorHost
/**
* @param increment increment object
* @param result the result returned by postIncrement
* @param edit The WALEdit object.
* @throws IOException if an error occurred on the coprocessor
*/
public Result postIncrement(final Increment increment, Result result) throws IOException {
public Result postIncrement(final Increment increment, Result result, final WALEdit edit)
throws IOException {
if (this.coprocEnvironments.isEmpty()) {
return result;
}
@ -1233,7 +1230,7 @@ public class RegionCoprocessorHost
new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) {
@Override
public Result call(RegionObserver observer) throws IOException {
return observer.postIncrement(this, increment, getResult());
return observer.postIncrement(this, increment, getResult(), edit);
}
});
}