diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java index 752c341967..f39fc3e9e0 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java @@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit; import javax.jms.Queue; import javax.jms.Topic; +import org.apache.activemq.broker.jmx.DestinationViewMBean; import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.junit.ActiveMQTestRunner; import org.apache.activemq.junit.Repeat; @@ -43,6 +44,7 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.activemq.util.Wait; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; @@ -569,12 +571,18 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpReceiver receiver = session.createReceiver(address); AmqpSender sender = session.createSender(address); + final DestinationViewMBean destinationView; + if (Queue.class.equals(destType)) { + destinationView = getProxyToQueue(getTestName()); + } else { + destinationView = getProxyToTopic(getTestName()); + } + for (int i = 0; i < MSG_COUNT; i++) { AmqpMessage message = new AmqpMessage(); message.setMessageId("msg" + i); sender.send(message); } - sender.close(); List pendingAcks = new ArrayList(); @@ -582,12 +590,28 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { receiver.flow(1); AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); assertNotNull(received); + pendingAcks.add(received); } + // Send one more to check in-flight stays at zero with no credit and all + // pending messages settled. + AmqpMessage message = new AmqpMessage(); + message.setMessageId("msg-final"); + sender.send(message); + for (AmqpMessage pendingAck : pendingAcks) { pendingAck.accept(); } + assertTrue("Should be no inflight messages: " + destinationView.getInFlightCount(), Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return destinationView.getInFlightCount() == 0; + } + })); + + sender.close(); receiver.close(); connection.close(); }