Add support for the delivery time header to allow for scheduled messages
at a desired time.
This commit is contained in:
Timothy Bish 2015-11-09 12:00:38 -05:00
parent a42be999cb
commit 480b3e7c36
3 changed files with 204 additions and 1 deletions

View File

@ -23,6 +23,7 @@ import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.activemq.ScheduledMessage;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Decimal128;
import org.apache.qpid.proton.amqp.Decimal32;
@ -137,6 +138,13 @@ public abstract class InboundTransformer {
// Legacy annotation, JMSType value will be replaced by Subject further down if also present.
jms.setJMSType(entry.getValue().toString());
}
if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
long deliveryTime = ((Number) entry.getValue()).longValue();
long delay = deliveryTime - System.currentTimeMillis();
if (delay > 0) {
jms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
}
}
setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
}

View File

@ -111,7 +111,7 @@ public class AmqpTestSupport {
kaha.setDirectory(new File(KAHADB_DIRECTORY + getTestName()));
brokerService.setPersistenceAdapter(kaha);
}
brokerService.setSchedulerSupport(false);
brokerService.setSchedulerSupport(isSchedulerEnabled());
brokerService.setAdvisorySupport(false);
brokerService.setUseJmx(isUseJmx());
brokerService.getManagementContext().setCreateConnector(false);
@ -223,6 +223,10 @@ public class AmqpTestSupport {
return true;
}
protected boolean isSchedulerEnabled() {
return false;
}
protected boolean isUseOpenWireConnector() {
return false;
}

View File

@ -0,0 +1,191 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.interop;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.management.ObjectName;
import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.util.Wait;
import org.junit.Test;
/**
* Test for scheduled message support using AMQP message annotations.
*/
public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
@Override
protected boolean isSchedulerEnabled() {
return true;
}
@Override
protected boolean isUseOpenWireConnector() {
return true;
}
@Test(timeout = 60000)
public void testSendWithDeliveryTimeIsScheduled() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
assertEquals(0, brokerService.getAdminView().getQueues().length);
AmqpSender sender = session.createSender("queue://" + getTestName());
// Get the Queue View early to avoid racing the delivery.
assertEquals(1, brokerService.getAdminView().getQueues().length);
final QueueViewMBean queueView = getProxyToQueue(getTestName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
long deliveryTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2);
message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
message.setText("Test-Message");
sender.send(message);
JobSchedulerViewMBean view = getJobSchedulerMBean();
assertNotNull(view);
assertEquals(1, view.getAllJobs().size());
connection.close();
}
@Test(timeout = 60000)
public void testSendRecvWithDeliveryTime() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
assertEquals(0, brokerService.getAdminView().getQueues().length);
AmqpSender sender = session.createSender("queue://" + getTestName());
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
// Get the Queue View early to avoid racing the delivery.
assertEquals(1, brokerService.getAdminView().getQueues().length);
final QueueViewMBean queueView = getProxyToQueue(getTestName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
long deliveryTime = System.currentTimeMillis() + 2000;
message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
message.setText("Test-Message");
sender.send(message);
assertTrue("Delayed message should be delivered", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return queueView.getQueueSize() == 1;
}
}));
// Now try and get the message
receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(received);
received.accept();
Long msgDeliveryTime = (Long) received.getMessageAnnotation("x-opt-delivery-time");
assertNotNull(msgDeliveryTime);
assertEquals(deliveryTime, msgDeliveryTime.longValue());
connection.close();
}
@Test(timeout = 60000)
public void testSendScheduledReceiveOverOpenWire() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
assertEquals(0, brokerService.getAdminView().getQueues().length);
AmqpSender sender = session.createSender("queue://" + getTestName());
// Get the Queue View early to avoid racing the delivery.
assertEquals(1, brokerService.getAdminView().getQueues().length);
final QueueViewMBean queueView = getProxyToQueue(getTestName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
long deliveryTime = System.currentTimeMillis() + 2000;
message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
message.setText("Test-Message");
sender.send(message);
sender.close();
// Read the message
readMessages(getTestName(), 1, false);
connection.close();
}
public void readMessages(String destinationName, int count, boolean topic) throws Exception {
Connection connection = createJMSConnection();
connection.start();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = null;
if (topic) {
destination = session.createTopic(destinationName);
} else {
destination = session.createQueue(destinationName);
}
MessageConsumer consumer = session.createConsumer(destination);
for (int i = 1; i <= count; i++) {
Message received = consumer.receive(5000);
assertNotNull(received);
}
} finally {
connection.close();
}
}
protected JobSchedulerViewMBean getJobSchedulerMBean() throws Exception {
ObjectName objectName = brokerService.getAdminView().getJMSJobScheduler();
JobSchedulerViewMBean scheduler = null;
if (objectName != null) {
scheduler = (JobSchedulerViewMBean) brokerService.getManagementContext()
.newProxyInstance(objectName, JobSchedulerViewMBean.class, true);
}
return scheduler;
}
}