ARTEMIS-453 - AMQP reply-to property is stripped off message as it goes through broker

TheJMSVendor protocol convertor class was not creating the destinations so any destination calls, setTo and setJMSReplyTo, were ignored. Ive added a server side destination class to bypass the naming checks we have on the client and this now sets everything correctly

https://issues.apache.org/jira/browse/ARTEMIS-453
This commit is contained in:
Andy Taylor 2016-04-12 10:24:13 +01:00
parent fb4ca299ae
commit 8a72f8906a
5 changed files with 85 additions and 13 deletions

View File

@ -24,6 +24,8 @@ import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerDestination;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.qpid.proton.jms.JMSVendor;
import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSBytesMessage;
@ -80,7 +82,7 @@ public class ActiveMQJMSVendor extends JMSVendor {
@Override
@SuppressWarnings("deprecation")
public Destination createDestination(String name) {
return super.createDestination(name);
return new ServerDestination(name);
}
@Override
@ -121,6 +123,9 @@ public class ActiveMQJMSVendor extends JMSVendor {
@Override
public String toAddress(Destination destination) {
if (destination instanceof ActiveMQDestination) {
return ((ActiveMQDestination) destination).getAddress();
}
return null;
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.proton.converter.jms;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
/**
* This is just here to avoid all the client checks we ned with valid JMS destinations, protocol convertors don't need to
* adhere to the jms. semantics.
*/
public class ServerDestination extends ActiveMQDestination {
public ServerDestination(String name) {
super(name, name, false, false, null);
}
}

View File

@ -28,7 +28,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
import org.apache.activemq.artemis.reader.MessageUtil;
public class ServerJMSMessage implements Message {
@ -112,7 +111,7 @@ public class ServerJMSMessage implements Message {
public final Destination getJMSReplyTo() throws JMSException {
SimpleString reply = MessageUtil.getJMSReplyTo(message);
if (reply != null) {
return ActiveMQDestination.fromAddress(reply.toString());
return new ServerDestination(reply.toString());
}
else {
return null;
@ -133,12 +132,7 @@ public class ServerJMSMessage implements Message {
return null;
}
else {
if (!sdest.toString().startsWith("jms.")) {
return new ActiveMQQueue(sdest.toString(), sdest.toString());
}
else {
return ActiveMQDestination.fromAddress(sdest.toString());
}
return new ServerDestination(sdest.toString());
}
}

View File

@ -298,10 +298,6 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
EncodedMessage encodedMessage = new EncodedMessage(messageFormat, messageEncoded.array(), messageEncoded.arrayOffset(), messageEncoded.writerIndex());
ServerMessage message = manager.getConverter().inbound(encodedMessage);
//use the address on the receiver if not null, if null let's hope it was set correctly on the message
if (address != null) {
message.setAddress(new SimpleString(address));
}
recoverContext();

View File

@ -20,6 +20,7 @@ import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MapMessage;
@ -163,6 +164,53 @@ public class ProtonTest extends ActiveMQTestBase {
}
@Test
public void testReplyTo() throws Throwable {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue queue = session.createTemporaryQueue();
System.out.println("queue:" + queue.getQueueName());
MessageProducer p = session.createProducer(queue);
TextMessage message = session.createTextMessage();
message.setText("Message temporary");
message.setJMSReplyTo(createQueue(address));
p.send(message);
MessageConsumer cons = session.createConsumer(queue);
connection.start();
message = (TextMessage) cons.receive(5000);
Destination jmsReplyTo = message.getJMSReplyTo();
Assert.assertNotNull(jmsReplyTo);
Assert.assertNotNull(message);
}
@Test
public void testReplyToNonJMS() throws Throwable {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue queue = session.createTemporaryQueue();
System.out.println("queue:" + queue.getQueueName());
MessageProducer p = session.createProducer(queue);
TextMessage message = session.createTextMessage();
message.setText("Message temporary");
message.setJMSReplyTo(createQueue("someaddress"));
p.send(message);
MessageConsumer cons = session.createConsumer(queue);
connection.start();
message = (TextMessage) cons.receive(5000);
Destination jmsReplyTo = message.getJMSReplyTo();
Assert.assertNotNull(jmsReplyTo);
Assert.assertNotNull(message);
}
/*
// Uncomment testLoopBrowser to validate the hunging on the test
@Test