mirror of https://github.com/apache/activemq.git
Unit tests for some STOMP over WebSockets functionality and some fixes for resource cleanup.
This commit is contained in:
parent
82200b6e70
commit
f05f83b15d
|
@ -60,12 +60,10 @@ public abstract class AbstractStompSocket extends TransportSupport implements St
|
|||
doConsume(command);
|
||||
}
|
||||
|
||||
@Override
|
||||
public abstract void sendToStomp(StompFrame command) throws IOException;
|
||||
|
||||
@Override
|
||||
protected void doStop(ServiceStopper stopper) throws Exception {
|
||||
stompInactivityMonitor.stop();
|
||||
handleStopped();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -74,6 +72,19 @@ public abstract class AbstractStompSocket extends TransportSupport implements St
|
|||
stompInactivityMonitor.setTransportListener(getTransportListener());
|
||||
}
|
||||
|
||||
//----- Abstract methods for subclasses to implement ---------------------//
|
||||
|
||||
@Override
|
||||
public abstract void sendToStomp(StompFrame command) throws IOException;
|
||||
|
||||
/**
|
||||
* Called when the transport is stopping to allow the dervied classes
|
||||
* a chance to close WebSocket resources.
|
||||
*
|
||||
* @throws IOException if an error occurs during the stop.
|
||||
*/
|
||||
public abstract void handleStopped() throws IOException;
|
||||
|
||||
//----- Accessor methods -------------------------------------------------//
|
||||
|
||||
@Override
|
||||
|
|
|
@ -34,6 +34,20 @@ class StompSocket extends AbstractStompSocket implements WebSocket.OnTextMessage
|
|||
|
||||
private Connection outbound;
|
||||
|
||||
@Override
|
||||
public void handleStopped() throws IOException {
|
||||
if (outbound != null && outbound.isOpen()) {
|
||||
outbound.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendToStomp(StompFrame command) throws IOException {
|
||||
outbound.sendMessage(command.format());
|
||||
}
|
||||
|
||||
//----- WebSocket.OnTextMessage callback handlers ------------------------//
|
||||
|
||||
@Override
|
||||
public void onOpen(Connection connection) {
|
||||
this.outbound = connection;
|
||||
|
@ -52,9 +66,4 @@ class StompSocket extends AbstractStompSocket implements WebSocket.OnTextMessage
|
|||
public void onMessage(String data) {
|
||||
processStompFrame(data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendToStomp(StompFrame command) throws IOException {
|
||||
outbound.sendMessage(command.format());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -40,6 +40,15 @@ class StompSocket extends AbstractStompSocket implements WebSocketListener {
|
|||
session.getRemote().sendString(command.format());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleStopped() throws IOException {
|
||||
if (session != null && session.isOpen()) {
|
||||
session.close();
|
||||
}
|
||||
}
|
||||
|
||||
//----- WebSocketListener event callbacks --------------------------------//
|
||||
|
||||
@Override
|
||||
public void onWebSocketBinary(byte[] arg0, int arg1, int arg2) {
|
||||
}
|
||||
|
|
|
@ -0,0 +1,147 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.ws;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.transport.stomp.StompFrame;
|
||||
import org.eclipse.jetty.websocket.WebSocket;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* STOMP over WS based Connection class
|
||||
*/
|
||||
public class StompWSConnection implements WebSocket, WebSocket.OnTextMessage {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(StompWSConnection.class);
|
||||
|
||||
private Connection connection;
|
||||
private final CountDownLatch connectLatch = new CountDownLatch(1);
|
||||
|
||||
private final BlockingQueue<String> prefetch = new LinkedBlockingDeque<String>();
|
||||
|
||||
private int closeCode = -1;
|
||||
private String closeMessage;
|
||||
|
||||
public boolean isConnected() {
|
||||
return connection != null ? connection.isOpen() : false;
|
||||
}
|
||||
|
||||
public void close() {
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
//---- Send methods ------------------------------------------------------//
|
||||
|
||||
public void sendRawFrame(String rawFrame) throws Exception {
|
||||
checkConnected();
|
||||
connection.sendMessage(rawFrame);
|
||||
}
|
||||
|
||||
public void sendFrame(StompFrame frame) throws Exception {
|
||||
checkConnected();
|
||||
connection.sendMessage(frame.format());
|
||||
}
|
||||
|
||||
public void keepAlive() throws Exception {
|
||||
checkConnected();
|
||||
connection.sendMessage("\n");
|
||||
}
|
||||
|
||||
//----- Receive methods --------------------------------------------------//
|
||||
|
||||
public String receive() throws Exception {
|
||||
checkConnected();
|
||||
return prefetch.take();
|
||||
}
|
||||
|
||||
public String receive(long timeout, TimeUnit unit) throws Exception {
|
||||
checkConnected();
|
||||
return prefetch.poll(timeout, unit);
|
||||
}
|
||||
|
||||
public String receiveNoWait() throws Exception {
|
||||
checkConnected();
|
||||
return prefetch.poll();
|
||||
}
|
||||
|
||||
//---- Blocking state change calls ---------------------------------------//
|
||||
|
||||
public void awaitConnection() throws InterruptedException {
|
||||
connectLatch.await();
|
||||
}
|
||||
|
||||
public boolean awaitConnection(long time, TimeUnit unit) throws InterruptedException {
|
||||
return connectLatch.await(time, unit);
|
||||
}
|
||||
|
||||
//----- Property Accessors -----------------------------------------------//
|
||||
|
||||
public int getCloseCode() {
|
||||
return closeCode;
|
||||
}
|
||||
|
||||
public String getCloseMessage() {
|
||||
return closeMessage;
|
||||
}
|
||||
|
||||
//----- WebSocket callback handlers --------------------------------------//
|
||||
|
||||
@Override
|
||||
public void onMessage(String data) {
|
||||
if (data == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (data.equals("\n")) {
|
||||
LOG.debug("New incoming heartbeat read");
|
||||
} else {
|
||||
LOG.trace("New incoming STOMP Frame read: \n{}", data);
|
||||
prefetch.add(data);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOpen(Connection connection) {
|
||||
this.connection = connection;
|
||||
this.connectLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClose(int closeCode, String message) {
|
||||
LOG.trace("STOMP WS Connection closed, code:{} message:{}", closeCode, message);
|
||||
|
||||
this.connection = null;
|
||||
this.closeCode = closeCode;
|
||||
this.closeMessage = message;
|
||||
}
|
||||
|
||||
//----- Internal implementation ------------------------------------------//
|
||||
|
||||
private void checkConnected() throws IOException {
|
||||
if (!isConnected()) {
|
||||
throw new IOException("STOMP WS Connection is closed.");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,283 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.ws;
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.transport.stomp.Stomp;
|
||||
import org.apache.activemq.transport.stomp.StompFrame;
|
||||
import org.apache.activemq.util.Wait;
|
||||
import org.eclipse.jetty.websocket.WebSocketClient;
|
||||
import org.eclipse.jetty.websocket.WebSocketClientFactory;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Test STOMP over WebSockets functionality.
|
||||
*/
|
||||
public class StompWSTransportTest extends WSTransportTestSupport {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(StompWSTransportTest.class);
|
||||
|
||||
protected WebSocketClient wsClient;
|
||||
protected StompWSConnection wsStompConnection;
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
|
||||
WebSocketClientFactory clientFactory = new WebSocketClientFactory();
|
||||
clientFactory.start();
|
||||
|
||||
wsClient = clientFactory.newWebSocketClient();
|
||||
wsStompConnection = new StompWSConnection();
|
||||
|
||||
wsClient.open(wsConnectUri, wsStompConnection);
|
||||
if (!wsStompConnection.awaitConnection(30, TimeUnit.SECONDS)) {
|
||||
throw new IOException("Could not connect to STOMP WS endpoint");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
if (wsStompConnection != null) {
|
||||
wsStompConnection.close();
|
||||
wsStompConnection = null;
|
||||
wsClient = null;
|
||||
}
|
||||
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testConnect() throws Exception {
|
||||
String connectFrame = "STOMP\n" +
|
||||
"login:system\n" +
|
||||
"passcode:manager\n" +
|
||||
"accept-version:1.2\n" +
|
||||
"host:localhost\n" +
|
||||
"\n" + Stomp.NULL;
|
||||
|
||||
wsStompConnection.sendRawFrame(connectFrame);
|
||||
|
||||
String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
|
||||
assertNotNull(incoming);
|
||||
assertTrue(incoming.startsWith("CONNECTED"));
|
||||
|
||||
assertTrue("Connection should close", Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return getProxyToBroker().getCurrentConnectionsCount() == 1;
|
||||
}
|
||||
}));
|
||||
|
||||
wsStompConnection.sendFrame(new StompFrame(Stomp.Commands.DISCONNECT));
|
||||
wsStompConnection.close();
|
||||
|
||||
assertTrue("Connection should close", Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return getProxyToBroker().getCurrentConnectionsCount() == 0;
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testConnectWithVersionOptions() throws Exception {
|
||||
String connectFrame = "STOMP\n" +
|
||||
"login:system\n" +
|
||||
"passcode:manager\n" +
|
||||
"accept-version:1.0,1.1\n" +
|
||||
"host:localhost\n" +
|
||||
"\n" + Stomp.NULL;
|
||||
wsStompConnection.sendRawFrame(connectFrame);
|
||||
|
||||
String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
|
||||
|
||||
assertTrue(incoming.startsWith("CONNECTED"));
|
||||
assertTrue(incoming.indexOf("version:1.1") >= 0);
|
||||
assertTrue(incoming.indexOf("session:") >= 0);
|
||||
|
||||
wsStompConnection.sendFrame(new StompFrame(Stomp.Commands.DISCONNECT));
|
||||
wsStompConnection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testRejectInvalidHeartbeats1() throws Exception {
|
||||
String connectFrame = "STOMP\n" +
|
||||
"login:system\n" +
|
||||
"passcode:manager\n" +
|
||||
"accept-version:1.1\n" +
|
||||
"heart-beat:0\n" +
|
||||
"host:localhost\n" +
|
||||
"\n" + Stomp.NULL;
|
||||
wsStompConnection.sendRawFrame(connectFrame);
|
||||
|
||||
String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
|
||||
|
||||
assertTrue(incoming.startsWith("ERROR"));
|
||||
assertTrue(incoming.indexOf("heart-beat") >= 0);
|
||||
assertTrue(incoming.indexOf("message:") >= 0);
|
||||
|
||||
assertTrue("Connection should close", Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return getProxyToBroker().getCurrentConnectionsCount() == 0;
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testRejectInvalidHeartbeats2() throws Exception {
|
||||
String connectFrame = "STOMP\n" +
|
||||
"login:system\n" +
|
||||
"passcode:manager\n" +
|
||||
"accept-version:1.1\n" +
|
||||
"heart-beat:T,0\n" +
|
||||
"host:localhost\n" +
|
||||
"\n" + Stomp.NULL;
|
||||
wsStompConnection.sendRawFrame(connectFrame);
|
||||
|
||||
String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
|
||||
|
||||
assertTrue(incoming.startsWith("ERROR"));
|
||||
assertTrue(incoming.indexOf("heart-beat") >= 0);
|
||||
assertTrue(incoming.indexOf("message:") >= 0);
|
||||
|
||||
assertTrue("Connection should close", Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return getProxyToBroker().getCurrentConnectionsCount() == 0;
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testRejectInvalidHeartbeats3() throws Exception {
|
||||
String connectFrame = "STOMP\n" +
|
||||
"login:system\n" +
|
||||
"passcode:manager\n" +
|
||||
"accept-version:1.1\n" +
|
||||
"heart-beat:100,10,50\n" +
|
||||
"host:localhost\n" +
|
||||
"\n" + Stomp.NULL;
|
||||
wsStompConnection.sendRawFrame(connectFrame);
|
||||
|
||||
String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
|
||||
|
||||
assertTrue(incoming.startsWith("ERROR"));
|
||||
assertTrue(incoming.indexOf("heart-beat") >= 0);
|
||||
assertTrue(incoming.indexOf("message:") >= 0);
|
||||
|
||||
assertTrue("Connection should close", Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return getProxyToBroker().getCurrentConnectionsCount() == 0;
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testHeartbeatsDropsIdleConnection() throws Exception {
|
||||
String connectFrame = "STOMP\n" +
|
||||
"login:system\n" +
|
||||
"passcode:manager\n" +
|
||||
"accept-version:1.1\n" +
|
||||
"heart-beat:1000,0\n" +
|
||||
"host:localhost\n" +
|
||||
"\n" + Stomp.NULL;
|
||||
|
||||
wsStompConnection.sendRawFrame(connectFrame);
|
||||
String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
|
||||
assertTrue(incoming.startsWith("CONNECTED"));
|
||||
assertTrue(incoming.indexOf("version:1.1") >= 0);
|
||||
assertTrue(incoming.indexOf("heart-beat:") >= 0);
|
||||
assertTrue(incoming.indexOf("session:") >= 0);
|
||||
|
||||
assertTrue("Broker should have closed WS connection:", Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return !wsStompConnection.isConnected();
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testHeartbeatsKeepsConnectionOpen() throws Exception {
|
||||
String connectFrame = "STOMP\n" +
|
||||
"login:system\n" +
|
||||
"passcode:manager\n" +
|
||||
"accept-version:1.1\n" +
|
||||
"heart-beat:2000,0\n" +
|
||||
"host:localhost\n" +
|
||||
"\n" + Stomp.NULL;
|
||||
|
||||
wsStompConnection.sendRawFrame(connectFrame);
|
||||
String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
|
||||
assertTrue(incoming.startsWith("CONNECTED"));
|
||||
assertTrue(incoming.indexOf("version:1.1") >= 0);
|
||||
assertTrue(incoming.indexOf("heart-beat:") >= 0);
|
||||
assertTrue(incoming.indexOf("session:") >= 0);
|
||||
|
||||
String message = "SEND\n" + "destination:/queue/" + getTestName() + "\n\n" + "Hello World" + Stomp.NULL;
|
||||
wsStompConnection.sendRawFrame(message);
|
||||
|
||||
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
|
||||
|
||||
service.scheduleAtFixedRate(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
LOG.info("Sending next KeepAlive");
|
||||
wsStompConnection.keepAlive();
|
||||
} catch (Exception e) {
|
||||
}
|
||||
}
|
||||
}, 1, 1, TimeUnit.SECONDS);
|
||||
|
||||
TimeUnit.SECONDS.sleep(15);
|
||||
|
||||
String frame = "SUBSCRIBE\n" + "destination:/queue/" + getTestName() + "\n" +
|
||||
"id:12345\n" + "ack:auto\n\n" + Stomp.NULL;
|
||||
wsStompConnection.sendRawFrame(frame);
|
||||
|
||||
incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
|
||||
assertTrue(incoming.startsWith("MESSAGE"));
|
||||
|
||||
service.shutdownNow();
|
||||
service.awaitTermination(5, TimeUnit.SECONDS);
|
||||
|
||||
wsStompConnection.sendFrame(new StompFrame(Stomp.Commands.DISCONNECT));
|
||||
}
|
||||
}
|
|
@ -21,29 +21,22 @@ import static org.junit.Assert.assertTrue;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.ServerSocket;
|
||||
import java.net.Socket;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.UnknownHostException;
|
||||
|
||||
import javax.net.ServerSocketFactory;
|
||||
|
||||
import org.apache.activemq.broker.BrokerFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.spring.SpringSslContext;
|
||||
import org.apache.activemq.transport.SocketConnectorFactory;
|
||||
import org.apache.activemq.transport.stomp.StompConnection;
|
||||
import org.apache.activemq.util.Wait;
|
||||
import org.eclipse.jetty.server.Connector;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.webapp.WebAppContext;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.openqa.selenium.By;
|
||||
import org.openqa.selenium.WebDriver;
|
||||
import org.openqa.selenium.WebElement;
|
||||
|
@ -54,43 +47,25 @@ import org.openqa.selenium.firefox.FirefoxProfile;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class WSTransportTest {
|
||||
public class WSTransportTest extends WSTransportTestSupport {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(WSTransportTest.class);
|
||||
private static final int MESSAGE_COUNT = 1000;
|
||||
|
||||
private BrokerService broker;
|
||||
private Server server;
|
||||
private WebDriver driver;
|
||||
private File profileDir;
|
||||
|
||||
private String stompUri;
|
||||
private int proxyPort = 0;
|
||||
protected String wsUri;
|
||||
|
||||
private StompConnection stompConnection = new StompConnection();
|
||||
|
||||
protected BrokerService createBroker(boolean deleteMessages) throws Exception {
|
||||
BrokerService broker = BrokerFactory.createBroker(
|
||||
new URI("broker:()/localhost?persistent=false&useJmx=false"));
|
||||
|
||||
SpringSslContext context = new SpringSslContext();
|
||||
context.setKeyStore("src/test/resources/server.keystore");
|
||||
context.setKeyStoreKeyPassword("password");
|
||||
context.setTrustStore("src/test/resources/client.keystore");
|
||||
context.setTrustStorePassword("password");
|
||||
context.afterPropertiesSet();
|
||||
broker.setSslContext(context);
|
||||
|
||||
stompUri = broker.addConnector("stomp://localhost:0").getPublishableConnectString();
|
||||
wsUri = broker.addConnector(getWSConnectorURI()).getPublishableConnectString();
|
||||
broker.setDeleteAllMessagesOnStartup(deleteMessages);
|
||||
broker.start();
|
||||
broker.waitUntilStarted();
|
||||
|
||||
return broker;
|
||||
@Override
|
||||
protected void addAdditionalConnectors(BrokerService service) throws Exception {
|
||||
stompUri = service.addConnector("stomp://localhost:0").getPublishableConnectString();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getWSConnectorURI() {
|
||||
return "ws://127.0.0.1:61623?websocket.maxTextMessageSize=99999&transport.maxIdleTime=1001";
|
||||
}
|
||||
|
@ -114,31 +89,13 @@ public class WSTransportTest {
|
|||
return server;
|
||||
}
|
||||
|
||||
protected int getProxyPort() {
|
||||
if (proxyPort == 0) {
|
||||
ServerSocket ss = null;
|
||||
try {
|
||||
ss = ServerSocketFactory.getDefault().createServerSocket(0);
|
||||
proxyPort = ss.getLocalPort();
|
||||
} catch (IOException e) { // ignore
|
||||
} finally {
|
||||
try {
|
||||
if (ss != null ) {
|
||||
ss.close();
|
||||
}
|
||||
} catch (IOException e) { // ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
return proxyPort;
|
||||
}
|
||||
|
||||
protected Connector createJettyConnector(Server server) throws Exception {
|
||||
Connector c = new SocketConnectorFactory().createConnector(server);
|
||||
c.getClass().getMethod("setPort", Integer.TYPE).invoke(c, getProxyPort());
|
||||
return c;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void stopBroker() throws Exception {
|
||||
if (broker != null) {
|
||||
broker.stop();
|
||||
|
@ -147,14 +104,16 @@ public class WSTransportTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
profileDir = new File("activemq-data/profiles");
|
||||
broker = createBroker(true);
|
||||
stompConnect();
|
||||
server = createWebServer();
|
||||
}
|
||||
|
||||
@Override
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
try {
|
||||
|
@ -163,10 +122,11 @@ public class WSTransportTest {
|
|||
// Some tests explicitly disconnect from stomp so can ignore
|
||||
} finally {
|
||||
try {
|
||||
stopBroker();
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Error on Broker stop.");
|
||||
super.tearDown();
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("Error on super tearDown()");
|
||||
}
|
||||
|
||||
if (driver != null) {
|
||||
try {
|
||||
driver.quit();
|
||||
|
@ -234,7 +194,7 @@ public class WSTransportTest {
|
|||
|
||||
protected String getTestURI() {
|
||||
int port = getProxyPort();
|
||||
return "http://localhost:" + port + "/websocket.html#" + wsUri;
|
||||
return "http://localhost:" + port + "/websocket.html#" + wsConnectUri;
|
||||
}
|
||||
|
||||
public void doTestWebSockets(WebDriver driver) throws Exception {
|
||||
|
|
|
@ -0,0 +1,160 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.ws;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.ServerSocket;
|
||||
import java.net.URI;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.management.MalformedObjectNameException;
|
||||
import javax.management.ObjectName;
|
||||
import javax.net.ServerSocketFactory;
|
||||
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.jmx.BrokerViewMBean;
|
||||
import org.apache.activemq.broker.jmx.QueueViewMBean;
|
||||
import org.apache.activemq.broker.jmx.TopicViewMBean;
|
||||
import org.apache.activemq.spring.SpringSslContext;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.rules.TestName;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Basic infrastructure for test WebSocket connections.
|
||||
*/
|
||||
public class WSTransportTestSupport {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(WSTransportTestSupport.class);
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
|
||||
private int proxyPort = 0;
|
||||
|
||||
protected BrokerService broker;
|
||||
protected URI wsConnectUri;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
broker = createBroker(true);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
try {
|
||||
stopBroker();
|
||||
} catch(Exception e) {
|
||||
LOG.warn("Error on Broker stop.");
|
||||
}
|
||||
}
|
||||
|
||||
protected String getWSConnectorURI() {
|
||||
return "ws://127.0.0.1:" + getProxyPort() + "?websocket.maxTextMessageSize=99999&transport.maxIdleTime=1001";
|
||||
}
|
||||
|
||||
protected void addAdditionalConnectors(BrokerService service) throws Exception {
|
||||
|
||||
}
|
||||
|
||||
protected BrokerService createBroker(boolean deleteMessages) throws Exception {
|
||||
|
||||
BrokerService broker = new BrokerService();
|
||||
|
||||
SpringSslContext context = new SpringSslContext();
|
||||
context.setKeyStore("src/test/resources/server.keystore");
|
||||
context.setKeyStoreKeyPassword("password");
|
||||
context.setTrustStore("src/test/resources/client.keystore");
|
||||
context.setTrustStorePassword("password");
|
||||
context.afterPropertiesSet();
|
||||
broker.setSslContext(context);
|
||||
|
||||
wsConnectUri = broker.addConnector(getWSConnectorURI()).getPublishableConnectURI();
|
||||
|
||||
broker.setUseJmx(true);
|
||||
broker.getManagementContext().setCreateConnector(false);
|
||||
broker.setPersistent(isPersistent());
|
||||
broker.setDeleteAllMessagesOnStartup(deleteMessages);
|
||||
broker.start();
|
||||
broker.waitUntilStarted();
|
||||
|
||||
addAdditionalConnectors(broker);
|
||||
|
||||
return broker;
|
||||
}
|
||||
|
||||
protected boolean isPersistent() {
|
||||
return false;
|
||||
}
|
||||
|
||||
protected String getTestName() {
|
||||
return name.getMethodName();
|
||||
}
|
||||
|
||||
protected int getProxyPort() {
|
||||
if (proxyPort == 0) {
|
||||
ServerSocket ss = null;
|
||||
try {
|
||||
ss = ServerSocketFactory.getDefault().createServerSocket(0);
|
||||
proxyPort = ss.getLocalPort();
|
||||
} catch (IOException e) { // ignore
|
||||
} finally {
|
||||
try {
|
||||
if (ss != null ) {
|
||||
ss.close();
|
||||
}
|
||||
} catch (IOException e) { // ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return proxyPort;
|
||||
}
|
||||
|
||||
protected void stopBroker() throws Exception {
|
||||
if (broker != null) {
|
||||
broker.stop();
|
||||
broker.waitUntilStopped();
|
||||
broker = null;
|
||||
}
|
||||
}
|
||||
|
||||
protected BrokerViewMBean getProxyToBroker() throws MalformedObjectNameException, JMSException {
|
||||
ObjectName brokerViewMBean = new ObjectName(
|
||||
"org.apache.activemq:type=Broker,brokerName=localhost");
|
||||
BrokerViewMBean proxy = (BrokerViewMBean) broker.getManagementContext()
|
||||
.newProxyInstance(brokerViewMBean, BrokerViewMBean.class, true);
|
||||
return proxy;
|
||||
}
|
||||
|
||||
protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
|
||||
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name);
|
||||
QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
|
||||
.newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
|
||||
return proxy;
|
||||
}
|
||||
|
||||
protected TopicViewMBean getProxyToTopic(String name) throws MalformedObjectNameException, JMSException {
|
||||
ObjectName topicViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName="+name);
|
||||
TopicViewMBean proxy = (TopicViewMBean) broker.getManagementContext()
|
||||
.newProxyInstance(topicViewMBeanName, TopicViewMBean.class, true);
|
||||
return proxy;
|
||||
}
|
||||
}
|
|
@ -20,6 +20,8 @@
|
|||
#
|
||||
log4j.rootLogger=INFO, out, stdout
|
||||
|
||||
log4j.logger.org.apache.activemq.transport.ws=DEBUG
|
||||
|
||||
#log4j.logger.org.apache.activemq.broker.scheduler=DEBUG
|
||||
#log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG
|
||||
#log4j.logger.org.apache.activemq.transport.failover=TRACE
|
||||
|
|
Loading…
Reference in New Issue