Add support for amqp style variants of the ActiveMQ job scheduler
options set in message annotations.
This commit is contained in:
Timothy Bish 2015-11-10 18:12:40 -05:00
parent 078a101cf7
commit 5d353e241b
3 changed files with 112 additions and 3 deletions

View File

@ -137,13 +137,32 @@ public abstract class InboundTransformer {
if ("x-opt-jms-type".equals(key) && entry.getValue() != null) {
// Legacy annotation, JMSType value will be replaced by Subject further down if also present.
jms.setJMSType(entry.getValue().toString());
}
if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
} else if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
long deliveryTime = ((Number) entry.getValue()).longValue();
long delay = deliveryTime - System.currentTimeMillis();
if (delay > 0) {
jms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
}
} else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) {
long delay = ((Number) entry.getValue()).longValue();
if (delay > 0) {
jms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
}
} else if ("x-opt-delivery-repeat".equals(key) && entry.getValue() != null) {
int repeat = ((Number) entry.getValue()).intValue();
if (repeat > 0) {
jms.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
}
} else if ("x-opt-delivery-period".equals(key) && entry.getValue() != null) {
long period = ((Number) entry.getValue()).longValue();
if (period > 0) {
jms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
}
} else if ("x-opt-delivery-cron".equals(key) && entry.getValue() != null) {
String cronEntry = (String) entry.getValue();
if (cronEntry != null) {
jms.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, cronEntry);
}
}
setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue());

View File

@ -19,6 +19,7 @@ package org.apache.activemq.transport.amqp.interop;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.concurrent.TimeUnit;
@ -148,13 +149,100 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
sender.send(message);
sender.close();
// Read the message with short timeout, shouldn't get it.
try {
readMessages(getTestName(), 1, false, 500);
fail("Should not read the message");
} catch (Throwable ex) {
}
// Read the message
readMessages(getTestName(), 1, false);
connection.close();
}
@Test
public void testScheduleWithDelay() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
assertEquals(0, brokerService.getAdminView().getQueues().length);
AmqpSender sender = session.createSender("queue://" + getTestName());
// Get the Queue View early to avoid racing the delivery.
assertEquals(1, brokerService.getAdminView().getQueues().length);
final QueueViewMBean queueView = getProxyToQueue(getTestName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
long delay = 5000;
message.setMessageAnnotation("x-opt-delivery-delay", delay);
message.setText("Test-Message");
sender.send(message);
sender.close();
// Read the message with short timeout, shouldn't get it.
try {
readMessages(getTestName(), 1, false, 1000);
fail("Should not read the message");
} catch (Throwable ex) {
}
// Read the message with long timeout, should get it.
try {
readMessages(getTestName(), 1, false, 10000);
} catch (Throwable ex) {
fail("Should read the message");
}
connection.close();
}
@Test
public void testScheduleRepeated() throws Exception {
final int NUMBER = 10;
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
assertEquals(0, brokerService.getAdminView().getQueues().length);
AmqpSender sender = session.createSender("queue://" + getTestName());
// Get the Queue View early to avoid racing the delivery.
assertEquals(1, brokerService.getAdminView().getQueues().length);
final QueueViewMBean queueView = getProxyToQueue(getTestName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
long delay = 1000;
message.setMessageAnnotation("x-opt-delivery-delay", delay);
message.setMessageAnnotation("x-opt-delivery-period", 500);
message.setMessageAnnotation("x-opt-delivery-repeat", NUMBER - 1);
message.setText("Test-Message");
sender.send(message);
sender.close();
readMessages(getTestName(), NUMBER, false);
// Read the message with short timeout, shouldn't get it.
try {
readMessages(getTestName(), 1, false, 600);
fail("Should not read more messages");
} catch (Throwable ex) {
}
connection.close();
}
public void readMessages(String destinationName, int count, boolean topic) throws Exception {
readMessages(destinationName, count, topic, 5000);
}
public void readMessages(String destinationName, int count, boolean topic, long timeout) throws Exception {
Connection connection = createJMSConnection();
connection.start();
@ -169,8 +257,9 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
MessageConsumer consumer = session.createConsumer(destination);
for (int i = 1; i <= count; i++) {
Message received = consumer.receive(5000);
Message received = consumer.receive(timeout);
assertNotNull(received);
LOG.info("Read next message: {}", received.getJMSMessageID());
}
} finally {
connection.close();

View File

@ -20,6 +20,7 @@
#
log4j.rootLogger=WARN, console, file
log4j.logger.org.apache.activemq=INFO
log4j.logger.org.apache.activemq.broker.scheduler=TRACE
log4j.logger.org.apache.activemq.transport.amqp=DEBUG
log4j.logger.org.apache.activemq.transport.amqp.FRAMES=INFO
log4j.logger.org.fusesource=INFO