From 7a0168a4f5368183d912c3e94bfe98bb59cb9a74 Mon Sep 17 00:00:00 2001 From: gtully Date: Wed, 26 Mar 2014 16:19:46 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-4636 - tidy up commit failure case to redirect via IOExceptionHandler - failover still suppressed the commit on recovery - resulting in rollback exception the client due to indoubt commit --- .../store/jdbc/TransactionContext.java | 41 +++--- .../org/apache/activemq/bugs/AMQ4636Test.java | 121 ++++++++++++++---- 2 files changed, 116 insertions(+), 46 deletions(-) diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java index 6a933b0840..8b4ac97c5a 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java @@ -177,10 +177,12 @@ public class TransactionContext { } } catch (SQLException e) { JDBCPersistenceAdapter.log("Commit failed: ", e); - - this.rollback(); - - throw IOExceptionSupport.create(e); + try { + doRollback(); + } catch (Exception ignored) {} + IOException ioe = IOExceptionSupport.create(e); + persistenceAdapter.getBrokerService().handleIOException(ioe); + throw ioe; } finally { inTx = false; close(); @@ -192,20 +194,7 @@ public class TransactionContext { throw new IOException("Not started."); } try { - if (addMessageStatement != null) { - addMessageStatement.close(); - addMessageStatement = null; - } - if (removedMessageStatement != null) { - removedMessageStatement.close(); - removedMessageStatement = null; - } - if (updateLastAckStatement != null) { - updateLastAckStatement.close(); - updateLastAckStatement = null; - } - connection.rollback(); - + doRollback(); } catch (SQLException e) { JDBCPersistenceAdapter.log("Rollback failed: ", e); throw IOExceptionSupport.create(e); @@ -215,6 +204,22 @@ public class TransactionContext { } } + private void doRollback() throws SQLException { + if (addMessageStatement != null) { + addMessageStatement.close(); + addMessageStatement = null; + } + if (removedMessageStatement != null) { + removedMessageStatement.close(); + removedMessageStatement = null; + } + if (updateLastAckStatement != null) { + updateLastAckStatement.close(); + updateLastAckStatement = null; + } + connection.rollback(); + } + public PreparedStatement getAddMessageStatement() { return addMessageStatement; } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java index 419bfedd80..4373d49369 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java @@ -18,6 +18,7 @@ package org.apache.activemq.bugs; import java.io.IOException; import java.sql.SQLException; +import java.util.concurrent.CountDownLatch; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -27,9 +28,10 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicSubscriber; -import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.store.jdbc.DataSourceServiceSupport; import org.apache.activemq.store.jdbc.JDBCIOExceptionHandler; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; @@ -37,8 +39,12 @@ import org.apache.activemq.store.jdbc.LeaseDatabaseLocker; import org.apache.activemq.store.jdbc.TransactionContext; import org.apache.activemq.util.IOHelper; import org.apache.derby.jdbc.EmbeddedDataSource; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.junit.Assert.fail; /** * Testing how the broker reacts when a SQL Exception is thrown from @@ -46,35 +52,66 @@ import org.slf4j.LoggerFactory; *

* see https://issues.apache.org/jira/browse/AMQ-4636 */ - -public class AMQ4636Test extends TestCase { +public class AMQ4636Test { private static final String MY_TEST_TOPIC = "MY_TEST_TOPIC"; private static final Logger LOG = LoggerFactory .getLogger(AMQ4636Test.class); private String transportUrl = "tcp://0.0.0.0:0"; private BrokerService broker; - private TestTransactionContext testTransactionContext; + EmbeddedDataSource embeddedDataSource; + CountDownLatch throwSQLException = new CountDownLatch(0); - protected BrokerService createBroker(boolean withJMX) throws Exception { - BrokerService broker = new BrokerService(); + @Before + public void startBroker() throws Exception { + broker = createBroker(); + broker.deleteAllMessages(); + broker.start(); + broker.waitUntilStarted(); + LOG.info("Broker started..."); + } - broker.setUseJmx(withJMX); + @After + public void stopBroker() throws Exception { + if (broker != null) { + LOG.info("Stopping broker..."); + broker.stop(); + broker.waitUntilStopped(); + } + try { + if (embeddedDataSource != null) { + // ref http://svn.apache.org/viewvc/db/derby/code/trunk/java/testing/org/apache/derbyTesting/junit/JDBCDataSource.java?view=markup + embeddedDataSource.setShutdownDatabase("shutdown"); + embeddedDataSource.getConnection(); + } + } catch (Exception ignored) { + } finally { + embeddedDataSource.setShutdownDatabase(null); + } + } - EmbeddedDataSource embeddedDataSource = (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()); + protected BrokerService createBroker() throws Exception { + + embeddedDataSource = (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()); embeddedDataSource.setCreateDatabase("create"); + embeddedDataSource.getConnection().close(); //wire in a TestTransactionContext (wrapper to TransactionContext) that has an executeBatch() // method that can be configured to throw a SQL exception on demand JDBCPersistenceAdapter jdbc = new TestJDBCPersistenceAdapter(); jdbc.setDataSource(embeddedDataSource); - testTransactionContext = new TestTransactionContext(jdbc); jdbc.setLockKeepAlivePeriod(1000l); LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker(); leaseDatabaseLocker.setLockAcquireSleepInterval(2000l); jdbc.setLocker(leaseDatabaseLocker); + broker = new BrokerService(); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setExpireMessagesPeriod(0); + policyMap.setDefaultEntry(defaultEntry); + broker.setDestinationPolicy(policyMap); broker.setPersistenceAdapter(jdbc); broker.setIoExceptionHandler(new JDBCIOExceptionHandler()); @@ -90,26 +127,48 @@ public class AMQ4636Test extends TestCase { * Expectation: SQLException triggers a connection shutdown and failover should kick and try to redeliver the * message. SQLException should NOT be returned to client */ - + @Test public void testProducerWithDBShutdown() throws Exception { - broker = this.createBroker(false); - broker.deleteAllMessages(); - broker.start(); - broker.waitUntilStarted(); - - LOG.info("***Broker started..."); - // failover but timeout in 1 seconds so the test does not hang String failoverTransportURL = "failover:(" + transportUrl + ")?timeout=1000"; this.createDurableConsumer(MY_TEST_TOPIC, failoverTransportURL); - this.sendMessage(MY_TEST_TOPIC, failoverTransportURL); + this.sendMessage(MY_TEST_TOPIC, failoverTransportURL, false, false); } + @Test + public void testTransactedProducerCommitWithDBShutdown() throws Exception { + + // failover but timeout in 1 seconds so the test does not hang + String failoverTransportURL = "failover:(" + transportUrl + + ")?timeout=1000"; + + this.createDurableConsumer(MY_TEST_TOPIC, failoverTransportURL); + + try { + this.sendMessage(MY_TEST_TOPIC, failoverTransportURL, true, true); + fail("Expect rollback after failover - inddoubt commit"); + } catch (javax.jms.TransactionRolledBackException expectedInDoubt) { + LOG.info("Got rollback after failover failed commit", expectedInDoubt); + } + } + + @Test + public void testTransactedProducerRollbackWithDBShutdown() throws Exception { + + // failover but timeout in 1 seconds so the test does not hang + String failoverTransportURL = "failover:(" + transportUrl + + ")?timeout=1000"; + + this.createDurableConsumer(MY_TEST_TOPIC, failoverTransportURL); + + this.sendMessage(MY_TEST_TOPIC, failoverTransportURL, true, false); + } + public void createDurableConsumer(String topic, String transportURL) throws JMSException { Connection connection = null; @@ -135,7 +194,7 @@ public class AMQ4636Test extends TestCase { } } - public void sendMessage(String topic, String transportURL) + public void sendMessage(String topic, String transportURL, boolean transacted, boolean commit) throws JMSException { Connection connection = null; @@ -145,8 +204,8 @@ public class AMQ4636Test extends TestCase { transportURL); connection = factory.createConnection(); - Session session = connection.createSession(false, - Session.AUTO_ACKNOWLEDGE); + Session session = connection.createSession(transacted, + transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic(topic); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); @@ -155,9 +214,17 @@ public class AMQ4636Test extends TestCase { LOG.info("*** send message to broker..."); // trigger SQL exception in transactionContext - testTransactionContext.throwSQLException = true; + throwSQLException = new CountDownLatch(1); producer.send(m); + if (transacted) { + if (commit) { + session.commit(); + } else { + session.rollback(); + } + } + LOG.info("*** Finished send message to broker"); } finally { @@ -174,29 +241,27 @@ public class AMQ4636Test extends TestCase { public class TestJDBCPersistenceAdapter extends JDBCPersistenceAdapter { public TransactionContext getTransactionContext() throws IOException { - return testTransactionContext; + return new TestTransactionContext(this); } } public class TestTransactionContext extends TransactionContext { - public boolean throwSQLException; - public TestTransactionContext( JDBCPersistenceAdapter jdbcPersistenceAdapter) throws IOException { super(jdbcPersistenceAdapter); } + @Override public void executeBatch() throws SQLException { - if (throwSQLException) { + if (throwSQLException.getCount() > 0) { // only throw exception once - throwSQLException = false; + throwSQLException.countDown(); throw new SQLException("TEST SQL EXCEPTION"); } super.executeBatch(); } - } } \ No newline at end of file