diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java index 4315a78dba..3a6c7d845e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java @@ -33,13 +33,13 @@ import org.apache.activemq.command.ActiveMQMessage; * from one to the other */ public interface FrameTranslator { - ActiveMQMessage convertFrame(StompFrame frame) throws JMSException, ProtocolException; + ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame frame) throws JMSException, ProtocolException; - StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException; + StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException; - String convertDestination(Destination d); + String convertDestination(ProtocolConverter converter, Destination d); - ActiveMQDestination convertDestination(String name) throws ProtocolException; + ActiveMQDestination convertDestination(ProtocolConverter converter, String name) throws ProtocolException; /** * Helper class which holds commonly needed functions used when implementing @@ -50,9 +50,9 @@ public interface FrameTranslator { private Helper() { } - public static void copyStandardHeadersFromMessageToFrame(ActiveMQMessage message, StompFrame command, FrameTranslator ft) throws IOException { + public static void copyStandardHeadersFromMessageToFrame(ProtocolConverter converter, ActiveMQMessage message, StompFrame command, FrameTranslator ft) throws IOException { final Map headers = command.getHeaders(); - headers.put(Stomp.Headers.Message.DESTINATION, ft.convertDestination(message.getDestination())); + headers.put(Stomp.Headers.Message.DESTINATION, ft.convertDestination(converter, message.getDestination())); headers.put(Stomp.Headers.Message.MESSAGE_ID, message.getJMSMessageID()); if (message.getJMSCorrelationID() != null) { @@ -66,7 +66,7 @@ public interface FrameTranslator { headers.put(Stomp.Headers.Message.PRORITY, "" + message.getJMSPriority()); if (message.getJMSReplyTo() != null) { - headers.put(Stomp.Headers.Message.REPLY_TO, ft.convertDestination(message.getJMSReplyTo())); + headers.put(Stomp.Headers.Message.REPLY_TO, ft.convertDestination(converter, message.getJMSReplyTo())); } headers.put(Stomp.Headers.Message.TIMESTAMP, "" + message.getJMSTimestamp()); @@ -83,10 +83,10 @@ public interface FrameTranslator { } } - public static void copyStandardHeadersFromFrameToMessage(StompFrame command, ActiveMQMessage msg, FrameTranslator ft) throws ProtocolException, JMSException { + public static void copyStandardHeadersFromFrameToMessage(ProtocolConverter converter, StompFrame command, ActiveMQMessage msg, FrameTranslator ft) throws ProtocolException, JMSException { final Map headers = new HashMap(command.getHeaders()); final String destination = headers.remove(Stomp.Headers.Send.DESTINATION); - msg.setDestination(ft.convertDestination(destination)); + msg.setDestination(ft.convertDestination(converter, destination)); // the standard JMS headers msg.setJMSCorrelationID(headers.remove(Stomp.Headers.Send.CORRELATION_ID)); @@ -108,7 +108,7 @@ public interface FrameTranslator { o = headers.remove(Stomp.Headers.Send.REPLY_TO); if (o != null) { - msg.setJMSReplyTo(ft.convertDestination((String)o)); + msg.setJMSReplyTo(ft.convertDestination(converter, (String)o)); } o = headers.remove(Stomp.Headers.Send.PERSISTENT); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java index 64623aee71..6cb48883e3 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java @@ -32,7 +32,9 @@ import org.apache.activemq.command.ActiveMQTextMessage; * Implements ActiveMQ 4.0 translations */ public class LegacyFrameTranslator implements FrameTranslator { - public ActiveMQMessage convertFrame(StompFrame command) throws JMSException, ProtocolException { + + + public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException { final Map headers = command.getHeaders(); final ActiveMQMessage msg; if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) { @@ -49,17 +51,17 @@ public class LegacyFrameTranslator implements FrameTranslator { } msg = text; } - FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(command, msg, this); + FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this); return msg; } - public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException { + public StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException { StompFrame command = new StompFrame(); command.setAction(Stomp.Responses.MESSAGE); Map headers = new HashMap(25); command.setHeaders(headers); - FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(message, command, this); + FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(converter, message, command, this); if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) { @@ -79,23 +81,28 @@ public class LegacyFrameTranslator implements FrameTranslator { return command; } - public String convertDestination(Destination d) { + public String convertDestination(ProtocolConverter converter, Destination d) { if (d == null) { return null; } ActiveMQDestination activeMQDestination = (ActiveMQDestination)d; String physicalName = activeMQDestination.getPhysicalName(); + String rc = converter.getCreatedTempDestinationName(activeMQDestination); + if( rc!=null ) { + return rc; + } + StringBuffer buffer = new StringBuffer(); if (activeMQDestination.isQueue()) { if (activeMQDestination.isTemporary()) { - buffer.append("/temp-queue/"); + buffer.append("/remote-temp-queue/"); } else { buffer.append("/queue/"); } } else { if (activeMQDestination.isTemporary()) { - buffer.append("/temp-topic/"); + buffer.append("/remote-temp-topic/"); } else { buffer.append("/topic/"); } @@ -104,7 +111,7 @@ public class LegacyFrameTranslator implements FrameTranslator { return buffer.toString(); } - public ActiveMQDestination convertDestination(String name) throws ProtocolException { + public ActiveMQDestination convertDestination(ProtocolConverter converter, String name) throws ProtocolException { if (name == null) { return null; } else if (name.startsWith("/queue/")) { @@ -113,12 +120,16 @@ public class LegacyFrameTranslator implements FrameTranslator { } else if (name.startsWith("/topic/")) { String tName = name.substring("/topic/".length(), name.length()); return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TOPIC_TYPE); - } else if (name.startsWith("/temp-queue/")) { - String tName = name.substring("/temp-queue/".length(), name.length()); + } else if (name.startsWith("/remote-temp-queue/")) { + String tName = name.substring("/remote-temp-queue/".length(), name.length()); return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_QUEUE_TYPE); - } else if (name.startsWith("/temp-topic/")) { - String tName = name.substring("/temp-topic/".length(), name.length()); + } else if (name.startsWith("/remote-temp-topic/")) { + String tName = name.substring("/remote-temp-topic/".length(), name.length()); return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_TOPIC_TYPE); + } else if (name.startsWith("/temp-queue/")) { + return converter.createTempQueue(name); + } else if (name.startsWith("/temp-topic/")) { + return converter.createTempTopic(name); } else { throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations " + "must begine with one of: /queue/ /topic/ /temp-queue/ /temp-topic/"); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java index e063bbc7c3..d33b3f5a21 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java @@ -25,15 +25,19 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import javax.jms.Destination; import javax.jms.JMSException; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQTempQueue; +import org.apache.activemq.command.ActiveMQTempTopic; import org.apache.activemq.command.Command; import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.LocalTransactionId; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; @@ -65,9 +69,12 @@ public class ProtocolConverter { private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator(); + private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator(); private final ConcurrentHashMap resposeHandlers = new ConcurrentHashMap(); private final ConcurrentHashMap subscriptionsByConsumerId = new ConcurrentHashMap(); + private final ConcurrentHashMap tempDestinations = new ConcurrentHashMap(); + private final ConcurrentHashMap tempDestinationAmqToStompMap = new ConcurrentHashMap(); private final Map transactions = new ConcurrentHashMap(); private final StompTransportFilter transportFilter; @@ -325,7 +332,7 @@ public class ProtocolConverter { String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID); String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION); - ActiveMQDestination actualDest = frameTranslator.convertDestination(destination); + ActiveMQDestination actualDest = frameTranslator.convertDestination(this, destination); ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); ConsumerInfo consumerInfo = new ConsumerInfo(id); consumerInfo.setPrefetchSize(1000); @@ -336,7 +343,7 @@ public class ProtocolConverter { IntrospectionSupport.setProperties(consumerInfo, headers, "activemq."); - consumerInfo.setDestination(frameTranslator.convertDestination(destination)); + consumerInfo.setDestination(frameTranslator.convertDestination(this, destination)); StompSubscription stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo); stompSubscription.setDestination(actualDest); @@ -360,7 +367,7 @@ public class ProtocolConverter { ActiveMQDestination destination = null; Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION); if (o != null) { - destination = frameTranslator.convertDestination((String)o); + destination = frameTranslator.convertDestination(this, (String)o); } String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID); @@ -489,15 +496,40 @@ public class ProtocolConverter { } public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException { - ActiveMQMessage msg = frameTranslator.convertFrame(command); + ActiveMQMessage msg = frameTranslator.convertFrame(this, command); return msg; } public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException { - return frameTranslator.convertMessage(message); + return frameTranslator.convertMessage(this, message); } public StompTransportFilter getTransportFilter() { return transportFilter; } + + public ActiveMQDestination createTempQueue(String name) { + ActiveMQDestination rc = tempDestinations.get(name); + if( rc == null ) { + rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId()); + sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null); + tempDestinations.put(name, rc); + } + return rc; + } + + public ActiveMQDestination createTempTopic(String name) { + ActiveMQDestination rc = tempDestinations.get(name); + if( rc == null ) { + rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId()); + sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null); + tempDestinations.put(name, rc); + tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name); + } + return rc; + } + + public String getCreatedTempDestinationName(ActiveMQDestination destination) { + return tempDestinationAmqToStompMap.get(destination.getQualifiedName()); + } }