From 122745b5b8b55b41d329e5457796229a15146d19 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Tue, 13 Aug 2019 15:05:00 -0400 Subject: [PATCH] AMQ-7274 Update proton-j and fix footer key type on outbound transform Footer annotations maps should be using Symbol keys not strings. --- .../JMSMappingOutboundTransformer.java | 2 +- .../transport/amqp/AmqpTransformerTest.java | 65 +++++++++++++++++ .../amqp/interop/AmqpSendReceiveTest.java | 69 +++++++++++++++++++ pom.xml | 4 +- 4 files changed, 137 insertions(+), 3 deletions(-) diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java index 346a54c985..ffe9ccc33a 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java @@ -319,7 +319,7 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer { footerMap = new HashMap<>(); } String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length()); - footerMap.put(name, value); + footerMap.put(Symbol.valueOf(name), value); continue; } } else if (key.startsWith(AMQ_SCHEDULED_MESSAGE_PREFIX )) { diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java index 201cee2ff8..deb0c0dd22 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java @@ -22,6 +22,8 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.net.URI; +import java.util.LinkedHashMap; +import java.util.Map; import javax.jms.BytesMessage; import javax.jms.Connection; @@ -36,6 +38,11 @@ import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; import org.junit.After; import org.junit.Test; import org.slf4j.Logger; @@ -188,6 +195,54 @@ public class AmqpTransformerTest { openwireConn.close(); } + @Test(timeout = 60000) + public void testSendAMQPMessageWithComplexAnnotationsReceiveCore() throws Exception { + startBrokerWithAmqpTransport(String.format(AMQP_URL, "?transport.transformer=jms")); + + URI remoteURI = new URI("tcp://" + amqpConnectionURI.getHost() + ":" + amqpConnectionURI.getPort()); + AmqpClient client = new AmqpClient(remoteURI, null, null); + AmqpConnection connection = client.connect(); + try { + connection.connect(); + + String annotation = "x-opt-embedded-map"; + Map embeddedMap = new LinkedHashMap<>(); + embeddedMap.put("test-key-1", "value-1"); + embeddedMap.put("test-key-2", "value-2"); + embeddedMap.put("test-key-3", "value-3"); + + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(TEST_QUEUE); + AmqpMessage message = createAmqpMessage((byte) 'A', 65535); + + message.setApplicationProperty("IntProperty", 42); + message.setDurable(true); + message.setMessageAnnotation(annotation, embeddedMap); + sender.send(message); + + session.close(); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(openwireConnectionURI); + Connection connection2 = factory.createConnection(); + try { + + Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection2.start(); + MessageConsumer consumer = session2.createConsumer(session2.createQueue(TEST_QUEUE)); + + Message received = consumer.receive(5000); + assertNotNull(received); + assertEquals(42, received.getIntProperty("IntProperty")); + + connection2.close(); + } finally { + connection2.close(); + } + } finally { + connection.close(); + } + } + public void startBrokerWithAmqpTransport(String amqpUrl) throws Exception { brokerService = new BrokerService(); brokerService.setPersistent(false); @@ -211,4 +266,14 @@ public class AmqpTransformerTest { brokerService = null; } } + + private AmqpMessage createAmqpMessage(byte value, int payloadSize) { + AmqpMessage message = new AmqpMessage(); + byte[] payload = new byte[payloadSize]; + for (int i = 0; i < payload.length; i++) { + payload[i] = value; + } + message.setBytes(payload); + return message; + } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java index a86edbb487..67e0dc9a4b 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java @@ -23,8 +23,10 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -47,10 +49,13 @@ import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; import org.apache.activemq.util.Wait; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.UnsignedByte; import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.Data; import org.apache.qpid.proton.amqp.messaging.Header; import org.apache.qpid.proton.message.Message; +import org.apache.qpid.proton.message.impl.MessageImpl; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; @@ -65,6 +70,8 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { protected static final Logger LOG = LoggerFactory.getLogger(AmqpSendReceiveTest.class); + private static final int PAYLOAD = 110 * 1024; + @Test(timeout = 60000) public void testSimpleSendOneReceiveOneToQueue() throws Exception { doTestSimpleSendOneReceiveOne(Queue.class); @@ -858,4 +865,66 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { receiver.close(); connection.close(); } + + @Test(timeout = 60000) + public void testSendAMQPMessageWithComplexAnnotationsReceiveAMQP() throws Exception { + String testQueueName = "ConnectionFrameSize"; + int nMsgs = 200; + + AmqpClient client = createAmqpClient(); + + Symbol annotation = Symbol.valueOf("x-opt-embedded-map"); + Map embeddedMap = new LinkedHashMap<>(); + embeddedMap.put("test-key-1", "value-1"); + embeddedMap.put("test-key-2", "value-2"); + embeddedMap.put("test-key-3", "value-3"); + + { + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(testQueueName); + AmqpMessage message = createAmqpMessage((byte) 'A', PAYLOAD); + + message.setApplicationProperty("IntProperty", 42); + message.setDurable(true); + message.setMessageAnnotation(annotation.toString(), embeddedMap); + sender.send(message); + session.close(); + connection.close(); + } + + { + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + AmqpReceiver receiver = session.createReceiver(testQueueName); + receiver.flow(nMsgs); + + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull("Failed to read message with embedded map in annotations", message); + MessageImpl wrapped = (MessageImpl) message.getWrappedMessage(); + if (wrapped.getBody() instanceof Data) { + Data data = (Data) wrapped.getBody(); + System.out.println("received : message: " + data.getValue().getLength()); + assertEquals(PAYLOAD, data.getValue().getLength()); + } + + assertNotNull(message.getWrappedMessage().getMessageAnnotations()); + assertNotNull(message.getWrappedMessage().getMessageAnnotations().getValue()); + assertEquals(embeddedMap, message.getWrappedMessage().getMessageAnnotations().getValue().get(annotation)); + + message.accept(); + session.close(); + connection.close(); + } + } + + private AmqpMessage createAmqpMessage(byte value, int payloadSize) { + AmqpMessage message = new AmqpMessage(); + byte[] payload = new byte[payloadSize]; + for (int i = 0; i < payload.length; i++) { + payload[i] = value; + } + message.setBytes(payload); + return message; + } } diff --git a/pom.xml b/pom.xml index 32cb379c8a..8194084b18 100644 --- a/pom.xml +++ b/pom.xml @@ -104,10 +104,10 @@ 1.1.2 1.4.0 3.4.14 - 0.33.1 + 0.33.2 0.44.0 4.1.37.Final - 0.33.1 + 0.33.2 4.1.37.Final 1.3 1.12.1