From 1ab2f59af7229fc87e065be2dc6c29da3ee62848 Mon Sep 17 00:00:00 2001 From: Bosanac Dejan Date: Tue, 18 Nov 2008 13:56:13 +0000 Subject: [PATCH] fix for AMQ-2003 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@718590 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/stomp/StompConnection.java | 219 ++++++++++++++++++ .../transport/stomp/StompConnection.java | 91 -------- assembly/src/release/example/build.xml | 8 + .../src/release/example/src/StompExample.java | 63 +++++ 4 files changed, 290 insertions(+), 91 deletions(-) create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java delete mode 100644 activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java create mode 100644 assembly/src/release/example/src/StompExample.java diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java new file mode 100644 index 0000000000..94742d293c --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.transport.stomp; + +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.activemq.transport.stomp.Stomp.Headers.Subscribe; + +public class StompConnection { + + public static final long RECEIVE_TIMEOUT = 10000; + + private Socket stompSocket; + private ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream(); + + public void open(String host, int port) throws IOException, UnknownHostException { + open(new Socket(host, port)); + } + + public void open(Socket socket) { + stompSocket = socket; + } + + public void close() throws IOException { + if (stompSocket != null) { + stompSocket.close(); + stompSocket = null; + } + } + + public void sendFrame(String data) throws Exception { + byte[] bytes = data.getBytes("UTF-8"); + OutputStream outputStream = stompSocket.getOutputStream(); + outputStream.write(bytes); + outputStream.write(0); + outputStream.flush(); + } + + public StompFrame receive() throws Exception { + return receive(RECEIVE_TIMEOUT); + } + + public StompFrame receive(long timeOut) throws Exception { + stompSocket.setSoTimeout((int)timeOut); + InputStream is = stompSocket.getInputStream(); + StompWireFormat wf = new StompWireFormat(); + DataInputStream dis = new DataInputStream(is); + return (StompFrame)wf.unmarshal(dis); + } + + public String receiveFrame() throws Exception { + return receiveFrame(RECEIVE_TIMEOUT); + } + + public String receiveFrame(long timeOut) throws Exception { + stompSocket.setSoTimeout((int)timeOut); + InputStream is = stompSocket.getInputStream(); + int c = 0; + for (;;) { + c = is.read(); + if (c < 0) { + throw new IOException("socket closed."); + } else if (c == 0) { + c = is.read(); + if (c != '\n') { + throw new IOException("Expecting stomp frame to terminate with \0\n"); + } + byte[] ba = inputBuffer.toByteArray(); + inputBuffer.reset(); + return new String(ba, "UTF-8"); + } else { + inputBuffer.write(c); + } + } + } + + public Socket getStompSocket() { + return stompSocket; + } + + public void setStompSocket(Socket stompSocket) { + this.stompSocket = stompSocket; + } + + public void connect(String username, String password) throws Exception { + HashMap headers = new HashMap(); + headers.put("login", username); + headers.put("password", password); + StompFrame frame = new StompFrame("CONNECT", headers); + sendFrame(frame.toString()); + } + + public void disconnect() throws Exception { + StompFrame frame = new StompFrame("DISCONNECT"); + sendFrame(frame.toString()); + } + + public void send(String destination, String message) throws Exception { + send(destination, message, null); + } + + public void send(String destination, String message, HashMap headers) throws Exception { + if (headers == null) { + headers = new HashMap(); + } + headers.put("destination", destination); + StompFrame frame = new StompFrame("SEND", headers, message.getBytes()); + sendFrame(frame.toString()); + } + + public void subscribe(String destination) throws Exception { + subscribe(destination, null, null); + } + + public void subscribe(String destination, String ack) throws Exception { + subscribe(destination, ack, new HashMap()); + } + + public void subscribe(String destination, String ack, HashMap headers) throws Exception { + if (headers == null) { + headers = new HashMap(); + } + headers.put("destination", destination); + if (ack != null) { + headers.put("ack", ack); + } + StompFrame frame = new StompFrame("SUBSCRIBE", headers); + sendFrame(frame.toString()); + } + + public void unsubscribe(String destination) throws Exception { + unsubscribe(destination, null); + } + + public void unsubscribe(String destination, HashMap headers) throws Exception { + if (headers == null) { + headers = new HashMap(); + } + headers.put("destination", destination); + StompFrame frame = new StompFrame("UNSUBSCRIBE", headers); + sendFrame(frame.toString()); + } + + public void begin(String transaction) throws Exception { + HashMap headers = new HashMap(); + headers.put("transaction", transaction); + StompFrame frame = new StompFrame("BEGIN", headers); + sendFrame(frame.toString()); + } + + public void abort(String transaction) throws Exception { + HashMap headers = new HashMap(); + headers.put("transaction", transaction); + StompFrame frame = new StompFrame("ABORT", headers); + sendFrame(frame.toString()); + } + + public void commit(String transaction) throws Exception { + HashMap headers = new HashMap(); + headers.put("transaction", transaction); + StompFrame frame = new StompFrame("COMMIT", headers); + sendFrame(frame.toString()); + } + + public void ack(StompFrame frame) throws Exception { + ack(frame.getHeaders().get("message-id"), null); + } + + public void ack(StompFrame frame, String transaction) throws Exception { + ack(frame.getHeaders().get("message-id"), transaction); + } + + public void ack(String messageId) throws Exception { + ack(messageId, null); + } + + public void ack(String messageId, String transaction) throws Exception { + HashMap headers = new HashMap(); + headers.put("message-id", messageId); + if (transaction != null) + headers.put("transaction", transaction); + StompFrame frame = new StompFrame("ACK", headers); + sendFrame(frame.toString()); + } + + protected String appendHeaders(HashMap headers) { + StringBuffer result = new StringBuffer(); + for (String key : headers.keySet()) { + result.append(key + ":" + headers.get(key) + "\n"); + } + result.append("\n"); + return result.toString(); + } + +} diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java deleted file mode 100644 index 064080d35c..0000000000 --- a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.transport.stomp; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; -import java.net.UnknownHostException; - -public class StompConnection { - - public static final long RECEIVE_TIMEOUT = 10000; - - private Socket stompSocket; - private ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream(); - - public void open(String host, int port) throws IOException, UnknownHostException { - open(new Socket(host, port)); - } - - public void open(Socket socket) { - stompSocket = socket; - } - - public void close() throws IOException { - if (stompSocket != null) { - stompSocket.close(); - stompSocket = null; - } - } - - public void sendFrame(String data) throws Exception { - byte[] bytes = data.getBytes("UTF-8"); - OutputStream outputStream = stompSocket.getOutputStream(); - outputStream.write(bytes); - outputStream.write(0); - outputStream.flush(); - } - - public String receiveFrame() throws Exception { - return receiveFrame(RECEIVE_TIMEOUT); - } - - private String receiveFrame(long timeOut) throws Exception { - stompSocket.setSoTimeout((int)timeOut); - InputStream is = stompSocket.getInputStream(); - int c = 0; - for (;;) { - c = is.read(); - if (c < 0) { - throw new IOException("socket closed."); - } else if (c == 0) { - c = is.read(); - if (c != '\n') { - throw new IOException("Expecting stomp frame to terminate with \0\n"); - } - byte[] ba = inputBuffer.toByteArray(); - inputBuffer.reset(); - return new String(ba, "UTF-8"); - } else { - inputBuffer.write(c); - } - } - } - - public Socket getStompSocket() { - return stompSocket; - } - - public void setStompSocket(Socket stompSocket) { - this.stompSocket = stompSocket; - } - -} diff --git a/assembly/src/release/example/build.xml b/assembly/src/release/example/build.xml index 30c45e309b..f7db5eab29 100755 --- a/assembly/src/release/example/build.xml +++ b/assembly/src/release/example/build.xml @@ -290,4 +290,12 @@ + + Running a Stomp example + + + + + + diff --git a/assembly/src/release/example/src/StompExample.java b/assembly/src/release/example/src/StompExample.java new file mode 100644 index 0000000000..89e93626f5 --- /dev/null +++ b/assembly/src/release/example/src/StompExample.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import org.apache.activemq.transport.stomp.Stomp; +import org.apache.activemq.transport.stomp.StompConnection; +import org.apache.activemq.transport.stomp.StompFrame; +import org.apache.activemq.transport.stomp.Stomp.Headers.Subscribe; + +/** + * + * This example demonstrates Stomp Java API + * + * @version $Revision$ + * + */ +public class StompExample { + + public static void main(String args[]) throws Exception { + StompConnection connection = new StompConnection(); + connection.open("localhost", 61613); + + connection.connect("system", "manager"); + StompFrame connect = connection.receive(); + if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) { + throw new Exception ("Not connected"); + } + + connection.begin("tx1"); + connection.send("/queue/test", "message1"); + connection.send("/queue/test", "message2"); + connection.commit("tx1"); + + connection.subscribe("/queue/test", Subscribe.AckModeValues.CLIENT); + + connection.begin("tx2"); + + StompFrame message = connection.receive(); + System.out.println(message.getBody()); + connection.ack(message, "tx2"); + + message = connection.receive(); + System.out.println(message.getBody()); + connection.ack(message, "tx2"); + + connection.commit("tx2"); + + connection.disconnect(); + } + +}