From 3c580c9351d06969b0062ae6ec5d0104ba36a205 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20=C5=A0mucr?= Date: Fri, 19 Jan 2024 06:49:22 +0100 Subject: [PATCH] ARTEMIS-4579 Add `peekFirstMessage*` and `peekFirstScheduledMessage*` functions --- .../activemq/artemis/logs/AuditLogger.java | 28 ++++ .../api/core/management/QueueControl.java | 13 ++ .../management/impl/QueueControlImpl.java | 86 ++++++++++++ .../activemq/artemis/core/server/Queue.java | 4 + .../core/server/ScheduledDeliveryHandler.java | 2 + .../artemis/core/server/impl/QueueImpl.java | 11 ++ .../impl/ScheduledDeliveryHandlerImpl.java | 36 +++++ .../management/QueueControlTest.java | 71 ++++++++++ .../management/QueueControlUsingCoreTest.java | 16 +++ .../tests/leak/MessageReferenceLeakTest.java | 123 ++++++++++++++++++ 10 files changed, 390 insertions(+) create mode 100755 tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MessageReferenceLeakTest.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java index ffb2535150..004a9a38c6 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java @@ -2682,4 +2682,32 @@ public interface AuditLogger { @LogMessage(id = 601772, value = "User {} is getting producerWindowSize on target resource: {}", level = LogMessage.Level.INFO) void getProducerWindowSize(String user, Object source); + static void peekFirstScheduledMessage(Object source) { + BASE_LOGGER.peekFirstScheduledMessage(getCaller(), source); + } + + @LogMessage(id = 601773, value = "User {} is getting first scheduled message on target resource: {}", level = LogMessage.Level.INFO) + void peekFirstScheduledMessage(String user, Object source); + + static void peekFirstScheduledMessageAsJSON(Object source) { + BASE_LOGGER.peekFirstScheduledMessageAsJSON(getCaller(), source); + } + + @LogMessage(id = 601774, value = "User {} is getting first scheduled message as json on target resource: {}", level = LogMessage.Level.INFO) + void peekFirstScheduledMessageAsJSON(String user, Object source); + + static void peekFirstMessage(Object source) { + BASE_LOGGER.peekFirstMessage(getCaller(), source); + } + + @LogMessage(id = 601775, value = "User {} is getting first message on target resource: {}", level = LogMessage.Level.INFO) + void peekFirstMessage(String user, Object source); + + static void peekFirstMessageAsJSON(Object source) { + BASE_LOGGER.peekFirstMessageAsJSON(getCaller(), source); + } + + @LogMessage(id = 601776, value = "User {} is getting first message as json on target resource: {}", level = LogMessage.Level.INFO) + void peekFirstMessageAsJSON(String user, Object source); + } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java index eecbcd904c..882d1de602 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java @@ -788,4 +788,17 @@ public interface QueueControl { */ @Attribute(desc = "whether this queue is available for auto deletion") boolean isAutoDelete(); + + /** + * Returns the first message on the queue as JSON + */ + @Operation(desc = "Returns first message on the queue as JSON", impact = MBeanOperationInfo.INFO) + String peekFirstMessageAsJSON() throws Exception; + + /** + * Returns the first scheduled message on the queue as JSON + */ + @Operation(desc = "Returns first scheduled message on the queue as JSON", impact = MBeanOperationInfo.INFO) + String peekFirstScheduledMessageAsJSON() throws Exception; + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index 6c7713e12f..4dae496fbd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -888,6 +888,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { * or null if there's no first message. * @return * @throws Exception + * @deprecated Use {@link #peekFirstMessage()} instead. */ protected Map getFirstMessage() throws Exception { if (AuditLogger.isBaseLoggingEnabled()) { @@ -910,6 +911,59 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } + /** + * this method returns a Map representing the first message. + * or null if there's no first message. + * @return A result of {@link Message#toMap()} + */ + protected Map peekFirstMessage() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.peekFirstMessage(queue); + } + checkStarted(); + + clearIO(); + try { + MessageReference firstMessage = queue.peekFirstMessage(); + if (firstMessage != null) { + return firstMessage.getMessage().toMap(); + } else { + return null; + } + } finally { + blockOnIO(); + } + + } + + /** + * this method returns a Map representing the first scheduled message. + * or null if there's no first message. + * @return A result of {@link Message#toMap()} + */ + protected Map peekFirstScheduledMessage() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.peekFirstScheduledMessage(queue); + } + checkStarted(); + + clearIO(); + try { + MessageReference firstScheduledMessage = queue.peekFirstScheduledMessage(); + if (firstScheduledMessage != null) { + return firstScheduledMessage.getMessage().toMap(); + } else { + return null; + } + } finally { + blockOnIO(); + } + + } + + /** + * @deprecated Use {@link #peekFirstMessageAsJSON()} instead. + */ @Override public String getFirstMessageAsJSON() throws Exception { if (AuditLogger.isBaseLoggingEnabled()) { @@ -921,6 +975,38 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { return toJSON(message == null ? new Map[1] : new Map[]{message}); } + /** + * Uses {@link #peekFirstMessage()} and returns the result as JSON. + * @return A {@link Message} instance as a JSON object, or "null" if there's no such message. + */ + @Override + public String peekFirstMessageAsJSON() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.peekFirstMessageAsJSON(queue); + } + Map message = peekFirstMessage(); + if (message == null) { + return "null"; + } + return JsonUtil.toJsonObject(message).toString(); + } + + /** + * Uses {@link #peekFirstScheduledMessage()} and returns the result as JSON. + * @return A {@link Message} instance as a JSON object, or "null" if there's no such message. + */ + @Override + public String peekFirstScheduledMessageAsJSON() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.peekFirstScheduledMessageAsJSON(queue); + } + Map message = peekFirstScheduledMessage(); + if (message == null) { + return "null"; + } + return JsonUtil.toJsonObject(message).toString(); + } + @Override public Long getFirstMessageTimestamp() throws Exception { if (AuditLogger.isBaseLoggingEnabled()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index 77d8df858b..dac6b00aea 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -436,6 +436,10 @@ public interface Queue extends Bindable,CriticalComponent { return null; } + default MessageReference peekFirstScheduledMessage() { + return null; + } + LinkedListIterator browserIterator(); SimpleString getExpiryAddress(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java index f419ae8d7c..f8a5005676 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java @@ -34,6 +34,8 @@ public interface ScheduledDeliveryHandler { long getDurableScheduledSize(); + MessageReference peekFirstScheduledMessage(); + List getScheduledReferences(); List cancel(Predicate predicate) throws ActiveMQException; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 43e0efbc01..8dd7582917 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -1745,6 +1745,17 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return null; } + @Override + public MessageReference peekFirstScheduledMessage() { + synchronized (this) { + if (scheduledDeliveryHandler != null) { + return scheduledDeliveryHandler.peekFirstScheduledMessage(); + } + } + + return null; + } + @Override public synchronized MessageReference removeReferenceWithID(final long id1) throws Exception { try (LinkedListIterator iterator = iterator()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java index 881a9b35a2..b8e1418a79 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java @@ -51,10 +51,14 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler { // This contains RefSchedules which are delegates to the real references // just adding some information to keep it in order accordingly to the initial operations + // Do not forget to call notifyScheduledReferencesUpdated() when updating the set. private final TreeSet scheduledReferences = new TreeSet<>(new MessageReferenceComparator()); private final QueueMessageMetrics metrics; + // Oldest by timestamp, not by scheduled delivery time + private MessageReference oldestMessage = null; + public ScheduledDeliveryHandlerImpl(final ScheduledExecutorService scheduledExecutor, final Queue queue) { this.scheduledExecutor = scheduledExecutor; @@ -82,6 +86,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler { public void addInPlace(final long deliveryTime, final MessageReference ref, final boolean tail) { synchronized (scheduledReferences) { scheduledReferences.add(new RefScheduled(ref, tail)); + notifyScheduledReferencesUpdated(); } metrics.incrementMetrics(ref); } @@ -129,6 +134,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler { MessageReference ref = iter.next().getRef(); if (predicate.test(ref)) { iter.remove(); + notifyScheduledReferencesUpdated(); refs.add(ref); metrics.decrementMetrics(ref); } @@ -151,6 +157,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler { if (ref.getMessage().getMessageID() == id) { ref.acknowledge(tx); iter.remove(); + notifyScheduledReferencesUpdated(); metrics.decrementMetrics(ref); return ref; } @@ -188,6 +195,34 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler { } } + protected void notifyScheduledReferencesUpdated() { + oldestMessage = null; + } + + @Override + public MessageReference peekFirstScheduledMessage() { + synchronized (scheduledReferences) { + if (scheduledReferences.isEmpty()) { + return null; + } + if (oldestMessage != null) { + return oldestMessage; + } + MessageReference result = null; + long oldestTimestamp = Long.MAX_VALUE; + for (RefScheduled scheduledReference : scheduledReferences) { + MessageReference ref = scheduledReference.getRef(); + long refTimestamp = ref.getMessage().getTimestamp(); + if (refTimestamp < oldestTimestamp) { + oldestTimestamp = refTimestamp; + result = ref; + } + } + oldestMessage = result; + return result; + } + } + private class ScheduledDeliveryRunnable implements Runnable { long deliveryTime; @@ -232,6 +267,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler { } iter.remove(); + notifyScheduledReferencesUpdated(); metrics.decrementMetrics(reference); reference.setScheduledDeliveryTime(0); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java index 19af20d74f..b82d78f807 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java @@ -975,6 +975,77 @@ public class QueueControlTest extends ManagementTestBase { session.deleteQueue(queue); } + @Test + public void testPeekFirstMessage() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queue = RandomUtil.randomSimpleString(); + + session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable)); + + QueueControl queueControl = createManagementControl(address, queue); + Assert.assertEquals(0, getMessageCount(queueControl)); + + assertEquals("null", queueControl.peekFirstMessageAsJSON()); + + String fooValue = RandomUtil.randomString(); + ClientProducer producer = session.createProducer(address); + producer.send(session.createMessage(false).putStringProperty("foo", fooValue)); + Wait.assertEquals(1, queueControl::getMessageCount); + + JsonObject messageAsJson = JsonUtil.readJsonObject(queueControl.peekFirstMessageAsJSON()); + assertEquals(fooValue, messageAsJson.getString("foo")); + + session.deleteQueue(queue); + } + + @Test + public void testPeekFirstScheduledMessage() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queue = RandomUtil.randomSimpleString(); + + session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable)); + + QueueControl queueControl = createManagementControl(address, queue); + Assert.assertEquals(0, getMessageCount(queueControl)); + + // It's empty, so it's supposed to be like this + assertEquals("null", queueControl.peekFirstScheduledMessageAsJSON()); + + long timestampBeforeSend = System.currentTimeMillis(); + + ClientProducer producer = addClientProducer(session.createProducer(address)); + ClientMessage message = session.createMessage(durable) + .putStringProperty("x", "valueX") + .putStringProperty("y", "valueY") + .putBooleanProperty("durable", durable) + .putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, timestampBeforeSend + 5000); + producer.send(message); + + consumeMessages(0, session, queue); + assertScheduledMetrics(queueControl, 1, durable); + + long timestampAfterSend = System.currentTimeMillis(); + + JsonObject messageAsJson = JsonUtil.readJsonObject(queueControl.peekFirstScheduledMessageAsJSON()); + assertEquals("valueX", messageAsJson.getString("x")); + assertEquals("valueY", messageAsJson.getString("y")); + assertEquals(durable, messageAsJson.getBoolean("durable")); + + long messageTimestamp = messageAsJson.getJsonNumber("timestamp").longValue(); + assertTrue(messageTimestamp >= timestampBeforeSend); + assertTrue(messageTimestamp <= timestampAfterSend); + + // Make sure that the message is no longer available the "not scheduled" way + assertEquals("[{}]", queueControl.getFirstMessageAsJSON()); + + queueControl.deliverScheduledMessage(messageAsJson.getInt("messageID")); + queueControl.flushExecutor(); + assertScheduledMetrics(queueControl, 0, durable); + + consumeMessages(1, session, queue); + session.deleteQueue(queue); + } + @Test public void testMessageAttributeLimits() throws Exception { SimpleString address = RandomUtil.randomSimpleString(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java index 5feca43665..fe613253ba 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java @@ -415,6 +415,22 @@ public class QueueControlUsingCoreTest extends QueueControlTest { return (String) proxy.invokeOperation("getFirstMessageAsJSON"); } + /** + * Returns the first message on the queue as JSON + */ + @Override + public String peekFirstMessageAsJSON() throws Exception { + return (String) proxy.invokeOperation("peekFirstMessageAsJSON"); + } + + /** + * Returns the first scheduled message on the queue as JSON + */ + @Override + public String peekFirstScheduledMessageAsJSON() throws Exception { + return (String) proxy.invokeOperation("peekFirstScheduledMessageAsJSON"); + } + /** * Returns the timestamp of the first message in milliseconds. */ diff --git a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MessageReferenceLeakTest.java b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MessageReferenceLeakTest.java new file mode 100755 index 0000000000..ee4860f6ca --- /dev/null +++ b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MessageReferenceLeakTest.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.leak; + +import io.github.checkleak.core.CheckLeak; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.junit.Assume; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; + +public class MessageReferenceLeakTest extends ActiveMQTestBase { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + ActiveMQServer server; + ClientSession session; + + public void startServer() throws Exception { + server = createServer(false, false); + server.start(); + } + + @BeforeClass + public static void beforeClass() throws Exception { + Assume.assumeTrue(CheckLeak.isLoaded()); + } + + @Override + @Before + public void setUp() throws Exception { + startServer(); + ServerLocator locator = addServerLocator(createInVMNonHALocator().setBlockOnNonDurableSend(true).setConsumerWindowSize(0)); + ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator)); + session = addClientSession(sf.createSession(false, true, false)); + session.start(); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + server = null; + } + + @Test + public void testScheduledMessageReferenceLeak() throws Exception { + + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queue = RandomUtil.randomSimpleString(); + + session.createQueue(new QueueConfiguration().setAddress(address).setName(queue).setDurable(false)); + Queue serverQueue = server.locateQueue(queue); + + try (ClientProducer producer = session.createProducer(address)) { + ClientMessage message = createTextMessage(session, "Hello world") + .putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + 5000); + producer.send(message); + } + + assertNull(serverQueue.peekFirstMessage()); + + MessageReference ref = serverQueue.peekFirstScheduledMessage(); + assertNotNull(ref); + + // Store this for later to check for leaks + String refClassName = ref.getClass().getCanonicalName(); + long messageId = ref.getMessageID(); + // Get rid of the message reference. + ref = null; + assertNull(ref); + + // Override Message.HDR_SCHEDULED_DELIVERY_TIME + serverQueue.deliverScheduledMessage(messageId); + serverQueue.flushExecutor(); + + try (ClientConsumer consumer = session.createConsumer(queue)) { + ClientMessage message = consumer.receive(1000); + assertNotNull(message); + message.individualAcknowledge(); + session.commit(true); + assertEquals(messageId, message.getMessageID()); + } + + // Now that I've consumed the message there should be no reference left. + // I cannot just assert that there's no org.apache.activemq.artemis.core.server.MessageReference because there's + // a static instance of it: org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionImpl.RETRY_MARK + MemoryAssertions.assertMemory(new CheckLeak(), 0, refClassName); + + session.deleteQueue(queue); + } +}