HBASE-12575 Sanity check table coprocessor classes are loadable

commit a731ea6304
parent 1d880e3f60
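For orientation: the checks introduced below validate the coprocessor specs that users attach to a table schema as "coprocessor$N" attributes, each a string of the form "<jar path>|<class name>|<priority>|<key=value,...>". A minimal sketch of setting such a spec, with invented table and observer names (not part of this patch):

import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;

public class CoprocessorSpecSketch {
  public static void main(String[] args) {
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t1"));
    // Empty path: the class must already be on the region server classpath.
    // org.example.MyRegionObserver is a hypothetical class name.
    htd.setValue("coprocessor$1",
        "|org.example.MyRegionObserver|1001|arg1=1,arg2=2");
    System.out.println(htd);
  }
}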
hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java

@@ -106,6 +106,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.security.UserProvider;
@@ -1244,9 +1245,9 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
           + "if you want to bypass sanity checks");
     }

-    // check split policy class can be loaded
+    // check that coprocessors and other specified plugin classes can be loaded
     try {
-      RegionSplitPolicy.getSplitPolicyClass(htd, conf);
+      checkClassLoading(conf, htd);
     } catch (Exception ex) {
       throw new DoNotRetryIOException(ex);
     }
@@ -1399,6 +1400,12 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     EncryptionTest.testEncryption(conf, hcd.getEncryptionType(), hcd.getEncryptionKey());
   }

+  private void checkClassLoading(final Configuration conf, final HTableDescriptor htd)
+      throws IOException {
+    RegionSplitPolicy.getSplitPolicyClass(htd, conf);
+    RegionCoprocessorHost.testTableCoprocessorAttrs(conf, htd);
+  }
+
   private HRegionInfo[] getHRegionInfos(HTableDescriptor hTableDescriptor,
       byte[][] splitKeys) {
     long regionId = System.currentTimeMillis();
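The client-visible effect of the master-side hook above, sketched under assumptions (the connection boilerplate, table name, and class name are illustrative, not taken from this patch): a schema naming an unloadable coprocessor class now fails the DDL request itself rather than producing a table whose regions refuse to open.

import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class SanityCheckSketch {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
         Admin admin = conn.getAdmin()) {
      HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t1"));
      htd.addFamily(new HColumnDescriptor("f"));
      // Hypothetical, deliberately unloadable class name:
      htd.setValue("coprocessor$1", "|org.example.NoSuchObserver|1001|");
      try {
        admin.createTable(htd);
      } catch (DoNotRetryIOException e) {
        // checkClassLoading(conf, htd) failed on the master, so the client
        // receives a non-retriable error up front.
        System.out.println("create rejected: " + e.getMessage());
      }
    }
  }
}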
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -4855,8 +4855,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    */
   protected HRegion openHRegion(final CancelableProgressable reporter)
   throws IOException {
+    // Refuse to open the region if we are missing local compression support
     checkCompressionCodecs();
+    // Refuse to open the region if encryption configuration is incorrect or
+    // codec support is missing
     checkEncryption();
+    // Refuse to open the region if a required class cannot be loaded
+    checkClassLoading();
     this.openSeqNum = initialize(reporter);
     this.setSequenceId(openSeqNum);
     if (wal != null && getRegionServerServices() != null) {
@@ -4878,6 +4883,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     }
   }

+  private void checkClassLoading() throws IOException {
+    RegionSplitPolicy.getSplitPolicyClass(this.htableDescriptor, conf);
+    RegionCoprocessorHost.testTableCoprocessorAttrs(conf, this.htableDescriptor);
+  }
+
   /**
    * Create a daughter region from given a temp directory with the region data.
    * @param hri Spec. for daughter region to open.
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -49,6 +50,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -77,6 +79,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
 import org.apache.hadoop.hbase.util.Pair;

 import com.google.common.collect.ImmutableList;
@@ -168,6 +171,37 @@ public class RegionCoprocessorHost

   }

+  static class TableCoprocessorAttribute {
+    private Path path;
+    private String className;
+    private int priority;
+    private Configuration conf;
+
+    public TableCoprocessorAttribute(Path path, String className, int priority,
+        Configuration conf) {
+      this.path = path;
+      this.className = className;
+      this.priority = priority;
+      this.conf = conf;
+    }
+
+    public Path getPath() {
+      return path;
+    }
+
+    public String getClassName() {
+      return className;
+    }
+
+    public int getPriority() {
+      return priority;
+    }
+
+    public Configuration getConf() {
+      return conf;
+    }
+  }
+
   /** The region server services */
   RegionServerServices rsServices;
   /** The region */
@@ -199,15 +233,13 @@ public class RegionCoprocessorHost
     loadTableCoprocessors(conf);
   }

-  void loadTableCoprocessors(final Configuration conf) {
-    // scan the table attributes for coprocessor load specifications
-    // initialize the coprocessors
-    List<RegionEnvironment> configured = new ArrayList<RegionEnvironment>();
-    for (Map.Entry<ImmutableBytesWritable,ImmutableBytesWritable> e:
-        region.getTableDesc().getValues().entrySet()) {
+  static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf,
+      HTableDescriptor htd) {
+    List<TableCoprocessorAttribute> result = Lists.newArrayList();
+    for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e: htd.getValues().entrySet()) {
       String key = Bytes.toString(e.getKey().get()).trim();
+      String spec = Bytes.toString(e.getValue().get()).trim();
       if (HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(key).matches()) {
-        String spec = Bytes.toString(e.getValue().get()).trim();
         // found one
         try {
           Matcher matcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(spec);
@@ -217,6 +249,11 @@ public class RegionCoprocessorHost
           Path path = matcher.group(1).trim().isEmpty() ?
               null : new Path(matcher.group(1).trim());
           String className = matcher.group(2).trim();
+          if (className.isEmpty()) {
+            LOG.error("Malformed table coprocessor specification: key=" +
+                key + ", spec: " + spec);
+            continue;
+          }
           int priority = matcher.group(3).trim().isEmpty() ?
               Coprocessor.PRIORITY_USER : Integer.valueOf(matcher.group(3));
           String cfgSpec = null;
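For reference, how a spec string decomposes under the matcher groups used in this hunk. The sketch below only restates the parsing logic shown above; the spec value itself is invented:

import java.util.regex.Matcher;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HConstants;

public class SpecParseSketch {
  public static void main(String[] args) {
    String spec = "hdfs:///foo.jar|org.example.MyRegionObserver|1001|arg1=1,arg2=2";
    Matcher matcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(spec);
    if (matcher.matches()) {
      // group 1: optional jar path, group 2: class name, group 3: priority
      Path path = matcher.group(1).trim().isEmpty() ?
          null : new Path(matcher.group(1).trim());
      String className = matcher.group(2).trim();
      int priority = matcher.group(3).trim().isEmpty() ?
          Coprocessor.PRIORITY_USER : Integer.valueOf(matcher.group(3));
      System.out.println(path + " " + className + " " + priority);
    }
  }
}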
@@ -238,20 +275,7 @@ public class RegionCoprocessorHost
           } else {
             ourConf = conf;
           }
-          // Load encompasses classloading and coprocessor initialization
-          try {
-            RegionEnvironment env = load(path, className, priority, ourConf);
-            configured.add(env);
-            LOG.info("Loaded coprocessor " + className + " from HTD of " +
-                region.getTableDesc().getTableName().getNameAsString() + " successfully.");
-          } catch (Throwable t) {
-            // Coprocessor failed to load, do we abort on error?
-            if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
-              abortServer(className, t);
-            } else {
-              LOG.error("Failed to load coprocessor " + className, t);
-            }
-          }
+          result.add(new TableCoprocessorAttribute(path, className, priority, ourConf));
         } else {
           LOG.error("Malformed table coprocessor specification: key=" + key +
               ", spec: " + spec);
@@ -262,6 +286,65 @@ public class RegionCoprocessorHost
         }
       }
     }
+    return result;
+  }
+
+  /**
+   * Sanity check the table coprocessor attributes of the supplied schema. Will
+   * throw an exception if there is a problem.
+   * @param conf
+   * @param htd
+   * @throws IOException
+   */
+  public static void testTableCoprocessorAttrs(final Configuration conf,
+      final HTableDescriptor htd) throws IOException {
+    String pathPrefix = UUID.randomUUID().toString();
+    for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, htd)) {
+      if (attr.getPriority() < 0) {
+        throw new IOException("Priority for coprocessor " + attr.getClassName() +
+          " cannot be less than 0");
+      }
+      ClassLoader old = Thread.currentThread().getContextClassLoader();
+      try {
+        ClassLoader cl;
+        if (attr.getPath() != null) {
+          cl = CoprocessorClassLoader.getClassLoader(attr.getPath(),
+            CoprocessorHost.class.getClassLoader(), pathPrefix, conf);
+        } else {
+          cl = CoprocessorHost.class.getClassLoader();
+        }
+        Thread.currentThread().setContextClassLoader(cl);
+        cl.loadClass(attr.getClassName());
+      } catch (ClassNotFoundException e) {
+        throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e);
+      } finally {
+        Thread.currentThread().setContextClassLoader(old);
+      }
+    }
+  }
+
+  void loadTableCoprocessors(final Configuration conf) {
+    // scan the table attributes for coprocessor load specifications
+    // initialize the coprocessors
+    List<RegionEnvironment> configured = new ArrayList<RegionEnvironment>();
+    for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf,
+        region.getTableDesc())) {
+      // Load encompasses classloading and coprocessor initialization
+      try {
+        RegionEnvironment env = load(attr.getPath(), attr.getClassName(), attr.getPriority(),
+          attr.getConf());
+        configured.add(env);
+        LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " +
+            region.getTableDesc().getTableName().getNameAsString() + " successfully.");
+      } catch (Throwable t) {
+        // Coprocessor failed to load, do we abort on error?
+        if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
+          abortServer(attr.getClassName(), t);
+        } else {
+          LOG.error("Failed to load coprocessor " + attr.getClassName(), t);
+        }
+      }
+    }
     // add together to coprocessor set for COW efficiency
     coprocessors.addAll(configured);
   }
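How the new static entry point is driven by the HMaster and HRegion callers shown earlier; a minimal standalone sketch with invented table and class names:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;

public class TestAttrsSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t1"));
    htd.setValue("coprocessor$1", "|org.example.MyRegionObserver|1001|");
    try {
      RegionCoprocessorHost.testTableCoprocessorAttrs(conf, htd);
    } catch (IOException e) {
      // Raised when a priority is negative or a named class cannot be loaded.
      // The master wraps this in DoNotRetryIOException; a region server fails
      // the region open.
      System.out.println("schema rejected: " + e.getMessage());
    }
  }
}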
admin_test.rb (shell admin tests): the coprocessor cases are updated because specs naming an unloadable class (the bare "SimpleRegionObserver" name, or a jar at the nonexistent hdfs:///foo.jar) would now be rejected by the new sanity check.
@@ -310,9 +310,9 @@ module Hbase
      create_test_table(@test_name)

      cp_key = "coprocessor"
-      class_name = "SimpleRegionObserver"
+      class_name = "org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver"

-      cp_value = "hdfs:///foo.jar|" + class_name + "|12|arg1=1,arg2=2"
+      cp_value = "|" + class_name + "|12|arg1=1,arg2=2"

      # eval() is used to convert a string to regex
      assert_no_match(eval("/" + class_name + "/"), admin.describe(@test_name))
@@ -326,22 +326,14 @@ module Hbase
      drop_test_table(@test_name)
      create_test_table(@test_name)

-      key1 = "coprocessor"
-      key2 = "MAX_FILESIZE"
-      admin.alter(@test_name, true, 'METHOD' => 'table_att', key1 => "|TestCP||")
-      admin.alter(@test_name, true, 'METHOD' => 'table_att', key2 => 12345678)
+      key = "MAX_FILESIZE"
+      admin.alter(@test_name, true, 'METHOD' => 'table_att', key => 12345678)

      # eval() is used to convert a string to regex
-      assert_match(eval("/" + key1 + "\\$(\\d+)/"), admin.describe(@test_name))
-      assert_match(eval("/" + key2 + "/"), admin.describe(@test_name))
+      assert_match(eval("/" + key + "/"), admin.describe(@test_name))

-      # get the cp key
-      cp_keys = admin.describe(@test_name).scan(/(coprocessor\$\d+)/i)
-
-      admin.alter(@test_name, true, 'METHOD' => 'table_att_unset', 'NAME' => cp_keys[0][0])
-      admin.alter(@test_name, true, 'METHOD' => 'table_att_unset', 'NAME' => key2)
-      assert_no_match(eval("/" + key1 + "\\$(\\d+)/"), admin.describe(@test_name))
-      assert_no_match(eval("/" + key2 + "/"), admin.describe(@test_name))
+      admin.alter(@test_name, true, 'METHOD' => 'table_att_unset', 'NAME' => key)
+      assert_no_match(eval("/" + key + "/"), admin.describe(@test_name))
    end

    define_test "get_table should get a real table" do