From a1fb897b437d48aa98935e198c726a5b1eece959 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Tue, 6 Jun 2017 11:20:44 -0400 Subject: [PATCH] ARTEMIS-821 Add support for scheduled message for STOMP Adds headers AMQ_SCHEDULED_DELAY and AMQ_SCHEDULED_TIME to STOMP protocol handling to allow for delayed and scheduled time of a message. The AMQ_SCHEDULED_DELAY brings forward the same option from the 5.x broker and the AMQ_SCHEDULED_TIME option adds a fixed time of delivery alternative to match that of AMQP and others. --- .../artemis/core/protocol/stomp/Stomp.java | 8 ++ .../core/protocol/stomp/StompUtils.java | 19 +++ .../tests/integration/stomp/StompTest.java | 119 ++++++++++++++++-- 3 files changed, 139 insertions(+), 7 deletions(-) diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java index 89c14e71ae..f1000ccf01 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java @@ -98,6 +98,14 @@ public interface Stomp { String TYPE = "type"; String PERSISTENT = "persistent"; + + // Extensions + + // ActiveMQ 5.x Scheduled Message Compatibility. + String AMQ_SCHEDULED_DELAY = "AMQ_SCHEDULED_DELAY"; + + // Provides a hard time of delivery option (Epoch based) + String AMQ_SCHEDULED_TIME = "AMQ_SCHEDULED_TIME"; } interface Message { diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java index b05058bf49..ae6bcb9318 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.core.protocol.stomp; +import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME; + import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; @@ -69,6 +71,23 @@ public class StompUtils { msg.setExpiration(Long.parseLong(expiration)); } + // Extension headers + String scheduledDelay = headers.remove(Stomp.Headers.Send.AMQ_SCHEDULED_DELAY); + if (scheduledDelay != null) { + long delay = Long.parseLong(scheduledDelay); + if (delay > 0) { + msg.putLongProperty(HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + delay); + } + } + + String scheduledTime = headers.remove(Stomp.Headers.Send.AMQ_SCHEDULED_TIME); + if (scheduledTime != null) { + long deliveryTime = Long.parseLong(scheduledTime); + if (deliveryTime > 0) { + msg.putLongProperty(HDR_SCHEDULED_DELIVERY_TIME, deliveryTime); + } + } + // now the general headers for (Entry entry : headers.entrySet()) { String name = entry.getKey(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java index 40dbd9583c..f023cfb25c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java @@ -16,12 +16,6 @@ */ package org.apache.activemq.artemis.tests.integration.stomp; -import javax.jms.BytesMessage; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.TextMessage; import java.io.ByteArrayOutputStream; import java.nio.charset.StandardCharsets; import java.util.HashSet; @@ -32,7 +26,15 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; +import javax.jms.BytesMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; + import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ClientMessage; @@ -48,7 +50,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.protocol.stomp.Stomp; import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory; import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.AddressInfo; @@ -544,6 +545,110 @@ public class StompTest extends StompTestBase { Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID")); } + @Test + public void testSendMessageWithDelay() throws Exception { + MessageConsumer consumer = session.createConsumer(queue); + + conn.connect(defUser, defPass); + + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()) + .addHeader("foo", "abc") + .addHeader("bar", "123") + .addHeader("correlation-id", "c123") + .addHeader("persistent", "true") + .addHeader("type", "t345") + .addHeader("JMSXGroupID", "abc") + .addHeader("priority", "3") + .addHeader("AMQ_SCHEDULED_DELAY", "2000") + .setBody("Hello World"); + conn.sendFrame(frame); + + assertNull("Should not receive message yet", consumer.receive(1000)); + + TextMessage message = (TextMessage) consumer.receive(4000); + Assert.assertNotNull(message); + Assert.assertEquals("Hello World", message.getText()); + Assert.assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID()); + Assert.assertEquals("getJMSType", "t345", message.getJMSType()); + Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority()); + Assert.assertEquals(javax.jms.DeliveryMode.PERSISTENT, message.getJMSDeliveryMode()); + Assert.assertEquals("foo", "abc", message.getStringProperty("foo")); + Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID")); + } + + @Test + public void testSendMessageWithDeliveryTime() throws Exception { + MessageConsumer consumer = session.createConsumer(queue); + + conn.connect(defUser, defPass); + + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()) + .addHeader("foo", "abc") + .addHeader("bar", "123") + .addHeader("correlation-id", "c123") + .addHeader("persistent", "true") + .addHeader("type", "t345") + .addHeader("JMSXGroupID", "abc") + .addHeader("priority", "3") + .addHeader("AMQ_SCHEDULED_TIME", Long.toString(System.currentTimeMillis() + 2000)) + .setBody("Hello World"); + conn.sendFrame(frame); + + assertNull("Should not receive message yet", consumer.receive(1000)); + + TextMessage message = (TextMessage) consumer.receive(4000); + Assert.assertNotNull(message); + Assert.assertEquals("Hello World", message.getText()); + Assert.assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID()); + Assert.assertEquals("getJMSType", "t345", message.getJMSType()); + Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority()); + Assert.assertEquals(javax.jms.DeliveryMode.PERSISTENT, message.getJMSDeliveryMode()); + Assert.assertEquals("foo", "abc", message.getStringProperty("foo")); + Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID")); + } + + @Test + public void testSendMessageWithDelayWithBadValue() throws Exception { + MessageConsumer consumer = session.createConsumer(queue); + + conn.connect(defUser, defPass); + + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()) + .addHeader("AMQ_SCHEDULED_DELAY", "foo") + .setBody("Hello World"); + conn.sendFrame(frame); + + assertNull("Should not receive message yet", consumer.receive(1000)); + + ClientStompFrame error = conn.receiveFrame(); + + Assert.assertNotNull(error); + Assert.assertTrue(error.getCommand().equals("ERROR")); + } + + @Test + public void testSendMessageWithDeliveryTimeWithBadValue() throws Exception { + MessageConsumer consumer = session.createConsumer(queue); + + conn.connect(defUser, defPass); + + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName()) + .addHeader("AMQ_SCHEDULED_TIME", "foo") + .setBody("Hello World"); + conn.sendFrame(frame); + + assertNull("Should not receive message yet", consumer.receive(1000)); + + ClientStompFrame error = conn.receiveFrame(); + + Assert.assertNotNull(error); + Assert.assertTrue(error.getCommand().equals("ERROR")); + } + @Test public void testSubscribeWithAutoAck() throws Exception { conn.connect(defUser, defPass);