mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-3496 - reply-to header for Stomp
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1221199 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
882c5539d1
commit
1c264085d1
|
@ -16,15 +16,14 @@
|
|||
*/
|
||||
package org.apache.activemq.transport.stomp;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQMessage;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQMessage;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Implementations of this interface are used to map back and forth from Stomp
|
||||
|
@ -39,7 +38,7 @@ public interface FrameTranslator {
|
|||
|
||||
String convertDestination(ProtocolConverter converter, Destination d);
|
||||
|
||||
ActiveMQDestination convertDestination(ProtocolConverter converter, String name) throws ProtocolException;
|
||||
ActiveMQDestination convertDestination(ProtocolConverter converter, String name, boolean forceFallback) throws ProtocolException;
|
||||
|
||||
/**
|
||||
* Helper class which holds commonly needed functions used when implementing
|
||||
|
@ -98,7 +97,7 @@ public interface FrameTranslator {
|
|||
public static void copyStandardHeadersFromFrameToMessage(ProtocolConverter converter, StompFrame command, ActiveMQMessage msg, FrameTranslator ft) throws ProtocolException, JMSException {
|
||||
final Map<String, String> headers = new HashMap<String, String>(command.getHeaders());
|
||||
final String destination = headers.remove(Stomp.Headers.Send.DESTINATION);
|
||||
msg.setDestination(ft.convertDestination(converter, destination));
|
||||
msg.setDestination(ft.convertDestination(converter, destination, true));
|
||||
|
||||
// the standard JMS headers
|
||||
msg.setJMSCorrelationID(headers.remove(Stomp.Headers.Send.CORRELATION_ID));
|
||||
|
@ -122,7 +121,12 @@ public interface FrameTranslator {
|
|||
|
||||
o = headers.remove(Stomp.Headers.Send.REPLY_TO);
|
||||
if (o != null) {
|
||||
msg.setJMSReplyTo(ft.convertDestination(converter, (String)o));
|
||||
try {
|
||||
ActiveMQDestination dest = ft.convertDestination(converter, (String)o, false);
|
||||
msg.setJMSReplyTo(dest);
|
||||
} catch (ProtocolException pe) {
|
||||
msg.setStringProperty("reply-to", (String)o);
|
||||
}
|
||||
}
|
||||
|
||||
o = headers.remove(Stomp.Headers.Send.PERSISTENT);
|
||||
|
|
|
@ -16,26 +16,20 @@
|
|||
*/
|
||||
package org.apache.activemq.transport.stomp;
|
||||
|
||||
import com.thoughtworks.xstream.XStream;
|
||||
import com.thoughtworks.xstream.io.json.JsonHierarchicalStreamDriver;
|
||||
import org.apache.activemq.advisory.AdvisorySupport;
|
||||
import org.apache.activemq.command.*;
|
||||
import org.apache.activemq.util.ByteArrayOutputStream;
|
||||
import org.apache.activemq.util.ByteSequence;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.apache.activemq.advisory.AdvisorySupport;
|
||||
import org.apache.activemq.command.ActiveMQBytesMessage;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQMessage;
|
||||
import org.apache.activemq.command.ActiveMQTextMessage;
|
||||
import org.apache.activemq.command.DataStructure;
|
||||
import org.apache.activemq.util.ByteArrayOutputStream;
|
||||
import org.apache.activemq.util.ByteSequence;
|
||||
|
||||
import com.thoughtworks.xstream.XStream;
|
||||
import com.thoughtworks.xstream.io.json.JsonHierarchicalStreamDriver;
|
||||
|
||||
/**
|
||||
* Implements ActiveMQ 4.0 translations
|
||||
*/
|
||||
|
@ -167,7 +161,7 @@ public class LegacyFrameTranslator implements FrameTranslator {
|
|||
return buffer.toString();
|
||||
}
|
||||
|
||||
public ActiveMQDestination convertDestination(ProtocolConverter converter, String name) throws ProtocolException {
|
||||
public ActiveMQDestination convertDestination(ProtocolConverter converter, String name, boolean forceFallback) throws ProtocolException {
|
||||
if (name == null) {
|
||||
return null;
|
||||
} else if (name.startsWith("/queue/")) {
|
||||
|
@ -187,14 +181,16 @@ public class LegacyFrameTranslator implements FrameTranslator {
|
|||
} else if (name.startsWith("/temp-topic/")) {
|
||||
return converter.createTempDestination(name, true);
|
||||
} else {
|
||||
try {
|
||||
ActiveMQDestination fallback = ActiveMQDestination.getUnresolvableDestinationTransformer().transform(name);
|
||||
if (fallback != null) {
|
||||
return fallback;
|
||||
if (forceFallback) {
|
||||
try {
|
||||
ActiveMQDestination fallback = ActiveMQDestination.getUnresolvableDestinationTransformer().transform(name);
|
||||
if (fallback != null) {
|
||||
return fallback;
|
||||
}
|
||||
} catch (JMSException e) {
|
||||
throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations "
|
||||
+ "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/", false, e);
|
||||
}
|
||||
} catch (JMSException e) {
|
||||
throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations "
|
||||
+ "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/", false, e);
|
||||
}
|
||||
throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations "
|
||||
+ "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
|
||||
|
|
|
@ -16,60 +16,20 @@
|
|||
*/
|
||||
package org.apache.activemq.transport.stomp;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.io.PrintWriter;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.apache.activemq.broker.BrokerContext;
|
||||
import org.apache.activemq.broker.BrokerContextAware;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQMessage;
|
||||
import org.apache.activemq.command.ActiveMQTempQueue;
|
||||
import org.apache.activemq.command.ActiveMQTempTopic;
|
||||
import org.apache.activemq.command.Command;
|
||||
import org.apache.activemq.command.CommandTypes;
|
||||
import org.apache.activemq.command.ConnectionError;
|
||||
import org.apache.activemq.command.ConnectionId;
|
||||
import org.apache.activemq.command.ConnectionInfo;
|
||||
import org.apache.activemq.command.ConsumerId;
|
||||
import org.apache.activemq.command.ConsumerInfo;
|
||||
import org.apache.activemq.command.DestinationInfo;
|
||||
import org.apache.activemq.command.ExceptionResponse;
|
||||
import org.apache.activemq.command.LocalTransactionId;
|
||||
import org.apache.activemq.command.MessageAck;
|
||||
import org.apache.activemq.command.MessageDispatch;
|
||||
import org.apache.activemq.command.MessageId;
|
||||
import org.apache.activemq.command.ProducerId;
|
||||
import org.apache.activemq.command.ProducerInfo;
|
||||
import org.apache.activemq.command.RemoveSubscriptionInfo;
|
||||
import org.apache.activemq.command.Response;
|
||||
import org.apache.activemq.command.SessionId;
|
||||
import org.apache.activemq.command.SessionInfo;
|
||||
import org.apache.activemq.command.ShutdownInfo;
|
||||
import org.apache.activemq.command.TransactionId;
|
||||
import org.apache.activemq.command.TransactionInfo;
|
||||
import org.apache.activemq.command.*;
|
||||
import org.apache.activemq.util.ByteArrayOutputStream;
|
||||
import org.apache.activemq.util.FactoryFinder;
|
||||
import org.apache.activemq.util.IOExceptionSupport;
|
||||
import org.apache.activemq.util.IdGenerator;
|
||||
import org.apache.activemq.util.IntrospectionSupport;
|
||||
import org.apache.activemq.util.LongSequenceGenerator;
|
||||
import org.apache.activemq.util.*;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import java.io.*;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* @author <a href="http://hiramchirino.com">chirino</a>
|
||||
*/
|
||||
|
@ -485,7 +445,7 @@ public class ProtocolConverter {
|
|||
throw new ProtocolException("SUBSCRIBE received without a subscription id!");
|
||||
}
|
||||
|
||||
ActiveMQDestination actualDest = translator.convertDestination(this, destination);
|
||||
ActiveMQDestination actualDest = translator.convertDestination(this, destination, true);
|
||||
|
||||
if (actualDest == null) {
|
||||
throw new ProtocolException("Invalid Destination.");
|
||||
|
@ -511,7 +471,7 @@ public class ProtocolConverter {
|
|||
|
||||
IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
|
||||
|
||||
consumerInfo.setDestination(translator.convertDestination(this, destination));
|
||||
consumerInfo.setDestination(translator.convertDestination(this, destination, true));
|
||||
|
||||
StompSubscription stompSubscription;
|
||||
if (!consumerInfo.isBrowser()) {
|
||||
|
@ -548,7 +508,7 @@ public class ProtocolConverter {
|
|||
ActiveMQDestination destination = null;
|
||||
Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
|
||||
if (o != null) {
|
||||
destination = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertDestination(this, (String)o);
|
||||
destination = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertDestination(this, (String)o, true);
|
||||
}
|
||||
|
||||
String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID);
|
||||
|
|
|
@ -16,31 +16,6 @@
|
|||
*/
|
||||
package org.apache.activemq.transport.stomp;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MapMessage;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.ObjectMessage;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.management.MalformedObjectNameException;
|
||||
import javax.management.ObjectName;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.CombinationTestSupport;
|
||||
import org.apache.activemq.broker.BrokerFactory;
|
||||
|
@ -52,6 +27,18 @@ import org.apache.activemq.command.ActiveMQTextMessage;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.jms.*;
|
||||
import javax.management.MalformedObjectNameException;
|
||||
import javax.management.ObjectName;
|
||||
import java.io.IOException;
|
||||
import java.net.*;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
public class StompTest extends CombinationTestSupport {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(StompTest.class);
|
||||
|
||||
|
@ -1707,6 +1694,27 @@ public class StompTest extends CombinationTestSupport {
|
|||
assertEquals(2, queueView.getDequeueCount());
|
||||
assertEquals(0, queueView.getQueueSize());
|
||||
}
|
||||
|
||||
public void testReplytoModification() throws Exception {
|
||||
String replyto = "some destination";
|
||||
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||
stompConnection.sendFrame(frame);
|
||||
|
||||
frame = stompConnection.receiveFrame();
|
||||
assertTrue(frame.startsWith("CONNECTED"));
|
||||
|
||||
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
|
||||
stompConnection.sendFrame(frame);
|
||||
|
||||
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "reply-to:" + replyto + "\n\nhello world" + Stomp.NULL;
|
||||
stompConnection.sendFrame(frame);
|
||||
|
||||
StompFrame message = stompConnection.receive();
|
||||
assertTrue(message.getAction().equals("MESSAGE"));
|
||||
assertEquals(replyto, message.getHeaders().get("reply-to"));
|
||||
|
||||
stompConnection.sendFrame("DISCONNECT\n" + "\n\n" + Stomp.NULL);
|
||||
}
|
||||
|
||||
public void testReplyToDestinationNaming() throws Exception {
|
||||
|
||||
|
|
Loading…
Reference in New Issue