HBASE-7206 ForeignException framework v2 (simplifies and replaces HBASE-6571)

git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-7290@1445798 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jonathan Hsieh 2013-02-13 18:21:26 +00:00
parent eb9353b244
commit 3ca47fab92
19 changed files with 3110 additions and 761 deletions

View File

@ -0,0 +1,218 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.errorhandling;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.protobuf.generated.ErrorHandlingProtos.ForeignExceptionMessage;
import org.apache.hadoop.hbase.protobuf.generated.ErrorHandlingProtos.GenericExceptionMessage;
import org.apache.hadoop.hbase.protobuf.generated.ErrorHandlingProtos.StackTraceElementMessage;
import com.google.protobuf.InvalidProtocolBufferException;
/**
* A ForeignException is an exception from another thread or process.
* <p>
* ForeignExceptions are sent to 'remote' peers to signal an abort in the face of failures.
* When serialized for transmission we encode using Protobufs to ensure version compatibility.
* <p>
* Foreign exceptions contain a Throwable as its cause. This can be a "regular" exception
* generated locally or a ProxyThrowable that is a representation of the original exception
* created on original 'remote' source. These ProxyThrowables have their their stacks traces and
* messages overridden to reflect the original 'remote' exception. The only way these
* ProxyThrowables are generated are by this class's {@link #deserialize(byte[])} method.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
@SuppressWarnings("serial")
public class ForeignException extends IOException {
/**
* Name of the throwable's source such as a host or thread name. Must be non-null.
*/
private final String source;
/**
* Name of the original throwable's class. Must be non-null.
*/
private final String clazz;
/**
* Create a new ForeignException that can be serialized. It is assumed that this came from a
* remote source.
* @param source
* @param cause
*/
private ForeignException(String source, String clazz, ProxyThrowable cause) {
super(cause);
assert source != null;
assert cause != null;
assert clazz != null;
this.source = source;
this.clazz = clazz;
}
/**
* Create a new ForeignException that can be serialized. It is assumed that this came form a
* local source.
* @param source
* @param cause
*/
public ForeignException(String source, Throwable cause) {
super(cause);
assert source != null;
assert cause != null;
this.source = source;
this.clazz = getCause().getClass().getName();
}
/**
* Create a new ForeignException that can be serialized. It is assumed that this is locally
* generated.
* @param source
* @param msg
*/
public ForeignException(String source, String msg) {
super(new IllegalArgumentException(msg));
this.source = source;
this.clazz = getCause().getClass().getName();
}
public String getSource() {
return source;
}
public String getSourceClass() {
return clazz;
}
/**
* The cause of a ForeignException can be an exception that was generated on a local in process
* thread, or a thread from a 'remote' separate process.
*
* If the cause is a ProxyThrowable, we know it came from deserialization which usually means
* it came from not only another thread, but also from a remote thread.
*
* @return true if went through deserialization, false if locally generated
*/
public boolean isRemote() {
return getCause() instanceof ProxyThrowable;
}
@Override
public String toString() {
return clazz + " via " + getSource() + ":" + getLocalizedMessage();
}
/**
* Convert a stack trace to list of {@link StackTraceElement}.
* @param stackTrace the stack trace to convert to protobuf message
* @return <tt>null</tt> if the passed stack is <tt>null</tt>.
*/
private static List<StackTraceElementMessage> toStackTraceElementMessages(
StackTraceElement[] trace) {
// if there is no stack trace, ignore it and just return the message
if (trace == null) return null;
// build the stack trace for the message
List<StackTraceElementMessage> pbTrace =
new ArrayList<StackTraceElementMessage>(trace.length);
for (StackTraceElement elem : trace) {
StackTraceElementMessage.Builder stackBuilder = StackTraceElementMessage.newBuilder();
stackBuilder.setDeclaringClass(elem.getClassName());
stackBuilder.setFileName(elem.getFileName());
stackBuilder.setLineNumber(elem.getLineNumber());
stackBuilder.setMethodName(elem.getMethodName());
pbTrace.add(stackBuilder.build());
}
return pbTrace;
}
/**
* This is a Proxy Throwable that contains the information of the original remote exception
*/
private static class ProxyThrowable extends Throwable {
ProxyThrowable(String msg, StackTraceElement[] trace) {
super(msg);
this.setStackTrace(trace);
}
}
/**
* Converts an ForeignException to a array of bytes.
* @param source the name of the external exception source
* @param ee the "local" external exception (local)
* @return protobuf serialized version of ForeignThreadException
*/
public static byte[] serialize(String source, Throwable t) {
GenericExceptionMessage.Builder gemBuilder = GenericExceptionMessage.newBuilder();
gemBuilder.setClassName(t.getClass().getName());
if (t.getMessage() != null) {
gemBuilder.setMessage(t.getMessage());
}
// set the stack trace, if there is one
List<StackTraceElementMessage> stack = ForeignException.toStackTraceElementMessages(t.getStackTrace());
if (stack != null) {
gemBuilder.addAllTrace(stack);
}
GenericExceptionMessage payload = gemBuilder.build();
ForeignExceptionMessage.Builder exception = ForeignExceptionMessage.newBuilder();
exception.setGenericException(payload).setSource(source);
ForeignExceptionMessage eem = exception.build();
return eem.toByteArray();
}
/**
* Takes a series of bytes and tries to generate an ForeignException instance for it.
* @param bytes
* @return the ExternalExcpetion instance
* @throws InvalidProtocolBufferException if there was deserialization problem this is thrown.
*/
public static ForeignException deserialize(byte[] bytes) throws InvalidProtocolBufferException {
// figure out the data we need to pass
ForeignExceptionMessage eem = ForeignExceptionMessage.parseFrom(bytes);
GenericExceptionMessage gem = eem.getGenericException();
StackTraceElement [] trace = ForeignException.toStack(gem.getTraceList());
ProxyThrowable dfe = new ProxyThrowable(gem.getMessage(), trace);
ForeignException e = new ForeignException(eem.getSource(), gem.getClassName(), dfe);
return e;
}
/**
* Unwind a serialized array of {@link StackTraceElementMessage}s to a
* {@link StackTraceElement}s.
* @param traceList list that was serialized
* @return the deserialized list or <tt>null</tt> if it couldn't be unwound (e.g. wasn't set on
* the sender).
*/
private static StackTraceElement[] toStack(List<StackTraceElementMessage> traceList) {
if (traceList == null || traceList.size() == 0) {
return new StackTraceElement[0]; // empty array
}
StackTraceElement[] trace = new StackTraceElement[traceList.size()];
for (int i = 0; i < traceList.size(); i++) {
StackTraceElementMessage elem = traceList.get(i);
trace[i] = new StackTraceElement(
elem.getDeclaringClass(), elem.getMethodName(), elem.getFileName(), elem.getLineNumber());
}
return trace;
}
}

View File

@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.errorhandling;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* The dispatcher acts as the state holding entity for foreign error handling. The first
* exception received by the dispatcher get passed directly to the listeners. Subsequent
* exceptions are dropped.
* <p>
* If there are multiple dispatchers that are all in the same foreign exception monitoring group,
* ideally all these monitors are "peers" -- any error on one dispatcher should get propagated to
* all others (via rpc, or some other mechanism). Due to racing error conditions the exact reason
* for failure may be different on different peers, but the fact that they are in error state
* should eventually hold on all.
* <p>
* This is thread-safe and must be because this is expected to be used to propagate exceptions
* from foreign threads.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ForeignExceptionDispatcher implements ForeignExceptionListener, ForeignExceptionSnare {
public static final Log LOG = LogFactory.getLog(ForeignExceptionDispatcher.class);
protected final String name;
protected final List<ForeignExceptionListener> listeners = new ArrayList<ForeignExceptionListener>();
private ForeignException exception;
public ForeignExceptionDispatcher(String name) {
this.name = name;
}
public ForeignExceptionDispatcher() {
this("");
}
public String getName() {
return name;
}
public synchronized void receive(String message) {
receive(new ForeignException(name, message));
}
public synchronized void receive(ForeignException e) {
receive(e);
}
@Override
public synchronized void receive(String message, ForeignException e) {
// if we already have an exception, then ignore it
if (exception != null) return;
LOG.debug(name + " accepting received error:" + message);
// mark that we got the error
if (e != null) {
exception = e;
} else {
exception = new ForeignException(name, message);
}
// notify all the listeners
dispatch(message, e);
}
@Override
public void rethrowException() throws ForeignException {
if (exception != null) {
throw exception;
}
}
@Override
public boolean hasException() {
return exception != null;
}
@Override
synchronized public ForeignException getException() {
return exception;
}
/**
* Sends an exception to all listeners.
* @param message human readable message passed to the listener
* @param e {@link ForeignException} containing the cause. Can be null.
*/
private void dispatch(String message, ForeignException e) {
// update all the listeners with the passed error
LOG.debug(name + " Recieved error, notifying listeners...");
for (ForeignExceptionListener l: listeners) {
l.receive(message, e);
}
}
/**
* Listen for failures to a given process. This method should only be used during
* initialization and not added to after exceptions are accepted.
* @param errorable listener for the errors. may be null.
*/
public synchronized void addListener(ForeignExceptionListener errorable) {
this.listeners.add(errorable);
}
}

View File

@ -15,24 +15,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
package org.apache.hadoop.hbase.errorhandling;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Utility class for testing error propagation
* The ForeignExceptionListener is an interface for objects that can receive a ForeignException.
* <p>
* Implementations must be thread-safe, because this is expected to be used to propagate exceptions
* from foreign threads.
*/
public class ExceptionTestingUtils {
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface ForeignExceptionListener {
/**
* Determine if the stack trace contains the given calling class
* @param stack trace to examine
* @param clazz Class to search for
* @return <tt>true</tt> if the stack contains the calling class
* Receive a ForeignException.
* <p>
* Implementers must ensure that this method is thread-safe.
* @param message reason for the error
* @param e exception causing the error. Implementations must accept and handle null here.
*/
public static boolean stackContainsClass(StackTraceElement[] stack, Class<?> clazz) {
String name = clazz.getName();
for (StackTraceElement elem : stack) {
if (elem.getClassName().equals(name)) return true;
}
return false;
}
public void receive(String message, ForeignException e);
}

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.errorhandling;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* This is an interface for a cooperative exception throwing mechanism. Implementations are
* containers that holds an exception from a separate thread. This can be used to receive
* exceptions from 'foreign' threads or from separate 'foreign' processes.
* <p>
* To use, one would pass an implementation of this object to a long running method and
* periodically check by calling {@link #rethrowException()}. If any foreign exceptions have
* been received, the calling thread is then responsible for handling the rethrown exception.
* <p>
* One could use the boolean {@link #hasException()} to determine if there is an exceptoin as well.
* <p>
* NOTE: This is very similar to the InterruptedException/interrupt/interrupted pattern. There,
* the notification state is bound to a Thread. Using this, applications receive Exceptions in
* the snare. The snare is referenced and checked by multiple threads which enables exception
* notification in all the involved threads/processes.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface ForeignExceptionSnare {
/**
* Rethrow an exception currently held by the {@link ForeignExceptionSnare}. If there is
* no exception this is a no-op
*
* @throws ForeignException
* all exceptions from remote sources are procedure exceptions
*/
public void rethrowException() throws ForeignException;
/**
* Non-exceptional form of {@link #rethrowException()}. Checks to see if any
* process to which the exception checkers is bound has created an error that
* would cause a failure.
*
* @return <tt>true</tt> if there has been an error,<tt>false</tt> otherwise
*/
public boolean hasException();
/**
* Get the value of the captured exception.
*
* @return the captured foreign exception or null if no exception captured.
*/
public ForeignException getException();
}

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.errorhandling;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Exception for a timeout of a task.
* @see TimeoutExceptionInjector
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
@SuppressWarnings("serial")
public class TimeoutException extends Exception {
private final String sourceName;
private final long start;
private final long end;
private final long expected;
/**
* Exception indicating that an operation attempt has timed out
* @param start time the operation started (ms since epoch)
* @param end time the timeout was triggered (ms since epoch)
* @param expected expected amount of time for the operation to complete (ms) (ideally, expected <= end-start)
*/
public TimeoutException(String sourceName, long start, long end, long expected) {
super("Timeout elapsed! Source:" + sourceName + " Start:" + start + ", End:" + end
+ ", diff:" + (end - start) + ", max:" + expected + " ms");
this.sourceName = sourceName;
this.start = start;
this.end = end;
this.expected = expected;
}
public long getStart() {
return start;
}
public long getEnd() {
return end;
}
public long getMaxAllowedOperationTime() {
return expected;
}
public String getSourceName() {
return sourceName;
}
}

View File

@ -0,0 +1,131 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.errorhandling;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* Time a given process/operation and report a failure if the elapsed time exceeds the max allowed
* time.
* <p>
* The timer won't start tracking time until calling {@link #start()}. If {@link #complete()} or
* {@link #trigger()} is called before {@link #start()}, calls to {@link #start()} will fail.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class TimeoutExceptionInjector {
private static final Log LOG = LogFactory.getLog(TimeoutExceptionInjector.class);
private final long maxTime;
private volatile boolean complete;
private final Timer timer;
private final TimerTask timerTask;
private long start = -1;
/**
* Create a generic timer for a task/process.
* @param listener listener to notify if the process times out
* @param maxTime max allowed running time for the process. Timer starts on calls to
* {@link #start()}
* @param info information about the process to pass along if the timer expires
*/
public TimeoutExceptionInjector(final ForeignExceptionListener listener, final long maxTime) {
this.maxTime = maxTime;
timer = new Timer();
timerTask = new TimerTask() {
@Override
public void run() {
// ensure we don't run this task multiple times
synchronized (this) {
// quick exit if we already marked the task complete
if (TimeoutExceptionInjector.this.complete) return;
// mark the task is run, to avoid repeats
TimeoutExceptionInjector.this.complete = true;
}
long end = EnvironmentEdgeManager.currentTimeMillis();
TimeoutException tee = new TimeoutException(
"Timeout caused Foreign Exception", start, end, maxTime);
String source = "timer-" + timer;
listener.receive("Timeout elapsed!", new ForeignException(source, tee));
}
};
}
public long getMaxTime() {
return maxTime;
}
/**
* For all time forward, do not throw an error because the process has completed.
*/
public void complete() {
// warn if the timer is already marked complete. This isn't going to be thread-safe, but should
// be good enough and its not worth locking just for a warning.
if (this.complete) {
LOG.warn("Timer already marked completed, ignoring!");
return;
}
LOG.debug("Marking timer as complete - no error notifications will be received for this timer.");
synchronized (this.timerTask) {
this.complete = true;
}
this.timer.cancel();
}
/**
* Start a timer to fail a process if it takes longer than the expected time to complete.
* <p>
* Non-blocking.
* @throws IllegalStateException if the timer has already been marked done via {@link #complete()}
* or {@link #trigger()}
*/
public synchronized void start() throws IllegalStateException {
if (this.start >= 0) {
LOG.warn("Timer already started, can't be started again. Ignoring second request.");
return;
}
LOG.debug("Scheduling process timer to run in: " + maxTime + " ms");
timer.schedule(timerTask, maxTime);
this.start = EnvironmentEdgeManager.currentTimeMillis();
}
/**
* Trigger the timer immediately.
* <p>
* Exposed for testing.
*/
public void trigger() {
synchronized (timerTask) {
if (this.complete) {
LOG.warn("Timer already completed, not triggering.");
return;
}
LOG.debug("Triggering timer immediately!");
this.timer.cancel();
this.timerTask.run();
}
}
}

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// This file contains protocol buffers that used to error handling
option java_package = "org.apache.hadoop.hbase.protobuf.generated";
option java_outer_classname = "ErrorHandlingProtos";
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
/**
* Protobuf version of a java.lang.StackTraceElement
* so we can serialize exceptions.
*/
message StackTraceElementMessage {
optional string declaringClass = 1;
optional string methodName = 2;
optional string fileName = 3;
optional int32 lineNumber = 4;
}
/**
* Cause of a remote failure for a generic exception. Contains
* all the information for a generic exception as well as
* optional info about the error for generic info passing
* (which should be another protobuffed class).
*/
message GenericExceptionMessage {
optional string className = 1;
optional string message = 2;
optional bytes errorInfo = 3;
repeated StackTraceElementMessage trace = 4;
}
/**
* Exception sent across the wire when a remote task needs
* to notify other tasks that it failed and why
*/
message ForeignExceptionMessage {
optional string source = 1;
optional GenericExceptionMessage genericException = 2;
}

View File

@ -0,0 +1,127 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.errorhandling;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
/**
* Test that we propagate errors through an dispatcher exactly once via different failure
* injection mechanisms.
*/
@Category(SmallTests.class)
public class TestForeignExceptionDispatcher {
private static final Log LOG = LogFactory.getLog(TestForeignExceptionDispatcher.class);
/**
* Exception thrown from the test
*/
final ForeignException EXTEXN = new ForeignException("FORTEST", new IllegalArgumentException("FORTEST"));
final ForeignException EXTEXN2 = new ForeignException("FORTEST2", new IllegalArgumentException("FORTEST2"));
/**
* Tests that a dispatcher only dispatches only the first exception, and does not propagate
* subsequent exceptions.
*/
@Test
public void testErrorPropagation() {
ForeignExceptionListener listener1 = Mockito.mock(ForeignExceptionListener.class);
ForeignExceptionListener listener2 = Mockito.mock(ForeignExceptionListener.class);
ForeignExceptionDispatcher dispatcher = new ForeignExceptionDispatcher();
// add the listeners
dispatcher.addListener(listener1);
dispatcher.addListener(listener2);
// create an artificial error
String message = "Some error";
dispatcher.receive(message, EXTEXN);
// make sure the listeners got the error
Mockito.verify(listener1, Mockito.times(1)).receive(message, EXTEXN);
Mockito.verify(listener2, Mockito.times(1)).receive(message, EXTEXN);
// make sure that we get an exception
try {
dispatcher.rethrowException();
fail("Monitor should have thrown an exception after getting error.");
} catch (ForeignException ex) {
assertTrue("Got an unexpected exception:" + ex, ex == EXTEXN);
LOG.debug("Got the testing exception!");
}
// push another error, which should be not be passed to listeners
message = "another error";
dispatcher.receive(message, EXTEXN2);
Mockito.verify(listener1, Mockito.never()).receive(message, EXTEXN2);
Mockito.verify(listener2, Mockito.never()).receive(message, EXTEXN2);
}
@Test
public void testSingleDispatcherWithTimer() {
ForeignExceptionListener listener1 = Mockito.mock(ForeignExceptionListener.class);
ForeignExceptionListener listener2 = Mockito.mock(ForeignExceptionListener.class);
ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher();
// add the listeners
monitor.addListener(listener1);
monitor.addListener(listener2);
TimeoutExceptionInjector timer = new TimeoutExceptionInjector(monitor, 1000);
timer.start();
timer.trigger();
assertTrue("Monitor didn't get timeout", monitor.hasException());
// verify that that we propagated the error
Mockito.verify(listener1).receive(Mockito.anyString(), Mockito.any(ForeignException.class));
Mockito.verify(listener2).receive(Mockito.anyString(), Mockito.any(ForeignException.class));
}
/**
* Test that the dispatcher can receive an error via the timer mechanism.
*/
@Test
public void testAttemptTimer() {
ForeignExceptionListener listener1 = Mockito.mock(ForeignExceptionListener.class);
ForeignExceptionListener listener2 = Mockito.mock(ForeignExceptionListener.class);
ForeignExceptionDispatcher orchestrator = new ForeignExceptionDispatcher();
// add the listeners
orchestrator.addListener(listener1);
orchestrator.addListener(listener2);
// now create a timer and check for that error
TimeoutExceptionInjector timer = new TimeoutExceptionInjector(orchestrator, 1000);
timer.start();
timer.trigger();
// make sure that we got the timer error
Mockito.verify(listener1, Mockito.times(1)).receive(Mockito.anyString(),
Mockito.any(ForeignException.class));
Mockito.verify(listener2, Mockito.times(1)).receive(Mockito.anyString(),
Mockito.any(ForeignException.class));
}
}

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.errorhandling;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.protobuf.InvalidProtocolBufferException;
/**
* Test that we correctly serialize exceptions from a remote source
*/
@Category(SmallTests.class)
public class TestForeignExceptionSerialization {
private static final String srcName = "someNode";
/**
* Verify that we get back similar stack trace information before an after serialization.
* @throws InvalidProtocolBufferException
*/
@Test
public void testSimpleException() throws InvalidProtocolBufferException {
String data = "some bytes";
ForeignException in = new ForeignException("SRC", new IllegalArgumentException(data));
// check that we get the data back out
ForeignException e = ForeignException.deserialize(ForeignException.serialize(srcName, in));
assertNotNull(e);
// now check that we get the right stack trace
StackTraceElement elem = new StackTraceElement(this.getClass().toString(), "method", "file", 1);
in.setStackTrace(new StackTraceElement[] { elem });
e = ForeignException.deserialize(ForeignException.serialize(srcName, in));
assertNotNull(e);
assertEquals("Stack trace got corrupted", elem, e.getCause().getStackTrace()[0]);
assertEquals("Got an unexpectedly long stack trace", 1, e.getCause().getStackTrace().length);
}
/**
* Compare that a generic exception's stack trace has the same stack trace elements after
* serialization and deserialization
* @throws InvalidProtocolBufferException
*/
@Test
public void testRemoteFromLocal() throws InvalidProtocolBufferException {
String errorMsg = "some message";
Exception generic = new Exception(errorMsg);
generic.printStackTrace();
assertTrue(generic.getMessage().contains(errorMsg));
ForeignException e = ForeignException.deserialize(ForeignException.serialize(srcName, generic));
assertArrayEquals("Local stack trace got corrupted", generic.getStackTrace(), e.getCause().getStackTrace());
e.printStackTrace(); // should have ForeignException and source node in it.
assertTrue(e.getCause().getCause() == null);
// verify that original error message is present in Foreign exception message
assertTrue(e.getCause().getMessage().contains(errorMsg));
}
}

View File

@ -15,57 +15,61 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
package org.apache.hadoop.hbase.errorhandling;
import static org.junit.Assert.fail;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionListener;
import org.apache.hadoop.hbase.server.errorhandling.OperationAttemptTimer;
import org.apache.hadoop.hbase.server.errorhandling.exception.OperationAttemptTimeoutException;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
/**
* Test the {@link OperationAttemptTimer} to ensure we fulfill contracts
* Test the {@link TimeoutExceptionInjector} to ensure we fulfill contracts
*/
@Category(SmallTests.class)
@SuppressWarnings("unchecked")
public class TestOperationAttemptTimer {
public class TestTimeoutExceptionInjector {
private static final Log LOG = LogFactory.getLog(TestOperationAttemptTimer.class);
private static final Log LOG = LogFactory.getLog(TestTimeoutExceptionInjector.class);
/**
* Test that a manually triggered timer fires an exception.
*/
@Test(timeout = 1000)
public void testTimerTrigger() {
final long time = 10000000;
ExceptionListener<Exception> listener = Mockito.mock(ExceptionListener.class);
OperationAttemptTimer timer = new OperationAttemptTimer(listener, time);
final long time = 10000000; // pick a value that is very far in the future
ForeignExceptionListener listener = Mockito.mock(ForeignExceptionListener.class);
TimeoutExceptionInjector timer = new TimeoutExceptionInjector(listener, time);
timer.start();
timer.trigger();
Mockito.verify(listener, Mockito.times(1)).receiveError(Mockito.anyString(),
Mockito.any(OperationAttemptTimeoutException.class));
Mockito.verify(listener, Mockito.times(1)).receive(Mockito.anyString(),
Mockito.any(ForeignException.class));
}
/**
* Test that a manually triggered exception with data fires with the data in receiveError.
*/
@Test
public void testTimerPassesOnErrorInfo() {
final long time = 10;
ExceptionListener<Exception> listener = Mockito.mock(ExceptionListener.class);
final Object[] data = new Object[] { "data" };
OperationAttemptTimer timer = new OperationAttemptTimer(listener, time, data);
final long time = 1000000;
ForeignExceptionListener listener = Mockito.mock(ForeignExceptionListener.class);
TimeoutExceptionInjector timer = new TimeoutExceptionInjector(listener, time);
timer.start();
timer.trigger();
Mockito.verify(listener).receiveError(Mockito.anyString(),
Mockito.any(OperationAttemptTimeoutException.class), Mockito.eq(data[0]));
Mockito.verify(listener).receive(Mockito.anyString(), Mockito.any(ForeignException.class));
}
/**
* Demonstrate TimeoutExceptionInjector semantics -- completion means no more exceptions passed to
* error listener.
*/
@Test(timeout = 1000)
public void testStartAfterComplete() throws InterruptedException {
final long time = 10;
ExceptionListener<Exception> listener = Mockito.mock(ExceptionListener.class);
OperationAttemptTimer timer = new OperationAttemptTimer(listener, time);
ForeignExceptionListener listener = Mockito.mock(ForeignExceptionListener.class);
TimeoutExceptionInjector timer = new TimeoutExceptionInjector(listener, time);
timer.complete();
try {
timer.start();
@ -77,11 +81,15 @@ public class TestOperationAttemptTimer {
Mockito.verifyZeroInteractions(listener);
}
/**
* Demonstrate TimeoutExceptionInjector semantics -- triggering fires exception and completes
* the timer.
*/
@Test(timeout = 1000)
public void testStartAfterTrigger() throws InterruptedException {
final long time = 10;
ExceptionListener<Exception> listener = Mockito.mock(ExceptionListener.class);
OperationAttemptTimer timer = new OperationAttemptTimer(listener, time);
ForeignExceptionListener listener = Mockito.mock(ForeignExceptionListener.class);
TimeoutExceptionInjector timer = new TimeoutExceptionInjector(listener, time);
timer.trigger();
try {
timer.start();
@ -90,8 +98,8 @@ public class TestOperationAttemptTimer {
LOG.debug("Correctly failed timer: " + e.getMessage());
}
Thread.sleep(time * 2);
Mockito.verify(listener, Mockito.times(1)).receiveError(Mockito.anyString(),
Mockito.any(OperationAttemptTimeoutException.class));
Mockito.verify(listener, Mockito.times(1)).receive(Mockito.anyString(),
Mockito.any(ForeignException.class));
Mockito.verifyNoMoreInteractions(listener);
}
}

View File

@ -1,28 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
/**
* Exception thrown from the test
*/
@SuppressWarnings("serial")
public class ExceptionForTesting extends Exception {
public ExceptionForTesting(String msg) {
super(msg);
}
}

View File

@ -1,157 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.util.Pair;
/**
* Decoratable policy for if a fault should be injected for a given stack trace.
* <p>
* Passed in policies are combined with the current policy via a {@link PolicyCombination}.
* <p>
* Using {@link PolicyCombination#AND} means that <i>all</i> policies must agree to inject a fault
* (including the current implemented) before a fault is injected.
* <p>
* Using {@link PolicyCombination#OR} means that if <i>any</i> of the policies may assert that a
* fault be injected, in which case all remaining injectors will be ignored.
* <p>
* Order of operations occurs in reverse order of the operations added via
* {@link #and(FaultInjectionPolicy)} or {@link #or(FaultInjectionPolicy)}. For example, if this is
* the default policy 'a', which we {@link #and(FaultInjectionPolicy)} with 'b' and then 'c', we get
* the following policy chain:
* <p>
* a && (b && c)
* <p>
* Similarly, if this is the default policy 'a', which we {@link #or(FaultInjectionPolicy)} with 'b'
* and then 'c', we get the following policy chain:
* <p>
* a || (b || c).
* <p>
* Naturally, more complex policies can then be built using this style. Suppose we have policy A,
* which is actually the 'and' of two policies, a and b:
* <p>
* A = a && b
* <p>
* and similarly we also have B which is an 'or' of c and d:
* <p>
* B = c || d
* <p>
* then we could combine the two by calling A {@link #and(FaultInjectionPolicy)} B, to get:
* <p>
* A && B = (a && b) && (c || d)
*/
public class FaultInjectionPolicy {
public enum PolicyCombination {
AND, OR;
/**
* Apply the combination to the policy outputs
* @param current current policy value
* @param next next policy to value to consider
* @return <tt>true</tt> if the logical combination is valid, <tt>false</tt> otherwise
*/
public boolean apply(boolean current, boolean next) {
switch (this) {
case AND:
return current && next;
case OR:
return current || next;
default:
throw new IllegalArgumentException("Unrecognized policy!" + this);
}
}
}
private List<Pair<PolicyCombination, FaultInjectionPolicy>> policies = new ArrayList<Pair<PolicyCombination, FaultInjectionPolicy>>();
/**
* And the current chain with another policy.
* <p>
* For example, if this is the default policy 'a', which we {@link #and(FaultInjectionPolicy)}
* with 'b' and then 'c', we get the following policy chain:
* <p>
* a && (b && c)
* @param policy policy to logical AND with the current policies
* @return <tt>this</tt> for chaining
*/
public FaultInjectionPolicy and(FaultInjectionPolicy policy) {
return addPolicy(PolicyCombination.AND, policy);
}
/**
* And the current chain with another policy.
* <p>
* For example, if this is the default policy 'a', which we {@link #or(FaultInjectionPolicy)} with
* 'b' and then 'c', we get the following policy chain:
* <p>
* a || (b || c)
* @param policy policy to logical OR with the current policies
* @return <tt>this</tt> for chaining
*/
public FaultInjectionPolicy or(FaultInjectionPolicy policy) {
return addPolicy(PolicyCombination.OR, policy);
}
private FaultInjectionPolicy addPolicy(PolicyCombination combinator, FaultInjectionPolicy policy) {
policies.add(new Pair<PolicyCombination, FaultInjectionPolicy>(combinator, policy));
return this;
}
/**
* Check to see if this, or any of the policies this decorates, find that a fault should be
* injected .
* @param stack
* @return <tt>true</tt> if a fault should be injected, <tt>false</tt> otherwise.
*/
public final boolean shouldFault(StackTraceElement[] stack) {
boolean current = checkForFault(stack);
return eval(current, policies, stack);
}
/**
* @param current
* @param policies2
* @param stack
* @return
*/
private boolean eval(boolean current,
List<Pair<PolicyCombination, FaultInjectionPolicy>> policies, StackTraceElement[] stack) {
// base condition: if there are no more to evaluate, the comparison is the last
if (policies.size() == 0) return current;
// otherwise we have to evaluate the rest of chain
Pair<PolicyCombination, FaultInjectionPolicy> policy = policies.get(0);
boolean next = policy.getSecond().shouldFault(stack);
return policy.getFirst()
.apply(current, eval(next, policies.subList(1, policies.size()), stack));
}
/**
* Check to see if we should generate a fault for the given stacktrace.
* <p>
* Subclass hook for providing custom fault checking behavior
* @param stack
* @return if a fault should be injected for this error check request. <tt>false</tt> by default
*/
protected boolean checkForFault(StackTraceElement[] stack) {
return false;
}
}

View File

@ -1,58 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.server.errorhandling.FaultInjector;
import org.apache.hadoop.hbase.util.Pair;
/**
* Fault injector that can take a policy for when to inject a fault
* @param <E> type of exception that should be returned
*/
public abstract class PoliciedFaultInjector<E extends Exception> implements FaultInjector<E> {
private static final Log LOG = LogFactory.getLog(PoliciedFaultInjector.class);
private FaultInjectionPolicy policy;
public PoliciedFaultInjector(FaultInjectionPolicy policy) {
this.policy = policy;
}
@Override
public final Pair<E, Object[]> injectFault(StackTraceElement[] trace) {
if (policy.shouldFault(trace)) {
return this.getInjectedError(trace);
}
LOG.debug("NOT injecting fault, stack:" + Arrays.toString(Arrays.copyOfRange(trace, 3, 6)));
return null;
}
/**
* Get the error that should be returned to the caller when the {@link FaultInjectionPolicy}
* determines we need to inject a fault
* @param trace trace for which the {@link FaultInjectionPolicy} specified we should have an error
* @return the information about the fault that should be returned if there was a fault, null
* otherwise
*/
protected abstract Pair<E, Object[]> getInjectedError(StackTraceElement[] trace);
}

View File

@ -1,37 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionListener;
/**
* Simple error listener that can be checked to see if it received an error ({@link #error}) and the
* information about the error received ({@link #info}).
*/
@SuppressWarnings("javadoc")
public class SimpleErrorListener<E extends Exception> implements ExceptionListener<E> {
public boolean error = false;
public Object[] info = null;
@Override
public void receiveError(String message, Exception e, Object... info) {
this.error = true;
this.info = info;
}
}

View File

@ -1,139 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
import java.util.Arrays;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionListener;
import org.apache.hadoop.hbase.server.errorhandling.OperationAttemptTimer;
import org.apache.hadoop.hbase.server.errorhandling.exception.OperationAttemptTimeoutException;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
/**
* Test that we propagate errors through an orchestrator as expected
*/
@Category(SmallTests.class)
@SuppressWarnings({ "rawtypes", "unchecked" })
public class TestExceptionOrchestrator {
@Test
public void testErrorPropagation() {
ExceptionListener listener1 = Mockito.mock(ExceptionListener.class);
ExceptionListener listener2 = Mockito.mock(ExceptionListener.class);
ExceptionOrchestrator<Exception> orchestrator = new ExceptionOrchestrator<Exception>();
// add the listeners
orchestrator.addErrorListener(orchestrator.genericVisitor, listener1);
orchestrator.addErrorListener(orchestrator.genericVisitor, listener2);
// create an artificial error
String message = "Some error";
Object[] info = new Object[] { "info1" };
Exception e = new ExceptionForTesting("error");
orchestrator.receiveError(message, e, info);
// make sure the listeners got the error
Mockito.verify(listener1, Mockito.times(1)).receiveError(message, e, info);
Mockito.verify(listener2, Mockito.times(1)).receiveError(message, e, info);
// push another error, which should be passed to listeners
message = "another error";
e = new ExceptionForTesting("hello");
info[0] = "info2";
orchestrator.receiveError(message, e, info);
Mockito.verify(listener1, Mockito.times(1)).receiveError(message, e, info);
Mockito.verify(listener2, Mockito.times(1)).receiveError(message, e, info);
// now create a timer and check for that error
info[0] = "timer";
OperationAttemptTimer timer = new OperationAttemptTimer(orchestrator, 1000, info);
timer.start();
timer.trigger();
// make sure that we got the timer error
Mockito.verify(listener1, Mockito.times(1)).receiveError(Mockito.anyString(),
Mockito.any(OperationAttemptTimeoutException.class),
Mockito.argThat(new VarArgMatcher<Object>(Object.class, info)));
Mockito.verify(listener2, Mockito.times(1)).receiveError(Mockito.anyString(),
Mockito.any(OperationAttemptTimeoutException.class),
Mockito.argThat(new VarArgMatcher<Object>(Object.class, info)));
}
/**
* Matcher that matches var-args elements
* @param <T> Type of args to match
*/
private static class VarArgMatcher<T> extends BaseMatcher<T> {
private T[] expected;
private Class<T> clazz;
private String reason;
/**
* Setup the matcher to expect args of the given type
* @param clazz type of args to expect
* @param expected expected arguments
*/
public VarArgMatcher(Class<T> clazz, T... expected) {
this.expected = expected;
this.clazz = clazz;
}
@Override
public boolean matches(Object arg0) {
// null check early exit
if (expected == null && arg0 == null) return true;
// single arg matching
if (clazz.isAssignableFrom(arg0.getClass())) {
if (expected.length == 1) {
if (arg0.equals(expected[0])) return true;
reason = "single argument received, but didn't match argument";
} else {
reason = "single argument received, but expected array of args, size = "
+ expected.length;
}
} else if (arg0.getClass().isArray()) {
// array matching
try {
T[] arg = (T[]) arg0;
if (Arrays.equals(expected, arg)) return true;
reason = "Array of args didn't match expected";
} catch (Exception e) {
reason = "Exception while matching arguments:" + e.getMessage();
}
} else reason = "Objet wasn't the same as passed class or not an array";
// nothing worked - fail
return false;
}
@Override
public void describeTo(Description arg0) {
arg0.appendText(reason);
}
}
}

View File

@ -1,95 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionCheckable;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionListener;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionVisitor;
import org.apache.hadoop.hbase.server.errorhandling.FaultInjector;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
/**
* Test that we can correctly inject faults for testing
*/
@Category(SmallTests.class)
@SuppressWarnings({ "rawtypes", "unchecked" })
public class TestFaultInjecting {
private static final Log LOG = LogFactory.getLog(TestFaultInjecting.class);
public static final ExceptionVisitor<ExceptionListener> VISITOR = new ExceptionVisitor<ExceptionListener>() {
@Override
public void visit(ExceptionListener listener, String message, Exception e, Object... info) {
listener.receiveError(message, e, info);
}
};
@Test
public void testSimpleFaultInjection() {
ExceptionDispatcherFactory<ExceptionListener> factory = Mockito
.spy(new ExceptionDispatcherFactory<ExceptionListener>(TestFaultInjecting.VISITOR));
ExceptionDispatcher<ExceptionListener, Exception> dispatcher = new ExceptionDispatcher<ExceptionListener, Exception>();
Mockito.when(factory.buildErrorHandler(VISITOR)).thenReturn(dispatcher);
String info = "info";
ExceptionOrchestratorFactory.addFaultInjector(new StringFaultInjector(info));
ExceptionCheckable<Exception> monitor = factory.createErrorHandler();
// make sure we wrap the dispatcher with the fault injection
assertNotSame(dispatcher, monitor);
// test that we actually inject a fault
assertTrue("Monitor didn't get an injected error", monitor.checkForError());
try {
monitor.failOnError();
fail("Monitor didn't get an exception from the fault injected in the factory.");
} catch (ExceptionForTesting e) {
LOG.debug("Correctly got an exception from the test!");
} catch (Exception e) {
fail("Got an unexpected exception:" + e);
}
}
/**
* Fault injector that will always throw a string error
*/
public static class StringFaultInjector implements FaultInjector<ExceptionForTesting> {
private final String info;
public StringFaultInjector(String info) {
this.info = info;
}
@Override
public Pair<ExceptionForTesting, Object[]> injectFault(StackTraceElement[] trace) {
if (ExceptionTestingUtils.stackContainsClass(trace, TestFaultInjecting.class)) {
return new Pair<ExceptionForTesting, Object[]>(new ExceptionForTesting(
"injected!"), new String[] { info });
}
return null;
}
}
}

View File

@ -1,95 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Test the fault injection policies and combinations
*/
@Category(SmallTests.class)
public class TestFaultInjectionPolicies {
@Test
public void testAndCombination() {
FaultInjectionPolicy alwaysFalse = new FaultInjectionPolicy();
assertFalse("Default policy isn't false", alwaysFalse.shouldFault(null));
FaultInjectionPolicy alwaysTrue = new AlwaysTrue();
FaultInjectionPolicy andTrue = new AlwaysTrue().or(alwaysTrue).or(alwaysTrue);
assertTrue("And True isn't always returning true", andTrue.shouldFault(null));
FaultInjectionPolicy andFalse = new FaultInjectionPolicy().and(alwaysTrue);
assertFalse("false AND true", andFalse.shouldFault(null));
assertFalse("true AND false", alwaysTrue.and(alwaysFalse).shouldFault(null));
assertFalse("true AND (false AND true)",
new AlwaysTrue().and(new FaultInjectionPolicy().and(new AlwaysTrue())).shouldFault(null));
assertFalse("(true AND false AND true)",
new AlwaysTrue().and(new FaultInjectionPolicy()).and(new AlwaysTrue()).shouldFault(null));
}
@Test
public void testORCombination() {
FaultInjectionPolicy alwaysTrue = new AlwaysTrue();
FaultInjectionPolicy andTrue = new AlwaysTrue().or(alwaysTrue).or(alwaysTrue);
assertTrue("OR True isn't always returning true", andTrue.shouldFault(null));
FaultInjectionPolicy andFalse = new FaultInjectionPolicy().or(alwaysTrue);
assertTrue("Combination of true OR false should be true", andFalse.shouldFault(null));
assertTrue("Combining multiple ands isn't correct",
new FaultInjectionPolicy().or(andTrue).or(andFalse).shouldFault(null));
}
@Test
public void testMixedAndOr() {
assertTrue("true AND (false OR true)",
new AlwaysTrue().and(new FaultInjectionPolicy().or(new AlwaysTrue())).shouldFault(null));
assertTrue("(true AND false) OR true",
new AlwaysTrue().or(new AlwaysTrue().and(new FaultInjectionPolicy())).shouldFault(null));
assertFalse(
"(true AND false) OR false",
new FaultInjectionPolicy().or(new AlwaysTrue().and(new FaultInjectionPolicy())).shouldFault(
null));
}
private static class AlwaysTrue extends FaultInjectionPolicy {
protected boolean checkForFault(StackTraceElement[] stack) {
return true;
}
}
public static class SimplePolicyFaultInjector extends PoliciedFaultInjector<Exception> {
public SimplePolicyFaultInjector(FaultInjectionPolicy policy) {
super(policy);
}
@Override
protected Pair<Exception, Object[]> getInjectedError(StackTraceElement[] trace) {
return new Pair<Exception, Object[]>(new RuntimeException("error"), null);
}
}
}

View File

@ -1,113 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionListener;
import org.apache.hadoop.hbase.server.errorhandling.OperationAttemptTimer;
import org.apache.hadoop.hbase.server.errorhandling.exception.OperationAttemptTimeoutException;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
/**
* Test using the single error dispatcher
*/
@SuppressWarnings("unchecked")
@Category(SmallTests.class)
public class TestSingleExceptionDispatcher {
private static final Log LOG = LogFactory.getLog(TestSingleExceptionDispatcher.class);
@Test
public void testErrorPropagation() {
ExceptionListener<Exception> listener1 = Mockito.mock(ExceptionListener.class);
ExceptionListener<Exception> listener2 = Mockito.mock(ExceptionListener.class);
ExceptionDispatcher<? extends ExceptionListener<Exception>, Exception> monitor = new ExceptionDispatcher<ExceptionListener<Exception>, Exception>();
// add the listeners
monitor.addErrorListener(monitor.genericVisitor, listener1);
monitor.addErrorListener(monitor.genericVisitor, listener2);
// create an artificial error
String message = "Some error";
Exception expected = new ExceptionForTesting("error");
Object info = "info1";
monitor.receiveError(message, expected, info);
// make sure the listeners got the error
Mockito.verify(listener1).receiveError(message, expected, info);
Mockito.verify(listener2).receiveError(message, expected, info);
// make sure that we get an exception
try {
monitor.failOnError();
fail("Monitor should have thrown an exception after getting error.");
} catch (Exception e) {
assertTrue("Got an unexpected exception:" + e, e instanceof ExceptionForTesting);
LOG.debug("Got the testing exception!");
}
// push another error, but this shouldn't be passed to the listeners
monitor.receiveError("another error", new ExceptionForTesting("hello"),
"shouldn't be found");
// make sure we don't re-propagate the error
Mockito.verifyNoMoreInteractions(listener1, listener2);
}
@Test
public void testSingleDispatcherWithTimer() {
ExceptionListener<Exception> listener1 = Mockito.mock(ExceptionListener.class);
ExceptionListener<Exception> listener2 = Mockito.mock(ExceptionListener.class);
ExceptionDispatcher<? extends ExceptionListener<Exception>, Exception> monitor = new ExceptionDispatcher<ExceptionListener<Exception>, Exception>();
// add the listeners
monitor.addErrorListener(monitor.genericVisitor, listener1);
monitor.addErrorListener(monitor.genericVisitor, listener2);
Object info = "message";
OperationAttemptTimer timer = new OperationAttemptTimer(monitor, 1000, info);
timer.start();
timer.trigger();
assertTrue("Monitor didn't get timeout", monitor.checkForError());
// verify that that we propagated the error
Mockito.verify(listener1).receiveError(Mockito.anyString(),
Mockito.any(OperationAttemptTimeoutException.class), Mockito.eq(info));
Mockito.verify(listener2).receiveError(Mockito.anyString(),
Mockito.any(OperationAttemptTimeoutException.class), Mockito.eq(info));
}
@Test
public void testAddListenerWithoutVisitor() {
SimpleErrorListener<Exception> listener = new SimpleErrorListener<Exception>();
ExceptionDispatcher<SimpleErrorListener<Exception>, Exception> monitor = new ExceptionDispatcher<SimpleErrorListener<Exception>, Exception>();
try {
monitor.addErrorListener(listener);
fail("Monitor needs t have a visitor for adding generically typed listeners");
} catch (UnsupportedOperationException e) {
LOG.debug("Correctly failed to add listener without visitor: " + e.getMessage());
}
}
}