AMQ-3166 - implement rollbackOnlyOnAsyncException such that async exceptions on transactional sends or acks result in the transaction being marked rollback only and commit failing with an exception. Test that shows current state of play using alwaySendSync or AsyncCallback. rollbackOnlyOnAsyncException enabled by default.

This commit is contained in:
gtully 2016-05-25 11:24:43 +01:00
parent f46b2927a4
commit fe9d99e7a0
7 changed files with 383 additions and 52 deletions

View File

@ -264,6 +264,7 @@ public class BrokerService implements Service {
private boolean restartAllowed = true;
private boolean restartRequested = false;
private boolean rejectDurableConsumers = false;
private boolean rollbackOnlyOnAsyncException = true;
private int storeOpenWireVersion = OpenWireFormat.DEFAULT_STORE_VERSION;
@ -3205,4 +3206,12 @@ public class BrokerService implements Service {
public void setAdjustUsageLimits(boolean adjustUsageLimits) {
this.adjustUsageLimits = adjustUsageLimits;
}
public void setRollbackOnlyOnAsyncException(boolean rollbackOnlyOnAsyncException) {
this.rollbackOnlyOnAsyncException = rollbackOnlyOnAsyncException;
}
public boolean isRollbackOnlyOnAsyncException() {
return rollbackOnlyOnAsyncException;
}
}

View File

@ -108,6 +108,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service");
// Keeps track of the broker and connector that created this connection.
protected final Broker broker;
protected final BrokerService brokerService;
protected final TransportConnector connector;
// Keeps track of the state of the connections.
// protected final ConcurrentHashMap localConnectionStates=new
@ -162,6 +163,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
TaskRunnerFactory taskRunnerFactory, TaskRunnerFactory stopTaskRunnerFactory) {
this.connector = connector;
this.broker = broker;
this.brokerService = broker.getBrokerService();
RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
brokerConnectionStates = rb.getConnectionStates();
if (connector != null) {
@ -171,7 +174,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
this.taskRunnerFactory = taskRunnerFactory;
this.stopTaskRunnerFactory = stopTaskRunnerFactory;
this.transport = transport;
final BrokerService brokerService = this.broker.getBrokerService();
if( this.transport instanceof BrokerServiceAware ) {
((BrokerServiceAware)this.transport).setBrokerService(brokerService);
}
@ -223,20 +225,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
public void serviceTransportException(IOException e) {
BrokerService bService = connector.getBrokerService();
if (bService.isShutdownOnSlaveFailure()) {
if (brokerInfo != null) {
if (brokerInfo.isSlaveBroker()) {
LOG.error("Slave has exception: {} shutting down master now.", e.getMessage(), e);
try {
doStop();
bService.stop();
} catch (Exception ex) {
LOG.warn("Failed to stop the master", ex);
}
}
}
}
if (!stopping.get() && !pendingStop) {
transportException.set(e);
if (TRANSPORTLOG.isDebugEnabled()) {
@ -357,6 +345,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
response = new ExceptionResponse(e);
} else {
forceRollbackOnlyOnFailedAsyncTransactionOp(e, command);
serviceException(e);
}
}
@ -379,6 +368,42 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return response;
}
private void forceRollbackOnlyOnFailedAsyncTransactionOp(Throwable e, Command command) {
if (brokerService.isRollbackOnlyOnAsyncException() && !(e instanceof IOException) && isInTransaction(command)) {
Transaction transaction = getActiveTransaction(command);
if (transaction != null && !transaction.isRollbackOnly()) {
LOG.debug("on async exception, force rollback of transaction for: " + command, e);
transaction.setRollbackOnly(e);
}
}
}
private Transaction getActiveTransaction(Command command) {
Transaction transaction = null;
try {
if (command instanceof Message) {
Message messageSend = (Message) command;
ProducerId producerId = messageSend.getProducerId();
ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
transaction = producerExchange.getConnectionContext().getTransactions().get(messageSend.getTransactionId());
} else if (command instanceof MessageAck) {
MessageAck messageAck = (MessageAck) command;
ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(messageAck.getConsumerId());
if (consumerExchange != null) {
transaction = consumerExchange.getConnectionContext().getTransactions().get(messageAck.getTransactionId());
}
}
} catch(Exception ignored){
LOG.trace("failed to find active transaction for command: " + command, ignored);
}
return transaction;
}
private boolean isInTransaction(Command command) {
return command instanceof Message && ((Message)command).isInTransaction()
|| command instanceof MessageAck && ((MessageAck)command).isInTransaction();
}
@Override
public Response processKeepAlive(KeepAliveInfo info) throws Exception {
return null;
@ -1390,10 +1415,10 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
if (duplexName.contains("#")) {
duplexName = duplexName.substring(duplexName.lastIndexOf("#"));
}
MBeanNetworkListener listener = new MBeanNetworkListener(broker.getBrokerService(), config, broker.getBrokerService().createDuplexNetworkConnectorObjectName(duplexName));
MBeanNetworkListener listener = new MBeanNetworkListener(brokerService, config, brokerService.createDuplexNetworkConnectorObjectName(duplexName));
listener.setCreatedByDuplex(true);
duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
duplexBridge.setBrokerService(broker.getBrokerService());
duplexBridge.setBrokerService(brokerService);
// now turn duplex off this side
info.setDuplexConnection(false);
duplexBridge.setCreatedByDuplex(true);
@ -1483,7 +1508,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
context = state.getContext();
result.setConnectionContext(context);
if (context.isReconnect() || (context.isNetworkConnection() && connector.isAuditNetworkProducers())) {
result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id));
result.setLastStoredSequenceId(brokerService.getPersistenceAdapter().getLastProducerSequenceId(id));
}
SessionState ss = state.getSessionState(id.getParentId());
if (ss != null) {

View File

@ -24,7 +24,10 @@ import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import javax.jms.TransactionRolledBackException;
import javax.transaction.xa.XAException;
import org.apache.activemq.TransactionContext;
import org.apache.activemq.command.TransactionId;
import org.slf4j.Logger;
@ -41,6 +44,7 @@ public abstract class Transaction {
public static final byte PREPARED_STATE = 2; // can go to: 3
public static final byte FINISHED_STATE = 3;
boolean committed = false;
boolean rollbackOnly = false;
private final ArrayList<Synchronization> synchronizations = new ArrayList<Synchronization>();
private byte state = START_STATE;
@ -103,16 +107,16 @@ public abstract class Transaction {
case IN_USE_STATE:
break;
default:
XAException xae = new XAException("Prepare cannot be called now.");
xae.errorCode = XAException.XAER_PROTO;
XAException xae = newXAException("Prepare cannot be called now", XAException.XAER_PROTO);
throw xae;
}
// // Run the prePrepareTasks
// for (Iterator iter = prePrepareTasks.iterator(); iter.hasNext();) {
// Callback r = (Callback) iter.next();
// r.execute();
// }
if (rollbackOnly) {
XAException xae = newXAException("COMMIT FAILED: Transaction marked rollback only", XAException.XA_RBROLLBACK);
TransactionRolledBackException transactionRolledBackException = new TransactionRolledBackException(xae.getLocalizedMessage());
xae.initCause(transactionRolledBackException);
throw xae;
}
}
protected void fireBeforeCommit() throws Exception {
@ -184,8 +188,7 @@ public abstract class Transaction {
// I guess this could happen. Post commit task failed
// to execute properly.
getLog().warn("PRE COMMIT FAILED: ", e);
XAException xae = new XAException("PRE COMMIT FAILED");
xae.errorCode = XAException.XAER_RMERR;
XAException xae = newXAException("PRE COMMIT FAILED", XAException.XAER_RMERR);
xae.initCause(e);
throw xae;
}
@ -199,10 +202,27 @@ public abstract class Transaction {
// I guess this could happen. Post commit task failed
// to execute properly.
getLog().warn("POST COMMIT FAILED: ", e);
XAException xae = new XAException("POST COMMIT FAILED");
xae.errorCode = XAException.XAER_RMERR;
XAException xae = newXAException("POST COMMIT FAILED", XAException.XAER_RMERR);
xae.initCause(e);
throw xae;
}
}
public static XAException newXAException(String s, int errorCode) {
XAException xaException = new XAException(s + " " + TransactionContext.xaErrorCodeMarker + errorCode);
xaException.errorCode = errorCode;
return xaException;
}
public void setRollbackOnly(Throwable cause) {
if (!rollbackOnly) {
getLog().trace("setting rollback only, cause:", cause);
rollbackOnly = true;
}
}
public boolean isRollbackOnly() {
return rollbackOnly;
}
}

View File

@ -170,12 +170,6 @@ public class XATransaction extends Transaction {
}
}
public static XAException newXAException(String s, int errorCode) {
XAException xaException = new XAException(s + " " + TransactionContext.xaErrorCodeMarker + errorCode);
xaException.errorCode = errorCode;
return xaException;
}
@Override
public int prepare() throws XAException, IOException {
if (LOG.isDebugEnabled()) {

View File

@ -0,0 +1,294 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.AsyncCallback;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TransactionRolledBackException;
import javax.transaction.xa.XAException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class AMQ3166Test {
private static final Logger LOG = LoggerFactory.getLogger(AMQ3166Test.class);
private BrokerService brokerService;
private AtomicInteger sendAttempts = new AtomicInteger(0);
@Test
public void testCommitThroughAsyncErrorNoForceRollback() throws Exception {
startBroker(false);
Connection connection = createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(session.createQueue("QAT"));
for (int i=0; i<10; i++) {
producer.send(session.createTextMessage("Hello A"));
}
session.commit();
assertTrue("only one message made it through", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokerService.getAdminView().getTotalEnqueueCount() == 1;
}
}));
connection.close();
}
@Test
public void testCommitThroughAsyncErrorForceRollback() throws Exception {
startBroker(true);
Connection connection = createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(session.createQueue("QAT"));
try {
for (int i = 0; i < 10; i++) {
producer.send(session.createTextMessage("Hello A"));
}
session.commit();
fail("Expect TransactionRolledBackException");
} catch (JMSException expected) {
assertTrue(expected.getCause() instanceof XAException);
}
assertTrue("only one message made it through", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokerService.getAdminView().getTotalEnqueueCount() == 0;
}
}));
connection.close();
}
@Test
public void testAckCommitThroughAsyncErrorForceRollback() throws Exception {
startBroker(true);
Connection connection = createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue destination = session.createQueue("QAT");
MessageProducer producer = session.createProducer(destination);
producer.send(session.createTextMessage("Hello A"));
producer.close();
session.commit();
MessageConsumer messageConsumer = session.createConsumer(destination);
assertNotNull("got message", messageConsumer.receive(4000));
try {
session.commit();
fail("Expect TransactionRolledBackException");
} catch (JMSException expected) {
assertTrue(expected.getCause() instanceof XAException);
}
assertTrue("one message still there!", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokerService.getAdminView().getTotalMessageCount() == 1;
}
}));
connection.close();
}
@Test
public void testErrorOnSyncSend() throws Exception {
startBroker(false);
ActiveMQConnection connection = (ActiveMQConnection) createConnection();
connection.setAlwaysSyncSend(true);
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(session.createQueue("QAT"));
try {
for (int i = 0; i < 10; i++) {
producer.send(session.createTextMessage("Hello A"));
}
session.commit();
} catch (JMSException expectedSendFail) {
LOG.info("Got expected: " + expectedSendFail);
session.rollback();
}
assertTrue("only one message made it through", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokerService.getAdminView().getTotalEnqueueCount() == 0;
}
}));
connection.close();
}
@Test
public void testRollbackOnAsyncErrorAmqApi() throws Exception {
startBroker(false);
ActiveMQConnection connection = (ActiveMQConnection) createConnection();
connection.start();
final ActiveMQSession session = (ActiveMQSession) connection.createSession(true, Session.SESSION_TRANSACTED);
int batchSize = 10;
final CountDownLatch batchSent = new CountDownLatch(batchSize);
ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(session.createQueue("QAT"));
for (int i=0; i<batchSize; i++) {
producer.send(session.createTextMessage("Hello A"), new AsyncCallback() {
@Override
public void onSuccess() {
batchSent.countDown();
}
@Override
public void onException(JMSException e) {
session.getTransactionContext().setRollbackOnly(true);
batchSent.countDown();
}
});
if (i==0) {
// transaction context begun on first send
session.getTransactionContext().addSynchronization(new Synchronization() {
@Override
public void beforeEnd() throws Exception {
// await response to all sends in the batch
if (!batchSent.await(10, TimeUnit.SECONDS)) {
LOG.error("TimedOut waiting for aync send requests!");
session.getTransactionContext().setRollbackOnly(true);
};
super.beforeEnd();
}
});
}
}
try {
session.commit();
fail("expect rollback on async error");
} catch (TransactionRolledBackException expected) {
}
assertTrue("only one message made it through", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokerService.getAdminView().getTotalEnqueueCount() == 0;
}
}));
connection.close();
}
private Connection createConnection() throws Exception {
String connectionURI = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionURI);
cf.setWatchTopicAdvisories(false);
return cf.createConnection();
}
public void startBroker(boolean forceRollbackOnAsyncSendException) throws Exception {
brokerService = createBroker(forceRollbackOnAsyncSendException);
brokerService.start();
brokerService.waitUntilStarted();
}
@After
public void tearDown() throws Exception {
if (brokerService != null) {
brokerService.stop();
brokerService.waitUntilStopped();
brokerService = null;
}
}
protected BrokerService createBroker(boolean forceRollbackOnAsyncSendException) throws Exception {
BrokerService answer = new BrokerService();
answer.setPersistent(true);
answer.setDeleteAllMessagesOnStartup(true);
answer.setAdvisorySupport(false);
answer.setRollbackOnlyOnAsyncException(forceRollbackOnAsyncSendException);
answer.addConnector("tcp://0.0.0.0:0");
answer.setPlugins(new BrokerPlugin[]{
new BrokerPluginSupport() {
@Override
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
if (ack.isStandardAck()) {
throw new RuntimeException("no way, won't allow any standard ack");
}
super.acknowledge(consumerExchange, ack);
}
@Override
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
if (sendAttempts.incrementAndGet() > 1) {
throw new RuntimeException("no way, won't accept any messages");
}
super.send(producerExchange, messageSend);
}
}
});
return answer;
}
}

