mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-4636 - tidy up commit failure case to redirect via IOExceptionHandler - failover still suppressed the commit on recovery - resulting in rollback exception the client due to indoubt commit
This commit is contained in:
parent
75eb814ca7
commit
7a0168a4f5
|
@ -177,10 +177,12 @@ public class TransactionContext {
|
||||||
}
|
}
|
||||||
} catch (SQLException e) {
|
} catch (SQLException e) {
|
||||||
JDBCPersistenceAdapter.log("Commit failed: ", e);
|
JDBCPersistenceAdapter.log("Commit failed: ", e);
|
||||||
|
try {
|
||||||
this.rollback();
|
doRollback();
|
||||||
|
} catch (Exception ignored) {}
|
||||||
throw IOExceptionSupport.create(e);
|
IOException ioe = IOExceptionSupport.create(e);
|
||||||
|
persistenceAdapter.getBrokerService().handleIOException(ioe);
|
||||||
|
throw ioe;
|
||||||
} finally {
|
} finally {
|
||||||
inTx = false;
|
inTx = false;
|
||||||
close();
|
close();
|
||||||
|
@ -192,20 +194,7 @@ public class TransactionContext {
|
||||||
throw new IOException("Not started.");
|
throw new IOException("Not started.");
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
if (addMessageStatement != null) {
|
doRollback();
|
||||||
addMessageStatement.close();
|
|
||||||
addMessageStatement = null;
|
|
||||||
}
|
|
||||||
if (removedMessageStatement != null) {
|
|
||||||
removedMessageStatement.close();
|
|
||||||
removedMessageStatement = null;
|
|
||||||
}
|
|
||||||
if (updateLastAckStatement != null) {
|
|
||||||
updateLastAckStatement.close();
|
|
||||||
updateLastAckStatement = null;
|
|
||||||
}
|
|
||||||
connection.rollback();
|
|
||||||
|
|
||||||
} catch (SQLException e) {
|
} catch (SQLException e) {
|
||||||
JDBCPersistenceAdapter.log("Rollback failed: ", e);
|
JDBCPersistenceAdapter.log("Rollback failed: ", e);
|
||||||
throw IOExceptionSupport.create(e);
|
throw IOExceptionSupport.create(e);
|
||||||
|
@ -215,6 +204,22 @@ public class TransactionContext {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void doRollback() throws SQLException {
|
||||||
|
if (addMessageStatement != null) {
|
||||||
|
addMessageStatement.close();
|
||||||
|
addMessageStatement = null;
|
||||||
|
}
|
||||||
|
if (removedMessageStatement != null) {
|
||||||
|
removedMessageStatement.close();
|
||||||
|
removedMessageStatement = null;
|
||||||
|
}
|
||||||
|
if (updateLastAckStatement != null) {
|
||||||
|
updateLastAckStatement.close();
|
||||||
|
updateLastAckStatement = null;
|
||||||
|
}
|
||||||
|
connection.rollback();
|
||||||
|
}
|
||||||
|
|
||||||
public PreparedStatement getAddMessageStatement() {
|
public PreparedStatement getAddMessageStatement() {
|
||||||
return addMessageStatement;
|
return addMessageStatement;
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.bugs;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.DeliveryMode;
|
import javax.jms.DeliveryMode;
|
||||||
import javax.jms.Destination;
|
import javax.jms.Destination;
|
||||||
|
@ -27,9 +28,10 @@ import javax.jms.MessageProducer;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
import javax.jms.Topic;
|
import javax.jms.Topic;
|
||||||
import javax.jms.TopicSubscriber;
|
import javax.jms.TopicSubscriber;
|
||||||
import junit.framework.TestCase;
|
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
|
import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
|
||||||
import org.apache.activemq.store.jdbc.JDBCIOExceptionHandler;
|
import org.apache.activemq.store.jdbc.JDBCIOExceptionHandler;
|
||||||
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
|
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
|
||||||
|
@ -37,8 +39,12 @@ import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
|
||||||
import org.apache.activemq.store.jdbc.TransactionContext;
|
import org.apache.activemq.store.jdbc.TransactionContext;
|
||||||
import org.apache.activemq.util.IOHelper;
|
import org.apache.activemq.util.IOHelper;
|
||||||
import org.apache.derby.jdbc.EmbeddedDataSource;
|
import org.apache.derby.jdbc.EmbeddedDataSource;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Testing how the broker reacts when a SQL Exception is thrown from
|
* Testing how the broker reacts when a SQL Exception is thrown from
|
||||||
|
@ -46,35 +52,66 @@ import org.slf4j.LoggerFactory;
|
||||||
* <p/>
|
* <p/>
|
||||||
* see https://issues.apache.org/jira/browse/AMQ-4636
|
* see https://issues.apache.org/jira/browse/AMQ-4636
|
||||||
*/
|
*/
|
||||||
|
public class AMQ4636Test {
|
||||||
public class AMQ4636Test extends TestCase {
|
|
||||||
|
|
||||||
private static final String MY_TEST_TOPIC = "MY_TEST_TOPIC";
|
private static final String MY_TEST_TOPIC = "MY_TEST_TOPIC";
|
||||||
private static final Logger LOG = LoggerFactory
|
private static final Logger LOG = LoggerFactory
|
||||||
.getLogger(AMQ4636Test.class);
|
.getLogger(AMQ4636Test.class);
|
||||||
private String transportUrl = "tcp://0.0.0.0:0";
|
private String transportUrl = "tcp://0.0.0.0:0";
|
||||||
private BrokerService broker;
|
private BrokerService broker;
|
||||||
private TestTransactionContext testTransactionContext;
|
EmbeddedDataSource embeddedDataSource;
|
||||||
|
CountDownLatch throwSQLException = new CountDownLatch(0);
|
||||||
|
|
||||||
protected BrokerService createBroker(boolean withJMX) throws Exception {
|
@Before
|
||||||
BrokerService broker = new BrokerService();
|
public void startBroker() throws Exception {
|
||||||
|
broker = createBroker();
|
||||||
|
broker.deleteAllMessages();
|
||||||
|
broker.start();
|
||||||
|
broker.waitUntilStarted();
|
||||||
|
LOG.info("Broker started...");
|
||||||
|
}
|
||||||
|
|
||||||
broker.setUseJmx(withJMX);
|
@After
|
||||||
|
public void stopBroker() throws Exception {
|
||||||
|
if (broker != null) {
|
||||||
|
LOG.info("Stopping broker...");
|
||||||
|
broker.stop();
|
||||||
|
broker.waitUntilStopped();
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
if (embeddedDataSource != null) {
|
||||||
|
// ref http://svn.apache.org/viewvc/db/derby/code/trunk/java/testing/org/apache/derbyTesting/junit/JDBCDataSource.java?view=markup
|
||||||
|
embeddedDataSource.setShutdownDatabase("shutdown");
|
||||||
|
embeddedDataSource.getConnection();
|
||||||
|
}
|
||||||
|
} catch (Exception ignored) {
|
||||||
|
} finally {
|
||||||
|
embeddedDataSource.setShutdownDatabase(null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
EmbeddedDataSource embeddedDataSource = (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory());
|
protected BrokerService createBroker() throws Exception {
|
||||||
|
|
||||||
|
embeddedDataSource = (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory());
|
||||||
embeddedDataSource.setCreateDatabase("create");
|
embeddedDataSource.setCreateDatabase("create");
|
||||||
|
embeddedDataSource.getConnection().close();
|
||||||
|
|
||||||
//wire in a TestTransactionContext (wrapper to TransactionContext) that has an executeBatch()
|
//wire in a TestTransactionContext (wrapper to TransactionContext) that has an executeBatch()
|
||||||
// method that can be configured to throw a SQL exception on demand
|
// method that can be configured to throw a SQL exception on demand
|
||||||
JDBCPersistenceAdapter jdbc = new TestJDBCPersistenceAdapter();
|
JDBCPersistenceAdapter jdbc = new TestJDBCPersistenceAdapter();
|
||||||
jdbc.setDataSource(embeddedDataSource);
|
jdbc.setDataSource(embeddedDataSource);
|
||||||
testTransactionContext = new TestTransactionContext(jdbc);
|
|
||||||
|
|
||||||
jdbc.setLockKeepAlivePeriod(1000l);
|
jdbc.setLockKeepAlivePeriod(1000l);
|
||||||
LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
|
LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
|
||||||
leaseDatabaseLocker.setLockAcquireSleepInterval(2000l);
|
leaseDatabaseLocker.setLockAcquireSleepInterval(2000l);
|
||||||
jdbc.setLocker(leaseDatabaseLocker);
|
jdbc.setLocker(leaseDatabaseLocker);
|
||||||
|
|
||||||
|
broker = new BrokerService();
|
||||||
|
PolicyMap policyMap = new PolicyMap();
|
||||||
|
PolicyEntry defaultEntry = new PolicyEntry();
|
||||||
|
defaultEntry.setExpireMessagesPeriod(0);
|
||||||
|
policyMap.setDefaultEntry(defaultEntry);
|
||||||
|
broker.setDestinationPolicy(policyMap);
|
||||||
broker.setPersistenceAdapter(jdbc);
|
broker.setPersistenceAdapter(jdbc);
|
||||||
|
|
||||||
broker.setIoExceptionHandler(new JDBCIOExceptionHandler());
|
broker.setIoExceptionHandler(new JDBCIOExceptionHandler());
|
||||||
|
@ -90,26 +127,48 @@ public class AMQ4636Test extends TestCase {
|
||||||
* Expectation: SQLException triggers a connection shutdown and failover should kick and try to redeliver the
|
* Expectation: SQLException triggers a connection shutdown and failover should kick and try to redeliver the
|
||||||
* message. SQLException should NOT be returned to client
|
* message. SQLException should NOT be returned to client
|
||||||
*/
|
*/
|
||||||
|
@Test
|
||||||
public void testProducerWithDBShutdown() throws Exception {
|
public void testProducerWithDBShutdown() throws Exception {
|
||||||
|
|
||||||
broker = this.createBroker(false);
|
|
||||||
broker.deleteAllMessages();
|
|
||||||
broker.start();
|
|
||||||
broker.waitUntilStarted();
|
|
||||||
|
|
||||||
LOG.info("***Broker started...");
|
|
||||||
|
|
||||||
// failover but timeout in 1 seconds so the test does not hang
|
// failover but timeout in 1 seconds so the test does not hang
|
||||||
String failoverTransportURL = "failover:(" + transportUrl
|
String failoverTransportURL = "failover:(" + transportUrl
|
||||||
+ ")?timeout=1000";
|
+ ")?timeout=1000";
|
||||||
|
|
||||||
this.createDurableConsumer(MY_TEST_TOPIC, failoverTransportURL);
|
this.createDurableConsumer(MY_TEST_TOPIC, failoverTransportURL);
|
||||||
|
|
||||||
this.sendMessage(MY_TEST_TOPIC, failoverTransportURL);
|
this.sendMessage(MY_TEST_TOPIC, failoverTransportURL, false, false);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTransactedProducerCommitWithDBShutdown() throws Exception {
|
||||||
|
|
||||||
|
// failover but timeout in 1 seconds so the test does not hang
|
||||||
|
String failoverTransportURL = "failover:(" + transportUrl
|
||||||
|
+ ")?timeout=1000";
|
||||||
|
|
||||||
|
this.createDurableConsumer(MY_TEST_TOPIC, failoverTransportURL);
|
||||||
|
|
||||||
|
try {
|
||||||
|
this.sendMessage(MY_TEST_TOPIC, failoverTransportURL, true, true);
|
||||||
|
fail("Expect rollback after failover - inddoubt commit");
|
||||||
|
} catch (javax.jms.TransactionRolledBackException expectedInDoubt) {
|
||||||
|
LOG.info("Got rollback after failover failed commit", expectedInDoubt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTransactedProducerRollbackWithDBShutdown() throws Exception {
|
||||||
|
|
||||||
|
// failover but timeout in 1 seconds so the test does not hang
|
||||||
|
String failoverTransportURL = "failover:(" + transportUrl
|
||||||
|
+ ")?timeout=1000";
|
||||||
|
|
||||||
|
this.createDurableConsumer(MY_TEST_TOPIC, failoverTransportURL);
|
||||||
|
|
||||||
|
this.sendMessage(MY_TEST_TOPIC, failoverTransportURL, true, false);
|
||||||
|
}
|
||||||
|
|
||||||
public void createDurableConsumer(String topic,
|
public void createDurableConsumer(String topic,
|
||||||
String transportURL) throws JMSException {
|
String transportURL) throws JMSException {
|
||||||
Connection connection = null;
|
Connection connection = null;
|
||||||
|
@ -135,7 +194,7 @@ public class AMQ4636Test extends TestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void sendMessage(String topic, String transportURL)
|
public void sendMessage(String topic, String transportURL, boolean transacted, boolean commit)
|
||||||
throws JMSException {
|
throws JMSException {
|
||||||
Connection connection = null;
|
Connection connection = null;
|
||||||
|
|
||||||
|
@ -145,8 +204,8 @@ public class AMQ4636Test extends TestCase {
|
||||||
transportURL);
|
transportURL);
|
||||||
|
|
||||||
connection = factory.createConnection();
|
connection = factory.createConnection();
|
||||||
Session session = connection.createSession(false,
|
Session session = connection.createSession(transacted,
|
||||||
Session.AUTO_ACKNOWLEDGE);
|
transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
|
||||||
Destination destination = session.createTopic(topic);
|
Destination destination = session.createTopic(topic);
|
||||||
MessageProducer producer = session.createProducer(destination);
|
MessageProducer producer = session.createProducer(destination);
|
||||||
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||||
|
@ -155,9 +214,17 @@ public class AMQ4636Test extends TestCase {
|
||||||
LOG.info("*** send message to broker...");
|
LOG.info("*** send message to broker...");
|
||||||
|
|
||||||
// trigger SQL exception in transactionContext
|
// trigger SQL exception in transactionContext
|
||||||
testTransactionContext.throwSQLException = true;
|
throwSQLException = new CountDownLatch(1);
|
||||||
producer.send(m);
|
producer.send(m);
|
||||||
|
|
||||||
|
if (transacted) {
|
||||||
|
if (commit) {
|
||||||
|
session.commit();
|
||||||
|
} else {
|
||||||
|
session.rollback();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
LOG.info("*** Finished send message to broker");
|
LOG.info("*** Finished send message to broker");
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -174,29 +241,27 @@ public class AMQ4636Test extends TestCase {
|
||||||
public class TestJDBCPersistenceAdapter extends JDBCPersistenceAdapter {
|
public class TestJDBCPersistenceAdapter extends JDBCPersistenceAdapter {
|
||||||
|
|
||||||
public TransactionContext getTransactionContext() throws IOException {
|
public TransactionContext getTransactionContext() throws IOException {
|
||||||
return testTransactionContext;
|
return new TestTransactionContext(this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public class TestTransactionContext extends TransactionContext {
|
public class TestTransactionContext extends TransactionContext {
|
||||||
|
|
||||||
public boolean throwSQLException;
|
|
||||||
|
|
||||||
public TestTransactionContext(
|
public TestTransactionContext(
|
||||||
JDBCPersistenceAdapter jdbcPersistenceAdapter)
|
JDBCPersistenceAdapter jdbcPersistenceAdapter)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
super(jdbcPersistenceAdapter);
|
super(jdbcPersistenceAdapter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void executeBatch() throws SQLException {
|
public void executeBatch() throws SQLException {
|
||||||
if (throwSQLException) {
|
if (throwSQLException.getCount() > 0) {
|
||||||
// only throw exception once
|
// only throw exception once
|
||||||
throwSQLException = false;
|
throwSQLException.countDown();
|
||||||
throw new SQLException("TEST SQL EXCEPTION");
|
throw new SQLException("TEST SQL EXCEPTION");
|
||||||
}
|
}
|
||||||
super.executeBatch();
|
super.executeBatch();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
Loading…
Reference in New Issue