Initial work on making the stomp frame and destination conversion in and out of activemq pluggable. The only implementation is the 4.0 style, but adding others is doable now.

Configuration is done on a per-TransportFactory basis right now, this might need some tweaking. Will discuss on list


git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@453497 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Brian McCallister 2006-10-06 04:55:04 +00:00
parent cc1a9be9c8
commit f8485e3b36
6 changed files with 333 additions and 256 deletions

View File

@ -0,0 +1,109 @@
package org.apache.activemq.transport.stomp;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQDestination;
import javax.jms.JMSException;
import javax.jms.Destination;
import java.io.IOException;
import java.util.Map;
import java.util.HashMap;
/**
* Implementations of this interface are used to map back and forth from Stomp to ActiveMQ.
* There are several standard mappings which are semantically the same, the inner class,
* Helper, provides functions to copy those properties from one to the other
*/
public interface FrameTranslator
{
public ActiveMQMessage convertFrame(StompFrame frame) throws JMSException, ProtocolException;
public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException;
public String convertDestination(Destination d);
public ActiveMQDestination convertDestination(String name) throws ProtocolException;
/**
* Helper class which holds commonly needed functions used when implementing
* FrameTranslators
*/
public final static class Helper
{
public static void copyStandardHeadersFromMessageToFrame(ActiveMQMessage message,
StompFrame command,
FrameTranslator ft)
throws IOException
{
final Map headers = command.getHeaders();
headers.put(Stomp.Headers.Message.DESTINATION, ft.convertDestination(message.getDestination()));
headers.put(Stomp.Headers.Message.MESSAGE_ID, message.getJMSMessageID());
if (message.getJMSCorrelationID() != null) {
headers.put(Stomp.Headers.Message.CORRELATION_ID, message.getJMSCorrelationID());
}
headers.put(Stomp.Headers.Message.EXPIRATION_TIME, ""+message.getJMSExpiration());
if (message.getJMSRedelivered()) {
headers.put(Stomp.Headers.Message.REDELIVERED, "true");
}
headers.put(Stomp.Headers.Message.PRORITY, ""+message.getJMSPriority());
if (message.getJMSReplyTo() != null) {
headers.put(Stomp.Headers.Message.REPLY_TO, ft.convertDestination(message.getJMSReplyTo()));
}
headers.put(Stomp.Headers.Message.TIMESTAMP, ""+message.getJMSTimestamp());
if (message.getJMSType() != null) {
headers.put(Stomp.Headers.Message.TYPE, message.getJMSType());
}
// now lets add all the message headers
final Map properties = message.getProperties();
if (properties != null) {
headers.putAll(properties);
}
}
public static void copyStandardHeadersFromFrameToMessage(StompFrame command,
ActiveMQMessage msg,
FrameTranslator ft)
throws ProtocolException, JMSException
{
final Map headers = new HashMap(command.getHeaders());
final String destination = (String) headers.remove(Stomp.Headers.Send.DESTINATION);
msg.setDestination( ft.convertDestination(destination));
// the standard JMS headers
msg.setJMSCorrelationID((String) headers.remove(Stomp.Headers.Send.CORRELATION_ID));
Object o = headers.remove(Stomp.Headers.Send.EXPIRATION_TIME);
if (o != null) {
msg.setJMSExpiration(Long.parseLong((String) o));
}
o = headers.remove(Stomp.Headers.Send.PRIORITY);
if (o != null) {
msg.setJMSPriority(Integer.parseInt((String) o));
}
o = headers.remove(Stomp.Headers.Send.TYPE);
if (o != null) {
msg.setJMSType((String) o);
}
o = headers.remove(Stomp.Headers.Send.REPLY_TO);
if (o != null) {
msg.setJMSReplyTo(ft.convertDestination((String) o));
}
o = headers.remove(Stomp.Headers.Send.PERSISTENT);
if (o != null) {
msg.setPersistent("true".equals(o));
}
// now the general headers
msg.setProperties(headers);
}
}
}

View File

@ -0,0 +1,101 @@
package org.apache.activemq.transport.stomp;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQDestination;
import javax.jms.JMSException;
import javax.jms.Destination;
import java.util.Map;
import java.util.HashMap;
import java.io.IOException;
/**
* Implements ActiveMQ 4.0 translations
*/
public class LegacyFrameTranslator implements FrameTranslator
{
public ActiveMQMessage convertFrame(StompFrame command) throws JMSException, ProtocolException {
final Map headers = command.getHeaders();
final ActiveMQMessage msg;
if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) {
headers.remove(Stomp.Headers.CONTENT_LENGTH);
ActiveMQBytesMessage bm = new ActiveMQBytesMessage();
bm.writeBytes(command.getContent());
msg = bm;
} else {
ActiveMQTextMessage text = new ActiveMQTextMessage();
try {
text.setText(new String(command.getContent(), "UTF-8"));
}
catch (Throwable e) {
throw new ProtocolException("Text could not bet set: " + e, false, e);
}
msg = text;
}
FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(command, msg, this);
return msg;
}
public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException {
StompFrame command = new StompFrame();
command.setAction(Stomp.Responses.MESSAGE);
Map headers = new HashMap(25);
command.setHeaders(headers);
FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(message, command, this);
if( message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE ) {
ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy();
command.setContent(msg.getText().getBytes("UTF-8"));
} else if( message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE ) {
ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy();
byte[] data = new byte[(int)msg.getBodyLength()];
msg.readBytes(data);
headers.put(Stomp.Headers.CONTENT_LENGTH, ""+data.length);
command.setContent(data);
}
return command;
}
public String convertDestination(Destination d) {
if (d == null) {
return null;
}
ActiveMQDestination amq_d = (ActiveMQDestination) d;
String p_name = amq_d.getPhysicalName();
StringBuffer buffer = new StringBuffer();
if (amq_d.isQueue()) {
buffer.append("/queue/");
}
if (amq_d.isTopic()) {
buffer.append("/topic/");
}
buffer.append(p_name);
return buffer.toString();
}
public ActiveMQDestination convertDestination(String name) throws ProtocolException {
if (name == null) {
return null;
}
else if (name.startsWith("/queue/")) {
String q_name = name.substring("/queue/".length(), name.length());
return ActiveMQDestination.createDestination(q_name, ActiveMQDestination.QUEUE_TYPE);
}
else if (name.startsWith("/topic/")) {
String t_name = name.substring("/topic/".length(), name.length());
return ActiveMQDestination.createDestination(t_name, ActiveMQDestination.TOPIC_TYPE);
}
else {
throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations " +
"must begine with /queue/ or /topic/");
}
}
}

View File

