mirror of https://github.com/apache/activemq.git
Applied Dejan's patch on AMQ-1272 with some small tweaks.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@605944 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
462921addf
commit
471ea33051
|
@ -25,7 +25,6 @@ import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import javax.jms.Destination;
|
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
|
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
@ -33,11 +32,13 @@ import org.apache.activemq.command.ActiveMQMessage;
|
||||||
import org.apache.activemq.command.ActiveMQTempQueue;
|
import org.apache.activemq.command.ActiveMQTempQueue;
|
||||||
import org.apache.activemq.command.ActiveMQTempTopic;
|
import org.apache.activemq.command.ActiveMQTempTopic;
|
||||||
import org.apache.activemq.command.Command;
|
import org.apache.activemq.command.Command;
|
||||||
|
import org.apache.activemq.command.ConnectionError;
|
||||||
import org.apache.activemq.command.ConnectionId;
|
import org.apache.activemq.command.ConnectionId;
|
||||||
import org.apache.activemq.command.ConnectionInfo;
|
import org.apache.activemq.command.ConnectionInfo;
|
||||||
import org.apache.activemq.command.ConsumerId;
|
import org.apache.activemq.command.ConsumerId;
|
||||||
import org.apache.activemq.command.ConsumerInfo;
|
import org.apache.activemq.command.ConsumerInfo;
|
||||||
import org.apache.activemq.command.DestinationInfo;
|
import org.apache.activemq.command.DestinationInfo;
|
||||||
|
import org.apache.activemq.command.ExceptionResponse;
|
||||||
import org.apache.activemq.command.LocalTransactionId;
|
import org.apache.activemq.command.LocalTransactionId;
|
||||||
import org.apache.activemq.command.MessageAck;
|
import org.apache.activemq.command.MessageAck;
|
||||||
import org.apache.activemq.command.MessageDispatch;
|
import org.apache.activemq.command.MessageDispatch;
|
||||||
|
@ -51,6 +52,7 @@ import org.apache.activemq.command.ShutdownInfo;
|
||||||
import org.apache.activemq.command.TransactionId;
|
import org.apache.activemq.command.TransactionId;
|
||||||
import org.apache.activemq.command.TransactionInfo;
|
import org.apache.activemq.command.TransactionInfo;
|
||||||
import org.apache.activemq.util.ByteArrayOutputStream;
|
import org.apache.activemq.util.ByteArrayOutputStream;
|
||||||
|
import org.apache.activemq.util.IOExceptionSupport;
|
||||||
import org.apache.activemq.util.IdGenerator;
|
import org.apache.activemq.util.IdGenerator;
|
||||||
import org.apache.activemq.util.IntrospectionSupport;
|
import org.apache.activemq.util.IntrospectionSupport;
|
||||||
import org.apache.activemq.util.LongSequenceGenerator;
|
import org.apache.activemq.util.LongSequenceGenerator;
|
||||||
|
@ -94,18 +96,24 @@ public class ProtocolConverter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ResponseHandler createResponseHandler(StompFrame command) {
|
protected ResponseHandler createResponseHandler(final StompFrame command) {
|
||||||
final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
|
final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
|
||||||
// A response may not be needed.
|
|
||||||
if (receiptId != null) {
|
if (receiptId != null) {
|
||||||
return new ResponseHandler() {
|
return new ResponseHandler() {
|
||||||
public void onResponse(ProtocolConverter converter, Response response) throws IOException {
|
public void onResponse(ProtocolConverter converter, Response response) throws IOException {
|
||||||
|
if (response.isException()) {
|
||||||
|
// Generally a command can fail.. but that does not invalidate the connection.
|
||||||
|
// We report back the failure but we don't close the connection.
|
||||||
|
Throwable exception = ((ExceptionResponse)response).getException();
|
||||||
|
handleException(exception, command);
|
||||||
|
} else {
|
||||||
StompFrame sc = new StompFrame();
|
StompFrame sc = new StompFrame();
|
||||||
sc.setAction(Stomp.Responses.RECEIPT);
|
sc.setAction(Stomp.Responses.RECEIPT);
|
||||||
sc.setHeaders(new HashMap<String, String>(1));
|
sc.setHeaders(new HashMap<String, String>(1));
|
||||||
sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
|
sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
|
||||||
transportFilter.sendToStomp(sc);
|
transportFilter.sendToStomp(sc);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
|
@ -160,30 +168,35 @@ public class ProtocolConverter {
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (ProtocolException e) {
|
} catch (ProtocolException e) {
|
||||||
|
handleException(e, command);
|
||||||
// Let the stomp client know about any protocol errors.
|
// Some protocol errors can cause the connection to get closed.
|
||||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
|
||||||
PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
|
|
||||||
e.printStackTrace(stream);
|
|
||||||
stream.close();
|
|
||||||
|
|
||||||
HashMap<String, String> headers = new HashMap<String, String>();
|
|
||||||
headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage());
|
|
||||||
|
|
||||||
final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
|
|
||||||
if (receiptId != null) {
|
|
||||||
headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
|
|
||||||
}
|
|
||||||
|
|
||||||
StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
|
|
||||||
sendToStomp(errorMessage);
|
|
||||||
|
|
||||||
if( e.isFatal() ) {
|
if( e.isFatal() ) {
|
||||||
getTransportFilter().onException(e);
|
getTransportFilter().onException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void handleException(Throwable exception, StompFrame command) throws IOException {
|
||||||
|
// Let the stomp client know about any protocol errors.
|
||||||
|
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||||
|
PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
|
||||||
|
exception.printStackTrace(stream);
|
||||||
|
stream.close();
|
||||||
|
|
||||||
|
HashMap<String, String> headers = new HashMap<String, String>();
|
||||||
|
headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage());
|
||||||
|
|
||||||
|
if (command != null) {
|
||||||
|
final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
|
||||||
|
if (receiptId != null) {
|
||||||
|
headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
|
||||||
|
sendToStomp(errorMessage);
|
||||||
|
}
|
||||||
|
|
||||||
protected void onStompSend(StompFrame command) throws IOException, JMSException {
|
protected void onStompSend(StompFrame command) throws IOException, JMSException {
|
||||||
checkConnected();
|
checkConnected();
|
||||||
|
|
||||||
|
@ -393,7 +406,7 @@ public class ProtocolConverter {
|
||||||
throw new ProtocolException("No subscription matched.");
|
throw new ProtocolException("No subscription matched.");
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void onStompConnect(StompFrame command) throws ProtocolException {
|
protected void onStompConnect(final StompFrame command) throws ProtocolException {
|
||||||
|
|
||||||
if (connected.get()) {
|
if (connected.get()) {
|
||||||
throw new ProtocolException("Allready connected.");
|
throw new ProtocolException("Allready connected.");
|
||||||
|
@ -424,6 +437,14 @@ public class ProtocolConverter {
|
||||||
sendToActiveMQ(connectionInfo, new ResponseHandler() {
|
sendToActiveMQ(connectionInfo, new ResponseHandler() {
|
||||||
public void onResponse(ProtocolConverter converter, Response response) throws IOException {
|
public void onResponse(ProtocolConverter converter, Response response) throws IOException {
|
||||||
|
|
||||||
|
if (response.isException()) {
|
||||||
|
// If the connection attempt fails we close the socket.
|
||||||
|
Throwable exception = ((ExceptionResponse)response).getException();
|
||||||
|
handleException(exception, command);
|
||||||
|
getTransportFilter().onException(IOExceptionSupport.create(exception));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
final SessionInfo sessionInfo = new SessionInfo(sessionId);
|
final SessionInfo sessionInfo = new SessionInfo(sessionId);
|
||||||
sendToActiveMQ(sessionInfo, null);
|
sendToActiveMQ(sessionInfo, null);
|
||||||
|
|
||||||
|
@ -431,6 +452,13 @@ public class ProtocolConverter {
|
||||||
sendToActiveMQ(producerInfo, new ResponseHandler() {
|
sendToActiveMQ(producerInfo, new ResponseHandler() {
|
||||||
public void onResponse(ProtocolConverter converter, Response response) throws IOException {
|
public void onResponse(ProtocolConverter converter, Response response) throws IOException {
|
||||||
|
|
||||||
|
if (response.isException()) {
|
||||||
|
// If the connection attempt fails we close the socket.
|
||||||
|
Throwable exception = ((ExceptionResponse)response).getException();
|
||||||
|
handleException(exception, command);
|
||||||
|
getTransportFilter().onException(IOExceptionSupport.create(exception));
|
||||||
|
}
|
||||||
|
|
||||||
connected.set(true);
|
connected.set(true);
|
||||||
HashMap<String, String> responseHeaders = new HashMap<String, String>();
|
HashMap<String, String> responseHeaders = new HashMap<String, String>();
|
||||||
|
|
||||||
|
@ -483,8 +511,13 @@ public class ProtocolConverter {
|
||||||
ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
|
ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
|
||||||
if (rh != null) {
|
if (rh != null) {
|
||||||
rh.onResponse(this, response);
|
rh.onResponse(this, response);
|
||||||
|
} else {
|
||||||
|
// Pass down any unexpected errors. Should this close the connection?
|
||||||
|
if (response.isException()) {
|
||||||
|
Throwable exception = ((ExceptionResponse)response).getException();
|
||||||
|
handleException(exception, null);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
} else if (command.isMessageDispatch()) {
|
} else if (command.isMessageDispatch()) {
|
||||||
|
|
||||||
MessageDispatch md = (MessageDispatch)command;
|
MessageDispatch md = (MessageDispatch)command;
|
||||||
|
@ -492,6 +525,10 @@ public class ProtocolConverter {
|
||||||
if (sub != null) {
|
if (sub != null) {
|
||||||
sub.onMessageDispatch(md);
|
sub.onMessageDispatch(md);
|
||||||
}
|
}
|
||||||
|
} else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
|
||||||
|
// Pass down any unexpected async errors. Should this close the connection?
|
||||||
|
Throwable exception = ((ConnectionError)command).getException();
|
||||||
|
handleException(exception, null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -32,7 +32,11 @@ public class StompConnection {
|
||||||
private ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream();
|
private ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream();
|
||||||
|
|
||||||
public void open(String host, int port) throws IOException, UnknownHostException {
|
public void open(String host, int port) throws IOException, UnknownHostException {
|
||||||
stompSocket = new Socket(host, port);
|
open(new Socket(host, port));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void open(Socket socket) {
|
||||||
|
stompSocket = socket;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
|
@ -76,4 +80,12 @@ public class StompConnection {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Socket getStompSocket() {
|
||||||
|
return stompSocket;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setStompSocket(Socket stompSocket) {
|
||||||
|
this.stompSocket = stompSocket;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,49 +35,49 @@ import javax.jms.TextMessage;
|
||||||
|
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.CombinationTestSupport;
|
import org.apache.activemq.CombinationTestSupport;
|
||||||
|
import org.apache.activemq.broker.BrokerFactory;
|
||||||
|
import org.apache.activemq.broker.BrokerPlugin;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.TransportConnector;
|
import org.apache.activemq.broker.TransportConnector;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
import org.apache.activemq.command.ActiveMQTextMessage;
|
import org.apache.activemq.command.ActiveMQTextMessage;
|
||||||
import org.apache.activemq.transport.reliable.UnreliableUdpTransportTest;
|
import org.apache.activemq.security.AuthorizationPlugin;
|
||||||
|
import org.apache.activemq.security.SimpleSecurityBrokerSystemTest;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
public class StompTest extends CombinationTestSupport {
|
public class StompTest extends CombinationTestSupport {
|
||||||
private static final Log LOG = LogFactory.getLog(StompTest.class);
|
private static final Log LOG = LogFactory.getLog(StompTest.class);
|
||||||
|
|
||||||
protected String bindAddress = "stomp://localhost:0";
|
protected String bindAddress = "stomp://localhost:61613";
|
||||||
|
protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml";
|
||||||
|
|
||||||
private BrokerService broker;
|
private BrokerService broker;
|
||||||
private TransportConnector connector;
|
|
||||||
private StompConnection stompConnection = new StompConnection();
|
private StompConnection stompConnection = new StompConnection();
|
||||||
private Connection connection;
|
private Connection connection;
|
||||||
private Session session;
|
private Session session;
|
||||||
private ActiveMQQueue queue;
|
private ActiveMQQueue queue;
|
||||||
|
|
||||||
protected void setUp() throws Exception {
|
protected void setUp() throws Exception {
|
||||||
broker = new BrokerService();
|
broker = BrokerFactory.createBroker(new URI(confUri));
|
||||||
broker.setPersistent(false);
|
|
||||||
|
|
||||||
connector = broker.addConnector(bindAddress);
|
|
||||||
broker.start();
|
broker.start();
|
||||||
|
|
||||||
stompConnect();
|
stompConnect();
|
||||||
|
|
||||||
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
|
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
|
||||||
connection = cf.createConnection();
|
connection = cf.createConnection("system", "manager");
|
||||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
queue = new ActiveMQQueue(getQueueName());
|
queue = new ActiveMQQueue(getQueueName());
|
||||||
connection.start();
|
connection.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void stompConnect() throws IOException, URISyntaxException, UnknownHostException {
|
private void stompConnect() throws IOException, URISyntaxException, UnknownHostException {
|
||||||
URI connectUri = connector.getConnectUri();
|
URI connectUri = new URI(bindAddress);
|
||||||
stompConnection.open("127.0.0.1", connectUri.getPort());
|
stompConnection.open(createSocket(connectUri));
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Socket createSocket(URI connectUri) throws IOException {
|
protected Socket createSocket(URI connectUri) throws IOException {
|
||||||
return new Socket();
|
return new Socket("127.0.0.1", connectUri.getPort());
|
||||||
}
|
}
|
||||||
|
|
||||||
protected String getQueueName() {
|
protected String getQueueName() {
|
||||||
|
@ -117,7 +117,7 @@ public class StompTest extends CombinationTestSupport {
|
||||||
|
|
||||||
public void testConnect() throws Exception {
|
public void testConnect() throws Exception {
|
||||||
|
|
||||||
String connectFrame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "request-id: 1\n" + "\n" + Stomp.NULL;
|
String connectFrame = "CONNECT\n" + "login: system\n" + "passcode: manager\n" + "request-id: 1\n" + "\n" + Stomp.NULL;
|
||||||
stompConnection.sendFrame(connectFrame);
|
stompConnection.sendFrame(connectFrame);
|
||||||
|
|
||||||
String f = stompConnection.receiveFrame();
|
String f = stompConnection.receiveFrame();
|
||||||
|
@ -130,7 +130,7 @@ public class StompTest extends CombinationTestSupport {
|
||||||
|
|
||||||
MessageConsumer consumer = session.createConsumer(queue);
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
|
||||||
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
frame = stompConnection.receiveFrame();
|
frame = stompConnection.receiveFrame();
|
||||||
|
@ -155,7 +155,7 @@ public class StompTest extends CombinationTestSupport {
|
||||||
|
|
||||||
MessageConsumer consumer = session.createConsumer(queue);
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
|
||||||
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
frame = stompConnection.receiveFrame();
|
frame = stompConnection.receiveFrame();
|
||||||
|
@ -174,7 +174,7 @@ public class StompTest extends CombinationTestSupport {
|
||||||
|
|
||||||
MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");
|
MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");
|
||||||
|
|
||||||
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
frame = stompConnection.receiveFrame();
|
frame = stompConnection.receiveFrame();
|
||||||
|
@ -195,7 +195,7 @@ public class StompTest extends CombinationTestSupport {
|
||||||
|
|
||||||
MessageConsumer consumer = session.createConsumer(queue);
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
|
||||||
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
frame = stompConnection.receiveFrame();
|
frame = stompConnection.receiveFrame();
|
||||||
|
@ -222,7 +222,7 @@ public class StompTest extends CombinationTestSupport {
|
||||||
|
|
||||||
public void testSubscribeWithAutoAck() throws Exception {
|
public void testSubscribeWithAutoAck() throws Exception {
|
||||||
|
|
||||||
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
frame = stompConnection.receiveFrame();
|
frame = stompConnection.receiveFrame();
|
||||||
|
@ -242,7 +242,7 @@ public class StompTest extends CombinationTestSupport {
|
||||||
|
|
||||||
public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
|
public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
|
||||||
|
|
||||||
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
frame = stompConnection.receiveFrame();
|
frame = stompConnection.receiveFrame();
|
||||||
|
@ -271,7 +271,7 @@ public class StompTest extends CombinationTestSupport {
|
||||||
|
|
||||||
public void testSubscribeWithMessageSentWithProperties() throws Exception {
|
public void testSubscribeWithMessageSentWithProperties() throws Exception {
|
||||||
|
|
||||||
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
frame = stompConnection.receiveFrame();
|
frame = stompConnection.receiveFrame();
|
||||||
|
@ -305,7 +305,7 @@ public class StompTest extends CombinationTestSupport {
|
||||||
int ctr = 10;
|
int ctr = 10;
|
||||||
String[] data = new String[ctr];
|
String[] data = new String[ctr];
|
||||||
|
|
||||||
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
frame = stompConnection.receiveFrame();
|
frame = stompConnection.receiveFrame();
|
||||||
|
@ -343,7 +343,7 @@ public class StompTest extends CombinationTestSupport {
|
||||||
|
|
||||||
public void testSubscribeWithAutoAckAndSelector() throws Exception {
|
public void testSubscribeWithAutoAckAndSelector() throws Exception {
|
||||||
|
|
||||||
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
frame = stompConnection.receiveFrame();
|
frame = stompConnection.receiveFrame();
|
||||||
|
@ -365,7 +365,7 @@ public class StompTest extends CombinationTestSupport {
|
||||||
|
|
||||||
public void testSubscribeWithClientAck() throws Exception {
|
public void testSubscribeWithClientAck() throws Exception {
|
||||||
|
|
||||||
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
frame = stompConnection.receiveFrame();
|
frame = stompConnection.receiveFrame();
|
||||||
|
@ -389,7 +389,7 @@ public class StompTest extends CombinationTestSupport {
|
||||||
|
|
||||||
public void testUnsubscribe() throws Exception {
|
public void testUnsubscribe() throws Exception {
|
||||||
|
|
||||||
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
frame = stompConnection.receiveFrame();
|
frame = stompConnection.receiveFrame();
|
||||||
assertTrue(frame.startsWith("CONNECTED"));
|
assertTrue(frame.startsWith("CONNECTED"));
|
||||||
|
@ -426,7 +426,7 @@ public class StompTest extends CombinationTestSupport {
|
||||||
public void testTransactionCommit() throws Exception {
|
public void testTransactionCommit() throws Exception {
|
||||||
MessageConsumer consumer = session.createConsumer(queue);
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
|
||||||
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
String f = stompConnection.receiveFrame();
|
String f = stompConnection.receiveFrame();
|
||||||
|
@ -450,7 +450,7 @@ public class StompTest extends CombinationTestSupport {
|
||||||
public void testTransactionRollback() throws Exception {
|
public void testTransactionRollback() throws Exception {
|
||||||
MessageConsumer consumer = session.createConsumer(queue);
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
|
||||||
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
String f = stompConnection.receiveFrame();
|
String f = stompConnection.receiveFrame();
|
||||||
|
@ -486,7 +486,7 @@ public class StompTest extends CombinationTestSupport {
|
||||||
|
|
||||||
public void testDisconnectedClientsAreRemovedFromTheBroker() throws Exception {
|
public void testDisconnectedClientsAreRemovedFromTheBroker() throws Exception {
|
||||||
assertClients(1);
|
assertClients(1);
|
||||||
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
|
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
@ -503,6 +503,60 @@ public class StompTest extends CombinationTestSupport {
|
||||||
assertClients(1);
|
assertClients(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testConnectNotAuthenticatedWrongUser() throws Exception {
|
||||||
|
String frame = "CONNECT\n" + "login: dejanb\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
String f = stompConnection.receiveFrame();
|
||||||
|
|
||||||
|
assertTrue(f.startsWith("ERROR"));
|
||||||
|
assertClients(1);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testConnectNotAuthenticatedWrongPassword() throws Exception {
|
||||||
|
|
||||||
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: dejanb\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
String f = stompConnection.receiveFrame();
|
||||||
|
|
||||||
|
assertTrue(f.startsWith("ERROR"));
|
||||||
|
assertClients(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testSendNotAuthorized() throws Exception {
|
||||||
|
|
||||||
|
String frame = "CONNECT\n" + "login: guest\n" + "passcode: password\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
frame = stompConnection.receiveFrame();
|
||||||
|
assertTrue(frame.startsWith("CONNECTED"));
|
||||||
|
|
||||||
|
frame = "SEND\n" + "destination:/queue/USERS." + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
|
||||||
|
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
String f = stompConnection.receiveFrame();
|
||||||
|
assertTrue(f.startsWith("ERROR"));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testSubscribeNotAuthorized() throws Exception {
|
||||||
|
|
||||||
|
String frame = "CONNECT\n" + "login: guest\n" + "passcode: password\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
frame = stompConnection.receiveFrame();
|
||||||
|
assertTrue(frame.startsWith("CONNECTED"));
|
||||||
|
|
||||||
|
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
|
||||||
|
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
String f = stompConnection.receiveFrame();
|
||||||
|
assertTrue(f.startsWith("ERROR"));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
protected void assertClients(int expected) throws Exception {
|
protected void assertClients(int expected) throws Exception {
|
||||||
org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
|
org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
|
||||||
int actual = clients.length;
|
int actual = clients.length;
|
||||||
|
|
Loading…
Reference in New Issue