This closes #113

This commit is contained in:
Timothy Bish 2015-07-07 17:02:31 -04:00
commit 1dcdf69f3e
12 changed files with 564 additions and 0 deletions

View File

@ -227,6 +227,11 @@ public abstract class AbstractSubscription implements Subscription {
return info != null && info.isBrowser();
}
@Override
public long getInFlightMessageSize() {
return subscriptionStatistics.getInflightMessageSize().getTotalSize();
}
@Override
public int getInFlightUsage() {
if (info.getPrefetchSize() > 0) {

View File

@ -237,6 +237,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
savedDispateched = new ArrayList<MessageReference>(dispatched);
}
dispatched.clear();
getSubscriptionStatistics().getInflightMessageSize().reset();
}
if (!keepDurableSubsActive && pending.isTransient()) {
try {

View File

@ -176,6 +176,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
pending.remove();
createMessageDispatch(node, node.getMessage());
dispatched.add(node);
getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
onDispatch(node, node.getMessage());
}
return;
@ -240,6 +241,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
}
for (final MessageReference node : removeList) {
dispatched.remove(node);
getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
}
// this only happens after a reconnect - get an ack which is not
// valid
@ -257,6 +259,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
getSubscriptionStatistics().getDequeues().increment();
((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
dispatched.remove(node);
getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
} else {
registerRemoveSync(context, node);
}
@ -379,6 +382,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
}
for (final MessageReference node : removeList) {
dispatched.remove(node);
getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
}
if (!callDispatchMatched) {
throw new JMSException(
@ -427,6 +431,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
synchronized(dispatchLock) {
getSubscriptionStatistics().getDequeues().increment();
dispatched.remove(node);
getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
nodeDest.getDestinationStatistics().getInflight().decrement();
}
nodeDest.wakeup();
@ -620,6 +625,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
for (MessageReference r : dispatched) {
if (r.getRegionDestination() == destination) {
references.add(r);
getSubscriptionStatistics().getInflightMessageSize().addSize(-r.getSize());
}
}
rc.addAll(references);
@ -697,6 +703,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (node != QueueMessageReference.NULL_MESSAGE) {
getSubscriptionStatistics().getDispatched().increment();
dispatched.add(node);
getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
}
if (getPrefetchSize() == 0) {
while (true) {

View File

@ -196,6 +196,11 @@ public interface Subscription extends SubscriptionRecovery {
*/
int getInFlightSize();
/**
* @return the size in bytes of the messages awaiting acknowledgement
*/
long getInFlightMessageSize();
/**
* @return the in flight messages as a percentage of the prefetch size
*/

View File

@ -18,6 +18,7 @@
package org.apache.activemq.broker.region;
import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.management.SizeStatisticImpl;
import org.apache.activemq.management.StatsImpl;
/**
@ -29,6 +30,7 @@ public class SubscriptionStatistics extends StatsImpl {
protected CountStatisticImpl enqueues;
protected CountStatisticImpl dequeues;
protected CountStatisticImpl dispatched;
protected SizeStatisticImpl inflightMessageSize;
public SubscriptionStatistics() {
@ -41,11 +43,13 @@ public class SubscriptionStatistics extends StatsImpl {
enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the subscription");
dispatched = new CountStatisticImpl("dispatched", "The number of messages that have been dispatched from the subscription");
dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been acknowledged from the subscription");
inflightMessageSize = new SizeStatisticImpl("inflightMessageSize", "The size in bytes of messages dispatched but awaiting acknowledgement");
addStatistic("consumedCount", consumedCount);
addStatistic("enqueues", enqueues);
addStatistic("dispatched", dispatched);
addStatistic("dequeues", dequeues);
addStatistic("inflightMessageSize", inflightMessageSize);
this.setEnabled(enabled);
}
@ -66,6 +70,10 @@ public class SubscriptionStatistics extends StatsImpl {
return dispatched;
}
public SizeStatisticImpl getInflightMessageSize() {
return inflightMessageSize;
}
public void reset() {
if (this.isDoReset()) {
super.reset();
@ -73,6 +81,7 @@ public class SubscriptionStatistics extends StatsImpl {
enqueues.reset();
dequeues.reset();
dispatched.reset();
inflightMessageSize.reset();
}
}
@ -82,6 +91,7 @@ public class SubscriptionStatistics extends StatsImpl {
enqueues.setEnabled(enabled);
dispatched.setEnabled(enabled);
dequeues.setEnabled(enabled);
inflightMessageSize.setEnabled(enabled);
}
public void setParent(SubscriptionStatistics parent) {
@ -90,11 +100,13 @@ public class SubscriptionStatistics extends StatsImpl {
enqueues.setParent(parent.enqueues);
dispatched.setParent(parent.dispatched);
dequeues.setParent(parent.dequeues);
inflightMessageSize.setParent(parent.inflightMessageSize);
} else {
consumedCount.setParent(null);
enqueues.setParent(null);
dispatched.setParent(null);
dequeues.setParent(null);
inflightMessageSize.setParent(null);
}
}

View File

@ -17,7 +17,11 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -37,6 +41,7 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response;
import org.apache.activemq.thread.Scheduler;
@ -71,6 +76,20 @@ public class TopicSubscription extends AbstractSubscription {
protected boolean active = false;
protected boolean discarding = false;
/**
* This Map is used to keep track of messages that have been dispatched in sorted order to
* optimize message acknowledgement
*/
private NavigableMap<MessageId, MessageReference> dispatched = new ConcurrentSkipListMap<>(
new Comparator<MessageId>() {
@Override
public int compare(MessageId m1, MessageId m2) {
return m1 == null ? (m2 == null ? 0 : -1) : (m2 == null ? 1
: Long.compare(m1.getBrokerSequenceId(), m2.getBrokerSequenceId()));
}
});
public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
super(broker, context, info);
this.usageManager = usageManager;
@ -250,6 +269,8 @@ public class TopicSubscription extends AbstractSubscription {
if (node.getMessageId().equals(mdn.getMessageId())) {
matched.remove();
getSubscriptionStatistics().getDispatched().increment();
dispatched.put(node.getMessageId(), node);
getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
node.decrementReferenceCount();
break;
}
@ -277,6 +298,7 @@ public class TopicSubscription extends AbstractSubscription {
}
}
getSubscriptionStatistics().getDequeues().add(ack.getMessageCount());
updateInflightMessageSizeOnAck(ack);
dispatchMatched();
}
});
@ -289,6 +311,7 @@ public class TopicSubscription extends AbstractSubscription {
}
}
getSubscriptionStatistics().getDequeues().add(ack.getMessageCount());
updateInflightMessageSizeOnAck(ack);
}
while (true) {
int currentExtension = prefetchExtension.get();
@ -379,6 +402,27 @@ public class TopicSubscription extends AbstractSubscription {
}
}
/**
* Update the inflight statistics on message ack. Since a message ack could be a range,
* we need to grab a subtree of the dispatched map to acknowledge messages. Finding the
* subMap is an O(log n) operation.
* @param ack
*/
private void updateInflightMessageSizeOnAck(final MessageAck ack) {
if (ack.getFirstMessageId() != null) {
NavigableMap<MessageId, MessageReference> acked = dispatched
.subMap(ack.getFirstMessageId(), true, ack.getLastMessageId(), true);
Iterator<MessageId> i = acked.keySet().iterator();
while (i.hasNext()) {
getSubscriptionStatistics().getInflightMessageSize().addSize(-acked.get(i.next()).getSize());
i.remove();
}
} else {
getSubscriptionStatistics().getInflightMessageSize().addSize(-dispatched.get(ack.getLastMessageId()).getSize());
dispatched.remove(ack.getLastMessageId());
}
}
@Override
public int countBeforeFull() {
return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - getDispatchedQueueSize();
@ -602,6 +646,8 @@ public class TopicSubscription extends AbstractSubscription {
if (node != null) {
md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination());
getSubscriptionStatistics().getDispatched().increment();
dispatched.put(node.getMessageId(), node);
getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
// Keep track if this subscription is receiving messages from a single destination.
if (singleDestination) {
if (destination == null) {
@ -683,6 +729,7 @@ public class TopicSubscription extends AbstractSubscription {
}
}
setSlowConsumer(false);
dispatched.clear();
}
@Override

View File

@ -366,6 +366,11 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
public SubscriptionStatistics getSubscriptionStatistics() {
return subscriptionStatistics;
}
@Override
public long getInFlightMessageSize() {
return subscriptionStatistics.getInflightMessageSize().getTotalSize();
}
};
queue.addSubscription(contextNotInTx, subscription);

View File

@ -376,5 +376,10 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
return subscriptionStatistics;
}
@Override
public long getInFlightMessageSize() {
return subscriptionStatistics.getInflightMessageSize().getTotalSize();
}
}
}

View File

@ -0,0 +1,295 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.statistics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.Collection;
import java.util.Random;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runners.Parameterized.Parameters;
/**
* This test shows Inflight Message sizes are correct for various acknowledgement modes.
*/
public abstract class AbstractInflightMessageSizeTest {
protected BrokerService brokerService;
protected Connection connection;
protected String brokerUrlString;
protected Session session;
protected javax.jms.Destination dest;
protected Destination amqDestination;
protected MessageConsumer consumer;
protected int prefetch = 100;
final protected int ackType;
final protected boolean optimizeAcknowledge;
final protected String destName = "testDest";
@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{ActiveMQSession.SESSION_TRANSACTED, true},
{ActiveMQSession.AUTO_ACKNOWLEDGE, true},
{ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, true},
{ActiveMQSession.CLIENT_ACKNOWLEDGE, true},
{ActiveMQSession.SESSION_TRANSACTED, false},
{ActiveMQSession.AUTO_ACKNOWLEDGE, false},
{ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, false},
{ActiveMQSession.CLIENT_ACKNOWLEDGE, false}
});
}
public AbstractInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) {
this.ackType = ackType;
this.optimizeAcknowledge = optimizeAcknowledge;
}
@Before
public void setUp() throws Exception {
brokerService = new BrokerService();
brokerService.setDeleteAllMessagesOnStartup(true);
TransportConnector tcp = brokerService
.addConnector("tcp://localhost:0");
brokerService.start();
//used to test optimizeAcknowledge works
String optAckString = optimizeAcknowledge ? "?jms.optimizeAcknowledge=true&jms.optimizedAckScheduledAckInterval=2000" : "";
brokerUrlString = tcp.getPublishableConnectString() + optAckString;
connection = createConnectionFactory().createConnection();
connection.setClientID("client1");
connection.start();
session = connection.createSession(ackType == ActiveMQSession.SESSION_TRANSACTED, ackType);
dest = getDestination();
consumer = getMessageConsumer();
amqDestination = TestSupport.getDestination(brokerService, getActiveMQDestination());
}
protected ActiveMQConnectionFactory createConnectionFactory()
throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrlString);
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
prefetchPolicy.setTopicPrefetch(prefetch);
prefetchPolicy.setQueuePrefetch(prefetch);
prefetchPolicy.setOptimizeDurableTopicPrefetch(prefetch);
factory.setPrefetchPolicy(prefetchPolicy);
return factory;
}
@After
public void tearDown() throws Exception {
if (connection != null) {
connection.close();
}
brokerService.stop();
}
/**
* Tests that inflight message size goes up and comes back down to 0 after
* messages are consumed
*
* @throws javax.jms.JMSException
* @throws InterruptedException
*/
@Test(timeout=15000)
public void testInflightMessageSize() throws Exception {
final long size = sendMessages(10);
assertTrue("Inflight message size should be greater than the content length sent", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getSubscription().getInFlightMessageSize() > size;
}
}));
receiveMessages(10);
assertTrue("Inflight message size should be 0", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getSubscription().getInFlightMessageSize() == 0;
}
}));
}
/**
* Test that the in flight message size won't rise after prefetch is filled
*
* @throws Exception
*/
@Test(timeout=15000)
public void testInflightMessageSizePrefetchFilled() throws Exception {
final long size = sendMessages(prefetch);
assertTrue("Inflight message size should be greater than content length", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getSubscription().getInFlightMessageSize() > size;
}
}));
final long inFlightSize = getSubscription().getInFlightMessageSize();
sendMessages(10);
//Prefetch has been filled, so the size should not change with 10 more messages
assertEquals("Inflight message size should not change", inFlightSize, getSubscription().getInFlightMessageSize());
receiveMessages(prefetch + 10);
assertTrue("Inflight message size should be 0", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getSubscription().getInFlightMessageSize() == 0;
}
}));
}
/**
* Test that the in flight message size will still rise if prefetch is not filled
*
* @throws Exception
*/
@Test(timeout=15000)
public void testInflightMessageSizePrefetchNotFilled() throws Exception {
final long size = sendMessages(prefetch - 10);
assertTrue("Inflight message size should be greater than content length", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getSubscription().getInFlightMessageSize() > size;
}
}));
//capture the inflight size and send 10 more messages
final long inFlightSize = getSubscription().getInFlightMessageSize();
sendMessages(10);
//Prefetch has NOT been filled, so the size should rise with 10 more messages
assertTrue("Inflight message size should be greater than previous inlight size", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getSubscription().getInFlightMessageSize() > inFlightSize;
}
}));
receiveMessages(prefetch);
assertTrue("Inflight message size should be 0", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getSubscription().getInFlightMessageSize() == 0;
}
}));
}
/**
* Tests that inflight message size goes up and doesn't go down if receive is rolledback
*
* @throws javax.jms.JMSException
* @throws InterruptedException
*/
@Test(timeout=15000)
public void testInflightMessageSizeRollback() throws Exception {
Assume.assumeTrue(ackType == ActiveMQSession.SESSION_TRANSACTED);
final long size = sendMessages(10);
assertTrue("Inflight message size should be greater than the content length sent", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getSubscription().getInFlightMessageSize() > size;
}
}));
long inFlightSize = getSubscription().getInFlightMessageSize();
for (int i = 0; i < 10; i++) {
consumer.receive();
}
session.rollback();
assertEquals("Inflight message size should not change on rollback", inFlightSize, getSubscription().getInFlightMessageSize());
}
/**
* This method will generate random sized messages up to 150000 bytes.
*
* @param count
* @throws JMSException
*/
protected long sendMessages(int count) throws JMSException {
MessageProducer producer = session.createProducer(dest);
long totalSize = 0;
for (int i = 0; i < count; i++) {
Random r = new Random();
int size = r.nextInt(150000);
totalSize += size;
byte[] bytes = new byte[size > 0 ? size : 1];
r.nextBytes(bytes);
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes(bytes);
producer.send(bytesMessage);
}
if (session.getTransacted()) {
session.commit();
}
return totalSize;
}
protected void receiveMessages(int count) throws JMSException {
for (int i = 0; i < count; i++) {
javax.jms.Message m = consumer.receive();
if (ackType == ActiveMQSession.SESSION_TRANSACTED) {
session.commit();
} else if (ackType != ActiveMQSession.AUTO_ACKNOWLEDGE) {
m.acknowledge();
}
}
}
protected abstract Subscription getSubscription();
protected abstract ActiveMQDestination getActiveMQDestination();
protected abstract MessageConsumer getMessageConsumer() throws JMSException;
protected abstract javax.jms.Destination getDestination() throws JMSException ;
}

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.statistics;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.SubscriptionKey;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
* This test shows Inflight Message sizes are correct for various acknowledgement modes
* using a DurableSubscription
*/
@RunWith(Parameterized.class)
public class DurableSubscriptionInflightMessageSizeTest extends AbstractInflightMessageSizeTest {
public DurableSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) {
super(ackType, optimizeAcknowledge);
}
@Override
protected MessageConsumer getMessageConsumer() throws JMSException {
return session.createDurableSubscriber((javax.jms.Topic)dest, "sub1");
}
@Override
protected Subscription getSubscription() {
return ((Topic)amqDestination).getDurableTopicSubs().get(new SubscriptionKey("client1", "sub1"));
}
@Override
protected javax.jms.Topic getDestination() throws JMSException {
return session.createTopic(destName);
}
@Override
protected ActiveMQDestination getActiveMQDestination() {
return new ActiveMQTopic(destName);
}
}

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.statistics;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
* This test shows Inflight Message sizes are correct for various acknowledgement modes
* using a QueueSubscription
*/
@RunWith(Parameterized.class)
public class QueueSubscriptionInflightMessageSizeTest extends AbstractInflightMessageSizeTest {
public QueueSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) {
super(ackType, optimizeAcknowledge);
}
@Override
protected MessageConsumer getMessageConsumer() throws JMSException {
return session.createConsumer(dest);
}
@Override
protected Subscription getSubscription() {
return ((Queue)amqDestination).getConsumers().get(0);
}
@Override
protected Destination getDestination() throws JMSException {
return session.createQueue(destName);
}
@Override
protected ActiveMQDestination getActiveMQDestination() {
return new ActiveMQQueue(destName);
}
}

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.statistics;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
* This test shows Inflight Message sizes are correct for various acknowledgement modes
* using a TopicSubscription
*/
@RunWith(Parameterized.class)
public class TopicSubscriptionInflightMessageSizeTest extends AbstractInflightMessageSizeTest {
public TopicSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) {
super(ackType, optimizeAcknowledge);
}
@Override
protected MessageConsumer getMessageConsumer() throws JMSException {
return session.createConsumer(dest);
}
@Override
protected Subscription getSubscription() {
return amqDestination.getConsumers().get(0);
}
@Override
protected Destination getDestination() throws JMSException {
return session.createTopic(destName);
}
@Override
protected ActiveMQDestination getActiveMQDestination() {
return new ActiveMQTopic(destName);
}
}