From 8a72f8906aea12c219e04f944b9306bf28849341 Mon Sep 17 00:00:00 2001 From: Andy Taylor Date: Tue, 12 Apr 2016 10:24:13 +0100 Subject: [PATCH] ARTEMIS-453 - AMQP reply-to property is stripped off message as it goes through broker TheJMSVendor protocol convertor class was not creating the destinations so any destination calls, setTo and setJMSReplyTo, were ignored. Ive added a server side destination class to bypass the naming checks we have on the client and this now sets everything correctly https://issues.apache.org/jira/browse/ARTEMIS-453 --- .../proton/converter/ActiveMQJMSVendor.java | 7 ++- .../converter/jms/ServerDestination.java | 29 +++++++++++ .../converter/jms/ServerJMSMessage.java | 10 +--- .../ProtonSessionIntegrationCallback.java | 4 -- .../tests/integration/proton/ProtonTest.java | 48 +++++++++++++++++++ 5 files changed, 85 insertions(+), 13 deletions(-) create mode 100644 artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerDestination.java diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java index 7d8e6850e3..639b390fb4 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java @@ -24,6 +24,8 @@ import javax.jms.ObjectMessage; import javax.jms.StreamMessage; import javax.jms.TextMessage; +import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerDestination; +import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.qpid.proton.jms.JMSVendor; import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer; import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSBytesMessage; @@ -80,7 +82,7 @@ public class ActiveMQJMSVendor extends JMSVendor { @Override @SuppressWarnings("deprecation") public Destination createDestination(String name) { - return super.createDestination(name); + return new ServerDestination(name); } @Override @@ -121,6 +123,9 @@ public class ActiveMQJMSVendor extends JMSVendor { @Override public String toAddress(Destination destination) { + if (destination instanceof ActiveMQDestination) { + return ((ActiveMQDestination) destination).getAddress(); + } return null; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerDestination.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerDestination.java new file mode 100644 index 0000000000..09a0ae5bf3 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerDestination.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.proton.converter.jms; + +import org.apache.activemq.artemis.jms.client.ActiveMQDestination; + +/** + * This is just here to avoid all the client checks we ned with valid JMS destinations, protocol convertors don't need to + * adhere to the jms. semantics. + */ +public class ServerDestination extends ActiveMQDestination { + public ServerDestination(String name) { + super(name, name, false, false, null); + } +} diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java index 7902fa0df6..8f6ef9b2ee 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/jms/ServerJMSMessage.java @@ -28,7 +28,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; -import org.apache.activemq.artemis.jms.client.ActiveMQQueue; import org.apache.activemq.artemis.reader.MessageUtil; public class ServerJMSMessage implements Message { @@ -112,7 +111,7 @@ public class ServerJMSMessage implements Message { public final Destination getJMSReplyTo() throws JMSException { SimpleString reply = MessageUtil.getJMSReplyTo(message); if (reply != null) { - return ActiveMQDestination.fromAddress(reply.toString()); + return new ServerDestination(reply.toString()); } else { return null; @@ -133,12 +132,7 @@ public class ServerJMSMessage implements Message { return null; } else { - if (!sdest.toString().startsWith("jms.")) { - return new ActiveMQQueue(sdest.toString(), sdest.toString()); - } - else { - return ActiveMQDestination.fromAddress(sdest.toString()); - } + return new ServerDestination(sdest.toString()); } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java index 1c6ea01f3d..aa42a9232e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -298,10 +298,6 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se EncodedMessage encodedMessage = new EncodedMessage(messageFormat, messageEncoded.array(), messageEncoded.arrayOffset(), messageEncoded.writerIndex()); ServerMessage message = manager.getConverter().inbound(encodedMessage); - //use the address on the receiver if not null, if null let's hope it was set correctly on the message - if (address != null) { - message.setAddress(new SimpleString(address)); - } recoverContext(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java index b32f1fa730..efd5a85ee1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java @@ -20,6 +20,7 @@ import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; +import javax.jms.Destination; import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.MapMessage; @@ -163,6 +164,53 @@ public class ProtonTest extends ActiveMQTestBase { } + + @Test + public void testReplyTo() throws Throwable { + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + TemporaryQueue queue = session.createTemporaryQueue(); + System.out.println("queue:" + queue.getQueueName()); + MessageProducer p = session.createProducer(queue); + + TextMessage message = session.createTextMessage(); + message.setText("Message temporary"); + message.setJMSReplyTo(createQueue(address)); + p.send(message); + + MessageConsumer cons = session.createConsumer(queue); + connection.start(); + + message = (TextMessage) cons.receive(5000); + Destination jmsReplyTo = message.getJMSReplyTo(); + Assert.assertNotNull(jmsReplyTo); + Assert.assertNotNull(message); + + } + + @Test + public void testReplyToNonJMS() throws Throwable { + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + TemporaryQueue queue = session.createTemporaryQueue(); + System.out.println("queue:" + queue.getQueueName()); + MessageProducer p = session.createProducer(queue); + + TextMessage message = session.createTextMessage(); + message.setText("Message temporary"); + message.setJMSReplyTo(createQueue("someaddress")); + p.send(message); + + MessageConsumer cons = session.createConsumer(queue); + connection.start(); + + message = (TextMessage) cons.receive(5000); + Destination jmsReplyTo = message.getJMSReplyTo(); + Assert.assertNotNull(jmsReplyTo); + Assert.assertNotNull(message); + + } + /* // Uncomment testLoopBrowser to validate the hunging on the test @Test