git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@899633 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2010-01-15 14:09:30 +00:00
parent d4794e2420
commit 7ae2055c59
13 changed files with 1683 additions and 19 deletions

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
public interface ScheduledMessage {
/**
* The time in milliseconds that a message will be scheduled to be delivered by the broker
*/
public static final String AMQ_SCHEDULED_START = "AMQ_SCHEDULED_START_TIME";
/**
* The time in milliseconds to wait after the start time to wait before scheduling the message again
*/
public static final String AMQ_SCHEDULED_PERIOD = "AMQ_SCHEDULED_PERIOD";
/**
* The number of times to repeat scheduling a message for delivery
*/
public static final String AMQ_SCHEDULED_REPEAT = "AMQ_SCHEDULED_REPEAT";
}

View File

@ -31,10 +31,8 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionMetaData;
import org.apache.activemq.ConfigurationException;
import org.apache.activemq.Service;
@ -63,6 +61,7 @@ import org.apache.activemq.broker.region.virtual.MirroredQueue;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.broker.scheduler.SchedulerBroker;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.kaha.Store;
@ -134,11 +133,11 @@ public class BrokerService implements Service {
private PersistenceAdapterFactory persistenceFactory;
protected DestinationFactory destinationFactory;
private MessageAuthorizationPolicy messageAuthorizationPolicy;
private List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
private List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
private List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
private List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
private List<Service> services = new ArrayList<Service>();
private final List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
private final List<Service> services = new ArrayList<Service>();
private MasterConnector masterConnector;
private String masterConnectorURI;
private transient Thread shutdownHook;
@ -151,8 +150,8 @@ public class BrokerService implements Service {
private boolean advisorySupport = true;
private URI vmConnectorURI;
private PolicyMap destinationPolicy;
private AtomicBoolean started = new AtomicBoolean(false);
private AtomicBoolean stopped = new AtomicBoolean(false);
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean stopped = new AtomicBoolean(false);
private BrokerPlugin[] plugins;
private boolean keepDurableSubsActive = true;
private boolean useVirtualTopics = true;
@ -164,8 +163,8 @@ public class BrokerService implements Service {
private Store tempDataStore;
private int persistenceThreadPriority = Thread.MAX_PRIORITY;
private boolean useLocalHostBrokerName;
private CountDownLatch stoppedLatch = new CountDownLatch(1);
private CountDownLatch startedLatch = new CountDownLatch(1);
private final CountDownLatch stoppedLatch = new CountDownLatch(1);
private final CountDownLatch startedLatch = new CountDownLatch(1);
private boolean supportFailOver;
private Broker regionBroker;
private int producerSystemUsagePortion = 60;
@ -176,12 +175,14 @@ public class BrokerService implements Service {
private boolean dedicatedTaskRunner;
private boolean cacheTempDestinations = false;// useful for failover
private int timeBeforePurgeTempDestinations = 5000;
private List<Runnable> shutdownHooks = new ArrayList<Runnable>();
private final List<Runnable> shutdownHooks = new ArrayList<Runnable>();
private boolean systemExitOnShutdown;
private int systemExitOnShutdownExitCode;
private SslContext sslContext;
private boolean forceStart = false;
private IOExceptionHandler ioExceptionHandler;
private boolean schedulerSupport = true;
private File schedulerDirectoryFile;
static {
String localHostName = "localhost";
@ -512,6 +513,7 @@ public class BrokerService implements Service {
if (systemExitOnShutdown) {
new Thread() {
@Override
public void run() {
System.exit(systemExitOnShutdownExitCode);
}
@ -1064,7 +1066,7 @@ public class BrokerService implements Service {
}
public Service[] getServices() {
return (Service[]) services.toArray(new Service[0]);
return services.toArray(new Service[0]);
}
/**
@ -1675,15 +1677,18 @@ public class BrokerService implements Service {
broker = new MutableBrokerFilter(broker) {
Broker old;
@Override
public void stop() throws Exception {
old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) {
// Just ignore additional stop actions.
@Override
public void stop() throws Exception {
}
});
old.stop();
}
@Override
public void start() throws Exception {
if (forceStart && old != null) {
this.next.set(old);
@ -1757,6 +1762,9 @@ public class BrokerService implements Service {
*/
protected Broker addInterceptors(Broker broker) throws Exception {
broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore());
if (isSchedulerSupport()) {
broker = new SchedulerBroker(broker,getSchedulerDirectoryFile());
}
if (isAdvisorySupport()) {
broker = new AdvisoryBroker(broker);
}
@ -1821,6 +1829,7 @@ public class BrokerService implements Service {
protected void addShutdownHook() {
if (useShutdownHook) {
shutdownHook = new Thread("ActiveMQ ShutdownHook") {
@Override
public void run() {
containerShutdown();
}
@ -2142,5 +2151,40 @@ public class BrokerService implements Service {
this.ioExceptionHandler = ioExceptionHandler;
}
/**
* @return the schedulerSupport
*/
public boolean isSchedulerSupport() {
return this.schedulerSupport;
}
/**
* @param schedulerSupport the schedulerSupport to set
*/
public void setSchedulerSupport(boolean schedulerSupport) {
this.schedulerSupport = schedulerSupport;
}
/**
* @return the schedulerDirectory
*/
public File getSchedulerDirectoryFile() {
if (this.schedulerDirectoryFile == null) {
this.schedulerDirectoryFile = new File(IOHelper.getDefaultDataDirectory(),"scheduler");
}
return schedulerDirectoryFile;
}
/**
* @param schedulerDirectory the schedulerDirectory to set
*/
public void setSchedulerDirectoryFile(File schedulerDirectory) {
this.schedulerDirectoryFile = schedulerDirectory;
}
public void setSchedulerDirectory(String schedulerDirectory) {
setSchedulerDirectoryFile(new File(schedulerDirectory));
}
}

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.scheduler;
import org.apache.kahadb.util.ByteSequence;
public interface JobListener {
/**
* A Job that has been scheduled is now ready
* @param id
* @param job
*/
public void scheduledJob(String id,ByteSequence job);
}

View File

@ -0,0 +1,148 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.scheduler;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.util.VariableMarshaller;
class JobLocation {
private String jobId;
private int repeat;
private long start;
private long period;
private final Location location;
public JobLocation(Location location) {
this.location = location;
}
public JobLocation() {
this(new Location());
}
public void readExternal(DataInput in) throws IOException {
this.jobId = in.readUTF();
this.repeat = in.readInt();
this.start = in.readLong();
this.period = in.readLong();
this.location.readExternal(in);
}
public void writeExternal(DataOutput out) throws IOException {
out.writeUTF(this.jobId);
out.writeInt(repeat);
out.writeLong(start);
out.writeLong(period);
this.location.writeExternal(out);
}
/**
* @return the jobId
*/
public String getJobId() {
return this.jobId;
}
/**
* @param jobId
* the jobId to set
*/
public void setJobId(String jobId) {
this.jobId = jobId;
}
/**
* @return the repeat
*/
public int getRepeat() {
return this.repeat;
}
/**
* @param repeat
* the repeat to set
*/
public void setRepeat(int repeat) {
this.repeat = repeat;
}
/**
* @return the start
*/
public long getStart() {
return this.start;
}
/**
* @param start
* the start to set
*/
public void setStart(long start) {
this.start = start;
}
/**
* @return the period
*/
public long getPeriod() {
return this.period;
}
/**
* @param period
* the period to set
*/
public void setPeriod(long period) {
this.period = period;
}
/**
* @return the location
*/
public Location getLocation() {
return this.location;
}
static class JobLocationMarshaller extends VariableMarshaller<List<JobLocation>> {
static final JobLocationMarshaller INSTANCE = new JobLocationMarshaller();
public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
List<JobLocation> result = new ArrayList<JobLocation>();
int size = dataIn.readInt();
for (int i = 0; i < size; i++) {
JobLocation jobLocation = new JobLocation();
jobLocation.readExternal(dataIn);
result.add(jobLocation);
}
return result;
}
public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException {
dataOut.writeInt(value.size());
for (JobLocation jobLocation : value) {
jobLocation.writeExternal(dataOut);
}
}
}
}

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.scheduler;
import java.io.IOException;
import java.util.List;
import org.apache.kahadb.util.ByteSequence;
interface JobScheduler {
/**
* @return the name of the scheduler
*/
public abstract String getName();
/**
* Add a Job listener
* @param l
*/
public abstract void addListener(JobListener l);
/**
* remove a JobListener
* @param l
*/
public abstract void removeListener(JobListener l);
/**
* Add a job to be scheduled
* @param jobId a unique identifier for the job
* @param payload the message to be sent when the job is scheduled
* @param delay the time in milliseconds before the job will be run
* @throws IOException
*/
public abstract void schedule(String jobId, ByteSequence payload,long delay) throws IOException;
/**
* Add a job to be scheduled
* @param jobId a unique identifier for the job
* @param payload the message to be sent when the job is scheduled
* @param start
* @param period the time in milliseconds between successive executions of the Job
* @param repeat the number of times to execute the job - less than 0 will be repeated forever
* @throws IOException
*/
public abstract void schedule(String jobId, ByteSequence payload,long start, long period, int repeat) throws IOException;
/**
* remove all jobs scheduled to run at this time
* @param time
* @throws IOException
*/
public abstract void remove(long time) throws IOException;
/**
* remove a job with the matching jobId
* @param jobId
* @throws IOException
*/
public abstract void remove(String jobId) throws IOException;
/**
* Get all the jobs scheduled to run next
* @return a list of messages that will be scheduled next
* @throws IOException
*/
public abstract List<ByteSequence> getNextScheduleJobs() throws IOException;
/**
* Get the next time jobs will be fired
* @return the time in milliseconds
* @throws IOException
*/
public abstract long getNextScheduleTime() throws IOException;
}

View File

@ -0,0 +1,414 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.scheduler;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.index.BTreeIndex;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Transaction;
import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.LongMarshaller;
import org.apache.kahadb.util.VariableMarshaller;
class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler {
private static final Log LOG = LogFactory.getLog(JobSchedulerImpl.class);
final JobSchedulerStore store;
private final AtomicBoolean running = new AtomicBoolean();
private String name;
BTreeIndex<Long, List<JobLocation>> index;
private Thread thread;
private final List<JobListener> jobListeners = new CopyOnWriteArrayList<JobListener>();
JobSchedulerImpl(JobSchedulerStore store) {
this.store = store;
}
public void setName(String name) {
this.name = name;
}
/*
* (non-Javadoc)
* @see org.apache.activemq.beanstalk.JobScheduler#getName()
*/
public String getName() {
return this.name;
}
/*
* (non-Javadoc)
* @see
* org.apache.activemq.beanstalk.JobScheduler#addListener(org.apache.activemq
* .beanstalk.JobListener)
*/
public void addListener(JobListener l) {
this.jobListeners.add(l);
}
/*
* (non-Javadoc)
* @see
* org.apache.activemq.beanstalk.JobScheduler#removeListener(org.apache.
* activemq.beanstalk.JobListener)
*/
public void removeListener(JobListener l) {
this.jobListeners.remove(l);
}
public void schedule(final String jobId, final ByteSequence payload, final long delay) throws IOException {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
schedule(tx, jobId, payload, 0, delay, 0);
}
});
}
public void schedule(final String jobId, final ByteSequence payload, final long start, final long period, final int repeat) throws IOException {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
schedule(tx, jobId, payload, start, period, repeat);
}
});
}
/*
* (non-Javadoc)
* @see org.apache.activemq.beanstalk.JobScheduler#remove(long)
*/
public synchronized void remove(final long time) throws IOException {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
remove(tx, time);
}
});
}
/*
* (non-Javadoc)
* @see org.apache.activemq.beanstalk.JobScheduler#remove(long,
* java.lang.String)
*/
public synchronized void remove(final long time, final String jobId) throws IOException {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
remove(tx, time, jobId);
}
});
}
/*
* (non-Javadoc)
* @see org.apache.activemq.beanstalk.JobScheduler#remove(java.lang.String)
*/
public synchronized void remove(final String jobId) throws IOException {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
remove(tx, jobId);
}
});
}
public synchronized long getNextScheduleTime() throws IOException {
Map.Entry<Long, List<JobLocation>> first = this.index.getFirst(this.store.getPageFile().tx());
return first != null ? first.getKey() : -1l;
}
/*
* (non-Javadoc)
* @see org.apache.activemq.beanstalk.JobScheduler#getNextScheduleJobs()
*/
public synchronized List<ByteSequence> getNextScheduleJobs() throws IOException {
final List<ByteSequence> result = new ArrayList<ByteSequence>();
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
Map.Entry<Long, List<JobLocation>> first = index.getFirst(store.getPageFile().tx());
if (first != null) {
for (JobLocation jl : first.getValue()) {
ByteSequence bs = getJob(jl.getLocation());
result.add(bs);
}
}
}
});
return result;
}
ByteSequence getJob(Location location) throws IllegalStateException, IOException {
return this.store.getJob(location);
}
void schedule(Transaction tx, String jobId, ByteSequence payload,long start, long period, int repeat)
throws IOException {
List<JobLocation> values = null;
long startTime;
long time;
if (start > 0) {
time = startTime = start;
}else {
startTime = System.currentTimeMillis();
time = startTime + period;
}
if (this.index.containsKey(tx, time)) {
values = this.index.remove(tx, time);
}
if (values == null) {
values = new ArrayList<JobLocation>();
}
Location location = this.store.write(payload, false);
JobLocation jobLocation = new JobLocation(location);
jobLocation.setJobId(jobId);
jobLocation.setPeriod(period);
jobLocation.setRepeat(repeat);
values.add(jobLocation);
this.index.put(tx, time, values);
this.store.incrementJournalCount(tx, location);
poke();
}
void remove(Transaction tx, long time, String jobId) throws IOException {
List<JobLocation> values = this.index.remove(tx, time);
if (values != null) {
for (int i = 0; i < values.size(); i++) {
JobLocation jl = values.get(i);
if (jl.getJobId().equals(jobId)) {
values.remove(i);
if (!values.isEmpty()) {
this.index.put(tx, time, values);
}
this.store.decrementJournalCount(tx, jl.getLocation());
break;
}
}
}
}
void remove(Transaction tx, long time) throws IOException {
List<JobLocation> values = this.index.remove(tx, time);
if (values != null) {
for (JobLocation jl : values) {
this.store.decrementJournalCount(tx, jl.getLocation());
}
}
}
void remove(Transaction tx, String id) throws IOException {
for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) {
Map.Entry<Long, List<JobLocation>> entry = i.next();
List<JobLocation> values = entry.getValue();
if (values != null) {
for (JobLocation jl : values) {
if (jl.getJobId().equals(id)) {
remove(tx, entry.getKey(), id);
return;
}
}
}
}
}
synchronized void destroy(Transaction tx) throws IOException {
for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) {
Map.Entry<Long, List<JobLocation>> entry = i.next();
List<JobLocation> values = entry.getValue();
if (values != null) {
for (JobLocation jl : values) {
this.store.decrementJournalCount(tx, jl.getLocation());
}
}
}
}
synchronized Map.Entry<Long, List<JobLocation>> getNextToSchedule() throws IOException {
if (!this.store.isStopped() && !this.store.isStopping()) {
Map.Entry<Long, List<JobLocation>> first = this.index.getFirst(this.store.getPageFile().tx());
return first;
}
return null;
}
void fireJobs(List<JobLocation> list) throws IllegalStateException, IOException {
for (JobLocation jl : list) {
ByteSequence bs = this.store.getJob(jl.getLocation());
for (JobListener l : jobListeners) {
l.scheduledJob(jl.getJobId(), bs);
}
}
}
public void run() {
try {
mainLoop();
} catch (Throwable e) {
if (this.running.get() && isStarted()) {
LOG.error(this + " Caught exception in mainloop", e);
}
} finally {
if (running.get()) {
try {
stop();
} catch (Exception e) {
LOG.error("Failed to stop " + this);
}
}
}
}
@Override
public String toString() {
return "JobScheduler:" + this.name;
}
protected void mainLoop() {
while (this.running.get()) {
try {
// peek the next job
long currentTime = System.currentTimeMillis();
Map.Entry<Long, List<JobLocation>> first = getNextToSchedule();
if (first != null) {
List<JobLocation> list = new ArrayList<JobLocation>(first.getValue());
long executionTime = first.getKey();
if (executionTime <= currentTime) {
fireJobs(list);
for (JobLocation jl : list) {
int repeat = jl.getRepeat();
if (repeat != 0) {
repeat--;
ByteSequence payload = this.store.getJob(jl.getLocation());
String jobId = jl.getJobId();
long period = jl.getPeriod();
schedule(jobId, payload,0, period, repeat);
}
}
// now remove jobs from this execution time
remove(executionTime);
} else {
long waitTime = executionTime - currentTime;
synchronized (this.running) {
this.running.wait(waitTime);
}
}
} else {
synchronized (this.running) {
this.running.wait(250);
}
}
} catch (InterruptedException e) {
} catch (IOException ioe) {
LOG.error(this.name + " Failed to schedule job", ioe);
try {
this.store.stop();
} catch (Exception e) {
LOG.error(this.name + " Failed to shutdown JobSchedulerStore", e);
}
}
}
}
@Override
protected void doStart() throws Exception {
this.running.set(true);
this.thread = new Thread(this, "JobScheduler:" + this.name);
this.thread.setDaemon(true);
this.thread.start();
}
@Override
protected void doStop(ServiceStopper stopper) throws Exception {
this.running.set(false);
poke();
Thread t = this.thread;
if (t != null) {
t.join(1000);
}
}
protected void poke() {
synchronized (this.running) {
this.running.notifyAll();
}
}
void createIndexes(Transaction tx) throws IOException {
this.index = new BTreeIndex<Long, List<JobLocation>>(this.store.getPageFile(), tx.allocate().getPageId());
}
void load(Transaction tx) throws IOException {
this.index.setKeyMarshaller(LongMarshaller.INSTANCE);
this.index.setValueMarshaller(ValueMarshaller.INSTANCE);
this.index.load(tx);
}
void read(DataInput in) throws IOException {
this.name = in.readUTF();
this.index = new BTreeIndex<Long, List<JobLocation>>(this.store.getPageFile(), in.readLong());
this.index.setKeyMarshaller(LongMarshaller.INSTANCE);
this.index.setValueMarshaller(ValueMarshaller.INSTANCE);
}
public void write(DataOutput out) throws IOException {
out.writeUTF(name);
out.writeLong(this.index.getPageId());
}
static class ValueMarshaller extends VariableMarshaller<List<JobLocation>> {
static ValueMarshaller INSTANCE = new ValueMarshaller();
public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
List<JobLocation> result = new ArrayList<JobLocation>();
int size = dataIn.readInt();
for (int i = 0; i < size; i++) {
JobLocation jobLocation = new JobLocation();
jobLocation.readExternal(dataIn);
result.add(jobLocation);
}
return result;
}
public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException {
dataOut.writeInt(value.size());
for (JobLocation jobLocation : value) {
jobLocation.writeExternal(dataOut);
}
}
}
}

View File

@ -0,0 +1,378 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.scheduler;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.index.BTreeIndex;
import org.apache.kahadb.journal.Journal;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.PageFile;
import org.apache.kahadb.page.Transaction;
import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.IntegerMarshaller;
import org.apache.kahadb.util.LockFile;
import org.apache.kahadb.util.StringMarshaller;
import org.apache.kahadb.util.VariableMarshaller;
public class JobSchedulerStore extends ServiceSupport {
static final Log LOG = LogFactory.getLog(JobSchedulerStore.class);
private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
public static final int CLOSED_STATE = 1;
public static final int OPEN_STATE = 2;
private File directory;
PageFile pageFile;
private Journal journal;
private LockFile lockFile;
private boolean failIfDatabaseIsLocked;
private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
private boolean enableIndexWriteAsync = false;
// private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
MetaData metaData = new MetaData(this);
final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
protected class MetaData {
protected MetaData(JobSchedulerStore store) {
this.store = store;
}
private final JobSchedulerStore store;
Page<MetaData> page;
BTreeIndex<Integer, Integer> journalRC;
BTreeIndex<String, JobSchedulerImpl> storedSchedulers;
void createIndexes(Transaction tx) throws IOException {
this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, tx.allocate().getPageId());
this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
}
void load(Transaction tx) throws IOException {
this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
this.storedSchedulers.load(tx);
this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
this.journalRC.load(tx);
}
void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException {
for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
Entry<String, JobSchedulerImpl> entry = i.next();
entry.getValue().load(tx);
schedulers.put(entry.getKey(), entry.getValue());
}
}
public void read(DataInput is) throws IOException {
this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, is.readLong());
this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
}
public void write(DataOutput os) throws IOException {
os.writeLong(this.storedSchedulers.getPageId());
os.writeLong(this.journalRC.getPageId());
}
}
class MetaDataMarshaller extends VariableMarshaller<MetaData> {
private final JobSchedulerStore store;
MetaDataMarshaller(JobSchedulerStore store) {
this.store = store;
}
public MetaData readPayload(DataInput dataIn) throws IOException {
MetaData rc = new MetaData(this.store);
rc.read(dataIn);
return rc;
}
public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
object.write(dataOut);
}
}
class ValueMarshaller extends VariableMarshaller<List<JobLocation>> {
public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
List<JobLocation> result = new ArrayList<JobLocation>();
int size = dataIn.readInt();
for (int i = 0; i < size; i++) {
JobLocation jobLocation = new JobLocation();
jobLocation.readExternal(dataIn);
result.add(jobLocation);
}
return result;
}
public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException {
dataOut.writeInt(value.size());
for (JobLocation jobLocation : value) {
jobLocation.writeExternal(dataOut);
}
}
}
class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> {
private final JobSchedulerStore store;
JobSchedulerMarshaller(JobSchedulerStore store) {
this.store = store;
}
public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException {
JobSchedulerImpl result = new JobSchedulerImpl(this.store);
result.read(dataIn);
return result;
}
public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException {
js.write(dataOut);
}
}
public File getDirectory() {
return directory;
}
public void setDirectory(File directory) {
this.directory = directory;
}
public JobScheduler getJobScheduler(final String name) throws Exception {
JobSchedulerImpl result = this.schedulers.get(name);
if (result == null) {
final JobSchedulerImpl js = new JobSchedulerImpl(this);
js.setName(name);
getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
js.createIndexes(tx);
js.load(tx);
metaData.storedSchedulers.put(tx, name, js);
}
});
result = js;
this.schedulers.put(name, js);
if (isStarted()) {
result.start();
}
}
return result;
}
synchronized public boolean removeJobScheduler(final String name) throws Exception {
boolean result = false;
final JobSchedulerImpl js = this.schedulers.remove(name);
result = js != null;
if (result) {
js.stop();
getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
metaData.storedSchedulers.remove(tx, name);
js.destroy(tx);
}
});
}
return result;
}
@Override
protected synchronized void doStart() throws Exception {
if (this.directory == null) {
this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
}
IOHelper.mkdirs(this.directory);
lock();
this.journal = new Journal();
this.journal.setDirectory(directory);
this.journal.setMaxFileLength(getJournalMaxFileLength());
this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
this.journal.start();
this.pageFile = new PageFile(directory, "scheduleDB");
this.pageFile.load();
this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
if (pageFile.getPageCount() == 0) {
Page<MetaData> page = tx.allocate();
assert page.getPageId() == 0;
page.set(metaData);
metaData.page = page;
metaData.createIndexes(tx);
tx.store(metaData.page, metaDataMarshaller, true);
} else {
Page<MetaData> page = tx.load(0, metaDataMarshaller);
metaData = page.get();
metaData.page = page;
}
metaData.load(tx);
metaData.loadScheduler(tx, schedulers);
for (JobSchedulerImpl js :schedulers.values()) {
try {
js.start();
} catch (Exception e) {
JobSchedulerStore.LOG.error("Failed to load " + js.getName(),e);
}
}
}
});
this.pageFile.flush();
LOG.info(this + " started");
}
@Override
protected synchronized void doStop(ServiceStopper stopper) throws Exception {
for (JobSchedulerImpl js : this.schedulers.values()) {
js.stop();
}
if (this.pageFile != null) {
this.pageFile.unload();
}
if (this.journal != null) {
journal.close();
}
if (this.lockFile != null) {
this.lockFile.unlock();
}
this.lockFile = null;
LOG.info(this + " stopped");
}
synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException {
int logId = location.getDataFileId();
Integer val = this.metaData.journalRC.get(tx, logId);
int refCount = val != null ? val.intValue() + 1 : 1;
this.metaData.journalRC.put(tx, logId, refCount);
}
synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException {
int logId = location.getDataFileId();
int refCount = this.metaData.journalRC.get(tx, logId);
refCount--;
if (refCount <= 0) {
this.metaData.journalRC.remove(tx, logId);
Set<Integer> set = new HashSet<Integer>();
set.add(logId);
this.journal.removeDataFiles(set);
} else {
this.metaData.journalRC.put(tx, logId, refCount);
}
}
synchronized ByteSequence getJob(Location location) throws IllegalStateException, IOException {
ByteSequence result = null;
result = this.journal.read(location);
return result;
}
synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
return this.journal.write(payload, sync);
}
private void lock() throws IOException {
if (lockFile == null) {
File lockFileName = new File(directory, "lock");
lockFile = new LockFile(lockFileName, true);
if (failIfDatabaseIsLocked) {
lockFile.lock();
} else {
while (true) {
try {
lockFile.lock();
break;
} catch (IOException e) {
LOG.info("Database " + lockFileName + " is locked... waiting "
+ (DATABASE_LOCKED_WAIT_DELAY / 1000)
+ " seconds for the database to be unlocked. Reason: " + e);
try {
Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
} catch (InterruptedException e1) {
}
}
}
}
}
}
PageFile getPageFile() {
this.pageFile.isLoaded();
return this.pageFile;
}
public boolean isFailIfDatabaseIsLocked() {
return failIfDatabaseIsLocked;
}
public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
}
public int getJournalMaxFileLength() {
return journalMaxFileLength;
}
public void setJournalMaxFileLength(int journalMaxFileLength) {
this.journalMaxFileLength = journalMaxFileLength;
}
public int getJournalMaxWriteBatchSize() {
return journalMaxWriteBatchSize;
}
public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
}
public boolean isEnableIndexWriteAsync() {
return enableIndexWriteAsync;
}
public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
this.enableIndexWriteAsync = enableIndexWriteAsync;
}
@Override
public String toString() {
return "JobSchedulerStore:" + this.directory;
}
}

View File

@ -0,0 +1,180 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.scheduler;
import java.io.File;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.TypeConversionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.util.ByteSequence;
public class SchedulerBroker extends BrokerFilter implements JobListener {
private static final Log LOG = LogFactory.getLog(SchedulerBroker.class);
private static final IdGenerator ID_GENERATOR = new IdGenerator();
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
private final AtomicBoolean started = new AtomicBoolean();
private final WireFormat wireFormat = new OpenWireFormat();
private final ConnectionContext context = new ConnectionContext();
private final ProducerId producerId = new ProducerId();
private File directory;
private JobSchedulerStore store;
private JobScheduler scheduler;
public SchedulerBroker(Broker next, File directory) throws Exception {
super(next);
this.directory = directory;
this.producerId.setConnectionId(ID_GENERATOR.generateId());
this.context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
context.setBroker(next);
LOG.info("Scheduler using directory: " + directory);
}
/**
* @return the directory
*/
public File getDirectory() {
return this.directory;
}
/**
* @param directory
* the directory to set
*/
public void setDirectory(File directory) {
this.directory = directory;
}
@Override
public void start() throws Exception {
this.started.set(true);
super.start();
}
@Override
public void stop() throws Exception {
if (this.started.compareAndSet(true, false)) {
if (this.store != null) {
this.store.stop();
}
if (this.scheduler != null) {
this.scheduler.removeListener(this);
this.scheduler = null;
}
}
super.stop();
}
@Override
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
long start = 0;
long period = 0;
int repeat = 0;
Object periodValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
if (periodValue != null) {
period = (Long) TypeConversionSupport.convert(periodValue, Long.class);
Object startValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_START);
if (startValue != null) {
start = (Long) TypeConversionSupport.convert(startValue, Long.class);
}
Object repeatValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
if (repeatValue != null) {
repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
}
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(messageSend);
getScheduler().schedule( messageSend.getMessageId().toString(),
new ByteSequence(packet.data, packet.offset, packet.length),start, period, repeat);
} else {
super.send(producerExchange, messageSend);
}
}
public void scheduledJob(String id, ByteSequence job) {
org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getData(), job
.getOffset(), job.getLength());
try {
Message messageSend = (Message) this.wireFormat.unmarshal(packet);
messageSend.setOriginalTransactionId(null);
Object repeatValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
if (repeatValue != null) {
int repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
if (repeat != 0) {
//create a unique id - the original message could be sent lots of times
messageSend.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
}
}
//if this goes across a network - we don't want it rescheduled
messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_START);
messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
producerExchange.setConnectionContext(context);
producerExchange.setMutable(true);
producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
super.send(producerExchange, messageSend);
} catch (Exception e) {
LOG.error("Failed to send scheduled message " + id, e);
}
}
private JobScheduler getScheduler() throws Exception {
if (this.started.get()) {
if (this.scheduler == null) {
this.scheduler = getStore().getJobScheduler("ActiveMQ");
this.scheduler.addListener(this);
}
return this.scheduler;
}
return null;
}
private JobSchedulerStore getStore() throws Exception {
if (started.get()) {
if (this.store == null) {
this.store = new JobSchedulerStore();
this.store.setDirectory(directory);
this.store.start();
}
return this.store;
}
return null;
}
}

View File

@ -24,14 +24,13 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Vector;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
import javax.jms.MessageNotWriteableException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.filter.PropertyExpression;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.util.Callback;
@ -42,8 +41,7 @@ import org.apache.activemq.util.TypeConversionSupport;
* @version $Revision:$
* @openwire:marshaller code="23"
*/
public class ActiveMQMessage extends Message implements org.apache.activemq.Message {
public class ActiveMQMessage extends Message implements org.apache.activemq.Message, ScheduledMessage {
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_MESSAGE;
private static final Map<String, PropertySetter> JMS_PROPERTY_SETERS = new HashMap<String, PropertySetter>();
@ -54,6 +52,7 @@ public class ActiveMQMessage extends Message implements org.apache.activemq.Mess
}
@Override
public Message copy() {
ActiveMQMessage copy = new ActiveMQMessage();
copy(copy);
@ -65,6 +64,7 @@ public class ActiveMQMessage extends Message implements org.apache.activemq.Mess
copy.acknowledgeCallback = acknowledgeCallback;
}
@Override
public int hashCode() {
MessageId id = getMessageId();
if (id != null) {
@ -74,6 +74,7 @@ public class ActiveMQMessage extends Message implements org.apache.activemq.Mess
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
@ -100,6 +101,7 @@ public class ActiveMQMessage extends Message implements org.apache.activemq.Mess
}
}
@Override
public void clearBody() throws JMSException {
setContent(null);
readOnlyBody = false;
@ -262,6 +264,7 @@ public class ActiveMQMessage extends Message implements org.apache.activemq.Mess
this.setPriority((byte) priority);
}
@Override
public void clearProperties() {
super.clearProperties();
readOnlyProperties = false;

View File

@ -23,7 +23,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.Destination;
@ -174,6 +173,11 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
properties.put(name, value);
}
public void removeProperty(String name) throws IOException {
lazyCreateProperties();
properties.remove(name);
}
protected void lazyCreateProperties() throws IOException {
if (properties == null) {
if (marshalledProperties == null) {
@ -583,6 +587,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
this.memoryUsage=usage;
}
@Override
public boolean isMarshallAware() {
return true;
}
@ -687,6 +692,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
this.cluster = cluster;
}
@Override
public boolean isMessage() {
return true;
}
@ -717,10 +723,12 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
return false;
}
@Override
public String toString() {
return toString(null);
}
@Override
public String toString(Map<String, Object>overrideFields) {
try {
getProperties();

View File

@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.scheduler;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.broker.BrokerService;
public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
public void testSchedule() throws Exception {
final int COUNT = 1;
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination);
final CountDownLatch latch = new CountDownLatch(COUNT);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
latch.countDown();
}
});
connection.start();
long time = 5000;
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, time);
producer.send(message);
producer.close();
//make sure the message isn't delivered early
Thread.sleep(2000);
assertEquals(latch.getCount(), COUNT);
latch.await(5, TimeUnit.SECONDS);
assertEquals(latch.getCount(), 0);
}
public void testScheduleRepeated() throws Exception {
final int NUMBER = 10;
final AtomicInteger count = new AtomicInteger();
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination);
final CountDownLatch latch = new CountDownLatch(NUMBER);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
latch.countDown();
count.incrementAndGet();
}
});
connection.start();
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
long time = System.currentTimeMillis() + 1000;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_START, time);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 50);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, NUMBER-1);
producer.send(message);
producer.close();
assertEquals(latch.getCount(), NUMBER);
latch.await(5, TimeUnit.SECONDS);
assertEquals(latch.getCount(), 0);
//wait a little longer - make sure we only get NUMBER of replays
Thread.sleep(1000);
assertEquals(NUMBER, count.get());
}
@Override
protected void setUp() throws Exception {
bindAddress = "vm://localhost";
super.setUp();
}
@Override
protected BrokerService createBroker() throws Exception {
BrokerService answer = new BrokerService();
answer.setPersistent(isPersistent());
answer.setDataDirectory("target");
answer.setUseJmx(false);
answer.addConnector(bindAddress);
return answer;
}
}

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.scheduler;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import junit.framework.TestCase;
import org.apache.activemq.util.IOHelper;
import org.apache.kahadb.util.ByteSequence;
public class JobSchedulerStoreTest extends TestCase {
public void testRestart() throws Exception {
JobSchedulerStore store = new JobSchedulerStore();
File directory = new File("target/test/ScheduledDB");
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
store.setDirectory(directory);
final int NUMBER = 1000;
store.start();
List<ByteSequence>list = new ArrayList<ByteSequence>();
for (int i = 0; i < NUMBER;i++ ) {
ByteSequence buff = new ByteSequence(new String("testjob"+i).getBytes());
list.add(buff);
}
JobScheduler js = store.getJobScheduler("test");
int count = 0;
long startTime = System.currentTimeMillis()+10000;
for (ByteSequence job:list) {
js.schedule("id:"+(count++), job,startTime,10000,-1);
}
List<ByteSequence>test = js.getNextScheduleJobs();
assertEquals(list.size(),test.size());
store.stop();
store.start();
js = store.getJobScheduler("test");
test = js.getNextScheduleJobs();
assertEquals(list.size(),test.size());
for (int i = 0; i < list.size();i++) {
String orig = new String(list.get(i).getData());
String payload = new String(test.get(i).getData());
assertEquals(orig,payload);
}
}
}

View File

@ -0,0 +1,159 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.scheduler;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.util.IOHelper;
import org.apache.kahadb.util.ByteSequence;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class JobSchedulerTest {
private JobSchedulerStore store;
private JobScheduler scheduler;
@Test
public void testAddLongStringByteSequence() throws Exception {
final int COUNT = 10;
final CountDownLatch latch = new CountDownLatch(COUNT);
scheduler.addListener(new JobListener() {
public void scheduledJob(String id, ByteSequence job) {
latch.countDown();
}
});
for (int i = 0; i < COUNT; i++) {
String test = new String("test" + i);
scheduler.schedule("id" + i, new ByteSequence(test.getBytes()), 10);
}
latch.await(1, TimeUnit.SECONDS);
assertTrue(latch.getCount() == 0);
}
@Test
public void testAddLongLongIntStringByteSequence() throws Exception {
final int COUNT = 10;
final CountDownLatch latch = new CountDownLatch(COUNT);
scheduler.addListener(new JobListener() {
public void scheduledJob(String id, ByteSequence job) {
latch.countDown();
}
});
long time = System.currentTimeMillis() + 2000;
for (int i = 0; i < COUNT; i++) {
String test = new String("test" + i);
scheduler.schedule("id" + i, new ByteSequence(test.getBytes()), time, 10, -1);
}
assertTrue(latch.getCount() == COUNT);
latch.await(3000, TimeUnit.SECONDS);
assertTrue(latch.getCount() == 0);
}
@Test
public void testAddStopThenDeliver() throws Exception {
final int COUNT = 10;
final CountDownLatch latch = new CountDownLatch(COUNT);
long time = System.currentTimeMillis() + 2000;
for (int i = 0; i < COUNT; i++) {
String test = new String("test" + i);
scheduler.schedule("id" + i, new ByteSequence(test.getBytes()), time, 10, -1);
}
File directory = store.getDirectory();
tearDown();
startStore(directory);
scheduler.addListener(new JobListener() {
public void scheduledJob(String id, ByteSequence job) {
latch.countDown();
}
});
assertTrue(latch.getCount() == COUNT);
latch.await(3000, TimeUnit.SECONDS);
assertTrue(latch.getCount() == 0);
}
@Test
public void testRemoveLong() throws Exception {
final int COUNT = 10;
long time = System.currentTimeMillis() + 20000;
for (int i = 0; i < COUNT; i++) {
String str = new String("test" + i);
scheduler.schedule("id" + i, new ByteSequence(str.getBytes()), time, 10, -1);
}
int size = scheduler.getNextScheduleJobs().size();
assertEquals(size, COUNT);
long removeTime = scheduler.getNextScheduleTime();
scheduler.remove(removeTime);
size = scheduler.getNextScheduleJobs().size();
assertEquals(0, size);
}
@Test
public void testRemoveString() throws IOException {
final int COUNT = 10;
final String test = "TESTREMOVE";
long time = System.currentTimeMillis() + 20000;
for (int i = 0; i < COUNT; i++) {
String str = new String("test" + i);
scheduler.schedule("id" + i, new ByteSequence(str.getBytes()), time, 10, -1);
if (i == COUNT / 2) {
scheduler.schedule(test, new ByteSequence(test.getBytes()), time, 10, -1);
}
}
int size = scheduler.getNextScheduleJobs().size();
assertEquals(size, COUNT + 1);
scheduler.remove(test);
size = scheduler.getNextScheduleJobs().size();
assertEquals(size, COUNT);
}
@Before
public void setUp() throws Exception {
File directory = new File("target/test/ScheduledDB");
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
startStore(directory);
}
protected void startStore(File directory) throws Exception {
store = new JobSchedulerStore();
store.setDirectory(directory);
store.start();
scheduler = store.getJobScheduler("test");
}
@After
public void tearDown() throws Exception {
store.stop();
}
}