diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java index 92d1c0d39a..d22801c935 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java @@ -227,6 +227,11 @@ public abstract class AbstractSubscription implements Subscription { return info != null && info.isBrowser(); } + @Override + public long getInFlightMessageSize() { + return subscriptionStatistics.getInflightMessageSize().getTotalSize(); + } + @Override public int getInFlightUsage() { if (info.getPrefetchSize() > 0) { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index 12c418a8a2..0107c589dc 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -237,6 +237,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us savedDispateched = new ArrayList(dispatched); } dispatched.clear(); + getSubscriptionStatistics().getInflightMessageSize().reset(); } if (!keepDurableSubsActive && pending.isTransient()) { try { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 2a63c33540..d148e80d04 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -176,6 +176,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { pending.remove(); createMessageDispatch(node, node.getMessage()); dispatched.add(node); + getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); onDispatch(node, node.getMessage()); } return; @@ -240,6 +241,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { } for (final MessageReference node : removeList) { dispatched.remove(node); + getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); } // this only happens after a reconnect - get an ack which is not // valid @@ -257,6 +259,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { getSubscriptionStatistics().getDequeues().increment(); ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement(); dispatched.remove(node); + getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); } else { registerRemoveSync(context, node); } @@ -379,6 +382,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { } for (final MessageReference node : removeList) { dispatched.remove(node); + getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); } if (!callDispatchMatched) { throw new JMSException( @@ -427,6 +431,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { synchronized(dispatchLock) { getSubscriptionStatistics().getDequeues().increment(); dispatched.remove(node); + getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); nodeDest.getDestinationStatistics().getInflight().decrement(); } nodeDest.wakeup(); @@ -620,6 +625,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { for (MessageReference r : dispatched) { if (r.getRegionDestination() == destination) { references.add(r); + getSubscriptionStatistics().getInflightMessageSize().addSize(-r.getSize()); } } rc.addAll(references); @@ -697,6 +703,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { if (node != QueueMessageReference.NULL_MESSAGE) { getSubscriptionStatistics().getDispatched().increment(); dispatched.add(node); + getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); } if (getPrefetchSize() == 0) { while (true) { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java index ec37512b43..2c8afedd88 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java @@ -196,6 +196,11 @@ public interface Subscription extends SubscriptionRecovery { */ int getInFlightSize(); + /** + * @return the size in bytes of the messages awaiting acknowledgement + */ + long getInFlightMessageSize(); + /** * @return the in flight messages as a percentage of the prefetch size */ diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/SubscriptionStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/SubscriptionStatistics.java index 09fab8a0fd..d6a276e5d1 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/SubscriptionStatistics.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/SubscriptionStatistics.java @@ -18,6 +18,7 @@ package org.apache.activemq.broker.region; import org.apache.activemq.management.CountStatisticImpl; +import org.apache.activemq.management.SizeStatisticImpl; import org.apache.activemq.management.StatsImpl; /** @@ -29,6 +30,7 @@ public class SubscriptionStatistics extends StatsImpl { protected CountStatisticImpl enqueues; protected CountStatisticImpl dequeues; protected CountStatisticImpl dispatched; + protected SizeStatisticImpl inflightMessageSize; public SubscriptionStatistics() { @@ -41,11 +43,13 @@ public class SubscriptionStatistics extends StatsImpl { enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the subscription"); dispatched = new CountStatisticImpl("dispatched", "The number of messages that have been dispatched from the subscription"); dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been acknowledged from the subscription"); + inflightMessageSize = new SizeStatisticImpl("inflightMessageSize", "The size in bytes of messages dispatched but awaiting acknowledgement"); addStatistic("consumedCount", consumedCount); addStatistic("enqueues", enqueues); addStatistic("dispatched", dispatched); addStatistic("dequeues", dequeues); + addStatistic("inflightMessageSize", inflightMessageSize); this.setEnabled(enabled); } @@ -66,6 +70,10 @@ public class SubscriptionStatistics extends StatsImpl { return dispatched; } + public SizeStatisticImpl getInflightMessageSize() { + return inflightMessageSize; + } + public void reset() { if (this.isDoReset()) { super.reset(); @@ -73,6 +81,7 @@ public class SubscriptionStatistics extends StatsImpl { enqueues.reset(); dequeues.reset(); dispatched.reset(); + inflightMessageSize.reset(); } } @@ -82,6 +91,7 @@ public class SubscriptionStatistics extends StatsImpl { enqueues.setEnabled(enabled); dispatched.setEnabled(enabled); dequeues.setEnabled(enabled); + inflightMessageSize.setEnabled(enabled); } public void setParent(SubscriptionStatistics parent) { @@ -90,11 +100,13 @@ public class SubscriptionStatistics extends StatsImpl { enqueues.setParent(parent.enqueues); dispatched.setParent(parent.dispatched); dequeues.setParent(parent.dequeues); + inflightMessageSize.setParent(parent.inflightMessageSize); } else { consumedCount.setParent(null); enqueues.setParent(null); dispatched.setParent(null); dequeues.setParent(null); + inflightMessageSize.setParent(null); } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index c59c35973b..17a3137f4b 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -17,7 +17,11 @@ package org.apache.activemq.broker.region; import java.io.IOException; +import java.util.Comparator; +import java.util.Iterator; import java.util.LinkedList; +import java.util.NavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -37,6 +41,7 @@ import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatchNotification; +import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.Response; import org.apache.activemq.thread.Scheduler; @@ -71,6 +76,20 @@ public class TopicSubscription extends AbstractSubscription { protected boolean active = false; protected boolean discarding = false; + + /** + * This Map is used to keep track of messages that have been dispatched in sorted order to + * optimize message acknowledgement + */ + private NavigableMap dispatched = new ConcurrentSkipListMap<>( + new Comparator() { + @Override + public int compare(MessageId m1, MessageId m2) { + return m1 == null ? (m2 == null ? 0 : -1) : (m2 == null ? 1 + : Long.compare(m1.getBrokerSequenceId(), m2.getBrokerSequenceId())); + } + }); + public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception { super(broker, context, info); this.usageManager = usageManager; @@ -250,6 +269,8 @@ public class TopicSubscription extends AbstractSubscription { if (node.getMessageId().equals(mdn.getMessageId())) { matched.remove(); getSubscriptionStatistics().getDispatched().increment(); + dispatched.put(node.getMessageId(), node); + getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); node.decrementReferenceCount(); break; } @@ -277,6 +298,7 @@ public class TopicSubscription extends AbstractSubscription { } } getSubscriptionStatistics().getDequeues().add(ack.getMessageCount()); + updateInflightMessageSizeOnAck(ack); dispatchMatched(); } }); @@ -289,6 +311,7 @@ public class TopicSubscription extends AbstractSubscription { } } getSubscriptionStatistics().getDequeues().add(ack.getMessageCount()); + updateInflightMessageSizeOnAck(ack); } while (true) { int currentExtension = prefetchExtension.get(); @@ -379,6 +402,27 @@ public class TopicSubscription extends AbstractSubscription { } } + /** + * Update the inflight statistics on message ack. Since a message ack could be a range, + * we need to grab a subtree of the dispatched map to acknowledge messages. Finding the + * subMap is an O(log n) operation. + * @param ack + */ + private void updateInflightMessageSizeOnAck(final MessageAck ack) { + if (ack.getFirstMessageId() != null) { + NavigableMap acked = dispatched + .subMap(ack.getFirstMessageId(), true, ack.getLastMessageId(), true); + Iterator i = acked.keySet().iterator(); + while (i.hasNext()) { + getSubscriptionStatistics().getInflightMessageSize().addSize(-acked.get(i.next()).getSize()); + i.remove(); + } + } else { + getSubscriptionStatistics().getInflightMessageSize().addSize(-dispatched.get(ack.getLastMessageId()).getSize()); + dispatched.remove(ack.getLastMessageId()); + } + } + @Override public int countBeforeFull() { return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - getDispatchedQueueSize(); @@ -602,6 +646,8 @@ public class TopicSubscription extends AbstractSubscription { if (node != null) { md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination()); getSubscriptionStatistics().getDispatched().increment(); + dispatched.put(node.getMessageId(), node); + getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); // Keep track if this subscription is receiving messages from a single destination. if (singleDestination) { if (destination == null) { @@ -683,6 +729,7 @@ public class TopicSubscription extends AbstractSubscription { } } setSlowConsumer(false); + dispatched.clear(); } @Override diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java index d692d03c9e..99382d0e0f 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java @@ -366,6 +366,11 @@ public class QueueDuplicatesFromStoreTest extends TestCase { public SubscriptionStatistics getSubscriptionStatistics() { return subscriptionStatistics; } + + @Override + public long getInFlightMessageSize() { + return subscriptionStatistics.getInflightMessageSize().getTotalSize(); + } }; queue.addSubscription(contextNotInTx, subscription); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java index 50c21363e0..2541a6466c 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java @@ -376,5 +376,10 @@ public class SubscriptionAddRemoveQueueTest extends TestCase { return subscriptionStatistics; } + @Override + public long getInFlightMessageSize() { + return subscriptionStatistics.getInflightMessageSize().getTotalSize(); + } + } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java new file mode 100644 index 0000000000..07784e7a37 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java @@ -0,0 +1,295 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.statistics; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Random; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; +import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.TestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; +import org.junit.runners.Parameterized.Parameters; + +/** + * This test shows Inflight Message sizes are correct for various acknowledgement modes. + */ +public abstract class AbstractInflightMessageSizeTest { + + protected BrokerService brokerService; + protected Connection connection; + protected String brokerUrlString; + protected Session session; + protected javax.jms.Destination dest; + protected Destination amqDestination; + protected MessageConsumer consumer; + protected int prefetch = 100; + final protected int ackType; + final protected boolean optimizeAcknowledge; + final protected String destName = "testDest"; + + @Parameters + public static Collection data() { + return Arrays.asList(new Object[][] { + {ActiveMQSession.SESSION_TRANSACTED, true}, + {ActiveMQSession.AUTO_ACKNOWLEDGE, true}, + {ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, true}, + {ActiveMQSession.CLIENT_ACKNOWLEDGE, true}, + {ActiveMQSession.SESSION_TRANSACTED, false}, + {ActiveMQSession.AUTO_ACKNOWLEDGE, false}, + {ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, false}, + {ActiveMQSession.CLIENT_ACKNOWLEDGE, false} + }); + } + + public AbstractInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) { + this.ackType = ackType; + this.optimizeAcknowledge = optimizeAcknowledge; + } + + @Before + public void setUp() throws Exception { + brokerService = new BrokerService(); + brokerService.setDeleteAllMessagesOnStartup(true); + TransportConnector tcp = brokerService + .addConnector("tcp://localhost:0"); + brokerService.start(); + //used to test optimizeAcknowledge works + String optAckString = optimizeAcknowledge ? "?jms.optimizeAcknowledge=true&jms.optimizedAckScheduledAckInterval=2000" : ""; + brokerUrlString = tcp.getPublishableConnectString() + optAckString; + connection = createConnectionFactory().createConnection(); + connection.setClientID("client1"); + connection.start(); + session = connection.createSession(ackType == ActiveMQSession.SESSION_TRANSACTED, ackType); + dest = getDestination(); + consumer = getMessageConsumer(); + amqDestination = TestSupport.getDestination(brokerService, getActiveMQDestination()); + } + + protected ActiveMQConnectionFactory createConnectionFactory() + throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrlString); + ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); + prefetchPolicy.setTopicPrefetch(prefetch); + prefetchPolicy.setQueuePrefetch(prefetch); + prefetchPolicy.setOptimizeDurableTopicPrefetch(prefetch); + factory.setPrefetchPolicy(prefetchPolicy); + return factory; + } + + @After + public void tearDown() throws Exception { + if (connection != null) { + connection.close(); + } + brokerService.stop(); + } + + /** + * Tests that inflight message size goes up and comes back down to 0 after + * messages are consumed + * + * @throws javax.jms.JMSException + * @throws InterruptedException + */ + @Test(timeout=15000) + public void testInflightMessageSize() throws Exception { + final long size = sendMessages(10); + + assertTrue("Inflight message size should be greater than the content length sent", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return getSubscription().getInFlightMessageSize() > size; + } + })); + + receiveMessages(10); + + assertTrue("Inflight message size should be 0", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return getSubscription().getInFlightMessageSize() == 0; + } + })); + } + + /** + * Test that the in flight message size won't rise after prefetch is filled + * + * @throws Exception + */ + @Test(timeout=15000) + public void testInflightMessageSizePrefetchFilled() throws Exception { + final long size = sendMessages(prefetch); + + assertTrue("Inflight message size should be greater than content length", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return getSubscription().getInFlightMessageSize() > size; + } + })); + + final long inFlightSize = getSubscription().getInFlightMessageSize(); + sendMessages(10); + + //Prefetch has been filled, so the size should not change with 10 more messages + assertEquals("Inflight message size should not change", inFlightSize, getSubscription().getInFlightMessageSize()); + + receiveMessages(prefetch + 10); + + assertTrue("Inflight message size should be 0", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return getSubscription().getInFlightMessageSize() == 0; + } + })); + } + + /** + * Test that the in flight message size will still rise if prefetch is not filled + * + * @throws Exception + */ + @Test(timeout=15000) + public void testInflightMessageSizePrefetchNotFilled() throws Exception { + final long size = sendMessages(prefetch - 10); + + assertTrue("Inflight message size should be greater than content length", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return getSubscription().getInFlightMessageSize() > size; + } + })); + + //capture the inflight size and send 10 more messages + final long inFlightSize = getSubscription().getInFlightMessageSize(); + sendMessages(10); + + //Prefetch has NOT been filled, so the size should rise with 10 more messages + assertTrue("Inflight message size should be greater than previous inlight size", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return getSubscription().getInFlightMessageSize() > inFlightSize; + } + })); + + receiveMessages(prefetch); + + assertTrue("Inflight message size should be 0", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return getSubscription().getInFlightMessageSize() == 0; + } + })); + } + + + /** + * Tests that inflight message size goes up and doesn't go down if receive is rolledback + * + * @throws javax.jms.JMSException + * @throws InterruptedException + */ + @Test(timeout=15000) + public void testInflightMessageSizeRollback() throws Exception { + Assume.assumeTrue(ackType == ActiveMQSession.SESSION_TRANSACTED); + + final long size = sendMessages(10); + + assertTrue("Inflight message size should be greater than the content length sent", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return getSubscription().getInFlightMessageSize() > size; + } + })); + + long inFlightSize = getSubscription().getInFlightMessageSize(); + + for (int i = 0; i < 10; i++) { + consumer.receive(); + } + session.rollback(); + + assertEquals("Inflight message size should not change on rollback", inFlightSize, getSubscription().getInFlightMessageSize()); + } + + /** + * This method will generate random sized messages up to 150000 bytes. + * + * @param count + * @throws JMSException + */ + protected long sendMessages(int count) throws JMSException { + MessageProducer producer = session.createProducer(dest); + long totalSize = 0; + for (int i = 0; i < count; i++) { + Random r = new Random(); + int size = r.nextInt(150000); + totalSize += size; + byte[] bytes = new byte[size > 0 ? size : 1]; + r.nextBytes(bytes); + BytesMessage bytesMessage = session.createBytesMessage(); + bytesMessage.writeBytes(bytes); + producer.send(bytesMessage); + } + if (session.getTransacted()) { + session.commit(); + } + return totalSize; + } + + protected void receiveMessages(int count) throws JMSException { + for (int i = 0; i < count; i++) { + javax.jms.Message m = consumer.receive(); + if (ackType == ActiveMQSession.SESSION_TRANSACTED) { + session.commit(); + } else if (ackType != ActiveMQSession.AUTO_ACKNOWLEDGE) { + m.acknowledge(); + } + } + } + + protected abstract Subscription getSubscription(); + + protected abstract ActiveMQDestination getActiveMQDestination(); + + protected abstract MessageConsumer getMessageConsumer() throws JMSException; + + protected abstract javax.jms.Destination getDestination() throws JMSException ; + +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/DurableSubscriptionInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/DurableSubscriptionInflightMessageSizeTest.java new file mode 100644 index 0000000000..29d6cb79a5 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/DurableSubscriptionInflightMessageSizeTest.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.statistics; + +import javax.jms.JMSException; +import javax.jms.MessageConsumer; + +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.Topic; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.util.SubscriptionKey; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * This test shows Inflight Message sizes are correct for various acknowledgement modes + * using a DurableSubscription + */ +@RunWith(Parameterized.class) +public class DurableSubscriptionInflightMessageSizeTest extends AbstractInflightMessageSizeTest { + + public DurableSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) { + super(ackType, optimizeAcknowledge); + } + + @Override + protected MessageConsumer getMessageConsumer() throws JMSException { + return session.createDurableSubscriber((javax.jms.Topic)dest, "sub1"); + } + + @Override + protected Subscription getSubscription() { + return ((Topic)amqDestination).getDurableTopicSubs().get(new SubscriptionKey("client1", "sub1")); + } + + @Override + protected javax.jms.Topic getDestination() throws JMSException { + return session.createTopic(destName); + } + + @Override + protected ActiveMQDestination getActiveMQDestination() { + return new ActiveMQTopic(destName); + } + +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/QueueSubscriptionInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/QueueSubscriptionInflightMessageSizeTest.java new file mode 100644 index 0000000000..84ddc71c27 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/QueueSubscriptionInflightMessageSizeTest.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.statistics; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; + +import org.apache.activemq.broker.region.Queue; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * This test shows Inflight Message sizes are correct for various acknowledgement modes + * using a QueueSubscription + */ +@RunWith(Parameterized.class) +public class QueueSubscriptionInflightMessageSizeTest extends AbstractInflightMessageSizeTest { + + public QueueSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) { + super(ackType, optimizeAcknowledge); + } + + @Override + protected MessageConsumer getMessageConsumer() throws JMSException { + return session.createConsumer(dest); + } + + @Override + protected Subscription getSubscription() { + return ((Queue)amqDestination).getConsumers().get(0); + } + + @Override + protected Destination getDestination() throws JMSException { + return session.createQueue(destName); + } + + @Override + protected ActiveMQDestination getActiveMQDestination() { + return new ActiveMQQueue(destName); + } + +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/TopicSubscriptionInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/TopicSubscriptionInflightMessageSizeTest.java new file mode 100644 index 0000000000..797d409a56 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/TopicSubscriptionInflightMessageSizeTest.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.statistics; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; + +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQTopic; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * This test shows Inflight Message sizes are correct for various acknowledgement modes + * using a TopicSubscription + */ +@RunWith(Parameterized.class) +public class TopicSubscriptionInflightMessageSizeTest extends AbstractInflightMessageSizeTest { + + public TopicSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) { + super(ackType, optimizeAcknowledge); + } + + @Override + protected MessageConsumer getMessageConsumer() throws JMSException { + return session.createConsumer(dest); + } + + @Override + protected Subscription getSubscription() { + return amqDestination.getConsumers().get(0); + } + + @Override + protected Destination getDestination() throws JMSException { + return session.createTopic(destName); + } + + @Override + protected ActiveMQDestination getActiveMQDestination() { + return new ActiveMQTopic(destName); + } + +}