https://issues.apache.org/jira/browse/AMQ-4634 - create ack with known transaction context so that subesquent timeout triggers a rollback and does not leave a null transaction

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1504279 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2013-07-17 21:11:37 +00:00
parent 1982d54224
commit 1408e7f63b
3 changed files with 115 additions and 16 deletions

View File

@ -857,27 +857,19 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
md.setDeliverySequenceId(getNextDeliveryId());
final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
try {
messageListener.onMessage(message);
} catch (RuntimeException e) {
LOG.error("error dispatching message: ", e);
// A problem while invoking the MessageListener does not
// in general indicate a problem with the connection to the broker, i.e.
// it will usually be sufficient to let the afterDelivery() method either
// commit or roll back in order to deal with the exception.
// However, we notify any registered client internal exception listener
// of the problem.
connection.onClientInternalException(e);
}
try {
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
ack.setFirstMessageId(md.getMessage().getMessageId());
doStartTransaction();
ack.setTransactionId(getTransactionContext().getTransactionId());
if (ack.getTransactionId() != null) {
getTransactionContext().addSynchronization(new Synchronization() {
@Override
public void beforeEnd() throws Exception {
asyncSendPacket(ack);
}
@Override
public void afterRollback() throws Exception {
md.getMessage().onMessageRolledBack();
@ -893,7 +885,9 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
// Acknowledge the last message.
MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
ack.setFirstMessageId(md.getMessage().getMessageId());
ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
asyncSendPacket(ack);
} else {
MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
@ -916,10 +910,27 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
}
});
}
messageListener.onMessage(message);
} catch (Throwable e) {
LOG.error("error dispatching message: ", e);
// A problem while invoking the MessageListener does not
// in general indicate a problem with the connection to the broker, i.e.
// it will usually be sufficient to let the afterDelivery() method either
// commit or roll back in order to deal with the exception.
// However, we notify any registered client internal exception listener
// of the problem.
connection.onClientInternalException(e);
} finally {
if (ack.getTransactionId() == null) {
try {
asyncSendPacket(ack);
} catch (Throwable e) {
connection.onClientInternalException(e);
}
}
}
if (deliveryListener != null) {
deliveryListener.afterDelivery(this, message);

View File

@ -223,7 +223,7 @@ public class ServerSessionImpl implements ServerSession, InboundContext, Work, D
try {
endpoint.afterDelivery();
} catch (Throwable e) {
throw new RuntimeException("Endpoint after delivery notification failure", e);
throw new RuntimeException("Endpoint after delivery notification failure: " + e, e);
} finally {
TransactionContext transactionContext = session.getTransactionContext();
if (transactionContext != null && transactionContext.isInLocalTransaction()) {

View File

@ -502,6 +502,94 @@ public class MDBTest extends TestCase {
}
public void testXaTimeoutRedelivery() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter();
adapter.setServerUrl("vm://localhost?broker.persistent=false");
adapter.start(new StubBootstrapContext());
final CountDownLatch messageDelivered = new CountDownLatch(2);
final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
public void onMessage(Message message) {
super.onMessage(message);
try {
messageDelivered.countDown();
if (!messageDelivered.await(1, TimeUnit.MILLISECONDS)) {
// simulate abort, timeout
try {
xaresource.end(xid, XAResource.TMFAIL);
xaresource.rollback(xid);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
} else {
try {
assertTrue(message.getJMSRedelivered());
} catch (JMSException e) {
e.printStackTrace();
}
}
} catch (InterruptedException ignored) {
}
};
public void afterDelivery() throws ResourceException {
try {
xaresource.end(xid, XAResource.TMSUCCESS);
xaresource.commit(xid, true);
} catch (Throwable e) {
throw new ResourceException(e);
}
}
};
ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec();
activationSpec.setDestinationType(Queue.class.getName());
activationSpec.setDestination("TEST");
activationSpec.setResourceAdapter(adapter);
activationSpec.validate();
MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() {
public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException {
endpoint.xaresource = resource;
return endpoint;
}
public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
return true;
}
};
// Activate an Endpoint
adapter.endpointActivation(messageEndpointFactory, activationSpec);
// Give endpoint a chance to setup and register its listeners
try {
Thread.sleep(1000);
} catch (Exception e) {
}
// Send the broker a message to that endpoint
MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
producer.send(session.createTextMessage("Hello!"));
connection.close();
// Wait for the message to be delivered twice.
assertTrue(messageDelivered.await(10000, TimeUnit.MILLISECONDS));
// Shut the Endpoint down.
adapter.endpointDeactivation(messageEndpointFactory, activationSpec);
adapter.stop();
}
public Xid createXid() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream os = new DataOutputStream(baos);