HBASE-3810 Relax parsing and add convenience methods for HTableDescriptor coprocessor config
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1153040 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
488537b281
commit
da50b95dd5
|
@ -349,6 +349,8 @@ Release 0.91.0 - Unreleased
|
|||
HBASE-4143 HTable.doPut(List) should check the writebuffer length every so often
|
||||
(Doug Meil via Ted Yu)
|
||||
HBASE-3065 Retry all 'retryable' zk operations; e.g. connection loss (Liyin Tang)
|
||||
HBASE-3810 Registering a coprocessor in HTableDescriptor should be easier
|
||||
(Mingjie Lai via garyh)
|
||||
|
||||
TASKS
|
||||
HBASE-3559 Move report of split to master OFF the heartbeat channel
|
||||
|
|
|
@ -29,10 +29,13 @@ import java.util.Iterator;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.regex.Matcher;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.coprocessor.Coprocessor;
|
||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.hbase.io.hfile.Compression;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.io.WritableComparable;
|
||||
|
@ -667,6 +670,110 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
|
|||
return this.families.remove(column);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a table coprocessor to this table. The coprocessor
|
||||
* type must be {@link org.apache.hadoop.hbase.coprocessor.RegionObserver}
|
||||
* or Endpoint.
|
||||
* It won't check if the class can be loaded or not.
|
||||
* Whether a coprocessor is loadable or not will be determined when
|
||||
* a region is opened.
|
||||
* @param className Full class name.
|
||||
* @throws IOException
|
||||
*/
|
||||
public void addCoprocessor(String className) throws IOException {
|
||||
addCoprocessor(className, null, Coprocessor.PRIORITY_USER, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a table coprocessor to this table. The coprocessor
|
||||
* type must be {@link org.apache.hadoop.hbase.coprocessor.RegionObserver}
|
||||
* or Endpoint.
|
||||
* It won't check if the class can be loaded or not.
|
||||
* Whether a coprocessor is loadable or not will be determined when
|
||||
* a region is opened.
|
||||
* @param jarFilePath Path of the jar file. If it's null, the class will be
|
||||
* loaded from default classloader.
|
||||
* @param className Full class name.
|
||||
* @param priority Priority
|
||||
* @param kvs Arbitrary key-value parameter pairs passed into the coprocessor.
|
||||
* @throws IOException
|
||||
*/
|
||||
public void addCoprocessor(String className, Path jarFilePath,
|
||||
int priority, final Map<String, String> kvs)
|
||||
throws IOException {
|
||||
if (hasCoprocessor(className)) {
|
||||
throw new IOException("Coprocessor " + className + " already exists.");
|
||||
}
|
||||
// validate parameter kvs
|
||||
//String kvString = "";
|
||||
StringBuilder kvString = new StringBuilder();
|
||||
if (kvs != null) {
|
||||
for (Map.Entry<String, String> e: kvs.entrySet()) {
|
||||
if (!e.getKey().matches(RegionCoprocessorHost.PARAMETER_KEY_PATTERN)) {
|
||||
throw new IOException("Illegal parameter key = " + e.getKey());
|
||||
}
|
||||
if (!e.getValue().matches(RegionCoprocessorHost.PARAMETER_VALUE_PATTERN)) {
|
||||
throw new IOException("Illegal parameter (" + e.getKey() +
|
||||
") value = " + e.getValue());
|
||||
}
|
||||
if (kvString.length() != 0) {
|
||||
kvString.append(',');
|
||||
}
|
||||
kvString.append(e.getKey());
|
||||
kvString.append('=');
|
||||
kvString.append(e.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
// generate a coprocessor key
|
||||
int maxCoprocessorNumber = 0;
|
||||
Matcher keyMatcher;
|
||||
for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e:
|
||||
this.values.entrySet()) {
|
||||
keyMatcher =
|
||||
RegionCoprocessorHost.CP_KEY_PATTERN.matcher(
|
||||
Bytes.toString(e.getKey().get()));
|
||||
if (!keyMatcher.matches()) {
|
||||
continue;
|
||||
}
|
||||
maxCoprocessorNumber = Math.max(Integer.parseInt(keyMatcher.group(1)),
|
||||
maxCoprocessorNumber);
|
||||
}
|
||||
maxCoprocessorNumber++;
|
||||
|
||||
String key = "coprocessor$" + Integer.toString(maxCoprocessorNumber);
|
||||
String value = ((jarFilePath == null)? "" : jarFilePath.toString()) +
|
||||
"|" + className + "|" + Integer.toString(priority) + "|" +
|
||||
kvString.toString();
|
||||
setValue(key, value);
|
||||
}
|
||||
|
||||
public boolean hasCoprocessor(String className) {
|
||||
Matcher keyMatcher;
|
||||
Matcher valueMatcher;
|
||||
for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e:
|
||||
this.values.entrySet()) {
|
||||
keyMatcher =
|
||||
RegionCoprocessorHost.CP_KEY_PATTERN.matcher(
|
||||
Bytes.toString(e.getKey().get()));
|
||||
if (!keyMatcher.matches()) {
|
||||
continue;
|
||||
}
|
||||
valueMatcher =
|
||||
RegionCoprocessorHost.CP_VALUE_PATTERN.matcher(
|
||||
Bytes.toString(e.getValue().get()));
|
||||
if (!valueMatcher.matches()) {
|
||||
continue;
|
||||
}
|
||||
// get className and compare
|
||||
String clazz = valueMatcher.group(2).trim(); // classname is the 2nd field
|
||||
if (clazz.equals(className.trim())) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param rootdir qualified path of HBase root directory
|
||||
* @param tableName name of table
|
||||
|
|
|
@ -92,9 +92,15 @@ public class RegionCoprocessorHost
|
|||
}
|
||||
}
|
||||
|
||||
static final Pattern attrSpecMatch1 = Pattern.compile("(.+)\\|(.+)\\|(.+)\\|(.+)");
|
||||
static final Pattern attrSpecMatch2 = Pattern.compile("(.+)\\|(.+)\\|(.+)");
|
||||
static final Pattern cfgSpecMatch = Pattern.compile("([^=]+)=([^,]+),?");
|
||||
public static final Pattern CP_KEY_PATTERN = Pattern.compile
|
||||
("coprocessor\\$([0-9]+)", Pattern.CASE_INSENSITIVE);
|
||||
public static final Pattern CP_VALUE_PATTERN =
|
||||
Pattern.compile("([^\\|]*)\\|([^\\|]+)\\|[\\s]*([\\d]*)[\\s]*(\\|.*)?");
|
||||
|
||||
public static final String PARAMETER_KEY_PATTERN = "[^=,]+";
|
||||
public static final String PARAMETER_VALUE_PATTERN = "[^,]+";
|
||||
public static final Pattern CFG_SPEC_MATCH = Pattern.compile(
|
||||
"(" + PARAMETER_KEY_PATTERN + ")=(" + PARAMETER_VALUE_PATTERN + "),?");
|
||||
|
||||
/** The region server services */
|
||||
RegionServerServices rsServices;
|
||||
|
@ -117,28 +123,29 @@ public class RegionCoprocessorHost
|
|||
loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
|
||||
|
||||
// load Coprocessor From HDFS
|
||||
loadTableCoprocessors();
|
||||
loadTableCoprocessors(conf);
|
||||
}
|
||||
|
||||
void loadTableCoprocessors() {
|
||||
void loadTableCoprocessors(final Configuration conf) {
|
||||
// scan the table attributes for coprocessor load specifications
|
||||
// initialize the coprocessors
|
||||
List<RegionEnvironment> configured = new ArrayList<RegionEnvironment>();
|
||||
for (Map.Entry<ImmutableBytesWritable,ImmutableBytesWritable> e:
|
||||
region.getTableDesc().getValues().entrySet()) {
|
||||
String key = Bytes.toString(e.getKey().get());
|
||||
String spec = Bytes.toString(e.getValue().get());
|
||||
if (key.startsWith("COPROCESSOR")) {
|
||||
String key = Bytes.toString(e.getKey().get()).trim();
|
||||
String spec = Bytes.toString(e.getValue().get()).trim();
|
||||
if (CP_KEY_PATTERN.matcher(key).matches()) {
|
||||
// found one
|
||||
try {
|
||||
Matcher matcher = attrSpecMatch1.matcher(spec);
|
||||
if (!matcher.matches()) {
|
||||
matcher = attrSpecMatch2.matcher(spec);
|
||||
}
|
||||
Matcher matcher = CP_VALUE_PATTERN.matcher(spec);
|
||||
if (matcher.matches()) {
|
||||
Path path = new Path(matcher.group(1));
|
||||
String className = matcher.group(2);
|
||||
int priority = Integer.valueOf(matcher.group(3));
|
||||
// jar file path can be empty if the cp class can be loaded
|
||||
// from class loader.
|
||||
Path path = matcher.group(1).trim().isEmpty() ?
|
||||
null : new Path(matcher.group(1).trim());
|
||||
String className = matcher.group(2).trim();
|
||||
int priority = matcher.group(3).trim().isEmpty() ?
|
||||
Coprocessor.PRIORITY_USER : Integer.valueOf(matcher.group(3));
|
||||
String cfgSpec = null;
|
||||
try {
|
||||
cfgSpec = matcher.group(4);
|
||||
|
@ -146,8 +153,9 @@ public class RegionCoprocessorHost
|
|||
// ignore
|
||||
}
|
||||
if (cfgSpec != null) {
|
||||
Configuration newConf = HBaseConfiguration.create();
|
||||
Matcher m = cfgSpecMatch.matcher(cfgSpec);
|
||||
cfgSpec = cfgSpec.substring(cfgSpec.indexOf('|') + 1);
|
||||
Configuration newConf = HBaseConfiguration.create(conf);
|
||||
Matcher m = CFG_SPEC_MATCH.matcher(cfgSpec);
|
||||
while (m.find()) {
|
||||
newConf.set(m.group(1), m.group(2));
|
||||
}
|
||||
|
|
|
@ -41,6 +41,7 @@ import java.util.jar.*;
|
|||
import org.junit.*;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
||||
/**
|
||||
* Test coprocessors class loading.
|
||||
|
@ -57,6 +58,8 @@ public class TestClassLoading {
|
|||
static final String cpName1 = "TestCP1";
|
||||
static final String cpName2 = "TestCP2";
|
||||
static final String cpName3 = "TestCP3";
|
||||
static final String cpName4 = "TestCP4";
|
||||
static final String cpName5 = "TestCP5";
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
|
@ -187,6 +190,10 @@ public class TestClassLoading {
|
|||
htd.setValue("COPROCESSOR$2", jarFileOnHDFS2.toString() + "|" + cpName2 +
|
||||
"|" + Coprocessor.PRIORITY_USER + "|k1=v1,k2=v2,k3=v3");
|
||||
HBaseAdmin admin = new HBaseAdmin(this.conf);
|
||||
if (admin.tableExists(tableName)) {
|
||||
admin.disableTable(tableName);
|
||||
admin.deleteTable(tableName);
|
||||
}
|
||||
admin.createTable(htd);
|
||||
|
||||
// verify that the coprocessors were loaded
|
||||
|
@ -240,4 +247,94 @@ public class TestClassLoading {
|
|||
}
|
||||
assertTrue("Class " + cpName3 + " was missing on a region", found);
|
||||
}
|
||||
|
||||
@Test
|
||||
// HBase-3810: Registering a Coprocessor at HTableDescriptor should be
|
||||
// less strict
|
||||
public void testHBase3810() throws Exception {
|
||||
// allowed value pattern: [path] | class name | [priority] | [key values]
|
||||
|
||||
File jarFile1 = buildCoprocessorJar(cpName1);
|
||||
File jarFile2 = buildCoprocessorJar(cpName2);
|
||||
File jarFile4 = buildCoprocessorJar(cpName4);
|
||||
File jarFile5 = buildCoprocessorJar(cpName5);
|
||||
|
||||
String cpKey1 = "COPROCESSOR$1";
|
||||
String cpKey2 = " Coprocessor$2 ";
|
||||
String cpKey3 = " coprocessor$03 ";
|
||||
|
||||
String cpValue1 = jarFile1.toString() + "|" + cpName1 + "|" +
|
||||
Coprocessor.PRIORITY_USER;
|
||||
String cpValue2 = jarFile2.toString() + " | " + cpName2 + " | ";
|
||||
// load from default class loader
|
||||
String cpValue3 =
|
||||
" | org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver | | k=v ";
|
||||
|
||||
// create a table that references the jar
|
||||
HTableDescriptor htd = new HTableDescriptor(tableName);
|
||||
htd.addFamily(new HColumnDescriptor("test"));
|
||||
|
||||
// add 3 coprocessors by setting htd attributes directly.
|
||||
htd.setValue(cpKey1, cpValue1);
|
||||
htd.setValue(cpKey2, cpValue2);
|
||||
htd.setValue(cpKey3, cpValue3);
|
||||
|
||||
// add 2 coprocessor by using new htd.addCoprocessor() api
|
||||
htd.addCoprocessor(cpName4, new Path(jarFile4.getPath()),
|
||||
Coprocessor.PRIORITY_USER, null);
|
||||
Map<String, String> kvs = new HashMap<String, String>();
|
||||
kvs.put("k1", "v1");
|
||||
kvs.put("k2", "v2");
|
||||
kvs.put("k3", "v3");
|
||||
htd.addCoprocessor(cpName5, new Path(jarFile5.getPath()),
|
||||
Coprocessor.PRIORITY_USER, kvs);
|
||||
|
||||
HBaseAdmin admin = new HBaseAdmin(this.conf);
|
||||
if (admin.tableExists(tableName)) {
|
||||
admin.disableTable(tableName);
|
||||
admin.deleteTable(tableName);
|
||||
}
|
||||
admin.createTable(htd);
|
||||
|
||||
// verify that the coprocessor was loaded
|
||||
boolean found_2 = false, found_1 = false, found_3 = false,
|
||||
found_4 = false, found_5 = false;
|
||||
boolean found5_k1 = false, found5_k2 = false, found5_k3 = false,
|
||||
found5_k4 = false;
|
||||
|
||||
MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster();
|
||||
for (HRegion region: hbase.getRegionServer(0).getOnlineRegionsLocalContext()) {
|
||||
if (region.getRegionNameAsString().startsWith(tableName)) {
|
||||
found_1 = found_1 ||
|
||||
(region.getCoprocessorHost().findCoprocessor(cpName1) != null);
|
||||
found_2 = found_2 ||
|
||||
(region.getCoprocessorHost().findCoprocessor(cpName2) != null);
|
||||
found_3 = found_3 ||
|
||||
(region.getCoprocessorHost().findCoprocessor("SimpleRegionObserver")
|
||||
!= null);
|
||||
found_4 = found_4 ||
|
||||
(region.getCoprocessorHost().findCoprocessor(cpName4) != null);
|
||||
|
||||
CoprocessorEnvironment env =
|
||||
region.getCoprocessorHost().findCoprocessorEnvironment(cpName5);
|
||||
if (env != null) {
|
||||
found_5 = true;
|
||||
Configuration conf = env.getConfiguration();
|
||||
found5_k1 = conf.get("k1") != null;
|
||||
found5_k2 = conf.get("k2") != null;
|
||||
found5_k3 = conf.get("k3") != null;
|
||||
}
|
||||
}
|
||||
}
|
||||
assertTrue("Class " + cpName1 + " was missing on a region", found_1);
|
||||
assertTrue("Class " + cpName2 + " was missing on a region", found_2);
|
||||
assertTrue("Class SimpleRegionObserver was missing on a region", found_3);
|
||||
assertTrue("Class " + cpName4 + " was missing on a region", found_4);
|
||||
assertTrue("Class " + cpName5 + " was missing on a region", found_5);
|
||||
|
||||
assertTrue("Configuration key 'k1' was missing on a region", found5_k1);
|
||||
assertTrue("Configuration key 'k2' was missing on a region", found5_k2);
|
||||
assertTrue("Configuration key 'k3' was missing on a region", found5_k3);
|
||||
assertFalse("Configuration key 'k4' wasn't configured", found5_k4);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue