From 65d789b4e1d59a196aa7f8f7e42aa99769352d8e Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Fri, 18 Nov 2016 11:59:01 -0500 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-6500 Better handle prefetch extension and pull consumers over Topics to avoid the remote not receiving all the messages available based on the credit it has issued. (cherry picked from commit e050519ff6ae8079c5183f6e6372ddb3d03e91c7) --- .../transport/amqp/JMSClientTest.java | 77 +++++++- .../amqp/interop/AmqpReceiverDrainTest.java | 148 ++++++++++++--- .../amqp/interop/AmqpReceiverTest.java | 43 ++++- .../amqp/interop/AmqpSendReceiveTest.java | 107 ++++++++++- .../broker/region/TopicSubscription.java | 98 ++++++---- .../TopicSubscriptionZeroPrefetchTest.java | 176 ++++++++++++++++-- 6 files changed, 547 insertions(+), 102 deletions(-) diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java index 358365619f..97ce106b04 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java @@ -230,7 +230,7 @@ public class JMSClientTest extends JMSClientTestSupport { assertEquals(totalCount, proxy.getQueueSize()); // Consume again..check we receive all the messages. - Set messageNumbers = new HashSet(); + Set messageNumbers = new HashSet<>(); for (int i = 1; i <= totalCount; i++) { messageNumbers.add(i); } @@ -644,7 +644,7 @@ public class JMSClientTest extends JMSClientTestSupport { public void testDurableConsumerAsync() throws Exception { ActiveMQAdmin.enableJMSFrameTracing(); final CountDownLatch latch = new CountDownLatch(1); - final AtomicReference received = new AtomicReference(); + final AtomicReference received = new AtomicReference<>(); String durableClientId = getDestinationName() + "-ClientId"; connection = createConnection(durableClientId); @@ -693,7 +693,7 @@ public class JMSClientTest extends JMSClientTestSupport { message.setText("hello"); producer.send(message); - final AtomicReference msg = new AtomicReference(); + final AtomicReference msg = new AtomicReference<>(); assertTrue(Wait.waitFor(new Wait.Condition() { @Override @@ -712,7 +712,7 @@ public class JMSClientTest extends JMSClientTestSupport { public void testTopicConsumerAsync() throws Exception { ActiveMQAdmin.enableJMSFrameTracing(); final CountDownLatch latch = new CountDownLatch(1); - final AtomicReference received = new AtomicReference(); + final AtomicReference received = new AtomicReference<>(); connection = createConnection(); { @@ -760,7 +760,7 @@ public class JMSClientTest extends JMSClientTestSupport { message.setText("hello"); producer.send(message); - final AtomicReference msg = new AtomicReference(); + final AtomicReference msg = new AtomicReference<>(); assertTrue(Wait.waitFor(new Wait.Condition() { @Override @@ -782,7 +782,7 @@ public class JMSClientTest extends JMSClientTestSupport { final ConnectorViewMBean connector = getProxyToConnectionView(getTargetConnectorName()); LOG.info("Current number of Connections is: {}", connector.connectionCount()); - ArrayList connections = new ArrayList(); + ArrayList connections = new ArrayList<>(); for (int i = 0; i < 10; i++) { connections.add(createConnection(null)); @@ -1265,4 +1265,69 @@ public class JMSClientTest extends JMSClientTestSupport { assertFalse(message.getJMSRedelivered()); } } + + @Test(timeout = 30000) + public void testProduceAndConsumeLargeNumbersOfTopicMessagesClientAck() throws Exception { + doTestProduceAndConsumeLargeNumbersOfMessages(true, Session.CLIENT_ACKNOWLEDGE); + } + + @Test(timeout = 30000) + public void testProduceAndConsumeLargeNumbersOfQueueMessagesClientAck() throws Exception { + doTestProduceAndConsumeLargeNumbersOfMessages(false, Session.CLIENT_ACKNOWLEDGE); + } + + @Test(timeout = 30000) + public void testProduceAndConsumeLargeNumbersOfTopicMessagesAutoAck() throws Exception { + doTestProduceAndConsumeLargeNumbersOfMessages(true, Session.AUTO_ACKNOWLEDGE); + } + + @Test(timeout = 30000) + public void testProduceAndConsumeLargeNumbersOfQueueMessagesAutoAck() throws Exception { + doTestProduceAndConsumeLargeNumbersOfMessages(false, Session.AUTO_ACKNOWLEDGE); + } + + public void doTestProduceAndConsumeLargeNumbersOfMessages(boolean topic, int ackMode) throws Exception { + + final int MSG_COUNT = 1000; + final CountDownLatch done = new CountDownLatch(MSG_COUNT); + + JmsConnectionFactory factory = new JmsConnectionFactory(getAmqpURI()); + factory.setForceSyncSend(true); + + connection = factory.createConnection(); + connection.start(); + + Session session = connection.createSession(false, ackMode); + final Destination destination; + if (topic) { + destination = session.createTopic(getDestinationName()); + } else { + destination = session.createQueue(getDestinationName()); + } + + MessageConsumer consumer = session.createConsumer(destination); + consumer.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message message) { + try { + message.acknowledge(); + done.countDown(); + } catch (JMSException ex) { + LOG.info("Caught exception.", ex); + } + } + }); + + MessageProducer producer = session.createProducer(destination); + + TextMessage textMessage = session.createTextMessage(); + textMessage.setText("messageText"); + + for (int i = 0; i < MSG_COUNT; i++) { + producer.send(textMessage); + } + + assertTrue("Did not receive all messages: " + MSG_COUNT, done.await(15, TimeUnit.SECONDS)); + } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverDrainTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverDrainTest.java index aa74100d21..993f26b821 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverDrainTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverDrainTest.java @@ -22,7 +22,9 @@ import static org.junit.Assert.assertNull; import java.util.concurrent.TimeUnit; +import org.apache.activemq.broker.jmx.DestinationViewMBean; import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.jmx.TopicViewMBean; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; import org.apache.activemq.transport.amqp.client.AmqpConnection; @@ -37,47 +39,93 @@ import org.junit.Test; public class AmqpReceiverDrainTest extends AmqpClientTestSupport { @Test(timeout = 60000) - public void testReceiverCanDrainMessages() throws Exception { + public void testReceiverCanDrainMessagesQueue() throws Exception { + doTestReceiverCanDrainMessages(false); + } + + @Test(timeout = 60000) + public void testReceiverCanDrainMessagesTopic() throws Exception { + doTestReceiverCanDrainMessages(true); + } + + private void doTestReceiverCanDrainMessages(boolean topic) throws Exception { + final String destinationName; + if (topic) { + destinationName = "topic://" + getTestName(); + } else { + destinationName = "queue://" + getTestName(); + } + int MSG_COUNT = 20; - sendMessages(getTestName(), MSG_COUNT, false); AmqpClient client = createAmqpClient(); AmqpConnection connection = trackConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); + AmqpReceiver receiver = session.createReceiver(destinationName); - QueueViewMBean queueView = getProxyToQueue(getTestName()); - assertEquals(MSG_COUNT, queueView.getQueueSize()); - assertEquals(0, queueView.getDispatchCount()); + sendMessages(getTestName(), MSG_COUNT, topic); + + final DestinationViewMBean destinationView; + if (topic) { + destinationView = getProxyToTopic(getTestName()); + } else { + destinationView = getProxyToQueue(getTestName()); + } + + assertEquals(MSG_COUNT, destinationView.getEnqueueCount()); + assertEquals(0, destinationView.getDispatchCount()); receiver.drain(MSG_COUNT); for (int i = 0; i < MSG_COUNT; ++i) { AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); - assertNotNull(message); + assertNotNull("Failed to read message: " + (i + 1), message); + LOG.info("Read message: {}", message.getMessageId()); message.accept(); } receiver.close(); - assertEquals(0, queueView.getQueueSize()); + assertEquals(MSG_COUNT, destinationView.getDequeueCount()); connection.close(); } @Test(timeout = 60000) - public void testPullWithNoMessageGetDrained() throws Exception { + public void testPullWithNoMessageGetDrainedQueue() throws Exception { + doTestPullWithNoMessageGetDrained(false); + } + + @Test(timeout = 60000) + public void testPullWithNoMessageGetDrainedTopic() throws Exception { + doTestPullWithNoMessageGetDrained(true); + } + + private void doTestPullWithNoMessageGetDrained(boolean topic) throws Exception { + + final String destinationName; + if (topic) { + destinationName = "topic://" + getTestName(); + } else { + destinationName = "queue://" + getTestName(); + } AmqpClient client = createAmqpClient(); AmqpConnection connection = trackConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); + AmqpReceiver receiver = session.createReceiver(destinationName); receiver.flow(10); - QueueViewMBean queueView = getProxyToQueue(getTestName()); - assertEquals(0, queueView.getQueueSize()); - assertEquals(0, queueView.getDispatchCount()); + final DestinationViewMBean destinationView; + if (topic) { + destinationView = getProxyToTopic(getTestName()); + } else { + destinationView = getProxyToQueue(getTestName()); + } + + assertEquals(0, destinationView.getEnqueueCount()); + assertEquals(0, destinationView.getDispatchCount()); assertEquals(10, receiver.getReceiver().getRemoteCredit()); @@ -89,19 +137,42 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport { } @Test(timeout = 60000) - public void testPullOneFromRemote() throws Exception { - int MSG_COUNT = 20; - sendMessages(getTestName(), MSG_COUNT, false); + public void testPullOneFromRemoteQueue() throws Exception { + doTestPullOneFromRemote(false); + } + + @Test(timeout = 60000) + public void testPullOneFromRemoteTopic() throws Exception { + doTestPullOneFromRemote(true); + } + + private void doTestPullOneFromRemote(boolean topic) throws Exception { AmqpClient client = createAmqpClient(); AmqpConnection connection = trackConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); + final String destinationName; + if (topic) { + destinationName = "topic://" + getTestName(); + } else { + destinationName = "queue://" + getTestName(); + } - QueueViewMBean queueView = getProxyToQueue(getTestName()); - assertEquals(MSG_COUNT, queueView.getQueueSize()); - assertEquals(0, queueView.getDispatchCount()); + AmqpReceiver receiver = session.createReceiver(destinationName); + + int MSG_COUNT = 20; + sendMessages(getTestName(), MSG_COUNT, topic); + + final DestinationViewMBean destinationView; + if (topic) { + destinationView = getProxyToTopic(getTestName()); + } else { + destinationView = getProxyToQueue(getTestName()); + } + + assertEquals(MSG_COUNT, destinationView.getEnqueueCount()); + assertEquals(0, destinationView.getDispatchCount()); assertEquals(0, receiver.getReceiver().getRemoteCredit()); @@ -113,25 +184,48 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport { receiver.close(); - assertEquals(MSG_COUNT - 1, queueView.getQueueSize()); - assertEquals(1, queueView.getDispatchCount()); + assertEquals(MSG_COUNT - 1, destinationView.getEnqueueCount() - destinationView.getDequeueCount()); + assertEquals(1, destinationView.getDispatchCount()); connection.close(); } @Test(timeout = 60000) - public void testMultipleZeroResultPulls() throws Exception { + public void testMultipleZeroResultPullsQueue() throws Exception { + doTestMultipleZeroResultPulls(false); + } + + @Test(timeout = 60000) + public void testMultipleZeroResultPullsTopic() throws Exception { + doTestMultipleZeroResultPulls(true); + } + + private void doTestMultipleZeroResultPulls(boolean topic) throws Exception { + AmqpClient client = createAmqpClient(); AmqpConnection connection = trackConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); + final String destinationName; + if (topic) { + destinationName = "topic://" + getTestName(); + } else { + destinationName = "queue://" + getTestName(); + } + + AmqpReceiver receiver = session.createReceiver(destinationName); receiver.flow(10); - QueueViewMBean queueView = getProxyToQueue(getTestName()); - assertEquals(0, queueView.getQueueSize()); - assertEquals(0, queueView.getDispatchCount()); + if (topic) { + TopicViewMBean topicView = getProxyToTopic(getTestName()); + assertEquals(0, topicView.getEnqueueCount()); + assertEquals(0, topicView.getDispatchCount()); + } else { + QueueViewMBean queueView = getProxyToQueue(getTestName()); + assertEquals(0, queueView.getQueueSize()); + assertEquals(0, queueView.getDispatchCount()); + } assertEquals(10, receiver.getReceiver().getRemoteCredit()); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java index 08df785c1d..f60af7bbd9 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -29,6 +29,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.activemq.broker.jmx.DestinationViewMBean; import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.junit.ActiveMQTestRunner; import org.apache.activemq.junit.Repeat; @@ -268,19 +269,43 @@ public class AmqpReceiverTest extends AmqpClientTestSupport { @Test(timeout = 60000) @Repeat(repetitions = 1) - public void testPresettledReceiverReadsAllMessagesInNonFlowBatch() throws Exception { + public void testPresettledReceiverReadsAllMessagesInNonFlowBatchQueue() throws Exception { + doTestPresettledReceiverReadsAllMessagesInNonFlowBatch(false); + } + + @Test(timeout = 60000) + @Repeat(repetitions = 1) + public void testPresettledReceiverReadsAllMessagesInNonFlowBatchTopic() throws Exception { + doTestPresettledReceiverReadsAllMessagesInNonFlowBatch(true); + } + + private void doTestPresettledReceiverReadsAllMessagesInNonFlowBatch(boolean topic) throws Exception { + + final String destinationName; + if (topic) { + destinationName = "topic://" + getTestName(); + } else { + destinationName = "queue://" + getTestName(); + } + final int MSG_COUNT = 100; - sendMessages(getTestName(), MSG_COUNT, false); AmqpClient client = createAmqpClient(); AmqpConnection connection = trackConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver = session.createReceiver("queue://" + getTestName(), null, false, true); + AmqpReceiver receiver = session.createReceiver(destinationName, null, false, true); - QueueViewMBean queueView = getProxyToQueue(getTestName()); - assertEquals(MSG_COUNT, queueView.getQueueSize()); - assertEquals(0, queueView.getDispatchCount()); + sendMessages(getTestName(), MSG_COUNT, topic); + + final DestinationViewMBean destinationView; + if (topic) { + destinationView = getProxyToTopic(getTestName()); + } else { + destinationView = getProxyToQueue(getTestName()); + } + assertEquals(MSG_COUNT, destinationView.getEnqueueCount()); + assertEquals(0, destinationView.getDispatchCount()); receiver.flow(20); // consume less that flow @@ -302,7 +327,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport { receiver.close(); - assertEquals(0, queueView.getQueueSize()); + assertEquals(0, destinationView.getEnqueueCount() - destinationView.getDequeueCount()); connection.close(); } @@ -481,7 +506,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport { } }); - Map filters = new HashMap(); + Map filters = new HashMap<>(); filters.put(AmqpUnknownFilterType.UNKNOWN_FILTER_NAME, AmqpUnknownFilterType.UNKOWN_FILTER); Source source = new Source(); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java index ef3f27dc37..cb60e545e7 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java @@ -29,6 +29,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.Queue; import javax.jms.Topic; @@ -60,13 +61,30 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { protected static final Logger LOG = LoggerFactory.getLogger(AmqpSendReceiveTest.class); @Test(timeout = 60000) - public void testSimpleSendOneReceiveOne() throws Exception { + public void testSimpleSendOneReceiveOneToQueue() throws Exception { + doTestSimpleSendOneReceiveOne(Queue.class); + } + + @Test(timeout = 60000) + public void testSimpleSendOneReceiveOneToTopic() throws Exception { + doTestSimpleSendOneReceiveOne(Topic.class); + } + + public void doTestSimpleSendOneReceiveOne(Class destType) throws Exception { + + final String address; + if (Queue.class.equals(destType)) { + address = "queue://" + getTestName(); + } else { + address = "topic://" + getTestName(); + } AmqpClient client = createAmqpClient(); AmqpConnection connection = trackConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender("queue://" + getTestName()); + AmqpSender sender = session.createSender(address); + AmqpReceiver receiver = session.createReceiver(address); AmqpMessage message = new AmqpMessage(); @@ -78,7 +96,6 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { sender.close(); LOG.info("Attempting to read message with receiver"); - AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); receiver.flow(2); AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS); assertNotNull("Should have read message", received); @@ -366,7 +383,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { private void doTestReceiveMessageAndRefillCreditBeforeAcceptOnTopicAsync(Class destType) throws Exception { final AmqpClient client = createAmqpClient(); - final LinkedList errors = new LinkedList(); + final LinkedList errors = new LinkedList<>(); final CountDownLatch receiverReady = new CountDownLatch(1); ExecutorService executorService = Executors.newCachedThreadPool(); @@ -609,7 +626,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { sender.send(message); } - List pendingAcks = new ArrayList(); + List pendingAcks = new ArrayList<>(); for (int i = 0; i < MSG_COUNT; i++) { receiver.flow(1); @@ -719,4 +736,84 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { connection.close(); } + + @Test(timeout = 60000) + public void testSendReceiveLotsOfDurableMessagesOnQueue() throws Exception { + doTestSendReceiveLotsOfDurableMessages(Queue.class); + } + + @Test(timeout = 60000) + public void testSendReceiveLotsOfDurableMessagesOnTopic() throws Exception { + doTestSendReceiveLotsOfDurableMessages(Topic.class); + } + + private void doTestSendReceiveLotsOfDurableMessages(Class destType) throws Exception { + final int MSG_COUNT = 1000; + + AmqpClient client = createAmqpClient(); + + AmqpConnection connection = trackConnection(client.connect()); + AmqpSession session = connection.createSession(); + + final CountDownLatch done = new CountDownLatch(MSG_COUNT); + final AtomicBoolean error = new AtomicBoolean(false); + final ExecutorService executor = Executors.newSingleThreadExecutor(); + + final String address; + if (Queue.class.equals(destType)) { + address = "queue://" + getTestName(); + } else { + address = "topic://" + getTestName(); + } + + final AmqpReceiver receiver = session.createReceiver(address); + receiver.flow(MSG_COUNT); + + AmqpSender sender = session.createSender(address); + + final DestinationViewMBean destinationView; + if (Queue.class.equals(destType)) { + destinationView = getProxyToQueue(getTestName()); + } else { + destinationView = getProxyToTopic(getTestName()); + } + + executor.execute(new Runnable() { + + @Override + public void run() { + for (int i = 0; i < MSG_COUNT; i++) { + try { + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + received.accept(); + done.countDown(); + } catch (Exception ex) { + LOG.info("Caught error: {}", ex.getClass().getSimpleName()); + error.set(true); + } + } + } + }); + + for (int i = 0; i < MSG_COUNT; i++) { + AmqpMessage message = new AmqpMessage(); + message.setMessageId("msg" + i); + sender.send(message); + } + + assertTrue("did not read all messages, waiting on: " + done.getCount(), done.await(10, TimeUnit.SECONDS)); + assertFalse("should not be any errors on receive", error.get()); + + assertTrue("Should be no inflight messages: " + destinationView.getInFlightCount(), Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return destinationView.getInFlightCount() == 0; + } + })); + + sender.close(); + receiver.close(); + connection.close(); + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 6ab264da75..d993333da0 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import javax.jms.JMSException; @@ -276,37 +275,61 @@ public class TopicSubscription extends AbstractSubscription { public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception { super.acknowledge(context, ack); - // Handle the standard acknowledgment case. - if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) { - if (context.isInTransaction()) { - context.getTransaction().addSynchronization(new Synchronization() { - @Override - public void afterCommit() throws Exception { - updateStatsOnAck(ack); - dispatchMatched(); - } - }); - } else { - updateStatsOnAck(ack); + if (ack.isStandardAck()) { + updateStatsOnAck(context, ack); + } else if (ack.isPoisonAck()) { + if (ack.isInTransaction()) { + throw new JMSException("Poison ack cannot be transacted: " + ack); + } + updateStatsOnAck(context, ack); + if (getPrefetchSize() != 0) { + decrementPrefetchExtension(ack.getMessageCount()); + } + } else if (ack.isIndividualAck()) { + updateStatsOnAck(context, ack); + if (getPrefetchSize() != 0 && ack.isInTransaction()) { + incrementPrefetchExtension(ack.getMessageCount()); } - updatePrefetch(ack); - dispatchMatched(); - return; - } else if (ack.isDeliveredAck()) { - // Message was delivered but not acknowledged: update pre-fetch counters. - prefetchExtension.addAndGet(ack.getMessageCount()); - dispatchMatched(); - return; } else if (ack.isExpiredAck()) { updateStatsOnAck(ack); - updatePrefetch(ack); - dispatchMatched(); - return; + if (getPrefetchSize() != 0) { + incrementPrefetchExtension(ack.getMessageCount()); + } + } else if (ack.isDeliveredAck()) { + // Message was delivered but not acknowledged: update pre-fetch counters. + if (getPrefetchSize() != 0) { + incrementPrefetchExtension(ack.getMessageCount()); + } } else if (ack.isRedeliveredAck()) { - // nothing to do atm + // No processing for redelivered needed return; + } else { + throw new JMSException("Invalid acknowledgment: " + ack); + } + + dispatchMatched(); + } + + private void updateStatsOnAck(final ConnectionContext context, final MessageAck ack) { + if (context.isInTransaction()) { + context.getTransaction().addSynchronization(new Synchronization() { + + @Override + public void beforeEnd() { + if (getPrefetchSize() != 0) { + decrementPrefetchExtension(ack.getMessageCount()); + } + } + + @Override + public void afterCommit() throws Exception { + updateStatsOnAck(ack); + dispatchMatched(); + } + }); + } else { + updateStatsOnAck(ack); } - throw new JMSException("Invalid acknowledgment: " + ack); } @Override @@ -399,20 +422,20 @@ public class TopicSubscription extends AbstractSubscription { } } - private void updatePrefetch(MessageAck ack) { + private void incrementPrefetchExtension(int amount) { while (true) { int currentExtension = prefetchExtension.get(); - int newExtension = Math.max(0, currentExtension - ack.getMessageCount()); + int newExtension = Math.max(0, currentExtension + amount); if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { break; } } } - private void decrementPrefetchExtension() { + private void decrementPrefetchExtension(int amount) { while (true) { int currentExtension = prefetchExtension.get(); - int newExtension = Math.max(0, currentExtension - 1); + int newExtension = Math.max(0, currentExtension - amount); if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { break; } @@ -439,7 +462,7 @@ public class TopicSubscription extends AbstractSubscription { @Override public int getDispatchedQueueSize() { return (int)(getSubscriptionStatistics().getDispatched().getCount() - - prefetchExtension.get() - getSubscriptionStatistics().getDequeues().getCount()); + getSubscriptionStatistics().getDequeues().getCount()); } public int getMaximumPendingMessages() { @@ -538,10 +561,7 @@ public class TopicSubscription extends AbstractSubscription { // ------------------------------------------------------------------------- @Override public boolean isFull() { - if (info.getPrefetchSize() == 0) { - return prefetchExtension.get() == 0; - } - return getDispatchedQueueSize() >= info.getPrefetchSize(); + return getPrefetchSize() == 0 ? prefetchExtension.get() == 0 : getDispatchedQueueSize() - prefetchExtension.get() >= info.getPrefetchSize(); } @Override @@ -554,7 +574,7 @@ public class TopicSubscription extends AbstractSubscription { */ @Override public boolean isLowWaterMark() { - return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4); + return (getDispatchedQueueSize() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4); } /** @@ -562,7 +582,7 @@ public class TopicSubscription extends AbstractSubscription { */ @Override public boolean isHighWaterMark() { - return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9); + return (getDispatchedQueueSize() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9); } /** @@ -669,10 +689,10 @@ public class TopicSubscription extends AbstractSubscription { } if (getPrefetchSize() == 0) { - decrementPrefetchExtension(); + decrementPrefetchExtension(1); } - } + if (info.isDispatchAsync()) { if (node != null) { md.setTransmitCallback(new TransmitCallback() { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java index 38fa9217ea..5fc040cfba 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -16,24 +16,42 @@ */ package org.apache.activemq.usecases; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + import javax.jms.Connection; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; -import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQSession; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQTopic; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TopicSubscriptionZeroPrefetchTest { - private static final String TOPIC_NAME = "slow.consumer"; + private static final Logger LOG = LoggerFactory.getLogger(TopicSubscriptionZeroPrefetchTest.class); + + @Rule + public TestName name = new TestName(); + private Connection connection; private Session session; private ActiveMQTopic destination; @@ -41,6 +59,10 @@ public class TopicSubscriptionZeroPrefetchTest { private MessageConsumer consumer; private BrokerService brokerService; + public String getTopicName() { + return name.getMethodName(); + } + @Before public void setUp() throws Exception { @@ -52,7 +74,7 @@ public class TopicSubscriptionZeroPrefetchTest { connection = activeMQConnectionFactory.createConnection(); connection.setClientID("ClientID-1"); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = new ActiveMQTopic(TOPIC_NAME); + destination = new ActiveMQTopic(getTopicName()); producer = session.createProducer(destination); connection.start(); @@ -61,10 +83,10 @@ public class TopicSubscriptionZeroPrefetchTest { /* * test non durable topic subscription with prefetch set to zero */ - @Test(timeout=60000) + @Test(timeout = 60000) public void testTopicConsumerPrefetchZero() throws Exception { - ActiveMQTopic consumerDestination = new ActiveMQTopic(TOPIC_NAME + "?consumer.retroactive=true&consumer.prefetchSize=0"); + ActiveMQTopic consumerDestination = new ActiveMQTopic(getTopicName() + "?consumer.retroactive=true&consumer.prefetchSize=0"); consumer = session.createConsumer(consumerDestination); // publish messages @@ -76,31 +98,153 @@ public class TopicSubscriptionZeroPrefetchTest { Assert.assertNotNull("should have received a message the published message", consumedMessage); } - @Test(timeout=60000) - public void testTopicConsumerPrefetchZeroClientAckLoop() throws Exception { - ActiveMQTopic consumerDestination = new ActiveMQTopic(TOPIC_NAME + "?consumer.retroactive=true&consumer.prefetchSize=0"); + @Test(timeout = 60000) + public void testTopicConsumerPrefetchZeroClientAckLoopReceive() throws Exception { + ActiveMQTopic consumerDestination = new ActiveMQTopic(getTopicName() + "?consumer.retroactive=true&consumer.prefetchSize=0"); Session consumerClientAckSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); consumer = consumerClientAckSession.createConsumer(consumerDestination); final int count = 10; - for (int i=0;i