This commit is contained in:
Timothy Bish 2013-09-05 15:59:01 -04:00
parent edc1599018
commit d1446c3bca
5 changed files with 284 additions and 81 deletions

View File

@ -20,54 +20,60 @@ import org.apache.activemq.broker.scheduler.Job;
import org.apache.activemq.broker.scheduler.JobSupport;
import org.apache.activemq.util.ByteSequence;
public class JobImpl implements Job {
private final JobLocation jobLocation;
private final byte[] payload;
protected JobImpl(JobLocation location,ByteSequence bs) {
this.jobLocation=location;
this.payload = new byte[bs.getLength()];
System.arraycopy(bs.getData(), bs.getOffset(), this.payload, 0, bs.getLength());
}
@Override
public String getJobId() {
return this.jobLocation.getJobId();
}
@Override
public byte[] getPayload() {
return this.payload;
}
@Override
public long getPeriod() {
return this.jobLocation.getPeriod();
}
@Override
public int getRepeat() {
return this.jobLocation.getRepeat();
}
@Override
public long getStart() {
return this.jobLocation.getStartTime();
}
@Override
public long getDelay() {
return this.jobLocation.getDelay();
}
@Override
public String getCronEntry() {
return this.jobLocation.getCronEntry();
}
@Override
public String getNextExecutionTime() {
return JobSupport.getDateTime(this.jobLocation.getNextTime());
}
@Override
public String getStartTime() {
return JobSupport.getDateTime(getStart());
}
}

View File

@ -27,7 +27,7 @@ import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
class JobLocation {
private String jobId;
private int repeat;
private long startTime;
@ -39,13 +39,12 @@ class JobLocation {
public JobLocation(Location location) {
this.location = location;
}
public JobLocation() {
this(new Location());
}
public void readExternal(DataInput in) throws IOException {
this.jobId = in.readUTF();
this.repeat = in.readInt();
@ -85,7 +84,6 @@ class JobLocation {
public void setJobId(String jobId) {
this.jobId = jobId;
}
/**
* @return the repeat
@ -116,7 +114,7 @@ class JobLocation {
public void setStartTime(long start) {
this.startTime = start;
}
/**
* @return the nextTime
*/
@ -145,7 +143,7 @@ class JobLocation {
public void setPeriod(long period) {
this.period = period;
}
/**
* @return the cronEntry
*/
@ -159,11 +157,14 @@ class JobLocation {
public synchronized void setCronEntry(String cronEntry) {
this.cronEntry = cronEntry;
}
/**
* @return if this JobLocation represents a cron entry.
*/
public boolean isCron() {
return getCronEntry() != null && getCronEntry().length() > 0;
}
/**
* @return the delay
*/
@ -184,15 +185,17 @@ class JobLocation {
public Location getLocation() {
return this.location;
}
@Override
public String toString() {
return "Job [id=" + jobId + ", startTime=" + new Date(startTime)
+ ", delay=" + delay + ", period=" + period + ", repeat="
+ repeat + ", nextTime=" + new Date(nextTime) + "]";
+ repeat + ", nextTime=" + new Date(nextTime) + "]";
}
static class JobLocationMarshaller extends VariableMarshaller<List<JobLocation>> {
static final JobLocationMarshaller INSTANCE = new JobLocationMarshaller();
@Override
public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
List<JobLocation> result = new ArrayList<JobLocation>();
int size = dataIn.readInt();
@ -204,6 +207,7 @@ class JobLocation {
return result;
}
@Override
public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException {
dataOut.writeInt(value.size());
for (JobLocation jobLocation : value) {
@ -211,4 +215,78 @@ class JobLocation {
}
}
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((cronEntry == null) ? 0 : cronEntry.hashCode());
result = prime * result + (int) (delay ^ (delay >>> 32));
result = prime * result + ((jobId == null) ? 0 : jobId.hashCode());
result = prime * result + ((location == null) ? 0 : location.hashCode());
result = prime * result + (int) (nextTime ^ (nextTime >>> 32));
result = prime * result + (int) (period ^ (period >>> 32));
result = prime * result + repeat;
result = prime * result + (int) (startTime ^ (startTime >>> 32));
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
JobLocation other = (JobLocation) obj;
if (cronEntry == null) {
if (other.cronEntry != null) {
return false;
}
} else if (!cronEntry.equals(other.cronEntry)) {
return false;
}
if (delay != other.delay) {
return false;
}
if (jobId == null) {
if (other.jobId != null)
return false;
} else if (!jobId.equals(other.jobId)) {
return false;
}
if (location == null) {
if (other.location != null) {
return false;
}
} else if (!location.equals(other.location)) {
return false;
}
if (nextTime != other.nextTime) {
return false;
}
if (period != other.period) {
return false;
}
if (repeat != other.repeat) {
return false;
}
if (startTime != other.startTime) {
return false;
}
return true;
}
}

View File

@ -163,6 +163,15 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
});
}
synchronized void remove(final long time, final List<JobLocation> jobIds) throws IOException {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
remove(tx, time, jobIds);
}
});
}
/*
* (non-Javadoc)
*
@ -369,6 +378,34 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
return result;
}
private void remove(Transaction tx, long time, List<JobLocation> jobIds) throws IOException {
List<JobLocation> result = removeFromIndex(tx, time, jobIds);
if (result != null) {
for (JobLocation jl : result) {
this.store.decrementJournalCount(tx, jl.getLocation());
}
}
}
private List<JobLocation> removeFromIndex(Transaction tx, long time, List<JobLocation> Jobs) throws IOException {
List<JobLocation> result = null;
List<JobLocation> values = this.index.remove(tx, time);
if (values != null) {
result = new ArrayList<JobLocation>(values.size());
for (JobLocation job : Jobs) {
if (values.remove(job)) {
result.add(job);
}
}
if (!values.isEmpty()) {
this.index.put(tx, time, values);
}
}
return result;
}
void remove(Transaction tx, long time) throws IOException {
List<JobLocation> values = this.index.remove(tx, time);
if (values != null) {
@ -482,79 +519,84 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
// peek the next job
long currentTime = System.currentTimeMillis();
// Reads the list of the next entries and removes them from the store in one atomic step.
// Prevents race conditions on short delays, when storeJob() tries to append new items to the
// existing list during this read operation (see AMQ-3141).
synchronized (this) {
Map.Entry<Long, List<JobLocation>> first = getNextToSchedule();
if (first != null) {
List<JobLocation> list = new ArrayList<JobLocation>(first.getValue());
final long executionTime = first.getKey();
long nextExecutionTime = 0;
if (executionTime <= currentTime) {
for (final JobLocation job : list) {
int repeat = job.getRepeat();
nextExecutionTime = calculateNextExecutionTime(job, currentTime, repeat);
long waitTime = nextExecutionTime - currentTime;
this.scheduleTime.setWaitTime(waitTime);
if (job.isCron() == false) {
// Read the list of scheduled events and fire the jobs. Once done with the batch
// remove all that were fired, and reschedule as needed.
Map.Entry<Long, List<JobLocation>> first = getNextToSchedule();
if (first != null) {
List<JobLocation> list = new ArrayList<JobLocation>(first.getValue());
List<JobLocation> fired = new ArrayList<JobLocation>(list.size());
final long executionTime = first.getKey();
long nextExecutionTime = 0;
if (executionTime <= currentTime) {
for (final JobLocation job : list) {
int repeat = job.getRepeat();
nextExecutionTime = calculateNextExecutionTime(job, currentTime, repeat);
long waitTime = nextExecutionTime - currentTime;
this.scheduleTime.setWaitTime(waitTime);
if (job.isCron() == false) {
fireJob(job);
if (repeat != 0) {
repeat--;
job.setRepeat(repeat);
// remove this job from the index so it doesn't get destroyed
removeFromIndex(executionTime, job.getJobId());
// and re-store it
storeJob(job, nextExecutionTime);
} else {
fired.add(job);
}
} else {
// cron job will have a repeat time.
if (repeat == 0) {
// we haven't got a separate scheduler to execute at
// this time - just a cron job - so fire it
fireJob(job);
}
if (nextExecutionTime > currentTime) {
// we will run again ...
// remove this job from the index - so it doesn't get destroyed
removeFromIndex(executionTime, job.getJobId());
// and re-store it
storeJob(job, nextExecutionTime);
if (repeat != 0) {
repeat--;
job.setRepeat(repeat);
// remove this job from the index so it doesn't get destroyed
removeFromIndex(executionTime, job.getJobId());
// and re-store it
storeJob(job, nextExecutionTime);
// we have a separate schedule to run at this time
// so the cron job is used to set of a separate schedule
// hence we won't fire the original cron job to the
// listeners but we do need to start a separate schedule
String jobId = ID_GENERATOR.generateId();
ByteSequence payload = getPayload(job.getLocation());
schedule(jobId, payload, "", job.getDelay(), job.getPeriod(), job.getRepeat());
waitTime = job.getDelay() != 0 ? job.getDelay() : job.getPeriod();
this.scheduleTime.setWaitTime(waitTime);
}
} else {
// cron job
if (repeat == 0) {
// we haven't got a separate scheduler to execute at
// this time - just a cron job - so fire it
fireJob(job);
}
if (nextExecutionTime > currentTime) {
// we will run again ...
// remove this job from the index - so it doesn't get destroyed
removeFromIndex(executionTime, job.getJobId());
// and re-store it
storeJob(job, nextExecutionTime);
if (repeat != 0) {
// we have a separate schedule to run at this time
// so the cron job is used to set of a separate schedule
// hence we won't fire the original cron job to the
// listeners but we do need to start a separate schedule
String jobId = ID_GENERATOR.generateId();
ByteSequence payload = getPayload(job.getLocation());
schedule(jobId, payload, "", job.getDelay(), job.getPeriod(), job.getRepeat());
waitTime = job.getDelay() != 0 ? job.getDelay() : job.getPeriod();
this.scheduleTime.setWaitTime(waitTime);
}
}
fired.add(job);
}
}
// now remove all jobs that have not been
// rescheduled from this execution time
remove(executionTime);
// If there is a job that should fire before the currently set wait time
// we need to reset wait time otherwise we'll miss it.
Map.Entry<Long, List<JobLocation>> nextUp = getNextToSchedule();
if (nextUp != null) {
final long timeUntilNextScheduled = nextUp.getKey() - currentTime;
if (timeUntilNextScheduled < this.scheduleTime.getWaitTime()) {
this.scheduleTime.setWaitTime(timeUntilNextScheduled);
}
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Not yet time to execute the job, waiting " + (executionTime - currentTime) + " ms");
}
this.scheduleTime.setWaitTime(executionTime - currentTime);
}
// now remove all jobs that have not been rescheduled from this execution
// time, if there are no more entries in that time it will be removed.
remove(executionTime, fired);
// If there is a job that should fire before the currently set wait time
// we need to reset wait time otherwise we'll miss it.
Map.Entry<Long, List<JobLocation>> nextUp = getNextToSchedule();
if (nextUp != null) {
final long timeUntilNextScheduled = nextUp.getKey() - currentTime;
if (timeUntilNextScheduled < this.scheduleTime.getWaitTime()) {
this.scheduleTime.setWaitTime(timeUntilNextScheduled);
}
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Not yet time to execute the job, waiting " + (executionTime - currentTime) + " ms");
}
this.scheduleTime.setWaitTime(executionTime - currentTime);
}
}
this.scheduleTime.pause();
} catch (Exception ioe) {
LOG.error(this.name + " Failed to schedule job", ioe);

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.scheduler;
import java.io.File;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.Session;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ProducerThread;
public class JobSchedulerBrokerShutdownTest extends EmbeddedBrokerTestSupport {
@Override
protected BrokerService createBroker() throws Exception {
File schedulerDirectory = new File("target/scheduler");
IOHelper.mkdirs(schedulerDirectory);
IOHelper.deleteChildren(schedulerDirectory);
BrokerService broker = super.createBroker();
broker.setSchedulerSupport(true);
broker.setSchedulerDirectoryFile(schedulerDirectory);
broker.getSystemUsage().getStoreUsage().setLimit(1 * 512);
broker.deleteAllMessages();
return broker;
}
@Override
protected boolean isPersistent() {
return true;
}
public void testSchedule() throws Exception {
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
final long time = 1000;
ProducerThread producer = new ProducerThread(session, destination) {
@Override
protected Message createMessage(int i) throws Exception {
Message message = super.createMessage(i);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
return message;
}
};
producer.setMessageCount(200);
producer.setDaemon(true);
producer.start();
Thread.sleep(5000);
}
}

View File

@ -134,7 +134,7 @@ public class AMQ3140Test {
}
// wait until all scheduled messages has been received
TimeUnit.SECONDS.sleep(10);
TimeUnit.MINUTES.sleep(2);
session.close();
connection.close();