HBASE-14224 Fix coprocessor handling of duplicate classes

This commit is contained in:
stack 2015-08-24 13:47:19 -07:00
parent 95682e7b7d
commit 5dcb3d85a1
6 changed files with 254 additions and 55 deletions

View File

@ -550,6 +550,10 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
setDurability(isDeferredFlush ? Durability.ASYNC_WAL : DEFAULT_DURABLITY);
return this;
}
Matcher matcher = HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(Bytes.toString(key.get()));
if (matcher.matches()) {
LOG.warn("Use addCoprocessor* methods to add a coprocessor instead");
}
values.put(key, value);
return this;
}
@ -1290,7 +1294,6 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
return this.families.remove(column);
}
/**
* Add a table coprocessor to this table. The coprocessor
* type must be {@link org.apache.hadoop.hbase.coprocessor.RegionObserver}
@ -1306,7 +1309,6 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
return this;
}
/**
* Add a table coprocessor to this table. The coprocessor
* type must be {@link org.apache.hadoop.hbase.coprocessor.RegionObserver}
@ -1324,10 +1326,9 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
public HTableDescriptor addCoprocessor(String className, Path jarFilePath,
int priority, final Map<String, String> kvs)
throws IOException {
if (hasCoprocessor(className)) {
throw new IOException("Coprocessor " + className + " already exists.");
}
// validate parameter kvs
checkHasCoprocessor(className);
// Validate parameter kvs and then add key/values to kvString.
StringBuilder kvString = new StringBuilder();
if (kvs != null) {
for (Map.Entry<String, String> e: kvs.entrySet()) {
@ -1347,6 +1348,48 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
}
}
String value = ((jarFilePath == null)? "" : jarFilePath.toString()) +
"|" + className + "|" + Integer.toString(priority) + "|" +
kvString.toString();
return addCoprocessorToMap(value);
}
/**
* Add a table coprocessor to this table. The coprocessor
* type must be {@link org.apache.hadoop.hbase.coprocessor.RegionObserver}
* or Endpoint.
* It won't check if the class can be loaded or not.
* Whether a coprocessor is loadable or not will be determined when
* a region is opened.
* @param specStr The Coprocessor specification all in in one String formatted so matches
* {@link HConstants#CP_HTD_ATTR_VALUE_PATTERN}
* @throws IOException
*/
// Pity about ugly method name. addCoprocessor(String) already taken above.
public HTableDescriptor addCoprocessorWithSpec(final String specStr) throws IOException {
String className = getCoprocessorClassNameFromSpecStr(specStr);
if (className == null) {
throw new IllegalArgumentException("Format does not match " +
HConstants.CP_HTD_ATTR_VALUE_PATTERN + ": " + specStr);
}
checkHasCoprocessor(className);
return addCoprocessorToMap(specStr);
}
private void checkHasCoprocessor(final String className) throws IOException {
if (hasCoprocessor(className)) {
throw new IOException("Coprocessor " + className + " already exists.");
}
}
/**
* Add coprocessor to values Map
* @param specStr The Coprocessor specification all in in one String formatted so matches
* {@link HConstants#CP_HTD_ATTR_VALUE_PATTERN}
* @return Returns <code>this</code>
*/
private HTableDescriptor addCoprocessorToMap(final String specStr) {
if (specStr == null) return this;
// generate a coprocessor key
int maxCoprocessorNumber = 0;
Matcher keyMatcher;
@ -1358,27 +1401,22 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
if (!keyMatcher.matches()) {
continue;
}
maxCoprocessorNumber = Math.max(Integer.parseInt(keyMatcher.group(1)),
maxCoprocessorNumber);
maxCoprocessorNumber = Math.max(Integer.parseInt(keyMatcher.group(1)), maxCoprocessorNumber);
}
maxCoprocessorNumber++;
String key = "coprocessor$" + Integer.toString(maxCoprocessorNumber);
String value = ((jarFilePath == null)? "" : jarFilePath.toString()) +
"|" + className + "|" + Integer.toString(priority) + "|" +
kvString.toString();
setValue(key, value);
this.values.put(new ImmutableBytesWritable(Bytes.toBytes(key)),
new ImmutableBytesWritable(Bytes.toBytes(specStr)));
return this;
}
/**
* Check if the table has an attached co-processor represented by the name className
*
* @param className - Class name of the co-processor
* @param classNameToMatch - Class name of the co-processor
* @return true of the table has a co-processor className
*/
public boolean hasCoprocessor(String className) {
public boolean hasCoprocessor(String classNameToMatch) {
Matcher keyMatcher;
Matcher valueMatcher;
for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e:
@ -1389,15 +1427,9 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
if (!keyMatcher.matches()) {
continue;
}
valueMatcher =
HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(
Bytes.toString(e.getValue().get()));
if (!valueMatcher.matches()) {
continue;
}
// get className and compare
String clazz = valueMatcher.group(2).trim(); // classname is the 2nd field
if (clazz.equals(className.trim())) {
String className = getCoprocessorClassNameFromSpecStr(Bytes.toString(e.getValue().get()));
if (className == null) continue;
if (className.equals(classNameToMatch.trim())) {
return true;
}
}
@ -1418,16 +1450,23 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
if (!keyMatcher.matches()) {
continue;
}
valueMatcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(Bytes
.toString(e.getValue().get()));
if (!valueMatcher.matches()) {
continue;
}
result.add(valueMatcher.group(2).trim()); // classname is the 2nd field
String className = getCoprocessorClassNameFromSpecStr(Bytes.toString(e.getValue().get()));
if (className == null) continue;
result.add(className); // classname is the 2nd field
}
return result;
}
/**
* @param spec String formatted as per {@link HConstants#CP_HTD_ATTR_VALUE_PATTERN}
* @return Class parsed from passed in <code>spec</code> or null if no match or classpath found
*/
private static String getCoprocessorClassNameFromSpecStr(final String spec) {
Matcher matcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(spec);
// Classname is the 2nd field
return matcher != null && matcher.matches()? matcher.group(2).trim(): null;
}
/**
* Remove a coprocessor from those set on the table
* @param className Class name of the co-processor

View File

@ -42,6 +42,49 @@ import org.junit.experimental.categories.Category;
public class TestHTableDescriptor {
private static final Log LOG = LogFactory.getLog(TestHTableDescriptor.class);
@Test (expected=IOException.class)
public void testAddCoprocessorTwice() throws IOException {
HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME);
String cpName = "a.b.c.d";
htd.addCoprocessor(cpName);
htd.addCoprocessor(cpName);
}
@Test
public void testAddCoprocessorWithSpecStr() throws IOException {
HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME);
String cpName = "a.b.c.d";
boolean expected = false;
try {
htd.addCoprocessorWithSpec(cpName);
} catch (IllegalArgumentException iae) {
expected = true;
}
if (!expected) fail();
// Try minimal spec.
try {
htd.addCoprocessorWithSpec("file:///some/path" + "|" + cpName);
} catch (IllegalArgumentException iae) {
expected = false;
}
if (expected) fail();
// Try more spec.
String spec = "hdfs:///foo.jar|com.foo.FooRegionObserver|1001|arg1=1,arg2=2";
try {
htd.addCoprocessorWithSpec(spec);
} catch (IllegalArgumentException iae) {
expected = false;
}
if (expected) fail();
// Try double add of same coprocessor
try {
htd.addCoprocessorWithSpec(spec);
} catch (IOException ioe) {
expected = true;
}
if (!expected) fail();
}
@Test
public void testPb() throws DeserializationException, IOException {
HTableDescriptor htd = new HTableDescriptor(HTableDescriptor.META_TABLEDESC);

View File

@ -863,6 +863,15 @@ public final class HConstants {
public static final Pattern CP_HTD_ATTR_KEY_PATTERN =
Pattern.compile("^coprocessor\\$([0-9]+)$", Pattern.CASE_INSENSITIVE);
/**
* Pattern that matches a coprocessor specification. Form is:
* <code>
*&lt;coprocessor jar file location> '|' &lt<class name> ['|' &lt;priority> ['|' &lt;arguments>]]
* </code>
* ...where arguments are <code>&lt;KEY> '=' &lt;VALUE> [,...]</code>
* <p>For example: <code>hdfs:///foo.jar|com.foo.FooRegionObserver|1001|arg1=1,arg2=2</code>
*/
public static final Pattern CP_HTD_ATTR_VALUE_PATTERN =
Pattern.compile("(^[^\\|]*)\\|([^\\|]+)\\|[\\s]*([\\d]*)[\\s]*(\\|.*)?$");

View File

@ -125,7 +125,7 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
}
/**
* Load system coprocessors. Read the class names from configuration.
* Load system coprocessors once only. Read the class names from configuration.
* Called by constructor.
*/
protected void loadSystemCoprocessors(Configuration conf, String confKey) {
@ -143,17 +143,20 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
return;
int priority = Coprocessor.PRIORITY_SYSTEM;
List<E> configured = new ArrayList<E>();
for (String className : defaultCPClasses) {
className = className.trim();
if (findCoprocessor(className) != null) {
// If already loaded will just continue
LOG.warn("Attempted duplicate loading of " + className + "; skipped");
continue;
}
ClassLoader cl = this.getClass().getClassLoader();
Thread.currentThread().setContextClassLoader(cl);
try {
implClass = cl.loadClass(className);
configured.add(loadInstance(implClass, Coprocessor.PRIORITY_SYSTEM, conf));
// Add coprocessors as we go to guard against case where a coprocessor is specified twice
// in the configuration
this.coprocessors.add(loadInstance(implClass, Coprocessor.PRIORITY_SYSTEM, conf));
LOG.info("System coprocessor " + className + " was loaded " +
"successfully with priority (" + priority++ + ").");
} catch (Throwable t) {
@ -161,9 +164,6 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
abortServer(className, t);
}
}
// add entire set to the collection for COW efficiency
coprocessors.addAll(configured);
}
/**

View File

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({SmallTests.class})
public class TestCoprocessorHost {
/**
* An {@link Abortable} implementation for tests.
*/
class TestAbortable implements Abortable {
private volatile boolean aborted = false;
@Override
public void abort(String why, Throwable e) {
this.aborted = true;
Assert.fail();
}
@Override
public boolean isAborted() {
return this.aborted;
}
}
@Test
public void testDoubleLoading() {
final Configuration conf = HBaseConfiguration.create();
CoprocessorHost<CoprocessorEnvironment> host =
new CoprocessorHost<CoprocessorEnvironment>(new TestAbortable()) {
final Configuration cpHostConf = conf;
@Override
public CoprocessorEnvironment createEnvironment(Class<?> implClass,
final Coprocessor instance, int priority, int sequence, Configuration conf) {
return new CoprocessorEnvironment() {
final Coprocessor envInstance = instance;
@Override
public int getVersion() {
return 0;
}
@Override
public String getHBaseVersion() {
return "0.0.0";
}
@Override
public Coprocessor getInstance() {
return envInstance;
}
@Override
public int getPriority() {
return 0;
}
@Override
public int getLoadSequence() {
return 0;
}
@Override
public Configuration getConfiguration() {
return cpHostConf;
}
@Override
public HTableInterface getTable(TableName tableName) throws IOException {
return null;
}
@Override
public HTableInterface getTable(TableName tableName, ExecutorService service)
throws IOException {
return null;
}
@Override
public ClassLoader getClassLoader() {
return null;
}
};
}
};
final String key = "KEY";
final String coprocessor = "org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver";
// Try and load coprocessor three times.
conf.setStrings(key, coprocessor, coprocessor, coprocessor);
host.loadSystemCoprocessors(conf, key);
// Only one coprocessor loaded
Assert.assertEquals(1, host.coprocessors.size());
}
}

View File

@ -582,25 +582,9 @@ module Hbase
k.strip!
if (k =~ /coprocessor/i)
# validate coprocessor specs
v = String.new(value)
v.strip!
if !(v =~ /^([^\|]*)\|([^\|]+)\|[\s]*([\d]*)[\s]*(\|.*)?$/)
raise ArgumentError, "Coprocessor value doesn't match spec: #{v}"
end
# generate a coprocessor ordinal by checking max id of existing cps
maxId = 0
htd.getValues().each do |k1, v1|
attrName = org.apache.hadoop.hbase.util.Bytes.toString(k1.get())
# a cp key is coprocessor$(\d)
if (attrName =~ /coprocessor\$(\d+)/i)
ids = attrName.scan(/coprocessor\$(\d+)/i)
maxId = ids[0][0].to_i if ids[0][0].to_i > maxId
end
end
maxId += 1
htd.setValue(k + "\$" + maxId.to_s, value)
htd.addCoprocessor(v)
valid_coproc_keys << key
end
end