Added better temp destination support. We now properly create temp destinations that the client can subscribe to

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@592531 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-11-06 19:44:58 +00:00
parent ada47fbc50
commit 04a637916e
3 changed files with 70 additions and 27 deletions

View File

@ -33,13 +33,13 @@ import org.apache.activemq.command.ActiveMQMessage;
* from one to the other * from one to the other
*/ */
public interface FrameTranslator { public interface FrameTranslator {
ActiveMQMessage convertFrame(StompFrame frame) throws JMSException, ProtocolException; ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame frame) throws JMSException, ProtocolException;
StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException; StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException;
String convertDestination(Destination d); String convertDestination(ProtocolConverter converter, Destination d);
ActiveMQDestination convertDestination(String name) throws ProtocolException; ActiveMQDestination convertDestination(ProtocolConverter converter, String name) throws ProtocolException;
/** /**
* Helper class which holds commonly needed functions used when implementing * Helper class which holds commonly needed functions used when implementing
@ -50,9 +50,9 @@ public interface FrameTranslator {
private Helper() { private Helper() {
} }
public static void copyStandardHeadersFromMessageToFrame(ActiveMQMessage message, StompFrame command, FrameTranslator ft) throws IOException { public static void copyStandardHeadersFromMessageToFrame(ProtocolConverter converter, ActiveMQMessage message, StompFrame command, FrameTranslator ft) throws IOException {
final Map<String, String> headers = command.getHeaders(); final Map<String, String> headers = command.getHeaders();
headers.put(Stomp.Headers.Message.DESTINATION, ft.convertDestination(message.getDestination())); headers.put(Stomp.Headers.Message.DESTINATION, ft.convertDestination(converter, message.getDestination()));
headers.put(Stomp.Headers.Message.MESSAGE_ID, message.getJMSMessageID()); headers.put(Stomp.Headers.Message.MESSAGE_ID, message.getJMSMessageID());
if (message.getJMSCorrelationID() != null) { if (message.getJMSCorrelationID() != null) {
@ -66,7 +66,7 @@ public interface FrameTranslator {
headers.put(Stomp.Headers.Message.PRORITY, "" + message.getJMSPriority()); headers.put(Stomp.Headers.Message.PRORITY, "" + message.getJMSPriority());
if (message.getJMSReplyTo() != null) { if (message.getJMSReplyTo() != null) {
headers.put(Stomp.Headers.Message.REPLY_TO, ft.convertDestination(message.getJMSReplyTo())); headers.put(Stomp.Headers.Message.REPLY_TO, ft.convertDestination(converter, message.getJMSReplyTo()));
} }
headers.put(Stomp.Headers.Message.TIMESTAMP, "" + message.getJMSTimestamp()); headers.put(Stomp.Headers.Message.TIMESTAMP, "" + message.getJMSTimestamp());
@ -83,10 +83,10 @@ public interface FrameTranslator {
} }
} }
public static void copyStandardHeadersFromFrameToMessage(StompFrame command, ActiveMQMessage msg, FrameTranslator ft) throws ProtocolException, JMSException { public static void copyStandardHeadersFromFrameToMessage(ProtocolConverter converter, StompFrame command, ActiveMQMessage msg, FrameTranslator ft) throws ProtocolException, JMSException {
final Map<String, String> headers = new HashMap<String, String>(command.getHeaders()); final Map<String, String> headers = new HashMap<String, String>(command.getHeaders());
final String destination = headers.remove(Stomp.Headers.Send.DESTINATION); final String destination = headers.remove(Stomp.Headers.Send.DESTINATION);
msg.setDestination(ft.convertDestination(destination)); msg.setDestination(ft.convertDestination(converter, destination));
// the standard JMS headers // the standard JMS headers
msg.setJMSCorrelationID(headers.remove(Stomp.Headers.Send.CORRELATION_ID)); msg.setJMSCorrelationID(headers.remove(Stomp.Headers.Send.CORRELATION_ID));
@ -108,7 +108,7 @@ public interface FrameTranslator {
o = headers.remove(Stomp.Headers.Send.REPLY_TO); o = headers.remove(Stomp.Headers.Send.REPLY_TO);
if (o != null) { if (o != null) {
msg.setJMSReplyTo(ft.convertDestination((String)o)); msg.setJMSReplyTo(ft.convertDestination(converter, (String)o));
} }
o = headers.remove(Stomp.Headers.Send.PERSISTENT); o = headers.remove(Stomp.Headers.Send.PERSISTENT);

View File

@ -32,7 +32,9 @@ import org.apache.activemq.command.ActiveMQTextMessage;
* Implements ActiveMQ 4.0 translations * Implements ActiveMQ 4.0 translations
*/ */
public class LegacyFrameTranslator implements FrameTranslator { public class LegacyFrameTranslator implements FrameTranslator {
public ActiveMQMessage convertFrame(StompFrame command) throws JMSException, ProtocolException {
public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException {
final Map headers = command.getHeaders(); final Map headers = command.getHeaders();
final ActiveMQMessage msg; final ActiveMQMessage msg;
if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) { if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) {
@ -49,17 +51,17 @@ public class LegacyFrameTranslator implements FrameTranslator {
} }
msg = text; msg = text;
} }
FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(command, msg, this); FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
return msg; return msg;
} }
public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException { public StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException {
StompFrame command = new StompFrame(); StompFrame command = new StompFrame();
command.setAction(Stomp.Responses.MESSAGE); command.setAction(Stomp.Responses.MESSAGE);
Map<String, String> headers = new HashMap<String, String>(25); Map<String, String> headers = new HashMap<String, String>(25);
command.setHeaders(headers); command.setHeaders(headers);
FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(message, command, this); FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(converter, message, command, this);
if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) { if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
@ -79,23 +81,28 @@ public class LegacyFrameTranslator implements FrameTranslator {
return command; return command;
} }
public String convertDestination(Destination d) { public String convertDestination(ProtocolConverter converter, Destination d) {
if (d == null) { if (d == null) {
return null; return null;
} }
ActiveMQDestination activeMQDestination = (ActiveMQDestination)d; ActiveMQDestination activeMQDestination = (ActiveMQDestination)d;
String physicalName = activeMQDestination.getPhysicalName(); String physicalName = activeMQDestination.getPhysicalName();
String rc = converter.getCreatedTempDestinationName(activeMQDestination);
if( rc!=null ) {
return rc;
}
StringBuffer buffer = new StringBuffer(); StringBuffer buffer = new StringBuffer();
if (activeMQDestination.isQueue()) { if (activeMQDestination.isQueue()) {
if (activeMQDestination.isTemporary()) { if (activeMQDestination.isTemporary()) {
buffer.append("/temp-queue/"); buffer.append("/remote-temp-queue/");
} else { } else {
buffer.append("/queue/"); buffer.append("/queue/");
} }
} else { } else {
if (activeMQDestination.isTemporary()) { if (activeMQDestination.isTemporary()) {
buffer.append("/temp-topic/"); buffer.append("/remote-temp-topic/");
} else { } else {
buffer.append("/topic/"); buffer.append("/topic/");
} }
@ -104,7 +111,7 @@ public class LegacyFrameTranslator implements FrameTranslator {
return buffer.toString(); return buffer.toString();
} }
public ActiveMQDestination convertDestination(String name) throws ProtocolException { public ActiveMQDestination convertDestination(ProtocolConverter converter, String name) throws ProtocolException {
if (name == null) { if (name == null) {
return null; return null;
} else if (name.startsWith("/queue/")) { } else if (name.startsWith("/queue/")) {
@ -113,12 +120,16 @@ public class LegacyFrameTranslator implements FrameTranslator {
} else if (name.startsWith("/topic/")) { } else if (name.startsWith("/topic/")) {
String tName = name.substring("/topic/".length(), name.length()); String tName = name.substring("/topic/".length(), name.length());
return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TOPIC_TYPE); return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TOPIC_TYPE);
} else if (name.startsWith("/temp-queue/")) { } else if (name.startsWith("/remote-temp-queue/")) {
String tName = name.substring("/temp-queue/".length(), name.length()); String tName = name.substring("/remote-temp-queue/".length(), name.length());
return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_QUEUE_TYPE); return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_QUEUE_TYPE);
} else if (name.startsWith("/temp-topic/")) { } else if (name.startsWith("/remote-temp-topic/")) {
String tName = name.substring("/temp-topic/".length(), name.length()); String tName = name.substring("/remote-temp-topic/".length(), name.length());
return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_TOPIC_TYPE); return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_TOPIC_TYPE);
} else if (name.startsWith("/temp-queue/")) {
return converter.createTempQueue(name);
} else if (name.startsWith("/temp-topic/")) {
return converter.createTempTopic(name);
} else { } else {
throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations " throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations "
+ "must begine with one of: /queue/ /topic/ /temp-queue/ /temp-topic/"); + "must begine with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");

View File

@ -25,15 +25,19 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Destination;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.LocalTransactionId; import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatch;
@ -65,9 +69,12 @@ public class ProtocolConverter {
private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator();
private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>(); private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
private final ConcurrentHashMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>(); private final ConcurrentHashMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>();
private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>(); private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>();
private final StompTransportFilter transportFilter; private final StompTransportFilter transportFilter;
@ -325,7 +332,7 @@ public class ProtocolConverter {
String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID); String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION); String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
ActiveMQDestination actualDest = frameTranslator.convertDestination(destination); ActiveMQDestination actualDest = frameTranslator.convertDestination(this, destination);
ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
ConsumerInfo consumerInfo = new ConsumerInfo(id); ConsumerInfo consumerInfo = new ConsumerInfo(id);
consumerInfo.setPrefetchSize(1000); consumerInfo.setPrefetchSize(1000);
@ -336,7 +343,7 @@ public class ProtocolConverter {
IntrospectionSupport.setProperties(consumerInfo, headers, "activemq."); IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
consumerInfo.setDestination(frameTranslator.convertDestination(destination)); consumerInfo.setDestination(frameTranslator.convertDestination(this, destination));
StompSubscription stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo); StompSubscription stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo);
stompSubscription.setDestination(actualDest); stompSubscription.setDestination(actualDest);
@ -360,7 +367,7 @@ public class ProtocolConverter {
ActiveMQDestination destination = null; ActiveMQDestination destination = null;
Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION); Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
if (o != null) { if (o != null) {
destination = frameTranslator.convertDestination((String)o); destination = frameTranslator.convertDestination(this, (String)o);
} }
String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID); String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID);
@ -489,15 +496,40 @@ public class ProtocolConverter {
} }
public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException { public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException {
ActiveMQMessage msg = frameTranslator.convertFrame(command); ActiveMQMessage msg = frameTranslator.convertFrame(this, command);
return msg; return msg;
} }
public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException { public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException {
return frameTranslator.convertMessage(message); return frameTranslator.convertMessage(this, message);
} }
public StompTransportFilter getTransportFilter() { public StompTransportFilter getTransportFilter() {
return transportFilter; return transportFilter;
} }
public ActiveMQDestination createTempQueue(String name) {
ActiveMQDestination rc = tempDestinations.get(name);
if( rc == null ) {
rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId());
sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
tempDestinations.put(name, rc);
}
return rc;
}
public ActiveMQDestination createTempTopic(String name) {
ActiveMQDestination rc = tempDestinations.get(name);
if( rc == null ) {
rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId());
sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
tempDestinations.put(name, rc);
tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name);
}
return rc;
}
public String getCreatedTempDestinationName(ActiveMQDestination destination) {
return tempDestinationAmqToStompMap.get(destination.getQualifiedName());
}
} }