HBASE-3256: Add coprocessor host and observer for HMaster
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1051639 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d1966accd4
commit
1229397a95
|
@ -46,6 +46,7 @@ Release 0.91.0 - Unreleased
|
|||
HBASE-3287 Add option to cache blocks on hfile write and evict blocks on
|
||||
hfile close
|
||||
HBASE-3335 Add BitComparator for filtering (Nathaniel Cook via Stack)
|
||||
HBASE-3256 Coprocessors: Coprocessor host and observer for HMaster
|
||||
|
||||
|
||||
Release 0.90.0 - Unreleased
|
||||
|
|
|
@ -0,0 +1,179 @@
|
|||
/*
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.coprocessor;
|
||||
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HServerInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.UnknownRegionException;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class BaseMasterObserver implements MasterObserver {
|
||||
@Override
|
||||
public void preCreateTable(MasterCoprocessorEnvironment env,
|
||||
HTableDescriptor desc, byte[][] splitKeys) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postCreateTable(MasterCoprocessorEnvironment env,
|
||||
HRegionInfo[] regions, boolean sync) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preDeleteTable(MasterCoprocessorEnvironment env, byte[] tableName)
|
||||
throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postDeleteTable(MasterCoprocessorEnvironment env, byte[] tableName)
|
||||
throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preModifyTable(MasterCoprocessorEnvironment env,
|
||||
byte[] tableName, HTableDescriptor htd) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postModifyTable(MasterCoprocessorEnvironment env,
|
||||
byte[] tableName, HTableDescriptor htd) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preAddColumn(MasterCoprocessorEnvironment env,
|
||||
byte[] tableName, HColumnDescriptor column) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postAddColumn(MasterCoprocessorEnvironment env, byte[] tableName,
|
||||
HColumnDescriptor column) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preModifyColumn(MasterCoprocessorEnvironment env,
|
||||
byte[] tableName, HColumnDescriptor descriptor) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postModifyColumn(MasterCoprocessorEnvironment env,
|
||||
byte[] tableName, HColumnDescriptor descriptor) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preDeleteColumn(MasterCoprocessorEnvironment env,
|
||||
byte[] tableName, byte[] c) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postDeleteColumn(MasterCoprocessorEnvironment env,
|
||||
byte[] tableName, byte[] c) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preEnableTable(MasterCoprocessorEnvironment env, byte[] tableName)
|
||||
throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postEnableTable(MasterCoprocessorEnvironment env,
|
||||
byte[] tableName) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preDisableTable(MasterCoprocessorEnvironment env,
|
||||
byte[] tableName) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postDisableTable(MasterCoprocessorEnvironment env,
|
||||
byte[] tableName) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preMove(MasterCoprocessorEnvironment env, HRegionInfo region,
|
||||
HServerInfo srcServer, HServerInfo destServer)
|
||||
throws UnknownRegionException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postMove(MasterCoprocessorEnvironment env, HRegionInfo region,
|
||||
HServerInfo srcServer, HServerInfo destServer)
|
||||
throws UnknownRegionException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preAssign(MasterCoprocessorEnvironment env, byte[] regionName,
|
||||
boolean force) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postAssign(MasterCoprocessorEnvironment env,
|
||||
HRegionInfo regionInfo) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preUnassign(MasterCoprocessorEnvironment env, byte[] regionName,
|
||||
boolean force) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postUnassign(MasterCoprocessorEnvironment env,
|
||||
HRegionInfo regionInfo, boolean force) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preBalance(MasterCoprocessorEnvironment env) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postBalance(MasterCoprocessorEnvironment env) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean preBalanceSwitch(MasterCoprocessorEnvironment env, boolean b)
|
||||
throws IOException {
|
||||
return b;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postBalanceSwitch(MasterCoprocessorEnvironment env,
|
||||
boolean oldValue, boolean newValue) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preShutdown(MasterCoprocessorEnvironment env) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preStopMaster(MasterCoprocessorEnvironment env)
|
||||
throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(CoprocessorEnvironment env) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop(CoprocessorEnvironment env) throws IOException {
|
||||
}
|
||||
}
|
|
@ -36,8 +36,7 @@ import java.io.IOException;
|
|||
* By extending it, you can create you own region observer without
|
||||
* overriding all abstract methods of Coprocessor and RegionObserver.
|
||||
*/
|
||||
public abstract class BaseRegionObserverCoprocessor implements Coprocessor,
|
||||
RegionObserver {
|
||||
public abstract class BaseRegionObserverCoprocessor implements RegionObserver {
|
||||
@Override
|
||||
public void start(CoprocessorEnvironment e) { }
|
||||
|
||||
|
@ -45,94 +44,94 @@ public abstract class BaseRegionObserverCoprocessor implements Coprocessor,
|
|||
public void stop(CoprocessorEnvironment e) { }
|
||||
|
||||
@Override
|
||||
public void preOpen(CoprocessorEnvironment e) { }
|
||||
public void preOpen(RegionCoprocessorEnvironment e) { }
|
||||
|
||||
@Override
|
||||
public void postOpen(CoprocessorEnvironment e) { }
|
||||
public void postOpen(RegionCoprocessorEnvironment e) { }
|
||||
|
||||
@Override
|
||||
public void preClose(CoprocessorEnvironment e, boolean abortRequested)
|
||||
public void preClose(RegionCoprocessorEnvironment e, boolean abortRequested)
|
||||
{ }
|
||||
|
||||
@Override
|
||||
public void postClose(CoprocessorEnvironment e, boolean abortRequested)
|
||||
public void postClose(RegionCoprocessorEnvironment e, boolean abortRequested)
|
||||
{ }
|
||||
|
||||
@Override
|
||||
public void preFlush(CoprocessorEnvironment e) { }
|
||||
public void preFlush(RegionCoprocessorEnvironment e) { }
|
||||
|
||||
@Override
|
||||
public void postFlush(CoprocessorEnvironment e) { }
|
||||
public void postFlush(RegionCoprocessorEnvironment e) { }
|
||||
|
||||
@Override
|
||||
public void preSplit(CoprocessorEnvironment e) { }
|
||||
public void preSplit(RegionCoprocessorEnvironment e) { }
|
||||
|
||||
@Override
|
||||
public void postSplit(CoprocessorEnvironment e, HRegion l, HRegion r) { }
|
||||
public void postSplit(RegionCoprocessorEnvironment e, HRegion l, HRegion r) { }
|
||||
|
||||
@Override
|
||||
public void preCompact(CoprocessorEnvironment e, boolean willSplit) { }
|
||||
public void preCompact(RegionCoprocessorEnvironment e, boolean willSplit) { }
|
||||
|
||||
@Override
|
||||
public void postCompact(CoprocessorEnvironment e, boolean willSplit) { }
|
||||
public void postCompact(RegionCoprocessorEnvironment e, boolean willSplit) { }
|
||||
|
||||
@Override
|
||||
public void preGetClosestRowBefore(final CoprocessorEnvironment e,
|
||||
public void preGetClosestRowBefore(final RegionCoprocessorEnvironment e,
|
||||
final byte [] row, final byte [] family, final Result result)
|
||||
throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postGetClosestRowBefore(final CoprocessorEnvironment e,
|
||||
public void postGetClosestRowBefore(final RegionCoprocessorEnvironment e,
|
||||
final byte [] row, final byte [] family, final Result result)
|
||||
throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preGet(final CoprocessorEnvironment e, final Get get,
|
||||
public void preGet(final RegionCoprocessorEnvironment e, final Get get,
|
||||
final List<KeyValue> results) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postGet(final CoprocessorEnvironment e, final Get get,
|
||||
public void postGet(final RegionCoprocessorEnvironment e, final Get get,
|
||||
final List<KeyValue> results) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean preExists(final CoprocessorEnvironment e, final Get get,
|
||||
public boolean preExists(final RegionCoprocessorEnvironment e, final Get get,
|
||||
final boolean exists) throws IOException {
|
||||
return exists;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean postExists(final CoprocessorEnvironment e, final Get get,
|
||||
public boolean postExists(final RegionCoprocessorEnvironment e, final Get get,
|
||||
boolean exists) throws IOException {
|
||||
return exists;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void prePut(final CoprocessorEnvironment e, final Map<byte[],
|
||||
public void prePut(final RegionCoprocessorEnvironment e, final Map<byte[],
|
||||
List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postPut(final CoprocessorEnvironment e, final Map<byte[],
|
||||
public void postPut(final RegionCoprocessorEnvironment e, final Map<byte[],
|
||||
List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preDelete(final CoprocessorEnvironment e, final Map<byte[],
|
||||
public void preDelete(final RegionCoprocessorEnvironment e, final Map<byte[],
|
||||
List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postDelete(final CoprocessorEnvironment e,
|
||||
public void postDelete(final RegionCoprocessorEnvironment e,
|
||||
final Map<byte[], List<KeyValue>> familyMap, final boolean writeToWAL)
|
||||
throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean preCheckAndPut(final CoprocessorEnvironment e,
|
||||
public boolean preCheckAndPut(final RegionCoprocessorEnvironment e,
|
||||
final byte [] row, final byte [] family, final byte [] qualifier,
|
||||
final byte [] value, final Put put, final boolean result)
|
||||
throws IOException {
|
||||
|
@ -140,7 +139,7 @@ public abstract class BaseRegionObserverCoprocessor implements Coprocessor,
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean postCheckAndPut(final CoprocessorEnvironment e,
|
||||
public boolean postCheckAndPut(final RegionCoprocessorEnvironment e,
|
||||
final byte [] row, final byte [] family, final byte [] qualifier,
|
||||
final byte [] value, final Put put, final boolean result)
|
||||
throws IOException {
|
||||
|
@ -148,7 +147,7 @@ public abstract class BaseRegionObserverCoprocessor implements Coprocessor,
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean preCheckAndDelete(final CoprocessorEnvironment e,
|
||||
public boolean preCheckAndDelete(final RegionCoprocessorEnvironment e,
|
||||
final byte [] row, final byte [] family, final byte [] qualifier,
|
||||
final byte [] value, final Delete delete, final boolean result)
|
||||
throws IOException {
|
||||
|
@ -156,7 +155,7 @@ public abstract class BaseRegionObserverCoprocessor implements Coprocessor,
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean postCheckAndDelete(final CoprocessorEnvironment e,
|
||||
public boolean postCheckAndDelete(final RegionCoprocessorEnvironment e,
|
||||
final byte [] row, final byte [] family, final byte [] qualifier,
|
||||
final byte [] value, final Delete delete, final boolean result)
|
||||
throws IOException {
|
||||
|
@ -164,14 +163,14 @@ public abstract class BaseRegionObserverCoprocessor implements Coprocessor,
|
|||
}
|
||||
|
||||
@Override
|
||||
public long preIncrementColumnValue(final CoprocessorEnvironment e,
|
||||
public long preIncrementColumnValue(final RegionCoprocessorEnvironment e,
|
||||
final byte [] row, final byte [] family, final byte [] qualifier,
|
||||
final long amount, final boolean writeToWAL) throws IOException {
|
||||
return amount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long postIncrementColumnValue(final CoprocessorEnvironment e,
|
||||
public long postIncrementColumnValue(final RegionCoprocessorEnvironment e,
|
||||
final byte [] row, final byte [] family, final byte [] qualifier,
|
||||
final long amount, final boolean writeToWAL, long result)
|
||||
throws IOException {
|
||||
|
@ -179,48 +178,48 @@ public abstract class BaseRegionObserverCoprocessor implements Coprocessor,
|
|||
}
|
||||
|
||||
@Override
|
||||
public void preIncrement(final CoprocessorEnvironment e,
|
||||
public void preIncrement(final RegionCoprocessorEnvironment e,
|
||||
final Increment increment, final Result result) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postIncrement(final CoprocessorEnvironment e,
|
||||
public void postIncrement(final RegionCoprocessorEnvironment e,
|
||||
final Increment increment, final Result result) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public InternalScanner preScannerOpen(final CoprocessorEnvironment e,
|
||||
public InternalScanner preScannerOpen(final RegionCoprocessorEnvironment e,
|
||||
final Scan scan, final InternalScanner s) throws IOException {
|
||||
return s;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InternalScanner postScannerOpen(final CoprocessorEnvironment e,
|
||||
public InternalScanner postScannerOpen(final RegionCoprocessorEnvironment e,
|
||||
final Scan scan, final InternalScanner s) throws IOException {
|
||||
return s;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean preScannerNext(final CoprocessorEnvironment e,
|
||||
public boolean preScannerNext(final RegionCoprocessorEnvironment e,
|
||||
final InternalScanner s, final List<KeyValue> results,
|
||||
final int limit, final boolean hasMore) throws IOException {
|
||||
return hasMore;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean postScannerNext(final CoprocessorEnvironment e,
|
||||
public boolean postScannerNext(final RegionCoprocessorEnvironment e,
|
||||
final InternalScanner s, final List<KeyValue> results, final int limit,
|
||||
final boolean hasMore) throws IOException {
|
||||
return hasMore;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preScannerClose(final CoprocessorEnvironment e,
|
||||
public void preScannerClose(final RegionCoprocessorEnvironment e,
|
||||
final InternalScanner s) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postScannerClose(final CoprocessorEnvironment e,
|
||||
public void postScannerClose(final RegionCoprocessorEnvironment e,
|
||||
final InternalScanner s) throws IOException {
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,11 +33,11 @@ public interface CoprocessorEnvironment {
|
|||
/** @return the HBase version as a string (e.g. "0.21.0") */
|
||||
public String getHBaseVersion();
|
||||
|
||||
/** @return the region associated with this coprocessor */
|
||||
public HRegion getRegion();
|
||||
/** @return the loaded coprocessor instance */
|
||||
public Coprocessor getInstance();
|
||||
|
||||
/** @return reference to the region server services */
|
||||
public RegionServerServices getRegionServerServices();
|
||||
/** @return the priority assigned to the loaded coprocessor */
|
||||
public Coprocessor.Priority getPriority();
|
||||
|
||||
/**
|
||||
* @return an interface for accessing the given table
|
||||
|
|
|
@ -0,0 +1,556 @@
|
|||
/*
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.coprocessor;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.*;
|
||||
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.VersionInfo;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URL;
|
||||
import java.net.URLClassLoader;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
/**
|
||||
* Provides the common setup framework and runtime services for coprocessor
|
||||
* invocation from HBase services.
|
||||
* @param <E> the specific environment extension that a concrete implementation
|
||||
* provides
|
||||
*/
|
||||
public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
||||
public static final String REGION_COPROCESSOR_CONF_KEY =
|
||||
"hbase.coprocessor.region.classes";
|
||||
public static final String MASTER_COPROCESSOR_CONF_KEY =
|
||||
"hbase.coprocessor.master.classes";
|
||||
private static final Log LOG = LogFactory.getLog(CoprocessorHost.class);
|
||||
/** Ordered set of loaded coprocessors with lock */
|
||||
protected final ReentrantReadWriteLock coprocessorLock = new ReentrantReadWriteLock();
|
||||
protected Set<E> coprocessors =
|
||||
new TreeSet<E>(new EnvironmentPriorityComparator());
|
||||
// unique file prefix to use for local copies of jars when classloading
|
||||
protected String pathPrefix;
|
||||
|
||||
public CoprocessorHost() {
|
||||
pathPrefix = UUID.randomUUID().toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Load system coprocessors. Read the class names from configuration.
|
||||
* Called by constructor.
|
||||
*/
|
||||
protected void loadSystemCoprocessors(Configuration conf, String confKey) {
|
||||
Class<?> implClass = null;
|
||||
|
||||
// load default coprocessors from configure file
|
||||
String defaultCPClasses = conf.get(confKey);
|
||||
if (defaultCPClasses == null || defaultCPClasses.length() == 0)
|
||||
return;
|
||||
StringTokenizer st = new StringTokenizer(defaultCPClasses, ",");
|
||||
int priority = Coprocessor.Priority.SYSTEM.intValue();
|
||||
while (st.hasMoreTokens()) {
|
||||
String className = st.nextToken();
|
||||
if (findCoprocessor(className) != null) {
|
||||
continue;
|
||||
}
|
||||
ClassLoader cl = ClassLoader.getSystemClassLoader();
|
||||
Thread.currentThread().setContextClassLoader(cl);
|
||||
try {
|
||||
implClass = cl.loadClass(className);
|
||||
load(implClass, Coprocessor.Priority.SYSTEM);
|
||||
LOG.info("System coprocessor " + className + " was loaded " +
|
||||
"successfully with priority (" + priority++ + ").");
|
||||
} catch (ClassNotFoundException e) {
|
||||
LOG.warn("Class " + className + " cannot be found. " +
|
||||
e.getMessage());
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Load coprocessor " + className + " failed. " +
|
||||
e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Load a coprocessor implementation into the host
|
||||
* @param path path to implementation jar
|
||||
* @param className the main class name
|
||||
* @param priority chaining priority
|
||||
* @throws java.io.IOException Exception
|
||||
*/
|
||||
@SuppressWarnings("deprecation")
|
||||
public void load(Path path, String className, Coprocessor.Priority priority)
|
||||
throws IOException {
|
||||
Class<?> implClass = null;
|
||||
|
||||
// Have we already loaded the class, perhaps from an earlier region open
|
||||
// for the same table?
|
||||
try {
|
||||
implClass = getClass().getClassLoader().loadClass(className);
|
||||
} catch (ClassNotFoundException e) {
|
||||
LOG.info("Class " + className + " needs to be loaded from a file - " +
|
||||
path.toString() + ".");
|
||||
// go ahead to load from file system.
|
||||
}
|
||||
|
||||
// If not, load
|
||||
if (implClass == null) {
|
||||
// copy the jar to the local filesystem
|
||||
if (!path.toString().endsWith(".jar")) {
|
||||
throw new IOException(path.toString() + ": not a jar file?");
|
||||
}
|
||||
FileSystem fs = path.getFileSystem(HBaseConfiguration.create());
|
||||
Path dst = new Path("/tmp/." + pathPrefix +
|
||||
"." + className + "." + System.currentTimeMillis() + ".jar");
|
||||
fs.copyToLocalFile(path, dst);
|
||||
fs.deleteOnExit(dst);
|
||||
|
||||
// TODO: code weaving goes here
|
||||
|
||||
// TODO: wrap heap allocations and enforce maximum usage limits
|
||||
|
||||
/* TODO: inject code into loop headers that monitors CPU use and
|
||||
aborts runaway user code */
|
||||
|
||||
// load the jar and get the implementation main class
|
||||
String cp = System.getProperty("java.class.path");
|
||||
// NOTE: Path.toURL is deprecated (toURI instead) but the URLClassLoader
|
||||
// unsuprisingly wants URLs, not URIs; so we will use the deprecated
|
||||
// method which returns URLs for as long as it is available
|
||||
List<URL> paths = new ArrayList<URL>();
|
||||
paths.add(new File(dst.toString()).getCanonicalFile().toURL());
|
||||
StringTokenizer st = new StringTokenizer(cp, File.pathSeparator);
|
||||
while (st.hasMoreTokens()) {
|
||||
paths.add((new File(st.nextToken())).getCanonicalFile().toURL());
|
||||
}
|
||||
ClassLoader cl = new URLClassLoader(paths.toArray(new URL[]{}),
|
||||
ClassLoader.getSystemClassLoader());
|
||||
Thread.currentThread().setContextClassLoader(cl);
|
||||
try {
|
||||
implClass = cl.loadClass(className);
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
load(implClass, priority);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param implClass Implementation class
|
||||
* @param priority priority
|
||||
* @throws java.io.IOException Exception
|
||||
*/
|
||||
public void load(Class<?> implClass, Coprocessor.Priority priority)
|
||||
throws IOException {
|
||||
// create the instance
|
||||
Coprocessor impl;
|
||||
Object o = null;
|
||||
try {
|
||||
o = implClass.newInstance();
|
||||
impl = (Coprocessor)o;
|
||||
} catch (InstantiationException e) {
|
||||
throw new IOException(e);
|
||||
} catch (IllegalAccessException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
// create the environment
|
||||
E env = createEnvironment(implClass, impl, priority);
|
||||
if (env instanceof Environment) {
|
||||
((Environment)env).startup();
|
||||
}
|
||||
|
||||
try {
|
||||
coprocessorLock.writeLock().lock();
|
||||
coprocessors.add(env);
|
||||
} finally {
|
||||
coprocessorLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when a new Coprocessor class is loaded
|
||||
*/
|
||||
public abstract E createEnvironment(Class<?> implClass, Coprocessor instance,
|
||||
Coprocessor.Priority priority);
|
||||
|
||||
public void shutdown(CoprocessorEnvironment e) {
|
||||
if (e instanceof Environment) {
|
||||
((Environment)e).shutdown();
|
||||
} else {
|
||||
LOG.warn("Shutdown called on unknown environment: "+
|
||||
e.getClass().getName());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Find a coprocessor implementation by class name
|
||||
* @param className the class name
|
||||
* @return the coprocessor, or null if not found
|
||||
*/
|
||||
public Coprocessor findCoprocessor(String className) {
|
||||
// initialize the coprocessors
|
||||
try {
|
||||
coprocessorLock.readLock().lock();
|
||||
for (E env: coprocessors) {
|
||||
if (env.getInstance().getClass().getName().equals(className) ||
|
||||
env.getInstance().getClass().getSimpleName().equals(className)) {
|
||||
return env.getInstance();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
} finally {
|
||||
coprocessorLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Environment priority comparator.
|
||||
* Coprocessors are chained in sorted order.
|
||||
*/
|
||||
static class EnvironmentPriorityComparator implements Comparator<CoprocessorEnvironment> {
|
||||
public int compare(CoprocessorEnvironment env1, CoprocessorEnvironment env2) {
|
||||
if (env1.getPriority().intValue() < env2.getPriority().intValue()) {
|
||||
return -1;
|
||||
} else if (env1.getPriority().intValue() > env2.getPriority().intValue()) {
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Encapsulation of the environment of each coprocessor
|
||||
*/
|
||||
public static class Environment implements CoprocessorEnvironment {
|
||||
|
||||
/**
|
||||
* A wrapper for HTable. Can be used to restrict privilege.
|
||||
*
|
||||
* Currently it just helps to track tables opened by a Coprocessor and
|
||||
* facilitate close of them if it is aborted.
|
||||
*
|
||||
* We also disallow row locking.
|
||||
*
|
||||
* There is nothing now that will stop a coprocessor from using HTable
|
||||
* objects directly instead of this API, but in the future we intend to
|
||||
* analyze coprocessor implementations as they are loaded and reject those
|
||||
* which attempt to use objects and methods outside the Environment
|
||||
* sandbox.
|
||||
*/
|
||||
class HTableWrapper implements HTableInterface {
|
||||
|
||||
private byte[] tableName;
|
||||
private HTable table;
|
||||
|
||||
public HTableWrapper(byte[] tableName) throws IOException {
|
||||
this.tableName = tableName;
|
||||
this.table = new HTable(tableName);
|
||||
openTables.add(this);
|
||||
}
|
||||
|
||||
void internalClose() throws IOException {
|
||||
table.close();
|
||||
}
|
||||
|
||||
public Configuration getConfiguration() {
|
||||
return table.getConfiguration();
|
||||
}
|
||||
|
||||
public void close() throws IOException {
|
||||
try {
|
||||
internalClose();
|
||||
} finally {
|
||||
openTables.remove(this);
|
||||
}
|
||||
}
|
||||
|
||||
public Result getRowOrBefore(byte[] row, byte[] family)
|
||||
throws IOException {
|
||||
return table.getRowOrBefore(row, family);
|
||||
}
|
||||
|
||||
public Result get(Get get) throws IOException {
|
||||
return table.get(get);
|
||||
}
|
||||
|
||||
public boolean exists(Get get) throws IOException {
|
||||
return table.exists(get);
|
||||
}
|
||||
|
||||
public void put(Put put) throws IOException {
|
||||
table.put(put);
|
||||
}
|
||||
|
||||
public void put(List<Put> puts) throws IOException {
|
||||
table.put(puts);
|
||||
}
|
||||
|
||||
public void delete(Delete delete) throws IOException {
|
||||
table.delete(delete);
|
||||
}
|
||||
|
||||
public void delete(List<Delete> deletes) throws IOException {
|
||||
table.delete(deletes);
|
||||
}
|
||||
|
||||
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
|
||||
byte[] value, Put put) throws IOException {
|
||||
return table.checkAndPut(row, family, qualifier, value, put);
|
||||
}
|
||||
|
||||
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
|
||||
byte[] value, Delete delete) throws IOException {
|
||||
return table.checkAndDelete(row, family, qualifier, value, delete);
|
||||
}
|
||||
|
||||
public long incrementColumnValue(byte[] row, byte[] family,
|
||||
byte[] qualifier, long amount) throws IOException {
|
||||
return table.incrementColumnValue(row, family, qualifier, amount);
|
||||
}
|
||||
|
||||
public long incrementColumnValue(byte[] row, byte[] family,
|
||||
byte[] qualifier, long amount, boolean writeToWAL)
|
||||
throws IOException {
|
||||
return table.incrementColumnValue(row, family, qualifier, amount,
|
||||
writeToWAL);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result increment(Increment increment) throws IOException {
|
||||
return table.increment(increment);
|
||||
}
|
||||
|
||||
public void flushCommits() throws IOException {
|
||||
table.flushCommits();
|
||||
}
|
||||
|
||||
public boolean isAutoFlush() {
|
||||
return table.isAutoFlush();
|
||||
}
|
||||
|
||||
public ResultScanner getScanner(Scan scan) throws IOException {
|
||||
return table.getScanner(scan);
|
||||
}
|
||||
|
||||
public ResultScanner getScanner(byte[] family) throws IOException {
|
||||
return table.getScanner(family);
|
||||
}
|
||||
|
||||
public ResultScanner getScanner(byte[] family, byte[] qualifier)
|
||||
throws IOException {
|
||||
return table.getScanner(family, qualifier);
|
||||
}
|
||||
|
||||
public HTableDescriptor getTableDescriptor() throws IOException {
|
||||
return table.getTableDescriptor();
|
||||
}
|
||||
|
||||
public byte[] getTableName() {
|
||||
return tableName;
|
||||
}
|
||||
|
||||
public RowLock lockRow(byte[] row) throws IOException {
|
||||
throw new RuntimeException(
|
||||
"row locking is not allowed within the coprocessor environment");
|
||||
}
|
||||
|
||||
public void unlockRow(RowLock rl) throws IOException {
|
||||
throw new RuntimeException(
|
||||
"row locking is not allowed within the coprocessor environment");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void batch(List<Row> actions, Object[] results)
|
||||
throws IOException, InterruptedException {
|
||||
table.batch(actions, results);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object[] batch(List<Row> actions)
|
||||
throws IOException, InterruptedException {
|
||||
return table.batch(actions);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result[] get(List<Get> gets) throws IOException {
|
||||
return table.get(gets);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends CoprocessorProtocol, R> void coprocessorExec(Class<T> protocol,
|
||||
byte[] startKey, byte[] endKey, Batch.Call<T, R> callable,
|
||||
Batch.Callback<R> callback) throws IOException, Throwable {
|
||||
table.coprocessorExec(protocol, startKey, endKey, callable, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends CoprocessorProtocol, R> Map<byte[], R> coprocessorExec(
|
||||
Class<T> protocol, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
|
||||
throws IOException, Throwable {
|
||||
return table.coprocessorExec(protocol, startKey, endKey, callable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends CoprocessorProtocol> T coprocessorProxy(Class<T> protocol,
|
||||
byte[] row) {
|
||||
return table.coprocessorProxy(protocol, row);
|
||||
}
|
||||
}
|
||||
|
||||
/** The coprocessor */
|
||||
public Coprocessor impl;
|
||||
/** Chaining priority */
|
||||
protected Coprocessor.Priority priority = Coprocessor.Priority.USER;
|
||||
/** Current coprocessor state */
|
||||
Coprocessor.State state = Coprocessor.State.UNINSTALLED;
|
||||
/** Accounting for tables opened by the coprocessor */
|
||||
protected List<HTableInterface> openTables =
|
||||
Collections.synchronizedList(new ArrayList<HTableInterface>());
|
||||
static final ThreadLocal<Boolean> bypass = new ThreadLocal<Boolean>() {
|
||||
@Override protected Boolean initialValue() {
|
||||
return Boolean.FALSE;
|
||||
}
|
||||
};
|
||||
static final ThreadLocal<Boolean> complete = new ThreadLocal<Boolean>() {
|
||||
@Override protected Boolean initialValue() {
|
||||
return Boolean.FALSE;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
* @param impl the coprocessor instance
|
||||
* @param priority chaining priority
|
||||
*/
|
||||
public Environment(final Coprocessor impl, Coprocessor.Priority priority) {
|
||||
this.impl = impl;
|
||||
this.priority = priority;
|
||||
this.state = Coprocessor.State.INSTALLED;
|
||||
}
|
||||
|
||||
/** Initialize the environment */
|
||||
public void startup() {
|
||||
if (state == Coprocessor.State.INSTALLED ||
|
||||
state == Coprocessor.State.STOPPED) {
|
||||
state = Coprocessor.State.STARTING;
|
||||
try {
|
||||
impl.start(this);
|
||||
state = Coprocessor.State.ACTIVE;
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Error starting coprocessor "+impl.getClass().getName(), ioe);
|
||||
}
|
||||
} else {
|
||||
LOG.warn("Not starting coprocessor "+impl.getClass().getName()+
|
||||
" because not inactive (state="+state.toString()+")");
|
||||
}
|
||||
}
|
||||
|
||||
/** Clean up the environment */
|
||||
protected void shutdown() {
|
||||
if (state == Coprocessor.State.ACTIVE) {
|
||||
state = Coprocessor.State.STOPPING;
|
||||
try {
|
||||
impl.stop(this);
|
||||
state = Coprocessor.State.STOPPED;
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Error stopping coprocessor "+impl.getClass().getName(), ioe);
|
||||
}
|
||||
} else {
|
||||
LOG.warn("Not stopping coprocessor "+impl.getClass().getName()+
|
||||
" because not active (state="+state.toString()+")");
|
||||
}
|
||||
// clean up any table references
|
||||
for (HTableInterface table: openTables) {
|
||||
try {
|
||||
((HTableWrapper)table).internalClose();
|
||||
} catch (IOException e) {
|
||||
// nothing can be done here
|
||||
LOG.warn("Failed to close " +
|
||||
Bytes.toStringBinary(table.getTableName()), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public boolean shouldBypass() {
|
||||
boolean current = bypass.get();
|
||||
bypass.set(false);
|
||||
return current;
|
||||
}
|
||||
|
||||
public boolean shouldComplete() {
|
||||
boolean current = complete.get();
|
||||
complete.set(false);
|
||||
return current;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Coprocessor getInstance() {
|
||||
return impl;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Coprocessor.Priority getPriority() {
|
||||
return priority;
|
||||
}
|
||||
|
||||
/** @return the coprocessor environment version */
|
||||
@Override
|
||||
public int getVersion() {
|
||||
return Coprocessor.VERSION;
|
||||
}
|
||||
|
||||
/** @return the HBase release */
|
||||
@Override
|
||||
public String getHBaseVersion() {
|
||||
return VersionInfo.getVersion();
|
||||
}
|
||||
|
||||
/**
|
||||
* Open a table from within the Coprocessor environment
|
||||
* @param tableName the table name
|
||||
* @return an interface for manipulating the table
|
||||
* @exception java.io.IOException Exception
|
||||
*/
|
||||
@Override
|
||||
public HTableInterface getTable(byte[] tableName) throws IOException {
|
||||
return new HTableWrapper(tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void complete() {
|
||||
complete.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void bypass() {
|
||||
bypass.set(true);
|
||||
}
|
||||
}}
|
|
@ -0,0 +1,28 @@
|
|||
/*
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.coprocessor;
|
||||
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
|
||||
public interface MasterCoprocessorEnvironment extends CoprocessorEnvironment {
|
||||
/** @return reference to the HMaster services */
|
||||
MasterServices getMasterServices();
|
||||
}
|
|
@ -0,0 +1,215 @@
|
|||
/*
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.coprocessor;
|
||||
|
||||
import org.apache.hadoop.hbase.*;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Defines coprocessor hooks for interacting with operations on the
|
||||
* {@link org.apache.hadoop.hbase.master.HMaster} process.
|
||||
*/
|
||||
public interface MasterObserver extends Coprocessor {
|
||||
|
||||
/**
|
||||
* Called before a new table is created by
|
||||
* {@link org.apache.hadoop.hbase.master.HMaster}.
|
||||
*/
|
||||
void preCreateTable(MasterCoprocessorEnvironment env,
|
||||
HTableDescriptor desc, byte[][] splitKeys) throws IOException;
|
||||
|
||||
/**
|
||||
* Called after the initial table regions have been created.
|
||||
* @param env the environment to interact with the framework and master
|
||||
* @param regions the initial regions created for the table
|
||||
* @param sync whether the client call is waiting for region assignment to
|
||||
* complete before returning
|
||||
* @throws IOException
|
||||
*/
|
||||
void postCreateTable(MasterCoprocessorEnvironment env,
|
||||
HRegionInfo[] regions, boolean sync) throws IOException;
|
||||
|
||||
/**
|
||||
* Called before {@link org.apache.hadoop.hbase.master.HMaster} deletes a
|
||||
* table
|
||||
*/
|
||||
void preDeleteTable(MasterCoprocessorEnvironment env, byte[] tableName)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Called after the table has been deleted, before returning to the client.
|
||||
*/
|
||||
void postDeleteTable(MasterCoprocessorEnvironment env, byte[] tableName)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Called prior to modifying a table's properties.
|
||||
*/
|
||||
void preModifyTable(MasterCoprocessorEnvironment env, final byte[] tableName,
|
||||
HTableDescriptor htd) throws IOException;
|
||||
|
||||
/**
|
||||
* Called after {@link org.apache.hadoop.hbase.master.HMaster} has modified
|
||||
* the table's properties in all the table regions.
|
||||
*/
|
||||
void postModifyTable(MasterCoprocessorEnvironment env, final byte[] tableName,
|
||||
HTableDescriptor htd) throws IOException;
|
||||
|
||||
/**
|
||||
* Called prior to adding a new column family to the table.
|
||||
*/
|
||||
void preAddColumn(MasterCoprocessorEnvironment env, byte[] tableName,
|
||||
HColumnDescriptor column) throws IOException;
|
||||
|
||||
/**
|
||||
* Called after the new column family has been created.
|
||||
*/
|
||||
void postAddColumn(MasterCoprocessorEnvironment env, byte[] tableName,
|
||||
HColumnDescriptor column) throws IOException;
|
||||
|
||||
/**
|
||||
* Called prior to modifying a column family's attributes.
|
||||
*/
|
||||
void preModifyColumn(MasterCoprocessorEnvironment env,
|
||||
byte [] tableName, HColumnDescriptor descriptor) throws IOException;
|
||||
|
||||
/**
|
||||
* Called after the column family has been updated.
|
||||
*/
|
||||
void postModifyColumn(MasterCoprocessorEnvironment env, byte[] tableName,
|
||||
HColumnDescriptor descriptor) throws IOException;
|
||||
|
||||
/**
|
||||
* Called prior to deleting the entire column family.
|
||||
*/
|
||||
void preDeleteColumn(MasterCoprocessorEnvironment env,
|
||||
final byte [] tableName, final byte[] c) throws IOException;
|
||||
|
||||
/**
|
||||
* Called after the column family has been deleted.
|
||||
*/
|
||||
void postDeleteColumn(MasterCoprocessorEnvironment env,
|
||||
final byte [] tableName, final byte[] c) throws IOException;
|
||||
|
||||
/**
|
||||
* Called prior to enabling a table.
|
||||
*/
|
||||
void preEnableTable(MasterCoprocessorEnvironment env, final byte[] tableName)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Called after the table has been enabled.
|
||||
*/
|
||||
void postEnableTable(MasterCoprocessorEnvironment env, final byte[] tableName)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Called prior to disabling a table.
|
||||
*/
|
||||
void preDisableTable(MasterCoprocessorEnvironment env, final byte[] tableName)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Called after the table has been disabled.
|
||||
*/
|
||||
void postDisableTable(MasterCoprocessorEnvironment env, final byte[] tableName)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Called prior to moving a given region from one region server to another.
|
||||
*/
|
||||
void preMove(MasterCoprocessorEnvironment env, final HRegionInfo region,
|
||||
final HServerInfo srcServer, final HServerInfo destServer)
|
||||
throws UnknownRegionException;
|
||||
|
||||
/**
|
||||
* Called after the region move has been requested.
|
||||
*/
|
||||
void postMove(MasterCoprocessorEnvironment env, final HRegionInfo region,
|
||||
final HServerInfo srcServer, final HServerInfo destServer)
|
||||
throws UnknownRegionException;
|
||||
|
||||
/**
|
||||
* Called prior to assigning a specific region.
|
||||
*/
|
||||
void preAssign(MasterCoprocessorEnvironment env, final byte [] regionName,
|
||||
final boolean force) throws IOException;
|
||||
|
||||
/**
|
||||
* Called after the region assignment has been requested.
|
||||
*/
|
||||
void postAssign(MasterCoprocessorEnvironment env, final HRegionInfo regionInfo)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Called prior to unassigning a given region.
|
||||
*/
|
||||
void preUnassign(MasterCoprocessorEnvironment env, final byte [] regionName,
|
||||
final boolean force) throws IOException;
|
||||
|
||||
/**
|
||||
* Called after the region unassignment has been requested.
|
||||
*/
|
||||
void postUnassign(MasterCoprocessorEnvironment env,
|
||||
final HRegionInfo regionInfo, final boolean force) throws IOException;
|
||||
|
||||
/**
|
||||
* Called prior to requesting rebalancing of the cluster regions, though after
|
||||
* the initial checks for regions in transition and the balance switch flag.
|
||||
*/
|
||||
void preBalance(MasterCoprocessorEnvironment env) throws IOException;
|
||||
|
||||
/**
|
||||
* Called after the balancing plan has been submitted.
|
||||
*/
|
||||
void postBalance(MasterCoprocessorEnvironment env) throws IOException;
|
||||
|
||||
/**
|
||||
* Called prior to modifying the flag used to enable/disable region balancing.
|
||||
* @param env the coprocessor instance's environment
|
||||
* @param newValue the new flag value submitted in the call
|
||||
*/
|
||||
boolean preBalanceSwitch(MasterCoprocessorEnvironment env,
|
||||
final boolean newValue) throws IOException;
|
||||
|
||||
/**
|
||||
* Called after the flag to enable/disable balancing has changed.
|
||||
* @param env the coprocessor instance's environment
|
||||
* @param oldValue the previously set balanceSwitch value
|
||||
* @param newValue the newly set balanceSwitch value
|
||||
*/
|
||||
void postBalanceSwitch(MasterCoprocessorEnvironment env,
|
||||
final boolean oldValue, final boolean newValue) throws IOException;
|
||||
|
||||
/**
|
||||
* Called prior to shutting down the full HBase cluster, including this
|
||||
* {@link org.apache.hadoop.hbase.master.HMaster} process.
|
||||
*/
|
||||
void preShutdown(MasterCoprocessorEnvironment env) throws IOException;
|
||||
|
||||
|
||||
/**
|
||||
* Called immediatly prior to stopping this
|
||||
* {@link org.apache.hadoop.hbase.master.HMaster} process.
|
||||
*/
|
||||
void preStopMaster(MasterCoprocessorEnvironment env) throws IOException;
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.coprocessor;
|
||||
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
|
||||
public interface RegionCoprocessorEnvironment extends CoprocessorEnvironment {
|
||||
/** @return the region associated with this coprocessor */
|
||||
public HRegion getRegion();
|
||||
|
||||
/** @return reference to the region server services */
|
||||
public RegionServerServices getRegionServerServices();
|
||||
|
||||
}
|
|
@ -42,25 +42,25 @@ public interface RegionObserver extends Coprocessor {
|
|||
* Called before the region is reported as open to the master.
|
||||
* @param e the environment provided by the region server
|
||||
*/
|
||||
public void preOpen(final CoprocessorEnvironment e);
|
||||
public void preOpen(final RegionCoprocessorEnvironment e);
|
||||
|
||||
/**
|
||||
* Called after the region is reported as open to the master.
|
||||
* @param e the environment provided by the region server
|
||||
*/
|
||||
public void postOpen(final CoprocessorEnvironment e);
|
||||
public void postOpen(final RegionCoprocessorEnvironment e);
|
||||
|
||||
/**
|
||||
* Called before the memstore is flushed to disk.
|
||||
* @param e the environment provided by the region server
|
||||
*/
|
||||
public void preFlush(final CoprocessorEnvironment e);
|
||||
public void preFlush(final RegionCoprocessorEnvironment e);
|
||||
|
||||
/**
|
||||
* Called after the memstore is flushed to disk.
|
||||
* @param e the environment provided by the region server
|
||||
*/
|
||||
public void postFlush(final CoprocessorEnvironment e);
|
||||
public void postFlush(final RegionCoprocessorEnvironment e);
|
||||
|
||||
/**
|
||||
* Called before compaction.
|
||||
|
@ -68,7 +68,7 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @param willSplit true if compaction will result in a split, false
|
||||
* otherwise
|
||||
*/
|
||||
public void preCompact(final CoprocessorEnvironment e,
|
||||
public void preCompact(final RegionCoprocessorEnvironment e,
|
||||
final boolean willSplit);
|
||||
|
||||
/**
|
||||
|
@ -77,7 +77,7 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @param willSplit true if compaction will result in a split, false
|
||||
* otherwise
|
||||
*/
|
||||
public void postCompact(final CoprocessorEnvironment e,
|
||||
public void postCompact(final RegionCoprocessorEnvironment e,
|
||||
final boolean willSplit);
|
||||
|
||||
/**
|
||||
|
@ -85,7 +85,7 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @param e the environment provided by the region server
|
||||
* (e.getRegion() returns the parent region)
|
||||
*/
|
||||
public void preSplit(final CoprocessorEnvironment e);
|
||||
public void preSplit(final RegionCoprocessorEnvironment e);
|
||||
|
||||
/**
|
||||
* Called after the region is split.
|
||||
|
@ -94,7 +94,7 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @param l the left daughter region
|
||||
* @param r the right daughter region
|
||||
*/
|
||||
public void postSplit(final CoprocessorEnvironment e, final HRegion l,
|
||||
public void postSplit(final RegionCoprocessorEnvironment e, final HRegion l,
|
||||
final HRegion r);
|
||||
|
||||
/**
|
||||
|
@ -102,14 +102,16 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @param e the environment provided by the region server
|
||||
* @param abortRequested true if the region server is aborting
|
||||
*/
|
||||
public void preClose(final CoprocessorEnvironment e, boolean abortRequested);
|
||||
public void preClose(final RegionCoprocessorEnvironment e,
|
||||
boolean abortRequested);
|
||||
|
||||
/**
|
||||
* Called after the region is reported as closed to the master.
|
||||
* @param e the environment provided by the region server
|
||||
* @param abortRequested true if the region server is aborting
|
||||
*/
|
||||
public void postClose(final CoprocessorEnvironment e, boolean abortRequested);
|
||||
public void postClose(final RegionCoprocessorEnvironment e,
|
||||
boolean abortRequested);
|
||||
|
||||
/**
|
||||
* Called before a client makes a GetClosestRowBefore request.
|
||||
|
@ -126,7 +128,7 @@ public interface RegionObserver extends Coprocessor {
|
|||
* is not bypassed.
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
public void preGetClosestRowBefore(final CoprocessorEnvironment e,
|
||||
public void preGetClosestRowBefore(final RegionCoprocessorEnvironment e,
|
||||
final byte [] row, final byte [] family, final Result result)
|
||||
throws IOException;
|
||||
|
||||
|
@ -141,7 +143,7 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @param result the result to return to the client, modify as necessary
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
public void postGetClosestRowBefore(final CoprocessorEnvironment e,
|
||||
public void postGetClosestRowBefore(final RegionCoprocessorEnvironment e,
|
||||
final byte [] row, final byte [] family, final Result result)
|
||||
throws IOException;
|
||||
|
||||
|
@ -159,7 +161,7 @@ public interface RegionObserver extends Coprocessor {
|
|||
* is not bypassed.
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
public void preGet(final CoprocessorEnvironment e, final Get get,
|
||||
public void preGet(final RegionCoprocessorEnvironment e, final Get get,
|
||||
final List<KeyValue> result)
|
||||
throws IOException;
|
||||
|
||||
|
@ -173,7 +175,7 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @param result the result to return to the client, modify as necessary
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
public void postGet(final CoprocessorEnvironment e, final Get get,
|
||||
public void postGet(final RegionCoprocessorEnvironment e, final Get get,
|
||||
final List<KeyValue> result)
|
||||
throws IOException;
|
||||
|
||||
|
@ -190,7 +192,7 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @return the value to return to the client if bypassing default processing
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
public boolean preExists(final CoprocessorEnvironment e, final Get get,
|
||||
public boolean preExists(final RegionCoprocessorEnvironment e, final Get get,
|
||||
final boolean exists)
|
||||
throws IOException;
|
||||
|
||||
|
@ -205,7 +207,7 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @return the result to return to the client
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
public boolean postExists(final CoprocessorEnvironment e, final Get get,
|
||||
public boolean postExists(final RegionCoprocessorEnvironment e, final Get get,
|
||||
final boolean exists)
|
||||
throws IOException;
|
||||
|
||||
|
@ -221,7 +223,7 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @param writeToWAL true if the change should be written to the WAL
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
public void prePut(final CoprocessorEnvironment e, final Map<byte[],
|
||||
public void prePut(final RegionCoprocessorEnvironment e, final Map<byte[],
|
||||
List<KeyValue>> familyMap, final boolean writeToWAL)
|
||||
throws IOException;
|
||||
|
||||
|
@ -235,7 +237,7 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @param writeToWAL true if the change should be written to the WAL
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
public void postPut(final CoprocessorEnvironment e, final Map<byte[],
|
||||
public void postPut(final RegionCoprocessorEnvironment e, final Map<byte[],
|
||||
List<KeyValue>> familyMap, final boolean writeToWAL)
|
||||
throws IOException;
|
||||
|
||||
|
@ -251,7 +253,7 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @param writeToWAL true if the change should be written to the WAL
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
public void preDelete(final CoprocessorEnvironment e, final Map<byte[],
|
||||
public void preDelete(final RegionCoprocessorEnvironment e, final Map<byte[],
|
||||
List<KeyValue>> familyMap, final boolean writeToWAL)
|
||||
throws IOException;
|
||||
|
||||
|
@ -265,7 +267,7 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @param writeToWAL true if the change should be written to the WAL
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
public void postDelete(final CoprocessorEnvironment e,
|
||||
public void postDelete(final RegionCoprocessorEnvironment e,
|
||||
final Map<byte[], List<KeyValue>> familyMap, final boolean writeToWAL)
|
||||
throws IOException;
|
||||
|
||||
|
@ -287,7 +289,7 @@ public interface RegionObserver extends Coprocessor {
|
|||
* processing
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
public boolean preCheckAndPut(final CoprocessorEnvironment e,
|
||||
public boolean preCheckAndPut(final RegionCoprocessorEnvironment e,
|
||||
final byte [] row, final byte [] family, final byte [] qualifier,
|
||||
final byte [] value, final Put put, final boolean result)
|
||||
throws IOException;
|
||||
|
@ -307,7 +309,7 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @return the possibly transformed return value to return to client
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
public boolean postCheckAndPut(final CoprocessorEnvironment e,
|
||||
public boolean postCheckAndPut(final RegionCoprocessorEnvironment e,
|
||||
final byte [] row, final byte [] family, final byte [] qualifier,
|
||||
final byte [] value, final Put put, final boolean result)
|
||||
throws IOException;
|
||||
|
@ -329,7 +331,7 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @return the value to return to client if bypassing default processing
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
public boolean preCheckAndDelete(final CoprocessorEnvironment e,
|
||||
public boolean preCheckAndDelete(final RegionCoprocessorEnvironment e,
|
||||
final byte [] row, final byte [] family, final byte [] qualifier,
|
||||
final byte [] value, final Delete delete, final boolean result)
|
||||
throws IOException;
|
||||
|
@ -349,7 +351,7 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @return the possibly transformed returned value to return to client
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
public boolean postCheckAndDelete(final CoprocessorEnvironment e,
|
||||
public boolean postCheckAndDelete(final RegionCoprocessorEnvironment e,
|
||||
final byte [] row, final byte [] family, final byte [] qualifier,
|
||||
final byte [] value, final Delete delete, final boolean result)
|
||||
throws IOException;
|
||||
|
@ -370,7 +372,7 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @return value to return to the client if bypassing default processing
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
public long preIncrementColumnValue(final CoprocessorEnvironment e,
|
||||
public long preIncrementColumnValue(final RegionCoprocessorEnvironment e,
|
||||
final byte [] row, final byte [] family, final byte [] qualifier,
|
||||
final long amount, final boolean writeToWAL)
|
||||
throws IOException;
|
||||
|
@ -390,7 +392,7 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @return the result to return to the client
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
public long postIncrementColumnValue(final CoprocessorEnvironment e,
|
||||
public long postIncrementColumnValue(final RegionCoprocessorEnvironment e,
|
||||
final byte [] row, final byte [] family, final byte [] qualifier,
|
||||
final long amount, final boolean writeToWAL, final long result)
|
||||
throws IOException;
|
||||
|
@ -407,10 +409,9 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @param result The result to return to the client if default processing
|
||||
* is bypassed. Can be modified. Will not be used if default processing
|
||||
* is not bypassed.
|
||||
* @param writeToWAL true if the change should be written to the WAL
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
public void preIncrement(final CoprocessorEnvironment e,
|
||||
public void preIncrement(final RegionCoprocessorEnvironment e,
|
||||
final Increment increment, final Result result)
|
||||
throws IOException;
|
||||
|
||||
|
@ -421,11 +422,10 @@ public interface RegionObserver extends Coprocessor {
|
|||
* coprocessors
|
||||
* @param e the environment provided by the region server
|
||||
* @param increment increment object
|
||||
* @param writeToWAL true if the change should be written to the WAL
|
||||
* @param result the result returned by increment, can be modified
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
public void postIncrement(final CoprocessorEnvironment e,
|
||||
public void postIncrement(final RegionCoprocessorEnvironment e,
|
||||
final Increment increment, final Result result)
|
||||
throws IOException;
|
||||
|
||||
|
@ -443,7 +443,7 @@ public interface RegionObserver extends Coprocessor {
|
|||
* overriding default behavior, null otherwise
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
public InternalScanner preScannerOpen(final CoprocessorEnvironment e,
|
||||
public InternalScanner preScannerOpen(final RegionCoprocessorEnvironment e,
|
||||
final Scan scan, final InternalScanner s)
|
||||
throws IOException;
|
||||
|
||||
|
@ -458,7 +458,7 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @return the scanner instance to use
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
public InternalScanner postScannerOpen(final CoprocessorEnvironment e,
|
||||
public InternalScanner postScannerOpen(final RegionCoprocessorEnvironment e,
|
||||
final Scan scan, final InternalScanner s)
|
||||
throws IOException;
|
||||
|
||||
|
@ -479,7 +479,7 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @return 'has more' indication that should be sent to client
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
public boolean preScannerNext(final CoprocessorEnvironment e,
|
||||
public boolean preScannerNext(final RegionCoprocessorEnvironment e,
|
||||
final InternalScanner s, final List<KeyValue> result,
|
||||
final int limit, final boolean hasNext)
|
||||
throws IOException;
|
||||
|
@ -497,7 +497,7 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @return 'has more' indication that should be sent to client
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
public boolean postScannerNext(final CoprocessorEnvironment e,
|
||||
public boolean postScannerNext(final RegionCoprocessorEnvironment e,
|
||||
final InternalScanner s, final List<KeyValue> result, final int limit,
|
||||
final boolean hasNext)
|
||||
throws IOException;
|
||||
|
@ -513,7 +513,7 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @param s the scanner
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
public void preScannerClose(final CoprocessorEnvironment e,
|
||||
public void preScannerClose(final RegionCoprocessorEnvironment e,
|
||||
final InternalScanner s)
|
||||
throws IOException;
|
||||
|
||||
|
@ -526,7 +526,7 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @param s the scanner
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
public void postScannerClose(final CoprocessorEnvironment e,
|
||||
public void postScannerClose(final RegionCoprocessorEnvironment e,
|
||||
final InternalScanner s)
|
||||
throws IOException;
|
||||
}
|
||||
|
|
|
@ -286,7 +286,7 @@ or by <code>HTableDescriptor</code> for a newly created table.
|
|||
opened regions.)
|
||||
<h3>Load from configuration</h3>
|
||||
Whenever a region is opened, it will read coprocessor class names from
|
||||
<code>hbase.coprocessor.default.classes</code> from <code>Configuration</code>.
|
||||
<code>hbase.coprocessor.region.classes</code> from <code>Configuration</code>.
|
||||
Coprocessor framework will automatically load the configured classes as
|
||||
default coprocessors. The classes must be included in the classpath already.
|
||||
|
||||
|
@ -294,7 +294,7 @@ default coprocessors. The classes must be included in the classpath already.
|
|||
<div style="background-color: #cccccc; padding: 2px">
|
||||
<blockquote><pre>
|
||||
<property>
|
||||
<name>hbase.coprocessor.default.classes</name>
|
||||
<name>hbase.coprocessor.region.classes</name>
|
||||
<value>org.apache.hadoop.hbase.coprocessor.AccessControllCoprocessor, org.apache.hadoop.hbase.coprocessor.ColumnAggregationProtocol</value>
|
||||
<description>A comma-separated list of Coprocessors that are loaded by
|
||||
default. For any override coprocessor method from RegionObservor or
|
||||
|
|
|
@ -170,6 +170,8 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
|||
private Thread catalogJanitorChore;
|
||||
private LogCleaner logCleaner;
|
||||
|
||||
private MasterCoprocessorHost cpHost;
|
||||
|
||||
/**
|
||||
* Initializes the HMaster. The steps are as follows:
|
||||
* <p>
|
||||
|
@ -369,6 +371,9 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
|||
Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()) +
|
||||
", cluster-up flag was=" + wasUp);
|
||||
|
||||
// initialize master side coprocessors before we start handling requests
|
||||
this.cpHost = new MasterCoprocessorHost(this, this.conf);
|
||||
|
||||
// start up all service threads.
|
||||
startServiceThreads();
|
||||
|
||||
|
@ -675,6 +680,19 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
|||
this.serverManager.getDeadServers());
|
||||
return false;
|
||||
}
|
||||
|
||||
if (this.cpHost != null) {
|
||||
try {
|
||||
if (this.cpHost.preBalance()) {
|
||||
LOG.debug("Coprocessor bypassing balancer request");
|
||||
return false;
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Error invoking master coprocessor preBalance()", ioe);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
Map<HServerInfo, List<HRegionInfo>> assignments =
|
||||
this.assignmentManager.getAssignments();
|
||||
// Returned Map from AM does not include mention of servers w/o assignments.
|
||||
|
@ -692,6 +710,14 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
|||
this.assignmentManager.balance(plan);
|
||||
}
|
||||
}
|
||||
if (this.cpHost != null) {
|
||||
try {
|
||||
this.cpHost.postBalance();
|
||||
} catch (IOException ioe) {
|
||||
// balancing already succeeded so don't change the result
|
||||
LOG.error("Error invoking master coprocessor postBalance()", ioe);
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -699,8 +725,19 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
|||
@Override
|
||||
public boolean balanceSwitch(final boolean b) {
|
||||
boolean oldValue = this.balanceSwitch;
|
||||
this.balanceSwitch = b;
|
||||
LOG.info("Balance=" + b);
|
||||
boolean newValue = b;
|
||||
try {
|
||||
if (this.cpHost != null) {
|
||||
newValue = this.cpHost.preBalanceSwitch(newValue);
|
||||
}
|
||||
this.balanceSwitch = newValue;
|
||||
LOG.info("Balance=" + newValue);
|
||||
if (this.cpHost != null) {
|
||||
this.cpHost.postBalanceSwitch(oldValue, newValue);
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Error flipping balance switch", ioe);
|
||||
}
|
||||
return oldValue;
|
||||
}
|
||||
|
||||
|
@ -721,8 +758,15 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
|||
this.assignmentManager.unassign(hri);
|
||||
} else {
|
||||
dest = this.serverManager.getServerInfo(new String(destServerName));
|
||||
|
||||
if (this.cpHost != null) {
|
||||
this.cpHost.preMove(p.getFirst(), p.getSecond(), dest);
|
||||
}
|
||||
RegionPlan rp = new RegionPlan(p.getFirst(), p.getSecond(), dest);
|
||||
this.assignmentManager.balance(rp);
|
||||
if (this.cpHost != null) {
|
||||
this.cpHost.postMove(p.getFirst(), p.getSecond(), dest);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -737,6 +781,9 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
|||
if (!isMasterRunning()) {
|
||||
throw new MasterNotRunningException();
|
||||
}
|
||||
if (cpHost != null) {
|
||||
cpHost.preCreateTable(desc, splitKeys);
|
||||
}
|
||||
HRegionInfo [] newRegions = null;
|
||||
if(splitKeys == null || splitKeys.length == 0) {
|
||||
newRegions = new HRegionInfo [] { new HRegionInfo(desc, null, null) };
|
||||
|
@ -810,6 +857,10 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (cpHost != null) {
|
||||
cpHost.postCreateTable(newRegions, sync);
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean isCatalogTable(final byte [] tableName) {
|
||||
|
@ -818,32 +869,68 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
|||
}
|
||||
|
||||
public void deleteTable(final byte [] tableName) throws IOException {
|
||||
if (cpHost != null) {
|
||||
cpHost.preDeleteTable(tableName);
|
||||
}
|
||||
this.executorService.submit(new DeleteTableHandler(tableName, this, this));
|
||||
if (cpHost != null) {
|
||||
cpHost.postDeleteTable(tableName);
|
||||
}
|
||||
}
|
||||
|
||||
public void addColumn(byte [] tableName, HColumnDescriptor column)
|
||||
throws IOException {
|
||||
if (cpHost != null) {
|
||||
cpHost.preAddColumn(tableName, column);
|
||||
}
|
||||
new TableAddFamilyHandler(tableName, column, this, this).process();
|
||||
if (cpHost != null) {
|
||||
cpHost.postAddColumn(tableName, column);
|
||||
}
|
||||
}
|
||||
|
||||
public void modifyColumn(byte [] tableName, HColumnDescriptor descriptor)
|
||||
throws IOException {
|
||||
if (cpHost != null) {
|
||||
cpHost.preModifyColumn(tableName, descriptor);
|
||||
}
|
||||
new TableModifyFamilyHandler(tableName, descriptor, this, this).process();
|
||||
if (cpHost != null) {
|
||||
cpHost.postModifyColumn(tableName, descriptor);
|
||||
}
|
||||
}
|
||||
|
||||
public void deleteColumn(final byte [] tableName, final byte [] c)
|
||||
throws IOException {
|
||||
if (cpHost != null) {
|
||||
cpHost.preDeleteColumn(tableName, c);
|
||||
}
|
||||
new TableDeleteFamilyHandler(tableName, c, this, this).process();
|
||||
if (cpHost != null) {
|
||||
cpHost.postDeleteColumn(tableName, c);
|
||||
}
|
||||
}
|
||||
|
||||
public void enableTable(final byte [] tableName) throws IOException {
|
||||
if (cpHost != null) {
|
||||
cpHost.preEnableTable(tableName);
|
||||
}
|
||||
this.executorService.submit(new EnableTableHandler(this, tableName,
|
||||
catalogTracker, assignmentManager));
|
||||
if (cpHost != null) {
|
||||
cpHost.postEnableTable(tableName);
|
||||
}
|
||||
}
|
||||
|
||||
public void disableTable(final byte [] tableName) throws IOException {
|
||||
if (cpHost != null) {
|
||||
cpHost.preDisableTable(tableName);
|
||||
}
|
||||
this.executorService.submit(new DisableTableHandler(this, tableName,
|
||||
catalogTracker, assignmentManager));
|
||||
if (cpHost != null) {
|
||||
cpHost.postDisableTable(tableName);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -886,7 +973,13 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
|||
@Override
|
||||
public void modifyTable(final byte[] tableName, HTableDescriptor htd)
|
||||
throws IOException {
|
||||
if (cpHost != null) {
|
||||
cpHost.preModifyTable(tableName, htd);
|
||||
}
|
||||
this.executorService.submit(new ModifyTableHandler(tableName, htd, this, this));
|
||||
if (cpHost != null) {
|
||||
cpHost.postModifyTable(tableName, htd);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -935,6 +1028,10 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
|||
return zooKeeper;
|
||||
}
|
||||
|
||||
public MasterCoprocessorHost getCoprocessorHost() {
|
||||
return cpHost;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getServerName() {
|
||||
return address.toString();
|
||||
|
@ -952,6 +1049,13 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
|||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
if (cpHost != null) {
|
||||
try {
|
||||
cpHost.preShutdown();
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Error call master coprocessor preShutdown()", ioe);
|
||||
}
|
||||
}
|
||||
this.serverManager.shutdownCluster();
|
||||
try {
|
||||
this.clusterStatusTracker.setClusterDown();
|
||||
|
@ -962,6 +1066,13 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
|||
|
||||
@Override
|
||||
public void stopMaster() {
|
||||
if (cpHost != null) {
|
||||
try {
|
||||
cpHost.preStopMaster();
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Error call master coprocessor preStopMaster()", ioe);
|
||||
}
|
||||
}
|
||||
stop("Stopped by " + Thread.currentThread().getName());
|
||||
}
|
||||
|
||||
|
@ -1008,10 +1119,18 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
|||
@Override
|
||||
public void assign(final byte [] regionName, final boolean force)
|
||||
throws IOException {
|
||||
if (cpHost != null) {
|
||||
if (cpHost.preAssign(regionName, force)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
Pair<HRegionInfo, HServerAddress> pair =
|
||||
MetaReader.getRegion(this.catalogTracker, regionName);
|
||||
if (pair == null) throw new UnknownRegionException(Bytes.toString(regionName));
|
||||
assignRegion(pair.getFirst());
|
||||
if (cpHost != null) {
|
||||
cpHost.postAssign(pair.getFirst());
|
||||
}
|
||||
}
|
||||
|
||||
public void assignRegion(HRegionInfo hri) {
|
||||
|
@ -1021,12 +1140,20 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
|||
@Override
|
||||
public void unassign(final byte [] regionName, final boolean force)
|
||||
throws IOException {
|
||||
if (cpHost != null) {
|
||||
if (cpHost.preUnassign(regionName, force)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
Pair<HRegionInfo, HServerAddress> pair =
|
||||
MetaReader.getRegion(this.catalogTracker, regionName);
|
||||
if (pair == null) throw new UnknownRegionException(Bytes.toString(regionName));
|
||||
HRegionInfo hri = pair.getFirst();
|
||||
if (force) this.assignmentManager.clearRegionFromTransition(hri);
|
||||
this.assignmentManager.unassign(hri, force);
|
||||
if (cpHost != null) {
|
||||
cpHost.postUnassign(hri, force);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,553 @@
|
|||
/*
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.*;
|
||||
import org.apache.hadoop.hbase.coprocessor.*;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Provides the coprocessor framework and environment for master oriented
|
||||
* operations. {@link HMaster} interacts with the loaded coprocessors
|
||||
* through this class.
|
||||
*/
|
||||
public class MasterCoprocessorHost
|
||||
extends CoprocessorHost<MasterCoprocessorHost.MasterEnvironment> {
|
||||
|
||||
/**
|
||||
* Coprocessor environment extension providing access to master related
|
||||
* services.
|
||||
*/
|
||||
static class MasterEnvironment extends CoprocessorHost.Environment
|
||||
implements MasterCoprocessorEnvironment {
|
||||
private MasterServices masterServices;
|
||||
|
||||
public MasterEnvironment(Class<?> implClass, Coprocessor impl,
|
||||
Coprocessor.Priority priority, MasterServices services) {
|
||||
super(impl, priority);
|
||||
this.masterServices = services;
|
||||
}
|
||||
|
||||
public MasterServices getMasterServices() {
|
||||
return masterServices;
|
||||
}
|
||||
}
|
||||
|
||||
private MasterServices masterServices;
|
||||
|
||||
MasterCoprocessorHost(final MasterServices services, final Configuration conf) {
|
||||
this.masterServices = services;
|
||||
|
||||
loadSystemCoprocessors(conf, MASTER_COPROCESSOR_CONF_KEY);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MasterEnvironment createEnvironment(Class<?> implClass,
|
||||
Coprocessor instance, Coprocessor.Priority priority) {
|
||||
return new MasterEnvironment(implClass, instance, priority, masterServices);
|
||||
}
|
||||
|
||||
/* Implementation of hooks for invoking MasterObservers */
|
||||
void preCreateTable(HTableDescriptor desc, byte[][] splitKeys)
|
||||
throws IOException {
|
||||
try {
|
||||
coprocessorLock.readLock().lock();
|
||||
for (MasterEnvironment env: coprocessors) {
|
||||
if (env.getInstance() instanceof MasterObserver) {
|
||||
((MasterObserver)env.getInstance()).preCreateTable(env, desc, splitKeys);
|
||||
if (env.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
coprocessorLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void postCreateTable(HRegionInfo[] regions, boolean sync) throws IOException {
|
||||
try {
|
||||
coprocessorLock.readLock().lock();
|
||||
for (MasterEnvironment env: coprocessors) {
|
||||
if (env.getInstance() instanceof MasterObserver) {
|
||||
((MasterObserver)env.getInstance()).postCreateTable(env, regions, sync);
|
||||
if (env.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
coprocessorLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void preDeleteTable(byte[] tableName) throws IOException {
|
||||
try {
|
||||
coprocessorLock.readLock().lock();
|
||||
for (MasterEnvironment env: coprocessors) {
|
||||
if (env.getInstance() instanceof MasterObserver) {
|
||||
((MasterObserver)env.getInstance()).preDeleteTable(env, tableName);
|
||||
if (env.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
coprocessorLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void postDeleteTable(byte[] tableName) throws IOException {
|
||||
try {
|
||||
coprocessorLock.readLock().lock();
|
||||
for (MasterEnvironment env: coprocessors) {
|
||||
if (env.getInstance() instanceof MasterObserver) {
|
||||
((MasterObserver)env.getInstance()).postDeleteTable(env, tableName);
|
||||
if (env.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
coprocessorLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void preModifyTable(final byte[] tableName, HTableDescriptor htd)
|
||||
throws IOException {
|
||||
try {
|
||||
coprocessorLock.readLock().lock();
|
||||
for (MasterEnvironment env: coprocessors) {
|
||||
if (env.getInstance() instanceof MasterObserver) {
|
||||
((MasterObserver)env.getInstance()).preModifyTable(env, tableName, htd);
|
||||
if (env.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
coprocessorLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void postModifyTable(final byte[] tableName, HTableDescriptor htd)
|
||||
throws IOException {
|
||||
try {
|
||||
coprocessorLock.readLock().lock();
|
||||
for (MasterEnvironment env: coprocessors) {
|
||||
if (env.getInstance() instanceof MasterObserver) {
|
||||
((MasterObserver)env.getInstance()).postModifyTable(env, tableName, htd);
|
||||
if (env.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
coprocessorLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void preAddColumn(byte [] tableName, HColumnDescriptor column)
|
||||
throws IOException {
|
||||
try {
|
||||
coprocessorLock.readLock().lock();
|
||||
for (MasterEnvironment env: coprocessors) {
|
||||
if (env.getInstance() instanceof MasterObserver) {
|
||||
((MasterObserver)env.getInstance()).preAddColumn(env, tableName, column);
|
||||
if (env.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
coprocessorLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void postAddColumn(byte [] tableName, HColumnDescriptor column)
|
||||
throws IOException {
|
||||
try {
|
||||
coprocessorLock.readLock().lock();
|
||||
for (MasterEnvironment env: coprocessors) {
|
||||
if (env.getInstance() instanceof MasterObserver) {
|
||||
((MasterObserver)env.getInstance()).postAddColumn(env, tableName, column);
|
||||
if (env.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
coprocessorLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void preModifyColumn(byte [] tableName, HColumnDescriptor descriptor)
|
||||
throws IOException {
|
||||
try {
|
||||
coprocessorLock.readLock().lock();
|
||||
for (MasterEnvironment env: coprocessors) {
|
||||
if (env.getInstance() instanceof MasterObserver) {
|
||||
((MasterObserver)env.getInstance()).preModifyColumn(
|
||||
env, tableName, descriptor);
|
||||
if (env.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
coprocessorLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void postModifyColumn(byte [] tableName, HColumnDescriptor descriptor)
|
||||
throws IOException {
|
||||
try {
|
||||
coprocessorLock.readLock().lock();
|
||||
for (MasterEnvironment env: coprocessors) {
|
||||
if (env.getInstance() instanceof MasterObserver) {
|
||||
((MasterObserver)env.getInstance()).postModifyColumn(
|
||||
env, tableName, descriptor);
|
||||
if (env.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
coprocessorLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void preDeleteColumn(final byte [] tableName, final byte [] c)
|
||||
throws IOException {
|
||||
try {
|
||||
coprocessorLock.readLock().lock();
|
||||
for (MasterEnvironment env: coprocessors) {
|
||||
if (env.getInstance() instanceof MasterObserver) {
|
||||
((MasterObserver)env.getInstance()).preDeleteColumn(env, tableName, c);
|
||||
if (env.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
coprocessorLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void postDeleteColumn(final byte [] tableName, final byte [] c)
|
||||
throws IOException {
|
||||
try {
|
||||
coprocessorLock.readLock().lock();
|
||||
for (MasterEnvironment env: coprocessors) {
|
||||
if (env.getInstance() instanceof MasterObserver) {
|
||||
((MasterObserver)env.getInstance()).postDeleteColumn(env, tableName, c);
|
||||
if (env.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
coprocessorLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void preEnableTable(final byte [] tableName) throws IOException {
|
||||
try {
|
||||
coprocessorLock.readLock().lock();
|
||||
for (MasterEnvironment env: coprocessors) {
|
||||
if (env.getInstance() instanceof MasterObserver) {
|
||||
((MasterObserver)env.getInstance()).preEnableTable(env, tableName);
|
||||
if (env.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
coprocessorLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void postEnableTable(final byte [] tableName) throws IOException {
|
||||
try {
|
||||
coprocessorLock.readLock().lock();
|
||||
for (MasterEnvironment env: coprocessors) {
|
||||
if (env.getInstance() instanceof MasterObserver) {
|
||||
((MasterObserver)env.getInstance()).postEnableTable(env, tableName);
|
||||
if (env.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
coprocessorLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void preDisableTable(final byte [] tableName) throws IOException {
|
||||
try {
|
||||
coprocessorLock.readLock().lock();
|
||||
for (MasterEnvironment env: coprocessors) {
|
||||
if (env.getInstance() instanceof MasterObserver) {
|
||||
((MasterObserver)env.getInstance()).preDisableTable(env, tableName);
|
||||
if (env.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
coprocessorLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void postDisableTable(final byte [] tableName) throws IOException {
|
||||
try {
|
||||
coprocessorLock.readLock().lock();
|
||||
for (MasterEnvironment env: coprocessors) {
|
||||
if (env.getInstance() instanceof MasterObserver) {
|
||||
((MasterObserver)env.getInstance()).postDisableTable(env, tableName);
|
||||
if (env.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
coprocessorLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void preMove(final HRegionInfo region, final HServerInfo srcServer, final HServerInfo destServer)
|
||||
throws UnknownRegionException {
|
||||
try {
|
||||
coprocessorLock.readLock().lock();
|
||||
for (MasterEnvironment env: coprocessors) {
|
||||
if (env.getInstance() instanceof MasterObserver) {
|
||||
((MasterObserver)env.getInstance()).preMove(
|
||||
env, region, srcServer, destServer);
|
||||
if (env.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
coprocessorLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void postMove(final HRegionInfo region, final HServerInfo srcServer, final HServerInfo destServer)
|
||||
throws UnknownRegionException {
|
||||
try {
|
||||
coprocessorLock.readLock().lock();
|
||||
for (MasterEnvironment env: coprocessors) {
|
||||
if (env.getInstance() instanceof MasterObserver) {
|
||||
((MasterObserver)env.getInstance()).postMove(
|
||||
env, region, srcServer, destServer);
|
||||
if (env.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
coprocessorLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
boolean preAssign(final byte [] regionName, final boolean force)
|
||||
throws IOException {
|
||||
boolean bypass = false;
|
||||
try {
|
||||
coprocessorLock.readLock().lock();
|
||||
for (MasterEnvironment env: coprocessors) {
|
||||
if (env.getInstance() instanceof MasterObserver) {
|
||||
((MasterObserver)env.getInstance()).preAssign(env, regionName, force);
|
||||
bypass |= env.shouldBypass();
|
||||
if (env.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
coprocessorLock.readLock().unlock();
|
||||
}
|
||||
return bypass;
|
||||
}
|
||||
|
||||
void postAssign(final HRegionInfo regionInfo) throws IOException {
|
||||
try {
|
||||
coprocessorLock.readLock().lock();
|
||||
for (MasterEnvironment env: coprocessors) {
|
||||
if (env.getInstance() instanceof MasterObserver) {
|
||||
((MasterObserver)env.getInstance()).postAssign(env, regionInfo);
|
||||
if (env.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
coprocessorLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
boolean preUnassign(final byte [] regionName, final boolean force)
|
||||
throws IOException {
|
||||
boolean bypass = false;
|
||||
try {
|
||||
coprocessorLock.readLock().lock();
|
||||
for (MasterEnvironment env: coprocessors) {
|
||||
if (env.getInstance() instanceof MasterObserver) {
|
||||
((MasterObserver)env.getInstance()).preUnassign(
|
||||
env, regionName, force);
|
||||
bypass |= env.shouldBypass();
|
||||
if (env.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
coprocessorLock.readLock().unlock();
|
||||
}
|
||||
return bypass;
|
||||
}
|
||||
|
||||
void postUnassign(final HRegionInfo regionInfo, final boolean force)
|
||||
throws IOException {
|
||||
try {
|
||||
coprocessorLock.readLock().lock();
|
||||
for (MasterEnvironment env: coprocessors) {
|
||||
if (env.getInstance() instanceof MasterObserver) {
|
||||
((MasterObserver)env.getInstance()).postUnassign(
|
||||
env, regionInfo, force);
|
||||
if (env.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
coprocessorLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
boolean preBalance() throws IOException {
|
||||
try {
|
||||
boolean bypass = false;
|
||||
coprocessorLock.readLock().lock();
|
||||
for (MasterEnvironment env: coprocessors) {
|
||||
if (env.getInstance() instanceof MasterObserver) {
|
||||
((MasterObserver)env.getInstance()).preBalance(env);
|
||||
bypass |= env.shouldBypass();
|
||||
if (env.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return bypass;
|
||||
} finally {
|
||||
coprocessorLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void postBalance() throws IOException {
|
||||
try {
|
||||
coprocessorLock.readLock().lock();
|
||||
for (MasterEnvironment env: coprocessors) {
|
||||
if (env.getInstance() instanceof MasterObserver) {
|
||||
((MasterObserver)env.getInstance()).postBalance(env);
|
||||
if (env.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
coprocessorLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
boolean preBalanceSwitch(final boolean b) throws IOException {
|
||||
boolean balance = b;
|
||||
try {
|
||||
coprocessorLock.readLock().lock();
|
||||
for (MasterEnvironment env: coprocessors) {
|
||||
if (env.getInstance() instanceof MasterObserver) {
|
||||
balance = ((MasterObserver)env.getInstance()).preBalanceSwitch(
|
||||
env, balance);
|
||||
if (env.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
coprocessorLock.readLock().unlock();
|
||||
}
|
||||
return balance;
|
||||
}
|
||||
|
||||
void postBalanceSwitch(final boolean oldValue, final boolean newValue)
|
||||
throws IOException {
|
||||
try {
|
||||
coprocessorLock.readLock().lock();
|
||||
for (MasterEnvironment env: coprocessors) {
|
||||
if (env.getInstance() instanceof MasterObserver) {
|
||||
((MasterObserver)env.getInstance()).postBalanceSwitch(
|
||||
env, oldValue, newValue);
|
||||
if (env.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
coprocessorLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void preShutdown() throws IOException {
|
||||
try {
|
||||
coprocessorLock.readLock().lock();
|
||||
for (MasterEnvironment env: coprocessors) {
|
||||
if (env.getInstance() instanceof MasterObserver) {
|
||||
((MasterObserver)env.getInstance()).preShutdown(env);
|
||||
if (env.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
coprocessorLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void preStopMaster() throws IOException {
|
||||
try {
|
||||
coprocessorLock.readLock().lock();
|
||||
for (MasterEnvironment env: coprocessors) {
|
||||
if (env.getInstance() instanceof MasterObserver) {
|
||||
((MasterObserver)env.getInstance()).preStopMaster(env);
|
||||
if (env.shouldComplete()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
coprocessorLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -23,7 +23,9 @@ import java.io.IOException;
|
|||
|
||||
import org.apache.hadoop.hbase.TableNotDisabledException;
|
||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorService;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
|
||||
/**
|
||||
* Services Master supplies
|
||||
|
@ -56,4 +58,15 @@ public interface MasterServices {
|
|||
* @throws TableNotFoundException
|
||||
*/
|
||||
public void checkTableModifiable(final byte [] tableName) throws IOException;
|
||||
|
||||
/**
|
||||
* @return Implementation of {@link org.apache.hadoop.hbase.catalog.CatalogTracker} or null.
|
||||
*/
|
||||
public CatalogTracker getCatalogTracker();
|
||||
|
||||
/*
|
||||
* @return Implementation of {@link ZooKeeperWatcher} or null.
|
||||
*/
|
||||
public ZooKeeperWatcher getZooKeeperWatcher();
|
||||
|
||||
}
|
||||
|
|
|
@ -248,7 +248,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
new ReadWriteConsistencyControl();
|
||||
|
||||
// Coprocessor host
|
||||
private CoprocessorHost coprocessorHost;
|
||||
private RegionCoprocessorHost coprocessorHost;
|
||||
|
||||
/**
|
||||
* Name of the region info file that resides just under the region directory.
|
||||
|
@ -319,7 +319,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
// don't initialize coprocessors if not running within a regionserver
|
||||
// TODO: revisit if coprocessors should load in other cases
|
||||
if (rsServices != null) {
|
||||
this.coprocessorHost = new CoprocessorHost(this, rsServices, conf);
|
||||
this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
// Write out region name as string and its encoded name.
|
||||
|
@ -3557,12 +3557,12 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
}
|
||||
|
||||
/** @return the coprocessor host */
|
||||
public CoprocessorHost getCoprocessorHost() {
|
||||
public RegionCoprocessorHost getCoprocessorHost() {
|
||||
return coprocessorHost;
|
||||
}
|
||||
|
||||
/** @param coprocessorHost the new coprocessor host */
|
||||
public void setCoprocessorHost(final CoprocessorHost coprocessorHost) {
|
||||
public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {
|
||||
this.coprocessorHost = coprocessorHost;
|
||||
}
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -480,17 +480,28 @@
|
|||
</property>
|
||||
|
||||
<property>
|
||||
<name>hbase.coprocessor.default.classes</name>
|
||||
<name>hbase.coprocessor.region.classes</name>
|
||||
<value></value>
|
||||
<description>A comma-separated list of Coprocessors that are loaded by
|
||||
default. For any override coprocessor method, these classes will be called
|
||||
in order. After implement your own
|
||||
Coprocessor, just put it in HBase's classpath and add the fully
|
||||
qualified class name here.
|
||||
default on all tables. For any override coprocessor method, these classes
|
||||
will be called in order. After implementing your own Coprocessor, just put
|
||||
it in HBase's classpath and add the fully qualified class name here.
|
||||
A coprocessor can also be loaded on demand by setting HTableDescriptor.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>hbase.coprocessor.master.classes</name>
|
||||
<value></value>
|
||||
<description>A comma-separated list of
|
||||
org.apache.hadoop.hbase.coprocessor.MasterObserver coprocessors that are
|
||||
loaded by default on the active HMaster process. For any implemented
|
||||
coprocessor methods, the listed classes will be called in order. After
|
||||
implementing your own MasterObserver, just put it in HBase's classpath
|
||||
and add the fully qualified class name here.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<!--
|
||||
The following three properties are used together to create the list of
|
||||
host:peer_port:leader_port quorum servers for ZooKeeper.
|
||||
|
|
|
@ -43,7 +43,8 @@ implements ColumnAggregationProtocol {
|
|||
scan.addColumn(family, qualifier);
|
||||
int sumResult = 0;
|
||||
|
||||
InternalScanner scanner = getEnvironment().getRegion().getScanner(scan);
|
||||
InternalScanner scanner = ((RegionCoprocessorEnvironment)getEnvironment())
|
||||
.getRegion().getScanner(scan);
|
||||
try {
|
||||
List<KeyValue> curVals = new ArrayList<KeyValue>();
|
||||
boolean done = false;
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.KeyValue;
|
|||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Increment;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
|
@ -68,12 +69,12 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
|
|||
boolean hadPostIncrement = false;
|
||||
|
||||
@Override
|
||||
public void preOpen(CoprocessorEnvironment e) {
|
||||
public void preOpen(RegionCoprocessorEnvironment e) {
|
||||
hadPreOpen = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postOpen(CoprocessorEnvironment e) {
|
||||
public void postOpen(RegionCoprocessorEnvironment e) {
|
||||
hadPostOpen = true;
|
||||
}
|
||||
|
||||
|
@ -82,12 +83,12 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void preClose(CoprocessorEnvironment e, boolean abortRequested) {
|
||||
public void preClose(RegionCoprocessorEnvironment e, boolean abortRequested) {
|
||||
hadPreClose = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postClose(CoprocessorEnvironment e, boolean abortRequested) {
|
||||
public void postClose(RegionCoprocessorEnvironment e, boolean abortRequested) {
|
||||
hadPostClose = true;
|
||||
}
|
||||
|
||||
|
@ -96,12 +97,12 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void preFlush(CoprocessorEnvironment e) {
|
||||
public void preFlush(RegionCoprocessorEnvironment e) {
|
||||
hadPreFlush = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postFlush(CoprocessorEnvironment e) {
|
||||
public void postFlush(RegionCoprocessorEnvironment e) {
|
||||
hadPostFlush = true;
|
||||
}
|
||||
|
||||
|
@ -110,12 +111,12 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void preSplit(CoprocessorEnvironment e) {
|
||||
public void preSplit(RegionCoprocessorEnvironment e) {
|
||||
hadPreSplit = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postSplit(CoprocessorEnvironment e, HRegion l, HRegion r) {
|
||||
public void postSplit(RegionCoprocessorEnvironment e, HRegion l, HRegion r) {
|
||||
hadPostSplit = true;
|
||||
}
|
||||
|
||||
|
@ -124,12 +125,12 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void preCompact(CoprocessorEnvironment e, boolean willSplit) {
|
||||
public void preCompact(RegionCoprocessorEnvironment e, boolean willSplit) {
|
||||
hadPreCompact = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postCompact(CoprocessorEnvironment e, boolean willSplit) {
|
||||
public void postCompact(RegionCoprocessorEnvironment e, boolean willSplit) {
|
||||
hadPostCompact = true;
|
||||
}
|
||||
|
||||
|
@ -138,7 +139,7 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void preGet(final CoprocessorEnvironment e, final Get get,
|
||||
public void preGet(final RegionCoprocessorEnvironment e, final Get get,
|
||||
final List<KeyValue> results) throws IOException {
|
||||
assertNotNull(e);
|
||||
assertNotNull(e.getRegion());
|
||||
|
@ -151,7 +152,7 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void postGet(final CoprocessorEnvironment e, final Get get,
|
||||
public void postGet(final RegionCoprocessorEnvironment e, final Get get,
|
||||
final List<KeyValue> results) {
|
||||
assertNotNull(e);
|
||||
assertNotNull(e.getRegion());
|
||||
|
@ -181,7 +182,7 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void prePut(final CoprocessorEnvironment e, final Map<byte[],
|
||||
public void prePut(final RegionCoprocessorEnvironment e, final Map<byte[],
|
||||
List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
|
||||
assertNotNull(e);
|
||||
assertNotNull(e.getRegion());
|
||||
|
@ -208,7 +209,7 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void postPut(final CoprocessorEnvironment e, final Map<byte[],
|
||||
public void postPut(final RegionCoprocessorEnvironment e, final Map<byte[],
|
||||
List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
|
||||
assertNotNull(e);
|
||||
assertNotNull(e.getRegion());
|
||||
|
@ -235,7 +236,7 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void preDelete(final CoprocessorEnvironment e, final Map<byte[],
|
||||
public void preDelete(final RegionCoprocessorEnvironment e, final Map<byte[],
|
||||
List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
|
||||
assertNotNull(e);
|
||||
assertNotNull(e.getRegion());
|
||||
|
@ -247,7 +248,7 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void postDelete(final CoprocessorEnvironment e, final Map<byte[],
|
||||
public void postDelete(final RegionCoprocessorEnvironment e, final Map<byte[],
|
||||
List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
|
||||
assertNotNull(e);
|
||||
assertNotNull(e.getRegion());
|
||||
|
@ -260,7 +261,7 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void preGetClosestRowBefore(final CoprocessorEnvironment e,
|
||||
public void preGetClosestRowBefore(final RegionCoprocessorEnvironment e,
|
||||
final byte[] row, final byte[] family, final Result result)
|
||||
throws IOException {
|
||||
assertNotNull(e);
|
||||
|
@ -274,7 +275,7 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void postGetClosestRowBefore(final CoprocessorEnvironment e,
|
||||
public void postGetClosestRowBefore(final RegionCoprocessorEnvironment e,
|
||||
final byte[] row, final byte[] family, final Result result)
|
||||
throws IOException {
|
||||
assertNotNull(e);
|
||||
|
@ -288,7 +289,7 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void preIncrement(final CoprocessorEnvironment e,
|
||||
public void preIncrement(final RegionCoprocessorEnvironment e,
|
||||
final Increment increment, final Result result) throws IOException {
|
||||
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
|
||||
TestRegionObserverInterface.TEST_TABLE_2)) {
|
||||
|
@ -297,7 +298,7 @@ public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void postIncrement(final CoprocessorEnvironment e,
|
||||
public void postIncrement(final RegionCoprocessorEnvironment e,
|
||||
final Increment increment, final Result result) throws IOException {
|
||||
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
|
||||
TestRegionObserverInterface.TEST_TABLE_2)) {
|
||||
|
|
|
@ -53,7 +53,7 @@ public class TestCoprocessorEndpoint {
|
|||
public static void setupBeforeClass() throws Exception {
|
||||
// set configure to indicate which cp should be loaded
|
||||
Configuration conf = util.getConfiguration();
|
||||
conf.set("hbase.coprocessor.default.classes",
|
||||
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
|
||||
"org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint");
|
||||
|
||||
util.startMiniCluster(2);
|
||||
|
|
|
@ -30,10 +30,8 @@ import org.apache.hadoop.hbase.HBaseTestCase;
|
|||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.coprocessor.Coprocessor;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.Coprocessor.Priority;
|
||||
import org.apache.hadoop.hbase.regionserver.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.SplitTransaction;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -75,43 +73,43 @@ public class TestCoprocessorInterface extends HBaseTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void preOpen(CoprocessorEnvironment e) {
|
||||
public void preOpen(RegionCoprocessorEnvironment e) {
|
||||
preOpenCalled = true;
|
||||
}
|
||||
@Override
|
||||
public void postOpen(CoprocessorEnvironment e) {
|
||||
public void postOpen(RegionCoprocessorEnvironment e) {
|
||||
postOpenCalled = true;
|
||||
}
|
||||
@Override
|
||||
public void preClose(CoprocessorEnvironment e, boolean abortRequested) {
|
||||
public void preClose(RegionCoprocessorEnvironment e, boolean abortRequested) {
|
||||
preCloseCalled = true;
|
||||
}
|
||||
@Override
|
||||
public void postClose(CoprocessorEnvironment e, boolean abortRequested) {
|
||||
public void postClose(RegionCoprocessorEnvironment e, boolean abortRequested) {
|
||||
postCloseCalled = true;
|
||||
}
|
||||
@Override
|
||||
public void preCompact(CoprocessorEnvironment e, boolean willSplit) {
|
||||
public void preCompact(RegionCoprocessorEnvironment e, boolean willSplit) {
|
||||
preCompactCalled = true;
|
||||
}
|
||||
@Override
|
||||
public void postCompact(CoprocessorEnvironment e, boolean willSplit) {
|
||||
public void postCompact(RegionCoprocessorEnvironment e, boolean willSplit) {
|
||||
postCompactCalled = true;
|
||||
}
|
||||
@Override
|
||||
public void preFlush(CoprocessorEnvironment e) {
|
||||
public void preFlush(RegionCoprocessorEnvironment e) {
|
||||
preFlushCalled = true;
|
||||
}
|
||||
@Override
|
||||
public void postFlush(CoprocessorEnvironment e) {
|
||||
public void postFlush(RegionCoprocessorEnvironment e) {
|
||||
postFlushCalled = true;
|
||||
}
|
||||
@Override
|
||||
public void preSplit(CoprocessorEnvironment e) {
|
||||
public void preSplit(RegionCoprocessorEnvironment e) {
|
||||
preSplitCalled = true;
|
||||
}
|
||||
@Override
|
||||
public void postSplit(CoprocessorEnvironment e, HRegion l, HRegion r) {
|
||||
public void postSplit(RegionCoprocessorEnvironment e, HRegion l, HRegion r) {
|
||||
postSplitCalled = true;
|
||||
}
|
||||
|
||||
|
@ -191,7 +189,7 @@ public class TestCoprocessorInterface extends HBaseTestCase {
|
|||
// is secretly loaded at OpenRegionHandler. we don't really
|
||||
// start a region server here, so just manually create cphost
|
||||
// and set it to region.
|
||||
CoprocessorHost host = new CoprocessorHost(r, null, conf);
|
||||
RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf);
|
||||
r.setCoprocessorHost(host);
|
||||
|
||||
host.load(implClass, Priority.USER);
|
||||
|
@ -218,7 +216,7 @@ public class TestCoprocessorInterface extends HBaseTestCase {
|
|||
HRegion r = HRegion.createHRegion(info, path, conf);
|
||||
|
||||
// this following piece is a hack.
|
||||
CoprocessorHost host = new CoprocessorHost(r, null, conf);
|
||||
RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf);
|
||||
r.setCoprocessorHost(host);
|
||||
|
||||
host.load(implClass, Priority.USER);
|
||||
|
|
|
@ -0,0 +1,499 @@
|
|||
/*
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.coprocessor;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HServerAddress;
|
||||
import org.apache.hadoop.hbase.HServerInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||
import org.apache.hadoop.hbase.UnknownRegionException;
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.master.AssignmentManager;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
/**
|
||||
* Tests invocation of the {@link org.apache.hadoop.hbase.coprocessor.MasterObserver}
|
||||
* interface hooks at all appropriate times during normal HMaster operations.
|
||||
*/
|
||||
public class TestMasterObserver {
|
||||
|
||||
public static class CPMasterObserver implements MasterObserver {
|
||||
|
||||
private boolean preCreateTableCalled;
|
||||
private boolean postCreateTableCalled;
|
||||
private boolean preDeleteTableCalled;
|
||||
private boolean postDeleteTableCalled;
|
||||
private boolean preModifyTableCalled;
|
||||
private boolean postModifyTableCalled;
|
||||
private boolean preAddColumnCalled;
|
||||
private boolean postAddColumnCalled;
|
||||
private boolean preModifyColumnCalled;
|
||||
private boolean postModifyColumnCalled;
|
||||
private boolean preDeleteColumnCalled;
|
||||
private boolean postDeleteColumnCalled;
|
||||
private boolean preEnableTableCalled;
|
||||
private boolean postEnableTableCalled;
|
||||
private boolean preDisableTableCalled;
|
||||
private boolean postDisableTableCalled;
|
||||
private boolean preMoveCalled;
|
||||
private boolean postMoveCalled;
|
||||
private boolean preAssignCalled;
|
||||
private boolean postAssignCalled;
|
||||
private boolean preUnassignCalled;
|
||||
private boolean postUnassignCalled;
|
||||
private boolean preBalanceCalled;
|
||||
private boolean postBalanceCalled;
|
||||
private boolean preBalanceSwitchCalled;
|
||||
private boolean postBalanceSwitchCalled;
|
||||
private boolean preShutdownCalled;
|
||||
private boolean preStopMasterCalled;
|
||||
private boolean startCalled;
|
||||
private boolean stopCalled;
|
||||
|
||||
@Override
|
||||
public void preCreateTable(MasterCoprocessorEnvironment env,
|
||||
HTableDescriptor desc, byte[][] splitKeys) throws IOException {
|
||||
preCreateTableCalled = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postCreateTable(MasterCoprocessorEnvironment env,
|
||||
HRegionInfo[] regions, boolean sync) throws IOException {
|
||||
postCreateTableCalled = true;
|
||||
}
|
||||
|
||||
public boolean wasCreateTableCalled() {
|
||||
return preCreateTableCalled && postCreateTableCalled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preDeleteTable(MasterCoprocessorEnvironment env,
|
||||
byte[] tableName) throws IOException {
|
||||
preDeleteTableCalled = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postDeleteTable(MasterCoprocessorEnvironment env,
|
||||
byte[] tableName) throws IOException {
|
||||
postDeleteTableCalled = true;
|
||||
}
|
||||
|
||||
public boolean wasDeleteTableCalled() {
|
||||
return preDeleteTableCalled && postDeleteTableCalled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preModifyTable(MasterCoprocessorEnvironment env,
|
||||
byte[] tableName, HTableDescriptor htd) throws IOException {
|
||||
preModifyTableCalled = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postModifyTable(MasterCoprocessorEnvironment env,
|
||||
byte[] tableName, HTableDescriptor htd) throws IOException {
|
||||
postModifyTableCalled = true;
|
||||
}
|
||||
|
||||
public boolean wasModifyTableCalled() {
|
||||
return preModifyTableCalled && postModifyTableCalled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preAddColumn(MasterCoprocessorEnvironment env,
|
||||
byte[] tableName, HColumnDescriptor column) throws IOException {
|
||||
preAddColumnCalled = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postAddColumn(MasterCoprocessorEnvironment env,
|
||||
byte[] tableName, HColumnDescriptor column) throws IOException {
|
||||
postAddColumnCalled = true;
|
||||
}
|
||||
|
||||
public boolean wasAddColumnCalled() {
|
||||
return preAddColumnCalled && postAddColumnCalled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preModifyColumn(MasterCoprocessorEnvironment env,
|
||||
byte[] tableName, HColumnDescriptor descriptor) throws IOException {
|
||||
preModifyColumnCalled = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postModifyColumn(MasterCoprocessorEnvironment env,
|
||||
byte[] tableName, HColumnDescriptor descriptor) throws IOException {
|
||||
postModifyColumnCalled = true;
|
||||
}
|
||||
|
||||
public boolean wasModifyColumnCalled() {
|
||||
return preModifyColumnCalled && postModifyColumnCalled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preDeleteColumn(MasterCoprocessorEnvironment env,
|
||||
byte[] tableName, byte[] c) throws IOException {
|
||||
preDeleteColumnCalled = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postDeleteColumn(MasterCoprocessorEnvironment env,
|
||||
byte[] tableName, byte[] c) throws IOException {
|
||||
postDeleteColumnCalled = true;
|
||||
}
|
||||
|
||||
public boolean wasDeleteColumnCalled() {
|
||||
return preDeleteColumnCalled && postDeleteColumnCalled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preEnableTable(MasterCoprocessorEnvironment env,
|
||||
byte[] tableName) throws IOException {
|
||||
preEnableTableCalled = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postEnableTable(MasterCoprocessorEnvironment env,
|
||||
byte[] tableName) throws IOException {
|
||||
postEnableTableCalled = true;
|
||||
}
|
||||
|
||||
public boolean wasEnableTableCalled() {
|
||||
return preEnableTableCalled && postEnableTableCalled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preDisableTable(MasterCoprocessorEnvironment env,
|
||||
byte[] tableName) throws IOException {
|
||||
preDisableTableCalled = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postDisableTable(MasterCoprocessorEnvironment env,
|
||||
byte[] tableName) throws IOException {
|
||||
postDisableTableCalled = true;
|
||||
}
|
||||
|
||||
public boolean wasDisableTableCalled() {
|
||||
return preDisableTableCalled && postDisableTableCalled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preMove(MasterCoprocessorEnvironment env,
|
||||
HRegionInfo region, HServerInfo srcServer, HServerInfo destServer)
|
||||
throws UnknownRegionException {
|
||||
preMoveCalled = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postMove(MasterCoprocessorEnvironment env, HRegionInfo region,
|
||||
HServerInfo srcServer, HServerInfo destServer)
|
||||
throws UnknownRegionException {
|
||||
postMoveCalled = true;
|
||||
}
|
||||
|
||||
public boolean wasMoveCalled() {
|
||||
return preMoveCalled && postMoveCalled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preAssign(MasterCoprocessorEnvironment env,
|
||||
final byte [] regionName, final boolean force) throws IOException {
|
||||
preAssignCalled = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postAssign(MasterCoprocessorEnvironment env,
|
||||
final HRegionInfo regionInfo) throws IOException {
|
||||
postAssignCalled = true;
|
||||
}
|
||||
|
||||
public boolean wasAssignCalled() {
|
||||
return preAssignCalled && postAssignCalled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preUnassign(MasterCoprocessorEnvironment env,
|
||||
final byte [] regionName, final boolean force) throws IOException {
|
||||
preUnassignCalled = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postUnassign(MasterCoprocessorEnvironment env,
|
||||
final HRegionInfo regionInfo, final boolean force) throws IOException {
|
||||
postUnassignCalled = true;
|
||||
}
|
||||
|
||||
public boolean wasUnassignCalled() {
|
||||
return preUnassignCalled && postUnassignCalled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preBalance(MasterCoprocessorEnvironment env)
|
||||
throws IOException {
|
||||
preBalanceCalled = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postBalance(MasterCoprocessorEnvironment env)
|
||||
throws IOException {
|
||||
postBalanceCalled = true;
|
||||
}
|
||||
|
||||
public boolean wasBalanceCalled() {
|
||||
return preBalanceCalled && postBalanceCalled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean preBalanceSwitch(MasterCoprocessorEnvironment env, boolean b)
|
||||
throws IOException {
|
||||
preBalanceSwitchCalled = true;
|
||||
return b;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postBalanceSwitch(MasterCoprocessorEnvironment env,
|
||||
boolean oldValue, boolean newValue) throws IOException {
|
||||
postBalanceSwitchCalled = true;
|
||||
}
|
||||
|
||||
public boolean wasBalanceSwitchCalled() {
|
||||
return preBalanceSwitchCalled && postBalanceSwitchCalled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preShutdown(MasterCoprocessorEnvironment env)
|
||||
throws IOException {
|
||||
preShutdownCalled = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preStopMaster(MasterCoprocessorEnvironment env)
|
||||
throws IOException {
|
||||
preStopMasterCalled = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(CoprocessorEnvironment env) throws IOException {
|
||||
startCalled = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop(CoprocessorEnvironment env) throws IOException {
|
||||
stopCalled = true;
|
||||
}
|
||||
|
||||
public boolean wasStarted() { return startCalled; }
|
||||
|
||||
public boolean wasStopped() { return stopCalled; }
|
||||
}
|
||||
|
||||
private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
private static byte[] TEST_TABLE = Bytes.toBytes("observed_table");
|
||||
private static byte[] TEST_FAMILY = Bytes.toBytes("fam1");
|
||||
private static byte[] TEST_FAMILY2 = Bytes.toBytes("fam2");
|
||||
|
||||
@BeforeClass
|
||||
public static void setupBeforeClass() throws Exception {
|
||||
Configuration conf = UTIL.getConfiguration();
|
||||
conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
|
||||
CPMasterObserver.class.getName());
|
||||
|
||||
UTIL.startMiniCluster(2);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardownAfterClass() throws Exception {
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStarted() throws Exception {
|
||||
MiniHBaseCluster cluster = UTIL.getHBaseCluster();
|
||||
|
||||
HMaster master = cluster.getMaster();
|
||||
assertTrue("Master should be active", master.isActiveMaster());
|
||||
MasterCoprocessorHost host = master.getCoprocessorHost();
|
||||
assertNotNull("CoprocessorHost should not be null", host);
|
||||
CPMasterObserver cp = (CPMasterObserver)host.findCoprocessor(
|
||||
CPMasterObserver.class.getName());
|
||||
assertNotNull("CPMasterObserver coprocessor not found or not installed!", cp);
|
||||
|
||||
// check basic lifecycle
|
||||
assertTrue("MasterObserver should have been started", cp.wasStarted());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTableOperations() throws Exception {
|
||||
MiniHBaseCluster cluster = UTIL.getHBaseCluster();
|
||||
|
||||
HMaster master = cluster.getMaster();
|
||||
MasterCoprocessorHost host = master.getCoprocessorHost();
|
||||
CPMasterObserver cp = (CPMasterObserver)host.findCoprocessor(
|
||||
CPMasterObserver.class.getName());
|
||||
assertFalse("No table created yet", cp.wasCreateTableCalled());
|
||||
|
||||
// create a table
|
||||
HTableDescriptor htd = new HTableDescriptor(TEST_TABLE);
|
||||
htd.addFamily(new HColumnDescriptor(TEST_FAMILY));
|
||||
HBaseAdmin admin = UTIL.getHBaseAdmin();
|
||||
admin.createTable(htd);
|
||||
assertTrue("Test table should be created", cp.wasCreateTableCalled());
|
||||
|
||||
// disable
|
||||
assertFalse(cp.wasDisableTableCalled());
|
||||
admin.disableTable(TEST_TABLE);
|
||||
assertTrue(admin.isTableDisabled(TEST_TABLE));
|
||||
assertTrue("Coprocessor should have been called on table disable",
|
||||
cp.wasDisableTableCalled());
|
||||
|
||||
// modify table
|
||||
htd.setMaxFileSize(512 * 1024 * 1024);
|
||||
admin.modifyTable(TEST_TABLE, htd);
|
||||
assertTrue("Test table should have been modified",
|
||||
cp.wasModifyTableCalled());
|
||||
|
||||
// add a column family
|
||||
admin.addColumn(TEST_TABLE, new HColumnDescriptor(TEST_FAMILY2));
|
||||
assertTrue("New column family should have been added to test table",
|
||||
cp.wasAddColumnCalled());
|
||||
|
||||
// modify a column family
|
||||
HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAMILY2);
|
||||
hcd.setMaxVersions(25);
|
||||
admin.modifyColumn(TEST_TABLE, hcd);
|
||||
assertTrue("Second column family should be modified",
|
||||
cp.wasModifyColumnCalled());
|
||||
|
||||
// enable
|
||||
assertFalse(cp.wasEnableTableCalled());
|
||||
admin.enableTable(TEST_TABLE);
|
||||
assertTrue(admin.isTableEnabled(TEST_TABLE));
|
||||
assertTrue("Coprocessor should have been called on table enable",
|
||||
cp.wasEnableTableCalled());
|
||||
|
||||
// disable again
|
||||
admin.disableTable(TEST_TABLE);
|
||||
assertTrue(admin.isTableDisabled(TEST_TABLE));
|
||||
|
||||
// delete column
|
||||
assertFalse("No column family deleted yet", cp.wasDeleteColumnCalled());
|
||||
admin.deleteColumn(TEST_TABLE, TEST_FAMILY2);
|
||||
HTableDescriptor tableDesc = admin.getTableDescriptor(TEST_TABLE);
|
||||
assertNull("'"+Bytes.toString(TEST_FAMILY2)+"' should have been removed",
|
||||
tableDesc.getFamily(TEST_FAMILY2));
|
||||
assertTrue("Coprocessor should have been called on column delete",
|
||||
cp.wasDeleteColumnCalled());
|
||||
|
||||
// delete table
|
||||
assertFalse("No table deleted yet", cp.wasDeleteTableCalled());
|
||||
admin.deleteTable(TEST_TABLE);
|
||||
assertFalse("Test table should have been deleted",
|
||||
admin.tableExists(TEST_TABLE));
|
||||
assertTrue("Coprocessor should have been called on table delete",
|
||||
cp.wasDeleteTableCalled());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegionTransitionOperations() throws Exception {
|
||||
MiniHBaseCluster cluster = UTIL.getHBaseCluster();
|
||||
|
||||
HMaster master = cluster.getMaster();
|
||||
MasterCoprocessorHost host = master.getCoprocessorHost();
|
||||
CPMasterObserver cp = (CPMasterObserver)host.findCoprocessor(
|
||||
CPMasterObserver.class.getName());
|
||||
|
||||
HTable table = UTIL.createTable(TEST_TABLE, TEST_FAMILY);
|
||||
UTIL.createMultiRegions(table, TEST_FAMILY);
|
||||
|
||||
Map<HRegionInfo,HServerAddress> regions = table.getRegionsInfo();
|
||||
assertFalse(regions.isEmpty());
|
||||
Map.Entry<HRegionInfo,HServerAddress> firstRegion =
|
||||
regions.entrySet().iterator().next();
|
||||
|
||||
// try to force a move
|
||||
Collection<HServerInfo> servers = master.getClusterStatus().getServerInfo();
|
||||
String destName = null;
|
||||
for (HServerInfo info : servers) {
|
||||
if (!info.getServerAddress().equals(firstRegion.getValue())) {
|
||||
destName = info.getServerName();
|
||||
break;
|
||||
}
|
||||
}
|
||||
master.move(firstRegion.getKey().getEncodedNameAsBytes(),
|
||||
Bytes.toBytes(destName));
|
||||
assertTrue("Coprocessor should have been called on region move",
|
||||
cp.wasMoveCalled());
|
||||
|
||||
// make sure balancer is on
|
||||
master.balanceSwitch(true);
|
||||
assertTrue("Coprocessor should have been called on balance switch",
|
||||
cp.wasBalanceSwitchCalled());
|
||||
|
||||
// force region rebalancing
|
||||
master.balanceSwitch(false);
|
||||
// move half the open regions from RS 0 to RS 1
|
||||
HRegionServer rs = cluster.getRegionServer(0);
|
||||
byte[] destRS = Bytes.toBytes(cluster.getRegionServer(1).getServerName());
|
||||
List<HRegionInfo> openRegions = rs.getOnlineRegions();
|
||||
int moveCnt = openRegions.size()/2;
|
||||
for (int i=0; i<moveCnt; i++) {
|
||||
HRegionInfo info = openRegions.get(i);
|
||||
if (!(info.isMetaRegion() || info.isRootRegion())) {
|
||||
master.move(openRegions.get(i).getEncodedNameAsBytes(), destRS);
|
||||
}
|
||||
}
|
||||
|
||||
// wait for assignments to finish
|
||||
AssignmentManager mgr = master.getAssignmentManager();
|
||||
Collection<AssignmentManager.RegionState> transRegions =
|
||||
mgr.getRegionsInTransition().values();
|
||||
for (AssignmentManager.RegionState state : transRegions) {
|
||||
mgr.waitOnRegionToClearRegionsInTransition(state.getRegion());
|
||||
}
|
||||
|
||||
// now trigger a balance
|
||||
master.balanceSwitch(true);
|
||||
boolean balanceRun = master.balance();
|
||||
assertTrue("Balance request should have run", balanceRun);
|
||||
assertTrue("Coprocessor should be called on region rebalancing",
|
||||
cp.wasBalanceCalled());
|
||||
}
|
||||
}
|
|
@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.client.HTable;
|
|||
import org.apache.hadoop.hbase.client.Increment;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.coprocessor.Coprocessor.Priority;
|
||||
import org.apache.hadoop.hbase.regionserver.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||
|
@ -47,8 +47,6 @@ import org.junit.AfterClass;
|
|||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class TestRegionObserverInterface {
|
||||
|
@ -77,7 +75,7 @@ public class TestRegionObserverInterface {
|
|||
public static void setupBeforeClass() throws Exception {
|
||||
// set configure to indicate which cp should be loaded
|
||||
Configuration conf = util.getConfiguration();
|
||||
conf.set("hbase.coprocessor.default.classes",
|
||||
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
|
||||
"org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver");
|
||||
|
||||
util.startMiniCluster(2);
|
||||
|
@ -114,7 +112,7 @@ public class TestRegionObserverInterface {
|
|||
// start a region server here, so just manually create cphost
|
||||
// and set it to region.
|
||||
HRegion r = HRegion.createHRegion(info, path, conf);
|
||||
CoprocessorHost host = new CoprocessorHost(r, null, conf);
|
||||
RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf);
|
||||
r.setCoprocessorHost(host);
|
||||
host.load(implClass, Priority.USER);
|
||||
return r;
|
||||
|
@ -145,7 +143,7 @@ public class TestRegionObserverInterface {
|
|||
if (!Arrays.equals(r.getTableDesc().getName(), TEST_TABLE)) {
|
||||
continue;
|
||||
}
|
||||
CoprocessorHost cph = t.getRegionServer().getOnlineRegion(r.getRegionName()).
|
||||
RegionCoprocessorHost cph = t.getRegionServer().getOnlineRegion(r.getRegionName()).
|
||||
getCoprocessorHost();
|
||||
Coprocessor c = cph.findCoprocessor(SimpleRegionObserver.class.getName());
|
||||
assertNotNull(c);
|
||||
|
@ -175,7 +173,7 @@ public class TestRegionObserverInterface {
|
|||
if (!Arrays.equals(r.getTableDesc().getName(), TEST_TABLE_2)) {
|
||||
continue;
|
||||
}
|
||||
CoprocessorHost cph = t.getRegionServer().getOnlineRegion(r.getRegionName()).
|
||||
RegionCoprocessorHost cph = t.getRegionServer().getOnlineRegion(r.getRegionName()).
|
||||
getCoprocessorHost();
|
||||
Coprocessor c = cph.findCoprocessor(SimpleRegionObserver.class.getName());
|
||||
assertTrue(((SimpleRegionObserver)c).hadPreIncrement());
|
||||
|
|
|
@ -33,8 +33,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
|
|||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.coprocessor.Coprocessor.Priority;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.regionserver.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
|
@ -46,7 +45,7 @@ public class TestRegionObserverStacking extends TestCase {
|
|||
public static class ObserverA extends BaseRegionObserverCoprocessor {
|
||||
long id;
|
||||
@Override
|
||||
public void postPut(final CoprocessorEnvironment e, final Map<byte[],
|
||||
public void postPut(final RegionCoprocessorEnvironment e, final Map<byte[],
|
||||
List<KeyValue>> familyMap, final boolean writeToWAL)
|
||||
throws IOException {
|
||||
id = System.currentTimeMillis();
|
||||
|
@ -60,7 +59,7 @@ public class TestRegionObserverStacking extends TestCase {
|
|||
public static class ObserverB extends BaseRegionObserverCoprocessor {
|
||||
long id;
|
||||
@Override
|
||||
public void postPut(final CoprocessorEnvironment e, final Map<byte[],
|
||||
public void postPut(final RegionCoprocessorEnvironment e, final Map<byte[],
|
||||
List<KeyValue>> familyMap, final boolean writeToWAL)
|
||||
throws IOException {
|
||||
id = System.currentTimeMillis();
|
||||
|
@ -75,7 +74,7 @@ public class TestRegionObserverStacking extends TestCase {
|
|||
long id;
|
||||
|
||||
@Override
|
||||
public void postPut(final CoprocessorEnvironment e, final Map<byte[],
|
||||
public void postPut(final RegionCoprocessorEnvironment e, final Map<byte[],
|
||||
List<KeyValue>> familyMap, final boolean writeToWAL)
|
||||
throws IOException {
|
||||
id = System.currentTimeMillis();
|
||||
|
@ -99,7 +98,7 @@ public class TestRegionObserverStacking extends TestCase {
|
|||
// is secretly loaded at OpenRegionHandler. we don't really
|
||||
// start a region server here, so just manually create cphost
|
||||
// and set it to region.
|
||||
CoprocessorHost host = new CoprocessorHost(r, null, conf);
|
||||
RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf);
|
||||
r.setCoprocessorHost(host);
|
||||
return r;
|
||||
}
|
||||
|
@ -112,7 +111,7 @@ public class TestRegionObserverStacking extends TestCase {
|
|||
|
||||
HRegion region = initHRegion(TABLE, getClass().getName(),
|
||||
HBaseConfiguration.create(), FAMILIES);
|
||||
CoprocessorHost h = region.getCoprocessorHost();
|
||||
RegionCoprocessorHost h = region.getCoprocessorHost();
|
||||
h.load(ObserverA.class, Priority.HIGHEST);
|
||||
h.load(ObserverB.class, Priority.USER);
|
||||
h.load(ObserverC.class, Priority.LOWEST);
|
||||
|
|
|
@ -150,7 +150,16 @@ public class TestCatalogJanitor {
|
|||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public ZooKeeperWatcher getZooKeeperWatcher() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CatalogTracker getCatalogTracker() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue