ARTEMIS-4579 Add `peekFirstMessage*` and `peekFirstScheduledMessage*` functions

This commit is contained in:
Jan Šmucr 2024-01-19 06:49:22 +01:00 committed by Justin Bertram
parent 9cd48602ab
commit 3c580c9351
10 changed files with 390 additions and 0 deletions

View File

@ -2682,4 +2682,32 @@ public interface AuditLogger {
@LogMessage(id = 601772, value = "User {} is getting producerWindowSize on target resource: {}", level = LogMessage.Level.INFO) @LogMessage(id = 601772, value = "User {} is getting producerWindowSize on target resource: {}", level = LogMessage.Level.INFO)
void getProducerWindowSize(String user, Object source); void getProducerWindowSize(String user, Object source);
static void peekFirstScheduledMessage(Object source) {
BASE_LOGGER.peekFirstScheduledMessage(getCaller(), source);
}
@LogMessage(id = 601773, value = "User {} is getting first scheduled message on target resource: {}", level = LogMessage.Level.INFO)
void peekFirstScheduledMessage(String user, Object source);
static void peekFirstScheduledMessageAsJSON(Object source) {
BASE_LOGGER.peekFirstScheduledMessageAsJSON(getCaller(), source);
}
@LogMessage(id = 601774, value = "User {} is getting first scheduled message as json on target resource: {}", level = LogMessage.Level.INFO)
void peekFirstScheduledMessageAsJSON(String user, Object source);
static void peekFirstMessage(Object source) {
BASE_LOGGER.peekFirstMessage(getCaller(), source);
}
@LogMessage(id = 601775, value = "User {} is getting first message on target resource: {}", level = LogMessage.Level.INFO)
void peekFirstMessage(String user, Object source);
static void peekFirstMessageAsJSON(Object source) {
BASE_LOGGER.peekFirstMessageAsJSON(getCaller(), source);
}
@LogMessage(id = 601776, value = "User {} is getting first message as json on target resource: {}", level = LogMessage.Level.INFO)
void peekFirstMessageAsJSON(String user, Object source);
} }

View File

@ -788,4 +788,17 @@ public interface QueueControl {
*/ */
@Attribute(desc = "whether this queue is available for auto deletion") @Attribute(desc = "whether this queue is available for auto deletion")
boolean isAutoDelete(); boolean isAutoDelete();
/**
* Returns the first message on the queue as JSON
*/
@Operation(desc = "Returns first message on the queue as JSON", impact = MBeanOperationInfo.INFO)
String peekFirstMessageAsJSON() throws Exception;
/**
* Returns the first scheduled message on the queue as JSON
*/
@Operation(desc = "Returns first scheduled message on the queue as JSON", impact = MBeanOperationInfo.INFO)
String peekFirstScheduledMessageAsJSON() throws Exception;
} }

View File

@ -888,6 +888,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
* or null if there's no first message. * or null if there's no first message.
* @return * @return
* @throws Exception * @throws Exception
* @deprecated Use {@link #peekFirstMessage()} instead.
*/ */
protected Map<String, Object> getFirstMessage() throws Exception { protected Map<String, Object> getFirstMessage() throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) { if (AuditLogger.isBaseLoggingEnabled()) {
@ -910,6 +911,59 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
} }
/**
* this method returns a Map representing the first message.
* or null if there's no first message.
* @return A result of {@link Message#toMap()}
*/
protected Map<String, Object> peekFirstMessage() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.peekFirstMessage(queue);
}
checkStarted();
clearIO();
try {
MessageReference firstMessage = queue.peekFirstMessage();
if (firstMessage != null) {
return firstMessage.getMessage().toMap();
} else {
return null;
}
} finally {
blockOnIO();
}
}
/**
* this method returns a Map representing the first scheduled message.
* or null if there's no first message.
* @return A result of {@link Message#toMap()}
*/
protected Map<String, Object> peekFirstScheduledMessage() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.peekFirstScheduledMessage(queue);
}
checkStarted();
clearIO();
try {
MessageReference firstScheduledMessage = queue.peekFirstScheduledMessage();
if (firstScheduledMessage != null) {
return firstScheduledMessage.getMessage().toMap();
} else {
return null;
}
} finally {
blockOnIO();
}
}
/**
* @deprecated Use {@link #peekFirstMessageAsJSON()} instead.
*/
@Override @Override
public String getFirstMessageAsJSON() throws Exception { public String getFirstMessageAsJSON() throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) { if (AuditLogger.isBaseLoggingEnabled()) {
@ -921,6 +975,38 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
return toJSON(message == null ? new Map[1] : new Map[]{message}); return toJSON(message == null ? new Map[1] : new Map[]{message});
} }
/**
* Uses {@link #peekFirstMessage()} and returns the result as JSON.
* @return A {@link Message} instance as a JSON object, or <code>"null"</code> if there's no such message.
*/
@Override
public String peekFirstMessageAsJSON() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.peekFirstMessageAsJSON(queue);
}
Map<String, Object> message = peekFirstMessage();
if (message == null) {
return "null";
}
return JsonUtil.toJsonObject(message).toString();
}
/**
* Uses {@link #peekFirstScheduledMessage()} and returns the result as JSON.
* @return A {@link Message} instance as a JSON object, or <code>"null"</code> if there's no such message.
*/
@Override
public String peekFirstScheduledMessageAsJSON() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.peekFirstScheduledMessageAsJSON(queue);
}
Map<String, Object> message = peekFirstScheduledMessage();
if (message == null) {
return "null";
}
return JsonUtil.toJsonObject(message).toString();
}
@Override @Override
public Long getFirstMessageTimestamp() throws Exception { public Long getFirstMessageTimestamp() throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) { if (AuditLogger.isBaseLoggingEnabled()) {

View File

@ -436,6 +436,10 @@ public interface Queue extends Bindable,CriticalComponent {
return null; return null;
} }
default MessageReference peekFirstScheduledMessage() {
return null;
}
LinkedListIterator<MessageReference> browserIterator(); LinkedListIterator<MessageReference> browserIterator();
SimpleString getExpiryAddress(); SimpleString getExpiryAddress();

View File

@ -34,6 +34,8 @@ public interface ScheduledDeliveryHandler {
long getDurableScheduledSize(); long getDurableScheduledSize();
MessageReference peekFirstScheduledMessage();
List<MessageReference> getScheduledReferences(); List<MessageReference> getScheduledReferences();
List<MessageReference> cancel(Predicate<MessageReference> predicate) throws ActiveMQException; List<MessageReference> cancel(Predicate<MessageReference> predicate) throws ActiveMQException;

View File

@ -1745,6 +1745,17 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return null; return null;
} }
@Override
public MessageReference peekFirstScheduledMessage() {
synchronized (this) {
if (scheduledDeliveryHandler != null) {
return scheduledDeliveryHandler.peekFirstScheduledMessage();
}
}
return null;
}
@Override @Override
public synchronized MessageReference removeReferenceWithID(final long id1) throws Exception { public synchronized MessageReference removeReferenceWithID(final long id1) throws Exception {
try (LinkedListIterator<MessageReference> iterator = iterator()) { try (LinkedListIterator<MessageReference> iterator = iterator()) {

View File

@ -51,10 +51,14 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
// This contains RefSchedules which are delegates to the real references // This contains RefSchedules which are delegates to the real references
// just adding some information to keep it in order accordingly to the initial operations // just adding some information to keep it in order accordingly to the initial operations
// Do not forget to call notifyScheduledReferencesUpdated() when updating the set.
private final TreeSet<RefScheduled> scheduledReferences = new TreeSet<>(new MessageReferenceComparator()); private final TreeSet<RefScheduled> scheduledReferences = new TreeSet<>(new MessageReferenceComparator());
private final QueueMessageMetrics metrics; private final QueueMessageMetrics metrics;
// Oldest by timestamp, not by scheduled delivery time
private MessageReference oldestMessage = null;
public ScheduledDeliveryHandlerImpl(final ScheduledExecutorService scheduledExecutor, public ScheduledDeliveryHandlerImpl(final ScheduledExecutorService scheduledExecutor,
final Queue queue) { final Queue queue) {
this.scheduledExecutor = scheduledExecutor; this.scheduledExecutor = scheduledExecutor;
@ -82,6 +86,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
public void addInPlace(final long deliveryTime, final MessageReference ref, final boolean tail) { public void addInPlace(final long deliveryTime, final MessageReference ref, final boolean tail) {
synchronized (scheduledReferences) { synchronized (scheduledReferences) {
scheduledReferences.add(new RefScheduled(ref, tail)); scheduledReferences.add(new RefScheduled(ref, tail));
notifyScheduledReferencesUpdated();
} }
metrics.incrementMetrics(ref); metrics.incrementMetrics(ref);
} }
@ -129,6 +134,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
MessageReference ref = iter.next().getRef(); MessageReference ref = iter.next().getRef();
if (predicate.test(ref)) { if (predicate.test(ref)) {
iter.remove(); iter.remove();
notifyScheduledReferencesUpdated();
refs.add(ref); refs.add(ref);
metrics.decrementMetrics(ref); metrics.decrementMetrics(ref);
} }
@ -151,6 +157,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
if (ref.getMessage().getMessageID() == id) { if (ref.getMessage().getMessageID() == id) {
ref.acknowledge(tx); ref.acknowledge(tx);
iter.remove(); iter.remove();
notifyScheduledReferencesUpdated();
metrics.decrementMetrics(ref); metrics.decrementMetrics(ref);
return ref; return ref;
} }
@ -188,6 +195,34 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
} }
} }
protected void notifyScheduledReferencesUpdated() {
oldestMessage = null;
}
@Override
public MessageReference peekFirstScheduledMessage() {
synchronized (scheduledReferences) {
if (scheduledReferences.isEmpty()) {
return null;
}
if (oldestMessage != null) {
return oldestMessage;
}
MessageReference result = null;
long oldestTimestamp = Long.MAX_VALUE;
for (RefScheduled scheduledReference : scheduledReferences) {
MessageReference ref = scheduledReference.getRef();
long refTimestamp = ref.getMessage().getTimestamp();
if (refTimestamp < oldestTimestamp) {
oldestTimestamp = refTimestamp;
result = ref;
}
}
oldestMessage = result;
return result;
}
}
private class ScheduledDeliveryRunnable implements Runnable { private class ScheduledDeliveryRunnable implements Runnable {
long deliveryTime; long deliveryTime;
@ -232,6 +267,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
} }
iter.remove(); iter.remove();
notifyScheduledReferencesUpdated();
metrics.decrementMetrics(reference); metrics.decrementMetrics(reference);
reference.setScheduledDeliveryTime(0); reference.setScheduledDeliveryTime(0);

View File

@ -975,6 +975,77 @@ public class QueueControlTest extends ManagementTestBase {
session.deleteQueue(queue); session.deleteQueue(queue);
} }
@Test
public void testPeekFirstMessage() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable));
QueueControl queueControl = createManagementControl(address, queue);
Assert.assertEquals(0, getMessageCount(queueControl));
assertEquals("null", queueControl.peekFirstMessageAsJSON());
String fooValue = RandomUtil.randomString();
ClientProducer producer = session.createProducer(address);
producer.send(session.createMessage(false).putStringProperty("foo", fooValue));
Wait.assertEquals(1, queueControl::getMessageCount);
JsonObject messageAsJson = JsonUtil.readJsonObject(queueControl.peekFirstMessageAsJSON());
assertEquals(fooValue, messageAsJson.getString("foo"));
session.deleteQueue(queue);
}
@Test
public void testPeekFirstScheduledMessage() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable));
QueueControl queueControl = createManagementControl(address, queue);
Assert.assertEquals(0, getMessageCount(queueControl));
// It's empty, so it's supposed to be like this
assertEquals("null", queueControl.peekFirstScheduledMessageAsJSON());
long timestampBeforeSend = System.currentTimeMillis();
ClientProducer producer = addClientProducer(session.createProducer(address));
ClientMessage message = session.createMessage(durable)
.putStringProperty("x", "valueX")
.putStringProperty("y", "valueY")
.putBooleanProperty("durable", durable)
.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, timestampBeforeSend + 5000);
producer.send(message);
consumeMessages(0, session, queue);
assertScheduledMetrics(queueControl, 1, durable);
long timestampAfterSend = System.currentTimeMillis();
JsonObject messageAsJson = JsonUtil.readJsonObject(queueControl.peekFirstScheduledMessageAsJSON());
assertEquals("valueX", messageAsJson.getString("x"));
assertEquals("valueY", messageAsJson.getString("y"));
assertEquals(durable, messageAsJson.getBoolean("durable"));
long messageTimestamp = messageAsJson.getJsonNumber("timestamp").longValue();
assertTrue(messageTimestamp >= timestampBeforeSend);
assertTrue(messageTimestamp <= timestampAfterSend);
// Make sure that the message is no longer available the "not scheduled" way
assertEquals("[{}]", queueControl.getFirstMessageAsJSON());
queueControl.deliverScheduledMessage(messageAsJson.getInt("messageID"));
queueControl.flushExecutor();
assertScheduledMetrics(queueControl, 0, durable);
consumeMessages(1, session, queue);
session.deleteQueue(queue);
}
@Test @Test
public void testMessageAttributeLimits() throws Exception { public void testMessageAttributeLimits() throws Exception {
SimpleString address = RandomUtil.randomSimpleString(); SimpleString address = RandomUtil.randomSimpleString();

View File

@ -415,6 +415,22 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
return (String) proxy.invokeOperation("getFirstMessageAsJSON"); return (String) proxy.invokeOperation("getFirstMessageAsJSON");
} }
/**
* Returns the first message on the queue as JSON
*/
@Override
public String peekFirstMessageAsJSON() throws Exception {
return (String) proxy.invokeOperation("peekFirstMessageAsJSON");
}
/**
* Returns the first scheduled message on the queue as JSON
*/
@Override
public String peekFirstScheduledMessageAsJSON() throws Exception {
return (String) proxy.invokeOperation("peekFirstScheduledMessageAsJSON");
}
/** /**
* Returns the timestamp of the first message in milliseconds. * Returns the timestamp of the first message in milliseconds.
*/ */

View File

@ -0,0 +1,123 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.leak;
import io.github.checkleak.core.CheckLeak;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
public class MessageReferenceLeakTest extends ActiveMQTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
ActiveMQServer server;
ClientSession session;
public void startServer() throws Exception {
server = createServer(false, false);
server.start();
}
@BeforeClass
public static void beforeClass() throws Exception {
Assume.assumeTrue(CheckLeak.isLoaded());
}
@Override
@Before
public void setUp() throws Exception {
startServer();
ServerLocator locator = addServerLocator(createInVMNonHALocator().setBlockOnNonDurableSend(true).setConsumerWindowSize(0));
ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
session = addClientSession(sf.createSession(false, true, false));
session.start();
}
@Override
public void tearDown() throws Exception {
super.tearDown();
server = null;
}
@Test
public void testScheduledMessageReferenceLeak() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
session.createQueue(new QueueConfiguration().setAddress(address).setName(queue).setDurable(false));
Queue serverQueue = server.locateQueue(queue);
try (ClientProducer producer = session.createProducer(address)) {
ClientMessage message = createTextMessage(session, "Hello world")
.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + 5000);
producer.send(message);
}
assertNull(serverQueue.peekFirstMessage());
MessageReference ref = serverQueue.peekFirstScheduledMessage();
assertNotNull(ref);
// Store this for later to check for leaks
String refClassName = ref.getClass().getCanonicalName();
long messageId = ref.getMessageID();
// Get rid of the message reference.
ref = null;
assertNull(ref);
// Override Message.HDR_SCHEDULED_DELIVERY_TIME
serverQueue.deliverScheduledMessage(messageId);
serverQueue.flushExecutor();
try (ClientConsumer consumer = session.createConsumer(queue)) {
ClientMessage message = consumer.receive(1000);
assertNotNull(message);
message.individualAcknowledge();
session.commit(true);
assertEquals(messageId, message.getMessageID());
}
// Now that I've consumed the message there should be no reference left.
// I cannot just assert that there's no org.apache.activemq.artemis.core.server.MessageReference because there's
// a static instance of it: org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionImpl.RETRY_MARK
MemoryAssertions.assertMemory(new CheckLeak(), 0, refClassName);
session.deleteQueue(queue);
}
}