From c5622d674d91a779159b8fb2109751fc76110dd6 Mon Sep 17 00:00:00 2001 From: jbonofre Date: Wed, 2 Dec 2020 10:06:11 +0100 Subject: [PATCH] Quick fix on InMemoryJobSchedulerTest --- .../memory/InMemoryJobSchedulerTest.java | 271 +++++++++++++++++- 1 file changed, 262 insertions(+), 9 deletions(-) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTest.java index 36771b0352..9b86693054 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTest.java @@ -16,21 +16,274 @@ */ package org.apache.activemq.broker.scheduler.memory; -import org.apache.activemq.broker.scheduler.JobSchedulerStore; -import org.apache.activemq.broker.scheduler.JobSchedulerTest; +import org.apache.activemq.broker.scheduler.*; +import org.apache.activemq.util.ByteSequence; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.Calendar; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * In-Memory store based variation of the JobSchedulerTest */ -public class InMemoryJobSchedulerTest extends JobSchedulerTest { +public class InMemoryJobSchedulerTest { - @Override - public void testAddStopThenDeliver() throws Exception { - // In Memory store that's stopped doesn't retain the jobs. + private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerTest.class); + + private JobSchedulerStore store; + private JobScheduler scheduler; + + @Test(timeout = 60000) + public void testAddLongStringByteSequence() throws Exception { + final int COUNT = 10; + final CountDownLatch latch = new CountDownLatch(COUNT); + scheduler.addListener(new JobListener() { + @Override + public void scheduledJob(String id, ByteSequence job) { + latch.countDown(); + } + + }); + for (int i = 0; i < COUNT; i++) { + String test = new String("test" + i); + scheduler.schedule("id" + i, new ByteSequence(test.getBytes()), 1000); + } + latch.await(5, TimeUnit.SECONDS); + assertEquals(0, latch.getCount()); } - @Override - protected JobSchedulerStore createJobSchedulerStore() throws Exception { - return new InMemoryJobSchedulerStore(); + @Test(timeout = 60000) + public void testAddCronAndByteSequence() throws Exception { + + final CountDownLatch latch = new CountDownLatch(1); + scheduler.addListener(new JobListener() { + @Override + public void scheduledJob(String id, ByteSequence job) { + latch.countDown(); + } + }); + + Calendar current = Calendar.getInstance(); + current.add(Calendar.MINUTE, 1); + int minutes = current.get(Calendar.MINUTE); + int hour = current.get(Calendar.HOUR_OF_DAY); + int day = current.get(Calendar.DAY_OF_WEEK) - 1; + + String cronTab = String.format("%d %d * * %d", minutes, hour, day); + + String str = new String("test1"); + scheduler.schedule("id:1", new ByteSequence(str.getBytes()), cronTab, 0, 0, 0); + + // need a little slack so go over 60 seconds + assertTrue(latch.await(70, TimeUnit.SECONDS)); + assertEquals(0, latch.getCount()); + } + + @Test(timeout = 60000) + public void testAddLongLongIntStringByteSequence() throws Exception { + final int COUNT = 10; + final CountDownLatch latch = new CountDownLatch(COUNT); + scheduler.addListener(new JobListener() { + @Override + public void scheduledJob(String id, ByteSequence job) { + latch.countDown(); + } + }); + long time = 2000; + for (int i = 0; i < COUNT; i++) { + String test = new String("test" + i); + scheduler.schedule("id" + i, new ByteSequence(test.getBytes()), "", time, 10, -1); + } + assertTrue(latch.getCount() == COUNT); + latch.await(3000, TimeUnit.SECONDS); + assertTrue(latch.getCount() == 0); + } + + @Test(timeout = 60000) + @Ignore + public void testAddStopThenDeliver() throws Exception { + final int COUNT = 10; + final CountDownLatch latch = new CountDownLatch(COUNT); + long time = 2000; + for (int i = 0; i < COUNT; i++) { + String test = new String("test" + i); + scheduler.schedule("id" + i, new ByteSequence(test.getBytes()), "", time, 1000, -1); + } + File directory = store.getDirectory(); + tearDown(); + setUp(); + scheduler.addListener(new JobListener() { + @Override + public void scheduledJob(String id, ByteSequence job) { + latch.countDown(); + } + }); + assertTrue(latch.getCount() == COUNT); + latch.await(3000, TimeUnit.SECONDS); + assertTrue(latch.getCount() == 0); + } + + @Test(timeout = 60000) + public void testRemoveLong() throws Exception { + final int COUNT = 10; + + long time = 60000; + for (int i = 0; i < COUNT; i++) { + String str = new String("test" + i); + scheduler.schedule("id" + i, new ByteSequence(str.getBytes()), "", time, 1000, -1); + } + + int size = scheduler.getAllJobs().size(); + assertEquals(size, COUNT); + + long removeTime = scheduler.getNextScheduleTime(); + scheduler.remove(removeTime); + + // If all jobs are not started within the same second we need to call remove again + if (size != 0) { + removeTime = scheduler.getNextScheduleTime(); + scheduler.remove(removeTime); + } + + size = scheduler.getAllJobs().size(); + assertEquals(0, size); + } + + @Test(timeout = 60000) + public void testRemoveString() throws Exception { + final int COUNT = 10; + final String test = "TESTREMOVE"; + long time = 20000; + + for (int i = 0; i < COUNT; i++) { + String str = new String("test" + i); + scheduler.schedule("id" + i, new ByteSequence(str.getBytes()), "", time, 1000, -1); + if (i == COUNT / 2) { + scheduler.schedule(test, new ByteSequence(test.getBytes()), "", time, 1000, -1); + } + } + + int size = scheduler.getAllJobs().size(); + assertEquals(size, COUNT + 1); + scheduler.remove(test); + size = scheduler.getAllJobs().size(); + assertEquals(size, COUNT); + } + + @Test(timeout = 60000) + public void testGetExecutionCount() throws Exception { + final String jobId = "Job-1"; + long time = 10000; + final CountDownLatch done = new CountDownLatch(10); + + String str = new String("test"); + scheduler.schedule(jobId, new ByteSequence(str.getBytes()), "", time, 1000, 10); + + int size = scheduler.getAllJobs().size(); + assertEquals(size, 1); + + scheduler.addListener(new JobListener() { + @Override + public void scheduledJob(String id, ByteSequence job) { + LOG.info("Job exectued: {}", 11 - done.getCount()); + done.countDown(); + } + }); + + List jobs = scheduler.getNextScheduleJobs(); + assertEquals(1, jobs.size()); + Job job = jobs.get(0); + assertEquals(jobId, job.getJobId()); + assertEquals(0, job.getExecutionCount()); + assertTrue("Should have fired ten times.", done.await(60, TimeUnit.SECONDS)); + // The job is not updated on the last firing as it is removed from the store following + // it's last execution so the count will always be one less than the max firings. + assertTrue(job.getExecutionCount() >= 9); + } + + @Test(timeout = 60000) + public void testgetAllJobs() throws Exception { + final int COUNT = 10; + final String ID = "id:"; + long time = 20000; + + for (int i = 0; i < COUNT; i++) { + String str = new String("test" + i); + scheduler.schedule(ID + i, new ByteSequence(str.getBytes()), "", time, 10 + i, -1); + } + + List list = scheduler.getAllJobs(); + + assertEquals(list.size(), COUNT); + int count = 0; + for (Job job : list) { + assertEquals(job.getJobId(), ID + count); + count++; + } + } + + @Test(timeout = 60000) + public void testgetAllJobsInRange() throws Exception { + final int COUNT = 10; + final String ID = "id:"; + long start = 10000; + + for (int i = 0; i < COUNT; i++) { + String str = new String("test" + i); + scheduler.schedule(ID + i, new ByteSequence(str.getBytes()), "", start + (i * 1000), 10000 + i, 0); + } + + start = System.currentTimeMillis(); + long finish = start + 12000 + (COUNT * 1000); + List list = scheduler.getAllJobs(start, finish); + + assertEquals(COUNT, list.size()); + int count = 0; + for (Job job : list) { + assertEquals(job.getJobId(), ID + count); + count++; + } + } + + @Test(timeout = 60000) + public void testRemoveAllJobsInRange() throws Exception { + final int COUNT = 10; + final String ID = "id:"; + long start = 10000; + + for (int i = 0; i < COUNT; i++) { + String str = new String("test" + i); + scheduler.schedule(ID + i, new ByteSequence(str.getBytes()), "", start + (i * 1000), 10000 + i, 0); + } + start = System.currentTimeMillis(); + long finish = start + 12000 + (COUNT * 1000); + scheduler.removeAllJobs(start, finish); + + assertTrue(scheduler.getAllJobs().isEmpty()); + } + + @Before + public void setUp() throws Exception { + store = new InMemoryJobSchedulerStore(); + store.start(); + scheduler = store.getJobScheduler("test"); + scheduler.startDispatching(); + } + + @After + public void tearDown() throws Exception { + scheduler.stopDispatching(); + store.stop(); } }