View File

@ -90,7 +90,7 @@ public class SimpleNetworkTest {
Message test = localSession.createTextMessage("test-" + i);
producer.send(test);
Message msg = consumer1.receive(3000);
assertNotNull(msg);
assertNotNull("not null? message: " + i, msg);
ActiveMQMessage amqMessage = (ActiveMQMessage) msg;
assertTrue(amqMessage.isCompressed());
}

View File

@ -263,9 +263,6 @@ public class FailoverTransactionTest extends TestSupport {
Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session2.createConsumer(destination);
msg = consumer.receive(1000);
if (msg == null) {
msg = consumer.receive(5000);
}
LOG.info("Received: " + msg);
assertNull("no messges left dangling but got: " + msg, msg);
connection.close();
@ -363,9 +360,6 @@ public class FailoverTransactionTest extends TestSupport {
Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session2.createConsumer(destination);
msg = consumer.receive(1000);
if (msg == null) {
msg = consumer.receive(5000);
}
LOG.info("Received: " + msg);
assertNull("no messges left dangling but got: " + msg, msg);
connection.close();
@ -478,9 +472,6 @@ public class FailoverTransactionTest extends TestSupport {
Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session2.createConsumer(destination);
msg = consumer.receive(1000);
if (msg == null) {
msg = consumer.receive(5000);
}
LOG.info("Received: " + msg);
assertNull("no messges left dangling but got: " + msg, msg);
connection.close();
@ -602,9 +593,6 @@ public class FailoverTransactionTest extends TestSupport {
Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session2.createConsumer(destination);
msg = consumer.receive(1000);
if (msg == null) {
msg = consumer.receive(5000);
}
LOG.info("Received: " + msg);
assertNull("no messges left dangling but got: " + msg, msg);
connection.close();
@ -626,7 +614,11 @@ public class FailoverTransactionTest extends TestSupport {
broker.stop();
startBroker(false, url);
session.commit();
try {
session.commit();
fail("expect ex for rollback only on async exc");
} catch (JMSException expected) {
}
// without tracking producers, message will not be replayed on recovery
assertNull("we got the message", consumer.receive(5000));
@ -886,9 +878,6 @@ public class FailoverTransactionTest extends TestSupport {
Session sweeperSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer sweeper = sweeperSession.createConsumer(destination);
msg = sweeper.receive(1000);
if (msg == null) {
msg = sweeper.receive(5000);
}
LOG.info("Sweep received: " + msg);
assertNull("no messges left dangling but got: " + msg, msg);
connection.close();