Add option to clean scheduled messages on startup

This commit is contained in:
Ken Liao 2024-11-14 21:43:38 +01:00
parent aa842da287
commit 15b6106324
2 changed files with 27 additions and 0 deletions

View File

@ -184,6 +184,7 @@ public class BrokerService implements Service {
private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
// to other jms messaging systems // to other jms messaging systems
private boolean deleteAllMessagesOnStartup; private boolean deleteAllMessagesOnStartup;
private boolean deleteAllScheduledMessagesOnStartup = false;
private boolean advisorySupport = true; private boolean advisorySupport = true;
private boolean anonymousProducerAdvisorySupport = false; private boolean anonymousProducerAdvisorySupport = false;
private URI vmConnectorURI; private URI vmConnectorURI;
@ -1630,6 +1631,14 @@ public class BrokerService implements Service {
this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup; this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
} }
/**
* Sets whether or not all scheduled messages are deleted on startup
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
*/
public void setDeleteAllScheduledMessagesOnStartup(boolean deleteAllScheduledMessagesOnStartup) {
this.deleteAllScheduledMessagesOnStartup = deleteAllScheduledMessagesOnStartup;
}
public URI getVmConnectorURI() { public URI getVmConnectorURI() {
if (vmConnectorURI == null) { if (vmConnectorURI == null) {
try { try {
@ -2440,6 +2449,7 @@ public class BrokerService implements Service {
if (isSchedulerSupport()) { if (isSchedulerSupport()) {
SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore()); SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore());
sb.setMaxRepeatAllowed(maxSchedulerRepeatAllowed); sb.setMaxRepeatAllowed(maxSchedulerRepeatAllowed);
sb.setDeleteAllSchedulerdMessagesOnStartup(deleteAllScheduledMessagesOnStartup);
if (isUseJmx()) { if (isUseJmx()) {
JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler()); JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
try { try {

View File

@ -72,6 +72,7 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
private final JobSchedulerStore store; private final JobSchedulerStore store;
private JobScheduler scheduler; private JobScheduler scheduler;
private int maxRepeatAllowed = MAX_REPEAT_ALLOWED; private int maxRepeatAllowed = MAX_REPEAT_ALLOWED;
private boolean deleteAllSchedulerdMessagesOnStartup;
public SchedulerBroker(BrokerService brokerService, Broker next, JobSchedulerStore store) throws Exception { public SchedulerBroker(BrokerService brokerService, Broker next, JobSchedulerStore store) throws Exception {
super(next); super(next);
@ -212,6 +213,9 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
public void start() throws Exception { public void start() throws Exception {
this.started.set(true); this.started.set(true);
getInternalScheduler(); getInternalScheduler();
if (deleteAllSchedulerdMessagesOnStartup) {
deleteAllScheduledMessages();
}
super.start(); super.start();
} }
@ -364,6 +368,11 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
new ByteSequence(packet.data, packet.offset, packet.length), cronEntry, delay, period, repeat); new ByteSequence(packet.data, packet.offset, packet.length), cronEntry, delay, period, repeat);
} }
private void deleteAllScheduledMessages() throws Exception {
LOG.info("Deleting all scheduled messages on startup because deleteAllScheduledMessagesOnStartup configuration has been provided");
getInternalScheduler().removeAllJobs();
}
@Override @Override
public void scheduledJob(String id, ByteSequence job) { public void scheduledJob(String id, ByteSequence job) {
org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getData(), job.getOffset(), job.getLength()); org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getData(), job.getOffset(), job.getLength());
@ -487,4 +496,12 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
public void setMaxRepeatAllowed(int maxRepeatAllowed) { public void setMaxRepeatAllowed(int maxRepeatAllowed) {
this.maxRepeatAllowed = maxRepeatAllowed; this.maxRepeatAllowed = maxRepeatAllowed;
} }
public boolean getDeleteAllSchedulerdMessagesOnStartup() {
return deleteAllSchedulerdMessagesOnStartup;
}
public void setDeleteAllSchedulerdMessagesOnStartup(boolean deleteAllSchedulerdMessagesOnStartup) {
this.deleteAllSchedulerdMessagesOnStartup = deleteAllSchedulerdMessagesOnStartup;
}
} }