HBASE-6873. Clean up Coprocessor loading failure handling
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1558869 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e110e51b5e
commit
5e859ec8a7
|
@ -863,12 +863,13 @@ possible configurations would overwhelm and obscure the important.
|
|||
</property>
|
||||
<property>
|
||||
<name>hbase.coprocessor.abortonerror</name>
|
||||
<value>false</value>
|
||||
<description>Set to true to cause the hosting server (master or regionserver) to
|
||||
abort if a coprocessor throws a Throwable object that is not IOException or
|
||||
a subclass of IOException. Setting it to true might be useful in development
|
||||
environments where one wants to terminate the server as soon as possible to
|
||||
simplify coprocessor failure analysis.</description>
|
||||
<value>true</value>
|
||||
<description>Set to true to cause the hosting server (master or regionserver)
|
||||
to abort if a coprocessor fails to load, fails to initialize, or throws an
|
||||
unexpected Throwable object. Setting this to false will allow the server to
|
||||
continue execution but the system wide state of the coprocessor in question
|
||||
will become inconsistent as it will be properly executing in only a subset
|
||||
of servers, so this is most useful for debugging only.</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.online.schema.update.enable</name>
|
||||
|
|
|
@ -39,12 +39,12 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.Coprocessor;
|
||||
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Append;
|
||||
import org.apache.hadoop.hbase.client.CoprocessorHConnection;
|
||||
|
@ -92,8 +92,11 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||
"hbase.coprocessor.master.classes";
|
||||
public static final String WAL_COPROCESSOR_CONF_KEY =
|
||||
"hbase.coprocessor.wal.classes";
|
||||
public static final String ABORT_ON_ERROR_KEY = "hbase.coprocessor.abortonerror";
|
||||
public static final boolean DEFAULT_ABORT_ON_ERROR = true;
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(CoprocessorHost.class);
|
||||
protected Abortable abortable;
|
||||
/** Ordered set of loaded coprocessors with lock */
|
||||
protected SortedSet<E> coprocessors =
|
||||
new SortedCopyOnWriteSet<E>(new EnvironmentPriorityComparator());
|
||||
|
@ -102,8 +105,9 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||
protected String pathPrefix;
|
||||
protected AtomicInteger loadSequence = new AtomicInteger();
|
||||
|
||||
public CoprocessorHost() {
|
||||
pathPrefix = UUID.randomUUID().toString();
|
||||
public CoprocessorHost(Abortable abortable) {
|
||||
this.abortable = abortable;
|
||||
this.pathPrefix = UUID.randomUUID().toString();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -161,12 +165,9 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||
configured.add(loadInstance(implClass, Coprocessor.PRIORITY_SYSTEM, conf));
|
||||
LOG.info("System coprocessor " + className + " was loaded " +
|
||||
"successfully with priority (" + priority++ + ").");
|
||||
} catch (ClassNotFoundException e) {
|
||||
LOG.warn("Class " + className + " cannot be found. " +
|
||||
e.getMessage());
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Load coprocessor " + className + " failed. " +
|
||||
e.getMessage());
|
||||
} catch (Throwable t) {
|
||||
// We always abort if system coprocessors cannot be loaded
|
||||
abortServer(className, t);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -650,7 +651,7 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||
}
|
||||
|
||||
/** Initialize the environment */
|
||||
public void startup() {
|
||||
public void startup() throws IOException {
|
||||
if (state == Coprocessor.State.INSTALLED ||
|
||||
state == Coprocessor.State.STOPPED) {
|
||||
state = Coprocessor.State.STARTING;
|
||||
|
@ -660,8 +661,6 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||
currentThread.setContextClassLoader(this.getClassLoader());
|
||||
impl.start(this);
|
||||
state = Coprocessor.State.ACTIVE;
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Error starting coprocessor "+impl.getClass().getName(), ioe);
|
||||
} finally {
|
||||
currentThread.setContextClassLoader(hostClassLoader);
|
||||
}
|
||||
|
@ -763,25 +762,20 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||
}
|
||||
}
|
||||
|
||||
protected void abortServer(final String service,
|
||||
final Server server,
|
||||
final CoprocessorEnvironment environment,
|
||||
final Throwable e) {
|
||||
String coprocessorName = (environment.getInstance()).toString();
|
||||
server.abort("Aborting service: " + service + " running on : "
|
||||
+ server.getServerName() + " because coprocessor: "
|
||||
+ coprocessorName + " threw an exception.", e);
|
||||
protected void abortServer(final CoprocessorEnvironment environment, final Throwable e) {
|
||||
abortServer(environment.getInstance().getClass().getName(), e);
|
||||
}
|
||||
|
||||
protected void abortServer(final CoprocessorEnvironment environment,
|
||||
final Throwable e) {
|
||||
String coprocessorName = (environment.getInstance()).toString();
|
||||
LOG.error("The coprocessor: " + coprocessorName + " threw an unexpected " +
|
||||
"exception: " + e + ", but there's no specific implementation of " +
|
||||
" abortServer() for this coprocessor's environment.");
|
||||
protected void abortServer(final String coprocessorName, final Throwable e) {
|
||||
String message = "The coprocessor " + coprocessorName + " threw an unexpected exception";
|
||||
LOG.error(message, e);
|
||||
if (abortable != null) {
|
||||
abortable.abort(message, e);
|
||||
} else {
|
||||
LOG.warn("No available Abortable, process was not aborted");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* This is used by coprocessor hooks which are declared to throw IOException
|
||||
* (or its subtypes). For such hooks, we should handle throwable objects
|
||||
|
@ -797,8 +791,7 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||
* @param e Throwable object thrown by coprocessor.
|
||||
* @exception IOException Exception
|
||||
*/
|
||||
protected void handleCoprocessorThrowable(final CoprocessorEnvironment env,
|
||||
final Throwable e)
|
||||
protected void handleCoprocessorThrowable(final CoprocessorEnvironment env, final Throwable e)
|
||||
throws IOException {
|
||||
if (e instanceof IOException) {
|
||||
throw (IOException)e;
|
||||
|
@ -809,7 +802,7 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||
// 'hbase.coprocessor.abortonerror' to true will cause abortServer(),
|
||||
// which may be useful in development and testing environments where
|
||||
// 'failing fast' for error analysis is desired.
|
||||
if (env.getConfiguration().getBoolean("hbase.coprocessor.abortonerror",false)) {
|
||||
if (env.getConfiguration().getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
|
||||
// server is configured to abort.
|
||||
abortServer(env, e);
|
||||
} else {
|
||||
|
@ -828,5 +821,3 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -60,6 +60,7 @@ public class MasterCoprocessorHost
|
|||
private MasterServices masterServices;
|
||||
|
||||
MasterCoprocessorHost(final MasterServices services, final Configuration conf) {
|
||||
super(services);
|
||||
this.conf = conf;
|
||||
this.masterServices = services;
|
||||
loadSystemCoprocessors(conf, MASTER_COPROCESSOR_CONF_KEY);
|
||||
|
@ -78,11 +79,6 @@ public class MasterCoprocessorHost
|
|||
masterServices);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void abortServer(final CoprocessorEnvironment env, final Throwable e) {
|
||||
abortServer("master", masterServices, env, e);
|
||||
}
|
||||
|
||||
public boolean preCreateNamespace(final NamespaceDescriptor ns) throws IOException {
|
||||
boolean bypass = false;
|
||||
ObserverContext<MasterCoprocessorEnvironment> ctx = null;
|
||||
|
|
|
@ -69,7 +69,6 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
|
|||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.protobuf.Message;
|
||||
|
@ -146,6 +145,7 @@ public class RegionCoprocessorHost
|
|||
*/
|
||||
public RegionCoprocessorHost(final HRegion region,
|
||||
final RegionServerServices rsServices, final Configuration conf) {
|
||||
super(rsServices);
|
||||
this.conf = conf;
|
||||
this.rsServices = rsServices;
|
||||
this.region = region;
|
||||
|
@ -189,29 +189,40 @@ public class RegionCoprocessorHost
|
|||
} catch (IndexOutOfBoundsException ex) {
|
||||
// ignore
|
||||
}
|
||||
Configuration ourConf;
|
||||
if (cfgSpec != null) {
|
||||
cfgSpec = cfgSpec.substring(cfgSpec.indexOf('|') + 1);
|
||||
// do an explicit deep copy of the passed configuration
|
||||
Configuration newConf = new Configuration(false);
|
||||
HBaseConfiguration.merge(newConf, conf);
|
||||
ourConf = new Configuration(false);
|
||||
HBaseConfiguration.merge(ourConf, conf);
|
||||
Matcher m = HConstants.CP_HTD_ATTR_VALUE_PARAM_PATTERN.matcher(cfgSpec);
|
||||
while (m.find()) {
|
||||
newConf.set(m.group(1), m.group(2));
|
||||
ourConf.set(m.group(1), m.group(2));
|
||||
}
|
||||
configured.add(load(path, className, priority, newConf));
|
||||
} else {
|
||||
configured.add(load(path, className, priority, conf));
|
||||
ourConf = conf;
|
||||
}
|
||||
// Load encompasses classloading and coprocessor initialization
|
||||
try {
|
||||
RegionEnvironment env = load(path, className, priority, ourConf);
|
||||
configured.add(env);
|
||||
LOG.info("Loaded coprocessor " + className + " from HTD of " +
|
||||
region.getTableDesc().getTableName().getNameAsString() + " successfully.");
|
||||
} catch (Throwable t) {
|
||||
// Coprocessor failed to load, do we abort on error?
|
||||
if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
|
||||
abortServer(className, t);
|
||||
} else {
|
||||
LOG.error("Failed to load coprocessor " + className, t);
|
||||
}
|
||||
}
|
||||
LOG.info("Load coprocessor " + className + " from HTD of " +
|
||||
region.getTableDesc().getTableName().getNameAsString() +
|
||||
" successfully.");
|
||||
} else {
|
||||
throw new RuntimeException("specification does not match pattern");
|
||||
LOG.error("Malformed table coprocessor specification: key=" + key +
|
||||
", spec: " + spec);
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("attribute '" + key +
|
||||
"' has invalid coprocessor specification '" + spec + "'");
|
||||
LOG.warn(StringUtils.stringifyException(ex));
|
||||
} catch (Exception ioe) {
|
||||
LOG.error("Malformed table coprocessor specification: key=" + key +
|
||||
", spec: " + spec);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -247,11 +258,6 @@ public class RegionCoprocessorHost
|
|||
rsServices, classData);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void abortServer(final CoprocessorEnvironment env, final Throwable e) {
|
||||
abortServer("regionserver", rsServices, env, e);
|
||||
}
|
||||
|
||||
/**
|
||||
* HBASE-4014 : This is used by coprocessor hooks which are not declared to throw exceptions.
|
||||
*
|
||||
|
|
|
@ -39,6 +39,7 @@ public class RegionServerCoprocessorHost extends
|
|||
|
||||
public RegionServerCoprocessorHost(RegionServerServices rsServices,
|
||||
Configuration conf) {
|
||||
super(rsServices);
|
||||
this.rsServices = rsServices;
|
||||
this.conf = conf;
|
||||
// load system default cp's from configuration.
|
||||
|
|
|
@ -73,6 +73,13 @@ public class WALCoprocessorHost
|
|||
* @param conf the configuration
|
||||
*/
|
||||
public WALCoprocessorHost(final FSHLog log, final Configuration conf) {
|
||||
// We don't want to require an Abortable passed down through (FS)HLog, so
|
||||
// this means that a failure to load of a WAL coprocessor won't abort the
|
||||
// server. This isn't ideal, and means that security components that
|
||||
// utilize a WALObserver will have to check the observer initialization
|
||||
// state manually. However, WALObservers will eventually go away so it
|
||||
// should be an acceptable state of affairs.
|
||||
super(null);
|
||||
this.wal = log;
|
||||
// load system default cp's from configuration.
|
||||
loadSystemCoprocessors(conf, WAL_COPROCESSOR_CONF_KEY);
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.TableName;
|
|||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
|
@ -58,6 +59,7 @@ public class TestConstraint {
|
|||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
util = new HBaseTestingUtility();
|
||||
util.getConfiguration().setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, false);
|
||||
util.startMiniCluster();
|
||||
}
|
||||
|
||||
|
@ -79,13 +81,15 @@ public class TestConstraint {
|
|||
|
||||
util.getHBaseAdmin().createTable(desc);
|
||||
HTable table = new HTable(util.getConfiguration(), tableName);
|
||||
|
||||
// test that we don't fail on a valid put
|
||||
Put put = new Put(row1);
|
||||
byte[] value = Integer.toString(10).getBytes();
|
||||
put.add(dummy, new byte[0], value);
|
||||
table.put(put);
|
||||
|
||||
try {
|
||||
// test that we don't fail on a valid put
|
||||
Put put = new Put(row1);
|
||||
byte[] value = Integer.toString(10).getBytes();
|
||||
put.add(dummy, new byte[0], value);
|
||||
table.put(put);
|
||||
} finally {
|
||||
table.close();
|
||||
}
|
||||
assertTrue(CheckWasRunConstraint.wasRun);
|
||||
}
|
||||
|
||||
|
@ -152,12 +156,14 @@ public class TestConstraint {
|
|||
|
||||
util.getHBaseAdmin().createTable(desc);
|
||||
HTable table = new HTable(util.getConfiguration(), tableName);
|
||||
|
||||
// test that we don't fail because its disabled
|
||||
Put put = new Put(row1);
|
||||
put.add(dummy, new byte[0], "pass".getBytes());
|
||||
table.put(put);
|
||||
|
||||
try {
|
||||
// test that we don't fail because its disabled
|
||||
Put put = new Put(row1);
|
||||
put.add(dummy, new byte[0], "pass".getBytes());
|
||||
table.put(put);
|
||||
} finally {
|
||||
table.close();
|
||||
}
|
||||
assertTrue(CheckWasRunConstraint.wasRun);
|
||||
}
|
||||
|
||||
|
@ -182,13 +188,15 @@ public class TestConstraint {
|
|||
|
||||
util.getHBaseAdmin().createTable(desc);
|
||||
HTable table = new HTable(util.getConfiguration(), tableName);
|
||||
|
||||
// test that we do fail on violation
|
||||
Put put = new Put(row1);
|
||||
put.add(dummy, new byte[0], "pass".getBytes());
|
||||
LOG.warn("Doing put in table");
|
||||
table.put(put);
|
||||
|
||||
try {
|
||||
// test that we do fail on violation
|
||||
Put put = new Put(row1);
|
||||
put.add(dummy, new byte[0], "pass".getBytes());
|
||||
LOG.warn("Doing put in table");
|
||||
table.put(put);
|
||||
} finally {
|
||||
table.close();
|
||||
}
|
||||
assertFalse(CheckWasRunConstraint.wasRun);
|
||||
}
|
||||
|
||||
|
|
|
@ -62,7 +62,6 @@ public class TestClassLoading {
|
|||
static final String cpName4 = "TestCP4";
|
||||
static final String cpName5 = "TestCP5";
|
||||
static final String cpName6 = "TestCP6";
|
||||
static final String cpNameInvalid = "TestCPInvalid";
|
||||
|
||||
private static Class<?> regionCoprocessor1 = ColumnAggregationEndpoint.class;
|
||||
// TOOD: Fix the import of this handler. It is coming in from a package that is far away.
|
||||
|
@ -146,9 +145,6 @@ public class TestClassLoading {
|
|||
// with configuration values
|
||||
htd.setValue("COPROCESSOR$2", jarFileOnHDFS2.toString() + "|" + cpName2 +
|
||||
"|" + Coprocessor.PRIORITY_USER + "|k1=v1,k2=v2,k3=v3");
|
||||
// same jar but invalid class name (should fail to load this class)
|
||||
htd.setValue("COPROCESSOR$3", jarFileOnHDFS2.toString() + "|" + cpNameInvalid +
|
||||
"|" + Coprocessor.PRIORITY_USER);
|
||||
HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
|
||||
if (admin.tableExists(tableName)) {
|
||||
if (admin.isTableEnabled(tableName)) {
|
||||
|
@ -164,8 +160,7 @@ public class TestClassLoading {
|
|||
|
||||
// verify that the coprocessors were loaded
|
||||
boolean foundTableRegion=false;
|
||||
boolean found_invalid = true, found1 = true, found2 = true, found2_k1 = true,
|
||||
found2_k2 = true, found2_k3 = true;
|
||||
boolean found1 = true, found2 = true, found2_k1 = true, found2_k2 = true, found2_k3 = true;
|
||||
Map<HRegion, Set<ClassLoader>> regionsActiveClassLoaders =
|
||||
new HashMap<HRegion, Set<ClassLoader>>();
|
||||
MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster();
|
||||
|
@ -186,9 +181,6 @@ public class TestClassLoading {
|
|||
} else {
|
||||
found2_k1 = found2_k2 = found2_k3 = false;
|
||||
}
|
||||
env = region.getCoprocessorHost().findCoprocessorEnvironment(cpNameInvalid);
|
||||
found_invalid = found_invalid && (env != null);
|
||||
|
||||
regionsActiveClassLoaders
|
||||
.put(region, ((CoprocessorHost) region.getCoprocessorHost()).getExternalClassLoaders());
|
||||
}
|
||||
|
@ -197,8 +189,6 @@ public class TestClassLoading {
|
|||
assertTrue("No region was found for table " + tableName, foundTableRegion);
|
||||
assertTrue("Class " + cpName1 + " was missing on a region", found1);
|
||||
assertTrue("Class " + cpName2 + " was missing on a region", found2);
|
||||
//an invalid CP class name is defined for this table, validate that it is not loaded
|
||||
assertFalse("Class " + cpNameInvalid + " was found on a region", found_invalid);
|
||||
assertTrue("Configuration key 'k1' was missing on a region", found2_k1);
|
||||
assertTrue("Configuration key 'k2' was missing on a region", found2_k2);
|
||||
assertTrue("Configuration key 'k3' was missing on a region", found2_k3);
|
||||
|
@ -460,8 +450,6 @@ public class TestClassLoading {
|
|||
// This was a test for HBASE-4070.
|
||||
// We are removing coprocessors from region load in HBASE-5258.
|
||||
// Therefore, this test now only checks system coprocessors.
|
||||
|
||||
HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
|
||||
assertAllRegionServers(regionServerSystemCoprocessors,null);
|
||||
}
|
||||
|
||||
|
|
|
@ -473,6 +473,7 @@ public class TestCoprocessorInterface {
|
|||
1024 * 128);
|
||||
TEST_UTIL.getConfiguration().setBoolean("hbase.testing.nocluster",
|
||||
true);
|
||||
TEST_UTIL.getConfiguration().setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, false);
|
||||
|
||||
return TEST_UTIL.getConfiguration();
|
||||
}
|
||||
|
|
|
@ -143,7 +143,7 @@ public class TestMasterCoprocessorExceptionWithAbort {
|
|||
Configuration conf = UTIL.getConfiguration();
|
||||
conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
|
||||
BuggyMasterObserver.class.getName());
|
||||
conf.set("hbase.coprocessor.abortonerror", "true");
|
||||
conf.setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, true);
|
||||
UTIL.startMiniCluster();
|
||||
}
|
||||
|
||||
|
|
|
@ -77,6 +77,7 @@ public class TestMasterCoprocessorExceptionWithRemove {
|
|||
private boolean startCalled;
|
||||
private boolean postStartMasterCalled;
|
||||
|
||||
@SuppressWarnings("null")
|
||||
@Override
|
||||
public void postCreateTable(ObserverContext<MasterCoprocessorEnvironment> env,
|
||||
HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
|
||||
|
@ -125,6 +126,7 @@ public class TestMasterCoprocessorExceptionWithRemove {
|
|||
Configuration conf = UTIL.getConfiguration();
|
||||
conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
|
||||
BuggyMasterObserver.class.getName());
|
||||
UTIL.getConfiguration().setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, false);
|
||||
UTIL.startMiniCluster();
|
||||
}
|
||||
|
||||
|
@ -182,7 +184,7 @@ public class TestMasterCoprocessorExceptionWithRemove {
|
|||
// In this test, there is only a single coprocessor (BuggyMasterObserver).
|
||||
String coprocessorName =
|
||||
BuggyMasterObserver.class.getName();
|
||||
assertTrue(master.getLoadedCoprocessors().contains(coprocessorName));
|
||||
assertTrue(HMaster.getLoadedCoprocessors().contains(coprocessorName));
|
||||
|
||||
HTableDescriptor htd1 = new HTableDescriptor(TableName.valueOf(TEST_TABLE1));
|
||||
htd1.addFamily(new HColumnDescriptor(TEST_FAMILY1));
|
||||
|
@ -209,11 +211,9 @@ public class TestMasterCoprocessorExceptionWithRemove {
|
|||
assertFalse("Master survived coprocessor NPE, as expected.",
|
||||
masterTracker.masterZKNodeWasDeleted);
|
||||
|
||||
String loadedCoprocessors = master.getLoadedCoprocessors();
|
||||
String loadedCoprocessors = HMaster.getLoadedCoprocessors();
|
||||
assertTrue(loadedCoprocessors.contains(coprocessorName));
|
||||
|
||||
|
||||
|
||||
// Verify that BuggyMasterObserver has been removed due to its misbehavior
|
||||
// by creating another table: should not have a problem this time.
|
||||
HTableDescriptor htd2 = new HTableDescriptor(TableName.valueOf(TEST_TABLE2));
|
||||
|
@ -227,4 +227,3 @@ public class TestMasterCoprocessorExceptionWithRemove {
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -25,15 +25,14 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.*;
|
||||
import org.apache.hadoop.hbase.Waiter.Predicate;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Durability;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
|
@ -51,70 +50,100 @@ public class TestRegionServerCoprocessorExceptionWithAbort {
|
|||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private static final TableName TABLE_NAME = TableName.valueOf("observed_table");
|
||||
|
||||
@BeforeClass
|
||||
public static void setupBeforeClass() throws Exception {
|
||||
@Test(timeout=60000)
|
||||
public void testExceptionDuringInitialization() throws Exception {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); // Let's fail fast.
|
||||
conf.setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, true);
|
||||
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
|
||||
TEST_UTIL.startMiniCluster(2);
|
||||
try {
|
||||
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
|
||||
// Trigger one regionserver to fail as if it came up with a coprocessor
|
||||
// that fails during initialization
|
||||
final HRegionServer regionServer = cluster.getRegionServer(0);
|
||||
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
|
||||
FailedInitializationObserver.class.getName());
|
||||
regionServer.getCoprocessorHost().loadSystemCoprocessors(conf,
|
||||
CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
|
||||
TEST_UTIL.waitFor(10000, 1000, new Predicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
return regionServer.isAborted();
|
||||
}
|
||||
});
|
||||
} finally {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testExceptionFromCoprocessorDuringPut() throws Exception {
|
||||
// set configure to indicate which cp should be loaded
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); // Let's fail fast.
|
||||
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, BuggyRegionObserver.class.getName());
|
||||
conf.set("hbase.coprocessor.abortonerror", "true");
|
||||
TEST_UTIL.startMiniCluster(3);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardownAfterClass() throws Exception {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExceptionFromCoprocessorDuringPut()
|
||||
throws IOException, InterruptedException {
|
||||
// When we try to write to TEST_TABLE, the buggy coprocessor will
|
||||
// cause a NullPointerException, which will cause the regionserver (which
|
||||
// hosts the region we attempted to write to) to abort.
|
||||
TableName TEST_TABLE = TABLE_NAME;
|
||||
byte[] TEST_FAMILY = Bytes.toBytes("aaa");
|
||||
|
||||
HTable table = TEST_UTIL.createTable(TEST_TABLE, TEST_FAMILY);
|
||||
TEST_UTIL.createMultiRegions(table, TEST_FAMILY);
|
||||
TEST_UTIL.waitUntilAllRegionsAssigned(TEST_TABLE);
|
||||
|
||||
// Note which regionServer will abort (after put is attempted).
|
||||
final HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(TEST_TABLE);
|
||||
|
||||
boolean threwIOE = false;
|
||||
conf.setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, true);
|
||||
TEST_UTIL.startMiniCluster(2);
|
||||
try {
|
||||
final byte[] ROW = Bytes.toBytes("aaa");
|
||||
Put put = new Put(ROW);
|
||||
put.add(TEST_FAMILY, ROW, ROW);
|
||||
table.put(put);
|
||||
table.flushCommits();
|
||||
// We may need two puts to reliably get an exception
|
||||
table.put(put);
|
||||
table.flushCommits();
|
||||
} catch (IOException e) {
|
||||
threwIOE = true;
|
||||
} finally {
|
||||
assertTrue("The regionserver should have thrown an exception", threwIOE);
|
||||
}
|
||||
// When we try to write to TEST_TABLE, the buggy coprocessor will
|
||||
// cause a NullPointerException, which will cause the regionserver (which
|
||||
// hosts the region we attempted to write to) to abort.
|
||||
final byte[] TEST_FAMILY = Bytes.toBytes("aaa");
|
||||
|
||||
// Wait 10 seconds for the regionserver to abort: expected result is that
|
||||
// it will abort.
|
||||
boolean aborted = false;
|
||||
for (int i = 0; i < 10; i++) {
|
||||
aborted = regionServer.isAborted();
|
||||
if (aborted) {
|
||||
break;
|
||||
}
|
||||
HTable table = TEST_UTIL.createTable(TABLE_NAME, TEST_FAMILY);
|
||||
TEST_UTIL.createMultiRegions(table, TEST_FAMILY);
|
||||
TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
|
||||
|
||||
// Note which regionServer will abort (after put is attempted).
|
||||
final HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME);
|
||||
|
||||
boolean threwIOE = false;
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
fail("InterruptedException while waiting for regionserver " +
|
||||
"zk node to be deleted.");
|
||||
final byte[] ROW = Bytes.toBytes("aaa");
|
||||
Put put = new Put(ROW);
|
||||
put.add(TEST_FAMILY, ROW, ROW);
|
||||
table.put(put);
|
||||
table.flushCommits();
|
||||
// We may need two puts to reliably get an exception
|
||||
table.put(put);
|
||||
table.flushCommits();
|
||||
} catch (IOException e) {
|
||||
threwIOE = true;
|
||||
} finally {
|
||||
assertTrue("The regionserver should have thrown an exception", threwIOE);
|
||||
}
|
||||
|
||||
// Wait 10 seconds for the regionserver to abort: expected result is that
|
||||
// it will abort.
|
||||
boolean aborted = false;
|
||||
for (int i = 0; i < 10; i++) {
|
||||
aborted = regionServer.isAborted();
|
||||
if (aborted) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
fail("InterruptedException while waiting for regionserver " +
|
||||
"zk node to be deleted.");
|
||||
}
|
||||
}
|
||||
Assert.assertTrue("The region server should have aborted", aborted);
|
||||
table.close();
|
||||
} finally {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
}
|
||||
|
||||
public static class FailedInitializationObserver extends SimpleRegionObserver {
|
||||
@SuppressWarnings("null")
|
||||
@Override
|
||||
public void start(CoprocessorEnvironment e) throws IOException {
|
||||
// Trigger a NPE to fail the coprocessor
|
||||
Integer i = null;
|
||||
i = i + 1;
|
||||
}
|
||||
Assert.assertTrue("The region server should have aborted", aborted);
|
||||
table.close();
|
||||
}
|
||||
|
||||
public static class BuggyRegionObserver extends SimpleRegionObserver {
|
||||
|
|
|
@ -69,7 +69,8 @@ public class TestRegionServerCoprocessorExceptionWithRemove {
|
|||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
|
||||
BuggyRegionObserver.class.getName());
|
||||
TEST_UTIL.startMiniCluster(2);
|
||||
TEST_UTIL.getConfiguration().setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, false);
|
||||
TEST_UTIL.startMiniCluster();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.LargeTests;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
@ -49,6 +50,7 @@ public class TestShell {
|
|||
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
|
||||
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
|
||||
TEST_UTIL.getConfiguration().setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, false);
|
||||
TEST_UTIL.startMiniCluster();
|
||||
|
||||
// Configure jruby runtime
|
||||
|
|
Loading…
Reference in New Issue