diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java index 9a51a837d7..fae9c7c45c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java @@ -330,4 +330,37 @@ public abstract class AbstractSubscription implements Subscription { public AtomicInteger getPrefetchExtension() { return this.prefetchExtension; } + + protected void contractPrefetchExtension(int amount) { + if (isUsePrefetchExtension() && getPrefetchSize() != 0) { + decrementPrefetchExtension(amount); + } + } + + protected void expandPrefetchExtension(int amount) { + if (isUsePrefetchExtension() && getPrefetchSize() != 0) { + incrementPrefetchExtension(amount); + } + } + + protected void decrementPrefetchExtension(int amount) { + while (true) { + int currentExtension = prefetchExtension.get(); + int newExtension = Math.max(0, currentExtension - amount); + if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { + break; + } + } + } + + private void incrementPrefetchExtension(int amount) { + while (true) { + int currentExtension = prefetchExtension.get(); + int newExtension = Math.max(currentExtension, currentExtension + amount); + if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { + break; + } + } + } + } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 314285fdfa..fc68fc10ff 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -226,6 +226,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { getSubscriptionStatistics().getDequeues().increment(); ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement(); removeList.add(node); + contractPrefetchExtension(1); } else { registerRemoveSync(context, node); } @@ -258,28 +259,18 @@ public abstract class PrefetchSubscription extends AbstractSubscription { ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement(); dispatched.remove(node); getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); + contractPrefetchExtension(1); } else { registerRemoveSync(context, node); + expandPrefetchExtension(1); } - - if (isUsePrefetchExtension() && getPrefetchSize() != 0 && ack.isInTransaction()) { - // allow transaction batch to exceed prefetch - while (true) { - int currentExtension = prefetchExtension.get(); - int newExtension = Math.max(currentExtension, currentExtension + 1); - if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { - break; - } - } - } - acknowledge(context, ack, node); destination = (Destination) node.getRegionDestination(); callDispatchMatched = true; break; } } - }else if (ack.isDeliveredAck()) { + } else if (ack.isDeliveredAck()) { // Message was delivered but not acknowledged: update pre-fetch // counters. int index = 0; @@ -287,16 +278,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { final MessageReference node = iter.next(); Destination nodeDest = (Destination) node.getRegionDestination(); if (ack.getLastMessageId().equals(node.getMessageId())) { - if (isUsePrefetchExtension() && getPrefetchSize() != 0) { - // allow batch to exceed prefetch - while (true) { - int currentExtension = prefetchExtension.get(); - int newExtension = Math.max(currentExtension, index + 1); - if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { - break; - } - } - } + expandPrefetchExtension(ack.getMessageCount()); destination = nodeDest; callDispatchMatched = true; break; @@ -327,17 +309,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { nodeDest.getDestinationStatistics().getInflight().decrement(); if (ack.getLastMessageId().equals(messageId)) { - if (isUsePrefetchExtension() && getPrefetchSize() != 0) { - // allow batch to exceed prefetch - while (true) { - int currentExtension = prefetchExtension.get(); - int newExtension = Math.max(currentExtension, index + 1); - if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { - break; - } - } - } - + contractPrefetchExtension(1); destination = (Destination) node.getRegionDestination(); callDispatchMatched = true; break; @@ -399,13 +371,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { index++; acknowledge(context, ack, node); if (ack.getLastMessageId().equals(messageId)) { - while (true) { - int currentExtension = prefetchExtension.get(); - int newExtension = Math.max(0, currentExtension - (index + 1)); - if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { - break; - } - } + contractPrefetchExtension(1); destination = nodeDest; callDispatchMatched = true; break; @@ -441,38 +407,24 @@ public abstract class PrefetchSubscription extends AbstractSubscription { context.getTransaction().addSynchronization( new Synchronization() { - @Override - public void beforeEnd() { - if (isUsePrefetchExtension() && getPrefetchSize() != 0) { - while (true) { - int currentExtension = prefetchExtension.get(); - int newExtension = Math.max(0, currentExtension - 1); - if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { - break; - } - } - } - } - @Override public void afterCommit() throws Exception { Destination nodeDest = (Destination) node.getRegionDestination(); - synchronized(dispatchLock) { + synchronized (dispatchLock) { getSubscriptionStatistics().getDequeues().increment(); dispatched.remove(node); getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); nodeDest.getDestinationStatistics().getInflight().decrement(); } + contractPrefetchExtension(1); nodeDest.wakeup(); dispatchPending(); } @Override public void afterRollback() throws Exception { - synchronized(dispatchLock) { - // poisionAck will decrement - otherwise still inflight on client - } + contractPrefetchExtension(1); } }); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 49dd5a2508..fba78cb5a7 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1681,6 +1681,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index if (!added || browser.atMax()) { browser.decrementQueueRef(); browserDispatches.remove(browserDispatch); + } else { + wakeup(); } } catch (Exception e) { LOG.warn("exception on dispatch to browser: {}", browserDispatch.getBrowser(), e); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 65c2ba954b..4b958f3bd1 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -281,24 +281,18 @@ public class TopicSubscription extends AbstractSubscription { throw new JMSException("Poison ack cannot be transacted: " + ack); } updateStatsOnAck(context, ack); - if (getPrefetchSize() != 0) { - decrementPrefetchExtension(ack.getMessageCount()); - } + contractPrefetchExtension(ack.getMessageCount()); } else if (ack.isIndividualAck()) { updateStatsOnAck(context, ack); - if (getPrefetchSize() != 0 && ack.isInTransaction()) { - incrementPrefetchExtension(ack.getMessageCount()); + if (ack.isInTransaction()) { + expandPrefetchExtension(1); } } else if (ack.isExpiredAck()) { updateStatsOnAck(ack); - if (getPrefetchSize() != 0) { - incrementPrefetchExtension(ack.getMessageCount()); - } + contractPrefetchExtension(ack.getMessageCount()); } else if (ack.isDeliveredAck()) { // Message was delivered but not acknowledged: update pre-fetch counters. - if (getPrefetchSize() != 0) { - incrementPrefetchExtension(ack.getMessageCount()); - } + expandPrefetchExtension(ack.getMessageCount()); } else if (ack.isRedeliveredAck()) { // No processing for redelivered needed return; @@ -314,14 +308,13 @@ public class TopicSubscription extends AbstractSubscription { context.getTransaction().addSynchronization(new Synchronization() { @Override - public void beforeEnd() { - if (getPrefetchSize() != 0) { - decrementPrefetchExtension(ack.getMessageCount()); - } + public void afterRollback() { + contractPrefetchExtension(ack.getMessageCount()); } @Override public void afterCommit() throws Exception { + contractPrefetchExtension(ack.getMessageCount()); updateStatsOnAck(ack); dispatchMatched(); } @@ -417,29 +410,9 @@ public class TopicSubscription extends AbstractSubscription { if (ack.isExpiredAck()) { destination.getDestinationStatistics().getExpired().add(ack.getMessageCount()); } - } - } - } - - private void incrementPrefetchExtension(int amount) { - if (!isUsePrefetchExtension()) { - return; - } - while (true) { - int currentExtension = prefetchExtension.get(); - int newExtension = Math.max(0, currentExtension + amount); - if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { - break; - } - } - } - - private void decrementPrefetchExtension(int amount) { - while (true) { - int currentExtension = prefetchExtension.get(); - int newExtension = Math.max(0, currentExtension - amount); - if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { - break; + if (!ack.isInTransaction()) { + contractPrefetchExtension(1); + } } } } diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java index c6a50ebdb6..7ded5031b5 100644 --- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java @@ -28,6 +28,7 @@ import java.io.StringReader; import java.net.SocketTimeoutException; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -46,10 +47,15 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.management.ObjectName; +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.BrokerViewMBean; import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.region.AbstractSubscription; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.util.Wait; @@ -2263,6 +2269,91 @@ public class StompTest extends StompTestSupport { assertEquals(bigBody, sframe.getBody()); } + @Test(timeout = 60000) + public void testAckInTransactionTopic() throws Exception { + doTestAckInTransaction(true); + } + + @Test(timeout = 60000) + public void testAckInTransactionQueue() throws Exception { + doTestAckInTransaction(false); + } + + public void doTestAckInTransaction(boolean topic) throws Exception { + + String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL; + + stompConnection.sendFrame(frame); + stompConnection.receive(); + String destination = (topic ? "/topic" : "/queue") + "/test"; + stompConnection.subscribe(destination, Stomp.Headers.Subscribe.AckModeValues.CLIENT); + + for (int j = 0; j < 5; j++) { + + for (int i = 0; i < 10; i++) { + stompConnection.send(destination , "message" + i); + } + + stompConnection.begin("tx"+j); + + for (int i = 0; i < 10; i++) { + StompFrame message = stompConnection.receive(); + stompConnection.ack(message, "tx"+j); + + } + stompConnection.commit("tx"+j); + } + + List subs = getDestinationConsumers(brokerService, + ActiveMQDestination.createDestination("test", topic ? ActiveMQDestination.TOPIC_TYPE : ActiveMQDestination.QUEUE_TYPE)); + + + for (Subscription subscription : subs) { + final AbstractSubscription abstractSubscription = (AbstractSubscription) subscription; + + assertTrue("prefetchExtension should be back to Zero after commit", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("ext: " + abstractSubscription.getPrefetchExtension().get()); + return abstractSubscription.getPrefetchExtension().get() == 0; + } + })); + } + } + + public static List getDestinationConsumers(BrokerService broker, ActiveMQDestination destination) { + List result = null; + org.apache.activemq.broker.region.Destination dest = getDestination(broker, destination); + if (dest != null) { + result = dest.getConsumers(); + } + return result; + } + + public static org.apache.activemq.broker.region.Destination getDestination(BrokerService target, ActiveMQDestination destination) { + org.apache.activemq.broker.region.Destination result = null; + for (org.apache.activemq.broker.region.Destination dest : getDestinationMap(target, destination).values()) { + if (dest.getName().equals(destination.getPhysicalName())) { + result = dest; + break; + } + } + return result; + } + + private static Map getDestinationMap(BrokerService target, + ActiveMQDestination destination) { + RegionBroker regionBroker = (RegionBroker) target.getRegionBroker(); + if (destination.isTemporary()) { + return destination.isQueue() ? regionBroker.getTempQueueRegion().getDestinationMap() : + regionBroker.getTempTopicRegion().getDestinationMap(); + } + return destination.isQueue() ? + regionBroker.getQueueRegion().getDestinationMap() : + regionBroker.getTopicRegion().getDestinationMap(); + } + + protected SamplePojo createObjectFromJson(String data) throws Exception { HierarchicalStreamReader in = new JettisonMappedXmlDriver().createReader(new StringReader(data)); return createObject(in); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java index 5ccf1bd515..abf9f62e57 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java @@ -19,6 +19,7 @@ package org.apache.activemq; import java.lang.Thread.UncaughtExceptionHandler; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -42,11 +43,16 @@ import javax.management.ObjectName; import junit.framework.Test; import org.apache.activemq.broker.jmx.DestinationViewMBean; +import org.apache.activemq.broker.region.QueueSubscription; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.TopicSubscription; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.activemq.TestSupport.getDestinationConsumers; + /** * Test cases used to test the JMS message consumer. */ @@ -223,6 +229,83 @@ public class JMSConsumerTest extends JmsTestSupport { message.acknowledge(); } + public void testReceiveTopicWithPrefetch1() throws Exception { + + // Set prefetch to 1 + connection.getPrefetchPolicy().setAll(1); + connection.start(); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + destination = createDestination(session, Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)); + MessageConsumer consumer = session.createConsumer(destination); + + // Send the messages + sendMessages(session, destination, 4); + + // Make sure 4 messages were delivered. + Message message = null; + for (int i = 0; i < 4; i++) { + message = consumer.receive(1000); + assertNotNull(message); + } + + final List subscriptions = getDestinationConsumers(broker, destination); + + assertTrue("prefetch extension back to 0", + subscriptions.stream(). + filter(s -> s instanceof TopicSubscription). + mapToInt(s -> ((TopicSubscription)s).getPrefetchExtension().get()). + allMatch(e -> e == 4)); + + assertNull(consumer.receiveNoWait()); + message.acknowledge(); + + assertTrue("prefetch extension back to 0", + subscriptions.stream(). + filter(s -> s instanceof TopicSubscription). + mapToInt(s -> ((TopicSubscription)s).getPrefetchExtension().get()). + allMatch(e -> e == 0)); + + } + + public void testReceiveQueueWithPrefetch1() throws Exception { + + // Set prefetch to 1 + connection.getPrefetchPolicy().setAll(1); + connection.start(); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + destination = createDestination(session, Byte.valueOf(ActiveMQDestination.QUEUE_TYPE)); + MessageConsumer consumer = session.createConsumer(destination); + + // Send the messages + sendMessages(session, destination, 4); + + // Make sure 4 messages were delivered. + Message message = null; + for (int i = 0; i < 4; i++) { + message = consumer.receive(1000); + assertNotNull(message); + } + + final List subscriptions = getDestinationConsumers(broker, destination); + + assertTrue("prefetch extension..", + subscriptions.stream(). + filter(s -> s instanceof QueueSubscription). + mapToInt(s -> ((QueueSubscription)s).getPrefetchExtension().get()). + allMatch(e -> e == 4)); + + assertNull(consumer.receiveNoWait()); + message.acknowledge(); + + assertTrue("prefetch extension back to 0", + subscriptions.stream(). + filter(s -> s instanceof QueueSubscription). + mapToInt(s -> ((QueueSubscription)s).getPrefetchExtension().get()). + allMatch(e -> e == 0)); + } + public void initCombosForTestDurableConsumerSelectorChange() { addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)}); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java index 89fb25b232..e50e35ff2b 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java @@ -51,4 +51,10 @@ public class JMSXAConsumerTest extends JMSConsumerTest { // needs client ack, xa is auto ack if no transaction public void testExceptionOnClientAckAfterConsumerClose() throws Exception { } + + public void testReceiveTopicWithPrefetch1() throws Exception { + } + + public void testReceiveQueueWithPrefetch1() throws Exception { + } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java index 9b5354225d..3a7924e3a6 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java @@ -19,6 +19,7 @@ package org.apache.activemq.usecases; import junit.framework.Test; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.DestinationStatistics; @@ -98,46 +99,76 @@ public class ExpiredMessagesTest extends CombinationTestSupport { verifyDestinationDlq(destination, numMessagesToSend, view); } - public void testExpiredMessages_onTopic_withPrefetchExtension() throws Exception { - final ActiveMQDestination destination = new ActiveMQTopic("test"); - final int numMessagesToSend = 10000; - + public void testClientAckInflight_onTopic_withPrefetchExtension() throws Exception { usePrefetchExtension = true; + doTestClientAckInflight_onTopic_checkPrefetchExtension(); + } + public void testClientAckInflight_onTopic_withOutPrefetchExtension() throws Exception { + usePrefetchExtension = false; + doTestClientAckInflight_onTopic_checkPrefetchExtension(); + } + + public void doTestClientAckInflight_onTopic_checkPrefetchExtension() throws Exception { + final ActiveMQDestination destination = new ActiveMQTopic("test"); buildBroker(destination); - verifyMessageExpirationOnDestination(destination, numMessagesToSend); - // We don't check the DLQ because non-persistent messages on topics are discarded instead. + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( + "failover://"+brokerUri); + ActiveMQPrefetchPolicy prefetchTwo = new ActiveMQPrefetchPolicy(); + prefetchTwo.setAll(6); + factory.setPrefetchPolicy(prefetchTwo); + connection = factory.createConnection(); + connection.start(); + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + MessageConsumer consumer = session.createConsumer(destination); + + produce(10, destination); + + Message m = null; + for (int i=0; i<5; i++) { + m = consumer.receive(4000); + } + assertNotNull(m); final List subscriptions = getDestinationConsumers(broker, destination); assertTrue("prefetch extension was not incremented", - subscriptions.stream(). - filter(s -> s instanceof TopicSubscription). - mapToInt(s -> ((TopicSubscription)s).getPrefetchExtension().get()). - allMatch(e -> e > 0)); - } + subscriptions.stream(). + filter(s -> s instanceof TopicSubscription). + mapToInt(s -> ((TopicSubscription)s).getPrefetchExtension().get()). + allMatch(e -> usePrefetchExtension ? e > 1 : e == 0)); - public void testExpiredMessages_onTopic_withoutPrefetchExtension() throws Exception { - final ActiveMQDestination destination = new ActiveMQTopic("test"); - final int numMessagesToSend = 10000; + m.acknowledge(); - usePrefetchExtension = false; - - buildBroker(destination); - - verifyMessageExpirationOnDestination(destination, numMessagesToSend); - // We don't check the DLQ because non-persistent messages on topics are discarded instead. - - final List subscriptions = getDestinationConsumers(broker, destination); - - assertTrue("prefetch extension was incremented", + assertTrue("prefetch extension was not incremented", subscriptions.stream(). filter(s -> s instanceof TopicSubscription). mapToInt(s -> ((TopicSubscription)s).getPrefetchExtension().get()). allMatch(e -> e == 0)); + } + private void produce(int num, ActiveMQDestination destination) throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( + "failover://"+brokerUri); + Connection connection = factory.createConnection(); + connection.start(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + int i = 0; + while (i++ < num) { + Message message = useTextMessage ? session + .createTextMessage("test") : session + .createObjectMessage("test"); + producer.send(message); + } + connection.close(); + } + + private void buildBroker(ActiveMQDestination destination) throws Exception { broker = createBroker(deleteAllMessages, usePrefetchExtension, 100, destination); brokerUri = broker.getTransportConnectors().get(0).getPublishableConnectString();