mirror of https://github.com/apache/activemq.git
Removing old stomp implementation since the new one seems to be working better and has been verified to have fixed some bugs.
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@419012 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
399981fe74
commit
1f6aa57df6
|
@ -1,53 +0,0 @@
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Copyright 2005-2006 The Apache Software Foundation
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.transport.stomp;
|
|
||||||
|
|
||||||
import org.apache.activemq.command.TransactionId;
|
|
||||||
import org.apache.activemq.command.TransactionInfo;
|
|
||||||
|
|
||||||
import java.io.DataInput;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.ProtocolException;
|
|
||||||
import java.util.Properties;
|
|
||||||
|
|
||||||
class Abort implements StompCommand {
|
|
||||||
private StompWireFormat format;
|
|
||||||
private static final HeaderParser parser = new HeaderParser();
|
|
||||||
|
|
||||||
Abort(StompWireFormat format) {
|
|
||||||
this.format = format;
|
|
||||||
}
|
|
||||||
|
|
||||||
public CommandEnvelope build(String commandLine, DataInput in) throws IOException {
|
|
||||||
Properties headers = parser.parse(in);
|
|
||||||
while (in.readByte() != 0) {
|
|
||||||
}
|
|
||||||
String user_tx_id = headers.getProperty(Stomp.Headers.TRANSACTION);
|
|
||||||
|
|
||||||
if (!headers.containsKey(Stomp.Headers.TRANSACTION)) {
|
|
||||||
throw new ProtocolException("Must specify the transaction you are aborting");
|
|
||||||
}
|
|
||||||
|
|
||||||
TransactionId txnId = format.getTransactionId(user_tx_id);
|
|
||||||
TransactionInfo tx = new TransactionInfo();
|
|
||||||
tx.setConnectionId(format.getConnectionId());
|
|
||||||
tx.setTransactionId(txnId);
|
|
||||||
tx.setType(TransactionInfo.ROLLBACK);
|
|
||||||
format.clearTransactionId(user_tx_id);
|
|
||||||
return new CommandEnvelope(tx, headers);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,59 +0,0 @@
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Copyright 2005-2006 The Apache Software Foundation
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.transport.stomp;
|
|
||||||
|
|
||||||
import java.io.DataInput;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.ProtocolException;
|
|
||||||
import java.util.Properties;
|
|
||||||
|
|
||||||
import org.apache.activemq.command.MessageAck;
|
|
||||||
import org.apache.activemq.command.TransactionId;
|
|
||||||
|
|
||||||
class Ack implements StompCommand {
|
|
||||||
private final StompWireFormat format;
|
|
||||||
private static final HeaderParser parser = new HeaderParser();
|
|
||||||
|
|
||||||
Ack(StompWireFormat format) {
|
|
||||||
this.format = format;
|
|
||||||
}
|
|
||||||
|
|
||||||
public CommandEnvelope build(String commandLine, DataInput in) throws IOException {
|
|
||||||
Properties headers = parser.parse(in);
|
|
||||||
String message_id = headers.getProperty(Stomp.Headers.Ack.MESSAGE_ID);
|
|
||||||
if (message_id == null)
|
|
||||||
throw new ProtocolException("ACK received without a message-id to acknowledge!");
|
|
||||||
|
|
||||||
Subscription sub = (Subscription) format.getDispachedMap().get(message_id);
|
|
||||||
if( sub ==null )
|
|
||||||
throw new ProtocolException("Unexpected ACK received for message-id [" + message_id + "]");
|
|
||||||
|
|
||||||
MessageAck ack = sub.createMessageAck(message_id);
|
|
||||||
|
|
||||||
if (headers.containsKey(Stomp.Headers.TRANSACTION)) {
|
|
||||||
TransactionId tx_id = format.getTransactionId(headers.getProperty(Stomp.Headers.TRANSACTION));
|
|
||||||
if (tx_id == null)
|
|
||||||
throw new ProtocolException(headers.getProperty(Stomp.Headers.TRANSACTION) + " is an invalid transaction id");
|
|
||||||
ack.setTransactionId(tx_id);
|
|
||||||
}
|
|
||||||
|
|
||||||
while ((in.readByte()) != 0) {
|
|
||||||
}
|
|
||||||
|
|
||||||
return new CommandEnvelope(ack, headers);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,48 +0,0 @@
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Copyright 2005-2006 The Apache Software Foundation
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.transport.stomp;
|
|
||||||
|
|
||||||
class AsyncHelper {
|
|
||||||
public static Object tryUntilNotInterrupted(HelperWithReturn helper) {
|
|
||||||
while (true) {
|
|
||||||
try {
|
|
||||||
return helper.cycle();
|
|
||||||
}
|
|
||||||
catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void tryUntilNotInterrupted(final Helper helper) {
|
|
||||||
tryUntilNotInterrupted(new HelperWithReturn() {
|
|
||||||
|
|
||||||
public Object cycle() throws InterruptedException {
|
|
||||||
helper.cycle();
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
interface HelperWithReturn {
|
|
||||||
Object cycle() throws InterruptedException;
|
|
||||||
}
|
|
||||||
|
|
||||||
interface Helper {
|
|
||||||
void cycle() throws InterruptedException;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,52 +0,0 @@
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Copyright 2005-2006 The Apache Software Foundation
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.transport.stomp;
|
|
||||||
|
|
||||||
import org.apache.activemq.command.TransactionId;
|
|
||||||
import org.apache.activemq.command.TransactionInfo;
|
|
||||||
|
|
||||||
import java.io.DataInput;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.ProtocolException;
|
|
||||||
import java.util.Properties;
|
|
||||||
|
|
||||||
public class Begin implements StompCommand {
|
|
||||||
private StompWireFormat format;
|
|
||||||
private static final HeaderParser parser = new HeaderParser();
|
|
||||||
|
|
||||||
public Begin(StompWireFormat format) {
|
|
||||||
this.format = format;
|
|
||||||
}
|
|
||||||
|
|
||||||
public CommandEnvelope build(String commandLine, DataInput in) throws IOException {
|
|
||||||
Properties headers = parser.parse(in);
|
|
||||||
while (in.readByte() != 0) {
|
|
||||||
}
|
|
||||||
|
|
||||||
TransactionInfo tx = new TransactionInfo();
|
|
||||||
String user_tx_id = headers.getProperty(Stomp.Headers.TRANSACTION);
|
|
||||||
if (!headers.containsKey(Stomp.Headers.TRANSACTION)) {
|
|
||||||
throw new ProtocolException("Must specify the transaction you are beginning");
|
|
||||||
}
|
|
||||||
int tx_id = StompWireFormat.generateTransactionId();
|
|
||||||
TransactionId transactionId = format.registerTransactionId(user_tx_id, tx_id);
|
|
||||||
tx.setConnectionId(format.getConnectionId());
|
|
||||||
tx.setTransactionId(transactionId);
|
|
||||||
tx.setType(TransactionInfo.BEGIN);
|
|
||||||
return new CommandEnvelope(tx, headers);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,36 +0,0 @@
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Copyright 2005-2006 The Apache Software Foundation
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.transport.stomp;
|
|
||||||
|
|
||||||
import javax.jms.JMSException;
|
|
||||||
|
|
||||||
import java.io.DataInput;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Properties;
|
|
||||||
|
|
||||||
interface Command {
|
|
||||||
public CommandEnvelope build(String commandLine, DataInput in) throws IOException, JMSException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns a command instance which always returns null for a packet
|
|
||||||
*/
|
|
||||||
StompCommand NULL_COMMAND = new StompCommand() {
|
|
||||||
public CommandEnvelope build(String commandLine, DataInput in) {
|
|
||||||
return new CommandEnvelope(null, new Properties());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
|
@ -1,51 +0,0 @@
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Copyright 2005-2006 The Apache Software Foundation
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.transport.stomp;
|
|
||||||
|
|
||||||
import org.apache.activemq.command.Command;
|
|
||||||
|
|
||||||
import java.util.Properties;
|
|
||||||
|
|
||||||
public class CommandEnvelope {
|
|
||||||
|
|
||||||
private final Command command;
|
|
||||||
private final Properties headers;
|
|
||||||
private final ResponseListener responseListener;
|
|
||||||
|
|
||||||
public CommandEnvelope(Command command, Properties headers) {
|
|
||||||
this(command, headers, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
public CommandEnvelope(Command command, Properties headers, ResponseListener responseListener) {
|
|
||||||
this.command = command;
|
|
||||||
this.headers = headers;
|
|
||||||
this.responseListener = responseListener;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Properties getHeaders() {
|
|
||||||
return headers;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Command getCommand() {
|
|
||||||
return command;
|
|
||||||
}
|
|
||||||
|
|
||||||
public ResponseListener getResponseListener() {
|
|
||||||
return responseListener;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,104 +0,0 @@
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Copyright 2005-2006 The Apache Software Foundation
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.transport.stomp;
|
|
||||||
|
|
||||||
import org.apache.activemq.command.Command;
|
|
||||||
import org.apache.activemq.command.Response;
|
|
||||||
|
|
||||||
import javax.jms.JMSException;
|
|
||||||
|
|
||||||
import java.io.DataInput;
|
|
||||||
import java.io.DataOutput;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.ProtocolException;
|
|
||||||
|
|
||||||
class CommandParser {
|
|
||||||
private final StompWireFormat format;
|
|
||||||
|
|
||||||
CommandParser(StompWireFormat wireFormat) {
|
|
||||||
format = wireFormat;
|
|
||||||
}
|
|
||||||
|
|
||||||
Command parse(DataInput in) throws IOException, JMSException {
|
|
||||||
String line = null;
|
|
||||||
|
|
||||||
// skip white space to next real line
|
|
||||||
while (true) {
|
|
||||||
line = in.readLine();
|
|
||||||
if (line == null) {
|
|
||||||
throw new IOException("connection was closed");
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
line = line.trim();
|
|
||||||
if (line.length() > 0) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// figure correct command and return it
|
|
||||||
StompCommand command = null;
|
|
||||||
if (line.startsWith(Stomp.Commands.CONNECT))
|
|
||||||
command = new Connect(format);
|
|
||||||
else if (line.startsWith(Stomp.Commands.SUBSCRIBE))
|
|
||||||
command = new Subscribe(format);
|
|
||||||
else if (line.startsWith(Stomp.Commands.SEND))
|
|
||||||
command = new Send(format);
|
|
||||||
else if (line.startsWith(Stomp.Commands.DISCONNECT))
|
|
||||||
command = new Disconnect();
|
|
||||||
else if (line.startsWith(Stomp.Commands.BEGIN))
|
|
||||||
command = new Begin(format);
|
|
||||||
else if (line.startsWith(Stomp.Commands.COMMIT))
|
|
||||||
command = new Commit(format);
|
|
||||||
else if (line.startsWith(Stomp.Commands.ABORT))
|
|
||||||
command = new Abort(format);
|
|
||||||
else if (line.startsWith(Stomp.Commands.UNSUBSCRIBE))
|
|
||||||
command = new Unsubscribe(format);
|
|
||||||
else if (line.startsWith(Stomp.Commands.ACK))
|
|
||||||
command = new Ack(format);
|
|
||||||
|
|
||||||
if (command == null) {
|
|
||||||
while (in.readByte() == 0) {
|
|
||||||
}
|
|
||||||
throw new ProtocolException("Unknown command [" + line + "]");
|
|
||||||
}
|
|
||||||
|
|
||||||
final CommandEnvelope envelope = command.build(line, in);
|
|
||||||
final short commandId = format.generateCommandId();
|
|
||||||
final String client_packet_key = envelope.getHeaders().getProperty(Stomp.Headers.RECEIPT_REQUESTED);
|
|
||||||
final boolean receiptRequested = client_packet_key!=null;
|
|
||||||
|
|
||||||
envelope.getCommand().setCommandId(commandId);
|
|
||||||
if (receiptRequested || envelope.getResponseListener()!=null ) {
|
|
||||||
envelope.getCommand().setResponseRequired(true);
|
|
||||||
if( envelope.getResponseListener()!=null ) {
|
|
||||||
format.addResponseListener(envelope.getResponseListener());
|
|
||||||
} else {
|
|
||||||
format.addResponseListener(new ResponseListener() {
|
|
||||||
public boolean onResponse(Response receipt, DataOutput out) throws IOException {
|
|
||||||
if (receipt.getCorrelationId() != commandId)
|
|
||||||
return false;
|
|
||||||
out.write(new FrameBuilder(Stomp.Responses.RECEIPT).addHeader(Stomp.Headers.Response.RECEIPT_ID, client_packet_key).toFrame());
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return envelope.getCommand();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,56 +0,0 @@
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Copyright 2005-2006 The Apache Software Foundation
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.transport.stomp;
|
|
||||||
|
|
||||||
import org.apache.activemq.command.TransactionId;
|
|
||||||
import org.apache.activemq.command.TransactionInfo;
|
|
||||||
|
|
||||||
import java.io.DataInput;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.ProtocolException;
|
|
||||||
import java.util.Properties;
|
|
||||||
|
|
||||||
class Commit implements StompCommand {
|
|
||||||
private StompWireFormat format;
|
|
||||||
private static final HeaderParser parser = new HeaderParser();
|
|
||||||
|
|
||||||
Commit(StompWireFormat format) {
|
|
||||||
this.format = format;
|
|
||||||
}
|
|
||||||
|
|
||||||
public CommandEnvelope build(String commandLine, DataInput in) throws IOException {
|
|
||||||
Properties headers = parser.parse(in);
|
|
||||||
while (in.readByte() != 0) {
|
|
||||||
}
|
|
||||||
|
|
||||||
String user_tx_id = headers.getProperty(Stomp.Headers.TRANSACTION);
|
|
||||||
|
|
||||||
if (user_tx_id == null) {
|
|
||||||
throw new ProtocolException("Must specify the transaction you are committing");
|
|
||||||
}
|
|
||||||
|
|
||||||
TransactionId tx_id = format.getTransactionId(user_tx_id);
|
|
||||||
if (tx_id == null)
|
|
||||||
throw new ProtocolException(user_tx_id + " is an invalid transaction id");
|
|
||||||
TransactionInfo tx = new TransactionInfo();
|
|
||||||
tx.setConnectionId(format.getConnectionId());
|
|
||||||
tx.setTransactionId(tx_id);
|
|
||||||
tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
|
|
||||||
format.clearTransactionId(user_tx_id);
|
|
||||||
return new CommandEnvelope(tx, headers);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,124 +0,0 @@
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Copyright 2005-2006 The Apache Software Foundation
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.transport.stomp;
|
|
||||||
|
|
||||||
import java.io.DataInput;
|
|
||||||
import java.io.DataOutput;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Properties;
|
|
||||||
|
|
||||||
import org.apache.activemq.command.ConnectionInfo;
|
|
||||||
import org.apache.activemq.command.ProducerInfo;
|
|
||||||
import org.apache.activemq.command.Response;
|
|
||||||
import org.apache.activemq.command.SessionInfo;
|
|
||||||
import org.apache.activemq.util.IntrospectionSupport;
|
|
||||||
|
|
||||||
class Connect implements StompCommand {
|
|
||||||
private HeaderParser headerParser = new HeaderParser();
|
|
||||||
private StompWireFormat format;
|
|
||||||
|
|
||||||
Connect(StompWireFormat format) {
|
|
||||||
this.format = format;
|
|
||||||
}
|
|
||||||
|
|
||||||
public CommandEnvelope build(String commandLine, DataInput in) throws IOException {
|
|
||||||
|
|
||||||
final Properties headers = headerParser.parse(in);
|
|
||||||
|
|
||||||
|
|
||||||
// allow anyone to login for now
|
|
||||||
String login = headers.getProperty(Stomp.Headers.Connect.LOGIN);
|
|
||||||
String passcode = headers.getProperty(Stomp.Headers.Connect.PASSCODE);
|
|
||||||
String clientId = headers.getProperty(Stomp.Headers.Connect.CLIENT_ID);
|
|
||||||
|
|
||||||
final ConnectionInfo connectionInfo = new ConnectionInfo();
|
|
||||||
|
|
||||||
IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
|
|
||||||
|
|
||||||
connectionInfo.setConnectionId(format.getConnectionId());
|
|
||||||
if( clientId!=null )
|
|
||||||
connectionInfo.setClientId(clientId);
|
|
||||||
else
|
|
||||||
connectionInfo.setClientId(""+connectionInfo.getConnectionId().toString());
|
|
||||||
connectionInfo.setResponseRequired(true);
|
|
||||||
connectionInfo.setUserName(login);
|
|
||||||
connectionInfo.setPassword(passcode);
|
|
||||||
|
|
||||||
while (in.readByte() != 0) {
|
|
||||||
}
|
|
||||||
|
|
||||||
return new CommandEnvelope(connectionInfo, headers,
|
|
||||||
new ConnectResponseListener(headers, connectionInfo) );
|
|
||||||
}
|
|
||||||
|
|
||||||
class ConnectResponseListener implements ResponseListener{
|
|
||||||
|
|
||||||
private Properties headers;
|
|
||||||
private ConnectionInfo connectionInfo;
|
|
||||||
|
|
||||||
public ConnectResponseListener( Properties headers, final ConnectionInfo connectionInfo ){
|
|
||||||
this.headers = headers;
|
|
||||||
this.connectionInfo = connectionInfo;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean onResponse(Response receipt, DataOutput out) throws IOException {
|
|
||||||
|
|
||||||
if (receipt.getCorrelationId() != connectionInfo.getCommandId())
|
|
||||||
return false;
|
|
||||||
|
|
||||||
final SessionInfo sessionInfo = new SessionInfo(format.getSessionId());
|
|
||||||
sessionInfo.setCommandId(format.generateCommandId());
|
|
||||||
sessionInfo.setResponseRequired(false);
|
|
||||||
|
|
||||||
final ProducerInfo producerInfo = new ProducerInfo(format.getProducerId());
|
|
||||||
producerInfo.setCommandId(format.generateCommandId());
|
|
||||||
producerInfo.setResponseRequired(true);
|
|
||||||
|
|
||||||
format.addResponseListener( new ResponseListener(){
|
|
||||||
public boolean onResponse(Response receipt, DataOutput out) throws IOException {
|
|
||||||
if (receipt.getCorrelationId() != producerInfo.getCommandId() )
|
|
||||||
return false;
|
|
||||||
|
|
||||||
format.onFullyConnected();
|
|
||||||
|
|
||||||
StringBuffer buffer = new StringBuffer();
|
|
||||||
buffer.append(Stomp.Responses.CONNECTED);
|
|
||||||
buffer.append(Stomp.NEWLINE);
|
|
||||||
buffer.append(Stomp.Headers.Connected.SESSION);
|
|
||||||
buffer.append(Stomp.Headers.SEPERATOR);
|
|
||||||
buffer.append(connectionInfo.getClientId());
|
|
||||||
if( headers.containsKey(Stomp.Headers.Connect.REQUEST_ID) ){
|
|
||||||
buffer.append(Stomp.NEWLINE);
|
|
||||||
buffer.append(Stomp.Headers.Connected.RESPONSE_ID);
|
|
||||||
buffer.append(Stomp.Headers.SEPERATOR);
|
|
||||||
buffer.append(headers.getProperty( Stomp.Headers.Connect.REQUEST_ID ));
|
|
||||||
}
|
|
||||||
buffer.append(Stomp.NEWLINE);
|
|
||||||
buffer.append(Stomp.NEWLINE);
|
|
||||||
buffer.append(Stomp.NULL);
|
|
||||||
buffer.append(Stomp.NEWLINE);
|
|
||||||
out.writeBytes(buffer.toString());
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
format.addToPendingReadCommands(sessionInfo);
|
|
||||||
format.addToPendingReadCommands(producerInfo);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,62 +0,0 @@
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Copyright 2005-2006 The Apache Software Foundation
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.transport.stomp;
|
|
||||||
|
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
|
||||||
|
|
||||||
import javax.jms.Destination;
|
|
||||||
|
|
||||||
import java.net.ProtocolException;
|
|
||||||
|
|
||||||
class DestinationNamer {
|
|
||||||
static ActiveMQDestination convert(String name) throws ProtocolException {
|
|
||||||
if (name == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
else if (name.startsWith("/queue/")) {
|
|
||||||
String q_name = name.substring("/queue/".length(), name.length());
|
|
||||||
return ActiveMQDestination.createDestination(q_name, ActiveMQDestination.QUEUE_TYPE);
|
|
||||||
}
|
|
||||||
else if (name.startsWith("/topic/")) {
|
|
||||||
String t_name = name.substring("/topic/".length(), name.length());
|
|
||||||
return ActiveMQDestination.createDestination(t_name, ActiveMQDestination.TOPIC_TYPE);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ TTMP destinations " + "must begine with /queue/ or /topic/");
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
static String convert(Destination d) {
|
|
||||||
if (d == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
ActiveMQDestination amq_d = (ActiveMQDestination) d;
|
|
||||||
String p_name = amq_d.getPhysicalName();
|
|
||||||
|
|
||||||
StringBuffer buffer = new StringBuffer();
|
|
||||||
if (amq_d.isQueue()) {
|
|
||||||
buffer.append("/queue/");
|
|
||||||
}
|
|
||||||
if (amq_d.isTopic()) {
|
|
||||||
buffer.append("/topic/");
|
|
||||||
}
|
|
||||||
buffer.append(p_name);
|
|
||||||
|
|
||||||
return buffer.toString();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,32 +0,0 @@
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Copyright 2005-2006 The Apache Software Foundation
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.transport.stomp;
|
|
||||||
|
|
||||||
import org.apache.activemq.command.ShutdownInfo;
|
|
||||||
|
|
||||||
import java.io.DataInput;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Properties;
|
|
||||||
|
|
||||||
class Disconnect implements StompCommand {
|
|
||||||
|
|
||||||
public CommandEnvelope build(String line, DataInput in) throws IOException {
|
|
||||||
while (in.readByte() != 0) {
|
|
||||||
}
|
|
||||||
return new CommandEnvelope(new ShutdownInfo(), new Properties());
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,122 +0,0 @@
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Copyright 2005-2006 The Apache Software Foundation
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.transport.stomp;
|
|
||||||
|
|
||||||
import org.apache.activemq.command.ActiveMQMessage;
|
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Properties;
|
|
||||||
import java.util.Map.Entry;
|
|
||||||
|
|
||||||
class FrameBuilder {
|
|
||||||
private String command;
|
|
||||||
private Properties headers = new Properties();
|
|
||||||
private byte[] body = new byte[0];
|
|
||||||
|
|
||||||
public FrameBuilder(String command) {
|
|
||||||
this.command = command;
|
|
||||||
}
|
|
||||||
|
|
||||||
public FrameBuilder addHeader(String key, String value) {
|
|
||||||
if (value != null) {
|
|
||||||
this.headers.setProperty(key, value);
|
|
||||||
}
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public FrameBuilder addHeader(String key, long value) {
|
|
||||||
this.headers.put(key, new Long(value));
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public FrameBuilder addHeaders(ActiveMQMessage message) throws IOException {
|
|
||||||
addHeader(Stomp.Headers.Message.DESTINATION, DestinationNamer.convert(message.getDestination()));
|
|
||||||
addHeader(Stomp.Headers.Message.MESSAGE_ID, message.getJMSMessageID());
|
|
||||||
addHeader(Stomp.Headers.Message.CORRELATION_ID, message.getJMSCorrelationID());
|
|
||||||
addHeader(Stomp.Headers.Message.EXPIRATION_TIME, message.getJMSExpiration());
|
|
||||||
if (message.getJMSRedelivered()) {
|
|
||||||
addHeader(Stomp.Headers.Message.REDELIVERED, "true");
|
|
||||||
}
|
|
||||||
addHeader(Stomp.Headers.Message.PRORITY, message.getJMSPriority());
|
|
||||||
addHeader(Stomp.Headers.Message.REPLY_TO, DestinationNamer.convert(message.getJMSReplyTo()));
|
|
||||||
addHeader(Stomp.Headers.Message.TIMESTAMP, message.getJMSTimestamp());
|
|
||||||
addHeader(Stomp.Headers.Message.TYPE, message.getJMSType());
|
|
||||||
|
|
||||||
// now lets add all the message headers
|
|
||||||
Map properties = message.getProperties();
|
|
||||||
if (properties != null) {
|
|
||||||
headers.putAll(properties);
|
|
||||||
}
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public FrameBuilder setBody(byte[] body) {
|
|
||||||
this.body = body;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String toString() {
|
|
||||||
StringBuffer buffer = new StringBuffer();
|
|
||||||
buffer.append(command);
|
|
||||||
buffer.append(Stomp.NEWLINE);
|
|
||||||
for (Iterator iterator = headers.keySet().iterator(); iterator.hasNext();) {
|
|
||||||
String key = (String) iterator.next();
|
|
||||||
String property = headers.getProperty(key);
|
|
||||||
if (property != null) {
|
|
||||||
buffer.append(key);
|
|
||||||
buffer.append(Stomp.Headers.SEPERATOR);
|
|
||||||
buffer.append(property);
|
|
||||||
buffer.append(Stomp.NEWLINE);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
buffer.append(Stomp.NEWLINE);
|
|
||||||
buffer.append(body);
|
|
||||||
buffer.append(Stomp.NULL);
|
|
||||||
buffer.append(Stomp.NEWLINE);
|
|
||||||
return buffer.toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
byte[] toFrame() {
|
|
||||||
ByteArrayOutputStream bout = new ByteArrayOutputStream();
|
|
||||||
try {
|
|
||||||
bout.write(command.getBytes("UTF-8"));
|
|
||||||
bout.write(Stomp.NEWLINE.getBytes("UTF-8"));
|
|
||||||
for (Iterator iterator = headers.entrySet().iterator(); iterator.hasNext();) {
|
|
||||||
Map.Entry entry = (Entry) iterator.next();
|
|
||||||
String key = (String) entry.getKey();
|
|
||||||
String property = entry.getValue().toString();
|
|
||||||
if (property != null) {
|
|
||||||
bout.write(key.getBytes("UTF-8"));
|
|
||||||
bout.write(Stomp.Headers.SEPERATOR.getBytes("UTF-8"));
|
|
||||||
bout.write(property.getBytes("UTF-8"));
|
|
||||||
bout.write(Stomp.NEWLINE.getBytes("UTF-8"));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
bout.write(Stomp.NEWLINE.getBytes("UTF-8"));
|
|
||||||
bout.write(body);
|
|
||||||
bout.write(Stomp.NULL.getBytes("UTF-8"));
|
|
||||||
bout.write(Stomp.NEWLINE.getBytes("UTF-8"));
|
|
||||||
}
|
|
||||||
catch (IOException e) {
|
|
||||||
throw new RuntimeException("World is caving in, we just got io error writing to" + "a byte array output stream we instantiated!");
|
|
||||||
}
|
|
||||||
return bout.toByteArray();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,71 +0,0 @@
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Copyright 2005-2006 The Apache Software Foundation
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.transport.stomp;
|
|
||||||
|
|
||||||
import java.io.BufferedReader;
|
|
||||||
import java.io.DataInput;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.ProtocolException;
|
|
||||||
import java.util.Properties;
|
|
||||||
|
|
||||||
class HeaderParser {
|
|
||||||
/**
|
|
||||||
* Reads headers up through the blank line
|
|
||||||
*
|
|
||||||
* @param in
|
|
||||||
* @return
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
Properties parse(BufferedReader in) throws IOException {
|
|
||||||
Properties props = new Properties();
|
|
||||||
while (true) {
|
|
||||||
String line = in.readLine();
|
|
||||||
if (line != null && line.trim().length() > 0) {
|
|
||||||
int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
|
|
||||||
String name = line.substring(0, seperator_index).trim();
|
|
||||||
String value = line.substring(seperator_index + 1, line.length()).trim();
|
|
||||||
props.setProperty(name, value);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return props;
|
|
||||||
}
|
|
||||||
|
|
||||||
Properties parse(DataInput in) throws IOException {
|
|
||||||
Properties props = new Properties();
|
|
||||||
while (true) {
|
|
||||||
String line = in.readLine();
|
|
||||||
if (line != null && line.trim().length() > 0) {
|
|
||||||
try {
|
|
||||||
int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
|
|
||||||
String name = line.substring(0, seperator_index).trim();
|
|
||||||
String value = line.substring(seperator_index + 1, line.length()).trim();
|
|
||||||
props.setProperty(name, value);
|
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
throw new ProtocolException("Unable to parser header line [" + line + "]");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return props;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,29 +0,0 @@
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Copyright 2005-2006 The Apache Software Foundation
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.transport.stomp;
|
|
||||||
|
|
||||||
import org.apache.activemq.command.Response;
|
|
||||||
|
|
||||||
import java.io.DataOutput;
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
interface ResponseListener {
|
|
||||||
/**
|
|
||||||
* Return true if you handled this, false otherwise
|
|
||||||
*/
|
|
||||||
boolean onResponse(Response receipt, DataOutput out) throws IOException;
|
|
||||||
}
|
|
|
@ -1,153 +0,0 @@
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Copyright 2005-2006 The Apache Software Foundation
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.transport.stomp;
|
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
|
||||||
import java.io.DataInput;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.ProtocolException;
|
|
||||||
import java.util.Properties;
|
|
||||||
|
|
||||||
import javax.jms.JMSException;
|
|
||||||
|
|
||||||
import org.apache.activemq.command.ActiveMQBytesMessage;
|
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
|
||||||
import org.apache.activemq.command.ActiveMQMessage;
|
|
||||||
import org.apache.activemq.command.ActiveMQTextMessage;
|
|
||||||
import org.apache.activemq.command.TransactionId;
|
|
||||||
|
|
||||||
class Send implements StompCommand {
|
|
||||||
private final HeaderParser parser = new HeaderParser();
|
|
||||||
private final StompWireFormat format;
|
|
||||||
|
|
||||||
Send(StompWireFormat format) {
|
|
||||||
this.format = format;
|
|
||||||
}
|
|
||||||
|
|
||||||
public CommandEnvelope build(String commandLine, DataInput in) throws IOException, JMSException {
|
|
||||||
Properties headers = parser.parse(in);
|
|
||||||
String destination = (String) headers.remove(Stomp.Headers.Send.DESTINATION);
|
|
||||||
// now the body
|
|
||||||
ActiveMQMessage msg;
|
|
||||||
if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) {
|
|
||||||
ActiveMQBytesMessage bm = new ActiveMQBytesMessage();
|
|
||||||
String content_length = headers.getProperty(Stomp.Headers.CONTENT_LENGTH).trim();
|
|
||||||
int length;
|
|
||||||
try {
|
|
||||||
length = Integer.parseInt(content_length);
|
|
||||||
}
|
|
||||||
catch (NumberFormatException e) {
|
|
||||||
throw new ProtocolException("Specified content-length is not a valid integer");
|
|
||||||
}
|
|
||||||
byte[] bytes = new byte[length];
|
|
||||||
in.readFully(bytes);
|
|
||||||
byte nil = in.readByte();
|
|
||||||
if (nil != 0)
|
|
||||||
throw new ProtocolException("content-length bytes were read and " + "there was no trailing null byte");
|
|
||||||
bm.writeBytes(bytes);
|
|
||||||
msg = bm;
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
ActiveMQTextMessage text = new ActiveMQTextMessage();
|
|
||||||
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
|
|
||||||
byte b;
|
|
||||||
while ((b = in.readByte()) != 0) {
|
|
||||||
bytes.write(b);
|
|
||||||
}
|
|
||||||
bytes.close();
|
|
||||||
String body = new String(bytes.toByteArray());
|
|
||||||
try {
|
|
||||||
text.setText(body);
|
|
||||||
}
|
|
||||||
catch (JMSException e) {
|
|
||||||
throw new RuntimeException("Something is really wrong, we instantiated this thing!");
|
|
||||||
}
|
|
||||||
msg = text;
|
|
||||||
}
|
|
||||||
|
|
||||||
msg.setProducerId(format.getProducerId());
|
|
||||||
msg.setMessageId(format.createMessageId());
|
|
||||||
msg.setJMSTimestamp(System.currentTimeMillis());
|
|
||||||
|
|
||||||
|
|
||||||
ActiveMQDestination d = DestinationNamer.convert(destination);
|
|
||||||
msg.setDestination(d);
|
|
||||||
// msg.setJMSClientID(format.getClientId());
|
|
||||||
|
|
||||||
// the standard JMS headers
|
|
||||||
msg.setJMSCorrelationID((String) headers.remove(Stomp.Headers.Send.CORRELATION_ID));
|
|
||||||
|
|
||||||
Object expiration = headers.remove(Stomp.Headers.Send.EXPIRATION_TIME);
|
|
||||||
if (expiration != null) {
|
|
||||||
msg.setJMSExpiration(asLong(expiration));
|
|
||||||
}
|
|
||||||
Object priority = headers.remove(Stomp.Headers.Send.PRIORITY);
|
|
||||||
if (priority != null) {
|
|
||||||
msg.setJMSPriority(asInt(priority));
|
|
||||||
}
|
|
||||||
Object type = headers.remove(Stomp.Headers.Send.TYPE);
|
|
||||||
if (type != null) {
|
|
||||||
msg.setJMSType((String) type);
|
|
||||||
}
|
|
||||||
|
|
||||||
msg.setJMSReplyTo(DestinationNamer.convert((String) headers.remove(Stomp.Headers.Send.REPLY_TO)));
|
|
||||||
|
|
||||||
Object persistent = headers.remove(Stomp.Headers.Send.PERSISTENT);
|
|
||||||
if (persistent != null) {
|
|
||||||
msg.setPersistent(asBool(persistent));
|
|
||||||
}
|
|
||||||
|
|
||||||
// No need to carry the content length in the JMS headers.
|
|
||||||
headers.remove(Stomp.Headers.CONTENT_LENGTH);
|
|
||||||
|
|
||||||
// now the general headers
|
|
||||||
msg.setProperties(headers);
|
|
||||||
|
|
||||||
if (headers.containsKey(Stomp.Headers.TRANSACTION)) {
|
|
||||||
TransactionId tx_id = format.getTransactionId(headers.getProperty(Stomp.Headers.TRANSACTION));
|
|
||||||
if (tx_id == null)
|
|
||||||
throw new ProtocolException(headers.getProperty(Stomp.Headers.TRANSACTION) + " is an invalid transaction id");
|
|
||||||
msg.setTransactionId(tx_id);
|
|
||||||
}
|
|
||||||
|
|
||||||
msg.onSend();
|
|
||||||
return new CommandEnvelope(msg, headers);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected boolean asBool(Object value) {
|
|
||||||
if (value != null) {
|
|
||||||
return String.valueOf(value).equals("true");
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected long asLong(Object value) {
|
|
||||||
if (value instanceof Number) {
|
|
||||||
Number n = (Number) value;
|
|
||||||
return n.longValue();
|
|
||||||
}
|
|
||||||
return Long.parseLong(value.toString());
|
|
||||||
}
|
|
||||||
|
|
||||||
protected int asInt(Object value) {
|
|
||||||
if (value instanceof Number) {
|
|
||||||
Number n = (Number) value;
|
|
||||||
return n.intValue();
|
|
||||||
}
|
|
||||||
return Integer.parseInt(value.toString());
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,39 +0,0 @@
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Copyright 2005-2006 The Apache Software Foundation
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.transport.stomp;
|
|
||||||
|
|
||||||
import javax.jms.JMSException;
|
|
||||||
|
|
||||||
import java.io.DataInput;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Properties;
|
|
||||||
|
|
||||||
interface StompCommand
|
|
||||||
{
|
|
||||||
public CommandEnvelope build(String commandLine, DataInput in) throws IOException, JMSException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns a command instance which always returns null for a packet
|
|
||||||
*/
|
|
||||||
StompCommand NULL_COMMAND = new StompCommand()
|
|
||||||
{
|
|
||||||
public CommandEnvelope build(String commandLine, DataInput in)
|
|
||||||
{
|
|
||||||
return new CommandEnvelope(null, new Properties());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
|
@ -1,32 +0,0 @@
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Copyright 2005-2006 The Apache Software Foundation
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.transport.stomp;
|
|
||||||
|
|
||||||
import org.apache.activemq.transport.tcp.TcpTransportFactory;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A <a href="http://stomp.codehaus.org/">Stomp</a> transport factory
|
|
||||||
*
|
|
||||||
* @version $Revision: 1.1.1.1 $
|
|
||||||
*/
|
|
||||||
public class StompTransportFactory extends TcpTransportFactory {
|
|
||||||
|
|
||||||
protected String getDefaultWireFormatType() {
|
|
||||||
return "stomp";
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,327 +0,0 @@
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Copyright 2005-2006 The Apache Software Foundation
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.transport.stomp;
|
|
||||||
|
|
||||||
import java.io.DataInput;
|
|
||||||
import java.io.DataInputStream;
|
|
||||||
import java.io.DataOutput;
|
|
||||||
import java.io.DataOutputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.ProtocolException;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
import javax.jms.JMSException;
|
|
||||||
|
|
||||||
import org.apache.activeio.adapter.PacketInputStream;
|
|
||||||
import org.apache.activeio.command.WireFormat;
|
|
||||||
import org.apache.activeio.packet.ByteArrayPacket;
|
|
||||||
import org.apache.activeio.packet.Packet;
|
|
||||||
import org.apache.activeio.util.ByteArrayOutputStream;
|
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
|
||||||
import org.apache.activemq.command.Command;
|
|
||||||
import org.apache.activemq.command.CommandTypes;
|
|
||||||
import org.apache.activemq.command.ConnectionId;
|
|
||||||
import org.apache.activemq.command.ConnectionInfo;
|
|
||||||
import org.apache.activemq.command.ConsumerId;
|
|
||||||
import org.apache.activemq.command.FlushCommand;
|
|
||||||
import org.apache.activemq.command.LocalTransactionId;
|
|
||||||
import org.apache.activemq.command.Message;
|
|
||||||
import org.apache.activemq.command.MessageDispatch;
|
|
||||||
import org.apache.activemq.command.MessageId;
|
|
||||||
import org.apache.activemq.command.ProducerId;
|
|
||||||
import org.apache.activemq.command.Response;
|
|
||||||
import org.apache.activemq.command.SessionId;
|
|
||||||
import org.apache.activemq.command.TransactionId;
|
|
||||||
import org.apache.activemq.filter.DestinationMap;
|
|
||||||
import org.apache.activemq.util.IOExceptionSupport;
|
|
||||||
import org.apache.activemq.util.IdGenerator;
|
|
||||||
import org.apache.activemq.util.LongSequenceGenerator;
|
|
||||||
|
|
||||||
import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
|
|
||||||
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
|
|
||||||
import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
|
|
||||||
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Implements the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
|
|
||||||
*/
|
|
||||||
public class StompWireFormat implements WireFormat {
|
|
||||||
|
|
||||||
private static final IdGenerator connectionIdGenerator = new IdGenerator();
|
|
||||||
private static int transactionIdCounter;
|
|
||||||
|
|
||||||
private int version = 1;
|
|
||||||
private final CommandParser commandParser = new CommandParser(this);
|
|
||||||
private final HeaderParser headerParser = new HeaderParser();
|
|
||||||
|
|
||||||
private final BlockingQueue pendingReadCommands = new LinkedBlockingQueue();
|
|
||||||
private final BlockingQueue pendingWriteFrames = new LinkedBlockingQueue();
|
|
||||||
private final List receiptListeners = new CopyOnWriteArrayList();
|
|
||||||
private final Map subscriptionsByConsumerId = new ConcurrentHashMap();
|
|
||||||
private final Map subscriptionsByName = new ConcurrentHashMap();
|
|
||||||
private final DestinationMap subscriptionsByDestination = new DestinationMap();
|
|
||||||
private final Map transactions = new ConcurrentHashMap();
|
|
||||||
private final Map dispachedMap = new ConcurrentHashMap();
|
|
||||||
private short lastCommandId;
|
|
||||||
|
|
||||||
private final ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId());
|
|
||||||
private final SessionId sessionId = new SessionId(connectionId, -1);
|
|
||||||
private final ProducerId producerId = new ProducerId(sessionId, 1);
|
|
||||||
|
|
||||||
private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
|
|
||||||
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
|
|
||||||
|
|
||||||
void addResponseListener(ResponseListener listener) {
|
|
||||||
receiptListeners.add(listener);
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean connected = false;
|
|
||||||
|
|
||||||
public Command readCommand(DataInput in) throws IOException, JMSException {
|
|
||||||
Command pending = (Command) AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.HelperWithReturn() {
|
|
||||||
public Object cycle() throws InterruptedException {
|
|
||||||
return pendingReadCommands.poll(0, TimeUnit.MILLISECONDS);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
if (pending != null) {
|
|
||||||
return pending;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
Command command = commandParser.parse(in);
|
|
||||||
addToPendingReadCommands(command);
|
|
||||||
|
|
||||||
command = (Command) AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.HelperWithReturn() {
|
|
||||||
public Object cycle() throws InterruptedException {
|
|
||||||
return pendingReadCommands.poll(0, TimeUnit.MILLISECONDS);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
if( !connected ) {
|
|
||||||
if( command.getDataStructureType() != ConnectionInfo.DATA_STRUCTURE_TYPE )
|
|
||||||
throw new IOException("Not yet connected.");
|
|
||||||
}
|
|
||||||
return command;
|
|
||||||
|
|
||||||
}
|
|
||||||
catch (ProtocolException e) {
|
|
||||||
sendError(e.getMessage());
|
|
||||||
return FlushCommand.COMMAND;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public Command writeCommand(final Command packet, final DataOutput out) throws IOException, JMSException {
|
|
||||||
flushPendingFrames(out);
|
|
||||||
|
|
||||||
// It may have just been a flush request.
|
|
||||||
if (packet == null)
|
|
||||||
return null;
|
|
||||||
|
|
||||||
if (packet.getDataStructureType() == CommandTypes.RESPONSE) {
|
|
||||||
assert (packet instanceof Response);
|
|
||||||
Response receipt = (Response) packet;
|
|
||||||
for (int i = 0; i < receiptListeners.size(); i++) {
|
|
||||||
ResponseListener listener = (ResponseListener) receiptListeners.get(i);
|
|
||||||
if (listener.onResponse(receipt, out)) {
|
|
||||||
receiptListeners.remove(listener);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if( packet.isMessageDispatch() ) {
|
|
||||||
MessageDispatch md = (MessageDispatch)packet;
|
|
||||||
Message message = md.getMessage();
|
|
||||||
Subscription sub = (Subscription) subscriptionsByConsumerId.get(md.getConsumerId());
|
|
||||||
if (sub != null)
|
|
||||||
sub.receive(md, out);
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void flushPendingFrames(final DataOutput out) throws IOException {
|
|
||||||
boolean interrupted = false;
|
|
||||||
do {
|
|
||||||
try {
|
|
||||||
byte[] frame = (byte[]) pendingWriteFrames.poll(0, TimeUnit.MILLISECONDS);
|
|
||||||
if (frame == null)
|
|
||||||
return;
|
|
||||||
out.write(frame);
|
|
||||||
}
|
|
||||||
catch (InterruptedException e) {
|
|
||||||
interrupted = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
while (interrupted);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void sendError(final String message) {
|
|
||||||
// System.err.println("sending error [" + message + "]");
|
|
||||||
AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper() {
|
|
||||||
public void cycle() throws InterruptedException {
|
|
||||||
pendingWriteFrames.put(new FrameBuilder(Stomp.Responses.ERROR).addHeader(Stomp.Headers.Error.MESSAGE, message).toFrame());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public void onFullyConnected() {
|
|
||||||
connected=true;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void addToPendingReadCommands(final Command info) {
|
|
||||||
AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper() {
|
|
||||||
public void cycle() throws InterruptedException {
|
|
||||||
pendingReadCommands.put(info);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void clearTransactionId(String user_tx_id) {
|
|
||||||
this.transactions.remove(user_tx_id);
|
|
||||||
}
|
|
||||||
|
|
||||||
public SessionId getSessionId() {
|
|
||||||
return sessionId;
|
|
||||||
}
|
|
||||||
|
|
||||||
public ProducerId getProducerId() {
|
|
||||||
return producerId;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public Subscription getSubcription(ConsumerId consumerId) {
|
|
||||||
return (Subscription) subscriptionsByConsumerId.get(consumerId);
|
|
||||||
}
|
|
||||||
public Set getSubcriptions(ActiveMQDestination destination) {
|
|
||||||
return subscriptionsByDestination.get(destination);
|
|
||||||
}
|
|
||||||
public Subscription getSubcription(String name) {
|
|
||||||
return (Subscription) subscriptionsByName.get(name);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void addSubscription(Subscription s) {
|
|
||||||
if (s.getSubscriptionId()!=null && subscriptionsByName.containsKey(s.getSubscriptionId())) {
|
|
||||||
Subscription old = (Subscription) subscriptionsByName.get(s.getSubscriptionId());
|
|
||||||
removeSubscription(old);
|
|
||||||
enqueueCommand(old.close());
|
|
||||||
}
|
|
||||||
if( s.getSubscriptionId()!=null )
|
|
||||||
subscriptionsByName.put(s.getSubscriptionId(), s);
|
|
||||||
subscriptionsByConsumerId.put(s.getConsumerInfo().getConsumerId(), s);
|
|
||||||
subscriptionsByDestination.put(s.getConsumerInfo().getDestination(), s);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void removeSubscription(Subscription s) {
|
|
||||||
if( s.getSubscriptionId()!=null )
|
|
||||||
subscriptionsByName.remove(s.getSubscriptionId());
|
|
||||||
subscriptionsByConsumerId.remove(s.getConsumerInfo().getConsumerId());
|
|
||||||
subscriptionsByDestination.remove(s.getConsumerInfo().getDestination(), s);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void enqueueCommand(final Command ack) {
|
|
||||||
AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper() {
|
|
||||||
public void cycle() throws InterruptedException {
|
|
||||||
pendingReadCommands.put(ack);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public TransactionId getTransactionId(String key) {
|
|
||||||
return (TransactionId) transactions.get(key);
|
|
||||||
}
|
|
||||||
|
|
||||||
public TransactionId registerTransactionId(String user_tx_id, int tx_id) {
|
|
||||||
LocalTransactionId transactionId = new LocalTransactionId(getConnectionId(), tx_id);
|
|
||||||
transactions.put(user_tx_id, transactionId);
|
|
||||||
return transactionId;
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getVersion() {
|
|
||||||
return version;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setVersion(int version) {
|
|
||||||
this.version = version;
|
|
||||||
}
|
|
||||||
|
|
||||||
public ConnectionId getConnectionId() {
|
|
||||||
return connectionId;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static synchronized int generateTransactionId() {
|
|
||||||
return ++transactionIdCounter;
|
|
||||||
}
|
|
||||||
|
|
||||||
public ConsumerId createConsumerId() {
|
|
||||||
return new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
|
|
||||||
}
|
|
||||||
|
|
||||||
public MessageId createMessageId() {
|
|
||||||
return new MessageId(producerId, messageIdGenerator.getNextSequenceId());
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized public short generateCommandId() {
|
|
||||||
return lastCommandId++;
|
|
||||||
}
|
|
||||||
|
|
||||||
public SessionId generateSessionId() {
|
|
||||||
throw new RuntimeException("TODO!!");
|
|
||||||
}
|
|
||||||
|
|
||||||
public Packet marshal(Object command) throws IOException {
|
|
||||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
|
||||||
DataOutputStream dos = new DataOutputStream(baos);
|
|
||||||
marshal(command, dos);
|
|
||||||
dos.close();
|
|
||||||
return new ByteArrayPacket(baos.toByteSequence());
|
|
||||||
}
|
|
||||||
|
|
||||||
public Object unmarshal(Packet packet) throws IOException {
|
|
||||||
PacketInputStream stream = new PacketInputStream(packet);
|
|
||||||
DataInputStream dis = new DataInputStream(stream);
|
|
||||||
return unmarshal(dis);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void marshal(Object command, DataOutputStream os) throws IOException {
|
|
||||||
try {
|
|
||||||
writeCommand((Command) command, os);
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw e;
|
|
||||||
} catch (JMSException e) {
|
|
||||||
throw IOExceptionSupport.create(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public Object unmarshal(DataInputStream is) throws IOException {
|
|
||||||
try {
|
|
||||||
return readCommand(is);
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw e;
|
|
||||||
} catch (JMSException e) {
|
|
||||||
throw IOExceptionSupport.create(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public Map getDispachedMap() {
|
|
||||||
return dispachedMap;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,29 +0,0 @@
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Copyright 2005-2006 The Apache Software Foundation
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.transport.stomp;
|
|
||||||
|
|
||||||
import org.apache.activeio.command.WireFormat;
|
|
||||||
import org.apache.activeio.command.WireFormatFactory;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates WireFormat objects that implement the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
|
|
||||||
*/
|
|
||||||
public class StompWireFormatFactory implements WireFormatFactory {
|
|
||||||
public WireFormat createWireFormat() {
|
|
||||||
return new StompWireFormat();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,66 +0,0 @@
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Copyright 2005-2006 The Apache Software Foundation
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.transport.stomp;
|
|
||||||
|
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
|
||||||
import org.apache.activemq.command.ConsumerInfo;
|
|
||||||
import org.apache.activemq.util.IntrospectionSupport;
|
|
||||||
|
|
||||||
import java.io.DataInput;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Properties;
|
|
||||||
|
|
||||||
class Subscribe implements StompCommand {
|
|
||||||
private HeaderParser headerParser = new HeaderParser();
|
|
||||||
private StompWireFormat format;
|
|
||||||
|
|
||||||
Subscribe(StompWireFormat format) {
|
|
||||||
this.format = format;
|
|
||||||
}
|
|
||||||
|
|
||||||
public CommandEnvelope build(String commandLine, DataInput in) throws IOException {
|
|
||||||
Properties headers = headerParser.parse(in);
|
|
||||||
|
|
||||||
String subscriptionId = headers.getProperty(Stomp.Headers.Subscribe.ID);
|
|
||||||
String destination = headers.getProperty(Stomp.Headers.Subscribe.DESTINATION);
|
|
||||||
|
|
||||||
ActiveMQDestination actual_dest = DestinationNamer.convert(destination);
|
|
||||||
ConsumerInfo ci = new ConsumerInfo(format.createConsumerId());
|
|
||||||
ci.setPrefetchSize(1000);
|
|
||||||
ci.setDispatchAsync(true);
|
|
||||||
|
|
||||||
String selector = (String) headers.remove(Stomp.Headers.Subscribe.SELECTOR);
|
|
||||||
ci.setSelector(selector);
|
|
||||||
|
|
||||||
IntrospectionSupport.setProperties(ci, headers, "activemq.");
|
|
||||||
|
|
||||||
ci.setDestination(DestinationNamer.convert(destination));
|
|
||||||
|
|
||||||
while (in.readByte() != 0) {
|
|
||||||
}
|
|
||||||
|
|
||||||
Subscription s = new Subscription(format, subscriptionId, ci);
|
|
||||||
s.setDestination(actual_dest);
|
|
||||||
String ack_mode_key = headers.getProperty(Stomp.Headers.Subscribe.ACK_MODE);
|
|
||||||
if (ack_mode_key != null && ack_mode_key.equals(Stomp.Headers.Subscribe.AckModeValues.CLIENT)) {
|
|
||||||
s.setAckMode(Subscription.CLIENT_ACK);
|
|
||||||
}
|
|
||||||
|
|
||||||
format.addSubscription(s);
|
|
||||||
return new CommandEnvelope(ci, headers);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,148 +0,0 @@
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Copyright 2005-2006 The Apache Software Foundation
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.transport.stomp;
|
|
||||||
|
|
||||||
import java.io.DataOutput;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.LinkedList;
|
|
||||||
|
|
||||||
import javax.jms.JMSException;
|
|
||||||
|
|
||||||
import org.apache.activemq.command.ActiveMQBytesMessage;
|
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
|
||||||
import org.apache.activemq.command.ActiveMQMessage;
|
|
||||||
import org.apache.activemq.command.ActiveMQTextMessage;
|
|
||||||
import org.apache.activemq.command.ConsumerInfo;
|
|
||||||
import org.apache.activemq.command.MessageAck;
|
|
||||||
import org.apache.activemq.command.MessageDispatch;
|
|
||||||
import org.apache.activemq.command.RemoveInfo;
|
|
||||||
|
|
||||||
public class Subscription {
|
|
||||||
|
|
||||||
public static final int AUTO_ACK = 1;
|
|
||||||
public static final int CLIENT_ACK = 2;
|
|
||||||
|
|
||||||
public static final String NO_ID = "~~ NO SUCH THING ~~%%@#!Q";
|
|
||||||
|
|
||||||
private ActiveMQDestination destination;
|
|
||||||
private int ackMode = AUTO_ACK;
|
|
||||||
private StompWireFormat format;
|
|
||||||
|
|
||||||
private final String subscriptionId;
|
|
||||||
private final ConsumerInfo consumerInfo;
|
|
||||||
private final LinkedList dispatchedMessages = new LinkedList();
|
|
||||||
|
|
||||||
public Subscription(StompWireFormat format, String subscriptionId, ConsumerInfo consumerInfo) {
|
|
||||||
this.format = format;
|
|
||||||
this.subscriptionId = subscriptionId;
|
|
||||||
this.consumerInfo = consumerInfo;
|
|
||||||
}
|
|
||||||
|
|
||||||
void setDestination(ActiveMQDestination actual_dest) {
|
|
||||||
this.destination = actual_dest;
|
|
||||||
}
|
|
||||||
|
|
||||||
void receive(MessageDispatch md, DataOutput out) throws IOException, JMSException {
|
|
||||||
|
|
||||||
ActiveMQMessage m = (ActiveMQMessage) md.getMessage();
|
|
||||||
|
|
||||||
if (ackMode == CLIENT_ACK) {
|
|
||||||
Subscription sub = format.getSubcription(md.getConsumerId());
|
|
||||||
sub.addMessageDispatch(md);
|
|
||||||
format.getDispachedMap().put(m.getJMSMessageID(), sub);
|
|
||||||
}
|
|
||||||
else if (ackMode == AUTO_ACK) {
|
|
||||||
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
|
|
||||||
format.enqueueCommand(ack);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
throw new JMSException("Unknown ackMode: " + ackMode);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
FrameBuilder builder = new FrameBuilder(Stomp.Responses.MESSAGE);
|
|
||||||
builder.addHeaders(m);
|
|
||||||
|
|
||||||
if( m.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE ) {
|
|
||||||
ActiveMQTextMessage msg = (ActiveMQTextMessage)m.copy();
|
|
||||||
builder.setBody(msg.getText().getBytes("UTF-8"));
|
|
||||||
} else if( m.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE ) {
|
|
||||||
ActiveMQBytesMessage msg = (ActiveMQBytesMessage)m.copy();
|
|
||||||
byte[] data = new byte[(int)msg.getBodyLength()];
|
|
||||||
msg.readBytes(data);
|
|
||||||
builder.addHeader(Stomp.Headers.CONTENT_LENGTH, data.length);
|
|
||||||
builder.setBody(data);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (subscriptionId!=null) {
|
|
||||||
builder.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
|
|
||||||
}
|
|
||||||
|
|
||||||
out.write(builder.toFrame());
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized private void addMessageDispatch(MessageDispatch md) {
|
|
||||||
dispatchedMessages.addLast(md);
|
|
||||||
}
|
|
||||||
|
|
||||||
ActiveMQDestination getDestination() {
|
|
||||||
return destination;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setAckMode(int clientAck) {
|
|
||||||
this.ackMode = clientAck;
|
|
||||||
}
|
|
||||||
|
|
||||||
public RemoveInfo close() {
|
|
||||||
return new RemoveInfo(consumerInfo.getConsumerId());
|
|
||||||
}
|
|
||||||
|
|
||||||
public ConsumerInfo getConsumerInfo() {
|
|
||||||
return consumerInfo;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getSubscriptionId() {
|
|
||||||
return subscriptionId;
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized public MessageAck createMessageAck(String message_id) {
|
|
||||||
MessageAck ack = new MessageAck();
|
|
||||||
ack.setDestination(consumerInfo.getDestination());
|
|
||||||
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
|
|
||||||
ack.setConsumerId(consumerInfo.getConsumerId());
|
|
||||||
|
|
||||||
int count=0;
|
|
||||||
for (Iterator iter = dispatchedMessages.iterator(); iter.hasNext();) {
|
|
||||||
|
|
||||||
MessageDispatch md = (MessageDispatch) iter.next();
|
|
||||||
String id = ((ActiveMQMessage)md.getMessage()).getJMSMessageID();
|
|
||||||
if( ack.getFirstMessageId()==null )
|
|
||||||
ack.setFirstMessageId(md.getMessage().getMessageId());
|
|
||||||
|
|
||||||
format.getDispachedMap().remove(id);
|
|
||||||
iter.remove();
|
|
||||||
count++;
|
|
||||||
if( id.equals(message_id) ) {
|
|
||||||
ack.setLastMessageId(md.getMessage().getMessageId());
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ack.setMessageCount(count);
|
|
||||||
return ack;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,62 +0,0 @@
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Copyright 2005-2006 The Apache Software Foundation
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.transport.stomp;
|
|
||||||
|
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
|
||||||
|
|
||||||
import java.io.DataInput;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.ProtocolException;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.Properties;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
public class Unsubscribe implements StompCommand {
|
|
||||||
private static final HeaderParser parser = new HeaderParser();
|
|
||||||
private final StompWireFormat format;
|
|
||||||
|
|
||||||
Unsubscribe(StompWireFormat format) {
|
|
||||||
this.format = format;
|
|
||||||
}
|
|
||||||
|
|
||||||
public CommandEnvelope build(String commandLine, DataInput in) throws IOException {
|
|
||||||
Properties headers = parser.parse(in);
|
|
||||||
while (in.readByte() == 0) {
|
|
||||||
}
|
|
||||||
|
|
||||||
String subscriptionId = headers.getProperty(Stomp.Headers.Unsubscribe.ID);
|
|
||||||
String destination = headers.getProperty(Stomp.Headers.Unsubscribe.DESTINATION);
|
|
||||||
|
|
||||||
|
|
||||||
if( subscriptionId!=null ) {
|
|
||||||
Subscription s = format.getSubcription(subscriptionId);
|
|
||||||
format.removeSubscription(s);
|
|
||||||
return new CommandEnvelope(s.close(), headers);
|
|
||||||
}
|
|
||||||
|
|
||||||
ActiveMQDestination d = DestinationNamer.convert(destination);
|
|
||||||
Set subs = format.getSubcriptions(d);
|
|
||||||
for (Iterator iter = subs.iterator(); iter.hasNext();) {
|
|
||||||
Subscription s = (Subscription) iter.next();
|
|
||||||
format.removeSubscription(s);
|
|
||||||
return new CommandEnvelope(s.close(), headers);
|
|
||||||
}
|
|
||||||
|
|
||||||
throw new ProtocolException("Unexpected UNSUBSCRIBE received.");
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -48,7 +48,6 @@ import org.apache.activemq.command.SessionInfo;
|
||||||
import org.apache.activemq.command.ShutdownInfo;
|
import org.apache.activemq.command.ShutdownInfo;
|
||||||
import org.apache.activemq.command.TransactionId;
|
import org.apache.activemq.command.TransactionId;
|
||||||
import org.apache.activemq.command.TransactionInfo;
|
import org.apache.activemq.command.TransactionInfo;
|
||||||
import org.apache.activemq.transport.stomp.Stomp;
|
|
||||||
import org.apache.activemq.util.IdGenerator;
|
import org.apache.activemq.util.IdGenerator;
|
||||||
import org.apache.activemq.util.IntrospectionSupport;
|
import org.apache.activemq.util.IntrospectionSupport;
|
||||||
import org.apache.activemq.util.LongSequenceGenerator;
|
import org.apache.activemq.util.LongSequenceGenerator;
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.transport.stomp;
|
package org.apache.activemq.transport.stomp2;
|
||||||
|
|
||||||
public interface Stomp {
|
public interface Stomp {
|
||||||
String NULL = "\u0000";
|
String NULL = "\u0000";
|
|
@ -28,7 +28,6 @@ import org.apache.activemq.command.ConsumerInfo;
|
||||||
import org.apache.activemq.command.MessageAck;
|
import org.apache.activemq.command.MessageAck;
|
||||||
import org.apache.activemq.command.MessageDispatch;
|
import org.apache.activemq.command.MessageDispatch;
|
||||||
import org.apache.activemq.command.MessageId;
|
import org.apache.activemq.command.MessageId;
|
||||||
import org.apache.activemq.transport.stomp.Stomp;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Keeps track of the STOMP susbscription so that acking is correctly done.
|
* Keeps track of the STOMP susbscription so that acking is correctly done.
|
||||||
|
|
|
@ -29,7 +29,6 @@ import org.apache.activeio.packet.ByteArrayPacket;
|
||||||
import org.apache.activeio.packet.ByteSequence;
|
import org.apache.activeio.packet.ByteSequence;
|
||||||
import org.apache.activeio.packet.Packet;
|
import org.apache.activeio.packet.Packet;
|
||||||
import org.apache.activeio.util.ByteArrayOutputStream;
|
import org.apache.activeio.util.ByteArrayOutputStream;
|
||||||
import org.apache.activemq.transport.stomp.Stomp;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implements marshalling and unmarsalling the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
|
* Implements marshalling and unmarsalling the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
|
||||||
|
|
|
@ -22,6 +22,7 @@ import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.TransportConnector;
|
import org.apache.activemq.broker.TransportConnector;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
import org.apache.activemq.command.ActiveMQTextMessage;
|
import org.apache.activemq.command.ActiveMQTextMessage;
|
||||||
|
import org.apache.activemq.transport.stomp2.Stomp;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
|
|
|
@ -1,121 +0,0 @@
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Copyright 2005-2006 The Apache Software Foundation
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.transport.stomp;
|
|
||||||
|
|
||||||
import org.apache.activemq.broker.BrokerService;
|
|
||||||
import org.apache.activemq.command.Command;
|
|
||||||
import org.apache.activemq.command.*;
|
|
||||||
import org.apache.activemq.transport.stomp.Stomp;
|
|
||||||
import org.apache.activemq.transport.stomp.StompWireFormat;
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
|
|
||||||
import javax.jms.JMSException;
|
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
|
||||||
import java.io.ByteArrayOutputStream;
|
|
||||||
import java.io.DataInputStream;
|
|
||||||
import java.io.DataOutputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
import junit.framework.TestCase;
|
|
||||||
|
|
||||||
public class StompWireFormatTest extends TestCase {
|
|
||||||
|
|
||||||
protected static final Log log = LogFactory.getLog(StompWireFormatTest.class);
|
|
||||||
|
|
||||||
private StompWireFormat wire;
|
|
||||||
|
|
||||||
public void setUp() throws Exception {
|
|
||||||
wire = new StompWireFormat();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testValidConnectHandshake() throws Exception {
|
|
||||||
ByteArrayOutputStream bout = new ByteArrayOutputStream();
|
|
||||||
DataOutputStream dout = new DataOutputStream(bout);
|
|
||||||
|
|
||||||
ConnectionInfo ci = (ConnectionInfo) parseCommand("CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "\n" + Stomp.NULL);
|
|
||||||
assertNotNull(ci);
|
|
||||||
assertTrue(ci.isResponseRequired());
|
|
||||||
|
|
||||||
Response cr = new Response();
|
|
||||||
cr.setCorrelationId(ci.getCommandId());
|
|
||||||
|
|
||||||
String response = writeCommand(cr);
|
|
||||||
log.info("Received: " + response);
|
|
||||||
|
|
||||||
SessionInfo si = (SessionInfo) wire.readCommand(null);
|
|
||||||
assertNotNull(si);
|
|
||||||
assertTrue(!si.isResponseRequired());
|
|
||||||
|
|
||||||
ProducerInfo pi = (ProducerInfo) wire.readCommand(null);
|
|
||||||
assertNotNull(pi);
|
|
||||||
assertTrue(pi.isResponseRequired());
|
|
||||||
|
|
||||||
Response sr = new Response();
|
|
||||||
sr.setCorrelationId(pi.getCommandId());
|
|
||||||
response = writeCommand(sr);
|
|
||||||
log.info("Received: " + response);
|
|
||||||
assertTrue("Response should start with CONNECTED: " + response, response.startsWith("CONNECTED"));
|
|
||||||
|
|
||||||
// now lets test subscribe
|
|
||||||
ConsumerInfo consumerInfo = (ConsumerInfo) parseCommand("SUBSCRIBE\n" + "destination: /queue/foo\n" + "ack: client\n" + "activemq.prefetchSize: 1\n"
|
|
||||||
+ "\n" + Stomp.NULL);
|
|
||||||
assertNotNull(consumerInfo);
|
|
||||||
// assertTrue(consumerInfo.isResponseRequired());
|
|
||||||
assertEquals("prefetch size", 1, consumerInfo.getPrefetchSize());
|
|
||||||
|
|
||||||
cr = new Response();
|
|
||||||
cr.setCorrelationId(consumerInfo.getCommandId());
|
|
||||||
response = writeCommand(cr);
|
|
||||||
log.info("Received: " + response);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void _testFakeServer() throws Exception {
|
|
||||||
final BrokerService container = new BrokerService();
|
|
||||||
new Thread(new Runnable() {
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
container.addConnector("stomp://localhost:61613");
|
|
||||||
container.start();
|
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
System.err.println("ARGH: caught: " + e);
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}).start();
|
|
||||||
System.err.println("started container");
|
|
||||||
System.err.println("okay, go play");
|
|
||||||
|
|
||||||
System.err.println(System.in.read());
|
|
||||||
}
|
|
||||||
|
|
||||||
protected Command parseCommand(String connect_frame) throws IOException, JMSException {
|
|
||||||
DataInputStream din = new DataInputStream(new ByteArrayInputStream(connect_frame.getBytes()));
|
|
||||||
|
|
||||||
return wire.readCommand(din);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected String writeCommand(Command command) throws IOException, JMSException {
|
|
||||||
ByteArrayOutputStream bout = new ByteArrayOutputStream();
|
|
||||||
DataOutputStream dout = new DataOutputStream(bout);
|
|
||||||
wire.writeCommand(command, dout);
|
|
||||||
return new String(bout.toByteArray());
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
Loading…
Reference in New Issue