HBASE-7205 Coprocessor classloader is replicated for all regions in the HRegionServer (Ted Yu and Adrian Muraru)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1420480 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b017ee8e9a
commit
0aec9dd711
|
@ -65,9 +65,13 @@ public class CoprocessorClassLoader extends URLClassLoader {
|
|||
"org.w3c",
|
||||
"org.xml",
|
||||
"sunw.",
|
||||
// Hadoop/HBase:
|
||||
"org.apache.hadoop",
|
||||
// logging
|
||||
"org.apache.commons.logging",
|
||||
"org.apache.log4j",
|
||||
"com.hadoop",
|
||||
// Hadoop/HBase/ZK:
|
||||
"org.apache.hadoop",
|
||||
"org.apache.zookeeper",
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -80,7 +84,12 @@ public class CoprocessorClassLoader extends URLClassLoader {
|
|||
new Pattern[] {
|
||||
Pattern.compile("^[^-]+-default\\.xml$")
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* Parent classloader used to load any class not matching the exemption list.
|
||||
*/
|
||||
private final ClassLoader parent;
|
||||
|
||||
/**
|
||||
* Creates a CoprocessorClassLoader that loads classes from the given paths.
|
||||
* @param paths paths from which to load classes.
|
||||
|
@ -88,8 +97,12 @@ public class CoprocessorClassLoader extends URLClassLoader {
|
|||
*/
|
||||
public CoprocessorClassLoader(List<URL> paths, ClassLoader parent) {
|
||||
super(paths.toArray(new URL[]{}), parent);
|
||||
this.parent = parent;
|
||||
if (parent == null) {
|
||||
throw new IllegalArgumentException("No parent classloader!");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
synchronized public Class<?> loadClass(String name)
|
||||
throws ClassNotFoundException {
|
||||
|
@ -99,9 +112,9 @@ public class CoprocessorClassLoader extends URLClassLoader {
|
|||
LOG.debug("Skipping exempt class " + name +
|
||||
" - delegating directly to parent");
|
||||
}
|
||||
return super.loadClass(name);
|
||||
return parent.loadClass(name);
|
||||
}
|
||||
|
||||
|
||||
// Check whether the class has already been loaded:
|
||||
Class<?> clasz = findLoadedClass(name);
|
||||
if (clasz != null) {
|
||||
|
@ -123,7 +136,7 @@ public class CoprocessorClassLoader extends URLClassLoader {
|
|||
LOG.debug("Class " + name + " not found - delegating to parent");
|
||||
}
|
||||
try {
|
||||
clasz = super.loadClass(name);
|
||||
clasz = parent.loadClass(name);
|
||||
} catch (ClassNotFoundException e2) {
|
||||
// Class not found in this ClassLoader or in the parent ClassLoader
|
||||
// Log some debug output before rethrowing ClassNotFoundException
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.apache.hadoop.hbase.coprocessor;
|
||||
|
||||
import com.google.common.collect.MapMaker;
|
||||
import com.google.protobuf.Service;
|
||||
import com.google.protobuf.ServiceException;
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -48,6 +49,7 @@ import java.io.FileOutputStream;
|
|||
import java.io.IOException;
|
||||
import java.net.URL;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.jar.JarEntry;
|
||||
import java.util.jar.JarFile;
|
||||
|
||||
|
@ -78,6 +80,15 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||
protected String pathPrefix;
|
||||
protected volatile int loadSequence;
|
||||
|
||||
/*
|
||||
* External classloaders cache keyed by external jar path.
|
||||
* ClassLoader instance is stored as a weak-reference
|
||||
* to allow GC'ing when no CoprocessorHost is using it
|
||||
* (@see HBASE-7205)
|
||||
*/
|
||||
static ConcurrentMap<Path, ClassLoader> classLoadersCache =
|
||||
new MapMaker().concurrencyLevel(3).weakValues().makeMap();
|
||||
|
||||
public CoprocessorHost() {
|
||||
pathPrefix = UUID.randomUUID().toString();
|
||||
}
|
||||
|
@ -165,14 +176,27 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||
LOG.debug("Loading coprocessor class " + className + " with path " +
|
||||
path + " and priority " + priority);
|
||||
|
||||
// Have we already loaded the class, perhaps from an earlier region open
|
||||
// for the same table?
|
||||
try {
|
||||
implClass = getClass().getClassLoader().loadClass(className);
|
||||
} catch (ClassNotFoundException e) {
|
||||
LOG.info("Class " + className + " needs to be loaded from a file - " +
|
||||
path + ".");
|
||||
// go ahead to load from file system.
|
||||
ClassLoader cl = null;
|
||||
if (path == null) {
|
||||
try {
|
||||
implClass = getClass().getClassLoader().loadClass(className);
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new IOException("No jar path specified for " + className);
|
||||
}
|
||||
} else {
|
||||
// Have we already loaded the class, perhaps from an earlier region open
|
||||
// for the same table?
|
||||
cl = classLoadersCache.get(path);
|
||||
if (cl != null){
|
||||
LOG.debug("Found classloader "+ cl + "for "+path.toString());
|
||||
try {
|
||||
implClass = cl.loadClass(className);
|
||||
} catch (ClassNotFoundException e) {
|
||||
LOG.info("Class " + className + " needs to be loaded from a file - " +
|
||||
path + ".");
|
||||
// go ahead to load from file system.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If not, load
|
||||
|
@ -204,7 +228,8 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||
// unsurprisingly wants URLs, not URIs; so we will use the deprecated
|
||||
// method which returns URLs for as long as it is available
|
||||
List<URL> paths = new ArrayList<URL>();
|
||||
paths.add(new File(dst.toString()).getCanonicalFile().toURL());
|
||||
URL url = new File(dst.toString()).getCanonicalFile().toURL();
|
||||
paths.add(url);
|
||||
|
||||
JarFile jarFile = new JarFile(dst.toString());
|
||||
Enumeration<JarEntry> entries = jarFile.entries();
|
||||
|
@ -221,17 +246,33 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||
}
|
||||
jarFile.close();
|
||||
|
||||
ClassLoader cl = new CoprocessorClassLoader(paths,
|
||||
this.getClass().getClassLoader());
|
||||
Thread.currentThread().setContextClassLoader(cl);
|
||||
cl = new CoprocessorClassLoader(paths, this.getClass().getClassLoader());
|
||||
// cache cp classloader as a weak value, will be GC'ed when no reference left
|
||||
ClassLoader prev = classLoadersCache.putIfAbsent(path, cl);
|
||||
if (prev != null) {
|
||||
//lost update race, use already added classloader
|
||||
cl = prev;
|
||||
}
|
||||
|
||||
try {
|
||||
implClass = cl.loadClass(className);
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new IOException(e);
|
||||
throw new IOException("Cannot load external coprocessor class " + className, e);
|
||||
}
|
||||
}
|
||||
|
||||
return loadInstance(implClass, priority, conf);
|
||||
//load custom code for coprocessor
|
||||
Thread currentThread = Thread.currentThread();
|
||||
ClassLoader hostClassLoader = currentThread.getContextClassLoader();
|
||||
try{
|
||||
// switch temporarily to the thread classloader for custom CP
|
||||
currentThread.setContextClassLoader(cl);
|
||||
E cpInstance = loadInstance(implClass, priority, conf);
|
||||
return cpInstance;
|
||||
} finally {
|
||||
// restore the fresh (host) classloader
|
||||
currentThread.setContextClassLoader(hostClassLoader);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -312,6 +353,24 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the set of classloaders used to instantiate Coprocessor classes defined in external
|
||||
* jar files.
|
||||
* @return A set of ClassLoader instances
|
||||
*/
|
||||
Set<ClassLoader> getExternalClassLoaders() {
|
||||
Set<ClassLoader> externalClassLoaders = new HashSet<ClassLoader>();
|
||||
final ClassLoader systemClassLoader = this.getClass().getClassLoader();
|
||||
for (E env : coprocessors) {
|
||||
ClassLoader cl = env.getInstance().getClass().getClassLoader();
|
||||
if (cl != systemClassLoader ){
|
||||
//do not include system classloader
|
||||
externalClassLoaders.add(cl);
|
||||
}
|
||||
}
|
||||
return externalClassLoaders;
|
||||
}
|
||||
|
||||
/**
|
||||
* Find a coprocessor environment by class name
|
||||
* @param className the class name
|
||||
|
|
|
@ -61,6 +61,7 @@ public class TestClassLoading {
|
|||
static final String cpName4 = "TestCP4";
|
||||
static final String cpName5 = "TestCP5";
|
||||
static final String cpName6 = "TestCP6";
|
||||
static final String cpNameInvalid = "TestCPInvalid";
|
||||
|
||||
private static Class<?> regionCoprocessor1 = ColumnAggregationEndpoint.class;
|
||||
private static Class<?> regionCoprocessor2 = GenericEndpoint.class;
|
||||
|
@ -202,16 +203,18 @@ public class TestClassLoading {
|
|||
new Path(fs.getUri().toString() + Path.SEPARATOR));
|
||||
String jarFileOnHDFS1 = fs.getUri().toString() + Path.SEPARATOR +
|
||||
jarFile1.getName();
|
||||
Path pathOnHDFS1 = new Path(jarFileOnHDFS1);
|
||||
assertTrue("Copy jar file to HDFS failed.",
|
||||
fs.exists(new Path(jarFileOnHDFS1)));
|
||||
fs.exists(pathOnHDFS1));
|
||||
LOG.info("Copied jar file to HDFS: " + jarFileOnHDFS1);
|
||||
|
||||
fs.copyFromLocalFile(new Path(jarFile2.getPath()),
|
||||
new Path(fs.getUri().toString() + Path.SEPARATOR));
|
||||
String jarFileOnHDFS2 = fs.getUri().toString() + Path.SEPARATOR +
|
||||
jarFile2.getName();
|
||||
Path pathOnHDFS2 = new Path(jarFileOnHDFS2);
|
||||
assertTrue("Copy jar file to HDFS failed.",
|
||||
fs.exists(new Path(jarFileOnHDFS2)));
|
||||
fs.exists(pathOnHDFS2));
|
||||
LOG.info("Copied jar file to HDFS: " + jarFileOnHDFS2);
|
||||
|
||||
// create a table that references the coprocessors
|
||||
|
@ -223,6 +226,9 @@ public class TestClassLoading {
|
|||
// with configuration values
|
||||
htd.setValue("COPROCESSOR$2", jarFileOnHDFS2.toString() + "|" + cpName2 +
|
||||
"|" + Coprocessor.PRIORITY_USER + "|k1=v1,k2=v2,k3=v3");
|
||||
// same jar but invalid class name (should fail to load this class)
|
||||
htd.setValue("COPROCESSOR$3", jarFileOnHDFS2.toString() + "|" + cpNameInvalid +
|
||||
"|" + Coprocessor.PRIORITY_USER);
|
||||
HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
|
||||
if (admin.tableExists(tableName)) {
|
||||
if (admin.isTableEnabled(tableName)) {
|
||||
|
@ -230,36 +236,70 @@ public class TestClassLoading {
|
|||
}
|
||||
admin.deleteTable(tableName);
|
||||
}
|
||||
admin.createTable(htd);
|
||||
CoprocessorHost.classLoadersCache.clear();
|
||||
byte[] startKey = {10, 63};
|
||||
byte[] endKey = {12, 43};
|
||||
admin.createTable(htd, startKey, endKey, 4);
|
||||
waitForTable(htd.getName());
|
||||
|
||||
// verify that the coprocessors were loaded
|
||||
boolean found1 = false, found2 = false, found2_k1 = false,
|
||||
found2_k2 = false, found2_k3 = false;
|
||||
boolean foundTableRegion=false;
|
||||
boolean found_invalid = true, found1 = true, found2 = true, found2_k1 = true,
|
||||
found2_k2 = true, found2_k3 = true;
|
||||
Map<HRegion, Set<ClassLoader>> regionsActiveClassLoaders =
|
||||
new HashMap<HRegion, Set<ClassLoader>>();
|
||||
MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster();
|
||||
for (HRegion region:
|
||||
hbase.getRegionServer(0).getOnlineRegionsLocalContext()) {
|
||||
if (region.getRegionNameAsString().startsWith(tableName)) {
|
||||
foundTableRegion = true;
|
||||
CoprocessorEnvironment env;
|
||||
env = region.getCoprocessorHost().findCoprocessorEnvironment(cpName1);
|
||||
if (env != null) {
|
||||
found1 = true;
|
||||
}
|
||||
found1 = found1 && (env != null);
|
||||
env = region.getCoprocessorHost().findCoprocessorEnvironment(cpName2);
|
||||
found2 = found2 && (env != null);
|
||||
if (env != null) {
|
||||
found2 = true;
|
||||
Configuration conf = env.getConfiguration();
|
||||
found2_k1 = conf.get("k1") != null;
|
||||
found2_k2 = conf.get("k2") != null;
|
||||
found2_k3 = conf.get("k3") != null;
|
||||
found2_k1 = found2_k1 && (conf.get("k1") != null);
|
||||
found2_k2 = found2_k2 && (conf.get("k2") != null);
|
||||
found2_k3 = found2_k3 && (conf.get("k3") != null);
|
||||
} else {
|
||||
found2_k1 = found2_k2 = found2_k3 = false;
|
||||
}
|
||||
env = region.getCoprocessorHost().findCoprocessorEnvironment(cpNameInvalid);
|
||||
found_invalid = found_invalid && (env != null);
|
||||
|
||||
regionsActiveClassLoaders
|
||||
.put(region, ((CoprocessorHost) region.getCoprocessorHost()).getExternalClassLoaders());
|
||||
}
|
||||
}
|
||||
|
||||
assertTrue("No region was found for table " + tableName, foundTableRegion);
|
||||
assertTrue("Class " + cpName1 + " was missing on a region", found1);
|
||||
assertTrue("Class " + cpName2 + " was missing on a region", found2);
|
||||
//an invalid CP class name is defined for this table, validate that it is not loaded
|
||||
assertFalse("Class " + cpNameInvalid + " was found on a region", found_invalid);
|
||||
assertTrue("Configuration key 'k1' was missing on a region", found2_k1);
|
||||
assertTrue("Configuration key 'k2' was missing on a region", found2_k2);
|
||||
assertTrue("Configuration key 'k3' was missing on a region", found2_k3);
|
||||
// check if CP classloaders are cached
|
||||
assertTrue(jarFileOnHDFS1 + " was not cached",
|
||||
CoprocessorHost.classLoadersCache.containsKey(pathOnHDFS1));
|
||||
assertTrue(jarFileOnHDFS2 + " was not cached",
|
||||
CoprocessorHost.classLoadersCache.containsKey(pathOnHDFS2));
|
||||
//two external jar used, should be one classloader per jar
|
||||
assertEquals("The number of cached classloaders should be equal to the number" +
|
||||
" of external jar files",
|
||||
2, CoprocessorHost.classLoadersCache.size());
|
||||
//check if region active classloaders are shared across all RS regions
|
||||
Set<ClassLoader> externalClassLoaders = new HashSet<ClassLoader>(
|
||||
CoprocessorHost.classLoadersCache.values());
|
||||
for (Map.Entry<HRegion, Set<ClassLoader>> regionCP : regionsActiveClassLoaders.entrySet()) {
|
||||
assertTrue("Some CP classloaders for region " + regionCP.getKey() + " are not cached."
|
||||
+ " ClassLoader Cache:" + externalClassLoaders
|
||||
+ " Region ClassLoaders:" + regionCP.getValue(),
|
||||
externalClassLoaders.containsAll(regionCP.getValue()));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue