This closes #4120
This commit is contained in:
commit
73e57e5ea8
|
@ -2976,4 +2976,12 @@ public interface AuditLogger extends BasicLogger {
|
|||
@LogMessage(level = Logger.Level.INFO)
|
||||
@Message(id = 601763, value = "User {0} is remove a connector on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
|
||||
void removeConnector(String user, Object source, Object... args);
|
||||
|
||||
static void deliverScheduledMessage(Object source, Object... args) {
|
||||
BASE_LOGGER.deliverScheduledMessage(getCaller(), source, arrayToString(args));
|
||||
}
|
||||
|
||||
@LogMessage(level = Logger.Level.INFO)
|
||||
@Message(id = 601764, value = "User {0} is calling deliverScheduledMessage on queue: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
|
||||
void deliverScheduledMessage(String user, Object source, Object... args);
|
||||
}
|
||||
|
|
|
@ -770,4 +770,16 @@ public interface QueueControl {
|
|||
*/
|
||||
@Attribute(desc = "return how many messages are stuck in prepared transactions")
|
||||
int getPreparedTransactionMessageCount();
|
||||
|
||||
/**
|
||||
* Deliver the scheduled messages which match the filter
|
||||
*/
|
||||
@Operation(desc = "Immediately deliver the scheduled messages which match the filter", impact = MBeanOperationInfo.ACTION)
|
||||
void deliverScheduledMessages(@Parameter(name = "filter", desc = "filter to match messages to deliver") String filter) throws Exception;
|
||||
|
||||
/**
|
||||
* Deliver the scheduled message with the specified message ID
|
||||
*/
|
||||
@Operation(desc = "Immediately deliver the scheduled message with the specified message ID", impact = MBeanOperationInfo.ACTION)
|
||||
void deliverScheduledMessage(@Parameter(name = "messageID", desc = "ID of the message to deliver") long messageId) throws Exception;
|
||||
}
|
||||
|
|
|
@ -1962,8 +1962,35 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deliverScheduledMessages(String filter) throws Exception {
|
||||
if (AuditLogger.isBaseLoggingEnabled()) {
|
||||
AuditLogger.deliverScheduledMessage(queue, filter);
|
||||
}
|
||||
checkStarted();
|
||||
|
||||
clearIO();
|
||||
try {
|
||||
queue.deliverScheduledMessages(filter);
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deliverScheduledMessage(long messageId) throws Exception {
|
||||
if (AuditLogger.isBaseLoggingEnabled()) {
|
||||
AuditLogger.deliverScheduledMessage(queue, messageId);
|
||||
}
|
||||
checkStarted();
|
||||
|
||||
clearIO();
|
||||
try {
|
||||
queue.deliverScheduledMessage(messageId);
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
}
|
||||
|
||||
private void checkStarted() {
|
||||
if (!server.getPostOffice().isStarted()) {
|
||||
|
|
|
@ -504,6 +504,16 @@ public interface Queue extends Bindable,CriticalComponent {
|
|||
*/
|
||||
void deliverScheduledMessages() throws ActiveMQException;
|
||||
|
||||
/**
|
||||
* cancels scheduled messages which match the filter and send them to the head of the queue.
|
||||
*/
|
||||
void deliverScheduledMessages(String filter) throws ActiveMQException;
|
||||
|
||||
/**
|
||||
* cancels scheduled message with the corresponding message ID and sends it to the head of the queue.
|
||||
*/
|
||||
void deliverScheduledMessage(long messageId) throws ActiveMQException;
|
||||
|
||||
void postAcknowledge(MessageReference ref, AckReason reason);
|
||||
|
||||
void postAcknowledge(MessageReference ref, AckReason reason, boolean delivering);
|
||||
|
|
|
@ -17,9 +17,9 @@
|
|||
package org.apache.activemq.artemis.core.server;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.core.filter.Filter;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
|
||||
public interface ScheduledDeliveryHandler {
|
||||
|
@ -36,7 +36,7 @@ public interface ScheduledDeliveryHandler {
|
|||
|
||||
List<MessageReference> getScheduledReferences();
|
||||
|
||||
List<MessageReference> cancel(Filter filter) throws ActiveMQException;
|
||||
List<MessageReference> cancel(Predicate<MessageReference> predicate) throws ActiveMQException;
|
||||
|
||||
MessageReference removeReferenceWithID(long id) throws Exception;
|
||||
|
||||
|
|
|
@ -2035,7 +2035,21 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
|
||||
@Override
|
||||
public void deliverScheduledMessages() throws ActiveMQException {
|
||||
List<MessageReference> scheduledMessages = scheduledDeliveryHandler.cancel(null);
|
||||
internalDeliverScheduleMessages(scheduledDeliveryHandler.cancel(ref -> true));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deliverScheduledMessages(String filterString) throws ActiveMQException {
|
||||
final Filter filter = filterString == null || filterString.length() == 0 ? null : FilterImpl.createFilter(filterString);
|
||||
internalDeliverScheduleMessages(scheduledDeliveryHandler.cancel(ref -> filter == null ? true : filter.match(ref.getMessage())));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deliverScheduledMessage(long messageId) throws ActiveMQException {
|
||||
internalDeliverScheduleMessages(scheduledDeliveryHandler.cancel(ref -> ref.getMessageID() == messageId));
|
||||
}
|
||||
|
||||
private void internalDeliverScheduleMessages(List<MessageReference> scheduledMessages) {
|
||||
if (scheduledMessages != null && scheduledMessages.size() > 0) {
|
||||
for (MessageReference ref : scheduledMessages) {
|
||||
ref.getMessage().setScheduledDeliveryTime(ref.getScheduledDeliveryTime());
|
||||
|
@ -2170,7 +2184,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
txCount = 0;
|
||||
}
|
||||
|
||||
List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter1);
|
||||
List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(ref -> filter1.match(ref.getMessage()));
|
||||
for (MessageReference messageReference : cancelled) {
|
||||
messageAction.actMessage(tx, messageReference);
|
||||
count++;
|
||||
|
|
|
@ -27,9 +27,9 @@ import java.util.TreeSet;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.core.filter.Filter;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler;
|
||||
|
@ -117,7 +117,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<MessageReference> cancel(final Filter filter) throws ActiveMQException {
|
||||
public List<MessageReference> cancel(Predicate<MessageReference> predicate) throws ActiveMQException {
|
||||
List<MessageReference> refs = new ArrayList<>();
|
||||
|
||||
synchronized (scheduledReferences) {
|
||||
|
@ -125,7 +125,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
|
|||
|
||||
while (iter.hasNext()) {
|
||||
MessageReference ref = iter.next().getRef();
|
||||
if (filter == null || filter.match(ref.getMessage())) {
|
||||
if (predicate.test(ref)) {
|
||||
iter.remove();
|
||||
refs.add(ref);
|
||||
metrics.decrementMetrics(ref);
|
||||
|
|
|
@ -895,6 +895,16 @@ public class RoutingContextTest {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deliverScheduledMessages(String filter) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deliverScheduledMessage(long messageId) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postAcknowledge(MessageReference ref, AckReason reason) {
|
||||
|
||||
|
|
|
@ -1623,6 +1623,16 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deliverScheduledMessages(String filter) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deliverScheduledMessage(long messageId) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void route(Message message, RoutingContext context) throws Exception {
|
||||
|
||||
|
|
|
@ -58,6 +58,24 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deliverScheduledMessages(String filter) throws Exception {
|
||||
try {
|
||||
proxy.invokeOperation("deliverScheduledMessages", filter);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deliverScheduledMessage(long messageId) throws Exception {
|
||||
try {
|
||||
proxy.invokeOperation("deliverScheduledMessage", messageId);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resetAllGroups() {
|
||||
try {
|
||||
|
|
|
@ -30,12 +30,15 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
|||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.api.core.management.QueueControl;
|
||||
import org.apache.activemq.artemis.api.core.management.ResourceNames;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.RandomUtil;
|
||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
@ -505,6 +508,87 @@ public class ScheduledMessageTest extends ActiveMQTestBase {
|
|||
session.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testManagementDeliveryById() throws Exception {
|
||||
|
||||
ClientSessionFactory sessionFactory = createSessionFactory(locator);
|
||||
ClientSession session = sessionFactory.createSession(false, false, false);
|
||||
session.createQueue(new QueueConfiguration(atestq));
|
||||
ClientProducer producer = session.createProducer(atestq);
|
||||
long time = System.currentTimeMillis();
|
||||
time += 999_999_999;
|
||||
|
||||
ClientMessage messageToSend = session.createMessage(true);
|
||||
messageToSend.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time);
|
||||
producer.send(messageToSend);
|
||||
|
||||
session.commit();
|
||||
|
||||
session.start();
|
||||
ClientConsumer consumer = session.createConsumer(atestq);
|
||||
ClientMessage message = consumer.receive(500);
|
||||
assertNull(message);
|
||||
|
||||
QueueControl queueControl = (QueueControl) server.getManagementService().getResource(ResourceNames.QUEUE + atestq);
|
||||
queueControl.deliverScheduledMessage((long) queueControl.listScheduledMessages()[0].get("messageID"));
|
||||
|
||||
message = consumer.receive(500);
|
||||
assertNotNull(message);
|
||||
message.acknowledge();
|
||||
|
||||
session.commit();
|
||||
|
||||
Assert.assertNull(consumer.receiveImmediate());
|
||||
|
||||
session.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testManagementDeliveryByFilter() throws Exception {
|
||||
final String propertyValue = RandomUtil.randomString();
|
||||
final String propertyName = "X" + RandomUtil.randomString().replace("-","");
|
||||
ClientSessionFactory sessionFactory = createSessionFactory(locator);
|
||||
ClientSession session = sessionFactory.createSession(false, false, false);
|
||||
session.createQueue(new QueueConfiguration(atestq));
|
||||
ClientProducer producer = session.createProducer(atestq);
|
||||
long time = System.currentTimeMillis();
|
||||
time += 999_999_999;
|
||||
|
||||
ClientMessage messageToSend = session.createMessage(true);
|
||||
messageToSend.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time);
|
||||
messageToSend.putStringProperty(propertyName, propertyValue);
|
||||
producer.send(messageToSend);
|
||||
|
||||
messageToSend = session.createMessage(true);
|
||||
messageToSend.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time);
|
||||
messageToSend.putStringProperty(propertyName, propertyValue);
|
||||
producer.send(messageToSend);
|
||||
|
||||
session.commit();
|
||||
|
||||
session.start();
|
||||
ClientConsumer consumer = session.createConsumer(atestq);
|
||||
ClientMessage message = consumer.receive(500);
|
||||
assertNull(message);
|
||||
|
||||
QueueControl queueControl = (QueueControl) server.getManagementService().getResource(ResourceNames.QUEUE + atestq);
|
||||
queueControl.deliverScheduledMessages(propertyName + " = '" + propertyValue + "'");
|
||||
|
||||
message = consumer.receive(500);
|
||||
assertNotNull(message);
|
||||
message.acknowledge();
|
||||
|
||||
message = consumer.receive(500);
|
||||
assertNotNull(message);
|
||||
message.acknowledge();
|
||||
|
||||
session.commit();
|
||||
|
||||
Assert.assertNull(consumer.receiveImmediate());
|
||||
|
||||
session.close();
|
||||
}
|
||||
|
||||
public void testScheduledAndNormalMessagesDeliveredCorrectly(final boolean recover) throws Exception {
|
||||
|
||||
ClientSessionFactory sessionFactory = createSessionFactory(locator);
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.util.Set;
|
|||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.Pair;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
|
@ -678,6 +679,16 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deliverScheduledMessages(String filter) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deliverScheduledMessage(long messageId) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public SimpleString getName() {
|
||||
return name;
|
||||
|
|
Loading…
Reference in New Issue