https://issues.apache.org/jira/browse/AMQ-4643 - ensure handled ioexceptions are not propagated back to client when broker or transports are shutdown. additional tests existing tests refactored to reflect new determinism. IOException handler now throws SuppressReplyException which is trapped by the transport connector. If store exception need to be processes by the client, for immediate response rather than failover reconnect, then the ioexception handler should be configured to ignoreAllErrors

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1508602 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2013-07-30 20:18:07 +00:00
parent 4a270fe1f0
commit 13bbe52646
19 changed files with 414 additions and 115 deletions

View File

@ -122,7 +122,7 @@ public abstract class LockableServiceSupport extends ServiceSupport implements L
}
}
} catch (IOException e) {
LOG.warn("locker keepalive resulted in: " + e, e);
LOG.warn("locker keepAlive resulted in: " + e, e);
}
if (stop) {
stopBroker();

View File

@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker;
import java.io.IOException;
/**
* An exception thrown when the broker or transport will be shutdown in response
* to an error, eg. from IOExceptionHandler.
* The transport will die (socket.close()) so we don't want to propagate exceptions
* to the client; failover transport will retry the operation.
*
*/
public class SuppressReplyException extends RuntimeException {
public SuppressReplyException(String reason, IOException cause) {
super(reason, cause);
}
}

View File

@ -301,6 +301,11 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
+ " command: " + command + ", exception: " + e, e);
}
if (e instanceof SuppressReplyException || (e.getCause() instanceof SuppressReplyException)) {
LOG.info("Suppressing reply to: " + command + " on: " + e + ", cause: " + e.getCause());
responseRequired = false;
}
if (responseRequired) {
response = new ExceptionResponse(e);
} else {

View File

@ -58,7 +58,7 @@ public class LocalTransaction extends Transaction {
LOG.warn("COMMIT FAILED: ", e);
rollback();
// Let them know we rolled back.
XAException xae = new XAException("COMMIT FAILED: Transaction rolled back.");
XAException xae = new XAException("COMMIT FAILED: Transaction rolled back");
xae.errorCode = XAException.XA_RBOTHER;
xae.initCause(e);
throw xae;
@ -66,15 +66,13 @@ public class LocalTransaction extends Transaction {
setState(Transaction.FINISHED_STATE);
context.getTransactions().remove(xid);
// Sync on transaction store to avoid out of order messages in the cursor
// https://issues.apache.org/activemq/browse/AMQ-2594
try {
transactionStore.commit(getTransactionId(), false,preCommitTask, postCommitTask);
transactionStore.commit(getTransactionId(), false, preCommitTask, postCommitTask);
this.waitPostCommitDone(postCommitTask);
} catch (Throwable t) {
LOG.warn("Store COMMIT FAILED: ", t);
rollback();
XAException xae = new XAException("STORE COMMIT FAILED: Transaction rolled back.");
XAException xae = new XAException("STORE COMMIT FAILED: Transaction rolled back");
xae.errorCode = XAException.XA_RBOTHER;
xae.initCause(t);
throw xae;
@ -109,7 +107,7 @@ public class LocalTransaction extends Transaction {
@Override
public int prepare() throws XAException {
XAException xae = new XAException("Prepare not implemented on Local Transactions.");
XAException xae = new XAException("Prepare not implemented on Local Transactions");
xae.errorCode = XAException.XAER_RMERR;
throw xae;
}

View File

@ -89,7 +89,7 @@ public class XATransaction extends Transaction {
} catch (Throwable t) {
LOG.warn("Store COMMIT FAILED: ", t);
rollback();
XAException xae = new XAException("STORE COMMIT FAILED: Transaction rolled back.");
XAException xae = new XAException("STORE COMMIT FAILED: Transaction rolled back");
xae.errorCode = XAException.XA_RBOTHER;
xae.initCause(t);
throw xae;
@ -104,7 +104,7 @@ public class XATransaction extends Transaction {
private void checkForPreparedState(boolean onePhase) throws XAException {
if (!onePhase) {
XAException xae = new XAException("Cannot do 2 phase commit if the transaction has not been prepared.");
XAException xae = new XAException("Cannot do 2 phase commit if the transaction has not been prepared");
xae.errorCode = XAException.XAER_PROTO;
throw xae;
}
@ -118,7 +118,7 @@ public class XATransaction extends Transaction {
} catch (Throwable e) {
LOG.warn("PRE-PREPARE FAILED: ", e);
rollback();
XAException xae = new XAException("PRE-PREPARE FAILED: Transaction rolled back.");
XAException xae = new XAException("PRE-PREPARE FAILED: Transaction rolled back");
xae.errorCode = XAException.XA_RBOTHER;
xae.initCause(e);
throw xae;

View File

@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.SuppressReplyException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -40,7 +41,7 @@ import org.slf4j.LoggerFactory;
private String noSpaceMessage = "space";
private String sqlExceptionMessage = ""; // match all
private long resumeCheckSleepPeriod = 5*1000;
private AtomicBoolean stopStartInProgress = new AtomicBoolean(false);
private AtomicBoolean handlingException = new AtomicBoolean(false);
public void handle(IOException exception) {
if (ignoreAllErrors) {
@ -73,59 +74,69 @@ import org.slf4j.LoggerFactory;
}
if (stopStartConnectors) {
if (!stopStartInProgress.compareAndSet(false, true)) {
// we are already working on it
return;
}
LOG.info("Initiating stop/restart of broker transport due to IO exception, " + exception, exception);
if (handlingException.compareAndSet(false, true)) {
LOG.info("Initiating stop/restart of transports on " + broker + " due to IO exception, " + exception, exception);
new Thread("IOExceptionHandler: stop transports") {
public void run() {
try {
ServiceStopper stopper = new ServiceStopper();
broker.stopAllConnectors(stopper);
LOG.info("Successfully stopped transports on " + broker);
} catch (Exception e) {
LOG.warn("Failure occurred while stopping broker connectors", e);
} finally {
// resume again
new Thread("IOExceptionHandler: restart transports") {
public void run() {
try {
while (hasLockOwnership() && isPersistenceAdapterDown()) {
LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports");
TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod);
}
broker.startAllConnectors();
LOG.info("Successfully restarted transports on " + broker);
} catch (Exception e) {
LOG.warn("Stopping " + broker + " due to failure while restarting transports", e);
stopBroker(e);
} finally {
handlingException.compareAndSet(true, false);
}
}
private boolean isPersistenceAdapterDown() {
boolean checkpointSuccess = false;
try {
broker.getPersistenceAdapter().checkpoint(true);
checkpointSuccess = true;
} catch (Throwable ignored) {
}
return !checkpointSuccess;
}
}.start();
new Thread("stop transport connectors on IO exception") {
public void run() {
try {
ServiceStopper stopper = new ServiceStopper();
broker.stopAllConnectors(stopper);
} catch (Exception e) {
LOG.warn("Failure occurred while stopping broker connectors", e);
}
}
}.start();
// resume again
new Thread("restart transport connectors post IO exception") {
public void run() {
try {
while (hasLockOwnership() && isPersistenceAdapterDown()) {
LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports");
TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod);
}
broker.startAllConnectors();
} catch (Exception e) {
LOG.warn("Stopping broker due to failure while restarting broker connectors", e);
stopBroker(e);
} finally {
stopStartInProgress.compareAndSet(true, false);
}
}
}.start();
}
private boolean isPersistenceAdapterDown() {
boolean checkpointSuccess = false;
try {
broker.getPersistenceAdapter().checkpoint(true);
checkpointSuccess = true;
} catch (Throwable ignored) {}
return !checkpointSuccess;
}
}.start();
return;
throw new SuppressReplyException("Stop/RestartTransportsInitiated", exception);
}
stopBroker(exception);
if (handlingException.compareAndSet(false, true)) {
stopBroker(exception);
}
// we don't want to propagate the exception back to the client
// They will see a delay till they see a disconnect via socket.close
// at which point failover: can kick in.
throw new SuppressReplyException("ShutdownBrokerInitiated", exception);
}
private void stopBroker(Exception exception) {
LOG.info("Stopping the broker due to exception, " + exception, exception);
new Thread("Stopping the broker due to IO exception") {
LOG.info("Stopping " + broker + " due to exception, " + exception, exception);
new Thread("IOExceptionHandler: stopping " + broker) {
public void run() {
try {
if( broker.isRestartAllowed() ) {

View File

@ -1162,6 +1162,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
public void rollback() throws JMSException {
clearDispatchList();
synchronized (unconsumedMessages.getMutex()) {
if (optimizeAcknowledge) {
// remove messages read but not acked at the broker yet through

View File

@ -102,12 +102,12 @@ public class DefaultDatabaseLocker extends AbstractLocker {
try {
connection.rollback();
} catch (SQLException e1) {
LOG.error("Caught exception during rollback on connection: " + e1, e1);
LOG.debug("Caught exception during rollback on connection: " + e1, e1);
}
try {
connection.close();
} catch (SQLException e1) {
LOG.error("Caught exception while closing connection: " + e1, e1);
LOG.debug("Caught exception while closing connection: " + e1, e1);
}
connection = null;

View File

@ -19,6 +19,7 @@ package org.apache.activemq.store.jdbc;
import java.io.IOException;
import org.apache.activemq.broker.Locker;
import org.apache.activemq.broker.SuppressReplyException;
import org.apache.activemq.util.DefaultIOExceptionHandler;
/**
@ -31,6 +32,7 @@ public class JDBCIOExceptionHandler extends DefaultIOExceptionHandler {
setStopStartConnectors(true);
}
// fail only when we get an authoritative answer from the db w/o exceptions
@Override
protected boolean hasLockOwnership() throws IOException {
boolean hasLock = true;
@ -42,11 +44,12 @@ public class JDBCIOExceptionHandler extends DefaultIOExceptionHandler {
if (!locker.keepAlive()) {
hasLock = false;
}
} catch (SuppressReplyException ignoreWhileHandlingInProgress) {
} catch (IOException ignored) {
}
if (!hasLock) {
throw new IOException("PersistenceAdapter lock no longer valid using: " + locker);
throw new IOException("Lock keepAlive failed, no longer lock owner with: " + locker);
}
}
}

View File

@ -369,7 +369,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "ActiveMQ Cleanup Timer");
Thread thread = new Thread(runnable, "ActiveMQ JDBC PA Scheduled Task");
thread.setDaemon(true);
return thread;
}

View File

@ -74,6 +74,7 @@ public class LeaseDatabaseLocker extends AbstractLocker {
String sql = statements.getLeaseObtainStatement();
LOG.debug(getLeaseHolderId() + " locking Query is "+sql);
long now = 0l;
while (!stopping) {
Connection connection = null;
PreparedStatement statement = null;
@ -84,7 +85,7 @@ public class LeaseDatabaseLocker extends AbstractLocker {
statement = connection.prepareStatement(sql);
setQueryTimeout(statement);
final long now = System.currentTimeMillis() + diffFromCurrentTime;
now = System.currentTimeMillis() + diffFromCurrentTime;
statement.setString(1, getLeaseHolderId());
statement.setLong(2, now + lockAcquireSleepInterval);
statement.setLong(3, now);
@ -113,7 +114,7 @@ public class LeaseDatabaseLocker extends AbstractLocker {
throw new RuntimeException(getLeaseHolderId() + " failing lease acquire due to stop");
}
LOG.info(getLeaseHolderId() + ", becoming the master on dataSource: " + dataSource);
LOG.info(getLeaseHolderId() + ", becoming master with lease expiry " + new Date(now) + " on dataSource: " + dataSource);
}
private void setQueryTimeout(PreparedStatement statement) throws SQLException {
@ -187,8 +188,12 @@ public class LeaseDatabaseLocker extends AbstractLocker {
}
public void doStop(ServiceStopper stopper) throws Exception {
releaseLease();
stopping = true;
if (persistenceAdapter.getBrokerService() != null && persistenceAdapter.getBrokerService().isRestartRequested()) {
// keep our lease for restart
return;
}
releaseLease();
}
private void releaseLease() {
@ -232,6 +237,10 @@ public class LeaseDatabaseLocker extends AbstractLocker {
statement.setString(3, getLeaseHolderId());
result = (statement.executeUpdate() == 1);
if (!result) {
reportLeasOwnerShipAndDuration(connection);
}
} catch (Exception e) {
LOG.warn(getLeaseHolderId() + ", failed to update lease: " + e, e);
IOException ioe = IOExceptionSupport.create(e);
@ -280,4 +289,9 @@ public class LeaseDatabaseLocker extends AbstractLocker {
public void setMaxAllowableDiffFromDBTime(int maxAllowableDiffFromDBTime) {
this.maxAllowableDiffFromDBTime = maxAllowableDiffFromDBTime;
}
@Override
public String toString() {
return "LeaseDatabaseLocker owner:" + leaseHolderId + ",duration:" + lockAcquireSleepInterval + ",renew:" + lockAcquireSleepInterval;
}
}

View File

@ -138,7 +138,9 @@ public class TransactionContext {
} catch (SQLException e) {
JDBCPersistenceAdapter.log("Error while closing connection: ", e);
throw IOExceptionSupport.create(e);
IOException ioe = IOExceptionSupport.create(e);
persistenceAdapter.getBrokerService().handleIOException(ioe);
throw ioe;
} finally {
try {
if (connection != null) {

View File

@ -30,7 +30,9 @@ public class DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest extends DbRestartJDBC
@Override
protected void configureBroker(BrokerService brokerService) {
brokerService.setIoExceptionHandler(new JDBCIOExceptionHandler());
// master and slave survive db restart and retain master/slave status
JDBCIOExceptionHandler stopConnectors = new JDBCIOExceptionHandler();
brokerService.setIoExceptionHandler(stopConnectors);
}
@Override
@ -51,6 +53,9 @@ public class DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest extends DbRestartJDBC
protected void verifyExpectedBroker(int inflightMessageCount) {
if (inflightMessageCount == 0 || (inflightMessageCount == failureCount + 10 && restartDelay <= 500)) {
assertEquals("connected to master", master.getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
} else {
// lease expired while DB was offline, either or master/slave can grab it so assert is not deterministic
// but we still need to validate sent == received
}
}

View File

@ -20,8 +20,11 @@ import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.jdbc.JDBCIOExceptionHandler;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
import org.apache.activemq.util.DefaultIOExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -36,6 +39,17 @@ public class DbRestartJDBCQueueMasterSlaveLeaseTest extends DbRestartJDBCQueueMa
persistenceAdapter.setLockKeepAlivePeriod(getLockKeepAlivePeriod());
}
@Override
protected void configureBroker(BrokerService brokerService) {
//let the brokers die on exception and master should have lease on restart
// which will delay slave start till it expires
JDBCIOExceptionHandler trapSQLExceptions = new JDBCIOExceptionHandler();
trapSQLExceptions.setIgnoreSQLExceptions(false);
trapSQLExceptions.setStopStartConnectors(false);
trapSQLExceptions.setResumeCheckSleepPeriod(500l);
brokerService.setIoExceptionHandler(trapSQLExceptions);
}
private long getLockKeepAlivePeriod() {
return 500;
}
@ -43,22 +57,4 @@ public class DbRestartJDBCQueueMasterSlaveLeaseTest extends DbRestartJDBCQueueMa
private long getLockAcquireSleepInterval() {
return 2000;
}
@Override
protected void delayTillRestartRequired() {
LOG.info("delay for less than lease quantum. While Db is offline, master should stay alive");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
protected void verifyExpectedBroker(int inflightMessageCount) {
if (inflightMessageCount == 0 || inflightMessageCount == failureCount + 10) {
assertEquals("connected to master", master.getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
}
}
}

View File

@ -16,6 +16,9 @@
*/
package org.apache.activemq.broker.ft;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import javax.jms.Connection;
import javax.jms.Destination;
@ -25,6 +28,9 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TransactionRolledBackException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -56,7 +62,7 @@ public class DbRestartJDBCQueueMasterSlaveTest extends JDBCQueueMasterSlaveTest
if (inflightMessageCount == 0) {
assertEquals("connected to master", master.getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
} else if (inflightMessageCount == failureCount + 10) {
assertEquals("connected to slave", slave.get().getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
assertEquals("connected to slave, count:" + inflightMessageCount, slave.get().getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
}
}
@ -67,23 +73,7 @@ public class DbRestartJDBCQueueMasterSlaveTest extends JDBCQueueMasterSlaveTest
protected void sendToProducer(MessageProducer producer,
Destination producerDestination, Message message) throws JMSException {
{
// do some retries as db failures filter back to the client until broker sees
// db lock failure and shuts down
boolean sent = false;
do {
try {
producer.send(producerDestination, message);
sent = true;
} catch (JMSException e) {
LOG.info("Exception on producer send for: " + message, e);
try {
Thread.sleep(2000);
} catch (InterruptedException ignored) {
}
}
} while(!sent);
}
producer.send(producerDestination, message);
}
@Override
@ -100,25 +90,58 @@ public class DbRestartJDBCQueueMasterSlaveTest extends JDBCQueueMasterSlaveTest
LOG.info("Failed to commit message receipt: " + message, e);
try {
receiveSession.rollback();
} catch (JMSException ignored) {}
} catch (JMSException ignored) {
}
if (e.getCause() instanceof TransactionRolledBackException) {
TransactionRolledBackException transactionRolledBackException = (TransactionRolledBackException)e.getCause();
if (e instanceof TransactionRolledBackException) {
TransactionRolledBackException transactionRolledBackException = (TransactionRolledBackException) e;
if (transactionRolledBackException.getMessage().indexOf("in doubt") != -1) {
// failover chucked bc there is a missing reply to a commit. the ack may have got there and the reply
// was lost or the ack may be lost.
// so we may not get a resend.
// failover chucked bc there is a missing reply to a commit.
// failover is involved b/c the store exception is handled broker side and the client just
// sees a disconnect (socket.close()).
// If the client needs to be aware of the failure then it should not use IOExceptionHandler
// so that the exception will propagate back
// for this test case:
// the commit may have got there and the reply is lost "or" the commit may be lost.
// so we may or may not get a resend.
//
// REVISIT: A JDBC store IO exception should not cause the connection to drop, so it needs to be wrapped
// possibly by the IOExceptionHandler
// The commit/close wrappers in jdbc TransactionContext need to delegate to the IOExceptionHandler
// this would leave the application aware of the store failure, and possible aware of whether the commit
// was a success, rather than going into failover-retries as it does now.
// At the application level we need to determine if the message is there or not which is not trivial
// for this test we assert received == sent
// so we need to know whether the message will be replayed.
// we can ask the store b/c we know it is jdbc - guess we could go through a destination
// message store interface also or use jmx
java.sql.Connection dbConnection = null;
try {
ActiveMQMessage mqMessage = (ActiveMQMessage) message;
dbConnection = sharedDs.getConnection();
PreparedStatement s = dbConnection.prepareStatement(((JDBCPersistenceAdapter) connectedToBroker().getPersistenceAdapter()).getStatements().getFindMessageStatement());
s.setString(1, mqMessage.getMessageId().getProducerId().toString());
s.setLong(2, mqMessage.getMessageId().getProducerSequenceId());
ResultSet rs = s.executeQuery();
if (!rs.next()) {
// message is gone, so lets count it as consumed
LOG.info("On TransactionRolledBackException we know that the ack/commit got there b/c message is gone so we count it: " + mqMessage);
super.consumeMessage(message, messageList);
} else {
LOG.info("On TransactionRolledBackException we know that the ack/commit was lost so we expect a replay of: " + mqMessage);
}
} catch (Exception dbe) {
dbe.printStackTrace();
} finally {
try {
dbConnection.close();
} catch (SQLException e1) {
e1.printStackTrace();
}
}
}
}
}
}
private BrokerService connectedToBroker() {
return ((ActiveMQConnection)receiveConnection).getBrokerInfo().getBrokerName().equals("master") ? master : slave.get();
}
}

View File

@ -76,7 +76,7 @@ public class DbRestartJDBCQueueTest extends JmsTopicSendReceiveWithTwoConnection
protected void tearDown() throws Exception {
super.tearDown();
broker.stop();
broker.stop();
}

View File

@ -30,6 +30,7 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.util.DefaultIOExceptionHandler;
import org.apache.activemq.util.IOHelper;
import org.apache.derby.jdbc.EmbeddedDataSource;
@ -59,7 +60,11 @@ public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTestSupport {
master.start();
}
protected void configureBroker(BrokerService master) {
protected void configureBroker(BrokerService brokerService) {
DefaultIOExceptionHandler stopBrokerOnStoreException = new DefaultIOExceptionHandler();
// we want any store io exception to stop the broker
stopBrokerOnStoreException.setIgnoreSQLExceptions(false);
brokerService.setIoExceptionHandler(stopBrokerOnStoreException);
}
protected void createSlave() throws Exception {

View File

@ -0,0 +1,202 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import java.io.IOException;
import java.sql.SQLException;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
import org.apache.activemq.store.jdbc.JDBCIOExceptionHandler;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
import org.apache.activemq.store.jdbc.TransactionContext;
import org.apache.activemq.util.IOHelper;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Testing how the broker reacts when a SQL Exception is thrown from
* org.apache.activemq.store.jdbc.TransactionContext.executeBatch().
* <p/>
* see https://issues.apache.org/jira/browse/AMQ-4636
*/
public class AMQ4636Test extends TestCase {
private static final String MY_TEST_TOPIC = "MY_TEST_TOPIC";
private static final Logger LOG = LoggerFactory
.getLogger(AMQ4636Test.class);
private String transportUrl = "tcp://0.0.0.0:0";
private BrokerService broker;
private TestTransactionContext testTransactionContext;
protected BrokerService createBroker(boolean withJMX) throws Exception {
BrokerService broker = new BrokerService();
broker.setUseJmx(withJMX);
EmbeddedDataSource embeddedDataSource = (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory());
embeddedDataSource.setCreateDatabase("create");
//wire in a TestTransactionContext (wrapper to TransactionContext) that has an executeBatch()
// method that can be configured to throw a SQL exception on demand
JDBCPersistenceAdapter jdbc = new TestJDBCPersistenceAdapter();
jdbc.setDataSource(embeddedDataSource);
testTransactionContext = new TestTransactionContext(jdbc);
jdbc.setLockKeepAlivePeriod(1000l);
LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
leaseDatabaseLocker.setLockAcquireSleepInterval(2000l);
jdbc.setLocker(leaseDatabaseLocker);
broker.setPersistenceAdapter(jdbc);
broker.setIoExceptionHandler(new JDBCIOExceptionHandler());
transportUrl = broker.addConnector(transportUrl).getPublishableConnectString();
return broker;
}
/**
* adding a TestTransactionContext (wrapper to TransactionContext) so an SQLException is triggered
* during TransactionContext.executeBatch() when called in the broker.
* <p/>
* Expectation: SQLException triggers a connection shutdown and failover should kick and try to redeliver the
* message. SQLException should NOT be returned to client
*/
public void testProducerWithDBShutdown() throws Exception {
broker = this.createBroker(false);
broker.deleteAllMessages();
broker.start();
broker.waitUntilStarted();
LOG.info("***Broker started...");
// failover but timeout in 1 seconds so the test does not hang
String failoverTransportURL = "failover:(" + transportUrl
+ ")?timeout=1000";
this.createDurableConsumer(MY_TEST_TOPIC, failoverTransportURL);
this.sendMessage(MY_TEST_TOPIC, failoverTransportURL);
}
public void createDurableConsumer(String topic,
String transportURL) throws JMSException {
Connection connection = null;
LOG.info("*** createDurableConsumer() called ...");
try {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
transportURL);
connection = factory.createConnection();
connection.setClientID("myconn1");
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(topic);
TopicSubscriber topicSubscriber = session.createDurableSubscriber(
(Topic) destination, "MySub1");
} finally {
if (connection != null) {
connection.close();
}
}
}
public void sendMessage(String topic, String transportURL)
throws JMSException {
Connection connection = null;
try {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
transportURL);
connection = factory.createConnection();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(topic);
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
Message m = session.createTextMessage("testMessage");
LOG.info("*** send message to broker...");
// trigger SQL exception in transactionContext
testTransactionContext.throwSQLException = true;
producer.send(m);
LOG.info("*** Finished send message to broker");
} finally {
if (connection != null) {
connection.close();
}
}
}
/*
* Mock classes used for testing
*/
public class TestJDBCPersistenceAdapter extends JDBCPersistenceAdapter {
public TransactionContext getTransactionContext() throws IOException {
return testTransactionContext;
}
}
public class TestTransactionContext extends TransactionContext {
public boolean throwSQLException;
public TestTransactionContext(
JDBCPersistenceAdapter jdbcPersistenceAdapter)
throws IOException {
super(jdbcPersistenceAdapter);
}
public void executeBatch() throws SQLException {
if (throwSQLException) {
// only throw exception once
throwSQLException = false;
throw new SQLException("TEST SQL EXCEPTION");
}
super.executeBatch();
}
}
}

View File

@ -68,7 +68,9 @@ public class JDBCIOExceptionHandlerTest extends TestCase {
jdbc.setLocker(leaseDatabaseLocker);
broker.setPersistenceAdapter(jdbc);
broker.setIoExceptionHandler(new JDBCIOExceptionHandler());
JDBCIOExceptionHandler jdbcioExceptionHandler = new JDBCIOExceptionHandler();
jdbcioExceptionHandler.setResumeCheckSleepPeriod(1000l);
broker.setIoExceptionHandler(jdbcioExceptionHandler);
String connectionUri = broker.addConnector(TRANSPORT_URL).getPublishableConnectString();
factory = new ActiveMQConnectionFactory(connectionUri);