This commit adds an Inflight message size statistic to SubscriptionStatistics
so we can know the size of all the messages that are inflight, besides just the count.
This commit is contained in:
Christopher L. Shannon (cshannon) 2015-06-01 18:12:22 +00:00 committed by Timothy Bish
parent 2b320ac065
commit 46055034c9
12 changed files with 564 additions and 0 deletions

View File

@ -227,6 +227,11 @@ public abstract class AbstractSubscription implements Subscription {
return info != null && info.isBrowser(); return info != null && info.isBrowser();
} }
@Override
public long getInFlightMessageSize() {
return subscriptionStatistics.getInflightMessageSize().getTotalSize();
}
@Override @Override
public int getInFlightUsage() { public int getInFlightUsage() {
if (info.getPrefetchSize() > 0) { if (info.getPrefetchSize() > 0) {

View File

@ -237,6 +237,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
savedDispateched = new ArrayList<MessageReference>(dispatched); savedDispateched = new ArrayList<MessageReference>(dispatched);
} }
dispatched.clear(); dispatched.clear();
getSubscriptionStatistics().getInflightMessageSize().reset();
} }
if (!keepDurableSubsActive && pending.isTransient()) { if (!keepDurableSubsActive && pending.isTransient()) {
try { try {

View File

@ -176,6 +176,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
pending.remove(); pending.remove();
createMessageDispatch(node, node.getMessage()); createMessageDispatch(node, node.getMessage());
dispatched.add(node); dispatched.add(node);
getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
onDispatch(node, node.getMessage()); onDispatch(node, node.getMessage());
} }
return; return;
@ -240,6 +241,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
} }
for (final MessageReference node : removeList) { for (final MessageReference node : removeList) {
dispatched.remove(node); dispatched.remove(node);
getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
} }
// this only happens after a reconnect - get an ack which is not // this only happens after a reconnect - get an ack which is not
// valid // valid
@ -257,6 +259,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
getSubscriptionStatistics().getDequeues().increment(); getSubscriptionStatistics().getDequeues().increment();
((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement(); ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
dispatched.remove(node); dispatched.remove(node);
getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
} else { } else {
registerRemoveSync(context, node); registerRemoveSync(context, node);
} }
@ -379,6 +382,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
} }
for (final MessageReference node : removeList) { for (final MessageReference node : removeList) {
dispatched.remove(node); dispatched.remove(node);
getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
} }
if (!callDispatchMatched) { if (!callDispatchMatched) {
throw new JMSException( throw new JMSException(
@ -427,6 +431,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
synchronized(dispatchLock) { synchronized(dispatchLock) {
getSubscriptionStatistics().getDequeues().increment(); getSubscriptionStatistics().getDequeues().increment();
dispatched.remove(node); dispatched.remove(node);
getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
nodeDest.getDestinationStatistics().getInflight().decrement(); nodeDest.getDestinationStatistics().getInflight().decrement();
} }
nodeDest.wakeup(); nodeDest.wakeup();
@ -620,6 +625,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
for (MessageReference r : dispatched) { for (MessageReference r : dispatched) {
if (r.getRegionDestination() == destination) { if (r.getRegionDestination() == destination) {
references.add(r); references.add(r);
getSubscriptionStatistics().getInflightMessageSize().addSize(-r.getSize());
} }
} }
rc.addAll(references); rc.addAll(references);
@ -697,6 +703,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (node != QueueMessageReference.NULL_MESSAGE) { if (node != QueueMessageReference.NULL_MESSAGE) {
getSubscriptionStatistics().getDispatched().increment(); getSubscriptionStatistics().getDispatched().increment();
dispatched.add(node); dispatched.add(node);
getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
} }
if (getPrefetchSize() == 0) { if (getPrefetchSize() == 0) {
while (true) { while (true) {

View File

@ -196,6 +196,11 @@ public interface Subscription extends SubscriptionRecovery {
*/ */
int getInFlightSize(); int getInFlightSize();
/**
* @return the size in bytes of the messages awaiting acknowledgement
*/
long getInFlightMessageSize();
/** /**
* @return the in flight messages as a percentage of the prefetch size * @return the in flight messages as a percentage of the prefetch size
*/ */

View File

@ -18,6 +18,7 @@
package org.apache.activemq.broker.region; package org.apache.activemq.broker.region;
import org.apache.activemq.management.CountStatisticImpl; import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.management.SizeStatisticImpl;
import org.apache.activemq.management.StatsImpl; import org.apache.activemq.management.StatsImpl;
/** /**
@ -29,6 +30,7 @@ public class SubscriptionStatistics extends StatsImpl {
protected CountStatisticImpl enqueues; protected CountStatisticImpl enqueues;
protected CountStatisticImpl dequeues; protected CountStatisticImpl dequeues;
protected CountStatisticImpl dispatched; protected CountStatisticImpl dispatched;
protected SizeStatisticImpl inflightMessageSize;
public SubscriptionStatistics() { public SubscriptionStatistics() {
@ -41,11 +43,13 @@ public class SubscriptionStatistics extends StatsImpl {
enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the subscription"); enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the subscription");
dispatched = new CountStatisticImpl("dispatched", "The number of messages that have been dispatched from the subscription"); dispatched = new CountStatisticImpl("dispatched", "The number of messages that have been dispatched from the subscription");
dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been acknowledged from the subscription"); dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been acknowledged from the subscription");
inflightMessageSize = new SizeStatisticImpl("inflightMessageSize", "The size in bytes of messages dispatched but awaiting acknowledgement");
addStatistic("consumedCount", consumedCount); addStatistic("consumedCount", consumedCount);
addStatistic("enqueues", enqueues); addStatistic("enqueues", enqueues);
addStatistic("dispatched", dispatched); addStatistic("dispatched", dispatched);
addStatistic("dequeues", dequeues); addStatistic("dequeues", dequeues);
addStatistic("inflightMessageSize", inflightMessageSize);
this.setEnabled(enabled); this.setEnabled(enabled);
} }
@ -66,6 +70,10 @@ public class SubscriptionStatistics extends StatsImpl {
return dispatched; return dispatched;
} }
public SizeStatisticImpl getInflightMessageSize() {
return inflightMessageSize;
}
public void reset() { public void reset() {
if (this.isDoReset()) { if (this.isDoReset()) {
super.reset(); super.reset();
@ -73,6 +81,7 @@ public class SubscriptionStatistics extends StatsImpl {
enqueues.reset(); enqueues.reset();
dequeues.reset(); dequeues.reset();
dispatched.reset(); dispatched.reset();
inflightMessageSize.reset();
} }
} }
@ -82,6 +91,7 @@ public class SubscriptionStatistics extends StatsImpl {
enqueues.setEnabled(enabled); enqueues.setEnabled(enabled);
dispatched.setEnabled(enabled); dispatched.setEnabled(enabled);
dequeues.setEnabled(enabled); dequeues.setEnabled(enabled);
inflightMessageSize.setEnabled(enabled);
} }
public void setParent(SubscriptionStatistics parent) { public void setParent(SubscriptionStatistics parent) {
@ -90,11 +100,13 @@ public class SubscriptionStatistics extends StatsImpl {
enqueues.setParent(parent.enqueues); enqueues.setParent(parent.enqueues);
dispatched.setParent(parent.dispatched); dispatched.setParent(parent.dispatched);
dequeues.setParent(parent.dequeues); dequeues.setParent(parent.dequeues);
inflightMessageSize.setParent(parent.inflightMessageSize);
} else { } else {
consumedCount.setParent(null); consumedCount.setParent(null);
enqueues.setParent(null); enqueues.setParent(null);
dispatched.setParent(null); dispatched.setParent(null);
dequeues.setParent(null); dequeues.setParent(null);
inflightMessageSize.setParent(null);
} }
} }

View File

@ -17,7 +17,11 @@
package org.apache.activemq.broker.region; package org.apache.activemq.broker.region;
import java.io.IOException; import java.io.IOException;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -37,6 +41,7 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response; import org.apache.activemq.command.Response;
import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.Scheduler;
@ -71,6 +76,20 @@ public class TopicSubscription extends AbstractSubscription {
protected boolean active = false; protected boolean active = false;
protected boolean discarding = false; protected boolean discarding = false;
/**
* This Map is used to keep track of messages that have been dispatched in sorted order to
* optimize message acknowledgement
*/
private NavigableMap<MessageId, MessageReference> dispatched = new ConcurrentSkipListMap<>(
new Comparator<MessageId>() {
@Override
public int compare(MessageId m1, MessageId m2) {
return m1 == null ? (m2 == null ? 0 : -1) : (m2 == null ? 1
: Long.compare(m1.getBrokerSequenceId(), m2.getBrokerSequenceId()));
}
});
public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception { public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
super(broker, context, info); super(broker, context, info);
this.usageManager = usageManager; this.usageManager = usageManager;
@ -250,6 +269,8 @@ public class TopicSubscription extends AbstractSubscription {
if (node.getMessageId().equals(mdn.getMessageId())) { if (node.getMessageId().equals(mdn.getMessageId())) {
matched.remove(); matched.remove();
getSubscriptionStatistics().getDispatched().increment(); getSubscriptionStatistics().getDispatched().increment();
dispatched.put(node.getMessageId(), node);
getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
node.decrementReferenceCount(); node.decrementReferenceCount();
break; break;
} }
@ -277,6 +298,7 @@ public class TopicSubscription extends AbstractSubscription {
} }
} }
getSubscriptionStatistics().getDequeues().add(ack.getMessageCount()); getSubscriptionStatistics().getDequeues().add(ack.getMessageCount());
updateInflightMessageSizeOnAck(ack);
dispatchMatched(); dispatchMatched();
} }
}); });
@ -289,6 +311,7 @@ public class TopicSubscription extends AbstractSubscription {
} }
} }
getSubscriptionStatistics().getDequeues().add(ack.getMessageCount()); getSubscriptionStatistics().getDequeues().add(ack.getMessageCount());
updateInflightMessageSizeOnAck(ack);
} }
while (true) { while (true) {
int currentExtension = prefetchExtension.get(); int currentExtension = prefetchExtension.get();
@ -379,6 +402,27 @@ public class TopicSubscription extends AbstractSubscription {
} }
} }
/**
* Update the inflight statistics on message ack. Since a message ack could be a range,
* we need to grab a subtree of the dispatched map to acknowledge messages. Finding the
* subMap is an O(log n) operation.
* @param ack
*/
private void updateInflightMessageSizeOnAck(final MessageAck ack) {
if (ack.getFirstMessageId() != null) {
NavigableMap<MessageId, MessageReference> acked = dispatched
.subMap(ack.getFirstMessageId(), true, ack.getLastMessageId(), true);
Iterator<MessageId> i = acked.keySet().iterator();
while (i.hasNext()) {
getSubscriptionStatistics().getInflightMessageSize().addSize(-acked.get(i.next()).getSize());
i.remove();
}
} else {
getSubscriptionStatistics().getInflightMessageSize().addSize(-dispatched.get(ack.getLastMessageId()).getSize());
dispatched.remove(ack.getLastMessageId());
}
}
@Override @Override
public int countBeforeFull() { public int countBeforeFull() {
return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - getDispatchedQueueSize(); return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - getDispatchedQueueSize();
@ -602,6 +646,8 @@ public class TopicSubscription extends AbstractSubscription {
if (node != null) { if (node != null) {
md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination()); md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination());
getSubscriptionStatistics().getDispatched().increment(); getSubscriptionStatistics().getDispatched().increment();
dispatched.put(node.getMessageId(), node);
getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
// Keep track if this subscription is receiving messages from a single destination. // Keep track if this subscription is receiving messages from a single destination.
if (singleDestination) { if (singleDestination) {
if (destination == null) { if (destination == null) {
@ -683,6 +729,7 @@ public class TopicSubscription extends AbstractSubscription {
} }
} }
setSlowConsumer(false); setSlowConsumer(false);
dispatched.clear();
} }
@Override @Override

View File

@ -366,6 +366,11 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
public SubscriptionStatistics getSubscriptionStatistics() { public SubscriptionStatistics getSubscriptionStatistics() {
return subscriptionStatistics; return subscriptionStatistics;
} }
@Override
public long getInFlightMessageSize() {
return subscriptionStatistics.getInflightMessageSize().getTotalSize();
}
}; };
queue.addSubscription(contextNotInTx, subscription); queue.addSubscription(contextNotInTx, subscription);

View File

@ -376,5 +376,10 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
return subscriptionStatistics; return subscriptionStatistics;
} }
@Override
public long getInFlightMessageSize() {
return subscriptionStatistics.getInflightMessageSize().getTotalSize();
}
} }
} }

View File

@ -0,0 +1,295 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.statistics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.Collection;
import java.util.Random;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runners.Parameterized.Parameters;
/**
* This test shows Inflight Message sizes are correct for various acknowledgement modes.
*/
public abstract class AbstractInflightMessageSizeTest {
protected BrokerService brokerService;
protected Connection connection;
protected String brokerUrlString;
protected Session session;
protected javax.jms.Destination dest;
protected Destination amqDestination;
protected MessageConsumer consumer;
protected int prefetch = 100;
final protected int ackType;
final protected boolean optimizeAcknowledge;
final protected String destName = "testDest";
@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{ActiveMQSession.SESSION_TRANSACTED, true},
{ActiveMQSession.AUTO_ACKNOWLEDGE, true},
{ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, true},
{ActiveMQSession.CLIENT_ACKNOWLEDGE, true},
{ActiveMQSession.SESSION_TRANSACTED, false},
{ActiveMQSession.AUTO_ACKNOWLEDGE, false},
{ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, false},
{ActiveMQSession.CLIENT_ACKNOWLEDGE, false}
});
}
public AbstractInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) {
this.ackType = ackType;
this.optimizeAcknowledge = optimizeAcknowledge;
}
@Before
public void setUp() throws Exception {
brokerService = new BrokerService();
brokerService.setDeleteAllMessagesOnStartup(true);
TransportConnector tcp = brokerService
.addConnector("tcp://localhost:0");
brokerService.start();
//used to test optimizeAcknowledge works
String optAckString = optimizeAcknowledge ? "?jms.optimizeAcknowledge=true&jms.optimizedAckScheduledAckInterval=2000" : "";
brokerUrlString = tcp.getPublishableConnectString() + optAckString;
connection = createConnectionFactory().createConnection();
connection.setClientID("client1");
connection.start();
session = connection.createSession(ackType == ActiveMQSession.SESSION_TRANSACTED, ackType);
dest = getDestination();
consumer = getMessageConsumer();
amqDestination = TestSupport.getDestination(brokerService, getActiveMQDestination());
}
protected ActiveMQConnectionFactory createConnectionFactory()
throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrlString);
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
prefetchPolicy.setTopicPrefetch(prefetch);
prefetchPolicy.setQueuePrefetch(prefetch);
prefetchPolicy.setOptimizeDurableTopicPrefetch(prefetch);
factory.setPrefetchPolicy(prefetchPolicy);
return factory;
}
@After
public void tearDown() throws Exception {
if (connection != null) {
connection.close();
}
brokerService.stop();
}
/**
* Tests that inflight message size goes up and comes back down to 0 after
* messages are consumed
*
* @throws javax.jms.JMSException
* @throws InterruptedException
*/
@Test(timeout=15000)
public void testInflightMessageSize() throws Exception {
final long size = sendMessages(10);
assertTrue("Inflight message size should be greater than the content length sent", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getSubscription().getInFlightMessageSize() > size;
}
}));
receiveMessages(10);
assertTrue("Inflight message size should be 0", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getSubscription().getInFlightMessageSize() == 0;
}
}));
}
/**
* Test that the in flight message size won't rise after prefetch is filled
*
* @throws Exception
*/
@Test(timeout=15000)
public void testInflightMessageSizePrefetchFilled() throws Exception {
final long size = sendMessages(prefetch);
assertTrue("Inflight message size should be greater than content length", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getSubscription().getInFlightMessageSize() > size;
}
}));
final long inFlightSize = getSubscription().getInFlightMessageSize();
sendMessages(10);
//Prefetch has been filled, so the size should not change with 10 more messages
assertEquals("Inflight message size should not change", inFlightSize, getSubscription().getInFlightMessageSize());
receiveMessages(prefetch + 10);
assertTrue("Inflight message size should be 0", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getSubscription().getInFlightMessageSize() == 0;
}
}));
}
/**
* Test that the in flight message size will still rise if prefetch is not filled
*
* @throws Exception
*/
@Test(timeout=15000)
public void testInflightMessageSizePrefetchNotFilled() throws Exception {
final long size = sendMessages(prefetch - 10);
assertTrue("Inflight message size should be greater than content length", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getSubscription().getInFlightMessageSize() > size;
}
}));
//capture the inflight size and send 10 more messages
final long inFlightSize = getSubscription().getInFlightMessageSize();
sendMessages(10);
//Prefetch has NOT been filled, so the size should rise with 10 more messages
assertTrue("Inflight message size should be greater than previous inlight size", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getSubscription().getInFlightMessageSize() > inFlightSize;
}
}));
receiveMessages(prefetch);
assertTrue("Inflight message size should be 0", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getSubscription().getInFlightMessageSize() == 0;
}
}));
}
/**
* Tests that inflight message size goes up and doesn't go down if receive is rolledback
*
* @throws javax.jms.JMSException
* @throws InterruptedException
*/
@Test(timeout=15000)
public void testInflightMessageSizeRollback() throws Exception {
Assume.assumeTrue(ackType == ActiveMQSession.SESSION_TRANSACTED);
final long size = sendMessages(10);
assertTrue("Inflight message size should be greater than the content length sent", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getSubscription().getInFlightMessageSize() > size;
}
}));
long inFlightSize = getSubscription().getInFlightMessageSize();
for (int i = 0; i < 10; i++) {
consumer.receive();
}
session.rollback();
assertEquals("Inflight message size should not change on rollback", inFlightSize, getSubscription().getInFlightMessageSize());
}
/**
* This method will generate random sized messages up to 150000 bytes.
*
* @param count
* @throws JMSException
*/
protected long sendMessages(int count) throws JMSException {
MessageProducer producer = session.createProducer(dest);
long totalSize = 0;
for (int i = 0; i < count; i++) {
Random r = new Random();
int size = r.nextInt(150000);
totalSize += size;
byte[] bytes = new byte[size > 0 ? size : 1];
r.nextBytes(bytes);
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes(bytes);
producer.send(bytesMessage);
}
if (session.getTransacted()) {
session.commit();
}
return totalSize;
}
protected void receiveMessages(int count) throws JMSException {
for (int i = 0; i < count; i++) {
javax.jms.Message m = consumer.receive();
if (ackType == ActiveMQSession.SESSION_TRANSACTED) {
session.commit();
} else if (ackType != ActiveMQSession.AUTO_ACKNOWLEDGE) {
m.acknowledge();
}
}
}
protected abstract Subscription getSubscription();
protected abstract ActiveMQDestination getActiveMQDestination();
protected abstract MessageConsumer getMessageConsumer() throws JMSException;
protected abstract javax.jms.Destination getDestination() throws JMSException ;
}

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.statistics;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.SubscriptionKey;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
* This test shows Inflight Message sizes are correct for various acknowledgement modes
* using a DurableSubscription
*/
@RunWith(Parameterized.class)
public class DurableSubscriptionInflightMessageSizeTest extends AbstractInflightMessageSizeTest {
public DurableSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) {
super(ackType, optimizeAcknowledge);
}
@Override
protected MessageConsumer getMessageConsumer() throws JMSException {
return session.createDurableSubscriber((javax.jms.Topic)dest, "sub1");
}
@Override
protected Subscription getSubscription() {
return ((Topic)amqDestination).getDurableTopicSubs().get(new SubscriptionKey("client1", "sub1"));
}
@Override
protected javax.jms.Topic getDestination() throws JMSException {
return session.createTopic(destName);
}
@Override
protected ActiveMQDestination getActiveMQDestination() {
return new ActiveMQTopic(destName);
}
}

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.statistics;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
* This test shows Inflight Message sizes are correct for various acknowledgement modes
* using a QueueSubscription
*/
@RunWith(Parameterized.class)
public class QueueSubscriptionInflightMessageSizeTest extends AbstractInflightMessageSizeTest {
public QueueSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) {
super(ackType, optimizeAcknowledge);
}
@Override
protected MessageConsumer getMessageConsumer() throws JMSException {
return session.createConsumer(dest);
}
@Override
protected Subscription getSubscription() {
return ((Queue)amqDestination).getConsumers().get(0);
}
@Override
protected Destination getDestination() throws JMSException {
return session.createQueue(destName);
}
@Override
protected ActiveMQDestination getActiveMQDestination() {
return new ActiveMQQueue(destName);
}
}

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.statistics;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
* This test shows Inflight Message sizes are correct for various acknowledgement modes
* using a TopicSubscription
*/
@RunWith(Parameterized.class)
public class TopicSubscriptionInflightMessageSizeTest extends AbstractInflightMessageSizeTest {
public TopicSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) {
super(ackType, optimizeAcknowledge);
}
@Override
protected MessageConsumer getMessageConsumer() throws JMSException {
return session.createConsumer(dest);
}
@Override
protected Subscription getSubscription() {
return amqDestination.getConsumers().get(0);
}
@Override
protected Destination getDestination() throws JMSException {
return session.createTopic(destName);
}
@Override
protected ActiveMQDestination getActiveMQDestination() {
return new ActiveMQTopic(destName);
}
}