HBASE-5748 Enable lib directory in jar file for coprocessor
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1311498 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
56d1587b7e
commit
1283155ddc
|
@ -39,12 +39,16 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.SortedCopyOnWriteSet;
|
import org.apache.hadoop.hbase.util.SortedCopyOnWriteSet;
|
||||||
import org.apache.hadoop.hbase.util.VersionInfo;
|
import org.apache.hadoop.hbase.util.VersionInfo;
|
||||||
import org.apache.hadoop.hbase.Server;
|
import org.apache.hadoop.hbase.Server;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.net.URLClassLoader;
|
import java.net.URLClassLoader;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
import java.util.jar.JarEntry;
|
||||||
|
import java.util.jar.JarFile;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Provides the common setup framework and runtime services for coprocessor
|
* Provides the common setup framework and runtime services for coprocessor
|
||||||
|
@ -195,6 +199,21 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
||||||
// method which returns URLs for as long as it is available
|
// method which returns URLs for as long as it is available
|
||||||
List<URL> paths = new ArrayList<URL>();
|
List<URL> paths = new ArrayList<URL>();
|
||||||
paths.add(new File(dst.toString()).getCanonicalFile().toURL());
|
paths.add(new File(dst.toString()).getCanonicalFile().toURL());
|
||||||
|
|
||||||
|
JarFile jarFile = new JarFile(dst.toString());
|
||||||
|
Enumeration<JarEntry> entries = jarFile.entries();
|
||||||
|
while (entries.hasMoreElements()) {
|
||||||
|
JarEntry entry = entries.nextElement();
|
||||||
|
if (entry.getName().matches("/lib/[^/]+\\.jar")) {
|
||||||
|
File file = new File(System.getProperty("java.io.tmpdir") +
|
||||||
|
java.io.File.separator +"." + pathPrefix +
|
||||||
|
"." + className + "." + System.currentTimeMillis() + "." + entry.getName().substring(5));
|
||||||
|
IOUtils.copyBytes(jarFile.getInputStream(entry), new FileOutputStream(file), conf, true);
|
||||||
|
file.deleteOnExit();
|
||||||
|
paths.add(file.toURL());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
StringTokenizer st = new StringTokenizer(cp, File.pathSeparator);
|
StringTokenizer st = new StringTokenizer(cp, File.pathSeparator);
|
||||||
while (st.hasMoreTokens()) {
|
while (st.hasMoreTokens()) {
|
||||||
paths.add((new File(st.nextToken())).getCanonicalFile().toURL());
|
paths.add((new File(st.nextToken())).getCanonicalFile().toURL());
|
||||||
|
|
|
@ -382,6 +382,93 @@ public class TestClassLoading {
|
||||||
assertFalse("Configuration key 'k4' wasn't configured", found5_k4);
|
assertFalse("Configuration key 'k4' wasn't configured", found5_k4);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClassLoadingFromLibDirInJar() throws Exception {
|
||||||
|
FileSystem fs = cluster.getFileSystem();
|
||||||
|
|
||||||
|
File innerJarFile1 = buildCoprocessorJar(cpName1);
|
||||||
|
File innerJarFile2 = buildCoprocessorJar(cpName2);
|
||||||
|
File outerJarFile = new File(TEST_UTIL.getDataTestDir().toString(), "outer.jar");
|
||||||
|
|
||||||
|
byte buffer[] = new byte[BUFFER_SIZE];
|
||||||
|
// Open archive file
|
||||||
|
FileOutputStream stream = new FileOutputStream(outerJarFile);
|
||||||
|
JarOutputStream out = new JarOutputStream(stream, new Manifest());
|
||||||
|
|
||||||
|
for (File jarFile: new File[] { innerJarFile1, innerJarFile2 }) {
|
||||||
|
// Add archive entry
|
||||||
|
JarEntry jarAdd = new JarEntry("/lib/" + jarFile.getName());
|
||||||
|
jarAdd.setTime(jarFile.lastModified());
|
||||||
|
out.putNextEntry(jarAdd);
|
||||||
|
|
||||||
|
// Write file to archive
|
||||||
|
FileInputStream in = new FileInputStream(jarFile);
|
||||||
|
while (true) {
|
||||||
|
int nRead = in.read(buffer, 0, buffer.length);
|
||||||
|
if (nRead <= 0)
|
||||||
|
break;
|
||||||
|
out.write(buffer, 0, nRead);
|
||||||
|
}
|
||||||
|
in.close();
|
||||||
|
}
|
||||||
|
out.close();
|
||||||
|
stream.close();
|
||||||
|
LOG.info("Adding jar file to outer jar file completed");
|
||||||
|
|
||||||
|
// copy the jars into dfs
|
||||||
|
fs.copyFromLocalFile(new Path(outerJarFile.getPath()),
|
||||||
|
new Path(fs.getUri().toString() + Path.SEPARATOR));
|
||||||
|
String jarFileOnHDFS = fs.getUri().toString() + Path.SEPARATOR +
|
||||||
|
outerJarFile.getName();
|
||||||
|
assertTrue("Copy jar file to HDFS failed.",
|
||||||
|
fs.exists(new Path(jarFileOnHDFS)));
|
||||||
|
LOG.info("Copied jar file to HDFS: " + jarFileOnHDFS);
|
||||||
|
|
||||||
|
// create a table that references the coprocessors
|
||||||
|
HTableDescriptor htd = new HTableDescriptor(tableName);
|
||||||
|
htd.addFamily(new HColumnDescriptor("test"));
|
||||||
|
// without configuration values
|
||||||
|
htd.setValue("COPROCESSOR$1", jarFileOnHDFS.toString() + "|" + cpName1 +
|
||||||
|
"|" + Coprocessor.PRIORITY_USER);
|
||||||
|
// with configuration values
|
||||||
|
htd.setValue("COPROCESSOR$2", jarFileOnHDFS.toString() + "|" + cpName2 +
|
||||||
|
"|" + Coprocessor.PRIORITY_USER + "|k1=v1,k2=v2,k3=v3");
|
||||||
|
HBaseAdmin admin = new HBaseAdmin(this.conf);
|
||||||
|
if (admin.tableExists(tableName)) {
|
||||||
|
admin.disableTable(tableName);
|
||||||
|
admin.deleteTable(tableName);
|
||||||
|
}
|
||||||
|
admin.createTable(htd);
|
||||||
|
|
||||||
|
// verify that the coprocessors were loaded
|
||||||
|
boolean found1 = false, found2 = false, found2_k1 = false,
|
||||||
|
found2_k2 = false, found2_k3 = false;
|
||||||
|
MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster();
|
||||||
|
for (HRegion region:
|
||||||
|
hbase.getRegionServer(0).getOnlineRegionsLocalContext()) {
|
||||||
|
if (region.getRegionNameAsString().startsWith(tableName)) {
|
||||||
|
CoprocessorEnvironment env;
|
||||||
|
env = region.getCoprocessorHost().findCoprocessorEnvironment(cpName1);
|
||||||
|
if (env != null) {
|
||||||
|
found1 = true;
|
||||||
|
}
|
||||||
|
env = region.getCoprocessorHost().findCoprocessorEnvironment(cpName2);
|
||||||
|
if (env != null) {
|
||||||
|
found2 = true;
|
||||||
|
Configuration conf = env.getConfiguration();
|
||||||
|
found2_k1 = conf.get("k1") != null;
|
||||||
|
found2_k2 = conf.get("k2") != null;
|
||||||
|
found2_k3 = conf.get("k3") != null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertTrue("Class " + cpName1 + " was missing on a region", found1);
|
||||||
|
assertTrue("Class " + cpName2 + " was missing on a region", found2);
|
||||||
|
assertTrue("Configuration key 'k1' was missing on a region", found2_k1);
|
||||||
|
assertTrue("Configuration key 'k2' was missing on a region", found2_k2);
|
||||||
|
assertTrue("Configuration key 'k3' was missing on a region", found2_k3);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRegionServerCoprocessorsReported() throws Exception {
|
public void testRegionServerCoprocessorsReported() throws Exception {
|
||||||
// HBASE 4070: Improve region server metrics to report loaded coprocessors
|
// HBASE 4070: Improve region server metrics to report loaded coprocessors
|
||||||
|
|
Loading…
Reference in New Issue