From a07485a8d856d48d0e4ec1a79f8f4eb255aebc5a Mon Sep 17 00:00:00 2001 From: Zhihong Yu Date: Tue, 26 Feb 2013 21:31:16 +0000 Subject: [PATCH] HBASE-4210 Allow coprocessor to interact with batches per region sent from a client (Anoop) git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1450438 13f79535-47bb-0310-9956-ffa450edef68 --- .../hbase/coprocessor/BaseRegionObserver.java | 14 ++- .../hbase/coprocessor/RegionObserver.java | 25 +++++ .../hadoop/hbase/regionserver/HRegion.java | 16 +++ .../MiniBatchOperationInProgress.java | 106 ++++++++++++++++++ .../regionserver/RegionCoprocessorHost.java | 49 ++++++++ .../coprocessor/SimpleRegionObserver.java | 33 ++++++ .../TestRegionObserverInterface.java | 8 +- .../TestMiniBatchOperationInProgress.java | 105 +++++++++++++++++ 8 files changed, 351 insertions(+), 5 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMiniBatchOperationInProgress.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java index d245db40f4b..9c54de7af32 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -37,6 +38,7 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; @@ -246,6 +248,16 @@ public abstract class BaseRegionObserver implements RegionObserver { public void postDelete(final ObserverContext e, final Delete delete, final WALEdit edit, final boolean writeToWAL) throws IOException { } + + @Override + public void preBatchMutate(final ObserverContext c, + final MiniBatchOperationInProgress> miniBatchOp) throws IOException { + } + + @Override + public void postBatchMutate(final ObserverContext c, + final MiniBatchOperationInProgress> miniBatchOp) throws IOException { + } @Override public boolean preCheckAndPut(final ObserverContext e, @@ -387,4 +399,4 @@ public abstract class BaseRegionObserver implements RegionObserver { List> familyPaths, boolean hasLoaded) throws IOException { return hasLoaded; } -} \ No newline at end of file +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index e303b290957..461bc316ed1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; @@ -539,6 +541,29 @@ public interface RegionObserver extends Coprocessor { void postDelete(final ObserverContext c, final Delete delete, final WALEdit edit, final boolean writeToWAL) throws IOException; + + /** + * This will be called for every batch mutation operation happening at the server. This will be + * called after acquiring the locks on the mutating rows and after applying the proper timestamp + * for each Mutation at the server. The batch may contain Put/Delete. By setting OperationStatus + * of Mutations ({@link MiniBatchOperationInProgress#setOperationStatus(int, OperationStatus)}), + * {@link RegionObserver} can make HRegion to skip these Mutations. + * @param c the environment provided by the region server + * @param miniBatchOp batch of Mutations getting applied to region. + * @throws IOException if an error occurred on the coprocessor + */ + void preBatchMutate(final ObserverContext c, + final MiniBatchOperationInProgress> miniBatchOp) throws IOException; + + /** + * This will be called after applying a batch of Mutations on a region. The Mutations are added to + * memstore and WAL. + * @param c the environment provided by the region server + * @param miniBatchOp batch of Mutations applied to region. + * @throws IOException if an error occurred on the coprocessor + */ + void postBatchMutate(final ObserverContext c, + final MiniBatchOperationInProgress> miniBatchOp) throws IOException; /** * Called before checkAndPut diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 3d49ccb988d..02e98c0ffdf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -2196,6 +2196,14 @@ public class HRegion implements HeapSize { // , Writable{ // ---------------------------------- w = mvcc.beginMemstoreInsert(); + // calling the pre CP hook for batch mutation + if (coprocessorHost != null) { + MiniBatchOperationInProgress> miniBatchOp = + new MiniBatchOperationInProgress>(batchOp.operations, + batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); + if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L; + } + // ------------------------------------ // STEP 3. Write back to memstore // Write to memstore. It is ok to write to memstore @@ -2270,6 +2278,14 @@ public class HRegion implements HeapSize { // , Writable{ syncOrDefer(txid); } walSyncSuccessful = true; + // calling the post CP hook for batch mutation + if (coprocessorHost != null) { + MiniBatchOperationInProgress> miniBatchOp = + new MiniBatchOperationInProgress>(batchOp.operations, + batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); + coprocessorHost.postBatchMutate(miniBatchOp); + } + // ------------------------------------------------------------------ // STEP 8. Advance mvcc. This will make this put visible to scanners and getters. // ------------------------------------------------------------------ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java new file mode 100644 index 00000000000..5930869f9a7 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; + +/** + * Wraps together the mutations which are applied as a batch to the region and their operation + * status and WALEdits. + * @see RegionObserver#preBatchMutate(ObserverContext, MiniBatchOperationInProgress) + * @see RegionObserver#postBatchMutate(ObserverContext, MiniBatchOperationInProgress) + * @param Pair pair of Mutations and associated rowlock ids . + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class MiniBatchOperationInProgress { + private final T[] operations; + private final OperationStatus[] retCodeDetails; + private final WALEdit[] walEditsFromCoprocessors; + private final int firstIndex; + private final int lastIndexExclusive; + + public MiniBatchOperationInProgress(T[] operations, OperationStatus[] retCodeDetails, + WALEdit[] walEditsFromCoprocessors, int firstIndex, int lastIndexExclusive) { + this.operations = operations; + this.retCodeDetails = retCodeDetails; + this.walEditsFromCoprocessors = walEditsFromCoprocessors; + this.firstIndex = firstIndex; + this.lastIndexExclusive = lastIndexExclusive; + } + + /** + * @return The number of operations(Mutations) involved in this batch. + */ + public int size() { + return this.lastIndexExclusive - this.firstIndex; + } + + /** + * @param index + * @return The operation(Mutation) at the specified position. + */ + public T getOperation(int index) { + return operations[getAbsoluteIndex(index)]; + } + + /** + * Sets the status code for the operation(Mutation) at the specified position. + * By setting this status, {@link RegionObserver} can make HRegion to skip Mutations. + * @param index + * @param opStatus + */ + public void setOperationStatus(int index, OperationStatus opStatus) { + this.retCodeDetails[getAbsoluteIndex(index)] = opStatus; + } + + /** + * @param index + * @return Gets the status code for the operation(Mutation) at the specified position. + */ + public OperationStatus getOperationStatus(int index) { + return this.retCodeDetails[getAbsoluteIndex(index)]; + } + + /** + * Sets the walEdit for the operation(Mutation) at the specified position. + * @param index + * @param walEdit + */ + public void setWalEdit(int index, WALEdit walEdit) { + this.walEditsFromCoprocessors[getAbsoluteIndex(index)] = walEdit; + } + + /** + * @param index + * @return Gets the walEdit for the operation(Mutation) at the specified position. + */ + public WALEdit getWalEdit(int index) { + return this.walEditsFromCoprocessors[getAbsoluteIndex(index)]; + } + + private int getAbsoluteIndex(int index) { + if (index < 0 || this.firstIndex + index >= this.lastIndexExclusive) { + throw new ArrayIndexOutOfBoundsException(index); + } + return this.firstIndex + index; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 138d24534d4..02c97ad582a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -981,6 +982,54 @@ public class RegionCoprocessorHost } } } + + /** + * @param miniBatchOp + * @return true if default processing should be bypassed + * @throws IOException + */ + public boolean preBatchMutate( + final MiniBatchOperationInProgress> miniBatchOp) throws IOException { + boolean bypass = false; + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + ((RegionObserver) env.getInstance()).preBatchMutate(ctx, miniBatchOp); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + bypass |= ctx.shouldBypass(); + if (ctx.shouldComplete()) { + break; + } + } + } + return bypass; + } + + /** + * @param miniBatchOp + * @throws IOException + */ + public void postBatchMutate( + final MiniBatchOperationInProgress> miniBatchOp) throws IOException { + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + ((RegionObserver) env.getInstance()).postBatchMutate(ctx, miniBatchOp); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + if (ctx.shouldComplete()) { + break; + } + } + } + } /** * @param row row to check diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java index 618d4fbab0d..cd703876875 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Increment; @@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.Leases; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; @@ -98,6 +100,8 @@ public class SimpleRegionObserver extends BaseRegionObserver { boolean hadPostScannerOpen = false; boolean hadPreBulkLoadHFile = false; boolean hadPostBulkLoadHFile = false; + boolean hadPreBatchMutate = false; + boolean hadPostBatchMutate = false; @Override public void start(CoprocessorEnvironment e) throws IOException { @@ -399,6 +403,26 @@ public class SimpleRegionObserver extends BaseRegionObserver { beforeDelete = false; hadPostDeleted = true; } + + @Override + public void preBatchMutate(ObserverContext c, + MiniBatchOperationInProgress> miniBatchOp) throws IOException { + RegionCoprocessorEnvironment e = c.getEnvironment(); + assertNotNull(e); + assertNotNull(e.getRegion()); + assertNotNull(miniBatchOp); + hadPreBatchMutate = true; + } + + @Override + public void postBatchMutate(final ObserverContext c, + final MiniBatchOperationInProgress> miniBatchOp) throws IOException { + RegionCoprocessorEnvironment e = c.getEnvironment(); + assertNotNull(e); + assertNotNull(e.getRegion()); + assertNotNull(miniBatchOp); + hadPostBatchMutate = true; + } @Override public void preGetClosestRowBefore(final ObserverContext c, @@ -492,6 +516,15 @@ public class SimpleRegionObserver extends BaseRegionObserver { public boolean hadPostPut() { return hadPostPut; } + + public boolean hadPreBatchMutate() { + return hadPreBatchMutate; + } + + public boolean hadPostBatchMutate() { + return hadPostBatchMutate; + } + public boolean hadDelete() { return !beforeDelete; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java index eb8f6522b30..72e97267a27 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java @@ -120,9 +120,9 @@ public class TestRegionObserverInterface { verifyMethodResult(SimpleRegionObserver.class, new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", - "hadDelete"}, + "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"}, TEST_TABLE, - new Boolean[] {false, false, true, true, false} + new Boolean[] {false, false, true, true, true, true, false} ); Get get = new Get(ROW); @@ -146,9 +146,9 @@ public class TestRegionObserverInterface { verifyMethodResult(SimpleRegionObserver.class, new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", - "hadDelete"}, + "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"}, TEST_TABLE, - new Boolean[] {true, true, true, true, true} + new Boolean[] {true, true, true, true, true, true, true} ); util.deleteTable(tableName); table.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMiniBatchOperationInProgress.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMiniBatchOperationInProgress.java new file mode 100644 index 00000000000..a83beb14e6b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMiniBatchOperationInProgress.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestMiniBatchOperationInProgress { + + @Test + public void testMiniBatchOperationInProgressMethods() { + Pair[] operations = new Pair[10]; + OperationStatus[] retCodeDetails = new OperationStatus[10]; + WALEdit[] walEditsFromCoprocessors = new WALEdit[10]; + for (int i = 0; i < 10; i++) { + operations[i] = new Pair(new Put(Bytes.toBytes(i)), null); + } + MiniBatchOperationInProgress> miniBatch = + new MiniBatchOperationInProgress>(operations, retCodeDetails, + walEditsFromCoprocessors, 0, 5); + + assertEquals(5, miniBatch.size()); + assertTrue(Bytes.equals(Bytes.toBytes(0), miniBatch.getOperation(0).getFirst().getRow())); + assertTrue(Bytes.equals(Bytes.toBytes(2), miniBatch.getOperation(2).getFirst().getRow())); + assertTrue(Bytes.equals(Bytes.toBytes(4), miniBatch.getOperation(4).getFirst().getRow())); + try { + miniBatch.getOperation(5); + fail("Should throw Exception while accessing out of range"); + } catch (ArrayIndexOutOfBoundsException e) { + } + miniBatch.setOperationStatus(1, OperationStatus.FAILURE); + assertEquals(OperationStatus.FAILURE, retCodeDetails[1]); + try { + miniBatch.setOperationStatus(6, OperationStatus.FAILURE); + fail("Should throw Exception while accessing out of range"); + } catch (ArrayIndexOutOfBoundsException e) { + } + try { + miniBatch.setWalEdit(5, new WALEdit()); + fail("Should throw Exception while accessing out of range"); + } catch (ArrayIndexOutOfBoundsException e) { + } + + miniBatch = new MiniBatchOperationInProgress>(operations, + retCodeDetails, walEditsFromCoprocessors, 7, 10); + try { + miniBatch.setWalEdit(-1, new WALEdit()); + fail("Should throw Exception while accessing out of range"); + } catch (ArrayIndexOutOfBoundsException e) { + } + try { + miniBatch.getOperation(-1); + fail("Should throw Exception while accessing out of range"); + } catch (ArrayIndexOutOfBoundsException e) { + } + try { + miniBatch.getOperation(3); + fail("Should throw Exception while accessing out of range"); + } catch (ArrayIndexOutOfBoundsException e) { + } + try { + miniBatch.getOperationStatus(9); + fail("Should throw Exception while accessing out of range"); + } catch (ArrayIndexOutOfBoundsException e) { + } + try { + miniBatch.setOperationStatus(3, OperationStatus.FAILURE); + fail("Should throw Exception while accessing out of range"); + } catch (ArrayIndexOutOfBoundsException e) { + } + assertTrue(Bytes.equals(Bytes.toBytes(7), miniBatch.getOperation(0).getFirst().getRow())); + assertTrue(Bytes.equals(Bytes.toBytes(9), miniBatch.getOperation(2).getFirst().getRow())); + miniBatch.setOperationStatus(1, OperationStatus.SUCCESS); + assertEquals(OperationStatus.SUCCESS, retCodeDetails[8]); + WALEdit wal = new WALEdit(); + miniBatch.setWalEdit(0, wal); + assertEquals(wal, walEditsFromCoprocessors[7]); + } +}