[AMQ-6548] ensure any pending xa transaction is marked rollback only on delivery failure exception from on message, before delegating to potential clientInternalExceptionListener. Variant of patch applied with additional test - thanks to Andrey Dyachikhin for the patch inspiration

This commit is contained in:
gtully 2017-01-17 16:51:27 +00:00
parent 20522394cc
commit 85181d630c
2 changed files with 84 additions and 0 deletions

View File

@ -1043,6 +1043,11 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
} catch (Throwable e) { } catch (Throwable e) {
LOG.error("error dispatching message: ", e); LOG.error("error dispatching message: ", e);
if (getTransactionContext().isInXATransaction()) {
LOG.debug("Marking transaction: {} rollbackOnly", getTransactionContext());
getTransactionContext().setRollbackOnly(true);
}
// A problem while invoking the MessageListener does not // A problem while invoking the MessageListener does not
// in general indicate a problem with the connection to the broker, i.e. // in general indicate a problem with the connection to the broker, i.e.
// it will usually be sufficient to let the afterDelivery() method either // it will usually be sufficient to let the afterDelivery() method either

View File

@ -906,6 +906,85 @@ public class MDBTest {
adapter.stop(); adapter.stop();
} }
@Test(timeout = 90000)
public void testXaOnMessageExceptionRollback() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter();
adapter.setServerUrl("vm://localhost?broker.persistent=false");
adapter.start(new StubBootstrapContext());
final CountDownLatch messageDelivered = new CountDownLatch(1);
final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
@Override
public void onMessage(Message message) {
super.onMessage(message);
messageDelivered.countDown();
throw new RuntimeException("Failure");
};
@Override
public void afterDelivery() throws ResourceException {
try {
xaresource.end(xid, XAResource.TMSUCCESS);
xaresource.commit(xid, true);
} catch (Throwable e) {
throw new ResourceException(e);
}
}
};
ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec();
activationSpec.setDestinationType(Queue.class.getName());
activationSpec.setDestination("TEST");
activationSpec.setResourceAdapter(adapter);
activationSpec.validate();
MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() {
@Override
public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException {
endpoint.xaresource = resource;
return endpoint;
}
@Override
public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
return true;
}
};
// Activate an Endpoint
adapter.endpointActivation(messageEndpointFactory, activationSpec);
// Give endpoint a chance to setup and register its listeners
try {
Thread.sleep(1000);
} catch (Exception e) {
}
// Send the broker a message to that endpoint
MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
producer.send(session.createTextMessage("Hello!"));
// Wait for the message to be delivered twice.
assertTrue(messageDelivered.await(10000, TimeUnit.MILLISECONDS));
// Shut the Endpoint down.
adapter.endpointDeactivation(messageEndpointFactory, activationSpec);
adapter.stop();
// assert message still available
MessageConsumer messageConsumer = session.createConsumer(new ActiveMQQueue("TEST"));
assertNotNull("got the message", messageConsumer.receive(5000));
connection.close();
}
public Xid createXid() throws IOException { public Xid createXid() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream(); ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream os = new DataOutputStream(baos); DataOutputStream os = new DataOutputStream(baos);