HBASE-4048 Support configuration of coprocessor at load time
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1142004 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
bca3fcd502
commit
9ed65d97c6
|
@ -13,7 +13,7 @@ Release 0.91.0 - Unreleased
|
|||
HBASE-451 Remove HTableDescriptor from HRegionInfo (Subbu M Iyer)
|
||||
HBASE-451 Remove HTableDescriptor from HRegionInfo
|
||||
addendum that fixes TestTableMapReduce
|
||||
HBASE-3534 Action should not store or serialize regionName (Ted Yu)
|
||||
HBASE-3534 Action should not store or serialize regionName (Ted Yu)
|
||||
|
||||
BUG FIXES
|
||||
HBASE-3280 YouAreDeadException being swallowed in HRS getMaster
|
||||
|
@ -295,6 +295,7 @@ Release 0.91.0 - Unreleased
|
|||
HBASE-3516 Coprocessors: add test cases for loading coprocessor jars
|
||||
(Mingjie Lai via garyh)
|
||||
HBASE-4036 Implementing a MultipleColumnPrefixFilter (Anirudh Todi)
|
||||
HBASE-4048 [Coprocessors] Support configuration of coprocessor at load time
|
||||
|
||||
TASKS
|
||||
HBASE-3559 Move report of split to master OFF the heartbeat channel
|
||||
|
|
|
@ -22,28 +22,16 @@ import java.io.IOException;
|
|||
* Coprocess interface.
|
||||
*/
|
||||
public interface Coprocessor {
|
||||
public static final int VERSION = 1;
|
||||
static final int VERSION = 1;
|
||||
|
||||
/**
|
||||
* Installation priority. Coprocessors will be executed in sequence
|
||||
* by the order of coprocessor priority.
|
||||
*/
|
||||
public enum Priority {
|
||||
HIGHEST(0),
|
||||
SYSTEM(Integer.MAX_VALUE/4),
|
||||
USER(Integer.MAX_VALUE/2),
|
||||
LOWEST(Integer.MAX_VALUE);
|
||||
|
||||
private int prio;
|
||||
|
||||
Priority(int prio) {
|
||||
this.prio = prio;
|
||||
}
|
||||
|
||||
public int intValue() {
|
||||
return prio;
|
||||
}
|
||||
}
|
||||
/** Highest installation priority */
|
||||
static final int PRIORITY_HIGHEST = 0;
|
||||
/** High (system) installation priority */
|
||||
static final int PRIORITY_SYSTEM = Integer.MAX_VALUE / 4;
|
||||
/** Default installation priority for user coprocessors */
|
||||
static final int PRIORITY_USER = Integer.MAX_VALUE / 2;
|
||||
/** Lowest installation priority */
|
||||
static final int PRIORITY_LOWEST = Integer.MAX_VALUE;
|
||||
|
||||
/**
|
||||
* Lifecycle state of a given coprocessor instance.
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.coprocessor;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.client.HTableInterface;
|
||||
|
||||
/**
|
||||
|
@ -35,11 +36,14 @@ public interface CoprocessorEnvironment {
|
|||
public Coprocessor getInstance();
|
||||
|
||||
/** @return the priority assigned to the loaded coprocessor */
|
||||
public Coprocessor.Priority getPriority();
|
||||
public int getPriority();
|
||||
|
||||
/** @return the load sequence number */
|
||||
public int getLoadSequence();
|
||||
|
||||
/** @return the configuration */
|
||||
public Configuration getConfiguration();
|
||||
|
||||
/**
|
||||
* @return an interface for accessing the given table
|
||||
* @throws IOException
|
||||
|
|
|
@ -79,7 +79,7 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||
if (defaultCPClasses == null || defaultCPClasses.length() == 0)
|
||||
return;
|
||||
StringTokenizer st = new StringTokenizer(defaultCPClasses, ",");
|
||||
int priority = Coprocessor.Priority.SYSTEM.intValue();
|
||||
int priority = Coprocessor.PRIORITY_SYSTEM;
|
||||
List<E> configured = new ArrayList<E>();
|
||||
while (st.hasMoreTokens()) {
|
||||
String className = st.nextToken();
|
||||
|
@ -90,7 +90,7 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||
Thread.currentThread().setContextClassLoader(cl);
|
||||
try {
|
||||
implClass = cl.loadClass(className);
|
||||
configured.add(loadInstance(implClass, Coprocessor.Priority.SYSTEM));
|
||||
configured.add(loadInstance(implClass, Coprocessor.PRIORITY_SYSTEM, conf));
|
||||
LOG.info("System coprocessor " + className + " was loaded " +
|
||||
"successfully with priority (" + priority++ + ").");
|
||||
} catch (ClassNotFoundException e) {
|
||||
|
@ -111,11 +111,12 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||
* @param path path to implementation jar
|
||||
* @param className the main class name
|
||||
* @param priority chaining priority
|
||||
* @param conf configuration for coprocessor
|
||||
* @throws java.io.IOException Exception
|
||||
*/
|
||||
@SuppressWarnings("deprecation")
|
||||
public E load(Path path, String className, Coprocessor.Priority priority)
|
||||
throws IOException {
|
||||
public E load(Path path, String className, int priority,
|
||||
Configuration conf) throws IOException {
|
||||
Class<?> implClass = null;
|
||||
|
||||
// Have we already loaded the class, perhaps from an earlier region open
|
||||
|
@ -169,21 +170,28 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||
}
|
||||
}
|
||||
|
||||
return loadInstance(implClass, priority);
|
||||
return loadInstance(implClass, priority, conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param implClass Implementation class
|
||||
* @param priority priority
|
||||
* @param conf configuration
|
||||
* @throws java.io.IOException Exception
|
||||
*/
|
||||
public void load(Class<?> implClass, Coprocessor.Priority priority)
|
||||
public void load(Class<?> implClass, int priority, Configuration conf)
|
||||
throws IOException {
|
||||
E env = loadInstance(implClass, priority);
|
||||
E env = loadInstance(implClass, priority, conf);
|
||||
coprocessors.add(env);
|
||||
}
|
||||
|
||||
public E loadInstance(Class<?> implClass, Coprocessor.Priority priority)
|
||||
/**
|
||||
* @param implClass Implementation class
|
||||
* @param priority priority
|
||||
* @param conf configuration
|
||||
* @throws java.io.IOException Exception
|
||||
*/
|
||||
public E loadInstance(Class<?> implClass, int priority, Configuration conf)
|
||||
throws IOException {
|
||||
// create the instance
|
||||
Coprocessor impl;
|
||||
|
@ -197,7 +205,7 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||
throw new IOException(e);
|
||||
}
|
||||
// create the environment
|
||||
E env = createEnvironment(implClass, impl, priority, ++loadSequence);
|
||||
E env = createEnvironment(implClass, impl, priority, ++loadSequence, conf);
|
||||
if (env instanceof Environment) {
|
||||
((Environment)env).startup();
|
||||
}
|
||||
|
@ -208,7 +216,7 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||
* Called when a new Coprocessor class is loaded
|
||||
*/
|
||||
public abstract E createEnvironment(Class<?> implClass, Coprocessor instance,
|
||||
Coprocessor.Priority priority, int sequence);
|
||||
int priority, int sequence, Configuration conf);
|
||||
|
||||
public void shutdown(CoprocessorEnvironment e) {
|
||||
if (e instanceof Environment) {
|
||||
|
@ -235,15 +243,33 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Find a coprocessor environment by class name
|
||||
* @param className the class name
|
||||
* @return the coprocessor, or null if not found
|
||||
*/
|
||||
public CoprocessorEnvironment findCoprocessorEnvironment(String className) {
|
||||
// initialize the coprocessors
|
||||
for (E env: coprocessors) {
|
||||
if (env.getInstance().getClass().getName().equals(className) ||
|
||||
env.getInstance().getClass().getSimpleName().equals(className)) {
|
||||
return env;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Environment priority comparator.
|
||||
* Coprocessors are chained in sorted order.
|
||||
*/
|
||||
static class EnvironmentPriorityComparator implements Comparator<CoprocessorEnvironment> {
|
||||
public int compare(CoprocessorEnvironment env1, CoprocessorEnvironment env2) {
|
||||
if (env1.getPriority().intValue() < env2.getPriority().intValue()) {
|
||||
static class EnvironmentPriorityComparator
|
||||
implements Comparator<CoprocessorEnvironment> {
|
||||
public int compare(final CoprocessorEnvironment env1,
|
||||
final CoprocessorEnvironment env2) {
|
||||
if (env1.getPriority() < env2.getPriority()) {
|
||||
return -1;
|
||||
} else if (env1.getPriority().intValue() > env2.getPriority().intValue()) {
|
||||
} else if (env1.getPriority() > env2.getPriority()) {
|
||||
return 1;
|
||||
}
|
||||
if (env1.getLoadSequence() < env2.getLoadSequence()) {
|
||||
|
@ -437,24 +463,27 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||
/** The coprocessor */
|
||||
public Coprocessor impl;
|
||||
/** Chaining priority */
|
||||
protected Coprocessor.Priority priority = Coprocessor.Priority.USER;
|
||||
protected int priority = Coprocessor.PRIORITY_USER;
|
||||
/** Current coprocessor state */
|
||||
Coprocessor.State state = Coprocessor.State.UNINSTALLED;
|
||||
/** Accounting for tables opened by the coprocessor */
|
||||
protected List<HTableInterface> openTables =
|
||||
Collections.synchronizedList(new ArrayList<HTableInterface>());
|
||||
private int seq;
|
||||
private Configuration conf;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
* @param impl the coprocessor instance
|
||||
* @param priority chaining priority
|
||||
*/
|
||||
public Environment(final Coprocessor impl, Coprocessor.Priority priority, int seq) {
|
||||
public Environment(final Coprocessor impl, final int priority,
|
||||
final int seq, final Configuration conf) {
|
||||
this.impl = impl;
|
||||
this.priority = priority;
|
||||
this.state = Coprocessor.State.INSTALLED;
|
||||
this.seq = seq;
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
/** Initialize the environment */
|
||||
|
@ -506,7 +535,7 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Coprocessor.Priority getPriority() {
|
||||
public int getPriority() {
|
||||
return priority;
|
||||
}
|
||||
|
||||
|
@ -527,6 +556,11 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||
return VersionInfo.getVersion();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getConfiguration() {
|
||||
return conf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Open a table from within the Coprocessor environment
|
||||
* @param tableName the table name
|
||||
|
|
|
@ -42,9 +42,10 @@ public class MasterCoprocessorHost
|
|||
implements MasterCoprocessorEnvironment {
|
||||
private MasterServices masterServices;
|
||||
|
||||
public MasterEnvironment(Class<?> implClass, Coprocessor impl,
|
||||
Coprocessor.Priority priority, int seq, MasterServices services) {
|
||||
super(impl, priority, seq);
|
||||
public MasterEnvironment(final Class<?> implClass, final Coprocessor impl,
|
||||
final int priority, final int seq, final Configuration conf,
|
||||
final MasterServices services) {
|
||||
super(impl, priority, seq, conf);
|
||||
this.masterServices = services;
|
||||
}
|
||||
|
||||
|
@ -57,14 +58,15 @@ public class MasterCoprocessorHost
|
|||
|
||||
MasterCoprocessorHost(final MasterServices services, final Configuration conf) {
|
||||
this.masterServices = services;
|
||||
|
||||
loadSystemCoprocessors(conf, MASTER_COPROCESSOR_CONF_KEY);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MasterEnvironment createEnvironment(Class<?> implClass,
|
||||
Coprocessor instance, Coprocessor.Priority priority, int seq) {
|
||||
return new MasterEnvironment(implClass, instance, priority, seq, masterServices);
|
||||
public MasterEnvironment createEnvironment(final Class<?> implClass,
|
||||
final Coprocessor instance, final int priority, final int seq,
|
||||
final Configuration conf) {
|
||||
return new MasterEnvironment(implClass, instance, priority, seq, conf,
|
||||
masterServices);
|
||||
}
|
||||
|
||||
/* Implementation of hooks for invoking MasterObservers */
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.client.*;
|
||||
|
@ -66,10 +67,10 @@ public class RegionCoprocessorHost
|
|||
* @param impl the coprocessor instance
|
||||
* @param priority chaining priority
|
||||
*/
|
||||
public RegionEnvironment(final Coprocessor impl,
|
||||
final Coprocessor.Priority priority, final int seq, final HRegion region,
|
||||
public RegionEnvironment(final Coprocessor impl, final int priority,
|
||||
final int seq, final Configuration conf, final HRegion region,
|
||||
final RegionServerServices services) {
|
||||
super(impl, priority, seq);
|
||||
super(impl, priority, seq, conf);
|
||||
this.region = region;
|
||||
this.rsServices = services;
|
||||
}
|
||||
|
@ -91,7 +92,9 @@ public class RegionCoprocessorHost
|
|||
}
|
||||
}
|
||||
|
||||
static final Pattern attrSpecMatch = Pattern.compile("(.+):(.+):(.+)");
|
||||
static final Pattern attrSpecMatch1 = Pattern.compile("(.+)\\|(.+)\\|(.+)\\|(.+)");
|
||||
static final Pattern attrSpecMatch2 = Pattern.compile("(.+)\\|(.+)\\|(.+)");
|
||||
static final Pattern cfgSpecMatch = Pattern.compile("([^=]+)=([^,]+),?");
|
||||
|
||||
/** The region server services */
|
||||
RegionServerServices rsServices;
|
||||
|
@ -117,32 +120,51 @@ public class RegionCoprocessorHost
|
|||
loadTableCoprocessors();
|
||||
}
|
||||
|
||||
void loadTableCoprocessors () {
|
||||
void loadTableCoprocessors() {
|
||||
// scan the table attributes for coprocessor load specifications
|
||||
// initialize the coprocessors
|
||||
List<RegionEnvironment> configured = new ArrayList<RegionEnvironment>();
|
||||
for (Map.Entry<ImmutableBytesWritable,ImmutableBytesWritable> e:
|
||||
region.getTableDesc().getValues().entrySet()) {
|
||||
String key = Bytes.toString(e.getKey().get());
|
||||
String spec = Bytes.toString(e.getValue().get());
|
||||
if (key.startsWith("COPROCESSOR")) {
|
||||
// found one
|
||||
try {
|
||||
String spec = Bytes.toString(e.getValue().get());
|
||||
Matcher matcher = attrSpecMatch.matcher(spec);
|
||||
Matcher matcher = attrSpecMatch1.matcher(spec);
|
||||
if (!matcher.matches()) {
|
||||
matcher = attrSpecMatch2.matcher(spec);
|
||||
}
|
||||
if (matcher.matches()) {
|
||||
Path path = new Path(matcher.group(1));
|
||||
String className = matcher.group(2);
|
||||
Coprocessor.Priority priority =
|
||||
Coprocessor.Priority.valueOf(matcher.group(3));
|
||||
configured.add(load(path, className, priority));
|
||||
int priority = Integer.valueOf(matcher.group(3));
|
||||
String cfgSpec = null;
|
||||
try {
|
||||
cfgSpec = matcher.group(4);
|
||||
} catch (IndexOutOfBoundsException ex) {
|
||||
// ignore
|
||||
}
|
||||
if (cfgSpec != null) {
|
||||
Configuration newConf = HBaseConfiguration.create();
|
||||
Matcher m = cfgSpecMatch.matcher(cfgSpec);
|
||||
while (m.find()) {
|
||||
newConf.set(m.group(1), m.group(2));
|
||||
}
|
||||
configured.add(load(path, className, priority, newConf));
|
||||
} else {
|
||||
configured.add(load(path, className, priority, conf));
|
||||
}
|
||||
LOG.info("Load coprocessor " + className + " from HTD of " +
|
||||
Bytes.toString(region.getTableDesc().getName()) +
|
||||
Bytes.toString(region.getTableDesc().getName()) +
|
||||
" successfully.");
|
||||
} else {
|
||||
LOG.warn("attribute '" + key + "' has invalid coprocessor spec");
|
||||
throw new RuntimeException("specification does not match pattern");
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
LOG.warn(StringUtils.stringifyException(ex));
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("attribute '" + key +
|
||||
"' has invalid coprocessor specification '" + spec + "'");
|
||||
LOG.warn(StringUtils.stringifyException(ex));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -151,8 +173,8 @@ public class RegionCoprocessorHost
|
|||
}
|
||||
|
||||
@Override
|
||||
public RegionEnvironment createEnvironment(
|
||||
Class<?> implClass, Coprocessor instance, Coprocessor.Priority priority, int seq) {
|
||||
public RegionEnvironment createEnvironment(Class<?> implClass,
|
||||
Coprocessor instance, int priority, int seq, Configuration conf) {
|
||||
// Check if it's an Endpoint.
|
||||
// Due to current dynamic protocol design, Endpoint
|
||||
// uses a different way to be registered and executed.
|
||||
|
@ -164,8 +186,8 @@ public class RegionCoprocessorHost
|
|||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return new RegionEnvironment(instance, priority, seq, region, rsServices);
|
||||
return new RegionEnvironment(instance, priority, seq, conf, region,
|
||||
rsServices);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -25,7 +25,6 @@ import java.io.IOException;
|
|||
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.coprocessor.*;
|
||||
import org.apache.hadoop.hbase.coprocessor.Coprocessor.Priority;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
/**
|
||||
|
@ -53,11 +52,13 @@ public class WALCoprocessorHost
|
|||
* @param impl the coprocessor instance
|
||||
* @param priority chaining priority
|
||||
* @param seq load sequence
|
||||
* @param conf configuration
|
||||
* @param hlog HLog
|
||||
*/
|
||||
public WALEnvironment(Class<?> implClass, final Coprocessor impl,
|
||||
final Coprocessor.Priority priority, final int seq, final HLog hlog) {
|
||||
super(impl, priority, seq);
|
||||
final int priority, final int seq, final Configuration conf,
|
||||
final HLog hlog) {
|
||||
super(impl, priority, seq, conf);
|
||||
this.wal = hlog;
|
||||
}
|
||||
}
|
||||
|
@ -75,10 +76,11 @@ public class WALCoprocessorHost
|
|||
}
|
||||
|
||||
@Override
|
||||
public WALEnvironment createEnvironment(Class<?> implClass,
|
||||
Coprocessor instance, Priority priority, int seq) {
|
||||
// TODO Auto-generated method stub
|
||||
return new WALEnvironment(implClass, instance, priority, seq, this.wal);
|
||||
public WALEnvironment createEnvironment(final Class<?> implClass,
|
||||
final Coprocessor instance, final int priority, final int seq,
|
||||
final Configuration conf) {
|
||||
return new WALEnvironment(implClass, instance, priority, seq, conf,
|
||||
this.wal);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -52,19 +52,15 @@ public class TestClassLoading {
|
|||
private static Configuration conf;
|
||||
private static MiniDFSCluster cluster;
|
||||
|
||||
public static int BUFFER_SIZE = 4096;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
}
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
}
|
||||
static final int BUFFER_SIZE = 4096;
|
||||
static final String tableName = "TestClassLoading";
|
||||
static final String cpName1 = "TestCP1";
|
||||
static final String cpName2 = "TestCP2";
|
||||
static final String cpName3 = "TestCP3";
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL.startMiniCluster(1);
|
||||
|
||||
conf = TEST_UTIL.getConfiguration();
|
||||
cluster = TEST_UTIL.getDFSCluster();
|
||||
}
|
||||
|
@ -113,25 +109,15 @@ public class TestClassLoading {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
// HBASE-3516: Test CP Class loading from HDFS
|
||||
public void testClassLoadingFromHDFS() throws Exception {
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
String className = "TestCP";
|
||||
|
||||
private File buildCoprocessorJar(String className) throws Exception {
|
||||
// compose a java source file.
|
||||
String javaCode = "import org.apache.hadoop.hbase.coprocessor.*;" +
|
||||
"public class " + className + " extends BaseRegionObserver {}";
|
||||
|
||||
Path baseDire = TEST_UTIL.getTestDir();
|
||||
Path srcDire = new Path(TEST_UTIL.getTestDir(), "src");
|
||||
|
||||
File srcDirePath = new File(srcDire.toString());
|
||||
srcDirePath.mkdirs();
|
||||
|
||||
File sourceCodeFile = new File(srcDire.toString(),
|
||||
className + ".java");
|
||||
|
||||
"public class " + className + " extends BaseRegionObserver {}";
|
||||
Path baseDir = HBaseTestingUtility.getTestDir();
|
||||
Path srcDir = new Path(HBaseTestingUtility.getTestDir(), "src");
|
||||
File srcDirPath = new File(srcDir.toString());
|
||||
srcDirPath.mkdirs();
|
||||
File sourceCodeFile = new File(srcDir.toString(), className + ".java");
|
||||
BufferedWriter bw = new BufferedWriter(new FileWriter(sourceCodeFile));
|
||||
bw.write(javaCode);
|
||||
bw.close();
|
||||
|
@ -140,129 +126,107 @@ public class TestClassLoading {
|
|||
JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
|
||||
ArrayList<String> srcFileNames = new ArrayList<String>();
|
||||
srcFileNames.add(sourceCodeFile.toString());
|
||||
|
||||
StandardJavaFileManager fm = compiler.getStandardFileManager(null, null,
|
||||
null);
|
||||
null);
|
||||
Iterable<? extends JavaFileObject> cu =
|
||||
fm.getJavaFileObjects(sourceCodeFile);
|
||||
|
||||
fm.getJavaFileObjects(sourceCodeFile);
|
||||
List<String> options = new ArrayList<String>();
|
||||
options.add("-classpath");
|
||||
|
||||
// only add hbase classes to classpath. This is a little bit tricky: assume
|
||||
// the classpath is {hbaseSrc}/target/classes.
|
||||
String currentDir = new File(".").getAbsolutePath();
|
||||
options.add(currentDir + Path.SEPARATOR + "target"+ Path.SEPARATOR +
|
||||
"classes");
|
||||
|
||||
JavaCompiler.CompilationTask task = compiler.getTask(
|
||||
null, fm, null, options, null, cu);
|
||||
|
||||
"classes");
|
||||
JavaCompiler.CompilationTask task = compiler.getTask(null, fm, null,
|
||||
options, null, cu);
|
||||
assertTrue("Compile file " + sourceCodeFile + " failed.", task.call());
|
||||
|
||||
// build a jar file by the classes files
|
||||
String jarFileName = className + ".jar";
|
||||
File jarFile = new File(baseDire.toString(), jarFileName);
|
||||
File jarFile = new File(baseDir.toString(), jarFileName);
|
||||
if (!createJarArchive(jarFile,
|
||||
new File[]{new File(srcDire.toString(), className + ".class")})){
|
||||
new File[]{new File(srcDir.toString(), className + ".class")})){
|
||||
assertTrue("Build jar file failed.", false);
|
||||
}
|
||||
|
||||
// copy the jar into dfs
|
||||
fs.copyFromLocalFile(new Path(jarFile.getPath()),
|
||||
new Path(fs.getUri().toString() + Path.SEPARATOR));
|
||||
String jarFileOnHDFS = fs.getUri().toString() + Path.SEPARATOR +
|
||||
jarFileName;
|
||||
return jarFile;
|
||||
}
|
||||
|
||||
@Test
|
||||
// HBASE-3516: Test CP Class loading from HDFS
|
||||
public void testClassLoadingFromHDFS() throws Exception {
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
|
||||
File jarFile1 = buildCoprocessorJar(cpName1);
|
||||
File jarFile2 = buildCoprocessorJar(cpName2);
|
||||
|
||||
// copy the jars into dfs
|
||||
fs.copyFromLocalFile(new Path(jarFile1.getPath()),
|
||||
new Path(fs.getUri().toString() + Path.SEPARATOR));
|
||||
String jarFileOnHDFS1 = fs.getUri().toString() + Path.SEPARATOR +
|
||||
jarFile1.getName();
|
||||
assertTrue("Copy jar file to HDFS failed.",
|
||||
fs.exists(new Path(jarFileOnHDFS)));
|
||||
fs.exists(new Path(jarFileOnHDFS1)));
|
||||
LOG.info("Copied jar file to HDFS: " + jarFileOnHDFS1);
|
||||
|
||||
LOG.info("Copied jar file to HDFS: " + jarFileOnHDFS);
|
||||
|
||||
// create a table that references the jar
|
||||
HTableDescriptor htd = new HTableDescriptor(className);
|
||||
fs.copyFromLocalFile(new Path(jarFile2.getPath()),
|
||||
new Path(fs.getUri().toString() + Path.SEPARATOR));
|
||||
String jarFileOnHDFS2 = fs.getUri().toString() + Path.SEPARATOR +
|
||||
jarFile2.getName();
|
||||
assertTrue("Copy jar file to HDFS failed.",
|
||||
fs.exists(new Path(jarFileOnHDFS2)));
|
||||
LOG.info("Copied jar file to HDFS: " + jarFileOnHDFS2);
|
||||
|
||||
// create a table that references the coprocessors
|
||||
HTableDescriptor htd = new HTableDescriptor(tableName);
|
||||
htd.addFamily(new HColumnDescriptor("test"));
|
||||
htd.setValue("COPROCESSOR$1",
|
||||
jarFileOnHDFS.toString() +
|
||||
":" + className + ":" + Coprocessor.Priority.USER);
|
||||
// without configuration values
|
||||
htd.setValue("COPROCESSOR$1", jarFileOnHDFS1.toString() + "|" + cpName1 +
|
||||
"|" + Coprocessor.PRIORITY_USER);
|
||||
// with configuration values
|
||||
htd.setValue("COPROCESSOR$2", jarFileOnHDFS2.toString() + "|" + cpName2 +
|
||||
"|" + Coprocessor.PRIORITY_USER + "|k1=v1,k2=v2,k3=v3");
|
||||
HBaseAdmin admin = new HBaseAdmin(this.conf);
|
||||
admin.createTable(htd);
|
||||
|
||||
// verify that the coprocessor was loaded
|
||||
boolean found = false;
|
||||
// verify that the coprocessors were loaded
|
||||
boolean found1 = false, found2 = false, found2_k1 = false, found2_k2 = false,
|
||||
found2_k3 = false;
|
||||
MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster();
|
||||
for (HRegion region: hbase.getRegionServer(0).getOnlineRegionsLocalContext()) {
|
||||
if (region.getRegionNameAsString().startsWith(className)) {
|
||||
Coprocessor c = region.getCoprocessorHost().findCoprocessor(className);
|
||||
found = (c != null);
|
||||
if (region.getRegionNameAsString().startsWith(tableName)) {
|
||||
CoprocessorEnvironment env;
|
||||
env = region.getCoprocessorHost().findCoprocessorEnvironment(cpName1);
|
||||
if (env != null) {
|
||||
found1 = true;
|
||||
}
|
||||
env = region.getCoprocessorHost().findCoprocessorEnvironment(cpName2);
|
||||
if (env != null) {
|
||||
found2 = true;
|
||||
Configuration conf = env.getConfiguration();
|
||||
found2_k1 = conf.get("k1") != null;
|
||||
found2_k2 = conf.get("k2") != null;
|
||||
found2_k3 = conf.get("k3") != null;
|
||||
}
|
||||
}
|
||||
}
|
||||
assertTrue("Class " + className + " cannot be loaded.", found);
|
||||
assertTrue("Class " + cpName1 + " was missing on a region", found1);
|
||||
assertTrue("Class " + cpName2 + " was missing on a region", found2);
|
||||
assertTrue("Configuration key 'k1' was missing on a region", found2_k1);
|
||||
assertTrue("Configuration key 'k2' was missing on a region", found2_k2);
|
||||
assertTrue("Configuration key 'k3' was missing on a region", found2_k3);
|
||||
}
|
||||
|
||||
@Test
|
||||
// HBASE-3516: Test CP Class loading from local file system
|
||||
public void testClassLoadingFromLocalFS() throws Exception {
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
String className = "TestCP2";
|
||||
|
||||
// compose a java source file.
|
||||
String javaCode = "import org.apache.hadoop.hbase.coprocessor.*;" +
|
||||
"public class " + className + " extends BaseRegionObserver {}";
|
||||
|
||||
Path baseDire = TEST_UTIL.getTestDir();
|
||||
Path srcDire = new Path(TEST_UTIL.getTestDir(), "src");
|
||||
|
||||
File srcDirePath = new File(srcDire.toString());
|
||||
srcDirePath.mkdirs();
|
||||
|
||||
File sourceCodeFile = new File(srcDire.toString(),
|
||||
className + ".java");
|
||||
|
||||
BufferedWriter bw = new BufferedWriter(new FileWriter(sourceCodeFile));
|
||||
bw.write(javaCode);
|
||||
bw.close();
|
||||
|
||||
// compile it by JavaCompiler
|
||||
JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
|
||||
ArrayList<String> srcFileNames = new ArrayList<String>();
|
||||
srcFileNames.add(sourceCodeFile.toString());
|
||||
|
||||
StandardJavaFileManager fm = compiler.getStandardFileManager(null, null,
|
||||
null);
|
||||
Iterable<? extends JavaFileObject> cu =
|
||||
fm.getJavaFileObjects(sourceCodeFile);
|
||||
|
||||
List<String> options = new ArrayList<String>();
|
||||
options.add("-classpath");
|
||||
|
||||
// only add hbase classes to classpath. This is a little bit tricky: assume
|
||||
// the classpath is {hbaseSrc}/target/classes.
|
||||
String currentDir = new File(".").getAbsolutePath();
|
||||
options.add(currentDir + Path.SEPARATOR + "target"+ Path.SEPARATOR +
|
||||
"classes");
|
||||
|
||||
JavaCompiler.CompilationTask task = compiler.getTask(
|
||||
null, fm, null, options, null, cu);
|
||||
|
||||
assertTrue("Compile file " + sourceCodeFile + " failed.", task.call());
|
||||
|
||||
// build a jar file by the classes files
|
||||
String jarFileName = className + ".jar";
|
||||
File jarFile = new File(baseDire.toString(), jarFileName);
|
||||
if (!createJarArchive(jarFile,
|
||||
new File[]{new File(srcDire.toString(), className + ".class")})){
|
||||
assertTrue("Build jar file failed.", false);
|
||||
}
|
||||
File jarFile = buildCoprocessorJar(cpName3);
|
||||
|
||||
// create a table that references the jar
|
||||
HTableDescriptor htd = new HTableDescriptor(className);
|
||||
|
||||
HTableDescriptor htd = new HTableDescriptor(cpName3);
|
||||
htd.addFamily(new HColumnDescriptor("test"));
|
||||
htd.setValue("COPROCESSOR$1",
|
||||
jarFile.toString() +
|
||||
":" + className + ":" + Coprocessor.Priority.USER);
|
||||
htd.setValue("COPROCESSOR$1", jarFile.toString() + "|" + cpName3 + "|" +
|
||||
Coprocessor.PRIORITY_USER);
|
||||
HBaseAdmin admin = new HBaseAdmin(this.conf);
|
||||
admin.createTable(htd);
|
||||
|
||||
|
@ -270,11 +234,10 @@ public class TestClassLoading {
|
|||
boolean found = false;
|
||||
MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster();
|
||||
for (HRegion region: hbase.getRegionServer(0).getOnlineRegionsLocalContext()) {
|
||||
if (region.getRegionNameAsString().startsWith(className)) {
|
||||
Coprocessor c = region.getCoprocessorHost().findCoprocessor(className);
|
||||
found = (c != null);
|
||||
if (region.getRegionNameAsString().startsWith(cpName3)) {
|
||||
found = (region.getCoprocessorHost().findCoprocessor(cpName3) != null);
|
||||
}
|
||||
}
|
||||
assertTrue("Class " + className + " cannot be loaded.", found);
|
||||
assertTrue("Class " + cpName3 + " was missing on a region", found);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.HBaseTestCase;
|
|||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.coprocessor.Coprocessor.Priority;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.SplitTransaction;
|
||||
|
@ -193,7 +192,7 @@ public class TestCoprocessorInterface extends HBaseTestCase {
|
|||
RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf);
|
||||
r.setCoprocessorHost(host);
|
||||
|
||||
host.load(implClass, Priority.USER);
|
||||
host.load(implClass, Coprocessor.PRIORITY_USER, conf);
|
||||
// we need to manually call pre- and postOpen here since the
|
||||
// above load() is not the real case for CP loading. A CP is
|
||||
// expected to be loaded by default from 1) configuration; or 2)
|
||||
|
@ -220,7 +219,7 @@ public class TestCoprocessorInterface extends HBaseTestCase {
|
|||
RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf);
|
||||
r.setCoprocessorHost(host);
|
||||
|
||||
host.load(implClass, Priority.USER);
|
||||
host.load(implClass, Coprocessor.PRIORITY_USER, conf);
|
||||
|
||||
Coprocessor c = host.findCoprocessor(implClass.getName());
|
||||
assertNotNull(c);
|
||||
|
|
|
@ -27,16 +27,11 @@ import java.util.Arrays;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||
import org.apache.hadoop.hbase.client.*;
|
||||
import org.apache.hadoop.hbase.coprocessor.Coprocessor.Priority;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||
|
||||
|
@ -44,8 +39,6 @@ import org.junit.AfterClass;
|
|||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import javax.el.MethodNotFoundException;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class TestRegionObserverInterface {
|
||||
|
@ -58,9 +51,6 @@ public class TestRegionObserverInterface {
|
|||
public final static byte[] C = Bytes.toBytes("c");
|
||||
public final static byte[] ROW = Bytes.toBytes("testrow");
|
||||
|
||||
private static final int ROWSIZE = 20;
|
||||
private static byte [][] ROWS = makeN(ROW, ROWSIZE);
|
||||
|
||||
private static HBaseTestingUtility util = new HBaseTestingUtility();
|
||||
private static MiniHBaseCluster cluster = null;
|
||||
|
||||
|
|
|
@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
|
|||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.coprocessor.Coprocessor.Priority;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -109,12 +108,13 @@ public class TestRegionObserverStacking extends TestCase {
|
|||
byte[] A = Bytes.toBytes("A");
|
||||
byte[][] FAMILIES = new byte[][] { A } ;
|
||||
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
HRegion region = initHRegion(TABLE, getClass().getName(),
|
||||
HBaseConfiguration.create(), FAMILIES);
|
||||
conf, FAMILIES);
|
||||
RegionCoprocessorHost h = region.getCoprocessorHost();
|
||||
h.load(ObserverA.class, Priority.HIGHEST);
|
||||
h.load(ObserverB.class, Priority.USER);
|
||||
h.load(ObserverC.class, Priority.LOWEST);
|
||||
h.load(ObserverA.class, Coprocessor.PRIORITY_HIGHEST, conf);
|
||||
h.load(ObserverB.class, Coprocessor.PRIORITY_USER, conf);
|
||||
h.load(ObserverC.class, Coprocessor.PRIORITY_LOWEST, conf);
|
||||
|
||||
Put put = new Put(ROW);
|
||||
put.add(A, A, A);
|
||||
|
@ -133,4 +133,3 @@ public class TestRegionObserverStacking extends TestCase {
|
|||
assertTrue(idB < idC);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue