From 1229397a95c89ae9bb774cf487e5924dcdf5ebe5 Mon Sep 17 00:00:00 2001 From: Gary Helmling Date: Tue, 21 Dec 2010 20:39:26 +0000 Subject: [PATCH] HBASE-3256: Add coprocessor host and observer for HMaster git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1051639 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 1 + .../hbase/coprocessor/BaseMasterObserver.java | 179 +++++ .../BaseRegionObserverCoprocessor.java | 71 +- .../coprocessor/CoprocessorEnvironment.java | 8 +- .../hbase/coprocessor/CoprocessorHost.java | 556 +++++++++++++ .../MasterCoprocessorEnvironment.java | 28 + .../hbase/coprocessor/MasterObserver.java | 215 +++++ .../RegionCoprocessorEnvironment.java | 33 + .../hbase/coprocessor/RegionObserver.java | 72 +- .../hbase/coprocessor/package-info.java | 4 +- .../apache/hadoop/hbase/master/HMaster.java | 131 +++- .../hbase/master/MasterCoprocessorHost.java | 553 +++++++++++++ .../hadoop/hbase/master/MasterServices.java | 13 + .../hadoop/hbase/regionserver/HRegion.java | 8 +- ...orHost.java => RegionCoprocessorHost.java} | 739 ++++-------------- src/main/resources/hbase-default.xml | 21 +- .../ColumnAggregationEndpoint.java | 3 +- .../coprocessor/SimpleRegionObserver.java | 41 +- .../coprocessor/TestCoprocessorEndpoint.java | 2 +- .../coprocessor/TestCoprocessorInterface.java | 28 +- .../hbase/coprocessor/TestMasterObserver.java | 499 ++++++++++++ .../TestRegionObserverInterface.java | 12 +- .../TestRegionObserverStacking.java | 13 +- .../hbase/master/TestCatalogJanitor.java | 11 +- 24 files changed, 2510 insertions(+), 731 deletions(-) create mode 100644 src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java create mode 100644 src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java create mode 100644 src/main/java/org/apache/hadoop/hbase/coprocessor/MasterCoprocessorEnvironment.java create mode 100644 src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java create mode 100644 src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java create mode 100644 src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java rename src/main/java/org/apache/hadoop/hbase/regionserver/{CoprocessorHost.java => RegionCoprocessorHost.java} (51%) create mode 100644 src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java diff --git a/CHANGES.txt b/CHANGES.txt index c7023cb6c37..5bde4e8a773 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -46,6 +46,7 @@ Release 0.91.0 - Unreleased HBASE-3287 Add option to cache blocks on hfile write and evict blocks on hfile close HBASE-3335 Add BitComparator for filtering (Nathaniel Cook via Stack) + HBASE-3256 Coprocessors: Coprocessor host and observer for HMaster Release 0.90.0 - Unreleased diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java new file mode 100644 index 00000000000..9576c485f05 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java @@ -0,0 +1,179 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HServerInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.UnknownRegionException; + +import java.io.IOException; + +public class BaseMasterObserver implements MasterObserver { + @Override + public void preCreateTable(MasterCoprocessorEnvironment env, + HTableDescriptor desc, byte[][] splitKeys) throws IOException { + } + + @Override + public void postCreateTable(MasterCoprocessorEnvironment env, + HRegionInfo[] regions, boolean sync) throws IOException { + } + + @Override + public void preDeleteTable(MasterCoprocessorEnvironment env, byte[] tableName) + throws IOException { + } + + @Override + public void postDeleteTable(MasterCoprocessorEnvironment env, byte[] tableName) + throws IOException { + } + + @Override + public void preModifyTable(MasterCoprocessorEnvironment env, + byte[] tableName, HTableDescriptor htd) throws IOException { + } + + @Override + public void postModifyTable(MasterCoprocessorEnvironment env, + byte[] tableName, HTableDescriptor htd) throws IOException { + } + + @Override + public void preAddColumn(MasterCoprocessorEnvironment env, + byte[] tableName, HColumnDescriptor column) throws IOException { + } + + @Override + public void postAddColumn(MasterCoprocessorEnvironment env, byte[] tableName, + HColumnDescriptor column) throws IOException { + } + + @Override + public void preModifyColumn(MasterCoprocessorEnvironment env, + byte[] tableName, HColumnDescriptor descriptor) throws IOException { + } + + @Override + public void postModifyColumn(MasterCoprocessorEnvironment env, + byte[] tableName, HColumnDescriptor descriptor) throws IOException { + } + + @Override + public void preDeleteColumn(MasterCoprocessorEnvironment env, + byte[] tableName, byte[] c) throws IOException { + } + + @Override + public void postDeleteColumn(MasterCoprocessorEnvironment env, + byte[] tableName, byte[] c) throws IOException { + } + + @Override + public void preEnableTable(MasterCoprocessorEnvironment env, byte[] tableName) + throws IOException { + } + + @Override + public void postEnableTable(MasterCoprocessorEnvironment env, + byte[] tableName) throws IOException { + } + + @Override + public void preDisableTable(MasterCoprocessorEnvironment env, + byte[] tableName) throws IOException { + } + + @Override + public void postDisableTable(MasterCoprocessorEnvironment env, + byte[] tableName) throws IOException { + } + + @Override + public void preMove(MasterCoprocessorEnvironment env, HRegionInfo region, + HServerInfo srcServer, HServerInfo destServer) + throws UnknownRegionException { + } + + @Override + public void postMove(MasterCoprocessorEnvironment env, HRegionInfo region, + HServerInfo srcServer, HServerInfo destServer) + throws UnknownRegionException { + } + + @Override + public void preAssign(MasterCoprocessorEnvironment env, byte[] regionName, + boolean force) throws IOException { + } + + @Override + public void postAssign(MasterCoprocessorEnvironment env, + HRegionInfo regionInfo) throws IOException { + } + + @Override + public void preUnassign(MasterCoprocessorEnvironment env, byte[] regionName, + boolean force) throws IOException { + } + + @Override + public void postUnassign(MasterCoprocessorEnvironment env, + HRegionInfo regionInfo, boolean force) throws IOException { + } + + @Override + public void preBalance(MasterCoprocessorEnvironment env) throws IOException { + } + + @Override + public void postBalance(MasterCoprocessorEnvironment env) throws IOException { + } + + @Override + public boolean preBalanceSwitch(MasterCoprocessorEnvironment env, boolean b) + throws IOException { + return b; + } + + @Override + public void postBalanceSwitch(MasterCoprocessorEnvironment env, + boolean oldValue, boolean newValue) throws IOException { + } + + @Override + public void preShutdown(MasterCoprocessorEnvironment env) throws IOException { + } + + @Override + public void preStopMaster(MasterCoprocessorEnvironment env) + throws IOException { + } + + @Override + public void start(CoprocessorEnvironment env) throws IOException { + } + + @Override + public void stop(CoprocessorEnvironment env) throws IOException { + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java index 1ffead04e42..a8ee5e76a78 100644 --- a/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java +++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java @@ -36,8 +36,7 @@ import java.io.IOException; * By extending it, you can create you own region observer without * overriding all abstract methods of Coprocessor and RegionObserver. */ -public abstract class BaseRegionObserverCoprocessor implements Coprocessor, - RegionObserver { +public abstract class BaseRegionObserverCoprocessor implements RegionObserver { @Override public void start(CoprocessorEnvironment e) { } @@ -45,94 +44,94 @@ public abstract class BaseRegionObserverCoprocessor implements Coprocessor, public void stop(CoprocessorEnvironment e) { } @Override - public void preOpen(CoprocessorEnvironment e) { } + public void preOpen(RegionCoprocessorEnvironment e) { } @Override - public void postOpen(CoprocessorEnvironment e) { } + public void postOpen(RegionCoprocessorEnvironment e) { } @Override - public void preClose(CoprocessorEnvironment e, boolean abortRequested) + public void preClose(RegionCoprocessorEnvironment e, boolean abortRequested) { } @Override - public void postClose(CoprocessorEnvironment e, boolean abortRequested) + public void postClose(RegionCoprocessorEnvironment e, boolean abortRequested) { } @Override - public void preFlush(CoprocessorEnvironment e) { } + public void preFlush(RegionCoprocessorEnvironment e) { } @Override - public void postFlush(CoprocessorEnvironment e) { } + public void postFlush(RegionCoprocessorEnvironment e) { } @Override - public void preSplit(CoprocessorEnvironment e) { } + public void preSplit(RegionCoprocessorEnvironment e) { } @Override - public void postSplit(CoprocessorEnvironment e, HRegion l, HRegion r) { } + public void postSplit(RegionCoprocessorEnvironment e, HRegion l, HRegion r) { } @Override - public void preCompact(CoprocessorEnvironment e, boolean willSplit) { } + public void preCompact(RegionCoprocessorEnvironment e, boolean willSplit) { } @Override - public void postCompact(CoprocessorEnvironment e, boolean willSplit) { } + public void postCompact(RegionCoprocessorEnvironment e, boolean willSplit) { } @Override - public void preGetClosestRowBefore(final CoprocessorEnvironment e, + public void preGetClosestRowBefore(final RegionCoprocessorEnvironment e, final byte [] row, final byte [] family, final Result result) throws IOException { } @Override - public void postGetClosestRowBefore(final CoprocessorEnvironment e, + public void postGetClosestRowBefore(final RegionCoprocessorEnvironment e, final byte [] row, final byte [] family, final Result result) throws IOException { } @Override - public void preGet(final CoprocessorEnvironment e, final Get get, + public void preGet(final RegionCoprocessorEnvironment e, final Get get, final List results) throws IOException { } @Override - public void postGet(final CoprocessorEnvironment e, final Get get, + public void postGet(final RegionCoprocessorEnvironment e, final Get get, final List results) throws IOException { } @Override - public boolean preExists(final CoprocessorEnvironment e, final Get get, + public boolean preExists(final RegionCoprocessorEnvironment e, final Get get, final boolean exists) throws IOException { return exists; } @Override - public boolean postExists(final CoprocessorEnvironment e, final Get get, + public boolean postExists(final RegionCoprocessorEnvironment e, final Get get, boolean exists) throws IOException { return exists; } @Override - public void prePut(final CoprocessorEnvironment e, final Map> familyMap, final boolean writeToWAL) throws IOException { } @Override - public void postPut(final CoprocessorEnvironment e, final Map> familyMap, final boolean writeToWAL) throws IOException { } @Override - public void preDelete(final CoprocessorEnvironment e, final Map> familyMap, final boolean writeToWAL) throws IOException { } @Override - public void postDelete(final CoprocessorEnvironment e, + public void postDelete(final RegionCoprocessorEnvironment e, final Map> familyMap, final boolean writeToWAL) throws IOException { } @Override - public boolean preCheckAndPut(final CoprocessorEnvironment e, + public boolean preCheckAndPut(final RegionCoprocessorEnvironment e, final byte [] row, final byte [] family, final byte [] qualifier, final byte [] value, final Put put, final boolean result) throws IOException { @@ -140,7 +139,7 @@ public abstract class BaseRegionObserverCoprocessor implements Coprocessor, } @Override - public boolean postCheckAndPut(final CoprocessorEnvironment e, + public boolean postCheckAndPut(final RegionCoprocessorEnvironment e, final byte [] row, final byte [] family, final byte [] qualifier, final byte [] value, final Put put, final boolean result) throws IOException { @@ -148,7 +147,7 @@ public abstract class BaseRegionObserverCoprocessor implements Coprocessor, } @Override - public boolean preCheckAndDelete(final CoprocessorEnvironment e, + public boolean preCheckAndDelete(final RegionCoprocessorEnvironment e, final byte [] row, final byte [] family, final byte [] qualifier, final byte [] value, final Delete delete, final boolean result) throws IOException { @@ -156,7 +155,7 @@ public abstract class BaseRegionObserverCoprocessor implements Coprocessor, } @Override - public boolean postCheckAndDelete(final CoprocessorEnvironment e, + public boolean postCheckAndDelete(final RegionCoprocessorEnvironment e, final byte [] row, final byte [] family, final byte [] qualifier, final byte [] value, final Delete delete, final boolean result) throws IOException { @@ -164,14 +163,14 @@ public abstract class BaseRegionObserverCoprocessor implements Coprocessor, } @Override - public long preIncrementColumnValue(final CoprocessorEnvironment e, + public long preIncrementColumnValue(final RegionCoprocessorEnvironment e, final byte [] row, final byte [] family, final byte [] qualifier, final long amount, final boolean writeToWAL) throws IOException { return amount; } @Override - public long postIncrementColumnValue(final CoprocessorEnvironment e, + public long postIncrementColumnValue(final RegionCoprocessorEnvironment e, final byte [] row, final byte [] family, final byte [] qualifier, final long amount, final boolean writeToWAL, long result) throws IOException { @@ -179,48 +178,48 @@ public abstract class BaseRegionObserverCoprocessor implements Coprocessor, } @Override - public void preIncrement(final CoprocessorEnvironment e, + public void preIncrement(final RegionCoprocessorEnvironment e, final Increment increment, final Result result) throws IOException { } @Override - public void postIncrement(final CoprocessorEnvironment e, + public void postIncrement(final RegionCoprocessorEnvironment e, final Increment increment, final Result result) throws IOException { } @Override - public InternalScanner preScannerOpen(final CoprocessorEnvironment e, + public InternalScanner preScannerOpen(final RegionCoprocessorEnvironment e, final Scan scan, final InternalScanner s) throws IOException { return s; } @Override - public InternalScanner postScannerOpen(final CoprocessorEnvironment e, + public InternalScanner postScannerOpen(final RegionCoprocessorEnvironment e, final Scan scan, final InternalScanner s) throws IOException { return s; } @Override - public boolean preScannerNext(final CoprocessorEnvironment e, + public boolean preScannerNext(final RegionCoprocessorEnvironment e, final InternalScanner s, final List results, final int limit, final boolean hasMore) throws IOException { return hasMore; } @Override - public boolean postScannerNext(final CoprocessorEnvironment e, + public boolean postScannerNext(final RegionCoprocessorEnvironment e, final InternalScanner s, final List results, final int limit, final boolean hasMore) throws IOException { return hasMore; } @Override - public void preScannerClose(final CoprocessorEnvironment e, + public void preScannerClose(final RegionCoprocessorEnvironment e, final InternalScanner s) throws IOException { } @Override - public void postScannerClose(final CoprocessorEnvironment e, + public void postScannerClose(final RegionCoprocessorEnvironment e, final InternalScanner s) throws IOException { } } diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorEnvironment.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorEnvironment.java index c4fa5266343..b1acf421d9a 100644 --- a/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorEnvironment.java +++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorEnvironment.java @@ -33,11 +33,11 @@ public interface CoprocessorEnvironment { /** @return the HBase version as a string (e.g. "0.21.0") */ public String getHBaseVersion(); - /** @return the region associated with this coprocessor */ - public HRegion getRegion(); + /** @return the loaded coprocessor instance */ + public Coprocessor getInstance(); - /** @return reference to the region server services */ - public RegionServerServices getRegionServerServices(); + /** @return the priority assigned to the loaded coprocessor */ + public Coprocessor.Priority getPriority(); /** * @return an interface for accessing the given table diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java new file mode 100644 index 00000000000..68fe7b2cdcb --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java @@ -0,0 +1,556 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.VersionInfo; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.*; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Provides the common setup framework and runtime services for coprocessor + * invocation from HBase services. + * @param the specific environment extension that a concrete implementation + * provides + */ +public abstract class CoprocessorHost { + public static final String REGION_COPROCESSOR_CONF_KEY = + "hbase.coprocessor.region.classes"; + public static final String MASTER_COPROCESSOR_CONF_KEY = + "hbase.coprocessor.master.classes"; + private static final Log LOG = LogFactory.getLog(CoprocessorHost.class); + /** Ordered set of loaded coprocessors with lock */ + protected final ReentrantReadWriteLock coprocessorLock = new ReentrantReadWriteLock(); + protected Set coprocessors = + new TreeSet(new EnvironmentPriorityComparator()); + // unique file prefix to use for local copies of jars when classloading + protected String pathPrefix; + + public CoprocessorHost() { + pathPrefix = UUID.randomUUID().toString(); + } + + /** + * Load system coprocessors. Read the class names from configuration. + * Called by constructor. + */ + protected void loadSystemCoprocessors(Configuration conf, String confKey) { + Class implClass = null; + + // load default coprocessors from configure file + String defaultCPClasses = conf.get(confKey); + if (defaultCPClasses == null || defaultCPClasses.length() == 0) + return; + StringTokenizer st = new StringTokenizer(defaultCPClasses, ","); + int priority = Coprocessor.Priority.SYSTEM.intValue(); + while (st.hasMoreTokens()) { + String className = st.nextToken(); + if (findCoprocessor(className) != null) { + continue; + } + ClassLoader cl = ClassLoader.getSystemClassLoader(); + Thread.currentThread().setContextClassLoader(cl); + try { + implClass = cl.loadClass(className); + load(implClass, Coprocessor.Priority.SYSTEM); + LOG.info("System coprocessor " + className + " was loaded " + + "successfully with priority (" + priority++ + ")."); + } catch (ClassNotFoundException e) { + LOG.warn("Class " + className + " cannot be found. " + + e.getMessage()); + } catch (IOException e) { + LOG.warn("Load coprocessor " + className + " failed. " + + e.getMessage()); + } + } + } + + /** + * Load a coprocessor implementation into the host + * @param path path to implementation jar + * @param className the main class name + * @param priority chaining priority + * @throws java.io.IOException Exception + */ + @SuppressWarnings("deprecation") + public void load(Path path, String className, Coprocessor.Priority priority) + throws IOException { + Class implClass = null; + + // Have we already loaded the class, perhaps from an earlier region open + // for the same table? + try { + implClass = getClass().getClassLoader().loadClass(className); + } catch (ClassNotFoundException e) { + LOG.info("Class " + className + " needs to be loaded from a file - " + + path.toString() + "."); + // go ahead to load from file system. + } + + // If not, load + if (implClass == null) { + // copy the jar to the local filesystem + if (!path.toString().endsWith(".jar")) { + throw new IOException(path.toString() + ": not a jar file?"); + } + FileSystem fs = path.getFileSystem(HBaseConfiguration.create()); + Path dst = new Path("/tmp/." + pathPrefix + + "." + className + "." + System.currentTimeMillis() + ".jar"); + fs.copyToLocalFile(path, dst); + fs.deleteOnExit(dst); + + // TODO: code weaving goes here + + // TODO: wrap heap allocations and enforce maximum usage limits + + /* TODO: inject code into loop headers that monitors CPU use and + aborts runaway user code */ + + // load the jar and get the implementation main class + String cp = System.getProperty("java.class.path"); + // NOTE: Path.toURL is deprecated (toURI instead) but the URLClassLoader + // unsuprisingly wants URLs, not URIs; so we will use the deprecated + // method which returns URLs for as long as it is available + List paths = new ArrayList(); + paths.add(new File(dst.toString()).getCanonicalFile().toURL()); + StringTokenizer st = new StringTokenizer(cp, File.pathSeparator); + while (st.hasMoreTokens()) { + paths.add((new File(st.nextToken())).getCanonicalFile().toURL()); + } + ClassLoader cl = new URLClassLoader(paths.toArray(new URL[]{}), + ClassLoader.getSystemClassLoader()); + Thread.currentThread().setContextClassLoader(cl); + try { + implClass = cl.loadClass(className); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + } + + load(implClass, priority); + } + + /** + * @param implClass Implementation class + * @param priority priority + * @throws java.io.IOException Exception + */ + public void load(Class implClass, Coprocessor.Priority priority) + throws IOException { + // create the instance + Coprocessor impl; + Object o = null; + try { + o = implClass.newInstance(); + impl = (Coprocessor)o; + } catch (InstantiationException e) { + throw new IOException(e); + } catch (IllegalAccessException e) { + throw new IOException(e); + } + // create the environment + E env = createEnvironment(implClass, impl, priority); + if (env instanceof Environment) { + ((Environment)env).startup(); + } + + try { + coprocessorLock.writeLock().lock(); + coprocessors.add(env); + } finally { + coprocessorLock.writeLock().unlock(); + } + } + + /** + * Called when a new Coprocessor class is loaded + */ + public abstract E createEnvironment(Class implClass, Coprocessor instance, + Coprocessor.Priority priority); + + public void shutdown(CoprocessorEnvironment e) { + if (e instanceof Environment) { + ((Environment)e).shutdown(); + } else { + LOG.warn("Shutdown called on unknown environment: "+ + e.getClass().getName()); + } + } + + /** + * Find a coprocessor implementation by class name + * @param className the class name + * @return the coprocessor, or null if not found + */ + public Coprocessor findCoprocessor(String className) { + // initialize the coprocessors + try { + coprocessorLock.readLock().lock(); + for (E env: coprocessors) { + if (env.getInstance().getClass().getName().equals(className) || + env.getInstance().getClass().getSimpleName().equals(className)) { + return env.getInstance(); + } + } + return null; + } finally { + coprocessorLock.readLock().unlock(); + } + } + + /** + * Environment priority comparator. + * Coprocessors are chained in sorted order. + */ + static class EnvironmentPriorityComparator implements Comparator { + public int compare(CoprocessorEnvironment env1, CoprocessorEnvironment env2) { + if (env1.getPriority().intValue() < env2.getPriority().intValue()) { + return -1; + } else if (env1.getPriority().intValue() > env2.getPriority().intValue()) { + return 1; + } + return 0; + } + } + + /** + * Encapsulation of the environment of each coprocessor + */ + public static class Environment implements CoprocessorEnvironment { + + /** + * A wrapper for HTable. Can be used to restrict privilege. + * + * Currently it just helps to track tables opened by a Coprocessor and + * facilitate close of them if it is aborted. + * + * We also disallow row locking. + * + * There is nothing now that will stop a coprocessor from using HTable + * objects directly instead of this API, but in the future we intend to + * analyze coprocessor implementations as they are loaded and reject those + * which attempt to use objects and methods outside the Environment + * sandbox. + */ + class HTableWrapper implements HTableInterface { + + private byte[] tableName; + private HTable table; + + public HTableWrapper(byte[] tableName) throws IOException { + this.tableName = tableName; + this.table = new HTable(tableName); + openTables.add(this); + } + + void internalClose() throws IOException { + table.close(); + } + + public Configuration getConfiguration() { + return table.getConfiguration(); + } + + public void close() throws IOException { + try { + internalClose(); + } finally { + openTables.remove(this); + } + } + + public Result getRowOrBefore(byte[] row, byte[] family) + throws IOException { + return table.getRowOrBefore(row, family); + } + + public Result get(Get get) throws IOException { + return table.get(get); + } + + public boolean exists(Get get) throws IOException { + return table.exists(get); + } + + public void put(Put put) throws IOException { + table.put(put); + } + + public void put(List puts) throws IOException { + table.put(puts); + } + + public void delete(Delete delete) throws IOException { + table.delete(delete); + } + + public void delete(List deletes) throws IOException { + table.delete(deletes); + } + + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, + byte[] value, Put put) throws IOException { + return table.checkAndPut(row, family, qualifier, value, put); + } + + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, + byte[] value, Delete delete) throws IOException { + return table.checkAndDelete(row, family, qualifier, value, delete); + } + + public long incrementColumnValue(byte[] row, byte[] family, + byte[] qualifier, long amount) throws IOException { + return table.incrementColumnValue(row, family, qualifier, amount); + } + + public long incrementColumnValue(byte[] row, byte[] family, + byte[] qualifier, long amount, boolean writeToWAL) + throws IOException { + return table.incrementColumnValue(row, family, qualifier, amount, + writeToWAL); + } + + @Override + public Result increment(Increment increment) throws IOException { + return table.increment(increment); + } + + public void flushCommits() throws IOException { + table.flushCommits(); + } + + public boolean isAutoFlush() { + return table.isAutoFlush(); + } + + public ResultScanner getScanner(Scan scan) throws IOException { + return table.getScanner(scan); + } + + public ResultScanner getScanner(byte[] family) throws IOException { + return table.getScanner(family); + } + + public ResultScanner getScanner(byte[] family, byte[] qualifier) + throws IOException { + return table.getScanner(family, qualifier); + } + + public HTableDescriptor getTableDescriptor() throws IOException { + return table.getTableDescriptor(); + } + + public byte[] getTableName() { + return tableName; + } + + public RowLock lockRow(byte[] row) throws IOException { + throw new RuntimeException( + "row locking is not allowed within the coprocessor environment"); + } + + public void unlockRow(RowLock rl) throws IOException { + throw new RuntimeException( + "row locking is not allowed within the coprocessor environment"); + } + + @Override + public void batch(List actions, Object[] results) + throws IOException, InterruptedException { + table.batch(actions, results); + } + + @Override + public Object[] batch(List actions) + throws IOException, InterruptedException { + return table.batch(actions); + } + + @Override + public Result[] get(List gets) throws IOException { + return table.get(gets); + } + + @Override + public void coprocessorExec(Class protocol, + byte[] startKey, byte[] endKey, Batch.Call callable, + Batch.Callback callback) throws IOException, Throwable { + table.coprocessorExec(protocol, startKey, endKey, callable, callback); + } + + @Override + public Map coprocessorExec( + Class protocol, byte[] startKey, byte[] endKey, Batch.Call callable) + throws IOException, Throwable { + return table.coprocessorExec(protocol, startKey, endKey, callable); + } + + @Override + public T coprocessorProxy(Class protocol, + byte[] row) { + return table.coprocessorProxy(protocol, row); + } + } + + /** The coprocessor */ + public Coprocessor impl; + /** Chaining priority */ + protected Coprocessor.Priority priority = Coprocessor.Priority.USER; + /** Current coprocessor state */ + Coprocessor.State state = Coprocessor.State.UNINSTALLED; + /** Accounting for tables opened by the coprocessor */ + protected List openTables = + Collections.synchronizedList(new ArrayList()); + static final ThreadLocal bypass = new ThreadLocal() { + @Override protected Boolean initialValue() { + return Boolean.FALSE; + } + }; + static final ThreadLocal complete = new ThreadLocal() { + @Override protected Boolean initialValue() { + return Boolean.FALSE; + } + }; + + /** + * Constructor + * @param impl the coprocessor instance + * @param priority chaining priority + */ + public Environment(final Coprocessor impl, Coprocessor.Priority priority) { + this.impl = impl; + this.priority = priority; + this.state = Coprocessor.State.INSTALLED; + } + + /** Initialize the environment */ + public void startup() { + if (state == Coprocessor.State.INSTALLED || + state == Coprocessor.State.STOPPED) { + state = Coprocessor.State.STARTING; + try { + impl.start(this); + state = Coprocessor.State.ACTIVE; + } catch (IOException ioe) { + LOG.error("Error starting coprocessor "+impl.getClass().getName(), ioe); + } + } else { + LOG.warn("Not starting coprocessor "+impl.getClass().getName()+ + " because not inactive (state="+state.toString()+")"); + } + } + + /** Clean up the environment */ + protected void shutdown() { + if (state == Coprocessor.State.ACTIVE) { + state = Coprocessor.State.STOPPING; + try { + impl.stop(this); + state = Coprocessor.State.STOPPED; + } catch (IOException ioe) { + LOG.error("Error stopping coprocessor "+impl.getClass().getName(), ioe); + } + } else { + LOG.warn("Not stopping coprocessor "+impl.getClass().getName()+ + " because not active (state="+state.toString()+")"); + } + // clean up any table references + for (HTableInterface table: openTables) { + try { + ((HTableWrapper)table).internalClose(); + } catch (IOException e) { + // nothing can be done here + LOG.warn("Failed to close " + + Bytes.toStringBinary(table.getTableName()), e); + } + } + } + + public boolean shouldBypass() { + boolean current = bypass.get(); + bypass.set(false); + return current; + } + + public boolean shouldComplete() { + boolean current = complete.get(); + complete.set(false); + return current; + } + + @Override + public Coprocessor getInstance() { + return impl; + } + + @Override + public Coprocessor.Priority getPriority() { + return priority; + } + + /** @return the coprocessor environment version */ + @Override + public int getVersion() { + return Coprocessor.VERSION; + } + + /** @return the HBase release */ + @Override + public String getHBaseVersion() { + return VersionInfo.getVersion(); + } + + /** + * Open a table from within the Coprocessor environment + * @param tableName the table name + * @return an interface for manipulating the table + * @exception java.io.IOException Exception + */ + @Override + public HTableInterface getTable(byte[] tableName) throws IOException { + return new HTableWrapper(tableName); + } + + @Override + public void complete() { + complete.set(true); + } + + @Override + public void bypass() { + bypass.set(true); + } + }} diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterCoprocessorEnvironment.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterCoprocessorEnvironment.java new file mode 100644 index 00000000000..5d8cf4c2543 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterCoprocessorEnvironment.java @@ -0,0 +1,28 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import org.apache.hadoop.hbase.master.MasterServices; + +public interface MasterCoprocessorEnvironment extends CoprocessorEnvironment { + /** @return reference to the HMaster services */ + MasterServices getMasterServices(); +} diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java new file mode 100644 index 00000000000..db0870b5cc4 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java @@ -0,0 +1,215 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import org.apache.hadoop.hbase.*; + +import java.io.IOException; + +/** + * Defines coprocessor hooks for interacting with operations on the + * {@link org.apache.hadoop.hbase.master.HMaster} process. + */ +public interface MasterObserver extends Coprocessor { + + /** + * Called before a new table is created by + * {@link org.apache.hadoop.hbase.master.HMaster}. + */ + void preCreateTable(MasterCoprocessorEnvironment env, + HTableDescriptor desc, byte[][] splitKeys) throws IOException; + + /** + * Called after the initial table regions have been created. + * @param env the environment to interact with the framework and master + * @param regions the initial regions created for the table + * @param sync whether the client call is waiting for region assignment to + * complete before returning + * @throws IOException + */ + void postCreateTable(MasterCoprocessorEnvironment env, + HRegionInfo[] regions, boolean sync) throws IOException; + + /** + * Called before {@link org.apache.hadoop.hbase.master.HMaster} deletes a + * table + */ + void preDeleteTable(MasterCoprocessorEnvironment env, byte[] tableName) + throws IOException; + + /** + * Called after the table has been deleted, before returning to the client. + */ + void postDeleteTable(MasterCoprocessorEnvironment env, byte[] tableName) + throws IOException; + + /** + * Called prior to modifying a table's properties. + */ + void preModifyTable(MasterCoprocessorEnvironment env, final byte[] tableName, + HTableDescriptor htd) throws IOException; + + /** + * Called after {@link org.apache.hadoop.hbase.master.HMaster} has modified + * the table's properties in all the table regions. + */ + void postModifyTable(MasterCoprocessorEnvironment env, final byte[] tableName, + HTableDescriptor htd) throws IOException; + + /** + * Called prior to adding a new column family to the table. + */ + void preAddColumn(MasterCoprocessorEnvironment env, byte[] tableName, + HColumnDescriptor column) throws IOException; + + /** + * Called after the new column family has been created. + */ + void postAddColumn(MasterCoprocessorEnvironment env, byte[] tableName, + HColumnDescriptor column) throws IOException; + + /** + * Called prior to modifying a column family's attributes. + */ + void preModifyColumn(MasterCoprocessorEnvironment env, + byte [] tableName, HColumnDescriptor descriptor) throws IOException; + + /** + * Called after the column family has been updated. + */ + void postModifyColumn(MasterCoprocessorEnvironment env, byte[] tableName, + HColumnDescriptor descriptor) throws IOException; + + /** + * Called prior to deleting the entire column family. + */ + void preDeleteColumn(MasterCoprocessorEnvironment env, + final byte [] tableName, final byte[] c) throws IOException; + + /** + * Called after the column family has been deleted. + */ + void postDeleteColumn(MasterCoprocessorEnvironment env, + final byte [] tableName, final byte[] c) throws IOException; + + /** + * Called prior to enabling a table. + */ + void preEnableTable(MasterCoprocessorEnvironment env, final byte[] tableName) + throws IOException; + + /** + * Called after the table has been enabled. + */ + void postEnableTable(MasterCoprocessorEnvironment env, final byte[] tableName) + throws IOException; + + /** + * Called prior to disabling a table. + */ + void preDisableTable(MasterCoprocessorEnvironment env, final byte[] tableName) + throws IOException; + + /** + * Called after the table has been disabled. + */ + void postDisableTable(MasterCoprocessorEnvironment env, final byte[] tableName) + throws IOException; + + /** + * Called prior to moving a given region from one region server to another. + */ + void preMove(MasterCoprocessorEnvironment env, final HRegionInfo region, + final HServerInfo srcServer, final HServerInfo destServer) + throws UnknownRegionException; + + /** + * Called after the region move has been requested. + */ + void postMove(MasterCoprocessorEnvironment env, final HRegionInfo region, + final HServerInfo srcServer, final HServerInfo destServer) + throws UnknownRegionException; + + /** + * Called prior to assigning a specific region. + */ + void preAssign(MasterCoprocessorEnvironment env, final byte [] regionName, + final boolean force) throws IOException; + + /** + * Called after the region assignment has been requested. + */ + void postAssign(MasterCoprocessorEnvironment env, final HRegionInfo regionInfo) + throws IOException; + + /** + * Called prior to unassigning a given region. + */ + void preUnassign(MasterCoprocessorEnvironment env, final byte [] regionName, + final boolean force) throws IOException; + + /** + * Called after the region unassignment has been requested. + */ + void postUnassign(MasterCoprocessorEnvironment env, + final HRegionInfo regionInfo, final boolean force) throws IOException; + + /** + * Called prior to requesting rebalancing of the cluster regions, though after + * the initial checks for regions in transition and the balance switch flag. + */ + void preBalance(MasterCoprocessorEnvironment env) throws IOException; + + /** + * Called after the balancing plan has been submitted. + */ + void postBalance(MasterCoprocessorEnvironment env) throws IOException; + + /** + * Called prior to modifying the flag used to enable/disable region balancing. + * @param env the coprocessor instance's environment + * @param newValue the new flag value submitted in the call + */ + boolean preBalanceSwitch(MasterCoprocessorEnvironment env, + final boolean newValue) throws IOException; + + /** + * Called after the flag to enable/disable balancing has changed. + * @param env the coprocessor instance's environment + * @param oldValue the previously set balanceSwitch value + * @param newValue the newly set balanceSwitch value + */ + void postBalanceSwitch(MasterCoprocessorEnvironment env, + final boolean oldValue, final boolean newValue) throws IOException; + + /** + * Called prior to shutting down the full HBase cluster, including this + * {@link org.apache.hadoop.hbase.master.HMaster} process. + */ + void preShutdown(MasterCoprocessorEnvironment env) throws IOException; + + + /** + * Called immediatly prior to stopping this + * {@link org.apache.hadoop.hbase.master.HMaster} process. + */ + void preStopMaster(MasterCoprocessorEnvironment env) throws IOException; +} diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java new file mode 100644 index 00000000000..da8076c0c8b --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java @@ -0,0 +1,33 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; + +public interface RegionCoprocessorEnvironment extends CoprocessorEnvironment { + /** @return the region associated with this coprocessor */ + public HRegion getRegion(); + + /** @return reference to the region server services */ + public RegionServerServices getRegionServerServices(); + +} diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index 97198ecb51f..42356c92c58 100644 --- a/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -42,25 +42,25 @@ public interface RegionObserver extends Coprocessor { * Called before the region is reported as open to the master. * @param e the environment provided by the region server */ - public void preOpen(final CoprocessorEnvironment e); + public void preOpen(final RegionCoprocessorEnvironment e); /** * Called after the region is reported as open to the master. * @param e the environment provided by the region server */ - public void postOpen(final CoprocessorEnvironment e); + public void postOpen(final RegionCoprocessorEnvironment e); /** * Called before the memstore is flushed to disk. * @param e the environment provided by the region server */ - public void preFlush(final CoprocessorEnvironment e); + public void preFlush(final RegionCoprocessorEnvironment e); /** * Called after the memstore is flushed to disk. * @param e the environment provided by the region server */ - public void postFlush(final CoprocessorEnvironment e); + public void postFlush(final RegionCoprocessorEnvironment e); /** * Called before compaction. @@ -68,7 +68,7 @@ public interface RegionObserver extends Coprocessor { * @param willSplit true if compaction will result in a split, false * otherwise */ - public void preCompact(final CoprocessorEnvironment e, + public void preCompact(final RegionCoprocessorEnvironment e, final boolean willSplit); /** @@ -77,7 +77,7 @@ public interface RegionObserver extends Coprocessor { * @param willSplit true if compaction will result in a split, false * otherwise */ - public void postCompact(final CoprocessorEnvironment e, + public void postCompact(final RegionCoprocessorEnvironment e, final boolean willSplit); /** @@ -85,7 +85,7 @@ public interface RegionObserver extends Coprocessor { * @param e the environment provided by the region server * (e.getRegion() returns the parent region) */ - public void preSplit(final CoprocessorEnvironment e); + public void preSplit(final RegionCoprocessorEnvironment e); /** * Called after the region is split. @@ -94,7 +94,7 @@ public interface RegionObserver extends Coprocessor { * @param l the left daughter region * @param r the right daughter region */ - public void postSplit(final CoprocessorEnvironment e, final HRegion l, + public void postSplit(final RegionCoprocessorEnvironment e, final HRegion l, final HRegion r); /** @@ -102,14 +102,16 @@ public interface RegionObserver extends Coprocessor { * @param e the environment provided by the region server * @param abortRequested true if the region server is aborting */ - public void preClose(final CoprocessorEnvironment e, boolean abortRequested); + public void preClose(final RegionCoprocessorEnvironment e, + boolean abortRequested); /** * Called after the region is reported as closed to the master. * @param e the environment provided by the region server * @param abortRequested true if the region server is aborting */ - public void postClose(final CoprocessorEnvironment e, boolean abortRequested); + public void postClose(final RegionCoprocessorEnvironment e, + boolean abortRequested); /** * Called before a client makes a GetClosestRowBefore request. @@ -126,7 +128,7 @@ public interface RegionObserver extends Coprocessor { * is not bypassed. * @throws IOException if an error occurred on the coprocessor */ - public void preGetClosestRowBefore(final CoprocessorEnvironment e, + public void preGetClosestRowBefore(final RegionCoprocessorEnvironment e, final byte [] row, final byte [] family, final Result result) throws IOException; @@ -141,7 +143,7 @@ public interface RegionObserver extends Coprocessor { * @param result the result to return to the client, modify as necessary * @throws IOException if an error occurred on the coprocessor */ - public void postGetClosestRowBefore(final CoprocessorEnvironment e, + public void postGetClosestRowBefore(final RegionCoprocessorEnvironment e, final byte [] row, final byte [] family, final Result result) throws IOException; @@ -159,7 +161,7 @@ public interface RegionObserver extends Coprocessor { * is not bypassed. * @throws IOException if an error occurred on the coprocessor */ - public void preGet(final CoprocessorEnvironment e, final Get get, + public void preGet(final RegionCoprocessorEnvironment e, final Get get, final List result) throws IOException; @@ -173,7 +175,7 @@ public interface RegionObserver extends Coprocessor { * @param result the result to return to the client, modify as necessary * @throws IOException if an error occurred on the coprocessor */ - public void postGet(final CoprocessorEnvironment e, final Get get, + public void postGet(final RegionCoprocessorEnvironment e, final Get get, final List result) throws IOException; @@ -190,7 +192,7 @@ public interface RegionObserver extends Coprocessor { * @return the value to return to the client if bypassing default processing * @throws IOException if an error occurred on the coprocessor */ - public boolean preExists(final CoprocessorEnvironment e, final Get get, + public boolean preExists(final RegionCoprocessorEnvironment e, final Get get, final boolean exists) throws IOException; @@ -205,7 +207,7 @@ public interface RegionObserver extends Coprocessor { * @return the result to return to the client * @throws IOException if an error occurred on the coprocessor */ - public boolean postExists(final CoprocessorEnvironment e, final Get get, + public boolean postExists(final RegionCoprocessorEnvironment e, final Get get, final boolean exists) throws IOException; @@ -221,7 +223,7 @@ public interface RegionObserver extends Coprocessor { * @param writeToWAL true if the change should be written to the WAL * @throws IOException if an error occurred on the coprocessor */ - public void prePut(final CoprocessorEnvironment e, final Map> familyMap, final boolean writeToWAL) throws IOException; @@ -235,7 +237,7 @@ public interface RegionObserver extends Coprocessor { * @param writeToWAL true if the change should be written to the WAL * @throws IOException if an error occurred on the coprocessor */ - public void postPut(final CoprocessorEnvironment e, final Map> familyMap, final boolean writeToWAL) throws IOException; @@ -251,7 +253,7 @@ public interface RegionObserver extends Coprocessor { * @param writeToWAL true if the change should be written to the WAL * @throws IOException if an error occurred on the coprocessor */ - public void preDelete(final CoprocessorEnvironment e, final Map> familyMap, final boolean writeToWAL) throws IOException; @@ -265,7 +267,7 @@ public interface RegionObserver extends Coprocessor { * @param writeToWAL true if the change should be written to the WAL * @throws IOException if an error occurred on the coprocessor */ - public void postDelete(final CoprocessorEnvironment e, + public void postDelete(final RegionCoprocessorEnvironment e, final Map> familyMap, final boolean writeToWAL) throws IOException; @@ -287,7 +289,7 @@ public interface RegionObserver extends Coprocessor { * processing * @throws IOException if an error occurred on the coprocessor */ - public boolean preCheckAndPut(final CoprocessorEnvironment e, + public boolean preCheckAndPut(final RegionCoprocessorEnvironment e, final byte [] row, final byte [] family, final byte [] qualifier, final byte [] value, final Put put, final boolean result) throws IOException; @@ -307,7 +309,7 @@ public interface RegionObserver extends Coprocessor { * @return the possibly transformed return value to return to client * @throws IOException if an error occurred on the coprocessor */ - public boolean postCheckAndPut(final CoprocessorEnvironment e, + public boolean postCheckAndPut(final RegionCoprocessorEnvironment e, final byte [] row, final byte [] family, final byte [] qualifier, final byte [] value, final Put put, final boolean result) throws IOException; @@ -329,7 +331,7 @@ public interface RegionObserver extends Coprocessor { * @return the value to return to client if bypassing default processing * @throws IOException if an error occurred on the coprocessor */ - public boolean preCheckAndDelete(final CoprocessorEnvironment e, + public boolean preCheckAndDelete(final RegionCoprocessorEnvironment e, final byte [] row, final byte [] family, final byte [] qualifier, final byte [] value, final Delete delete, final boolean result) throws IOException; @@ -349,7 +351,7 @@ public interface RegionObserver extends Coprocessor { * @return the possibly transformed returned value to return to client * @throws IOException if an error occurred on the coprocessor */ - public boolean postCheckAndDelete(final CoprocessorEnvironment e, + public boolean postCheckAndDelete(final RegionCoprocessorEnvironment e, final byte [] row, final byte [] family, final byte [] qualifier, final byte [] value, final Delete delete, final boolean result) throws IOException; @@ -370,7 +372,7 @@ public interface RegionObserver extends Coprocessor { * @return value to return to the client if bypassing default processing * @throws IOException if an error occurred on the coprocessor */ - public long preIncrementColumnValue(final CoprocessorEnvironment e, + public long preIncrementColumnValue(final RegionCoprocessorEnvironment e, final byte [] row, final byte [] family, final byte [] qualifier, final long amount, final boolean writeToWAL) throws IOException; @@ -390,7 +392,7 @@ public interface RegionObserver extends Coprocessor { * @return the result to return to the client * @throws IOException if an error occurred on the coprocessor */ - public long postIncrementColumnValue(final CoprocessorEnvironment e, + public long postIncrementColumnValue(final RegionCoprocessorEnvironment e, final byte [] row, final byte [] family, final byte [] qualifier, final long amount, final boolean writeToWAL, final long result) throws IOException; @@ -407,10 +409,9 @@ public interface RegionObserver extends Coprocessor { * @param result The result to return to the client if default processing * is bypassed. Can be modified. Will not be used if default processing * is not bypassed. - * @param writeToWAL true if the change should be written to the WAL * @throws IOException if an error occurred on the coprocessor */ - public void preIncrement(final CoprocessorEnvironment e, + public void preIncrement(final RegionCoprocessorEnvironment e, final Increment increment, final Result result) throws IOException; @@ -421,11 +422,10 @@ public interface RegionObserver extends Coprocessor { * coprocessors * @param e the environment provided by the region server * @param increment increment object - * @param writeToWAL true if the change should be written to the WAL * @param result the result returned by increment, can be modified * @throws IOException if an error occurred on the coprocessor */ - public void postIncrement(final CoprocessorEnvironment e, + public void postIncrement(final RegionCoprocessorEnvironment e, final Increment increment, final Result result) throws IOException; @@ -443,7 +443,7 @@ public interface RegionObserver extends Coprocessor { * overriding default behavior, null otherwise * @throws IOException if an error occurred on the coprocessor */ - public InternalScanner preScannerOpen(final CoprocessorEnvironment e, + public InternalScanner preScannerOpen(final RegionCoprocessorEnvironment e, final Scan scan, final InternalScanner s) throws IOException; @@ -458,7 +458,7 @@ public interface RegionObserver extends Coprocessor { * @return the scanner instance to use * @throws IOException if an error occurred on the coprocessor */ - public InternalScanner postScannerOpen(final CoprocessorEnvironment e, + public InternalScanner postScannerOpen(final RegionCoprocessorEnvironment e, final Scan scan, final InternalScanner s) throws IOException; @@ -479,7 +479,7 @@ public interface RegionObserver extends Coprocessor { * @return 'has more' indication that should be sent to client * @throws IOException if an error occurred on the coprocessor */ - public boolean preScannerNext(final CoprocessorEnvironment e, + public boolean preScannerNext(final RegionCoprocessorEnvironment e, final InternalScanner s, final List result, final int limit, final boolean hasNext) throws IOException; @@ -497,7 +497,7 @@ public interface RegionObserver extends Coprocessor { * @return 'has more' indication that should be sent to client * @throws IOException if an error occurred on the coprocessor */ - public boolean postScannerNext(final CoprocessorEnvironment e, + public boolean postScannerNext(final RegionCoprocessorEnvironment e, final InternalScanner s, final List result, final int limit, final boolean hasNext) throws IOException; @@ -513,7 +513,7 @@ public interface RegionObserver extends Coprocessor { * @param s the scanner * @throws IOException if an error occurred on the coprocessor */ - public void preScannerClose(final CoprocessorEnvironment e, + public void preScannerClose(final RegionCoprocessorEnvironment e, final InternalScanner s) throws IOException; @@ -526,7 +526,7 @@ public interface RegionObserver extends Coprocessor { * @param s the scanner * @throws IOException if an error occurred on the coprocessor */ - public void postScannerClose(final CoprocessorEnvironment e, + public void postScannerClose(final RegionCoprocessorEnvironment e, final InternalScanner s) throws IOException; } diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/package-info.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/package-info.java index 1b7918cb826..05330f9cce8 100644 --- a/src/main/java/org/apache/hadoop/hbase/coprocessor/package-info.java +++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/package-info.java @@ -286,7 +286,7 @@ or by HTableDescriptor for a newly created table. opened regions.)

Load from configuration

Whenever a region is opened, it will read coprocessor class names from -hbase.coprocessor.default.classes from Configuration. +hbase.coprocessor.region.classes from Configuration. Coprocessor framework will automatically load the configured classes as default coprocessors. The classes must be included in the classpath already. @@ -294,7 +294,7 @@ default coprocessors. The classes must be included in the classpath already.
   <property>
-    <name>hbase.coprocessor.default.classes</name>
+    <name>hbase.coprocessor.region.classes</name>
     <value>org.apache.hadoop.hbase.coprocessor.AccessControllCoprocessor, org.apache.hadoop.hbase.coprocessor.ColumnAggregationProtocol</value>
     <description>A comma-separated list of Coprocessors that are loaded by
     default. For any override coprocessor method from RegionObservor or
diff --git a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 18f7787e1e1..cfd0fbc13cf 100644
--- a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -170,6 +170,8 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
   private Thread catalogJanitorChore;
   private LogCleaner logCleaner;
 
+  private MasterCoprocessorHost cpHost;
+
   /**
    * Initializes the HMaster. The steps are as follows:
    * 

@@ -369,6 +371,9 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()) + ", cluster-up flag was=" + wasUp); + // initialize master side coprocessors before we start handling requests + this.cpHost = new MasterCoprocessorHost(this, this.conf); + // start up all service threads. startServiceThreads(); @@ -675,6 +680,19 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { this.serverManager.getDeadServers()); return false; } + + if (this.cpHost != null) { + try { + if (this.cpHost.preBalance()) { + LOG.debug("Coprocessor bypassing balancer request"); + return false; + } + } catch (IOException ioe) { + LOG.error("Error invoking master coprocessor preBalance()", ioe); + return false; + } + } + Map> assignments = this.assignmentManager.getAssignments(); // Returned Map from AM does not include mention of servers w/o assignments. @@ -692,6 +710,14 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { this.assignmentManager.balance(plan); } } + if (this.cpHost != null) { + try { + this.cpHost.postBalance(); + } catch (IOException ioe) { + // balancing already succeeded so don't change the result + LOG.error("Error invoking master coprocessor postBalance()", ioe); + } + } } return true; } @@ -699,8 +725,19 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { @Override public boolean balanceSwitch(final boolean b) { boolean oldValue = this.balanceSwitch; - this.balanceSwitch = b; - LOG.info("Balance=" + b); + boolean newValue = b; + try { + if (this.cpHost != null) { + newValue = this.cpHost.preBalanceSwitch(newValue); + } + this.balanceSwitch = newValue; + LOG.info("Balance=" + newValue); + if (this.cpHost != null) { + this.cpHost.postBalanceSwitch(oldValue, newValue); + } + } catch (IOException ioe) { + LOG.warn("Error flipping balance switch", ioe); + } return oldValue; } @@ -721,8 +758,15 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { this.assignmentManager.unassign(hri); } else { dest = this.serverManager.getServerInfo(new String(destServerName)); + + if (this.cpHost != null) { + this.cpHost.preMove(p.getFirst(), p.getSecond(), dest); + } RegionPlan rp = new RegionPlan(p.getFirst(), p.getSecond(), dest); this.assignmentManager.balance(rp); + if (this.cpHost != null) { + this.cpHost.postMove(p.getFirst(), p.getSecond(), dest); + } } } @@ -737,6 +781,9 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { if (!isMasterRunning()) { throw new MasterNotRunningException(); } + if (cpHost != null) { + cpHost.preCreateTable(desc, splitKeys); + } HRegionInfo [] newRegions = null; if(splitKeys == null || splitKeys.length == 0) { newRegions = new HRegionInfo [] { new HRegionInfo(desc, null, null) }; @@ -810,6 +857,10 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { } } } + + if (cpHost != null) { + cpHost.postCreateTable(newRegions, sync); + } } private static boolean isCatalogTable(final byte [] tableName) { @@ -818,32 +869,68 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { } public void deleteTable(final byte [] tableName) throws IOException { + if (cpHost != null) { + cpHost.preDeleteTable(tableName); + } this.executorService.submit(new DeleteTableHandler(tableName, this, this)); + if (cpHost != null) { + cpHost.postDeleteTable(tableName); + } } public void addColumn(byte [] tableName, HColumnDescriptor column) throws IOException { + if (cpHost != null) { + cpHost.preAddColumn(tableName, column); + } new TableAddFamilyHandler(tableName, column, this, this).process(); + if (cpHost != null) { + cpHost.postAddColumn(tableName, column); + } } public void modifyColumn(byte [] tableName, HColumnDescriptor descriptor) throws IOException { + if (cpHost != null) { + cpHost.preModifyColumn(tableName, descriptor); + } new TableModifyFamilyHandler(tableName, descriptor, this, this).process(); + if (cpHost != null) { + cpHost.postModifyColumn(tableName, descriptor); + } } public void deleteColumn(final byte [] tableName, final byte [] c) throws IOException { + if (cpHost != null) { + cpHost.preDeleteColumn(tableName, c); + } new TableDeleteFamilyHandler(tableName, c, this, this).process(); + if (cpHost != null) { + cpHost.postDeleteColumn(tableName, c); + } } public void enableTable(final byte [] tableName) throws IOException { + if (cpHost != null) { + cpHost.preEnableTable(tableName); + } this.executorService.submit(new EnableTableHandler(this, tableName, catalogTracker, assignmentManager)); + if (cpHost != null) { + cpHost.postEnableTable(tableName); + } } public void disableTable(final byte [] tableName) throws IOException { + if (cpHost != null) { + cpHost.preDisableTable(tableName); + } this.executorService.submit(new DisableTableHandler(this, tableName, catalogTracker, assignmentManager)); + if (cpHost != null) { + cpHost.postDisableTable(tableName); + } } /** @@ -886,7 +973,13 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { @Override public void modifyTable(final byte[] tableName, HTableDescriptor htd) throws IOException { + if (cpHost != null) { + cpHost.preModifyTable(tableName, htd); + } this.executorService.submit(new ModifyTableHandler(tableName, htd, this, this)); + if (cpHost != null) { + cpHost.postModifyTable(tableName, htd); + } } @Override @@ -935,6 +1028,10 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { return zooKeeper; } + public MasterCoprocessorHost getCoprocessorHost() { + return cpHost; + } + @Override public String getServerName() { return address.toString(); @@ -952,6 +1049,13 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { @Override public void shutdown() { + if (cpHost != null) { + try { + cpHost.preShutdown(); + } catch (IOException ioe) { + LOG.error("Error call master coprocessor preShutdown()", ioe); + } + } this.serverManager.shutdownCluster(); try { this.clusterStatusTracker.setClusterDown(); @@ -962,6 +1066,13 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { @Override public void stopMaster() { + if (cpHost != null) { + try { + cpHost.preStopMaster(); + } catch (IOException ioe) { + LOG.error("Error call master coprocessor preStopMaster()", ioe); + } + } stop("Stopped by " + Thread.currentThread().getName()); } @@ -1008,10 +1119,18 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { @Override public void assign(final byte [] regionName, final boolean force) throws IOException { + if (cpHost != null) { + if (cpHost.preAssign(regionName, force)) { + return; + } + } Pair pair = MetaReader.getRegion(this.catalogTracker, regionName); if (pair == null) throw new UnknownRegionException(Bytes.toString(regionName)); assignRegion(pair.getFirst()); + if (cpHost != null) { + cpHost.postAssign(pair.getFirst()); + } } public void assignRegion(HRegionInfo hri) { @@ -1021,12 +1140,20 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { @Override public void unassign(final byte [] regionName, final boolean force) throws IOException { + if (cpHost != null) { + if (cpHost.preUnassign(regionName, force)) { + return; + } + } Pair pair = MetaReader.getRegion(this.catalogTracker, regionName); if (pair == null) throw new UnknownRegionException(Bytes.toString(regionName)); HRegionInfo hri = pair.getFirst(); if (force) this.assignmentManager.clearRegionFromTransition(hri); this.assignmentManager.unassign(hri, force); + if (cpHost != null) { + cpHost.postUnassign(hri, force); + } } /** diff --git a/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java new file mode 100644 index 00000000000..d4c98725466 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java @@ -0,0 +1,553 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.coprocessor.*; + +import java.io.IOException; + +/** + * Provides the coprocessor framework and environment for master oriented + * operations. {@link HMaster} interacts with the loaded coprocessors + * through this class. + */ +public class MasterCoprocessorHost + extends CoprocessorHost { + + /** + * Coprocessor environment extension providing access to master related + * services. + */ + static class MasterEnvironment extends CoprocessorHost.Environment + implements MasterCoprocessorEnvironment { + private MasterServices masterServices; + + public MasterEnvironment(Class implClass, Coprocessor impl, + Coprocessor.Priority priority, MasterServices services) { + super(impl, priority); + this.masterServices = services; + } + + public MasterServices getMasterServices() { + return masterServices; + } + } + + private MasterServices masterServices; + + MasterCoprocessorHost(final MasterServices services, final Configuration conf) { + this.masterServices = services; + + loadSystemCoprocessors(conf, MASTER_COPROCESSOR_CONF_KEY); + } + + @Override + public MasterEnvironment createEnvironment(Class implClass, + Coprocessor instance, Coprocessor.Priority priority) { + return new MasterEnvironment(implClass, instance, priority, masterServices); + } + + /* Implementation of hooks for invoking MasterObservers */ + void preCreateTable(HTableDescriptor desc, byte[][] splitKeys) + throws IOException { + try { + coprocessorLock.readLock().lock(); + for (MasterEnvironment env: coprocessors) { + if (env.getInstance() instanceof MasterObserver) { + ((MasterObserver)env.getInstance()).preCreateTable(env, desc, splitKeys); + if (env.shouldComplete()) { + break; + } + } + } + } finally { + coprocessorLock.readLock().unlock(); + } + } + + void postCreateTable(HRegionInfo[] regions, boolean sync) throws IOException { + try { + coprocessorLock.readLock().lock(); + for (MasterEnvironment env: coprocessors) { + if (env.getInstance() instanceof MasterObserver) { + ((MasterObserver)env.getInstance()).postCreateTable(env, regions, sync); + if (env.shouldComplete()) { + break; + } + } + } + } finally { + coprocessorLock.readLock().unlock(); + } + } + + void preDeleteTable(byte[] tableName) throws IOException { + try { + coprocessorLock.readLock().lock(); + for (MasterEnvironment env: coprocessors) { + if (env.getInstance() instanceof MasterObserver) { + ((MasterObserver)env.getInstance()).preDeleteTable(env, tableName); + if (env.shouldComplete()) { + break; + } + } + } + } finally { + coprocessorLock.readLock().unlock(); + } + } + + void postDeleteTable(byte[] tableName) throws IOException { + try { + coprocessorLock.readLock().lock(); + for (MasterEnvironment env: coprocessors) { + if (env.getInstance() instanceof MasterObserver) { + ((MasterObserver)env.getInstance()).postDeleteTable(env, tableName); + if (env.shouldComplete()) { + break; + } + } + } + } finally { + coprocessorLock.readLock().unlock(); + } + } + + void preModifyTable(final byte[] tableName, HTableDescriptor htd) + throws IOException { + try { + coprocessorLock.readLock().lock(); + for (MasterEnvironment env: coprocessors) { + if (env.getInstance() instanceof MasterObserver) { + ((MasterObserver)env.getInstance()).preModifyTable(env, tableName, htd); + if (env.shouldComplete()) { + break; + } + } + } + } finally { + coprocessorLock.readLock().unlock(); + } + } + + void postModifyTable(final byte[] tableName, HTableDescriptor htd) + throws IOException { + try { + coprocessorLock.readLock().lock(); + for (MasterEnvironment env: coprocessors) { + if (env.getInstance() instanceof MasterObserver) { + ((MasterObserver)env.getInstance()).postModifyTable(env, tableName, htd); + if (env.shouldComplete()) { + break; + } + } + } + } finally { + coprocessorLock.readLock().unlock(); + } + } + + void preAddColumn(byte [] tableName, HColumnDescriptor column) + throws IOException { + try { + coprocessorLock.readLock().lock(); + for (MasterEnvironment env: coprocessors) { + if (env.getInstance() instanceof MasterObserver) { + ((MasterObserver)env.getInstance()).preAddColumn(env, tableName, column); + if (env.shouldComplete()) { + break; + } + } + } + } finally { + coprocessorLock.readLock().unlock(); + } + } + + void postAddColumn(byte [] tableName, HColumnDescriptor column) + throws IOException { + try { + coprocessorLock.readLock().lock(); + for (MasterEnvironment env: coprocessors) { + if (env.getInstance() instanceof MasterObserver) { + ((MasterObserver)env.getInstance()).postAddColumn(env, tableName, column); + if (env.shouldComplete()) { + break; + } + } + } + } finally { + coprocessorLock.readLock().unlock(); + } + } + + void preModifyColumn(byte [] tableName, HColumnDescriptor descriptor) + throws IOException { + try { + coprocessorLock.readLock().lock(); + for (MasterEnvironment env: coprocessors) { + if (env.getInstance() instanceof MasterObserver) { + ((MasterObserver)env.getInstance()).preModifyColumn( + env, tableName, descriptor); + if (env.shouldComplete()) { + break; + } + } + } + } finally { + coprocessorLock.readLock().unlock(); + } + } + + void postModifyColumn(byte [] tableName, HColumnDescriptor descriptor) + throws IOException { + try { + coprocessorLock.readLock().lock(); + for (MasterEnvironment env: coprocessors) { + if (env.getInstance() instanceof MasterObserver) { + ((MasterObserver)env.getInstance()).postModifyColumn( + env, tableName, descriptor); + if (env.shouldComplete()) { + break; + } + } + } + } finally { + coprocessorLock.readLock().unlock(); + } + } + + void preDeleteColumn(final byte [] tableName, final byte [] c) + throws IOException { + try { + coprocessorLock.readLock().lock(); + for (MasterEnvironment env: coprocessors) { + if (env.getInstance() instanceof MasterObserver) { + ((MasterObserver)env.getInstance()).preDeleteColumn(env, tableName, c); + if (env.shouldComplete()) { + break; + } + } + } + } finally { + coprocessorLock.readLock().unlock(); + } + } + + void postDeleteColumn(final byte [] tableName, final byte [] c) + throws IOException { + try { + coprocessorLock.readLock().lock(); + for (MasterEnvironment env: coprocessors) { + if (env.getInstance() instanceof MasterObserver) { + ((MasterObserver)env.getInstance()).postDeleteColumn(env, tableName, c); + if (env.shouldComplete()) { + break; + } + } + } + } finally { + coprocessorLock.readLock().unlock(); + } + } + + void preEnableTable(final byte [] tableName) throws IOException { + try { + coprocessorLock.readLock().lock(); + for (MasterEnvironment env: coprocessors) { + if (env.getInstance() instanceof MasterObserver) { + ((MasterObserver)env.getInstance()).preEnableTable(env, tableName); + if (env.shouldComplete()) { + break; + } + } + } + } finally { + coprocessorLock.readLock().unlock(); + } + } + + void postEnableTable(final byte [] tableName) throws IOException { + try { + coprocessorLock.readLock().lock(); + for (MasterEnvironment env: coprocessors) { + if (env.getInstance() instanceof MasterObserver) { + ((MasterObserver)env.getInstance()).postEnableTable(env, tableName); + if (env.shouldComplete()) { + break; + } + } + } + } finally { + coprocessorLock.readLock().unlock(); + } + } + + void preDisableTable(final byte [] tableName) throws IOException { + try { + coprocessorLock.readLock().lock(); + for (MasterEnvironment env: coprocessors) { + if (env.getInstance() instanceof MasterObserver) { + ((MasterObserver)env.getInstance()).preDisableTable(env, tableName); + if (env.shouldComplete()) { + break; + } + } + } + } finally { + coprocessorLock.readLock().unlock(); + } + } + + void postDisableTable(final byte [] tableName) throws IOException { + try { + coprocessorLock.readLock().lock(); + for (MasterEnvironment env: coprocessors) { + if (env.getInstance() instanceof MasterObserver) { + ((MasterObserver)env.getInstance()).postDisableTable(env, tableName); + if (env.shouldComplete()) { + break; + } + } + } + } finally { + coprocessorLock.readLock().unlock(); + } + } + + void preMove(final HRegionInfo region, final HServerInfo srcServer, final HServerInfo destServer) + throws UnknownRegionException { + try { + coprocessorLock.readLock().lock(); + for (MasterEnvironment env: coprocessors) { + if (env.getInstance() instanceof MasterObserver) { + ((MasterObserver)env.getInstance()).preMove( + env, region, srcServer, destServer); + if (env.shouldComplete()) { + break; + } + } + } + } finally { + coprocessorLock.readLock().unlock(); + } + } + + void postMove(final HRegionInfo region, final HServerInfo srcServer, final HServerInfo destServer) + throws UnknownRegionException { + try { + coprocessorLock.readLock().lock(); + for (MasterEnvironment env: coprocessors) { + if (env.getInstance() instanceof MasterObserver) { + ((MasterObserver)env.getInstance()).postMove( + env, region, srcServer, destServer); + if (env.shouldComplete()) { + break; + } + } + } + } finally { + coprocessorLock.readLock().unlock(); + } + } + + boolean preAssign(final byte [] regionName, final boolean force) + throws IOException { + boolean bypass = false; + try { + coprocessorLock.readLock().lock(); + for (MasterEnvironment env: coprocessors) { + if (env.getInstance() instanceof MasterObserver) { + ((MasterObserver)env.getInstance()).preAssign(env, regionName, force); + bypass |= env.shouldBypass(); + if (env.shouldComplete()) { + break; + } + } + } + } finally { + coprocessorLock.readLock().unlock(); + } + return bypass; + } + + void postAssign(final HRegionInfo regionInfo) throws IOException { + try { + coprocessorLock.readLock().lock(); + for (MasterEnvironment env: coprocessors) { + if (env.getInstance() instanceof MasterObserver) { + ((MasterObserver)env.getInstance()).postAssign(env, regionInfo); + if (env.shouldComplete()) { + break; + } + } + } + } finally { + coprocessorLock.readLock().unlock(); + } + } + + boolean preUnassign(final byte [] regionName, final boolean force) + throws IOException { + boolean bypass = false; + try { + coprocessorLock.readLock().lock(); + for (MasterEnvironment env: coprocessors) { + if (env.getInstance() instanceof MasterObserver) { + ((MasterObserver)env.getInstance()).preUnassign( + env, regionName, force); + bypass |= env.shouldBypass(); + if (env.shouldComplete()) { + break; + } + } + } + } finally { + coprocessorLock.readLock().unlock(); + } + return bypass; + } + + void postUnassign(final HRegionInfo regionInfo, final boolean force) + throws IOException { + try { + coprocessorLock.readLock().lock(); + for (MasterEnvironment env: coprocessors) { + if (env.getInstance() instanceof MasterObserver) { + ((MasterObserver)env.getInstance()).postUnassign( + env, regionInfo, force); + if (env.shouldComplete()) { + break; + } + } + } + } finally { + coprocessorLock.readLock().unlock(); + } + } + + boolean preBalance() throws IOException { + try { + boolean bypass = false; + coprocessorLock.readLock().lock(); + for (MasterEnvironment env: coprocessors) { + if (env.getInstance() instanceof MasterObserver) { + ((MasterObserver)env.getInstance()).preBalance(env); + bypass |= env.shouldBypass(); + if (env.shouldComplete()) { + break; + } + } + } + return bypass; + } finally { + coprocessorLock.readLock().unlock(); + } + } + + void postBalance() throws IOException { + try { + coprocessorLock.readLock().lock(); + for (MasterEnvironment env: coprocessors) { + if (env.getInstance() instanceof MasterObserver) { + ((MasterObserver)env.getInstance()).postBalance(env); + if (env.shouldComplete()) { + break; + } + } + } + } finally { + coprocessorLock.readLock().unlock(); + } + } + + boolean preBalanceSwitch(final boolean b) throws IOException { + boolean balance = b; + try { + coprocessorLock.readLock().lock(); + for (MasterEnvironment env: coprocessors) { + if (env.getInstance() instanceof MasterObserver) { + balance = ((MasterObserver)env.getInstance()).preBalanceSwitch( + env, balance); + if (env.shouldComplete()) { + break; + } + } + } + } finally { + coprocessorLock.readLock().unlock(); + } + return balance; + } + + void postBalanceSwitch(final boolean oldValue, final boolean newValue) + throws IOException { + try { + coprocessorLock.readLock().lock(); + for (MasterEnvironment env: coprocessors) { + if (env.getInstance() instanceof MasterObserver) { + ((MasterObserver)env.getInstance()).postBalanceSwitch( + env, oldValue, newValue); + if (env.shouldComplete()) { + break; + } + } + } + } finally { + coprocessorLock.readLock().unlock(); + } + } + + void preShutdown() throws IOException { + try { + coprocessorLock.readLock().lock(); + for (MasterEnvironment env: coprocessors) { + if (env.getInstance() instanceof MasterObserver) { + ((MasterObserver)env.getInstance()).preShutdown(env); + if (env.shouldComplete()) { + break; + } + } + } + } finally { + coprocessorLock.readLock().unlock(); + } + } + + void preStopMaster() throws IOException { + try { + coprocessorLock.readLock().lock(); + for (MasterEnvironment env: coprocessors) { + if (env.getInstance() instanceof MasterObserver) { + ((MasterObserver)env.getInstance()).preStopMaster(env); + if (env.shouldComplete()) { + break; + } + } + } + } finally { + coprocessorLock.readLock().unlock(); + } + } + +} diff --git a/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 593254ba38e..2b876901c0c 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -23,7 +23,9 @@ import java.io.IOException; import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.executor.ExecutorService; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; /** * Services Master supplies @@ -56,4 +58,15 @@ public interface MasterServices { * @throws TableNotFoundException */ public void checkTableModifiable(final byte [] tableName) throws IOException; + + /** + * @return Implementation of {@link org.apache.hadoop.hbase.catalog.CatalogTracker} or null. + */ + public CatalogTracker getCatalogTracker(); + + /* + * @return Implementation of {@link ZooKeeperWatcher} or null. + */ + public ZooKeeperWatcher getZooKeeperWatcher(); + } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 1d48131799f..098b23a10b3 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -248,7 +248,7 @@ public class HRegion implements HeapSize { // , Writable{ new ReadWriteConsistencyControl(); // Coprocessor host - private CoprocessorHost coprocessorHost; + private RegionCoprocessorHost coprocessorHost; /** * Name of the region info file that resides just under the region directory. @@ -319,7 +319,7 @@ public class HRegion implements HeapSize { // , Writable{ // don't initialize coprocessors if not running within a regionserver // TODO: revisit if coprocessors should load in other cases if (rsServices != null) { - this.coprocessorHost = new CoprocessorHost(this, rsServices, conf); + this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf); } if (LOG.isDebugEnabled()) { // Write out region name as string and its encoded name. @@ -3557,12 +3557,12 @@ public class HRegion implements HeapSize { // , Writable{ } /** @return the coprocessor host */ - public CoprocessorHost getCoprocessorHost() { + public RegionCoprocessorHost getCoprocessorHost() { return coprocessorHost; } /** @param coprocessorHost the new coprocessor host */ - public void setCoprocessorHost(final CoprocessorHost coprocessorHost) { + public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) { this.coprocessorHost = coprocessorHost; } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/CoprocessorHost.java b/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java similarity index 51% rename from src/main/java/org/apache/hadoop/hbase/regionserver/CoprocessorHost.java rename to src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index f71fea646cc..7c4e1a1a1f1 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/CoprocessorHost.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -30,11 +30,10 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch.Call; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; -import org.apache.hadoop.hbase.coprocessor.Coprocessor; -import org.apache.hadoop.hbase.coprocessor.RegionObserver; -import org.apache.hadoop.hbase.coprocessor.CoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.*; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; import org.apache.hadoop.hbase.util.Bytes; @@ -47,298 +46,39 @@ import java.net.URL; import java.net.URLClassLoader; import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Matcher; import java.util.regex.Pattern; /** - * Implements the coprocessor environment and runtime support. + * Implements the coprocessor environment and runtime support for coprocessors + * loaded within a {@link HRegion}. */ -public class CoprocessorHost { +public class RegionCoprocessorHost + extends CoprocessorHost { - /** - * Environment priority comparator. - * Coprocessors are chained in sorted order. - */ - class EnvironmentPriorityComparator implements Comparator { - public int compare(Environment env1, Environment env2) { - if (env1.priority.intValue() < env2.priority.intValue()) { - return -1; - } else if (env1.priority.intValue() > env2.priority.intValue()) { - return 1; - } - return 0; - } - } + private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class); /** * Encapsulation of the environment of each coprocessor */ - class Environment implements CoprocessorEnvironment { + static class RegionEnvironment extends CoprocessorHost.Environment + implements RegionCoprocessorEnvironment { - /** - * A wrapper for HTable. Can be used to restrict privilege. - * - * Currently it just helps to track tables opened by a Coprocessor and - * facilitate close of them if it is aborted. - * - * We also disallow row locking. - * - * There is nothing now that will stop a coprocessor from using HTable - * objects directly instead of this API, but in the future we intend to - * analyze coprocessor implementations as they are loaded and reject those - * which attempt to use objects and methods outside the Environment - * sandbox. - */ - class HTableWrapper implements HTableInterface { - - private byte[] tableName; - private HTable table; - - public HTableWrapper(byte[] tableName) throws IOException { - this.tableName = tableName; - this.table = new HTable(tableName); - openTables.add(this); - } - - void internalClose() throws IOException { - table.close(); - } - - public Configuration getConfiguration() { - return table.getConfiguration(); - } - - public void close() throws IOException { - try { - internalClose(); - } finally { - openTables.remove(this); - } - } - - public Result getRowOrBefore(byte[] row, byte[] family) - throws IOException { - return table.getRowOrBefore(row, family); - } - - public Result get(Get get) throws IOException { - return table.get(get); - } - - public boolean exists(Get get) throws IOException { - return table.exists(get); - } - - public void put(Put put) throws IOException { - table.put(put); - } - - public void put(List puts) throws IOException { - table.put(puts); - } - - public void delete(Delete delete) throws IOException { - table.delete(delete); - } - - public void delete(List deletes) throws IOException { - table.delete(deletes); - } - - public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, - byte[] value, Put put) throws IOException { - return table.checkAndPut(row, family, qualifier, value, put); - } - - public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, - byte[] value, Delete delete) throws IOException { - return table.checkAndDelete(row, family, qualifier, value, delete); - } - - public long incrementColumnValue(byte[] row, byte[] family, - byte[] qualifier, long amount) throws IOException { - return table.incrementColumnValue(row, family, qualifier, amount); - } - - public long incrementColumnValue(byte[] row, byte[] family, - byte[] qualifier, long amount, boolean writeToWAL) - throws IOException { - return table.incrementColumnValue(row, family, qualifier, amount, - writeToWAL); - } - - @Override - public Result increment(Increment increment) throws IOException { - return table.increment(increment); - } - - public void flushCommits() throws IOException { - table.flushCommits(); - } - - public boolean isAutoFlush() { - return table.isAutoFlush(); - } - - public ResultScanner getScanner(Scan scan) throws IOException { - return table.getScanner(scan); - } - - public ResultScanner getScanner(byte[] family) throws IOException { - return table.getScanner(family); - } - - public ResultScanner getScanner(byte[] family, byte[] qualifier) - throws IOException { - return table.getScanner(family, qualifier); - } - - public HTableDescriptor getTableDescriptor() throws IOException { - return table.getTableDescriptor(); - } - - public byte[] getTableName() { - return tableName; - } - - public RowLock lockRow(byte[] row) throws IOException { - throw new RuntimeException( - "row locking is not allowed within the coprocessor environment"); - } - - public void unlockRow(RowLock rl) throws IOException { - throw new RuntimeException( - "row locking is not allowed within the coprocessor environment"); - } - - @Override - public void batch(List actions, Object[] results) - throws IOException, InterruptedException { - table.batch(actions, results); - } - - @Override - public Object[] batch(List actions) - throws IOException, InterruptedException { - return table.batch(actions); - } - - @Override - public Result[] get(List gets) throws IOException { - return table.get(gets); - } - - @Override - public void coprocessorExec(Class protocol, - byte[] startKey, byte[] endKey, Call callable, - Callback callback) throws IOException, Throwable { - table.coprocessorExec(protocol, startKey, endKey, callable, callback); - } - - @Override - public Map coprocessorExec( - Class protocol, byte[] startKey, byte[] endKey, Call callable) - throws IOException, Throwable { - return table.coprocessorExec(protocol, startKey, endKey, callable); - } - - @Override - public T coprocessorProxy(Class protocol, - byte[] row) { - return table.coprocessorProxy(protocol, row); - } - } - - /** The coprocessor */ - Coprocessor impl; - /** Environment variables */ - Map vars = new ConcurrentHashMap(); - /** Chaining priority */ - Coprocessor.Priority priority = Coprocessor.Priority.USER; - /** Current coprocessor state */ - Coprocessor.State state = Coprocessor.State.UNINSTALLED; - /** Accounting for tables opened by the coprocessor */ - List openTables = - Collections.synchronizedList(new ArrayList()); + private HRegion region; + private RegionServerServices rsServices; /** * Constructor * @param impl the coprocessor instance * @param priority chaining priority */ - public Environment(final Coprocessor impl, Coprocessor.Priority priority) { - this.impl = impl; - this.priority = priority; - state = Coprocessor.State.INSTALLED; - } - - /** Initialize the environment */ - void startup() { - if (state == Coprocessor.State.INSTALLED || - state == Coprocessor.State.STOPPED) { - state = Coprocessor.State.STARTING; - try { - impl.start(this); - state = Coprocessor.State.ACTIVE; - } catch (IOException ioe) { - LOG.error("Error starting coprocessor "+impl.getClass().getName(), ioe); - } - } else { - LOG.warn("Not starting coprocessor "+impl.getClass().getName()+ - " because not inactive (state="+state.toString()+")"); - } - } - - /** Clean up the environment */ - void shutdown() { - if (state == Coprocessor.State.ACTIVE) { - state = Coprocessor.State.STOPPING; - try { - impl.stop(this); - state = Coprocessor.State.STOPPED; - } catch (IOException ioe) { - LOG.error("Error stopping coprocessor "+impl.getClass().getName(), ioe); - } - } else { - LOG.warn("Not stopping coprocessor "+impl.getClass().getName()+ - " because not active (state="+state.toString()+")"); - } - // clean up any table references - for (HTableInterface table: openTables) { - try { - ((HTableWrapper)table).internalClose(); - } catch (IOException e) { - // nothing can be done here - LOG.warn("Failed to close " + - Bytes.toStringBinary(table.getTableName()), e); - } - } - } - - boolean shouldBypass() { - boolean current = bypass.get(); - bypass.set(false); - return current; - } - - boolean shouldComplete() { - boolean current = complete.get(); - complete.set(false); - return current; - } - - /** @return the coprocessor environment version */ - @Override - public int getVersion() { - return Coprocessor.VERSION; - } - - /** @return the HBase release */ - @Override - public String getHBaseVersion() { - return VersionInfo.getVersion(); + public RegionEnvironment(final Coprocessor impl, + Coprocessor.Priority priority, final HRegion region, + final RegionServerServices services) { + super(impl, priority); + this.region = region; + this.rsServices = services; } /** @return the region */ @@ -353,236 +93,37 @@ public class CoprocessorHost { return rsServices; } - /** - * Open a table from within the Coprocessor environment - * @param tableName the table name - * @return an interface for manipulating the table - * @exception IOException Exception - */ - @Override - public HTableInterface getTable(byte[] tableName) throws IOException { - return new HTableWrapper(tableName); - } - - @Override - public void complete() { - complete.set(true); - } - - @Override - public void bypass() { - bypass.set(true); + public void shutdown() { + super.shutdown(); } } - static final Log LOG = LogFactory.getLog(CoprocessorHost.class); static final Pattern attrSpecMatch = Pattern.compile("(.+):(.+):(.+)"); /** The region server services */ RegionServerServices rsServices; /** The region */ HRegion region; - /** Ordered set of loaded coprocessors with lock */ - final ReentrantReadWriteLock coprocessorLock = new ReentrantReadWriteLock(); - final Set coprocessors = - new TreeSet(new EnvironmentPriorityComparator()); - static final ThreadLocal bypass = new ThreadLocal() { - @Override protected Boolean initialValue() { - return Boolean.FALSE; - } - }; - static final ThreadLocal complete = new ThreadLocal() { - @Override protected Boolean initialValue() { - return Boolean.FALSE; - } - }; /** * Constructor * @param region the region - * @param rsServices an interface provide access to region server facilities + * @param rsServices interface to available region server functionality * @param conf the configuration */ - public CoprocessorHost(final HRegion region, + public RegionCoprocessorHost(final HRegion region, final RegionServerServices rsServices, final Configuration conf) { this.rsServices = rsServices; this.region = region; + this.pathPrefix = this.region.getRegionNameAsString().replace(',', '_'); // load system default cp's from configuration. - loadSystemCoprocessors(conf); + loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY); // load Coprocessor From HDFS loadTableCoprocessors(); } - /** - * Load system coprocessors. Read the class names from configuration. - * Called by constructor. - */ - private void loadSystemCoprocessors(Configuration conf) { - Class implClass = null; - - // load default coprocessors from configure file - String defaultCPClasses = conf.get("hbase.coprocessor.default.classes"); - if (defaultCPClasses == null || defaultCPClasses.length() == 0) - return; - StringTokenizer st = new StringTokenizer(defaultCPClasses, ","); - int priority = Coprocessor.Priority.SYSTEM.intValue(); - while (st.hasMoreTokens()) { - String className = st.nextToken(); - if (findCoprocessor(className) != null) { - continue; - } - ClassLoader cl = ClassLoader.getSystemClassLoader(); - Thread.currentThread().setContextClassLoader(cl); - try { - implClass = cl.loadClass(className); - load(implClass, Coprocessor.Priority.SYSTEM); - LOG.info("System coprocessor " + className + " was loaded " + - "successfully with priority (" + priority++ + ")."); - } catch (ClassNotFoundException e) { - LOG.warn("Class " + className + " cannot be found. " + - e.getMessage()); - } catch (IOException e) { - LOG.warn("Load coprocessor " + className + " failed. " + - e.getMessage()); - } - } - } - - /** - * Load a coprocessor implementation into the host - * @param path path to implementation jar - * @param className the main class name - * @param priority chaining priority - * @throws IOException Exception - */ - @SuppressWarnings("deprecation") - public void load(Path path, String className, Coprocessor.Priority priority) - throws IOException { - Class implClass = null; - - // Have we already loaded the class, perhaps from an earlier region open - // for the same table? - try { - implClass = getClass().getClassLoader().loadClass(className); - } catch (ClassNotFoundException e) { - LOG.info("Class " + className + " needs to be loaded from a file - " + - path.toString() + "."); - // go ahead to load from file system. - } - - // If not, load - if (implClass == null) { - // copy the jar to the local filesystem - if (!path.toString().endsWith(".jar")) { - throw new IOException(path.toString() + ": not a jar file?"); - } - FileSystem fs = path.getFileSystem(HBaseConfiguration.create()); - Path dst = new Path("/tmp/." + - region.getRegionNameAsString().replace(',', '_') + - "." + className + "." + System.currentTimeMillis() + ".jar"); - fs.copyToLocalFile(path, dst); - fs.deleteOnExit(dst); - - // TODO: code weaving goes here - - // TODO: wrap heap allocations and enforce maximum usage limits - - /* TODO: inject code into loop headers that monitors CPU use and - aborts runaway user code */ - - // load the jar and get the implementation main class - String cp = System.getProperty("java.class.path"); - // NOTE: Path.toURL is deprecated (toURI instead) but the URLClassLoader - // unsuprisingly wants URLs, not URIs; so we will use the deprecated - // method which returns URLs for as long as it is available - List paths = new ArrayList(); - paths.add(new File(dst.toString()).getCanonicalFile().toURL()); - StringTokenizer st = new StringTokenizer(cp, File.pathSeparator); - while (st.hasMoreTokens()) { - paths.add((new File(st.nextToken())).getCanonicalFile().toURL()); - } - ClassLoader cl = new URLClassLoader(paths.toArray(new URL[]{}), - ClassLoader.getSystemClassLoader()); - Thread.currentThread().setContextClassLoader(cl); - try { - implClass = cl.loadClass(className); - } catch (ClassNotFoundException e) { - throw new IOException(e); - } - } - - load(implClass, priority); - } - - /** - * @param implClass Implementation class - * @param priority priority - * @throws IOException Exception - */ - @SuppressWarnings({ "unchecked", "rawtypes" }) - public void load(Class implClass, Coprocessor.Priority priority) - throws IOException { - // create the instance - Coprocessor impl; - Object o = null; - try { - o = implClass.newInstance(); - impl = (Coprocessor)o; - } catch (InstantiationException e) { - throw new IOException(e); - } catch (IllegalAccessException e) { - throw new IOException(e); - } - // create the environment - Environment env = new Environment(impl, priority); - env.startup(); - - // Check if it's an Endpoint. - // Due to current dynamic protocol design, Endpoint - // uses a different way to be registered and executed. - // It uses a visitor pattern to invoke registered Endpoint - // method. - for (Class c : implClass.getInterfaces()) { - if (CoprocessorProtocol.class.isAssignableFrom(c)) { - region.registerProtocol(c, (CoprocessorProtocol)o); - break; - } - } - try { - coprocessorLock.writeLock().lock(); - coprocessors.add(env); - } finally { - coprocessorLock.writeLock().unlock(); - } - } - - /** - * Find a coprocessor implementation by class name - * @param className the class name - * @return the coprocessor, or null if not found - */ - public Coprocessor findCoprocessor(String className) { - // initialize the coprocessors - try { - coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl.getClass().getName().equals(className)) { - return env.impl; - } - } - for (Environment env: coprocessors) { - if (env.impl.getClass().getName().endsWith(className)) { - return env.impl; - } - } - return null; - } finally { - coprocessorLock.readLock().unlock(); - } - } - void loadTableCoprocessors () { // scan the table attributes for coprocessor load specifications // initialize the coprocessors @@ -613,6 +154,24 @@ public class CoprocessorHost { } } + @Override + public RegionEnvironment createEnvironment( + Class implClass, Coprocessor instance, Coprocessor.Priority priority) { + // Check if it's an Endpoint. + // Due to current dynamic protocol design, Endpoint + // uses a different way to be registered and executed. + // It uses a visitor pattern to invoke registered Endpoint + // method. + for (Class c : implClass.getInterfaces()) { + if (CoprocessorProtocol.class.isAssignableFrom(c)) { + region.registerProtocol(c, (CoprocessorProtocol)instance); + break; + } + } + + return new RegionEnvironment(instance, priority, region, rsServices); + } + /** * Invoked before a region open */ @@ -620,9 +179,9 @@ public class CoprocessorHost { loadTableCoprocessors(); try { coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - ((RegionObserver)env.impl).preOpen(env); + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ((RegionObserver)env.getInstance()).preOpen(env); if (env.shouldComplete()) { break; } @@ -639,9 +198,9 @@ public class CoprocessorHost { public void postOpen() { try { coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - ((RegionObserver)env.impl).postOpen(env); + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ((RegionObserver)env.getInstance()).postOpen(env); if (env.shouldComplete()) { break; } @@ -659,9 +218,9 @@ public class CoprocessorHost { public void preClose(boolean abortRequested) { try { coprocessorLock.writeLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - ((RegionObserver)env.impl).preClose(env, abortRequested); + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ((RegionObserver)env.getInstance()).preClose(env, abortRequested); } } } finally { @@ -676,11 +235,11 @@ public class CoprocessorHost { public void postClose(boolean abortRequested) { try { coprocessorLock.writeLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - ((RegionObserver)env.impl).postClose(env, abortRequested); + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ((RegionObserver)env.getInstance()).postClose(env, abortRequested); } - env.shutdown(); + shutdown(env); } } finally { coprocessorLock.writeLock().unlock(); @@ -694,9 +253,9 @@ public class CoprocessorHost { public void preCompact(boolean willSplit) { try { coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - ((RegionObserver)env.impl).preCompact(env, willSplit); + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ((RegionObserver)env.getInstance()).preCompact(env, willSplit); if (env.shouldComplete()) { break; } @@ -714,9 +273,9 @@ public class CoprocessorHost { public void postCompact(boolean willSplit) { try { coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - ((RegionObserver)env.impl).postCompact(env, willSplit); + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ((RegionObserver)env.getInstance()).postCompact(env, willSplit); if (env.shouldComplete()) { break; } @@ -733,9 +292,9 @@ public class CoprocessorHost { public void preFlush() { try { coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - ((RegionObserver)env.impl).preFlush(env); + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ((RegionObserver)env.getInstance()).preFlush(env); if (env.shouldComplete()) { break; } @@ -752,9 +311,9 @@ public class CoprocessorHost { public void postFlush() { try { coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - ((RegionObserver)env.impl).postFlush(env); + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ((RegionObserver)env.getInstance()).postFlush(env); if (env.shouldComplete()) { break; } @@ -771,9 +330,9 @@ public class CoprocessorHost { public void preSplit() { try { coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - ((RegionObserver)env.impl).preSplit(env); + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ((RegionObserver)env.getInstance()).preSplit(env); if (env.shouldComplete()) { break; } @@ -792,9 +351,9 @@ public class CoprocessorHost { public void postSplit(HRegion l, HRegion r) { try { coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - ((RegionObserver)env.impl).postSplit(env, l, r); + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ((RegionObserver)env.getInstance()).postSplit(env, l, r); if (env.shouldComplete()) { break; } @@ -819,9 +378,9 @@ public class CoprocessorHost { try { boolean bypass = false; coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - ((RegionObserver)env.impl).preGetClosestRowBefore(env, row, family, + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ((RegionObserver)env.getInstance()).preGetClosestRowBefore(env, row, family, result); bypass |= env.shouldBypass(); if (env.shouldComplete()) { @@ -845,9 +404,9 @@ public class CoprocessorHost { final Result result) throws IOException { try { coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - ((RegionObserver)env.impl).postGetClosestRowBefore(env, row, family, + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ((RegionObserver)env.getInstance()).postGetClosestRowBefore(env, row, family, result); if (env.shouldComplete()) { break; @@ -869,9 +428,9 @@ public class CoprocessorHost { try { boolean bypass = false; coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - ((RegionObserver)env.impl).preGet(env, get, results); + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ((RegionObserver)env.getInstance()).preGet(env, get, results); bypass |= env.shouldBypass(); if (env.shouldComplete()) { break; @@ -891,12 +450,12 @@ public class CoprocessorHost { * @exception IOException Exception */ public void postGet(final Get get, final List results) - throws IOException { + throws IOException { try { coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - ((RegionObserver)env.impl).postGet(env, get, results); + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ((RegionObserver)env.getInstance()).postGet(env, get, results); if (env.shouldComplete()) { break; } @@ -918,15 +477,15 @@ public class CoprocessorHost { boolean bypass = false; boolean exists = false; coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - exists = ((RegionObserver)env.impl).preExists(env, get, exists); + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + exists = ((RegionObserver)env.getInstance()).preExists(env, get, exists); bypass |= env.shouldBypass(); if (env.shouldComplete()) { break; - } } } + } return bypass ? exists : null; } finally { coprocessorLock.readLock().unlock(); @@ -943,9 +502,9 @@ public class CoprocessorHost { throws IOException { try { coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - exists = ((RegionObserver)env.impl).postExists(env, get, exists); + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + exists = ((RegionObserver)env.getInstance()).postExists(env, get, exists); if (env.shouldComplete()) { break; } @@ -968,9 +527,9 @@ public class CoprocessorHost { try { boolean bypass = false; coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - ((RegionObserver)env.impl).prePut(env, familyMap, writeToWAL); + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ((RegionObserver)env.getInstance()).prePut(env, familyMap, writeToWAL); bypass |= env.shouldBypass(); if (env.shouldComplete()) { break; @@ -992,9 +551,9 @@ public class CoprocessorHost { final boolean writeToWAL) throws IOException { try { coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - ((RegionObserver)env.impl).postPut(env, familyMap, writeToWAL); + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ((RegionObserver)env.getInstance()).postPut(env, familyMap, writeToWAL); if (env.shouldComplete()) { break; } @@ -1016,9 +575,9 @@ public class CoprocessorHost { try { boolean bypass = false; coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - ((RegionObserver)env.impl).preDelete(env, familyMap, writeToWAL); + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ((RegionObserver)env.getInstance()).preDelete(env, familyMap, writeToWAL); bypass |= env.shouldBypass(); if (env.shouldComplete()) { break; @@ -1040,9 +599,9 @@ public class CoprocessorHost { final boolean writeToWAL) throws IOException { try { coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - ((RegionObserver)env.impl).postDelete(env, familyMap, writeToWAL); + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ((RegionObserver)env.getInstance()).postDelete(env, familyMap, writeToWAL); if (env.shouldComplete()) { break; } @@ -1071,9 +630,9 @@ public class CoprocessorHost { boolean bypass = false; boolean result = false; coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - result = ((RegionObserver)env.impl).preCheckAndPut(env, row, family, + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + result = ((RegionObserver)env.getInstance()).preCheckAndPut(env, row, family, qualifier, value, put, result); bypass |= env.shouldBypass(); if (env.shouldComplete()) { @@ -1102,9 +661,9 @@ public class CoprocessorHost { { try { coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - result = ((RegionObserver)env.impl).postCheckAndPut(env, row, + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + result = ((RegionObserver)env.getInstance()).postCheckAndPut(env, row, family, qualifier, value, put, result); if (env.shouldComplete()) { break; @@ -1135,9 +694,9 @@ public class CoprocessorHost { boolean bypass = false; boolean result = false; coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - result = ((RegionObserver)env.impl).preCheckAndDelete(env, row, + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + result = ((RegionObserver)env.getInstance()).preCheckAndDelete(env, row, family, qualifier, value, delete, result); bypass |= env.shouldBypass(); if (env.shouldComplete()) { @@ -1166,15 +725,15 @@ public class CoprocessorHost { { try { coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - result = ((RegionObserver)env.impl).postCheckAndDelete(env, row, + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + result = ((RegionObserver)env.getInstance()).postCheckAndDelete(env, row, family, qualifier, value, delete, result); if (env.shouldComplete()) { break; - } } } + } return result; } finally { coprocessorLock.readLock().unlock(); @@ -1197,10 +756,10 @@ public class CoprocessorHost { try { boolean bypass = false; coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - amount = ((RegionObserver)env.impl).preIncrementColumnValue(env, - row, family, qualifier, amount, writeToWAL); + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + amount = ((RegionObserver)env.getInstance()).preIncrementColumnValue(env, + row, family, qualifier, amount, writeToWAL); bypass |= env.shouldBypass(); if (env.shouldComplete()) { break; @@ -1228,10 +787,10 @@ public class CoprocessorHost { long result) throws IOException { try { coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - result = ((RegionObserver)env.impl).postIncrementColumnValue(env, - row, family, qualifier, amount, writeToWAL, result); + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + result = ((RegionObserver)env.getInstance()).postIncrementColumnValue(env, + row, family, qualifier, amount, writeToWAL, result); if (env.shouldComplete()) { break; } @@ -1255,9 +814,9 @@ public class CoprocessorHost { boolean bypass = false; Result result = new Result(); coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - ((RegionObserver)env.impl).preIncrement(env, increment, result); + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ((RegionObserver)env.getInstance()).preIncrement(env, increment, result); bypass |= env.shouldBypass(); if (env.shouldComplete()) { break; @@ -1279,9 +838,9 @@ public class CoprocessorHost { throws IOException { try { coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - ((RegionObserver)env.impl).postIncrement(env, increment, result); + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ((RegionObserver)env.getInstance()).postIncrement(env, increment, result); if (env.shouldComplete()) { break; } @@ -1303,9 +862,9 @@ public class CoprocessorHost { boolean bypass = false; InternalScanner s = null; coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - s = ((RegionObserver)env.impl).preScannerOpen(env, scan, s); + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + s = ((RegionObserver)env.getInstance()).preScannerOpen(env, scan, s); bypass |= env.shouldBypass(); if (env.shouldComplete()) { break; @@ -1328,9 +887,9 @@ public class CoprocessorHost { throws IOException { try { coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - s = ((RegionObserver)env.impl).postScannerOpen(env, scan, s); + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + s = ((RegionObserver)env.getInstance()).postScannerOpen(env, scan, s); if (env.shouldComplete()) { break; } @@ -1356,9 +915,9 @@ public class CoprocessorHost { boolean bypass = false; boolean hasNext = false; coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - hasNext = ((RegionObserver)env.impl).preScannerNext(env, s, results, + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + hasNext = ((RegionObserver)env.getInstance()).preScannerNext(env, s, results, limit, hasNext); bypass |= env.shouldBypass(); if (env.shouldComplete()) { @@ -1385,9 +944,9 @@ public class CoprocessorHost { throws IOException { try { coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - hasMore = ((RegionObserver)env.impl).postScannerNext(env, s, + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + hasMore = ((RegionObserver)env.getInstance()).postScannerNext(env, s, results, limit, hasMore); if (env.shouldComplete()) { break; @@ -1410,9 +969,9 @@ public class CoprocessorHost { try { boolean bypass = false; coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - ((RegionObserver)env.impl).preScannerClose(env, s); + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ((RegionObserver)env.getInstance()).preScannerClose(env, s); bypass |= env.shouldBypass(); if (env.shouldComplete()) { break; @@ -1433,9 +992,9 @@ public class CoprocessorHost { throws IOException { try { coprocessorLock.readLock().lock(); - for (Environment env: coprocessors) { - if (env.impl instanceof RegionObserver) { - ((RegionObserver)env.impl).postScannerClose(env, s); + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ((RegionObserver)env.getInstance()).postScannerClose(env, s); if (env.shouldComplete()) { break; } diff --git a/src/main/resources/hbase-default.xml b/src/main/resources/hbase-default.xml index f1cc4ae0f85..c52045a44da 100644 --- a/src/main/resources/hbase-default.xml +++ b/src/main/resources/hbase-default.xml @@ -480,17 +480,28 @@ - hbase.coprocessor.default.classes + hbase.coprocessor.region.classes A comma-separated list of Coprocessors that are loaded by - default. For any override coprocessor method, these classes will be called - in order. After implement your own - Coprocessor, just put it in HBase's classpath and add the fully - qualified class name here. + default on all tables. For any override coprocessor method, these classes + will be called in order. After implementing your own Coprocessor, just put + it in HBase's classpath and add the fully qualified class name here. A coprocessor can also be loaded on demand by setting HTableDescriptor. + + hbase.coprocessor.master.classes + + A comma-separated list of + org.apache.hadoop.hbase.coprocessor.MasterObserver coprocessors that are + loaded by default on the active HMaster process. For any implemented + coprocessor methods, the listed classes will be called in order. After + implementing your own MasterObserver, just put it in HBase's classpath + and add the fully qualified class name here. + + +