From 8448cf1cb886b242b54235b091259acbf43c2108 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Wed, 1 Jun 2016 18:30:31 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-6305 Refactor credit handling and drain state tracking to ensure we stay in sync with the remote state and always answer drain requests. Start adding some more tests around drain to the interop suite. --- .../amqp/protocol/AmqpAbstractLink.java | 4 +- .../amqp/protocol/AmqpConnection.java | 1 + .../transport/amqp/protocol/AmqpSender.java | 86 +++--- .../transport/amqp/JMSClientTestSupport.java | 40 +++ .../amqp/JMSClientTransactionTest.java | 102 ++++++- .../transport/amqp/JMSQueueBrowserTest.java | 245 +++++++++++++++- .../amqp/client/AmqpAbstractResource.java | 13 + .../transport/amqp/client/AmqpConnection.java | 32 +- .../transport/amqp/client/AmqpReceiver.java | 277 +++++++++++++++++- .../transport/amqp/client/AmqpResource.java | 12 + .../transport/amqp/client/AmqpSender.java | 4 +- .../transport/amqp/client/AmqpSession.java | 17 +- .../amqp/client/AmqpTransactionContext.java | 6 +- .../amqp/client/util/NoOpAsyncResult.java | 40 +++ .../amqp/interop/AmqpReceiverDrainTest.java | 154 ++++++++++ .../amqp/interop/AmqpReceiverTest.java | 28 -- .../src/test/resources/log4j.properties | 4 +- 17 files changed, 969 insertions(+), 96 deletions(-) create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/NoOpAsyncResult.java create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverDrainTest.java diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractLink.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractLink.java index d4fe301551..798e356f73 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractLink.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractLink.java @@ -142,7 +142,7 @@ public abstract class AmqpAbstractLink implements AmqpLi } /** - * Shorcut method to hand off an ActiveMQ Command to the broker and assign + * Shortcut method to hand off an ActiveMQ Command to the broker and assign * a ResponseHandler to deal with any reply from the broker. * * @param command @@ -153,7 +153,7 @@ public abstract class AmqpAbstractLink implements AmqpLi } /** - * Shorcut method to hand off an ActiveMQ Command to the broker and assign + * Shortcut method to hand off an ActiveMQ Command to the broker and assign * a ResponseHandler to deal with any reply from the broker. * * @param command diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java index 4a0916ba5e..25ca37c2f2 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java @@ -172,6 +172,7 @@ public class AmqpConnection implements AmqpProtocolConverter { this.protonTransport.bind(this.protonConnection); this.protonTransport.setChannelMax(CHANNEL_MAX); + this.protonTransport.setEmitFlowEventOnSend(false); this.protonConnection.collect(eventCollector); diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java index 8cf703342e..6c30828049 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java @@ -81,7 +81,6 @@ public class AmqpSender extends AmqpAbstractLink { private final ConsumerInfo consumerInfo; private final boolean presettle; - private int currentCredit; private boolean draining; private long lastDeliveredSequenceId; @@ -101,7 +100,6 @@ public class AmqpSender extends AmqpAbstractLink { public AmqpSender(AmqpSession session, Sender endpoint, ConsumerInfo consumerInfo) { super(session, endpoint); - this.currentCredit = endpoint.getRemoteCredit(); this.consumerInfo = consumerInfo; this.presettle = getEndpoint().getRemoteSenderSettleMode() == SenderSettleMode.SETTLED; } @@ -120,7 +118,7 @@ public class AmqpSender extends AmqpAbstractLink { if (!isClosed() && isOpened()) { RemoveInfo removeCommand = new RemoveInfo(getConsumerId()); removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); - sendToActiveMQ(removeCommand, null); + sendToActiveMQ(removeCommand); session.unregisterSender(getConsumerId()); } @@ -133,7 +131,7 @@ public class AmqpSender extends AmqpAbstractLink { if (!isClosed() && isOpened()) { RemoveInfo removeCommand = new RemoveInfo(getConsumerId()); removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); - sendToActiveMQ(removeCommand, null); + sendToActiveMQ(removeCommand); if (consumerInfo.isDurable()) { RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); @@ -141,7 +139,7 @@ public class AmqpSender extends AmqpAbstractLink { rsi.setSubscriptionName(getEndpoint().getName()); rsi.setClientId(session.getConnection().getClientId()); - sendToActiveMQ(rsi, null); + sendToActiveMQ(rsi); } session.unregisterSender(getConsumerId()); @@ -152,17 +150,13 @@ public class AmqpSender extends AmqpAbstractLink { @Override public void flow() throws Exception { - int updatedCredit = getEndpoint().getCredit(); - if (LOG.isTraceEnabled()) { - LOG.trace("Flow: currentCredit={}, draining={}, drain={} credit={}, remoteCredit={}, queued={}", - currentCredit, draining, getEndpoint().getDrain(), + LOG.trace("Flow: draining={}, drain={} credit={}, remoteCredit={}, queued={}", + draining, getEndpoint().getDrain(), getEndpoint().getCredit(), getEndpoint().getRemoteCredit(), getEndpoint().getQueued()); } - if (getEndpoint().getDrain() && (updatedCredit != currentCredit || !draining)) { - currentCredit = updatedCredit >= 0 ? updatedCredit : 0; - draining = true; + if (getEndpoint().getDrain() && !draining) { // Revert to a pull consumer. ConsumerControl control = new ConsumerControl(); @@ -170,35 +164,42 @@ public class AmqpSender extends AmqpAbstractLink { control.setDestination(getDestination()); control.setPrefetch(0); - LOG.trace("Flow: Pull case -> consumer control with prefetch (0)"); + LOG.trace("Flow: Pull case -> consumer control with prefetch (0) to control output"); - sendToActiveMQ(control, null); + sendToActiveMQ(control); - // Now request dispatch of the drain amount, we request immediate - // timeout and an completion message regardless so that we can know - // when we should marked the link as drained. - MessagePull pullRequest = new MessagePull(); - pullRequest.setConsumerId(getConsumerId()); - pullRequest.setDestination(getDestination()); - pullRequest.setTimeout(-1); - pullRequest.setAlwaysSignalDone(true); - pullRequest.setQuantity(currentCredit); + if (endpoint.getCredit() > 0) { + draining = true; - LOG.trace("Pull case -> consumer pull request quantity = {}", currentCredit); + // Now request dispatch of the drain amount, we request immediate + // timeout and an completion message regardless so that we can know + // when we should marked the link as drained. + MessagePull pullRequest = new MessagePull(); + pullRequest.setConsumerId(getConsumerId()); + pullRequest.setDestination(getDestination()); + pullRequest.setTimeout(-1); + pullRequest.setAlwaysSignalDone(true); + pullRequest.setQuantity(endpoint.getCredit()); - sendToActiveMQ(pullRequest, null); - } else if (updatedCredit != currentCredit) { - currentCredit = updatedCredit >= 0 ? updatedCredit : 0; + LOG.trace("Pull case -> consumer pull request quantity = {}", endpoint.getCredit()); + + sendToActiveMQ(pullRequest); + } else { + LOG.trace("Pull case -> sending any Queued messages and marking drained"); + + pumpOutbound(); + getEndpoint().drained(); + session.pumpProtonToSocket(); + } + } else { ConsumerControl control = new ConsumerControl(); control.setConsumerId(getConsumerId()); control.setDestination(getDestination()); - control.setPrefetch(currentCredit); + control.setPrefetch(getEndpoint().getCredit()); - LOG.trace("Flow: update -> consumer control with prefetch (0)"); + LOG.trace("Flow: update -> consumer control with prefetch {}", control.getPrefetch()); - sendToActiveMQ(control, null); - } else { - LOG.trace("Flow: no credit change -> no broker updates needed"); + sendToActiveMQ(control); } } @@ -415,14 +416,29 @@ public class AmqpSender extends AmqpAbstractLink { // It's the end of browse signal in response to a MessagePull getEndpoint().drained(); draining = false; - currentCredit = 0; } else { if (LOG.isTraceEnabled()) { - LOG.trace("Sender:[{}] msgId={} currentCredit={}, draining={}, drain={} credit={}, remoteCredit={}, queued={}", - getEndpoint().getName(), jms.getJMSMessageID(), currentCredit, draining, getEndpoint().getDrain(), + LOG.trace("Sender:[{}] msgId={} draining={}, drain={}, credit={}, remoteCredit={}, queued={}", + getEndpoint().getName(), jms.getJMSMessageID(), draining, getEndpoint().getDrain(), getEndpoint().getCredit(), getEndpoint().getRemoteCredit(), getEndpoint().getQueued()); } + if (draining && getEndpoint().getCredit() == 0) { + LOG.trace("Sender:[{}] browse complete.", getEndpoint().getName()); + getEndpoint().drained(); + draining = false; + } else { + LOG.trace("Sender:[{}] updating conumser prefetch:{} after dispatch.", + getEndpoint().getName(), getEndpoint().getCredit()); + + ConsumerControl control = new ConsumerControl(); + control.setConsumerId(getConsumerId()); + control.setDestination(getDestination()); + control.setPrefetch(Math.max(0, getEndpoint().getCredit() - 1)); + + sendToActiveMQ(control); + } + jms.setRedeliveryCounter(md.getRedeliveryCounter()); jms.setReadOnlyBody(true); final EncodedMessage amqp = outboundTransformer.transform(jms); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java index eebceb0517..d855c6b69b 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport.amqp; import java.net.URI; +import java.net.URISyntaxException; import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -85,6 +86,45 @@ public class JMSClientTestSupport extends AmqpTestSupport { return amqpURI; } + protected URI getAmqpURI() { + return getAmqpURI(""); + } + + protected URI getAmqpURI(String uriOptions) { + + boolean useSSL = getBrokerURI().getScheme().toLowerCase().contains("ssl"); + + String amqpURI = (useSSL ? "amqps://" : "amqp://") + getBrokerURI().getHost() + ":" + getBrokerURI().getPort(); + + if (uriOptions != null && !uriOptions.isEmpty()) { + if (uriOptions.startsWith("?") || uriOptions.startsWith("&")) { + uriOptions = uriOptions.substring(1); + } + } else { + uriOptions = ""; + } + + if (useSSL) { + amqpURI += "?transport.verifyHost=false"; + } + + if (!uriOptions.isEmpty()) { + if (useSSL) { + amqpURI += "&" + uriOptions; + } else { + amqpURI += "?" + uriOptions; + } + } + + URI result = getBrokerURI(); + try { + result = new URI(amqpURI); + } catch (URISyntaxException e) { + } + + return result; + } + protected Connection createConnection() throws JMSException { return createConnection(name.toString(), false); } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java index 560edda9a8..e979714444 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java @@ -20,9 +20,14 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import java.util.concurrent.atomic.AtomicInteger; + import javax.jms.DeliveryMode; import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; @@ -30,6 +35,7 @@ import javax.jms.TextMessage; import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.broker.jmx.SubscriptionViewMBean; +import org.apache.activemq.util.Wait; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -186,13 +192,107 @@ public class JMSClientTransactionTest extends JMSClientTestSupport { assertEquals(MSG_COUNT, getProxyToQueue(getDestinationName()).getQueueSize()); SubscriptionViewMBean subscription = getProxyToQueueSubscriber(getDestinationName()); assertNotNull(subscription); + LOG.info("Subscription[{}]: prefetch size after rollback = {}", subscription.getSubscriptionId(), subscription.getPrefetchSize()); assertTrue(subscription.getPrefetchSize() > 0); for (int i = 1; i <= MSG_COUNT; i++) { LOG.info("Trying to receive message: {}", i); TextMessage message = (TextMessage) consumer.receive(1000); - assertNotNull("Message " + i + "should be available", message); + assertNotNull("Message " + i + " should be available", message); assertEquals("Should get message: " + i, i , message.getIntProperty("MessageSequence")); } + + session.commit(); + } + + @Test(timeout = 60000) + public void testQueueTXRollbackAndCommitAsyncConsumer() throws Exception { + final int MSG_COUNT = 3; + + final AtomicInteger counter = new AtomicInteger(); + + connection = createConnection(); + connection.start(); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue destination = session.createQueue(getDestinationName()); + + MessageProducer producer = session.createProducer(destination); + MessageConsumer consumer = session.createConsumer(destination); + + consumer.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message message) { + try { + LOG.info("Received Message {}", message.getJMSMessageID()); + } catch (JMSException e) { + } + counter.incrementAndGet(); + } + }); + + int msgIndex = 0; + for (int i = 1; i <= MSG_COUNT; i++) { + LOG.info("Sending message: {} to rollback", msgIndex++); + TextMessage message = session.createTextMessage("Rolled back Message: " + msgIndex); + message.setIntProperty("MessageSequence", msgIndex); + producer.send(message); + } + + LOG.info("ROLLBACK of sent message here:"); + session.rollback(); + + assertEquals(0, getProxyToQueue(getDestinationName()).getQueueSize()); + + for (int i = 1; i <= MSG_COUNT; i++) { + LOG.info("Sending message: {} to commit", msgIndex++); + TextMessage message = session.createTextMessage("Commit Message: " + msgIndex); + message.setIntProperty("MessageSequence", msgIndex); + producer.send(message); + } + + LOG.info("COMMIT of sent message here:"); + session.commit(); + + assertEquals(MSG_COUNT, getProxyToQueue(getDestinationName()).getQueueSize()); + SubscriptionViewMBean subscription = getProxyToQueueSubscriber(getDestinationName()); + assertNotNull(subscription); + LOG.info("Subscription[{}]: prefetch size after rollback = {}", subscription.getSubscriptionId(), subscription.getPrefetchSize()); + assertTrue(subscription.getPrefetchSize() > 0); + + assertTrue("Should read all " + MSG_COUNT + " messages.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return counter.get() == MSG_COUNT; + } + })); + + LOG.info("COMMIT of first received batch here:"); + session.commit(); + + assertTrue(subscription.getPrefetchSize() > 0); + for (int i = 1; i <= MSG_COUNT; i++) { + LOG.info("Sending message: {} to commit", msgIndex++); + TextMessage message = session.createTextMessage("Commit Message: " + msgIndex); + message.setIntProperty("MessageSequence", msgIndex); + producer.send(message); + } + + LOG.info("COMMIT of next sent message batch here:"); + session.commit(); + + LOG.info("WAITING -> for next three messages to arrive:"); + + assertTrue(subscription.getPrefetchSize() > 0); + assertTrue("Should read all " + MSG_COUNT + " messages.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + LOG.info("Read {} messages so far", counter.get()); + return counter.get() == MSG_COUNT * 2; + } + })); } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSQueueBrowserTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSQueueBrowserTest.java index dfcc108af9..070bb20529 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSQueueBrowserTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSQueueBrowserTest.java @@ -19,13 +19,18 @@ package org.apache.activemq.transport.amqp; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import java.util.Enumeration; +import java.util.concurrent.TimeUnit; import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.QueueBrowser; import javax.jms.Session; +import javax.jms.TextMessage; import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.junit.ActiveMQTestRunner; @@ -45,12 +50,12 @@ public class JMSQueueBrowserTest extends JMSClientTestSupport { protected static final Logger LOG = LoggerFactory.getLogger(JMSClientTest.class); @Test(timeout = 60000) - @Repeat(repetitions = 1) + @Repeat(repetitions = 5) public void testBrowseAllInQueueZeroPrefetch() throws Exception { final int MSG_COUNT = 5; - JmsConnectionFactory cf = new JmsConnectionFactory(getBrokerURI() + "?jms.prefetchPolicy.all=0"); + JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI("jms.prefetchPolicy.all=0")); connection = cf.createConnection(); connection.start(); @@ -78,6 +83,242 @@ public class JMSQueueBrowserTest extends JMSClientTestSupport { assertEquals(5, count); } + @Test(timeout = 40000) + public void testCreateQueueBrowser() throws Exception { + JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI()); + + connection = cf.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Queue queue = session.createQueue(getDestinationName()); + session.createConsumer(queue).close(); + + QueueBrowser browser = session.createBrowser(queue); + assertNotNull(browser); + + QueueViewMBean proxy = getProxyToQueue(getDestinationName()); + assertEquals(0, proxy.getQueueSize()); + } + + @Test(timeout = 40000) + public void testNoMessagesBrowserHasNoElements() throws Exception { + JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI()); + + connection = cf.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Queue queue = session.createQueue(getDestinationName()); + session.createConsumer(queue).close(); + + QueueBrowser browser = session.createBrowser(queue); + assertNotNull(browser); + + QueueViewMBean proxy = getProxyToQueue(getDestinationName()); + assertEquals(0, proxy.getQueueSize()); + + Enumeration enumeration = browser.getEnumeration(); + assertFalse(enumeration.hasMoreElements()); + } + + @Test(timeout=30000) + public void testBroseOneInQueue() throws Exception { + JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI()); + + connection = cf.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getDestinationName()); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("hello")); + producer.close(); + + QueueBrowser browser = session.createBrowser(queue); + Enumeration enumeration = browser.getEnumeration(); + while (enumeration.hasMoreElements()) { + Message m = (Message) enumeration.nextElement(); + assertTrue(m instanceof TextMessage); + LOG.debug("Browsed message {} from Queue {}", m, queue); + } + + browser.close(); + + MessageConsumer consumer = session.createConsumer(queue); + Message msg = consumer.receive(5000); + assertNotNull(msg); + assertTrue(msg instanceof TextMessage); + } + + @Test(timeout = 60000) + @Repeat(repetitions = 5) + public void testBrowseAllInQueue() throws Exception { + JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI()); + + connection = cf.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Queue queue = session.createQueue(getDestinationName()); + sendMessages(name.getMethodName(), 5, false); + + QueueViewMBean proxy = getProxyToQueue(getDestinationName()); + assertEquals(5, proxy.getQueueSize()); + + QueueBrowser browser = session.createBrowser(queue); + assertNotNull(browser); + Enumeration enumeration = browser.getEnumeration(); + int count = 0; + while (enumeration.hasMoreElements()) { + Message msg = (Message) enumeration.nextElement(); + assertNotNull(msg); + LOG.debug("Recv: {}", msg); + count++; + TimeUnit.MILLISECONDS.sleep(50); + } + assertFalse(enumeration.hasMoreElements()); + assertEquals(5, count); + } + + @Test(timeout = 60000) + @Repeat(repetitions = 5) + public void testBrowseAllInQueuePrefetchOne() throws Exception { + JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI("jms.prefetchPolicy.all=1")); + + connection = cf.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Queue queue = session.createQueue(getDestinationName()); + sendMessages(name.getMethodName(), 5, false); + + QueueViewMBean proxy = getProxyToQueue(getDestinationName()); + assertEquals(5, proxy.getQueueSize()); + + QueueBrowser browser = session.createBrowser(queue); + assertNotNull(browser); + Enumeration enumeration = browser.getEnumeration(); + int count = 0; + while (enumeration.hasMoreElements()) { + Message msg = (Message) enumeration.nextElement(); + assertNotNull(msg); + LOG.debug("Recv: {}", msg); + count++; + } + assertFalse(enumeration.hasMoreElements()); + assertEquals(5, count); + } + + @Test(timeout = 40000) + public void testBrowseAllInQueueTxSession() throws Exception { + JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI()); + + connection = cf.createConnection(); + connection.start(); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + assertNotNull(session); + Queue queue = session.createQueue(getDestinationName()); + sendMessages(name.getMethodName(), 5, false); + + QueueViewMBean proxy = getProxyToQueue(getDestinationName()); + assertEquals(5, proxy.getQueueSize()); + + QueueBrowser browser = session.createBrowser(queue); + assertNotNull(browser); + Enumeration enumeration = browser.getEnumeration(); + int count = 0; + while (enumeration.hasMoreElements()) { + Message msg = (Message) enumeration.nextElement(); + assertNotNull(msg); + LOG.debug("Recv: {}", msg); + count++; + } + assertFalse(enumeration.hasMoreElements()); + assertEquals(5, count); + } + + @Test(timeout = 40000) + public void testQueueBrowserInTxSessionLeavesOtherWorkUnaffected() throws Exception { + JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI()); + + connection = cf.createConnection(); + connection.start(); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + assertNotNull(session); + Queue queue = session.createQueue(getDestinationName()); + sendMessages(name.getMethodName(), 5, false); + + QueueViewMBean proxy = getProxyToQueue(getDestinationName()); + assertEquals(5, proxy.getQueueSize()); + + // Send some TX work but don't commit. + MessageProducer txProducer = session.createProducer(queue); + for (int i = 0; i < 5; ++i) { + txProducer.send(session.createMessage()); + } + + assertEquals(5, proxy.getQueueSize()); + + QueueBrowser browser = session.createBrowser(queue); + assertNotNull(browser); + Enumeration enumeration = browser.getEnumeration(); + int count = 0; + while (enumeration.hasMoreElements()) { + Message msg = (Message) enumeration.nextElement(); + assertNotNull(msg); + LOG.debug("Recv: {}", msg); + count++; + } + + assertFalse(enumeration.hasMoreElements()); + assertEquals(5, count); + + browser.close(); + + // Now check that all browser work did not affect the session transaction. + assertEquals(5, proxy.getQueueSize()); + session.commit(); + assertEquals(10, proxy.getQueueSize()); + } + + @Test(timeout = 60000) + public void testBrowseAllInQueueSmallPrefetch() throws Exception { + JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI("jms.prefetchPolicy.all=5")); + + connection = cf.createConnection(); + connection.start(); + + final int MSG_COUNT = 30; + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Queue queue = session.createQueue(getDestinationName()); + sendMessages(name.getMethodName(), MSG_COUNT, false); + + QueueViewMBean proxy = getProxyToQueue(getDestinationName()); + assertEquals(MSG_COUNT, proxy.getQueueSize()); + + QueueBrowser browser = session.createBrowser(queue); + assertNotNull(browser); + Enumeration enumeration = browser.getEnumeration(); + int count = 0; + while (enumeration.hasMoreElements()) { + Message msg = (Message) enumeration.nextElement(); + assertNotNull(msg); + LOG.debug("Recv: {}", msg); + count++; + } + assertFalse(enumeration.hasMoreElements()); + assertEquals(MSG_COUNT, count); + } + @Override protected boolean isUseOpenWireConnector() { return true; diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java index 9d020277ae..e17a0c9f9e 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java @@ -152,6 +152,19 @@ public abstract class AmqpAbstractResource implements AmqpRe connection.fireClientException(error); } + @Override + public void locallyClosed(AmqpConnection connection, Exception error) { + if (endpoint != null) { + // TODO: if this is a producer/consumer link then we may only be detached, + // rather than fully closed, and should respond appropriately. + endpoint.close(); + } + + LOG.info("Resource {} was locally closed", this); + + connection.fireClientException(error); + } + public E getEndpoint() { return this.endpoint; } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java index 6c35e4c15d..00488b8820 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java @@ -17,9 +17,6 @@ package org.apache.activemq.transport.amqp.client; import static org.apache.activemq.transport.amqp.AmqpSupport.CONNECTION_OPEN_FAILED; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.util.ReferenceCountUtil; import java.io.IOException; import java.net.URI; @@ -39,8 +36,10 @@ import org.apache.activemq.transport.InactivityIOException; import org.apache.activemq.transport.amqp.client.sasl.SaslAuthenticator; import org.apache.activemq.transport.amqp.client.transport.NettyTransport; import org.apache.activemq.transport.amqp.client.transport.NettyTransportListener; +import org.apache.activemq.transport.amqp.client.util.AsyncResult; import org.apache.activemq.transport.amqp.client.util.ClientFuture; import org.apache.activemq.transport.amqp.client.util.IdGenerator; +import org.apache.activemq.transport.amqp.client.util.NoOpAsyncResult; import org.apache.activemq.transport.amqp.client.util.UnmodifiableConnection; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.engine.Collector; @@ -54,10 +53,16 @@ import org.apache.qpid.proton.engine.impl.CollectorImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCountUtil; + public class AmqpConnection extends AmqpAbstractResource implements NettyTransportListener { private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class); + private static final NoOpAsyncResult NOOP_REQUEST = new NoOpAsyncResult(); + private static final int DEFAULT_MAX_FRAME_SIZE = 1024 * 1024 * 1; // NOTE: Limit default channel max to signed short range to deal with // brokers that don't currently handle the unsigned range well. @@ -66,6 +71,7 @@ public class AmqpConnection extends AmqpAbstractResource implements public static final long DEFAULT_CONNECT_TIMEOUT = 515000; public static final long DEFAULT_CLOSE_TIMEOUT = 30000; + public static final long DEFAULT_DRAIN_TIMEOUT = 60000; private final ScheduledExecutorService serializer; private final AtomicBoolean closed = new AtomicBoolean(); @@ -95,6 +101,7 @@ public class AmqpConnection extends AmqpAbstractResource implements private int channelMax = DEFAULT_CHANNEL_MAX; private long connectTimeout = DEFAULT_CONNECT_TIMEOUT; private long closeTimeout = DEFAULT_CLOSE_TIMEOUT; + private long drainTimeout = DEFAULT_DRAIN_TIMEOUT; public AmqpConnection(NettyTransport transport, String username, String password) { setEndpoint(Connection.Factory.create()); @@ -150,7 +157,7 @@ public class AmqpConnection extends AmqpAbstractResource implements authenticator = new SaslAuthenticator(sasl, username, password, authzid, mechanismRestriction); open(future); - pumpToProtonTransport(); + pumpToProtonTransport(future); } }); @@ -190,7 +197,7 @@ public class AmqpConnection extends AmqpAbstractResource implements request.onSuccess(); } - pumpToProtonTransport(); + pumpToProtonTransport(request); } catch (Exception e) { LOG.debug("Caught exception while closing proton connection"); } @@ -241,7 +248,7 @@ public class AmqpConnection extends AmqpAbstractResource implements session.setEndpoint(getEndpoint().session()); session.setStateInspector(getStateInspector()); session.open(request); - pumpToProtonTransport(); + pumpToProtonTransport(request); } }); @@ -355,6 +362,14 @@ public class AmqpConnection extends AmqpAbstractResource implements this.closeTimeout = closeTimeout; } + public long getDrainTimeout() { + return drainTimeout; + } + + public void setDrainTimeout(long drainTimeout) { + this.drainTimeout = drainTimeout; + } + public List getOfferedCapabilities() { return offeredCapabilities; } @@ -439,6 +454,10 @@ public class AmqpConnection extends AmqpAbstractResource implements } void pumpToProtonTransport() { + pumpToProtonTransport(NOOP_REQUEST); + } + + void pumpToProtonTransport(AsyncResult request) { try { boolean done = false; while (!done) { @@ -454,6 +473,7 @@ public class AmqpConnection extends AmqpAbstractResource implements } } catch (IOException e) { fireClientException(e); + request.onFailure(e); } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java index 87aa36afea..77a529d865 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java @@ -26,14 +26,17 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.InvalidDestinationException; +import org.apache.activemq.transport.amqp.client.util.AsyncResult; import org.apache.activemq.transport.amqp.client.util.ClientFuture; import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport; import org.apache.activemq.transport.amqp.client.util.UnmodifiableReceiver; +import org.apache.qpid.jms.JmsOperationTimedOutException; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; @@ -74,6 +77,9 @@ public class AmqpReceiver extends AmqpAbstractResource { private boolean presettle; private boolean noLocal; + private AsyncResult pullRequest; + private AsyncResult stopRequest; + /** * Create a new receiver instance. * @@ -133,7 +139,7 @@ public class AmqpReceiver extends AmqpAbstractResource { public void run() { checkClosed(); close(request); - session.pumpToProtonTransport(); + session.pumpToProtonTransport(request); } }); @@ -156,7 +162,7 @@ public class AmqpReceiver extends AmqpAbstractResource { public void run() { checkClosed(); detach(request); - session.pumpToProtonTransport(); + session.pumpToProtonTransport(request); } }); @@ -222,6 +228,108 @@ public class AmqpReceiver extends AmqpAbstractResource { return prefetch.poll(); } + /** + * Request a remote peer send a Message to this client waiting until one arrives. + * + * @return the pulled AmqpMessage or null if none was pulled from the remote. + * + * @throws IOException if an error occurs + */ + public AmqpMessage pull() throws IOException { + return pull(-1, TimeUnit.MILLISECONDS); + } + + /** + * Request a remote peer send a Message to this client using an immediate drain request. + * + * @return the pulled AmqpMessage or null if none was pulled from the remote. + * + * @throws IOException if an error occurs + */ + public AmqpMessage pullImmediate() throws IOException { + return pull(0, TimeUnit.MILLISECONDS); + } + + /** + * Request a remote peer send a Message to this client. + * + * {@literal timeout < 0} then it should remain open until a message is received. + * {@literal timeout = 0} then it returns a message or null if none available + * {@literal timeout > 0} then it should remain open for timeout amount of time. + * + * The timeout value when positive is given in milliseconds. + * + * @param timeout + * the amount of time to tell the remote peer to keep this pull request valid. + * @param unit + * the unit of measure that the timeout represents. + * + * @return the pulled AmqpMessage or null if none was pulled from the remote. + * + * @throws IOException if an error occurs + */ + public AmqpMessage pull(final long timeout, final TimeUnit unit) throws IOException { + checkClosed(); + final ClientFuture request = new ClientFuture(); + session.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + + long timeoutMills = unit.toMillis(timeout); + + try { + LOG.trace("Pull on Receiver {} with timeout = {}", getSubscriptionName(), timeoutMills); + if (timeoutMills < 0) { + // Wait until message arrives. Just give credit if needed. + if (getEndpoint().getCredit() == 0) { + LOG.trace("Receiver {} granting 1 additional credit for pull.", getSubscriptionName()); + getEndpoint().flow(1); + } + + // Await the message arrival + pullRequest = request; + } else if (timeoutMills == 0) { + // If we have no credit then we need to issue some so that we can + // try to fulfill the request, then drain down what is there to + // ensure we consume what is available and remove all credit. + if (getEndpoint().getCredit() == 0){ + LOG.trace("Receiver {} granting 1 additional credit for pull.", getSubscriptionName()); + getEndpoint().flow(1); + } + + // Drain immediately and wait for the message(s) to arrive, + // or a flow indicating removal of the remaining credit. + stop(request); + } else if (timeoutMills > 0) { + // If we have no credit then we need to issue some so that we can + // try to fulfill the request, then drain down what is there to + // ensure we consume what is available and remove all credit. + if (getEndpoint().getCredit() == 0) { + LOG.trace("Receiver {} granting 1 additional credit for pull.", getSubscriptionName()); + getEndpoint().flow(1); + } + + // Wait for the timeout for the message(s) to arrive, then drain if required + // and wait for remaining message(s) to arrive or a flow indicating + // removal of the remaining credit. + stopOnSchedule(timeoutMills, request); + } + + session.pumpToProtonTransport(request); + } catch (Exception e) { + request.onFailure(e); + } + } + }); + + request.sync(); + + return prefetch.poll(); + } + + /** * Controls the amount of credit given to the receiver link. * @@ -240,7 +348,7 @@ public class AmqpReceiver extends AmqpAbstractResource { checkClosed(); try { getEndpoint().flow(credit); - session.pumpToProtonTransport(); + session.pumpToProtonTransport(request); request.onSuccess(); } catch (Exception e) { request.onFailure(e); @@ -269,7 +377,7 @@ public class AmqpReceiver extends AmqpAbstractResource { checkClosed(); try { getEndpoint().drain(credit); - session.pumpToProtonTransport(); + session.pumpToProtonTransport(request); request.onSuccess(); } catch (Exception e) { request.onFailure(e); @@ -280,6 +388,31 @@ public class AmqpReceiver extends AmqpAbstractResource { request.sync(); } + /** + * Stops the receiver, using all link credit and waiting for in-flight messages to arrive. + * + * @throws IOException if an error occurs while sending the drain. + */ + public void stop() throws IOException { + checkClosed(); + final ClientFuture request = new ClientFuture(); + session.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + try { + stop(request); + session.pumpToProtonTransport(request); + } catch (Exception e) { + request.onFailure(e); + } + } + }); + + request.sync(); + } + /** * Accepts a message that was dispatched under the given Delivery instance. * @@ -318,7 +451,7 @@ public class AmqpReceiver extends AmqpAbstractResource { delivery.settle(); } } - session.pumpToProtonTransport(); + session.pumpToProtonTransport(request); request.onSuccess(); } catch (Exception e) { request.onFailure(e); @@ -360,7 +493,7 @@ public class AmqpReceiver extends AmqpAbstractResource { disposition.setDeliveryFailed(deliveryFailed); delivery.disposition(disposition); delivery.settle(); - session.pumpToProtonTransport(); + session.pumpToProtonTransport(request); } request.onSuccess(); } catch (Exception e) { @@ -397,7 +530,7 @@ public class AmqpReceiver extends AmqpAbstractResource { if (!delivery.isSettled()) { delivery.disposition(Released.getInstance()); delivery.settle(); - session.pumpToProtonTransport(); + session.pumpToProtonTransport(request); } request.onSuccess(); } catch (Exception e) { @@ -454,6 +587,10 @@ public class AmqpReceiver extends AmqpAbstractResource { this.noLocal = noLocal; } + public long getDrainTimeout() { + return session.getConnection().getDrainTimeout(); + } + //----- Internal implementation ------------------------------------------// @Override @@ -604,6 +741,15 @@ public class AmqpReceiver extends AmqpAbstractResource { LOG.trace("{} has a partial incoming Message(s), deferring.", this); incoming = null; } + } else { + // We have exhausted the locally queued messages on this link. + // Check if we tried to stop and have now run out of credit. + if (getEndpoint().getRemoteCredit() <= 0) { + if (stopRequest != null) { + stopRequest.onSuccess(); + stopRequest = null; + } + } } } while (incoming != null); @@ -624,6 +770,35 @@ public class AmqpReceiver extends AmqpAbstractResource { // Store reference to envelope in delivery context for recovery incoming.setContext(amqpMessage); prefetch.add(amqpMessage); + + // We processed a message, signal completion + // of a message pull request if there is one. + if (pullRequest != null) { + pullRequest.onSuccess(); + pullRequest = null; + } + } + + @Override + public void processFlowUpdates(AmqpConnection connection) throws IOException { + if (pullRequest != null || stopRequest != null) { + Receiver receiver = getEndpoint(); + if (receiver.getRemoteCredit() <= 0 && receiver.getQueued() == 0) { + if (pullRequest != null) { + pullRequest.onSuccess(); + pullRequest = null; + } + + if (stopRequest != null) { + stopRequest.onSuccess(); + stopRequest = null; + } + } + } + + LOG.trace("Consumer {} flow updated, remote credit = {}", getSubscriptionName(), getEndpoint().getRemoteCredit()); + + super.processFlowUpdates(connection); } protected Message decodeIncomingMessage(Delivery incoming) { @@ -661,6 +836,61 @@ public class AmqpReceiver extends AmqpAbstractResource { } } + private void stop(final AsyncResult request) { + Receiver receiver = getEndpoint(); + if (receiver.getRemoteCredit() <= 0) { + if (receiver.getQueued() == 0) { + // We have no remote credit and all the deliveries have been processed. + request.onSuccess(); + } else { + // There are still deliveries to process, wait for them to be. + stopRequest = request; + } + } else { + // TODO: We don't actually want the additional messages that could be sent while + // draining. We could explicitly reduce credit first, or possibly use 'echo' instead + // of drain if it was supported. We would first need to understand what happens + // if we reduce credit below the number of messages already in-flight before + // the peer sees the update. + stopRequest = request; + receiver.drain(0); + + if (getDrainTimeout() > 0) { + // If the remote doesn't respond we will close the consumer and break any + // blocked receive or stop calls that are waiting. + final ScheduledFuture future = getSession().getScheduler().schedule(new Runnable() { + @Override + public void run() { + LOG.trace("Consumer {} drain request timed out", this); + Exception cause = new JmsOperationTimedOutException("Remote did not respond to a drain request in time"); + locallyClosed(session.getConnection(), cause); + stopRequest.onFailure(cause); + session.pumpToProtonTransport(stopRequest); + } + }, getDrainTimeout(), TimeUnit.MILLISECONDS); + + stopRequest = new ScheduledRequest(future, stopRequest); + } + } + } + + private void stopOnSchedule(long timeout, final AsyncResult request) { + LOG.trace("Receiver {} scheduling stop", this); + // We need to drain the credit if no message(s) arrive to use it. + final ScheduledFuture future = getSession().getScheduler().schedule(new Runnable() { + @Override + public void run() { + LOG.trace("Receiver {} running scheduled stop", this); + if (getEndpoint().getRemoteCredit() != 0) { + stop(request); + session.pumpToProtonTransport(request); + } + } + }, timeout, TimeUnit.MILLISECONDS); + + stopRequest = new ScheduledRequest(future, request); + } + @Override public String toString() { return getClass().getSimpleName() + "{ address = " + address + "}"; @@ -685,4 +915,37 @@ public class AmqpReceiver extends AmqpAbstractResource { void postRollback() { } + + //----- Inner classes used in message pull operations --------------------// + + protected static final class ScheduledRequest implements AsyncResult { + + private final ScheduledFuture sheduledTask; + private final AsyncResult origRequest; + + public ScheduledRequest(ScheduledFuture completionTask, AsyncResult origRequest) { + this.sheduledTask = completionTask; + this.origRequest = origRequest; + } + + @Override + public void onFailure(Throwable cause) { + sheduledTask.cancel(false); + origRequest.onFailure(cause); + } + + @Override + public void onSuccess() { + boolean cancelled = sheduledTask.cancel(false); + if (cancelled) { + // Signal completion. Otherwise wait for the scheduled task to do it. + origRequest.onSuccess(); + } + } + + @Override + public boolean isComplete() { + return origRequest.isComplete(); + } + } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java index af68c2d4cb..95ff2d6af0 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java @@ -91,6 +91,18 @@ public interface AmqpResource extends AmqpEventSink { */ void remotelyClosed(AmqpConnection connection); + /** + * Called to indicate that the local end has become closed but the resource + * was not awaiting a close. This could happen during an open request where + * the remote does not set an error condition or during normal operation. + * + * @param connection + * The connection that owns this resource. + * @param error + * The error that triggered the local close of this resource. + */ + void locallyClosed(AmqpConnection connection, Exception error); + /** * Sets the failed state for this Resource and triggers a failure signal for * any pending ProduverRequest. diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java index 35fe56a7be..f9d64354c5 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java @@ -135,7 +135,7 @@ public class AmqpSender extends AmqpAbstractResource { public void run() { try { doSend(message, sendRequest); - session.pumpToProtonTransport(); + session.pumpToProtonTransport(sendRequest); } catch (Exception e) { sendRequest.onFailure(e); session.getConnection().fireClientException(e); @@ -165,7 +165,7 @@ public class AmqpSender extends AmqpAbstractResource { public void run() { checkClosed(); close(request); - session.pumpToProtonTransport(); + session.pumpToProtonTransport(request); } }); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index c0b097cfe0..e327534cb3 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -19,6 +19,7 @@ package org.apache.activemq.transport.amqp.client; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicLong; +import org.apache.activemq.transport.amqp.client.util.AsyncResult; import org.apache.activemq.transport.amqp.client.util.ClientFuture; import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession; import org.apache.qpid.proton.amqp.messaging.Source; @@ -92,7 +93,7 @@ public class AmqpSession extends AmqpAbstractResource { checkClosed(); sender.setStateInspector(getStateInspector()); sender.open(request); - pumpToProtonTransport(); + pumpToProtonTransport(request); } }); @@ -124,7 +125,7 @@ public class AmqpSession extends AmqpAbstractResource { checkClosed(); sender.setStateInspector(getStateInspector()); sender.open(request); - pumpToProtonTransport(); + pumpToProtonTransport(request); } }); @@ -195,7 +196,7 @@ public class AmqpSession extends AmqpAbstractResource { checkClosed(); receiver.setStateInspector(getStateInspector()); receiver.open(request); - pumpToProtonTransport(); + pumpToProtonTransport(request); } }); @@ -227,7 +228,7 @@ public class AmqpSession extends AmqpAbstractResource { checkClosed(); receiver.setStateInspector(getStateInspector()); receiver.open(request); - pumpToProtonTransport(); + pumpToProtonTransport(request); } }); @@ -308,7 +309,7 @@ public class AmqpSession extends AmqpAbstractResource { checkClosed(); receiver.setStateInspector(getStateInspector()); receiver.open(request); - pumpToProtonTransport(); + pumpToProtonTransport(request); } }); @@ -345,7 +346,7 @@ public class AmqpSession extends AmqpAbstractResource { checkClosed(); receiver.setStateInspector(getStateInspector()); receiver.open(request); - pumpToProtonTransport(); + pumpToProtonTransport(request); } }); @@ -427,8 +428,8 @@ public class AmqpSession extends AmqpAbstractResource { return connection.getProtonConnection(); } - void pumpToProtonTransport() { - connection.pumpToProtonTransport(); + void pumpToProtonTransport(AsyncResult request) { + connection.pumpToProtonTransport(request); } AmqpTransactionId getTransactionId() { diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java index 64f854ba0d..cf5ef26211 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java @@ -111,7 +111,7 @@ public class AmqpTransactionContext { } } - session.pumpToProtonTransport(); + session.pumpToProtonTransport(request); } }); @@ -153,7 +153,7 @@ public class AmqpTransactionContext { try { LOG.info("Attempting to commit TX:[{}]", transactionId); coordinator.discharge(transactionId, request, true); - session.pumpToProtonTransport(); + session.pumpToProtonTransport(request); } catch (Exception e) { request.onFailure(e); } @@ -198,7 +198,7 @@ public class AmqpTransactionContext { try { LOG.info("Attempting to roll back TX:[{}]", transactionId); coordinator.discharge(transactionId, request, false); - session.pumpToProtonTransport(); + session.pumpToProtonTransport(request); } catch (Exception e) { request.onFailure(e); } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/NoOpAsyncResult.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/NoOpAsyncResult.java new file mode 100644 index 0000000000..9d59eacd22 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/NoOpAsyncResult.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.util; + +/** + * Simple NoOp implementation used when the result of the operation does not matter. + */ +public class NoOpAsyncResult implements AsyncResult { + + public final static NoOpAsyncResult INSTANCE = new NoOpAsyncResult(); + + @Override + public void onFailure(Throwable result) { + + } + + @Override + public void onSuccess() { + + } + + @Override + public boolean isComplete() { + return true; + } +} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverDrainTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverDrainTest.java new file mode 100644 index 0000000000..8379bfbabf --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverDrainTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.interop; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Test; + +/** + * Tests various behaviors of broker side drain support. + */ +public class AmqpReceiverDrainTest extends AmqpClientTestSupport { + + @Test(timeout = 60000) + public void testReceiverCanDrainMessages() throws Exception { + int MSG_COUNT = 20; + sendMessages(getTestName(), MSG_COUNT, false); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); + + QueueViewMBean queueView = getProxyToQueue(getTestName()); + assertEquals(MSG_COUNT, queueView.getQueueSize()); + assertEquals(0, queueView.getDispatchCount()); + + receiver.drain(MSG_COUNT); + for (int i = 0; i < MSG_COUNT; ++i) { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + message.accept(); + } + receiver.close(); + + assertEquals(0, queueView.getQueueSize()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testPullWithNoMessageGetDrained() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); + + receiver.flow(10); + + QueueViewMBean queueView = getProxyToQueue(getTestName()); + assertEquals(0, queueView.getQueueSize()); + assertEquals(0, queueView.getDispatchCount()); + + assertEquals(10, receiver.getReceiver().getRemoteCredit()); + + assertNull(receiver.pull(1, TimeUnit.SECONDS)); + + assertEquals(0, receiver.getReceiver().getRemoteCredit()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testPullOneFromRemote() throws Exception { + int MSG_COUNT = 20; + sendMessages(getTestName(), MSG_COUNT, false); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); + + QueueViewMBean queueView = getProxyToQueue(getTestName()); + assertEquals(MSG_COUNT, queueView.getQueueSize()); + assertEquals(0, queueView.getDispatchCount()); + + assertEquals(0, receiver.getReceiver().getRemoteCredit()); + + AmqpMessage message = receiver.pull(5, TimeUnit.SECONDS); + assertNotNull(message); + message.accept(); + + assertEquals(0, receiver.getReceiver().getRemoteCredit()); + + receiver.close(); + + assertEquals(MSG_COUNT - 1, queueView.getQueueSize()); + assertEquals(1, queueView.getDispatchCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testMultipleZeroResultPulls() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); + + receiver.flow(10); + + QueueViewMBean queueView = getProxyToQueue(getTestName()); + assertEquals(0, queueView.getQueueSize()); + assertEquals(0, queueView.getDispatchCount()); + + assertEquals(10, receiver.getReceiver().getRemoteCredit()); + + assertNull(receiver.pull(1, TimeUnit.SECONDS)); + + assertEquals(0, receiver.getReceiver().getRemoteCredit()); + + assertNull(receiver.pull(1, TimeUnit.SECONDS)); + assertNull(receiver.pull(1, TimeUnit.SECONDS)); + + assertEquals(0, receiver.getReceiver().getRemoteCredit()); + + connection.close(); + } + + @Override + protected boolean isUseOpenWireConnector() { + return true; + } +} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java index 1502bdaaf9..1226f3ed5e 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java @@ -341,34 +341,6 @@ public class AmqpReceiverTest extends AmqpClientTestSupport { connection.close(); } - @Test(timeout = 60000) - public void testReceiverCanDrainMessages() throws Exception { - int MSG_COUNT = 20; - sendMessages(getTestName(), MSG_COUNT, false); - - AmqpClient client = createAmqpClient(); - AmqpConnection connection = client.connect(); - AmqpSession session = connection.createSession(); - - AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); - - QueueViewMBean queueView = getProxyToQueue(getTestName()); - assertEquals(MSG_COUNT, queueView.getQueueSize()); - assertEquals(0, queueView.getDispatchCount()); - - receiver.drain(MSG_COUNT); - for (int i = 0; i < MSG_COUNT; ++i) { - AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); - assertNotNull(message); - message.accept(); - } - receiver.close(); - - assertEquals(0, queueView.getQueueSize()); - - connection.close(); - } - @Test(timeout = 60000) public void testUnsupportedFiltersAreNotListedAsSupported() throws Exception { AmqpClient client = createAmqpClient(); diff --git a/activemq-amqp/src/test/resources/log4j.properties b/activemq-amqp/src/test/resources/log4j.properties index be0efab4ff..5208b3fdec 100755 --- a/activemq-amqp/src/test/resources/log4j.properties +++ b/activemq-amqp/src/test/resources/log4j.properties @@ -20,7 +20,7 @@ # log4j.rootLogger=WARN, console, file log4j.logger.org.apache.activemq=INFO -log4j.logger.org.apache.activemq.transport.amqp=DEBUG +log4j.logger.org.apache.activemq.transport.amqp=INFO log4j.logger.org.apache.activemq.transport.amqp.FRAMES=INFO log4j.logger.org.fusesource=INFO @@ -30,7 +30,7 @@ log4j.logger.org.apache.qpid.jms.provider=INFO log4j.logger.org.apache.qpid.jms.provider.amqp=INFO log4j.logger.org.apache.qpid.jms.provider.amqp.FRAMES=INFO -# Console will only display warnnings +# Console will only display warnings log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d [%-15.15t] - %-5p %-25.30c{1} - %m%n