Ensure that unsettled TX messages remain acquired and not redelivered to
the receiver.   Add several tests that demonstrate that a received
message can be released, rejected, accepted or modified after a TX
rollback if it was not settled.
This commit is contained in:
Timothy Bish 2016-09-28 14:56:36 -04:00
parent 634b42016a
commit 0dd806f43f
7 changed files with 232 additions and 61 deletions

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.transport.amqp.protocol;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.transport.amqp.AmqpProtocolException;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
@ -78,11 +79,11 @@ public abstract class AmqpAbstractReceiver extends AmqpAbstractLink<Receiver> {
}
@Override
public void commit() throws Exception {
public void commit(LocalTransactionId txnId) throws Exception {
}
@Override
public void rollback() throws Exception {
public void rollback(LocalTransactionId txnId) throws Exception {
}
@Override

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport.amqp.protocol;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
@ -60,17 +61,23 @@ public interface AmqpLink extends AmqpResource {
* Handle work necessary on commit of transacted resources associated with
* this Link instance.
*
* @param txnId
* The Transaction ID being committed.
*
* @throws Exception if an error occurs while performing the commit.
*/
void commit() throws Exception;
void commit(LocalTransactionId txnId) throws Exception;
/**
* Handle work necessary on rollback of transacted resources associated with
* this Link instance.
*
* @param txnId
* The Transaction ID being rolled back.
*
* @throws Exception if an error occurs while performing the rollback.
*/
void rollback() throws Exception;
void rollback(LocalTransactionId txnId) throws Exception;
/**
* @return the ActiveMQDestination that this link is servicing.

View File

@ -78,7 +78,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer();
private final AmqpTransferTagGenerator tagCache = new AmqpTransferTagGenerator();
private final LinkedList<MessageDispatch> outbound = new LinkedList<MessageDispatch>();
private final LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>();
private final LinkedList<Delivery> dispatchedInTx = new LinkedList<Delivery>();
private final ConsumerInfo consumerInfo;
private AbstractSubscription subscription;
@ -208,26 +208,26 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
} else if (endpointCredit >= 0) {
if (endpointCredit == 0 && currentCreditRequest != 0) {
prefetchExtension.set(0);
currentCreditRequest = 0;
logicalDeliveryCount = 0;
LOG.trace("Flow: credit 0 for sub:" + subscription);
} else {
int deltaToAdd = endpointCredit;
int logicalCredit = currentCreditRequest - logicalDeliveryCount;
if (logicalCredit > 0) {
deltaToAdd -= logicalCredit;
} else {
// reset delivery counter - dispatch from broker concurrent with credit=0 flow can go negative
// reset delivery counter - dispatch from broker concurrent with credit=0
// flow can go negative
logicalDeliveryCount = 0;
}
if (deltaToAdd > 0) {
currentCreditRequest = prefetchExtension.addAndGet(deltaToAdd);
subscription.wakeupDestinationsForDispatch();
// force dispatch of matched/pending for topics (pending messages accumulate in the sub and are dispatched on update of prefetch)
// force dispatch of matched/pending for topics (pending messages accumulate
// in the sub and are dispatched on update of prefetch)
subscription.setPrefetchSize(0);
LOG.trace("Flow: credit addition of {} for sub {}", deltaToAdd, subscription);
}
@ -246,14 +246,20 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
if (txState.getOutcome() != null) {
Outcome outcome = txState.getOutcome();
if (outcome instanceof Accepted) {
TransactionId txId = new LocalTransactionId(session.getConnection().getConnectionId(), toLong(txState.getTxnId()));
// Store the message sent in this TX we might need to re-send on rollback
// and we need to ACK it on commit.
session.enlist(txId);
dispatchedInTx.addFirst(delivery);
if (!delivery.remotelySettled()) {
TransactionalState txAccepted = new TransactionalState();
txAccepted.setOutcome(Accepted.getInstance());
txAccepted.setTxnId(((TransactionalState) state).getTxnId());
txAccepted.setTxnId(txState.getTxnId());
delivery.disposition(txAccepted);
}
settle(delivery, MessageAck.DELIVERED_ACK_TYPE);
}
}
} else {
@ -294,12 +300,14 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
}
@Override
public void commit() throws Exception {
public void commit(LocalTransactionId txnId) throws Exception {
if (!dispatchedInTx.isEmpty()) {
for (MessageDispatch md : dispatchedInTx) {
MessageAck pendingTxAck = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
pendingTxAck.setFirstMessageId(md.getMessage().getMessageId());
pendingTxAck.setTransactionId(md.getMessage().getTransactionId());
for (final Delivery delivery : dispatchedInTx) {
MessageDispatch dispatch = (MessageDispatch) delivery.getContext();
MessageAck pendingTxAck = new MessageAck(dispatch, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
pendingTxAck.setFirstMessageId(dispatch.getMessage().getMessageId());
pendingTxAck.setTransactionId(txnId);
LOG.trace("Sending commit Ack to ActiveMQ: {}", pendingTxAck);
@ -310,6 +318,8 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
Throwable exception = ((ExceptionResponse) response).getException();
exception.printStackTrace();
getEndpoint().close();
} else {
delivery.settle();
}
session.pumpProtonToSocket();
}
@ -321,15 +331,22 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
}
@Override
public void rollback() throws Exception {
public void rollback(LocalTransactionId txnId) throws Exception {
synchronized (outbound) {
LOG.trace("Rolling back {} messages for redelivery. ", dispatchedInTx.size());
for (MessageDispatch dispatch : dispatchedInTx) {
dispatch.setRedeliveryCounter(dispatch.getRedeliveryCounter() + 1);
for (Delivery delivery : dispatchedInTx) {
// Only settled deliveries should be re-dispatched, unsettled deliveries
// remain acquired on the remote end and can be accepted again in a new
// TX or released or rejected etc.
MessageDispatch dispatch = (MessageDispatch) delivery.getContext();
dispatch.getMessage().setTransactionId(null);
outbound.addFirst(dispatch);
if (delivery.remotelySettled()) {
dispatch.setRedeliveryCounter(dispatch.getRedeliveryCounter() + 1);
outbound.addFirst(dispatch);
}
}
dispatchedInTx.clear();
@ -507,19 +524,6 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
ack.setMessageCount(1);
ack.setAckType((byte) ackType);
ack.setDestination(md.getDestination());
DeliveryState remoteState = delivery.getRemoteState();
if (remoteState != null && remoteState instanceof TransactionalState) {
TransactionalState txState = (TransactionalState) remoteState;
TransactionId txId = new LocalTransactionId(session.getConnection().getConnectionId(), toLong(txState.getTxnId()));
ack.setTransactionId(txId);
// Store the message sent in this TX we might need to re-send on rollback
session.enlist(txId);
md.getMessage().setTransactionId(txId);
dispatchedInTx.addFirst(md);
}
LOG.trace("Sending Ack to ActiveMQ: {}", ack);
sendToActiveMQ(ack, new ResponseHandler() {

View File

@ -35,6 +35,7 @@ import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo;
@ -123,11 +124,14 @@ public class AmqpSession implements AmqpResource {
/**
* Commits all pending work for all resources managed under this session.
*
* @param txId
* The specific TransactionId that is being committed.
*
* @throws Exception if an error occurs while attempting to commit work.
*/
public void commit() throws Exception {
public void commit(LocalTransactionId txId) throws Exception {
for (AmqpSender consumer : consumers.values()) {
consumer.commit();
consumer.commit(txId);
}
enlisted = false;
@ -136,11 +140,14 @@ public class AmqpSession implements AmqpResource {
/**
* Rolls back any pending work being down under this session.
*
* @param txId
* The specific TransactionId that is being rolled back.
*
* @throws Exception if an error occurs while attempting to roll back work.
*/
public void rollback() throws Exception {
public void rollback(LocalTransactionId txId) throws Exception {
for (AmqpSender consumer : consumers.values()) {
consumer.rollback();
consumer.rollback(txId);
}
enlisted = false;

View File

@ -98,7 +98,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver {
TransactionInfo txInfo = new TransactionInfo(connectionId, txId, TransactionInfo.BEGIN);
session.getConnection().registerTransaction(txId, this);
sendToActiveMQ(txInfo, null);
LOG.trace("started transaction {}", txId.getValue());
LOG.trace("started transaction {}", txId);
Declared declared = new Declared();
declared.setTxnId(new Binary(toBytes(txId.getValue())));
@ -110,18 +110,18 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver {
final byte operation;
if (discharge.getFail()) {
LOG.trace("rollback transaction {}", txId.getValue());
LOG.trace("rollback transaction {}", txId);
operation = TransactionInfo.ROLLBACK;
} else {
LOG.trace("commit transaction {}", txId.getValue());
LOG.trace("commit transaction {}", txId);
operation = TransactionInfo.COMMIT_ONE_PHASE;
}
for (AmqpSession txSession : txSessions) {
if (operation == TransactionInfo.ROLLBACK) {
txSession.rollback();
txSession.rollback(txId);
} else {
txSession.commit();
txSession.commit(txId);
}
}

View File

@ -32,7 +32,11 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Ignore;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.junit.Test;
/**
@ -89,7 +93,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
assertEquals(1, queue.getQueueSize());
sender.close();
connection.close();
}
@ -114,7 +117,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
assertEquals(0, queue.getQueueSize());
sender.close();
connection.close();
}
@ -146,7 +148,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
assertEquals(0, queue.getQueueSize());
sender.close();
connection.close();
}
@ -194,7 +195,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
connection.close();
}
@Test(timeout = 60000)
public void testReceiveMessageWithRollback() throws Exception {
AmqpClient client = createAmqpClient();
@ -223,7 +223,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
assertEquals(1, queue.getQueueSize());
sender.close();
connection.close();
}
@ -421,6 +420,163 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
assertEquals(0, queue.getQueueSize());
}
@Test(timeout = 60000)
public void testAcceptedButNotSettledInTXRemainsAquiredCanBeAccepted() throws Exception {
doTestAcceptedButNotSettledInTXRemainsAquired(Accepted.getInstance());
}
@Test(timeout = 60000)
public void testAcceptedButNotSettledInTXRemainsAquiredCanBeReleased() throws Exception {
doTestAcceptedButNotSettledInTXRemainsAquired(Released.getInstance());
}
@Test(timeout = 60000)
public void testAcceptedButNotSettledInTXRemainsAquiredCanBeRejected() throws Exception {
doTestAcceptedButNotSettledInTXRemainsAquired(new Rejected());
}
@Test(timeout = 60000)
public void testAcceptedButNotSettledInTXRemainsAquiredCanBeModifiedAsFailed() throws Exception {
Modified outcome = new Modified();
outcome.setDeliveryFailed(true);
doTestAcceptedButNotSettledInTXRemainsAquired(outcome);
}
@Test(timeout = 60000)
public void testAcceptedButNotSettledInTXRemainsAquiredCanBeModifiedAsUndeliverable() throws Exception {
Modified outcome = new Modified();
outcome.setDeliveryFailed(true);
outcome.setUndeliverableHere(true);
doTestAcceptedButNotSettledInTXRemainsAquired(outcome);
}
private void doTestAcceptedButNotSettledInTXRemainsAquired(Outcome outcome) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
final QueueViewMBean queue = getProxyToQueue(getTestName());
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
sender.send(message);
session.begin();
receiver.flow(10);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(received);
received.accept(false);
session.rollback();
// Message should remain acquired an not be redelivered.
assertEquals(1, queue.getQueueSize());
assertNull(receiver.receive(2, TimeUnit.SECONDS));
if (outcome instanceof Released || outcome instanceof Rejected) {
// Receiver should be able to release the still acquired message and the
// broker should redispatch it to the client again.
received.release();
received = receiver.receive(3, TimeUnit.SECONDS);
assertNotNull(received);
received.accept();
received = receiver.receive(2, TimeUnit.SECONDS);
assertNull(received);
assertEquals(0, queue.getQueueSize());
} else if (outcome instanceof Accepted) {
// Receiver should be able to accept the still acquired message and the
// broker should then mark it as consumed.
received.accept();
received = receiver.receive(2, TimeUnit.SECONDS);
assertNull(received);
assertEquals(0, queue.getQueueSize());
} else if (outcome instanceof Modified) {
// Depending on the undeliverable here state the message will either be
// redelivered or DLQ'd
Modified modified = (Modified) outcome;
received.modified(Boolean.TRUE.equals(modified.getDeliveryFailed()), Boolean.TRUE.equals(modified.getUndeliverableHere()));
if (Boolean.TRUE.equals(modified.getUndeliverableHere())) {
received = receiver.receive(2, TimeUnit.SECONDS);
assertNull(received);
assertEquals(0, queue.getQueueSize());
} else {
received = receiver.receive(3, TimeUnit.SECONDS);
assertNotNull(received);
received.accept();
received = receiver.receive(2, TimeUnit.SECONDS);
assertNull(received);
assertEquals(0, queue.getQueueSize());
}
}
connection.close();
}
@Test(timeout = 60000)
public void testTransactionallyAcquiredMessageCanBeTransactionallyConsumed() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
final QueueViewMBean queue = getProxyToQueue(getTestName());
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
sender.send(message);
session.begin();
receiver.flow(10);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(received);
received.accept(false);
session.rollback();
// Message should remain acquired an not be redelivered.
assertEquals(1, queue.getQueueSize());
assertNull(receiver.receive(1, TimeUnit.SECONDS));
// Consume under TX but settle this time
session.begin();
received.accept(false);
session.rollback();
// Should still be acquired
assertEquals(1, queue.getQueueSize());
assertNull(receiver.receive(1, TimeUnit.SECONDS));
// Consume under TX and settle but rollback, message should be redelivered.
session.begin();
received.accept();
session.rollback();
assertEquals(1, queue.getQueueSize());
received = receiver.receive(1, TimeUnit.SECONDS);
assertNotNull(received);
// Consume under TX and commit it this time.
session.begin();
received.accept(false);
session.commit();
// Check that it is now consumed and no more message available
assertTrue(received.getWrappedDelivery().remotelySettled());
assertEquals(0, queue.getQueueSize());
assertNull(receiver.receive(1, TimeUnit.SECONDS));
connection.close();
}
//----- Tests Ported from AmqpNetLite client -----------------------------//
@Test(timeout = 60000)
@ -621,9 +777,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
connection.close();
}
// TODO - Direct ports of the AmqpNetLite client tests that don't currently with this broker.
@Ignore("Fails due to no support for TX enrollment without settlement.")
@Test(timeout = 60000)
public void testReceiversCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement() throws Exception {
final int NUM_MESSAGES = 10;
@ -701,7 +854,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
connection.close();
}
@Ignore("Fails due to no support for TX enrollment without settlement.")
@Test(timeout = 60000)
public void testCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement() throws Exception {
final int NUM_MESSAGES = 10;
@ -756,12 +908,12 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
message2.release();
// Should be two message available for dispatch given that we sent and committed one, and
// Should be ten message available for dispatch given that we sent and committed one, and
// releases another we had previously received.
receiver.flow(2);
receiver.flow(10);
for (int i = 1; i <= NUM_MESSAGES; ++i) {
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
assertNotNull("Expected a message for: " + i, message);
assertEquals(i, message.getApplicationProperty("msgId"));
message.accept();
}

View File

@ -480,7 +480,7 @@ public class JMSMappingOutboundTransformerTest {
@Test
public void testConvertObjectMessageToAmqpMessageWithDataBody() throws Exception {
ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID());
ActiveMQObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE);
outbound.onSend();
outbound.storeContent();
@ -502,7 +502,7 @@ public class JMSMappingOutboundTransformerTest {
@Test
public void testConvertObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception {
ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID());
ActiveMQObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE);
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
outbound.onSend();
outbound.storeContent();
@ -525,7 +525,7 @@ public class JMSMappingOutboundTransformerTest {
@Test
public void testConvertObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception {
ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID());
ActiveMQObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE);
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
outbound.onSend();
outbound.storeContent();
@ -571,7 +571,7 @@ public class JMSMappingOutboundTransformerTest {
@Test
public void testConvertCompressedObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception {
ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID(), true);
ActiveMQObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE, true);
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
outbound.onSend();
outbound.storeContent();
@ -594,7 +594,7 @@ public class JMSMappingOutboundTransformerTest {
@Test
public void testConvertCompressedObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception {
ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID(), true);
ActiveMQObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE, true);
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
outbound.onSend();
outbound.storeContent();