From 7ae2055c59bc455dc448d538a91613974bd639fb Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Fri, 15 Jan 2010 14:09:30 +0000 Subject: [PATCH] Added fix for https://issues.apache.org/activemq/browse/AMQ-451 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@899633 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/activemq/ScheduledMessage.java | 34 ++ .../apache/activemq/broker/BrokerService.java | 72 ++- .../broker/scheduler/JobListener.java | 30 ++ .../broker/scheduler/JobLocation.java | 148 +++++++ .../broker/scheduler/JobScheduler.java | 89 ++++ .../broker/scheduler/JobSchedulerImpl.java | 414 ++++++++++++++++++ .../broker/scheduler/JobSchedulerStore.java | 378 ++++++++++++++++ .../broker/scheduler/SchedulerBroker.java | 180 ++++++++ .../activemq/command/ActiveMQMessage.java | 11 +- .../org/apache/activemq/command/Message.java | 10 +- .../broker/scheduler/JmsSchedulerTest.java | 116 +++++ .../scheduler/JobSchedulerStoreTest.java | 61 +++ .../broker/scheduler/JobSchedulerTest.java | 159 +++++++ 13 files changed, 1683 insertions(+), 19 deletions(-) create mode 100644 activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java create mode 100644 activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java create mode 100644 activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java create mode 100644 activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java b/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java new file mode 100644 index 0000000000..842c4b0b61 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +public interface ScheduledMessage { + /** + * The time in milliseconds that a message will be scheduled to be delivered by the broker + */ + public static final String AMQ_SCHEDULED_START = "AMQ_SCHEDULED_START_TIME"; + /** + * The time in milliseconds to wait after the start time to wait before scheduling the message again + */ + public static final String AMQ_SCHEDULED_PERIOD = "AMQ_SCHEDULED_PERIOD"; + /** + * The number of times to repeat scheduling a message for delivery + */ + public static final String AMQ_SCHEDULED_REPEAT = "AMQ_SCHEDULED_REPEAT"; + + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index 33112d3e2d..b519bbe7ef 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -31,10 +31,8 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - import javax.management.MalformedObjectNameException; import javax.management.ObjectName; - import org.apache.activemq.ActiveMQConnectionMetaData; import org.apache.activemq.ConfigurationException; import org.apache.activemq.Service; @@ -63,6 +61,7 @@ import org.apache.activemq.broker.region.virtual.MirroredQueue; import org.apache.activemq.broker.region.virtual.VirtualDestination; import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; import org.apache.activemq.broker.region.virtual.VirtualTopic; +import org.apache.activemq.broker.scheduler.SchedulerBroker; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerId; import org.apache.activemq.kaha.Store; @@ -134,11 +133,11 @@ public class BrokerService implements Service { private PersistenceAdapterFactory persistenceFactory; protected DestinationFactory destinationFactory; private MessageAuthorizationPolicy messageAuthorizationPolicy; - private List transportConnectors = new CopyOnWriteArrayList(); - private List networkConnectors = new CopyOnWriteArrayList(); - private List proxyConnectors = new CopyOnWriteArrayList(); - private List jmsConnectors = new CopyOnWriteArrayList(); - private List services = new ArrayList(); + private final List transportConnectors = new CopyOnWriteArrayList(); + private final List networkConnectors = new CopyOnWriteArrayList(); + private final List proxyConnectors = new CopyOnWriteArrayList(); + private final List jmsConnectors = new CopyOnWriteArrayList(); + private final List services = new ArrayList(); private MasterConnector masterConnector; private String masterConnectorURI; private transient Thread shutdownHook; @@ -151,8 +150,8 @@ public class BrokerService implements Service { private boolean advisorySupport = true; private URI vmConnectorURI; private PolicyMap destinationPolicy; - private AtomicBoolean started = new AtomicBoolean(false); - private AtomicBoolean stopped = new AtomicBoolean(false); + private final AtomicBoolean started = new AtomicBoolean(false); + private final AtomicBoolean stopped = new AtomicBoolean(false); private BrokerPlugin[] plugins; private boolean keepDurableSubsActive = true; private boolean useVirtualTopics = true; @@ -164,8 +163,8 @@ public class BrokerService implements Service { private Store tempDataStore; private int persistenceThreadPriority = Thread.MAX_PRIORITY; private boolean useLocalHostBrokerName; - private CountDownLatch stoppedLatch = new CountDownLatch(1); - private CountDownLatch startedLatch = new CountDownLatch(1); + private final CountDownLatch stoppedLatch = new CountDownLatch(1); + private final CountDownLatch startedLatch = new CountDownLatch(1); private boolean supportFailOver; private Broker regionBroker; private int producerSystemUsagePortion = 60; @@ -176,12 +175,14 @@ public class BrokerService implements Service { private boolean dedicatedTaskRunner; private boolean cacheTempDestinations = false;// useful for failover private int timeBeforePurgeTempDestinations = 5000; - private List shutdownHooks = new ArrayList(); + private final List shutdownHooks = new ArrayList(); private boolean systemExitOnShutdown; private int systemExitOnShutdownExitCode; private SslContext sslContext; private boolean forceStart = false; private IOExceptionHandler ioExceptionHandler; + private boolean schedulerSupport = true; + private File schedulerDirectoryFile; static { String localHostName = "localhost"; @@ -512,7 +513,8 @@ public class BrokerService implements Service { if (systemExitOnShutdown) { new Thread() { - public void run() { + @Override + public void run() { System.exit(systemExitOnShutdownExitCode); } }.start(); @@ -1064,7 +1066,7 @@ public class BrokerService implements Service { } public Service[] getServices() { - return (Service[]) services.toArray(new Service[0]); + return services.toArray(new Service[0]); } /** @@ -1675,15 +1677,18 @@ public class BrokerService implements Service { broker = new MutableBrokerFilter(broker) { Broker old; + @Override public void stop() throws Exception { old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) { // Just ignore additional stop actions. + @Override public void stop() throws Exception { } }); old.stop(); } + @Override public void start() throws Exception { if (forceStart && old != null) { this.next.set(old); @@ -1757,6 +1762,9 @@ public class BrokerService implements Service { */ protected Broker addInterceptors(Broker broker) throws Exception { broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore()); + if (isSchedulerSupport()) { + broker = new SchedulerBroker(broker,getSchedulerDirectoryFile()); + } if (isAdvisorySupport()) { broker = new AdvisoryBroker(broker); } @@ -1821,6 +1829,7 @@ public class BrokerService implements Service { protected void addShutdownHook() { if (useShutdownHook) { shutdownHook = new Thread("ActiveMQ ShutdownHook") { + @Override public void run() { containerShutdown(); } @@ -2141,6 +2150,41 @@ public class BrokerService implements Service { ioExceptionHandler.setBrokerService(this); this.ioExceptionHandler = ioExceptionHandler; } + + /** + * @return the schedulerSupport + */ + public boolean isSchedulerSupport() { + return this.schedulerSupport; + } + + /** + * @param schedulerSupport the schedulerSupport to set + */ + public void setSchedulerSupport(boolean schedulerSupport) { + this.schedulerSupport = schedulerSupport; + } + + /** + * @return the schedulerDirectory + */ + public File getSchedulerDirectoryFile() { + if (this.schedulerDirectoryFile == null) { + this.schedulerDirectoryFile = new File(IOHelper.getDefaultDataDirectory(),"scheduler"); + } + return schedulerDirectoryFile; + } + + /** + * @param schedulerDirectory the schedulerDirectory to set + */ + public void setSchedulerDirectoryFile(File schedulerDirectory) { + this.schedulerDirectoryFile = schedulerDirectory; + } + + public void setSchedulerDirectory(String schedulerDirectory) { + setSchedulerDirectoryFile(new File(schedulerDirectory)); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java b/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java new file mode 100644 index 0000000000..9a5998159c --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.scheduler; + +import org.apache.kahadb.util.ByteSequence; + +public interface JobListener { + + /** + * A Job that has been scheduled is now ready + * @param id + * @param job + */ + public void scheduledJob(String id,ByteSequence job); + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java b/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java new file mode 100644 index 0000000000..69e3c6948d --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.scheduler; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.kahadb.journal.Location; +import org.apache.kahadb.util.VariableMarshaller; + +class JobLocation { + + private String jobId; + private int repeat; + private long start; + private long period; + private final Location location; + + public JobLocation(Location location) { + this.location = location; + + } + + public JobLocation() { + this(new Location()); + } + + public void readExternal(DataInput in) throws IOException { + this.jobId = in.readUTF(); + this.repeat = in.readInt(); + this.start = in.readLong(); + this.period = in.readLong(); + this.location.readExternal(in); + } + + public void writeExternal(DataOutput out) throws IOException { + out.writeUTF(this.jobId); + out.writeInt(repeat); + out.writeLong(start); + out.writeLong(period); + this.location.writeExternal(out); + } + + /** + * @return the jobId + */ + public String getJobId() { + return this.jobId; + } + + /** + * @param jobId + * the jobId to set + */ + public void setJobId(String jobId) { + this.jobId = jobId; + } + + + /** + * @return the repeat + */ + public int getRepeat() { + return this.repeat; + } + + /** + * @param repeat + * the repeat to set + */ + public void setRepeat(int repeat) { + this.repeat = repeat; + } + + /** + * @return the start + */ + public long getStart() { + return this.start; + } + + /** + * @param start + * the start to set + */ + public void setStart(long start) { + this.start = start; + } + + /** + * @return the period + */ + public long getPeriod() { + return this.period; + } + + /** + * @param period + * the period to set + */ + public void setPeriod(long period) { + this.period = period; + } + + /** + * @return the location + */ + public Location getLocation() { + return this.location; + } + + static class JobLocationMarshaller extends VariableMarshaller> { + static final JobLocationMarshaller INSTANCE = new JobLocationMarshaller(); + public List readPayload(DataInput dataIn) throws IOException { + List result = new ArrayList(); + int size = dataIn.readInt(); + for (int i = 0; i < size; i++) { + JobLocation jobLocation = new JobLocation(); + jobLocation.readExternal(dataIn); + result.add(jobLocation); + } + return result; + } + + public void writePayload(List value, DataOutput dataOut) throws IOException { + dataOut.writeInt(value.size()); + for (JobLocation jobLocation : value) { + jobLocation.writeExternal(dataOut); + } + } + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java b/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java new file mode 100644 index 0000000000..b0beb44abb --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.scheduler; + +import java.io.IOException; +import java.util.List; +import org.apache.kahadb.util.ByteSequence; + +interface JobScheduler { + + /** + * @return the name of the scheduler + */ + public abstract String getName(); +/** + * Add a Job listener + * @param l + */ + public abstract void addListener(JobListener l); +/** + * remove a JobListener + * @param l + */ + public abstract void removeListener(JobListener l); + + /** + * Add a job to be scheduled + * @param jobId a unique identifier for the job + * @param payload the message to be sent when the job is scheduled + * @param delay the time in milliseconds before the job will be run + * @throws IOException + */ + public abstract void schedule(String jobId, ByteSequence payload,long delay) throws IOException; + + + /** + * Add a job to be scheduled + * @param jobId a unique identifier for the job + * @param payload the message to be sent when the job is scheduled + * @param start + * @param period the time in milliseconds between successive executions of the Job + * @param repeat the number of times to execute the job - less than 0 will be repeated forever + * @throws IOException + */ + public abstract void schedule(String jobId, ByteSequence payload,long start, long period, int repeat) throws IOException; + + /** + * remove all jobs scheduled to run at this time + * @param time + * @throws IOException + */ + public abstract void remove(long time) throws IOException; + + /** + * remove a job with the matching jobId + * @param jobId + * @throws IOException + */ + public abstract void remove(String jobId) throws IOException; + + /** + * Get all the jobs scheduled to run next + * @return a list of messages that will be scheduled next + * @throws IOException + */ + public abstract List getNextScheduleJobs() throws IOException; + + /** + * Get the next time jobs will be fired + * @return the time in milliseconds + * @throws IOException + */ + public abstract long getNextScheduleTime() throws IOException; + +} \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java b/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java new file mode 100644 index 0000000000..3d2d8d590e --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java @@ -0,0 +1,414 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.scheduler; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.util.ServiceSupport; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.kahadb.index.BTreeIndex; +import org.apache.kahadb.journal.Location; +import org.apache.kahadb.page.Transaction; +import org.apache.kahadb.util.ByteSequence; +import org.apache.kahadb.util.LongMarshaller; +import org.apache.kahadb.util.VariableMarshaller; + +class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler { + private static final Log LOG = LogFactory.getLog(JobSchedulerImpl.class); + final JobSchedulerStore store; + private final AtomicBoolean running = new AtomicBoolean(); + private String name; + BTreeIndex> index; + private Thread thread; + private final List jobListeners = new CopyOnWriteArrayList(); + + JobSchedulerImpl(JobSchedulerStore store) { + + this.store = store; + } + + public void setName(String name) { + this.name = name; + } + + /* + * (non-Javadoc) + * @see org.apache.activemq.beanstalk.JobScheduler#getName() + */ + public String getName() { + return this.name; + } + + /* + * (non-Javadoc) + * @see + * org.apache.activemq.beanstalk.JobScheduler#addListener(org.apache.activemq + * .beanstalk.JobListener) + */ + public void addListener(JobListener l) { + this.jobListeners.add(l); + } + + /* + * (non-Javadoc) + * @see + * org.apache.activemq.beanstalk.JobScheduler#removeListener(org.apache. + * activemq.beanstalk.JobListener) + */ + public void removeListener(JobListener l) { + this.jobListeners.remove(l); + } + + + public void schedule(final String jobId, final ByteSequence payload, final long delay) throws IOException { + this.store.getPageFile().tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + schedule(tx, jobId, payload, 0, delay, 0); + } + }); + } + + + public void schedule(final String jobId, final ByteSequence payload, final long start, final long period, final int repeat) throws IOException { + this.store.getPageFile().tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + schedule(tx, jobId, payload, start, period, repeat); + } + }); + + } + + /* + * (non-Javadoc) + * @see org.apache.activemq.beanstalk.JobScheduler#remove(long) + */ + public synchronized void remove(final long time) throws IOException { + this.store.getPageFile().tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + remove(tx, time); + } + }); + } + + /* + * (non-Javadoc) + * @see org.apache.activemq.beanstalk.JobScheduler#remove(long, + * java.lang.String) + */ + public synchronized void remove(final long time, final String jobId) throws IOException { + this.store.getPageFile().tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + remove(tx, time, jobId); + } + }); + } + + /* + * (non-Javadoc) + * @see org.apache.activemq.beanstalk.JobScheduler#remove(java.lang.String) + */ + public synchronized void remove(final String jobId) throws IOException { + this.store.getPageFile().tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + remove(tx, jobId); + } + }); + } + + public synchronized long getNextScheduleTime() throws IOException { + Map.Entry> first = this.index.getFirst(this.store.getPageFile().tx()); + return first != null ? first.getKey() : -1l; + } + + /* + * (non-Javadoc) + * @see org.apache.activemq.beanstalk.JobScheduler#getNextScheduleJobs() + */ + public synchronized List getNextScheduleJobs() throws IOException { + final List result = new ArrayList(); + + this.store.getPageFile().tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + Map.Entry> first = index.getFirst(store.getPageFile().tx()); + if (first != null) { + for (JobLocation jl : first.getValue()) { + ByteSequence bs = getJob(jl.getLocation()); + result.add(bs); + } + } + } + }); + return result; + } + + ByteSequence getJob(Location location) throws IllegalStateException, IOException { + return this.store.getJob(location); + } + + void schedule(Transaction tx, String jobId, ByteSequence payload,long start, long period, int repeat) + throws IOException { + List values = null; + long startTime; + long time; + if (start > 0) { + time = startTime = start; + }else { + startTime = System.currentTimeMillis(); + time = startTime + period; + } + if (this.index.containsKey(tx, time)) { + values = this.index.remove(tx, time); + } + if (values == null) { + values = new ArrayList(); + } + + Location location = this.store.write(payload, false); + JobLocation jobLocation = new JobLocation(location); + jobLocation.setJobId(jobId); + jobLocation.setPeriod(period); + jobLocation.setRepeat(repeat); + values.add(jobLocation); + this.index.put(tx, time, values); + this.store.incrementJournalCount(tx, location); + poke(); + } + + void remove(Transaction tx, long time, String jobId) throws IOException { + List values = this.index.remove(tx, time); + if (values != null) { + for (int i = 0; i < values.size(); i++) { + JobLocation jl = values.get(i); + if (jl.getJobId().equals(jobId)) { + values.remove(i); + if (!values.isEmpty()) { + this.index.put(tx, time, values); + } + this.store.decrementJournalCount(tx, jl.getLocation()); + break; + } + } + } + } + + void remove(Transaction tx, long time) throws IOException { + List values = this.index.remove(tx, time); + if (values != null) { + for (JobLocation jl : values) { + this.store.decrementJournalCount(tx, jl.getLocation()); + } + } + } + + void remove(Transaction tx, String id) throws IOException { + for (Iterator>> i = this.index.iterator(tx); i.hasNext();) { + Map.Entry> entry = i.next(); + List values = entry.getValue(); + if (values != null) { + for (JobLocation jl : values) { + if (jl.getJobId().equals(id)) { + remove(tx, entry.getKey(), id); + return; + } + } + } + } + } + + synchronized void destroy(Transaction tx) throws IOException { + for (Iterator>> i = this.index.iterator(tx); i.hasNext();) { + Map.Entry> entry = i.next(); + List values = entry.getValue(); + if (values != null) { + for (JobLocation jl : values) { + this.store.decrementJournalCount(tx, jl.getLocation()); + } + } + + } + } + + synchronized Map.Entry> getNextToSchedule() throws IOException { + if (!this.store.isStopped() && !this.store.isStopping()) { + Map.Entry> first = this.index.getFirst(this.store.getPageFile().tx()); + return first; + } + return null; + + } + + void fireJobs(List list) throws IllegalStateException, IOException { + for (JobLocation jl : list) { + ByteSequence bs = this.store.getJob(jl.getLocation()); + for (JobListener l : jobListeners) { + l.scheduledJob(jl.getJobId(), bs); + } + } + } + + public void run() { + try { + mainLoop(); + } catch (Throwable e) { + if (this.running.get() && isStarted()) { + LOG.error(this + " Caught exception in mainloop", e); + } + } finally { + if (running.get()) { + try { + stop(); + } catch (Exception e) { + LOG.error("Failed to stop " + this); + } + } + } + } + + @Override + public String toString() { + return "JobScheduler:" + this.name; + } + + protected void mainLoop() { + while (this.running.get()) { + try { + // peek the next job + long currentTime = System.currentTimeMillis(); + + Map.Entry> first = getNextToSchedule(); + if (first != null) { + List list = new ArrayList(first.getValue()); + long executionTime = first.getKey(); + if (executionTime <= currentTime) { + fireJobs(list); + for (JobLocation jl : list) { + int repeat = jl.getRepeat(); + if (repeat != 0) { + repeat--; + ByteSequence payload = this.store.getJob(jl.getLocation()); + String jobId = jl.getJobId(); + long period = jl.getPeriod(); + schedule(jobId, payload,0, period, repeat); + } + } + // now remove jobs from this execution time + remove(executionTime); + } else { + long waitTime = executionTime - currentTime; + synchronized (this.running) { + this.running.wait(waitTime); + } + } + } else { + synchronized (this.running) { + this.running.wait(250); + } + } + + } catch (InterruptedException e) { + } catch (IOException ioe) { + LOG.error(this.name + " Failed to schedule job", ioe); + try { + this.store.stop(); + } catch (Exception e) { + LOG.error(this.name + " Failed to shutdown JobSchedulerStore", e); + } + } + } + } + + @Override + protected void doStart() throws Exception { + this.running.set(true); + this.thread = new Thread(this, "JobScheduler:" + this.name); + this.thread.setDaemon(true); + this.thread.start(); + + } + + @Override + protected void doStop(ServiceStopper stopper) throws Exception { + this.running.set(false); + poke(); + Thread t = this.thread; + if (t != null) { + t.join(1000); + } + + } + + protected void poke() { + synchronized (this.running) { + this.running.notifyAll(); + } + } + + void createIndexes(Transaction tx) throws IOException { + this.index = new BTreeIndex>(this.store.getPageFile(), tx.allocate().getPageId()); + } + + void load(Transaction tx) throws IOException { + this.index.setKeyMarshaller(LongMarshaller.INSTANCE); + this.index.setValueMarshaller(ValueMarshaller.INSTANCE); + this.index.load(tx); + } + + void read(DataInput in) throws IOException { + this.name = in.readUTF(); + this.index = new BTreeIndex>(this.store.getPageFile(), in.readLong()); + this.index.setKeyMarshaller(LongMarshaller.INSTANCE); + this.index.setValueMarshaller(ValueMarshaller.INSTANCE); + } + + public void write(DataOutput out) throws IOException { + out.writeUTF(name); + out.writeLong(this.index.getPageId()); + } + + static class ValueMarshaller extends VariableMarshaller> { + static ValueMarshaller INSTANCE = new ValueMarshaller(); + public List readPayload(DataInput dataIn) throws IOException { + List result = new ArrayList(); + int size = dataIn.readInt(); + for (int i = 0; i < size; i++) { + JobLocation jobLocation = new JobLocation(); + jobLocation.readExternal(dataIn); + result.add(jobLocation); + } + return result; + } + + public void writePayload(List value, DataOutput dataOut) throws IOException { + dataOut.writeInt(value.size()); + for (JobLocation jobLocation : value) { + jobLocation.writeExternal(dataOut); + } + } + } + + + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java b/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java new file mode 100644 index 0000000000..99eeaa2acd --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java @@ -0,0 +1,378 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.scheduler; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; +import org.apache.activemq.util.IOHelper; +import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.util.ServiceSupport; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.kahadb.index.BTreeIndex; +import org.apache.kahadb.journal.Journal; +import org.apache.kahadb.journal.Location; +import org.apache.kahadb.page.Page; +import org.apache.kahadb.page.PageFile; +import org.apache.kahadb.page.Transaction; +import org.apache.kahadb.util.ByteSequence; +import org.apache.kahadb.util.IntegerMarshaller; +import org.apache.kahadb.util.LockFile; +import org.apache.kahadb.util.StringMarshaller; +import org.apache.kahadb.util.VariableMarshaller; + +public class JobSchedulerStore extends ServiceSupport { + static final Log LOG = LogFactory.getLog(JobSchedulerStore.class); + private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000; + + public static final int CLOSED_STATE = 1; + public static final int OPEN_STATE = 2; + + private File directory; + PageFile pageFile; + private Journal journal; + private LockFile lockFile; + private boolean failIfDatabaseIsLocked; + private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; + private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; + private boolean enableIndexWriteAsync = false; + // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; + MetaData metaData = new MetaData(this); + final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this); + Map schedulers = new HashMap(); + + protected class MetaData { + protected MetaData(JobSchedulerStore store) { + this.store = store; + } + private final JobSchedulerStore store; + Page page; + BTreeIndex journalRC; + BTreeIndex storedSchedulers; + + void createIndexes(Transaction tx) throws IOException { + this.storedSchedulers = new BTreeIndex(pageFile, tx.allocate().getPageId()); + this.journalRC = new BTreeIndex(pageFile, tx.allocate().getPageId()); + } + + void load(Transaction tx) throws IOException { + this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE); + this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store)); + this.storedSchedulers.load(tx); + this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE); + this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE); + this.journalRC.load(tx); + } + + void loadScheduler(Transaction tx, Map schedulers) throws IOException { + for (Iterator> i = this.storedSchedulers.iterator(tx); i.hasNext();) { + Entry entry = i.next(); + entry.getValue().load(tx); + schedulers.put(entry.getKey(), entry.getValue()); + } + } + + public void read(DataInput is) throws IOException { + this.storedSchedulers = new BTreeIndex(pageFile, is.readLong()); + this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE); + this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store)); + this.journalRC = new BTreeIndex(pageFile, is.readLong()); + this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE); + this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE); + } + + public void write(DataOutput os) throws IOException { + os.writeLong(this.storedSchedulers.getPageId()); + os.writeLong(this.journalRC.getPageId()); + + } + } + + class MetaDataMarshaller extends VariableMarshaller { + private final JobSchedulerStore store; + + MetaDataMarshaller(JobSchedulerStore store) { + this.store = store; + } + public MetaData readPayload(DataInput dataIn) throws IOException { + MetaData rc = new MetaData(this.store); + rc.read(dataIn); + return rc; + } + + public void writePayload(MetaData object, DataOutput dataOut) throws IOException { + object.write(dataOut); + } + } + + class ValueMarshaller extends VariableMarshaller> { + public List readPayload(DataInput dataIn) throws IOException { + List result = new ArrayList(); + int size = dataIn.readInt(); + for (int i = 0; i < size; i++) { + JobLocation jobLocation = new JobLocation(); + jobLocation.readExternal(dataIn); + result.add(jobLocation); + } + return result; + } + + public void writePayload(List value, DataOutput dataOut) throws IOException { + dataOut.writeInt(value.size()); + for (JobLocation jobLocation : value) { + jobLocation.writeExternal(dataOut); + } + } + } + + class JobSchedulerMarshaller extends VariableMarshaller { + private final JobSchedulerStore store; + JobSchedulerMarshaller(JobSchedulerStore store) { + this.store = store; + } + public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException { + JobSchedulerImpl result = new JobSchedulerImpl(this.store); + result.read(dataIn); + return result; + } + + public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException { + js.write(dataOut); + } + } + + public File getDirectory() { + return directory; + } + + public void setDirectory(File directory) { + this.directory = directory; + } + + public JobScheduler getJobScheduler(final String name) throws Exception { + JobSchedulerImpl result = this.schedulers.get(name); + if (result == null) { + final JobSchedulerImpl js = new JobSchedulerImpl(this); + js.setName(name); + getPageFile().tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + js.createIndexes(tx); + js.load(tx); + metaData.storedSchedulers.put(tx, name, js); + } + }); + result = js; + this.schedulers.put(name, js); + if (isStarted()) { + result.start(); + } + } + return result; + } + + synchronized public boolean removeJobScheduler(final String name) throws Exception { + boolean result = false; + final JobSchedulerImpl js = this.schedulers.remove(name); + result = js != null; + if (result) { + js.stop(); + getPageFile().tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + metaData.storedSchedulers.remove(tx, name); + js.destroy(tx); + } + }); + } + return result; + } + + @Override + protected synchronized void doStart() throws Exception { + if (this.directory == null) { + this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB"); + } + IOHelper.mkdirs(this.directory); + lock(); + this.journal = new Journal(); + this.journal.setDirectory(directory); + this.journal.setMaxFileLength(getJournalMaxFileLength()); + this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize()); + this.journal.start(); + this.pageFile = new PageFile(directory, "scheduleDB"); + this.pageFile.load(); + + this.pageFile.tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + if (pageFile.getPageCount() == 0) { + Page page = tx.allocate(); + assert page.getPageId() == 0; + page.set(metaData); + metaData.page = page; + metaData.createIndexes(tx); + tx.store(metaData.page, metaDataMarshaller, true); + + } else { + Page page = tx.load(0, metaDataMarshaller); + metaData = page.get(); + metaData.page = page; + } + metaData.load(tx); + metaData.loadScheduler(tx, schedulers); + for (JobSchedulerImpl js :schedulers.values()) { + try { + js.start(); + } catch (Exception e) { + JobSchedulerStore.LOG.error("Failed to load " + js.getName(),e); + } + } + } + }); + + this.pageFile.flush(); + LOG.info(this + " started"); + } + + @Override + protected synchronized void doStop(ServiceStopper stopper) throws Exception { + for (JobSchedulerImpl js : this.schedulers.values()) { + js.stop(); + } + if (this.pageFile != null) { + this.pageFile.unload(); + } + if (this.journal != null) { + journal.close(); + } + if (this.lockFile != null) { + this.lockFile.unlock(); + } + this.lockFile = null; + LOG.info(this + " stopped"); + + } + + synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException { + int logId = location.getDataFileId(); + Integer val = this.metaData.journalRC.get(tx, logId); + int refCount = val != null ? val.intValue() + 1 : 1; + this.metaData.journalRC.put(tx, logId, refCount); + + } + + synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException { + int logId = location.getDataFileId(); + int refCount = this.metaData.journalRC.get(tx, logId); + refCount--; + if (refCount <= 0) { + this.metaData.journalRC.remove(tx, logId); + Set set = new HashSet(); + set.add(logId); + this.journal.removeDataFiles(set); + } else { + this.metaData.journalRC.put(tx, logId, refCount); + } + + } + + synchronized ByteSequence getJob(Location location) throws IllegalStateException, IOException { + ByteSequence result = null; + result = this.journal.read(location); + return result; + } + + synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException { + return this.journal.write(payload, sync); + } + + private void lock() throws IOException { + if (lockFile == null) { + File lockFileName = new File(directory, "lock"); + lockFile = new LockFile(lockFileName, true); + if (failIfDatabaseIsLocked) { + lockFile.lock(); + } else { + while (true) { + try { + lockFile.lock(); + break; + } catch (IOException e) { + LOG.info("Database " + lockFileName + " is locked... waiting " + + (DATABASE_LOCKED_WAIT_DELAY / 1000) + + " seconds for the database to be unlocked. Reason: " + e); + try { + Thread.sleep(DATABASE_LOCKED_WAIT_DELAY); + } catch (InterruptedException e1) { + } + } + } + } + } + } + + PageFile getPageFile() { + this.pageFile.isLoaded(); + return this.pageFile; + } + + public boolean isFailIfDatabaseIsLocked() { + return failIfDatabaseIsLocked; + } + + public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) { + this.failIfDatabaseIsLocked = failIfDatabaseIsLocked; + } + + public int getJournalMaxFileLength() { + return journalMaxFileLength; + } + + public void setJournalMaxFileLength(int journalMaxFileLength) { + this.journalMaxFileLength = journalMaxFileLength; + } + + public int getJournalMaxWriteBatchSize() { + return journalMaxWriteBatchSize; + } + + public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { + this.journalMaxWriteBatchSize = journalMaxWriteBatchSize; + } + + public boolean isEnableIndexWriteAsync() { + return enableIndexWriteAsync; + } + + public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { + this.enableIndexWriteAsync = enableIndexWriteAsync; + } + + @Override + public String toString() { + return "JobSchedulerStore:" + this.directory; + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java new file mode 100644 index 0000000000..c943d22a55 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.scheduler; + +import java.io.File; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.activemq.ScheduledMessage; +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerFilter; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.security.SecurityContext; +import org.apache.activemq.state.ProducerState; +import org.apache.activemq.util.IdGenerator; +import org.apache.activemq.util.LongSequenceGenerator; +import org.apache.activemq.util.TypeConversionSupport; +import org.apache.activemq.wireformat.WireFormat; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.kahadb.util.ByteSequence; + +public class SchedulerBroker extends BrokerFilter implements JobListener { + private static final Log LOG = LogFactory.getLog(SchedulerBroker.class); + private static final IdGenerator ID_GENERATOR = new IdGenerator(); + private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); + private final AtomicBoolean started = new AtomicBoolean(); + private final WireFormat wireFormat = new OpenWireFormat(); + private final ConnectionContext context = new ConnectionContext(); + private final ProducerId producerId = new ProducerId(); + private File directory; + + private JobSchedulerStore store; + private JobScheduler scheduler; + + public SchedulerBroker(Broker next, File directory) throws Exception { + super(next); + this.directory = directory; + this.producerId.setConnectionId(ID_GENERATOR.generateId()); + this.context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); + context.setBroker(next); + LOG.info("Scheduler using directory: " + directory); + + } + /** + * @return the directory + */ + public File getDirectory() { + return this.directory; + } + /** + * @param directory + * the directory to set + */ + public void setDirectory(File directory) { + this.directory = directory; + } + + @Override + public void start() throws Exception { + this.started.set(true); + super.start(); + } + + @Override + public void stop() throws Exception { + if (this.started.compareAndSet(true, false)) { + + if (this.store != null) { + this.store.stop(); + } + if (this.scheduler != null) { + this.scheduler.removeListener(this); + this.scheduler = null; + } + } + super.stop(); + } + + @Override + public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { + long start = 0; + long period = 0; + int repeat = 0; + + Object periodValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD); + + if (periodValue != null) { + period = (Long) TypeConversionSupport.convert(periodValue, Long.class); + Object startValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_START); + if (startValue != null) { + start = (Long) TypeConversionSupport.convert(startValue, Long.class); + } + Object repeatValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT); + if (repeatValue != null) { + repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class); + } + org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(messageSend); + getScheduler().schedule( messageSend.getMessageId().toString(), + new ByteSequence(packet.data, packet.offset, packet.length),start, period, repeat); + + } else { + + super.send(producerExchange, messageSend); + } + } + + public void scheduledJob(String id, ByteSequence job) { + org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getData(), job + .getOffset(), job.getLength()); + try { + Message messageSend = (Message) this.wireFormat.unmarshal(packet); + messageSend.setOriginalTransactionId(null); + Object repeatValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT); + if (repeatValue != null) { + int repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class); + if (repeat != 0) { + //create a unique id - the original message could be sent lots of times + messageSend.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId())); + } + } + + //if this goes across a network - we don't want it rescheduled + messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD); + messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_START); + messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT); + + final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); + producerExchange.setConnectionContext(context); + producerExchange.setMutable(true); + producerExchange.setProducerState(new ProducerState(new ProducerInfo())); + super.send(producerExchange, messageSend); + } catch (Exception e) { + LOG.error("Failed to send scheduled message " + id, e); + } + + } + + private JobScheduler getScheduler() throws Exception { + if (this.started.get()) { + if (this.scheduler == null) { + this.scheduler = getStore().getJobScheduler("ActiveMQ"); + this.scheduler.addListener(this); + } + return this.scheduler; + } + return null; + } + + private JobSchedulerStore getStore() throws Exception { + if (started.get()) { + if (this.store == null) { + this.store = new JobSchedulerStore(); + this.store.setDirectory(directory); + this.store.start(); + } + return this.store; + } + return null; + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java b/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java index 15e9f3e13e..45cab08610 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java @@ -24,14 +24,13 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Vector; - import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageFormatException; import javax.jms.MessageNotWriteableException; - import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ScheduledMessage; import org.apache.activemq.filter.PropertyExpression; import org.apache.activemq.state.CommandVisitor; import org.apache.activemq.util.Callback; @@ -42,8 +41,7 @@ import org.apache.activemq.util.TypeConversionSupport; * @version $Revision:$ * @openwire:marshaller code="23" */ -public class ActiveMQMessage extends Message implements org.apache.activemq.Message { - +public class ActiveMQMessage extends Message implements org.apache.activemq.Message, ScheduledMessage { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_MESSAGE; private static final Map JMS_PROPERTY_SETERS = new HashMap(); @@ -54,6 +52,7 @@ public class ActiveMQMessage extends Message implements org.apache.activemq.Mess } + @Override public Message copy() { ActiveMQMessage copy = new ActiveMQMessage(); copy(copy); @@ -65,6 +64,7 @@ public class ActiveMQMessage extends Message implements org.apache.activemq.Mess copy.acknowledgeCallback = acknowledgeCallback; } + @Override public int hashCode() { MessageId id = getMessageId(); if (id != null) { @@ -74,6 +74,7 @@ public class ActiveMQMessage extends Message implements org.apache.activemq.Mess } } + @Override public boolean equals(Object o) { if (this == o) { return true; @@ -100,6 +101,7 @@ public class ActiveMQMessage extends Message implements org.apache.activemq.Mess } } + @Override public void clearBody() throws JMSException { setContent(null); readOnlyBody = false; @@ -262,6 +264,7 @@ public class ActiveMQMessage extends Message implements org.apache.activemq.Mess this.setPriority((byte) priority); } + @Override public void clearProperties() { super.clearProperties(); readOnlyProperties = false; diff --git a/activemq-core/src/main/java/org/apache/activemq/command/Message.java b/activemq-core/src/main/java/org/apache/activemq/command/Message.java index 9c0226f5c2..4db2cff1a2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/Message.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/Message.java @@ -23,7 +23,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import javax.jms.JMSException; - import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.region.Destination; @@ -173,6 +172,11 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess lazyCreateProperties(); properties.put(name, value); } + + public void removeProperty(String name) throws IOException { + lazyCreateProperties(); + properties.remove(name); + } protected void lazyCreateProperties() throws IOException { if (properties == null) { @@ -583,6 +587,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess this.memoryUsage=usage; } + @Override public boolean isMarshallAware() { return true; } @@ -687,6 +692,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess this.cluster = cluster; } + @Override public boolean isMessage() { return true; } @@ -717,10 +723,12 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess return false; } + @Override public String toString() { return toString(null); } + @Override public String toString(MapoverrideFields) { try { getProperties(); diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java new file mode 100644 index 0000000000..8224740af9 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.scheduler; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.ScheduledMessage; +import org.apache.activemq.broker.BrokerService; + +public class JmsSchedulerTest extends EmbeddedBrokerTestSupport { + + + public void testSchedule() throws Exception { + final int COUNT = 1; + Connection connection = createConnection(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageConsumer consumer = session.createConsumer(destination); + + final CountDownLatch latch = new CountDownLatch(COUNT); + consumer.setMessageListener(new MessageListener() { + public void onMessage(Message message) { + latch.countDown(); + } + }); + + connection.start(); + long time = 5000; + MessageProducer producer = session.createProducer(destination); + TextMessage message = session.createTextMessage("test msg"); + + message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, time); + + producer.send(message); + producer.close(); + //make sure the message isn't delivered early + Thread.sleep(2000); + assertEquals(latch.getCount(), COUNT); + latch.await(5, TimeUnit.SECONDS); + assertEquals(latch.getCount(), 0); + } + + public void testScheduleRepeated() throws Exception { + final int NUMBER = 10; + final AtomicInteger count = new AtomicInteger(); + Connection connection = createConnection(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageConsumer consumer = session.createConsumer(destination); + + final CountDownLatch latch = new CountDownLatch(NUMBER); + consumer.setMessageListener(new MessageListener() { + public void onMessage(Message message) { + latch.countDown(); + count.incrementAndGet(); + } + }); + + connection.start(); + MessageProducer producer = session.createProducer(destination); + TextMessage message = session.createTextMessage("test msg"); + long time = System.currentTimeMillis() + 1000; + message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_START, time); + message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 50); + message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, NUMBER-1); + producer.send(message); + producer.close(); + assertEquals(latch.getCount(), NUMBER); + latch.await(5, TimeUnit.SECONDS); + assertEquals(latch.getCount(), 0); + //wait a little longer - make sure we only get NUMBER of replays + Thread.sleep(1000); + assertEquals(NUMBER, count.get()); + } + + @Override + protected void setUp() throws Exception { + bindAddress = "vm://localhost"; + super.setUp(); + } + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + answer.setPersistent(isPersistent()); + answer.setDataDirectory("target"); + answer.setUseJmx(false); + answer.addConnector(bindAddress); + return answer; + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java new file mode 100644 index 0000000000..86b16f7b8a --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.scheduler; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import junit.framework.TestCase; +import org.apache.activemq.util.IOHelper; +import org.apache.kahadb.util.ByteSequence; + +public class JobSchedulerStoreTest extends TestCase { + + public void testRestart() throws Exception { + JobSchedulerStore store = new JobSchedulerStore(); + File directory = new File("target/test/ScheduledDB"); + IOHelper.mkdirs(directory); + IOHelper.deleteChildren(directory); + store.setDirectory(directory); + final int NUMBER = 1000; + store.start(); + Listlist = new ArrayList(); + for (int i = 0; i < NUMBER;i++ ) { + ByteSequence buff = new ByteSequence(new String("testjob"+i).getBytes()); + list.add(buff); + } + JobScheduler js = store.getJobScheduler("test"); + int count = 0; + long startTime = System.currentTimeMillis()+10000; + for (ByteSequence job:list) { + js.schedule("id:"+(count++), job,startTime,10000,-1); + } + Listtest = js.getNextScheduleJobs(); + assertEquals(list.size(),test.size()); + store.stop(); + + store.start(); + js = store.getJobScheduler("test"); + test = js.getNextScheduleJobs(); + assertEquals(list.size(),test.size()); + for (int i = 0; i < list.size();i++) { + String orig = new String(list.get(i).getData()); + String payload = new String(test.get(i).getData()); + assertEquals(orig,payload); + } + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java new file mode 100644 index 0000000000..a00345cdd1 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.scheduler; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import java.io.File; +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.activemq.util.IOHelper; +import org.apache.kahadb.util.ByteSequence; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class JobSchedulerTest { + + private JobSchedulerStore store; + private JobScheduler scheduler; + + @Test + public void testAddLongStringByteSequence() throws Exception { + final int COUNT = 10; + final CountDownLatch latch = new CountDownLatch(COUNT); + scheduler.addListener(new JobListener() { + + public void scheduledJob(String id, ByteSequence job) { + latch.countDown(); + } + + }); + for (int i = 0; i < COUNT; i++) { + String test = new String("test" + i); + scheduler.schedule("id" + i, new ByteSequence(test.getBytes()), 10); + } + latch.await(1, TimeUnit.SECONDS); + assertTrue(latch.getCount() == 0); + } + + @Test + public void testAddLongLongIntStringByteSequence() throws Exception { + final int COUNT = 10; + final CountDownLatch latch = new CountDownLatch(COUNT); + scheduler.addListener(new JobListener() { + + public void scheduledJob(String id, ByteSequence job) { + latch.countDown(); + } + + }); + long time = System.currentTimeMillis() + 2000; + for (int i = 0; i < COUNT; i++) { + String test = new String("test" + i); + scheduler.schedule("id" + i, new ByteSequence(test.getBytes()), time, 10, -1); + } + assertTrue(latch.getCount() == COUNT); + latch.await(3000, TimeUnit.SECONDS); + assertTrue(latch.getCount() == 0); + } + + @Test + public void testAddStopThenDeliver() throws Exception { + final int COUNT = 10; + final CountDownLatch latch = new CountDownLatch(COUNT); + long time = System.currentTimeMillis() + 2000; + for (int i = 0; i < COUNT; i++) { + String test = new String("test" + i); + scheduler.schedule("id" + i, new ByteSequence(test.getBytes()), time, 10, -1); + } + File directory = store.getDirectory(); + tearDown(); + startStore(directory); + scheduler.addListener(new JobListener() { + + public void scheduledJob(String id, ByteSequence job) { + latch.countDown(); + } + + }); + assertTrue(latch.getCount() == COUNT); + latch.await(3000, TimeUnit.SECONDS); + assertTrue(latch.getCount() == 0); + } + + @Test + public void testRemoveLong() throws Exception { + final int COUNT = 10; + + long time = System.currentTimeMillis() + 20000; + for (int i = 0; i < COUNT; i++) { + String str = new String("test" + i); + scheduler.schedule("id" + i, new ByteSequence(str.getBytes()), time, 10, -1); + + } + int size = scheduler.getNextScheduleJobs().size(); + assertEquals(size, COUNT); + long removeTime = scheduler.getNextScheduleTime(); + scheduler.remove(removeTime); + size = scheduler.getNextScheduleJobs().size(); + assertEquals(0, size); + } + + @Test + public void testRemoveString() throws IOException { + final int COUNT = 10; + final String test = "TESTREMOVE"; + long time = System.currentTimeMillis() + 20000; + for (int i = 0; i < COUNT; i++) { + String str = new String("test" + i); + scheduler.schedule("id" + i, new ByteSequence(str.getBytes()), time, 10, -1); + if (i == COUNT / 2) { + scheduler.schedule(test, new ByteSequence(test.getBytes()), time, 10, -1); + } + } + + int size = scheduler.getNextScheduleJobs().size(); + assertEquals(size, COUNT + 1); + scheduler.remove(test); + size = scheduler.getNextScheduleJobs().size(); + assertEquals(size, COUNT); + } + + @Before + public void setUp() throws Exception { + File directory = new File("target/test/ScheduledDB"); + IOHelper.mkdirs(directory); + IOHelper.deleteChildren(directory); + startStore(directory); + + } + + protected void startStore(File directory) throws Exception { + store = new JobSchedulerStore(); + store.setDirectory(directory); + store.start(); + scheduler = store.getJobScheduler("test"); + } + + @After + public void tearDown() throws Exception { + store.stop(); + } + +}