This commit is contained in:
Dejan Bosanac 2015-08-26 12:28:19 +02:00
parent acb8602ada
commit ee54f09303
4 changed files with 119 additions and 51 deletions

View File

@ -322,6 +322,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
regionDestination.acknowledge(context, this, ack, node);
redeliveredMessages.remove(node.getMessageId());
node.decrementReferenceCount();
((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment();
}
@Override

View File

@ -279,37 +279,16 @@ public class TopicSubscription extends AbstractSubscription {
if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) {
if (context.isInTransaction()) {
context.getTransaction().addSynchronization(new Synchronization() {
@Override
public void afterCommit() throws Exception {
synchronized (TopicSubscription.this) {
if (singleDestination && destination != null) {
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
}
}
getSubscriptionStatistics().getDequeues().add(ack.getMessageCount());
updateInflightMessageSizeOnAck(ack);
updateStatsOnAck(ack);
dispatchMatched();
}
});
} else {
if (singleDestination && destination != null) {
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
if (info.isNetworkSubscription()) {
destination.getDestinationStatistics().getForwards().add(ack.getMessageCount());
}
}
getSubscriptionStatistics().getDequeues().add(ack.getMessageCount());
updateInflightMessageSizeOnAck(ack);
}
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(0, currentExtension - ack.getMessageCount());
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
break;
}
updateStatsOnAck(ack);
}
updatePrefetch(ack);
dispatchMatched();
return;
} else if (ack.isDeliveredAck()) {
@ -318,19 +297,8 @@ public class TopicSubscription extends AbstractSubscription {
dispatchMatched();
return;
} else if (ack.isExpiredAck()) {
if (singleDestination && destination != null) {
destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
destination.getDestinationStatistics().getExpired().add(ack.getMessageCount());
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
}
getSubscriptionStatistics().getDequeues().add(ack.getMessageCount());
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(0, currentExtension - ack.getMessageCount());
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
break;
}
}
updateStatsOnAck(ack);
updatePrefetch(ack);
dispatchMatched();
return;
} else if (ack.isRedeliveredAck()) {
@ -393,10 +361,10 @@ public class TopicSubscription extends AbstractSubscription {
}
/**
* Update the inflight statistics on message ack.
* Update the statistics on message ack.
* @param ack
*/
private void updateInflightMessageSizeOnAck(final MessageAck ack) {
private void updateStatsOnAck(final MessageAck ack) {
synchronized(dispatchLock) {
boolean inAckRange = false;
List<MessageReference> removeList = new ArrayList<MessageReference>();
@ -417,6 +385,25 @@ public class TopicSubscription extends AbstractSubscription {
for (final MessageReference node : removeList) {
dispatched.remove(node);
getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
getSubscriptionStatistics().getDequeues().increment();
((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment();
((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
if (info.isNetworkSubscription()) {
((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount());
}
if (ack.isExpiredAck()) {
destination.getDestinationStatistics().getExpired().add(ack.getMessageCount());
}
}
}
}
private void updatePrefetch(MessageAck ack) {
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(0, currentExtension - ack.getMessageCount());
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
break;
}
}
}

View File

@ -20,21 +20,13 @@ import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import javax.jms.*;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
import javax.management.MalformedObjectNameException;
@ -59,6 +51,7 @@ import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.memory.list.MessageList;
import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.util.Wait;
@ -1621,4 +1614,91 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
assertNotNull("Message: " + i, consumer.receive(5000));
}
}
public void testTopicView() throws Exception {
connection = connectionFactory.createConnection();
connection.setClientID("test");
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
connection.start();
Topic singleTopic = session.createTopic("test.topic");
Topic wildcardTopic = session.createTopic("test.>");
TopicSubscriber durable1 = session.createDurableSubscriber(singleTopic, "single");
TopicSubscriber durable2 = session.createDurableSubscriber(wildcardTopic, "wildcard");
MessageConsumer consumer1 = session.createConsumer(singleTopic);
MessageConsumer consumer2 = session.createConsumer(wildcardTopic);
final ArrayList<Message> messages = new ArrayList<>();
MessageListener listener = new MessageListener() {
@Override
public void onMessage(Message message) {
messages.add(message);
}
};
durable1.setMessageListener(listener);
durable2.setMessageListener(listener);
consumer1.setMessageListener(listener);
consumer2.setMessageListener(listener);
MessageProducer producer = session.createProducer(singleTopic);
producer.send(session.createTextMessage("test"));
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return messages.size() == 4;
}
});
ObjectName topicObjName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=test.topic");
final TopicViewMBean topicView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName, TopicViewMBean.class, true);
assertEquals(1, topicView.getEnqueueCount());
assertEquals(4, topicView.getDispatchCount());
assertEquals(4, topicView.getInFlightCount());
assertEquals(0, topicView.getDequeueCount());
ArrayList<SubscriptionViewMBean> subscriberViews = new ArrayList();
for (ObjectName name : topicView.getSubscriptions()) {
subscriberViews.add(MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, SubscriptionViewMBean.class, true));
}
assertEquals(4, subscriberViews.size());
for (SubscriptionViewMBean subscriberView : subscriberViews) {
assertEquals(1, subscriberView.getEnqueueCounter());
assertEquals(1, subscriberView.getDispatchedCounter());
assertEquals(0, subscriberView.getDequeueCounter());
}
for (Message message : messages) {
try {
message.acknowledge();
} catch (JMSException ignore) {}
}
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return topicView.getDequeueCount() == 4;
}
});
assertEquals(1, topicView.getEnqueueCount());
assertEquals(4, topicView.getDispatchCount());
assertEquals(0, topicView.getInFlightCount());
assertEquals(4, topicView.getDequeueCount());
for (SubscriptionViewMBean subscriberView : subscriberViews) {
assertEquals(1, subscriberView.getEnqueueCounter());
assertEquals(1, subscriberView.getDispatchedCounter());
assertEquals(1, subscriberView.getDequeueCounter());
}
}
}

View File

@ -119,7 +119,7 @@ public class DurableSubscriptionOffline2Test extends DurableSubscriptionOfflineT
ObjectName destinationName = broker.getAdminView().getTopics()[0];
TopicViewMBean topicView = (TopicViewMBean) broker.getManagementContext().newProxyInstance(destinationName, TopicViewMBean.class, true);
assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
assertEquals("topic view dequeue not updated", 5, topicView.getDequeueCount());
assertEquals("inflight", 5, topicView.getInFlightCount());
session.close();
@ -138,7 +138,7 @@ public class DurableSubscriptionOffline2Test extends DurableSubscriptionOfflineT
// destination view
assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
assertEquals("topic view dequeue not updated", 5, topicView.getDequeueCount());
assertEquals("inflight back to 0 after deactivate", 0, topicView.getInFlightCount());
// consume the rest