diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Abort.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Abort.java
deleted file mode 100644
index 6817791199..0000000000
--- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Abort.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.stomp;
-
-import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.command.TransactionInfo;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.net.ProtocolException;
-import java.util.Properties;
-
-class Abort implements StompCommand {
- private StompWireFormat format;
- private static final HeaderParser parser = new HeaderParser();
-
- Abort(StompWireFormat format) {
- this.format = format;
- }
-
- public CommandEnvelope build(String commandLine, DataInput in) throws IOException {
- Properties headers = parser.parse(in);
- while (in.readByte() != 0) {
- }
- String user_tx_id = headers.getProperty(Stomp.Headers.TRANSACTION);
-
- if (!headers.containsKey(Stomp.Headers.TRANSACTION)) {
- throw new ProtocolException("Must specify the transaction you are aborting");
- }
-
- TransactionId txnId = format.getTransactionId(user_tx_id);
- TransactionInfo tx = new TransactionInfo();
- tx.setConnectionId(format.getConnectionId());
- tx.setTransactionId(txnId);
- tx.setType(TransactionInfo.ROLLBACK);
- format.clearTransactionId(user_tx_id);
- return new CommandEnvelope(tx, headers);
- }
-}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Ack.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Ack.java
deleted file mode 100644
index 2c27bf19c0..0000000000
--- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Ack.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.stomp;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.net.ProtocolException;
-import java.util.Properties;
-
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.TransactionId;
-
-class Ack implements StompCommand {
- private final StompWireFormat format;
- private static final HeaderParser parser = new HeaderParser();
-
- Ack(StompWireFormat format) {
- this.format = format;
- }
-
- public CommandEnvelope build(String commandLine, DataInput in) throws IOException {
- Properties headers = parser.parse(in);
- String message_id = headers.getProperty(Stomp.Headers.Ack.MESSAGE_ID);
- if (message_id == null)
- throw new ProtocolException("ACK received without a message-id to acknowledge!");
-
- Subscription sub = (Subscription) format.getDispachedMap().get(message_id);
- if( sub ==null )
- throw new ProtocolException("Unexpected ACK received for message-id [" + message_id + "]");
-
- MessageAck ack = sub.createMessageAck(message_id);
-
- if (headers.containsKey(Stomp.Headers.TRANSACTION)) {
- TransactionId tx_id = format.getTransactionId(headers.getProperty(Stomp.Headers.TRANSACTION));
- if (tx_id == null)
- throw new ProtocolException(headers.getProperty(Stomp.Headers.TRANSACTION) + " is an invalid transaction id");
- ack.setTransactionId(tx_id);
- }
-
- while ((in.readByte()) != 0) {
- }
-
- return new CommandEnvelope(ack, headers);
- }
-}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/AsyncHelper.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/AsyncHelper.java
deleted file mode 100644
index cbb4218814..0000000000
--- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/AsyncHelper.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.stomp;
-
-class AsyncHelper {
- public static Object tryUntilNotInterrupted(HelperWithReturn helper) {
- while (true) {
- try {
- return helper.cycle();
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- static void tryUntilNotInterrupted(final Helper helper) {
- tryUntilNotInterrupted(new HelperWithReturn() {
-
- public Object cycle() throws InterruptedException {
- helper.cycle();
- return null;
- }
- });
- }
-
- interface HelperWithReturn {
- Object cycle() throws InterruptedException;
- }
-
- interface Helper {
- void cycle() throws InterruptedException;
- }
-}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Begin.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Begin.java
deleted file mode 100644
index bd08be06d6..0000000000
--- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Begin.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.stomp;
-
-import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.command.TransactionInfo;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.net.ProtocolException;
-import java.util.Properties;
-
-public class Begin implements StompCommand {
- private StompWireFormat format;
- private static final HeaderParser parser = new HeaderParser();
-
- public Begin(StompWireFormat format) {
- this.format = format;
- }
-
- public CommandEnvelope build(String commandLine, DataInput in) throws IOException {
- Properties headers = parser.parse(in);
- while (in.readByte() != 0) {
- }
-
- TransactionInfo tx = new TransactionInfo();
- String user_tx_id = headers.getProperty(Stomp.Headers.TRANSACTION);
- if (!headers.containsKey(Stomp.Headers.TRANSACTION)) {
- throw new ProtocolException("Must specify the transaction you are beginning");
- }
- int tx_id = StompWireFormat.generateTransactionId();
- TransactionId transactionId = format.registerTransactionId(user_tx_id, tx_id);
- tx.setConnectionId(format.getConnectionId());
- tx.setTransactionId(transactionId);
- tx.setType(TransactionInfo.BEGIN);
- return new CommandEnvelope(tx, headers);
- }
-}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Command.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Command.java
deleted file mode 100644
index 904238ebe6..0000000000
--- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Command.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.stomp;
-
-import javax.jms.JMSException;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.util.Properties;
-
-interface Command {
- public CommandEnvelope build(String commandLine, DataInput in) throws IOException, JMSException;
-
- /**
- * Returns a command instance which always returns null for a packet
- */
- StompCommand NULL_COMMAND = new StompCommand() {
- public CommandEnvelope build(String commandLine, DataInput in) {
- return new CommandEnvelope(null, new Properties());
- }
- };
-}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/CommandEnvelope.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/CommandEnvelope.java
deleted file mode 100644
index 8770c40568..0000000000
--- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/CommandEnvelope.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.stomp;
-
-import org.apache.activemq.command.Command;
-
-import java.util.Properties;
-
-public class CommandEnvelope {
-
- private final Command command;
- private final Properties headers;
- private final ResponseListener responseListener;
-
- public CommandEnvelope(Command command, Properties headers) {
- this(command, headers, null);
- }
-
- public CommandEnvelope(Command command, Properties headers, ResponseListener responseListener) {
- this.command = command;
- this.headers = headers;
- this.responseListener = responseListener;
- }
-
- public Properties getHeaders() {
- return headers;
- }
-
- public Command getCommand() {
- return command;
- }
-
- public ResponseListener getResponseListener() {
- return responseListener;
- }
-
-}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/CommandParser.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/CommandParser.java
deleted file mode 100644
index f0f56c9341..0000000000
--- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/CommandParser.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.stomp;
-
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.Response;
-
-import javax.jms.JMSException;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.net.ProtocolException;
-
-class CommandParser {
- private final StompWireFormat format;
-
- CommandParser(StompWireFormat wireFormat) {
- format = wireFormat;
- }
-
- Command parse(DataInput in) throws IOException, JMSException {
- String line = null;
-
- // skip white space to next real line
- while (true) {
- line = in.readLine();
- if (line == null) {
- throw new IOException("connection was closed");
- }
- else {
- line = line.trim();
- if (line.length() > 0) {
- break;
- }
- }
- }
-
- // figure correct command and return it
- StompCommand command = null;
- if (line.startsWith(Stomp.Commands.CONNECT))
- command = new Connect(format);
- else if (line.startsWith(Stomp.Commands.SUBSCRIBE))
- command = new Subscribe(format);
- else if (line.startsWith(Stomp.Commands.SEND))
- command = new Send(format);
- else if (line.startsWith(Stomp.Commands.DISCONNECT))
- command = new Disconnect();
- else if (line.startsWith(Stomp.Commands.BEGIN))
- command = new Begin(format);
- else if (line.startsWith(Stomp.Commands.COMMIT))
- command = new Commit(format);
- else if (line.startsWith(Stomp.Commands.ABORT))
- command = new Abort(format);
- else if (line.startsWith(Stomp.Commands.UNSUBSCRIBE))
- command = new Unsubscribe(format);
- else if (line.startsWith(Stomp.Commands.ACK))
- command = new Ack(format);
-
- if (command == null) {
- while (in.readByte() == 0) {
- }
- throw new ProtocolException("Unknown command [" + line + "]");
- }
-
- final CommandEnvelope envelope = command.build(line, in);
- final short commandId = format.generateCommandId();
- final String client_packet_key = envelope.getHeaders().getProperty(Stomp.Headers.RECEIPT_REQUESTED);
- final boolean receiptRequested = client_packet_key!=null;
-
- envelope.getCommand().setCommandId(commandId);
- if (receiptRequested || envelope.getResponseListener()!=null ) {
- envelope.getCommand().setResponseRequired(true);
- if( envelope.getResponseListener()!=null ) {
- format.addResponseListener(envelope.getResponseListener());
- } else {
- format.addResponseListener(new ResponseListener() {
- public boolean onResponse(Response receipt, DataOutput out) throws IOException {
- if (receipt.getCorrelationId() != commandId)
- return false;
- out.write(new FrameBuilder(Stomp.Responses.RECEIPT).addHeader(Stomp.Headers.Response.RECEIPT_ID, client_packet_key).toFrame());
- return true;
- }
- });
- }
- }
-
- return envelope.getCommand();
- }
-}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Commit.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Commit.java
deleted file mode 100644
index aae23e3858..0000000000
--- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Commit.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.stomp;
-
-import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.command.TransactionInfo;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.net.ProtocolException;
-import java.util.Properties;
-
-class Commit implements StompCommand {
- private StompWireFormat format;
- private static final HeaderParser parser = new HeaderParser();
-
- Commit(StompWireFormat format) {
- this.format = format;
- }
-
- public CommandEnvelope build(String commandLine, DataInput in) throws IOException {
- Properties headers = parser.parse(in);
- while (in.readByte() != 0) {
- }
-
- String user_tx_id = headers.getProperty(Stomp.Headers.TRANSACTION);
-
- if (user_tx_id == null) {
- throw new ProtocolException("Must specify the transaction you are committing");
- }
-
- TransactionId tx_id = format.getTransactionId(user_tx_id);
- if (tx_id == null)
- throw new ProtocolException(user_tx_id + " is an invalid transaction id");
- TransactionInfo tx = new TransactionInfo();
- tx.setConnectionId(format.getConnectionId());
- tx.setTransactionId(tx_id);
- tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
- format.clearTransactionId(user_tx_id);
- return new CommandEnvelope(tx, headers);
- }
-}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Connect.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Connect.java
deleted file mode 100644
index 32c0484cbd..0000000000
--- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Connect.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.stomp;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Properties;
-
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.Response;
-import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.util.IntrospectionSupport;
-
-class Connect implements StompCommand {
- private HeaderParser headerParser = new HeaderParser();
- private StompWireFormat format;
-
- Connect(StompWireFormat format) {
- this.format = format;
- }
-
- public CommandEnvelope build(String commandLine, DataInput in) throws IOException {
-
- final Properties headers = headerParser.parse(in);
-
-
- // allow anyone to login for now
- String login = headers.getProperty(Stomp.Headers.Connect.LOGIN);
- String passcode = headers.getProperty(Stomp.Headers.Connect.PASSCODE);
- String clientId = headers.getProperty(Stomp.Headers.Connect.CLIENT_ID);
-
- final ConnectionInfo connectionInfo = new ConnectionInfo();
-
- IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
-
- connectionInfo.setConnectionId(format.getConnectionId());
- if( clientId!=null )
- connectionInfo.setClientId(clientId);
- else
- connectionInfo.setClientId(""+connectionInfo.getConnectionId().toString());
- connectionInfo.setResponseRequired(true);
- connectionInfo.setUserName(login);
- connectionInfo.setPassword(passcode);
-
- while (in.readByte() != 0) {
- }
-
- return new CommandEnvelope(connectionInfo, headers,
- new ConnectResponseListener(headers, connectionInfo) );
- }
-
- class ConnectResponseListener implements ResponseListener{
-
- private Properties headers;
- private ConnectionInfo connectionInfo;
-
- public ConnectResponseListener( Properties headers, final ConnectionInfo connectionInfo ){
- this.headers = headers;
- this.connectionInfo = connectionInfo;
- }
-
- public boolean onResponse(Response receipt, DataOutput out) throws IOException {
-
- if (receipt.getCorrelationId() != connectionInfo.getCommandId())
- return false;
-
- final SessionInfo sessionInfo = new SessionInfo(format.getSessionId());
- sessionInfo.setCommandId(format.generateCommandId());
- sessionInfo.setResponseRequired(false);
-
- final ProducerInfo producerInfo = new ProducerInfo(format.getProducerId());
- producerInfo.setCommandId(format.generateCommandId());
- producerInfo.setResponseRequired(true);
-
- format.addResponseListener( new ResponseListener(){
- public boolean onResponse(Response receipt, DataOutput out) throws IOException {
- if (receipt.getCorrelationId() != producerInfo.getCommandId() )
- return false;
-
- format.onFullyConnected();
-
- StringBuffer buffer = new StringBuffer();
- buffer.append(Stomp.Responses.CONNECTED);
- buffer.append(Stomp.NEWLINE);
- buffer.append(Stomp.Headers.Connected.SESSION);
- buffer.append(Stomp.Headers.SEPERATOR);
- buffer.append(connectionInfo.getClientId());
- if( headers.containsKey(Stomp.Headers.Connect.REQUEST_ID) ){
- buffer.append(Stomp.NEWLINE);
- buffer.append(Stomp.Headers.Connected.RESPONSE_ID);
- buffer.append(Stomp.Headers.SEPERATOR);
- buffer.append(headers.getProperty( Stomp.Headers.Connect.REQUEST_ID ));
- }
- buffer.append(Stomp.NEWLINE);
- buffer.append(Stomp.NEWLINE);
- buffer.append(Stomp.NULL);
- buffer.append(Stomp.NEWLINE);
- out.writeBytes(buffer.toString());
- return true;
- }
- });
-
- format.addToPendingReadCommands(sessionInfo);
- format.addToPendingReadCommands(producerInfo);
- return true;
- }
- }
-}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/DestinationNamer.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/DestinationNamer.java
deleted file mode 100644
index 3b6c10733c..0000000000
--- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/DestinationNamer.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.stomp;
-
-import org.apache.activemq.command.ActiveMQDestination;
-
-import javax.jms.Destination;
-
-import java.net.ProtocolException;
-
-class DestinationNamer {
- static ActiveMQDestination convert(String name) throws ProtocolException {
- if (name == null) {
- return null;
- }
- else if (name.startsWith("/queue/")) {
- String q_name = name.substring("/queue/".length(), name.length());
- return ActiveMQDestination.createDestination(q_name, ActiveMQDestination.QUEUE_TYPE);
- }
- else if (name.startsWith("/topic/")) {
- String t_name = name.substring("/topic/".length(), name.length());
- return ActiveMQDestination.createDestination(t_name, ActiveMQDestination.TOPIC_TYPE);
- }
- else {
- throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ TTMP destinations " + "must begine with /queue/ or /topic/");
- }
-
- }
-
- static String convert(Destination d) {
- if (d == null) {
- return null;
- }
- ActiveMQDestination amq_d = (ActiveMQDestination) d;
- String p_name = amq_d.getPhysicalName();
-
- StringBuffer buffer = new StringBuffer();
- if (amq_d.isQueue()) {
- buffer.append("/queue/");
- }
- if (amq_d.isTopic()) {
- buffer.append("/topic/");
- }
- buffer.append(p_name);
-
- return buffer.toString();
- }
-}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Disconnect.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Disconnect.java
deleted file mode 100644
index 852f001eca..0000000000
--- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Disconnect.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.stomp;
-
-import org.apache.activemq.command.ShutdownInfo;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.util.Properties;
-
-class Disconnect implements StompCommand {
-
- public CommandEnvelope build(String line, DataInput in) throws IOException {
- while (in.readByte() != 0) {
- }
- return new CommandEnvelope(new ShutdownInfo(), new Properties());
- }
-}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameBuilder.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameBuilder.java
deleted file mode 100644
index c82cee52b9..0000000000
--- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameBuilder.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.stomp;
-
-import org.apache.activemq.command.ActiveMQMessage;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Map.Entry;
-
-class FrameBuilder {
- private String command;
- private Properties headers = new Properties();
- private byte[] body = new byte[0];
-
- public FrameBuilder(String command) {
- this.command = command;
- }
-
- public FrameBuilder addHeader(String key, String value) {
- if (value != null) {
- this.headers.setProperty(key, value);
- }
- return this;
- }
-
- public FrameBuilder addHeader(String key, long value) {
- this.headers.put(key, new Long(value));
- return this;
- }
-
- public FrameBuilder addHeaders(ActiveMQMessage message) throws IOException {
- addHeader(Stomp.Headers.Message.DESTINATION, DestinationNamer.convert(message.getDestination()));
- addHeader(Stomp.Headers.Message.MESSAGE_ID, message.getJMSMessageID());
- addHeader(Stomp.Headers.Message.CORRELATION_ID, message.getJMSCorrelationID());
- addHeader(Stomp.Headers.Message.EXPIRATION_TIME, message.getJMSExpiration());
- if (message.getJMSRedelivered()) {
- addHeader(Stomp.Headers.Message.REDELIVERED, "true");
- }
- addHeader(Stomp.Headers.Message.PRORITY, message.getJMSPriority());
- addHeader(Stomp.Headers.Message.REPLY_TO, DestinationNamer.convert(message.getJMSReplyTo()));
- addHeader(Stomp.Headers.Message.TIMESTAMP, message.getJMSTimestamp());
- addHeader(Stomp.Headers.Message.TYPE, message.getJMSType());
-
- // now lets add all the message headers
- Map properties = message.getProperties();
- if (properties != null) {
- headers.putAll(properties);
- }
- return this;
- }
-
- public FrameBuilder setBody(byte[] body) {
- this.body = body;
- return this;
- }
-
- public String toString() {
- StringBuffer buffer = new StringBuffer();
- buffer.append(command);
- buffer.append(Stomp.NEWLINE);
- for (Iterator iterator = headers.keySet().iterator(); iterator.hasNext();) {
- String key = (String) iterator.next();
- String property = headers.getProperty(key);
- if (property != null) {
- buffer.append(key);
- buffer.append(Stomp.Headers.SEPERATOR);
- buffer.append(property);
- buffer.append(Stomp.NEWLINE);
- }
- }
- buffer.append(Stomp.NEWLINE);
- buffer.append(body);
- buffer.append(Stomp.NULL);
- buffer.append(Stomp.NEWLINE);
- return buffer.toString();
- }
-
- byte[] toFrame() {
- ByteArrayOutputStream bout = new ByteArrayOutputStream();
- try {
- bout.write(command.getBytes("UTF-8"));
- bout.write(Stomp.NEWLINE.getBytes("UTF-8"));
- for (Iterator iterator = headers.entrySet().iterator(); iterator.hasNext();) {
- Map.Entry entry = (Entry) iterator.next();
- String key = (String) entry.getKey();
- String property = entry.getValue().toString();
- if (property != null) {
- bout.write(key.getBytes("UTF-8"));
- bout.write(Stomp.Headers.SEPERATOR.getBytes("UTF-8"));
- bout.write(property.getBytes("UTF-8"));
- bout.write(Stomp.NEWLINE.getBytes("UTF-8"));
- }
- }
- bout.write(Stomp.NEWLINE.getBytes("UTF-8"));
- bout.write(body);
- bout.write(Stomp.NULL.getBytes("UTF-8"));
- bout.write(Stomp.NEWLINE.getBytes("UTF-8"));
- }
- catch (IOException e) {
- throw new RuntimeException("World is caving in, we just got io error writing to" + "a byte array output stream we instantiated!");
- }
- return bout.toByteArray();
- }
-}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/HeaderParser.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/HeaderParser.java
deleted file mode 100644
index 480cce2488..0000000000
--- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/HeaderParser.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.stomp;
-
-import java.io.BufferedReader;
-import java.io.DataInput;
-import java.io.IOException;
-import java.net.ProtocolException;
-import java.util.Properties;
-
-class HeaderParser {
- /**
- * Reads headers up through the blank line
- *
- * @param in
- * @return
- * @throws IOException
- */
- Properties parse(BufferedReader in) throws IOException {
- Properties props = new Properties();
- while (true) {
- String line = in.readLine();
- if (line != null && line.trim().length() > 0) {
- int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
- String name = line.substring(0, seperator_index).trim();
- String value = line.substring(seperator_index + 1, line.length()).trim();
- props.setProperty(name, value);
- }
- else {
- break;
- }
- }
- return props;
- }
-
- Properties parse(DataInput in) throws IOException {
- Properties props = new Properties();
- while (true) {
- String line = in.readLine();
- if (line != null && line.trim().length() > 0) {
- try {
- int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
- String name = line.substring(0, seperator_index).trim();
- String value = line.substring(seperator_index + 1, line.length()).trim();
- props.setProperty(name, value);
- }
- catch (Exception e) {
- throw new ProtocolException("Unable to parser header line [" + line + "]");
- }
- }
- else {
- break;
- }
- }
- return props;
- }
-}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ResponseListener.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ResponseListener.java
deleted file mode 100644
index f8223ddf63..0000000000
--- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ResponseListener.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.stomp;
-
-import org.apache.activemq.command.Response;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-interface ResponseListener {
- /**
- * Return true if you handled this, false otherwise
- */
- boolean onResponse(Response receipt, DataOutput out) throws IOException;
-}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Send.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Send.java
deleted file mode 100644
index 41d061bfc1..0000000000
--- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Send.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.stomp;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.IOException;
-import java.net.ProtocolException;
-import java.util.Properties;
-
-import javax.jms.JMSException;
-
-import org.apache.activemq.command.ActiveMQBytesMessage;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.command.TransactionId;
-
-class Send implements StompCommand {
- private final HeaderParser parser = new HeaderParser();
- private final StompWireFormat format;
-
- Send(StompWireFormat format) {
- this.format = format;
- }
-
- public CommandEnvelope build(String commandLine, DataInput in) throws IOException, JMSException {
- Properties headers = parser.parse(in);
- String destination = (String) headers.remove(Stomp.Headers.Send.DESTINATION);
- // now the body
- ActiveMQMessage msg;
- if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) {
- ActiveMQBytesMessage bm = new ActiveMQBytesMessage();
- String content_length = headers.getProperty(Stomp.Headers.CONTENT_LENGTH).trim();
- int length;
- try {
- length = Integer.parseInt(content_length);
- }
- catch (NumberFormatException e) {
- throw new ProtocolException("Specified content-length is not a valid integer");
- }
- byte[] bytes = new byte[length];
- in.readFully(bytes);
- byte nil = in.readByte();
- if (nil != 0)
- throw new ProtocolException("content-length bytes were read and " + "there was no trailing null byte");
- bm.writeBytes(bytes);
- msg = bm;
- }
- else {
- ActiveMQTextMessage text = new ActiveMQTextMessage();
- ByteArrayOutputStream bytes = new ByteArrayOutputStream();
- byte b;
- while ((b = in.readByte()) != 0) {
- bytes.write(b);
- }
- bytes.close();
- String body = new String(bytes.toByteArray());
- try {
- text.setText(body);
- }
- catch (JMSException e) {
- throw new RuntimeException("Something is really wrong, we instantiated this thing!");
- }
- msg = text;
- }
-
- msg.setProducerId(format.getProducerId());
- msg.setMessageId(format.createMessageId());
- msg.setJMSTimestamp(System.currentTimeMillis());
-
-
- ActiveMQDestination d = DestinationNamer.convert(destination);
- msg.setDestination(d);
- // msg.setJMSClientID(format.getClientId());
-
- // the standard JMS headers
- msg.setJMSCorrelationID((String) headers.remove(Stomp.Headers.Send.CORRELATION_ID));
-
- Object expiration = headers.remove(Stomp.Headers.Send.EXPIRATION_TIME);
- if (expiration != null) {
- msg.setJMSExpiration(asLong(expiration));
- }
- Object priority = headers.remove(Stomp.Headers.Send.PRIORITY);
- if (priority != null) {
- msg.setJMSPriority(asInt(priority));
- }
- Object type = headers.remove(Stomp.Headers.Send.TYPE);
- if (type != null) {
- msg.setJMSType((String) type);
- }
-
- msg.setJMSReplyTo(DestinationNamer.convert((String) headers.remove(Stomp.Headers.Send.REPLY_TO)));
-
- Object persistent = headers.remove(Stomp.Headers.Send.PERSISTENT);
- if (persistent != null) {
- msg.setPersistent(asBool(persistent));
- }
-
- // No need to carry the content length in the JMS headers.
- headers.remove(Stomp.Headers.CONTENT_LENGTH);
-
- // now the general headers
- msg.setProperties(headers);
-
- if (headers.containsKey(Stomp.Headers.TRANSACTION)) {
- TransactionId tx_id = format.getTransactionId(headers.getProperty(Stomp.Headers.TRANSACTION));
- if (tx_id == null)
- throw new ProtocolException(headers.getProperty(Stomp.Headers.TRANSACTION) + " is an invalid transaction id");
- msg.setTransactionId(tx_id);
- }
-
- msg.onSend();
- return new CommandEnvelope(msg, headers);
- }
-
- protected boolean asBool(Object value) {
- if (value != null) {
- return String.valueOf(value).equals("true");
- }
- return false;
- }
-
- protected long asLong(Object value) {
- if (value instanceof Number) {
- Number n = (Number) value;
- return n.longValue();
- }
- return Long.parseLong(value.toString());
- }
-
- protected int asInt(Object value) {
- if (value instanceof Number) {
- Number n = (Number) value;
- return n.intValue();
- }
- return Integer.parseInt(value.toString());
- }
-}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCommand.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCommand.java
deleted file mode 100644
index 61d6123cc6..0000000000
--- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCommand.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.stomp;
-
-import javax.jms.JMSException;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.util.Properties;
-
-interface StompCommand
-{
- public CommandEnvelope build(String commandLine, DataInput in) throws IOException, JMSException;
-
- /**
- * Returns a command instance which always returns null for a packet
- */
- StompCommand NULL_COMMAND = new StompCommand()
- {
- public CommandEnvelope build(String commandLine, DataInput in)
- {
- return new CommandEnvelope(null, new Properties());
- }
- };
-}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java
deleted file mode 100644
index c9e4d790da..0000000000
--- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.stomp;
-
-import org.apache.activemq.transport.tcp.TcpTransportFactory;
-
-/**
- * A Stomp transport factory
- *
- * @version $Revision: 1.1.1.1 $
- */
-public class StompTransportFactory extends TcpTransportFactory {
-
- protected String getDefaultWireFormatType() {
- return "stomp";
- }
-
-}
\ No newline at end of file
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
deleted file mode 100644
index 24c4ca82a4..0000000000
--- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.stomp;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.ProtocolException;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.jms.JMSException;
-
-import org.apache.activeio.adapter.PacketInputStream;
-import org.apache.activeio.command.WireFormat;
-import org.apache.activeio.packet.ByteArrayPacket;
-import org.apache.activeio.packet.Packet;
-import org.apache.activeio.util.ByteArrayOutputStream;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.CommandTypes;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.FlushCommand;
-import org.apache.activemq.command.LocalTransactionId;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.command.Response;
-import org.apache.activemq.command.SessionId;
-import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.filter.DestinationMap;
-import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.util.IdGenerator;
-import org.apache.activemq.util.LongSequenceGenerator;
-
-import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
-import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
-
-/**
- * Implements the Stomp protocol.
- */
-public class StompWireFormat implements WireFormat {
-
- private static final IdGenerator connectionIdGenerator = new IdGenerator();
- private static int transactionIdCounter;
-
- private int version = 1;
- private final CommandParser commandParser = new CommandParser(this);
- private final HeaderParser headerParser = new HeaderParser();
-
- private final BlockingQueue pendingReadCommands = new LinkedBlockingQueue();
- private final BlockingQueue pendingWriteFrames = new LinkedBlockingQueue();
- private final List receiptListeners = new CopyOnWriteArrayList();
- private final Map subscriptionsByConsumerId = new ConcurrentHashMap();
- private final Map subscriptionsByName = new ConcurrentHashMap();
- private final DestinationMap subscriptionsByDestination = new DestinationMap();
- private final Map transactions = new ConcurrentHashMap();
- private final Map dispachedMap = new ConcurrentHashMap();
- private short lastCommandId;
-
- private final ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId());
- private final SessionId sessionId = new SessionId(connectionId, -1);
- private final ProducerId producerId = new ProducerId(sessionId, 1);
-
- private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
- private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
-
- void addResponseListener(ResponseListener listener) {
- receiptListeners.add(listener);
- }
-
- boolean connected = false;
-
- public Command readCommand(DataInput in) throws IOException, JMSException {
- Command pending = (Command) AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.HelperWithReturn() {
- public Object cycle() throws InterruptedException {
- return pendingReadCommands.poll(0, TimeUnit.MILLISECONDS);
- }
- });
-
- if (pending != null) {
- return pending;
- }
-
- try {
- Command command = commandParser.parse(in);
- addToPendingReadCommands(command);
-
- command = (Command) AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.HelperWithReturn() {
- public Object cycle() throws InterruptedException {
- return pendingReadCommands.poll(0, TimeUnit.MILLISECONDS);
- }
- });
-
- if( !connected ) {
- if( command.getDataStructureType() != ConnectionInfo.DATA_STRUCTURE_TYPE )
- throw new IOException("Not yet connected.");
- }
- return command;
-
- }
- catch (ProtocolException e) {
- sendError(e.getMessage());
- return FlushCommand.COMMAND;
- }
- }
-
- public Command writeCommand(final Command packet, final DataOutput out) throws IOException, JMSException {
- flushPendingFrames(out);
-
- // It may have just been a flush request.
- if (packet == null)
- return null;
-
- if (packet.getDataStructureType() == CommandTypes.RESPONSE) {
- assert (packet instanceof Response);
- Response receipt = (Response) packet;
- for (int i = 0; i < receiptListeners.size(); i++) {
- ResponseListener listener = (ResponseListener) receiptListeners.get(i);
- if (listener.onResponse(receipt, out)) {
- receiptListeners.remove(listener);
- return null;
- }
- }
- }
- if( packet.isMessageDispatch() ) {
- MessageDispatch md = (MessageDispatch)packet;
- Message message = md.getMessage();
- Subscription sub = (Subscription) subscriptionsByConsumerId.get(md.getConsumerId());
- if (sub != null)
- sub.receive(md, out);
- }
- return null;
- }
-
- private void flushPendingFrames(final DataOutput out) throws IOException {
- boolean interrupted = false;
- do {
- try {
- byte[] frame = (byte[]) pendingWriteFrames.poll(0, TimeUnit.MILLISECONDS);
- if (frame == null)
- return;
- out.write(frame);
- }
- catch (InterruptedException e) {
- interrupted = true;
- }
- }
- while (interrupted);
- }
-
- private void sendError(final String message) {
- // System.err.println("sending error [" + message + "]");
- AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper() {
- public void cycle() throws InterruptedException {
- pendingWriteFrames.put(new FrameBuilder(Stomp.Responses.ERROR).addHeader(Stomp.Headers.Error.MESSAGE, message).toFrame());
- }
- });
- }
-
- public void onFullyConnected() {
- connected=true;
- }
-
- public void addToPendingReadCommands(final Command info) {
- AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper() {
- public void cycle() throws InterruptedException {
- pendingReadCommands.put(info);
- }
- });
- }
-
-
- void clearTransactionId(String user_tx_id) {
- this.transactions.remove(user_tx_id);
- }
-
- public SessionId getSessionId() {
- return sessionId;
- }
-
- public ProducerId getProducerId() {
- return producerId;
- }
-
-
- public Subscription getSubcription(ConsumerId consumerId) {
- return (Subscription) subscriptionsByConsumerId.get(consumerId);
- }
- public Set getSubcriptions(ActiveMQDestination destination) {
- return subscriptionsByDestination.get(destination);
- }
- public Subscription getSubcription(String name) {
- return (Subscription) subscriptionsByName.get(name);
- }
-
- public void addSubscription(Subscription s) {
- if (s.getSubscriptionId()!=null && subscriptionsByName.containsKey(s.getSubscriptionId())) {
- Subscription old = (Subscription) subscriptionsByName.get(s.getSubscriptionId());
- removeSubscription(old);
- enqueueCommand(old.close());
- }
- if( s.getSubscriptionId()!=null )
- subscriptionsByName.put(s.getSubscriptionId(), s);
- subscriptionsByConsumerId.put(s.getConsumerInfo().getConsumerId(), s);
- subscriptionsByDestination.put(s.getConsumerInfo().getDestination(), s);
- }
-
- public void removeSubscription(Subscription s) {
- if( s.getSubscriptionId()!=null )
- subscriptionsByName.remove(s.getSubscriptionId());
- subscriptionsByConsumerId.remove(s.getConsumerInfo().getConsumerId());
- subscriptionsByDestination.remove(s.getConsumerInfo().getDestination(), s);
- }
-
- public void enqueueCommand(final Command ack) {
- AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper() {
- public void cycle() throws InterruptedException {
- pendingReadCommands.put(ack);
- }
- });
- }
-
- public TransactionId getTransactionId(String key) {
- return (TransactionId) transactions.get(key);
- }
-
- public TransactionId registerTransactionId(String user_tx_id, int tx_id) {
- LocalTransactionId transactionId = new LocalTransactionId(getConnectionId(), tx_id);
- transactions.put(user_tx_id, transactionId);
- return transactionId;
- }
-
- public int getVersion() {
- return version;
- }
-
- public void setVersion(int version) {
- this.version = version;
- }
-
- public ConnectionId getConnectionId() {
- return connectionId;
- }
-
- public static synchronized int generateTransactionId() {
- return ++transactionIdCounter;
- }
-
- public ConsumerId createConsumerId() {
- return new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
- }
-
- public MessageId createMessageId() {
- return new MessageId(producerId, messageIdGenerator.getNextSequenceId());
- }
-
- synchronized public short generateCommandId() {
- return lastCommandId++;
- }
-
- public SessionId generateSessionId() {
- throw new RuntimeException("TODO!!");
- }
-
- public Packet marshal(Object command) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(baos);
- marshal(command, dos);
- dos.close();
- return new ByteArrayPacket(baos.toByteSequence());
- }
-
- public Object unmarshal(Packet packet) throws IOException {
- PacketInputStream stream = new PacketInputStream(packet);
- DataInputStream dis = new DataInputStream(stream);
- return unmarshal(dis);
- }
-
- public void marshal(Object command, DataOutputStream os) throws IOException {
- try {
- writeCommand((Command) command, os);
- } catch (IOException e) {
- throw e;
- } catch (JMSException e) {
- throw IOExceptionSupport.create(e);
- }
- }
-
- public Object unmarshal(DataInputStream is) throws IOException {
- try {
- return readCommand(is);
- } catch (IOException e) {
- throw e;
- } catch (JMSException e) {
- throw IOExceptionSupport.create(e);
- }
- }
-
- public Map getDispachedMap() {
- return dispachedMap;
- }
-
-}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java
deleted file mode 100644
index 15e1462367..0000000000
--- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.stomp;
-
-import org.apache.activeio.command.WireFormat;
-import org.apache.activeio.command.WireFormatFactory;
-
-/**
- * Creates WireFormat objects that implement the Stomp protocol.
- */
-public class StompWireFormatFactory implements WireFormatFactory {
- public WireFormat createWireFormat() {
- return new StompWireFormat();
- }
-}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscribe.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscribe.java
deleted file mode 100644
index e14ccf71c8..0000000000
--- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscribe.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.stomp;
-
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.util.IntrospectionSupport;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.util.Properties;
-
-class Subscribe implements StompCommand {
- private HeaderParser headerParser = new HeaderParser();
- private StompWireFormat format;
-
- Subscribe(StompWireFormat format) {
- this.format = format;
- }
-
- public CommandEnvelope build(String commandLine, DataInput in) throws IOException {
- Properties headers = headerParser.parse(in);
-
- String subscriptionId = headers.getProperty(Stomp.Headers.Subscribe.ID);
- String destination = headers.getProperty(Stomp.Headers.Subscribe.DESTINATION);
-
- ActiveMQDestination actual_dest = DestinationNamer.convert(destination);
- ConsumerInfo ci = new ConsumerInfo(format.createConsumerId());
- ci.setPrefetchSize(1000);
- ci.setDispatchAsync(true);
-
- String selector = (String) headers.remove(Stomp.Headers.Subscribe.SELECTOR);
- ci.setSelector(selector);
-
- IntrospectionSupport.setProperties(ci, headers, "activemq.");
-
- ci.setDestination(DestinationNamer.convert(destination));
-
- while (in.readByte() != 0) {
- }
-
- Subscription s = new Subscription(format, subscriptionId, ci);
- s.setDestination(actual_dest);
- String ack_mode_key = headers.getProperty(Stomp.Headers.Subscribe.ACK_MODE);
- if (ack_mode_key != null && ack_mode_key.equals(Stomp.Headers.Subscribe.AckModeValues.CLIENT)) {
- s.setAckMode(Subscription.CLIENT_ACK);
- }
-
- format.addSubscription(s);
- return new CommandEnvelope(ci, headers);
- }
-}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscription.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscription.java
deleted file mode 100644
index fdbfc6919b..0000000000
--- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscription.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.stomp;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.LinkedList;
-
-import javax.jms.JMSException;
-
-import org.apache.activemq.command.ActiveMQBytesMessage;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.RemoveInfo;
-
-public class Subscription {
-
- public static final int AUTO_ACK = 1;
- public static final int CLIENT_ACK = 2;
-
- public static final String NO_ID = "~~ NO SUCH THING ~~%%@#!Q";
-
- private ActiveMQDestination destination;
- private int ackMode = AUTO_ACK;
- private StompWireFormat format;
-
- private final String subscriptionId;
- private final ConsumerInfo consumerInfo;
- private final LinkedList dispatchedMessages = new LinkedList();
-
- public Subscription(StompWireFormat format, String subscriptionId, ConsumerInfo consumerInfo) {
- this.format = format;
- this.subscriptionId = subscriptionId;
- this.consumerInfo = consumerInfo;
- }
-
- void setDestination(ActiveMQDestination actual_dest) {
- this.destination = actual_dest;
- }
-
- void receive(MessageDispatch md, DataOutput out) throws IOException, JMSException {
-
- ActiveMQMessage m = (ActiveMQMessage) md.getMessage();
-
- if (ackMode == CLIENT_ACK) {
- Subscription sub = format.getSubcription(md.getConsumerId());
- sub.addMessageDispatch(md);
- format.getDispachedMap().put(m.getJMSMessageID(), sub);
- }
- else if (ackMode == AUTO_ACK) {
- MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
- format.enqueueCommand(ack);
- }
- else {
- throw new JMSException("Unknown ackMode: " + ackMode);
- }
-
-
- FrameBuilder builder = new FrameBuilder(Stomp.Responses.MESSAGE);
- builder.addHeaders(m);
-
- if( m.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE ) {
- ActiveMQTextMessage msg = (ActiveMQTextMessage)m.copy();
- builder.setBody(msg.getText().getBytes("UTF-8"));
- } else if( m.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE ) {
- ActiveMQBytesMessage msg = (ActiveMQBytesMessage)m.copy();
- byte[] data = new byte[(int)msg.getBodyLength()];
- msg.readBytes(data);
- builder.addHeader(Stomp.Headers.CONTENT_LENGTH, data.length);
- builder.setBody(data);
- }
-
- if (subscriptionId!=null) {
- builder.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
- }
-
- out.write(builder.toFrame());
- }
-
- synchronized private void addMessageDispatch(MessageDispatch md) {
- dispatchedMessages.addLast(md);
- }
-
- ActiveMQDestination getDestination() {
- return destination;
- }
-
- public void setAckMode(int clientAck) {
- this.ackMode = clientAck;
- }
-
- public RemoveInfo close() {
- return new RemoveInfo(consumerInfo.getConsumerId());
- }
-
- public ConsumerInfo getConsumerInfo() {
- return consumerInfo;
- }
-
- public String getSubscriptionId() {
- return subscriptionId;
- }
-
- synchronized public MessageAck createMessageAck(String message_id) {
- MessageAck ack = new MessageAck();
- ack.setDestination(consumerInfo.getDestination());
- ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
- ack.setConsumerId(consumerInfo.getConsumerId());
-
- int count=0;
- for (Iterator iter = dispatchedMessages.iterator(); iter.hasNext();) {
-
- MessageDispatch md = (MessageDispatch) iter.next();
- String id = ((ActiveMQMessage)md.getMessage()).getJMSMessageID();
- if( ack.getFirstMessageId()==null )
- ack.setFirstMessageId(md.getMessage().getMessageId());
-
- format.getDispachedMap().remove(id);
- iter.remove();
- count++;
- if( id.equals(message_id) ) {
- ack.setLastMessageId(md.getMessage().getMessageId());
- break;
- }
- }
- ack.setMessageCount(count);
- return ack;
- }
-}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Unsubscribe.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Unsubscribe.java
deleted file mode 100644
index c4e01bda09..0000000000
--- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Unsubscribe.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.stomp;
-
-import org.apache.activemq.command.ActiveMQDestination;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.net.ProtocolException;
-import java.util.Iterator;
-import java.util.Properties;
-import java.util.Set;
-
-public class Unsubscribe implements StompCommand {
- private static final HeaderParser parser = new HeaderParser();
- private final StompWireFormat format;
-
- Unsubscribe(StompWireFormat format) {
- this.format = format;
- }
-
- public CommandEnvelope build(String commandLine, DataInput in) throws IOException {
- Properties headers = parser.parse(in);
- while (in.readByte() == 0) {
- }
-
- String subscriptionId = headers.getProperty(Stomp.Headers.Unsubscribe.ID);
- String destination = headers.getProperty(Stomp.Headers.Unsubscribe.DESTINATION);
-
-
- if( subscriptionId!=null ) {
- Subscription s = format.getSubcription(subscriptionId);
- format.removeSubscription(s);
- return new CommandEnvelope(s.close(), headers);
- }
-
- ActiveMQDestination d = DestinationNamer.convert(destination);
- Set subs = format.getSubcriptions(d);
- for (Iterator iter = subs.iterator(); iter.hasNext();) {
- Subscription s = (Subscription) iter.next();
- format.removeSubscription(s);
- return new CommandEnvelope(s.close(), headers);
- }
-
- throw new ProtocolException("Unexpected UNSUBSCRIBE received.");
-
- }
-}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java
index 0618ba887b..6f9737e7b9 100644
--- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java
@@ -48,7 +48,6 @@ import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
-import org.apache.activemq.transport.stomp.Stomp;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.LongSequenceGenerator;
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/Stomp.java
similarity index 98%
rename from activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
rename to activemq-core/src/main/java/org/apache/activemq/transport/stomp2/Stomp.java
index edd774f5a4..ecdc3f5148 100644
--- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/Stomp.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport.stomp;
+package org.apache.activemq.transport.stomp2;
public interface Stomp {
String NULL = "\u0000";
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java
index 0a5aa3e493..c08191b31c 100644
--- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java
@@ -28,7 +28,6 @@ import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
-import org.apache.activemq.transport.stomp.Stomp;
/**
* Keeps track of the STOMP susbscription so that acking is correctly done.
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java
index 1317dee9a5..f1a1669a85 100644
--- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java
@@ -29,7 +29,6 @@ import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.ByteSequence;
import org.apache.activeio.packet.Packet;
import org.apache.activeio.util.ByteArrayOutputStream;
-import org.apache.activemq.transport.stomp.Stomp;
/**
* Implements marshalling and unmarsalling the Stomp protocol.
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/package.html b/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/package.html
similarity index 100%
rename from activemq-core/src/main/java/org/apache/activemq/transport/stomp/package.html
rename to activemq-core/src/main/java/org/apache/activemq/transport/stomp2/package.html
diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
index 52a01f69d4..f0cf9bbb05 100644
--- a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
@@ -22,6 +22,7 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.transport.stomp2.Stomp;
import javax.jms.Connection;
import javax.jms.JMSException;
diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompWireFormatTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompWireFormatTest.java
deleted file mode 100644
index 26c150c5f9..0000000000
--- a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompWireFormatTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.stomp;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.*;
-import org.apache.activemq.transport.stomp.Stomp;
-import org.apache.activemq.transport.stomp.StompWireFormat;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import javax.jms.JMSException;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import junit.framework.TestCase;
-
-public class StompWireFormatTest extends TestCase {
-
- protected static final Log log = LogFactory.getLog(StompWireFormatTest.class);
-
- private StompWireFormat wire;
-
- public void setUp() throws Exception {
- wire = new StompWireFormat();
- }
-
- public void testValidConnectHandshake() throws Exception {
- ByteArrayOutputStream bout = new ByteArrayOutputStream();
- DataOutputStream dout = new DataOutputStream(bout);
-
- ConnectionInfo ci = (ConnectionInfo) parseCommand("CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "\n" + Stomp.NULL);
- assertNotNull(ci);
- assertTrue(ci.isResponseRequired());
-
- Response cr = new Response();
- cr.setCorrelationId(ci.getCommandId());
-
- String response = writeCommand(cr);
- log.info("Received: " + response);
-
- SessionInfo si = (SessionInfo) wire.readCommand(null);
- assertNotNull(si);
- assertTrue(!si.isResponseRequired());
-
- ProducerInfo pi = (ProducerInfo) wire.readCommand(null);
- assertNotNull(pi);
- assertTrue(pi.isResponseRequired());
-
- Response sr = new Response();
- sr.setCorrelationId(pi.getCommandId());
- response = writeCommand(sr);
- log.info("Received: " + response);
- assertTrue("Response should start with CONNECTED: " + response, response.startsWith("CONNECTED"));
-
- // now lets test subscribe
- ConsumerInfo consumerInfo = (ConsumerInfo) parseCommand("SUBSCRIBE\n" + "destination: /queue/foo\n" + "ack: client\n" + "activemq.prefetchSize: 1\n"
- + "\n" + Stomp.NULL);
- assertNotNull(consumerInfo);
- // assertTrue(consumerInfo.isResponseRequired());
- assertEquals("prefetch size", 1, consumerInfo.getPrefetchSize());
-
- cr = new Response();
- cr.setCorrelationId(consumerInfo.getCommandId());
- response = writeCommand(cr);
- log.info("Received: " + response);
- }
-
- public void _testFakeServer() throws Exception {
- final BrokerService container = new BrokerService();
- new Thread(new Runnable() {
- public void run() {
- try {
- container.addConnector("stomp://localhost:61613");
- container.start();
- }
- catch (Exception e) {
- System.err.println("ARGH: caught: " + e);
- e.printStackTrace();
- }
- }
- }).start();
- System.err.println("started container");
- System.err.println("okay, go play");
-
- System.err.println(System.in.read());
- }
-
- protected Command parseCommand(String connect_frame) throws IOException, JMSException {
- DataInputStream din = new DataInputStream(new ByteArrayInputStream(connect_frame.getBytes()));
-
- return wire.readCommand(din);
- }
-
- protected String writeCommand(Command command) throws IOException, JMSException {
- ByteArrayOutputStream bout = new ByteArrayOutputStream();
- DataOutputStream dout = new DataOutputStream(bout);
- wire.writeCommand(command, dout);
- return new String(bout.toByteArray());
- }
-
-}