HBASE-5897 prePut coprocessor hook causing substantial CPU usage
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1332811 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8152b444ef
commit
69ffbf247f
|
@ -1928,10 +1928,12 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
T[] operations;
|
T[] operations;
|
||||||
int nextIndexToProcess = 0;
|
int nextIndexToProcess = 0;
|
||||||
OperationStatus[] retCodeDetails;
|
OperationStatus[] retCodeDetails;
|
||||||
|
WALEdit[] walEditsFromCoprocessors;
|
||||||
|
|
||||||
public BatchOperationInProgress(T[] operations) {
|
public BatchOperationInProgress(T[] operations) {
|
||||||
this.operations = operations;
|
this.operations = operations;
|
||||||
this.retCodeDetails = new OperationStatus[operations.length];
|
this.retCodeDetails = new OperationStatus[operations.length];
|
||||||
|
this.walEditsFromCoprocessors = new WALEdit[operations.length];
|
||||||
Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
|
Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1968,14 +1970,21 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
BatchOperationInProgress<Pair<Put, Integer>> batchOp =
|
BatchOperationInProgress<Pair<Put, Integer>> batchOp =
|
||||||
new BatchOperationInProgress<Pair<Put,Integer>>(putsAndLocks);
|
new BatchOperationInProgress<Pair<Put,Integer>>(putsAndLocks);
|
||||||
|
|
||||||
|
boolean initialized = false;
|
||||||
|
|
||||||
while (!batchOp.isDone()) {
|
while (!batchOp.isDone()) {
|
||||||
checkReadOnly();
|
checkReadOnly();
|
||||||
checkResources();
|
checkResources();
|
||||||
|
|
||||||
long newSize;
|
long newSize;
|
||||||
startRegionOperation();
|
startRegionOperation();
|
||||||
this.writeRequestsCount.increment();
|
|
||||||
try {
|
try {
|
||||||
|
if (!initialized) {
|
||||||
|
this.writeRequestsCount.increment();
|
||||||
|
doPrePutHook(batchOp);
|
||||||
|
initialized = true;
|
||||||
|
}
|
||||||
long addedSize = doMiniBatchPut(batchOp);
|
long addedSize = doMiniBatchPut(batchOp);
|
||||||
newSize = this.addAndGetGlobalMemstoreSize(addedSize);
|
newSize = this.addAndGetGlobalMemstoreSize(addedSize);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -1988,33 +1997,42 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
return batchOp.retCodeDetails;
|
return batchOp.retCodeDetails;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void doPrePutHook(BatchOperationInProgress<Pair<Put, Integer>> batchOp)
|
||||||
|
throws IOException {
|
||||||
|
/* Run coprocessor pre hook outside of locks to avoid deadlock */
|
||||||
|
WALEdit walEdit = new WALEdit();
|
||||||
|
if (coprocessorHost != null) {
|
||||||
|
for (int i = 0 ; i < batchOp.operations.length; i++) {
|
||||||
|
Pair<Put, Integer> nextPair = batchOp.operations[i];
|
||||||
|
Put put = nextPair.getFirst();
|
||||||
|
if (coprocessorHost.prePut(put, walEdit, put.getWriteToWAL())) {
|
||||||
|
// pre hook says skip this Put
|
||||||
|
// mark as success and skip in doMiniBatchPut
|
||||||
|
batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
|
||||||
|
}
|
||||||
|
if (!walEdit.isEmpty()) {
|
||||||
|
batchOp.walEditsFromCoprocessors[i] = walEdit;
|
||||||
|
walEdit = new WALEdit();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private long doMiniBatchPut(
|
private long doMiniBatchPut(
|
||||||
BatchOperationInProgress<Pair<Put, Integer>> batchOp) throws IOException {
|
BatchOperationInProgress<Pair<Put, Integer>> batchOp) throws IOException {
|
||||||
|
|
||||||
final String tableName = getTableDesc().getNameAsString();
|
|
||||||
|
|
||||||
// variable to note if all Put items are for the same CF -- metrics related
|
// variable to note if all Put items are for the same CF -- metrics related
|
||||||
boolean cfSetConsistent = true;
|
boolean cfSetConsistent = true;
|
||||||
|
|
||||||
//The set of columnFamilies first seen.
|
//The set of columnFamilies first seen.
|
||||||
Set<byte[]> cfSet = null;
|
Set<byte[]> cfSet = null;
|
||||||
|
|
||||||
|
WALEdit walEdit = new WALEdit();
|
||||||
|
|
||||||
long startTimeMs = EnvironmentEdgeManager.currentTimeMillis();
|
long startTimeMs = EnvironmentEdgeManager.currentTimeMillis();
|
||||||
|
|
||||||
WALEdit walEdit = new WALEdit();
|
|
||||||
/* Run coprocessor pre hook outside of locks to avoid deadlock */
|
|
||||||
if (coprocessorHost != null) {
|
|
||||||
for (int i = 0; i < batchOp.operations.length; i++) {
|
|
||||||
Pair<Put, Integer> nextPair = batchOp.operations[i];
|
|
||||||
Put put = nextPair.getFirst();
|
|
||||||
if (coprocessorHost.prePut(put, walEdit, put.getWriteToWAL())) {
|
|
||||||
// pre hook says skip this Put
|
|
||||||
// mark as success and skip below
|
|
||||||
batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
MultiVersionConsistencyControl.WriteEntry w = null;
|
MultiVersionConsistencyControl.WriteEntry w = null;
|
||||||
long txid = 0;
|
long txid = 0;
|
||||||
|
@ -2152,7 +2170,15 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
|
|
||||||
Put p = batchOp.operations[i].getFirst();
|
Put p = batchOp.operations[i].getFirst();
|
||||||
if (!p.getWriteToWAL()) continue;
|
if (!p.getWriteToWAL()) continue;
|
||||||
|
// Add WAL edits by CP
|
||||||
|
WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
|
||||||
|
if (fromCP != null) {
|
||||||
|
for (KeyValue kv : fromCP.getKeyValues()) {
|
||||||
|
walEdit.add(kv);
|
||||||
|
}
|
||||||
|
}
|
||||||
addFamilyMapToWALEdit(familyMaps[i], walEdit);
|
addFamilyMapToWALEdit(familyMaps[i], walEdit);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// -------------------------
|
// -------------------------
|
||||||
|
|
Loading…
Reference in New Issue