Make scheduler job dispatching start more deterministic

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1516912 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-08-23 16:01:12 +00:00
parent eb99e05d3a
commit 2d861dae65
5 changed files with 183 additions and 108 deletions

View File

@ -17,6 +17,7 @@
package org.apache.activemq.broker.scheduler; package org.apache.activemq.broker.scheduler;
import java.util.List; import java.util.List;
import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.ByteSequence;
public interface JobScheduler { public interface JobScheduler {
@ -25,109 +26,152 @@ public interface JobScheduler {
* @return the name of the scheduler * @return the name of the scheduler
* @throws Exception * @throws Exception
*/ */
public abstract String getName() throws Exception; String getName() throws Exception;
/**
/**
* Starts dispatch of scheduled Jobs to registered listeners.
*
* Any listener added after the start dispatch method can miss jobs so its
* important to register critical listeners before the start of job dispatching.
*
* @throws Exception
*/
void startDispatching() throws Exception;
/**
* Stops dispatching of scheduled Jobs to registered listeners.
*
* @throws Exception
*/
void stopDispatching() throws Exception;
/**
* Add a Job listener * Add a Job listener
*
* @param l * @param l
* @throws Exception * @throws Exception
*/ */
public abstract void addListener(JobListener l) throws Exception; void addListener(JobListener l) throws Exception;
/**
/**
* remove a JobListener * remove a JobListener
*
* @param l * @param l
* @throws Exception * @throws Exception
*/ */
public abstract void removeListener(JobListener l) throws Exception; void removeListener(JobListener l) throws Exception;
/** /**
* Add a job to be scheduled * Add a job to be scheduled
* @param jobId a unique identifier for the job *
* @param payload the message to be sent when the job is scheduled * @param jobId
* @param delay the time in milliseconds before the job will be run * a unique identifier for the job
* @param payload
* the message to be sent when the job is scheduled
* @param delay
* the time in milliseconds before the job will be run
* @throws Exception * @throws Exception
*/ */
public abstract void schedule(String jobId, ByteSequence payload,long delay) throws Exception; void schedule(String jobId, ByteSequence payload, long delay) throws Exception;
/** /**
* Add a job to be scheduled * Add a job to be scheduled
* @param jobId a unique identifier for the job *
* @param payload the message to be sent when the job is scheduled * @param jobId
* @param cronEntry - cron entry * a unique identifier for the job
* @param payload
* the message to be sent when the job is scheduled
* @param cronEntry
* - cron entry
* @throws Exception * @throws Exception
*/ */
public abstract void schedule(String jobId, ByteSequence payload,String cronEntry) throws Exception; void schedule(String jobId, ByteSequence payload, String cronEntry) throws Exception;
/** /**
* Add a job to be scheduled * Add a job to be scheduled
* @param jobId a unique identifier for the job *
* @param payload the message to be sent when the job is scheduled * @param jobId
* @param cronEntry - cron entry * a unique identifier for the job
* @param delay time in ms to wait before scheduling * @param payload
* @param period the time in milliseconds between successive executions of the Job * the message to be sent when the job is scheduled
* @param repeat the number of times to execute the job - less than 0 will be repeated forever * @param cronEntry
* - cron entry
* @param delay
* time in ms to wait before scheduling
* @param period
* the time in milliseconds between successive executions of the Job
* @param repeat
* the number of times to execute the job - less than 0 will be repeated forever
* @throws Exception * @throws Exception
*/ */
public abstract void schedule(String jobId, ByteSequence payload,String cronEntry,long delay, long period, int repeat) throws Exception; void schedule(String jobId, ByteSequence payload, String cronEntry, long delay, long period, int repeat) throws Exception;
/** /**
* remove all jobs scheduled to run at this time * remove all jobs scheduled to run at this time
*
* @param time * @param time
* @throws Exception * @throws Exception
*/ */
public abstract void remove(long time) throws Exception; void remove(long time) throws Exception;
/** /**
* remove a job with the matching jobId * remove a job with the matching jobId
*
* @param jobId * @param jobId
* @throws Exception * @throws Exception
*/ */
public abstract void remove(String jobId) throws Exception; void remove(String jobId) throws Exception;
/** /**
* remove all the Jobs from the scheduler * remove all the Jobs from the scheduler
*
* @throws Exception * @throws Exception
*/ */
public abstract void removeAllJobs() throws Exception; void removeAllJobs() throws Exception;
/** /**
* remove all the Jobs from the scheduler that are due between the start and finish times * remove all the Jobs from the scheduler that are due between the start and finish times
* @param start time in milliseconds *
* @param finish time in milliseconds * @param start
* time in milliseconds
* @param finish
* time in milliseconds
* @throws Exception * @throws Exception
*/ */
public abstract void removeAllJobs(long start,long finish) throws Exception; void removeAllJobs(long start, long finish) throws Exception;
/** /**
* Get the next time jobs will be fired * Get the next time jobs will be fired
*
* @return the time in milliseconds * @return the time in milliseconds
* @throws Exception * @throws Exception
*/ */
public abstract long getNextScheduleTime() throws Exception; long getNextScheduleTime() throws Exception;
/** /**
* Get all the jobs scheduled to run next * Get all the jobs scheduled to run next
*
* @return a list of jobs that will be scheduled next * @return a list of jobs that will be scheduled next
* @throws Exception * @throws Exception
*/ */
public abstract List<Job> getNextScheduleJobs() throws Exception; List<Job> getNextScheduleJobs() throws Exception;
/** /**
* Get all the outstanding Jobs * Get all the outstanding Jobs
*
* @return a list of all jobs * @return a list of all jobs
* @throws Exception * @throws Exception
*/ */
public abstract List<Job> getAllJobs() throws Exception; List<Job> getAllJobs() throws Exception;
/** /**
* Get all outstanding jobs due to run between start and finish * Get all outstanding jobs due to run between start and finish
*
* @param start * @param start
* @param finish * @param finish
* @return a list of jobs * @return a list of jobs
* @throws Exception * @throws Exception
*/ */
public abstract List<Job> getAllJobs(long start,long finish)throws Exception; List<Job> getAllJobs(long start, long finish) throws Exception;
} }

View File

@ -18,118 +18,147 @@ package org.apache.activemq.broker.scheduler;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.ByteSequence;
public class JobSchedulerFacade implements JobScheduler { public class JobSchedulerFacade implements JobScheduler {
private final SchedulerBroker broker; private final SchedulerBroker broker;
JobSchedulerFacade(SchedulerBroker broker){ JobSchedulerFacade(SchedulerBroker broker) {
this.broker=broker; this.broker = broker;
} }
@Override
public void addListener(JobListener l) throws Exception { public void addListener(JobListener l) throws Exception {
JobScheduler js = this.broker.getInternalScheduler(); JobScheduler js = this.broker.getInternalScheduler();
if (js !=null) { if (js != null) {
js.addListener(l); js.addListener(l);
} }
} }
@Override
public List<Job> getAllJobs() throws Exception { public List<Job> getAllJobs() throws Exception {
JobScheduler js = this.broker.getInternalScheduler(); JobScheduler js = this.broker.getInternalScheduler();
if (js !=null) { if (js != null) {
return js.getAllJobs(); return js.getAllJobs();
} }
return Collections.emptyList(); return Collections.emptyList();
} }
@Override
public List<Job> getAllJobs(long start, long finish) throws Exception { public List<Job> getAllJobs(long start, long finish) throws Exception {
JobScheduler js = this.broker.getInternalScheduler(); JobScheduler js = this.broker.getInternalScheduler();
if (js !=null) { if (js != null) {
return js.getAllJobs(start,finish); return js.getAllJobs(start, finish);
} }
return Collections.emptyList(); return Collections.emptyList();
} }
@Override
public String getName() throws Exception { public String getName() throws Exception {
JobScheduler js = this.broker.getInternalScheduler(); JobScheduler js = this.broker.getInternalScheduler();
if (js !=null) { if (js != null) {
return js.getName(); return js.getName();
} }
return ""; return "";
} }
@Override
public List<Job> getNextScheduleJobs() throws Exception { public List<Job> getNextScheduleJobs() throws Exception {
JobScheduler js = this.broker.getInternalScheduler(); JobScheduler js = this.broker.getInternalScheduler();
if (js !=null) { if (js != null) {
return js.getNextScheduleJobs(); return js.getNextScheduleJobs();
} }
return Collections.emptyList(); return Collections.emptyList();
} }
@Override
public long getNextScheduleTime() throws Exception { public long getNextScheduleTime() throws Exception {
JobScheduler js = this.broker.getInternalScheduler(); JobScheduler js = this.broker.getInternalScheduler();
if (js !=null) { if (js != null) {
return js.getNextScheduleTime(); return js.getNextScheduleTime();
} }
return 0; return 0;
} }
@Override
public void remove(long time) throws Exception { public void remove(long time) throws Exception {
JobScheduler js = this.broker.getInternalScheduler(); JobScheduler js = this.broker.getInternalScheduler();
if (js !=null) { if (js != null) {
js.remove(time); js.remove(time);
} }
} }
@Override
public void remove(String jobId) throws Exception { public void remove(String jobId) throws Exception {
JobScheduler js = this.broker.getInternalScheduler(); JobScheduler js = this.broker.getInternalScheduler();
if (js !=null) { if (js != null) {
js.remove(jobId); js.remove(jobId);
} }
} }
@Override
public void removeAllJobs() throws Exception { public void removeAllJobs() throws Exception {
JobScheduler js = this.broker.getInternalScheduler(); JobScheduler js = this.broker.getInternalScheduler();
if (js !=null) { if (js != null) {
js.removeAllJobs(); js.removeAllJobs();
} }
} }
@Override
public void removeAllJobs(long start, long finish) throws Exception { public void removeAllJobs(long start, long finish) throws Exception {
JobScheduler js = this.broker.getInternalScheduler(); JobScheduler js = this.broker.getInternalScheduler();
if (js !=null) { if (js != null) {
js.removeAllJobs(start,finish); js.removeAllJobs(start, finish);
} }
} }
@Override
public void removeListener(JobListener l) throws Exception { public void removeListener(JobListener l) throws Exception {
JobScheduler js = this.broker.getInternalScheduler(); JobScheduler js = this.broker.getInternalScheduler();
if (js !=null) { if (js != null) {
js.removeListener(l); js.removeListener(l);
} }
} }
@Override
public void schedule(String jobId, ByteSequence payload, long delay) throws Exception { public void schedule(String jobId, ByteSequence payload, long delay) throws Exception {
JobScheduler js = this.broker.getInternalScheduler(); JobScheduler js = this.broker.getInternalScheduler();
if (js !=null) { if (js != null) {
js.schedule(jobId, payload, delay); js.schedule(jobId, payload, delay);
} }
} }
public void schedule(String jobId, ByteSequence payload,String cronEntry, long start, long period, int repeat) throws Exception { @Override
public void schedule(String jobId, ByteSequence payload, String cronEntry, long start, long period, int repeat) throws Exception {
JobScheduler js = this.broker.getInternalScheduler(); JobScheduler js = this.broker.getInternalScheduler();
if (js !=null) { if (js != null) {
js.schedule(jobId, payload, cronEntry,start,period,repeat); js.schedule(jobId, payload, cronEntry, start, period, repeat);
} }
} }
public void schedule(String jobId, ByteSequence payload, String cronEntry) throws Exception {
JobScheduler js = this.broker.getInternalScheduler();
if (js !=null) {
js.schedule(jobId, payload, cronEntry);
}
@Override
public void schedule(String jobId, ByteSequence payload, String cronEntry) throws Exception {
JobScheduler js = this.broker.getInternalScheduler();
if (js != null) {
js.schedule(jobId, payload, cronEntry);
}
}
@Override
public void startDispatching() throws Exception {
JobScheduler js = this.broker.getInternalScheduler();
if (js != null) {
js.startDispatching();
}
}
@Override
public void stopDispatching() throws Exception {
JobScheduler js = this.broker.getInternalScheduler();
if (js != null) {
js.stopDispatching();
}
} }
} }

View File

@ -16,14 +16,16 @@
*/ */
package org.apache.activemq.broker.scheduler; package org.apache.activemq.broker.scheduler;
import org.apache.activemq.Service;
import java.io.File; import java.io.File;
import org.apache.activemq.Service;
/** /**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a> * A Job Scheduler Store interface use to manage delay processing of Messaging
* related jobs.
*/ */
public interface JobSchedulerStore extends Service { public interface JobSchedulerStore extends Service {
File getDirectory(); File getDirectory();
void setDirectory(File directory); void setDirectory(File directory);

View File

@ -292,6 +292,7 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
if (this.scheduler == null) { if (this.scheduler == null) {
this.scheduler = store.getJobScheduler("JMS"); this.scheduler = store.getJobScheduler("JMS");
this.scheduler.addListener(this); this.scheduler.addListener(this);
this.scheduler.startDispatching();
} }
return this.scheduler; return this.scheduler;
} }

View File

@ -51,7 +51,7 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
private String name; private String name;
BTreeIndex<Long, List<JobLocation>> index; BTreeIndex<Long, List<JobLocation>> index;
private Thread thread; private Thread thread;
private final Object listenerLock = new Object(); private final AtomicBoolean started = new AtomicBoolean(false);
private final List<JobListener> jobListeners = new CopyOnWriteArrayList<JobListener>(); private final List<JobListener> jobListeners = new CopyOnWriteArrayList<JobListener>();
private static final IdGenerator ID_GENERATOR = new IdGenerator(); private static final IdGenerator ID_GENERATOR = new IdGenerator();
private final ScheduleTime scheduleTime = new ScheduleTime(); private final ScheduleTime scheduleTime = new ScheduleTime();
@ -82,9 +82,6 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
@Override @Override
public void addListener(JobListener l) { public void addListener(JobListener l) {
this.jobListeners.add(l); this.jobListeners.add(l);
synchronized (this.listenerLock) {
this.listenerLock.notify();
}
} }
/* /*
@ -480,19 +477,6 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
protected void mainLoop() { protected void mainLoop() {
while (this.running.get()) { while (this.running.get()) {
// Can't start pumping messages until a listener is added otherwise we'd discard messages
// without any warning.
synchronized (listenerLock) {
while (this.running.get() && this.jobListeners.isEmpty()) {
try {
LOG.debug("Scheduled Message dispatch paused while awaiting a Job Listener");
this.listenerLock.wait();
} catch (InterruptedException e) {
}
}
}
this.scheduleTime.clearNewJob(); this.scheduleTime.clearNewJob();
try { try {
// peek the next job // peek the next job
@ -584,24 +568,39 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
} }
@Override @Override
protected void doStart() throws Exception { public void startDispatching() throws Exception {
this.running.set(true); if (!this.running.get()) {
synchronized (this.listenerLock) { return;
this.listenerLock.notify();
} }
if (started.compareAndSet(false, true)) {
this.thread = new Thread(this, "JobScheduler:" + this.name); this.thread = new Thread(this, "JobScheduler:" + this.name);
this.thread.setDaemon(true); this.thread.setDaemon(true);
this.thread.start(); this.thread.start();
} }
}
@Override
public void stopDispatching() throws Exception {
if (started.compareAndSet(true, false)) {
this.scheduleTime.wakeup();
Thread t = this.thread;
this.thread = null;
if (t != null) {
t.join(1000);
}
}
}
@Override
protected void doStart() throws Exception {
this.running.set(true);
}
@Override @Override
protected void doStop(ServiceStopper stopper) throws Exception { protected void doStop(ServiceStopper stopper) throws Exception {
this.running.set(false); this.running.set(false);
this.scheduleTime.wakeup(); stopDispatching();
Thread t = this.thread;
if (t != null) {
t.join(1000);
}
} }
long calculateNextExecutionTime(final JobLocation job, long currentTime, int repeat) throws MessageFormatException { long calculateNextExecutionTime(final JobLocation job, long currentTime, int repeat) throws MessageFormatException {