git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1516561 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-08-22 19:45:22 +00:00
parent 92860ae622
commit 85bb229aa7
3 changed files with 237 additions and 51 deletions

View File

@ -32,17 +32,17 @@ import org.apache.activemq.broker.scheduler.CronParser;
import org.apache.activemq.broker.scheduler.Job;
import org.apache.activemq.broker.scheduler.JobListener;
import org.apache.activemq.broker.scheduler.JobScheduler;
import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler {
private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerImpl.class);
@ -51,12 +51,12 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
private String name;
BTreeIndex<Long, List<JobLocation>> index;
private Thread thread;
private final Object listenerLock = new Object();
private final List<JobListener> jobListeners = new CopyOnWriteArrayList<JobListener>();
private static final IdGenerator ID_GENERATOR = new IdGenerator();
private final ScheduleTime scheduleTime = new ScheduleTime();
JobSchedulerImpl(JobSchedulerStoreImpl store) {
this.store = store;
}
@ -66,65 +66,77 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
/*
* (non-Javadoc)
*
* @see org.apache.activemq.beanstalk.JobScheduler#getName()
*/
@Override
public String getName() {
return this.name;
}
/*
* (non-Javadoc)
* @see
* org.apache.activemq.beanstalk.JobScheduler#addListener(org.apache.activemq
* .beanstalk.JobListener)
*
* @see org.apache.activemq.beanstalk.JobScheduler#addListener(org.apache.activemq .beanstalk.JobListener)
*/
@Override
public void addListener(JobListener l) {
this.jobListeners.add(l);
synchronized (this.listenerLock) {
this.listenerLock.notify();
}
}
/*
* (non-Javadoc)
* @see
* org.apache.activemq.beanstalk.JobScheduler#removeListener(org.apache.
* activemq.beanstalk.JobListener)
*
* @see org.apache.activemq.beanstalk.JobScheduler#removeListener(org.apache. activemq.beanstalk.JobListener)
*/
@Override
public void removeListener(JobListener l) {
this.jobListeners.remove(l);
}
@Override
public synchronized void schedule(final String jobId, final ByteSequence payload, final long delay) throws IOException {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
schedule(tx, jobId, payload, "", 0, delay, 0);
}
});
}
@Override
public synchronized void schedule(final String jobId, final ByteSequence payload, final String cronEntry) throws Exception {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
schedule(tx, jobId, payload, cronEntry, 0, 0, 0);
}
});
}
public synchronized void schedule(final String jobId, final ByteSequence payload, final String cronEntry, final long delay,
final long period, final int repeat) throws IOException {
@Override
public synchronized void schedule(final String jobId, final ByteSequence payload, final String cronEntry, final long delay, final long period,
final int repeat) throws IOException {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
schedule(tx, jobId, payload, cronEntry, delay, period, repeat);
}
});
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.beanstalk.JobScheduler#remove(long)
*/
@Override
public synchronized void remove(final long time) throws IOException {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
remove(tx, time);
}
@ -133,6 +145,7 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
synchronized void removeFromIndex(final long time, final String jobId) throws IOException {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
removeFromIndex(tx, time, jobId);
}
@ -141,11 +154,12 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
/*
* (non-Javadoc)
* @see org.apache.activemq.beanstalk.JobScheduler#remove(long,
* java.lang.String)
*
* @see org.apache.activemq.beanstalk.JobScheduler#remove(long, java.lang.String)
*/
public synchronized void remove(final long time, final String jobId) throws IOException {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
remove(tx, time, jobId);
}
@ -154,16 +168,20 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
/*
* (non-Javadoc)
*
* @see org.apache.activemq.beanstalk.JobScheduler#remove(java.lang.String)
*/
@Override
public synchronized void remove(final String jobId) throws IOException {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
remove(tx, jobId);
}
});
}
@Override
public synchronized long getNextScheduleTime() throws IOException {
Map.Entry<Long, List<JobLocation>> first = this.index.getFirst(this.store.getPageFile().tx());
return first != null ? first.getKey() : -1l;
@ -171,12 +189,15 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
/*
* (non-Javadoc)
*
* @see org.apache.activemq.beanstalk.JobScheduler#getNextScheduleJobs()
*/
@Override
public synchronized List<Job> getNextScheduleJobs() throws IOException {
final List<Job> result = new ArrayList<Job>();
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
Map.Entry<Long, List<JobLocation>> first = index.getFirst(store.getPageFile().tx());
if (first != null) {
@ -191,9 +212,11 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
return result;
}
@Override
public synchronized List<Job> getAllJobs() throws IOException {
final List<Job> result = new ArrayList<Job>();
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
Iterator<Map.Entry<Long, List<JobLocation>>> iter = index.iterator(store.getPageFile().tx());
while (iter.hasNext()) {
@ -208,15 +231,16 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
break;
}
}
}
});
return result;
}
@Override
public synchronized List<Job> getAllJobs(final long start, final long finish) throws IOException {
final List<Job> result = new ArrayList<Job>();
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
Iterator<Map.Entry<Long, List<JobLocation>>> iter = index.iterator(store.getPageFile().tx(), start);
while (iter.hasNext()) {
@ -231,35 +255,36 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
break;
}
}
}
});
return result;
}
@Override
public synchronized void removeAllJobs() throws IOException {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
destroy(tx);
}
});
}
@Override
public synchronized void removeAllJobs(final long start, final long finish) throws IOException {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
destroy(tx, start, finish);
}
});
}
ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
return this.store.getPayload(location);
}
void schedule(Transaction tx, String jobId, ByteSequence payload, String cronEntry, long delay, long period,
int repeat) throws IOException {
void schedule(Transaction tx, String jobId, ByteSequence payload, String cronEntry, long delay, long period, int repeat) throws IOException {
long startTime = System.currentTimeMillis();
// round startTime - so we can schedule more jobs
// at the same time
@ -301,6 +326,7 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
synchronized void storeJob(final JobLocation jobLocation, final long nextExecutionTime) throws IOException {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
storeJob(tx, jobLocation, nextExecutionTime);
}
@ -318,7 +344,6 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
}
values.add(jobLocation);
this.index.put(tx, nextExecutionTime, values);
}
void remove(Transaction tx, long time, String jobId) throws IOException {
@ -383,6 +408,7 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
}
}
}
for (Long l : keys) {
this.index.remove(tx, l);
}
@ -404,6 +430,7 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
break;
}
}
for (Long l : keys) {
this.index.remove(tx, l);
}
@ -415,7 +442,6 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
return first;
}
return null;
}
void fireJob(JobLocation job) throws IllegalStateException, IOException {
@ -428,6 +454,7 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
}
}
@Override
public void run() {
try {
mainLoop();
@ -453,6 +480,19 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
protected void mainLoop() {
while (this.running.get()) {
// Can't start pumping messages until a listener is added otherwise we'd discard messages
// without any warning.
synchronized (listenerLock) {
while (this.running.get() && this.jobListeners.isEmpty()) {
try {
LOG.debug("Scheduled Message dispatch paused while awaiting a Job Listener");
this.listenerLock.wait();
} catch (InterruptedException e) {
}
}
}
this.scheduleTime.clearNewJob();
try {
// peek the next job
@ -478,8 +518,7 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
if (repeat != 0) {
repeat--;
job.setRepeat(repeat);
// remove this job from the index - so it
// doesn't get destroyed
// remove this job from the index so it doesn't get destroyed
removeFromIndex(executionTime, job.getJobId());
// and re-store it
storeJob(job, nextExecutionTime);
@ -487,28 +526,21 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
} else {
// cron job
if (repeat == 0) {
// we haven't got a separate scheduler to
// execute at
// we haven't got a separate scheduler to execute at
// this time - just a cron job - so fire it
fireJob(job);
//this.scheduleTime.setWaitTime(this.scheduleTime.DEFAULT_WAIT);
}
if (nextExecutionTime > currentTime) {
// we will run again ...
// remove this job from the index - so it
// doesn't get destroyed
// remove this job from the index - so it doesn't get destroyed
removeFromIndex(executionTime, job.getJobId());
// and re-store it
storeJob(job, nextExecutionTime);
if (repeat != 0) {
// we have a separate schedule to run at
// this time
// so the cron job is used to set of a
// seperate scheule
// hence we won't fire the original cron
// job to the listeners
// but we do need to start a separate
// schedule
// we have a separate schedule to run at this time
// so the cron job is used to set of a separate schedule
// hence we won't fire the original cron job to the
// listeners but we do need to start a separate schedule
String jobId = ID_GENERATOR.generateId();
ByteSequence payload = getPayload(job.getLocation());
schedule(jobId, payload, "", job.getDelay(), job.getPeriod(), job.getRepeat());
@ -526,12 +558,11 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
// we need to reset wait time otherwise we'll miss it.
Map.Entry<Long, List<JobLocation>> nextUp = getNextToSchedule();
if (nextUp != null) {
final long timeUntilNextScheduled = nextUp.getKey() - currentTime;
if (timeUntilNextScheduled < this.scheduleTime.getWaitTime()) {
this.scheduleTime.setWaitTime(timeUntilNextScheduled);
}
final long timeUntilNextScheduled = nextUp.getKey() - currentTime;
if (timeUntilNextScheduled < this.scheduleTime.getWaitTime()) {
this.scheduleTime.setWaitTime(timeUntilNextScheduled);
}
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Not yet time to execute the job, waiting " + (executionTime - currentTime) + " ms");
@ -541,7 +572,6 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
}
}
this.scheduleTime.pause();
} catch (Exception ioe) {
LOG.error(this.name + " Failed to schedule job", ioe);
try {
@ -556,10 +586,12 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
@Override
protected void doStart() throws Exception {
this.running.set(true);
synchronized (this.listenerLock) {
this.listenerLock.notify();
}
this.thread = new Thread(this, "JobScheduler:" + this.name);
this.thread.setDaemon(true);
this.thread.start();
}
@Override
@ -570,7 +602,6 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
if (t != null) {
t.join(1000);
}
}
long calculateNextExecutionTime(final JobLocation job, long currentTime, int repeat) throws MessageFormatException {
@ -608,6 +639,8 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
static class ValueMarshaller extends VariableMarshaller<List<JobLocation>> {
static ValueMarshaller INSTANCE = new ValueMarshaller();
@Override
public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
List<JobLocation> result = new ArrayList<JobLocation>();
int size = dataIn.readInt();
@ -619,6 +652,7 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
return result;
}
@Override
public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException {
dataOut.writeInt(value.size());
for (JobLocation jobLocation : value) {
@ -640,6 +674,7 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
long getWaitTime() {
return this.waitTime;
}
/**
* @param waitTime
* the waitTime to set
@ -674,6 +709,5 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
mutex.notifyAll();
}
}
}
}

View File

@ -65,7 +65,6 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
private boolean enableIndexWriteAsync = false;
// private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
MetaData metaData = new MetaData(this);
final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();

View File

@ -0,0 +1,153 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.scheduler;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.IOHelper;
import org.apache.log4j.BasicConfigurator;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class LostScheduledMessagesTest {
private BrokerService broker;
private static final File schedulerDirectory = new File("target/test/ScheduledDB");
private static final File messageDirectory = new File("target/test/MessageDB");
private static final String QUEUE_NAME = "test";
@Before
public void setup() throws Exception {
IOHelper.mkdirs(schedulerDirectory);
IOHelper.deleteChildren(schedulerDirectory);
IOHelper.mkdirs(messageDirectory);
IOHelper.deleteChildren(messageDirectory);
}
private void startBroker() throws Exception {
broker = new BrokerService();
broker.setSchedulerSupport(true);
broker.setPersistent(true);
broker.setDeleteAllMessagesOnStartup(false);
broker.setDataDirectory("target");
broker.setSchedulerDirectoryFile(schedulerDirectory);
broker.setDataDirectoryFile(messageDirectory);
broker.setUseJmx(false);
broker.addConnector("vm://localhost");
broker.start();
}
@After
public void tearDown() throws Exception {
broker.stop();
BasicConfigurator.resetConfiguration();
}
@Test
public void MessagePassedNotUsingScheduling() throws Exception {
doTest(false);
}
@Test
public void MessageLostWhenUsingScheduling() throws Exception {
doTest(true);
}
private void doTest(boolean useScheduling) throws Exception {
int DELIVERY_DELAY_MS = 5000;
startBroker();
long startTime = System.currentTimeMillis();
// Send a message scheduled for delivery in 5 seconds
ConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
Message message = session.createTextMessage("test");
if (useScheduling) {
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, DELIVERY_DELAY_MS);
}
producer.send(message);
session.close();
connection.close();
broker.getServices();
// shut down broker
broker.stop();
broker.waitUntilStopped();
// Make sure that broker have stopped within delivery delay
long shutdownTime = System.currentTimeMillis();
assertTrue("Failed to shut down broker in expected time. Test results inconclusive", shutdownTime - startTime < DELIVERY_DELAY_MS);
// make sure that delivery falls into down time window
TimeUnit.MILLISECONDS.sleep(DELIVERY_DELAY_MS);
// Start new broker instance
startBroker();
final AtomicLong receiveCounter = new AtomicLong();
cf = new ActiveMQConnectionFactory("vm://localhost");
connection = cf.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
receiveCounter.incrementAndGet();
}
});
// Wait for a while to let MQ process the message
TimeUnit.MILLISECONDS.sleep(DELIVERY_DELAY_MS * 2);
session.close();
connection.close();
assertEquals(1, receiveCounter.get());
}
}