diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java index 6c30828049..567c50766f 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java @@ -427,16 +427,6 @@ public class AmqpSender extends AmqpAbstractLink { LOG.trace("Sender:[{}] browse complete.", getEndpoint().getName()); getEndpoint().drained(); draining = false; - } else { - LOG.trace("Sender:[{}] updating conumser prefetch:{} after dispatch.", - getEndpoint().getName(), getEndpoint().getCredit()); - - ConsumerControl control = new ConsumerControl(); - control.setConsumerId(getConsumerId()); - control.setDestination(getDestination()); - control.setPrefetch(Math.max(0, getEndpoint().getCredit() - 1)); - - sendToActiveMQ(control); } jms.setRedeliveryCounter(md.getRedeliveryCounter()); @@ -467,6 +457,17 @@ public class AmqpSender extends AmqpAbstractLink { tagCache.returnTag(tag); } + int newCredit = Math.max(0, getEndpoint().getCredit() - 1); + LOG.trace("Sender:[{}] updating conumser prefetch:{} after delivery settled.", + getEndpoint().getName(), newCredit); + + ConsumerControl control = new ConsumerControl(); + control.setConsumerId(getConsumerId()); + control.setDestination(getDestination()); + control.setPrefetch(newCredit); + + sendToActiveMQ(control); + if (ackType == -1) { // we are going to settle, but redeliver.. we we won't yet ack to ActiveMQ delivery.settle(); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index e327534cb3..6ed7861aed 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -179,12 +179,33 @@ public class AmqpSession extends AmqpAbstractResource { * @throws Exception if an error occurs while creating the receiver. */ public AmqpReceiver createReceiver(String address, String selector, boolean noLocal) throws Exception { + return createReceiver(address, selector, noLocal, false); + } + + /** + * Create a receiver instance using the given address + * + * @param address + * the address to which the receiver will subscribe for its messages. + * @param selector + * the JMS selector to use for the subscription + * @param noLocal + * should the subscription have messages from its connection filtered. + * @param presettle + * should the receiver be created with a settled sender mode. + * + * @return a newly created receiver that is ready for use. + * + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpReceiver createReceiver(String address, String selector, boolean noLocal, boolean presettle) throws Exception { checkClosed(); final ClientFuture request = new ClientFuture(); final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, address, getNextReceiverId()); receiver.setNoLocal(noLocal); + receiver.setPresettle(presettle); if (selector != null && !selector.isEmpty()) { receiver.setSelector(selector); } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java index 1226f3ed5e..c68e850024 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java @@ -29,6 +29,8 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.junit.ActiveMQTestRunner; +import org.apache.activemq.junit.Repeat; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; import org.apache.activemq.transport.amqp.client.AmqpConnection; @@ -46,10 +48,12 @@ import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.message.Message; import org.junit.Test; +import org.junit.runner.RunWith; /** * Test various behaviors of AMQP receivers with the broker. */ +@RunWith(ActiveMQTestRunner.class) public class AmqpReceiverTest extends AmqpClientTestSupport { @Override @@ -193,6 +197,34 @@ public class AmqpReceiverTest extends AmqpClientTestSupport { } @Test(timeout = 60000) + @Repeat(repetitions = 1) + public void testPresettledReceiverReadsAllMessages() throws Exception { + final int MSG_COUNT = 100; + sendMessages(getTestName(), MSG_COUNT, false); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName(), null, false, true); + + QueueViewMBean queueView = getProxyToQueue(getTestName()); + assertEquals(MSG_COUNT, queueView.getQueueSize()); + assertEquals(0, queueView.getDispatchCount()); + + receiver.flow(MSG_COUNT); + for (int i = 0; i < MSG_COUNT; ++i) { + assertNotNull(receiver.receive(5, TimeUnit.SECONDS)); + } + receiver.close(); + + assertEquals(0, queueView.getQueueSize()); + + connection.close(); + } + + @Test(timeout = 60000) + @Repeat(repetitions = 1) public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception { int MSG_COUNT = 4; sendMessages(getTestName(), MSG_COUNT, false); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java index 4dd2f0caba..b9dda614a6 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -25,6 +25,8 @@ import static org.junit.Assert.assertTrue; import java.util.concurrent.TimeUnit; import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.junit.ActiveMQTestRunner; +import org.apache.activemq.junit.Repeat; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; import org.apache.activemq.transport.amqp.client.AmqpConnection; @@ -33,12 +35,18 @@ import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; import org.junit.Test; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Test basic send and receive scenarios using only AMQP sender and receiver links. */ +@RunWith(ActiveMQTestRunner.class) public class AmqpSendReceiveTest extends AmqpClientTestSupport { + protected static final Logger LOG = LoggerFactory.getLogger(AmqpSendReceiveTest.class); + @Test(timeout = 60000) public void testCloseBusyReceiver() throws Exception { final int MSG_COUNT = 20; @@ -113,9 +121,11 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { assertNull(receiver.receive(1, TimeUnit.SECONDS)); receiver.close(); + connection.close(); } - @Test(timeout = 30000) + @Test(timeout = 60000) + @Repeat(repetitions = 1) public void testAdvancedLinkFlowControl() throws Exception { final int MSG_COUNT = 20; @@ -137,10 +147,11 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { sender.close(); + LOG.info("Attempting to read first two messages with receiver #1"); AmqpReceiver receiver1 = session.createReceiver("queue://" + getTestName()); receiver1.flow(2); - AmqpMessage message1 = receiver1.receive(5, TimeUnit.SECONDS); - AmqpMessage message2 = receiver1.receive(5, TimeUnit.SECONDS); + AmqpMessage message1 = receiver1.receive(10, TimeUnit.SECONDS); + AmqpMessage message2 = receiver1.receive(10, TimeUnit.SECONDS); assertNotNull("Should have read message 1", message1); assertNotNull("Should have read message 2", message2); assertEquals("msg0", message1.getMessageId()); @@ -148,10 +159,11 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { message1.accept(); message2.accept(); + LOG.info("Attempting to read next two messages with receiver #2"); AmqpReceiver receiver2 = session.createReceiver("queue://" + getTestName()); receiver2.flow(2); - AmqpMessage message3 = receiver2.receive(5, TimeUnit.SECONDS); - AmqpMessage message4 = receiver2.receive(5, TimeUnit.SECONDS); + AmqpMessage message3 = receiver2.receive(10, TimeUnit.SECONDS); + AmqpMessage message4 = receiver2.receive(10, TimeUnit.SECONDS); assertNotNull("Should have read message 3", message3); assertNotNull("Should have read message 4", message4); assertEquals("msg2", message3.getMessageId()); @@ -159,9 +171,10 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { message3.accept(); message4.accept(); + LOG.info("Attempting to read remaining messages with receiver #1"); receiver1.flow(MSG_COUNT - 4); for (int i = 4; i < MSG_COUNT - 4; i++) { - AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS); + AmqpMessage message = receiver1.receive(10, TimeUnit.SECONDS); assertNotNull("Should have read a message", message); assertEquals("msg" + i, message.getMessageId()); message.accept(); @@ -169,6 +182,80 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { receiver1.close(); receiver2.close(); + + connection.close(); + } + + @Test(timeout = 60000) + @Repeat(repetitions = 1) + public void testDispatchOrderWithPrefetchOfOne() throws Exception { + final int MSG_COUNT = 20; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender("queue://" + getTestName()); + + for (int i = 0; i < MSG_COUNT; i++) { + AmqpMessage message = new AmqpMessage(); + + message.setMessageId("msg" + i); + message.setMessageAnnotation("serialNo", i); + message.setText("Test-Message"); + + sender.send(message); + } + + sender.close(); + + AmqpReceiver receiver1 = session.createReceiver("queue://" + getTestName()); + receiver1.flow(1); + + AmqpReceiver receiver2 = session.createReceiver("queue://" + getTestName()); + receiver2.flow(1); + + AmqpMessage message1 = receiver1.receive(10, TimeUnit.SECONDS); + AmqpMessage message2 = receiver2.receive(10, TimeUnit.SECONDS); + assertNotNull("Should have read message 1", message1); + assertNotNull("Should have read message 2", message2); + assertEquals("msg0", message1.getMessageId()); + assertEquals("msg1", message2.getMessageId()); + message1.accept(); + message2.accept(); + + receiver1.flow(1); + AmqpMessage message3 = receiver1.receive(10, TimeUnit.SECONDS); + receiver2.flow(1); + AmqpMessage message4 = receiver2.receive(10, TimeUnit.SECONDS); + assertNotNull("Should have read message 3", message3); + assertNotNull("Should have read message 4", message4); + assertEquals("msg2", message3.getMessageId()); + assertEquals("msg3", message4.getMessageId()); + message3.accept(); + message4.accept(); + + LOG.info("Attempting to read remaining messages with both receivers"); + int splitCredit = (MSG_COUNT - 4) / 2; + + receiver1.flow(splitCredit); + for (int i = 4; i < splitCredit; i++) { + AmqpMessage message = receiver1.receive(10, TimeUnit.SECONDS); + assertNotNull("Should have read a message", message); + message.accept(); + } + + receiver2.flow(splitCredit); + for (int i = 4; i < splitCredit; i++) { + AmqpMessage message = receiver2.receive(10, TimeUnit.SECONDS); + assertNotNull("Should have read a message", message); + message.accept(); + } + + receiver1.close(); + receiver2.close(); + + connection.close(); } @Test(timeout = 60000) diff --git a/activemq-amqp/src/test/resources/log4j.properties b/activemq-amqp/src/test/resources/log4j.properties index 5208b3fdec..d25017dac9 100755 --- a/activemq-amqp/src/test/resources/log4j.properties +++ b/activemq-amqp/src/test/resources/log4j.properties @@ -20,7 +20,7 @@ # log4j.rootLogger=WARN, console, file log4j.logger.org.apache.activemq=INFO -log4j.logger.org.apache.activemq.transport.amqp=INFO +log4j.logger.org.apache.activemq.transport.amqp=DEBUG log4j.logger.org.apache.activemq.transport.amqp.FRAMES=INFO log4j.logger.org.fusesource=INFO