make sure scheduling works with transactions

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@925221 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2010-03-19 14:08:07 +00:00
parent efd6bda979
commit cc40251451
2 changed files with 45 additions and 4 deletions

View File

@ -111,7 +111,10 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
Object delayValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY); Object delayValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
if (cronValue != null || periodValue != null || delayValue != null) { if (cronValue != null || periodValue != null || delayValue != null) {
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(messageSend); //clear transaction context
Message msg = messageSend.copy();
msg.setTransactionId(null);
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(msg);
if (cronValue != null) { if (cronValue != null) {
cronEntry = cronValue.toString(); cronEntry = cronValue.toString();
} }
@ -121,12 +124,11 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
if (delayValue != null) { if (delayValue != null) {
delay = (Long) TypeConversionSupport.convert(delayValue, Long.class); delay = (Long) TypeConversionSupport.convert(delayValue, Long.class);
} }
Object repeatValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT); Object repeatValue = msg.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
if (repeatValue != null) { if (repeatValue != null) {
repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class); repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
} }
getInternalScheduler().schedule(msg.getMessageId().toString(),
getInternalScheduler().schedule(messageSend.getMessageId().toString(),
new ByteSequence(packet.data, packet.offset, packet.length),cronEntry, delay, period, repeat); new ByteSequence(packet.data, packet.offset, packet.length),cronEntry, delay, period, repeat);

View File

@ -22,6 +22,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageListener; import javax.jms.MessageListener;
@ -104,6 +105,44 @@ public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
assertEquals(latch.getCount(), 0); assertEquals(latch.getCount(), 0);
} }
public void testTransactedSchedule() throws Exception {
final int COUNT = 1;
Connection connection = createConnection();
final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = session.createConsumer(destination);
final CountDownLatch latch = new CountDownLatch(COUNT);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
latch.countDown();
try {
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
connection.start();
long time = 5000;
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
producer.send(message);
session.commit();
producer.close();
// make sure the message isn't delivered early
Thread.sleep(2000);
assertEquals(latch.getCount(), COUNT);
latch.await(5, TimeUnit.SECONDS);
assertEquals(latch.getCount(), 0);
}
public void testScheduleRepeated() throws Exception { public void testScheduleRepeated() throws Exception {
final int NUMBER = 10; final int NUMBER = 10;
final AtomicInteger count = new AtomicInteger(); final AtomicInteger count = new AtomicInteger();