getRegionObserver() {
+ return Optional.of(this);
+ }
+
/**
* Create the constraint processor.
*
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseEnvironment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseEnvironment.java
new file mode 100644
index 00000000000..a491d609a19
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseEnvironment.java
@@ -0,0 +1,187 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTableWrapper;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Encapsulation of the environment of each coprocessor
+ */
+@InterfaceAudience.Private
+public class BaseEnvironment implements CoprocessorEnvironment {
+ private static final Log LOG = LogFactory.getLog(BaseEnvironment.class);
+
+ /** The coprocessor */
+ public C impl;
+ /** Chaining priority */
+ protected int priority = Coprocessor.PRIORITY_USER;
+ /** Current coprocessor state */
+ Coprocessor.State state = Coprocessor.State.UNINSTALLED;
+ /** Accounting for tables opened by the coprocessor */
+ protected List openTables =
+ Collections.synchronizedList(new ArrayList());
+ private int seq;
+ private Configuration conf;
+ private ClassLoader classLoader;
+
+ /**
+ * Constructor
+ * @param impl the coprocessor instance
+ * @param priority chaining priority
+ */
+ public BaseEnvironment(final C impl, final int priority,
+ final int seq, final Configuration conf) {
+ this.impl = impl;
+ this.classLoader = impl.getClass().getClassLoader();
+ this.priority = priority;
+ this.state = Coprocessor.State.INSTALLED;
+ this.seq = seq;
+ this.conf = conf;
+ }
+
+ /** Initialize the environment */
+ @Override
+ public void startup() throws IOException {
+ if (state == Coprocessor.State.INSTALLED ||
+ state == Coprocessor.State.STOPPED) {
+ state = Coprocessor.State.STARTING;
+ Thread currentThread = Thread.currentThread();
+ ClassLoader hostClassLoader = currentThread.getContextClassLoader();
+ try {
+ currentThread.setContextClassLoader(this.getClassLoader());
+ impl.start(this);
+ state = Coprocessor.State.ACTIVE;
+ } finally {
+ currentThread.setContextClassLoader(hostClassLoader);
+ }
+ } else {
+ LOG.warn("Not starting coprocessor " + impl.getClass().getName() +
+ " because not inactive (state=" + state.toString() + ")");
+ }
+ }
+
+ /** Clean up the environment */
+ @Override
+ public void shutdown() {
+ if (state == Coprocessor.State.ACTIVE) {
+ state = Coprocessor.State.STOPPING;
+ Thread currentThread = Thread.currentThread();
+ ClassLoader hostClassLoader = currentThread.getContextClassLoader();
+ try {
+ currentThread.setContextClassLoader(this.getClassLoader());
+ impl.stop(this);
+ state = Coprocessor.State.STOPPED;
+ } catch (IOException ioe) {
+ LOG.error("Error stopping coprocessor "+impl.getClass().getName(), ioe);
+ } finally {
+ currentThread.setContextClassLoader(hostClassLoader);
+ }
+ } else {
+ LOG.warn("Not stopping coprocessor "+impl.getClass().getName()+
+ " because not active (state="+state.toString()+")");
+ }
+ synchronized (openTables) {
+ // clean up any table references
+ for (Table table: openTables) {
+ try {
+ ((HTableWrapper)table).internalClose();
+ } catch (IOException e) {
+ // nothing can be done here
+ LOG.warn("Failed to close " +
+ table.getName(), e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public C getInstance() {
+ return impl;
+ }
+
+ @Override
+ public ClassLoader getClassLoader() {
+ return classLoader;
+ }
+
+ @Override
+ public int getPriority() {
+ return priority;
+ }
+
+ @Override
+ public int getLoadSequence() {
+ return seq;
+ }
+
+ /** @return the coprocessor environment version */
+ @Override
+ public int getVersion() {
+ return Coprocessor.VERSION;
+ }
+
+ /** @return the HBase release */
+ @Override
+ public String getHBaseVersion() {
+ return VersionInfo.getVersion();
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ /**
+ * Open a table from within the Coprocessor environment
+ * @param tableName the table name
+ * @return an interface for manipulating the table
+ * @exception IOException Exception
+ */
+ @Override
+ public Table getTable(TableName tableName) throws IOException {
+ return this.getTable(tableName, null);
+ }
+
+ /**
+ * Open a table from within the Coprocessor environment
+ * @param tableName the table name
+ * @return an interface for manipulating the table
+ * @exception IOException Exception
+ */
+ @Override
+ public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
+ return HTableWrapper.createWrapper(openTables, tableName, this, pool);
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRowProcessorEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRowProcessorEndpoint.java
index 5886715746a..df3ed233452 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRowProcessorEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRowProcessorEndpoint.java
@@ -20,11 +20,11 @@ package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.util.Optional;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
-import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
@@ -46,19 +46,19 @@ import com.google.protobuf.Service;
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
-public abstract class BaseRowProcessorEndpoint
-extends RowProcessorService implements CoprocessorService, Coprocessor {
+public abstract class BaseRowProcessorEndpoint
+extends RowProcessorService implements RegionCoprocessor {
private RegionCoprocessorEnvironment env;
/**
* Pass a processor to region to process multiple rows atomically.
- *
+ *
* The RowProcessor implementations should be the inner classes of your
* RowProcessorEndpoint. This way the RowProcessor can be class-loaded with
* the Coprocessor endpoint together.
*
* See {@code TestRowProcessorEndpoint} for example.
*
- * The request contains information for constructing processor
+ * The request contains information for constructing processor
* (see {@link #constructRowProcessorFromRequest}. The processor object defines
* the read-modify-write procedure.
*/
@@ -83,8 +83,8 @@ extends RowProcessorService implements CoprocessorService, Coprocessor {
}
@Override
- public Service getService() {
- return this;
+ public Optional getService() {
+ return Optional.of(this);
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BulkLoadObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BulkLoadObserver.java
index e891cc0fda9..25e6522018e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BulkLoadObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BulkLoadObserver.java
@@ -49,7 +49,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBul
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
-public interface BulkLoadObserver extends Coprocessor {
+public interface BulkLoadObserver {
/**
* Called as part of SecureBulkLoadEndpoint.prepareBulkLoad() RPC call.
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
index 27ac33a96b6..da07c40a698 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
@@ -25,12 +25,13 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -44,22 +45,22 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTableWrapper;
-import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
import org.apache.hadoop.hbase.util.SortedList;
-import org.apache.hadoop.hbase.util.VersionInfo;
/**
* Provides the common setup framework and runtime services for coprocessor
* invocation from HBase services.
- * @param the specific environment extension that a concrete implementation
+ * @param type of specific coprocessor this host will handle
+ * @param type of specific coprocessor environment this host requires.
* provides
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
-public abstract class CoprocessorHost {
+public abstract class CoprocessorHost> {
public static final String REGION_COPROCESSOR_CONF_KEY =
"hbase.coprocessor.region.classes";
public static final String REGIONSERVER_COPROCESSOR_CONF_KEY =
@@ -81,7 +82,8 @@ public abstract class CoprocessorHost {
private static final Log LOG = LogFactory.getLog(CoprocessorHost.class);
protected Abortable abortable;
/** Ordered set of loaded coprocessors with lock */
- protected SortedList coprocessors = new SortedList<>(new EnvironmentPriorityComparator());
+ protected final SortedList coprocEnvironments =
+ new SortedList<>(new EnvironmentPriorityComparator());
protected Configuration conf;
// unique file prefix to use for local copies of jars when classloading
protected String pathPrefix;
@@ -96,7 +98,7 @@ public abstract class CoprocessorHost {
* Not to be confused with the per-object _coprocessors_ (above),
* coprocessorNames is static and stores the set of all coprocessors ever
* loaded by any thread in this JVM. It is strictly additive: coprocessors are
- * added to coprocessorNames, by loadInstance() but are never removed, since
+ * added to coprocessorNames, by checkAndLoadInstance() but are never removed, since
* the intention is to preserve a history of all loaded coprocessors for
* diagnosis in case of server crash (HBASE-4014).
*/
@@ -118,7 +120,7 @@ public abstract class CoprocessorHost {
*/
public Set getCoprocessors() {
Set returnValue = new TreeSet<>();
- for (CoprocessorEnvironment e: coprocessors) {
+ for (E e: coprocEnvironments) {
returnValue.add(e.getInstance().getClass().getSimpleName());
}
return returnValue;
@@ -135,7 +137,7 @@ public abstract class CoprocessorHost {
return;
}
- Class> implClass = null;
+ Class> implClass;
// load default coprocessors from configure file
String[] defaultCPClasses = conf.getStrings(confKey);
@@ -156,10 +158,13 @@ public abstract class CoprocessorHost {
implClass = cl.loadClass(className);
// Add coprocessors as we go to guard against case where a coprocessor is specified twice
// in the configuration
- this.coprocessors.add(loadInstance(implClass, priority, conf));
- LOG.info("System coprocessor " + className + " was loaded " +
- "successfully with priority (" + priority + ").");
- ++priority;
+ E env = checkAndLoadInstance(implClass, priority, conf);
+ if (env != null) {
+ this.coprocEnvironments.add(env);
+ LOG.info(
+ "System coprocessor " + className + " was loaded " + "successfully with priority (" + priority + ").");
+ ++priority;
+ }
} catch (Throwable t) {
// We always abort if system coprocessors cannot be loaded
abortServer(className, t);
@@ -196,7 +201,7 @@ public abstract class CoprocessorHost {
*/
public E load(Path path, String className, int priority,
Configuration conf, String[] includedClassPrefixes) throws IOException {
- Class> implClass = null;
+ Class> implClass;
LOG.debug("Loading coprocessor class " + className + " with path " +
path + " and priority " + priority);
@@ -223,7 +228,7 @@ public abstract class CoprocessorHost {
try{
// switch temporarily to the thread classloader for custom CP
currentThread.setContextClassLoader(cl);
- E cpInstance = loadInstance(implClass, priority, conf);
+ E cpInstance = checkAndLoadInstance(implClass, priority, conf);
return cpInstance;
} finally {
// restore the fresh (host) classloader
@@ -231,16 +236,11 @@ public abstract class CoprocessorHost {
}
}
- /**
- * @param implClass Implementation class
- * @param priority priority
- * @param conf configuration
- * @throws java.io.IOException Exception
- */
- public void load(Class> implClass, int priority, Configuration conf)
+ @VisibleForTesting
+ public void load(Class extends C> implClass, int priority, Configuration conf)
throws IOException {
- E env = loadInstance(implClass, priority, conf);
- coprocessors.add(env);
+ E env = checkAndLoadInstance(implClass, priority, conf);
+ coprocEnvironments.add(env);
}
/**
@@ -249,29 +249,22 @@ public abstract class CoprocessorHost {
* @param conf configuration
* @throws java.io.IOException Exception
*/
- public E loadInstance(Class> implClass, int priority, Configuration conf)
+ public E checkAndLoadInstance(Class> implClass, int priority, Configuration conf)
throws IOException {
- if (!Coprocessor.class.isAssignableFrom(implClass)) {
- throw new IOException("Configured class " + implClass.getName() + " must implement "
- + Coprocessor.class.getName() + " interface ");
- }
-
// create the instance
- Coprocessor impl;
- Object o = null;
+ C impl;
try {
- o = implClass.newInstance();
- impl = (Coprocessor)o;
- } catch (InstantiationException e) {
- throw new IOException(e);
- } catch (IllegalAccessException e) {
+ impl = checkAndGetInstance(implClass);
+ if (impl == null) {
+ LOG.error("Cannot load coprocessor " + implClass.getSimpleName());
+ return null;
+ }
+ } catch (InstantiationException|IllegalAccessException e) {
throw new IOException(e);
}
// create the environment
- E env = createEnvironment(implClass, impl, priority, loadSequence.incrementAndGet(), conf);
- if (env instanceof Environment) {
- ((Environment)env).startup();
- }
+ E env = createEnvironment(impl, priority, loadSequence.incrementAndGet(), conf);
+ env.startup();
// HBASE-4014: maintain list of loaded coprocessors for later crash analysis
// if server (master or regionserver) aborts.
coprocessorNames.add(implClass.getName());
@@ -281,28 +274,30 @@ public abstract class CoprocessorHost {
/**
* Called when a new Coprocessor class is loaded
*/
- public abstract E createEnvironment(Class> implClass, Coprocessor instance,
- int priority, int sequence, Configuration conf);
+ public abstract E createEnvironment(C instance, int priority, int sequence, Configuration conf);
- public void shutdown(CoprocessorEnvironment e) {
- if (e instanceof Environment) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Stop coprocessor " + e.getInstance().getClass().getName());
- }
- ((Environment)e).shutdown();
- } else {
- LOG.warn("Shutdown called on unknown environment: "+
- e.getClass().getName());
+ /**
+ * Called when a new Coprocessor class needs to be loaded. Checks if type of the given class
+ * is what the corresponding host implementation expects. If it is of correct type, returns an
+ * instance of the coprocessor to be loaded. If not, returns null.
+ * If an exception occurs when trying to create instance of a coprocessor, it's passed up and
+ * eventually results into server aborting.
+ */
+ public abstract C checkAndGetInstance(Class> implClass)
+ throws InstantiationException, IllegalAccessException;
+
+ public void shutdown(E e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Stop coprocessor " + e.getInstance().getClass().getName());
}
+ e.shutdown();
}
/**
- * Find a coprocessor implementation by class name
- * @param className the class name
- * @return the coprocessor, or null if not found
+ * Find coprocessors by full class name or simple name.
*/
- public Coprocessor findCoprocessor(String className) {
- for (E env: coprocessors) {
+ public C findCoprocessor(String className) {
+ for (E env: coprocEnvironments) {
if (env.getInstance().getClass().getName().equals(className) ||
env.getInstance().getClass().getSimpleName().equals(className)) {
return env.getInstance();
@@ -311,16 +306,26 @@ public abstract class CoprocessorHost {
return null;
}
+ @VisibleForTesting
+ public T findCoprocessor(Class cls) {
+ for (E env: coprocEnvironments) {
+ if (cls.isAssignableFrom(env.getInstance().getClass())) {
+ return (T) env.getInstance();
+ }
+ }
+ return null;
+ }
+
/**
* Find list of coprocessors that extend/implement the given class/interface
* @param cls the class/interface to look for
* @return the list of coprocessors, or null if not found
*/
- public List findCoprocessors(Class cls) {
+ public List findCoprocessors(Class cls) {
ArrayList ret = new ArrayList<>();
- for (E env: coprocessors) {
- Coprocessor cp = env.getInstance();
+ for (E env: coprocEnvironments) {
+ C cp = env.getInstance();
if(cp != null) {
if (cls.isAssignableFrom(cp.getClass())) {
@@ -331,33 +336,14 @@ public abstract class CoprocessorHost {
return ret;
}
- /**
- * Find list of CoprocessorEnvironment that extend/implement the given class/interface
- * @param cls the class/interface to look for
- * @return the list of CoprocessorEnvironment, or null if not found
- */
- public List findCoprocessorEnvironment(Class> cls) {
- ArrayList ret = new ArrayList<>();
-
- for (E env: coprocessors) {
- Coprocessor cp = env.getInstance();
-
- if(cp != null) {
- if (cls.isAssignableFrom(cp.getClass())) {
- ret.add(env);
- }
- }
- }
- return ret;
- }
-
/**
* Find a coprocessor environment by class name
* @param className the class name
* @return the coprocessor, or null if not found
*/
- public CoprocessorEnvironment findCoprocessorEnvironment(String className) {
- for (E env: coprocessors) {
+ @VisibleForTesting
+ public E findCoprocessorEnvironment(String className) {
+ for (E env: coprocEnvironments) {
if (env.getInstance().getClass().getName().equals(className) ||
env.getInstance().getClass().getSimpleName().equals(className)) {
return env;
@@ -374,7 +360,7 @@ public abstract class CoprocessorHost {
Set getExternalClassLoaders() {
Set externalClassLoaders = new HashSet<>();
final ClassLoader systemClassLoader = this.getClass().getClassLoader();
- for (E env : coprocessors) {
+ for (E env : coprocEnvironments) {
ClassLoader cl = env.getInstance().getClass().getClassLoader();
if (cl != systemClassLoader){
//do not include system classloader
@@ -388,8 +374,7 @@ public abstract class CoprocessorHost {
* Environment priority comparator.
* Coprocessors are chained in sorted order.
*/
- static class EnvironmentPriorityComparator
- implements Comparator {
+ static class EnvironmentPriorityComparator implements Comparator {
@Override
public int compare(final CoprocessorEnvironment env1,
final CoprocessorEnvironment env2) {
@@ -407,153 +392,7 @@ public abstract class CoprocessorHost {
}
}
- /**
- * Encapsulation of the environment of each coprocessor
- */
- public static class Environment implements CoprocessorEnvironment {
-
- /** The coprocessor */
- public Coprocessor impl;
- /** Chaining priority */
- protected int priority = Coprocessor.PRIORITY_USER;
- /** Current coprocessor state */
- Coprocessor.State state = Coprocessor.State.UNINSTALLED;
- /** Accounting for tables opened by the coprocessor */
- protected List openTables =
- Collections.synchronizedList(new ArrayList());
- private int seq;
- private Configuration conf;
- private ClassLoader classLoader;
-
- /**
- * Constructor
- * @param impl the coprocessor instance
- * @param priority chaining priority
- */
- public Environment(final Coprocessor impl, final int priority,
- final int seq, final Configuration conf) {
- this.impl = impl;
- this.classLoader = impl.getClass().getClassLoader();
- this.priority = priority;
- this.state = Coprocessor.State.INSTALLED;
- this.seq = seq;
- this.conf = conf;
- }
-
- /** Initialize the environment */
- public void startup() throws IOException {
- if (state == Coprocessor.State.INSTALLED ||
- state == Coprocessor.State.STOPPED) {
- state = Coprocessor.State.STARTING;
- Thread currentThread = Thread.currentThread();
- ClassLoader hostClassLoader = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(this.getClassLoader());
- impl.start(this);
- state = Coprocessor.State.ACTIVE;
- } finally {
- currentThread.setContextClassLoader(hostClassLoader);
- }
- } else {
- LOG.warn("Not starting coprocessor "+impl.getClass().getName()+
- " because not inactive (state="+state.toString()+")");
- }
- }
-
- /** Clean up the environment */
- protected void shutdown() {
- if (state == Coprocessor.State.ACTIVE) {
- state = Coprocessor.State.STOPPING;
- Thread currentThread = Thread.currentThread();
- ClassLoader hostClassLoader = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(this.getClassLoader());
- impl.stop(this);
- state = Coprocessor.State.STOPPED;
- } catch (IOException ioe) {
- LOG.error("Error stopping coprocessor "+impl.getClass().getName(), ioe);
- } finally {
- currentThread.setContextClassLoader(hostClassLoader);
- }
- } else {
- LOG.warn("Not stopping coprocessor "+impl.getClass().getName()+
- " because not active (state="+state.toString()+")");
- }
- synchronized (openTables) {
- // clean up any table references
- for (Table table: openTables) {
- try {
- ((HTableWrapper)table).internalClose();
- } catch (IOException e) {
- // nothing can be done here
- LOG.warn("Failed to close " +
- table.getName(), e);
- }
- }
- }
- }
-
- @Override
- public Coprocessor getInstance() {
- return impl;
- }
-
- @Override
- public ClassLoader getClassLoader() {
- return classLoader;
- }
-
- @Override
- public int getPriority() {
- return priority;
- }
-
- @Override
- public int getLoadSequence() {
- return seq;
- }
-
- /** @return the coprocessor environment version */
- @Override
- public int getVersion() {
- return Coprocessor.VERSION;
- }
-
- /** @return the HBase release */
- @Override
- public String getHBaseVersion() {
- return VersionInfo.getVersion();
- }
-
- @Override
- public Configuration getConfiguration() {
- return conf;
- }
-
- /**
- * Open a table from within the Coprocessor environment
- * @param tableName the table name
- * @return an interface for manipulating the table
- * @exception java.io.IOException Exception
- */
- @Override
- public Table getTable(TableName tableName) throws IOException {
- return this.getTable(tableName, null);
- }
-
- /**
- * Open a table from within the Coprocessor environment
- * @param tableName the table name
- * @return an interface for manipulating the table
- * @exception java.io.IOException Exception
- */
- @Override
- public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
- return HTableWrapper.createWrapper(openTables, tableName, this, pool);
- }
- }
-
- protected void abortServer(final CoprocessorEnvironment environment, final Throwable e) {
+ protected void abortServer(final E environment, final Throwable e) {
abortServer(environment.getInstance().getClass().getName(), e);
}
@@ -586,8 +425,7 @@ public abstract class CoprocessorHost {
// etc) mention this nuance of our exception handling so that coprocessor can throw appropriate
// exceptions depending on situation. If any changes are made to this logic, make sure to
// update all classes' comments.
- protected void handleCoprocessorThrowable(final CoprocessorEnvironment env, final Throwable e)
- throws IOException {
+ protected void handleCoprocessorThrowable(final E env, final Throwable e) throws IOException {
if (e instanceof IOException) {
throw (IOException)e;
}
@@ -610,7 +448,7 @@ public abstract class CoprocessorHost {
"environment",e);
}
- coprocessors.remove(env);
+ coprocEnvironments.remove(env);
try {
shutdown(env);
} catch (Exception x) {
@@ -695,4 +533,192 @@ public abstract class CoprocessorHost {
"'. Details of the problem: " + message);
}
}
+
+ /**
+ * Implementations defined function to get an observer of type {@code O} from a coprocessor of
+ * type {@code C}. Concrete implementations of CoprocessorHost define one getter for each
+ * observer they can handle. For e.g. RegionCoprocessorHost will use 3 getters, one for
+ * each of RegionObserver, EndpointObserver and BulkLoadObserver.
+ * These getters are used by {@code ObserverOperation} to get appropriate observer from the
+ * coprocessor.
+ */
+ @FunctionalInterface
+ public interface ObserverGetter extends Function> {}
+
+ private abstract class ObserverOperation extends ObserverContext {
+ ObserverGetter observerGetter;
+
+ ObserverOperation(ObserverGetter observerGetter) {
+ this(observerGetter, RpcServer.getRequestUser());
+ }
+
+ ObserverOperation(ObserverGetter observerGetter, User user) {
+ super(user);
+ this.observerGetter = observerGetter;
+ }
+
+ abstract void callObserver() throws IOException;
+ protected void postEnvCall() {}
+ }
+
+ // Can't derive ObserverOperation from ObserverOperationWithResult (R = Void) because then all
+ // ObserverCaller implementations will have to have a return statement.
+ // O = observer, E = environment, C = coprocessor, R=result type
+ public abstract class ObserverOperationWithoutResult extends ObserverOperation {
+ protected abstract void call(O observer) throws IOException;
+
+ public ObserverOperationWithoutResult(ObserverGetter observerGetter) {
+ super(observerGetter);
+ }
+
+ public ObserverOperationWithoutResult(ObserverGetter observerGetter, User user) {
+ super(observerGetter, user);
+ }
+
+ /**
+ * In case of coprocessors which have many kinds of observers (for eg, {@link RegionCoprocessor}
+ * has BulkLoadObserver, RegionObserver, etc), some implementations may not need all
+ * observers, in which case they will return null for that observer's getter.
+ * We simply ignore such cases.
+ */
+ @Override
+ void callObserver() throws IOException {
+ Optional observer = observerGetter.apply(getEnvironment().getInstance());
+ if (observer.isPresent()) {
+ call(observer.get());
+ }
+ }
+ }
+
+ public abstract class ObserverOperationWithResult extends ObserverOperation {
+ protected abstract R call(O observer) throws IOException;
+
+ private R result;
+
+ public ObserverOperationWithResult(ObserverGetter observerGetter) {
+ super(observerGetter);
+ }
+
+ public ObserverOperationWithResult(ObserverGetter observerGetter, User user) {
+ super(observerGetter, user);
+ }
+
+ void setResult(final R result) {
+ this.result = result;
+ }
+
+ protected R getResult() {
+ return this.result;
+ }
+
+ void callObserver() throws IOException {
+ Optional observer = observerGetter.apply(getEnvironment().getInstance());
+ if (observer.isPresent()) {
+ result = call(observer.get());
+ }
+ }
+ }
+
+ //////////////////////////////////////////////////////////////////////////////////////////
+ // Functions to execute observer hooks and handle results (if any)
+ //////////////////////////////////////////////////////////////////////////////////////////
+ protected R execOperationWithResult(final R defaultValue,
+ final ObserverOperationWithResult observerOperation) throws IOException {
+ if (observerOperation == null) {
+ return defaultValue;
+ }
+ observerOperation.setResult(defaultValue);
+ execOperation(observerOperation);
+ return observerOperation.getResult();
+ }
+
+ // what does bypass mean?
+ protected R execOperationWithResult(final boolean ifBypass, final R defaultValue,
+ final ObserverOperationWithResult observerOperation) throws IOException {
+ if (observerOperation == null) {
+ return ifBypass ? null : defaultValue;
+ } else {
+ observerOperation.setResult(defaultValue);
+ boolean bypass = execOperation(true, observerOperation);
+ R result = observerOperation.getResult();
+ return bypass == ifBypass ? result : null;
+ }
+ }
+
+ protected boolean execOperation(final ObserverOperation observerOperation)
+ throws IOException {
+ return execOperation(true, observerOperation);
+ }
+
+ protected boolean execOperation(final boolean earlyExit,
+ final ObserverOperation observerOperation) throws IOException {
+ if (observerOperation == null) return false;
+ boolean bypass = false;
+ List envs = coprocEnvironments.get();
+ for (E env : envs) {
+ observerOperation.prepare(env);
+ Thread currentThread = Thread.currentThread();
+ ClassLoader cl = currentThread.getContextClassLoader();
+ try {
+ currentThread.setContextClassLoader(env.getClassLoader());
+ observerOperation.callObserver();
+ } catch (Throwable e) {
+ handleCoprocessorThrowable(env, e);
+ } finally {
+ currentThread.setContextClassLoader(cl);
+ }
+ bypass |= observerOperation.shouldBypass();
+ if (earlyExit && observerOperation.shouldComplete()) {
+ break;
+ }
+ observerOperation.postEnvCall();
+ }
+ return bypass;
+ }
+
+
+ /**
+ * Coprocessor classes can be configured in any order, based on that priority is set and
+ * chained in a sorted order. Should be used preStop*() hooks i.e. when master/regionserver is
+ * going down. This function first calls coprocessor methods (using ObserverOperation.call())
+ * and then shutdowns the environment in postEnvCall().
+ * Need to execute all coprocessor methods first then postEnvCall(), otherwise some coprocessors
+ * may remain shutdown if any exception occurs during next coprocessor execution which prevent
+ * master/regionserver stop or cluster shutdown. (Refer:
+ * HBASE-16663
+ * @return true if bypaas coprocessor execution, false if not.
+ * @throws IOException
+ */
+ protected boolean execShutdown(final ObserverOperation observerOperation)
+ throws IOException {
+ if (observerOperation == null) return false;
+ boolean bypass = false;
+ List envs = coprocEnvironments.get();
+ // Iterate the coprocessors and execute ObserverOperation's call()
+ for (E env : envs) {
+ observerOperation.prepare(env);
+ Thread currentThread = Thread.currentThread();
+ ClassLoader cl = currentThread.getContextClassLoader();
+ try {
+ currentThread.setContextClassLoader(env.getClassLoader());
+ observerOperation.callObserver();
+ } catch (Throwable e) {
+ handleCoprocessorThrowable(env, e);
+ } finally {
+ currentThread.setContextClassLoader(cl);
+ }
+ bypass |= observerOperation.shouldBypass();
+ if (observerOperation.shouldComplete()) {
+ break;
+ }
+ }
+
+ // Iterate the coprocessors and execute ObserverOperation's postEnvCall()
+ for (E env : envs) {
+ observerOperation.prepare(env);
+ observerOperation.postEnvCall();
+ }
+ return bypass;
+ }
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorService.java
index efee64c63c2..f6102290dec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorService.java
@@ -26,7 +26,9 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
/**
* Coprocessor endpoints providing protobuf services should implement this
* interface and return the {@link Service} instance via {@link #getService()}.
+ * @deprecated Since 2.0. Will be removed in 3.0
*/
+@Deprecated
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public interface CoprocessorService {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorServiceBackwardCompatiblity.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorServiceBackwardCompatiblity.java
new file mode 100644
index 00000000000..c677d638487
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorServiceBackwardCompatiblity.java
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import com.google.protobuf.Service;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import java.util.Optional;
+
+/**
+ * Classes to help maintain backward compatibility with now deprecated {@link CoprocessorService}
+ * and {@link SingletonCoprocessorService}.
+ * From 2.0 onwards, implementors of coprocessor service should also implement the relevant
+ * coprocessor class (For eg {@link MasterCoprocessor} for coprocessor service in master), and
+ * override get*Service() method to return the {@link com.google.protobuf.Service} object.
+ * To maintain backward compatibility with 1.0 implementation, we'll wrap implementation of
+ * CoprocessorService/SingletonCoprocessorService in the new
+ * {Master, Region, RegionServer}Coprocessor class.
+ * Since there is no backward compatibility guarantee for Observers, we leave get*Observer() to
+ * default which returns null.
+ * This approach to maintain backward compatibility seems cleaner and more explicit.
+ */
+@InterfaceAudience.Private
+@Deprecated
+public class CoprocessorServiceBackwardCompatiblity {
+
+ static public class MasterCoprocessorService implements MasterCoprocessor {
+
+ CoprocessorService service;
+
+ public MasterCoprocessorService(CoprocessorService service) {
+ this.service = service;
+ }
+
+ @Override
+ public Optional getService() {
+ return Optional.of(service.getService());
+ }
+ }
+
+ static public class RegionCoprocessorService implements RegionCoprocessor {
+
+ CoprocessorService service;
+
+ public RegionCoprocessorService(CoprocessorService service) {
+ this.service = service;
+ }
+
+ @Override
+ public Optional getService() {
+ return Optional.of(service.getService());
+ }
+ }
+
+ static public class RegionServerCoprocessorService implements RegionServerCoprocessor {
+
+ SingletonCoprocessorService service;
+
+ public RegionServerCoprocessorService(SingletonCoprocessorService service) {
+ this.service = service;
+ }
+
+ @Override
+ public Optional getService() {
+ return Optional.of(service.getService());
+ }
+ }
+}
+
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/EndpointObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/EndpointObserver.java
index fc0e666ac77..096247cf9fb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/EndpointObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/EndpointObserver.java
@@ -50,7 +50,7 @@ import com.google.protobuf.Service;
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
-public interface EndpointObserver extends Coprocessor {
+public interface EndpointObserver {
/**
* Called before an Endpoint service method is invoked.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterCoprocessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterCoprocessor.java
new file mode 100644
index 00000000000..d940385ffae
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterCoprocessor.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import java.util.Optional;
+
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+@InterfaceStability.Evolving
+public interface MasterCoprocessor extends Coprocessor {
+ default Optional getMasterObserver() {
+ return Optional.empty();
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterCoprocessorEnvironment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterCoprocessorEnvironment.java
index adab32f0405..1668b69f2db 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterCoprocessorEnvironment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterCoprocessorEnvironment.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.metrics.MetricRegistry;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
-public interface MasterCoprocessorEnvironment extends CoprocessorEnvironment {
+public interface MasterCoprocessorEnvironment extends CoprocessorEnvironment {
/** @return reference to the HMaster services */
MasterServices getMasterServices();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
index 87b9679471a..bfa88e6c94a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
@@ -77,7 +77,7 @@ import org.apache.yetus.audience.InterfaceStability;
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
-public interface MasterObserver extends Coprocessor {
+public interface MasterObserver {
/**
* Called before a new table is created by
* {@link org.apache.hadoop.hbase.master.HMaster}. Called as part of create
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java
index a4dcae0da27..c4fb440795e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java
@@ -20,12 +20,12 @@ package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
-import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
@@ -77,8 +77,7 @@ import com.google.protobuf.Service;
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
-public class MultiRowMutationEndpoint extends MultiRowMutationService implements
-CoprocessorService, Coprocessor {
+public class MultiRowMutationEndpoint extends MultiRowMutationService implements RegionCoprocessor {
private RegionCoprocessorEnvironment env;
@Override
public void mutateRows(RpcController controller, MutateRowsRequest request,
@@ -120,10 +119,9 @@ CoprocessorService, Coprocessor {
done.run(response);
}
-
@Override
- public Service getService() {
- return this;
+ public Optional getService() {
+ return Optional.of(this);
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ObserverContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ObserverContext.java
index ba71129ecc2..0192ea37680 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ObserverContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ObserverContext.java
@@ -116,13 +116,13 @@ public class ObserverContext {
* @param env The coprocessor environment to set
* @param context An existing ObserverContext instance to use, or null
* to create a new instance
- * @param The environment type for the context
+ * @param The environment type for the context
* @return An instance of ObserverContext
with the environment set
*/
@Deprecated
// TODO: Remove this method, ObserverContext should not depend on RpcServer
- public static ObserverContext createAndPrepare(
- T env, ObserverContext context) {
+ public static ObserverContext createAndPrepare(
+ E env, ObserverContext< E> context) {
if (context == null) {
context = new ObserverContext<>(RpcServer.getRequestUser());
}
@@ -140,11 +140,11 @@ public class ObserverContext {
* @param context An existing ObserverContext instance to use, or null
* to create a new instance
* @param user The requesting caller for the execution context
- * @param The environment type for the context
+ * @param The environment type for the context
* @return An instance of ObserverContext
with the environment set
*/
- public static ObserverContext createAndPrepare(
- T env, ObserverContext context, User user) {
+ public static ObserverContext createAndPrepare(
+ E env, ObserverContext context, User user) {
if (context == null) {
context = new ObserverContext<>(user);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessor.java
new file mode 100644
index 00000000000..16c6d399040
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessor.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import java.util.Optional;
+
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+@InterfaceStability.Evolving
+public interface RegionCoprocessor extends Coprocessor {
+
+ default Optional getRegionObserver() {
+ return Optional.empty();
+ }
+
+ default Optional getEndpointObserver() {
+ return Optional.empty();
+ }
+
+ default Optional getBulkLoadObserver() {
+ return Optional.empty();
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java
index dceb3d4d03a..b29cd287832 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java
@@ -32,7 +32,7 @@ import org.apache.yetus.audience.InterfaceStability;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
-public interface RegionCoprocessorEnvironment extends CoprocessorEnvironment {
+public interface RegionCoprocessorEnvironment extends CoprocessorEnvironment {
/** @return the region associated with this coprocessor */
Region getRegion();
@@ -61,6 +61,4 @@ public interface RegionCoprocessorEnvironment extends CoprocessorEnvironment {
// so we do not want to allow coprocessors to export metrics at the region level. We can allow
// getMetricRegistryForTable() to allow coprocessors to track metrics per-table, per-regionserver.
MetricRegistry getMetricRegistryForRegionServer();
-
-
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index 60e5f40ba37..75c1da9fcd1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -99,7 +99,7 @@ import org.apache.yetus.audience.InterfaceStability;
// TODO as method signatures need to break, update to
// ObserverContext extends RegionCoprocessorEnvironment>
// so we can use additional environment state that isn't exposed to coprocessors.
-public interface RegionObserver extends Coprocessor {
+public interface RegionObserver {
/** Mutation type for postMutationBeforeWAL hook */
enum MutationType {
APPEND, INCREMENT
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerCoprocessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerCoprocessor.java
new file mode 100644
index 00000000000..66d8113a87a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerCoprocessor.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import java.util.Optional;
+
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+@InterfaceStability.Evolving
+public interface RegionServerCoprocessor extends Coprocessor {
+ default Optional getRegionServerObserver() {
+ return Optional.empty();
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerCoprocessorEnvironment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerCoprocessorEnvironment.java
index da3189fbf48..ecd0f3e15f2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerCoprocessorEnvironment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerCoprocessorEnvironment.java
@@ -27,7 +27,8 @@ import org.apache.yetus.audience.InterfaceStability;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
-public interface RegionServerCoprocessorEnvironment extends CoprocessorEnvironment {
+public interface RegionServerCoprocessorEnvironment
+ extends CoprocessorEnvironment {
/**
* Gets the region server services.
*
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java
index 5d68eec54b7..c1af3fb3787 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java
@@ -53,7 +53,7 @@ import org.apache.yetus.audience.InterfaceStability;
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
-public interface RegionServerObserver extends Coprocessor {
+public interface RegionServerObserver {
/**
* Called before stopping region server.
* @param ctx the environment to interact with the framework and region server.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/SingletonCoprocessorService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/SingletonCoprocessorService.java
index c7131ff79bb..719acf76eef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/SingletonCoprocessorService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/SingletonCoprocessorService.java
@@ -26,7 +26,9 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
/**
* Coprocessor endpoints registered once per server and providing protobuf services should implement
* this interface and return the {@link Service} instance via {@link #getService()}.
+ * @deprecated Since 2.0. Will be removed in 3.0
*/
+@Deprecated
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public interface SingletonCoprocessorService {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALCoprocessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALCoprocessor.java
new file mode 100644
index 00000000000..d87c06d6ced
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALCoprocessor.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import java.util.Optional;
+
+/**
+ * WALCoprocessor don't support loading services using {@link #getService()}.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+@InterfaceStability.Evolving
+public interface WALCoprocessor extends Coprocessor {
+ Optional getWALObserver();
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALCoprocessorEnvironment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALCoprocessorEnvironment.java
index 8ea399d5a85..71c72a2e7f1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALCoprocessorEnvironment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALCoprocessorEnvironment.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.wal.WAL;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
-public interface WALCoprocessorEnvironment extends CoprocessorEnvironment {
+public interface WALCoprocessorEnvironment extends CoprocessorEnvironment {
/** @return reference to the region server's WAL */
WAL getWAL();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java
index 52c27f7bc99..2190abf6067 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java
@@ -66,7 +66,7 @@ import org.apache.yetus.audience.InterfaceStability;
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
-public interface WALObserver extends Coprocessor {
+public interface WALObserver {
/**
* Called before a {@link WALEdit}
* is writen to WAL.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/package-info.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/package-info.java
index 8a677eef868..a6b5c4bc200 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/package-info.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/package-info.java
@@ -181,9 +181,8 @@ To implement an Endpoint, you need to:
protocol buffer guide
for more details on defining services.
Generate the Service and Message code using the protoc compiler
- Implement the generated Service interface in your coprocessor class and implement the
- CoprocessorService
interface. The CoprocessorService.getService()
- method should return a reference to the Endpoint's protocol buffer Service instance.
+ Implement the generated Service interface and override get*Service() method in
+ relevant Coprocessor to return a reference to the Endpoint's protocol buffer Service instance.
For a more detailed discussion of how to implement a coprocessor Endpoint, along with some sample
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
index 30d801585d5..56cf496297a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
@@ -23,11 +23,9 @@ import java.io.IOException;
import java.util.List;
import java.util.Set;
-import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.MetaMutationAnnotation;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
@@ -38,13 +36,15 @@ import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.master.locking.LockProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.metrics.MetricRegistry;
@@ -65,7 +65,7 @@ import org.apache.yetus.audience.InterfaceAudience;
*/
@InterfaceAudience.Private
public class MasterCoprocessorHost
- extends CoprocessorHost {
+ extends CoprocessorHost {
private static final Log LOG = LogFactory.getLog(MasterCoprocessorHost.class);
@@ -73,21 +73,20 @@ public class MasterCoprocessorHost
* Coprocessor environment extension providing access to master related
* services.
*/
- static class MasterEnvironment extends CoprocessorHost.Environment
+ private static class MasterEnvironment extends BaseEnvironment
implements MasterCoprocessorEnvironment {
private final MasterServices masterServices;
private final boolean supportGroupCPs;
private final MetricRegistry metricRegistry;
- public MasterEnvironment(final Class> implClass, final Coprocessor impl,
- final int priority, final int seq, final Configuration conf,
- final MasterServices services) {
+ public MasterEnvironment(final MasterCoprocessor impl, final int priority, final int seq,
+ final Configuration conf, final MasterServices services) {
super(impl, priority, seq, conf);
this.masterServices = services;
supportGroupCPs = !useLegacyMethod(impl.getClass(),
"preBalanceRSGroup", ObserverContext.class, String.class);
this.metricRegistry =
- MetricsCoprocessor.createRegistryForMasterCoprocessor(implClass.getName());
+ MetricsCoprocessor.createRegistryForMasterCoprocessor(impl.getClass().getName());
}
@Override
@@ -101,7 +100,7 @@ public class MasterCoprocessorHost
}
@Override
- protected void shutdown() {
+ public void shutdown() {
super.shutdown();
MetricsCoprocessor.removeRegistry(this.metricRegistry);
}
@@ -122,120 +121,142 @@ public class MasterCoprocessorHost
loadSystemCoprocessors(conf, MASTER_COPROCESSOR_CONF_KEY);
}
+
+
@Override
- public MasterEnvironment createEnvironment(final Class> implClass,
- final Coprocessor instance, final int priority, final int seq,
- final Configuration conf) {
- for (Object itf : ClassUtils.getAllInterfaces(implClass)) {
- Class> c = (Class>) itf;
- if (CoprocessorService.class.isAssignableFrom(c)) {
- masterServices.registerService(((CoprocessorService)instance).getService());
- }
- }
- return new MasterEnvironment(implClass, instance, priority, seq, conf,
- masterServices);
+ public MasterEnvironment createEnvironment(final MasterCoprocessor instance, final int priority,
+ final int seq, final Configuration conf) {
+ instance.getService().ifPresent(masterServices::registerService);
+ return new MasterEnvironment(instance, priority, seq, conf, masterServices);
}
+ @Override
+ public MasterCoprocessor checkAndGetInstance(Class> implClass)
+ throws InstantiationException, IllegalAccessException {
+ if (MasterCoprocessor.class.isAssignableFrom(implClass)) {
+ return (MasterCoprocessor)implClass.newInstance();
+ } else if (CoprocessorService.class.isAssignableFrom(implClass)) {
+ // For backward compatibility with old CoprocessorService impl which don't extend
+ // MasterCoprocessor.
+ return new CoprocessorServiceBackwardCompatiblity.MasterCoprocessorService(
+ (CoprocessorService)implClass.newInstance());
+ } else {
+ LOG.error(implClass.getName() + " is not of type MasterCoprocessor. Check the "
+ + "configuration " + CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY);
+ return null;
+ }
+ }
+
+ private ObserverGetter masterObserverGetter =
+ MasterCoprocessor::getMasterObserver;
+
+ abstract class MasterObserverOperation extends
+ ObserverOperationWithoutResult {
+ public MasterObserverOperation(){
+ super(masterObserverGetter);
+ }
+
+ public MasterObserverOperation(User user) {
+ super(masterObserverGetter, user);
+ }
+ }
+
+
+ //////////////////////////////////////////////////////////////////////////////////////////////////
+ // MasterObserver operations
+ //////////////////////////////////////////////////////////////////////////////////////////////////
+
+
public boolean preCreateNamespace(final NamespaceDescriptor ns) throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preCreateNamespace(ctx, ns);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preCreateNamespace(this, ns);
}
});
}
public void postCreateNamespace(final NamespaceDescriptor ns) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postCreateNamespace(ctx, ns);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postCreateNamespace(this, ns);
}
});
}
public boolean preDeleteNamespace(final String namespaceName) throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preDeleteNamespace(ctx, namespaceName);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preDeleteNamespace(this, namespaceName);
}
});
}
public void postDeleteNamespace(final String namespaceName) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postDeleteNamespace(ctx, namespaceName);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postDeleteNamespace(this, namespaceName);
}
});
}
public boolean preModifyNamespace(final NamespaceDescriptor ns) throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preModifyNamespace(ctx, ns);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preModifyNamespace(this, ns);
}
});
}
public void postModifyNamespace(final NamespaceDescriptor ns) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postModifyNamespace(ctx, ns);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postModifyNamespace(this, ns);
}
});
}
public void preGetNamespaceDescriptor(final String namespaceName)
throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preGetNamespaceDescriptor(ctx, namespaceName);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preGetNamespaceDescriptor(this, namespaceName);
}
});
}
public void postGetNamespaceDescriptor(final NamespaceDescriptor ns)
throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postGetNamespaceDescriptor(ctx, ns);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postGetNamespaceDescriptor(this, ns);
}
});
}
public boolean preListNamespaceDescriptors(final List descriptors)
throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preListNamespaceDescriptors(ctx, descriptors);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preListNamespaceDescriptors(this, descriptors);
}
});
}
public void postListNamespaceDescriptors(final List descriptors)
throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postListNamespaceDescriptors(ctx, descriptors);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postListNamespaceDescriptors(this, descriptors);
}
});
}
@@ -244,195 +265,175 @@ public class MasterCoprocessorHost
public void preCreateTable(final TableDescriptor htd, final RegionInfo[] regions)
throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preCreateTable(ctx, htd, regions);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preCreateTable(this, htd, regions);
}
});
}
public void postCreateTable(final TableDescriptor htd, final RegionInfo[] regions)
throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postCreateTable(ctx, htd, regions);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postCreateTable(this, htd, regions);
}
});
}
public void preCreateTableAction(final TableDescriptor htd, final RegionInfo[] regions,
- final User user)
- throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
+ final User user) throws IOException {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preCreateTableAction(ctx, htd, regions);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preCreateTableAction(this, htd, regions);
}
});
}
public void postCompletedCreateTableAction(
final TableDescriptor htd, final RegionInfo[] regions, final User user) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postCompletedCreateTableAction(ctx, htd, regions);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postCompletedCreateTableAction(this, htd, regions);
}
});
}
public void preDeleteTable(final TableName tableName) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preDeleteTable(ctx, tableName);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preDeleteTable(this, tableName);
}
});
}
public void postDeleteTable(final TableName tableName) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postDeleteTable(ctx, tableName);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postDeleteTable(this, tableName);
}
});
}
public void preDeleteTableAction(final TableName tableName, final User user) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preDeleteTableAction(ctx, tableName);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preDeleteTableAction(this, tableName);
}
});
}
public void postCompletedDeleteTableAction(final TableName tableName, final User user)
throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postCompletedDeleteTableAction(ctx, tableName);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postCompletedDeleteTableAction(this, tableName);
}
});
}
public void preTruncateTable(final TableName tableName) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preTruncateTable(ctx, tableName);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preTruncateTable(this, tableName);
}
});
}
public void postTruncateTable(final TableName tableName) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postTruncateTable(ctx, tableName);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postTruncateTable(this, tableName);
}
});
}
- public void preTruncateTableAction(final TableName tableName, final User user) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
+ public void preTruncateTableAction(final TableName tableName, final User user)
+ throws IOException {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preTruncateTableAction(ctx, tableName);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preTruncateTableAction(this, tableName);
}
});
}
public void postCompletedTruncateTableAction(final TableName tableName, final User user)
throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postCompletedTruncateTableAction(ctx, tableName);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postCompletedTruncateTableAction(this, tableName);
}
});
}
public void preModifyTable(final TableName tableName, final TableDescriptor htd)
throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preModifyTable(ctx, tableName, htd);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preModifyTable(this, tableName, htd);
}
});
}
public void postModifyTable(final TableName tableName, final TableDescriptor htd)
throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postModifyTable(ctx, tableName, htd);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postModifyTable(this, tableName, htd);
}
});
}
public void preModifyTableAction(final TableName tableName, final TableDescriptor htd,
- final User user)
- throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
+ final User user) throws IOException {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preModifyTableAction(ctx, tableName, htd);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preModifyTableAction(this, tableName, htd);
}
});
}
public void postCompletedModifyTableAction(final TableName tableName, final TableDescriptor htd,
- final User user)
- throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
+ final User user) throws IOException {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postCompletedModifyTableAction(ctx, tableName, htd);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postCompletedModifyTableAction(this, tableName, htd);
}
});
}
public boolean preAddColumn(final TableName tableName, final ColumnFamilyDescriptor columnFamily)
throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preAddColumnFamily(ctx, tableName, columnFamily);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preAddColumnFamily(this, tableName, columnFamily);
}
});
}
public void postAddColumn(final TableName tableName, final ColumnFamilyDescriptor columnFamily)
throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postAddColumnFamily(ctx, tableName, columnFamily);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postAddColumnFamily(this, tableName, columnFamily);
}
});
}
@@ -442,11 +443,10 @@ public class MasterCoprocessorHost
final ColumnFamilyDescriptor columnFamily,
final User user)
throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preAddColumnFamilyAction(ctx, tableName, columnFamily);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preAddColumnFamilyAction(this, tableName, columnFamily);
}
});
}
@@ -456,33 +456,30 @@ public class MasterCoprocessorHost
final ColumnFamilyDescriptor columnFamily,
final User user)
throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postCompletedAddColumnFamilyAction(ctx, tableName, columnFamily);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postCompletedAddColumnFamilyAction(this, tableName, columnFamily);
}
});
}
- public boolean preModifyColumn(final TableName tableName, final ColumnFamilyDescriptor columnFamily)
- throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ public boolean preModifyColumn(final TableName tableName,
+ final ColumnFamilyDescriptor columnFamily) throws IOException {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preModifyColumnFamily(ctx, tableName, columnFamily);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preModifyColumnFamily(this, tableName, columnFamily);
}
});
}
public void postModifyColumn(final TableName tableName, final ColumnFamilyDescriptor columnFamily)
throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postModifyColumnFamily(ctx, tableName, columnFamily);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postModifyColumnFamily(this, tableName, columnFamily);
}
});
}
@@ -491,11 +488,10 @@ public class MasterCoprocessorHost
final TableName tableName,
final ColumnFamilyDescriptor columnFamily,
final User user) throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preModifyColumnFamilyAction(ctx, tableName, columnFamily);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preModifyColumnFamilyAction(this, tableName, columnFamily);
}
});
}
@@ -504,33 +500,30 @@ public class MasterCoprocessorHost
final TableName tableName,
final ColumnFamilyDescriptor columnFamily,
final User user) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postCompletedModifyColumnFamilyAction(ctx, tableName, columnFamily);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postCompletedModifyColumnFamilyAction(this, tableName, columnFamily);
}
});
}
public boolean preDeleteColumn(final TableName tableName, final byte[] columnFamily)
throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preDeleteColumnFamily(ctx, tableName, columnFamily);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preDeleteColumnFamily(this, tableName, columnFamily);
}
});
}
public void postDeleteColumn(final TableName tableName, final byte[] columnFamily)
throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postDeleteColumnFamily(ctx, tableName, columnFamily);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postDeleteColumnFamily(this, tableName, columnFamily);
}
});
}
@@ -540,104 +533,94 @@ public class MasterCoprocessorHost
final byte[] columnFamily,
final User user)
throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preDeleteColumnFamilyAction(ctx, tableName, columnFamily);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preDeleteColumnFamilyAction(this, tableName, columnFamily);
}
});
}
public void postCompletedDeleteColumnFamilyAction(
final TableName tableName, final byte[] columnFamily, final User user) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postCompletedDeleteColumnFamilyAction(ctx, tableName, columnFamily);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postCompletedDeleteColumnFamilyAction(this, tableName, columnFamily);
}
});
}
public void preEnableTable(final TableName tableName) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preEnableTable(ctx, tableName);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preEnableTable(this, tableName);
}
});
}
public void postEnableTable(final TableName tableName) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postEnableTable(ctx, tableName);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postEnableTable(this, tableName);
}
});
}
public void preEnableTableAction(final TableName tableName, final User user) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preEnableTableAction(ctx, tableName);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preEnableTableAction(this, tableName);
}
});
}
public void postCompletedEnableTableAction(final TableName tableName, final User user)
throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postCompletedEnableTableAction(ctx, tableName);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postCompletedEnableTableAction(this, tableName);
}
});
}
public void preDisableTable(final TableName tableName) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preDisableTable(ctx, tableName);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preDisableTable(this, tableName);
}
});
}
public void postDisableTable(final TableName tableName) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postDisableTable(ctx, tableName);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postDisableTable(this, tableName);
}
});
}
public void preDisableTableAction(final TableName tableName, final User user) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preDisableTableAction(ctx, tableName);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preDisableTableAction(this, tableName);
}
});
}
public void postCompletedDisableTableAction(final TableName tableName, final User user)
throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postCompletedDisableTableAction(ctx, tableName);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postCompletedDisableTableAction(this, tableName);
}
});
}
@@ -645,208 +628,188 @@ public class MasterCoprocessorHost
public boolean preAbortProcedure(
final ProcedureExecutor procEnv,
final long procId) throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preAbortProcedure(ctx, procEnv, procId);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preAbortProcedure(this, procEnv, procId);
}
});
}
public void postAbortProcedure() throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postAbortProcedure(ctx);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postAbortProcedure(this);
}
});
}
public boolean preGetProcedures() throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preGetProcedures(ctx);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preGetProcedures(this);
}
});
}
public void postGetProcedures(final List> procInfoList) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postGetProcedures(ctx, procInfoList);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postGetProcedures(this, procInfoList);
}
});
}
public boolean preGetLocks() throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preGetLocks(ctx);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preGetLocks(this);
}
});
}
public void postGetLocks(final List lockedResources) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postGetLocks(ctx, lockedResources);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postGetLocks(this, lockedResources);
}
});
}
public boolean preMove(final RegionInfo region, final ServerName srcServer,
- final ServerName destServer) throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ final ServerName destServer) throws IOException {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preMove(ctx, region, srcServer, destServer);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preMove(this, region, srcServer, destServer);
}
});
}
public void postMove(final RegionInfo region, final ServerName srcServer,
final ServerName destServer) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postMove(ctx, region, srcServer, destServer);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postMove(this, region, srcServer, destServer);
}
});
}
public boolean preAssign(final RegionInfo regionInfo) throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preAssign(ctx, regionInfo);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preAssign(this, regionInfo);
}
});
}
public void postAssign(final RegionInfo regionInfo) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postAssign(ctx, regionInfo);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postAssign(this, regionInfo);
}
});
}
public boolean preUnassign(final RegionInfo regionInfo, final boolean force)
throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preUnassign(ctx, regionInfo, force);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preUnassign(this, regionInfo, force);
}
});
}
public void postUnassign(final RegionInfo regionInfo, final boolean force) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postUnassign(ctx, regionInfo, force);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postUnassign(this, regionInfo, force);
}
});
}
public void preRegionOffline(final RegionInfo regionInfo) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preRegionOffline(ctx, regionInfo);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preRegionOffline(this, regionInfo);
}
});
}
public void postRegionOffline(final RegionInfo regionInfo) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postRegionOffline(ctx, regionInfo);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postRegionOffline(this, regionInfo);
}
});
}
public void preMergeRegions(final RegionInfo[] regionsToMerge)
throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preMergeRegions(ctx, regionsToMerge);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preMergeRegions(this, regionsToMerge);
}
});
}
public void postMergeRegions(final RegionInfo[] regionsToMerge)
throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postMergeRegions(ctx, regionsToMerge);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postMergeRegions(this, regionsToMerge);
}
});
}
public boolean preBalance() throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preBalance(ctx);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preBalance(this);
}
});
}
public void postBalance(final List plans) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postBalance(ctx, plans);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postBalance(this, plans);
}
});
}
public boolean preSetSplitOrMergeEnabled(final boolean newValue,
final MasterSwitchType switchType) throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preSetSplitOrMergeEnabled(ctx, newValue, switchType);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preSetSplitOrMergeEnabled(this, newValue, switchType);
}
});
}
public void postSetSplitOrMergeEnabled(final boolean newValue,
final MasterSwitchType switchType) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postSetSplitOrMergeEnabled(ctx, newValue, switchType);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postSetSplitOrMergeEnabled(this, newValue, switchType);
}
});
}
@@ -860,11 +823,10 @@ public class MasterCoprocessorHost
public void preSplitRegion(
final TableName tableName,
final byte[] splitRow) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preSplitRegion(ctx, tableName, splitRow);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preSplitRegion(this, tableName, splitRow);
}
});
}
@@ -880,11 +842,10 @@ public class MasterCoprocessorHost
final TableName tableName,
final byte[] splitRow,
final User user) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preSplitRegionAction(ctx, tableName, splitRow);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preSplitRegionAction(this, tableName, splitRow);
}
});
}
@@ -900,11 +861,10 @@ public class MasterCoprocessorHost
final RegionInfo regionInfoA,
final RegionInfo regionInfoB,
final User user) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postCompletedSplitRegionAction(ctx, regionInfoA, regionInfoB);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postCompletedSplitRegionAction(this, regionInfoA, regionInfoB);
}
});
}
@@ -920,11 +880,10 @@ public class MasterCoprocessorHost
final byte[] splitKey,
final List metaEntries,
final User user) throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preSplitRegionBeforePONRAction(ctx, splitKey, metaEntries);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preSplitRegionBeforePONRAction(this, splitKey, metaEntries);
}
});
}
@@ -935,11 +894,10 @@ public class MasterCoprocessorHost
* @throws IOException
*/
public void preSplitAfterPONRAction(final User user) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preSplitRegionAfterPONRAction(ctx);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preSplitRegionAfterPONRAction(this);
}
});
}
@@ -950,11 +908,10 @@ public class MasterCoprocessorHost
* @throws IOException
*/
public void postRollBackSplitRegionAction(final User user) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postRollBackSplitRegionAction(ctx);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postRollBackSplitRegionAction(this);
}
});
}
@@ -967,11 +924,10 @@ public class MasterCoprocessorHost
*/
public boolean preMergeRegionsAction(
final RegionInfo[] regionsToMerge, final User user) throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
- public void call(MasterObserver oserver,
- ObserverContext ctx) throws IOException {
- oserver.preMergeRegionsAction(ctx, regionsToMerge);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preMergeRegionsAction(this, regionsToMerge);
}
});
}
@@ -987,11 +943,10 @@ public class MasterCoprocessorHost
final RegionInfo[] regionsToMerge,
final RegionInfo mergedRegion,
final User user) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
- public void call(MasterObserver oserver,
- ObserverContext ctx) throws IOException {
- oserver.postCompletedMergeRegionsAction(ctx, regionsToMerge, mergedRegion);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postCompletedMergeRegionsAction(this, regionsToMerge, mergedRegion);
}
});
}
@@ -1007,11 +962,10 @@ public class MasterCoprocessorHost
final RegionInfo[] regionsToMerge,
final @MetaMutationAnnotation List metaEntries,
final User user) throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
- public void call(MasterObserver oserver,
- ObserverContext ctx) throws IOException {
- oserver.preMergeRegionsCommitAction(ctx, regionsToMerge, metaEntries);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preMergeRegionsCommitAction(this, regionsToMerge, metaEntries);
}
});
}
@@ -1027,11 +981,10 @@ public class MasterCoprocessorHost
final RegionInfo[] regionsToMerge,
final RegionInfo mergedRegion,
final User user) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
- public void call(MasterObserver oserver,
- ObserverContext ctx) throws IOException {
- oserver.postMergeRegionsCommitAction(ctx, regionsToMerge, mergedRegion);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postMergeRegionsCommitAction(this, regionsToMerge, mergedRegion);
}
});
}
@@ -1044,33 +997,30 @@ public class MasterCoprocessorHost
*/
public void postRollBackMergeRegionsAction(
final RegionInfo[] regionsToMerge, final User user) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
- public void call(MasterObserver oserver,
- ObserverContext ctx) throws IOException {
- oserver.postRollBackMergeRegionsAction(ctx, regionsToMerge);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postRollBackMergeRegionsAction(this, regionsToMerge);
}
});
}
public boolean preBalanceSwitch(final boolean b) throws IOException {
- return execOperationWithResult(b, coprocessors.isEmpty() ? null :
- new CoprocessorOperationWithResult() {
- @Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- setResult(oserver.preBalanceSwitch(ctx, getResult()));
- }
- });
+ return execOperationWithResult(b, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult(masterObserverGetter) {
+ @Override
+ public Boolean call(MasterObserver observer) throws IOException {
+ return observer.preBalanceSwitch(this, getResult());
+ }
+ });
}
public void postBalanceSwitch(final boolean oldValue, final boolean newValue)
throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postBalanceSwitch(ctx, oldValue, newValue);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postBalanceSwitch(this, oldValue, newValue);
}
});
}
@@ -1078,16 +1028,15 @@ public class MasterCoprocessorHost
public void preShutdown() throws IOException {
// While stopping the cluster all coprocessors method should be executed first then the
// coprocessor should be cleaned up.
- execShutdown(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execShutdown(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preShutdown(ctx);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preShutdown(this);
}
@Override
- public void postEnvCall(MasterEnvironment env) {
+ public void postEnvCall() {
// invoke coprocessor stop method
- shutdown(env);
+ shutdown(this.getEnvironment());
}
});
}
@@ -1095,228 +1044,207 @@ public class MasterCoprocessorHost
public void preStopMaster() throws IOException {
// While stopping master all coprocessors method should be executed first then the coprocessor
// environment should be cleaned up.
- execShutdown(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execShutdown(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preStopMaster(ctx);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preStopMaster(this);
}
@Override
- public void postEnvCall(MasterEnvironment env) {
+ public void postEnvCall() {
// invoke coprocessor stop method
- shutdown(env);
+ shutdown(this.getEnvironment());
}
});
}
public void preMasterInitialization() throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preMasterInitialization(ctx);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preMasterInitialization(this);
}
});
}
public void postStartMaster() throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postStartMaster(ctx);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postStartMaster(this);
}
});
}
public void preSnapshot(final SnapshotDescription snapshot,
final TableDescriptor hTableDescriptor) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preSnapshot(ctx, snapshot, hTableDescriptor);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preSnapshot(this, snapshot, hTableDescriptor);
}
});
}
public void postSnapshot(final SnapshotDescription snapshot,
final TableDescriptor hTableDescriptor) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postSnapshot(ctx, snapshot, hTableDescriptor);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postSnapshot(this, snapshot, hTableDescriptor);
}
});
}
public void preListSnapshot(final SnapshotDescription snapshot) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver observer, ObserverContext ctx)
- throws IOException {
- observer.preListSnapshot(ctx, snapshot);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preListSnapshot(this, snapshot);
}
});
}
public void postListSnapshot(final SnapshotDescription snapshot) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver observer, ObserverContext ctx)
- throws IOException {
- observer.postListSnapshot(ctx, snapshot);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postListSnapshot(this, snapshot);
}
});
}
public void preCloneSnapshot(final SnapshotDescription snapshot,
final TableDescriptor hTableDescriptor) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preCloneSnapshot(ctx, snapshot, hTableDescriptor);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preCloneSnapshot(this, snapshot, hTableDescriptor);
}
});
}
public void postCloneSnapshot(final SnapshotDescription snapshot,
final TableDescriptor hTableDescriptor) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postCloneSnapshot(ctx, snapshot, hTableDescriptor);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postCloneSnapshot(this, snapshot, hTableDescriptor);
}
});
}
public void preRestoreSnapshot(final SnapshotDescription snapshot,
final TableDescriptor hTableDescriptor) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preRestoreSnapshot(ctx, snapshot, hTableDescriptor);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preRestoreSnapshot(this, snapshot, hTableDescriptor);
}
});
}
public void postRestoreSnapshot(final SnapshotDescription snapshot,
final TableDescriptor hTableDescriptor) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postRestoreSnapshot(ctx, snapshot, hTableDescriptor);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postRestoreSnapshot(this, snapshot, hTableDescriptor);
}
});
}
public void preDeleteSnapshot(final SnapshotDescription snapshot) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preDeleteSnapshot(ctx, snapshot);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preDeleteSnapshot(this, snapshot);
}
});
}
public void postDeleteSnapshot(final SnapshotDescription snapshot) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postDeleteSnapshot(ctx, snapshot);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postDeleteSnapshot(this, snapshot);
}
});
}
public boolean preGetTableDescriptors(final List tableNamesList,
final List descriptors, final String regex) throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preGetTableDescriptors(ctx, tableNamesList, descriptors, regex);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preGetTableDescriptors(this, tableNamesList, descriptors, regex);
}
});
}
public void postGetTableDescriptors(final List tableNamesList,
final List descriptors, final String regex) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.postGetTableDescriptors(ctx, tableNamesList, descriptors, regex);
+ public void call(MasterObserver observer) throws IOException {
+ observer.postGetTableDescriptors(this, tableNamesList, descriptors, regex);
}
});
}
public boolean preGetTableNames(final List descriptors,
final String regex) throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext ctx)
- throws IOException {
- oserver.preGetTableNames(ctx, descriptors, regex);
+ public void call(MasterObserver observer) throws IOException {
+ observer.preGetTableNames(this, descriptors, regex);
}
});
}
public void postGetTableNames(final List descriptors,
final String regex) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
- public void call(MasterObserver oserver, ObserverContext