@ -57,28 +57,35 @@ import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/** /**
* *
* @author <a href="http://hiramchirino.com">chirino</a> * @author <a href="http://hiramchirino.com">chirino</a>
*/ */
public class ProtocolConverter { public class ProtocolConverter {
private static final IdGenerator connectionIdGenerator = new IdGenerator(); private static final IdGenerator connectionIdGenerator = new IdGenerator();
private final ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId()); private final ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId());
private final SessionId sessionId = new SessionId(connectionId, -1); private final SessionId sessionId = new SessionId(connectionId, -1);
private final ProducerId producerId = new ProducerId(sessionId, 1); private final ProducerId producerId = new ProducerId(sessionId, 1);
private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
private final ConcurrentHashMap resposeHandlers = new ConcurrentHashMap(); private final ConcurrentHashMap resposeHandlers = new ConcurrentHashMap();
private final ConcurrentHashMap subscriptionsByConsumerId = new ConcurrentHashMap(); private final ConcurrentHashMap subscriptionsByConsumerId = new ConcurrentHashMap();
private final Map transactions = new ConcurrentHashMap(); private final Map transactions = new ConcurrentHashMap();
private StompTransportFilter transportFilter; private final StompTransportFilter transportFilter;
private final Object commnadIdMutex = new Object(); private final Object commnadIdMutex = new Object();
private int lastCommandId; private int lastCommandId;
private final AtomicBoolean connected = new AtomicBoolean(false); private final AtomicBoolean connected = new AtomicBoolean(false);
private final FrameTranslator frameTranslator;
public ProtocolConverter(StompTransportFilter stompTransportFilter, FrameTranslator translator)
{
this.transportFilter = stompTransportFilter;
this.frameTranslator = translator;
}
protected int generateCommandId() { protected int generateCommandId() {
synchronized(commnadIdMutex){ synchronized(commnadIdMutex){
@ -102,7 +109,7 @@ public class ProtocolConverter {
} }
return null; return null;
} }
protected void sendToActiveMQ(Command command, ResponseHandler handler) { protected void sendToActiveMQ(Command command, ResponseHandler handler) {
command.setCommandId(generateCommandId()); command.setCommandId(generateCommandId());
if(handler!=null) { if(handler!=null) {
@ -122,11 +129,11 @@ public class ProtocolConverter {
*/ */
public void onStompCommad( StompFrame command ) throws IOException, JMSException { public void onStompCommad( StompFrame command ) throws IOException, JMSException {
try { try {
if( command.getClass() == StompFrameError.class ) { if( command.getClass() == StompFrameError.class ) {
throw ((StompFrameError)command).getException(); throw ((StompFrameError)command).getException();
} }
String action = command.getAction(); String action = command.getAction();
if (action.startsWith(Stomp.Commands.SEND)) if (action.startsWith(Stomp.Commands.SEND))
onStompSend(command); onStompSend(command);
@ -148,9 +155,9 @@ public class ProtocolConverter {
onStompDisconnect(command); onStompDisconnect(command);
else else
throw new ProtocolException("Unknown STOMP action: "+action); throw new ProtocolException("Unknown STOMP action: "+action);
} catch (ProtocolException e) { } catch (ProtocolException e) {
// Let the stomp client know about any protocol errors. // Let the stomp client know about any protocol errors.
ByteArrayOutputStream baos = new ByteArrayOutputStream(); ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos,"UTF-8")); PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos,"UTF-8"));
@ -159,20 +166,20 @@ public class ProtocolConverter {
HashMap headers = new HashMap(); HashMap headers = new HashMap();
headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage()); headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage());
final String receiptId = (String) command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); final String receiptId = (String) command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
if( receiptId != null ) { if( receiptId != null ) {
headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId); headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
} }
StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR,headers,baos.toByteArray()); StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR,headers,baos.toByteArray());
sendToStomp(errorMessage); sendToStomp(errorMessage);
if( e.isFatal() ) if( e.isFatal() )
getTransportFilter().onException(e); getTransportFilter().onException(e);
} }
} }
protected void onStompSend(StompFrame command) throws IOException, JMSException { protected void onStompSend(StompFrame command) throws IOException, JMSException {
checkConnected(); checkConnected();
@ -192,12 +199,12 @@ public class ProtocolConverter {
throw new ProtocolException("Invalid transaction id: "+stompTx); throw new ProtocolException("Invalid transaction id: "+stompTx);
message.setTransactionId(activemqTx); message.setTransactionId(activemqTx);
} }
message.onSend(); message.onSend();
sendToActiveMQ(message, createResponseHandler(command)); sendToActiveMQ(message, createResponseHandler(command));
} }
protected void onStompAck(StompFrame command) throws ProtocolException { protected void onStompAck(StompFrame command) throws ProtocolException {
checkConnected(); checkConnected();
@ -205,7 +212,7 @@ public class ProtocolConverter {
// TODO: acking with just a message id is very bogus // TODO: acking with just a message id is very bogus
// since the same message id could have been sent to 2 different subscriptions // since the same message id could have been sent to 2 different subscriptions
// on the same stomp connection. For example, when 2 subs are created on the same topic. // on the same stomp connection. For example, when 2 subs are created on the same topic.
Map headers = command.getHeaders(); Map headers = command.getHeaders();
String messageId = (String) headers.get(Stomp.Headers.Ack.MESSAGE_ID); String messageId = (String) headers.get(Stomp.Headers.Ack.MESSAGE_ID);
if (messageId == null) if (messageId == null)
@ -230,98 +237,94 @@ public class ProtocolConverter {
break; break;
} }
} }
if( !acked ) if( !acked )
throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]"); throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
} }
protected void onStompBegin(StompFrame command) throws ProtocolException { protected void onStompBegin(StompFrame command) throws ProtocolException {
checkConnected(); checkConnected();
Map headers = command.getHeaders(); Map headers = command.getHeaders();
String stompTx = (String)headers.get(Stomp.Headers.TRANSACTION); String stompTx = (String)headers.get(Stomp.Headers.TRANSACTION);
if (!headers.containsKey(Stomp.Headers.TRANSACTION)) { if (!headers.containsKey(Stomp.Headers.TRANSACTION)) {
throw new ProtocolException("Must specify the transaction you are beginning"); throw new ProtocolException("Must specify the transaction you are beginning");
} }
if( transactions.get(stompTx)!=null ) { if( transactions.get(stompTx)!=null ) {
throw new ProtocolException("The transaction was allready started: "+stompTx); throw new ProtocolException("The transaction was allready started: "+stompTx);
} }
LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId()); LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId());
transactions.put(stompTx, activemqTx); transactions.put(stompTx, activemqTx);
TransactionInfo tx = new TransactionInfo(); TransactionInfo tx = new TransactionInfo();
tx.setConnectionId(connectionId); tx.setConnectionId(connectionId);
tx.setTransactionId(activemqTx); tx.setTransactionId(activemqTx);
tx.setType(TransactionInfo.BEGIN); tx.setType(TransactionInfo.BEGIN);
sendToActiveMQ(tx, createResponseHandler(command)); sendToActiveMQ(tx, createResponseHandler(command));
} }
protected void onStompCommit(StompFrame command) throws ProtocolException { protected void onStompCommit(StompFrame command) throws ProtocolException {
checkConnected(); checkConnected();
Map headers = command.getHeaders(); Map headers = command.getHeaders();
String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION); String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION);
if (stompTx==null) { if (stompTx==null) {
throw new ProtocolException("Must specify the transaction you are committing"); throw new ProtocolException("Must specify the transaction you are committing");
} }
TransactionId activemqTx=null; TransactionId activemqTx = (TransactionId) transactions.remove(stompTx);
if (stompTx!=null) { if (activemqTx == null) {
activemqTx = (TransactionId) transactions.remove(stompTx); throw new ProtocolException("Invalid transaction id: "+stompTx);
if (activemqTx == null)
throw new ProtocolException("Invalid transaction id: "+stompTx);
} }
TransactionInfo tx = new TransactionInfo(); TransactionInfo tx = new TransactionInfo();
tx.setConnectionId(connectionId); tx.setConnectionId(connectionId);
tx.setTransactionId(activemqTx); tx.setTransactionId(activemqTx);
tx.setType(TransactionInfo.COMMIT_ONE_PHASE); tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
sendToActiveMQ(tx, createResponseHandler(command)); sendToActiveMQ(tx, createResponseHandler(command));
} }
protected void onStompAbort(StompFrame command) throws ProtocolException { protected void onStompAbort(StompFrame command) throws ProtocolException {
checkConnected(); checkConnected();
Map headers = command.getHeaders(); Map headers = command.getHeaders();
String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION); String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION);
if (stompTx==null) { if (stompTx==null) {
throw new ProtocolException("Must specify the transaction you are committing"); throw new ProtocolException("Must specify the transaction you are committing");
} }
TransactionId activemqTx=null; TransactionId activemqTx = (TransactionId) transactions.remove(stompTx);
if (stompTx!=null) { if (activemqTx == null) {
activemqTx = (TransactionId) transactions.remove(stompTx); throw new ProtocolException("Invalid transaction id: "+stompTx);
if (activemqTx == null)
throw new ProtocolException("Invalid transaction id: "+stompTx);
} }
TransactionInfo tx = new TransactionInfo(); TransactionInfo tx = new TransactionInfo();
tx.setConnectionId(connectionId); tx.setConnectionId(connectionId);
tx.setTransactionId(activemqTx); tx.setTransactionId(activemqTx);
tx.setType(TransactionInfo.ROLLBACK); tx.setType(TransactionInfo.ROLLBACK);
sendToActiveMQ(tx, createResponseHandler(command)); sendToActiveMQ(tx, createResponseHandler(command));
} }
protected void onStompSubscribe(StompFrame command) throws ProtocolException { protected void onStompSubscribe(StompFrame command) throws ProtocolException {
checkConnected(); checkConnected();
Map headers = command.getHeaders(); Map headers = command.getHeaders();
String subscriptionId = (String)headers.get(Stomp.Headers.Subscribe.ID); String subscriptionId = (String)headers.get(Stomp.Headers.Subscribe.ID);
String destination = (String)headers.get(Stomp.Headers.Subscribe.DESTINATION); String destination = (String)headers.get(Stomp.Headers.Subscribe.DESTINATION);
ActiveMQDestination actual_dest = convertDestination(destination); ActiveMQDestination actual_dest = frameTranslator.convertDestination(destination);
ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
ConsumerInfo consumerInfo = new ConsumerInfo(id); ConsumerInfo consumerInfo = new ConsumerInfo(id);
consumerInfo.setPrefetchSize(1000); consumerInfo.setPrefetchSize(1000);
@ -329,14 +332,14 @@ public class ProtocolConverter {
String selector = (String) headers.remove(Stomp.Headers.Subscribe.SELECTOR); String selector = (String) headers.remove(Stomp.Headers.Subscribe.SELECTOR);
consumerInfo.setSelector(selector); consumerInfo.setSelector(selector);
IntrospectionSupport.setProperties(consumerInfo, headers, "activemq."); IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
consumerInfo.setDestination(convertDestination(destination)); consumerInfo.setDestination(frameTranslator.convertDestination(destination));
StompSubscription stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo); StompSubscription stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo);
stompSubscription.setDestination(actual_dest); stompSubscription.setDestination(actual_dest);
String ackMode = (String)headers.get(Stomp.Headers.Subscribe.ACK_MODE); String ackMode = (String)headers.get(Stomp.Headers.Subscribe.ACK_MODE);
if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) { if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
stompSubscription.setAckMode(StompSubscription.CLIENT_ACK); stompSubscription.setAckMode(StompSubscription.CLIENT_ACK);
@ -346,7 +349,7 @@ public class ProtocolConverter {
subscriptionsByConsumerId.put(id, stompSubscription); subscriptionsByConsumerId.put(id, stompSubscription);
sendToActiveMQ(consumerInfo, createResponseHandler(command)); sendToActiveMQ(consumerInfo, createResponseHandler(command));
} }
protected void onStompUnsubscribe(StompFrame command) throws ProtocolException { protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
@ -355,11 +358,11 @@ public class ProtocolConverter {
ActiveMQDestination destination=null; ActiveMQDestination destination=null;
Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION); Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
if( o!=null ) if( o!=null )
destination =convertDestination((String) o); destination = frameTranslator.convertDestination((String) o);
String subscriptionId = (String)headers.get(Stomp.Headers.Unsubscribe.ID); String subscriptionId = (String)headers.get(Stomp.Headers.Unsubscribe.ID);
if (subscriptionId==null && destination==null) { if (subscriptionId==null && destination==null) {
throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from"); throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
} }
@ -369,7 +372,7 @@ public class ProtocolConverter {
// //
for (Iterator iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) { for (Iterator iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
StompSubscription sub = (StompSubscription) iter.next(); StompSubscription sub = (StompSubscription) iter.next();
if ( if (
(subscriptionId!=null && subscriptionId.equals(sub.getSubscriptionId()) ) || (subscriptionId!=null && subscriptionId.equals(sub.getSubscriptionId()) ) ||
(destination!=null && destination.equals(sub.getDestination()) ) (destination!=null && destination.equals(sub.getDestination()) )
) { ) {
@ -377,7 +380,7 @@ public class ProtocolConverter {
return; return;
} }
} }
throw new ProtocolException("No subscription matched."); throw new ProtocolException("No subscription matched.");
} }
@ -388,56 +391,56 @@ public class ProtocolConverter {
} }
final Map headers = command.getHeaders(); final Map headers = command.getHeaders();
// allow anyone to login for now // allow anyone to login for now
String login = (String)headers.get(Stomp.Headers.Connect.LOGIN); String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
String passcode = (String)headers.get(Stomp.Headers.Connect.PASSCODE); String passcode = (String)headers.get(Stomp.Headers.Connect.PASSCODE);
String clientId = (String)headers.get(Stomp.Headers.Connect.CLIENT_ID); String clientId = (String)headers.get(Stomp.Headers.Connect.CLIENT_ID);
final ConnectionInfo connectionInfo = new ConnectionInfo(); final ConnectionInfo connectionInfo = new ConnectionInfo();
IntrospectionSupport.setProperties(connectionInfo, headers, "activemq."); IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
connectionInfo.setConnectionId(connectionId); connectionInfo.setConnectionId(connectionId);
if( clientId!=null ) if( clientId!=null )
connectionInfo.setClientId(clientId); connectionInfo.setClientId(clientId);
else else
connectionInfo.setClientId(""+connectionInfo.getConnectionId().toString()); connectionInfo.setClientId(""+connectionInfo.getConnectionId().toString());
connectionInfo.setResponseRequired(true); connectionInfo.setResponseRequired(true);
connectionInfo.setUserName(login); connectionInfo.setUserName(login);
connectionInfo.setPassword(passcode); connectionInfo.setPassword(passcode);
sendToActiveMQ(connectionInfo, new ResponseHandler(){ sendToActiveMQ(connectionInfo, new ResponseHandler(){
public void onResponse(ProtocolConverter converter, Response response) throws IOException { public void onResponse(ProtocolConverter converter, Response response) throws IOException {
final SessionInfo sessionInfo = new SessionInfo(sessionId); final SessionInfo sessionInfo = new SessionInfo(sessionId);
sendToActiveMQ(sessionInfo,null); sendToActiveMQ(sessionInfo,null);
final ProducerInfo producerInfo = new ProducerInfo(producerId); final ProducerInfo producerInfo = new ProducerInfo(producerId);
sendToActiveMQ(producerInfo,new ResponseHandler(){ sendToActiveMQ(producerInfo,new ResponseHandler(){
public void onResponse(ProtocolConverter converter, Response response) throws IOException { public void onResponse(ProtocolConverter converter, Response response) throws IOException {
connected.set(true); connected.set(true);
HashMap responseHeaders = new HashMap(); HashMap responseHeaders = new HashMap();
responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId()); responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId());
String requestId = (String) headers.get(Stomp.Headers.Connect.REQUEST_ID); String requestId = (String) headers.get(Stomp.Headers.Connect.REQUEST_ID);
if( requestId !=null ){ if( requestId !=null ){
responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId); responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId);
} }
StompFrame sc = new StompFrame(); StompFrame sc = new StompFrame();
sc.setAction(Stomp.Responses.CONNECTED); sc.setAction(Stomp.Responses.CONNECTED);
sc.setHeaders(responseHeaders); sc.setHeaders(responseHeaders);
sendToStomp(sc); sendToStomp(sc);
} }
}); });
} }
}); });
} }
protected void onStompDisconnect(StompFrame command) throws ProtocolException { protected void onStompDisconnect(StompFrame command) throws ProtocolException {
@ -454,182 +457,42 @@ public class ProtocolConverter {
} }
/** /**
* Convert a ActiveMQ command * Dispatch a ActiveMQ command
* @param command * @param command
* @throws IOException * @throws IOException
*/ */
public void onActiveMQCommad( Command command ) throws IOException, JMSException { public void onActiveMQCommad( Command command ) throws IOException, JMSException {
if ( command.isResponse() ) { if ( command.isResponse() ) {
Response response = (Response) command; Response response = (Response) command;
ResponseHandler rh = (ResponseHandler) resposeHandlers.remove(new Integer(response.getCorrelationId())); ResponseHandler rh = (ResponseHandler) resposeHandlers.remove(new Integer(response.getCorrelationId()));
if( rh !=null ) { if( rh !=null ) {
rh.onResponse(this, response); rh.onResponse(this, response);
} }
} else if( command.isMessageDispatch() ) { } else if( command.isMessageDispatch() ) {
MessageDispatch md = (MessageDispatch)command; MessageDispatch md = (MessageDispatch)command;
StompSubscription sub = (StompSubscription) subscriptionsByConsumerId.get(md.getConsumerId()); StompSubscription sub = (StompSubscription) subscriptionsByConsumerId.get(md.getConsumerId());
if (sub != null) if (sub != null) {
sub.onMessageDispatch(md); sub.onMessageDispatch(md);
}
} }
} }
public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException { public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException {
Map headers = command.getHeaders();
// now the body
ActiveMQMessage msg;
if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) {
headers.remove(Stomp.Headers.CONTENT_LENGTH);
ActiveMQBytesMessage bm = new ActiveMQBytesMessage();
bm.writeBytes(command.getContent());
msg = bm;
} else {
ActiveMQTextMessage text = new ActiveMQTextMessage();
try {
text.setText(new String(command.getContent(), "UTF-8"));
} catch (Throwable e) {
throw new ProtocolException("Text could not bet set: "+e, false, e);
}
msg = text;
}
String destination = (String) headers.remove(Stomp.Headers.Send.DESTINATION); ActiveMQMessage msg = frameTranslator.convertFrame(command);
msg.setDestination(convertDestination(destination));
// the standard JMS headers
msg.setJMSCorrelationID((String) headers.remove(Stomp.Headers.Send.CORRELATION_ID));
Object o = headers.remove(Stomp.Headers.Send.EXPIRATION_TIME);
if (o != null) {
msg.setJMSExpiration(Long.parseLong((String) o));
}
o = headers.remove(Stomp.Headers.Send.PRIORITY);
if (o != null) {
msg.setJMSPriority(Integer.parseInt((String)o));
}
o = headers.remove(Stomp.Headers.Send.TYPE);
if (o != null) {
msg.setJMSType((String) o);
}
o = headers.remove(Stomp.Headers.Send.REPLY_TO);
if( o!=null ) {
msg.setJMSReplyTo(convertDestination((String)o));
}
o = headers.remove(Stomp.Headers.Send.PERSISTENT);
if (o != null) {
msg.setPersistent("true".equals(o));
}
// now the general headers
msg.setProperties(headers);
return msg;
}
public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException {
StompFrame command = new StompFrame();
command.setAction(Stomp.Responses.MESSAGE);
HashMap headers = new HashMap();
command.setHeaders(headers);
headers.put(Stomp.Headers.Message.DESTINATION, convertDestination(message.getDestination()));
headers.put(Stomp.Headers.Message.MESSAGE_ID, message.getJMSMessageID());
if (message.getJMSCorrelationID() != null) {
headers.put(Stomp.Headers.Message.CORRELATION_ID, message.getJMSCorrelationID());
}
headers.put(Stomp.Headers.Message.EXPIRATION_TIME, ""+message.getJMSExpiration());
if (message.getJMSRedelivered()) {
headers.put(Stomp.Headers.Message.REDELIVERED, "true");
}
headers.put(Stomp.Headers.Message.PRORITY, ""+message.getJMSPriority());
if (message.getJMSReplyTo() != null) {
headers.put(Stomp.Headers.Message.REPLY_TO, convertDestination(message.getJMSReplyTo()));
}
headers.put(Stomp.Headers.Message.TIMESTAMP, ""+message.getJMSTimestamp());
if (message.getJMSType() != null) {
headers.put(Stomp.Headers.Message.TYPE, message.getJMSType());
}
// now lets add all the message headers
Map properties = message.getProperties();
if (properties != null) {
headers.putAll(properties);
}
if( message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE ) {
ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy();
command.setContent(msg.getText().getBytes("UTF-8"));
} else if( message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE ) {
ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy();
byte[] data = new byte[(int)msg.getBodyLength()];
msg.readBytes(data);
headers.put(Stomp.Headers.CONTENT_LENGTH, ""+data.length);
command.setContent(data);
}
return command;
}
protected ActiveMQDestination convertDestination(String name) throws ProtocolException {
if (name == null) {
return null;
}
else if (name.startsWith("/queue/")) {
String q_name = name.substring("/queue/".length(), name.length());
return ActiveMQDestination.createDestination(q_name, ActiveMQDestination.QUEUE_TYPE);
}
else if (name.startsWith("/topic/")) {
String t_name = name.substring("/topic/".length(), name.length());
return ActiveMQDestination.createDestination(t_name, ActiveMQDestination.TOPIC_TYPE);
}
else {
throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations " + "must begine with /queue/ or /topic/");
}
return msg;
} }
protected String convertDestination(Destination d) { public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException {
if (d == null) { return frameTranslator.convertMessage(message);
return null;
}
ActiveMQDestination amq_d = (ActiveMQDestination) d;
String p_name = amq_d.getPhysicalName();
StringBuffer buffer = new StringBuffer();
if (amq_d.isQueue()) {
buffer.append("/queue/");
}
if (amq_d.isTopic()) {
buffer.append("/topic/");
}
buffer.append(p_name);
return buffer.toString();
} }
public StompTransportFilter getTransportFilter() { public StompTransportFilter getTransportFilter() {
return transportFilter; return transportFilter;
} }
public void setTransportFilter(StompTransportFilter transportFilter) {
this.transportFilter = transportFilter;
}
} }

View File

@ -25,7 +25,7 @@ import org.apache.activemq.wireformat.WireFormat;
/** /**
* A <a href="http://stomp.codehaus.org/">STOMP</a> transport factory * A <a href="http://stomp.codehaus.org/">STOMP</a> transport factory
* *
* @version $Revision: 1.1.1.1 $ * @version $Revision: 1.1.1.1 $
*/ */
public class StompTransportFactory extends TcpTransportFactory { public class StompTransportFactory extends TcpTransportFactory {
@ -33,9 +33,9 @@ public class StompTransportFactory extends TcpTransportFactory {
protected String getDefaultWireFormatType() { protected String getDefaultWireFormatType() {
return "stomp"; return "stomp";
} }
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
transport = new StompTransportFilter(transport); transport = new StompTransportFilter(transport, new LegacyFrameTranslator());
return super.compositeConfigure(transport, format, options); return super.compositeConfigure(transport, format, options);
} }
} }

View File

@ -30,22 +30,25 @@ import org.apache.activemq.util.IOExceptionSupport;
* The StompTransportFilter normally sits on top of a TcpTransport * The StompTransportFilter normally sits on top of a TcpTransport
* that has been configured with the StompWireFormat and is used to * that has been configured with the StompWireFormat and is used to
* convert STOMP commands to ActiveMQ commands. * convert STOMP commands to ActiveMQ commands.
* *
* All of the coversion work is done by delegating to the ProtocolConverter. * All of the coversion work is done by delegating to the ProtocolConverter.
* *
* @author <a href="http://hiramchirino.com">chirino</a> * @author <a href="http://hiramchirino.com">chirino</a>
*/ */
public class StompTransportFilter extends TransportFilter { public class StompTransportFilter extends TransportFilter {
ProtocolConverter protocolConverter = new ProtocolConverter(); private final ProtocolConverter protocolConverter;
private final Object sendToActiveMQMutex = new Object(); private final Object sendToActiveMQMutex = new Object();
private final Object sendToStompMutex = new Object(); private final Object sendToStompMutex = new Object();
public StompTransportFilter(Transport next) { private final FrameTranslator frameTranslator;
public StompTransportFilter(Transport next, FrameTranslator translator) {
super(next); super(next);
protocolConverter.setTransportFilter(this); this.frameTranslator = translator;
} this.protocolConverter = new ProtocolConverter(this, translator);
}
public void oneway(Command command) throws IOException { public void oneway(Command command) throws IOException {
try { try {
@ -54,7 +57,7 @@ public class StompTransportFilter extends TransportFilter {
throw IOExceptionSupport.create(e); throw IOExceptionSupport.create(e);
} }
} }
public void onCommand(Command command) { public void onCommand(Command command) {
try { try {
protocolConverter.onStompCommad((StompFrame) command); protocolConverter.onStompCommad((StompFrame) command);
@ -64,17 +67,21 @@ public class StompTransportFilter extends TransportFilter {
onException(IOExceptionSupport.create(e)); onException(IOExceptionSupport.create(e));
} }
} }
public void sendToActiveMQ(Command command) { public void sendToActiveMQ(Command command) {
synchronized(sendToActiveMQMutex) { synchronized(sendToActiveMQMutex) {
transportListener.onCommand(command); transportListener.onCommand(command);
} }
} }
public void sendToStomp(StompFrame command) throws IOException { public void sendToStomp(StompFrame command) throws IOException {
synchronized(sendToStompMutex) { synchronized(sendToStompMutex) {
next.oneway(command); next.oneway(command);
} }
} }
public FrameTranslator getFrameTranslator()
{
return frameTranslator;
}
} }

View File

@ -40,7 +40,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
/** /**
* *
* @version $Revision$ * @version $Revision$
*/ */
public class StompSubscriptionRemoveTest extends TestCase { public class StompSubscriptionRemoveTest extends TestCase {
@ -49,10 +49,7 @@ public class StompSubscriptionRemoveTest extends TestCase {
private Socket stompSocket; private Socket stompSocket;
private ByteArrayOutputStream inputBuffer; private ByteArrayOutputStream inputBuffer;
/**
* @param args
* @throws Exception
*/
public void testRemoveSubscriber() throws Exception { public void testRemoveSubscriber() throws Exception {
BrokerService broker = new BrokerService(); BrokerService broker = new BrokerService();
broker.setPersistent(false); broker.setPersistent(false);
@ -115,7 +112,7 @@ public class StompSubscriptionRemoveTest extends TestCase {
++messagesCount; ++messagesCount;
++count; ++count;
} }
sendFrame("DISCONNECT\n\n"); sendFrame("DISCONNECT\n\n");
Thread.sleep(1000); Thread.sleep(1000);
stompSocket.close(); stompSocket.close();
@ -127,7 +124,7 @@ public class StompSubscriptionRemoveTest extends TestCase {
sendFrame(connect_frame); sendFrame(connect_frame);
f = receiveFrame(5000); f = receiveFrame(5000);
frame = "SUBSCRIBE\n" + "destination:/queue/" + getDestinationName() + "\n" + "ack:client\n\n"; frame = "SUBSCRIBE\n" + "destination:/queue/" + getDestinationName() + "\n" + "ack:client\n\n";
sendFrame(frame); sendFrame(frame);
try { try {
@ -147,7 +144,7 @@ public class StompSubscriptionRemoveTest extends TestCase {
} }
} }
} }
line = input.readLine(); line = input.readLine();
if (line == null) { if (line == null) {
throw new IOException("connection was closed"); throw new IOException("connection was closed");
@ -166,11 +163,11 @@ public class StompSubscriptionRemoveTest extends TestCase {
catch (IOException ex) { catch (IOException ex) {
ex.printStackTrace(); ex.printStackTrace();
} }
sendFrame("DISCONNECT\n\n"); sendFrame("DISCONNECT\n\n");
stompSocket.close(); stompSocket.close();
broker.stop(); broker.stop();
log.info("Total messages received: " + messagesCount); log.info("Total messages received: " + messagesCount);
assertTrue("Messages received after connection loss: " + messagesCount, messagesCount >= 2000); assertTrue("Messages received after connection loss: " + messagesCount, messagesCount >= 2000);