Get the stomp wireformat working again and added some more tests to verify that it is working.

The stomp transport factory just reuses the tcp transport and sets the wireformat to be stomp.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@358217 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2005-12-21 05:45:09 +00:00
parent b6eb3a6319
commit 230e459b26
14 changed files with 376 additions and 457 deletions

View File

@ -8,19 +8,31 @@ import org.activemq.command.Command;
import java.util.Properties;
public class CommandEnvelope {
private final Command command;
private final Properties headers;
CommandEnvelope(Command command, Properties headers) {
private final ResponseListener responseListener;
public CommandEnvelope(Command command, Properties headers) {
this(command, headers, null);
}
public CommandEnvelope(Command command, Properties headers, ResponseListener responseListener) {
this.command = command;
this.headers = headers;
this.responseListener = responseListener;
}
Properties getHeaders() {
public Properties getHeaders() {
return headers;
}
Command getCommand() {
public Command getCommand() {
return command;
}
public ResponseListener getResponseListener() {
return responseListener;
}
}

View File

@ -14,7 +14,6 @@ import java.io.IOException;
import java.net.ProtocolException;
class CommandParser {
private String clientId;
private final StompWireFormat format;
CommandParser(StompWireFormat wireFormat) {
@ -33,8 +32,10 @@ class CommandParser {
throw new IOException("connection was closed");
}
// figure corrent command and return it
// figure correct command and return it
StompCommand command = null;
if (line.startsWith(Stomp.Commands.CONNECT))
command = new Connect(format);
if (line.startsWith(Stomp.Commands.SUBSCRIBE))
command = new Subscribe(format);
if (line.startsWith(Stomp.Commands.SEND))
@ -58,27 +59,28 @@ class CommandParser {
throw new ProtocolException("Unknown command [" + line + "]");
}
CommandEnvelope envelope = command.build(line, in);
if (envelope.getHeaders().containsKey(Stomp.Headers.RECEIPT_REQUESTED)) {
final short id = StompWireFormat.generateCommandId();
envelope.getCommand().setCommandId(id);
final CommandEnvelope envelope = command.build(line, in);
final short commandId = format.generateCommandId();
final String client_packet_key = envelope.getHeaders().getProperty(Stomp.Headers.RECEIPT_REQUESTED);
final boolean receiptRequested = client_packet_key!=null;
envelope.getCommand().setCommandId(commandId);
if (receiptRequested || envelope.getResponseListener()!=null ) {
envelope.getCommand().setResponseRequired(true);
final String client_packet_key = envelope.getHeaders().getProperty(Stomp.Headers.RECEIPT_REQUESTED);
format.addResponseListener(new ResponseListener() {
public boolean onResponse(Response receipt, DataOutput out) throws IOException {
if (receipt.getCorrelationId() != id)
return false;
out.write(new FrameBuilder(Stomp.Responses.RECEIPT).addHeader(Stomp.Headers.Response.RECEIPT_ID, client_packet_key).toFrame());
return true;
}
});
if( envelope.getResponseListener()!=null ) {
format.addResponseListener(envelope.getResponseListener());
} else {
format.addResponseListener(new ResponseListener() {
public boolean onResponse(Response receipt, DataOutput out) throws IOException {
if (receipt.getCorrelationId() != commandId)
return false;
out.write(new FrameBuilder(Stomp.Responses.RECEIPT).addHeader(Stomp.Headers.Response.RECEIPT_ID, client_packet_key).toFrame());
return true;
}
});
}
}
return envelope.getCommand();
}
void setClientId(String clientId) {
this.clientId = clientId;
}
}

View File

@ -0,0 +1,84 @@
/*
* Copyright (c) 2005 Your Corporation. All Rights Reserved.
*/
package org.activemq.transport.stomp;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Properties;
import org.activemq.command.ConnectionInfo;
import org.activemq.command.ProducerInfo;
import org.activemq.command.Response;
import org.activemq.command.SessionInfo;
class Connect implements StompCommand {
private HeaderParser headerParser = new HeaderParser();
private StompWireFormat format;
Connect(StompWireFormat format) {
this.format = format;
}
public CommandEnvelope build(String commandLine, DataInput in) throws IOException {
Properties headers = headerParser.parse(in);
// allow anyone to login for now
String login = headers.getProperty(Stomp.Headers.Connect.LOGIN);
String passcode = headers.getProperty(Stomp.Headers.Connect.PASSCODE);
String clientId = headers.getProperty(Stomp.Headers.Connect.CLIENT_ID);
final ConnectionInfo connectionInfo = new ConnectionInfo();
connectionInfo.setConnectionId(format.getConnectionId());
if( clientId!=null )
connectionInfo.setClientId(clientId);
else
connectionInfo.setClientId(""+connectionInfo.getConnectionId().toString());
connectionInfo.setResponseRequired(true);
connectionInfo.setUserName(login);
connectionInfo.setPassword(passcode);
while (in.readByte() != 0) {
}
return new CommandEnvelope(connectionInfo, headers, new ResponseListener() {
public boolean onResponse(Response receipt, DataOutput out) throws IOException {
if (receipt.getCorrelationId() != connectionInfo.getCommandId())
return false;
final SessionInfo sessionInfo = new SessionInfo(format.getSessionId());
sessionInfo.setCommandId(format.generateCommandId());
sessionInfo.setResponseRequired(false);
final ProducerInfo producerInfo = new ProducerInfo(format.getProducerId());
producerInfo.setCommandId(format.generateCommandId());
producerInfo.setResponseRequired(true);
format.addResponseListener(new ResponseListener() {
public boolean onResponse(Response receipt, DataOutput out) throws IOException {
if (receipt.getCorrelationId() != producerInfo.getCommandId())
return false;
format.onFullyConnected();
StringBuffer buffer = new StringBuffer();
buffer.append(Stomp.Responses.CONNECTED).append(Stomp.NEWLINE);
buffer.append(Stomp.Headers.Connected.SESSION).append(Stomp.Headers.SEPERATOR).append(connectionInfo.getClientId()).append(Stomp.NEWLINE).append(
Stomp.NEWLINE);
buffer.append(Stomp.NULL);
out.writeBytes(buffer.toString());
return true;
}
});
format.addToPendingReadCommands(sessionInfo);
format.addToPendingReadCommands(producerInfo);
return true;
}
});
}
}

View File

@ -68,7 +68,9 @@ class Send implements StompCommand {
msg = text;
}
msg.setMessageId(format.generateMessageId());
msg.setProducerId(format.getProducerId());
msg.setMessageId(format.createMessageId());
ActiveMQDestination d = DestinationNamer.convert(destination);
msg.setDestination(d);

View File

@ -80,6 +80,7 @@ public interface Stomp {
public interface Connect {
String LOGIN = "login";
String PASSCODE = "passcode";
String CLIENT_ID = "client-id";
}
public interface Error {

View File

@ -1,66 +0,0 @@
/**
*
* Copyright 2005 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.activemq.transport.stomp;
import org.activemq.transport.tcp.TcpTransport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.UnknownHostException;
/**
* A transport for using Stomp to talk to ActiveMQ
*
* @version $Revision: 1.1 $
*/
public class StompTransportChannel extends TcpTransport {
private static final Log log = LogFactory.getLog(StompTransportChannel.class);
public StompTransportChannel() {
super(new StompWireFormat());
}
public StompTransportChannel(URI remoteLocation) throws UnknownHostException, IOException {
super(new StompWireFormat(), remoteLocation);
}
public StompTransportChannel(URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
super(new StompWireFormat(), remoteLocation, localLocation);
}
public StompTransportChannel(Socket socket) throws IOException {
super(new StompWireFormat(), socket);
}
/*
* protected void readWireFormat() throws JMSException, IOException { // no
* need to read wire format from wire }
*
* protected void doConsumeCommand(Command packet) { if( packet ==
* FlushCommand.PACKET ) { try { doAsyncSend(null); } catch (JMSException e) {
* ExceptionListener listener = getExceptionListener(); if (listener !=
* null) { listener.onException(e); } else { log.warn("No listener to report
* error consuming packet: " + e, e); } } } else {
* super.doConsumeCommand(packet); } }
*/
}

View File

@ -17,21 +17,32 @@
**/
package org.activemq.transport.stomp;
import org.activemq.transport.TransportFactory;
import org.activemq.transport.TransportServer;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.activemq.transport.TransportFactory;
import org.activemq.transport.TransportServer;
import org.activemq.util.IOExceptionSupport;
/**
* A <a href="http://stomp.codehaus.org/">Stomp</a> transport factory
*
* @version $Revision: 1.1.1.1 $
*/
public class StompTransportServerChannelFactory extends TransportFactory {
public class StompTransportFactory extends TransportFactory {
public TransportServer doBind(String brokerId, URI location) throws IOException {
return new StompTransportServerChannel(brokerId, location);
try {
URI tcpURI = new URI(
"tcp://"+location.getHost()+
(location.getPort()>=0 ? ":"+location.getPort() : "")+
"?wireFormat=stomp"
);
return TransportFactory.bind(brokerId, tcpURI);
} catch (URISyntaxException e) {
throw IOExceptionSupport.create(e);
}
}
}

View File

@ -1,54 +0,0 @@
/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.activemq.transport.stomp;
import org.activemq.command.BrokerInfo;
import org.activemq.transport.TransportServerSupport;
import java.net.URI;
/**
* A <a href="http://stomp.codehaus.org/">Stomp</a> transport server
*
* @version $Revision: 1.1.1.1 $
*/
public class StompTransportServerChannel extends TransportServerSupport implements Runnable {
public StompTransportServerChannel(String brokerId, URI location) {
// TODO...
}
public void run() {
}
public void setBrokerInfo(BrokerInfo brokerInfo) {
}
public void start() throws Exception {
}
public void stop() throws Exception {
}
/*
* protected TcpTransportChannel createTransportChannel(Socket socket,
* PooledExecutor executor) throws JMSException { return new
* StompTransportChannel(socket, executor); }
*/
}

View File

@ -3,75 +3,80 @@
*/
package org.activemq.transport.stomp;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.ProtocolException;
import java.util.List;
import java.util.Map;
import javax.jms.JMSException;
import org.activeio.ByteArrayOutputStream;
import org.activeio.Packet;
import org.activeio.adapter.PacketInputStream;
import org.activeio.command.WireFormat;
import org.activeio.packet.ByteArrayPacket;
import org.activemq.command.ActiveMQBytesMessage;
import org.activemq.command.ActiveMQDestination;
import org.activemq.command.ActiveMQTextMessage;
import org.activemq.command.Command;
import org.activemq.command.CommandTypes;
import org.activemq.command.ConnectionId;
import org.activemq.command.ConnectionInfo;
import org.activemq.command.ConsumerId;
import org.activemq.command.FlushCommand;
import org.activemq.command.LocalTransactionId;
import org.activemq.command.MessageId;
import org.activemq.command.ProducerId;
import org.activemq.command.Response;
import org.activemq.command.SessionId;
import org.activemq.command.TransactionId;
import org.activemq.util.IOExceptionSupport;
import org.activemq.util.IdGenerator;
import org.activemq.util.LongSequenceGenerator;
import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import org.activeio.Packet;
import org.activeio.command.WireFormat;
import org.activemq.command.ActiveMQDestination;
import org.activemq.command.ActiveMQTextMessage;
import org.activemq.command.CommandTypes;
import org.activemq.command.ConnectionId;
import org.activemq.command.ConnectionInfo;
import org.activemq.command.ConsumerId;
import org.activemq.command.Command;
import org.activemq.command.FlushCommand;
import org.activemq.command.LocalTransactionId;
import org.activemq.command.MessageId;
import org.activemq.command.Response;
import org.activemq.command.SessionId;
import org.activemq.command.SessionInfo;
import org.activemq.command.ActiveMQBytesMessage;
import org.activemq.command.TransactionId;
import org.activemq.util.IdGenerator;
import javax.jms.JMSException;
import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.DatagramPacket;
import java.net.ProtocolException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
* Implements the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
*/
public class StompWireFormat implements WireFormat {
static final IdGenerator PACKET_IDS = new IdGenerator();
static final IdGenerator clientIds = new IdGenerator();
private static final IdGenerator connectionIdGenerator = new IdGenerator();
private static int transactionIdCounter;
private int version = 1;
private CommandParser commandParser = new CommandParser(this);
private HeaderParser headerParser = new HeaderParser();
private DataInputStream in;
private String clientId;
private BlockingQueue pendingReadCommands = new LinkedBlockingQueue();
private BlockingQueue pendingWriteFrames = new LinkedBlockingQueue();
private List receiptListeners = new CopyOnWriteArrayList();
private SessionId sessionId;
private Map subscriptions = new ConcurrentHashMap();
private List ackListeners = new CopyOnWriteArrayList();
private final Map transactions = new ConcurrentHashMap();
private ConnectionId connectionId;
private short lastCommandId;
private final ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId());
private final SessionId sessionId = new SessionId(connectionId, -1);
private final ProducerId producerId = new ProducerId(sessionId, 1);
private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
void addResponseListener(ResponseListener listener) {
receiptListeners.add(listener);
}
boolean connected = false;
public Command readCommand(DataInput in) throws IOException, JMSException {
Command pending = (Command) AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.HelperWithReturn() {
public Object cycle() throws InterruptedException {
@ -83,7 +88,12 @@ public class StompWireFormat implements WireFormat {
}
try {
return commandParser.parse(in);
Command command = commandParser.parse(in);
if( !connected ) {
if( command.getDataStructureType() != ConnectionInfo.DATA_STRUCTURE_TYPE )
throw new IOException("Not yet connected.");
}
return command;
}
catch (ProtocolException e) {
sendError(e.getMessage());
@ -149,265 +159,32 @@ public class StompWireFormat implements WireFormat {
}
});
}
/**
* some transports may register their streams (e.g. Tcp)
*
* @param dataOut
* @param dataIn
*/
public void registerTransportStreams(DataOutputStream dataOut, DataInputStream dataIn) {
this.in = dataIn;
public void onFullyConnected() {
connected=true;
}
/**
* Some wire formats require a handshake at start-up
*
* @throws java.io.IOException
*/
public void initiateServerSideProtocol() throws IOException {
BufferedReader in = new BufferedReader(new InputStreamReader(this.in));
String first_line = in.readLine();
if (!first_line.startsWith(Stomp.Commands.CONNECT)) {
throw new IOException("First line does not begin with with " + Stomp.Commands.CONNECT);
}
Properties headers = headerParser.parse(in);
// if (!headers.containsKey(TTMP.Headers.Connect.LOGIN))
// System.err.println("Required header [" + TTMP.Headers.Connect.LOGIN +
// "] missing");
// if (!headers.containsKey(TTMP.Headers.Connect.PASSCODE))
// System.err.println("Required header [" +
// TTMP.Headers.Connect.PASSCODE + "] missing");
// allow anyone to login for now
String login = headers.getProperty(Stomp.Headers.Connect.LOGIN);
String passcode = headers.getProperty(Stomp.Headers.Connect.PASSCODE);
// skip to end of the packet
while (in.read() != 0) {
}
final ConnectionInfo info = new ConnectionInfo();
clientId = clientIds.generateId();
commandParser.setClientId(clientId);
info.setClientId(clientId);
info.setResponseRequired(true);
// info.setClientVersion(Integer.toString(getCurrentWireFormatVersion()));
final short commandId = generateCommandId();
info.setCommandId(commandId);
info.setUserName(login);
info.setPassword(passcode);
// info.setStartTime(System.currentTimeMillis());
public void addToPendingReadCommands(final Command info) {
AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper() {
public void cycle() throws InterruptedException {
pendingReadCommands.put(info);
}
});
addResponseListener(new ResponseListener() {
public boolean onResponse(Response receipt, DataOutput out) {
if (receipt.getCorrelationId() != commandId)
return false;
sessionId = generateSessionId();
final SessionInfo info = new SessionInfo();
info.setCommandId(generateCommandId());
info.setSessionId(sessionId);
info.setResponseRequired(true);
AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper() {
public void cycle() throws InterruptedException {
pendingReadCommands.put(info);
}
});
addResponseListener(new ResponseListener() {
public boolean onResponse(Response receipt, DataOutput out) throws IOException {
if (receipt.getCorrelationId() != commandId)
return false;
StringBuffer buffer = new StringBuffer();
buffer.append(Stomp.Responses.CONNECTED).append(Stomp.NEWLINE);
buffer.append(Stomp.Headers.Connected.SESSION).append(Stomp.Headers.SEPERATOR).append(clientId).append(Stomp.NEWLINE).append(
Stomp.NEWLINE);
buffer.append(Stomp.NULL);
out.writeBytes(buffer.toString());
return true;
}
});
return true;
}
});
}
/**
* Creates a new copy of this wire format so it can be used in another
* thread/context
*/
public WireFormat copy() {
return new StompWireFormat();
}
/* Stuff below here is leaky stuff we don't actually need */
/**
* Some wire formats require a handshake at start-up
*
* @throws java.io.IOException
*/
public void initiateClientSideProtocol() throws IOException {
throw new UnsupportedOperationException("Not yet implemented!");
}
/**
* Can this wireformat process packets of this version
*
* @param version
* the version number to test
* @return true if can accept the version
*/
public boolean canProcessWireFormatVersion(int version) {
return version == getCurrentWireFormatVersion();
}
/**
* @return the current version of this wire format
*/
public int getCurrentWireFormatVersion() {
return 1;
}
/**
* @return Returns the enableCaching.
*/
public boolean isCachingEnabled() {
return false;
}
/**
* @param enableCaching
* The enableCaching to set.
*/
public void setCachingEnabled(boolean enableCaching) {
// never
}
/**
* some wire formats will implement their own fragementation
*
* @return true unless a wire format supports it's own fragmentation
*/
public boolean doesSupportMessageFragmentation() {
return false;
}
/**
* Some wire formats will not be able to understand compressed messages
*
* @return true unless a wire format cannot understand compression
*/
public boolean doesSupportMessageCompression() {
return false;
}
/**
* Writes the given package to a new datagram
*
* @param channelID
* is the unique channel ID
* @param packet
* is the packet to write
* @return
* @throws java.io.IOException
* @throws javax.jms.JMSException
*/
public DatagramPacket writeCommand(String channelID, Command packet) throws IOException, JMSException {
throw new UnsupportedOperationException("Will not be implemented");
}
/**
* Reads the packet from the given byte[]
*
* @param bytes
* @param offset
* @param length
* @return
* @throws java.io.IOException
*/
public Command fromBytes(byte[] bytes, int offset, int length) throws IOException {
throw new UnsupportedOperationException("Will not be implemented");
}
/**
* Reads the packet from the given byte[]
*
* @param bytes
* @return
* @throws java.io.IOException
*/
public Command fromBytes(byte[] bytes) throws IOException {
throw new UnsupportedOperationException("Will not be implemented");
}
/**
* A helper method which converts a packet into a byte array
*
* @param packet
* @return a byte array representing the packet using some wire protocol
* @throws java.io.IOException
* @throws javax.jms.JMSException
*/
public byte[] toBytes(Command packet) throws IOException, JMSException {
throw new UnsupportedOperationException("Will not be implemented");
}
/**
* A helper method for working with sockets where the first byte is read
* first, then the rest of the message is read. <p/> Its common when dealing
* with sockets to have different timeout semantics until the first non-zero
* byte is read of a message, after which time a zero timeout is used.
*
* @param firstByte
* the first byte of the packet
* @param in
* the rest of the packet
* @return
* @throws java.io.IOException
*/
public Command readCommand(int firstByte, DataInput in) throws IOException {
throw new UnsupportedOperationException("Will not be implemented");
}
/**
* Read a packet from a Datagram packet from the given channelID. If the
* packet is from the same channel ID as it was sent then we have a
* loop-back so discard the packet
*
* @param channelID
* is the unique channel ID
* @param dpacket
* @return the packet read from the datagram or null if it should be
* discarded
* @throws java.io.IOException
*/
public Command readCommand(String channelID, DatagramPacket dpacket) throws IOException {
throw new UnsupportedOperationException("Will not be implemented");
}
void clearTransactionId(String user_tx_id) {
this.transactions.remove(user_tx_id);
}
String getClientId() {
return this.clientId;
}
public SessionId getSessionId() {
return sessionId;
}
public ProducerId getProducerId() {
return producerId;
}
public void addSubscription(Subscription s) {
if (subscriptions.containsKey(s.getDestination())) {
Subscription old = (Subscription) subscriptions.get(s.getDestination());
@ -462,44 +239,58 @@ public class StompWireFormat implements WireFormat {
return connectionId;
}
public void setConnectionId(ConnectionId connectionId) {
this.connectionId = connectionId;
}
public static synchronized int generateTransactionId() {
return ++transactionIdCounter;
}
public ConsumerId createConsumerId() {
throw new RuntimeException("TODO!!");
return new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
}
public MessageId generateMessageId() {
throw new RuntimeException("TODO!!");
public MessageId createMessageId() {
return new MessageId(producerId, messageIdGenerator.getNextSequenceId());
}
// TODO static???
public static short generateCommandId() {
throw new RuntimeException("TODO!!");
synchronized public short generateCommandId() {
return lastCommandId++;
}
public SessionId generateSessionId() {
throw new RuntimeException("TODO!!");
}
public Packet marshal(Object arg0) throws IOException {
throw new RuntimeException("TODO!!");
public Packet marshal(Object command) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
marshal(command, dos);
dos.close();
return new ByteArrayPacket(baos.toByteSequence());
}
public Object unmarshal(Packet arg0) throws IOException {
throw new RuntimeException("TODO!!");
public Object unmarshal(Packet packet) throws IOException {
PacketInputStream stream = new PacketInputStream(packet);
DataInputStream dis = new DataInputStream(stream);
return unmarshal(dis);
}
public void marshal(Object arg0, DataOutputStream arg1) throws IOException {
throw new RuntimeException("TODO!!");
public void marshal(Object command, DataOutputStream os) throws IOException {
try {
writeCommand((Command) command, os);
} catch (IOException e) {
throw e;
} catch (JMSException e) {
throw IOExceptionSupport.create(e);
}
}
public Object unmarshal(DataInputStream arg0) throws IOException {
throw new RuntimeException("TODO!!");
public Object unmarshal(DataInputStream is) throws IOException {
try {
return readCommand(is);
} catch (IOException e) {
throw e;
} catch (JMSException e) {
throw IOExceptionSupport.create(e);
}
}
}

View File

@ -0,0 +1,16 @@
/*
* Copyright (c) 2005 Your Corporation. All Rights Reserved.
*/
package org.activemq.transport.stomp;
import org.activeio.command.WireFormat;
import org.activeio.command.WireFormatFactory;
/**
* Creates WireFormat objects that implement the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
*/
public class StompWireFormatFactory implements WireFormatFactory {
public WireFormat createWireFormat() {
return new StompWireFormat();
}
}

View File

@ -0,0 +1 @@
class=org.activemq.transport.stomp.StompTransportFactory

View File

@ -0,0 +1 @@
class=org.activemq.transport.stomp.StompWireFormatFactory

View File

@ -0,0 +1,121 @@
package org.activemq.transport.stomp;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.URI;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.activemq.ActiveMQConnectionFactory;
import org.activemq.CombinationTestSupport;
import org.activemq.broker.BrokerService;
import org.activemq.broker.TransportConnector;
import org.activemq.command.ActiveMQDestination;
import org.activemq.command.ActiveMQQueue;
public class StompTest extends CombinationTestSupport {
private BrokerService broker;
private TransportConnector connector;
private Socket stompSocket;
private ByteArrayOutputStream inputBuffer;
private Connection connection;
private Session session;
private ActiveMQQueue queue;
protected void setUp() throws Exception {
broker = new BrokerService();
broker.setPersistent(false);
connector = broker.addConnector("stomp://localhost:0");
broker.start();
URI connectUri = connector.getConnectUri();
stompSocket = new Socket(connectUri.getHost(), connectUri.getPort());
inputBuffer = new ByteArrayOutputStream();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
connection = cf.createConnection();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
queue = new ActiveMQQueue("TEST");
connection.start();
}
protected void tearDown() throws Exception {
connection.close();
stompSocket.close();
broker.stop();
}
public void sendFrame(String data) throws Exception {
byte[] bytes = data.getBytes("UTF-8");
OutputStream outputStream = stompSocket.getOutputStream();
for (int i = 0; i < bytes.length; i++) {
outputStream.write(bytes[i]);
}
outputStream.flush();
}
public String receiveFrame(long timeOut) throws Exception {
stompSocket.setSoTimeout((int) timeOut);
InputStream is = stompSocket.getInputStream();
int c=0;
for(;;) {
c = is.read();
if( c < 0 ) {
throw new IOException("socket closed.");
} else if( c == 0 ) {
byte[] ba = inputBuffer.toByteArray();
inputBuffer.reset();
return new String(ba, "UTF-8");
} else {
inputBuffer.write(c);
}
}
}
public void testConnect() throws Exception {
String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "\n" + Stomp.NULL;
sendFrame(connect_frame);
String f = receiveFrame(10000);
assertTrue(f.startsWith("CONNECTED"));
}
public void testSendMessage() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
String frame =
"CONNECT\n" +
"login: brianm\n" +
"passcode: wombats\n\n"+
Stomp.NULL;
sendFrame(frame);
frame = receiveFrame(10000);
assertTrue(frame.startsWith("CONNECTED"));
frame =
"SEND\n" +
"destination:/queue/TEST\n\n" +
"Hello World" +
Stomp.NULL;
sendFrame(frame);
TextMessage message = (TextMessage) consumer.receive(1000);
assertNotNull(message);
assertEquals("Hello World", message.getText());
}
}

View File

@ -31,9 +31,6 @@ public class StompWireFormatTest extends TestCase {
ByteArrayOutputStream bout = new ByteArrayOutputStream();
DataOutputStream dout = new DataOutputStream(bout);
wire.registerTransportStreams(dout, din);
wire.initiateServerSideProtocol();
ConnectionInfo ci = (ConnectionInfo) wire.readCommand(din);
assertNotNull(ci);
assertTrue(ci.isResponseRequired());