Added a new/highly refactored version of the STOMP protocol implementation.

The biggest difference between this and previous implementation is that conversion between the STOMP protocol and 
the ActiveMQ protocol happens at a Transport Filter layer instead of doing it all at the WireFormat layer.

I think this has resulted in cleaner code since there's a better seperating between marshalling/unmarshalling of 
the STOMP protocol and converting the stomp protocol to the activemq protocol.



git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@418550 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-07-02 04:18:44 +00:00
parent 6d550302a2
commit dc98d967b5
10 changed files with 1308 additions and 2 deletions

View File

@ -0,0 +1,627 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp2;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ProtocolException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import javax.jms.Destination;
import javax.jms.JMSException;
import org.apache.activeio.util.ByteArrayOutputStream;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.transport.stomp.Stomp;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.LongSequenceGenerator;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
*
* @author <a href="http://hiramchirino.com">chirino</a>
*/
public class ProtocolConverter {
private static final IdGenerator connectionIdGenerator = new IdGenerator();
private final ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId());
private final SessionId sessionId = new SessionId(connectionId, -1);
private final ProducerId producerId = new ProducerId(sessionId, 1);
private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
private final ConcurrentHashMap resposeHandlers = new ConcurrentHashMap();
private final ConcurrentHashMap subscriptionsByConsumerId = new ConcurrentHashMap();
private final Map transactions = new ConcurrentHashMap();
private StompTransportFilter transportFilter;
private final Object commnadIdMutex = new Object();
private int lastCommandId;
private final AtomicBoolean connected = new AtomicBoolean(false);
protected int generateCommandId() {
synchronized(commnadIdMutex){
return lastCommandId++;
}
}
protected ResponseHandler createResponseHandler(StompCommand command){
final String receiptId = (String) command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
// A response may not be needed.
if( receiptId != null ) {
return new ResponseHandler() {
public void onResponse(ProtocolConverter converter, Response response) throws IOException {
StompCommand sc = new StompCommand();
sc.setHeaders(new HashMap(5));
sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
transportFilter.sendToStomp(sc);
}
};
}
return null;
}
protected void sendToActiveMQ(Command command, ResponseHandler handler) {
command.setCommandId(generateCommandId());
if(handler!=null) {
command.setResponseRequired(true);
resposeHandlers.put(new Integer(command.getCommandId()), handler);
}
transportFilter.sendToActiveMQ(command);
}
protected void sendToStomp(StompCommand command) throws IOException {
transportFilter.sendToStomp(command);
}
/**
* Convert a stomp command
* @param command
*/
public void onStompCommad( StompCommand command ) throws IOException, JMSException {
try {
String action = command.getAction();
if (action.startsWith(Stomp.Commands.SEND))
onStompSend(command);
else if (action.startsWith(Stomp.Commands.ACK))
onStompAck(command);
else if (action.startsWith(Stomp.Commands.BEGIN))
onStompBegin(command);
else if (action.startsWith(Stomp.Commands.COMMIT))
onStompCommit(command);
else if (action.startsWith(Stomp.Commands.ABORT))
onStompAbort(command);
else if (action.startsWith(Stomp.Commands.SUBSCRIBE))
onStompSubscribe(command);
else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE))
onStompUnsubscribe(command);
else if (action.startsWith(Stomp.Commands.CONNECT))
onStompConnect(command);
else if (action.startsWith(Stomp.Commands.DISCONNECT))
onStompDisconnect(command);
else
throw new ProtocolException("Unknown STOMP action: "+action);
} catch (ProtocolException e) {
// Let the stomp client know about any protocol errors.
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos,"UTF-8"));
e.printStackTrace(stream);
stream.close();
HashMap headers = new HashMap();
headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage());
final String receiptId = (String) command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
if( receiptId != null ) {
headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
}
StompCommand errorMessage = new StompCommand(Stomp.Responses.ERROR,headers,baos.toByteArray());
sendToStomp(errorMessage);
}
}
protected void onStompSend(StompCommand command) throws IOException, JMSException {
checkConnected();
Map headers = command.getHeaders();
String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION);
ActiveMQMessage message = convertMessage(command);
message.setProducerId(producerId);
MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
message.setMessageId(id);
message.setJMSTimestamp(System.currentTimeMillis());
if (stompTx!=null) {
TransactionId activemqTx = (TransactionId) transactions.get(stompTx);
if (activemqTx == null)
throw new ProtocolException("Invalid transaction id: "+stompTx);
message.setTransactionId(activemqTx);
}
message.onSend();
sendToActiveMQ(message, createResponseHandler(command));
}
protected void onStompAck(StompCommand command) throws ProtocolException {
checkConnected();
// TODO: acking with just a message id is very bogus
// since the same message id could have been sent to 2 different subscriptions
// on the same stomp connection. For example, when 2 subs are created on the same topic.
Map headers = command.getHeaders();
String messageId = (String) headers.get(Stomp.Headers.Ack.MESSAGE_ID);
if (messageId == null)
throw new ProtocolException("ACK received without a message-id to acknowledge!");
TransactionId activemqTx=null;
String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION);
if (stompTx!=null) {
activemqTx = (TransactionId) transactions.get(stompTx);
if (activemqTx == null)
throw new ProtocolException("Invalid transaction id: "+stompTx);
}
boolean acked=false;
for (Iterator iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
StompSubscription sub = (StompSubscription) iter.next();
MessageAck ack = sub.onStompMessageAck(messageId);
if( ack!=null ) {
ack.setTransactionId(activemqTx);
sendToActiveMQ(ack,createResponseHandler(command));
acked=true;
break;
}
}
if( !acked )
throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
}
protected void onStompBegin(StompCommand command) throws ProtocolException {
checkConnected();
Map headers = command.getHeaders();
String stompTx = (String)headers.get(Stomp.Headers.TRANSACTION);
if (!headers.containsKey(Stomp.Headers.TRANSACTION)) {
throw new ProtocolException("Must specify the transaction you are beginning");
}
if( transactions.get(stompTx)!=null ) {
throw new ProtocolException("The transaction was allready started: "+stompTx);
}
LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId());
transactions.put(stompTx, activemqTx);
TransactionInfo tx = new TransactionInfo();
tx.setConnectionId(connectionId);
tx.setTransactionId(activemqTx);
tx.setType(TransactionInfo.BEGIN);
sendToActiveMQ(tx, createResponseHandler(command));
}
protected void onStompCommit(StompCommand command) throws ProtocolException {
checkConnected();
Map headers = command.getHeaders();
String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION);
if (stompTx==null) {
throw new ProtocolException("Must specify the transaction you are committing");
}
TransactionId activemqTx=null;
if (stompTx!=null) {
activemqTx = (TransactionId) transactions.remove(stompTx);
if (activemqTx == null)
throw new ProtocolException("Invalid transaction id: "+stompTx);
}
TransactionInfo tx = new TransactionInfo();
tx.setConnectionId(connectionId);
tx.setTransactionId(activemqTx);
tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
sendToActiveMQ(tx, createResponseHandler(command));
}
protected void onStompAbort(StompCommand command) throws ProtocolException {
checkConnected();
Map headers = command.getHeaders();
String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION);
if (stompTx==null) {
throw new ProtocolException("Must specify the transaction you are committing");
}
TransactionId activemqTx=null;
if (stompTx!=null) {
activemqTx = (TransactionId) transactions.remove(stompTx);
if (activemqTx == null)
throw new ProtocolException("Invalid transaction id: "+stompTx);
}
TransactionInfo tx = new TransactionInfo();
tx.setConnectionId(connectionId);
tx.setTransactionId(activemqTx);
tx.setType(TransactionInfo.ROLLBACK);
sendToActiveMQ(tx, createResponseHandler(command));
}
protected void onStompSubscribe(StompCommand command) throws ProtocolException {
checkConnected();
Map headers = command.getHeaders();
String subscriptionId = (String)headers.get(Stomp.Headers.Subscribe.ID);
String destination = (String)headers.get(Stomp.Headers.Subscribe.DESTINATION);
ActiveMQDestination actual_dest = convertDestination(destination);
ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
ConsumerInfo consumerInfo = new ConsumerInfo(id);
consumerInfo.setPrefetchSize(1000);
consumerInfo.setDispatchAsync(true);
String selector = (String) headers.remove(Stomp.Headers.Subscribe.SELECTOR);
consumerInfo.setSelector(selector);
IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
consumerInfo.setDestination(convertDestination(destination));
StompSubscription stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo);
stompSubscription.setDestination(actual_dest);
String ackMode = (String)headers.get(Stomp.Headers.Subscribe.ACK_MODE);
if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
stompSubscription.setAckMode(StompSubscription.CLIENT_ACK);
} else {
stompSubscription.setAckMode(StompSubscription.AUTO_ACK);
}
subscriptionsByConsumerId.put(id, stompSubscription);
sendToActiveMQ(consumerInfo, createResponseHandler(command));
}
protected void onStompUnsubscribe(StompCommand command) throws ProtocolException {
checkConnected();
Map headers = command.getHeaders();
ActiveMQDestination destination=null;
Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
if( o!=null )
destination =convertDestination((String) o);
String subscriptionId = (String)headers.get(Stomp.Headers.Unsubscribe.ID);
if (subscriptionId==null && destination==null) {
throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
}
// TODO: Unsubscribing using a destination is a bit wierd if multiple subscriptions
// are created with the same destination. Perhaps this should be removed.
//
for (Iterator iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
StompSubscription sub = (StompSubscription) iter.next();
if (
(subscriptionId!=null && subscriptionId.equals(sub.getSubscriptionId()) ) ||
(destination!=null && destination.equals(sub.getDestination()) )
) {
sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
return;
}
}
throw new ProtocolException("No subscription matched.");
}
protected void onStompConnect(StompCommand command) throws ProtocolException {
if(connected.get()) {
throw new ProtocolException("Allready connected.");
}
final Map headers = command.getHeaders();
// allow anyone to login for now
String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
String passcode = (String)headers.get(Stomp.Headers.Connect.PASSCODE);
String clientId = (String)headers.get(Stomp.Headers.Connect.CLIENT_ID);
final ConnectionInfo connectionInfo = new ConnectionInfo();
IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
connectionInfo.setConnectionId(connectionId);
if( clientId!=null )
connectionInfo.setClientId(clientId);
else
connectionInfo.setClientId(""+connectionInfo.getConnectionId().toString());
connectionInfo.setResponseRequired(true);
connectionInfo.setUserName(login);
connectionInfo.setPassword(passcode);
sendToActiveMQ(connectionInfo, new ResponseHandler(){
public void onResponse(ProtocolConverter converter, Response response) throws IOException {
final SessionInfo sessionInfo = new SessionInfo(sessionId);
sendToActiveMQ(sessionInfo,null);
final ProducerInfo producerInfo = new ProducerInfo(producerId);
sendToActiveMQ(producerInfo,new ResponseHandler(){
public void onResponse(ProtocolConverter converter, Response response) throws IOException {
connected.set(true);
HashMap responseHeaders = new HashMap();
responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId());
String requestId = (String) headers.get(Stomp.Headers.Connect.REQUEST_ID);
if( requestId !=null ){
responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId);
}
StompCommand sc = new StompCommand();
sc.setAction(Stomp.Responses.CONNECTED);
sc.setHeaders(responseHeaders);
sendToStomp(sc);
}
});
}
});
}
protected void onStompDisconnect(StompCommand command) throws ProtocolException {
checkConnected();
sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
connected.set(false);
}
protected void checkConnected() throws ProtocolException {
if(!connected.get()) {
throw new ProtocolException("Not connected.");
}
}
/**
* Convert a ActiveMQ command
* @param command
* @throws IOException
*/
public void onActiveMQCommad( Command command ) throws IOException, JMSException {
if ( command.isResponse() ) {
Response response = (Response) command;
ResponseHandler rh = (ResponseHandler) resposeHandlers.remove(new Integer(response.getCorrelationId()));
if( rh !=null ) {
rh.onResponse(this, response);
}
} else if( command.isMessageDispatch() ) {
MessageDispatch md = (MessageDispatch)command;
StompSubscription sub = (StompSubscription) subscriptionsByConsumerId.get(md.getConsumerId());
if (sub != null)
sub.onMessageDispatch(md);
}
}
public ActiveMQMessage convertMessage(StompCommand command) throws IOException, JMSException {
Map headers = command.getHeaders();
// now the body
ActiveMQMessage msg;
if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) {
headers.remove(Stomp.Headers.CONTENT_LENGTH);
ActiveMQBytesMessage bm = new ActiveMQBytesMessage();
bm.writeBytes(command.getContent());
msg = bm;
} else {
ActiveMQTextMessage text = new ActiveMQTextMessage();
try {
text.setText(new String(command.getContent(), "UTF-8"));
} catch (Throwable e) {
throw (ProtocolException)new ProtocolException("Text could not bet set: "+e).initCause(e);
}
msg = text;
}
String destination = (String) headers.remove(Stomp.Headers.Send.DESTINATION);
msg.setDestination(convertDestination(destination));
// the standard JMS headers
msg.setJMSCorrelationID((String) headers.remove(Stomp.Headers.Send.CORRELATION_ID));
Object o = headers.remove(Stomp.Headers.Send.EXPIRATION_TIME);
if (o != null) {
msg.setJMSExpiration(Long.parseLong((String) o));
}
o = headers.remove(Stomp.Headers.Send.PRIORITY);
if (o != null) {
msg.setJMSPriority(Integer.parseInt((String)o));
}
o = headers.remove(Stomp.Headers.Send.TYPE);
if (o != null) {
msg.setJMSType((String) o);
}
o = headers.remove(Stomp.Headers.Send.REPLY_TO);
if( o!=null ) {
msg.setJMSReplyTo(convertDestination((String)o));
}
o = headers.remove(Stomp.Headers.Send.PERSISTENT);
if (o != null) {
msg.setPersistent("true".equals(o));
}
// now the general headers
msg.setProperties(headers);
return msg;
}
public StompCommand convertMessage(ActiveMQMessage message) throws IOException, JMSException {
StompCommand command = new StompCommand();
command.setAction(Stomp.Responses.MESSAGE);
HashMap headers = new HashMap();
command.setHeaders(headers);
headers.put(Stomp.Headers.Message.DESTINATION, convertDestination(message.getDestination()));
headers.put(Stomp.Headers.Message.MESSAGE_ID, message.getJMSMessageID());
headers.put(Stomp.Headers.Message.CORRELATION_ID, message.getJMSCorrelationID());
headers.put(Stomp.Headers.Message.EXPIRATION_TIME, ""+message.getJMSExpiration());
if (message.getJMSRedelivered()) {
headers.put(Stomp.Headers.Message.REDELIVERED, "true");
}
headers.put(Stomp.Headers.Message.PRORITY, ""+message.getJMSPriority());
headers.put(Stomp.Headers.Message.REPLY_TO, convertDestination(message.getJMSReplyTo()));
headers.put(Stomp.Headers.Message.TIMESTAMP, ""+message.getJMSTimestamp());
headers.put(Stomp.Headers.Message.TYPE, message.getJMSType());
// now lets add all the message headers
Map properties = message.getProperties();
if (properties != null) {
headers.putAll(properties);
}
if( message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE ) {
ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy();
command.setContent(msg.getText().getBytes("UTF-8"));
} else if( message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE ) {
ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy();
byte[] data = new byte[(int)msg.getBodyLength()];
msg.readBytes(data);
headers.put(Stomp.Headers.CONTENT_LENGTH, ""+data.length);
command.setContent(data);
}
return command;
}
protected ActiveMQDestination convertDestination(String name) throws ProtocolException {
if (name == null) {
return null;
}
else if (name.startsWith("/queue/")) {
String q_name = name.substring("/queue/".length(), name.length());
return ActiveMQDestination.createDestination(q_name, ActiveMQDestination.QUEUE_TYPE);
}
else if (name.startsWith("/topic/")) {
String t_name = name.substring("/topic/".length(), name.length());
return ActiveMQDestination.createDestination(t_name, ActiveMQDestination.TOPIC_TYPE);
}
else {
throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations " + "must begine with /queue/ or /topic/");
}
}
protected String convertDestination(Destination d) {
if (d == null) {
return null;
}
ActiveMQDestination amq_d = (ActiveMQDestination) d;
String p_name = amq_d.getPhysicalName();
StringBuffer buffer = new StringBuffer();
if (amq_d.isQueue()) {
buffer.append("/queue/");
}
if (amq_d.isTopic()) {
buffer.append("/topic/");
}
buffer.append(p_name);
return buffer.toString();
}
public StompTransportFilter getTransportFilter() {
return transportFilter;
}
public void setTransportFilter(StompTransportFilter transportFilter) {
this.transportFilter = transportFilter;
}
public void onStompExcepton(IOException error) {
// TODO Auto-generated method stub
}
}

