diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java index 92566e3fc98..752b5ad4ca9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java @@ -550,6 +550,10 @@ public class HTableDescriptor implements WritableComparable { setDurability(isDeferredFlush ? Durability.ASYNC_WAL : DEFAULT_DURABLITY); return this; } + Matcher matcher = HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(Bytes.toString(key.get())); + if (matcher.matches()) { + LOG.warn("Use addCoprocessor* methods to add a coprocessor instead"); + } values.put(key, value); return this; } @@ -1290,7 +1294,6 @@ public class HTableDescriptor implements WritableComparable { return this.families.remove(column); } - /** * Add a table coprocessor to this table. The coprocessor * type must be {@link org.apache.hadoop.hbase.coprocessor.RegionObserver} @@ -1306,7 +1309,6 @@ public class HTableDescriptor implements WritableComparable { return this; } - /** * Add a table coprocessor to this table. The coprocessor * type must be {@link org.apache.hadoop.hbase.coprocessor.RegionObserver} @@ -1324,10 +1326,9 @@ public class HTableDescriptor implements WritableComparable { public HTableDescriptor addCoprocessor(String className, Path jarFilePath, int priority, final Map kvs) throws IOException { - if (hasCoprocessor(className)) { - throw new IOException("Coprocessor " + className + " already exists."); - } - // validate parameter kvs + checkHasCoprocessor(className); + + // Validate parameter kvs and then add key/values to kvString. StringBuilder kvString = new StringBuilder(); if (kvs != null) { for (Map.Entry e: kvs.entrySet()) { @@ -1347,6 +1348,48 @@ public class HTableDescriptor implements WritableComparable { } } + String value = ((jarFilePath == null)? "" : jarFilePath.toString()) + + "|" + className + "|" + Integer.toString(priority) + "|" + + kvString.toString(); + return addCoprocessorToMap(value); + } + + /** + * Add a table coprocessor to this table. The coprocessor + * type must be {@link org.apache.hadoop.hbase.coprocessor.RegionObserver} + * or Endpoint. + * It won't check if the class can be loaded or not. + * Whether a coprocessor is loadable or not will be determined when + * a region is opened. + * @param specStr The Coprocessor specification all in in one String formatted so matches + * {@link HConstants#CP_HTD_ATTR_VALUE_PATTERN} + * @throws IOException + */ + // Pity about ugly method name. addCoprocessor(String) already taken above. + public HTableDescriptor addCoprocessorWithSpec(final String specStr) throws IOException { + String className = getCoprocessorClassNameFromSpecStr(specStr); + if (className == null) { + throw new IllegalArgumentException("Format does not match " + + HConstants.CP_HTD_ATTR_VALUE_PATTERN + ": " + specStr); + } + checkHasCoprocessor(className); + return addCoprocessorToMap(specStr); + } + + private void checkHasCoprocessor(final String className) throws IOException { + if (hasCoprocessor(className)) { + throw new IOException("Coprocessor " + className + " already exists."); + } + } + + /** + * Add coprocessor to values Map + * @param specStr The Coprocessor specification all in in one String formatted so matches + * {@link HConstants#CP_HTD_ATTR_VALUE_PATTERN} + * @return Returns this + */ + private HTableDescriptor addCoprocessorToMap(final String specStr) { + if (specStr == null) return this; // generate a coprocessor key int maxCoprocessorNumber = 0; Matcher keyMatcher; @@ -1358,27 +1401,22 @@ public class HTableDescriptor implements WritableComparable { if (!keyMatcher.matches()) { continue; } - maxCoprocessorNumber = Math.max(Integer.parseInt(keyMatcher.group(1)), - maxCoprocessorNumber); + maxCoprocessorNumber = Math.max(Integer.parseInt(keyMatcher.group(1)), maxCoprocessorNumber); } maxCoprocessorNumber++; - String key = "coprocessor$" + Integer.toString(maxCoprocessorNumber); - String value = ((jarFilePath == null)? "" : jarFilePath.toString()) + - "|" + className + "|" + Integer.toString(priority) + "|" + - kvString.toString(); - setValue(key, value); + this.values.put(new ImmutableBytesWritable(Bytes.toBytes(key)), + new ImmutableBytesWritable(Bytes.toBytes(specStr))); return this; } - /** * Check if the table has an attached co-processor represented by the name className * - * @param className - Class name of the co-processor + * @param classNameToMatch - Class name of the co-processor * @return true of the table has a co-processor className */ - public boolean hasCoprocessor(String className) { + public boolean hasCoprocessor(String classNameToMatch) { Matcher keyMatcher; Matcher valueMatcher; for (Map.Entry e: @@ -1389,15 +1427,9 @@ public class HTableDescriptor implements WritableComparable { if (!keyMatcher.matches()) { continue; } - valueMatcher = - HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher( - Bytes.toString(e.getValue().get())); - if (!valueMatcher.matches()) { - continue; - } - // get className and compare - String clazz = valueMatcher.group(2).trim(); // classname is the 2nd field - if (clazz.equals(className.trim())) { + String className = getCoprocessorClassNameFromSpecStr(Bytes.toString(e.getValue().get())); + if (className == null) continue; + if (className.equals(classNameToMatch.trim())) { return true; } } @@ -1418,16 +1450,23 @@ public class HTableDescriptor implements WritableComparable { if (!keyMatcher.matches()) { continue; } - valueMatcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(Bytes - .toString(e.getValue().get())); - if (!valueMatcher.matches()) { - continue; - } - result.add(valueMatcher.group(2).trim()); // classname is the 2nd field + String className = getCoprocessorClassNameFromSpecStr(Bytes.toString(e.getValue().get())); + if (className == null) continue; + result.add(className); // classname is the 2nd field } return result; } + /** + * @param spec String formatted as per {@link HConstants#CP_HTD_ATTR_VALUE_PATTERN} + * @return Class parsed from passed in spec or null if no match or classpath found + */ + private static String getCoprocessorClassNameFromSpecStr(final String spec) { + Matcher matcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(spec); + // Classname is the 2nd field + return matcher != null && matcher.matches()? matcher.group(2).trim(): null; + } + /** * Remove a coprocessor from those set on the table * @param className Class name of the co-processor diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java index 0e580d815cd..4d9caaeb737 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java @@ -42,6 +42,49 @@ import org.junit.experimental.categories.Category; public class TestHTableDescriptor { private static final Log LOG = LogFactory.getLog(TestHTableDescriptor.class); + @Test (expected=IOException.class) + public void testAddCoprocessorTwice() throws IOException { + HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME); + String cpName = "a.b.c.d"; + htd.addCoprocessor(cpName); + htd.addCoprocessor(cpName); + } + + @Test + public void testAddCoprocessorWithSpecStr() throws IOException { + HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME); + String cpName = "a.b.c.d"; + boolean expected = false; + try { + htd.addCoprocessorWithSpec(cpName); + } catch (IllegalArgumentException iae) { + expected = true; + } + if (!expected) fail(); + // Try minimal spec. + try { + htd.addCoprocessorWithSpec("file:///some/path" + "|" + cpName); + } catch (IllegalArgumentException iae) { + expected = false; + } + if (expected) fail(); + // Try more spec. + String spec = "hdfs:///foo.jar|com.foo.FooRegionObserver|1001|arg1=1,arg2=2"; + try { + htd.addCoprocessorWithSpec(spec); + } catch (IllegalArgumentException iae) { + expected = false; + } + if (expected) fail(); + // Try double add of same coprocessor + try { + htd.addCoprocessorWithSpec(spec); + } catch (IOException ioe) { + expected = true; + } + if (!expected) fail(); + } + @Test public void testPb() throws DeserializationException, IOException { HTableDescriptor htd = new HTableDescriptor(HTableDescriptor.META_TABLEDESC); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 067a4dd6be1..e2eb6e0ba92 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -863,6 +863,15 @@ public final class HConstants { public static final Pattern CP_HTD_ATTR_KEY_PATTERN = Pattern.compile("^coprocessor\\$([0-9]+)$", Pattern.CASE_INSENSITIVE); + + /** + * Pattern that matches a coprocessor specification. Form is: + * + *<coprocessor jar file location> '|' < ['|' <priority> ['|' <arguments>]] + * + * ...where arguments are <KEY> '=' <VALUE> [,...] + *

For example: hdfs:///foo.jar|com.foo.FooRegionObserver|1001|arg1=1,arg2=2 + */ public static final Pattern CP_HTD_ATTR_VALUE_PATTERN = Pattern.compile("(^[^\\|]*)\\|([^\\|]+)\\|[\\s]*([\\d]*)[\\s]*(\\|.*)?$"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java index 7f689c9956a..ac909d6c093 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java @@ -118,14 +118,14 @@ public abstract class CoprocessorHost { */ public Set getCoprocessors() { Set returnValue = new TreeSet(); - for(CoprocessorEnvironment e: coprocessors) { + for (CoprocessorEnvironment e: coprocessors) { returnValue.add(e.getInstance().getClass().getSimpleName()); } return returnValue; } /** - * Load system coprocessors. Read the class names from configuration. + * Load system coprocessors once only. Read the class names from configuration. * Called by constructor. */ protected void loadSystemCoprocessors(Configuration conf, String confKey) { @@ -143,17 +143,20 @@ public abstract class CoprocessorHost { return; int priority = Coprocessor.PRIORITY_SYSTEM; - List configured = new ArrayList(); for (String className : defaultCPClasses) { className = className.trim(); if (findCoprocessor(className) != null) { + // If already loaded will just continue + LOG.warn("Attempted duplicate loading of " + className + "; skipped"); continue; } ClassLoader cl = this.getClass().getClassLoader(); Thread.currentThread().setContextClassLoader(cl); try { implClass = cl.loadClass(className); - configured.add(loadInstance(implClass, Coprocessor.PRIORITY_SYSTEM, conf)); + // Add coprocessors as we go to guard against case where a coprocessor is specified twice + // in the configuration + this.coprocessors.add(loadInstance(implClass, Coprocessor.PRIORITY_SYSTEM, conf)); LOG.info("System coprocessor " + className + " was loaded " + "successfully with priority (" + priority++ + ")."); } catch (Throwable t) { @@ -161,9 +164,6 @@ public abstract class CoprocessorHost { abortServer(className, t); } } - - // add entire set to the collection for COW efficiency - coprocessors.addAll(configured); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorHost.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorHost.java new file mode 100644 index 00000000000..2faaf746893 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorHost.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({SmallTests.class}) +public class TestCoprocessorHost { + /** + * An {@link Abortable} implementation for tests. + */ + class TestAbortable implements Abortable { + private volatile boolean aborted = false; + + @Override + public void abort(String why, Throwable e) { + this.aborted = true; + Assert.fail(); + } + + @Override + public boolean isAborted() { + return this.aborted; + } + } + + @Test + public void testDoubleLoading() { + final Configuration conf = HBaseConfiguration.create(); + CoprocessorHost host = + new CoprocessorHost(new TestAbortable()) { + final Configuration cpHostConf = conf; + + @Override + public CoprocessorEnvironment createEnvironment(Class implClass, + final Coprocessor instance, int priority, int sequence, Configuration conf) { + return new CoprocessorEnvironment() { + final Coprocessor envInstance = instance; + + @Override + public int getVersion() { + return 0; + } + + @Override + public String getHBaseVersion() { + return "0.0.0"; + } + + @Override + public Coprocessor getInstance() { + return envInstance; + } + + @Override + public int getPriority() { + return 0; + } + + @Override + public int getLoadSequence() { + return 0; + } + + @Override + public Configuration getConfiguration() { + return cpHostConf; + } + + @Override + public HTableInterface getTable(TableName tableName) throws IOException { + return null; + } + + @Override + public HTableInterface getTable(TableName tableName, ExecutorService service) + throws IOException { + return null; + } + + @Override + public ClassLoader getClassLoader() { + return null; + } + }; + } + }; + final String key = "KEY"; + final String coprocessor = "org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver"; + // Try and load coprocessor three times. + conf.setStrings(key, coprocessor, coprocessor, coprocessor); + host.loadSystemCoprocessors(conf, key); + // Only one coprocessor loaded + Assert.assertEquals(1, host.coprocessors.size()); + } +} \ No newline at end of file diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb index 3833c88b52a..0014df7b024 100644 --- a/hbase-shell/src/main/ruby/hbase/admin.rb +++ b/hbase-shell/src/main/ruby/hbase/admin.rb @@ -582,25 +582,9 @@ module Hbase k.strip! if (k =~ /coprocessor/i) - # validate coprocessor specs v = String.new(value) v.strip! - if !(v =~ /^([^\|]*)\|([^\|]+)\|[\s]*([\d]*)[\s]*(\|.*)?$/) - raise ArgumentError, "Coprocessor value doesn't match spec: #{v}" - end - - # generate a coprocessor ordinal by checking max id of existing cps - maxId = 0 - htd.getValues().each do |k1, v1| - attrName = org.apache.hadoop.hbase.util.Bytes.toString(k1.get()) - # a cp key is coprocessor$(\d) - if (attrName =~ /coprocessor\$(\d+)/i) - ids = attrName.scan(/coprocessor\$(\d+)/i) - maxId = ids[0][0].to_i if ids[0][0].to_i > maxId - end - end - maxId += 1 - htd.setValue(k + "\$" + maxId.to_s, value) + htd.addCoprocessor(v) valid_coproc_keys << key end end