HBASE-26714 Introduce path configuration for system coprocessors (#4069)

Signed-off-by: Ankit Singhal <ankit@apache.org>
Signed-off-by: Wellington Ramos Chevreuil <wchevreuil@apache.org>
This commit is contained in:
Tak Lon (Stephen) Wu 2022-02-03 20:19:38 -08:00 committed by GitHub
parent 3da23c22c5
commit e848d3b9d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 135 additions and 7 deletions

View File

@ -213,7 +213,7 @@ public final class ClassLoaderTestHelper {
LOG.info("Adding jar file to outer jar file completed");
}
static String localDirPath(Configuration conf) {
public static String localDirPath(Configuration conf) {
return conf.get(ClassLoaderBase.LOCAL_DIR_KEY)
+ File.separator + "jars" + File.separator;
}

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
import org.apache.hadoop.hbase.util.SortedList;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
/**
* Provides the common setup framework and runtime services for coprocessor
@ -144,14 +145,22 @@ public abstract class CoprocessorHost<C extends Coprocessor, E extends Coprocess
int currentSystemPriority = Coprocessor.PRIORITY_SYSTEM;
for (String className : defaultCPClasses) {
String[] classNameAndPriority = className.split("\\|");
// After HBASE-23710 and HBASE-26714 when configuring for system coprocessor, we accept
// an optional format of className|priority|path
String[] classNameToken = className.split("\\|");
boolean hasPriorityOverride = false;
className = classNameAndPriority[0];
boolean hasPath = false;
className = classNameToken[0];
int overridePriority = Coprocessor.PRIORITY_SYSTEM;
if (classNameAndPriority.length > 1){
overridePriority = Integer.parseInt(classNameAndPriority[1]);
Path path = null;
if (classNameToken.length > 1 && !Strings.isNullOrEmpty(classNameToken[1])) {
overridePriority = Integer.parseInt(classNameToken[1]);
hasPriorityOverride = true;
}
if (classNameToken.length > 2 && !Strings.isNullOrEmpty(classNameToken[2])) {
path = new Path(classNameToken[2].trim());
hasPath = true;
}
className = className.trim();
if (findCoprocessor(className) != null) {
// If already loaded will just continue
@ -159,8 +168,13 @@ public abstract class CoprocessorHost<C extends Coprocessor, E extends Coprocess
continue;
}
ClassLoader cl = this.getClass().getClassLoader();
Thread.currentThread().setContextClassLoader(cl);
try {
// override the class loader if a path for the system coprocessor is provided.
if (hasPath) {
cl = CoprocessorClassLoader.getClassLoader(path, this.getClass().getClassLoader(),
pathPrefix, conf);
}
Thread.currentThread().setContextClassLoader(cl);
implClass = cl.loadClass(className);
int coprocPriority = hasPriorityOverride ? overridePriority : currentSystemPriority;
// Add coprocessors as we go to guard against case where a coprocessor is specified twice

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.coprocessor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.io.File;
import java.lang.reflect.InvocationTargetException;
import org.apache.hadoop.conf.Configuration;
@ -27,8 +28,10 @@ import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ClassLoaderTestHelper;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
@ -41,6 +44,8 @@ public class TestCoprocessorHost {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCoprocessorHost.class);
private static final HBaseCommonTestingUtil TEST_UTIL = new HBaseCommonTestingUtil();
/**
* An {@link Abortable} implementation for tests.
*/
@ -50,7 +55,7 @@ public class TestCoprocessorHost {
@Override
public void abort(String why, Throwable e) {
this.aborted = true;
Assert.fail();
Assert.fail(e.getMessage());
}
@Override
@ -97,6 +102,108 @@ public class TestCoprocessorHost {
assertEquals(overridePriority, simpleEnv_v3.getPriority());
}
@Test
public void testLoadSystemCoprocessorWithPath() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
final String key = "KEY";
final String testClassName = "TestSystemCoprocessor";
final String testClassNameWithPriorityAndPath = testClassName + "PriorityAndPath";
File jarFile = buildCoprocessorJar(testClassName);
File jarFileWithPriorityAndPath = buildCoprocessorJar(testClassNameWithPriorityAndPath);
try {
CoprocessorHost<RegionCoprocessor, CoprocessorEnvironment<RegionCoprocessor>> host;
host = new CoprocessorHostForTest<>(conf);
// make a string of coprocessor with only priority
int overridePriority = Integer.MAX_VALUE - 1;
final String coprocessorWithPriority =
SimpleRegionObserverV3.class.getName() + "|" + overridePriority;
// make a string of coprocessor with path but no priority
final String coprocessorWithPath =
String.format("%s|%s|%s", testClassName, "", jarFile.getAbsolutePath());
// make a string of coprocessor with priority and path
final String coprocessorWithPriorityAndPath = String
.format("%s|%s|%s", testClassNameWithPriorityAndPath, (overridePriority - 1),
jarFileWithPriorityAndPath.getAbsolutePath());
// Try and load a system coprocessors
conf.setStrings(key, SimpleRegionObserverV2.class.getName(), coprocessorWithPriority,
coprocessorWithPath, coprocessorWithPriorityAndPath);
host.loadSystemCoprocessors(conf, key);
// first loaded system coprocessor with default priority
CoprocessorEnvironment<?> simpleEnv =
host.findCoprocessorEnvironment(SimpleRegionObserverV2.class.getName());
assertNotNull(simpleEnv);
assertEquals(Coprocessor.PRIORITY_SYSTEM, simpleEnv.getPriority());
// external system coprocessor with default priority
CoprocessorEnvironment<?> coprocessorEnvironmentWithPath =
host.findCoprocessorEnvironment(testClassName);
assertNotNull(coprocessorEnvironmentWithPath);
assertEquals(Coprocessor.PRIORITY_SYSTEM + 1, coprocessorEnvironmentWithPath.getPriority());
// system coprocessor with configured priority
CoprocessorEnvironment<?> coprocessorEnvironmentWithPriority =
host.findCoprocessorEnvironment(SimpleRegionObserverV3.class.getName());
assertNotNull(coprocessorEnvironmentWithPriority);
assertEquals(overridePriority, coprocessorEnvironmentWithPriority.getPriority());
// external system coprocessor with override priority
CoprocessorEnvironment<?> coprocessorEnvironmentWithPriorityAndPath =
host.findCoprocessorEnvironment(testClassNameWithPriorityAndPath);
assertNotNull(coprocessorEnvironmentWithPriorityAndPath);
assertEquals(overridePriority - 1, coprocessorEnvironmentWithPriorityAndPath.getPriority());
} finally {
if (jarFile.exists()) {
jarFile.delete();
}
if (jarFileWithPriorityAndPath.exists()) {
jarFileWithPriorityAndPath.delete();
}
}
}
@Test(expected = AssertionError.class)
public void testLoadSystemCoprocessorWithPathDoesNotExist() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
final String key = "KEY";
final String testClassName = "TestSystemCoprocessor";
CoprocessorHost<RegionCoprocessor, CoprocessorEnvironment<RegionCoprocessor>> host;
host = new CoprocessorHostForTest<>(conf);
// make a string of coprocessor with path but no priority
final String coprocessorWithPath = testClassName + "||" + testClassName + ".jar";
// Try and load a system coprocessors
conf.setStrings(key, coprocessorWithPath);
// when loading non-exist with CoprocessorHostForTest host, it aborts with AssertionError
host.loadSystemCoprocessors(conf, key);
}
@Test(expected = AssertionError.class)
public void testLoadSystemCoprocessorWithPathDoesNotExistAndPriority() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
final String key = "KEY";
final String testClassName = "TestSystemCoprocessor";
CoprocessorHost<RegionCoprocessor, CoprocessorEnvironment<RegionCoprocessor>> host;
host = new CoprocessorHostForTest<>(conf);
int overridePriority = Integer.MAX_VALUE - 1;
// make a string of coprocessor with path and priority
final String coprocessor =
testClassName + "|" + overridePriority + "|" + testClassName + ".jar";
// Try and load a system coprocessors
conf.setStrings(key, coprocessor);
// when loading non-exist coprocessor, it aborts with AssertionError
host.loadSystemCoprocessors(conf, key);
}
public static class SimpleRegionObserverV2 extends SimpleRegionObserver { }
public static class SimpleRegionObserverV3 extends SimpleRegionObserver {
@ -128,4 +235,11 @@ public class TestCoprocessorHost {
return new BaseEnvironment<>(instance, priority, 0, cpHostConf);
}
}
private File buildCoprocessorJar(String className) throws Exception {
String dataTestDir = TEST_UTIL.getDataTestDir().toString();
String code = String.format("import org.apache.hadoop.hbase.coprocessor.*; public class %s"
+ " implements RegionCoprocessor {}", className);
return ClassLoaderTestHelper.buildJar(dataTestDir, className, code);
}
}