diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java index 481a5945be..5bcdad238a 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java @@ -2976,4 +2976,12 @@ public interface AuditLogger extends BasicLogger { @LogMessage(level = Logger.Level.INFO) @Message(id = 601763, value = "User {0} is remove a connector on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT) void removeConnector(String user, Object source, Object... args); + + static void deliverScheduledMessage(Object source, Object... args) { + BASE_LOGGER.deliverScheduledMessage(getCaller(), source, arrayToString(args)); + } + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 601764, value = "User {0} is calling deliverScheduledMessage on queue: {1} {2}", format = Message.Format.MESSAGE_FORMAT) + void deliverScheduledMessage(String user, Object source, Object... args); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java index a3b132637e..98548438c1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java @@ -770,4 +770,16 @@ public interface QueueControl { */ @Attribute(desc = "return how many messages are stuck in prepared transactions") int getPreparedTransactionMessageCount(); + + /** + * Deliver the scheduled messages which match the filter + */ + @Operation(desc = "Immediately deliver the scheduled messages which match the filter", impact = MBeanOperationInfo.ACTION) + void deliverScheduledMessages(@Parameter(name = "filter", desc = "filter to match messages to deliver") String filter) throws Exception; + + /** + * Deliver the scheduled message with the specified message ID + */ + @Operation(desc = "Immediately deliver the scheduled message with the specified message ID", impact = MBeanOperationInfo.ACTION) + void deliverScheduledMessage(@Parameter(name = "messageID", desc = "ID of the message to deliver") long messageId) throws Exception; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index b653f8597f..748e001d51 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -1962,8 +1962,35 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override + public void deliverScheduledMessages(String filter) throws Exception { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.deliverScheduledMessage(queue, filter); + } + checkStarted(); + clearIO(); + try { + queue.deliverScheduledMessages(filter); + } finally { + blockOnIO(); + } + } + @Override + public void deliverScheduledMessage(long messageId) throws Exception { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.deliverScheduledMessage(queue, messageId); + } + checkStarted(); + + clearIO(); + try { + queue.deliverScheduledMessage(messageId); + } finally { + blockOnIO(); + } + } private void checkStarted() { if (!server.getPostOffice().isStarted()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index 67e2b0b31c..cc99a18a7a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -504,6 +504,16 @@ public interface Queue extends Bindable,CriticalComponent { */ void deliverScheduledMessages() throws ActiveMQException; + /** + * cancels scheduled messages which match the filter and send them to the head of the queue. + */ + void deliverScheduledMessages(String filter) throws ActiveMQException; + + /** + * cancels scheduled message with the corresponding message ID and sends it to the head of the queue. + */ + void deliverScheduledMessage(long messageId) throws ActiveMQException; + void postAcknowledge(MessageReference ref, AckReason reason); void postAcknowledge(MessageReference ref, AckReason reason, boolean delivering); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java index 898480e477..f419ae8d7c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java @@ -17,9 +17,9 @@ package org.apache.activemq.artemis.core.server; import java.util.List; +import java.util.function.Predicate; import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.transaction.Transaction; public interface ScheduledDeliveryHandler { @@ -36,7 +36,7 @@ public interface ScheduledDeliveryHandler { List getScheduledReferences(); - List cancel(Filter filter) throws ActiveMQException; + List cancel(Predicate predicate) throws ActiveMQException; MessageReference removeReferenceWithID(long id) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 61f9a4655c..e64990642c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -2035,7 +2035,21 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public void deliverScheduledMessages() throws ActiveMQException { - List scheduledMessages = scheduledDeliveryHandler.cancel(null); + internalDeliverScheduleMessages(scheduledDeliveryHandler.cancel(ref -> true)); + } + + @Override + public void deliverScheduledMessages(String filterString) throws ActiveMQException { + final Filter filter = filterString == null || filterString.length() == 0 ? null : FilterImpl.createFilter(filterString); + internalDeliverScheduleMessages(scheduledDeliveryHandler.cancel(ref -> filter == null ? true : filter.match(ref.getMessage()))); + } + + @Override + public void deliverScheduledMessage(long messageId) throws ActiveMQException { + internalDeliverScheduleMessages(scheduledDeliveryHandler.cancel(ref -> ref.getMessageID() == messageId)); + } + + private void internalDeliverScheduleMessages(List scheduledMessages) { if (scheduledMessages != null && scheduledMessages.size() > 0) { for (MessageReference ref : scheduledMessages) { ref.getMessage().setScheduledDeliveryTime(ref.getScheduledDeliveryTime()); @@ -2170,7 +2184,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { txCount = 0; } - List cancelled = scheduledDeliveryHandler.cancel(filter1); + List cancelled = scheduledDeliveryHandler.cancel(ref -> filter1.match(ref.getMessage())); for (MessageReference messageReference : cancelled) { messageAction.actMessage(tx, messageReference); count++; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java index f663e60a8f..8d0b4d7f04 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java @@ -27,9 +27,9 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler; @@ -117,7 +117,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler { } @Override - public List cancel(final Filter filter) throws ActiveMQException { + public List cancel(Predicate predicate) throws ActiveMQException { List refs = new ArrayList<>(); synchronized (scheduledReferences) { @@ -125,7 +125,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler { while (iter.hasNext()) { MessageReference ref = iter.next().getRef(); - if (filter == null || filter.match(ref.getMessage())) { + if (predicate.test(ref)) { iter.remove(); refs.add(ref); metrics.decrementMetrics(ref); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java index ca540cceaa..438075cc83 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java @@ -895,6 +895,16 @@ public class RoutingContextTest { } + @Override + public void deliverScheduledMessages(String filter) throws ActiveMQException { + + } + + @Override + public void deliverScheduledMessage(long messageId) throws ActiveMQException { + + } + @Override public void postAcknowledge(MessageReference ref, AckReason reason) { diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index b0397b8dbd..f49b81d337 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -1623,6 +1623,16 @@ public class ScheduledDeliveryHandlerTest extends Assert { } + @Override + public void deliverScheduledMessages(String filter) throws ActiveMQException { + + } + + @Override + public void deliverScheduledMessage(long messageId) throws ActiveMQException { + + } + @Override public void route(Message message, RoutingContext context) throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java index be9b65cc08..a75a8e0764 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java @@ -58,6 +58,24 @@ public class QueueControlUsingCoreTest extends QueueControlTest { } } + @Override + public void deliverScheduledMessages(String filter) throws Exception { + try { + proxy.invokeOperation("deliverScheduledMessages", filter); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + @Override + public void deliverScheduledMessage(long messageId) throws Exception { + try { + proxy.invokeOperation("deliverScheduledMessage", messageId); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + } + @Override public void resetAllGroups() { try { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/scheduling/ScheduledMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/scheduling/ScheduledMessageTest.java index 29b7eb60bf..956df30e93 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/scheduling/ScheduledMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/scheduling/ScheduledMessageTest.java @@ -30,12 +30,15 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.api.core.management.QueueControl; +import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.RandomUtil; import org.apache.activemq.artemis.utils.UUIDGenerator; import org.junit.Assert; import org.junit.Before; @@ -505,6 +508,87 @@ public class ScheduledMessageTest extends ActiveMQTestBase { session.close(); } + @Test + public void testManagementDeliveryById() throws Exception { + + ClientSessionFactory sessionFactory = createSessionFactory(locator); + ClientSession session = sessionFactory.createSession(false, false, false); + session.createQueue(new QueueConfiguration(atestq)); + ClientProducer producer = session.createProducer(atestq); + long time = System.currentTimeMillis(); + time += 999_999_999; + + ClientMessage messageToSend = session.createMessage(true); + messageToSend.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time); + producer.send(messageToSend); + + session.commit(); + + session.start(); + ClientConsumer consumer = session.createConsumer(atestq); + ClientMessage message = consumer.receive(500); + assertNull(message); + + QueueControl queueControl = (QueueControl) server.getManagementService().getResource(ResourceNames.QUEUE + atestq); + queueControl.deliverScheduledMessage((long) queueControl.listScheduledMessages()[0].get("messageID")); + + message = consumer.receive(500); + assertNotNull(message); + message.acknowledge(); + + session.commit(); + + Assert.assertNull(consumer.receiveImmediate()); + + session.close(); + } + + @Test + public void testManagementDeliveryByFilter() throws Exception { + final String propertyValue = RandomUtil.randomString(); + final String propertyName = "X" + RandomUtil.randomString().replace("-",""); + ClientSessionFactory sessionFactory = createSessionFactory(locator); + ClientSession session = sessionFactory.createSession(false, false, false); + session.createQueue(new QueueConfiguration(atestq)); + ClientProducer producer = session.createProducer(atestq); + long time = System.currentTimeMillis(); + time += 999_999_999; + + ClientMessage messageToSend = session.createMessage(true); + messageToSend.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time); + messageToSend.putStringProperty(propertyName, propertyValue); + producer.send(messageToSend); + + messageToSend = session.createMessage(true); + messageToSend.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time); + messageToSend.putStringProperty(propertyName, propertyValue); + producer.send(messageToSend); + + session.commit(); + + session.start(); + ClientConsumer consumer = session.createConsumer(atestq); + ClientMessage message = consumer.receive(500); + assertNull(message); + + QueueControl queueControl = (QueueControl) server.getManagementService().getResource(ResourceNames.QUEUE + atestq); + queueControl.deliverScheduledMessages(propertyName + " = '" + propertyValue + "'"); + + message = consumer.receive(500); + assertNotNull(message); + message.acknowledge(); + + message = consumer.receive(500); + assertNotNull(message); + message.acknowledge(); + + session.commit(); + + Assert.assertNull(consumer.receiveImmediate()); + + session.close(); + } + public void testScheduledAndNormalMessagesDeliveredCorrectly(final boolean recover) throws Exception { ClientSessionFactory sessionFactory = createSessionFactory(locator); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index 10299cb05f..dd910810ea 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -22,6 +22,7 @@ import java.util.Set; import java.util.concurrent.Executor; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.RoutingType; @@ -678,6 +679,16 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } + @Override + public void deliverScheduledMessages(String filter) throws ActiveMQException { + + } + + @Override + public void deliverScheduledMessage(long messageId) throws ActiveMQException { + + } + @Override public SimpleString getName() { return name;