ARTEMIS-821 Add support for scheduled message for STOMP
Adds headers AMQ_SCHEDULED_DELAY and AMQ_SCHEDULED_TIME to STOMP protocol handling to allow for delayed and scheduled time of a message. The AMQ_SCHEDULED_DELAY brings forward the same option from the 5.x broker and the AMQ_SCHEDULED_TIME option adds a fixed time of delivery alternative to match that of AMQP and others.
This commit is contained in:
parent
34362e9803
commit
a1fb897b43
|
@ -98,6 +98,14 @@ public interface Stomp {
|
||||||
String TYPE = "type";
|
String TYPE = "type";
|
||||||
|
|
||||||
String PERSISTENT = "persistent";
|
String PERSISTENT = "persistent";
|
||||||
|
|
||||||
|
// Extensions
|
||||||
|
|
||||||
|
// ActiveMQ 5.x Scheduled Message Compatibility.
|
||||||
|
String AMQ_SCHEDULED_DELAY = "AMQ_SCHEDULED_DELAY";
|
||||||
|
|
||||||
|
// Provides a hard time of delivery option (Epoch based)
|
||||||
|
String AMQ_SCHEDULED_TIME = "AMQ_SCHEDULED_TIME";
|
||||||
}
|
}
|
||||||
|
|
||||||
interface Message {
|
interface Message {
|
||||||
|
|
|
@ -16,6 +16,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.core.protocol.stomp;
|
package org.apache.activemq.artemis.core.protocol.stomp;
|
||||||
|
|
||||||
|
import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
|
@ -69,6 +71,23 @@ public class StompUtils {
|
||||||
msg.setExpiration(Long.parseLong(expiration));
|
msg.setExpiration(Long.parseLong(expiration));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Extension headers
|
||||||
|
String scheduledDelay = headers.remove(Stomp.Headers.Send.AMQ_SCHEDULED_DELAY);
|
||||||
|
if (scheduledDelay != null) {
|
||||||
|
long delay = Long.parseLong(scheduledDelay);
|
||||||
|
if (delay > 0) {
|
||||||
|
msg.putLongProperty(HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + delay);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
String scheduledTime = headers.remove(Stomp.Headers.Send.AMQ_SCHEDULED_TIME);
|
||||||
|
if (scheduledTime != null) {
|
||||||
|
long deliveryTime = Long.parseLong(scheduledTime);
|
||||||
|
if (deliveryTime > 0) {
|
||||||
|
msg.putLongProperty(HDR_SCHEDULED_DELIVERY_TIME, deliveryTime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// now the general headers
|
// now the general headers
|
||||||
for (Entry<String, String> entry : headers.entrySet()) {
|
for (Entry<String, String> entry : headers.entrySet()) {
|
||||||
String name = entry.getKey();
|
String name = entry.getKey();
|
||||||
|
|
|
@ -16,12 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.tests.integration.stomp;
|
package org.apache.activemq.artemis.tests.integration.stomp;
|
||||||
|
|
||||||
import javax.jms.BytesMessage;
|
|
||||||
import javax.jms.Message;
|
|
||||||
import javax.jms.MessageConsumer;
|
|
||||||
import javax.jms.MessageListener;
|
|
||||||
import javax.jms.MessageProducer;
|
|
||||||
import javax.jms.TextMessage;
|
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@ -32,7 +26,15 @@ import java.util.concurrent.TimeUnit;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
import javax.jms.BytesMessage;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageListener;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.TextMessage;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||||
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||||
|
@ -48,7 +50,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
|
||||||
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
|
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
|
||||||
import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory;
|
import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||||
|
@ -544,6 +545,110 @@ public class StompTest extends StompTestBase {
|
||||||
Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
|
Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSendMessageWithDelay() throws Exception {
|
||||||
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
|
||||||
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
|
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
||||||
|
.addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
|
||||||
|
.addHeader("foo", "abc")
|
||||||
|
.addHeader("bar", "123")
|
||||||
|
.addHeader("correlation-id", "c123")
|
||||||
|
.addHeader("persistent", "true")
|
||||||
|
.addHeader("type", "t345")
|
||||||
|
.addHeader("JMSXGroupID", "abc")
|
||||||
|
.addHeader("priority", "3")
|
||||||
|
.addHeader("AMQ_SCHEDULED_DELAY", "2000")
|
||||||
|
.setBody("Hello World");
|
||||||
|
conn.sendFrame(frame);
|
||||||
|
|
||||||
|
assertNull("Should not receive message yet", consumer.receive(1000));
|
||||||
|
|
||||||
|
TextMessage message = (TextMessage) consumer.receive(4000);
|
||||||
|
Assert.assertNotNull(message);
|
||||||
|
Assert.assertEquals("Hello World", message.getText());
|
||||||
|
Assert.assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID());
|
||||||
|
Assert.assertEquals("getJMSType", "t345", message.getJMSType());
|
||||||
|
Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
|
||||||
|
Assert.assertEquals(javax.jms.DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
|
||||||
|
Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
|
||||||
|
Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSendMessageWithDeliveryTime() throws Exception {
|
||||||
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
|
||||||
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
|
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
||||||
|
.addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
|
||||||
|
.addHeader("foo", "abc")
|
||||||
|
.addHeader("bar", "123")
|
||||||
|
.addHeader("correlation-id", "c123")
|
||||||
|
.addHeader("persistent", "true")
|
||||||
|
.addHeader("type", "t345")
|
||||||
|
.addHeader("JMSXGroupID", "abc")
|
||||||
|
.addHeader("priority", "3")
|
||||||
|
.addHeader("AMQ_SCHEDULED_TIME", Long.toString(System.currentTimeMillis() + 2000))
|
||||||
|
.setBody("Hello World");
|
||||||
|
conn.sendFrame(frame);
|
||||||
|
|
||||||
|
assertNull("Should not receive message yet", consumer.receive(1000));
|
||||||
|
|
||||||
|
TextMessage message = (TextMessage) consumer.receive(4000);
|
||||||
|
Assert.assertNotNull(message);
|
||||||
|
Assert.assertEquals("Hello World", message.getText());
|
||||||
|
Assert.assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID());
|
||||||
|
Assert.assertEquals("getJMSType", "t345", message.getJMSType());
|
||||||
|
Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
|
||||||
|
Assert.assertEquals(javax.jms.DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
|
||||||
|
Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
|
||||||
|
Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSendMessageWithDelayWithBadValue() throws Exception {
|
||||||
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
|
||||||
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
|
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
||||||
|
.addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
|
||||||
|
.addHeader("AMQ_SCHEDULED_DELAY", "foo")
|
||||||
|
.setBody("Hello World");
|
||||||
|
conn.sendFrame(frame);
|
||||||
|
|
||||||
|
assertNull("Should not receive message yet", consumer.receive(1000));
|
||||||
|
|
||||||
|
ClientStompFrame error = conn.receiveFrame();
|
||||||
|
|
||||||
|
Assert.assertNotNull(error);
|
||||||
|
Assert.assertTrue(error.getCommand().equals("ERROR"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSendMessageWithDeliveryTimeWithBadValue() throws Exception {
|
||||||
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
|
||||||
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
|
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
|
||||||
|
.addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
|
||||||
|
.addHeader("AMQ_SCHEDULED_TIME", "foo")
|
||||||
|
.setBody("Hello World");
|
||||||
|
conn.sendFrame(frame);
|
||||||
|
|
||||||
|
assertNull("Should not receive message yet", consumer.receive(1000));
|
||||||
|
|
||||||
|
ClientStompFrame error = conn.receiveFrame();
|
||||||
|
|
||||||
|
Assert.assertNotNull(error);
|
||||||
|
Assert.assertTrue(error.getCommand().equals("ERROR"));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSubscribeWithAutoAck() throws Exception {
|
public void testSubscribeWithAutoAck() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
Loading…
Reference in New Issue