ARTEMIS-3116 Fixing Core->AMQP Conversion of Scheduled Delivery Time

This commit is contained in:
Clebert Suconic 2021-02-12 10:56:32 -05:00
parent 8ff072a4e8
commit 7d10e915b6
5 changed files with 245 additions and 48 deletions

View File

@ -17,17 +17,27 @@
package org.apache.activemq.artemis.protocol.amqp.broker; package org.apache.activemq.artemis.protocol.amqp.broker;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Map;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode; import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Footer;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.codec.EncoderImpl; import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.ReadableBuffer; import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.codec.WritableBuffer; import org.apache.qpid.proton.codec.WritableBuffer;
@ -35,6 +45,59 @@ import org.apache.qpid.proton.codec.WritableBuffer;
// see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format // see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
public class AMQPStandardMessage extends AMQPMessage { public class AMQPStandardMessage extends AMQPMessage {
public static AMQPStandardMessage createMessage(long messageID,
long messageFormat,
SimpleString replyTo,
Header header,
Properties properties,
Map<Symbol, Object> daMap,
Map<Symbol, Object> maMap,
Map<String, Object> apMap,
Map<Symbol, Object> footerMap,
Section body) {
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
try {
EncoderImpl encoder = TLSEncode.getEncoder();
encoder.setByteBuffer(new NettyWritable(buffer));
if (header != null) {
encoder.writeObject(header);
}
if (daMap != null) {
encoder.writeObject(new DeliveryAnnotations(daMap));
}
if (maMap != null) {
encoder.writeObject(new MessageAnnotations(maMap));
}
if (properties != null) {
encoder.writeObject(properties);
}
if (apMap != null) {
encoder.writeObject(new ApplicationProperties(apMap));
}
if (body != null) {
encoder.writeObject(body);
}
if (footerMap != null) {
encoder.writeObject(new Footer(footerMap));
}
byte[] data = new byte[buffer.writerIndex()];
buffer.readBytes(data);
AMQPStandardMessage amqpMessage = new AMQPStandardMessage(messageFormat, data, null);
amqpMessage.setMessageID(messageID);
amqpMessage.setReplyTo(replyTo);
return amqpMessage;
} finally {
TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
buffer.release();
}
}
// Buffer and state for the data backing this message. // Buffer and state for the data backing this message.
protected ReadableBuffer data; protected ReadableBuffer data;

View File

@ -57,7 +57,6 @@ import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.coreWrapper.ConversionException; import org.apache.activemq.artemis.protocol.amqp.converter.coreWrapper.ConversionException;
import org.apache.activemq.artemis.protocol.amqp.converter.coreWrapper.CoreMessageWrapper; import org.apache.activemq.artemis.protocol.amqp.converter.coreWrapper.CoreMessageWrapper;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode; import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil; import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
@ -65,22 +64,13 @@ import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedByte; import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.UnsignedInteger; import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Footer;
import org.apache.qpid.proton.amqp.messaging.Header; import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties; import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section; import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.codec.DecoderImpl; import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.ReadableBuffer.ByteBufferReader; import org.apache.qpid.proton.codec.ReadableBuffer.ByteBufferReader;
import org.apache.qpid.proton.codec.WritableBuffer;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
public class CoreAmqpConverter { public class CoreAmqpConverter {
private static Logger logger = Logger.getLogger(CoreAmqpConverter.class); private static Logger logger = Logger.getLogger(CoreAmqpConverter.class);
@ -108,7 +98,6 @@ public class CoreAmqpConverter {
CoreMessageWrapper message = CoreMessageWrapper.wrap(coreMessage); CoreMessageWrapper message = CoreMessageWrapper.wrap(coreMessage);
message.decode(); message.decode();
long messageFormat = 0;
Header header = null; Header header = null;
final Properties properties = new Properties(); final Properties properties = new Properties();
Map<Symbol, Object> daMap = null; Map<Symbol, Object> daMap = null;
@ -158,6 +147,12 @@ public class CoreAmqpConverter {
maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, AMQPMessageSupport.destinationType(replyTo.toString())); maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, AMQPMessageSupport.destinationType(replyTo.toString()));
} }
long scheduledDelivery = coreMessage.getScheduledDeliveryTime();
if (scheduledDelivery > 0) {
maMap.put(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, scheduledDelivery);
}
Object correlationID = message.getInnerMessage().getCorrelationID(); Object correlationID = message.getInnerMessage().getCorrelationID();
if (correlationID instanceof String || correlationID instanceof SimpleString) { if (correlationID instanceof String || correlationID instanceof SimpleString) {
String c = correlationID instanceof String ? ((String) correlationID) : ((SimpleString) correlationID).toString(); String c = correlationID instanceof String ? ((String) correlationID) : ((SimpleString) correlationID).toString();
@ -314,42 +309,8 @@ public class CoreAmqpConverter {
apMap.put(key, objectProperty); apMap.put(key, objectProperty);
} }
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024); long messageID = message.getInnerMessage().getMessageID();
return AMQPStandardMessage.createMessage(messageID, 0, replyTo, header, properties, daMap, maMap, apMap, footerMap, body);
try {
EncoderImpl encoder = TLSEncode.getEncoder();
encoder.setByteBuffer(new NettyWritable(buffer));
if (header != null) {
encoder.writeObject(header);
}
if (daMap != null) {
encoder.writeObject(new DeliveryAnnotations(daMap));
}
encoder.writeObject(new MessageAnnotations(maMap));
encoder.writeObject(properties);
if (apMap != null) {
encoder.writeObject(new ApplicationProperties(apMap));
}
if (body != null) {
encoder.writeObject(body);
}
if (footerMap != null) {
encoder.writeObject(new Footer(footerMap));
}
byte[] data = new byte[buffer.writerIndex()];
buffer.readBytes(data);
AMQPMessage amqpMessage = new AMQPStandardMessage(messageFormat, data, null);
amqpMessage.setMessageID(message.getInnerMessage().getMessageID());
amqpMessage.setReplyTo(coreMessage.getReplyTo());
return amqpMessage;
} finally {
TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
buffer.release();
}
} }
private static Object decodeEmbeddedAMQPType(Object payload) { private static Object decodeEmbeddedAMQPType(Object payload) {

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.converter.message;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter;
import org.junit.Assert;
import org.junit.Test;
public class DirectConvertTest {
@Test
public void testConvertScheduledAMQPCore() {
long deliveryTime = System.currentTimeMillis() + 10_000;
AMQPStandardMessage standardMessage = AMQPStandardMessage.createMessage(1, 0,
null, null, null,
null, null, null, null, null);
standardMessage.setScheduledDeliveryTime(deliveryTime);
ICoreMessage coreMessage = standardMessage.toCore();
Assert.assertEquals((Long)deliveryTime, coreMessage.getScheduledDeliveryTime());
}
@Test
public void testConvertTTLdAMQPCore() {
long time = System.currentTimeMillis() + 10_000;
AMQPStandardMessage standardMessage = AMQPStandardMessage.createMessage(1, 0,
null, null, null,
null, null, null, null, null);
standardMessage.setExpiration(time);
ICoreMessage coreMessage = standardMessage.toCore();
Assert.assertEquals(time, coreMessage.getExpiration());
}
@Test
public void testConvertScheduledCoreAMQP() throws Exception {
long deliveryTime = System.currentTimeMillis() + 10_000;
CoreMessage coreMessage = new CoreMessage();
coreMessage.setScheduledDeliveryTime(deliveryTime);
coreMessage.initBuffer(1024);
AMQPMessage amqpMessage = CoreAmqpConverter.fromCore(coreMessage, new NullStorageManager());
Assert.assertEquals((Long)deliveryTime, amqpMessage.getScheduledDeliveryTime());
}
@Test
public void testConvertTTLCoreAMQP() throws Exception {
long time = System.currentTimeMillis() + 10_000;
CoreMessage coreMessage = new CoreMessage();
coreMessage.setExpiration(time);
coreMessage.initBuffer(1024);
AMQPMessage amqpMessage = CoreAmqpConverter.fromCore(coreMessage, new NullStorageManager());
Assert.assertEquals(time, amqpMessage.getExpiration());
}
}

View File

@ -149,7 +149,7 @@ public class MessageTransformationTest {
AMQPMessage outboudMessage = AMQPConverter.getInstance().fromCore(core, null); AMQPMessage outboudMessage = AMQPConverter.getInstance().fromCore(core, null);
assertEquals(10, outboudMessage.getApplicationProperties().getValue().size()); assertEquals(10, outboudMessage.getApplicationProperties().getValue().size());
assertEquals(4, outboudMessage.getMessageAnnotations().getValue().size()); assertEquals(5, outboudMessage.getMessageAnnotations().getValue().size());
} }
private AMQPMessage encodeAndCreateAMQPMessage(MessageImpl message) { private AMQPMessage encodeAndCreateAMQPMessage(MessageImpl message) {

View File

@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp.connect;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Assert;
import org.junit.Test;
public class AMQPScheduledCoreOverBrokerConnectTest extends AmqpClientTestSupport {
protected static final int AMQP_PORT_2 = 5673;
ActiveMQServer server_2;
@Override
protected ActiveMQServer createServer() throws Exception {
return createServer(AMQP_PORT, false);
}
@Override
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE";
}
@Test
public void testWithDeliveryDelayCoreSendingConversion() throws Exception {
String queueName = "withScheduled";
server.setIdentity("targetServer");
server.start();
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST));
server_2 = createServer(AMQP_PORT_2, false);
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setType(AMQPBrokerConnectionAddressType.MIRROR));
server_2.getConfiguration().addAMQPConnection(amqpConnection);
server_2.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(queueName).addRoutingType(RoutingType.ANYCAST));
server_2.getConfiguration().addQueueConfiguration(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST));
server_2.setIdentity("serverWithBridge");
server_2.start();
Wait.assertTrue(server_2::isStarted);
ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + AMQP_PORT_2);
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(queueName));
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.setDeliveryDelay(300_000);
producer.send(session.createMessage());
ConnectionFactory factory2 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
Connection connection2 = factory2.createConnection();
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection2.start();
MessageConsumer consumer = session2.createConsumer(session2.createQueue(queueName));
Assert.assertNull(consumer.receive(500));
}
}