From 46e67a10ccac3c1c6dd7fc20a8b03337b9655482 Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Wed, 6 Feb 2013 19:18:55 +0000 Subject: [PATCH] Enhances the Http(s) and ws(s) transport Servers such that they can update the connectUri after starting so that test cases can use the any port option on their URI like "localhost:0" and get the connection string after the BrokerService is started. This will allow most of those test cases to be updated so that they don't fail because the hard coded port is already bound. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1443146 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/WebTransportServerSupport.java | 7 +- .../transport/http/HttpTransportServer.java | 29 +++++- .../transport/ws/WSTransportServer.java | 25 ++++- .../http/HttpClientReconnectTest.java | 97 ++++++++++--------- .../HttpJmsDurableTopicSendReceiveTest.java | 13 ++- 5 files changed, 114 insertions(+), 57 deletions(-) diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/WebTransportServerSupport.java b/activemq-http/src/main/java/org/apache/activemq/transport/WebTransportServerSupport.java index c9a56b28ac..28c11a6419 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/WebTransportServerSupport.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/WebTransportServerSupport.java @@ -35,7 +35,7 @@ abstract public class WebTransportServerSupport extends TransportServerSupport { super(location); } - public void bind() throws Exception { + public URI bind() throws Exception { URI bind = getBindLocation(); @@ -51,6 +51,9 @@ abstract public class WebTransportServerSupport extends TransportServerSupport { if (addr.isAnyLocalAddress()) { host = InetAddressUtil.getLocalHostName(); } - setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), host, bindAddress.getPort(), bind.getPath(), bind.getQuery(), bind.getFragment())); + + URI boundUri = new URI(bind.getScheme(), bind.getUserInfo(), host, bindAddress.getPort(), bind.getPath(), bind.getQuery(), bind.getFragment()); + setConnectURI(boundUri); + return boundUri; } } diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java index ec02967005..18cc1a9588 100755 --- a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java @@ -31,11 +31,15 @@ import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.handler.GzipHandler; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class HttpTransportServer extends WebTransportServerSupport { + private static final Logger LOG = LoggerFactory.getLogger(HttpTransportServer.class); + private TextWireFormat wireFormat; - private HttpTransportFactory transportFactory; + private final HttpTransportFactory transportFactory; public HttpTransportServer(URI uri, HttpTransportFactory factory) { super(uri); @@ -44,6 +48,7 @@ public class HttpTransportServer extends WebTransportServerSupport { socketConnectorFactory = new SocketConnectorFactory(); } + @Override public void setBrokerInfo(BrokerInfo brokerInfo) { } @@ -70,14 +75,14 @@ public class HttpTransportServer extends WebTransportServerSupport { this.connector = connector; } + @Override protected void doStart() throws Exception { server = new Server(); if (connector == null) { connector = socketConnectorFactory.createConnector(); } - URI bind = getBindLocation(); - bind(); + URI boundTo = bind(); ServletContextHandler contextHandler = new ServletContextHandler(server, "/", ServletContextHandler.NO_SECURITY); @@ -95,8 +100,25 @@ public class HttpTransportServer extends WebTransportServerSupport { contextHandler.setHandler(gzipHandler); server.start(); + + // Update the Connect To URI with our actual location in case the configured port + // was set to zero so that we report the actual port we are listening on. + + int port = boundTo.getPort(); + if (connector.getLocalPort() != -1) { + port = connector.getLocalPort(); + } + + setConnectURI(new URI(boundTo.getScheme(), + boundTo.getUserInfo(), + boundTo.getHost(), + port, + boundTo.getPath(), + boundTo.getQuery(), + boundTo.getFragment())); } + @Override protected void doStop(ServiceStopper stopper) throws Exception { Server temp = server; server = null; @@ -105,6 +127,7 @@ public class HttpTransportServer extends WebTransportServerSupport { } } + @Override public InetSocketAddress getSocketAddress() { return null; } diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java index 918da6ab61..4a71e5f072 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java @@ -43,6 +43,7 @@ public class WSTransportServer extends WebTransportServerSupport { socketConnectorFactory = new SocketConnectorFactory(); } + @Override protected void doStart() throws Exception { server = new Server(); @@ -50,9 +51,7 @@ public class WSTransportServer extends WebTransportServerSupport { connector = socketConnectorFactory.createConnector(); } - URI bind = getBindLocation(); - - bind(); + URI boundTo = bind(); ServletContextHandler contextHandler = new ServletContextHandler(server, "/", ServletContextHandler.NO_SECURITY); @@ -72,8 +71,25 @@ public class WSTransportServer extends WebTransportServerSupport { contextHandler.setAttribute("acceptListener", getAcceptListener()); server.start(); + + // Update the Connect To URI with our actual location in case the configured port + // was set to zero so that we report the actual port we are listening on. + + int port = boundTo.getPort(); + if (connector.getLocalPort() != -1) { + port = connector.getLocalPort(); + } + + setConnectURI(new URI(boundTo.getScheme(), + boundTo.getUserInfo(), + boundTo.getHost(), + port, + boundTo.getPath(), + boundTo.getQuery(), + boundTo.getFragment())); } + @Override protected void doStop(ServiceStopper stopper) throws Exception { Server temp = server; server = null; @@ -82,10 +98,12 @@ public class WSTransportServer extends WebTransportServerSupport { } } + @Override public InetSocketAddress getSocketAddress() { return null; } + @Override public void setBrokerInfo(BrokerInfo brokerInfo) { } @@ -104,5 +122,4 @@ public class WSTransportServer extends WebTransportServerSupport { public boolean isSslServer() { return false; } - } diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpClientReconnectTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpClientReconnectTest.java index 24045c30f7..49ccbf07d7 100644 --- a/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpClientReconnectTest.java +++ b/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpClientReconnectTest.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.transport.http; +import static org.junit.Assert.assertEquals; + import javax.jms.Connection; import javax.jms.Destination; import javax.jms.MessageConsumer; @@ -23,17 +25,21 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; -import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.command.ActiveMQQueue; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; -public class HttpClientReconnectTest extends TestCase { - - BrokerService broker; - ActiveMQConnectionFactory factory; +public class HttpClientReconnectTest { - protected void setUp() throws Exception { + private BrokerService broker; + private ActiveMQConnectionFactory factory; + + @Before + public void setUp() throws Exception { System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore"); System.setProperty("javax.net.ssl.trustStorePassword", "password"); System.setProperty("javax.net.ssl.trustStoreType", "jks"); @@ -41,45 +47,46 @@ public class HttpClientReconnectTest extends TestCase { System.setProperty("javax.net.ssl.keyStorePassword", "password"); System.setProperty("javax.net.ssl.keyStoreType", "jks"); - broker = new BrokerService(); - broker.addConnector("https://localhost:61666?trace=true"); - broker.setPersistent(false); - broker.setUseJmx(false); - broker.deleteAllMessages(); - broker.start(); - factory = new ActiveMQConnectionFactory("https://localhost:61666?trace=true&soTimeout=1000"); - } + broker = new BrokerService(); + TransportConnector connector = broker.addConnector("https://localhost:0?trace=true"); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.deleteAllMessages(); + broker.start(); - protected void tearDown() throws Exception { - broker.stop(); - } - - public void testReconnectClient() throws Exception { - for (int i = 0; i < 100; i++) { - sendAndReceiveMessage(i); - } - } - - private void sendAndReceiveMessage(int i) throws Exception { - Connection conn = factory.createConnection(); - Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - conn.start(); - Destination dest = new ActiveMQQueue("test"); - MessageProducer producer = sess.createProducer(dest); - MessageConsumer consumer = sess.createConsumer(dest); - String messageText = "test " + i; - try { - producer.send(sess.createTextMessage(messageText)); - TextMessage msg = (TextMessage)consumer.receive(1000); - assertEquals(messageText, msg.getText()); - } finally { - producer.close(); - consumer.close(); - conn.close(); - sess.close(); - } - } - - + String connectionUri = connector.getPublishableConnectString(); + factory = new ActiveMQConnectionFactory(connectionUri + "?trace=true&soTimeout=1000"); + } + @After + public void tearDown() throws Exception { + broker.stop(); + } + + @Test + public void testReconnectClient() throws Exception { + for (int i = 0; i < 100; i++) { + sendAndReceiveMessage(i); + } + } + + private void sendAndReceiveMessage(int i) throws Exception { + Connection conn = factory.createConnection(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + conn.start(); + Destination dest = new ActiveMQQueue("test"); + MessageProducer producer = sess.createProducer(dest); + MessageConsumer consumer = sess.createConsumer(dest); + String messageText = "test " + i; + try { + producer.send(sess.createTextMessage(messageText)); + TextMessage msg = (TextMessage)consumer.receive(1000); + assertEquals(messageText, msg.getText()); + } finally { + producer.close(); + consumer.close(); + conn.close(); + sess.close(); + } + } } diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpJmsDurableTopicSendReceiveTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpJmsDurableTopicSendReceiveTest.java index ec2d1ffb0c..3b4c7c49f1 100644 --- a/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpJmsDurableTopicSendReceiveTest.java +++ b/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpJmsDurableTopicSendReceiveTest.java @@ -21,17 +21,23 @@ import org.apache.activemq.JmsDurableTopicSendReceiveTest; import org.apache.activemq.broker.BrokerService; public class HttpJmsDurableTopicSendReceiveTest extends JmsDurableTopicSendReceiveTest { + protected BrokerService broker; + private String connectionUri; + + @Override protected void setUp() throws Exception { if (broker == null) { broker = createBroker(); broker.start(); + connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); } super.setUp(); - WaitForJettyListener.waitForJettySocketToAccept(getBrokerURL()); + WaitForJettyListener.waitForJettySocketToAccept(connectionUri); } + @Override protected void tearDown() throws Exception { super.tearDown(); if (broker != null) { @@ -39,13 +45,14 @@ public class HttpJmsDurableTopicSendReceiveTest extends JmsDurableTopicSendRecei } } + @Override protected ActiveMQConnectionFactory createConnectionFactory() { - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(getBrokerURL()); + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri); return connectionFactory; } protected String getBrokerURL() { - return "http://localhost:8161"; + return "http://localhost:0"; } protected BrokerService createBroker() throws Exception {