NO-JIRA AMQP Test updates

Adds support for doing sends and receives that are enrolled in a
transaction created in a session other than the session that created the
sender or receiver.  Adds some tests that show this in action.
This commit is contained in:
Timothy Bish 2016-09-14 18:23:52 -04:00
parent 14c5c5276c
commit fa5514985d
5 changed files with 269 additions and 7 deletions

View File

@ -139,6 +139,22 @@ public class AmqpMessage {
receiver.accept(delivery);
}
/**
* Accepts the message marking it as consumed on the remote peer.
*
* @param session
* The session that is used to manage acceptance of the message.
*
* @throws Exception if an error occurs during the accept.
*/
public void accept(AmqpSession txnSession) throws Exception {
if (receiver == null) {
throw new IllegalStateException("Can't accept non-received message.");
}
receiver.accept(delivery, txnSession);
}
/**
* Marks the message as Modified, indicating whether it failed to deliver and is not deliverable here.
*
@ -374,7 +390,7 @@ public class AmqpMessage {
* @param key
* the name used to lookup the property in the application properties.
*
* @return the propety value or null if not set.
* @return the property value or null if not set.
*/
public Object getApplicationProperty(String key) {
if (applicationPropertiesMap == null) {
@ -560,6 +576,7 @@ public class AmqpMessage {
message.setHeader(new Header());
}
}
private void lazyCreateProperties() {
if (message.getProperties() == null) {
message.setProperties(new Properties());

View File

@ -422,12 +422,38 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
* @throws IOException if an error occurs while sending the accept.
*/
public void accept(final Delivery delivery) throws IOException {
accept(delivery, this.session);
}
/**
* Accepts a message that was dispatched under the given Delivery instance.
*
* This method allows for the session that is used in the accept to be specified by the
* caller. This allows for an accepted message to be involved in a transaction that is
* being managed by some other session other than the one that created this receiver.
*
* @param delivery
* the Delivery instance to accept.
* @param session
* the session under which the message is being accepted.
*
* @throws IOException if an error occurs while sending the accept.
*/
public void accept(final Delivery delivery, final AmqpSession session) throws IOException {
checkClosed();
if (delivery == null) {
throw new IllegalArgumentException("Delivery to accept cannot be null");
}
if (session == null) {
throw new IllegalArgumentException("Session given cannot be null");
}
if (session.getConnection() != this.session.getConnection()) {
throw new IllegalArgumentException("The session used for accept must originate from the connection that created this receiver.");
}
final ClientFuture request = new ClientFuture();
session.getScheduler().execute(new Runnable() {

View File

@ -126,6 +126,21 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
* @throws IOException if an error occurs during the send.
*/
public void send(final AmqpMessage message) throws IOException {
checkClosed();
send(message, null);
}
/**
* Sends the given message to this senders assigned address using the supplied transaction ID.
*
* @param message
* the message to send.
* @param txId
* the transaction ID to assign the outgoing send.
*
* @throws IOException if an error occurs during the send.
*/
public void send(final AmqpMessage message, final AmqpTransactionId txId) throws IOException {
checkClosed();
final ClientFuture sendRequest = new ClientFuture();
@ -134,7 +149,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
@Override
public void run() {
try {
doSend(message, sendRequest);
doSend(message, sendRequest, txId);
session.pumpToProtonTransport(sendRequest);
} catch (Exception e) {
sendRequest.onFailure(e);
@ -319,7 +334,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
}
}
private void doSend(AmqpMessage message, AsyncResult request) throws Exception {
private void doSend(AmqpMessage message, AsyncResult request, AmqpTransactionId txId) throws Exception {
LOG.trace("Producer sending message: {}", message);
Delivery delivery = null;
@ -332,8 +347,14 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
delivery.setContext(request);
if (session.isInTransaction()) {
Binary amqpTxId = session.getTransactionId().getRemoteTxId();
Binary amqpTxId = null;
if (txId != null) {
amqpTxId = txId.getRemoteTxId();
} else if (session.isInTransaction()) {
amqpTxId = session.getTransactionId().getRemoteTxId();
}
if (amqpTxId != null) {
TransactionalState state = new TransactionalState();
state.setTxnId(amqpTxId);
delivery.disposition(state);

View File

@ -464,8 +464,12 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
connection.pumpToProtonTransport(request);
}
AmqpTransactionId getTransactionId() {
return txContext.getTransactionId();
public AmqpTransactionId getTransactionId() {
if (txContext != null && txContext.isInTransaction()) {
return txContext.getTransactionId();
}
return null;
}
AmqpTransactionContext getTransactionContext() {

View File

@ -178,4 +178,198 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
sender.close();
connection.close();
}
@Test(timeout = 60000)
public void testMultipleSessionReceiversInSingleTXNWithCommit() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
// Load up the Queue with some messages
{
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
sender.send(message);
sender.send(message);
sender.send(message);
sender.close();
}
// Root TXN session controls all TXN send lifetimes.
AmqpSession txnSession = connection.createSession();
// Create some sender sessions
AmqpSession session1 = connection.createSession();
AmqpSession session2 = connection.createSession();
AmqpSession session3 = connection.createSession();
// Sender linked to each session
AmqpReceiver receiver1 = session1.createReceiver("queue://" + getTestName());
AmqpReceiver receiver2 = session2.createReceiver("queue://" + getTestName());
AmqpReceiver receiver3 = session3.createReceiver("queue://" + getTestName());
final QueueViewMBean queue = getProxyToQueue(getTestName());
assertEquals(3, queue.getQueueSize());
// Begin the transaction that all senders will operate in.
txnSession.begin();
assertTrue(txnSession.isInTransaction());
receiver1.flow(1);
receiver2.flow(1);
receiver3.flow(1);
AmqpMessage message1 = receiver1.receive(5, TimeUnit.SECONDS);
AmqpMessage message2 = receiver2.receive(5, TimeUnit.SECONDS);
AmqpMessage message3 = receiver3.receive(5, TimeUnit.SECONDS);
message1.accept(txnSession);
message2.accept(txnSession);
message3.accept(txnSession);
assertEquals(3, queue.getQueueSize());
txnSession.commit();
assertEquals(0, queue.getQueueSize());
}
@Test(timeout = 60000)
public void testMultipleSessionReceiversInSingleTXNWithRollback() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
// Load up the Queue with some messages
{
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
sender.send(message);
sender.send(message);
sender.send(message);
sender.close();
}
// Root TXN session controls all TXN send lifetimes.
AmqpSession txnSession = connection.createSession();
// Create some sender sessions
AmqpSession session1 = connection.createSession();
AmqpSession session2 = connection.createSession();
AmqpSession session3 = connection.createSession();
// Sender linked to each session
AmqpReceiver receiver1 = session1.createReceiver("queue://" + getTestName());
AmqpReceiver receiver2 = session2.createReceiver("queue://" + getTestName());
AmqpReceiver receiver3 = session3.createReceiver("queue://" + getTestName());
final QueueViewMBean queue = getProxyToQueue(getTestName());
assertEquals(3, queue.getQueueSize());
// Begin the transaction that all senders will operate in.
txnSession.begin();
assertTrue(txnSession.isInTransaction());
receiver1.flow(1);
receiver2.flow(1);
receiver3.flow(1);
AmqpMessage message1 = receiver1.receive(5, TimeUnit.SECONDS);
AmqpMessage message2 = receiver2.receive(5, TimeUnit.SECONDS);
AmqpMessage message3 = receiver3.receive(5, TimeUnit.SECONDS);
message1.accept(txnSession);
message2.accept(txnSession);
message3.accept(txnSession);
assertEquals(3, queue.getQueueSize());
txnSession.rollback();
assertEquals(3, queue.getQueueSize());
}
@Test(timeout = 60000)
public void testMultipleSessionSendersInSingleTXNWithCommit() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
// Root TXN session controls all TXN send lifetimes.
AmqpSession txnSession = connection.createSession();
// Create some sender sessions
AmqpSession session1 = connection.createSession();
AmqpSession session2 = connection.createSession();
AmqpSession session3 = connection.createSession();
// Sender linked to each session
AmqpSender sender1 = session1.createSender("queue://" + getTestName());
AmqpSender sender2 = session2.createSender("queue://" + getTestName());
AmqpSender sender3 = session3.createSender("queue://" + getTestName());
final QueueViewMBean queue = getProxyToQueue(getTestName());
assertEquals(0, queue.getQueueSize());
// Begin the transaction that all senders will operate in.
txnSession.begin();
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
assertTrue(txnSession.isInTransaction());
sender1.send(message, txnSession.getTransactionId());
sender2.send(message, txnSession.getTransactionId());
sender3.send(message, txnSession.getTransactionId());
assertEquals(0, queue.getQueueSize());
txnSession.commit();
assertEquals(3, queue.getQueueSize());
}
@Test(timeout = 60000)
public void testMultipleSessionSendersInSingleTXNWithRollback() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
// Root TXN session controls all TXN send lifetimes.
AmqpSession txnSession = connection.createSession();
// Create some sender sessions
AmqpSession session1 = connection.createSession();
AmqpSession session2 = connection.createSession();
AmqpSession session3 = connection.createSession();
// Sender linked to each session
AmqpSender sender1 = session1.createSender("queue://" + getTestName());
AmqpSender sender2 = session2.createSender("queue://" + getTestName());
AmqpSender sender3 = session3.createSender("queue://" + getTestName());
final QueueViewMBean queue = getProxyToQueue(getTestName());
assertEquals(0, queue.getQueueSize());
// Begin the transaction that all senders will operate in.
txnSession.begin();
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
assertTrue(txnSession.isInTransaction());
sender1.send(message, txnSession.getTransactionId());
sender2.send(message, txnSession.getTransactionId());
sender3.send(message, txnSession.getTransactionId());
assertEquals(0, queue.getQueueSize());
txnSession.rollback();
assertEquals(0, queue.getQueueSize());
}
}