https://issues.apache.org/jira/browse/AMQ-6422 - move delivery tracking to pumpoutbound and additional test that shows how the presettle case breaks. Thanks to Robbie Gemmell for the feedback

This commit is contained in:
gtully 2016-09-21 13:59:45 +01:00
parent ffee8b442f
commit 6c01b641b1
2 changed files with 42 additions and 1 deletions

View File

@ -292,7 +292,6 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
}
pumpOutbound();
logicalDeliveryCount++;
}
@Override
@ -410,6 +409,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
}
currentBuffer = null;
currentDelivery = null;
logicalDeliveryCount++;
}
} else {
return;

View File

@ -223,6 +223,47 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
connection.close();
}
@Test(timeout = 60000)
@Repeat(repetitions = 1)
public void testPresettledReceiverReadsAllMessagesInNonFlowBatch() throws Exception {
final int MSG_COUNT = 100;
sendMessages(getTestName(), MSG_COUNT, false);
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName(), null, false, true);
QueueViewMBean queueView = getProxyToQueue(getTestName());
assertEquals(MSG_COUNT, queueView.getQueueSize());
assertEquals(0, queueView.getDispatchCount());
receiver.flow(20);
// consume less that flow
for (int j=0;j<10;j++) {
assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
}
// flow more and consume all
receiver.flow(10);
for (int j=0;j<20;j++) {
assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
}
// remainder
receiver.flow(70);
for (int j=0;j<70;j++) {
assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
}
receiver.close();
assertEquals(0, queueView.getQueueSize());
connection.close();
}
@Test(timeout = 60000)
@Repeat(repetitions = 1)
public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception {