Add an in-memory variant of the disk based JobScheduler store to allow
for an embedded broker to have scheduler support without needing to use
the disk based version.
This commit is contained in:
Timothy Bish 2014-07-10 12:16:34 -04:00
parent 2b53036b27
commit 433912f79a
17 changed files with 1061 additions and 141 deletions

View File

@ -87,6 +87,7 @@ import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.broker.scheduler.SchedulerBroker;
import org.apache.activemq.broker.scheduler.memory.InMemoryJobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.BrokerId;
@ -1863,7 +1864,14 @@ public class BrokerService implements Service {
if (jobSchedulerStore == null) {
if (!isPersistent()) {
return null;
this.jobSchedulerStore = new InMemoryJobSchedulerStore();
configureService(jobSchedulerStore);
try {
jobSchedulerStore.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
return this.jobSchedulerStore;
}
try {
@ -2799,7 +2807,7 @@ public class BrokerService implements Service {
* @return the schedulerSupport
*/
public boolean isSchedulerSupport() {
return this.schedulerSupport && (isPersistent() || jobSchedulerStore != null);
return this.schedulerSupport;
}
/**

View File

@ -0,0 +1,151 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.scheduler.memory;
import org.apache.activemq.broker.scheduler.Job;
import org.apache.activemq.broker.scheduler.JobSupport;
/**
* A simple in memory Job POJO.
*/
public class InMemoryJob implements Job {
private final String jobId;
private int repeat;
private long start;
private long nextTime;
private long delay;
private long period;
private String cronEntry;
private int executionCount;
private byte[] payload;
public InMemoryJob(String jobId) {
this.jobId = jobId;
}
@Override
public String getJobId() {
return jobId;
}
@Override
public int getRepeat() {
return repeat;
}
public void setRepeat(int repeat) {
this.repeat = repeat;
}
@Override
public long getStart() {
return start;
}
public void setStart(long start) {
this.start = start;
}
public long getNextTime() {
return nextTime;
}
public void setNextTime(long nextTime) {
this.nextTime = nextTime;
}
@Override
public long getDelay() {
return delay;
}
public void setDelay(long delay) {
this.delay = delay;
}
@Override
public long getPeriod() {
return period;
}
public void setPeriod(long period) {
this.period = period;
}
@Override
public String getCronEntry() {
return cronEntry;
}
public void setCronEntry(String cronEntry) {
this.cronEntry = cronEntry;
}
@Override
public byte[] getPayload() {
return payload;
}
public void setPayload(byte[] payload) {
this.payload = payload;
}
@Override
public String getStartTime() {
return JobSupport.getDateTime(getStart());
}
@Override
public String getNextExecutionTime() {
return JobSupport.getDateTime(getNextTime());
}
@Override
public int getExecutionCount() {
return executionCount;
}
public void incrementExecutionCount() {
this.executionCount++;
}
public void decrementRepeatCount() {
if (this.repeat > 0) {
this.repeat--;
}
}
/**
* @return true if this Job represents a Cron entry.
*/
public boolean isCron() {
return getCronEntry() != null && getCronEntry().length() > 0;
}
@Override
public int hashCode() {
return jobId.hashCode();
}
@Override
public String toString() {
return "Job: " + getJobId();
}
}

View File

@ -0,0 +1,482 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.scheduler.memory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.MessageFormatException;
import org.apache.activemq.broker.scheduler.CronParser;
import org.apache.activemq.broker.scheduler.Job;
import org.apache.activemq.broker.scheduler.JobListener;
import org.apache.activemq.broker.scheduler.JobScheduler;
import org.apache.activemq.broker.scheduler.JobSupport;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IdGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implements an in-memory JobScheduler instance.
*/
public class InMemoryJobScheduler implements JobScheduler {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryJobScheduler.class);
private static final IdGenerator ID_GENERATOR = new IdGenerator();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final String name;
private final TreeMap<Long, ScheduledTask> jobs = new TreeMap<Long, ScheduledTask>();
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean dispatchEnabled = new AtomicBoolean(false);
private final List<JobListener> jobListeners = new CopyOnWriteArrayList<JobListener>();
private final Timer timer = new Timer();
public InMemoryJobScheduler(String name) {
this.name = name;
}
@Override
public String getName() throws Exception {
return name;
}
public void start() throws Exception {
if (started.compareAndSet(false, true)) {
startDispatching();
LOG.trace("JobScheduler[{}] started", name);
}
}
public void stop() throws Exception {
if (started.compareAndSet(true, false)) {
stopDispatching();
timer.cancel();
jobs.clear();
LOG.trace("JobScheduler[{}] stopped", name);
}
}
public boolean isStarted() {
return started.get();
}
public boolean isDispatchEnabled() {
return dispatchEnabled.get();
}
@Override
public void startDispatching() throws Exception {
dispatchEnabled.set(true);
}
@Override
public void stopDispatching() throws Exception {
dispatchEnabled.set(false);
}
@Override
public void addListener(JobListener listener) throws Exception {
this.jobListeners.add(listener);
}
@Override
public void removeListener(JobListener listener) throws Exception {
this.jobListeners.remove(listener);
}
@Override
public void schedule(String jobId, ByteSequence payload, long delay) throws Exception {
doSchedule(jobId, payload, "", 0, delay, 0);
}
@Override
public void schedule(String jobId, ByteSequence payload, String cronEntry) throws Exception {
doSchedule(jobId, payload, cronEntry, 0, 0, 0);
}
@Override
public void schedule(String jobId, ByteSequence payload, String cronEntry, long delay, long period, int repeat) throws Exception {
doSchedule(jobId, payload, cronEntry, delay, period, repeat);
}
@Override
public void remove(long time) throws Exception {
doRemoveRange(time, time);
}
@Override
public void remove(String jobId) throws Exception {
doRemoveJob(jobId);
}
@Override
public void removeAllJobs() throws Exception {
doRemoveRange(0, Long.MAX_VALUE);
}
@Override
public void removeAllJobs(long start, long finish) throws Exception {
doRemoveRange(start, finish);
}
@Override
public long getNextScheduleTime() throws Exception {
long nextExecutionTime = -1L;
lock.readLock().lock();
try {
if (!jobs.isEmpty()) {
nextExecutionTime = jobs.entrySet().iterator().next().getKey();
}
} finally {
lock.readLock().unlock();
}
return nextExecutionTime;
}
@Override
public List<Job> getNextScheduleJobs() throws Exception {
List<Job> result = new ArrayList<Job>();
lock.readLock().lock();
try {
if (!jobs.isEmpty()) {
result.addAll(jobs.entrySet().iterator().next().getValue().getAllJobs());
}
} finally {
lock.readLock().unlock();
}
return result;
}
@Override
public List<Job> getAllJobs() throws Exception {
final List<Job> result = new ArrayList<Job>();
this.lock.readLock().lock();
try {
for (Map.Entry<Long, ScheduledTask> entry : jobs.entrySet()) {
result.addAll(entry.getValue().getAllJobs());
}
} finally {
this.lock.readLock().unlock();
}
return result;
}
@Override
public List<Job> getAllJobs(long start, long finish) throws Exception {
final List<Job> result = new ArrayList<Job>();
this.lock.readLock().lock();
try {
for (Map.Entry<Long, ScheduledTask> entry : jobs.entrySet()) {
long jobTime = entry.getKey();
if (start <= jobTime && jobTime <= finish) {
result.addAll(entry.getValue().getAllJobs());
}
}
} finally {
this.lock.readLock().unlock();
}
return result;
}
@Override
public int hashCode() {
return name.hashCode();
}
@Override
public String toString() {
return "JobScheduler: " + name;
}
private void doSchedule(final String jobId, final ByteSequence payload, final String cronEntry, long delay, long period, int repeat) throws IOException {
long startTime = System.currentTimeMillis();
long executionTime = 0;
// round startTime - so we can schedule more jobs at the same time
startTime = (startTime / 1000) * 1000;
if (cronEntry != null && cronEntry.length() > 0) {
try {
executionTime = CronParser.getNextScheduledTime(cronEntry, startTime);
} catch (MessageFormatException e) {
throw new IOException(e.getMessage());
}
}
if (executionTime == 0) {
// start time not set by CRON - so it it to the current time
executionTime = startTime;
}
if (delay > 0) {
executionTime += delay;
} else {
executionTime += period;
}
InMemoryJob newJob = new InMemoryJob(jobId);
newJob.setStart(startTime);
newJob.setCronEntry(cronEntry);
newJob.setDelay(delay);
newJob.setPeriod(period);
newJob.setRepeat(repeat);
newJob.setNextTime(executionTime);
newJob.setPayload(payload.getData());
LOG.trace("JobScheduler adding job[{}] to fire at: {}", jobId, JobSupport.getDateTime(executionTime));
lock.writeLock().lock();
try {
ScheduledTask task = jobs.get(executionTime);
if (task == null) {
task = new ScheduledTask(executionTime);
task.add(newJob);
jobs.put(task.getExecutionTime(), task);
timer.schedule(task, new Date(newJob.getNextTime()));
} else {
task.add(newJob);
}
} finally {
lock.writeLock().unlock();
}
}
private void doReschedule(InMemoryJob job, long nextExecutionTime) {
job.setNextTime(nextExecutionTime);
job.incrementExecutionCount();
job.decrementRepeatCount();
LOG.trace("JobScheduler rescheduling job[{}] to fire at: {}", job.getJobId(), JobSupport.getDateTime(nextExecutionTime));
lock.writeLock().lock();
try {
ScheduledTask task = jobs.get(nextExecutionTime);
if (task == null) {
task = new ScheduledTask(nextExecutionTime);
task.add(job);
jobs.put(task.getExecutionTime(), task);
timer.schedule(task, new Date(task.getExecutionTime()));
} else {
task.add(job);
}
} finally {
lock.writeLock().unlock();
}
}
private void doRemoveJob(String jobId) throws IOException {
this.lock.writeLock().lock();
try {
Iterator<Map.Entry<Long, ScheduledTask>> scheduled = jobs.entrySet().iterator();
while (scheduled.hasNext()) {
Map.Entry<Long, ScheduledTask> entry = scheduled.next();
ScheduledTask task = entry.getValue();
if (task.remove(jobId)) {
LOG.trace("JobScheduler removing job[{}]", jobId);
if (task.isEmpty()) {
task.cancel();
scheduled.remove();
}
return;
}
}
} finally {
this.lock.writeLock().unlock();
}
}
private void doRemoveRange(long start, long end) throws IOException {
this.lock.writeLock().lock();
try {
Iterator<Map.Entry<Long, ScheduledTask>> scheduled = jobs.entrySet().iterator();
while (scheduled.hasNext()) {
Map.Entry<Long, ScheduledTask> entry = scheduled.next();
long executionTime = entry.getKey();
if (start <= executionTime && executionTime <= end) {
ScheduledTask task = entry.getValue();
task.cancel();
scheduled.remove();
}
// Don't look beyond the end range.
if (end < executionTime) {
break;
}
}
} finally {
this.lock.writeLock().unlock();
}
}
private boolean canDispatch() {
return isStarted() && isDispatchEnabled();
}
private long calculateNextExecutionTime(InMemoryJob job, long currentTime, int repeat) throws MessageFormatException {
long result = currentTime;
String cron = job.getCronEntry();
if (cron != null && cron.length() > 0) {
result = CronParser.getNextScheduledTime(cron, result);
} else if (job.getRepeat() != 0) {
result += job.getPeriod();
}
return result;
}
private void dispatch(InMemoryJob job) throws IllegalStateException, IOException {
if (canDispatch()) {
LOG.debug("Firing: {}", job);
for (JobListener l : jobListeners) {
l.scheduledJob(job.getJobId(), new ByteSequence(job.getPayload()));
}
}
}
/*
* A TimerTask instance that can aggregate the execution of a number
* scheduled Jobs and handle rescheduling the jobs that require it.
*/
private class ScheduledTask extends TimerTask {
private final Map<String, InMemoryJob> jobs = new TreeMap<String, InMemoryJob>();
private final long executionTime;
public ScheduledTask(long executionTime) {
this.executionTime = executionTime;
}
public long getExecutionTime() {
return executionTime;
}
/**
* @return a Collection containing all the managed jobs for this task.
*/
public Collection<InMemoryJob> getAllJobs() {
return new ArrayList<InMemoryJob>(jobs.values());
}
/**
* @return true if the internal list of jobs has become empty.
*/
public boolean isEmpty() {
return jobs.isEmpty();
}
/**
* Adds the job to the internal list of scheduled Jobs managed by this task.
*
* @param newJob
* the new job to add to the list of Jobs.
*/
public void add(InMemoryJob newJob) {
this.jobs.put(newJob.getJobId(), newJob);
}
/**
* Removes the job from the internal list of scheduled Jobs managed by this task.
*
* @param jobId
* the job ID to remove from the list of Jobs.
*
* @return true if the job was removed from the list of managed jobs.
*/
public boolean remove(String jobId) {
return jobs.remove(jobId) != null;
}
@Override
public void run() {
if (!isStarted()) {
return;
}
try {
long currentTime = System.currentTimeMillis();
lock.writeLock().lock();
try {
// Remove this entry as it will now fire any scheduled jobs, if new
// jobs or rescheduled jobs land in the same time slot we want them
// to go into a new ScheduledTask in the Timer instance.
InMemoryJobScheduler.this.jobs.remove(executionTime);
} finally {
lock.writeLock().unlock();
}
long nextExecutionTime = 0;
for (InMemoryJob job : jobs.values()) {
if (!isStarted()) {
break;
}
int repeat = job.getRepeat();
nextExecutionTime = calculateNextExecutionTime(job, currentTime, repeat);
if (!job.isCron()) {
dispatch(job);
if (repeat != 0) {
// Reschedule for the next time, the scheduler will take care of
// updating the repeat counter on the update.
doReschedule(job, nextExecutionTime);
}
} else {
if (repeat == 0) {
// This is a non-repeating Cron entry so we can fire and forget it.
dispatch(job);
}
if (nextExecutionTime > currentTime) {
// Reschedule the cron job as a new event, if the cron entry signals
// a repeat then it will be stored separately and fired as a normal
// event with decrementing repeat.
doReschedule(job, nextExecutionTime);
if (repeat != 0) {
// we have a separate schedule to run at this time
// so the cron job is used to set of a separate schedule
// hence we won't fire the original cron job to the
// listeners but we do need to start a separate schedule
String jobId = ID_GENERATOR.generateId();
ByteSequence payload = new ByteSequence(job.getPayload());
schedule(jobId, payload, "", job.getDelay(), job.getPeriod(), job.getRepeat());
}
}
}
}
} catch (Throwable e) {
LOG.error("Error while processing scheduled job(s).", e);
}
}
}
}

View File

@ -0,0 +1,119 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.scheduler.memory;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.broker.scheduler.JobScheduler;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An in-memory JobSchedulerStore implementation used for Brokers that have persistence
* disabled or when the JobSchedulerStore usage doesn't require a file or DB based store
* implementation allowing for better performance.
*/
public class InMemoryJobSchedulerStore extends ServiceSupport implements JobSchedulerStore {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryJobSchedulerStore.class);
private final ReentrantLock lock = new ReentrantLock();
private final Map<String, InMemoryJobScheduler> schedulers = new HashMap<String, InMemoryJobScheduler>();
@Override
protected void doStop(ServiceStopper stopper) throws Exception {
for (InMemoryJobScheduler scheduler : schedulers.values()) {
try {
scheduler.stop();
} catch (Exception e) {
LOG.error("Failed to stop scheduler: {}", scheduler.getName(), e);
}
}
}
@Override
protected void doStart() throws Exception {
for (InMemoryJobScheduler scheduler : schedulers.values()) {
try {
scheduler.start();
} catch (Exception e) {
LOG.error("Failed to start scheduler: {}", scheduler.getName(), e);
}
}
}
@Override
public JobScheduler getJobScheduler(String name) throws Exception {
this.lock.lock();
try {
InMemoryJobScheduler result = this.schedulers.get(name);
if (result == null) {
LOG.debug("Creating new in-memory scheduler: {}", name);
result = new InMemoryJobScheduler(name);
this.schedulers.put(name, result);
if (isStarted()) {
result.start();
}
}
return result;
} finally {
this.lock.unlock();
}
}
@Override
public boolean removeJobScheduler(String name) throws Exception {
boolean result = false;
this.lock.lock();
try {
InMemoryJobScheduler scheduler = this.schedulers.remove(name);
result = scheduler != null;
if (result) {
LOG.debug("Removing in-memory Job Scheduler: {}", name);
scheduler.stop();
this.schedulers.remove(name);
}
} finally {
this.lock.unlock();
}
return result;
}
//---------- Methods that don't really apply to this implementation ------//
@Override
public long size() {
return 0;
}
@Override
public File getDirectory() {
return null;
}
@Override
public void setDirectory(File directory) {
}
}

View File

@ -16,7 +16,12 @@
*/
package org.apache.activemq.broker.scheduler;
import java.io.File;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@ -32,25 +37,12 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.IOHelper;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.BlockJUnit4ClassRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(BlockJUnit4ClassRunner.class)
public class JmsCronSchedulerTest extends EmbeddedBrokerTestSupport {
@Rule
public TestName testName = new TestName();
public class JmsCronSchedulerTest extends JobSchedulerTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(JmsCronSchedulerTest.class);
@ -123,39 +115,4 @@ public class JmsCronSchedulerTest extends EmbeddedBrokerTestSupport {
assertNotNull(consumer.receiveNoWait());
assertNull(consumer.receiveNoWait());
}
@Before
public void setUp() throws Exception {
LOG.info("Starting test {}", testName.getMethodName());
bindAddress = "vm://localhost";
super.setUp();
}
@After
public void tearDown() throws Exception {
super.tearDown();
}
@Override
protected BrokerService createBroker() throws Exception {
return createBroker(true);
}
protected BrokerService createBroker(boolean delete) throws Exception {
File schedulerDirectory = new File("target/scheduler");
if (delete) {
IOHelper.mkdirs(schedulerDirectory);
IOHelper.deleteChildren(schedulerDirectory);
}
BrokerService answer = new BrokerService();
answer.setPersistent(true);
answer.getManagementContext().setCreateConnector(false);
answer.setDeleteAllMessagesOnStartup(true);
answer.setDataDirectory("target");
answer.setSchedulerDirectoryFile(schedulerDirectory);
answer.setSchedulerSupport(true);
answer.setUseJmx(false);
answer.addConnector(bindAddress);
return answer;
}
}

View File

@ -16,7 +16,9 @@
*/
package org.apache.activemq.broker.scheduler;
import java.io.File;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -32,15 +34,14 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ProducerThread;
import org.apache.activemq.util.Wait;
import org.junit.Test;
public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
public class JmsSchedulerTest extends JobSchedulerTestSupport {
@Test
public void testCron() throws Exception {
final int COUNT = 10;
final AtomicInteger count = new AtomicInteger();
@ -80,6 +81,7 @@ public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
assertEquals(COUNT, count.get());
}
@Test
public void testSchedule() throws Exception {
final int COUNT = 1;
Connection connection = createConnection();
@ -112,6 +114,7 @@ public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
assertEquals(latch.getCount(), 0);
}
@Test
public void testTransactedSchedule() throws Exception {
final int COUNT = 1;
Connection connection = createConnection();
@ -150,7 +153,7 @@ public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
assertEquals(latch.getCount(), 0);
}
@Test
public void testScheduleRepeated() throws Exception {
final int NUMBER = 10;
final AtomicInteger count = new AtomicInteger();
@ -186,6 +189,7 @@ public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
assertEquals(NUMBER, count.get());
}
@Test
public void testScheduleRestart() throws Exception {
// send a message
Connection connection = createConnection();
@ -222,12 +226,12 @@ public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
producer.close();
}
@Test
public void testJobSchedulerStoreUsage() throws Exception {
// Shrink the store limit down so we get the producer to block
broker.getSystemUsage().getJobSchedulerUsage().setLimit(10 * 1024);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection conn = factory.createConnection();
conn.start();
@ -281,32 +285,4 @@ public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
assertEquals("Consumer did not receive all messages.", 0, latch.getCount());
}
@Override
protected void setUp() throws Exception {
bindAddress = "vm://localhost";
super.setUp();
}
@Override
protected BrokerService createBroker() throws Exception {
return createBroker(true);
}
protected BrokerService createBroker(boolean delete) throws Exception {
File schedulerDirectory = new File("target/scheduler");
if (delete) {
IOHelper.mkdirs(schedulerDirectory);
IOHelper.deleteChildren(schedulerDirectory);
}
BrokerService answer = new BrokerService();
answer.setPersistent(true);
answer.setDeleteAllMessagesOnStartup(true);
answer.setDataDirectory("target");
answer.setSchedulerDirectoryFile(schedulerDirectory);
answer.setSchedulerSupport(true);
answer.setUseJmx(false);
answer.addConnector(bindAddress);
return answer;
}
}

View File

@ -104,7 +104,7 @@ public class JobSchedulerJmxManagementTests extends JobSchedulerTestSupport {
long toLate = System.currentTimeMillis() + 63 * 1000;
String next = view.getNextScheduleTime();
long nextTime = JobSupport.getDataTime(next);
LOG.info("Next Scheduled Time: {}", next);
LOG.info("Next Scheduled Time: {} should be after: {}", next, JobSupport.getDateTime(before));
assertTrue(nextTime > before);
assertTrue(nextTime < toLate);
}

View File

@ -275,11 +275,14 @@ public class JobSchedulerTest {
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
startStore(directory);
}
protected JobSchedulerStore createJobSchedulerStore() throws Exception {
return new JobSchedulerStoreImpl();
}
protected void startStore(File directory) throws Exception {
store = new JobSchedulerStoreImpl();
store = createJobSchedulerStore();
store.setDirectory(directory);
store.start();
scheduler = store.getJobScheduler("test");

View File

@ -82,6 +82,10 @@ public class JobSchedulerTestSupport {
return false;
}
protected boolean isPersistent() {
return true;
}
protected JobSchedulerViewMBean getJobSchedulerMBean() throws Exception {
ObjectName objectName = broker.getAdminView().getJMSJobScheduler();
JobSchedulerViewMBean scheduler = null;
@ -101,7 +105,7 @@ public class JobSchedulerTestSupport {
}
BrokerService answer = new BrokerService();
answer.setPersistent(true);
answer.setPersistent(isPersistent());
answer.setDeleteAllMessagesOnStartup(true);
answer.setDataDirectory("target");
answer.setSchedulerDirectoryFile(schedulerDirectory);

View File

@ -19,7 +19,6 @@ package org.apache.activemq.broker.scheduler;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -31,34 +30,10 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.IOHelper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class JobSchedulerTxTest {
private BrokerService broker;
private final String connectionUri = "vm://localhost";
private final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUri);
private final ActiveMQQueue destination = new ActiveMQQueue("Target.Queue");
@Before
public void setUp() throws Exception {
broker = createBroker();
broker.start();
broker.waitUntilStarted();
}
@After
public void tearDown() throws Exception {
broker.stop();
broker.waitUntilStopped();
}
public class JobSchedulerTxTest extends JobSchedulerTestSupport {
@Test
public void testTxSendWithRollback() throws Exception {
@ -129,29 +104,4 @@ public class JobSchedulerTxTest {
latch.await(5, TimeUnit.SECONDS);
assertEquals(0, latch.getCount());
}
protected Connection createConnection() throws Exception {
return cf.createConnection();
}
protected BrokerService createBroker() throws Exception {
return createBroker(true);
}
protected BrokerService createBroker(boolean delete) throws Exception {
File schedulerDirectory = new File("target/scheduler");
if (delete) {
IOHelper.mkdirs(schedulerDirectory);
IOHelper.deleteChildren(schedulerDirectory);
}
BrokerService answer = new BrokerService();
answer.setPersistent(true);
answer.setDeleteAllMessagesOnStartup(true);
answer.setDataDirectory("target");
answer.setSchedulerDirectoryFile(schedulerDirectory);
answer.setSchedulerSupport(true);
answer.setUseJmx(false);
answer.addConnector(connectionUri);
return answer;
}
}

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.scheduler.memory;
import org.apache.activemq.broker.scheduler.JmsSchedulerTest;
/**
* Test for the In-Memory Scheduler variant.
*/
public class InMemeoryJmsSchedulerTest extends JmsSchedulerTest {
@Override
protected boolean isPersistent() {
return false;
}
@Override
public void testScheduleRestart() throws Exception {
// No persistence so scheduled jobs don't survive restart.
}
@Override
public void testJobSchedulerStoreUsage() throws Exception {
// No store usage numbers for in-memory store.
}
}

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.scheduler.memory;
import org.apache.activemq.broker.scheduler.JmsCronSchedulerTest;
/**
* In memory version of the cron scheduler test.
*/
public class InMemoryJmsCronSchedulerTest extends JmsCronSchedulerTest {
@Override
protected boolean isPersistent() {
return false;
}
}

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.scheduler.memory;
import org.apache.activemq.broker.scheduler.JobSchedulerJmxManagementTests;
/**
* Test for the In-Memory scheduler's JMX management features.
*/
public class InMemoryJobSchedulerJmxManagementTests extends JobSchedulerJmxManagementTests {
@Override
protected boolean isPersistent() {
return false;
}
}

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.scheduler.memory;
import org.apache.activemq.broker.scheduler.JobSchedulerManagementTest;
/**
* Tests management of in memory scheduler via JMS client.
*/
public class InMemoryJobSchedulerManagementTest extends JobSchedulerManagementTest {
@Override
protected boolean isPersistent() {
return false;
}
}

View File

@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.scheduler.memory;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.broker.scheduler.Job;
import org.apache.activemq.broker.scheduler.JobScheduler;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOHelper;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*/
public class InMemoryJobSchedulerStoreTest {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryJobSchedulerStoreTest.class);
@Test(timeout = 120 * 1000)
public void testRestart() throws Exception {
InMemoryJobSchedulerStore store = new InMemoryJobSchedulerStore();
File directory = new File("target/test/ScheduledDB");
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
store.setDirectory(directory);
final int NUMBER = 1000;
store.start();
List<ByteSequence> list = new ArrayList<ByteSequence>();
for (int i = 0; i < NUMBER; i++) {
ByteSequence buff = new ByteSequence(new String("testjob" + i).getBytes());
list.add(buff);
}
JobScheduler js = store.getJobScheduler("test");
js.startDispatching();
int count = 0;
long startTime = 10 * 60 * 1000;
long period = startTime;
for (ByteSequence job : list) {
js.schedule("id:" + (count++), job, "", startTime, period, -1);
}
List<Job> test = js.getAllJobs();
LOG.debug("Found {} jobs in the store before restart", test.size());
assertEquals(list.size(), test.size());
store.stop();
store.start();
js = store.getJobScheduler("test");
test = js.getAllJobs();
LOG.debug("Found {} jobs in the store after restart", test.size());
assertEquals(0, test.size());
}
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.scheduler.memory;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.broker.scheduler.JobSchedulerTest;
/**
* In-Memory store based variation of the JobSchedulerTest
*/
public class InMemoryJobSchedulerTest extends JobSchedulerTest {
@Override
public void testAddStopThenDeliver() throws Exception {
// In Memory store that's stopped doesn't retain the jobs.
}
@Override
protected JobSchedulerStore createJobSchedulerStore() throws Exception {
return new InMemoryJobSchedulerStore();
}
}

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.scheduler.memory;
import org.apache.activemq.broker.scheduler.JobSchedulerTxTest;
/**
* In memory version of the TX test case
*/
public class InMemoryJobSchedulerTxTest extends JobSchedulerTxTest {
@Override
protected boolean isPersistent() {
return false;
}
}