View File

@ -0,0 +1,30 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp2;
import java.io.IOException;
import org.apache.activemq.command.Response;
/**
* Interface used by the ProtocolConverter for callbacks.
*
* @author <a href="http://hiramchirino.com">chirino</a>
*/
interface ResponseHandler {
void onResponse(ProtocolConverter converter, Response response) throws IOException;
}

View File

@ -0,0 +1,149 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp2;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Endpoint;
import org.apache.activemq.command.Response;
import org.apache.activemq.state.CommandVisitor;
/**
* Represents all the data in a STOMP frame.
*
* @author <a href="http://hiramchirino.com">chirino</a>
*/
public class StompCommand implements Command {
private static final byte[] NO_DATA = new byte[]{};
private String action;
private Map headers = Collections.EMPTY_MAP;
private byte[] content = NO_DATA;
public StompCommand(String command, HashMap headers, byte[] data) {
this.action = command;
this.headers = headers;
this.content = data;
}
public StompCommand() {
}
public String getAction() {
return action;
}
public void setAction(String command) {
this.action = command;
}
public byte[] getContent() {
return content;
}
public void setContent(byte[] data) {
this.content = data;
}
public Map getHeaders() {
return headers;
}
public void setHeaders(Map headers) {
this.headers = headers;
}
//
// Methods in the Command interface
//
public int getCommandId() {
return 0;
}
public Endpoint getFrom() {
return null;
}
public Endpoint getTo() {
return null;
}
public boolean isBrokerInfo() {
return false;
}
public boolean isMessage() {
return false;
}
public boolean isMessageAck() {
return false;
}
public boolean isMessageDispatch() {
return false;
}
public boolean isMessageDispatchNotification() {
return false;
}
public boolean isResponse() {
return false;
}
public boolean isResponseRequired() {
return false;
}
public boolean isShutdownInfo() {
return false;
}
public boolean isWireFormatInfo() {
return false;
}
public void setCommandId(int value) {
}
public void setFrom(Endpoint from) {
}
public void setResponseRequired(boolean responseRequired) {
}
public void setTo(Endpoint to) {
}
public Response visit(CommandVisitor visitor) throws Exception {
return null;
}
public byte getDataStructureType() {
return 0;
}
public boolean isMarshallAware() {
return false;
}
}

View File

@ -0,0 +1,136 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp2;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import javax.jms.JMSException;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.transport.stomp.Stomp;
/**
* Keeps track of the STOMP susbscription so that acking is correctly done.
*
* @author <a href="http://hiramchirino.com">chirino</a>
*/
public class StompSubscription {
public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO;
public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
private final ProtocolConverter protocolConverter;
private final String subscriptionId;
private final ConsumerInfo consumerInfo;
private final LinkedHashMap dispatchedMessage = new LinkedHashMap();
private String ackMode = AUTO_ACK;
private ActiveMQDestination destination;
public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo) {
this.protocolConverter = stompTransport;
this.subscriptionId = subscriptionId;
this.consumerInfo = consumerInfo;
}
void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
ActiveMQMessage message = (ActiveMQMessage) md.getMessage();
if (ackMode == CLIENT_ACK) {
synchronized (this) {
dispatchedMessage.put(message.getJMSMessageID(), message.getMessageId());
}
} else if (ackMode == AUTO_ACK) {
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
protocolConverter.getTransportFilter().sendToActiveMQ(ack);
}
StompCommand command = protocolConverter.convertMessage(message);
command.setAction(Stomp.Responses.MESSAGE);
if (subscriptionId!=null) {
command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
}
protocolConverter.getTransportFilter().sendToStomp(command);
}
synchronized MessageAck onStompMessageAck(String messageId) {
if( !dispatchedMessage.containsKey(messageId) ) {
return null;
}
MessageAck ack = new MessageAck();
ack.setDestination(consumerInfo.getDestination());
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
ack.setConsumerId(consumerInfo.getConsumerId());
int count=0;
for (Iterator iter = dispatchedMessage.keySet().iterator(); iter.hasNext();) {
String id = (String) iter.next();
if( ack.getFirstMessageId()==null )
ack.setFirstMessageId((MessageId) dispatchedMessage.get(id));
iter.remove();
count++;
if( id.equals(messageId) ) {
ack.setLastMessageId((MessageId) dispatchedMessage.get(id));
break;
}
}
ack.setMessageCount(count);
return ack;
}
public String getAckMode() {
return ackMode;
}
public void setAckMode(String ackMode) {
this.ackMode = ackMode;
}
public String getSubscriptionId() {
return subscriptionId;
}
public void setDestination(ActiveMQDestination destination) {
this.destination = destination;
}
public ActiveMQDestination getDestination() {
return destination;
}
public ConsumerInfo getConsumerInfo() {
return consumerInfo;
}
}

View File

@ -0,0 +1,40 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp2;
import java.util.Map;
import org.apache.activeio.command.WireFormat;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
/**
* A <a href="http://stomp.codehaus.org/">STOMP</a> transport factory
*
* @version $Revision: 1.1.1.1 $
*/
public class StompTransportFactory extends TcpTransportFactory {
protected String getDefaultWireFormatType() {
return "stomp";
}
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
transport = new StompTransportFilter(transport);
return super.compositeConfigure(transport, format, options);
}
}

View File

@ -0,0 +1,95 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp2;
import java.io.IOException;
import javax.jms.JMSException;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.util.IOExceptionSupport;
/**
* The StompTransportFilter normally sits on top of a TcpTransport
* that has been configured with the StompWireFormat and is used to
* convert STOMP commands to ActiveMQ commands.
*
* All of the coversion work is done by delegating to the ProtocolConverter.
*
* @author <a href="http://hiramchirino.com">chirino</a>
*/
public class StompTransportFilter extends TransportFilter {
ProtocolConverter protocolConverter = new ProtocolConverter();
private final Object sendToActiveMQMutex = new Object();
private final Object sendToStompMutex = new Object();
public StompTransportFilter(Transport next) {
super(next);
protocolConverter.setTransportFilter(this);
}
public void start() throws Exception {
super.start();
}
public void stop() throws Exception {
super.stop();
}
public void oneway(Command command) throws IOException {
try {
protocolConverter.onActiveMQCommad(command);
} catch (JMSException e) {
throw IOExceptionSupport.create(e);
}
}
public void onCommand(Command command) {
try {
protocolConverter.onStompCommad((StompCommand) command);
} catch (IOException e) {
onException(e);
} catch (JMSException e) {
onException(IOExceptionSupport.create(e));
}
}
public void onException(IOException error) {
protocolConverter.onStompExcepton(error);
transportListener.onException(error);
}
public void sendToActiveMQ(Command command) {
synchronized(sendToActiveMQMutex) {
transportListener.onCommand(command);
}
}
public void sendToStomp(StompCommand command) throws IOException {
synchronized(sendToStompMutex) {
next.oneway(command);
}
}
}

View File

@ -0,0 +1,200 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp2;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.ProtocolException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.activeio.adapter.PacketInputStream;
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.ByteSequence;
import org.apache.activeio.packet.Packet;
import org.apache.activeio.util.ByteArrayOutputStream;
import org.apache.activemq.transport.stomp.Stomp;
/**
* Implements marshalling and unmarsalling the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
*/
public class StompWireFormat implements WireFormat {
private static final byte[] NO_DATA = new byte[]{};
private static final byte[] END_OF_FRAME = new byte[]{0,'\n'};
private static final int MAX_COMMAND_LENGTH = 1024;
private static final int MAX_HEADER_LENGTH = 1024*10;
private static final int MAX_HEADERS = 1000;
private static final int MAX_DATA_LENGTH = 1024*1024*100;
private int version=1;
public Packet marshal(Object command) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
marshal(command, dos);
dos.close();
return new ByteArrayPacket(baos.toByteSequence());
}
public Object unmarshal(Packet packet) throws IOException {
PacketInputStream stream = new PacketInputStream(packet);
DataInputStream dis = new DataInputStream(stream);
return unmarshal(dis);
}
public void marshal(Object command, DataOutputStream os) throws IOException {
StompCommand stomp = (org.apache.activemq.transport.stomp2.StompCommand) command;
StringBuffer buffer = new StringBuffer();
buffer.append(stomp.getAction());
buffer.append(Stomp.NEWLINE);
// Output the headers.
for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();) {
Map.Entry entry = (Map.Entry) iter.next();
buffer.append(entry.getKey());
buffer.append(Stomp.Headers.SEPERATOR);
buffer.append(entry.getValue());
buffer.append(Stomp.NEWLINE);
}
// Add a newline to seperate the headers from the content.
buffer.append(Stomp.NEWLINE);
os.write(buffer.toString().getBytes("UTF-8"));
os.write(stomp.getContent());
os.write(END_OF_FRAME);
}
public Object unmarshal(DataInputStream in) throws IOException {
String action = null;
// skip white space to next real action line
while (true) {
action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
if (action == null) {
throw new IOException("connection was closed");
} else {
action = action.trim();
if (action.length() > 0) {
break;
}
}
}
// Parse the headers
HashMap headers = new HashMap(25);
while (true) {
String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
if (line != null && line.trim().length() > 0) {
if( headers.size() > MAX_HEADERS )
throw new ProtocolException("The maximum number of headers was exceeded");
try {
int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
String name = line.substring(0, seperator_index).trim();
String value = line.substring(seperator_index + 1, line.length()).trim();
headers.put(name, value);
}
catch (Exception e) {
throw new ProtocolException("Unable to parser header line [" + line + "]");
}
}
else {
break;
}
}
// Read in the data part.
byte[] data = NO_DATA;
String contentLength = (String)headers.get(Stomp.Headers.CONTENT_LENGTH);
if (contentLength!=null) {
// Bless the client, he's telling us how much data to read in.
int length;
try {
length = Integer.parseInt(contentLength.trim());
} catch (NumberFormatException e) {
throw new ProtocolException("Specified content-length is not a valid integer");
}
if( length > MAX_DATA_LENGTH )
throw new ProtocolException("The maximum data length was exceeded");
data = new byte[length];
in.readFully(data);
if (in.readByte() != 0) {
throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH+" bytes were read and " + "there was no trailing null byte");
}
} else {
// We don't know how much to read.. data ends when we hit a 0
byte b;
ByteArrayOutputStream baos=null;
while ((b = in.readByte()) != 0) {
if( baos == null ) {
baos = new ByteArrayOutputStream();
} else if( baos.size() > MAX_DATA_LENGTH ) {
throw new ProtocolException("The maximum data length was exceeded");
}
baos.write(b);
}
if( baos!=null ) {
baos.close();
data = baos.toByteArray();
}
}
return new StompCommand(action, headers, data);
}
private String readLine(DataInputStream in, int maxLength, String errorMessage) throws IOException {
byte b;
ByteArrayOutputStream baos=new ByteArrayOutputStream(maxLength);
while ((b = in.readByte()) != '\n') {
if( baos.size() > maxLength )
throw new ProtocolException(errorMessage);
baos.write(b);
}
ByteSequence sequence = baos.toByteSequence();
return new String(sequence.getData(),sequence.getOffset(),sequence.getLength(),"UTF-8");
}
public int getVersion() {
return version;
}
public void setVersion(int version) {
this.version = version;
}
}

View File

@ -0,0 +1,29 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp2;
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.command.WireFormatFactory;
/**
* Creates WireFormat objects that marshalls the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
*/
public class StompWireFormatFactory implements WireFormatFactory {
public WireFormat createWireFormat() {
return new StompWireFormat();
}
}

View File

@ -1 +1 @@
class=org.apache.activemq.transport.stomp.StompTransportFactory class=org.apache.activemq.transport.stomp2.StompTransportFactory

View File

@ -1 +1 @@
class=org.apache.activemq.transport.stomp.StompWireFormatFactory class=org.apache.activemq.transport.stomp2.StompWireFormatFactory