HBASE-3348 Coprocessors: Allow Observers to completely override base function

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1049471 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Kyle Purtell 2010-12-15 10:11:59 +00:00
parent 45942d3d52
commit 1842a1ee85
9 changed files with 723 additions and 434 deletions

View File

@ -36,6 +36,8 @@ Release 0.91.0 - Unreleased
HBASE-1861 Multi-Family support for bulk upload tools
HBASE-3308 SplitTransaction.splitStoreFiles slows splits a lot
HBASE-3328 Added Admin API to specify explicit split points
HBASE-3345 Coprocessors: Allow observers to completely override base
function
NEW FEATURES

View File

@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import java.io.IOException;
/**
@ -71,70 +73,65 @@ public abstract class BaseRegionObserverCoprocessor implements Coprocessor,
@Override
public void preGetClosestRowBefore(final CoprocessorEnvironment e,
final byte [] row, final byte [] family)
final byte [] row, final byte [] family, final Result result)
throws IOException {
}
@Override
public Result postGetClosestRowBefore(final CoprocessorEnvironment e,
byte[] row, byte[] family, final Result result)
throws IOException {
return result;
}
@Override
public Get preGet(final CoprocessorEnvironment e, final Get get)
throws IOException {
return get;
}
@Override
public List<KeyValue> postGet(final CoprocessorEnvironment e, final Get get,
List<KeyValue> results) throws IOException {
return results;
}
@Override
public Get preExists(final CoprocessorEnvironment e, final Get get)
public void postGetClosestRowBefore(final CoprocessorEnvironment e,
final byte [] row, final byte [] family, final Result result)
throws IOException {
return get;
}
@Override
public boolean postExists(final CoprocessorEnvironment e,
final Get get, boolean exists)
throws IOException {
public void preGet(final CoprocessorEnvironment e, final Get get,
final List<KeyValue> results) throws IOException {
}
@Override
public void postGet(final CoprocessorEnvironment e, final Get get,
final List<KeyValue> results) throws IOException {
}
@Override
public boolean preExists(final CoprocessorEnvironment e, final Get get,
final boolean exists) throws IOException {
return exists;
}
@Override
public Map<byte[], List<KeyValue>> prePut(final CoprocessorEnvironment e,
final Map<byte[], List<KeyValue>> familyMap) throws IOException {
return familyMap;
public boolean postExists(final CoprocessorEnvironment e, final Get get,
boolean exists) throws IOException {
return exists;
}
@Override
public void postPut(final CoprocessorEnvironment e,
final Map<byte[], List<KeyValue>> familyMap)
throws IOException {
public void prePut(final CoprocessorEnvironment e, final Map<byte[],
List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
}
@Override
public Map<byte[], List<KeyValue>> preDelete(final CoprocessorEnvironment e,
final Map<byte[], List<KeyValue>> familyMap) throws IOException {
return familyMap;
public void postPut(final CoprocessorEnvironment e, final Map<byte[],
List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
}
@Override
public void postDelete(CoprocessorEnvironment e,
Map<byte[], List<KeyValue>> familyMap) throws IOException {
public void preDelete(final CoprocessorEnvironment e, final Map<byte[],
List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
}
@Override
public Put preCheckAndPut(final CoprocessorEnvironment e,
public void postDelete(final CoprocessorEnvironment e,
final Map<byte[], List<KeyValue>> familyMap, final boolean writeToWAL)
throws IOException {
}
@Override
public boolean preCheckAndPut(final CoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Put put) throws IOException {
return put;
final byte [] value, final Put put, final boolean result)
throws IOException {
return result;
}
@Override
@ -146,18 +143,18 @@ public abstract class BaseRegionObserverCoprocessor implements Coprocessor,
}
@Override
public Delete preCheckAndDelete(final CoprocessorEnvironment e,
public boolean preCheckAndDelete(final CoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Delete delete)
throws IOException {
return delete;
final byte [] value, final Delete delete, final boolean result)
throws IOException {
return result;
}
@Override
public boolean postCheckAndDelete(final CoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Delete delete, final boolean result)
throws IOException {
throws IOException {
return result;
}
@ -172,54 +169,53 @@ public abstract class BaseRegionObserverCoprocessor implements Coprocessor,
public long postIncrementColumnValue(final CoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier,
final long amount, final boolean writeToWAL, long result)
throws IOException {
return result;
}
@Override
public Increment preIncrement(final CoprocessorEnvironment e,
final Increment increment)
throws IOException {
return increment;
}
@Override
public Result postIncrement(final CoprocessorEnvironment e,
final Increment increment,
final Result result) throws IOException {
return result;
}
@Override
public Scan preScannerOpen(final CoprocessorEnvironment e, final Scan scan)
throws IOException {
return scan;
}
@Override
public void postScannerOpen(final CoprocessorEnvironment e,
final Scan scan,
final long scannerId) throws IOException { }
@Override
public void preScannerNext(final CoprocessorEnvironment e,
final long scannerId) throws IOException {
}
@Override
public List<KeyValue> postScannerNext(final CoprocessorEnvironment e,
final long scannerId, final List<KeyValue> results)
throws IOException {
return results;
return result;
}
@Override
public void preIncrement(final CoprocessorEnvironment e,
final Increment increment, final Result result) throws IOException {
}
@Override
public void postIncrement(final CoprocessorEnvironment e,
final Increment increment, final Result result) throws IOException {
}
@Override
public InternalScanner preScannerOpen(final CoprocessorEnvironment e,
final Scan scan, final InternalScanner s) throws IOException {
return s;
}
@Override
public InternalScanner postScannerOpen(final CoprocessorEnvironment e,
final Scan scan, final InternalScanner s) throws IOException {
return s;
}
@Override
public boolean preScannerNext(final CoprocessorEnvironment e,
final InternalScanner s, final List<KeyValue> results,
final int limit, final boolean hasMore) throws IOException {
return hasMore;
}
@Override
public boolean postScannerNext(final CoprocessorEnvironment e,
final InternalScanner s, final List<KeyValue> results, final int limit,
final boolean hasMore) throws IOException {
return hasMore;
}
@Override
public void preScannerClose(final CoprocessorEnvironment e,
final long scannerId)
throws IOException { }
final InternalScanner s) throws IOException {
}
@Override
public void postScannerClose(final CoprocessorEnvironment e,
final long scannerId)
throws IOException { }
final InternalScanner s) throws IOException {
}
}

View File

@ -45,27 +45,17 @@ public interface CoprocessorEnvironment {
*/
public HTableInterface getTable(byte[] tableName) throws IOException;
// environment variables
/* Control flow changes */
/**
* Get an environment variable
* @param key the key
* @return the object corresponding to the environment variable, if set
* Causes framework to bypass default actions and return with the results
* from a preXXX chain.
*/
public Object get(Object key);
public void bypass();
/**
* Set an environment variable
* @param key the key
* @param value the value
* Mark coprocessor chain processing as complete. Causes framework to return
* immediately without calling any additional chained coprocessors.
*/
public void put(Object key, Object value);
/**
* Remove an environment variable
* @param key the key
* @return the object corresponding to the environment variable, if set
*/
public Object remove(Object key);
public void complete();
}

View File

@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.coprocessor.CoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import java.io.IOException;
/**
@ -37,62 +39,92 @@ public interface RegionObserver {
/**
* Called before a client makes a GetClosestRowBefore request.
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
* coprocessors
* @param e the environment provided by the region server
* @param row the row
* @param family the family
* @param result The result to return to the client if default processing
* is bypassed. Can be modified. Will not be used if default processing
* is not bypassed.
* @throws IOException if an error occurred on the coprocessor
*/
public void preGetClosestRowBefore(final CoprocessorEnvironment e,
final byte [] row, final byte [] family)
throws IOException;
/**
* Called after a client makes a GetClosestRowBefore request.
* @param e the environment provided by the region server
* @param row the row
* @param family the desired family
* @param result the result set
* @return the possible tranformed result set to return to the client
* @throws IOException if an error occurred on the coprocessor
*/
public Result postGetClosestRowBefore(final CoprocessorEnvironment e,
final byte [] row, final byte [] family, final Result result)
throws IOException;
/**
* Called before the client perform a get()
* Called after a client makes a GetClosestRowBefore request.
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
* coprocessors
* @param e the environment provided by the region server
* @param get the Get request
* @return the possibly transformed Get object by coprocessor
* @param row the row
* @param family the desired family
* @param result the result to return to the client, modify as necessary
* @throws IOException if an error occurred on the coprocessor
*/
public Get preGet(final CoprocessorEnvironment e, final Get get)
public void postGetClosestRowBefore(final CoprocessorEnvironment e,
final byte [] row, final byte [] family, final Result result)
throws IOException;
/**
* Called after the client perform a get()
* Called before the client performs a Get
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
* coprocessors
* @param e the environment provided by the region server
* @param get the Get request
* @param results the result list
* @return the possibly transformed result list to return to client
* @param result The result to return to the client if default processing
* is bypassed. Can be modified. Will not be used if default processing
* is not bypassed.
* @throws IOException if an error occurred on the coprocessor
*/
public List<KeyValue> postGet(final CoprocessorEnvironment e, final Get get,
final List<KeyValue> results)
public void preGet(final CoprocessorEnvironment e, final Get get,
final List<KeyValue> result)
throws IOException;
/**
* Called after the client performs a Get
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
* coprocessors
* @param e the environment provided by the region server
* @param get the Get request
* @param result the result to return to the client, modify as necessary
* @throws IOException if an error occurred on the coprocessor
*/
public void postGet(final CoprocessorEnvironment e, final Get get,
final List<KeyValue> result)
throws IOException;
/**
* Called before the client tests for existence using a Get.
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
* coprocessors
* @param e the environment provided by the region server
* @param get the Get request
* @return the possibly transformed Get object by coprocessor
* @param exists
* @return the value to return to the client if bypassing default processing
* @throws IOException if an error occurred on the coprocessor
*/
public Get preExists(final CoprocessorEnvironment e, final Get get)
public boolean preExists(final CoprocessorEnvironment e, final Get get,
final boolean exists)
throws IOException;
/**
* Called after the client tests for existence using a Get.
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
* coprocessors
* @param e the environment provided by the region server
* @param get the Get request
* @param exists the result returned by the region server
@ -105,64 +137,92 @@ public interface RegionObserver {
/**
* Called before the client stores a value.
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
* coprocessors
* @param e the environment provided by the region server
* @param familyMap map of family to edits for the given family.
* @return the possibly transformed map to actually use
* @param familyMap map of family to edits for the given family
* @param writeToWAL true if the change should be written to the WAL
* @throws IOException if an error occurred on the coprocessor
*/
public Map<byte[], List<KeyValue>> prePut(final CoprocessorEnvironment e,
final Map<byte[], List<KeyValue>> familyMap)
public void prePut(final CoprocessorEnvironment e, final Map<byte[],
List<KeyValue>> familyMap, final boolean writeToWAL)
throws IOException;
/**
* Called after the client stores a value.
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
* coprocessors
* @param e the environment provided by the region server
* @param familyMap map of family to edits for the given family.
* @param familyMap map of family to edits for the given family
* @param writeToWAL true if the change should be written to the WAL
* @throws IOException if an error occurred on the coprocessor
*/
public void postPut(final CoprocessorEnvironment e, final Map<byte[],
List<KeyValue>> familyMap)
List<KeyValue>> familyMap, final boolean writeToWAL)
throws IOException;
/**
* Called before the client deletes a value.
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
* coprocessors
* @param e the environment provided by the region server
* @param familyMap map of family to edits for the given family.
* @return the possibly transformed map to actually use
* @param familyMap map of family to edits for the given family
* @param writeToWAL true if the change should be written to the WAL
* @throws IOException if an error occurred on the coprocessor
*/
public Map<byte[], List<KeyValue>> preDelete(final CoprocessorEnvironment e,
final Map<byte[], List<KeyValue>> familyMap)
public void preDelete(final CoprocessorEnvironment e, final Map<byte[],
List<KeyValue>> familyMap, final boolean writeToWAL)
throws IOException;
/**
* Called after the client deletes a value.
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
* coprocessors
* @param e the environment provided by the region server
* @param familyMap map of family to edits for the given family.
* @param familyMap map of family to edits for the given family
* @param writeToWAL true if the change should be written to the WAL
* @throws IOException if an error occurred on the coprocessor
*/
public void postDelete(final CoprocessorEnvironment e,
final Map<byte[], List<KeyValue>> familyMap)
final Map<byte[], List<KeyValue>> familyMap, final boolean writeToWAL)
throws IOException;
/**
* Called before checkAndPut
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
* coprocessors
* @param e the environment provided by the region server
* @param row row to check
* @param family column family
* @param qualifier column qualifier
* @param value the expected value
* @param put data to put if check succeeds
* @return the possibly transformed map to actually use
* @param result
* @return the return value to return to client if bypassing default
* processing
* @throws IOException if an error occurred on the coprocessor
*/
public Put preCheckAndPut(final CoprocessorEnvironment e,
public boolean preCheckAndPut(final CoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Put put)
final byte [] value, final Put put, final boolean result)
throws IOException;
/**
* Called after checkAndPut
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
* coprocessors
* @param e the environment provided by the region server
* @param row row to check
* @param family column family
@ -170,7 +230,7 @@ public interface RegionObserver {
* @param value the expected value
* @param put data to put if check succeeds
* @param result from the checkAndPut
* @return the possibly transformed value to return to client
* @return the possibly transformed return value to return to client
* @throws IOException if an error occurred on the coprocessor
*/
public boolean postCheckAndPut(final CoprocessorEnvironment e,
@ -179,23 +239,32 @@ public interface RegionObserver {
throws IOException;
/**
* Called before checkAndPut
* Called before checkAndDelete
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
* coprocessors
* @param e the environment provided by the region server
* @param row row to check
* @param family column family
* @param qualifier column qualifier
* @param value the expected value
* @param delete delete to commit if check succeeds
* @return the possibly transformed map to actually use
* @param result
* @return the value to return to client if bypassing default processing
* @throws IOException if an error occurred on the coprocessor
*/
public Delete preCheckAndDelete(final CoprocessorEnvironment e,
public boolean preCheckAndDelete(final CoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Delete delete)
final byte [] value, final Delete delete, final boolean result)
throws IOException;
/**
* Called after checkAndDelete
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
* coprocessors
* @param e the environment provided by the region server
* @param row row to check
* @param family column family
@ -203,7 +272,7 @@ public interface RegionObserver {
* @param value the expected value
* @param delete delete to commit if check succeeds
* @param result from the CheckAndDelete
* @return the possibly transformed value to return to client
* @return the possibly transformed returned value to return to client
* @throws IOException if an error occurred on the coprocessor
*/
public boolean postCheckAndDelete(final CoprocessorEnvironment e,
@ -213,13 +282,18 @@ public interface RegionObserver {
/**
* Called before incrementColumnValue
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
* coprocessors
* @param e the environment provided by the region server
* @param row row to check
* @param family column family
* @param qualifier column qualifier
* @param amount long amount to increment
* @param writeToWAL whether to write the increment to the WAL
* @return new amount to increment
* @param writeToWAL true if the change should be written to the WAL
* @return value to return to the client if bypassing default processing
* @throws IOException if an error occurred on the coprocessor
*/
public long preIncrementColumnValue(final CoprocessorEnvironment e,
@ -229,12 +303,15 @@ public interface RegionObserver {
/**
* Called after incrementColumnValue
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
* coprocessors
* @param e the environment provided by the region server
* @param row row to check
* @param family column family
* @param qualifier column qualifier
* @param amount long amount to increment
* @param writeToWAL whether to write the increment to the WAL
* @param writeToWAL true if the change should be written to the WAL
* @param result the result returned by incrementColumnValue
* @return the result to return to the client
* @throws IOException if an error occurred on the coprocessor
@ -245,92 +322,137 @@ public interface RegionObserver {
throws IOException;
/**
* Called before incrementColumnValue
* Called before Increment
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
* coprocessors
* @param e the environment provided by the region server
* @param increment increment object
* @param writeToWAL whether to write the increment to the WAL
* @return new Increment instance
* @param result The result to return to the client if default processing
* is bypassed. Can be modified. Will not be used if default processing
* is not bypassed.
* @param writeToWAL true if the change should be written to the WAL
* @throws IOException if an error occurred on the coprocessor
*/
public Increment preIncrement(final CoprocessorEnvironment e,
final Increment increment)
public void preIncrement(final CoprocessorEnvironment e,
final Increment increment, final Result result)
throws IOException;
/**
* Called after increment
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
* coprocessors
* @param e the environment provided by the region server
* @param increment increment object
* @param writeToWAL whether to write the increment to the WAL
* @param result the result returned by increment
* @return the result to return to the client
* @param writeToWAL true if the change should be written to the WAL
* @param result the result returned by increment, can be modified
* @throws IOException if an error occurred on the coprocessor
*/
public Result postIncrement(final CoprocessorEnvironment e,
public void postIncrement(final CoprocessorEnvironment e,
final Increment increment, final Result result)
throws IOException;
/**
* Called before the client opens a new scanner.
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
* coprocessors
* @param e the environment provided by the region server
* @param scan the Scan specification
* @return the possibly transformed Scan to actually use
* @param s if not null, the base scanner
* @return an InternalScanner instance to use instead of the base scanner if
* overriding default behavior, null otherwise
* @throws IOException if an error occurred on the coprocessor
*/
public Scan preScannerOpen(final CoprocessorEnvironment e, final Scan scan)
public InternalScanner preScannerOpen(final CoprocessorEnvironment e,
final Scan scan, final InternalScanner s)
throws IOException;
/**
* Called after the client opens a new scanner.
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
* coprocessors
* @param e the environment provided by the region server
* @param scan the Scan specification
* @param scannerId the scanner id allocated by the region server
* @param s if not null, the base scanner
* @return the scanner instance to use
* @throws IOException if an error occurred on the coprocessor
*/
public void postScannerOpen(final CoprocessorEnvironment e, final Scan scan,
final long scannerId)
public InternalScanner postScannerOpen(final CoprocessorEnvironment e,
final Scan scan, final InternalScanner s)
throws IOException;
/**
* Called before the client asks for the next row on a scanner.
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
* coprocessors
* @param e the environment provided by the region server
* @param scannerId the scanner id
* @param results the result set returned by the region server
* @return the possibly transformed result set to actually return
* @param s the scanner
* @param result The result to return to the client if default processing
* is bypassed. Can be modified. Will not be returned if default processing
* is not bypassed.
* @param limit the maximum number of results to return
* @param hasNext the 'has more' indication
* @return 'has more' indication that should be sent to client
* @throws IOException if an error occurred on the coprocessor
*/
public void preScannerNext(final CoprocessorEnvironment e,
final long scannerId)
public boolean preScannerNext(final CoprocessorEnvironment e,
final InternalScanner s, final List<KeyValue> result,
final int limit, final boolean hasNext)
throws IOException;
/**
* Called after the client asks for the next row on a scanner.
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
* coprocessors
* @param e the environment provided by the region server
* @param scannerId the scanner id
* @param results the result set returned by the region server
* @return the possibly transformed result set to actually return
* @param s the scanner
* @param result the result to return to the client, can be modified
* @param limit the maximum number of results to return
* @param hasNext the 'has more' indication
* @return 'has more' indication that should be sent to client
* @throws IOException if an error occurred on the coprocessor
*/
public List<KeyValue> postScannerNext(final CoprocessorEnvironment e,
final long scannerId, final List<KeyValue> results)
public boolean postScannerNext(final CoprocessorEnvironment e,
final InternalScanner s, final List<KeyValue> result, final int limit,
final boolean hasNext)
throws IOException;
/**
* Called before the client closes a scanner.
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
* coprocessors
* @param e the environment provided by the region server
* @param scannerId the scanner id
* @param s the scanner
* @throws IOException if an error occurred on the coprocessor
*/
public void preScannerClose(final CoprocessorEnvironment e,
final long scannerId)
final InternalScanner s)
throws IOException;
/**
* Called after the client closes a scanner.
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
* coprocessors
* @param e the environment provided by the region server
* @param scannerId the scanner id
* @param s the scanner
* @throws IOException if an error occurred on the coprocessor
*/
public void postScannerClose(final CoprocessorEnvironment e,
final long scannerId)
final InternalScanner s)
throws IOException;
}

View File

@ -47,6 +47,7 @@ import java.net.URL;
import java.net.URLClassLoader;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -284,6 +285,14 @@ public class CoprocessorHost {
}
}
boolean shouldBypass() {
return bypass.getAndSet(false);
}
boolean shouldComplete() {
return complete.getAndSet(false);
}
/** @return the coprocessor environment version */
@Override
public int getVersion() {
@ -319,30 +328,14 @@ public class CoprocessorHost {
return new HTableWrapper(tableName);
}
/**
* @param key the key
* @return the value, or null if it does not exist
*/
@Override
public Object get(Object key) {
return vars.get(key);
public void complete() {
complete.set(true);
}
/**
* @param key the key
* @param value the value
*/
@Override
public void put(Object key, Object value) {
vars.put(key, value);
}
/**
* @param key the key
*/
@Override
public Object remove(Object key) {
return vars.remove(key);
public void bypass() {
bypass.set(true);
}
}
@ -355,8 +348,10 @@ public class CoprocessorHost {
HRegion region;
/** Ordered set of loaded coprocessors with lock */
final ReentrantReadWriteLock coprocessorLock = new ReentrantReadWriteLock();
Set<Environment> coprocessors =
final Set<Environment> coprocessors =
new TreeSet<Environment>(new EnvironmentPriorityComparator());
final AtomicBoolean bypass = new AtomicBoolean(false);
final AtomicBoolean complete = new AtomicBoolean(false);
/**
* Constructor
@ -482,6 +477,7 @@ public class CoprocessorHost {
* @param priority priority
* @throws IOException Exception
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public void load(Class<?> implClass, Coprocessor.Priority priority)
throws IOException {
// create the instance
@ -581,6 +577,9 @@ public class CoprocessorHost {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
env.impl.preOpen(env);
if (env.shouldComplete()) {
break;
}
}
} finally {
coprocessorLock.readLock().unlock();
@ -595,6 +594,9 @@ public class CoprocessorHost {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
env.impl.postOpen(env);
if (env.shouldComplete()) {
break;
}
}
} finally {
coprocessorLock.readLock().unlock();
@ -610,7 +612,6 @@ public class CoprocessorHost {
coprocessorLock.writeLock().lock();
for (Environment env: coprocessors) {
env.impl.preClose(env, abortRequested);
env.shutdown();
}
} finally {
coprocessorLock.writeLock().unlock();
@ -642,6 +643,9 @@ public class CoprocessorHost {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
env.impl.preCompact(env, willSplit);
if (env.shouldComplete()) {
break;
}
}
} finally {
coprocessorLock.readLock().unlock();
@ -657,6 +661,9 @@ public class CoprocessorHost {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
env.impl.postCompact(env, willSplit);
if (env.shouldComplete()) {
break;
}
}
} finally {
coprocessorLock.readLock().unlock();
@ -671,6 +678,9 @@ public class CoprocessorHost {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
env.impl.preFlush(env);
if (env.shouldComplete()) {
break;
}
}
} finally {
coprocessorLock.readLock().unlock();
@ -685,6 +695,9 @@ public class CoprocessorHost {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
env.impl.postFlush(env);
if (env.shouldComplete()) {
break;
}
}
} finally {
coprocessorLock.readLock().unlock();
@ -699,6 +712,9 @@ public class CoprocessorHost {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
env.impl.preSplit(env);
if (env.shouldComplete()) {
break;
}
}
} finally {
coprocessorLock.readLock().unlock();
@ -715,6 +731,9 @@ public class CoprocessorHost {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
env.impl.postSplit(env, l, r);
if (env.shouldComplete()) {
break;
}
}
} finally {
coprocessorLock.readLock().unlock();
@ -727,17 +746,25 @@ public class CoprocessorHost {
* @param row the row key
* @param family the family
* @param result the result set from the region
* @return true if default processing should be bypassed
* @exception IOException Exception
*/
public void preGetClosestRowBefore(final byte[] row, final byte[] family)
throws IOException {
public boolean preGetClosestRowBefore(final byte[] row, final byte[] family,
final Result result) throws IOException {
try {
boolean bypass = false;
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
((RegionObserver)env.impl).preGetClosestRowBefore(env, row, family);
((RegionObserver)env.impl).preGetClosestRowBefore(env, row, family,
result);
bypass |= env.shouldBypass();
if (env.shouldComplete()) {
break;
}
}
}
return bypass;
} finally {
coprocessorLock.readLock().unlock();
}
@ -747,20 +774,21 @@ public class CoprocessorHost {
* @param row the row key
* @param family the family
* @param result the result set from the region
* @return the result set to return to the client
* @exception IOException Exception
*/
public Result postGetClosestRowBefore(final byte[] row, final byte[] family,
Result result) throws IOException {
public void postGetClosestRowBefore(final byte[] row, final byte[] family,
final Result result) throws IOException {
try {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
result = ((RegionObserver)env.impl)
.postGetClosestRowBefore(env, row, family, result);
((RegionObserver)env.impl).postGetClosestRowBefore(env, row, family,
result);
if (env.shouldComplete()) {
break;
}
}
}
return result;
} finally {
coprocessorLock.readLock().unlock();
}
@ -768,18 +796,24 @@ public class CoprocessorHost {
/**
* @param get the Get request
* @return the possibly transformed Get object by coprocessor
* @return true if default processing should be bypassed
* @exception IOException Exception
*/
public Get preGet(Get get) throws IOException {
public boolean preGet(final Get get, final List<KeyValue> results)
throws IOException {
try {
boolean bypass = false;
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
get = ((RegionObserver)env.impl).preGet(env, get);
((RegionObserver)env.impl).preGet(env, get, results);
bypass |= env.shouldBypass();
if (env.shouldComplete()) {
break;
}
}
}
return get;
return bypass;
} finally {
coprocessorLock.readLock().unlock();
}
@ -791,16 +825,18 @@ public class CoprocessorHost {
* @return the possibly transformed result set to use
* @exception IOException Exception
*/
public List<KeyValue> postGet(final Get get, List<KeyValue> results)
throws IOException {
public void postGet(final Get get, final List<KeyValue> results)
throws IOException {
try {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
results = ((RegionObserver)env.impl).postGet(env, get, results);
((RegionObserver)env.impl).postGet(env, get, results);
if (env.shouldComplete()) {
break;
}
}
}
return results;
} finally {
coprocessorLock.readLock().unlock();
}
@ -808,18 +844,25 @@ public class CoprocessorHost {
/**
* @param get the Get request
* @param exists the result returned by the region server
* @return true or false to return to client if bypassing normal operation,
* or null otherwise
* @exception IOException Exception
*/
public Get preExists(Get get) throws IOException {
public Boolean preExists(final Get get) throws IOException {
try {
boolean bypass = false;
boolean exists = false;
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
get = ((RegionObserver)env.impl).preExists(env, get);
exists = ((RegionObserver)env.impl).preExists(env, get, exists);
bypass |= env.shouldBypass();
if (env.shouldComplete()) {
break;
}
}
}
return get;
return bypass ? exists : null;
} finally {
coprocessorLock.readLock().unlock();
}
@ -837,7 +880,10 @@ public class CoprocessorHost {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
exists &= ((RegionObserver)env.impl).postExists(env, get, exists);
exists = ((RegionObserver)env.impl).postExists(env, get, exists);
if (env.shouldComplete()) {
break;
}
}
}
return exists;
@ -848,19 +894,25 @@ public class CoprocessorHost {
/**
* @param familyMap map of family to edits for the given family.
* @return the possibly transformed map to actually use
* @param writeToWAL true if the change should be written to the WAL
* @return true if default processing should be bypassed
* @exception IOException Exception
*/
public Map<byte[], List<KeyValue>> prePut(Map<byte[], List<KeyValue>> familyMap)
throws IOException {
public boolean prePut(final Map<byte[], List<KeyValue>> familyMap,
final boolean writeToWAL) throws IOException {
try {
boolean bypass = false;
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
familyMap = ((RegionObserver)env.impl).prePut(env, familyMap);
((RegionObserver)env.impl).prePut(env, familyMap, writeToWAL);
bypass |= env.shouldBypass();
if (env.shouldComplete()) {
break;
}
}
}
return familyMap;
return bypass;
} finally {
coprocessorLock.readLock().unlock();
}
@ -868,15 +920,19 @@ public class CoprocessorHost {
/**
* @param familyMap map of family to edits for the given family.
* @param writeToWAL true if the change should be written to the WAL
* @exception IOException Exception
*/
public void postPut(Map<byte[], List<KeyValue>> familyMap)
throws IOException {
public void postPut(final Map<byte[], List<KeyValue>> familyMap,
final boolean writeToWAL) throws IOException {
try {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
((RegionObserver)env.impl).postPut(env, familyMap);
((RegionObserver)env.impl).postPut(env, familyMap, writeToWAL);
if (env.shouldComplete()) {
break;
}
}
}
} finally {
@ -886,19 +942,25 @@ public class CoprocessorHost {
/**
* @param familyMap map of family to edits for the given family.
* @return the possibly transformed map to actually use
* @param writeToWAL true if the change should be written to the WAL
* @return true if default processing should be bypassed
* @exception IOException Exception
*/
public Map<byte[], List<KeyValue>> preDelete(Map<byte[], List<KeyValue>> familyMap)
throws IOException {
public boolean preDelete(final Map<byte[], List<KeyValue>> familyMap,
final boolean writeToWAL) throws IOException {
try {
boolean bypass = false;
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
familyMap = ((RegionObserver)env.impl).preDelete(env, familyMap);
((RegionObserver)env.impl).preDelete(env, familyMap, writeToWAL);
bypass |= env.shouldBypass();
if (env.shouldComplete()) {
break;
}
}
}
return familyMap;
return bypass;
} finally {
coprocessorLock.readLock().unlock();
}
@ -906,15 +968,19 @@ public class CoprocessorHost {
/**
* @param familyMap map of family to edits for the given family.
* @param writeToWAL true if the change should be written to the WAL
* @exception IOException Exception
*/
public void postDelete(Map<byte[], List<KeyValue>> familyMap)
throws IOException {
public void postDelete(final Map<byte[], List<KeyValue>> familyMap,
final boolean writeToWAL) throws IOException {
try {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
((RegionObserver)env.impl).postDelete(env, familyMap);
((RegionObserver)env.impl).postDelete(env, familyMap, writeToWAL);
if (env.shouldComplete()) {
break;
}
}
}
} finally {
@ -928,21 +994,29 @@ public class CoprocessorHost {
* @param qualifier column qualifier
* @param value the expected value
* @param put data to put if check succeeds
* @return true or false to return to client if default processing should
* be bypassed, or null otherwise
* @throws IOException e
*/
public Put preCheckAndPut(final byte [] row, final byte [] family,
public Boolean preCheckAndPut(final byte [] row, final byte [] family,
final byte [] qualifier, final byte [] value, Put put)
throws IOException
{
try {
boolean bypass = false;
boolean result = false;
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
put = ((RegionObserver)env.impl).preCheckAndPut(env, row, family,
qualifier, value, put);
result = ((RegionObserver)env.impl).preCheckAndPut(env, row, family,
qualifier, value, put, result);
bypass |= env.shouldBypass();
if (env.shouldComplete()) {
break;
}
}
}
return put;
return bypass ? result : null;
} finally {
coprocessorLock.readLock().unlock();
}
@ -967,6 +1041,9 @@ public class CoprocessorHost {
if (env.impl instanceof RegionObserver) {
result = ((RegionObserver)env.impl).postCheckAndPut(env, row,
family, qualifier, value, put, result);
if (env.shouldComplete()) {
break;
}
}
}
return result;
@ -981,21 +1058,29 @@ public class CoprocessorHost {
* @param qualifier column qualifier
* @param value the expected value
* @param delete delete to commit if check succeeds
* @return true or false to return to client if default processing should
* be bypassed, or null otherwise
* @throws IOException e
*/
public Delete preCheckAndDelete(final byte [] row, final byte [] family,
public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
final byte [] qualifier, final byte [] value, Delete delete)
throws IOException
{
try {
boolean bypass = false;
boolean result = false;
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
delete = ((RegionObserver)env.impl).preCheckAndDelete(env, row,
family, qualifier, value, delete);
result = ((RegionObserver)env.impl).preCheckAndDelete(env, row,
family, qualifier, value, delete, result);
bypass |= env.shouldBypass();
if (env.shouldComplete()) {
break;
}
}
}
return delete;
return bypass ? result : null;
} finally {
coprocessorLock.readLock().unlock();
}
@ -1020,6 +1105,9 @@ public class CoprocessorHost {
if (env.impl instanceof RegionObserver) {
result = ((RegionObserver)env.impl).postCheckAndDelete(env, row,
family, qualifier, value, delete, result);
if (env.shouldComplete()) {
break;
}
}
}
return result;
@ -1033,25 +1121,31 @@ public class CoprocessorHost {
* @param family column family
* @param qualifier column qualifier
* @param amount long amount to increment
* @param writeToWAL whether to write the increment to the WAL
* @return new amount to increment
* @param writeToWAL true if the change should be written to the WAL
* @return return value for client if default operation should be bypassed,
* or null otherwise
* @throws IOException if an error occurred on the coprocessor
*/
public long preIncrementColumnValue(final byte [] row, final byte [] family,
public Long preIncrementColumnValue(final byte [] row, final byte [] family,
final byte [] qualifier, long amount, final boolean writeToWAL)
throws IOException {
try {
boolean bypass = false;
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
amount = ((RegionObserver)env.impl).preIncrementColumnValue(env,
row, family, qualifier, amount, writeToWAL);
row, family, qualifier, amount, writeToWAL);
bypass |= env.shouldBypass();
if (env.shouldComplete()) {
break;
}
}
}
return bypass ? amount : null;
} finally {
coprocessorLock.readLock().unlock();
}
return amount;
}
/**
@ -1059,7 +1153,7 @@ public class CoprocessorHost {
* @param family column family
* @param qualifier column qualifier
* @param amount long amount to increment
* @param writeToWAL whether to write the increment to the WAL
* @param writeToWAL true if the change should be written to the WAL
* @param result the result returned by incrementColumnValue
* @return the result to return to the client
* @throws IOException if an error occurred on the coprocessor
@ -1072,7 +1166,10 @@ public class CoprocessorHost {
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
result = ((RegionObserver)env.impl).postIncrementColumnValue(env,
row, family, qualifier, amount, writeToWAL, result);
row, family, qualifier, amount, writeToWAL, result);
if (env.shouldComplete()) {
break;
}
}
}
} finally {
@ -1083,155 +1180,202 @@ public class CoprocessorHost {
/**
* @param increment increment object
* @param writeToWAL whether to write the increment to the WAL
* @return new amount to increment
* @param writeToWAL true if the change should be written to the WAL
* @return result to return to client if default operation should be
* bypassed, null otherwise
* @throws IOException if an error occurred on the coprocessor
*/
public Increment preIncrement(Increment increment)
public Result preIncrement(Increment increment)
throws IOException {
try {
boolean bypass = false;
Result result = new Result();
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
increment = ((RegionObserver)env.impl).preIncrement(env, increment);
((RegionObserver)env.impl).preIncrement(env, increment, result);
bypass |= env.shouldBypass();
if (env.shouldComplete()) {
break;
}
}
}
return bypass ? result : null;
} finally {
coprocessorLock.readLock().unlock();
}
return increment;
}
/**
* @param increment increment object
* @param writeToWAL whether to write the increment to the WAL
* @param writeToWAL true if the change should be written to the WAL
* @param result the result returned by incrementColumnValue
* @return the result to return to the client
* @throws IOException if an error occurred on the coprocessor
*/
public Result postIncrement(final Increment increment, Result result)
public void postIncrement(final Increment increment, Result result)
throws IOException {
try {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
result = ((RegionObserver)env.impl).postIncrement(env, increment,
result);
((RegionObserver)env.impl).postIncrement(env, increment, result);
if (env.shouldComplete()) {
break;
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
return result;
}
/**
* @param scan the Scan specification
* @return scanner id to return to client if default operation should be
* bypassed, false otherwise
* @exception IOException Exception
*/
public Scan preScannerOpen(Scan scan) throws IOException {
public InternalScanner preScannerOpen(Scan scan) throws IOException {
try {
boolean bypass = false;
InternalScanner s = null;
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
scan = ((RegionObserver)env.impl).preScannerOpen(env, scan);
s = ((RegionObserver)env.impl).preScannerOpen(env, scan, s);
bypass |= env.shouldBypass();
if (env.shouldComplete()) {
break;
}
}
}
return bypass ? s : null;
} finally {
coprocessorLock.readLock().unlock();
}
return scan;
}
/**
* @param scan the Scan specification
* @param scannerId the scanner id allocated by the region server
* @return the scanner instance to use
* @exception IOException Exception
*/
public void postScannerOpen(final Scan scan, long scannerId)
public InternalScanner postScannerOpen(final Scan scan, InternalScanner s)
throws IOException {
try {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
((RegionObserver)env.impl).postScannerOpen(env, scan, scannerId);
s = ((RegionObserver)env.impl).postScannerOpen(env, scan, s);
if (env.shouldComplete()) {
break;
}
}
}
return s;
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
* @param scannerId the scanner id
* @param s the scanner
* @param results the result set returned by the region server
* @return the possibly transformed result set to actually return
* @param limit the maximum number of results to return
* @return 'has next' indication to client if bypassing default behavior, or
* null otherwise
* @exception IOException Exception
*/
public void preScannerNext(final long scannerId) throws IOException {
public Boolean preScannerNext(final InternalScanner s,
final List<KeyValue> results, int limit) throws IOException {
try {
boolean bypass = false;
boolean hasNext = false;
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
((RegionObserver)env.impl).preScannerNext(env, scannerId);
hasNext = ((RegionObserver)env.impl).preScannerNext(env, s, results,
limit, hasNext);
bypass |= env.shouldBypass();
if (env.shouldComplete()) {
break;
}
}
}
return bypass ? hasNext : null;
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
* @param scannerId the scanner id
* @param s the scanner
* @param results the result set returned by the region server
* @return the possibly transformed result set to actually return
* @param limit the maximum number of results to return
* @param hasMore
* @return 'has more' indication to give to client
* @exception IOException Exception
*/
public List<KeyValue> postScannerNext(final long scannerId,
List<KeyValue> results) throws IOException {
public boolean postScannerNext(final InternalScanner s,
final List<KeyValue> results, final int limit, boolean hasMore)
throws IOException {
try {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
results = ((RegionObserver)env.impl).postScannerNext(env, scannerId,
results);
hasMore = ((RegionObserver)env.impl).postScannerNext(env, s,
results, limit, hasMore);
if (env.shouldComplete()) {
break;
}
}
}
return results;
return hasMore;
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
* @param scannerId the scanner id
* @param s the scanner
* @return true if default behavior should be bypassed, false otherwise
* @exception IOException Exception
*/
public void preScannerClose(final long scannerId)
public boolean preScannerClose(final InternalScanner s)
throws IOException {
try {
boolean bypass = false;
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
((RegionObserver)env.impl).preScannerClose(env, scannerId);
((RegionObserver)env.impl).preScannerClose(env, s);
bypass |= env.shouldBypass();
if (env.shouldComplete()) {
break;
}
}
}
return bypass;
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
* @param scannerId the scanner id
* @param s the scanner
* @exception IOException Exception
*/
public void postScannerClose(final long scannerId)
public void postScannerClose(final InternalScanner s)
throws IOException {
try {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
((RegionObserver)env.impl).postScannerClose(env, scannerId);
((RegionObserver)env.impl).postScannerClose(env, s);
if (env.shouldComplete()) {
break;
}
}
}
} finally {

View File

@ -1112,27 +1112,29 @@ public class HRegion implements HeapSize { // , Writable{
*/
public Result getClosestRowBefore(final byte [] row, final byte [] family)
throws IOException {
Result result = null;
if (coprocessorHost != null) {
Result result = new Result();
if (coprocessorHost.preGetClosestRowBefore(row, family, result)) {
return result;
}
}
// look across all the HStores for this region and determine what the
// closest key is across all column families, since the data may be sparse
KeyValue key = null;
checkRow(row);
startRegionOperation();
if (coprocessorHost != null) {
coprocessorHost.preGetClosestRowBefore(row, family);
}
try {
Store store = getStore(family);
KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
// get the closest key. (HStore.getRowKeyAtOrBefore can return null)
key = store.getRowKeyAtOrBefore(kv);
KeyValue key = store.getRowKeyAtOrBefore(kv);
Result result = null;
if (key != null) {
Get get = new Get(key.getRow());
get.addFamily(family);
result = get(get, null);
}
if (coprocessorHost != null) {
result = coprocessorHost.postGetClosestRowBefore(row, family, result);
coprocessorHost.postGetClosestRowBefore(row, family, result);
}
return result;
} finally {
@ -1150,8 +1152,7 @@ public class HRegion implements HeapSize { // , Writable{
* @return InternalScanner
* @throws IOException read exceptions
*/
public InternalScanner getScanner(Scan scan)
throws IOException {
public InternalScanner getScanner(Scan scan) throws IOException {
return getScanner(scan, null);
}
@ -1175,13 +1176,17 @@ public class HRegion implements HeapSize { // , Writable{
}
}
protected InternalScanner instantiateInternalScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
protected InternalScanner instantiateInternalScanner(Scan scan,
List<KeyValueScanner> additionalScanners) throws IOException {
InternalScanner s = null;
if (coprocessorHost != null) {
coprocessorHost.preScannerOpen(scan);
s = coprocessorHost.preScannerOpen(scan);
}
if (s == null) {
s = new RegionScanner(scan, additionalScanners);
}
InternalScanner s = new RegionScanner(scan, additionalScanners);
if (coprocessorHost != null) {
coprocessorHost.postScannerOpen(scan, s.hashCode());
s = coprocessorHost.postScannerOpen(scan, s);
}
return s;
}
@ -1243,17 +1248,20 @@ public class HRegion implements HeapSize { // , Writable{
* @throws IOException
*/
public void delete(Map<byte[], List<KeyValue>> familyMap, boolean writeToWAL)
throws IOException {
throws IOException {
/* Run coprocessor pre hook outside of locks to avoid deadlock */
if (coprocessorHost != null) {
if (coprocessorHost.preDelete(familyMap, writeToWAL)) {
return;
}
}
long now = EnvironmentEdgeManager.currentTimeMillis();
byte [] byteNow = Bytes.toBytes(now);
boolean flush = false;
updatesLock.readLock().lock();
try {
if (coprocessorHost != null) {
familyMap = coprocessorHost.preDelete(familyMap);
}
for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
byte[] family = e.getKey();
@ -1318,7 +1326,7 @@ public class HRegion implements HeapSize { // , Writable{
flush = isFlushSize(memstoreSize.addAndGet(addedSize));
if (coprocessorHost != null) {
coprocessorHost.postDelete(familyMap);
coprocessorHost.postDelete(familyMap, writeToWAL);
}
} finally {
this.updatesLock.readLock().unlock();
@ -1456,15 +1464,36 @@ public class HRegion implements HeapSize { // , Writable{
return batchOp.retCodes;
}
@SuppressWarnings("unchecked")
private long doMiniBatchPut(BatchOperationInProgress<Pair<Put, Integer>> batchOp) throws IOException {
/* Run coprocessor pre hook outside of locks to avoid deadlock */
if (coprocessorHost != null) {
List<Pair<Put, Integer>> ops =
new ArrayList<Pair<Put, Integer>>(batchOp.operations.length);
for (int i = 0; i < batchOp.operations.length; i++) {
Pair<Put, Integer> nextPair = batchOp.operations[i];
Put put = nextPair.getFirst();
Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap();
if (coprocessorHost.prePut(familyMap, put.getWriteToWAL())) {
// pre hook says skip this Put
// adjust nextIndexToProcess if we skipped before it
if (batchOp.nextIndexToProcess > i) {
batchOp.nextIndexToProcess--;
}
continue;
}
ops.add(nextPair);
}
batchOp.operations = ops.toArray(new Pair[ops.size()]);
}
long now = EnvironmentEdgeManager.currentTimeMillis();
byte[] byteNow = Bytes.toBytes(now);
/** Keep track of the locks we hold so we can release them in finally clause */
List<Integer> acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
// reference family maps directly so coprocessors can mutate them if desired
Map<byte[],List<KeyValue>>[] familyMaps =
new Map[batchOp.operations.length];
Map<byte[],List<KeyValue>>[] familyMaps = new Map[batchOp.operations.length];
// We try to set up a batch in the range [firstIndex,lastIndexExclusive)
int firstIndex = batchOp.nextIndexToProcess;
int lastIndexExclusive = firstIndex;
@ -1481,12 +1510,6 @@ public class HRegion implements HeapSize { // , Writable{
Integer providedLockId = nextPair.getSecond();
Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap();
// Check any loaded coprocessors
/* TODO: we should catch any throws coprocessor exceptions here to allow the
rest of the batch to continue. This means fixing HBASE-2898 */
if (coprocessorHost != null) {
familyMap = coprocessorHost.prePut(familyMap);
}
// store the family map reference to allow for mutations
familyMaps[lastIndexExclusive] = familyMap;
@ -1555,15 +1578,22 @@ public class HRegion implements HeapSize { // , Writable{
long addedSize = 0;
for (int i = firstIndex; i < lastIndexExclusive; i++) {
if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue;
addedSize += applyFamilyMapToMemstore(familyMaps[i]);
batchOp.retCodes[i] = OperationStatusCode.SUCCESS;
}
// execute any coprocessor post-hooks
if (coprocessorHost != null) {
coprocessorHost.postDelete(familyMaps[i]);
// ------------------------------------
// STEP 5. Run coprocessor post hooks
// ------------------------------------
if (coprocessorHost != null) {
for (int i = firstIndex; i < lastIndexExclusive; i++) {
// only for successful puts
if (batchOp.retCodes[i] != OperationStatusCode.SUCCESS) continue;
Put p = batchOp.operations[i].getFirst();
coprocessorHost.postPut(familyMaps[i], p.getWriteToWAL());
}
}
success = true;
return addedSize;
} finally {
@ -1738,8 +1768,14 @@ public class HRegion implements HeapSize { // , Writable{
* @param writeToWAL if true, then we should write to the log
* @throws IOException
*/
private void put(Map<byte [], List<KeyValue>> familyMap,
boolean writeToWAL) throws IOException {
private void put(Map<byte [], List<KeyValue>> familyMap, boolean writeToWAL)
throws IOException {
/* run pre put hook outside of lock to avoid deadlock */
if (coprocessorHost != null) {
if (coprocessorHost.prePut(familyMap, writeToWAL)) {
return;
}
}
long now = EnvironmentEdgeManager.currentTimeMillis();
byte[] byteNow = Bytes.toBytes(now);
@ -1747,9 +1783,6 @@ public class HRegion implements HeapSize { // , Writable{
this.updatesLock.readLock().lock();
try {
if (coprocessorHost != null) {
familyMap = coprocessorHost.prePut(familyMap);
}
checkFamilies(familyMap.keySet());
updateKVTimestamps(familyMap.values(), byteNow);
// write/sync to WAL should happen before we touch memstore.
@ -1766,13 +1799,14 @@ public class HRegion implements HeapSize { // , Writable{
long addedSize = applyFamilyMapToMemstore(familyMap);
flush = isFlushSize(memstoreSize.addAndGet(addedSize));
if (coprocessorHost != null) {
coprocessorHost.postPut(familyMap);
}
} finally {
this.updatesLock.readLock().unlock();
}
if (coprocessorHost != null) {
coprocessorHost.postPut(familyMap, writeToWAL);
}
if (flush) {
// Request a cache flush. Do it outside update lock.
requestFlush();
@ -2350,6 +2384,14 @@ public class HRegion implements HeapSize { // , Writable{
public synchronized boolean next(List<KeyValue> outResults, int limit)
throws IOException {
if (coprocessorHost != null) {
Boolean result = coprocessorHost.preScannerNext((InternalScanner)this,
outResults, limit);
if (result != null) {
return result.booleanValue();
}
}
if (this.filterClosed) {
throw new UnknownScannerException("Scanner was closed (timed out?) " +
"after we renewed it. Could be caused by a very slow scanner " +
@ -2363,14 +2405,11 @@ public class HRegion implements HeapSize { // , Writable{
results.clear();
if (coprocessorHost != null) {
coprocessorHost.preScannerNext(hashCode());
}
boolean returnResult = nextInternal(limit);
if (coprocessorHost != null) {
results = coprocessorHost.postScannerNext(hashCode(), results);
returnResult = coprocessorHost.postScannerNext((InternalScanner)this,
results, limit, returnResult);
}
outResults.addAll(results);
@ -2416,8 +2455,10 @@ public class HRegion implements HeapSize { // , Writable{
do {
this.storeHeap.next(results, limit - results.size());
if (limit > 0 && results.size() == limit) {
if (this.filter != null && filter.hasFilterRow()) throw new IncompatibleFilterException(
if (this.filter != null && filter.hasFilterRow()) {
throw new IncompatibleFilterException(
"Filter with filterRow(List<KeyValue>) incompatible with scan with limit!");
}
return true; // we are expecting more yes, but also limited to how many we can return.
}
} while (Bytes.equals(currentRow, nextRow = peekRow()));
@ -2480,7 +2521,9 @@ public class HRegion implements HeapSize { // , Writable{
public synchronized void close() throws IOException {
if (coprocessorHost != null) {
coprocessorHost.preScannerClose(hashCode());
if (coprocessorHost.preScannerClose((InternalScanner)this)) {
return;
}
}
if (storeHeap != null) {
storeHeap.close();
@ -2488,7 +2531,7 @@ public class HRegion implements HeapSize { // , Writable{
}
this.filterClosed = true;
if (coprocessorHost != null) {
coprocessorHost.postScannerClose(hashCode());
coprocessorHost.postScannerClose((InternalScanner)this);
}
}
}
@ -3064,32 +3107,27 @@ public class HRegion implements HeapSize { // , Writable{
throws IOException {
Scan scan = new Scan(get);
List<KeyValue> results = null;
List<KeyValue> getResults = new ArrayList<KeyValue>();
List<KeyValue> results = new ArrayList<KeyValue>();
// pre-get CP hook
if (withCoprocessor && (coprocessorHost != null)) {
get = coprocessorHost.preGet(get);
if (coprocessorHost.preGet(get, results)) {
return results;
}
}
InternalScanner scanner = null;
try {
scanner = getScanner(scan);
scanner.next(getResults);
scanner.next(results);
} finally {
if (scanner != null)
scanner.close();
}
// append get results to pre-get results
if (results != null){
results.addAll(getResults);
}
else {
results = getResults;
}
// post-get CP hook
if (withCoprocessor && (coprocessorHost != null)) {
results = coprocessorHost.postGet(get, results);
coprocessorHost.postGet(get, results);
}
return results;

View File

@ -1599,7 +1599,10 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
try {
HRegion region = getRegion(regionName);
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().preExists(get);
Boolean result = region.getCoprocessorHost().preExists(get);
if (result != null) {
return result.booleanValue();
}
}
Result r = region.get(get, getLockFromId(get.getLockId()));
boolean result = r != null && !r.isEmpty();
@ -1702,8 +1705,11 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
}
HRegion region = getRegion(regionName);
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().preCheckAndPut(row, family, qualifier,
value, put);
Boolean result = region.getCoprocessorHost()
.preCheckAndPut(row, family, qualifier, value, put);
if (result != null) {
return result.booleanValue();
}
}
boolean result = checkAndMutate(regionName, row, family, qualifier,
value, put, getLockFromId(put.getLockId()));
@ -1737,8 +1743,11 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
}
HRegion region = getRegion(regionName);
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().preCheckAndDelete(row, family, qualifier,
value, delete);
Boolean result = region.getCoprocessorHost().preCheckAndDelete(row,
family, qualifier, value, delete);
if (result != null) {
return result.booleanValue();
}
}
boolean result = checkAndMutate(regionName, row, family, qualifier, value,
delete, getLockFromId(delete.getLockId()));
@ -2462,12 +2471,15 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
Increment incVal = increment;
Result resVal;
if (region.getCoprocessorHost() != null) {
incVal = region.getCoprocessorHost().preIncrement(incVal);
resVal = region.getCoprocessorHost().preIncrement(incVal);
if (resVal != null) {
return resVal;
}
}
resVal = region.increment(incVal, getLockFromId(increment.getLockId()),
increment.getWriteToWAL());
if (region.getCoprocessorHost() != null) {
resVal = region.getCoprocessorHost().postIncrement(incVal, resVal);
region.getCoprocessorHost().postIncrement(incVal, resVal);
}
return resVal;
} catch (IOException e) {
@ -2489,16 +2501,18 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
requestCount.incrementAndGet();
try {
HRegion region = getRegion(regionName);
long amountVal = amount;
if (region.getCoprocessorHost() != null) {
amountVal = region.getCoprocessorHost().preIncrementColumnValue(row,
family, qualifier, amountVal, writeToWAL);
Long amountVal = region.getCoprocessorHost().preIncrementColumnValue(row,
family, qualifier, amount, writeToWAL);
if (amountVal != null) {
return amountVal.longValue();
}
}
long retval = region.incrementColumnValue(row, family, qualifier, amount,
writeToWAL);
writeToWAL);
if (region.getCoprocessorHost() != null) {
retval = region.getCoprocessorHost().postIncrementColumnValue(row,
family, qualifier, amountVal, writeToWAL, retval);
family, qualifier, amount, writeToWAL, retval);
}
return retval;
} catch (IOException e) {

View File

@ -34,11 +34,8 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import static org.junit.Assert.*;
/**
* A sample region observer that tests the RegionObserver interface.
* It works with TestRegionObserverInterface to provide the test case.
@ -59,22 +56,26 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
boolean hadPreIncrement = false;
boolean hadPostIncrement = false;
// Overriden RegionObserver methods
@Override
public Get preGet(CoprocessorEnvironment e, Get get) {
public void preGet(final CoprocessorEnvironment e, final Get get,
final List<KeyValue> results) throws IOException {
assertNotNull(e);
assertNotNull(e.getRegion());
assertNotNull(get);
assertNotNull(results);
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE)) {
hadPreGet = true;
assertNotNull(e);
assertNotNull(e.getRegion());
}
return get;
}
@Override
public List<KeyValue> postGet(CoprocessorEnvironment e, Get get,
List<KeyValue> results) {
public void postGet(final CoprocessorEnvironment e, final Get get,
final List<KeyValue> results) {
assertNotNull(e);
assertNotNull(e.getRegion());
assertNotNull(get);
assertNotNull(results);
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE)) {
boolean foundA = false;
@ -96,12 +97,14 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
assertTrue(foundC);
hadPostGet = true;
}
return results;
}
@Override
public Map<byte[], List<KeyValue>> prePut(CoprocessorEnvironment e,
Map<byte[], List<KeyValue>> familyMap) {
public void prePut(final CoprocessorEnvironment e, final Map<byte[],
List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
assertNotNull(e);
assertNotNull(e.getRegion());
assertNotNull(familyMap);
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE)) {
List<KeyValue> kvs = familyMap.get(TestRegionObserverInterface.A);
@ -121,12 +124,14 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
TestRegionObserverInterface.C));
hadPrePut = true;
}
return familyMap;
}
@Override
public void postPut(CoprocessorEnvironment e,
Map<byte[], List<KeyValue>> familyMap) {
public void postPut(final CoprocessorEnvironment e, final Map<byte[],
List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
assertNotNull(e);
assertNotNull(e.getRegion());
assertNotNull(familyMap);
List<KeyValue> kvs = familyMap.get(TestRegionObserverInterface.A);
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE)) {
@ -149,18 +154,23 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
}
@Override
public Map<byte[], List<KeyValue>> preDelete(CoprocessorEnvironment e,
Map<byte[], List<KeyValue>> familyMap) {
public void preDelete(final CoprocessorEnvironment e, final Map<byte[],
List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
assertNotNull(e);
assertNotNull(e.getRegion());
assertNotNull(familyMap);
if (beforeDelete && e.getRegion().getTableDesc().getName().equals(
TestRegionObserverInterface.TEST_TABLE)) {
hadPreDeleted = true;
}
return familyMap;
}
@Override
public void postDelete(CoprocessorEnvironment e,
Map<byte[], List<KeyValue>> familyMap) {
public void postDelete(final CoprocessorEnvironment e, final Map<byte[],
List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
assertNotNull(e);
assertNotNull(e.getRegion());
assertNotNull(familyMap);
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE)) {
beforeDelete = false;
@ -170,7 +180,12 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
@Override
public void preGetClosestRowBefore(final CoprocessorEnvironment e,
final byte[] row, final byte[] family) {
final byte[] row, final byte[] family, final Result result)
throws IOException {
assertNotNull(e);
assertNotNull(e.getRegion());
assertNotNull(row);
assertNotNull(result);
if (beforeDelete && e.getRegion().getTableDesc().getName().equals(
TestRegionObserverInterface.TEST_TABLE)) {
hadPreGetClosestRowBefore = true;
@ -178,70 +193,35 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
}
@Override
public Result postGetClosestRowBefore(final CoprocessorEnvironment e,
final byte[] row, final byte[] family, Result result) {
public void postGetClosestRowBefore(final CoprocessorEnvironment e,
final byte[] row, final byte[] family, final Result result)
throws IOException {
assertNotNull(e);
assertNotNull(e.getRegion());
assertNotNull(row);
assertNotNull(result);
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE)) {
hadPostGetClosestRowBefore = true;
}
return result;
}
@Override
public Scan preScannerOpen(CoprocessorEnvironment e, Scan scan) {
// not tested -- need to go through the RS to get here
return scan;
}
@Override
public void postScannerOpen(CoprocessorEnvironment e, Scan scan,
long scannerId) {
// not tested -- need to go through the RS to get here
}
@Override
public void preScannerNext(final CoprocessorEnvironment e,
final long scannerId) {
// not tested -- need to go through the RS to get here
}
@Override
public List<KeyValue> postScannerNext(final CoprocessorEnvironment e,
final long scannerId, List<KeyValue> results) {
// not tested -- need to go through the RS to get here
return results;
}
@Override
public void preScannerClose(final CoprocessorEnvironment e,
final long scannerId) {
// not tested -- need to go through the RS to get here
}
@Override
public void postScannerClose(final CoprocessorEnvironment e,
final long scannerId) {
// not tested -- need to go through the RS to get here
}
@Override
public Increment preIncrement(CoprocessorEnvironment e, Increment increment)
throws IOException {
public void preIncrement(final CoprocessorEnvironment e,
final Increment increment, final Result result) throws IOException {
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE_2)) {
hadPreIncrement = true;
}
return increment;
}
@Override
public Result postIncrement(CoprocessorEnvironment e, Increment increment,
Result result) throws IOException {
public void postIncrement(final CoprocessorEnvironment e,
final Increment increment, final Result result) throws IOException {
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE_2)) {
hadPostIncrement = true;
}
return result;
}
boolean hadPreGet() {

View File

@ -46,8 +46,9 @@ public class TestRegionObserverStacking extends TestCase {
public static class ObserverA extends BaseRegionObserverCoprocessor {
long id;
@Override
public void postPut(final CoprocessorEnvironment e,
Map<byte[], List<KeyValue>> familyMap) {
public void postPut(final CoprocessorEnvironment e, final Map<byte[],
List<KeyValue>> familyMap, final boolean writeToWAL)
throws IOException {
id = System.currentTimeMillis();
try {
Thread.sleep(10);
@ -59,8 +60,9 @@ public class TestRegionObserverStacking extends TestCase {
public static class ObserverB extends BaseRegionObserverCoprocessor {
long id;
@Override
public void postPut(final CoprocessorEnvironment e,
Map<byte[], List<KeyValue>> familyMap) {
public void postPut(final CoprocessorEnvironment e, final Map<byte[],
List<KeyValue>> familyMap, final boolean writeToWAL)
throws IOException {
id = System.currentTimeMillis();
try {
Thread.sleep(10);
@ -73,8 +75,9 @@ public class TestRegionObserverStacking extends TestCase {
long id;
@Override
public void postPut(final CoprocessorEnvironment e,
Map<byte[], List<KeyValue>> familyMap) {
public void postPut(final CoprocessorEnvironment e, final Map<byte[],
List<KeyValue>> familyMap, final boolean writeToWAL)
throws IOException {
id = System.currentTimeMillis();
try {
Thread.sleep(10);