HBASE-4210 Allow coprocessor to interact with batches per region sent from a client (Anoop)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1450438 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2013-02-26 21:31:16 +00:00
parent 2eeb5761b3
commit a07485a8d8
8 changed files with 351 additions and 5 deletions

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
@ -37,6 +38,7 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
@ -247,6 +249,16 @@ public abstract class BaseRegionObserver implements RegionObserver {
final Delete delete, final WALEdit edit, final boolean writeToWAL) throws IOException { final Delete delete, final WALEdit edit, final boolean writeToWAL) throws IOException {
} }
/**
 * Default no-op implementation of the pre-batch-mutate hook; subclasses
 * override this to observe or adjust a batch of mutations before HRegion
 * applies it.
 */
@Override
public void preBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
}
/**
 * Default no-op implementation of the post-batch-mutate hook; subclasses
 * override this to observe a batch of mutations after HRegion has applied it.
 */
@Override
public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
}
@Override @Override
public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> e, public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> e,
final byte [] row, final byte [] family, final byte [] qualifier, final byte [] row, final byte [] family, final byte [] qualifier,

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
@ -540,6 +542,29 @@ public interface RegionObserver extends Coprocessor {
final Delete delete, final WALEdit edit, final boolean writeToWAL) final Delete delete, final WALEdit edit, final boolean writeToWAL)
throws IOException; throws IOException;
/**
 * Called once for every batch mutation operation arriving at the server, after
 * the locks on the mutating rows have been acquired and the proper timestamp has
 * been applied to each Mutation. The batch may contain Puts and/or Deletes. By
 * setting the OperationStatus of an individual Mutation
 * ({@link MiniBatchOperationInProgress#setOperationStatus(int, OperationStatus)}),
 * a {@link RegionObserver} can direct HRegion to skip that Mutation.
 * @param c the environment provided by the region server
 * @param miniBatchOp batch of Mutations getting applied to region.
 * @throws IOException if an error occurred on the coprocessor
 */
void preBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException;
/**
 * Called after a batch of Mutations has been applied to a region, i.e. after the
 * Mutations have been added to the memstore and the WAL.
 * @param c the environment provided by the region server
 * @param miniBatchOp batch of Mutations applied to region.
 * @throws IOException if an error occurred on the coprocessor
 */
void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException;
/** /**
* Called before checkAndPut * Called before checkAndPut
* <p> * <p>

View File

@ -2196,6 +2196,14 @@ public class HRegion implements HeapSize { // , Writable{
// ---------------------------------- // ----------------------------------
w = mvcc.beginMemstoreInsert(); w = mvcc.beginMemstoreInsert();
// calling the pre CP hook for batch mutation
if (coprocessorHost != null) {
MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp =
new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations,
batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
}
// ------------------------------------ // ------------------------------------
// STEP 3. Write back to memstore // STEP 3. Write back to memstore
// Write to memstore. It is ok to write to memstore // Write to memstore. It is ok to write to memstore
@ -2270,6 +2278,14 @@ public class HRegion implements HeapSize { // , Writable{
syncOrDefer(txid); syncOrDefer(txid);
} }
walSyncSuccessful = true; walSyncSuccessful = true;
// calling the post CP hook for batch mutation
if (coprocessorHost != null) {
MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp =
new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations,
batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
coprocessorHost.postBatchMutate(miniBatchOp);
}
// ------------------------------------------------------------------ // ------------------------------------------------------------------
// STEP 8. Advance mvcc. This will make this put visible to scanners and getters. // STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
// ------------------------------------------------------------------ // ------------------------------------------------------------------

View File

@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
/**
 * Bundles a contiguous slice of the mutations being applied as one batch to a
 * region, together with their per-operation status codes and the WALEdits
 * contributed by coprocessors. All indexes handed to the accessors are relative
 * to the slice: valid values run from 0 (inclusive) to {@link #size()}
 * (exclusive); anything outside that window raises
 * {@link ArrayIndexOutOfBoundsException}.
 * @see RegionObserver#preBatchMutate(ObserverContext, MiniBatchOperationInProgress)
 * @see RegionObserver#postBatchMutate(ObserverContext, MiniBatchOperationInProgress)
 * @param <T> Pair<Mutation, Integer> pair of Mutations and associated rowlock ids .
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class MiniBatchOperationInProgress<T> {
  // Backing arrays are shared with the caller; this object is only a window
  // [start, endExclusive) onto them.
  private final T[] ops;
  private final OperationStatus[] statuses;
  private final WALEdit[] coprocessorWalEdits;
  private final int start;
  private final int endExclusive;

  public MiniBatchOperationInProgress(T[] operations, OperationStatus[] retCodeDetails,
      WALEdit[] walEditsFromCoprocessors, int firstIndex, int lastIndexExclusive) {
    this.ops = operations;
    this.statuses = retCodeDetails;
    this.coprocessorWalEdits = walEditsFromCoprocessors;
    this.start = firstIndex;
    this.endExclusive = lastIndexExclusive;
  }

  /**
   * @return The number of operations(Mutations) involved in this batch.
   */
  public int size() {
    return endExclusive - start;
  }

  /**
   * @param index
   * @return The operation(Mutation) at the specified position.
   */
  public T getOperation(int index) {
    return ops[toAbsolute(index)];
  }

  /**
   * Sets the status code for the operation(Mutation) at the specified position.
   * By setting this status, {@link RegionObserver} can direct HRegion to skip the Mutation.
   * @param index
   * @param opStatus
   */
  public void setOperationStatus(int index, OperationStatus opStatus) {
    statuses[toAbsolute(index)] = opStatus;
  }

  /**
   * @param index
   * @return Gets the status code for the operation(Mutation) at the specified position.
   */
  public OperationStatus getOperationStatus(int index) {
    return statuses[toAbsolute(index)];
  }

  /**
   * Sets the walEdit for the operation(Mutation) at the specified position.
   * @param index
   * @param walEdit
   */
  public void setWalEdit(int index, WALEdit walEdit) {
    coprocessorWalEdits[toAbsolute(index)] = walEdit;
  }

  /**
   * @param index
   * @return Gets the walEdit for the operation(Mutation) at the specified position.
   */
  public WALEdit getWalEdit(int index) {
    return coprocessorWalEdits[toAbsolute(index)];
  }

  /**
   * Translates a slice-relative index into an index in the backing arrays,
   * rejecting anything outside the [start, endExclusive) window.
   */
  private int toAbsolute(int index) {
    int absolute = start + index;
    if (index < 0 || absolute >= endExclusive) {
      throw new ArrayIndexOutOfBoundsException(index);
    }
    return absolute;
  }
}

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
@ -982,6 +983,54 @@ public class RegionCoprocessorHost
} }
} }
/**
 * Runs the preBatchMutate hook of every loaded RegionObserver against the batch.
 * @param miniBatchOp the batch of Mutations about to be applied to the region
 * @return true if default processing should be bypassed
 * @throws IOException
 */
public boolean preBatchMutate(
    final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
  ObserverContext<RegionCoprocessorEnvironment> ctx = null;
  boolean bypassRequested = false;
  for (RegionEnvironment env : coprocessors) {
    // Only RegionObserver instances participate in this hook.
    if (!(env.getInstance() instanceof RegionObserver)) {
      continue;
    }
    ctx = ObserverContext.createAndPrepare(env, ctx);
    try {
      ((RegionObserver) env.getInstance()).preBatchMutate(ctx, miniBatchOp);
    } catch (Throwable e) {
      // Delegate to the host's standard coprocessor-throwable policy.
      handleCoprocessorThrowable(env, e);
    }
    bypassRequested |= ctx.shouldBypass();
    if (ctx.shouldComplete()) {
      break;
    }
  }
  return bypassRequested;
}
/**
 * Runs the postBatchMutate hook of every loaded RegionObserver against the batch.
 * @param miniBatchOp the batch of Mutations that was applied to the region
 * @throws IOException
 */
public void postBatchMutate(
    final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
  ObserverContext<RegionCoprocessorEnvironment> ctx = null;
  for (RegionEnvironment env : coprocessors) {
    // Only RegionObserver instances participate in this hook.
    if (!(env.getInstance() instanceof RegionObserver)) {
      continue;
    }
    ctx = ObserverContext.createAndPrepare(env, ctx);
    try {
      ((RegionObserver) env.getInstance()).postBatchMutate(ctx, miniBatchOp);
    } catch (Throwable e) {
      // Delegate to the host's standard coprocessor-throwable policy.
      handleCoprocessorThrowable(env, e);
    }
    if (ctx.shouldComplete()) {
      break;
    }
  }
}
/** /**
* @param row row to check * @param row row to check
* @param family column family * @param family column family

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Increment;
@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.Leases; import org.apache.hadoop.hbase.regionserver.Leases;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
@ -98,6 +100,8 @@ public class SimpleRegionObserver extends BaseRegionObserver {
boolean hadPostScannerOpen = false; boolean hadPostScannerOpen = false;
boolean hadPreBulkLoadHFile = false; boolean hadPreBulkLoadHFile = false;
boolean hadPostBulkLoadHFile = false; boolean hadPostBulkLoadHFile = false;
boolean hadPreBatchMutate = false;
boolean hadPostBatchMutate = false;
@Override @Override
public void start(CoprocessorEnvironment e) throws IOException { public void start(CoprocessorEnvironment e) throws IOException {
@ -400,6 +404,26 @@ public class SimpleRegionObserver extends BaseRegionObserver {
hadPostDeleted = true; hadPostDeleted = true;
} }
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
    MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
  // Sanity-check everything the hook is handed, then record that it fired.
  final RegionCoprocessorEnvironment env = c.getEnvironment();
  assertNotNull(env);
  assertNotNull(env.getRegion());
  assertNotNull(miniBatchOp);
  this.hadPreBatchMutate = true;
}
@Override
public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
    final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
  // Sanity-check everything the hook is handed, then record that it fired.
  final RegionCoprocessorEnvironment env = c.getEnvironment();
  assertNotNull(env);
  assertNotNull(env.getRegion());
  assertNotNull(miniBatchOp);
  this.hadPostBatchMutate = true;
}
@Override @Override
public void preGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> c, public void preGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> c,
final byte[] row, final byte[] family, final Result result) final byte[] row, final byte[] family, final Result result)
@ -492,6 +516,15 @@ public class SimpleRegionObserver extends BaseRegionObserver {
public boolean hadPostPut() { public boolean hadPostPut() {
return hadPostPut; return hadPostPut;
} }
/** @return whether the preBatchMutate hook has been invoked on this observer. */
public boolean hadPreBatchMutate() {
return hadPreBatchMutate;
}
/** @return whether the postBatchMutate hook has been invoked on this observer. */
public boolean hadPostBatchMutate() {
return hadPostBatchMutate;
}
public boolean hadDelete() { public boolean hadDelete() {
return !beforeDelete; return !beforeDelete;
} }

View File

@ -120,9 +120,9 @@ public class TestRegionObserverInterface {
verifyMethodResult(SimpleRegionObserver.class, verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
"hadDelete"}, "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"},
TEST_TABLE, TEST_TABLE,
new Boolean[] {false, false, true, true, false} new Boolean[] {false, false, true, true, true, true, false}
); );
Get get = new Get(ROW); Get get = new Get(ROW);
@ -146,9 +146,9 @@ public class TestRegionObserverInterface {
verifyMethodResult(SimpleRegionObserver.class, verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
"hadDelete"}, "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"},
TEST_TABLE, TEST_TABLE,
new Boolean[] {true, true, true, true, true} new Boolean[] {true, true, true, true, true, true, true}
); );
util.deleteTable(tableName); util.deleteTable(tableName);
table.close(); table.close();

View File

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestMiniBatchOperationInProgress {

  /**
   * Exercises the relative indexing, the status and walEdit accessors, and the
   * out-of-range guard of MiniBatchOperationInProgress over two different
   * windows onto the same backing arrays.
   */
  @Test
  public void testMiniBatchOperationInProgressMethods() {
    Pair<Mutation, Integer>[] ops = new Pair[10];
    OperationStatus[] statuses = new OperationStatus[10];
    WALEdit[] edits = new WALEdit[10];
    for (int i = 0; i < 10; i++) {
      ops[i] = new Pair<Mutation, Integer>(new Put(Bytes.toBytes(i)), null);
    }
    // Window over the first five operations: indexes 0..4 map to 0..4.
    MiniBatchOperationInProgress<Pair<Mutation, Integer>> head =
        new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(ops, statuses, edits, 0, 5);
    assertEquals(5, head.size());
    assertTrue(Bytes.equals(Bytes.toBytes(0), head.getOperation(0).getFirst().getRow()));
    assertTrue(Bytes.equals(Bytes.toBytes(2), head.getOperation(2).getFirst().getRow()));
    assertTrue(Bytes.equals(Bytes.toBytes(4), head.getOperation(4).getFirst().getRow()));
    try {
      head.getOperation(5);
      fail("Should throw Exception while accessing out of range");
    } catch (ArrayIndexOutOfBoundsException e) {
    }
    head.setOperationStatus(1, OperationStatus.FAILURE);
    assertEquals(OperationStatus.FAILURE, statuses[1]);
    try {
      head.setOperationStatus(6, OperationStatus.FAILURE);
      fail("Should throw Exception while accessing out of range");
    } catch (ArrayIndexOutOfBoundsException e) {
    }
    try {
      head.setWalEdit(5, new WALEdit());
      fail("Should throw Exception while accessing out of range");
    } catch (ArrayIndexOutOfBoundsException e) {
    }
    // Window over the last three operations: indexes 0..2 map to 7..9.
    MiniBatchOperationInProgress<Pair<Mutation, Integer>> tail =
        new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(ops, statuses, edits, 7, 10);
    try {
      tail.setWalEdit(-1, new WALEdit());
      fail("Should throw Exception while accessing out of range");
    } catch (ArrayIndexOutOfBoundsException e) {
    }
    try {
      tail.getOperation(-1);
      fail("Should throw Exception while accessing out of range");
    } catch (ArrayIndexOutOfBoundsException e) {
    }
    try {
      tail.getOperation(3);
      fail("Should throw Exception while accessing out of range");
    } catch (ArrayIndexOutOfBoundsException e) {
    }
    try {
      tail.getOperationStatus(9);
      fail("Should throw Exception while accessing out of range");
    } catch (ArrayIndexOutOfBoundsException e) {
    }
    try {
      tail.setOperationStatus(3, OperationStatus.FAILURE);
      fail("Should throw Exception while accessing out of range");
    } catch (ArrayIndexOutOfBoundsException e) {
    }
    assertTrue(Bytes.equals(Bytes.toBytes(7), tail.getOperation(0).getFirst().getRow()));
    assertTrue(Bytes.equals(Bytes.toBytes(9), tail.getOperation(2).getFirst().getRow()));
    // Relative index 1 in the tail window lands on absolute slot 8.
    tail.setOperationStatus(1, OperationStatus.SUCCESS);
    assertEquals(OperationStatus.SUCCESS, statuses[8]);
    WALEdit wal = new WALEdit();
    tail.setWalEdit(0, wal);
    assertEquals(wal, edits[7]);
  }
}