From 980162233fd3693d1f83d3f95985ac33affa7a8f Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Fri, 11 Nov 2016 14:48:49 -0500 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-6504 Round the start time value not truncate to ensure delay falls on the correct side of the scheduling block. --- .../interop/AmqpScheduledMessageTest.java | 17 +++++++++--- .../memory/InMemoryJobScheduler.java | 16 ++++++------ .../kahadb/scheduler/JobSchedulerImpl.java | 26 +++++++++---------- 3 files changed, 35 insertions(+), 24 deletions(-) diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java index b91dac0193..14dcf8c6bd 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.util.Date; import java.util.concurrent.TimeUnit; import javax.jms.Connection; @@ -165,6 +166,8 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport { @Test public void testScheduleWithDelay() throws Exception { + final long DELAY = 5000; + AmqpClient client = createAmqpClient(); AmqpConnection connection = trackConnection(client.connect()); AmqpSession session = connection.createSession(); @@ -179,9 +182,10 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport { final QueueViewMBean queueView = getProxyToQueue(getTestName()); assertNotNull(queueView); + long sendTime = System.currentTimeMillis(); + AmqpMessage message = new AmqpMessage(); - long delay = 5000; - message.setMessageAnnotation("x-opt-delivery-delay", delay); + message.setMessageAnnotation("x-opt-delivery-delay", DELAY); message.setText("Test-Message"); sender.send(message); sender.close(); @@ -203,10 +207,17 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport { fail("Should read the message"); } + long receivedTime = System.currentTimeMillis(); + assertNotNull(delivered); Long msgDeliveryTime = (Long) delivered.getMessageAnnotation("x-opt-delivery-delay"); assertNotNull(msgDeliveryTime); - assertEquals(delay, msgDeliveryTime.longValue()); + assertEquals(DELAY, msgDeliveryTime.longValue()); + + long totalDelay = receivedTime - sendTime; + LOG.debug("Sent at: {}, received at: {} ", new Date(sendTime), new Date(receivedTime), totalDelay); + + assertTrue("Delay not as expected: " + totalDelay, receivedTime - sendTime >= DELAY); connection.close(); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobScheduler.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobScheduler.java index bd2aaf55ca..3e07770d83 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobScheduler.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobScheduler.java @@ -54,10 +54,10 @@ public class InMemoryJobScheduler implements JobScheduler { private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final String name; - private final TreeMap jobs = new TreeMap(); + private final TreeMap jobs = new TreeMap<>(); private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean dispatchEnabled = new AtomicBoolean(false); - private final List jobListeners = new CopyOnWriteArrayList(); + private final List jobListeners = new CopyOnWriteArrayList<>(); private final Timer timer = new Timer(); public InMemoryJobScheduler(String name) { @@ -165,7 +165,7 @@ public class InMemoryJobScheduler implements JobScheduler { @Override public List getNextScheduleJobs() throws Exception { - List result = new ArrayList(); + List result = new ArrayList<>(); lock.readLock().lock(); try { if (!jobs.isEmpty()) { @@ -179,7 +179,7 @@ public class InMemoryJobScheduler implements JobScheduler { @Override public List getAllJobs() throws Exception { - final List result = new ArrayList(); + final List result = new ArrayList<>(); this.lock.readLock().lock(); try { for (Map.Entry entry : jobs.entrySet()) { @@ -194,7 +194,7 @@ public class InMemoryJobScheduler implements JobScheduler { @Override public List getAllJobs(long start, long finish) throws Exception { - final List result = new ArrayList(); + final List result = new ArrayList<>(); this.lock.readLock().lock(); try { for (Map.Entry entry : jobs.entrySet()) { @@ -223,7 +223,7 @@ public class InMemoryJobScheduler implements JobScheduler { long startTime = System.currentTimeMillis(); long executionTime = 0; // round startTime - so we can schedule more jobs at the same time - startTime = (startTime / 1000) * 1000; + startTime = ((startTime + 500) / 1000) * 1000; if (cronEntry != null && cronEntry.length() > 0) { try { executionTime = CronParser.getNextScheduledTime(cronEntry, startTime); @@ -369,7 +369,7 @@ public class InMemoryJobScheduler implements JobScheduler { */ private class ScheduledTask extends TimerTask { - private final Map jobs = new TreeMap(); + private final Map jobs = new TreeMap<>(); private final long executionTime; public ScheduledTask(long executionTime) { @@ -384,7 +384,7 @@ public class InMemoryJobScheduler implements JobScheduler { * @return a Collection containing all the managed jobs for this task. */ public Collection getAllJobs() { - return new ArrayList(jobs.values()); + return new ArrayList<>(jobs.values()); } /** diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java index 82b9ff581c..d49acafb3a 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java @@ -57,7 +57,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch private BTreeIndex> index; private Thread thread; private final AtomicBoolean started = new AtomicBoolean(false); - private final List jobListeners = new CopyOnWriteArrayList(); + private final List jobListeners = new CopyOnWriteArrayList<>(); private static final IdGenerator ID_GENERATOR = new IdGenerator(); private final ScheduleTime scheduleTime = new ScheduleTime(); @@ -132,7 +132,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch @Override public List getNextScheduleJobs() throws IOException { - final List result = new ArrayList(); + final List result = new ArrayList<>(); this.store.readLockIndex(); try { this.store.getPageFile().tx().execute(new Transaction.Closure() { @@ -169,7 +169,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch @Override public List getAllJobs() throws IOException { - final List result = new ArrayList(); + final List result = new ArrayList<>(); this.store.readLockIndex(); try { this.store.getPageFile().tx().execute(new Transaction.Closure() { @@ -198,7 +198,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch @Override public List getAllJobs(final long start, final long finish) throws IOException { - final List result = new ArrayList(); + final List result = new ArrayList<>(); this.store.readLockIndex(); try { this.store.getPageFile().tx().execute(new Transaction.Closure() { @@ -229,7 +229,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch long startTime = System.currentTimeMillis(); // round startTime - so we can schedule more jobs // at the same time - startTime = (startTime / 1000) * 1000; + startTime = ((startTime + 500) / 1000) * 1000; long time = 0; if (cronEntry != null && cronEntry.length() > 0) { try { @@ -329,7 +329,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch values = this.index.remove(tx, nextExecutionTime); } if (values == null) { - values = new ArrayList(); + values = new ArrayList<>(); } // There can never be more than one instance of the same JobId scheduled at any @@ -407,7 +407,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch target = this.index.remove(tx, command.getNextExecutionTime()); } if (target == null) { - target = new ArrayList(); + target = new ArrayList<>(); } target.add(result); @@ -568,7 +568,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch * @throws IOException if an error occurs during the remove operation. */ protected void removeInRange(Transaction tx, long start, long finish, Location location) throws IOException { - List keys = new ArrayList(); + List keys = new ArrayList<>(); for (Iterator>> i = this.index.iterator(tx, start); i.hasNext();) { Map.Entry> entry = i.next(); if (entry.getKey().longValue() <= finish) { @@ -662,7 +662,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch * @throws IOException if an error occurs walking the scheduler tree. */ protected List getAllScheduledJobs(Transaction tx) throws IOException { - List references = new ArrayList(); + List references = new ArrayList<>(); for (Iterator>> i = this.index.iterator(tx); i.hasNext();) { Map.Entry> entry = i.next(); @@ -709,8 +709,8 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch // needed before firing the job event. Map.Entry> first = getNextToSchedule(); if (first != null) { - List list = new ArrayList(first.getValue()); - List toRemove = new ArrayList(list.size()); + List list = new ArrayList<>(first.getValue()); + List toRemove = new ArrayList<>(list.size()); final long executionTime = first.getKey(); long nextExecutionTime = 0; if (executionTime <= currentTime) { @@ -852,7 +852,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch } void createIndexes(Transaction tx) throws IOException { - this.index = new BTreeIndex>(this.store.getPageFile(), tx.allocate().getPageId()); + this.index = new BTreeIndex<>(this.store.getPageFile(), tx.allocate().getPageId()); } void load(Transaction tx) throws IOException { @@ -863,7 +863,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch void read(DataInput in) throws IOException { this.name = in.readUTF(); - this.index = new BTreeIndex>(this.store.getPageFile(), in.readLong()); + this.index = new BTreeIndex<>(this.store.getPageFile(), in.readLong()); this.index.setKeyMarshaller(LongMarshaller.INSTANCE); this.index.setValueMarshaller(JobLocationsMarshaller.INSTANCE); }