Small fix to test and check for zero inflight on successive send to
destination that should have no credit on the registered receiver.
This commit is contained in:
Timothy Bish 2016-09-09 13:02:04 -04:00
parent ca11674f37
commit 566e82614a
1 changed files with 25 additions and 1 deletions

View File

@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
import javax.jms.Queue;
import javax.jms.Topic;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.junit.ActiveMQTestRunner;
import org.apache.activemq.junit.Repeat;
@ -43,6 +44,7 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.util.Wait;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
@ -569,12 +571,18 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpReceiver receiver = session.createReceiver(address);
AmqpSender sender = session.createSender(address);
final DestinationViewMBean destinationView;
if (Queue.class.equals(destType)) {
destinationView = getProxyToQueue(getTestName());
} else {
destinationView = getProxyToTopic(getTestName());
}
for (int i = 0; i < MSG_COUNT; i++) {
AmqpMessage message = new AmqpMessage();
message.setMessageId("msg" + i);
sender.send(message);
}
sender.close();
List<AmqpMessage> pendingAcks = new ArrayList<AmqpMessage>();
@ -582,12 +590,28 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(received);
pendingAcks.add(received);
}
// Send one more to check in-flight stays at zero with no credit and all
// pending messages settled.
AmqpMessage message = new AmqpMessage();
message.setMessageId("msg-final");
sender.send(message);
for (AmqpMessage pendingAck : pendingAcks) {
pendingAck.accept();
}
assertTrue("Should be no inflight messages: " + destinationView.getInFlightCount(), Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return destinationView.getInFlightCount() == 0;
}
}));
sender.close();
receiver.close();
connection.close();
}