From 7d10e915b6d28e10b4cddc36fed53eed20cf9cbc Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Fri, 12 Feb 2021 10:56:32 -0500 Subject: [PATCH] ARTEMIS-3116 Fixing Core->AMQP Conversion of Scheduled Delivery Time --- .../amqp/broker/AMQPStandardMessage.java | 63 +++++++++++++ .../amqp/converter/CoreAmqpConverter.java | 55 ++--------- .../converter/message/DirectConvertTest.java | 79 ++++++++++++++++ .../message/MessageTransformationTest.java | 2 +- ...MQPScheduledCoreOverBrokerConnectTest.java | 94 +++++++++++++++++++ 5 files changed, 245 insertions(+), 48 deletions(-) create mode 100644 artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/DirectConvertTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPScheduledCoreOverBrokerConnectTest.java diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java index cb2133bfc2..ce2e3741ea 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java @@ -17,17 +17,27 @@ package org.apache.activemq.artemis.protocol.amqp.broker; import java.nio.ByteBuffer; +import java.util.Map; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode; import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.collections.TypedProperties; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; +import org.apache.qpid.proton.amqp.messaging.Footer; +import org.apache.qpid.proton.amqp.messaging.Header; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.amqp.messaging.Properties; +import org.apache.qpid.proton.amqp.messaging.Section; import org.apache.qpid.proton.codec.EncoderImpl; import org.apache.qpid.proton.codec.ReadableBuffer; import org.apache.qpid.proton.codec.WritableBuffer; @@ -35,6 +45,59 @@ import org.apache.qpid.proton.codec.WritableBuffer; // see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format public class AMQPStandardMessage extends AMQPMessage { + + public static AMQPStandardMessage createMessage(long messageID, + long messageFormat, + SimpleString replyTo, + Header header, + Properties properties, + Map daMap, + Map maMap, + Map apMap, + Map footerMap, + Section body) { + ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024); + + try { + EncoderImpl encoder = TLSEncode.getEncoder(); + encoder.setByteBuffer(new NettyWritable(buffer)); + + if (header != null) { + encoder.writeObject(header); + } + if (daMap != null) { + encoder.writeObject(new DeliveryAnnotations(daMap)); + } + if (maMap != null) { + encoder.writeObject(new MessageAnnotations(maMap)); + } + if (properties != null) { + encoder.writeObject(properties); + } + if (apMap != null) { + encoder.writeObject(new ApplicationProperties(apMap)); + } + if (body != null) { + encoder.writeObject(body); + } + if (footerMap != null) { + encoder.writeObject(new Footer(footerMap)); + } + + byte[] data = new byte[buffer.writerIndex()]; + buffer.readBytes(data); + + AMQPStandardMessage amqpMessage = new AMQPStandardMessage(messageFormat, data, null); + amqpMessage.setMessageID(messageID); + amqpMessage.setReplyTo(replyTo); + return amqpMessage; + + } finally { + TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null); + buffer.release(); + } + } + // Buffer and state for the data backing this message. protected ReadableBuffer data; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java index 2fa15fe2c4..ee7659e989 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java @@ -57,7 +57,6 @@ import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage; import org.apache.activemq.artemis.protocol.amqp.converter.coreWrapper.ConversionException; import org.apache.activemq.artemis.protocol.amqp.converter.coreWrapper.CoreMessageWrapper; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException; -import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode; import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil; @@ -65,22 +64,13 @@ import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.UnsignedByte; import org.apache.qpid.proton.amqp.UnsignedInteger; -import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; -import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; -import org.apache.qpid.proton.amqp.messaging.Footer; import org.apache.qpid.proton.amqp.messaging.Header; -import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.amqp.messaging.Properties; import org.apache.qpid.proton.amqp.messaging.Section; import org.apache.qpid.proton.codec.DecoderImpl; -import org.apache.qpid.proton.codec.EncoderImpl; import org.apache.qpid.proton.codec.ReadableBuffer.ByteBufferReader; -import org.apache.qpid.proton.codec.WritableBuffer; import org.jboss.logging.Logger; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; - public class CoreAmqpConverter { private static Logger logger = Logger.getLogger(CoreAmqpConverter.class); @@ -108,7 +98,6 @@ public class CoreAmqpConverter { CoreMessageWrapper message = CoreMessageWrapper.wrap(coreMessage); message.decode(); - long messageFormat = 0; Header header = null; final Properties properties = new Properties(); Map daMap = null; @@ -158,6 +147,12 @@ public class CoreAmqpConverter { maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, AMQPMessageSupport.destinationType(replyTo.toString())); } + long scheduledDelivery = coreMessage.getScheduledDeliveryTime(); + + if (scheduledDelivery > 0) { + maMap.put(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, scheduledDelivery); + } + Object correlationID = message.getInnerMessage().getCorrelationID(); if (correlationID instanceof String || correlationID instanceof SimpleString) { String c = correlationID instanceof String ? ((String) correlationID) : ((SimpleString) correlationID).toString(); @@ -314,42 +309,8 @@ public class CoreAmqpConverter { apMap.put(key, objectProperty); } - ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024); - - try { - EncoderImpl encoder = TLSEncode.getEncoder(); - encoder.setByteBuffer(new NettyWritable(buffer)); - - if (header != null) { - encoder.writeObject(header); - } - if (daMap != null) { - encoder.writeObject(new DeliveryAnnotations(daMap)); - } - encoder.writeObject(new MessageAnnotations(maMap)); - encoder.writeObject(properties); - if (apMap != null) { - encoder.writeObject(new ApplicationProperties(apMap)); - } - if (body != null) { - encoder.writeObject(body); - } - if (footerMap != null) { - encoder.writeObject(new Footer(footerMap)); - } - - byte[] data = new byte[buffer.writerIndex()]; - buffer.readBytes(data); - - AMQPMessage amqpMessage = new AMQPStandardMessage(messageFormat, data, null); - amqpMessage.setMessageID(message.getInnerMessage().getMessageID()); - amqpMessage.setReplyTo(coreMessage.getReplyTo()); - return amqpMessage; - - } finally { - TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null); - buffer.release(); - } + long messageID = message.getInnerMessage().getMessageID(); + return AMQPStandardMessage.createMessage(messageID, 0, replyTo, header, properties, daMap, maMap, apMap, footerMap, body); } private static Object decodeEmbeddedAMQPType(Object payload) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/DirectConvertTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/DirectConvertTest.java new file mode 100644 index 0000000000..1be184c125 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/DirectConvertTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.protocol.amqp.converter.message; + +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter; +import org.junit.Assert; +import org.junit.Test; + +public class DirectConvertTest { + + @Test + public void testConvertScheduledAMQPCore() { + long deliveryTime = System.currentTimeMillis() + 10_000; + AMQPStandardMessage standardMessage = AMQPStandardMessage.createMessage(1, 0, + null, null, null, + null, null, null, null, null); + standardMessage.setScheduledDeliveryTime(deliveryTime); + + ICoreMessage coreMessage = standardMessage.toCore(); + + Assert.assertEquals((Long)deliveryTime, coreMessage.getScheduledDeliveryTime()); + } + + + @Test + public void testConvertTTLdAMQPCore() { + long time = System.currentTimeMillis() + 10_000; + AMQPStandardMessage standardMessage = AMQPStandardMessage.createMessage(1, 0, + null, null, null, + null, null, null, null, null); + standardMessage.setExpiration(time); + + ICoreMessage coreMessage = standardMessage.toCore(); + + Assert.assertEquals(time, coreMessage.getExpiration()); + } + + @Test + public void testConvertScheduledCoreAMQP() throws Exception { + long deliveryTime = System.currentTimeMillis() + 10_000; + CoreMessage coreMessage = new CoreMessage(); + coreMessage.setScheduledDeliveryTime(deliveryTime); + coreMessage.initBuffer(1024); + + AMQPMessage amqpMessage = CoreAmqpConverter.fromCore(coreMessage, new NullStorageManager()); + Assert.assertEquals((Long)deliveryTime, amqpMessage.getScheduledDeliveryTime()); + } + + @Test + public void testConvertTTLCoreAMQP() throws Exception { + long time = System.currentTimeMillis() + 10_000; + CoreMessage coreMessage = new CoreMessage(); + coreMessage.setExpiration(time); + coreMessage.initBuffer(1024); + + AMQPMessage amqpMessage = CoreAmqpConverter.fromCore(coreMessage, new NullStorageManager()); + Assert.assertEquals(time, amqpMessage.getExpiration()); + } +} diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java index c4c527ea03..f89e0d7f65 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java @@ -149,7 +149,7 @@ public class MessageTransformationTest { AMQPMessage outboudMessage = AMQPConverter.getInstance().fromCore(core, null); assertEquals(10, outboudMessage.getApplicationProperties().getValue().size()); - assertEquals(4, outboudMessage.getMessageAnnotations().getValue().size()); + assertEquals(5, outboudMessage.getMessageAnnotations().getValue().size()); } private AMQPMessage encodeAndCreateAMQPMessage(MessageImpl message) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPScheduledCoreOverBrokerConnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPScheduledCoreOverBrokerConnectTest.java new file mode 100644 index 0000000000..eaa55fc5b3 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPScheduledCoreOverBrokerConnectTest.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.amqp.connect; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.CoreAddressConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.Assert; +import org.junit.Test; + +public class AMQPScheduledCoreOverBrokerConnectTest extends AmqpClientTestSupport { + + protected static final int AMQP_PORT_2 = 5673; + + ActiveMQServer server_2; + + @Override + protected ActiveMQServer createServer() throws Exception { + return createServer(AMQP_PORT, false); + } + + @Override + protected String getConfiguredProtocols() { + return "AMQP,OPENWIRE,CORE"; + } + + @Test + public void testWithDeliveryDelayCoreSendingConversion() throws Exception { + String queueName = "withScheduled"; + server.setIdentity("targetServer"); + server.start(); + server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST)); + + server_2 = createServer(AMQP_PORT_2, false); + + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setType(AMQPBrokerConnectionAddressType.MIRROR)); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + server_2.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(queueName).addRoutingType(RoutingType.ANYCAST)); + server_2.getConfiguration().addQueueConfiguration(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST)); + server_2.setIdentity("serverWithBridge"); + + server_2.start(); + Wait.assertTrue(server_2::isStarted); + + ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + AMQP_PORT_2); + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue(queueName)); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + producer.setDeliveryDelay(300_000); + producer.send(session.createMessage()); + + ConnectionFactory factory2 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + Connection connection2 = factory2.createConnection(); + Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection2.start(); + + MessageConsumer consumer = session2.createConsumer(session2.createQueue(queueName)); + Assert.assertNull(consumer.receive(500)); + } +}