diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 0756a1196f4..6e1b475a644 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1928,10 +1928,12 @@ public class HRegion implements HeapSize { // , Writable{ T[] operations; int nextIndexToProcess = 0; OperationStatus[] retCodeDetails; + WALEdit[] walEditsFromCoprocessors; public BatchOperationInProgress(T[] operations) { this.operations = operations; this.retCodeDetails = new OperationStatus[operations.length]; + this.walEditsFromCoprocessors = new WALEdit[operations.length]; Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN); } @@ -1968,14 +1970,21 @@ public class HRegion implements HeapSize { // , Writable{ BatchOperationInProgress> batchOp = new BatchOperationInProgress>(putsAndLocks); + boolean initialized = false; + while (!batchOp.isDone()) { checkReadOnly(); checkResources(); long newSize; startRegionOperation(); - this.writeRequestsCount.increment(); + try { + if (!initialized) { + this.writeRequestsCount.increment(); + doPrePutHook(batchOp); + initialized = true; + } long addedSize = doMiniBatchPut(batchOp); newSize = this.addAndGetGlobalMemstoreSize(addedSize); } finally { @@ -1988,33 +1997,42 @@ public class HRegion implements HeapSize { // , Writable{ return batchOp.retCodeDetails; } + private void doPrePutHook(BatchOperationInProgress> batchOp) + throws IOException { + /* Run coprocessor pre hook outside of locks to avoid deadlock */ + WALEdit walEdit = new WALEdit(); + if (coprocessorHost != null) { + for (int i = 0 ; i < batchOp.operations.length; i++) { + Pair nextPair = batchOp.operations[i]; + Put put = nextPair.getFirst(); + if (coprocessorHost.prePut(put, walEdit, put.getWriteToWAL())) { + // pre hook says skip this Put + // mark as success and skip in doMiniBatchPut + batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; + } + if (!walEdit.isEmpty()) { + batchOp.walEditsFromCoprocessors[i] = walEdit; + walEdit = new WALEdit(); + } + } + } + } + + @SuppressWarnings("unchecked") private long doMiniBatchPut( BatchOperationInProgress> batchOp) throws IOException { - final String tableName = getTableDesc().getNameAsString(); - // variable to note if all Put items are for the same CF -- metrics related boolean cfSetConsistent = true; //The set of columnFamilies first seen. Set cfSet = null; + + WALEdit walEdit = new WALEdit(); long startTimeMs = EnvironmentEdgeManager.currentTimeMillis(); - WALEdit walEdit = new WALEdit(); - /* Run coprocessor pre hook outside of locks to avoid deadlock */ - if (coprocessorHost != null) { - for (int i = 0; i < batchOp.operations.length; i++) { - Pair nextPair = batchOp.operations[i]; - Put put = nextPair.getFirst(); - if (coprocessorHost.prePut(put, walEdit, put.getWriteToWAL())) { - // pre hook says skip this Put - // mark as success and skip below - batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; - } - } - } MultiVersionConsistencyControl.WriteEntry w = null; long txid = 0; @@ -2152,7 +2170,15 @@ public class HRegion implements HeapSize { // , Writable{ Put p = batchOp.operations[i].getFirst(); if (!p.getWriteToWAL()) continue; + // Add WAL edits by CP + WALEdit fromCP = batchOp.walEditsFromCoprocessors[i]; + if (fromCP != null) { + for (KeyValue kv : fromCP.getKeyValues()) { + walEdit.add(kv); + } + } addFamilyMapToWALEdit(familyMaps[i], walEdit); + } // -------------------------