From cc40251451f90ffa34025ea8c947f60b67a572cd Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Fri, 19 Mar 2010 14:08:07 +0000 Subject: [PATCH] make sure scheduling works with transactions git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@925221 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/scheduler/SchedulerBroker.java | 10 +++-- .../broker/scheduler/JmsSchedulerTest.java | 39 +++++++++++++++++++ 2 files changed, 45 insertions(+), 4 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java index d2819e69f6..1e7b098857 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java @@ -111,7 +111,10 @@ public class SchedulerBroker extends BrokerFilter implements JobListener { Object delayValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY); if (cronValue != null || periodValue != null || delayValue != null) { - org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(messageSend); + //clear transaction context + Message msg = messageSend.copy(); + msg.setTransactionId(null); + org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(msg); if (cronValue != null) { cronEntry = cronValue.toString(); } @@ -121,12 +124,11 @@ public class SchedulerBroker extends BrokerFilter implements JobListener { if (delayValue != null) { delay = (Long) TypeConversionSupport.convert(delayValue, Long.class); } - Object repeatValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT); + Object repeatValue = msg.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT); if (repeatValue != null) { repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class); } - - getInternalScheduler().schedule(messageSend.getMessageId().toString(), + getInternalScheduler().schedule(msg.getMessageId().toString(), new ByteSequence(packet.data, packet.offset, packet.length),cronEntry, delay, period, repeat); diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java index eefe6615a1..29c5067a9f 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java @@ -22,6 +22,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; @@ -103,6 +104,44 @@ public class JmsSchedulerTest extends EmbeddedBrokerTestSupport { latch.await(5, TimeUnit.SECONDS); assertEquals(latch.getCount(), 0); } + + public void testTransactedSchedule() throws Exception { + final int COUNT = 1; + Connection connection = createConnection(); + + final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + + MessageConsumer consumer = session.createConsumer(destination); + + final CountDownLatch latch = new CountDownLatch(COUNT); + consumer.setMessageListener(new MessageListener() { + public void onMessage(Message message) { + latch.countDown(); + try { + session.commit(); + } catch (JMSException e) { + e.printStackTrace(); + } + } + }); + + connection.start(); + long time = 5000; + MessageProducer producer = session.createProducer(destination); + TextMessage message = session.createTextMessage("test msg"); + + message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); + + producer.send(message); + session.commit(); + producer.close(); + // make sure the message isn't delivered early + Thread.sleep(2000); + assertEquals(latch.getCount(), COUNT); + latch.await(5, TimeUnit.SECONDS); + assertEquals(latch.getCount(), 0); + } + public void testScheduleRepeated() throws Exception { final int NUMBER = 10;