Timothy A. Bish 2011-04-18 22:56:53 +00:00
parent eb24079a93
commit 315b00fa14
2 changed files with 108 additions and 63 deletions

View File

@ -117,52 +117,52 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
String physicalName = messageSend.getDestination().getPhysicalName();
boolean schedularManage = physicalName.regionMatches(true, 0,
ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION, 0,
ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION.length());
ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION, 0,
ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION.length());
if (schedularManage == true) {
JobScheduler scheduler = getInternalScheduler();
ActiveMQDestination replyTo = messageSend.getReplyTo();
JobScheduler scheduler = getInternalScheduler();
ActiveMQDestination replyTo = messageSend.getReplyTo();
String action = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION);
String action = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION);
if (action != null ) {
if (action != null ) {
Object startTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME);
Object endTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME);
Object startTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME);
Object endTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME);
if (replyTo != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE)) {
if (replyTo != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE)) {
if( startTime != null && endTime != null ) {
if( startTime != null && endTime != null ) {
long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);
long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);
for (Job job : scheduler.getAllJobs(start, finish)) {
sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
}
} else {
for (Job job : scheduler.getAllJobs()) {
sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
}
}
}
if (jobId != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE)) {
scheduler.remove(jobId);
} else if (action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL)) {
for (Job job : scheduler.getAllJobs(start, finish)) {
sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
}
} else {
for (Job job : scheduler.getAllJobs()) {
sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
}
}
}
if (jobId != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE)) {
scheduler.remove(jobId);
} else if (action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL)) {
if( startTime != null && endTime != null ) {
if( startTime != null && endTime != null ) {
long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);
long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);
scheduler.removeAllJobs(start, finish);
} else {
scheduler.removeAllJobs();
}
}
}
scheduler.removeAllJobs(start, finish);
} else {
scheduler.removeAllJobs();
}
}
}
} else if ((cronValue != null || periodValue != null || delayValue != null) && jobId == null) {
//clear transaction context
@ -197,7 +197,7 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
Message messageSend = (Message) this.wireFormat.unmarshal(packet);
messageSend.setOriginalTransactionId(null);
Object repeatValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
String cronStr = cronValue != null ? cronValue.toString() : null;
int repeat = 0;
if (repeatValue != null) {
@ -208,7 +208,7 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
// create a unique id - the original message could be sent
// lots of times
messageSend.setMessageId(
new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
}
// Add the jobId as a property
@ -220,6 +220,30 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
if (messageSend.getTimestamp() > 0 && messageSend.getExpiration() > 0) {
long oldExpiration = messageSend.getExpiration();
long newTimeStamp = System.currentTimeMillis();
long timeToLive = 0;
long oldTimestamp = messageSend.getTimestamp();
if (oldExpiration > 0) {
timeToLive = oldExpiration - oldTimestamp;
}
long expiration = timeToLive + newTimeStamp;
if(expiration > oldExpiration) {
if (timeToLive > 0 && expiration > 0) {
messageSend.setExpiration(expiration);
}
messageSend.setTimestamp(newTimeStamp);
if (LOG.isDebugEnabled()) {
LOG.debug("Set message " + messageSend.getMessageId() + " timestamp from " + oldTimestamp + " to " + newTimeStamp);
}
}
}
final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
producerExchange.setConnectionContext(context);
producerExchange.setMutable(true);
@ -253,37 +277,37 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
return null;
}
protected void sendScheduledJob(ConnectionContext context, Job job, ActiveMQDestination replyTo)
throws Exception {
protected void sendScheduledJob(ConnectionContext context, Job job, ActiveMQDestination replyTo)
throws Exception {
org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getPayload());
try {
Message msg = (Message) this.wireFormat.unmarshal(packet);
msg.setOriginalTransactionId(null);
msg.setPersistent(false);
msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
msg.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
msg.setDestination(replyTo);
msg.setResponseRequired(false);
msg.setProducerId(this.producerId);
msg.setPersistent(false);
msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
msg.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
msg.setDestination(replyTo);
msg.setResponseRequired(false);
msg.setProducerId(this.producerId);
// Add the jobId as a property
msg.setProperty("scheduledJobId", job.getJobId());
msg.setProperty("scheduledJobId", job.getJobId());
final boolean originalFlowControl = context.isProducerFlowControl();
final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
producerExchange.setConnectionContext(context);
producerExchange.setMutable(true);
producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
try {
context.setProducerFlowControl(false);
this.next.send(producerExchange, msg);
} finally {
context.setProducerFlowControl(originalFlowControl);
}
final boolean originalFlowControl = context.isProducerFlowControl();
final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
producerExchange.setConnectionContext(context);
producerExchange.setMutable(true);
producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
try {
context.setProducerFlowControl(false);
this.next.send(producerExchange, msg);
} finally {
context.setProducerFlowControl(originalFlowControl);
}
} catch (Exception e) {
LOG.error("Failed to send scheduled message " + job.getJobId(), e);
}
}
}
}

View File

@ -56,18 +56,18 @@ public class JmsCronSchedulerTest extends EmbeddedBrokerTestSupport {
public void onMessage(Message message) {
latch.countDown();
count.incrementAndGet();
LOG.debug("Received one Message, count is at: " + count.get());
LOG.debug("Received one Message, count is at: " + count.get());
}
});
connection.start();
for (int i = 0; i < COUNT; i++) {
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg "+i);
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "* * * * *");
producer.send(message);
producer.close();
//wait a couple sec so cron start time is different for next message
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg "+i);
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "* * * * *");
producer.send(message);
producer.close();
//wait a couple sec so cron start time is different for next message
Thread.sleep(2000);
}
SchedulerBroker sb = (SchedulerBroker) this.broker.getBroker().getAdaptor(SchedulerBroker.class);
@ -79,6 +79,27 @@ public class JmsCronSchedulerTest extends EmbeddedBrokerTestSupport {
assertEquals(COUNT, count.get());
}
public void testCronScheduleWithTtlSet() throws Exception {
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination);
connection.start();
MessageProducer producer = session.createProducer(destination);
producer.setTimeToLive(TimeUnit.MINUTES.toMillis(1));
TextMessage message = session.createTextMessage("test msg ");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "* * * * *");
producer.send(message);
producer.close();
Thread.sleep(TimeUnit.MINUTES.toMillis(2));
assertNotNull(consumer.receiveNoWait());
assertNull(consumer.receiveNoWait());
}
@Override
protected void setUp() throws Exception {
bindAddress = "vm://localhost";