HBASE-3260 Coprocessors: Add explicit lifecycle management

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1050489 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Helmling 2010-12-17 20:50:34 +00:00
parent 1cb79368d3
commit 96e37125ba
8 changed files with 312 additions and 151 deletions

View File

@ -38,6 +38,7 @@ Release 0.91.0 - Unreleased
HBASE-3328 Added Admin API to specify explicit split points
HBASE-3345 Coprocessors: Allow observers to completely override base
function
HBASE-3260 Coprocessors: Add explicit lifecycle management
NEW FEATURES

View File

@ -51,43 +51,15 @@ public abstract class BaseEndpointCoprocessor implements Coprocessor,
}
@Override
public long getProtocolVersion(String arg0, long arg1) throws IOException {
public void start(CoprocessorEnvironment env) {
this.env = env;
}
@Override
public void stop(CoprocessorEnvironment env) { }
@Override
public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
return HBaseRPCProtocolVersion.versionID;
}
@Override
public void preOpen(CoprocessorEnvironment e) { }
/**
* It initializes the coprocessor resources. If you need to override this
* method, please remember to call super(e).
*/
@Override
public void postOpen(CoprocessorEnvironment e) {
setEnvironment(e);
}
@Override
public void preClose(CoprocessorEnvironment e, boolean abortRequested) { }
@Override
public void postClose(CoprocessorEnvironment e, boolean abortRequested) { }
@Override
public void preFlush(CoprocessorEnvironment e) { }
@Override
public void postFlush(CoprocessorEnvironment e) { }
@Override
public void preCompact(CoprocessorEnvironment e, boolean willSplit) { }
@Override
public void postCompact(CoprocessorEnvironment e, boolean willSplit) { }
@Override
public void preSplit(CoprocessorEnvironment e) { }
@Override
public void postSplit(CoprocessorEnvironment e, HRegion l, HRegion r) { }
}

View File

@ -38,6 +38,11 @@ import java.io.IOException;
*/
public abstract class BaseRegionObserverCoprocessor implements Coprocessor,
RegionObserver {
@Override
public void start(CoprocessorEnvironment e) { }
@Override
public void stop(CoprocessorEnvironment e) { }
@Override
public void preOpen(CoprocessorEnvironment e) { }

View File

@ -16,13 +16,14 @@
package org.apache.hadoop.hbase.coprocessor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import java.io.IOException;
/**
* Coprocess interface.
*/
public interface Coprocessor {
public static final int VERSION = 1;
/**
* Installation priority. Coprocessors will be executed in sequence
* by the order of coprocessor priority.
@ -44,77 +45,20 @@ public interface Coprocessor {
}
}
/**
* Lifecycle state of a given coprocessor instance.
*/
public enum State {
UNINSTALLED,
INSTALLED,
STARTING,
ACTIVE,
STOPPING,
STOPPED
}
// Interface
/**
* Called before the region is reported as open to the master.
* @param e the environment provided by the region server
*/
public void preOpen(final CoprocessorEnvironment e);
void start(CoprocessorEnvironment env) throws IOException;
/**
* Called after the region is reported as open to the master.
* @param e the environment provided by the region server
*/
public void postOpen(final CoprocessorEnvironment e);
/**
* Called before the memstore is flushed to disk.
* @param e the environment provided by the region server
*/
public void preFlush(final CoprocessorEnvironment e);
/**
* Called after the memstore is flushed to disk.
* @param e the environment provided by the region server
*/
public void postFlush(final CoprocessorEnvironment e);
/**
* Called before compaction.
* @param e the environment provided by the region server
* @param willSplit true if compaction will result in a split, false
* otherwise
*/
public void preCompact(final CoprocessorEnvironment e,
final boolean willSplit);
/**
* Called after compaction.
* @param e the environment provided by the region server
* @param willSplit true if compaction will result in a split, false
* otherwise
*/
public void postCompact(final CoprocessorEnvironment e,
final boolean willSplit);
/**
* Called before the region is split.
* @param e the environment provided by the region server
* (e.getRegion() returns the parent region)
*/
public void preSplit(final CoprocessorEnvironment e);
/**
* Called after the region is split.
* @param e the environment provided by the region server
* (e.getRegion() returns the parent region)
* @param l the left daughter region
* @param r the right daughter region
*/
public void postSplit(final CoprocessorEnvironment e, final HRegion l,
final HRegion r);
/**
* Called before the region is reported as closed to the master.
* @param e the environment provided by the region server
* @param abortRequested true if the region server is aborting
*/
public void preClose(final CoprocessorEnvironment e, boolean abortRequested);
/**
* Called after the region is reported as closed to the master.
* @param e the environment provided by the region server
* @param abortRequested true if the region server is aborting
*/
public void postClose(final CoprocessorEnvironment e, boolean abortRequested);
void stop(CoprocessorEnvironment env) throws IOException;
}

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.coprocessor.CoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import java.io.IOException;
@ -35,7 +36,80 @@ import java.io.IOException;
* Coprocessors implement this interface to observe and mediate client actions
* on the region.
*/
public interface RegionObserver {
public interface RegionObserver extends Coprocessor {
/**
* Called before the region is reported as open to the master.
* @param e the environment provided by the region server
*/
public void preOpen(final CoprocessorEnvironment e);
/**
* Called after the region is reported as open to the master.
* @param e the environment provided by the region server
*/
public void postOpen(final CoprocessorEnvironment e);
/**
* Called before the memstore is flushed to disk.
* @param e the environment provided by the region server
*/
public void preFlush(final CoprocessorEnvironment e);
/**
* Called after the memstore is flushed to disk.
* @param e the environment provided by the region server
*/
public void postFlush(final CoprocessorEnvironment e);
/**
* Called before compaction.
* @param e the environment provided by the region server
* @param willSplit true if compaction will result in a split, false
* otherwise
*/
public void preCompact(final CoprocessorEnvironment e,
final boolean willSplit);
/**
* Called after compaction.
* @param e the environment provided by the region server
* @param willSplit true if compaction will result in a split, false
* otherwise
*/
public void postCompact(final CoprocessorEnvironment e,
final boolean willSplit);
/**
* Called before the region is split.
* @param e the environment provided by the region server
* (e.getRegion() returns the parent region)
*/
public void preSplit(final CoprocessorEnvironment e);
/**
* Called after the region is split.
* @param e the environment provided by the region server
* (e.getRegion() returns the parent region)
* @param l the left daughter region
* @param r the right daughter region
*/
public void postSplit(final CoprocessorEnvironment e, final HRegion l,
final HRegion r);
/**
* Called before the region is reported as closed to the master.
* @param e the environment provided by the region server
* @param abortRequested true if the region server is aborting
*/
public void preClose(final CoprocessorEnvironment e, boolean abortRequested);
/**
* Called after the region is reported as closed to the master.
* @param e the environment provided by the region server
* @param abortRequested true if the region server is aborting
*/
public void postClose(final CoprocessorEnvironment e, boolean abortRequested);
/**
* Called before a client makes a GetClosestRowBefore request.

View File

@ -9,7 +9,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -61,7 +61,7 @@ public class CoprocessorHost {
* Environment priority comparator.
* Coprocessors are chained in sorted order.
*/
static class EnvironmentPriorityComparator implements Comparator<Environment> {
class EnvironmentPriorityComparator implements Comparator<Environment> {
public int compare(Environment env1, Environment env2) {
if (env1.priority.intValue() < env2.priority.intValue()) {
return -1;
@ -257,6 +257,8 @@ public class CoprocessorHost {
Map<Object,Object> vars = new ConcurrentHashMap<Object,Object>();
/** Chaining priority */
Coprocessor.Priority priority = Coprocessor.Priority.USER;
/** Current coprocessor state */
Coprocessor.State state = Coprocessor.State.UNINSTALLED;
/** Accounting for tables opened by the coprocessor */
List<HTableInterface> openTables =
Collections.synchronizedList(new ArrayList<HTableInterface>());
@ -269,10 +271,40 @@ public class CoprocessorHost {
public Environment(final Coprocessor impl, Coprocessor.Priority priority) {
this.impl = impl;
this.priority = priority;
state = Coprocessor.State.INSTALLED;
}
/** Initialize the environment */
void startup() {
if (state == Coprocessor.State.INSTALLED ||
state == Coprocessor.State.STOPPED) {
state = Coprocessor.State.STARTING;
try {
impl.start(this);
state = Coprocessor.State.ACTIVE;
} catch (IOException ioe) {
LOG.error("Error starting coprocessor "+impl.getClass().getName(), ioe);
}
} else {
LOG.warn("Not starting coprocessor "+impl.getClass().getName()+
" because not inactive (state="+state.toString()+")");
}
}
/** Clean up the environment */
void shutdown() {
if (state == Coprocessor.State.ACTIVE) {
state = Coprocessor.State.STOPPING;
try {
impl.stop(this);
state = Coprocessor.State.STOPPED;
} catch (IOException ioe) {
LOG.error("Error stopping coprocessor "+impl.getClass().getName(), ioe);
}
} else {
LOG.warn("Not stopping coprocessor "+impl.getClass().getName()+
" because not active (state="+state.toString()+")");
}
// clean up any table references
for (HTableInterface table: openTables) {
try {
@ -286,11 +318,15 @@ public class CoprocessorHost {
}
boolean shouldBypass() {
return bypass.getAndSet(false);
boolean current = bypass.get();
bypass.set(false);
return current;
}
boolean shouldComplete() {
return complete.getAndSet(false);
boolean current = complete.get();
complete.set(false);
return current;
}
/** @return the coprocessor environment version */
@ -350,13 +386,21 @@ public class CoprocessorHost {
final ReentrantReadWriteLock coprocessorLock = new ReentrantReadWriteLock();
final Set<Environment> coprocessors =
new TreeSet<Environment>(new EnvironmentPriorityComparator());
final AtomicBoolean bypass = new AtomicBoolean(false);
final AtomicBoolean complete = new AtomicBoolean(false);
static final ThreadLocal<Boolean> bypass = new ThreadLocal<Boolean>() {
@Override protected Boolean initialValue() {
return Boolean.FALSE;
}
};
static final ThreadLocal<Boolean> complete = new ThreadLocal<Boolean>() {
@Override protected Boolean initialValue() {
return Boolean.FALSE;
}
};
/**
* Constructor
* @param server the regionServer
* @param region the region
* @param rsServices an interface provide access to region server facilities
* @param conf the configuration
*/
public CoprocessorHost(final HRegion region,
@ -493,6 +537,7 @@ public class CoprocessorHost {
}
// create the environment
Environment env = new Environment(impl, priority);
env.startup();
// Check if it's an Endpoint.
// Due to current dynamic protocol design, Endpoint
@ -576,9 +621,11 @@ public class CoprocessorHost {
try {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
env.impl.preOpen(env);
if (env.shouldComplete()) {
break;
if (env.impl instanceof RegionObserver) {
((RegionObserver)env.impl).preOpen(env);
if (env.shouldComplete()) {
break;
}
}
}
} finally {
@ -593,9 +640,11 @@ public class CoprocessorHost {
try {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
env.impl.postOpen(env);
if (env.shouldComplete()) {
break;
if (env.impl instanceof RegionObserver) {
((RegionObserver)env.impl).postOpen(env);
if (env.shouldComplete()) {
break;
}
}
}
} finally {
@ -611,7 +660,9 @@ public class CoprocessorHost {
try {
coprocessorLock.writeLock().lock();
for (Environment env: coprocessors) {
env.impl.preClose(env, abortRequested);
if (env.impl instanceof RegionObserver) {
((RegionObserver)env.impl).preClose(env, abortRequested);
}
}
} finally {
coprocessorLock.writeLock().unlock();
@ -626,7 +677,9 @@ public class CoprocessorHost {
try {
coprocessorLock.writeLock().lock();
for (Environment env: coprocessors) {
env.impl.postClose(env, abortRequested);
if (env.impl instanceof RegionObserver) {
((RegionObserver)env.impl).postClose(env, abortRequested);
}
env.shutdown();
}
} finally {
@ -642,9 +695,11 @@ public class CoprocessorHost {
try {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
env.impl.preCompact(env, willSplit);
if (env.shouldComplete()) {
break;
if (env.impl instanceof RegionObserver) {
((RegionObserver)env.impl).preCompact(env, willSplit);
if (env.shouldComplete()) {
break;
}
}
}
} finally {
@ -660,9 +715,11 @@ public class CoprocessorHost {
try {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
env.impl.postCompact(env, willSplit);
if (env.shouldComplete()) {
break;
if (env.impl instanceof RegionObserver) {
((RegionObserver)env.impl).postCompact(env, willSplit);
if (env.shouldComplete()) {
break;
}
}
}
} finally {
@ -677,9 +734,11 @@ public class CoprocessorHost {
try {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
env.impl.preFlush(env);
if (env.shouldComplete()) {
break;
if (env.impl instanceof RegionObserver) {
((RegionObserver)env.impl).preFlush(env);
if (env.shouldComplete()) {
break;
}
}
}
} finally {
@ -694,9 +753,11 @@ public class CoprocessorHost {
try {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
env.impl.postFlush(env);
if (env.shouldComplete()) {
break;
if (env.impl instanceof RegionObserver) {
((RegionObserver)env.impl).postFlush(env);
if (env.shouldComplete()) {
break;
}
}
}
} finally {
@ -711,9 +772,11 @@ public class CoprocessorHost {
try {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
env.impl.preSplit(env);
if (env.shouldComplete()) {
break;
if (env.impl instanceof RegionObserver) {
((RegionObserver)env.impl).preSplit(env);
if (env.shouldComplete()) {
break;
}
}
}
} finally {
@ -730,9 +793,11 @@ public class CoprocessorHost {
try {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
env.impl.postSplit(env, l, r);
if (env.shouldComplete()) {
break;
if (env.impl instanceof RegionObserver) {
((RegionObserver)env.impl).postSplit(env, l, r);
if (env.shouldComplete()) {
break;
}
}
}
} finally {
@ -1180,7 +1245,6 @@ public class CoprocessorHost {
/**
* @param increment increment object
* @param writeToWAL true if the change should be written to the WAL
* @return result to return to client if default operation should be
* bypassed, null otherwise
* @throws IOException if an error occurred on the coprocessor
@ -1208,7 +1272,6 @@ public class CoprocessorHost {
/**
* @param increment increment object
* @param writeToWAL true if the change should be written to the WAL
* @param result the result returned by incrementColumnValue
* @throws IOException if an error occurred on the coprocessor
*/
@ -1257,7 +1320,7 @@ public class CoprocessorHost {
/**
* @param scan the Scan specification
* @param scannerId the scanner id allocated by the region server
* @param s the scanner
* @return the scanner instance to use
* @exception IOException Exception
*/

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
/**
@ -45,6 +46,16 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
boolean beforeDelete = true;
boolean scannerOpened = false;
boolean hadPreOpen;
boolean hadPostOpen;
boolean hadPreClose;
boolean hadPostClose;
boolean hadPreFlush;
boolean hadPostFlush;
boolean hadPreSplit;
boolean hadPostSplit;
boolean hadPreCompact;
boolean hadPostCompact;
boolean hadPreGet = false;
boolean hadPostGet = false;
boolean hadPrePut = false;
@ -56,6 +67,76 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
boolean hadPreIncrement = false;
boolean hadPostIncrement = false;
@Override
public void preOpen(CoprocessorEnvironment e) {
hadPreOpen = true;
}
@Override
public void postOpen(CoprocessorEnvironment e) {
hadPostOpen = true;
}
public boolean wasOpened() {
return hadPreOpen && hadPostOpen;
}
@Override
public void preClose(CoprocessorEnvironment e, boolean abortRequested) {
hadPreClose = true;
}
@Override
public void postClose(CoprocessorEnvironment e, boolean abortRequested) {
hadPostClose = true;
}
public boolean wasClosed() {
return hadPreClose && hadPostClose;
}
@Override
public void preFlush(CoprocessorEnvironment e) {
hadPreFlush = true;
}
@Override
public void postFlush(CoprocessorEnvironment e) {
hadPostFlush = true;
}
public boolean wasFlushed() {
return hadPreFlush && hadPostFlush;
}
@Override
public void preSplit(CoprocessorEnvironment e) {
hadPreSplit = true;
}
@Override
public void postSplit(CoprocessorEnvironment e, HRegion l, HRegion r) {
hadPostSplit = true;
}
public boolean wasSplit() {
return hadPreSplit && hadPostSplit;
}
@Override
public void preCompact(CoprocessorEnvironment e, boolean willSplit) {
hadPreCompact = true;
}
@Override
public void postCompact(CoprocessorEnvironment e, boolean willSplit) {
hadPostCompact = true;
}
public boolean wasCompacted() {
return hadPreCompact && hadPostCompact;
}
@Override
public void preGet(final CoprocessorEnvironment e, final Get get,
final List<KeyValue> results) throws IOException {

View File

@ -49,8 +49,10 @@ public class TestCoprocessorInterface extends HBaseTestCase {
private static final HBaseTestingUtility TEST_UTIL =
new HBaseTestingUtility();
public static class CoprocessorImpl implements Coprocessor {
public static class CoprocessorImpl extends BaseRegionObserverCoprocessor {
private boolean startCalled;
private boolean stopCalled;
private boolean preOpenCalled;
private boolean postOpenCalled;
private boolean preCloseCalled;
@ -62,6 +64,16 @@ public class TestCoprocessorInterface extends HBaseTestCase {
private boolean preSplitCalled;
private boolean postSplitCalled;
@Override
public void start(CoprocessorEnvironment e) {
startCalled = true;
}
@Override
public void stop(CoprocessorEnvironment e) {
stopCalled = true;
}
@Override
public void preOpen(CoprocessorEnvironment e) {
preOpenCalled = true;
@ -103,22 +115,24 @@ public class TestCoprocessorInterface extends HBaseTestCase {
postSplitCalled = true;
}
boolean wasStarted() {
return startCalled;
}
boolean wasStopped() {
return stopCalled;
}
boolean wasOpened() {
return (preOpenCalled && postOpenCalled);
}
boolean wasClosed() {
return (preCloseCalled && postCloseCalled);
}
boolean wasFlushed() {
return (preFlushCalled && postFlushCalled);
}
boolean wasCompacted() {
return (preCompactCalled && postCompactCalled);
}
boolean wasSplit() {
return (preSplitCalled && postSplitCalled);
}
@ -145,6 +159,8 @@ public class TestCoprocessorInterface extends HBaseTestCase {
Coprocessor c = region.getCoprocessorHost()
.findCoprocessor(CoprocessorImpl.class.getName());
assertTrue("Coprocessor not started", ((CoprocessorImpl)c).wasStarted());
assertTrue("Coprocessor not stopped", ((CoprocessorImpl)c).wasStopped());
assertTrue(((CoprocessorImpl)c).wasOpened());
assertTrue(((CoprocessorImpl)c).wasClosed());
assertTrue(((CoprocessorImpl)c).wasFlushed());
@ -156,6 +172,8 @@ public class TestCoprocessorInterface extends HBaseTestCase {
regions[i].getLog().closeAndDelete();
c = region.getCoprocessorHost()
.findCoprocessor(CoprocessorImpl.class.getName());
assertTrue("Coprocessor not started", ((CoprocessorImpl)c).wasStarted());
assertTrue("Coprocessor not stopped", ((CoprocessorImpl)c).wasStopped());
assertTrue(((CoprocessorImpl)c).wasOpened());
assertTrue(((CoprocessorImpl)c).wasClosed());
assertTrue(((CoprocessorImpl)c).wasCompacted());
@ -205,6 +223,9 @@ public class TestCoprocessorInterface extends HBaseTestCase {
host.load(implClass, Priority.USER);
Coprocessor c = host.findCoprocessor(implClass.getName());
assertNotNull(c);
// Here we have to call pre and postOpen explicitly.
host.preOpen();
host.postOpen();