diff --git a/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java b/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java index f598f046e0..fb63d871e1 100644 --- a/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java +++ b/activemq-core/src/main/java/org/apache/activemq/ScheduledMessage.java @@ -18,7 +18,7 @@ package org.apache.activemq; public interface ScheduledMessage { /** - * The time in milliseconds that a message will wait before being scheduled to be + * The time in milliseconds that a message will wait before being scheduled to be * delivered by the broker */ public static final String AMQ_SCHEDULED_DELAY = "AMQ_SCHEDULED_DELAY"; @@ -34,6 +34,48 @@ public interface ScheduledMessage { * Use a Cron tab entry to set the schedule */ public static final String AMQ_SCHEDULED_CRON = "AMQ_SCHEDULED_CRON"; - + /** + * An Id that is assigned to a Scheduled Message, this value is only available once the + * Message is scheduled, Messages sent to the Browse Destination or delivered to the + * assigned Destination will have this value set. + */ + public static final String AMQ_SCHEDULED_ID = "scheduledJobId"; + + /** + * Special destination to send Message's to with an assigned "action" that the Scheduler + * should perform such as removing a message. + */ + public static final String AMQ_SCHEDULER_MANAGEMENT_DESTINATION = "ActiveMQ.Scheduler.Management"; + /** + * Used to specify that a some operation should be performed on the Scheduled Message, + * the Message must have an assigned Id for this action to be taken. + */ + public static final String AMQ_SCHEDULER_ACTION = "AMQ_SCHEDULER_ACTION"; + + /** + * Indicates that a browse of the Scheduled Messages is being requested. + */ + public static final String AMQ_SCHEDULER_ACTION_BROWSE = "BROWSE"; + /** + * Indicates that a Scheduled Message is to be remove from the Scheduler, the Id of + * the scheduled message must be set as a property in order for this action to have + * any effect. + */ + public static final String AMQ_SCHEDULER_ACTION_REMOVE = "REMOVE"; + /** + * Indicates that all scheduled Messages should be removed. + */ + public static final String AMQ_SCHEDULER_ACTION_REMOVEALL = "REMOVEALL"; + + /** + * A property that holds the beginning of the time interval that the specified action should + * be applied within. Maps to a long value that specified time in milliseconds since UTC. + */ + public static final String AMQ_SCHEDULER_ACTION_START_TIME = "ACTION_START_TIME"; + /** + * A property that holds the end of the time interval that the specified action should be + * applied within. Maps to a long value that specified time in milliseconds since UTC. + */ + public static final String AMQ_SCHEDULER_ACTION_END_TIME = "ACTION_END_TIME"; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java index 438be97365..27f707e3af 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java @@ -18,11 +18,14 @@ package org.apache.activemq.broker.scheduler; import java.io.File; import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.activemq.ScheduledMessage; +import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerFilter; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.ProducerId; @@ -107,31 +110,80 @@ public class SchedulerBroker extends BrokerFilter implements JobListener { long period = 0; int repeat = 0; String cronEntry = ""; + String jobId = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_ID); Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON); Object periodValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD); Object delayValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY); - if (cronValue != null || periodValue != null || delayValue != null) { + String physicalName = messageSend.getDestination().getPhysicalName(); + boolean schedularManage = physicalName.regionMatches(true, 0, + ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION, 0, + ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION.length()); + + if (schedularManage == true) { + + JobScheduler scheduler = getInternalScheduler(); + ActiveMQDestination replyTo = messageSend.getReplyTo(); + + String action = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION); + + if (action != null ) { + + Object startTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME); + Object endTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME); + + if (replyTo != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE)) { + + if( startTime != null && endTime != null ) { + + long start = (Long) TypeConversionSupport.convert(startTime, Long.class); + long finish = (Long) TypeConversionSupport.convert(endTime, Long.class); + + for (Job job : scheduler.getAllJobs(start, finish)) { + sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo); + } + } else { + for (Job job : scheduler.getAllJobs()) { + sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo); + } + } + } + if (jobId != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE)) { + scheduler.remove(jobId); + } else if (action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL)) { + + if( startTime != null && endTime != null ) { + + long start = (Long) TypeConversionSupport.convert(startTime, Long.class); + long finish = (Long) TypeConversionSupport.convert(endTime, Long.class); + + scheduler.removeAllJobs(start, finish); + } else { + scheduler.removeAllJobs(); + } + } + } + + } else if ((cronValue != null || periodValue != null || delayValue != null) && jobId == null) { //clear transaction context Message msg = messageSend.copy(); msg.setTransactionId(null); org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(msg); - if (cronValue != null) { - cronEntry = cronValue.toString(); - } - if (periodValue != null) { - period = (Long) TypeConversionSupport.convert(periodValue, Long.class); - } - if (delayValue != null) { - delay = (Long) TypeConversionSupport.convert(delayValue, Long.class); - } - Object repeatValue = msg.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT); - if (repeatValue != null) { - repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class); - } - getInternalScheduler().schedule(msg.getMessageId().toString(), - new ByteSequence(packet.data, packet.offset, packet.length),cronEntry, delay, period, repeat); - + if (cronValue != null) { + cronEntry = cronValue.toString(); + } + if (periodValue != null) { + period = (Long) TypeConversionSupport.convert(periodValue, Long.class); + } + if (delayValue != null) { + delay = (Long) TypeConversionSupport.convert(delayValue, Long.class); + } + Object repeatValue = msg.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT); + if (repeatValue != null) { + repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class); + } + getInternalScheduler().schedule(msg.getMessageId().toString(), + new ByteSequence(packet.data, packet.offset, packet.length),cronEntry, delay, period, repeat); } else { super.send(producerExchange, messageSend); @@ -151,14 +203,14 @@ public class SchedulerBroker extends BrokerFilter implements JobListener { if (repeatValue != null) { repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class); } - - if (repeat != 0 || cronStr != null && cronStr.length() > 0) { - // create a unique id - the original message could be sent - // lots of times - messageSend - .setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId())); - } - + + if (repeat != 0 || cronStr != null && cronStr.length() > 0) { + // create a unique id - the original message could be sent + // lots of times + messageSend.setMessageId( + new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId())); + } + // Add the jobId as a property messageSend.setProperty("scheduledJobId", id); @@ -176,7 +228,6 @@ public class SchedulerBroker extends BrokerFilter implements JobListener { } catch (Exception e) { LOG.error("Failed to send scheduled message " + id, e); } - } protected synchronized JobScheduler getInternalScheduler() throws Exception { @@ -202,4 +253,37 @@ public class SchedulerBroker extends BrokerFilter implements JobListener { return null; } + protected void sendScheduledJob(ConnectionContext context, Job job, ActiveMQDestination replyTo) + throws Exception { + + org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getPayload()); + try { + Message msg = (Message) this.wireFormat.unmarshal(packet); + msg.setOriginalTransactionId(null); + msg.setPersistent(false); + msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE); + msg.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId())); + msg.setDestination(replyTo); + msg.setResponseRequired(false); + msg.setProducerId(this.producerId); + + // Add the jobId as a property + msg.setProperty("scheduledJobId", job.getJobId()); + + final boolean originalFlowControl = context.isProducerFlowControl(); + final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); + producerExchange.setConnectionContext(context); + producerExchange.setMutable(true); + producerExchange.setProducerState(new ProducerState(new ProducerInfo())); + try { + context.setProducerFlowControl(false); + this.next.send(producerExchange, msg); + } finally { + context.setProducerFlowControl(originalFlowControl); + } + } catch (Exception e) { + LOG.error("Failed to send scheduled message " + job.getJobId(), e); + } + + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java new file mode 100644 index 0000000000..3acd570338 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java @@ -0,0 +1,422 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.scheduler; + +import java.io.File; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.ScheduledMessage; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.IOHelper; +import org.apache.activemq.util.IdGenerator; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport { + + private static final transient Log LOG = LogFactory.getLog(JobSchedulerManagementTest.class); + + public void testRemoveAllScheduled() throws Exception { + final int COUNT = 5; + Connection connection = createConnection(); + + // Setup the scheduled Message + scheduleMessage(connection, TimeUnit.SECONDS.toMillis(6), COUNT); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create the Browse Destination and the Reply To location + Destination management = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION); + + // Create the eventual Consumer to receive the scheduled message + MessageConsumer consumer = session.createConsumer(destination); + + final CountDownLatch latch = new CountDownLatch(COUNT); + consumer.setMessageListener(new MessageListener() { + public void onMessage(Message message) { + latch.countDown(); + } + }); + + connection.start(); + + // Send the remove request + MessageProducer producer = session.createProducer(management); + Message request = session.createMessage(); + request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL); + producer.send(request); + + // Now wait and see if any get delivered, none should. + latch.await(10, TimeUnit.SECONDS); + assertEquals(latch.getCount(), COUNT); + } + + public void testRemoveAllScheduledAtTime() throws Exception { + final int COUNT = 3; + Connection connection = createConnection(); + + // Setup the scheduled Message + scheduleMessage(connection, TimeUnit.SECONDS.toMillis(6)); + scheduleMessage(connection, TimeUnit.SECONDS.toMillis(15)); + scheduleMessage(connection, TimeUnit.SECONDS.toMillis(20)); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create the Browse Destination and the Reply To location + Destination management = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION); + Destination browseDest = session.createTemporaryQueue(); + + // Create the eventual Consumer to receive the scheduled message + MessageConsumer consumer = session.createConsumer(destination); + + final CountDownLatch latch = new CountDownLatch(COUNT); + consumer.setMessageListener(new MessageListener() { + public void onMessage(Message message) { + latch.countDown(); + } + }); + + // Create the "Browser" + MessageConsumer browser = session.createConsumer(browseDest); + final CountDownLatch browsedLatch = new CountDownLatch(COUNT); + browser.setMessageListener(new MessageListener() { + public void onMessage(Message message) { + browsedLatch.countDown(); + LOG.debug("Scheduled Message Browser got Message: " + message); + } + }); + + connection.start(); + + long start = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10); + long end = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30); + + // Send the remove request + MessageProducer producer = session.createProducer(management); + Message request = session.createMessage(); + request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, + ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL); + request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME, Long.toString(start)); + request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME, Long.toString(end)); + producer.send(request); + + // Send the browse request + request = session.createMessage(); + request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE); + request.setJMSReplyTo(browseDest); + producer.send(request); + + // now see if we got back only the one remaining message. + latch.await(10, TimeUnit.SECONDS); + assertEquals(2, browsedLatch.getCount()); + + // Now wait and see if any get delivered, none should. + latch.await(10, TimeUnit.SECONDS); + assertEquals(2, latch.getCount()); + } + + public void testBrowseAllScheduled() throws Exception { + final int COUNT = 10; + Connection connection = createConnection(); + + // Setup the scheduled Message + scheduleMessage(connection, TimeUnit.SECONDS.toMillis(9), COUNT); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create the Browse Destination and the Reply To location + Destination requestBrowse = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION); + Destination browseDest = session.createTemporaryQueue(); + + // Create the eventual Consumer to receive the scheduled message + MessageConsumer consumer = session.createConsumer(destination); + + final CountDownLatch latch = new CountDownLatch(COUNT); + consumer.setMessageListener(new MessageListener() { + public void onMessage(Message message) { + latch.countDown(); + } + }); + + // Create the "Browser" + MessageConsumer browser = session.createConsumer(browseDest); + final CountDownLatch browsedLatch = new CountDownLatch(COUNT); + browser.setMessageListener(new MessageListener() { + public void onMessage(Message message) { + browsedLatch.countDown(); + LOG.debug("Scheduled Message Browser got Message: " + message); + } + }); + + connection.start(); + + // Send the browse request + MessageProducer producer = session.createProducer(requestBrowse); + Message request = session.createMessage(); + request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE); + request.setJMSReplyTo(browseDest); + producer.send(request); + + // make sure the message isn't delivered early because we browsed it + Thread.sleep(2000); + assertEquals(latch.getCount(), COUNT); + + // now see if we got all the scheduled messages on the browse destination. + latch.await(10, TimeUnit.SECONDS); + assertEquals(browsedLatch.getCount(), 0); + + // now check that they all got delivered + latch.await(10, TimeUnit.SECONDS); + assertEquals(latch.getCount(), 0); + } + + public void testBrowseWindowlScheduled() throws Exception { + final int COUNT = 10; + Connection connection = createConnection(); + + // Setup the scheduled Message + scheduleMessage(connection, TimeUnit.SECONDS.toMillis(5)); + scheduleMessage(connection, TimeUnit.SECONDS.toMillis(10), COUNT); + scheduleMessage(connection, TimeUnit.SECONDS.toMillis(20)); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create the Browse Destination and the Reply To location + Destination requestBrowse = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION); + Destination browseDest = session.createTemporaryQueue(); + + // Create the eventual Consumer to receive the scheduled message + MessageConsumer consumer = session.createConsumer(destination); + + final CountDownLatch latch = new CountDownLatch(COUNT + 2); + consumer.setMessageListener(new MessageListener() { + public void onMessage(Message message) { + latch.countDown(); + } + }); + + // Create the "Browser" + MessageConsumer browser = session.createConsumer(browseDest); + final CountDownLatch browsedLatch = new CountDownLatch(COUNT); + browser.setMessageListener(new MessageListener() { + public void onMessage(Message message) { + browsedLatch.countDown(); + LOG.debug("Scheduled Message Browser got Message: " + message); + } + }); + + connection.start(); + + long start = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(6); + long end = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(15); + + // Send the browse request + MessageProducer producer = session.createProducer(requestBrowse); + Message request = session.createMessage(); + request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE); + request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME, Long.toString(start)); + request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME, Long.toString(end)); + request.setJMSReplyTo(browseDest); + producer.send(request); + + // make sure the message isn't delivered early because we browsed it + Thread.sleep(2000); + assertEquals(COUNT + 2, latch.getCount()); + + // now see if we got all the scheduled messages on the browse destination. + latch.await(15, TimeUnit.SECONDS); + assertEquals(0, browsedLatch.getCount()); + + // now see if we got all the scheduled messages on the browse destination. + latch.await(20, TimeUnit.SECONDS); + assertEquals(0, latch.getCount()); + } + + public void testRemoveScheduled() throws Exception { + final int COUNT = 10; + Connection connection = createConnection(); + + // Setup the scheduled Message + scheduleMessage(connection, TimeUnit.SECONDS.toMillis(9), COUNT); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create the Browse Destination and the Reply To location + Destination management = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION); + Destination browseDest = session.createTemporaryQueue(); + + // Create the eventual Consumer to receive the scheduled message + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(management); + + final CountDownLatch latch = new CountDownLatch(COUNT); + consumer.setMessageListener(new MessageListener() { + public void onMessage(Message message) { + latch.countDown(); + } + }); + + // Create the "Browser" + Session browseSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer browser = browseSession.createConsumer(browseDest); + + connection.start(); + + // Send the browse request + Message request = session.createMessage(); + request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, + ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE); + request.setJMSReplyTo(browseDest); + producer.send(request); + + // Browse all the Scheduled Messages. + for (int i = 0; i < COUNT; ++i) { + Message message = browser.receive(2000); + assertNotNull(message); + + try{ + Message remove = session.createMessage(); + remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, + ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE); + remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID, + message.getStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID)); + producer.send(remove); + } catch(Exception e) { + } + } + + // now check that they all got removed and are not delivered. + latch.await(11, TimeUnit.SECONDS); + assertEquals(COUNT, latch.getCount()); + } + + public void testRemoveNotScheduled() throws Exception { + Connection connection = createConnection(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create the Browse Destination and the Reply To location + Destination management = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION); + + MessageProducer producer = session.createProducer(management); + + try{ + + // Send the remove request + Message remove = session.createMessage(); + remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, + ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL); + remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID, new IdGenerator().generateId()); + producer.send(remove); + } catch(Exception e) { + fail("Caught unexpected exception during remove of unscheduled message."); + } + } + + public void testBrowseWithSelector() throws Exception { + Connection connection = createConnection(); + + // Setup the scheduled Message + scheduleMessage(connection, TimeUnit.SECONDS.toMillis(9)); + scheduleMessage(connection, TimeUnit.SECONDS.toMillis(10)); + scheduleMessage(connection, TimeUnit.SECONDS.toMillis(5)); + scheduleMessage(connection, TimeUnit.SECONDS.toMillis(45)); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create the Browse Destination and the Reply To location + Destination requestBrowse = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION); + Destination browseDest = session.createTemporaryTopic(); + + // Create the "Browser" + MessageConsumer browser = session.createConsumer(browseDest, ScheduledMessage.AMQ_SCHEDULED_DELAY + " = 45000" ); + + connection.start(); + + // Send the browse request + MessageProducer producer = session.createProducer(requestBrowse); + Message request = session.createMessage(); + request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE); + request.setJMSReplyTo(browseDest); + producer.send(request); + + // Now try and receive the one we selected + Message message = browser.receive(5000); + assertNotNull(message); + assertEquals(45000, message.getLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY)); + + // Now check if there are anymore, there shouldn't be + message = browser.receive(5000); + assertNull(message); + } + + + protected void scheduleMessage(Connection connection, long delay) throws Exception { + scheduleMessage(connection, delay, 1); + } + + protected void scheduleMessage(Connection connection, long delay, int count) throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + TextMessage message = session.createTextMessage("test msg"); + message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); + + for(int i = 0; i < count; ++i ) { + producer.send(message); + } + + producer.close(); + } + + @Override + protected void setUp() throws Exception { + bindAddress = "vm://localhost"; + super.setUp(); + } + + @Override + protected BrokerService createBroker() throws Exception { + return createBroker(true); + } + + protected BrokerService createBroker(boolean delete) throws Exception { + File schedulerDirectory = new File("target/scheduler"); + if (delete) { + IOHelper.mkdirs(schedulerDirectory); + IOHelper.deleteChildren(schedulerDirectory); + } + BrokerService answer = new BrokerService(); + answer.setPersistent(isPersistent()); + answer.setDeleteAllMessagesOnStartup(true); + answer.setDataDirectory("target"); + answer.setSchedulerDirectoryFile(schedulerDirectory); + answer.setUseJmx(false); + answer.addConnector(bindAddress); + return answer; + } +}