mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-6422 - move delivery tracking to pumpoutbound and additional test that shows how the presettle case breaks. Thanks to Robbie Gemmell for the feedback
This commit is contained in:
parent
ebbb7ab437
commit
8e6fe414ad
|
@ -292,7 +292,6 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pumpOutbound();
|
pumpOutbound();
|
||||||
logicalDeliveryCount++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -410,6 +409,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
|
||||||
}
|
}
|
||||||
currentBuffer = null;
|
currentBuffer = null;
|
||||||
currentDelivery = null;
|
currentDelivery = null;
|
||||||
|
logicalDeliveryCount++;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -223,6 +223,47 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
|
||||||
connection.close();
|
connection.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
@Repeat(repetitions = 1)
|
||||||
|
public void testPresettledReceiverReadsAllMessagesInNonFlowBatch() throws Exception {
|
||||||
|
final int MSG_COUNT = 100;
|
||||||
|
sendMessages(getTestName(), MSG_COUNT, false);
|
||||||
|
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = client.connect();
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
|
||||||
|
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName(), null, false, true);
|
||||||
|
|
||||||
|
QueueViewMBean queueView = getProxyToQueue(getTestName());
|
||||||
|
assertEquals(MSG_COUNT, queueView.getQueueSize());
|
||||||
|
assertEquals(0, queueView.getDispatchCount());
|
||||||
|
|
||||||
|
receiver.flow(20);
|
||||||
|
// consume less that flow
|
||||||
|
for (int j=0;j<10;j++) {
|
||||||
|
assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
|
||||||
|
}
|
||||||
|
|
||||||
|
// flow more and consume all
|
||||||
|
receiver.flow(10);
|
||||||
|
for (int j=0;j<20;j++) {
|
||||||
|
assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
|
||||||
|
}
|
||||||
|
|
||||||
|
// remainder
|
||||||
|
receiver.flow(70);
|
||||||
|
for (int j=0;j<70;j++) {
|
||||||
|
assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
|
||||||
|
}
|
||||||
|
|
||||||
|
receiver.close();
|
||||||
|
|
||||||
|
assertEquals(0, queueView.getQueueSize());
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
@Repeat(repetitions = 1)
|
@Repeat(repetitions = 1)
|
||||||
public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception {
|
public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue