HBASE-6571 Generic multi-thread/cross-process error handling framework (Jesse Yates)

The idea for a generic inter-process error-handling framework came
from working on HBASE-6055 (snapshots). Distributed snapshots must be taken
under tight time constraints to minimize offline time in the face of
errors. However, we often need to coordinate errors between processes, and
the current Abortable framework is not sufficiently flexible to handle the
multitude of situations that can occur when coordinating between all region
servers, the master and ZooKeeper. Using this framework, error handling for
snapshots was a simple matter, amounting to maybe 200 LOC.
    
This seems to be a generally useful framework and can be used to easily add
inter-process error handling in HBase. The most obvious immediate usage is
as part of HBASE-5487 when coordinating multiple sub-tasks.



git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-7290@1445770 13f79535-47bb-0310-9956-ffa450edef68
Jonathan Hsieh 2013-02-13 17:55:00 +00:00
parent b251b57390
commit 5f1bc6e52e
25 changed files with 1947 additions and 0 deletions
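
As a rough usage sketch (not part of this change; the enclosing class, doWork(), and the "sketch" info string are hypothetical), a caller might wire a dispatcher and a timer from this commit together like this:

import org.apache.hadoop.hbase.server.errorhandling.OperationAttemptTimer;
import org.apache.hadoop.hbase.server.errorhandling.impl.ExceptionDispatcher;

public class OperationAttemptSketch {
  public void run(long maxTimeMs) {
    // single-use dispatcher: the first error wins, later errors are ignored
    ExceptionDispatcher<Object, Exception> dispatcher =
        new ExceptionDispatcher<Object, Exception>();
    // report a timeout error to the dispatcher if the attempt runs too long
    OperationAttemptTimer timer = new OperationAttemptTimer(dispatcher, maxTimeMs, "sketch");
    timer.start();
    try {
      for (int step = 0; step < 10; step++) {
        dispatcher.failOnError(); // fail fast if any participant reported an error
        doWork(step);             // hypothetical unit of work
      }
      timer.complete();           // success: no timeout error after this point
    } catch (Exception e) {
      dispatcher.receiveError("Step failed", e, "sketch");
    }
  }

  private void doWork(int step) {
    // placeholder
  }
}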


@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Check for errors on a given process.
* @param <E> Type of error that <tt>this</tt> throws if it finds an error
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface ExceptionCheckable<E extends Exception> {
/**
* Checks to see if any process to which the exception checker is bound has created an error that
* would cause a failure.
* @throws E if there has been an error, allowing a fail-fast mechanism
*/
public void failOnError() throws E;
/**
* Non-exceptional form of {@link #failOnError()}. Checks to see if any process to which the
* exception checker is bound has created an error that would cause a failure.
* @return <tt>true</tt> if there has been an error, <tt>false</tt> otherwise
*/
public boolean checkForError();
}
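
A hypothetical consumer might use this interface to fail fast between units of work; the class and its process() helper below are illustrative only:

import org.apache.hadoop.hbase.server.errorhandling.ExceptionCheckable;

public class FailFastWorker {
  // Copy rows, aborting as soon as any bound process reports an error.
  public void copyRows(ExceptionCheckable<Exception> status, Iterable<byte[]> rows)
      throws Exception {
    for (byte[] row : rows) {
      status.failOnError(); // throws the reported error, if any, aborting the copy
      process(row);         // hypothetical per-row work
    }
  }

  private void process(byte[] row) {
    // placeholder
  }
}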


@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Listen for errors on a process or operation
* @param <E> Type of exception that is expected
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface ExceptionListener<E extends Exception> {
/**
* Receive an error.
* <p>
* Implementers must ensure that this method is thread-safe.
* @param message reason for the error
* @param e exception causing the error
* @param info general information about the error
*/
public void receiveError(String message, E e, Object... info);
}


@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.server.errorhandling.impl.ExceptionOrchestrator;
/**
* Simple visitor interface to update an error listener with an error notification
* @see ExceptionOrchestrator
* @param <T> Type of listener to update
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface ExceptionVisitor<T> {
/**
* Visit the listener with the given error, possibly transforming or ignoring the error
* @param listener listener to update
* @param message error message
* @param e exception that caused the error
* @param info general information about the error
*/
public void visit(T listener, String message, Exception e, Object... info);
}


@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.server.errorhandling.impl.ExceptionOrchestratorFactory;
import org.apache.hadoop.hbase.util.Pair;
/**
* Inject faults when classes check to see if an error occurs.
* <p>
* Can be added to any monitoring via
* {@link ExceptionOrchestratorFactory#addFaultInjector(FaultInjector)}
* @see ExceptionListener
* @see ExceptionCheckable
* @param <E> Type of exception that the corresponding {@link ExceptionListener} is expecting
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface FaultInjector<E extends Exception> {
/**
* Called by the specified class whenever checking for process errors. Care needs to be taken when
* using fault injectors to pass back an array of the correct size, or the listener receiving the
* error may not get the correct number of arguments and could throw an error.
* <p>
* Note that every time the fault injector is called it does not necessarily need to inject a
* fault, but only when the fault is desired.
* @param trace full stack trace of the call to check for an error
* @return the information about the fault that should be returned if there was a fault (expected
* exception to throw and generic error information) or <tt>null</tt> if no fault should
* be injected.
*/
public Pair<E, Object[]> injectFault(StackTraceElement[] trace);
}


@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Simple holder for the name of an object, used to prefix log messages.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class Name {
private String name;
public Name(String name) {
this.name = name;
}
public void setName(String name) {
this.name = name;
}
/**
* Get the name of the class that should be used for logging
* @return {@link String} prefix for logging
*/
public String getNamePrefixForLog() {
return name != null ? "(" + name + ")" : "";
}
@Override
public String toString() {
return this.name;
}
}


@ -0,0 +1,129 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.server.errorhandling.exception.OperationAttemptTimeoutException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* Time a given process/operation and report a failure if the elapsed time exceeds the max allowed
* time.
* <p>
* The timer won't start tracking time until calling {@link #start()}. If {@link #complete()} or
* {@link #trigger()} is called before {@link #start()}, calls to {@link #start()} will fail.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class OperationAttemptTimer {
private static final Log LOG = LogFactory.getLog(OperationAttemptTimer.class);
private final long maxTime;
private volatile boolean complete;
private final Timer timer;
private final TimerTask timerTask;
private long start = -1;
/**
* Create a generic timer for a task/process.
* @param listener listener to notify if the process times out
* @param maxTime max allowed running time for the process. Timer starts on calls to
* {@link #start()}
* @param info information about the process to pass along if the timer expires
*/
@SuppressWarnings("rawtypes")
public OperationAttemptTimer(final ExceptionListener listener, final long maxTime,
final Object... info) {
this.maxTime = maxTime;
timer = new Timer();
timerTask = new TimerTask() {
@SuppressWarnings("unchecked")
@Override
public void run() {
// ensure we don't run this task multiple times
synchronized (this) {
// quick exit if we already marked the task complete
if (OperationAttemptTimer.this.complete) return;
// mark the task is run, to avoid repeats
OperationAttemptTimer.this.complete = true;
}
long end = EnvironmentEdgeManager.currentTimeMillis();
listener.receiveError("Timeout elapsed!", new OperationAttemptTimeoutException(start, end,
maxTime), info);
}
};
}
/**
* For all time forward, do not throw an error because the process has completed.
*/
public void complete() {
// warn if the timer is already marked complete. This isn't going to be thread-safe, but should
// be good enough and it's not worth locking just for a warning.
if (this.complete) {
LOG.warn("Timer already marked completed, ignoring!");
return;
}
LOG.debug("Marking timer as complete - no error notifications will be received for this timer.");
synchronized (this.timerTask) {
this.complete = true;
}
this.timer.cancel();
}
/**
* Start a timer to fail a process if it takes longer than the expected time to complete.
* <p>
* Non-blocking.
* @throws IllegalStateException if the timer has already been marked done via {@link #complete()}
* or {@link #trigger()}
*/
public synchronized void start() throws IllegalStateException {
if (this.start >= 0) {
LOG.warn("Timer already started, can't be started again. Ignoring second request.");
return;
}
LOG.debug("Scheduling process timer to run in: " + maxTime + " ms");
timer.schedule(timerTask, maxTime);
this.start = EnvironmentEdgeManager.currentTimeMillis();
}
/**
* Trigger the timer immediately.
* <p>
* Exposed for testing.
*/
public void trigger() {
synchronized (timerTask) {
if (this.complete) {
LOG.warn("Timer already completed, not triggering.");
return;
}
LOG.debug("Triggering timer immediately!");
this.timer.cancel();
this.timerTask.run();
}
}
}
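
A hedged sketch of typical timer usage, pairing the timer with an ExceptionSnare so a timeout surfaces on the next failOnError() check (everything other than the framework classes is hypothetical):

import org.apache.hadoop.hbase.server.errorhandling.OperationAttemptTimer;
import org.apache.hadoop.hbase.server.errorhandling.exception.OperationAttemptTimeoutException;
import org.apache.hadoop.hbase.server.errorhandling.impl.ExceptionSnare;

public class TimedAttemptSketch {
  public void attempt(long maxTimeMs) {
    ExceptionSnare<OperationAttemptTimeoutException> snare =
        new ExceptionSnare<OperationAttemptTimeoutException>();
    OperationAttemptTimer timer = new OperationAttemptTimer(snare, maxTimeMs, "timed-attempt");
    timer.start();
    try {
      doExpensiveWork();   // hypothetical work
      snare.failOnError(); // throws if the timer already fired
      timer.complete();    // stop the timer; no timeout error after this point
    } catch (OperationAttemptTimeoutException e) {
      // the attempt exceeded maxTimeMs; roll back or clean up here
    }
  }

  private void doExpensiveWork() {
    // placeholder
  }
}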


@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.exception;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.server.errorhandling.OperationAttemptTimer;
/**
* Exception for a timeout of a task.
* @see OperationAttemptTimer
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
@SuppressWarnings("serial")
public class OperationAttemptTimeoutException extends Exception {
/**
* Exception indicating that an operation attempt has timed out
* @param start time the operation started (ms since epoch)
* @param end time the timeout was triggered (ms since epoch)
* @param allowed max allowed amount of time for the operation to complete (ms)
*/
public OperationAttemptTimeoutException(long start, long end, long allowed) {
super("Timeout elapsed! Start:" + start + ", End:" + end + ", diff:" + (end - start) + ", max:"
+ allowed + " ms");
}
}


@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.exception;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.server.errorhandling.impl.ExceptionSnare;
/**
* Exception thrown when an {@link ExceptionSnare} doesn't have an <tt>Exception</tt> when it
* receives an error.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
@SuppressWarnings("serial")
public class UnknownErrorException extends RuntimeException {
}


@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionCheckable;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionListener;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionVisitor;
/**
* The dispatcher acts as a central point of control of error handling. Any exceptions from the
* dispatcher get passed directly to the listeners. Likewise, any errors from the listeners get
* passed to the dispatcher and then back to any listeners.
* <p>
* This is useful, for instance, for informing multiple processes in conjunction with an
* {@link Abortable}.
* <p>
* This is different from an {@link ExceptionOrchestrator} as it will only propagate an error
* <i>once</i> to all listeners; it is single-use, just like an {@link ExceptionSnare}. For example,
* if an error is passed to <tt>this</tt>, then that error will be passed to all listeners, but a
* second error passed to {@link #receiveError(String, Exception, Object...)} will be ignored. This
* is particularly useful to help avoid accidentally creating infinite loops when passing errors.
* <p>
* @param <T> generic exception listener type to update
* @param <E> Type of {@link Exception} to throw when calling {@link #failOnError()}
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ExceptionDispatcher<T, E extends Exception> extends ExceptionOrchestrator<E> implements
ExceptionListener<E>, ExceptionCheckable<E> {
private static final Log LOG = LogFactory.getLog(ExceptionDispatcher.class);
protected final ExceptionVisitor<T> visitor;
private final ExceptionSnare<E> snare = new ExceptionSnare<E>();
public ExceptionDispatcher(String name, ExceptionVisitor<T> visitor) {
super(name);
this.visitor = visitor;
}
public ExceptionDispatcher(ExceptionVisitor<T> visitor) {
this("single-error-dispatcher", visitor);
}
public ExceptionDispatcher() {
this(null);
}
@Override
public synchronized void receiveError(String message, E e, Object... info) {
// if we already have an error, then ignore it
if (snare.checkForError()) return;
LOG.debug(name.getNamePrefixForLog() + "Accepting received error:" + message);
// mark that we got the error
snare.receiveError(message, e, info);
// notify all the listeners
super.receiveError(message, e, info);
}
@Override
public void failOnError() throws E {
snare.failOnError();
}
@Override
public boolean checkForError() {
return snare.checkForError();
}
public ExceptionVisitor<T> getDefaultVisitor() {
return this.visitor;
}
/**
* Add a typed error listener that will be visited by the {@link ExceptionVisitor}, passed in the
* constructor, when receiving errors.
* @param errorable listener for error notifications
*/
public void addErrorListener(T errorable) {
if (this.visitor == null) throw new UnsupportedOperationException("No error visitor for "
+ errorable + ", can't add it to the listeners");
addErrorListener(this.visitor, errorable);
}
}
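
A hypothetical wiring of this dispatcher against HBase's Abortable, forwarding the first received error to bound components (the enclosing class and method names are illustrative):

import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionVisitor;
import org.apache.hadoop.hbase.server.errorhandling.impl.ExceptionDispatcher;

public class AbortingDispatcherSketch {
  // visitor that turns an error notification into an abort of the listener
  private static final ExceptionVisitor<Abortable> ABORT_VISITOR = new ExceptionVisitor<Abortable>() {
    @Override
    public void visit(Abortable listener, String message, Exception e, Object... info) {
      listener.abort(message, e);
    }
  };

  public ExceptionDispatcher<Abortable, Exception> create(Abortable member) {
    ExceptionDispatcher<Abortable, Exception> dispatcher =
        new ExceptionDispatcher<Abortable, Exception>("abort-on-error", ABORT_VISITOR);
    // listeners are only weakly referenced, so the caller must keep a strong reference to member
    dispatcher.addErrorListener(member);
    return dispatcher;
  }
}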


@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionVisitor;
import org.apache.hadoop.hbase.server.errorhandling.FaultInjector;
/**
* Generic error dispatcher factory that just creates an error dispatcher on request (potentially
* wrapping with an error injector via the {@link ExceptionOrchestratorFactory}).
* @param <T> Type of generic error listener the dispatchers should handle
* @see ExceptionOrchestratorFactory
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class ExceptionDispatcherFactory<T> extends
ExceptionOrchestratorFactory<ExceptionDispatcher<T, Exception>, T> {
/**
* @param visitor to use when building an error handler via {@link #createErrorHandler()}.
*/
public ExceptionDispatcherFactory(ExceptionVisitor<T> visitor) {
super(visitor);
}
@Override
protected ExceptionDispatcher<T, Exception> buildErrorHandler(ExceptionVisitor<T> visitor) {
return new ExceptionDispatcher<T, Exception>(visitor);
}
@Override
protected ExceptionDispatcher<T, Exception> wrapWithInjector(
ExceptionDispatcher<T, Exception> dispatcher,
List<FaultInjector<?>> injectors) {
return new InjectingExceptionDispatcher<ExceptionDispatcher<T, Exception>, T, Exception>(dispatcher,
injectors);
}
}


@ -0,0 +1,123 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionListener;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionVisitor;
import org.apache.hadoop.hbase.server.errorhandling.Name;
import org.apache.hadoop.hbase.util.Pair;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
/**
* The orchestrator acts as a central point of control of error handling. Any exceptions passed to
* <tt>this</tt> get passed directly to the listeners.
* <p>
* Any exception listener added will only be <b>weakly referenced</b>, so you must keep a reference
* to it if you want to use it in other places. This allows minimal-effort error monitoring: you can
* register an error listener and then not worry about having to unregister the listener.
* <p>
* A single {@link ExceptionOrchestrator} should be used for each set of operation attempts (e.g.
* one parent operation with child operations, potentially multiple levels deep) to monitor. This
* allows for a single source of truth for exception dispatch between all the interested operation
* attempts.
* @param <E> Type of {@link Exception} to expect when receiving errors
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ExceptionOrchestrator<E extends Exception> implements ExceptionListener<E> {
private static final Log LOG = LogFactory.getLog(ExceptionOrchestrator.class);
protected final Name name;
protected final ListMultimap<ExceptionVisitor<?>, WeakReference<?>> listeners = ArrayListMultimap
.create();
/** Error visitor for framework listeners */
final ForwardingErrorVisitor genericVisitor = new ForwardingErrorVisitor();
public ExceptionOrchestrator() {
this("generic-error-dispatcher");
}
public ExceptionOrchestrator(String name) {
this.name = new Name(name);
}
public Name getName() {
return this.name;
}
@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public synchronized void receiveError(String message, E e, Object... info) {
// update all the listeners with the passed error
LOG.debug(name.getNamePrefixForLog() + " Received error, notifying listeners...");
List<Pair<ExceptionVisitor<?>, WeakReference<?>>> toRemove = new ArrayList<Pair<ExceptionVisitor<?>, WeakReference<?>>>();
for (Entry<ExceptionVisitor<?>, WeakReference<?>> entry : listeners.entries()) {
Object o = entry.getValue().get();
if (o == null) {
// if the listener doesn't have a reference, then drop it from the list
// need to copy this over b/c guava is finicky with the entries
toRemove.add(new Pair<ExceptionVisitor<?>, WeakReference<?>>(entry.getKey(), entry
.getValue()));
continue;
}
// otherwise notify the listener that we had a failure
((ExceptionVisitor) entry.getKey()).visit(o, message, e, info);
}
// cleanup all visitors that aren't referenced anymore
if (toRemove.size() > 0) LOG.debug(name.getNamePrefixForLog() + " Cleaning up entries.");
for (Pair<ExceptionVisitor<?>, WeakReference<?>> entry : toRemove) {
this.listeners.remove(entry.getFirst(), entry.getSecond());
}
}
/**
* Listen for failures on a given process
* @param visitor passes error notifications on to the typed listener, possibly transforming or
* ignoring the error notification
* @param errorable listener for the errors
*/
public synchronized <L> void addErrorListener(ExceptionVisitor<L> visitor, L errorable) {
this.listeners.put(visitor, new WeakReference<L>(errorable));
}
/**
* A simple error visitor that just forwards the received error to a generic listener.
*/
private class ForwardingErrorVisitor implements ExceptionVisitor<ExceptionListener<E>> {
@Override
@SuppressWarnings("unchecked")
public void visit(ExceptionListener<E> listener, String message, Exception e, Object... info) {
listener.receiveError(message, (E) e, info);
}
}
}
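
As an illustration of the weak-reference contract described above (hypothetical, not part of this change), a component registering itself must hold its own strong reference to the listener:

import org.apache.hadoop.hbase.server.errorhandling.ExceptionListener;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionVisitor;
import org.apache.hadoop.hbase.server.errorhandling.impl.ExceptionOrchestrator;

public class OrchestratorSketch {
  // visitor that simply forwards the error to a typed listener
  private static final ExceptionVisitor<ExceptionListener<Exception>> FORWARD =
      new ExceptionVisitor<ExceptionListener<Exception>>() {
        @Override
        public void visit(ExceptionListener<Exception> listener, String message, Exception e,
            Object... info) {
          listener.receiveError(message, e, info);
        }
      };

  // keep a strong reference: the orchestrator only holds listeners weakly
  private final ExceptionListener<Exception> myListener = new ExceptionListener<Exception>() {
    @Override
    public void receiveError(String message, Exception e, Object... info) {
      // react to the error, e.g. abort local work
    }
  };

  public ExceptionOrchestrator<Exception> wire() {
    ExceptionOrchestrator<Exception> orchestrator =
        new ExceptionOrchestrator<Exception>("sketch-orchestrator");
    orchestrator.addErrorListener(FORWARD, myListener);
    return orchestrator;
  }
}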


@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionCheckable;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionVisitor;
import org.apache.hadoop.hbase.server.errorhandling.FaultInjector;
/**
* Error factory that produces an {@link ExceptionOrchestrator}, potentially wrapped with a
* {@link FaultInjector}.
* @param <D> type for {@link ExceptionOrchestrator} that should be used
* @param <T> Type of error listener that the dispatcher from this factory can communicate
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public abstract class ExceptionOrchestratorFactory<D extends ExceptionOrchestrator<?>, T> {
private static final List<FaultInjector<?>> faults = new ArrayList<FaultInjector<?>>();
/**
* Add a fault injector that will run on checks of the {@link ExceptionCheckable} generated by
* this factory. To ensure that faults are injected, this must be called before the handler is
* created via {@link #createErrorHandler()}.
* <p>
* Exposed for TESTING.
* @param injector fault injector to add
* @param <E> type of exception that will be thrown on checks of
* {@link ExceptionCheckable#failOnError()} from created exception monitors
*/
public static <E extends Exception> void addFaultInjector(FaultInjector<E> injector) {
faults.add(injector);
}
/**
* Complement to {@link #addFaultInjector(FaultInjector)} - removes any existing fault injectors
* set for the factory.
* <p>
* Exposed for TESTING.
*/
public static void clearFaults() {
faults.clear();
}
protected final ExceptionVisitor<T> visitor;
/**
* @param visitor to use when building an error handler via {@link #createErrorHandler()}.
*/
public ExceptionOrchestratorFactory(ExceptionVisitor<T> visitor) {
this.visitor = visitor;
}
/**
* Create a dispatcher with a specific visitor
* @param visitor visitor to pass on error notifications to bound error listeners
* @return an error dispatcher that passes on errors to all listening objects
*/
public final D createErrorHandler(ExceptionVisitor<T> visitor) {
D handler = buildErrorHandler(visitor);
// wrap with a fault injector, if we need to
if (faults.size() > 0) {
return wrapWithInjector(handler, faults);
}
return handler;
}
/**
* Create a dispatcher using the default visitor passed in the constructor
* @return an error dispatcher that passes on errors to all listening objects
*/
public final D createErrorHandler() {
return createErrorHandler(this.visitor);
}
/**
* Build an error handler. This will be wrapped via
* {@link #wrapWithInjector(ExceptionOrchestrator, List)} if there are fault injectors present.
* @return an error handler
*/
protected abstract D buildErrorHandler(ExceptionVisitor<T> visitor);
/**
* Wrap the built error handler with an error injector. Subclasses should override if they need
* custom error injection. Generally, this will just wrap calls to &lt;D&gt; by first checking the
* {@link #faults} that were dynamically injected and then, if the {@link FaultInjector} didn't
* inject a fault, calling the actual methods.
* <p>
* This method will only be called if there are fault injectors present. Otherwise, the handler
* will just be built via {@link #buildErrorHandler(ExceptionVisitor)}.
* @param delegate built delegate to wrap with injector checking
* @param injectors injectors that should be checked
* @return a &lt;D&gt; that also does {@link FaultInjector} checking
*/
protected abstract D wrapWithInjector(D delegate, List<FaultInjector<?>> injectors);
}


@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionCheckable;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionListener;
import org.apache.hadoop.hbase.server.errorhandling.Name;
import org.apache.hadoop.hbase.server.errorhandling.exception.UnknownErrorException;
/**
* Simple exception handler that keeps track of its failure state, and the exception that should be
* thrown based on the received error.
* <p>
* Ensures that an exception is not propagated if an error has already been received, so you don't
* have infinite error propagation.
* <p>
* You can think of it as a 'one-time-use' {@link ExceptionCheckable} that, once it receives an
* error, will not listen to any new error updates.
* <p>
* Thread-safe.
* @param <E> Type of exception to throw when calling {@link #failOnError()}
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ExceptionSnare<E extends Exception> implements ExceptionCheckable<E>,
ExceptionListener<E> {
private static final Log LOG = LogFactory.getLog(ExceptionSnare.class);
private boolean error = false;
protected E exception;
protected Name name;
/**
* Create an exception snare with a generic error name
*/
public ExceptionSnare() {
this.name = new Name("generic-error-snare");
}
@Override
public void failOnError() throws E {
if (checkForError()) {
if (exception == null) throw new UnknownErrorException();
throw exception;
}
}
@Override
public boolean checkForError() {
return this.error;
}
@Override
public void receiveError(String message, E e, Object... info) {
LOG.error(name.getNamePrefixForLog() + "Got an error:" + message + ", info:"
+ Arrays.toString(info));
receiveInternalError(e);
}
/**
* Receive an error notification from internal sources. Can be used by subclasses to set an error.
* <p>
* This method may be called concurrently, so precautions must be taken not to clobber yourself,
* either by making the method <tt>synchronized</tt> or by synchronizing on <tt>this</tt> when
* calling this method.
* @param e exception that caused the error (can be null).
*/
protected synchronized void receiveInternalError(E e) {
// if we already got an error, ignore the new one
if (this.error) return;
// store the error since we haven't seen it before
this.error = true;
this.exception = e;
}
}
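
A small, hypothetical illustration of the one-shot behavior: only the first error is kept, and it is rethrown on the next check.

import org.apache.hadoop.hbase.server.errorhandling.impl.ExceptionSnare;

public class SnareSketch {
  public static void main(String[] args) {
    ExceptionSnare<Exception> snare = new ExceptionSnare<Exception>();
    snare.receiveError("first failure", new Exception("original cause"));
    snare.receiveError("second failure", new Exception("ignored")); // dropped: already in error
    try {
      snare.failOnError(); // throws the "original cause" exception
    } catch (Exception e) {
      System.out.println("attempt failed: " + e.getMessage());
    }
  }
}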


@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionCheckable;
import org.apache.hadoop.hbase.server.errorhandling.FaultInjector;
import org.apache.hadoop.hbase.server.errorhandling.impl.delegate.DelegatingExceptionDispatcher;
import org.apache.hadoop.hbase.util.Pair;
/**
* {@link ExceptionDispatcher} that delegates calls for all methods, but wraps exception checking to
* allow the fault injectors to have a chance to inject a fault into the running process
* @param <D> {@link ExceptionOrchestrator} to wrap for fault checking
* @param <T> type of generic error listener that should be notified
* @param <E> exception to be thrown on checks of {@link #failOnError()}
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class InjectingExceptionDispatcher<D extends ExceptionDispatcher<T, E>, T, E extends Exception> extends
DelegatingExceptionDispatcher<D, T, E> {
private final List<FaultInjector<E>> faults;
/**
* Wrap an exception handler with one that will inject faults on calls to {@link #checkForError()}.
* @param delegate base exception handler to wrap
* @param faults injectors to run each time there is a check for an error
*/
@SuppressWarnings("unchecked")
public InjectingExceptionDispatcher(D delegate, List<FaultInjector<?>> faults) {
super(delegate);
// since we don't know the type of fault injector, we need to convert it.
// this is only used in tests, so throwing a class-cast here isn't too bad.
this.faults = new ArrayList<FaultInjector<E>>(faults.size());
for (FaultInjector<?> fault : faults) {
this.faults.add((FaultInjector<E>) fault);
}
}
@Override
public void failOnError() throws E {
// first fail if there is already an error
delegate.failOnError();
// then check for an error via the update mechanism
if (this.checkForError()) delegate.failOnError();
}
/**
* Use the injectors to possibly inject an error into the delegate. Callers should invoke
* {@link ExceptionCheckable#checkForError()} or {@link ExceptionCheckable#failOnError()} after
* calling this method, if it returns <tt>true</tt>.
* @return <tt>true</tt> if an error found via injector or in the delegate, <tt>false</tt>
* otherwise
*/
@Override
public boolean checkForError() {
// if there are fault injectors, run them
if (faults.size() > 0) {
// get the caller of this method. Should be the direct calling class
StackTraceElement[] trace = Thread.currentThread().getStackTrace();
for (FaultInjector<E> injector : faults) {
Pair<E, Object[]> info = injector.injectFault(trace);
if (info != null) {
delegate.receiveError("Injected fail", info.getFirst(), info.getSecond());
}
}
}
return delegate.checkForError();
}
}


@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl.delegate;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionVisitor;
import org.apache.hadoop.hbase.server.errorhandling.impl.ExceptionDispatcher;
/**
* Helper class for exception handler factories.
* @param <D> Type of delegate to use
* @param <T> type of generic error listener to update
* @param <E> exception to expect for errors
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class DelegatingExceptionDispatcher<D extends ExceptionDispatcher<T, E>, T, E extends Exception>
extends ExceptionDispatcher<T, E> {
protected final D delegate;
public DelegatingExceptionDispatcher(D delegate) {
super("delegate - " + delegate.getName(), delegate.getDefaultVisitor());
this.delegate = delegate;
}
@Override
public ExceptionVisitor<T> getDefaultVisitor() {
return delegate.getDefaultVisitor();
}
@Override
public void receiveError(String message, E e, Object... info) {
delegate.receiveError(message, e, info);
}
@Override
public <L> void addErrorListener(ExceptionVisitor<L> visitor, L errorable) {
delegate.addErrorListener(visitor, errorable);
}
@Override
public void failOnError() throws E {
delegate.failOnError();
}
@Override
public boolean checkForError() {
return delegate.checkForError();
}
@Override
public void addErrorListener(T errorable) {
delegate.addErrorListener(errorable);
}
}


@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
/**
* Exception thrown from the test
*/
@SuppressWarnings("serial")
public class ExceptionForTesting extends Exception {
public ExceptionForTesting(String msg) {
super(msg);
}
}


@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
/**
* Utility class for testing error propagation
*/
public class ExceptionTestingUtils {
/**
* Determine if the stack trace contains the given calling class
* @param stack trace to examine
* @param clazz Class to search for
* @return <tt>true</tt> if the stack contains the calling class
*/
public static boolean stackContainsClass(StackTraceElement[] stack, Class<?> clazz) {
String name = clazz.getName();
for (StackTraceElement elem : stack) {
if (elem.getClassName().equals(name)) return true;
}
return false;
}
}


@ -0,0 +1,157 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.util.Pair;
/**
* Decoratable policy for whether a fault should be injected for a given stack trace.
* <p>
* Passed in policies are combined with the current policy via a {@link PolicyCombination}.
* <p>
* Using {@link PolicyCombination#AND} means that <i>all</i> policies must agree to inject a fault
* (including the current implementation) before a fault is injected.
* <p>
* Using {@link PolicyCombination#OR} means that if <i>any</i> of the policies asserts that a fault
* should be injected, a fault is injected and all remaining injectors are ignored.
* <p>
* Order of operations occurs in reverse order of the operations added via
* {@link #and(FaultInjectionPolicy)} or {@link #or(FaultInjectionPolicy)}. For example, if this is
* the default policy 'a', which we {@link #and(FaultInjectionPolicy)} with 'b' and then 'c', we get
* the following policy chain:
* <p>
* a && (b && c)
* <p>
* Similarly, if this is the default policy 'a', which we {@link #or(FaultInjectionPolicy)} with 'b'
* and then 'c', we get the following policy chain:
* <p>
* a || (b || c).
* <p>
* Naturally, more complex policies can then be built using this style. Suppose we have policy A,
* which is actually the 'and' of two policies, a and b:
* <p>
* A = a && b
* <p>
* and similarly we also have B which is an 'or' of c and d:
* <p>
* B = c || d
* <p>
* then we could combine the two by calling A {@link #and(FaultInjectionPolicy)} B, to get:
* <p>
* A && B = (a && b) && (c || d)
*/
public class FaultInjectionPolicy {
public enum PolicyCombination {
AND, OR;
/**
* Apply the combination to the policy outputs
* @param current current policy value
* @param next next policy to value to consider
* @return <tt>true</tt> if the logical combination is valid, <tt>false</tt> otherwise
*/
public boolean apply(boolean current, boolean next) {
switch (this) {
case AND:
return current && next;
case OR:
return current || next;
default:
throw new IllegalArgumentException("Unrecognized policy!" + this);
}
}
}
private List<Pair<PolicyCombination, FaultInjectionPolicy>> policies = new ArrayList<Pair<PolicyCombination, FaultInjectionPolicy>>();
/**
* And the current chain with another policy.
* <p>
* For example, if this is the default policy 'a', which we {@link #and(FaultInjectionPolicy)}
* with 'b' and then 'c', we get the following policy chain:
* <p>
* a && (b && c)
* @param policy policy to logical AND with the current policies
* @return <tt>this</tt> for chaining
*/
public FaultInjectionPolicy and(FaultInjectionPolicy policy) {
return addPolicy(PolicyCombination.AND, policy);
}
/**
* Or the current chain with another policy.
* <p>
* For example, if this is the default policy 'a', which we {@link #or(FaultInjectionPolicy)} with
* 'b' and then 'c', we get the following policy chain:
* <p>
* a || (b || c)
* @param policy policy to logical OR with the current policies
* @return <tt>this</tt> for chaining
*/
public FaultInjectionPolicy or(FaultInjectionPolicy policy) {
return addPolicy(PolicyCombination.OR, policy);
}
private FaultInjectionPolicy addPolicy(PolicyCombination combinator, FaultInjectionPolicy policy) {
policies.add(new Pair<PolicyCombination, FaultInjectionPolicy>(combinator, policy));
return this;
}
/**
* Check to see if this, or any of the policies this decorates, finds that a fault should be
* injected.
* @param stack stack trace of the current error check
* @return <tt>true</tt> if a fault should be injected, <tt>false</tt> otherwise.
*/
public final boolean shouldFault(StackTraceElement[] stack) {
boolean current = checkForFault(stack);
return eval(current, policies, stack);
}
/**
* Recursively evaluate the policy chain against the given stack trace.
* @param current result of evaluating the current policy
* @param policies remaining policy combinations to evaluate
* @param stack stack trace of the current error check
* @return <tt>true</tt> if the combined policies indicate a fault should be injected
*/
private boolean eval(boolean current,
List<Pair<PolicyCombination, FaultInjectionPolicy>> policies, StackTraceElement[] stack) {
// base condition: if there are no more to evaluate, the comparison is the last
if (policies.size() == 0) return current;
// otherwise we have to evaluate the rest of chain
Pair<PolicyCombination, FaultInjectionPolicy> policy = policies.get(0);
boolean next = policy.getSecond().shouldFault(stack);
return policy.getFirst()
.apply(current, eval(next, policies.subList(1, policies.size()), stack));
}
/**
* Check to see if we should generate a fault for the given stacktrace.
* <p>
* Subclass hook for providing custom fault checking behavior
* @param stack stack trace of the current error check
* @return <tt>true</tt> if a fault should be injected for this error check request, <tt>false</tt> by default
*/
protected boolean checkForFault(StackTraceElement[] stack) {
return false;
}
}
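
A hypothetical policy subclass and combination, using the ExceptionTestingUtils helper from this commit (the ClassPresentPolicy name and the commented caller classes are illustrative):

import org.apache.hadoop.hbase.server.errorhandling.impl.ExceptionTestingUtils;
import org.apache.hadoop.hbase.server.errorhandling.impl.FaultInjectionPolicy;

// Only fault when a given class appears on the checking thread's stack.
public class ClassPresentPolicy extends FaultInjectionPolicy {
  private final Class<?> target;

  public ClassPresentPolicy(Class<?> target) {
    this.target = target;
  }

  @Override
  protected boolean checkForFault(StackTraceElement[] stack) {
    return ExceptionTestingUtils.stackContainsClass(stack, target);
  }
}

// Usage sketch: fault only when BOTH (hypothetical) callers are on the stack:
//   FaultInjectionPolicy policy =
//       new ClassPresentPolicy(SomeCaller.class).and(new ClassPresentPolicy(OtherCaller.class));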


@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.server.errorhandling.FaultInjector;
import org.apache.hadoop.hbase.util.Pair;
/**
* Fault injector that can take a policy for when to inject a fault
* @param <E> type of exception that should be returned
*/
public abstract class PoliciedFaultInjector<E extends Exception> implements FaultInjector<E> {
private static final Log LOG = LogFactory.getLog(PoliciedFaultInjector.class);
private FaultInjectionPolicy policy;
public PoliciedFaultInjector(FaultInjectionPolicy policy) {
this.policy = policy;
}
@Override
public final Pair<E, Object[]> injectFault(StackTraceElement[] trace) {
if (policy.shouldFault(trace)) {
return this.getInjectedError(trace);
}
LOG.debug("NOT injecting fault, stack:" + Arrays.toString(Arrays.copyOfRange(trace, 3, 6)));
return null;
}
/**
* Get the error that should be returned to the caller when the {@link FaultInjectionPolicy}
* determines we need to inject a fault
* @param trace trace for which the {@link FaultInjectionPolicy} specified we should have an error
* @return the information about the fault that should be returned if there was a fault, null
* otherwise
*/
protected abstract Pair<E, Object[]> getInjectedError(StackTraceElement[] trace);
}
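
A minimal concrete injector sketch, assuming some FaultInjectionPolicy decides when to fire (the exception message and info values are illustrative):

import org.apache.hadoop.hbase.server.errorhandling.impl.FaultInjectionPolicy;
import org.apache.hadoop.hbase.server.errorhandling.impl.PoliciedFaultInjector;
import org.apache.hadoop.hbase.util.Pair;

public class SimplePoliciedInjector extends PoliciedFaultInjector<Exception> {
  public SimplePoliciedInjector(FaultInjectionPolicy policy) {
    super(policy);
  }

  @Override
  protected Pair<Exception, Object[]> getInjectedError(StackTraceElement[] trace) {
    // the returned pair is handed to the listener as (exception, info) when the fault fires
    return new Pair<Exception, Object[]>(new Exception("injected fault"),
        new Object[] { "injected-info" });
  }
}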


@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionListener;
/**
* Simple error listener that can be checked to see if it received an error ({@link #error}) and the
* information about the error received ({@link #info}).
*/
@SuppressWarnings("javadoc")
public class SimpleErrorListener<E extends Exception> implements ExceptionListener<E> {
public boolean error = false;
public Object[] info = null;
@Override
public void receiveError(String message, Exception e, Object... info) {
this.error = true;
this.info = info;
}
}


@ -0,0 +1,139 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
import java.util.Arrays;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionListener;
import org.apache.hadoop.hbase.server.errorhandling.OperationAttemptTimer;
import org.apache.hadoop.hbase.server.errorhandling.exception.OperationAttemptTimeoutException;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
/**
* Test that we propagate errors through an orchestrator as expected
*/
@Category(SmallTests.class)
@SuppressWarnings({ "rawtypes", "unchecked" })
public class TestExceptionOrchestrator {
@Test
public void testErrorPropagation() {
ExceptionListener listener1 = Mockito.mock(ExceptionListener.class);
ExceptionListener listener2 = Mockito.mock(ExceptionListener.class);
ExceptionOrchestrator<Exception> orchestrator = new ExceptionOrchestrator<Exception>();
// add the listeners
orchestrator.addErrorListener(orchestrator.genericVisitor, listener1);
orchestrator.addErrorListener(orchestrator.genericVisitor, listener2);
// create an artificial error
String message = "Some error";
Object[] info = new Object[] { "info1" };
Exception e = new ExceptionForTesting("error");
orchestrator.receiveError(message, e, info);
// make sure the listeners got the error
Mockito.verify(listener1, Mockito.times(1)).receiveError(message, e, info);
Mockito.verify(listener2, Mockito.times(1)).receiveError(message, e, info);
// push another error, which should be passed to listeners
message = "another error";
e = new ExceptionForTesting("hello");
info[0] = "info2";
orchestrator.receiveError(message, e, info);
Mockito.verify(listener1, Mockito.times(1)).receiveError(message, e, info);
Mockito.verify(listener2, Mockito.times(1)).receiveError(message, e, info);
// now create a timer and check for that error
info[0] = "timer";
OperationAttemptTimer timer = new OperationAttemptTimer(orchestrator, 1000, info);
timer.start();
timer.trigger();
// make sure that we got the timer error
Mockito.verify(listener1, Mockito.times(1)).receiveError(Mockito.anyString(),
Mockito.any(OperationAttemptTimeoutException.class),
Mockito.argThat(new VarArgMatcher<Object>(Object.class, info)));
Mockito.verify(listener2, Mockito.times(1)).receiveError(Mockito.anyString(),
Mockito.any(OperationAttemptTimeoutException.class),
Mockito.argThat(new VarArgMatcher<Object>(Object.class, info)));
}
/**
* Matcher that matches var-args elements
* @param <T> Type of args to match
*/
private static class VarArgMatcher<T> extends BaseMatcher<T> {
private T[] expected;
private Class<T> clazz;
private String reason;
/**
* Setup the matcher to expect args of the given type
* @param clazz type of args to expect
* @param expected expected arguments
*/
public VarArgMatcher(Class<T> clazz, T... expected) {
this.expected = expected;
this.clazz = clazz;
}
@Override
public boolean matches(Object arg0) {
// null check early exit (also avoids an NPE below when no argument was received)
if (arg0 == null) return expected == null;
// single arg matching
if (clazz.isAssignableFrom(arg0.getClass())) {
if (expected.length == 1) {
if (arg0.equals(expected[0])) return true;
reason = "single argument received, but didn't match argument";
} else {
reason = "single argument received, but expected array of args, size = "
+ expected.length;
}
} else if (arg0.getClass().isArray()) {
// array matching
try {
T[] arg = (T[]) arg0;
if (Arrays.equals(expected, arg)) return true;
reason = "Array of args didn't match expected";
} catch (Exception e) {
reason = "Exception while matching arguments:" + e.getMessage();
}
} else reason = "Object wasn't an instance of the expected class and wasn't an array";
// nothing worked - fail
return false;
}
@Override
public void describeTo(Description arg0) {
arg0.appendText(reason);
}
}
}
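
The test above only exercises the orchestrator's built-in genericVisitor. As a rough sketch, a caller with its own listener type could supply a matching visitor instead; SnapshotFailureListener and its snapshotFailure() callback below are hypothetical and only illustrate the shape of the API:

ExceptionVisitor<SnapshotFailureListener> visitor = new ExceptionVisitor<SnapshotFailureListener>() {
  @Override
  public void visit(SnapshotFailureListener listener, String message, Exception e, Object... info) {
    // translate the generic error notification into the listener's own callback
    listener.snapshotFailure(message, e);
  }
};
orchestrator.addErrorListener(visitor, snapshotListener);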

View File

@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionCheckable;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionListener;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionVisitor;
import org.apache.hadoop.hbase.server.errorhandling.FaultInjector;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
/**
* Test that we can correctly inject faults for testing
*/
@Category(SmallTests.class)
@SuppressWarnings({ "rawtypes", "unchecked" })
public class TestFaultInjecting {
private static final Log LOG = LogFactory.getLog(TestFaultInjecting.class);
public static final ExceptionVisitor<ExceptionListener> VISITOR = new ExceptionVisitor<ExceptionListener>() {
@Override
public void visit(ExceptionListener listener, String message, Exception e, Object... info) {
listener.receiveError(message, e, info);
}
};
@Test
public void testSimpleFaultInjection() {
ExceptionDispatcherFactory<ExceptionListener> factory = Mockito
.spy(new ExceptionDispatcherFactory<ExceptionListener>(TestFaultInjecting.VISITOR));
ExceptionDispatcher<ExceptionListener, Exception> dispatcher = new ExceptionDispatcher<ExceptionListener, Exception>();
Mockito.when(factory.buildErrorHandler(VISITOR)).thenReturn(dispatcher);
String info = "info";
ExceptionOrchestratorFactory.addFaultInjector(new StringFaultInjector(info));
ExceptionCheckable<Exception> monitor = factory.createErrorHandler();
// make sure we wrap the dispatcher with the fault injection
assertNotSame(dispatcher, monitor);
// test that we actually inject a fault
assertTrue("Monitor didn't get an injected error", monitor.checkForError());
try {
monitor.failOnError();
fail("Monitor didn't get an exception from the fault injected in the factory.");
} catch (ExceptionForTesting e) {
LOG.debug("Correctly got an exception from the test!");
} catch (Exception e) {
fail("Got an unexpected exception:" + e);
}
}
/**
* Fault injector that injects an {@link ExceptionForTesting} carrying the given string info
* whenever this test class is on the calling stack
*/
public static class StringFaultInjector implements FaultInjector<ExceptionForTesting> {
private final String info;
public StringFaultInjector(String info) {
this.info = info;
}
@Override
public Pair<ExceptionForTesting, Object[]> injectFault(StackTraceElement[] trace) {
if (ExceptionTestingUtils.stackContainsClass(trace, TestFaultInjecting.class)) {
return new Pair<ExceptionForTesting, Object[]>(new ExceptionForTesting(
"injected!"), new String[] { info });
}
return null;
}
}
}
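
In rough terms, the injection hook works because callers obtain their error handler from the factory rather than constructing a dispatcher directly, so any registered injector can wrap it. A sketch of that pattern, using only the factory calls exercised above:

// test setup: register an injector that fires when the chosen class is on the stack
ExceptionOrchestratorFactory.addFaultInjector(new StringFaultInjector("injected-info"));
// caller-side code goes through the factory, so the returned handler may be fault-wrapped
ExceptionDispatcherFactory<ExceptionListener> factory =
    new ExceptionDispatcherFactory<ExceptionListener>(TestFaultInjecting.VISITOR);
ExceptionCheckable<Exception> monitor = factory.createErrorHandler();
if (monitor.checkForError()) {
  monitor.failOnError(); // throws the injected (or real) error
}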

View File

@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Test the fault injection policies and combinations
*/
@Category(SmallTests.class)
public class TestFaultInjectionPolicies {
@Test
public void testAndCombination() {
FaultInjectionPolicy alwaysFalse = new FaultInjectionPolicy();
assertFalse("Default policy isn't false", alwaysFalse.shouldFault(null));
FaultInjectionPolicy alwaysTrue = new AlwaysTrue();
FaultInjectionPolicy andTrue = new AlwaysTrue().or(alwaysTrue).or(alwaysTrue);
assertTrue("And True isn't always returning true", andTrue.shouldFault(null));
FaultInjectionPolicy andFalse = new FaultInjectionPolicy().and(alwaysTrue);
assertFalse("false AND true", andFalse.shouldFault(null));
assertFalse("true AND false", alwaysTrue.and(alwaysFalse).shouldFault(null));
assertFalse("true AND (false AND true)",
new AlwaysTrue().and(new FaultInjectionPolicy().and(new AlwaysTrue())).shouldFault(null));
assertFalse("(true AND false AND true)",
new AlwaysTrue().and(new FaultInjectionPolicy()).and(new AlwaysTrue()).shouldFault(null));
}
@Test
public void testORCombination() {
FaultInjectionPolicy alwaysTrue = new AlwaysTrue();
FaultInjectionPolicy orTrue = new AlwaysTrue().or(alwaysTrue).or(alwaysTrue);
assertTrue("OR of always-true policies isn't returning true", orTrue.shouldFault(null));
FaultInjectionPolicy orFalse = new FaultInjectionPolicy().or(alwaysTrue);
assertTrue("Combination of false OR true should be true", orFalse.shouldFault(null));
assertTrue("Combining multiple ors isn't correct",
new FaultInjectionPolicy().or(orTrue).or(orFalse).shouldFault(null));
}
@Test
public void testMixedAndOr() {
assertTrue("true AND (false OR true)",
new AlwaysTrue().and(new FaultInjectionPolicy().or(new AlwaysTrue())).shouldFault(null));
assertTrue("(true AND false) OR true",
new AlwaysTrue().or(new AlwaysTrue().and(new FaultInjectionPolicy())).shouldFault(null));
assertFalse(
"(true AND false) OR false",
new FaultInjectionPolicy().or(new AlwaysTrue().and(new FaultInjectionPolicy())).shouldFault(
null));
}
private static class AlwaysTrue extends FaultInjectionPolicy {
@Override
protected boolean checkForFault(StackTraceElement[] stack) {
return true;
}
}
public static class SimplePolicyFaultInjector extends PoliciedFaultInjector<Exception> {
public SimplePolicyFaultInjector(FaultInjectionPolicy policy) {
super(policy);
}
@Override
protected Pair<Exception, Object[]> getInjectedError(StackTraceElement[] trace) {
return new Pair<Exception, Object[]>(new RuntimeException("error"), null);
}
}
}
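
Putting the pieces together, a composed policy can drive a policy-aware injector. A brief sketch under the same assumptions as the tests above, where AlwaysTrue merely stands in for a real, more selective policy:

// only fault when both sub-policies agree
FaultInjectionPolicy policy = new AlwaysTrue().and(new AlwaysTrue());
// the injector consults the policy before handing back an error to inject
SimplePolicyFaultInjector injector = new SimplePolicyFaultInjector(policy);
ExceptionOrchestratorFactory.addFaultInjector(injector);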

View File

@ -0,0 +1,97 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
import static org.junit.Assert.fail;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionListener;
import org.apache.hadoop.hbase.server.errorhandling.OperationAttemptTimer;
import org.apache.hadoop.hbase.server.errorhandling.exception.OperationAttemptTimeoutException;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
/**
* Test the {@link OperationAttemptTimer} to ensure we fulfill contracts
*/
@Category(SmallTests.class)
@SuppressWarnings("unchecked")
public class TestOperationAttemptTimer {
private static final Log LOG = LogFactory.getLog(TestOperationAttemptTimer.class);
@Test(timeout = 1000)
public void testTimerTrigger() {
final long time = 10000000;
ExceptionListener<Exception> listener = Mockito.mock(ExceptionListener.class);
OperationAttemptTimer timer = new OperationAttemptTimer(listener, time);
timer.start();
timer.trigger();
Mockito.verify(listener, Mockito.times(1)).receiveError(Mockito.anyString(),
Mockito.any(OperationAttemptTimeoutException.class));
}
@Test
public void testTimerPassesOnErrorInfo() {
final long time = 10;
ExceptionListener<Exception> listener = Mockito.mock(ExceptionListener.class);
final Object[] data = new Object[] { "data" };
OperationAttemptTimer timer = new OperationAttemptTimer(listener, time, data);
timer.start();
timer.trigger();
Mockito.verify(listener).receiveError(Mockito.anyString(),
Mockito.any(OperationAttemptTimeoutException.class), Mockito.eq(data[0]));
}
@Test(timeout = 1000)
public void testStartAfterComplete() throws InterruptedException {
final long time = 10;
ExceptionListener<Exception> listener = Mockito.mock(ExceptionListener.class);
OperationAttemptTimer timer = new OperationAttemptTimer(listener, time);
timer.complete();
try {
timer.start();
fail("Timer should fail to start after complete.");
} catch (IllegalStateException e) {
LOG.debug("Correctly failed timer: " + e.getMessage());
}
Thread.sleep(time + 1);
Mockito.verifyZeroInteractions(listener);
}
@Test(timeout = 1000)
public void testStartAfterTrigger() throws InterruptedException {
final long time = 10;
ExceptionListener<Exception> listener = Mockito.mock(ExceptionListener.class);
OperationAttemptTimer timer = new OperationAttemptTimer(listener, time);
timer.trigger();
try {
timer.start();
fail("Timer should fail to start after complete.");
} catch (IllegalStateException e) {
LOG.debug("Correctly failed timer: " + e.getMessage());
}
Thread.sleep(time * 2);
Mockito.verify(listener, Mockito.times(1)).receiveError(Mockito.anyString(),
Mockito.any(OperationAttemptTimeoutException.class));
Mockito.verifyNoMoreInteractions(listener);
}
}
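
For reference, the lifecycle these tests pin down looks roughly like the following, where doOperation() and maxTimeMillis are hypothetical stand-ins for the caller's work and timeout:

OperationAttemptTimer timer = new OperationAttemptTimer(listener, maxTimeMillis, "operation-info");
timer.start();     // begin the countdown
doOperation();     // the guarded work
timer.complete();  // finished in time, so the listener is never notified;
                   // had the timer expired first, it would instead have delivered an
                   // OperationAttemptTimeoutException (plus "operation-info") to the listener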

View File

@ -0,0 +1,113 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionListener;
import org.apache.hadoop.hbase.server.errorhandling.OperationAttemptTimer;
import org.apache.hadoop.hbase.server.errorhandling.exception.OperationAttemptTimeoutException;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
/**
* Test using the single error dispatcher
*/
@SuppressWarnings("unchecked")
@Category(SmallTests.class)
public class TestSingleExceptionDispatcher {
private static final Log LOG = LogFactory.getLog(TestSingleExceptionDispatcher.class);
@Test
public void testErrorPropagation() {
ExceptionListener<Exception> listener1 = Mockito.mock(ExceptionListener.class);
ExceptionListener<Exception> listener2 = Mockito.mock(ExceptionListener.class);
ExceptionDispatcher<? extends ExceptionListener<Exception>, Exception> monitor = new ExceptionDispatcher<ExceptionListener<Exception>, Exception>();
// add the listeners
monitor.addErrorListener(monitor.genericVisitor, listener1);
monitor.addErrorListener(monitor.genericVisitor, listener2);
// create an artificial error
String message = "Some error";
Exception expected = new ExceptionForTesting("error");
Object info = "info1";
monitor.receiveError(message, expected, info);
// make sure the listeners got the error
Mockito.verify(listener1).receiveError(message, expected, info);
Mockito.verify(listener2).receiveError(message, expected, info);
// make sure that we get an exception
try {
monitor.failOnError();
fail("Monitor should have thrown an exception after getting error.");
} catch (Exception e) {
assertTrue("Got an unexpected exception:" + e, e instanceof ExceptionForTesting);
LOG.debug("Got the testing exception!");
}
// push another error, but this shouldn't be passed to the listeners
monitor.receiveError("another error", new ExceptionForTesting("hello"),
"shouldn't be found");
// make sure we don't re-propagate the error
Mockito.verifyNoMoreInteractions(listener1, listener2);
}
@Test
public void testSingleDispatcherWithTimer() {
ExceptionListener<Exception> listener1 = Mockito.mock(ExceptionListener.class);
ExceptionListener<Exception> listener2 = Mockito.mock(ExceptionListener.class);
ExceptionDispatcher<? extends ExceptionListener<Exception>, Exception> monitor = new ExceptionDispatcher<ExceptionListener<Exception>, Exception>();
// add the listeners
monitor.addErrorListener(monitor.genericVisitor, listener1);
monitor.addErrorListener(monitor.genericVisitor, listener2);
Object info = "message";
OperationAttemptTimer timer = new OperationAttemptTimer(monitor, 1000, info);
timer.start();
timer.trigger();
assertTrue("Monitor didn't get timeout", monitor.checkForError());
// verify that we propagated the error
Mockito.verify(listener1).receiveError(Mockito.anyString(),
Mockito.any(OperationAttemptTimeoutException.class), Mockito.eq(info));
Mockito.verify(listener2).receiveError(Mockito.anyString(),
Mockito.any(OperationAttemptTimeoutException.class), Mockito.eq(info));
}
@Test
public void testAddListenerWithoutVisitor() {
SimpleErrorListener<Exception> listener = new SimpleErrorListener<Exception>();
ExceptionDispatcher<SimpleErrorListener<Exception>, Exception> monitor = new ExceptionDispatcher<SimpleErrorListener<Exception>, Exception>();
try {
monitor.addErrorListener(listener);
fail("Monitor needs t have a visitor for adding generically typed listeners");
} catch (UnsupportedOperationException e) {
LOG.debug("Correctly failed to add listener without visitor: " + e.getMessage());
}
}
}
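
Finally, on the consuming side the dispatcher is typically held as an ExceptionCheckable so long-running work can fail fast. A short sketch, with steps standing in for a hypothetical list of sub-tasks and monitor for a dispatcher built as in the tests above:

ExceptionCheckable<Exception> errorChecker = monitor;
for (Runnable step : steps) {
  // bail out as soon as any participant has reported an error
  errorChecker.failOnError();
  step.run();
}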