mirror of https://github.com/apache/activemq.git
Round the start time value not truncate to ensure delay falls on the correct side of the scheduling block.
This commit is contained in:
parent
9d6bc3a5d8
commit
980162233f
|
@ -22,6 +22,7 @@ import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.util.Date;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
|
@ -165,6 +166,8 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testScheduleWithDelay() throws Exception {
|
public void testScheduleWithDelay() throws Exception {
|
||||||
|
final long DELAY = 5000;
|
||||||
|
|
||||||
AmqpClient client = createAmqpClient();
|
AmqpClient client = createAmqpClient();
|
||||||
AmqpConnection connection = trackConnection(client.connect());
|
AmqpConnection connection = trackConnection(client.connect());
|
||||||
AmqpSession session = connection.createSession();
|
AmqpSession session = connection.createSession();
|
||||||
|
@ -179,9 +182,10 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
|
||||||
final QueueViewMBean queueView = getProxyToQueue(getTestName());
|
final QueueViewMBean queueView = getProxyToQueue(getTestName());
|
||||||
assertNotNull(queueView);
|
assertNotNull(queueView);
|
||||||
|
|
||||||
|
long sendTime = System.currentTimeMillis();
|
||||||
|
|
||||||
AmqpMessage message = new AmqpMessage();
|
AmqpMessage message = new AmqpMessage();
|
||||||
long delay = 5000;
|
message.setMessageAnnotation("x-opt-delivery-delay", DELAY);
|
||||||
message.setMessageAnnotation("x-opt-delivery-delay", delay);
|
|
||||||
message.setText("Test-Message");
|
message.setText("Test-Message");
|
||||||
sender.send(message);
|
sender.send(message);
|
||||||
sender.close();
|
sender.close();
|
||||||
|
@ -203,10 +207,17 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
|
||||||
fail("Should read the message");
|
fail("Should read the message");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
long receivedTime = System.currentTimeMillis();
|
||||||
|
|
||||||
assertNotNull(delivered);
|
assertNotNull(delivered);
|
||||||
Long msgDeliveryTime = (Long) delivered.getMessageAnnotation("x-opt-delivery-delay");
|
Long msgDeliveryTime = (Long) delivered.getMessageAnnotation("x-opt-delivery-delay");
|
||||||
assertNotNull(msgDeliveryTime);
|
assertNotNull(msgDeliveryTime);
|
||||||
assertEquals(delay, msgDeliveryTime.longValue());
|
assertEquals(DELAY, msgDeliveryTime.longValue());
|
||||||
|
|
||||||
|
long totalDelay = receivedTime - sendTime;
|
||||||
|
LOG.debug("Sent at: {}, received at: {} ", new Date(sendTime), new Date(receivedTime), totalDelay);
|
||||||
|
|
||||||
|
assertTrue("Delay not as expected: " + totalDelay, receivedTime - sendTime >= DELAY);
|
||||||
|
|
||||||
connection.close();
|
connection.close();
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,10 +54,10 @@ public class InMemoryJobScheduler implements JobScheduler {
|
||||||
|
|
||||||
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||||
private final String name;
|
private final String name;
|
||||||
private final TreeMap<Long, ScheduledTask> jobs = new TreeMap<Long, ScheduledTask>();
|
private final TreeMap<Long, ScheduledTask> jobs = new TreeMap<>();
|
||||||
private final AtomicBoolean started = new AtomicBoolean(false);
|
private final AtomicBoolean started = new AtomicBoolean(false);
|
||||||
private final AtomicBoolean dispatchEnabled = new AtomicBoolean(false);
|
private final AtomicBoolean dispatchEnabled = new AtomicBoolean(false);
|
||||||
private final List<JobListener> jobListeners = new CopyOnWriteArrayList<JobListener>();
|
private final List<JobListener> jobListeners = new CopyOnWriteArrayList<>();
|
||||||
private final Timer timer = new Timer();
|
private final Timer timer = new Timer();
|
||||||
|
|
||||||
public InMemoryJobScheduler(String name) {
|
public InMemoryJobScheduler(String name) {
|
||||||
|
@ -165,7 +165,7 @@ public class InMemoryJobScheduler implements JobScheduler {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<Job> getNextScheduleJobs() throws Exception {
|
public List<Job> getNextScheduleJobs() throws Exception {
|
||||||
List<Job> result = new ArrayList<Job>();
|
List<Job> result = new ArrayList<>();
|
||||||
lock.readLock().lock();
|
lock.readLock().lock();
|
||||||
try {
|
try {
|
||||||
if (!jobs.isEmpty()) {
|
if (!jobs.isEmpty()) {
|
||||||
|
@ -179,7 +179,7 @@ public class InMemoryJobScheduler implements JobScheduler {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<Job> getAllJobs() throws Exception {
|
public List<Job> getAllJobs() throws Exception {
|
||||||
final List<Job> result = new ArrayList<Job>();
|
final List<Job> result = new ArrayList<>();
|
||||||
this.lock.readLock().lock();
|
this.lock.readLock().lock();
|
||||||
try {
|
try {
|
||||||
for (Map.Entry<Long, ScheduledTask> entry : jobs.entrySet()) {
|
for (Map.Entry<Long, ScheduledTask> entry : jobs.entrySet()) {
|
||||||
|
@ -194,7 +194,7 @@ public class InMemoryJobScheduler implements JobScheduler {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<Job> getAllJobs(long start, long finish) throws Exception {
|
public List<Job> getAllJobs(long start, long finish) throws Exception {
|
||||||
final List<Job> result = new ArrayList<Job>();
|
final List<Job> result = new ArrayList<>();
|
||||||
this.lock.readLock().lock();
|
this.lock.readLock().lock();
|
||||||
try {
|
try {
|
||||||
for (Map.Entry<Long, ScheduledTask> entry : jobs.entrySet()) {
|
for (Map.Entry<Long, ScheduledTask> entry : jobs.entrySet()) {
|
||||||
|
@ -223,7 +223,7 @@ public class InMemoryJobScheduler implements JobScheduler {
|
||||||
long startTime = System.currentTimeMillis();
|
long startTime = System.currentTimeMillis();
|
||||||
long executionTime = 0;
|
long executionTime = 0;
|
||||||
// round startTime - so we can schedule more jobs at the same time
|
// round startTime - so we can schedule more jobs at the same time
|
||||||
startTime = (startTime / 1000) * 1000;
|
startTime = ((startTime + 500) / 1000) * 1000;
|
||||||
if (cronEntry != null && cronEntry.length() > 0) {
|
if (cronEntry != null && cronEntry.length() > 0) {
|
||||||
try {
|
try {
|
||||||
executionTime = CronParser.getNextScheduledTime(cronEntry, startTime);
|
executionTime = CronParser.getNextScheduledTime(cronEntry, startTime);
|
||||||
|
@ -369,7 +369,7 @@ public class InMemoryJobScheduler implements JobScheduler {
|
||||||
*/
|
*/
|
||||||
private class ScheduledTask extends TimerTask {
|
private class ScheduledTask extends TimerTask {
|
||||||
|
|
||||||
private final Map<String, InMemoryJob> jobs = new TreeMap<String, InMemoryJob>();
|
private final Map<String, InMemoryJob> jobs = new TreeMap<>();
|
||||||
private final long executionTime;
|
private final long executionTime;
|
||||||
|
|
||||||
public ScheduledTask(long executionTime) {
|
public ScheduledTask(long executionTime) {
|
||||||
|
@ -384,7 +384,7 @@ public class InMemoryJobScheduler implements JobScheduler {
|
||||||
* @return a Collection containing all the managed jobs for this task.
|
* @return a Collection containing all the managed jobs for this task.
|
||||||
*/
|
*/
|
||||||
public Collection<InMemoryJob> getAllJobs() {
|
public Collection<InMemoryJob> getAllJobs() {
|
||||||
return new ArrayList<InMemoryJob>(jobs.values());
|
return new ArrayList<>(jobs.values());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -57,7 +57,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
|
||||||
private BTreeIndex<Long, List<JobLocation>> index;
|
private BTreeIndex<Long, List<JobLocation>> index;
|
||||||
private Thread thread;
|
private Thread thread;
|
||||||
private final AtomicBoolean started = new AtomicBoolean(false);
|
private final AtomicBoolean started = new AtomicBoolean(false);
|
||||||
private final List<JobListener> jobListeners = new CopyOnWriteArrayList<JobListener>();
|
private final List<JobListener> jobListeners = new CopyOnWriteArrayList<>();
|
||||||
private static final IdGenerator ID_GENERATOR = new IdGenerator();
|
private static final IdGenerator ID_GENERATOR = new IdGenerator();
|
||||||
private final ScheduleTime scheduleTime = new ScheduleTime();
|
private final ScheduleTime scheduleTime = new ScheduleTime();
|
||||||
|
|
||||||
|
@ -132,7 +132,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<Job> getNextScheduleJobs() throws IOException {
|
public List<Job> getNextScheduleJobs() throws IOException {
|
||||||
final List<Job> result = new ArrayList<Job>();
|
final List<Job> result = new ArrayList<>();
|
||||||
this.store.readLockIndex();
|
this.store.readLockIndex();
|
||||||
try {
|
try {
|
||||||
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
|
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
|
||||||
|
@ -169,7 +169,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<Job> getAllJobs() throws IOException {
|
public List<Job> getAllJobs() throws IOException {
|
||||||
final List<Job> result = new ArrayList<Job>();
|
final List<Job> result = new ArrayList<>();
|
||||||
this.store.readLockIndex();
|
this.store.readLockIndex();
|
||||||
try {
|
try {
|
||||||
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
|
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
|
||||||
|
@ -198,7 +198,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<Job> getAllJobs(final long start, final long finish) throws IOException {
|
public List<Job> getAllJobs(final long start, final long finish) throws IOException {
|
||||||
final List<Job> result = new ArrayList<Job>();
|
final List<Job> result = new ArrayList<>();
|
||||||
this.store.readLockIndex();
|
this.store.readLockIndex();
|
||||||
try {
|
try {
|
||||||
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
|
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
|
||||||
|
@ -229,7 +229,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
|
||||||
long startTime = System.currentTimeMillis();
|
long startTime = System.currentTimeMillis();
|
||||||
// round startTime - so we can schedule more jobs
|
// round startTime - so we can schedule more jobs
|
||||||
// at the same time
|
// at the same time
|
||||||
startTime = (startTime / 1000) * 1000;
|
startTime = ((startTime + 500) / 1000) * 1000;
|
||||||
long time = 0;
|
long time = 0;
|
||||||
if (cronEntry != null && cronEntry.length() > 0) {
|
if (cronEntry != null && cronEntry.length() > 0) {
|
||||||
try {
|
try {
|
||||||
|
@ -329,7 +329,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
|
||||||
values = this.index.remove(tx, nextExecutionTime);
|
values = this.index.remove(tx, nextExecutionTime);
|
||||||
}
|
}
|
||||||
if (values == null) {
|
if (values == null) {
|
||||||
values = new ArrayList<JobLocation>();
|
values = new ArrayList<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
// There can never be more than one instance of the same JobId scheduled at any
|
// There can never be more than one instance of the same JobId scheduled at any
|
||||||
|
@ -407,7 +407,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
|
||||||
target = this.index.remove(tx, command.getNextExecutionTime());
|
target = this.index.remove(tx, command.getNextExecutionTime());
|
||||||
}
|
}
|
||||||
if (target == null) {
|
if (target == null) {
|
||||||
target = new ArrayList<JobLocation>();
|
target = new ArrayList<>();
|
||||||
}
|
}
|
||||||
target.add(result);
|
target.add(result);
|
||||||
|
|
||||||
|
@ -568,7 +568,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
|
||||||
* @throws IOException if an error occurs during the remove operation.
|
* @throws IOException if an error occurs during the remove operation.
|
||||||
*/
|
*/
|
||||||
protected void removeInRange(Transaction tx, long start, long finish, Location location) throws IOException {
|
protected void removeInRange(Transaction tx, long start, long finish, Location location) throws IOException {
|
||||||
List<Long> keys = new ArrayList<Long>();
|
List<Long> keys = new ArrayList<>();
|
||||||
for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx, start); i.hasNext();) {
|
for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx, start); i.hasNext();) {
|
||||||
Map.Entry<Long, List<JobLocation>> entry = i.next();
|
Map.Entry<Long, List<JobLocation>> entry = i.next();
|
||||||
if (entry.getKey().longValue() <= finish) {
|
if (entry.getKey().longValue() <= finish) {
|
||||||
|
@ -662,7 +662,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
|
||||||
* @throws IOException if an error occurs walking the scheduler tree.
|
* @throws IOException if an error occurs walking the scheduler tree.
|
||||||
*/
|
*/
|
||||||
protected List<JobLocation> getAllScheduledJobs(Transaction tx) throws IOException {
|
protected List<JobLocation> getAllScheduledJobs(Transaction tx) throws IOException {
|
||||||
List<JobLocation> references = new ArrayList<JobLocation>();
|
List<JobLocation> references = new ArrayList<>();
|
||||||
|
|
||||||
for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) {
|
for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) {
|
||||||
Map.Entry<Long, List<JobLocation>> entry = i.next();
|
Map.Entry<Long, List<JobLocation>> entry = i.next();
|
||||||
|
@ -709,8 +709,8 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
|
||||||
// needed before firing the job event.
|
// needed before firing the job event.
|
||||||
Map.Entry<Long, List<JobLocation>> first = getNextToSchedule();
|
Map.Entry<Long, List<JobLocation>> first = getNextToSchedule();
|
||||||
if (first != null) {
|
if (first != null) {
|
||||||
List<JobLocation> list = new ArrayList<JobLocation>(first.getValue());
|
List<JobLocation> list = new ArrayList<>(first.getValue());
|
||||||
List<JobLocation> toRemove = new ArrayList<JobLocation>(list.size());
|
List<JobLocation> toRemove = new ArrayList<>(list.size());
|
||||||
final long executionTime = first.getKey();
|
final long executionTime = first.getKey();
|
||||||
long nextExecutionTime = 0;
|
long nextExecutionTime = 0;
|
||||||
if (executionTime <= currentTime) {
|
if (executionTime <= currentTime) {
|
||||||
|
@ -852,7 +852,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
|
||||||
}
|
}
|
||||||
|
|
||||||
void createIndexes(Transaction tx) throws IOException {
|
void createIndexes(Transaction tx) throws IOException {
|
||||||
this.index = new BTreeIndex<Long, List<JobLocation>>(this.store.getPageFile(), tx.allocate().getPageId());
|
this.index = new BTreeIndex<>(this.store.getPageFile(), tx.allocate().getPageId());
|
||||||
}
|
}
|
||||||
|
|
||||||
void load(Transaction tx) throws IOException {
|
void load(Transaction tx) throws IOException {
|
||||||
|
@ -863,7 +863,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
|
||||||
|
|
||||||
void read(DataInput in) throws IOException {
|
void read(DataInput in) throws IOException {
|
||||||
this.name = in.readUTF();
|
this.name = in.readUTF();
|
||||||
this.index = new BTreeIndex<Long, List<JobLocation>>(this.store.getPageFile(), in.readLong());
|
this.index = new BTreeIndex<>(this.store.getPageFile(), in.readLong());
|
||||||
this.index.setKeyMarshaller(LongMarshaller.INSTANCE);
|
this.index.setKeyMarshaller(LongMarshaller.INSTANCE);
|
||||||
this.index.setValueMarshaller(JobLocationsMarshaller.INSTANCE);
|
this.index.setValueMarshaller(JobLocationsMarshaller.INSTANCE);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue