From 3627ba57c9c48cc41d2036c913eb2c3c904a7743 Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Tue, 8 Mar 2022 11:06:00 -0600 Subject: [PATCH] ARTEMIS-3711 support AMQ_SCHEDULED_DELAY for OpenWire clients --- .../openwire/OpenWireMessageConverter.java | 8 ++ .../openwire/OpenWireScheduledDelayTest.java | 82 +++++++++++++++++++ 2 files changed, 90 insertions(+) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireScheduledDelayTest.java diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index da74a43f35..e203b3add2 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -38,6 +38,7 @@ import java.util.zip.InflaterInputStream; import java.util.zip.InflaterOutputStream; import com.google.common.io.BaseEncoding; +import org.apache.activemq.ScheduledMessage; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.ICoreMessage; @@ -224,6 +225,13 @@ public final class OpenWireMessageConverter { coreMessage.putStringProperty(AMQ_MSG_ORIG_DESTINATION, origDest.getQualifiedName()); } + final Object scheduledDelay = messageSend.getProperties().get(ScheduledMessage.AMQ_SCHEDULED_DELAY); + if (scheduledDelay instanceof Long) { + coreMessage.putLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + ((Long) scheduledDelay)); + // this property may have already been copied, but we don't need it anymore + coreMessage.removeProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY); + } + return coreMessage; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireScheduledDelayTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireScheduledDelayTest.java new file mode 100644 index 0000000000..b45c9b89cd --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireScheduledDelayTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.openwire; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.util.Map; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ScheduledMessage; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.util.RandomUtil; +import org.junit.Assert; +import org.junit.Test; + +public class OpenWireScheduledDelayTest extends OpenWireTestBase { + + @Override + protected void configureAddressSettings(Map addressSettingsMap) { + addressSettingsMap.put("#", new AddressSettings().setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ"))); + } + + @Test + public void testScheduledDelay() throws Exception { + final String QUEUE_NAME = RandomUtil.randomString(); + final long DELAY = 2000; + final String PROP_NAME = RandomUtil.randomString(); + final String FIRST = RandomUtil.randomString(); + final String SECOND = RandomUtil.randomString(); + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); + Connection connection = connectionFactory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(QUEUE_NAME); + MessageProducer producer = session.createProducer(destination); + Message firstMessage = session.createMessage(); + firstMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, DELAY); + firstMessage.setStringProperty(PROP_NAME, FIRST); + final long ETA = System.currentTimeMillis() + DELAY; + producer.send(firstMessage); + Message secondMessage = session.createMessage(); + secondMessage.setStringProperty(PROP_NAME, SECOND); + producer.send(secondMessage); + producer.close(); + + MessageConsumer consumer = session.createConsumer(destination); + + Message received = consumer.receive(250); + assertNotNull(received); + assertEquals(SECOND, received.getStringProperty(PROP_NAME)); + + received = consumer.receive(DELAY + 250); + assertNotNull(received); + assertEquals(FIRST, received.getStringProperty(PROP_NAME)); + Assert.assertTrue(System.currentTimeMillis() >= ETA); + + connection.close(); + } +} + +