ARTEMIS-2986 deleting scheduled messages not permanent

When deleting a durable scheduled message via the management API the
message would be removed from memory but it wouldn't be removed from
storage so when the broker restarted the message would reappear.

This commit fixes that by acking the message during the delete
operation.
This commit is contained in:
Justin Bertram 2020-11-12 10:24:20 -06:00 committed by clebertsuconic
parent e4de988bbd
commit 4bb9ed2d4e
4 changed files with 53 additions and 4 deletions

View File

@ -20,6 +20,7 @@ import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.transaction.Transaction;
public interface ScheduledDeliveryHandler {
@ -37,5 +38,7 @@ public interface ScheduledDeliveryHandler {
List<MessageReference> cancel(Filter filter) throws ActiveMQException;
MessageReference removeReferenceWithID(long id) throws ActiveMQException;
MessageReference removeReferenceWithID(long id) throws Exception;
MessageReference removeReferenceWithID(long id, Transaction tx) throws Exception;
}

View File

@ -2268,7 +2268,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (!deleted) {
// Look in scheduled deliveries
deleted = scheduledDeliveryHandler.removeReferenceWithID(messageID) != null ? true : false;
deleted = scheduledDeliveryHandler.removeReferenceWithID(messageID, tx) != null ? true : false;
}
tx.commit();

View File

@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.jboss.logging.Logger;
/**
@ -135,12 +136,18 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
}
@Override
public MessageReference removeReferenceWithID(final long id) throws ActiveMQException {
public MessageReference removeReferenceWithID(final long id) throws Exception {
return removeReferenceWithID(id, null);
}
@Override
public MessageReference removeReferenceWithID(final long id, Transaction tx) throws Exception {
synchronized (scheduledReferences) {
Iterator<RefScheduled> iter = scheduledReferences.iterator();
while (iter.hasNext()) {
MessageReference ref = iter.next().getRef();
if (ref.getMessage().getMessageID() == id) {
ref.acknowledge(tx);
iter.remove();
metrics.decrementMetrics(ref);
return ref;

View File

@ -78,6 +78,7 @@ import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.RetryRule;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@ -2143,6 +2144,44 @@ public class QueueControlTest extends ManagementTestBase {
session.deleteQueue(queue);
}
@Test
public void testRemoveScheduledMessageRestart() throws Exception {
Assume.assumeTrue(durable);
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable));
ClientProducer producer = session.createProducer(address);
// send 2 messages on queue, both scheduled
long timeout = System.currentTimeMillis() + 5000;
ClientMessage m1 = session.createMessage(durable);
m1.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, timeout);
producer.send(m1);
ClientMessage m2 = session.createMessage(durable);
m2.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, timeout);
producer.send(m2);
QueueControl queueControl = createManagementControl(address, queue);
assertScheduledMetrics(queueControl, 2, durable);
// the message IDs are set on the server
Map<String, Object>[] messages = queueControl.listScheduledMessages();
Assert.assertEquals(2, messages.length);
long messageID = (Long) messages[0].get("messageID");
// delete 1st message
boolean deleted = queueControl.removeMessage(messageID);
Assert.assertTrue(deleted);
assertScheduledMetrics(queueControl, 1, durable);
locator.close();
server.stop();
server.start();
assertScheduledMetrics(queueControl, 1, durable);
}
@Test
public void testRemoveMessage2() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
@ -3376,7 +3415,7 @@ public class QueueControlTest extends ManagementTestBase {
public void setUp() throws Exception {
super.setUp();
Configuration conf = createDefaultInVMConfig().setJMXManagementEnabled(true);
server = addServer(ActiveMQServers.newActiveMQServer(conf, mbeanServer, false));
server = addServer(ActiveMQServers.newActiveMQServer(conf, mbeanServer, true));
server.start();