mirror of https://github.com/apache/activemq.git
Enhances the Http(s) and ws(s) transport Servers such that they can update the connectUri after starting so that test cases can use the any port option on their URI like "localhost:0" and get the connection string after the BrokerService is started. This will allow most of those test cases to be updated so that they don't fail because the hard coded port is already bound.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1443146 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4a8033ed97
commit
46e67a10cc
|
@ -35,7 +35,7 @@ abstract public class WebTransportServerSupport extends TransportServerSupport {
|
|||
super(location);
|
||||
}
|
||||
|
||||
public void bind() throws Exception {
|
||||
public URI bind() throws Exception {
|
||||
|
||||
URI bind = getBindLocation();
|
||||
|
||||
|
@ -51,6 +51,9 @@ abstract public class WebTransportServerSupport extends TransportServerSupport {
|
|||
if (addr.isAnyLocalAddress()) {
|
||||
host = InetAddressUtil.getLocalHostName();
|
||||
}
|
||||
setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), host, bindAddress.getPort(), bind.getPath(), bind.getQuery(), bind.getFragment()));
|
||||
|
||||
URI boundUri = new URI(bind.getScheme(), bind.getUserInfo(), host, bindAddress.getPort(), bind.getPath(), bind.getQuery(), bind.getFragment());
|
||||
setConnectURI(boundUri);
|
||||
return boundUri;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,11 +31,15 @@ import org.eclipse.jetty.server.Server;
|
|||
import org.eclipse.jetty.server.handler.GzipHandler;
|
||||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||
import org.eclipse.jetty.servlet.ServletHolder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class HttpTransportServer extends WebTransportServerSupport {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(HttpTransportServer.class);
|
||||
|
||||
private TextWireFormat wireFormat;
|
||||
private HttpTransportFactory transportFactory;
|
||||
private final HttpTransportFactory transportFactory;
|
||||
|
||||
public HttpTransportServer(URI uri, HttpTransportFactory factory) {
|
||||
super(uri);
|
||||
|
@ -44,6 +48,7 @@ public class HttpTransportServer extends WebTransportServerSupport {
|
|||
socketConnectorFactory = new SocketConnectorFactory();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setBrokerInfo(BrokerInfo brokerInfo) {
|
||||
}
|
||||
|
||||
|
@ -70,14 +75,14 @@ public class HttpTransportServer extends WebTransportServerSupport {
|
|||
this.connector = connector;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() throws Exception {
|
||||
server = new Server();
|
||||
if (connector == null) {
|
||||
connector = socketConnectorFactory.createConnector();
|
||||
}
|
||||
|
||||
URI bind = getBindLocation();
|
||||
bind();
|
||||
URI boundTo = bind();
|
||||
|
||||
ServletContextHandler contextHandler =
|
||||
new ServletContextHandler(server, "/", ServletContextHandler.NO_SECURITY);
|
||||
|
@ -95,8 +100,25 @@ public class HttpTransportServer extends WebTransportServerSupport {
|
|||
contextHandler.setHandler(gzipHandler);
|
||||
|
||||
server.start();
|
||||
|
||||
// Update the Connect To URI with our actual location in case the configured port
|
||||
// was set to zero so that we report the actual port we are listening on.
|
||||
|
||||
int port = boundTo.getPort();
|
||||
if (connector.getLocalPort() != -1) {
|
||||
port = connector.getLocalPort();
|
||||
}
|
||||
|
||||
setConnectURI(new URI(boundTo.getScheme(),
|
||||
boundTo.getUserInfo(),
|
||||
boundTo.getHost(),
|
||||
port,
|
||||
boundTo.getPath(),
|
||||
boundTo.getQuery(),
|
||||
boundTo.getFragment()));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop(ServiceStopper stopper) throws Exception {
|
||||
Server temp = server;
|
||||
server = null;
|
||||
|
@ -105,6 +127,7 @@ public class HttpTransportServer extends WebTransportServerSupport {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress getSocketAddress() {
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -43,6 +43,7 @@ public class WSTransportServer extends WebTransportServerSupport {
|
|||
socketConnectorFactory = new SocketConnectorFactory();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() throws Exception {
|
||||
server = new Server();
|
||||
|
||||
|
@ -50,9 +51,7 @@ public class WSTransportServer extends WebTransportServerSupport {
|
|||
connector = socketConnectorFactory.createConnector();
|
||||
}
|
||||
|
||||
URI bind = getBindLocation();
|
||||
|
||||
bind();
|
||||
URI boundTo = bind();
|
||||
|
||||
ServletContextHandler contextHandler =
|
||||
new ServletContextHandler(server, "/", ServletContextHandler.NO_SECURITY);
|
||||
|
@ -72,8 +71,25 @@ public class WSTransportServer extends WebTransportServerSupport {
|
|||
contextHandler.setAttribute("acceptListener", getAcceptListener());
|
||||
|
||||
server.start();
|
||||
|
||||
// Update the Connect To URI with our actual location in case the configured port
|
||||
// was set to zero so that we report the actual port we are listening on.
|
||||
|
||||
int port = boundTo.getPort();
|
||||
if (connector.getLocalPort() != -1) {
|
||||
port = connector.getLocalPort();
|
||||
}
|
||||
|
||||
setConnectURI(new URI(boundTo.getScheme(),
|
||||
boundTo.getUserInfo(),
|
||||
boundTo.getHost(),
|
||||
port,
|
||||
boundTo.getPath(),
|
||||
boundTo.getQuery(),
|
||||
boundTo.getFragment()));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop(ServiceStopper stopper) throws Exception {
|
||||
Server temp = server;
|
||||
server = null;
|
||||
|
@ -82,10 +98,12 @@ public class WSTransportServer extends WebTransportServerSupport {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress getSocketAddress() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setBrokerInfo(BrokerInfo brokerInfo) {
|
||||
}
|
||||
|
||||
|
@ -104,5 +122,4 @@ public class WSTransportServer extends WebTransportServerSupport {
|
|||
public boolean isSslServer() {
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
*/
|
||||
package org.apache.activemq.transport.http;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.MessageConsumer;
|
||||
|
@ -23,17 +25,21 @@ import javax.jms.MessageProducer;
|
|||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.TransportConnector;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class HttpClientReconnectTest extends TestCase {
|
||||
|
||||
BrokerService broker;
|
||||
ActiveMQConnectionFactory factory;
|
||||
public class HttpClientReconnectTest {
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
private BrokerService broker;
|
||||
private ActiveMQConnectionFactory factory;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
|
||||
System.setProperty("javax.net.ssl.trustStorePassword", "password");
|
||||
System.setProperty("javax.net.ssl.trustStoreType", "jks");
|
||||
|
@ -41,45 +47,46 @@ public class HttpClientReconnectTest extends TestCase {
|
|||
System.setProperty("javax.net.ssl.keyStorePassword", "password");
|
||||
System.setProperty("javax.net.ssl.keyStoreType", "jks");
|
||||
|
||||
broker = new BrokerService();
|
||||
broker.addConnector("https://localhost:61666?trace=true");
|
||||
broker.setPersistent(false);
|
||||
broker.setUseJmx(false);
|
||||
broker.deleteAllMessages();
|
||||
broker.start();
|
||||
factory = new ActiveMQConnectionFactory("https://localhost:61666?trace=true&soTimeout=1000");
|
||||
}
|
||||
broker = new BrokerService();
|
||||
TransportConnector connector = broker.addConnector("https://localhost:0?trace=true");
|
||||
broker.setPersistent(false);
|
||||
broker.setUseJmx(false);
|
||||
broker.deleteAllMessages();
|
||||
broker.start();
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
broker.stop();
|
||||
}
|
||||
|
||||
public void testReconnectClient() throws Exception {
|
||||
for (int i = 0; i < 100; i++) {
|
||||
sendAndReceiveMessage(i);
|
||||
}
|
||||
}
|
||||
|
||||
private void sendAndReceiveMessage(int i) throws Exception {
|
||||
Connection conn = factory.createConnection();
|
||||
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
conn.start();
|
||||
Destination dest = new ActiveMQQueue("test");
|
||||
MessageProducer producer = sess.createProducer(dest);
|
||||
MessageConsumer consumer = sess.createConsumer(dest);
|
||||
String messageText = "test " + i;
|
||||
try {
|
||||
producer.send(sess.createTextMessage(messageText));
|
||||
TextMessage msg = (TextMessage)consumer.receive(1000);
|
||||
assertEquals(messageText, msg.getText());
|
||||
} finally {
|
||||
producer.close();
|
||||
consumer.close();
|
||||
conn.close();
|
||||
sess.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
String connectionUri = connector.getPublishableConnectString();
|
||||
factory = new ActiveMQConnectionFactory(connectionUri + "?trace=true&soTimeout=1000");
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
broker.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReconnectClient() throws Exception {
|
||||
for (int i = 0; i < 100; i++) {
|
||||
sendAndReceiveMessage(i);
|
||||
}
|
||||
}
|
||||
|
||||
private void sendAndReceiveMessage(int i) throws Exception {
|
||||
Connection conn = factory.createConnection();
|
||||
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
conn.start();
|
||||
Destination dest = new ActiveMQQueue("test");
|
||||
MessageProducer producer = sess.createProducer(dest);
|
||||
MessageConsumer consumer = sess.createConsumer(dest);
|
||||
String messageText = "test " + i;
|
||||
try {
|
||||
producer.send(sess.createTextMessage(messageText));
|
||||
TextMessage msg = (TextMessage)consumer.receive(1000);
|
||||
assertEquals(messageText, msg.getText());
|
||||
} finally {
|
||||
producer.close();
|
||||
consumer.close();
|
||||
conn.close();
|
||||
sess.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,17 +21,23 @@ import org.apache.activemq.JmsDurableTopicSendReceiveTest;
|
|||
import org.apache.activemq.broker.BrokerService;
|
||||
|
||||
public class HttpJmsDurableTopicSendReceiveTest extends JmsDurableTopicSendReceiveTest {
|
||||
|
||||
protected BrokerService broker;
|
||||
|
||||
private String connectionUri;
|
||||
|
||||
@Override
|
||||
protected void setUp() throws Exception {
|
||||
if (broker == null) {
|
||||
broker = createBroker();
|
||||
broker.start();
|
||||
connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
|
||||
}
|
||||
super.setUp();
|
||||
WaitForJettyListener.waitForJettySocketToAccept(getBrokerURL());
|
||||
WaitForJettyListener.waitForJettySocketToAccept(connectionUri);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
if (broker != null) {
|
||||
|
@ -39,13 +45,14 @@ public class HttpJmsDurableTopicSendReceiveTest extends JmsDurableTopicSendRecei
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ActiveMQConnectionFactory createConnectionFactory() {
|
||||
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(getBrokerURL());
|
||||
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
|
||||
return connectionFactory;
|
||||
}
|
||||
|
||||
protected String getBrokerURL() {
|
||||
return "http://localhost:8161";
|
||||
return "http://localhost:0";
|
||||
}
|
||||
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
|
|
Loading…
Reference in New Issue