From 80c76a8882dbc4d0fdc037bc3ef375ffebca5c3d Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Tue, 4 Jul 2006 14:27:03 +0000 Subject: [PATCH] http://issues.apache.org/activemq/browse/AMQ-793 back porting new stomp impl from 4.1 to 4.0.2 git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/branches/activemq-4.0@419020 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/stomp/ProtocolConverter.java | 628 ++++++++++++++++++ .../transport/stomp/ProtocolException.java | 50 ++ .../transport/stomp/ResponseHandler.java | 30 + .../activemq/transport/stomp/Stomp.java | 116 ++++ .../activemq/transport/stomp/StompFrame.java | 149 +++++ .../transport/stomp/StompFrameError.java | 38 ++ .../transport/stomp/StompSubscription.java | 135 ++++ .../stomp/StompTransportFactory.java | 40 ++ .../transport/stomp/StompTransportFilter.java | 79 +++ .../transport/stomp/StompWireFormat.java | 203 ++++++ .../stomp/StompWireFormatFactory.java | 29 + .../activemq/transport/stomp/package.html | 10 + 12 files changed, 1507 insertions(+) create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolException.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/ResponseHandler.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrameError.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/package.html diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java new file mode 100644 index 0000000000..d17528a113 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java @@ -0,0 +1,628 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp; + +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import javax.jms.Destination; +import javax.jms.JMSException; + +import org.apache.activeio.util.ByteArrayOutputStream; +import org.apache.activemq.command.ActiveMQBytesMessage; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.Command; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.LocalTransactionId; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.Response; +import org.apache.activemq.command.SessionId; +import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.command.ShutdownInfo; +import org.apache.activemq.command.TransactionId; +import org.apache.activemq.command.TransactionInfo; +import org.apache.activemq.util.IdGenerator; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.util.LongSequenceGenerator; + +import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; + +/** + * + * @author chirino + */ +public class ProtocolConverter { + + private static final IdGenerator connectionIdGenerator = new IdGenerator(); + private final ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId()); + private final SessionId sessionId = new SessionId(connectionId, -1); + private final ProducerId producerId = new ProducerId(sessionId, 1); + + private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); + private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); + private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator(); + + private final ConcurrentHashMap resposeHandlers = new ConcurrentHashMap(); + private final ConcurrentHashMap subscriptionsByConsumerId = new ConcurrentHashMap(); + private final Map transactions = new ConcurrentHashMap(); + private StompTransportFilter transportFilter; + + private final Object commnadIdMutex = new Object(); + private int lastCommandId; + private final AtomicBoolean connected = new AtomicBoolean(false); + + protected int generateCommandId() { + synchronized(commnadIdMutex){ + return lastCommandId++; + } + } + + protected ResponseHandler createResponseHandler(StompFrame command){ + final String receiptId = (String) command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); + // A response may not be needed. + if( receiptId != null ) { + return new ResponseHandler() { + public void onResponse(ProtocolConverter converter, Response response) throws IOException { + StompFrame sc = new StompFrame(); + sc.setAction(Stomp.Responses.RECEIPT); + sc.setHeaders(new HashMap(1)); + sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId); + transportFilter.sendToStomp(sc); + } + }; + } + return null; + } + + protected void sendToActiveMQ(Command command, ResponseHandler handler) { + command.setCommandId(generateCommandId()); + if(handler!=null) { + command.setResponseRequired(true); + resposeHandlers.put(new Integer(command.getCommandId()), handler); + } + transportFilter.sendToActiveMQ(command); + } + + protected void sendToStomp(StompFrame command) throws IOException { + transportFilter.sendToStomp(command); + } + + /** + * Convert a stomp command + * @param command + */ + public void onStompCommad( StompFrame command ) throws IOException, JMSException { + try { + + if( command.getClass() == StompFrameError.class ) { + throw ((StompFrameError)command).getException(); + } + + String action = command.getAction(); + if (action.startsWith(Stomp.Commands.SEND)) + onStompSend(command); + else if (action.startsWith(Stomp.Commands.ACK)) + onStompAck(command); + else if (action.startsWith(Stomp.Commands.BEGIN)) + onStompBegin(command); + else if (action.startsWith(Stomp.Commands.COMMIT)) + onStompCommit(command); + else if (action.startsWith(Stomp.Commands.ABORT)) + onStompAbort(command); + else if (action.startsWith(Stomp.Commands.SUBSCRIBE)) + onStompSubscribe(command); + else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) + onStompUnsubscribe(command); + else if (action.startsWith(Stomp.Commands.CONNECT)) + onStompConnect(command); + else if (action.startsWith(Stomp.Commands.DISCONNECT)) + onStompDisconnect(command); + else + throw new ProtocolException("Unknown STOMP action: "+action); + + } catch (ProtocolException e) { + + // Let the stomp client know about any protocol errors. + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos,"UTF-8")); + e.printStackTrace(stream); + stream.close(); + + HashMap headers = new HashMap(); + headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage()); + + final String receiptId = (String) command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); + if( receiptId != null ) { + headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId); + } + + StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR,headers,baos.toByteArray()); + sendToStomp(errorMessage); + + if( e.isFatal() ) + getTransportFilter().onException(e); + } + } + + protected void onStompSend(StompFrame command) throws IOException, JMSException { + checkConnected(); + + Map headers = command.getHeaders(); + String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION); + + ActiveMQMessage message = convertMessage(command); + + message.setProducerId(producerId); + MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId()); + message.setMessageId(id); + message.setJMSTimestamp(System.currentTimeMillis()); + + if (stompTx!=null) { + TransactionId activemqTx = (TransactionId) transactions.get(stompTx); + if (activemqTx == null) + throw new ProtocolException("Invalid transaction id: "+stompTx); + message.setTransactionId(activemqTx); + } + + message.onSend(); + sendToActiveMQ(message, createResponseHandler(command)); + + } + + + protected void onStompAck(StompFrame command) throws ProtocolException { + checkConnected(); + + // TODO: acking with just a message id is very bogus + // since the same message id could have been sent to 2 different subscriptions + // on the same stomp connection. For example, when 2 subs are created on the same topic. + + Map headers = command.getHeaders(); + String messageId = (String) headers.get(Stomp.Headers.Ack.MESSAGE_ID); + if (messageId == null) + throw new ProtocolException("ACK received without a message-id to acknowledge!"); + + TransactionId activemqTx=null; + String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION); + if (stompTx!=null) { + activemqTx = (TransactionId) transactions.get(stompTx); + if (activemqTx == null) + throw new ProtocolException("Invalid transaction id: "+stompTx); + } + + boolean acked=false; + for (Iterator iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) { + StompSubscription sub = (StompSubscription) iter.next(); + MessageAck ack = sub.onStompMessageAck(messageId); + if( ack!=null ) { + ack.setTransactionId(activemqTx); + sendToActiveMQ(ack,createResponseHandler(command)); + acked=true; + break; + } + } + + if( !acked ) + throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]"); + + } + + + protected void onStompBegin(StompFrame command) throws ProtocolException { + checkConnected(); + + Map headers = command.getHeaders(); + + String stompTx = (String)headers.get(Stomp.Headers.TRANSACTION); + + if (!headers.containsKey(Stomp.Headers.TRANSACTION)) { + throw new ProtocolException("Must specify the transaction you are beginning"); + } + + if( transactions.get(stompTx)!=null ) { + throw new ProtocolException("The transaction was allready started: "+stompTx); + } + + LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId()); + transactions.put(stompTx, activemqTx); + + TransactionInfo tx = new TransactionInfo(); + tx.setConnectionId(connectionId); + tx.setTransactionId(activemqTx); + tx.setType(TransactionInfo.BEGIN); + + sendToActiveMQ(tx, createResponseHandler(command)); + + } + + protected void onStompCommit(StompFrame command) throws ProtocolException { + checkConnected(); + + Map headers = command.getHeaders(); + + String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION); + if (stompTx==null) { + throw new ProtocolException("Must specify the transaction you are committing"); + } + + TransactionId activemqTx=null; + if (stompTx!=null) { + activemqTx = (TransactionId) transactions.remove(stompTx); + if (activemqTx == null) + throw new ProtocolException("Invalid transaction id: "+stompTx); + } + + TransactionInfo tx = new TransactionInfo(); + tx.setConnectionId(connectionId); + tx.setTransactionId(activemqTx); + tx.setType(TransactionInfo.COMMIT_ONE_PHASE); + + sendToActiveMQ(tx, createResponseHandler(command)); + } + + protected void onStompAbort(StompFrame command) throws ProtocolException { + checkConnected(); + Map headers = command.getHeaders(); + + String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION); + if (stompTx==null) { + throw new ProtocolException("Must specify the transaction you are committing"); + } + + TransactionId activemqTx=null; + if (stompTx!=null) { + activemqTx = (TransactionId) transactions.remove(stompTx); + if (activemqTx == null) + throw new ProtocolException("Invalid transaction id: "+stompTx); + } + + TransactionInfo tx = new TransactionInfo(); + tx.setConnectionId(connectionId); + tx.setTransactionId(activemqTx); + tx.setType(TransactionInfo.ROLLBACK); + + sendToActiveMQ(tx, createResponseHandler(command)); + + } + + protected void onStompSubscribe(StompFrame command) throws ProtocolException { + checkConnected(); + Map headers = command.getHeaders(); + + String subscriptionId = (String)headers.get(Stomp.Headers.Subscribe.ID); + String destination = (String)headers.get(Stomp.Headers.Subscribe.DESTINATION); + + ActiveMQDestination actual_dest = convertDestination(destination); + ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); + ConsumerInfo consumerInfo = new ConsumerInfo(id); + consumerInfo.setPrefetchSize(1000); + consumerInfo.setDispatchAsync(true); + + String selector = (String) headers.remove(Stomp.Headers.Subscribe.SELECTOR); + consumerInfo.setSelector(selector); + + IntrospectionSupport.setProperties(consumerInfo, headers, "activemq."); + + consumerInfo.setDestination(convertDestination(destination)); + + StompSubscription stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo); + stompSubscription.setDestination(actual_dest); + + String ackMode = (String)headers.get(Stomp.Headers.Subscribe.ACK_MODE); + if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) { + stompSubscription.setAckMode(StompSubscription.CLIENT_ACK); + } else { + stompSubscription.setAckMode(StompSubscription.AUTO_ACK); + } + + subscriptionsByConsumerId.put(id, stompSubscription); + sendToActiveMQ(consumerInfo, createResponseHandler(command)); + + } + + protected void onStompUnsubscribe(StompFrame command) throws ProtocolException { + checkConnected(); + Map headers = command.getHeaders(); + + ActiveMQDestination destination=null; + Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION); + if( o!=null ) + destination =convertDestination((String) o); + + String subscriptionId = (String)headers.get(Stomp.Headers.Unsubscribe.ID); + + if (subscriptionId==null && destination==null) { + throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from"); + } + + // TODO: Unsubscribing using a destination is a bit wierd if multiple subscriptions + // are created with the same destination. Perhaps this should be removed. + // + for (Iterator iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) { + StompSubscription sub = (StompSubscription) iter.next(); + if ( + (subscriptionId!=null && subscriptionId.equals(sub.getSubscriptionId()) ) || + (destination!=null && destination.equals(sub.getDestination()) ) + ) { + sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command)); + return; + } + } + + throw new ProtocolException("No subscription matched."); + } + + protected void onStompConnect(StompFrame command) throws ProtocolException { + + if(connected.get()) { + throw new ProtocolException("Allready connected."); + } + + final Map headers = command.getHeaders(); + + // allow anyone to login for now + String login = (String)headers.get(Stomp.Headers.Connect.LOGIN); + String passcode = (String)headers.get(Stomp.Headers.Connect.PASSCODE); + String clientId = (String)headers.get(Stomp.Headers.Connect.CLIENT_ID); + + final ConnectionInfo connectionInfo = new ConnectionInfo(); + + IntrospectionSupport.setProperties(connectionInfo, headers, "activemq."); + + connectionInfo.setConnectionId(connectionId); + if( clientId!=null ) + connectionInfo.setClientId(clientId); + else + connectionInfo.setClientId(""+connectionInfo.getConnectionId().toString()); + + connectionInfo.setResponseRequired(true); + connectionInfo.setUserName(login); + connectionInfo.setPassword(passcode); + + sendToActiveMQ(connectionInfo, new ResponseHandler(){ + public void onResponse(ProtocolConverter converter, Response response) throws IOException { + + final SessionInfo sessionInfo = new SessionInfo(sessionId); + sendToActiveMQ(sessionInfo,null); + + + final ProducerInfo producerInfo = new ProducerInfo(producerId); + sendToActiveMQ(producerInfo,new ResponseHandler(){ + public void onResponse(ProtocolConverter converter, Response response) throws IOException { + + connected.set(true); + HashMap responseHeaders = new HashMap(); + + responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId()); + String requestId = (String) headers.get(Stomp.Headers.Connect.REQUEST_ID); + if( requestId !=null ){ + responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId); + } + + StompFrame sc = new StompFrame(); + sc.setAction(Stomp.Responses.CONNECTED); + sc.setHeaders(responseHeaders); + sendToStomp(sc); + } + }); + + } + }); + + } + + protected void onStompDisconnect(StompFrame command) throws ProtocolException { + checkConnected(); + sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command)); + connected.set(false); + } + + + protected void checkConnected() throws ProtocolException { + if(!connected.get()) { + throw new ProtocolException("Not connected."); + } + } + + /** + * Convert a ActiveMQ command + * @param command + * @throws IOException + */ + public void onActiveMQCommad( Command command ) throws IOException, JMSException { + + if ( command.isResponse() ) { + + Response response = (Response) command; + ResponseHandler rh = (ResponseHandler) resposeHandlers.remove(new Integer(response.getCorrelationId())); + if( rh !=null ) { + rh.onResponse(this, response); + } + + } else if( command.isMessageDispatch() ) { + + MessageDispatch md = (MessageDispatch)command; + StompSubscription sub = (StompSubscription) subscriptionsByConsumerId.get(md.getConsumerId()); + if (sub != null) + sub.onMessageDispatch(md); + + } + + } + + public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException { + Map headers = command.getHeaders(); + + // now the body + ActiveMQMessage msg; + if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) { + headers.remove(Stomp.Headers.CONTENT_LENGTH); + ActiveMQBytesMessage bm = new ActiveMQBytesMessage(); + bm.writeBytes(command.getContent()); + msg = bm; + } else { + ActiveMQTextMessage text = new ActiveMQTextMessage(); + try { + text.setText(new String(command.getContent(), "UTF-8")); + } catch (Throwable e) { + throw new ProtocolException("Text could not bet set: "+e, false, e); + } + msg = text; + } + + String destination = (String) headers.remove(Stomp.Headers.Send.DESTINATION); + msg.setDestination(convertDestination(destination)); + + // the standard JMS headers + msg.setJMSCorrelationID((String) headers.remove(Stomp.Headers.Send.CORRELATION_ID)); + + Object o = headers.remove(Stomp.Headers.Send.EXPIRATION_TIME); + if (o != null) { + msg.setJMSExpiration(Long.parseLong((String) o)); + } + + o = headers.remove(Stomp.Headers.Send.PRIORITY); + if (o != null) { + msg.setJMSPriority(Integer.parseInt((String)o)); + } + + o = headers.remove(Stomp.Headers.Send.TYPE); + if (o != null) { + msg.setJMSType((String) o); + } + + o = headers.remove(Stomp.Headers.Send.REPLY_TO); + if( o!=null ) { + msg.setJMSReplyTo(convertDestination((String)o)); + } + + o = headers.remove(Stomp.Headers.Send.PERSISTENT); + if (o != null) { + msg.setPersistent("true".equals(o)); + } + + // now the general headers + msg.setProperties(headers); + + return msg; + } + + public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException { + + StompFrame command = new StompFrame(); + command.setAction(Stomp.Responses.MESSAGE); + + HashMap headers = new HashMap(); + command.setHeaders(headers); + + headers.put(Stomp.Headers.Message.DESTINATION, convertDestination(message.getDestination())); + headers.put(Stomp.Headers.Message.MESSAGE_ID, message.getJMSMessageID()); + headers.put(Stomp.Headers.Message.CORRELATION_ID, message.getJMSCorrelationID()); + headers.put(Stomp.Headers.Message.EXPIRATION_TIME, ""+message.getJMSExpiration()); + if (message.getJMSRedelivered()) { + headers.put(Stomp.Headers.Message.REDELIVERED, "true"); + } + headers.put(Stomp.Headers.Message.PRORITY, ""+message.getJMSPriority()); + headers.put(Stomp.Headers.Message.REPLY_TO, convertDestination(message.getJMSReplyTo())); + headers.put(Stomp.Headers.Message.TIMESTAMP, ""+message.getJMSTimestamp()); + headers.put(Stomp.Headers.Message.TYPE, message.getJMSType()); + + // now lets add all the message headers + Map properties = message.getProperties(); + if (properties != null) { + headers.putAll(properties); + } + + if( message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE ) { + + ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy(); + command.setContent(msg.getText().getBytes("UTF-8")); + + } else if( message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE ) { + + ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy(); + byte[] data = new byte[(int)msg.getBodyLength()]; + msg.readBytes(data); + + headers.put(Stomp.Headers.CONTENT_LENGTH, ""+data.length); + command.setContent(data); + + } + + return command; + } + + protected ActiveMQDestination convertDestination(String name) throws ProtocolException { + if (name == null) { + return null; + } + else if (name.startsWith("/queue/")) { + String q_name = name.substring("/queue/".length(), name.length()); + return ActiveMQDestination.createDestination(q_name, ActiveMQDestination.QUEUE_TYPE); + } + else if (name.startsWith("/topic/")) { + String t_name = name.substring("/topic/".length(), name.length()); + return ActiveMQDestination.createDestination(t_name, ActiveMQDestination.TOPIC_TYPE); + } + else { + throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations " + "must begine with /queue/ or /topic/"); + } + + } + + protected String convertDestination(Destination d) { + if (d == null) { + return null; + } + ActiveMQDestination amq_d = (ActiveMQDestination) d; + String p_name = amq_d.getPhysicalName(); + + StringBuffer buffer = new StringBuffer(); + if (amq_d.isQueue()) { + buffer.append("/queue/"); + } + if (amq_d.isTopic()) { + buffer.append("/topic/"); + } + buffer.append(p_name); + + return buffer.toString(); + } + + public StompTransportFilter getTransportFilter() { + return transportFilter; + } + + public void setTransportFilter(StompTransportFilter transportFilter) { + this.transportFilter = transportFilter; + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolException.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolException.java new file mode 100644 index 0000000000..2db34bc451 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolException.java @@ -0,0 +1,50 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp; + +import java.io.IOException; + +/** + * + * @author chirino + */ +public class ProtocolException extends IOException { + + private static final long serialVersionUID = -2869735532997332242L; + + private final boolean fatal; + + public ProtocolException() { + this(null); + } + public ProtocolException(String s) { + this(s, false); + } + public ProtocolException(String s, boolean fatal) { + this(s,fatal, null); + } + public ProtocolException(String s, boolean fatal, Throwable cause) { + super(s); + this.fatal = fatal; + initCause(cause); + } + + public boolean isFatal() { + return fatal; + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ResponseHandler.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ResponseHandler.java new file mode 100644 index 0000000000..efbce9b8cb --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ResponseHandler.java @@ -0,0 +1,30 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp; + +import java.io.IOException; + +import org.apache.activemq.command.Response; + +/** + * Interface used by the ProtocolConverter for callbacks. + * + * @author chirino + */ +interface ResponseHandler { + void onResponse(ProtocolConverter converter, Response response) throws IOException; +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java new file mode 100644 index 0000000000..edd774f5a4 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java @@ -0,0 +1,116 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp; + +public interface Stomp { + String NULL = "\u0000"; + String NEWLINE = "\n"; + + public static interface Commands { + String CONNECT = "CONNECT"; + String SEND = "SEND"; + String DISCONNECT = "DISCONNECT"; + String SUBSCRIBE = "SUB"; + String UNSUBSCRIBE = "UNSUB"; + + String BEGIN_TRANSACTION = "BEGIN"; + String COMMIT_TRANSACTION = "COMMIT"; + String ABORT_TRANSACTION = "ABORT"; + String BEGIN = "BEGIN"; + String COMMIT = "COMMIT"; + String ABORT = "ABORT"; + String ACK = "ACK"; + } + + public interface Responses { + String CONNECTED = "CONNECTED"; + String ERROR = "ERROR"; + String MESSAGE = "MESSAGE"; + String RECEIPT = "RECEIPT"; + } + + public interface Headers { + String SEPERATOR = ":"; + String RECEIPT_REQUESTED = "receipt"; + String TRANSACTION = "transaction"; + String CONTENT_LENGTH = "content-length"; + + public interface Response { + String RECEIPT_ID = "receipt-id"; + } + + public interface Send { + String DESTINATION = "destination"; + String CORRELATION_ID = "correlation-id"; + String REPLY_TO = "reply-to"; + String EXPIRATION_TIME = "expires"; + String PRIORITY = "priority"; + String TYPE = "type"; + Object PERSISTENT = "persistent"; + } + + public interface Message { + String MESSAGE_ID = "message-id"; + String DESTINATION = "destination"; + String CORRELATION_ID = "correlation-id"; + String EXPIRATION_TIME = "expires"; + String REPLY_TO = "reply-to"; + String PRORITY = "priority"; + String REDELIVERED = "redelivered"; + String TIMESTAMP = "timestamp"; + String TYPE = "type"; + String SUBSCRIPTION = "subscription"; + } + + public interface Subscribe { + String DESTINATION = "destination"; + String ACK_MODE = "ack"; + String ID = "id"; + String SELECTOR = "selector"; + + public interface AckModeValues { + String AUTO = "auto"; + String CLIENT = "client"; + } + } + + public interface Unsubscribe { + String DESTINATION = "destination"; + String ID = "id"; + } + + public interface Connect { + String LOGIN = "login"; + String PASSCODE = "passcode"; + String CLIENT_ID = "client-id"; + String REQUEST_ID = "request-id"; + } + + public interface Error { + String MESSAGE = "message"; + } + + public interface Connected { + String SESSION = "session"; + String RESPONSE_ID = "response-id"; + } + + public interface Ack { + String MESSAGE_ID = "message-id"; + } + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java new file mode 100644 index 0000000000..ee3e027275 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java @@ -0,0 +1,149 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.activemq.command.Command; +import org.apache.activemq.command.Endpoint; +import org.apache.activemq.command.Response; +import org.apache.activemq.state.CommandVisitor; + +/** + * Represents all the data in a STOMP frame. + * + * @author chirino + */ +public class StompFrame implements Command { + + private static final byte[] NO_DATA = new byte[]{}; + + private String action; + private Map headers = Collections.EMPTY_MAP; + private byte[] content = NO_DATA; + + public StompFrame(String command, HashMap headers, byte[] data) { + this.action = command; + this.headers = headers; + this.content = data; + } + + public StompFrame() { + } + + public String getAction() { + return action; + } + + public void setAction(String command) { + this.action = command; + } + + public byte[] getContent() { + return content; + } + + public void setContent(byte[] data) { + this.content = data; + } + + public Map getHeaders() { + return headers; + } + + public void setHeaders(Map headers) { + this.headers = headers; + } + + // + // Methods in the Command interface + // + public int getCommandId() { + return 0; + } + + public Endpoint getFrom() { + return null; + } + + public Endpoint getTo() { + return null; + } + + public boolean isBrokerInfo() { + return false; + } + + public boolean isMessage() { + return false; + } + + public boolean isMessageAck() { + return false; + } + + public boolean isMessageDispatch() { + return false; + } + + public boolean isMessageDispatchNotification() { + return false; + } + + public boolean isResponse() { + return false; + } + + public boolean isResponseRequired() { + return false; + } + + public boolean isShutdownInfo() { + return false; + } + + public boolean isWireFormatInfo() { + return false; + } + + public void setCommandId(int value) { + } + + public void setFrom(Endpoint from) { + } + + public void setResponseRequired(boolean responseRequired) { + } + + public void setTo(Endpoint to) { + } + + public Response visit(CommandVisitor visitor) throws Exception { + return null; + } + + public byte getDataStructureType() { + return 0; + } + + public boolean isMarshallAware() { + return false; + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrameError.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrameError.java new file mode 100644 index 0000000000..52d651c504 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrameError.java @@ -0,0 +1,38 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp; + +/** + * Command indicating that an invalid Stomp Frame was received. + * + * @author chirino + */ +public class StompFrameError extends StompFrame { + + + private final ProtocolException exception; + + public StompFrameError(ProtocolException exception) { + this.exception = exception; + } + + public ProtocolException getException() { + return exception; + } + + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java new file mode 100644 index 0000000000..d5ee9300d7 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java @@ -0,0 +1,135 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp; + +import java.io.IOException; +import java.util.Iterator; +import java.util.LinkedHashMap; + +import javax.jms.JMSException; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.MessageId; + +/** + * Keeps track of the STOMP susbscription so that acking is correctly done. + * + * @author chirino + */ +public class StompSubscription { + + public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO; + public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT; + + private final ProtocolConverter protocolConverter; + private final String subscriptionId; + private final ConsumerInfo consumerInfo; + + private final LinkedHashMap dispatchedMessage = new LinkedHashMap(); + + private String ackMode = AUTO_ACK; + private ActiveMQDestination destination; + + + public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo) { + this.protocolConverter = stompTransport; + this.subscriptionId = subscriptionId; + this.consumerInfo = consumerInfo; + } + + void onMessageDispatch(MessageDispatch md) throws IOException, JMSException { + + ActiveMQMessage message = (ActiveMQMessage) md.getMessage(); + + if (ackMode == CLIENT_ACK) { + synchronized (this) { + dispatchedMessage.put(message.getJMSMessageID(), message.getMessageId()); + } + } else if (ackMode == AUTO_ACK) { + MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); + protocolConverter.getTransportFilter().sendToActiveMQ(ack); + } + + StompFrame command = protocolConverter.convertMessage(message); + + command.setAction(Stomp.Responses.MESSAGE); + if (subscriptionId!=null) { + command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId); + } + + protocolConverter.getTransportFilter().sendToStomp(command); + } + + synchronized MessageAck onStompMessageAck(String messageId) { + + if( !dispatchedMessage.containsKey(messageId) ) { + return null; + } + + MessageAck ack = new MessageAck(); + ack.setDestination(consumerInfo.getDestination()); + ack.setAckType(MessageAck.STANDARD_ACK_TYPE); + ack.setConsumerId(consumerInfo.getConsumerId()); + + int count=0; + for (Iterator iter = dispatchedMessage.keySet().iterator(); iter.hasNext();) { + + String id = (String) iter.next(); + if( ack.getFirstMessageId()==null ) + ack.setFirstMessageId((MessageId) dispatchedMessage.get(id)); + + iter.remove(); + count++; + if( id.equals(messageId) ) { + ack.setLastMessageId((MessageId) dispatchedMessage.get(id)); + break; + } + } + + ack.setMessageCount(count); + return ack; + } + + public String getAckMode() { + return ackMode; + } + + public void setAckMode(String ackMode) { + this.ackMode = ackMode; + } + + public String getSubscriptionId() { + return subscriptionId; + } + + public void setDestination(ActiveMQDestination destination) { + this.destination = destination; + } + + public ActiveMQDestination getDestination() { + return destination; + } + + public ConsumerInfo getConsumerInfo() { + return consumerInfo; + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java new file mode 100644 index 0000000000..bb8afebc1f --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java @@ -0,0 +1,40 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp; + +import java.util.Map; + +import org.apache.activeio.command.WireFormat; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.tcp.TcpTransportFactory; + +/** + * A STOMP transport factory + * + * @version $Revision: 1.1.1.1 $ + */ +public class StompTransportFactory extends TcpTransportFactory { + + protected String getDefaultWireFormatType() { + return "stomp"; + } + + public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { + transport = new StompTransportFilter(transport); + return super.compositeConfigure(transport, format, options); + } +} \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java new file mode 100644 index 0000000000..343884e639 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java @@ -0,0 +1,79 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp; + +import java.io.IOException; + +import javax.jms.JMSException; + +import org.apache.activemq.command.Command; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportFilter; +import org.apache.activemq.util.IOExceptionSupport; + +/** + * The StompTransportFilter normally sits on top of a TcpTransport + * that has been configured with the StompWireFormat and is used to + * convert STOMP commands to ActiveMQ commands. + * + * All of the coversion work is done by delegating to the ProtocolConverter. + * + * @author chirino + */ +public class StompTransportFilter extends TransportFilter { + + ProtocolConverter protocolConverter = new ProtocolConverter(); + + private final Object sendToActiveMQMutex = new Object(); + private final Object sendToStompMutex = new Object(); + + public StompTransportFilter(Transport next) { + super(next); + protocolConverter.setTransportFilter(this); + } + + public void oneway(Command command) throws IOException { + try { + protocolConverter.onActiveMQCommad(command); + } catch (JMSException e) { + throw IOExceptionSupport.create(e); + } + } + + public void onCommand(Command command) { + try { + protocolConverter.onStompCommad((StompFrame) command); + } catch (IOException e) { + onException(e); + } catch (JMSException e) { + onException(IOExceptionSupport.create(e)); + } + } + + public void sendToActiveMQ(Command command) { + synchronized(sendToActiveMQMutex) { + transportListener.onCommand(command); + } + } + + public void sendToStomp(StompFrame command) throws IOException { + synchronized(sendToStompMutex) { + next.oneway(command); + } + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java new file mode 100644 index 0000000000..91e3fecb5d --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java @@ -0,0 +1,203 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import org.apache.activeio.adapter.PacketInputStream; +import org.apache.activeio.command.WireFormat; +import org.apache.activeio.packet.ByteArrayPacket; +import org.apache.activeio.packet.ByteSequence; +import org.apache.activeio.packet.Packet; +import org.apache.activeio.util.ByteArrayOutputStream; + +/** + * Implements marshalling and unmarsalling the Stomp protocol. + */ +public class StompWireFormat implements WireFormat { + + private static final byte[] NO_DATA = new byte[]{}; + private static final byte[] END_OF_FRAME = new byte[]{0,'\n'}; + + private static final int MAX_COMMAND_LENGTH = 1024; + private static final int MAX_HEADER_LENGTH = 1024*10; + private static final int MAX_HEADERS = 1000; + private static final int MAX_DATA_LENGTH = 1024*1024*100; + + private int version=1; + + public Packet marshal(Object command) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + marshal(command, dos); + dos.close(); + return new ByteArrayPacket(baos.toByteSequence()); + } + + public Object unmarshal(Packet packet) throws IOException { + PacketInputStream stream = new PacketInputStream(packet); + DataInputStream dis = new DataInputStream(stream); + return unmarshal(dis); + } + + public void marshal(Object command, DataOutputStream os) throws IOException { + StompFrame stomp = (org.apache.activemq.transport.stomp.StompFrame) command; + + StringBuffer buffer = new StringBuffer(); + buffer.append(stomp.getAction()); + buffer.append(Stomp.NEWLINE); + + // Output the headers. + for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();) { + Map.Entry entry = (Map.Entry) iter.next(); + buffer.append(entry.getKey()); + buffer.append(Stomp.Headers.SEPERATOR); + buffer.append(entry.getValue()); + buffer.append(Stomp.NEWLINE); + } + + // Add a newline to seperate the headers from the content. + buffer.append(Stomp.NEWLINE); + + os.write(buffer.toString().getBytes("UTF-8")); + os.write(stomp.getContent()); + os.write(END_OF_FRAME); + } + + + public Object unmarshal(DataInputStream in) throws IOException { + + try { + String action = null; + + // skip white space to next real action line + while (true) { + action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded"); + if (action == null) { + throw new IOException("connection was closed"); + } else { + action = action.trim(); + if (action.length() > 0) { + break; + } + } + } + + // Parse the headers + HashMap headers = new HashMap(25); + while (true) { + String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded"); + if (line != null && line.trim().length() > 0) { + + if( headers.size() > MAX_HEADERS ) + throw new ProtocolException("The maximum number of headers was exceeded", true); + + try { + int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR); + String name = line.substring(0, seperator_index).trim(); + String value = line.substring(seperator_index + 1, line.length()).trim(); + headers.put(name, value); + } + catch (Exception e) { + throw new ProtocolException("Unable to parser header line [" + line + "]", true); + } + } + else { + break; + } + } + + // Read in the data part. + byte[] data = NO_DATA; + String contentLength = (String)headers.get(Stomp.Headers.CONTENT_LENGTH); + if (contentLength!=null) { + + // Bless the client, he's telling us how much data to read in. + int length; + try { + length = Integer.parseInt(contentLength.trim()); + } catch (NumberFormatException e) { + throw new ProtocolException("Specified content-length is not a valid integer", true); + } + + if( length > MAX_DATA_LENGTH ) + throw new ProtocolException("The maximum data length was exceeded", true); + + data = new byte[length]; + in.readFully(data); + + if (in.readByte() != 0) { + throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH+" bytes were read and " + "there was no trailing null byte", true); + } + + } else { + + // We don't know how much to read.. data ends when we hit a 0 + byte b; + ByteArrayOutputStream baos=null; + while ((b = in.readByte()) != 0) { + + if( baos == null ) { + baos = new ByteArrayOutputStream(); + } else if( baos.size() > MAX_DATA_LENGTH ) { + throw new ProtocolException("The maximum data length was exceeded", true); + } + + baos.write(b); + } + + if( baos!=null ) { + baos.close(); + data = baos.toByteArray(); + } + + } + + return new StompFrame(action, headers, data); + + } catch (ProtocolException e) { + return new StompFrameError(e); + } + + } + + private String readLine(DataInputStream in, int maxLength, String errorMessage) throws IOException { + byte b; + ByteArrayOutputStream baos=new ByteArrayOutputStream(maxLength); + while ((b = in.readByte()) != '\n') { + if( baos.size() > maxLength ) + throw new ProtocolException(errorMessage, true); + baos.write(b); + } + ByteSequence sequence = baos.toByteSequence(); + return new String(sequence.getData(),sequence.getOffset(),sequence.getLength(),"UTF-8"); + } + + public int getVersion() { + return version; + } + + public void setVersion(int version) { + this.version = version; + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java new file mode 100644 index 0000000000..60d7ab6e04 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java @@ -0,0 +1,29 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp; + +import org.apache.activeio.command.WireFormat; +import org.apache.activeio.command.WireFormatFactory; + +/** + * Creates WireFormat objects that marshalls the Stomp protocol. + */ +public class StompWireFormatFactory implements WireFormatFactory { + public WireFormat createWireFormat() { + return new StompWireFormat(); + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/package.html b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/package.html new file mode 100644 index 0000000000..70f8e90c22 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/package.html @@ -0,0 +1,10 @@ + + + + + +An implementation of the Stomp protocol which is a simple wire protocol for writing clients for ActiveMQ in different +languages like Ruby, Python, PHP, C etc. + + +