diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java index 35cc78f26c..c9e4d790da 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java @@ -16,32 +16,17 @@ */ package org.apache.activemq.transport.stomp; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; - -import org.apache.activemq.transport.TransportFactory; -import org.apache.activemq.transport.TransportServer; -import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.transport.tcp.TcpTransportFactory; /** * A Stomp transport factory * * @version $Revision: 1.1.1.1 $ */ -public class StompTransportFactory extends TransportFactory { +public class StompTransportFactory extends TcpTransportFactory { - public TransportServer doBind(String brokerId, URI location) throws IOException { - try { - URI tcpURI = new URI( - "tcp://"+location.getHost()+ - (location.getPort()>=0 ? ":"+location.getPort() : "")+ - "?wireFormat=stomp" - ); - return TransportFactory.bind(brokerId, tcpURI); - } catch (URISyntaxException e) { - throw IOExceptionSupport.create(e); - } + protected String getDefaultWireFormatType() { + return "stomp"; } } \ No newline at end of file diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompSubscriptionRemoveTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompSubscriptionRemoveTest.java new file mode 100644 index 0000000000..f07dd97f9e --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompSubscriptionRemoveTest.java @@ -0,0 +1,229 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.transport.stomp.Stomp; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * + * @version $Revision$ + */ +public class StompSubscriptionRemoveTest extends TestCase { + private static final Log log = LogFactory.getLog(StompSubscriptionRemoveTest.class); + + private Socket stompSocket; + private ByteArrayOutputStream inputBuffer; + + /** + * @param args + * @throws Exception + */ + public void testRemoveSubscriber() throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(true); + + broker.addConnector("stomp://localhost:61613").setName("Stomp"); + broker.addConnector("tcp://localhost:61616").setName("Default"); + broker.start(); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(new ActiveMQQueue(getDestinationName())); + Message message = session.createTextMessage("Testas"); + for (int idx = 0; idx < 2000; ++idx) { + producer.send(message); + log.debug("Sending: " + idx); + } + producer.close(); + // consumer.close(); + session.close(); + connection.close(); + broker.stop(); + while (broker.isStarted()) { + Thread.sleep(1000); + } + + broker = new BrokerService(); + broker.setPersistent(true); + + broker.addConnector("stomp://localhost:61613").setName("Stomp"); + broker.addConnector("tcp://localhost:61616").setName("Default"); + broker.start(); + + stompSocket = new Socket("localhost", 61613); + inputBuffer = new ByteArrayOutputStream(); + + String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "\n" + Stomp.NULL; + sendFrame(connect_frame); + + String f = receiveFrame(100000); + String frame = "SUBSCRIBE\n" + "destination:/queue/" + getDestinationName() + "\n" + "ack:client\n\n" + Stomp.NULL; + sendFrame(frame); + int messagesCount = 0; + int count = 0; + while (count < 2) { + String receiveFrame = receiveFrame(10000); + DataInput input = new DataInputStream(new ByteArrayInputStream(receiveFrame.getBytes())); + String line; + while (true) { + line = input.readLine(); + if (line == null) { + throw new IOException("connection was closed"); + } + else { + line = line.trim(); + if (line.length() > 0) { + break; + } + } + } + line = input.readLine(); + if (line == null) { + throw new IOException("connection was closed"); + } + String messageId = line.substring(line.indexOf(':') + 1); + messageId = messageId.trim(); + String ackmessage = "ACK\n" + "message-id:" + messageId + "\n\n" + Stomp.NULL; + sendFrame(ackmessage); + log.debug(receiveFrame); + //Thread.sleep(1000); + ++messagesCount; + ++count; + } + stompSocket.close(); + Thread.sleep(10000); + + // for (int idx = 0; idx < 500; ++idx) { + // producer.send(message); + // log.debug("Sending: " +idx); + // } + + stompSocket = new Socket("localhost", 61613); + inputBuffer = new ByteArrayOutputStream(); + + connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "\n" + Stomp.NULL; + sendFrame(connect_frame); + + f = receiveFrame(100000); + frame = "SUBSCRIBE\n" + "destination:/queue/" + getDestinationName() + "\n" + "ack:client\n\n" + Stomp.NULL; + sendFrame(frame); + try { + while (count != 2000) { + String receiveFrame = receiveFrame(10000); + DataInput input = new DataInputStream(new ByteArrayInputStream(receiveFrame.getBytes())); + String line; + while (true) { + line = input.readLine(); + if (line == null) { + throw new IOException("connection was closed"); + } + else { + line = line.trim(); + if (line.length() > 0) { + break; + } + } + } + line = input.readLine(); + if (line == null) { + throw new IOException("connection was closed"); + } + String messageId = line.substring(line.indexOf(':') + 1); + messageId = messageId.trim(); + String ackmessage = "ACK\n" + "message-id:" + messageId + "\n\n" + Stomp.NULL; + sendFrame(ackmessage); + log.debug("Received: " + receiveFrame); + //Thread.sleep(1000); + ++messagesCount; + ++count; + } + + } + catch (IOException ex) { + // timeout + } + stompSocket.close(); + broker.stop(); + log.info("Total messages receved: " + messagesCount); + assertTrue("Messages receved after connection loss: " + messagesCount, messagesCount >= 2000); + + // The first ack messages has no chance complete, so we receiving more messages + + // Don't know how to list subscriptions for the broker. Currently you + // can check using jmx console. You'll see + // Subscription whithout any connections + } + + public void sendFrame(String data) throws Exception { + byte[] bytes = data.getBytes("UTF-8"); + OutputStream outputStream = stompSocket.getOutputStream(); + for (int i = 0; i < bytes.length; i++) { + outputStream.write(bytes[i]); + } + outputStream.flush(); + } + + public String receiveFrame(long timeOut) throws Exception { + stompSocket.setSoTimeout((int) timeOut); + InputStream is = stompSocket.getInputStream(); + int c = 0; + for (;;) { + c = is.read(); + if (c < 0) { + throw new IOException("socket closed."); + } + else if (c == 0) { + c = is.read(); + byte[] ba = inputBuffer.toByteArray(); + inputBuffer.reset(); + return new String(ba, "UTF-8"); + } + else { + inputBuffer.write(c); + } + } + } + + protected String getDestinationName() { + return getClass().getName() + "." + getName(); + } +}