mirror of https://github.com/apache/activemq.git
Add test for credit grants but no settles for a single receiver.
This commit is contained in:
parent
5de9bdac08
commit
640289868e
|
@ -22,7 +22,9 @@ import static org.junit.Assert.assertNotNull;
|
|||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
@ -538,4 +540,55 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
|||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testReceiveMessageBeyondAckedAmountQueue() throws Exception {
|
||||
doTestReceiveMessageBeyondAckedAmount(Queue.class);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testReceiveMessageBeyondAckedAmountTopic() throws Exception {
|
||||
doTestReceiveMessageBeyondAckedAmount(Topic.class);
|
||||
}
|
||||
|
||||
private void doTestReceiveMessageBeyondAckedAmount(Class<?> destType) throws Exception {
|
||||
final int MSG_COUNT = 50;
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
final String address;
|
||||
if (Queue.class.equals(destType)) {
|
||||
address = "queue://" + getTestName();
|
||||
} else {
|
||||
address = "topic://" + getTestName();
|
||||
}
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver(address);
|
||||
AmqpSender sender = session.createSender(address);
|
||||
|
||||
for (int i = 0; i < MSG_COUNT; i++) {
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
message.setMessageId("msg" + i);
|
||||
sender.send(message);
|
||||
}
|
||||
sender.close();
|
||||
|
||||
List<AmqpMessage> pendingAcks = new ArrayList<AmqpMessage>();
|
||||
|
||||
for (int i = 0; i < MSG_COUNT; i++) {
|
||||
receiver.flow(1);
|
||||
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
|
||||
assertNotNull(received);
|
||||
}
|
||||
|
||||
for (AmqpMessage pendingAck : pendingAcks) {
|
||||
pendingAck.accept();
|
||||
}
|
||||
|
||||
receiver.close();
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue