ARTEMIS-1379 Some queue stats not threadsafe

This commit is contained in:
Justin Bertram 2017-08-29 22:13:27 -05:00 committed by Clebert Suconic
parent f37093e33c
commit 2f5a9322d0
3 changed files with 112 additions and 34 deletions

View File

@ -177,13 +177,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private final ScheduledDeliveryHandler scheduledDeliveryHandler; private final ScheduledDeliveryHandler scheduledDeliveryHandler;
private long messagesAdded; private AtomicLong messagesAdded = new AtomicLong(0);
private long messagesAcknowledged; private AtomicLong messagesAcknowledged = new AtomicLong(0);
private long messagesExpired; private AtomicLong messagesExpired = new AtomicLong(0);
private long messagesKilled; private AtomicLong messagesKilled = new AtomicLong(0);
protected final AtomicInteger deliveringCount = new AtomicInteger(0); protected final AtomicInteger deliveringCount = new AtomicInteger(0);
@ -637,7 +637,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
directDeliver = false; directDeliver = false;
if (!ref.isPaged()) { if (!ref.isPaged()) {
messagesAdded++; messagesAdded.incrementAndGet();
} }
} }
@ -702,7 +702,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (scheduledDeliveryHandler.checkAndSchedule(ref, true)) { if (scheduledDeliveryHandler.checkAndSchedule(ref, true)) {
synchronized (this) { synchronized (this) {
if (!ref.isPaged()) { if (!ref.isPaged()) {
messagesAdded++; messagesAdded.incrementAndGet();
} }
} }
@ -1132,11 +1132,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} }
if (reason == AckReason.EXPIRED) { if (reason == AckReason.EXPIRED) {
messagesExpired++; messagesExpired.incrementAndGet();
} else if (reason == AckReason.KILLED) { } else if (reason == AckReason.KILLED) {
messagesKilled++; messagesKilled.incrementAndGet();
} else { } else {
messagesAcknowledged++; messagesAcknowledged.incrementAndGet();
} }
if (server != null) { if (server != null) {
@ -1170,11 +1170,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} }
if (reason == AckReason.EXPIRED) { if (reason == AckReason.EXPIRED) {
messagesExpired++; messagesExpired.incrementAndGet();
} else if (reason == AckReason.KILLED) { } else if (reason == AckReason.KILLED) {
messagesKilled++; messagesKilled.incrementAndGet();
} else { } else {
messagesAcknowledged++; messagesAcknowledged.incrementAndGet();
} }
if (server != null) { if (server != null) {
@ -1195,7 +1195,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// https://issues.jboss.org/browse/HORNETQ-609 // https://issues.jboss.org/browse/HORNETQ-609
incDelivering(); incDelivering();
messagesAcknowledged++; messagesAcknowledged.incrementAndGet();
} }
private RefsOperation getRefsOperation(final Transaction tx) { private RefsOperation getRefsOperation(final Transaction tx) {
@ -1314,7 +1314,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override @Override
public void incrementMesssagesAdded() { public void incrementMesssagesAdded() {
messagesAdded++; messagesAdded.incrementAndGet();
} }
@Override @Override
@ -1332,25 +1332,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override @Override
public long getMessagesAdded() { public long getMessagesAdded() {
if (pageSubscription != null) { if (pageSubscription != null) {
return messagesAdded + pageSubscription.getCounter().getValueAdded(); return messagesAdded.get() + pageSubscription.getCounter().getValueAdded();
} else { } else {
return messagesAdded; return messagesAdded.get();
} }
} }
@Override @Override
public long getMessagesAcknowledged() { public long getMessagesAcknowledged() {
return messagesAcknowledged; return messagesAcknowledged.get();
} }
@Override @Override
public long getMessagesExpired() { public long getMessagesExpired() {
return messagesExpired; return messagesExpired.get();
} }
@Override @Override
public long getMessagesKilled() { public long getMessagesKilled() {
return messagesKilled; return messagesKilled.get();
} }
@Override @Override
@ -2057,7 +2057,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
internalAddTail(ref); internalAddTail(ref);
if (!ref.isPaged()) { if (!ref.isPaged()) {
messagesAdded++; messagesAdded.incrementAndGet();
} }
if (added++ > MAX_DELIVERIES_IN_LOOP) { if (added++ > MAX_DELIVERIES_IN_LOOP) {
@ -2734,7 +2734,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
groups.put(groupID, consumer); groups.put(groupID, consumer);
} }
messagesAdded++; messagesAdded.incrementAndGet();
deliveriesInTransit.countUp(); deliveriesInTransit.countUp();
proceedDeliver(consumer, ref); proceedDeliver(consumer, ref);
@ -2911,22 +2911,22 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override @Override
public synchronized void resetMessagesAdded() { public synchronized void resetMessagesAdded() {
messagesAdded = 0; messagesAdded.set(0);
} }
@Override @Override
public synchronized void resetMessagesAcknowledged() { public synchronized void resetMessagesAcknowledged() {
messagesAcknowledged = 0; messagesAcknowledged.set(0);
} }
@Override @Override
public synchronized void resetMessagesExpired() { public synchronized void resetMessagesExpired() {
messagesExpired = 0; messagesExpired.set(0);
} }
@Override @Override
public synchronized void resetMessagesKilled() { public synchronized void resetMessagesKilled() {
messagesKilled = 0; messagesKilled.set(0);
} }
@Override @Override

View File

@ -20,6 +20,7 @@ import javax.management.MBeanServer;
import javax.management.MBeanServerFactory; import javax.management.MBeanServerFactory;
import javax.management.ObjectName; import javax.management.ObjectName;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientMessage;
@ -104,6 +105,14 @@ public abstract class ManagementTestBase extends ActiveMQTestBase {
return queueControl; return queueControl;
} }
protected QueueControl createManagementControl(final SimpleString address,
final SimpleString queue,
final RoutingType routingType) throws Exception {
QueueControl queueControl = ManagementControlHelper.createQueueControl(address, queue, routingType, mbeanServer);
return queueControl;
}
protected long getMessageCount(QueueControl control) throws Exception { protected long getMessageCount(QueueControl control) throws Exception {
control.flushExecutor(); control.flushExecutor();
return control.getMessageCount(); return control.getMessageCount();

View File

@ -24,11 +24,14 @@ import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.JsonUtil; import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientMessage;
@ -48,7 +51,6 @@ import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManage
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil; import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil;
import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.Base64;
@ -605,6 +607,81 @@ public class QueueControlTest extends ManagementTestBase {
session.deleteQueue(queue); session.deleteQueue(queue);
} }
@Test
public void testMessagesAddedAndMessagesAcknowledged() throws Exception {
final int THREAD_COUNT = 5;
final int MSG_COUNT = 1000;
CountDownLatch producerCountDown = new CountDownLatch(THREAD_COUNT);
CountDownLatch consumerCountDown = new CountDownLatch(THREAD_COUNT);
ExecutorService producerExecutor = Executors.newFixedThreadPool(THREAD_COUNT);
ExecutorService consumerExecutor = Executors.newFixedThreadPool(THREAD_COUNT);
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
try {
session.createQueue(address, RoutingType.ANYCAST, queue, null, false);
for (int i = 0; i < THREAD_COUNT; i++) {
producerExecutor.submit(() -> {
try (ClientSessionFactory sf = locator.createSessionFactory(); ClientSession session = sf.createSession(false, true, false); ClientProducer producer = session.createProducer(address)) {
for (int j = 0; j < MSG_COUNT; j++) {
producer.send(session.createMessage(false));
Thread.sleep(5);
}
producerCountDown.countDown();
} catch (Exception e) {
e.printStackTrace();
}
});
}
for (int i = 0; i < THREAD_COUNT; i++) {
consumerExecutor.submit(() -> {
try (ClientSessionFactory sf = locator.createSessionFactory(); ClientSession session = sf.createSession(false, true, false); ClientConsumer consumer = session.createConsumer(queue)) {
session.start();
for (int j = 0; j < MSG_COUNT; j++) {
ClientMessage message = consumer.receive(500);
Assert.assertNotNull(message);
message.acknowledge();
}
session.commit();
consumerCountDown.countDown();
} catch (Exception e) {
e.printStackTrace();
}
});
}
producerCountDown.await(30, TimeUnit.SECONDS);
consumerCountDown.await(30, TimeUnit.SECONDS);
QueueControl queueControl = createManagementControl(address, queue, RoutingType.ANYCAST);
Assert.assertEquals(0, queueControl.getMessageCount());
Assert.assertEquals(0, queueControl.getConsumerCount());
Assert.assertEquals(0, queueControl.getDeliveringCount());
Assert.assertEquals(THREAD_COUNT * MSG_COUNT, queueControl.getMessagesAdded());
Assert.assertEquals(THREAD_COUNT * MSG_COUNT, queueControl.getMessagesAcknowledged());
session.deleteQueue(queue);
} finally {
shutdownExecutor(producerExecutor);
shutdownExecutor(consumerExecutor);
}
}
private void shutdownExecutor(ExecutorService executor) {
try {
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
} finally {
executor.shutdownNow();
}
}
@Test @Test
public void testListMessagesAsJSONWithNullFilter() throws Exception { public void testListMessagesAsJSONWithNullFilter() throws Exception {
SimpleString address = RandomUtil.randomSimpleString(); SimpleString address = RandomUtil.randomSimpleString();
@ -2153,14 +2230,6 @@ public class QueueControlTest extends ManagementTestBase {
session.start(); session.start();
} }
@Override
protected QueueControl createManagementControl(final SimpleString address,
final SimpleString queue) throws Exception {
QueueControl queueControl = ManagementControlHelper.createQueueControl(address, queue, mbeanServer);
return queueControl;
}
protected long getFirstMessageId(final QueueControl queueControl) throws Exception { protected long getFirstMessageId(final QueueControl queueControl) throws Exception {
JsonArray array = JsonUtil.readJsonArray(queueControl.getFirstMessageAsJSON()); JsonArray array = JsonUtil.readJsonArray(queueControl.getFirstMessageAsJSON());
JsonObject object = (JsonObject) array.get(0); JsonObject object = (JsonObject) array.get(0);