ARTEMIS-1013 Queue deliver after AMQP msg release

This commit is contained in:
Martyn Taylor 2017-03-02 14:50:56 +00:00 committed by Clebert Suconic
parent a353da0caf
commit 543dd4c9e3
3 changed files with 38 additions and 8 deletions

View File

@ -326,7 +326,8 @@ public class AMQPSessionCallback implements SessionCallback {
public void cancel(Object brokerConsumer, Message message, boolean updateCounts) throws Exception {
recoverContext();
try {
((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts);
((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts);;
((ServerConsumer) brokerConsumer).getQueue().forceDelivery();
} finally {
resetContext();
}
@ -560,7 +561,6 @@ public class AMQPSessionCallback implements SessionCallback {
Transaction tx = protonSPI.getTransaction(txid);
tx.rollback();
protonSPI.removeTransaction(txid);
}
public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception {

View File

@ -44,6 +44,10 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
receiver1.flow(1);
AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS);
AmqpReceiver receiver2 = session.createReceiver(getTestName());
assertNotNull("did not receive message first time", message);
assertEquals("MessageID:0", message.getMessageId());
@ -51,12 +55,11 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
assertNotNull(protonMessage);
assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount());
receiver2.flow(1);
message.release();
// Read the message again and validate its state
AmqpReceiver receiver2 = session.createReceiver(getTestName());
receiver2.flow(1);
// Read the message again and validate its state
message = receiver2.receive(10, TimeUnit.SECONDS);
assertNotNull("did not receive message again", message);
assertEquals("MessageID:0", message.getMessageId());

View File

@ -110,7 +110,6 @@ public class ProtonTest extends ProtonTestBase {
private static final String amqpConnectionUri = "amqp://localhost:5672";
private static final String tcpAmqpConnectionUri = "tcp://localhost:5672";
private static final String brokerName = "my-broker";
private static final long maxSizeBytes = 1 * 1024 * 1024;
@ -472,7 +471,7 @@ public class ProtonTest extends ProtonTestBase {
session.close();
Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
//because tx commit is executed async on broker, we use a timed wait.
assertTrue(TimeUtils.waitOnBoolean(true, 10000, ()-> q.getMessageCount() == 10));
assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> q.getMessageCount() == 10));
}
@Test
@ -548,7 +547,7 @@ public class ProtonTest extends ProtonTestBase {
session.rollback();
Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
//because tx rollback is executed async on broker, we use a timed wait.
assertTrue(TimeUtils.waitOnBoolean(true, 10000, ()-> q.getMessageCount() == 10));
assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> q.getMessageCount() == 10));
}
@ -1855,4 +1854,32 @@ public class ProtonTest extends ProtonTestBase {
return count;
}
}
@Test
public void testReleaseDisposition() throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection connection = client.connect();
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(address);
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
sender.send(message);
AmqpReceiver receiver = session.createReceiver(address);
receiver.flow(10);
AmqpMessage m1 = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(m1);
m1.release();
//receiver.flow(10);
AmqpMessage m2 = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(m2);
m2.accept();
} finally {
connection.close();
}
}
}