mirror of https://github.com/apache/activemq.git
NO-JIRA: Add some more variants of the .NET transaction tests
Adds ability to not settle accepted messages on the client to enable creation of tests that are equivalent to the AmqpNetLite client's transaction tests which hold settlement and expect the resource to handle it on successful discharge.
This commit is contained in:
parent
9f812a2103
commit
5d53aa2d11
|
@ -132,11 +132,36 @@ public class AmqpMessage {
|
|||
* @throws Exception if an error occurs during the accept.
|
||||
*/
|
||||
public void accept() throws Exception {
|
||||
accept(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Accepts the message marking it as consumed on the remote peer.
|
||||
*
|
||||
* @param settle
|
||||
* true if the client should also settle the delivery when sending the accept.
|
||||
*
|
||||
* @throws Exception if an error occurs during the accept.
|
||||
*/
|
||||
public void accept(boolean settle) throws Exception {
|
||||
if (receiver == null) {
|
||||
throw new IllegalStateException("Can't accept non-received message.");
|
||||
}
|
||||
|
||||
receiver.accept(delivery);
|
||||
receiver.accept(delivery, settle);
|
||||
}
|
||||
|
||||
/**
|
||||
* Accepts the message marking it as consumed on the remote peer. This method
|
||||
* will automatically settle the accepted delivery.
|
||||
*
|
||||
* @param session
|
||||
* The session that is used to manage acceptance of the message.
|
||||
*
|
||||
* @throws Exception if an error occurs during the accept.
|
||||
*/
|
||||
public void accept(AmqpSession txnSession) throws Exception {
|
||||
accept(txnSession, true);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -147,12 +172,12 @@ public class AmqpMessage {
|
|||
*
|
||||
* @throws Exception if an error occurs during the accept.
|
||||
*/
|
||||
public void accept(AmqpSession txnSession) throws Exception {
|
||||
public void accept(AmqpSession txnSession, boolean settle) throws Exception {
|
||||
if (receiver == null) {
|
||||
throw new IllegalStateException("Can't accept non-received message.");
|
||||
}
|
||||
|
||||
receiver.accept(delivery, txnSession);
|
||||
receiver.accept(delivery, txnSession, settle);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -414,20 +414,34 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
|
|||
}
|
||||
|
||||
/**
|
||||
* Accepts a message that was dispatched under the given Delivery instance.
|
||||
* Accepts a message that was dispatched under the given Delivery instance and settles the delivery.
|
||||
*
|
||||
* @param delivery
|
||||
* the Delivery instance to accept.
|
||||
*
|
||||
* @throws IOException if an error occurs while sending the accept.
|
||||
*/
|
||||
public void accept(final Delivery delivery) throws IOException {
|
||||
accept(delivery, this.session);
|
||||
public void accept(Delivery delivery) throws IOException {
|
||||
accept(delivery, this.session, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Accepts a message that was dispatched under the given Delivery instance.
|
||||
*
|
||||
* @param delivery
|
||||
* the Delivery instance to accept.
|
||||
* @param settle
|
||||
* true if the receiver should settle the delivery or just send the disposition.
|
||||
*
|
||||
* @throws IOException if an error occurs while sending the accept.
|
||||
*/
|
||||
public void accept(Delivery delivery, boolean settle) throws IOException {
|
||||
accept(delivery, this.session, settle);
|
||||
}
|
||||
|
||||
/**
|
||||
* Accepts a message that was dispatched under the given Delivery instance and settles the delivery.
|
||||
*
|
||||
* This method allows for the session that is used in the accept to be specified by the
|
||||
* caller. This allows for an accepted message to be involved in a transaction that is
|
||||
* being managed by some other session other than the one that created this receiver.
|
||||
|
@ -440,6 +454,26 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
|
|||
* @throws IOException if an error occurs while sending the accept.
|
||||
*/
|
||||
public void accept(final Delivery delivery, final AmqpSession session) throws IOException {
|
||||
accept(delivery, session, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Accepts a message that was dispatched under the given Delivery instance.
|
||||
*
|
||||
* This method allows for the session that is used in the accept to be specified by the
|
||||
* caller. This allows for an accepted message to be involved in a transaction that is
|
||||
* being managed by some other session other than the one that created this receiver.
|
||||
*
|
||||
* @param delivery
|
||||
* the Delivery instance to accept.
|
||||
* @param session
|
||||
* the session under which the message is being accepted.
|
||||
* @param settle
|
||||
* true if the receiver should settle the delivery or just send the disposition.
|
||||
*
|
||||
* @throws IOException if an error occurs while sending the accept.
|
||||
*/
|
||||
public void accept(final Delivery delivery, final AmqpSession session, final boolean settle) throws IOException {
|
||||
checkClosed();
|
||||
|
||||
if (delivery == null) {
|
||||
|
@ -469,11 +503,13 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
|
|||
txState.setOutcome(Accepted.getInstance());
|
||||
txState.setTxnId(txnId);
|
||||
delivery.disposition(txState);
|
||||
delivery.settle();
|
||||
session.getTransactionContext().registerTxConsumer(AmqpReceiver.this);
|
||||
}
|
||||
} else {
|
||||
delivery.disposition(Accepted.getInstance());
|
||||
}
|
||||
|
||||
if (settle) {
|
||||
delivery.settle();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
|||
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSender;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
|
@ -574,4 +575,156 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
|
|||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
// TODO - Direct ports of the AmqpNetLite client tests that don't currently with this broker.
|
||||
|
||||
@Ignore("Fails due to no support for TX enrollment without settlement.")
|
||||
@Test(timeout = 60000)
|
||||
public void testReceiversCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement() throws Exception {
|
||||
final int NUM_MESSAGES = 10;
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
|
||||
// Root TXN session controls all TXN send lifetimes.
|
||||
AmqpSession txnSession = connection.createSession();
|
||||
|
||||
// Normal Session which won't create an TXN itself
|
||||
AmqpSession session = connection.createSession();
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
|
||||
for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
message.setText("Test-Message");
|
||||
message.setApplicationProperty("msgId", i);
|
||||
sender.send(message, txnSession.getTransactionId());
|
||||
}
|
||||
|
||||
// Read all messages from the Queue, do not accept them yet.
|
||||
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
|
||||
ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
|
||||
receiver.flow((NUM_MESSAGES + 2) * 2);
|
||||
for (int i = 0; i < NUM_MESSAGES; ++i) {
|
||||
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
|
||||
assertNotNull(message);
|
||||
messages.add(message);
|
||||
}
|
||||
|
||||
// Commit half the consumed messages
|
||||
txnSession.begin();
|
||||
for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
|
||||
messages.get(i).accept(txnSession, false);
|
||||
}
|
||||
txnSession.commit();
|
||||
|
||||
// Rollback the other half the consumed messages
|
||||
txnSession.begin();
|
||||
for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
|
||||
messages.get(i).accept(txnSession, false);
|
||||
}
|
||||
txnSession.rollback();
|
||||
|
||||
// After rollback message should still be acquired so we read last sent message.
|
||||
{
|
||||
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
|
||||
assertNotNull(message);
|
||||
assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
|
||||
message.release();
|
||||
}
|
||||
|
||||
// Commit the other half the consumed messages
|
||||
txnSession.begin();
|
||||
for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
|
||||
messages.get(i).accept(txnSession);
|
||||
}
|
||||
txnSession.commit();
|
||||
|
||||
// The final message should still be pending.
|
||||
{
|
||||
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
|
||||
receiver.flow(1);
|
||||
assertNotNull(message);
|
||||
assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
|
||||
message.accept();
|
||||
}
|
||||
|
||||
// We should have now drained the Queue
|
||||
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
|
||||
receiver.flow(1);
|
||||
assertNull(message);
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Ignore("Fails due to no support for TX enrollment without settlement.")
|
||||
@Test(timeout = 60000)
|
||||
public void testCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement() throws Exception {
|
||||
final int NUM_MESSAGES = 10;
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
|
||||
// Root TXN session controls all TXN send lifetimes.
|
||||
AmqpSession txnSession = connection.createSession();
|
||||
|
||||
// Normal Session which won't create an TXN itself
|
||||
AmqpSession session = connection.createSession();
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
|
||||
for (int i = 0; i < NUM_MESSAGES; ++i) {
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
message.setText("Test-Message");
|
||||
message.setApplicationProperty("msgId", i);
|
||||
sender.send(message, txnSession.getTransactionId());
|
||||
}
|
||||
|
||||
// Read all messages from the Queue, do not accept them yet.
|
||||
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
|
||||
receiver.flow(2);
|
||||
AmqpMessage message1 = receiver.receive(5, TimeUnit.SECONDS);
|
||||
AmqpMessage message2 = receiver.receive(5, TimeUnit.SECONDS);
|
||||
|
||||
// Accept the first one in a TXN and send a new message in that TXN as well
|
||||
txnSession.begin();
|
||||
{
|
||||
message1.accept(txnSession, false);
|
||||
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
message.setText("Test-Message");
|
||||
message.setApplicationProperty("msgId", NUM_MESSAGES);
|
||||
|
||||
sender.send(message, txnSession.getTransactionId());
|
||||
}
|
||||
txnSession.commit();
|
||||
|
||||
// Accept the second one in a TXN and send a new message in that TXN as well but rollback
|
||||
txnSession.begin();
|
||||
{
|
||||
message2.accept(txnSession, false);
|
||||
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
message.setText("Test-Message");
|
||||
message.setApplicationProperty("msgId", NUM_MESSAGES + 1);
|
||||
sender.send(message, txnSession.getTransactionId());
|
||||
}
|
||||
txnSession.rollback();
|
||||
|
||||
message2.release();
|
||||
|
||||
// Should be two message available for dispatch given that we sent and committed one, and
|
||||
// releases another we had previously received.
|
||||
receiver.flow(2);
|
||||
for (int i = 1; i <= NUM_MESSAGES; ++i) {
|
||||
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
|
||||
assertNotNull(message);
|
||||
assertEquals(i, message.getApplicationProperty("msgId"));
|
||||
message.accept();
|
||||
}
|
||||
|
||||
// Should be nothing left.
|
||||
receiver.flow(1);
|
||||
assertNull(receiver.receive(1, TimeUnit.SECONDS));
|
||||
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue