From 1f6aa57df62ae72d75a44ad61d859d499f6ca196 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Tue, 4 Jul 2006 14:14:12 +0000 Subject: [PATCH] Removing old stomp implementation since the new one seems to be working better and has been verified to have fixed some bugs. git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@419012 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/transport/stomp/Abort.java | 53 --- .../apache/activemq/transport/stomp/Ack.java | 59 ---- .../activemq/transport/stomp/AsyncHelper.java | 48 --- .../activemq/transport/stomp/Begin.java | 52 --- .../activemq/transport/stomp/Command.java | 36 -- .../transport/stomp/CommandEnvelope.java | 51 --- .../transport/stomp/CommandParser.java | 104 ------ .../activemq/transport/stomp/Commit.java | 56 --- .../activemq/transport/stomp/Connect.java | 124 ------- .../transport/stomp/DestinationNamer.java | 62 ---- .../activemq/transport/stomp/Disconnect.java | 32 -- .../transport/stomp/FrameBuilder.java | 122 ------- .../transport/stomp/HeaderParser.java | 71 ---- .../transport/stomp/ResponseListener.java | 29 -- .../apache/activemq/transport/stomp/Send.java | 153 -------- .../transport/stomp/StompCommand.java | 39 --- .../stomp/StompTransportFactory.java | 32 -- .../transport/stomp/StompWireFormat.java | 327 ------------------ .../stomp/StompWireFormatFactory.java | 29 -- .../activemq/transport/stomp/Subscribe.java | 66 ---- .../transport/stomp/Subscription.java | 148 -------- .../activemq/transport/stomp/Unsubscribe.java | 62 ---- .../transport/stomp2/ProtocolConverter.java | 1 - .../transport/{stomp => stomp2}/Stomp.java | 2 +- .../transport/stomp2/StompSubscription.java | 1 - .../transport/stomp2/StompWireFormat.java | 1 - .../transport/{stomp => stomp2}/package.html | 0 .../activemq/transport/stomp/StompTest.java | 1 + .../transport/stomp/StompWireFormatTest.java | 121 ------- 29 files changed, 2 insertions(+), 1880 deletions(-) delete mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/Abort.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/Ack.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/AsyncHelper.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/Begin.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/Command.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/CommandEnvelope.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/CommandParser.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/Commit.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/Connect.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/DestinationNamer.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/Disconnect.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameBuilder.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/HeaderParser.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/ResponseListener.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/Send.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCommand.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscribe.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscription.java delete mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/Unsubscribe.java rename activemq-core/src/main/java/org/apache/activemq/transport/{stomp => stomp2}/Stomp.java (98%) rename activemq-core/src/main/java/org/apache/activemq/transport/{stomp => stomp2}/package.html (100%) delete mode 100644 activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompWireFormatTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Abort.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Abort.java deleted file mode 100644 index 6817791199..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Abort.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.stomp; - -import org.apache.activemq.command.TransactionId; -import org.apache.activemq.command.TransactionInfo; - -import java.io.DataInput; -import java.io.IOException; -import java.net.ProtocolException; -import java.util.Properties; - -class Abort implements StompCommand { - private StompWireFormat format; - private static final HeaderParser parser = new HeaderParser(); - - Abort(StompWireFormat format) { - this.format = format; - } - - public CommandEnvelope build(String commandLine, DataInput in) throws IOException { - Properties headers = parser.parse(in); - while (in.readByte() != 0) { - } - String user_tx_id = headers.getProperty(Stomp.Headers.TRANSACTION); - - if (!headers.containsKey(Stomp.Headers.TRANSACTION)) { - throw new ProtocolException("Must specify the transaction you are aborting"); - } - - TransactionId txnId = format.getTransactionId(user_tx_id); - TransactionInfo tx = new TransactionInfo(); - tx.setConnectionId(format.getConnectionId()); - tx.setTransactionId(txnId); - tx.setType(TransactionInfo.ROLLBACK); - format.clearTransactionId(user_tx_id); - return new CommandEnvelope(tx, headers); - } -} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Ack.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Ack.java deleted file mode 100644 index 2c27bf19c0..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Ack.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.stomp; - -import java.io.DataInput; -import java.io.IOException; -import java.net.ProtocolException; -import java.util.Properties; - -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.TransactionId; - -class Ack implements StompCommand { - private final StompWireFormat format; - private static final HeaderParser parser = new HeaderParser(); - - Ack(StompWireFormat format) { - this.format = format; - } - - public CommandEnvelope build(String commandLine, DataInput in) throws IOException { - Properties headers = parser.parse(in); - String message_id = headers.getProperty(Stomp.Headers.Ack.MESSAGE_ID); - if (message_id == null) - throw new ProtocolException("ACK received without a message-id to acknowledge!"); - - Subscription sub = (Subscription) format.getDispachedMap().get(message_id); - if( sub ==null ) - throw new ProtocolException("Unexpected ACK received for message-id [" + message_id + "]"); - - MessageAck ack = sub.createMessageAck(message_id); - - if (headers.containsKey(Stomp.Headers.TRANSACTION)) { - TransactionId tx_id = format.getTransactionId(headers.getProperty(Stomp.Headers.TRANSACTION)); - if (tx_id == null) - throw new ProtocolException(headers.getProperty(Stomp.Headers.TRANSACTION) + " is an invalid transaction id"); - ack.setTransactionId(tx_id); - } - - while ((in.readByte()) != 0) { - } - - return new CommandEnvelope(ack, headers); - } -} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/AsyncHelper.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/AsyncHelper.java deleted file mode 100644 index cbb4218814..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/AsyncHelper.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.stomp; - -class AsyncHelper { - public static Object tryUntilNotInterrupted(HelperWithReturn helper) { - while (true) { - try { - return helper.cycle(); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - - static void tryUntilNotInterrupted(final Helper helper) { - tryUntilNotInterrupted(new HelperWithReturn() { - - public Object cycle() throws InterruptedException { - helper.cycle(); - return null; - } - }); - } - - interface HelperWithReturn { - Object cycle() throws InterruptedException; - } - - interface Helper { - void cycle() throws InterruptedException; - } -} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Begin.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Begin.java deleted file mode 100644 index bd08be06d6..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Begin.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.stomp; - -import org.apache.activemq.command.TransactionId; -import org.apache.activemq.command.TransactionInfo; - -import java.io.DataInput; -import java.io.IOException; -import java.net.ProtocolException; -import java.util.Properties; - -public class Begin implements StompCommand { - private StompWireFormat format; - private static final HeaderParser parser = new HeaderParser(); - - public Begin(StompWireFormat format) { - this.format = format; - } - - public CommandEnvelope build(String commandLine, DataInput in) throws IOException { - Properties headers = parser.parse(in); - while (in.readByte() != 0) { - } - - TransactionInfo tx = new TransactionInfo(); - String user_tx_id = headers.getProperty(Stomp.Headers.TRANSACTION); - if (!headers.containsKey(Stomp.Headers.TRANSACTION)) { - throw new ProtocolException("Must specify the transaction you are beginning"); - } - int tx_id = StompWireFormat.generateTransactionId(); - TransactionId transactionId = format.registerTransactionId(user_tx_id, tx_id); - tx.setConnectionId(format.getConnectionId()); - tx.setTransactionId(transactionId); - tx.setType(TransactionInfo.BEGIN); - return new CommandEnvelope(tx, headers); - } -} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Command.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Command.java deleted file mode 100644 index 904238ebe6..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Command.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.stomp; - -import javax.jms.JMSException; - -import java.io.DataInput; -import java.io.IOException; -import java.util.Properties; - -interface Command { - public CommandEnvelope build(String commandLine, DataInput in) throws IOException, JMSException; - - /** - * Returns a command instance which always returns null for a packet - */ - StompCommand NULL_COMMAND = new StompCommand() { - public CommandEnvelope build(String commandLine, DataInput in) { - return new CommandEnvelope(null, new Properties()); - } - }; -} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/CommandEnvelope.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/CommandEnvelope.java deleted file mode 100644 index 8770c40568..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/CommandEnvelope.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.stomp; - -import org.apache.activemq.command.Command; - -import java.util.Properties; - -public class CommandEnvelope { - - private final Command command; - private final Properties headers; - private final ResponseListener responseListener; - - public CommandEnvelope(Command command, Properties headers) { - this(command, headers, null); - } - - public CommandEnvelope(Command command, Properties headers, ResponseListener responseListener) { - this.command = command; - this.headers = headers; - this.responseListener = responseListener; - } - - public Properties getHeaders() { - return headers; - } - - public Command getCommand() { - return command; - } - - public ResponseListener getResponseListener() { - return responseListener; - } - -} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/CommandParser.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/CommandParser.java deleted file mode 100644 index f0f56c9341..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/CommandParser.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.stomp; - -import org.apache.activemq.command.Command; -import org.apache.activemq.command.Response; - -import javax.jms.JMSException; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.net.ProtocolException; - -class CommandParser { - private final StompWireFormat format; - - CommandParser(StompWireFormat wireFormat) { - format = wireFormat; - } - - Command parse(DataInput in) throws IOException, JMSException { - String line = null; - - // skip white space to next real line - while (true) { - line = in.readLine(); - if (line == null) { - throw new IOException("connection was closed"); - } - else { - line = line.trim(); - if (line.length() > 0) { - break; - } - } - } - - // figure correct command and return it - StompCommand command = null; - if (line.startsWith(Stomp.Commands.CONNECT)) - command = new Connect(format); - else if (line.startsWith(Stomp.Commands.SUBSCRIBE)) - command = new Subscribe(format); - else if (line.startsWith(Stomp.Commands.SEND)) - command = new Send(format); - else if (line.startsWith(Stomp.Commands.DISCONNECT)) - command = new Disconnect(); - else if (line.startsWith(Stomp.Commands.BEGIN)) - command = new Begin(format); - else if (line.startsWith(Stomp.Commands.COMMIT)) - command = new Commit(format); - else if (line.startsWith(Stomp.Commands.ABORT)) - command = new Abort(format); - else if (line.startsWith(Stomp.Commands.UNSUBSCRIBE)) - command = new Unsubscribe(format); - else if (line.startsWith(Stomp.Commands.ACK)) - command = new Ack(format); - - if (command == null) { - while (in.readByte() == 0) { - } - throw new ProtocolException("Unknown command [" + line + "]"); - } - - final CommandEnvelope envelope = command.build(line, in); - final short commandId = format.generateCommandId(); - final String client_packet_key = envelope.getHeaders().getProperty(Stomp.Headers.RECEIPT_REQUESTED); - final boolean receiptRequested = client_packet_key!=null; - - envelope.getCommand().setCommandId(commandId); - if (receiptRequested || envelope.getResponseListener()!=null ) { - envelope.getCommand().setResponseRequired(true); - if( envelope.getResponseListener()!=null ) { - format.addResponseListener(envelope.getResponseListener()); - } else { - format.addResponseListener(new ResponseListener() { - public boolean onResponse(Response receipt, DataOutput out) throws IOException { - if (receipt.getCorrelationId() != commandId) - return false; - out.write(new FrameBuilder(Stomp.Responses.RECEIPT).addHeader(Stomp.Headers.Response.RECEIPT_ID, client_packet_key).toFrame()); - return true; - } - }); - } - } - - return envelope.getCommand(); - } -} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Commit.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Commit.java deleted file mode 100644 index aae23e3858..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Commit.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.stomp; - -import org.apache.activemq.command.TransactionId; -import org.apache.activemq.command.TransactionInfo; - -import java.io.DataInput; -import java.io.IOException; -import java.net.ProtocolException; -import java.util.Properties; - -class Commit implements StompCommand { - private StompWireFormat format; - private static final HeaderParser parser = new HeaderParser(); - - Commit(StompWireFormat format) { - this.format = format; - } - - public CommandEnvelope build(String commandLine, DataInput in) throws IOException { - Properties headers = parser.parse(in); - while (in.readByte() != 0) { - } - - String user_tx_id = headers.getProperty(Stomp.Headers.TRANSACTION); - - if (user_tx_id == null) { - throw new ProtocolException("Must specify the transaction you are committing"); - } - - TransactionId tx_id = format.getTransactionId(user_tx_id); - if (tx_id == null) - throw new ProtocolException(user_tx_id + " is an invalid transaction id"); - TransactionInfo tx = new TransactionInfo(); - tx.setConnectionId(format.getConnectionId()); - tx.setTransactionId(tx_id); - tx.setType(TransactionInfo.COMMIT_ONE_PHASE); - format.clearTransactionId(user_tx_id); - return new CommandEnvelope(tx, headers); - } -} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Connect.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Connect.java deleted file mode 100644 index 32c0484cbd..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Connect.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.stomp; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.Properties; - -import org.apache.activemq.command.ConnectionInfo; -import org.apache.activemq.command.ProducerInfo; -import org.apache.activemq.command.Response; -import org.apache.activemq.command.SessionInfo; -import org.apache.activemq.util.IntrospectionSupport; - -class Connect implements StompCommand { - private HeaderParser headerParser = new HeaderParser(); - private StompWireFormat format; - - Connect(StompWireFormat format) { - this.format = format; - } - - public CommandEnvelope build(String commandLine, DataInput in) throws IOException { - - final Properties headers = headerParser.parse(in); - - - // allow anyone to login for now - String login = headers.getProperty(Stomp.Headers.Connect.LOGIN); - String passcode = headers.getProperty(Stomp.Headers.Connect.PASSCODE); - String clientId = headers.getProperty(Stomp.Headers.Connect.CLIENT_ID); - - final ConnectionInfo connectionInfo = new ConnectionInfo(); - - IntrospectionSupport.setProperties(connectionInfo, headers, "activemq."); - - connectionInfo.setConnectionId(format.getConnectionId()); - if( clientId!=null ) - connectionInfo.setClientId(clientId); - else - connectionInfo.setClientId(""+connectionInfo.getConnectionId().toString()); - connectionInfo.setResponseRequired(true); - connectionInfo.setUserName(login); - connectionInfo.setPassword(passcode); - - while (in.readByte() != 0) { - } - - return new CommandEnvelope(connectionInfo, headers, - new ConnectResponseListener(headers, connectionInfo) ); - } - - class ConnectResponseListener implements ResponseListener{ - - private Properties headers; - private ConnectionInfo connectionInfo; - - public ConnectResponseListener( Properties headers, final ConnectionInfo connectionInfo ){ - this.headers = headers; - this.connectionInfo = connectionInfo; - } - - public boolean onResponse(Response receipt, DataOutput out) throws IOException { - - if (receipt.getCorrelationId() != connectionInfo.getCommandId()) - return false; - - final SessionInfo sessionInfo = new SessionInfo(format.getSessionId()); - sessionInfo.setCommandId(format.generateCommandId()); - sessionInfo.setResponseRequired(false); - - final ProducerInfo producerInfo = new ProducerInfo(format.getProducerId()); - producerInfo.setCommandId(format.generateCommandId()); - producerInfo.setResponseRequired(true); - - format.addResponseListener( new ResponseListener(){ - public boolean onResponse(Response receipt, DataOutput out) throws IOException { - if (receipt.getCorrelationId() != producerInfo.getCommandId() ) - return false; - - format.onFullyConnected(); - - StringBuffer buffer = new StringBuffer(); - buffer.append(Stomp.Responses.CONNECTED); - buffer.append(Stomp.NEWLINE); - buffer.append(Stomp.Headers.Connected.SESSION); - buffer.append(Stomp.Headers.SEPERATOR); - buffer.append(connectionInfo.getClientId()); - if( headers.containsKey(Stomp.Headers.Connect.REQUEST_ID) ){ - buffer.append(Stomp.NEWLINE); - buffer.append(Stomp.Headers.Connected.RESPONSE_ID); - buffer.append(Stomp.Headers.SEPERATOR); - buffer.append(headers.getProperty( Stomp.Headers.Connect.REQUEST_ID )); - } - buffer.append(Stomp.NEWLINE); - buffer.append(Stomp.NEWLINE); - buffer.append(Stomp.NULL); - buffer.append(Stomp.NEWLINE); - out.writeBytes(buffer.toString()); - return true; - } - }); - - format.addToPendingReadCommands(sessionInfo); - format.addToPendingReadCommands(producerInfo); - return true; - } - } -} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/DestinationNamer.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/DestinationNamer.java deleted file mode 100644 index 3b6c10733c..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/DestinationNamer.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.stomp; - -import org.apache.activemq.command.ActiveMQDestination; - -import javax.jms.Destination; - -import java.net.ProtocolException; - -class DestinationNamer { - static ActiveMQDestination convert(String name) throws ProtocolException { - if (name == null) { - return null; - } - else if (name.startsWith("/queue/")) { - String q_name = name.substring("/queue/".length(), name.length()); - return ActiveMQDestination.createDestination(q_name, ActiveMQDestination.QUEUE_TYPE); - } - else if (name.startsWith("/topic/")) { - String t_name = name.substring("/topic/".length(), name.length()); - return ActiveMQDestination.createDestination(t_name, ActiveMQDestination.TOPIC_TYPE); - } - else { - throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ TTMP destinations " + "must begine with /queue/ or /topic/"); - } - - } - - static String convert(Destination d) { - if (d == null) { - return null; - } - ActiveMQDestination amq_d = (ActiveMQDestination) d; - String p_name = amq_d.getPhysicalName(); - - StringBuffer buffer = new StringBuffer(); - if (amq_d.isQueue()) { - buffer.append("/queue/"); - } - if (amq_d.isTopic()) { - buffer.append("/topic/"); - } - buffer.append(p_name); - - return buffer.toString(); - } -} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Disconnect.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Disconnect.java deleted file mode 100644 index 852f001eca..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Disconnect.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.stomp; - -import org.apache.activemq.command.ShutdownInfo; - -import java.io.DataInput; -import java.io.IOException; -import java.util.Properties; - -class Disconnect implements StompCommand { - - public CommandEnvelope build(String line, DataInput in) throws IOException { - while (in.readByte() != 0) { - } - return new CommandEnvelope(new ShutdownInfo(), new Properties()); - } -} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameBuilder.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameBuilder.java deleted file mode 100644 index c82cee52b9..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameBuilder.java +++ /dev/null @@ -1,122 +0,0 @@ -/** - * - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.stomp; - -import org.apache.activemq.command.ActiveMQMessage; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.Iterator; -import java.util.Map; -import java.util.Properties; -import java.util.Map.Entry; - -class FrameBuilder { - private String command; - private Properties headers = new Properties(); - private byte[] body = new byte[0]; - - public FrameBuilder(String command) { - this.command = command; - } - - public FrameBuilder addHeader(String key, String value) { - if (value != null) { - this.headers.setProperty(key, value); - } - return this; - } - - public FrameBuilder addHeader(String key, long value) { - this.headers.put(key, new Long(value)); - return this; - } - - public FrameBuilder addHeaders(ActiveMQMessage message) throws IOException { - addHeader(Stomp.Headers.Message.DESTINATION, DestinationNamer.convert(message.getDestination())); - addHeader(Stomp.Headers.Message.MESSAGE_ID, message.getJMSMessageID()); - addHeader(Stomp.Headers.Message.CORRELATION_ID, message.getJMSCorrelationID()); - addHeader(Stomp.Headers.Message.EXPIRATION_TIME, message.getJMSExpiration()); - if (message.getJMSRedelivered()) { - addHeader(Stomp.Headers.Message.REDELIVERED, "true"); - } - addHeader(Stomp.Headers.Message.PRORITY, message.getJMSPriority()); - addHeader(Stomp.Headers.Message.REPLY_TO, DestinationNamer.convert(message.getJMSReplyTo())); - addHeader(Stomp.Headers.Message.TIMESTAMP, message.getJMSTimestamp()); - addHeader(Stomp.Headers.Message.TYPE, message.getJMSType()); - - // now lets add all the message headers - Map properties = message.getProperties(); - if (properties != null) { - headers.putAll(properties); - } - return this; - } - - public FrameBuilder setBody(byte[] body) { - this.body = body; - return this; - } - - public String toString() { - StringBuffer buffer = new StringBuffer(); - buffer.append(command); - buffer.append(Stomp.NEWLINE); - for (Iterator iterator = headers.keySet().iterator(); iterator.hasNext();) { - String key = (String) iterator.next(); - String property = headers.getProperty(key); - if (property != null) { - buffer.append(key); - buffer.append(Stomp.Headers.SEPERATOR); - buffer.append(property); - buffer.append(Stomp.NEWLINE); - } - } - buffer.append(Stomp.NEWLINE); - buffer.append(body); - buffer.append(Stomp.NULL); - buffer.append(Stomp.NEWLINE); - return buffer.toString(); - } - - byte[] toFrame() { - ByteArrayOutputStream bout = new ByteArrayOutputStream(); - try { - bout.write(command.getBytes("UTF-8")); - bout.write(Stomp.NEWLINE.getBytes("UTF-8")); - for (Iterator iterator = headers.entrySet().iterator(); iterator.hasNext();) { - Map.Entry entry = (Entry) iterator.next(); - String key = (String) entry.getKey(); - String property = entry.getValue().toString(); - if (property != null) { - bout.write(key.getBytes("UTF-8")); - bout.write(Stomp.Headers.SEPERATOR.getBytes("UTF-8")); - bout.write(property.getBytes("UTF-8")); - bout.write(Stomp.NEWLINE.getBytes("UTF-8")); - } - } - bout.write(Stomp.NEWLINE.getBytes("UTF-8")); - bout.write(body); - bout.write(Stomp.NULL.getBytes("UTF-8")); - bout.write(Stomp.NEWLINE.getBytes("UTF-8")); - } - catch (IOException e) { - throw new RuntimeException("World is caving in, we just got io error writing to" + "a byte array output stream we instantiated!"); - } - return bout.toByteArray(); - } -} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/HeaderParser.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/HeaderParser.java deleted file mode 100644 index 480cce2488..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/HeaderParser.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.stomp; - -import java.io.BufferedReader; -import java.io.DataInput; -import java.io.IOException; -import java.net.ProtocolException; -import java.util.Properties; - -class HeaderParser { - /** - * Reads headers up through the blank line - * - * @param in - * @return - * @throws IOException - */ - Properties parse(BufferedReader in) throws IOException { - Properties props = new Properties(); - while (true) { - String line = in.readLine(); - if (line != null && line.trim().length() > 0) { - int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR); - String name = line.substring(0, seperator_index).trim(); - String value = line.substring(seperator_index + 1, line.length()).trim(); - props.setProperty(name, value); - } - else { - break; - } - } - return props; - } - - Properties parse(DataInput in) throws IOException { - Properties props = new Properties(); - while (true) { - String line = in.readLine(); - if (line != null && line.trim().length() > 0) { - try { - int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR); - String name = line.substring(0, seperator_index).trim(); - String value = line.substring(seperator_index + 1, line.length()).trim(); - props.setProperty(name, value); - } - catch (Exception e) { - throw new ProtocolException("Unable to parser header line [" + line + "]"); - } - } - else { - break; - } - } - return props; - } -} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ResponseListener.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ResponseListener.java deleted file mode 100644 index f8223ddf63..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ResponseListener.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.stomp; - -import org.apache.activemq.command.Response; - -import java.io.DataOutput; -import java.io.IOException; - -interface ResponseListener { - /** - * Return true if you handled this, false otherwise - */ - boolean onResponse(Response receipt, DataOutput out) throws IOException; -} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Send.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Send.java deleted file mode 100644 index 41d061bfc1..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Send.java +++ /dev/null @@ -1,153 +0,0 @@ -/** - * - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.stomp; - -import java.io.ByteArrayOutputStream; -import java.io.DataInput; -import java.io.IOException; -import java.net.ProtocolException; -import java.util.Properties; - -import javax.jms.JMSException; - -import org.apache.activemq.command.ActiveMQBytesMessage; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.command.ActiveMQTextMessage; -import org.apache.activemq.command.TransactionId; - -class Send implements StompCommand { - private final HeaderParser parser = new HeaderParser(); - private final StompWireFormat format; - - Send(StompWireFormat format) { - this.format = format; - } - - public CommandEnvelope build(String commandLine, DataInput in) throws IOException, JMSException { - Properties headers = parser.parse(in); - String destination = (String) headers.remove(Stomp.Headers.Send.DESTINATION); - // now the body - ActiveMQMessage msg; - if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) { - ActiveMQBytesMessage bm = new ActiveMQBytesMessage(); - String content_length = headers.getProperty(Stomp.Headers.CONTENT_LENGTH).trim(); - int length; - try { - length = Integer.parseInt(content_length); - } - catch (NumberFormatException e) { - throw new ProtocolException("Specified content-length is not a valid integer"); - } - byte[] bytes = new byte[length]; - in.readFully(bytes); - byte nil = in.readByte(); - if (nil != 0) - throw new ProtocolException("content-length bytes were read and " + "there was no trailing null byte"); - bm.writeBytes(bytes); - msg = bm; - } - else { - ActiveMQTextMessage text = new ActiveMQTextMessage(); - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - byte b; - while ((b = in.readByte()) != 0) { - bytes.write(b); - } - bytes.close(); - String body = new String(bytes.toByteArray()); - try { - text.setText(body); - } - catch (JMSException e) { - throw new RuntimeException("Something is really wrong, we instantiated this thing!"); - } - msg = text; - } - - msg.setProducerId(format.getProducerId()); - msg.setMessageId(format.createMessageId()); - msg.setJMSTimestamp(System.currentTimeMillis()); - - - ActiveMQDestination d = DestinationNamer.convert(destination); - msg.setDestination(d); - // msg.setJMSClientID(format.getClientId()); - - // the standard JMS headers - msg.setJMSCorrelationID((String) headers.remove(Stomp.Headers.Send.CORRELATION_ID)); - - Object expiration = headers.remove(Stomp.Headers.Send.EXPIRATION_TIME); - if (expiration != null) { - msg.setJMSExpiration(asLong(expiration)); - } - Object priority = headers.remove(Stomp.Headers.Send.PRIORITY); - if (priority != null) { - msg.setJMSPriority(asInt(priority)); - } - Object type = headers.remove(Stomp.Headers.Send.TYPE); - if (type != null) { - msg.setJMSType((String) type); - } - - msg.setJMSReplyTo(DestinationNamer.convert((String) headers.remove(Stomp.Headers.Send.REPLY_TO))); - - Object persistent = headers.remove(Stomp.Headers.Send.PERSISTENT); - if (persistent != null) { - msg.setPersistent(asBool(persistent)); - } - - // No need to carry the content length in the JMS headers. - headers.remove(Stomp.Headers.CONTENT_LENGTH); - - // now the general headers - msg.setProperties(headers); - - if (headers.containsKey(Stomp.Headers.TRANSACTION)) { - TransactionId tx_id = format.getTransactionId(headers.getProperty(Stomp.Headers.TRANSACTION)); - if (tx_id == null) - throw new ProtocolException(headers.getProperty(Stomp.Headers.TRANSACTION) + " is an invalid transaction id"); - msg.setTransactionId(tx_id); - } - - msg.onSend(); - return new CommandEnvelope(msg, headers); - } - - protected boolean asBool(Object value) { - if (value != null) { - return String.valueOf(value).equals("true"); - } - return false; - } - - protected long asLong(Object value) { - if (value instanceof Number) { - Number n = (Number) value; - return n.longValue(); - } - return Long.parseLong(value.toString()); - } - - protected int asInt(Object value) { - if (value instanceof Number) { - Number n = (Number) value; - return n.intValue(); - } - return Integer.parseInt(value.toString()); - } -} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCommand.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCommand.java deleted file mode 100644 index 61d6123cc6..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCommand.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.stomp; - -import javax.jms.JMSException; - -import java.io.DataInput; -import java.io.IOException; -import java.util.Properties; - -interface StompCommand -{ - public CommandEnvelope build(String commandLine, DataInput in) throws IOException, JMSException; - - /** - * Returns a command instance which always returns null for a packet - */ - StompCommand NULL_COMMAND = new StompCommand() - { - public CommandEnvelope build(String commandLine, DataInput in) - { - return new CommandEnvelope(null, new Properties()); - } - }; -} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java deleted file mode 100644 index c9e4d790da..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.stomp; - -import org.apache.activemq.transport.tcp.TcpTransportFactory; - -/** - * A Stomp transport factory - * - * @version $Revision: 1.1.1.1 $ - */ -public class StompTransportFactory extends TcpTransportFactory { - - protected String getDefaultWireFormatType() { - return "stomp"; - } - -} \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java deleted file mode 100644 index 24c4ca82a4..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java +++ /dev/null @@ -1,327 +0,0 @@ -/** - * - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.stomp; - -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.IOException; -import java.net.ProtocolException; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import javax.jms.JMSException; - -import org.apache.activeio.adapter.PacketInputStream; -import org.apache.activeio.command.WireFormat; -import org.apache.activeio.packet.ByteArrayPacket; -import org.apache.activeio.packet.Packet; -import org.apache.activeio.util.ByteArrayOutputStream; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.Command; -import org.apache.activemq.command.CommandTypes; -import org.apache.activemq.command.ConnectionId; -import org.apache.activemq.command.ConnectionInfo; -import org.apache.activemq.command.ConsumerId; -import org.apache.activemq.command.FlushCommand; -import org.apache.activemq.command.LocalTransactionId; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageDispatch; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.command.ProducerId; -import org.apache.activemq.command.Response; -import org.apache.activemq.command.SessionId; -import org.apache.activemq.command.TransactionId; -import org.apache.activemq.filter.DestinationMap; -import org.apache.activemq.util.IOExceptionSupport; -import org.apache.activemq.util.IdGenerator; -import org.apache.activemq.util.LongSequenceGenerator; - -import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue; -import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; -import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; -import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue; -import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; - -/** - * Implements the Stomp protocol. - */ -public class StompWireFormat implements WireFormat { - - private static final IdGenerator connectionIdGenerator = new IdGenerator(); - private static int transactionIdCounter; - - private int version = 1; - private final CommandParser commandParser = new CommandParser(this); - private final HeaderParser headerParser = new HeaderParser(); - - private final BlockingQueue pendingReadCommands = new LinkedBlockingQueue(); - private final BlockingQueue pendingWriteFrames = new LinkedBlockingQueue(); - private final List receiptListeners = new CopyOnWriteArrayList(); - private final Map subscriptionsByConsumerId = new ConcurrentHashMap(); - private final Map subscriptionsByName = new ConcurrentHashMap(); - private final DestinationMap subscriptionsByDestination = new DestinationMap(); - private final Map transactions = new ConcurrentHashMap(); - private final Map dispachedMap = new ConcurrentHashMap(); - private short lastCommandId; - - private final ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId()); - private final SessionId sessionId = new SessionId(connectionId, -1); - private final ProducerId producerId = new ProducerId(sessionId, 1); - - private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); - private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); - - void addResponseListener(ResponseListener listener) { - receiptListeners.add(listener); - } - - boolean connected = false; - - public Command readCommand(DataInput in) throws IOException, JMSException { - Command pending = (Command) AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.HelperWithReturn() { - public Object cycle() throws InterruptedException { - return pendingReadCommands.poll(0, TimeUnit.MILLISECONDS); - } - }); - - if (pending != null) { - return pending; - } - - try { - Command command = commandParser.parse(in); - addToPendingReadCommands(command); - - command = (Command) AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.HelperWithReturn() { - public Object cycle() throws InterruptedException { - return pendingReadCommands.poll(0, TimeUnit.MILLISECONDS); - } - }); - - if( !connected ) { - if( command.getDataStructureType() != ConnectionInfo.DATA_STRUCTURE_TYPE ) - throw new IOException("Not yet connected."); - } - return command; - - } - catch (ProtocolException e) { - sendError(e.getMessage()); - return FlushCommand.COMMAND; - } - } - - public Command writeCommand(final Command packet, final DataOutput out) throws IOException, JMSException { - flushPendingFrames(out); - - // It may have just been a flush request. - if (packet == null) - return null; - - if (packet.getDataStructureType() == CommandTypes.RESPONSE) { - assert (packet instanceof Response); - Response receipt = (Response) packet; - for (int i = 0; i < receiptListeners.size(); i++) { - ResponseListener listener = (ResponseListener) receiptListeners.get(i); - if (listener.onResponse(receipt, out)) { - receiptListeners.remove(listener); - return null; - } - } - } - if( packet.isMessageDispatch() ) { - MessageDispatch md = (MessageDispatch)packet; - Message message = md.getMessage(); - Subscription sub = (Subscription) subscriptionsByConsumerId.get(md.getConsumerId()); - if (sub != null) - sub.receive(md, out); - } - return null; - } - - private void flushPendingFrames(final DataOutput out) throws IOException { - boolean interrupted = false; - do { - try { - byte[] frame = (byte[]) pendingWriteFrames.poll(0, TimeUnit.MILLISECONDS); - if (frame == null) - return; - out.write(frame); - } - catch (InterruptedException e) { - interrupted = true; - } - } - while (interrupted); - } - - private void sendError(final String message) { - // System.err.println("sending error [" + message + "]"); - AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper() { - public void cycle() throws InterruptedException { - pendingWriteFrames.put(new FrameBuilder(Stomp.Responses.ERROR).addHeader(Stomp.Headers.Error.MESSAGE, message).toFrame()); - } - }); - } - - public void onFullyConnected() { - connected=true; - } - - public void addToPendingReadCommands(final Command info) { - AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper() { - public void cycle() throws InterruptedException { - pendingReadCommands.put(info); - } - }); - } - - - void clearTransactionId(String user_tx_id) { - this.transactions.remove(user_tx_id); - } - - public SessionId getSessionId() { - return sessionId; - } - - public ProducerId getProducerId() { - return producerId; - } - - - public Subscription getSubcription(ConsumerId consumerId) { - return (Subscription) subscriptionsByConsumerId.get(consumerId); - } - public Set getSubcriptions(ActiveMQDestination destination) { - return subscriptionsByDestination.get(destination); - } - public Subscription getSubcription(String name) { - return (Subscription) subscriptionsByName.get(name); - } - - public void addSubscription(Subscription s) { - if (s.getSubscriptionId()!=null && subscriptionsByName.containsKey(s.getSubscriptionId())) { - Subscription old = (Subscription) subscriptionsByName.get(s.getSubscriptionId()); - removeSubscription(old); - enqueueCommand(old.close()); - } - if( s.getSubscriptionId()!=null ) - subscriptionsByName.put(s.getSubscriptionId(), s); - subscriptionsByConsumerId.put(s.getConsumerInfo().getConsumerId(), s); - subscriptionsByDestination.put(s.getConsumerInfo().getDestination(), s); - } - - public void removeSubscription(Subscription s) { - if( s.getSubscriptionId()!=null ) - subscriptionsByName.remove(s.getSubscriptionId()); - subscriptionsByConsumerId.remove(s.getConsumerInfo().getConsumerId()); - subscriptionsByDestination.remove(s.getConsumerInfo().getDestination(), s); - } - - public void enqueueCommand(final Command ack) { - AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper() { - public void cycle() throws InterruptedException { - pendingReadCommands.put(ack); - } - }); - } - - public TransactionId getTransactionId(String key) { - return (TransactionId) transactions.get(key); - } - - public TransactionId registerTransactionId(String user_tx_id, int tx_id) { - LocalTransactionId transactionId = new LocalTransactionId(getConnectionId(), tx_id); - transactions.put(user_tx_id, transactionId); - return transactionId; - } - - public int getVersion() { - return version; - } - - public void setVersion(int version) { - this.version = version; - } - - public ConnectionId getConnectionId() { - return connectionId; - } - - public static synchronized int generateTransactionId() { - return ++transactionIdCounter; - } - - public ConsumerId createConsumerId() { - return new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); - } - - public MessageId createMessageId() { - return new MessageId(producerId, messageIdGenerator.getNextSequenceId()); - } - - synchronized public short generateCommandId() { - return lastCommandId++; - } - - public SessionId generateSessionId() { - throw new RuntimeException("TODO!!"); - } - - public Packet marshal(Object command) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(baos); - marshal(command, dos); - dos.close(); - return new ByteArrayPacket(baos.toByteSequence()); - } - - public Object unmarshal(Packet packet) throws IOException { - PacketInputStream stream = new PacketInputStream(packet); - DataInputStream dis = new DataInputStream(stream); - return unmarshal(dis); - } - - public void marshal(Object command, DataOutputStream os) throws IOException { - try { - writeCommand((Command) command, os); - } catch (IOException e) { - throw e; - } catch (JMSException e) { - throw IOExceptionSupport.create(e); - } - } - - public Object unmarshal(DataInputStream is) throws IOException { - try { - return readCommand(is); - } catch (IOException e) { - throw e; - } catch (JMSException e) { - throw IOExceptionSupport.create(e); - } - } - - public Map getDispachedMap() { - return dispachedMap; - } - -} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java deleted file mode 100644 index 15e1462367..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.stomp; - -import org.apache.activeio.command.WireFormat; -import org.apache.activeio.command.WireFormatFactory; - -/** - * Creates WireFormat objects that implement the Stomp protocol. - */ -public class StompWireFormatFactory implements WireFormatFactory { - public WireFormat createWireFormat() { - return new StompWireFormat(); - } -} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscribe.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscribe.java deleted file mode 100644 index e14ccf71c8..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscribe.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.stomp; - -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ConsumerInfo; -import org.apache.activemq.util.IntrospectionSupport; - -import java.io.DataInput; -import java.io.IOException; -import java.util.Properties; - -class Subscribe implements StompCommand { - private HeaderParser headerParser = new HeaderParser(); - private StompWireFormat format; - - Subscribe(StompWireFormat format) { - this.format = format; - } - - public CommandEnvelope build(String commandLine, DataInput in) throws IOException { - Properties headers = headerParser.parse(in); - - String subscriptionId = headers.getProperty(Stomp.Headers.Subscribe.ID); - String destination = headers.getProperty(Stomp.Headers.Subscribe.DESTINATION); - - ActiveMQDestination actual_dest = DestinationNamer.convert(destination); - ConsumerInfo ci = new ConsumerInfo(format.createConsumerId()); - ci.setPrefetchSize(1000); - ci.setDispatchAsync(true); - - String selector = (String) headers.remove(Stomp.Headers.Subscribe.SELECTOR); - ci.setSelector(selector); - - IntrospectionSupport.setProperties(ci, headers, "activemq."); - - ci.setDestination(DestinationNamer.convert(destination)); - - while (in.readByte() != 0) { - } - - Subscription s = new Subscription(format, subscriptionId, ci); - s.setDestination(actual_dest); - String ack_mode_key = headers.getProperty(Stomp.Headers.Subscribe.ACK_MODE); - if (ack_mode_key != null && ack_mode_key.equals(Stomp.Headers.Subscribe.AckModeValues.CLIENT)) { - s.setAckMode(Subscription.CLIENT_ACK); - } - - format.addSubscription(s); - return new CommandEnvelope(ci, headers); - } -} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscription.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscription.java deleted file mode 100644 index fdbfc6919b..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscription.java +++ /dev/null @@ -1,148 +0,0 @@ -/** - * - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.stomp; - -import java.io.DataOutput; -import java.io.IOException; -import java.util.Iterator; -import java.util.LinkedList; - -import javax.jms.JMSException; - -import org.apache.activemq.command.ActiveMQBytesMessage; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.command.ActiveMQTextMessage; -import org.apache.activemq.command.ConsumerInfo; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.MessageDispatch; -import org.apache.activemq.command.RemoveInfo; - -public class Subscription { - - public static final int AUTO_ACK = 1; - public static final int CLIENT_ACK = 2; - - public static final String NO_ID = "~~ NO SUCH THING ~~%%@#!Q"; - - private ActiveMQDestination destination; - private int ackMode = AUTO_ACK; - private StompWireFormat format; - - private final String subscriptionId; - private final ConsumerInfo consumerInfo; - private final LinkedList dispatchedMessages = new LinkedList(); - - public Subscription(StompWireFormat format, String subscriptionId, ConsumerInfo consumerInfo) { - this.format = format; - this.subscriptionId = subscriptionId; - this.consumerInfo = consumerInfo; - } - - void setDestination(ActiveMQDestination actual_dest) { - this.destination = actual_dest; - } - - void receive(MessageDispatch md, DataOutput out) throws IOException, JMSException { - - ActiveMQMessage m = (ActiveMQMessage) md.getMessage(); - - if (ackMode == CLIENT_ACK) { - Subscription sub = format.getSubcription(md.getConsumerId()); - sub.addMessageDispatch(md); - format.getDispachedMap().put(m.getJMSMessageID(), sub); - } - else if (ackMode == AUTO_ACK) { - MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); - format.enqueueCommand(ack); - } - else { - throw new JMSException("Unknown ackMode: " + ackMode); - } - - - FrameBuilder builder = new FrameBuilder(Stomp.Responses.MESSAGE); - builder.addHeaders(m); - - if( m.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE ) { - ActiveMQTextMessage msg = (ActiveMQTextMessage)m.copy(); - builder.setBody(msg.getText().getBytes("UTF-8")); - } else if( m.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE ) { - ActiveMQBytesMessage msg = (ActiveMQBytesMessage)m.copy(); - byte[] data = new byte[(int)msg.getBodyLength()]; - msg.readBytes(data); - builder.addHeader(Stomp.Headers.CONTENT_LENGTH, data.length); - builder.setBody(data); - } - - if (subscriptionId!=null) { - builder.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId); - } - - out.write(builder.toFrame()); - } - - synchronized private void addMessageDispatch(MessageDispatch md) { - dispatchedMessages.addLast(md); - } - - ActiveMQDestination getDestination() { - return destination; - } - - public void setAckMode(int clientAck) { - this.ackMode = clientAck; - } - - public RemoveInfo close() { - return new RemoveInfo(consumerInfo.getConsumerId()); - } - - public ConsumerInfo getConsumerInfo() { - return consumerInfo; - } - - public String getSubscriptionId() { - return subscriptionId; - } - - synchronized public MessageAck createMessageAck(String message_id) { - MessageAck ack = new MessageAck(); - ack.setDestination(consumerInfo.getDestination()); - ack.setAckType(MessageAck.STANDARD_ACK_TYPE); - ack.setConsumerId(consumerInfo.getConsumerId()); - - int count=0; - for (Iterator iter = dispatchedMessages.iterator(); iter.hasNext();) { - - MessageDispatch md = (MessageDispatch) iter.next(); - String id = ((ActiveMQMessage)md.getMessage()).getJMSMessageID(); - if( ack.getFirstMessageId()==null ) - ack.setFirstMessageId(md.getMessage().getMessageId()); - - format.getDispachedMap().remove(id); - iter.remove(); - count++; - if( id.equals(message_id) ) { - ack.setLastMessageId(md.getMessage().getMessageId()); - break; - } - } - ack.setMessageCount(count); - return ack; - } -} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Unsubscribe.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Unsubscribe.java deleted file mode 100644 index c4e01bda09..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Unsubscribe.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.stomp; - -import org.apache.activemq.command.ActiveMQDestination; - -import java.io.DataInput; -import java.io.IOException; -import java.net.ProtocolException; -import java.util.Iterator; -import java.util.Properties; -import java.util.Set; - -public class Unsubscribe implements StompCommand { - private static final HeaderParser parser = new HeaderParser(); - private final StompWireFormat format; - - Unsubscribe(StompWireFormat format) { - this.format = format; - } - - public CommandEnvelope build(String commandLine, DataInput in) throws IOException { - Properties headers = parser.parse(in); - while (in.readByte() == 0) { - } - - String subscriptionId = headers.getProperty(Stomp.Headers.Unsubscribe.ID); - String destination = headers.getProperty(Stomp.Headers.Unsubscribe.DESTINATION); - - - if( subscriptionId!=null ) { - Subscription s = format.getSubcription(subscriptionId); - format.removeSubscription(s); - return new CommandEnvelope(s.close(), headers); - } - - ActiveMQDestination d = DestinationNamer.convert(destination); - Set subs = format.getSubcriptions(d); - for (Iterator iter = subs.iterator(); iter.hasNext();) { - Subscription s = (Subscription) iter.next(); - format.removeSubscription(s); - return new CommandEnvelope(s.close(), headers); - } - - throw new ProtocolException("Unexpected UNSUBSCRIBE received."); - - } -} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java index 0618ba887b..6f9737e7b9 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java @@ -48,7 +48,6 @@ import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionInfo; -import org.apache.activemq.transport.stomp.Stomp; import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.LongSequenceGenerator; diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/Stomp.java similarity index 98% rename from activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java rename to activemq-core/src/main/java/org/apache/activemq/transport/stomp2/Stomp.java index edd774f5a4..ecdc3f5148 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/Stomp.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.transport.stomp; +package org.apache.activemq.transport.stomp2; public interface Stomp { String NULL = "\u0000"; diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java index 0a5aa3e493..c08191b31c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java @@ -28,7 +28,6 @@ import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageId; -import org.apache.activemq.transport.stomp.Stomp; /** * Keeps track of the STOMP susbscription so that acking is correctly done. diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java index 1317dee9a5..f1a1669a85 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java @@ -29,7 +29,6 @@ import org.apache.activeio.packet.ByteArrayPacket; import org.apache.activeio.packet.ByteSequence; import org.apache.activeio.packet.Packet; import org.apache.activeio.util.ByteArrayOutputStream; -import org.apache.activemq.transport.stomp.Stomp; /** * Implements marshalling and unmarsalling the Stomp protocol. diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/package.html b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/package.html similarity index 100% rename from activemq-core/src/main/java/org/apache/activemq/transport/stomp/package.html rename to activemq-core/src/main/java/org/apache/activemq/transport/stomp2/package.html diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java index 52a01f69d4..f0cf9bbb05 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java @@ -22,6 +22,7 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.transport.stomp2.Stomp; import javax.jms.Connection; import javax.jms.JMSException; diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompWireFormatTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompWireFormatTest.java deleted file mode 100644 index 26c150c5f9..0000000000 --- a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompWireFormatTest.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.stomp; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.Command; -import org.apache.activemq.command.*; -import org.apache.activemq.transport.stomp.Stomp; -import org.apache.activemq.transport.stomp.StompWireFormat; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import javax.jms.JMSException; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; - -import junit.framework.TestCase; - -public class StompWireFormatTest extends TestCase { - - protected static final Log log = LogFactory.getLog(StompWireFormatTest.class); - - private StompWireFormat wire; - - public void setUp() throws Exception { - wire = new StompWireFormat(); - } - - public void testValidConnectHandshake() throws Exception { - ByteArrayOutputStream bout = new ByteArrayOutputStream(); - DataOutputStream dout = new DataOutputStream(bout); - - ConnectionInfo ci = (ConnectionInfo) parseCommand("CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "\n" + Stomp.NULL); - assertNotNull(ci); - assertTrue(ci.isResponseRequired()); - - Response cr = new Response(); - cr.setCorrelationId(ci.getCommandId()); - - String response = writeCommand(cr); - log.info("Received: " + response); - - SessionInfo si = (SessionInfo) wire.readCommand(null); - assertNotNull(si); - assertTrue(!si.isResponseRequired()); - - ProducerInfo pi = (ProducerInfo) wire.readCommand(null); - assertNotNull(pi); - assertTrue(pi.isResponseRequired()); - - Response sr = new Response(); - sr.setCorrelationId(pi.getCommandId()); - response = writeCommand(sr); - log.info("Received: " + response); - assertTrue("Response should start with CONNECTED: " + response, response.startsWith("CONNECTED")); - - // now lets test subscribe - ConsumerInfo consumerInfo = (ConsumerInfo) parseCommand("SUBSCRIBE\n" + "destination: /queue/foo\n" + "ack: client\n" + "activemq.prefetchSize: 1\n" - + "\n" + Stomp.NULL); - assertNotNull(consumerInfo); - // assertTrue(consumerInfo.isResponseRequired()); - assertEquals("prefetch size", 1, consumerInfo.getPrefetchSize()); - - cr = new Response(); - cr.setCorrelationId(consumerInfo.getCommandId()); - response = writeCommand(cr); - log.info("Received: " + response); - } - - public void _testFakeServer() throws Exception { - final BrokerService container = new BrokerService(); - new Thread(new Runnable() { - public void run() { - try { - container.addConnector("stomp://localhost:61613"); - container.start(); - } - catch (Exception e) { - System.err.println("ARGH: caught: " + e); - e.printStackTrace(); - } - } - }).start(); - System.err.println("started container"); - System.err.println("okay, go play"); - - System.err.println(System.in.read()); - } - - protected Command parseCommand(String connect_frame) throws IOException, JMSException { - DataInputStream din = new DataInputStream(new ByteArrayInputStream(connect_frame.getBytes())); - - return wire.readCommand(din); - } - - protected String writeCommand(Command command) throws IOException, JMSException { - ByteArrayOutputStream bout = new ByteArrayOutputStream(); - DataOutputStream dout = new DataOutputStream(bout); - wire.writeCommand(command, dout); - return new String(bout.toByteArray()); - } - -}