From 09bae42a83c2c20867b0e7ac9c101837c5f06b5c Mon Sep 17 00:00:00 2001 From: Bosanac Dejan Date: Wed, 13 May 2009 16:43:31 +0000 Subject: [PATCH] partial fix for https://issues.apache.org/activemq/browse/AMQ-2238 - http transport improvements git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@774421 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/broker/TransportConnector.java | 1 - activemq-optional/pom.xml | 4 -- .../transport/http/HttpClientTransport.java | 43 ++++++------- .../transport/http/HttpTunnelServlet.java | 10 ++- .../http/HttpClientReconnectTest.java | 63 +++++++++++++++++++ .../http/HttpTransportBrokerTest.java | 16 ++++- .../https/HttpsTransportBrokerTest.java | 2 +- pom.xml | 2 +- 8 files changed, 110 insertions(+), 31 deletions(-) create mode 100644 activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpClientReconnectTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java index c6d2ce44e4..7f501799f0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java @@ -217,7 +217,6 @@ public class TransportConnector implements Connector, BrokerServiceAware { } } }; - startThread.setPriority(4); startThread.start(); } catch (Exception e) { String remoteHost = transport.getRemoteAddress(); diff --git a/activemq-optional/pom.xml b/activemq-optional/pom.xml index 34ec9ca526..3582e6ffbc 100755 --- a/activemq-optional/pom.xml +++ b/activemq-optional/pom.xml @@ -159,10 +159,6 @@ maven-surefire-plugin - - **/http/* - **/https/* - diff --git a/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java b/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java index 347568a4b0..6bbe16dcb3 100755 --- a/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java +++ b/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.net.URI; +import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.transport.util.TextWireFormat; import org.apache.activemq.util.ByteArrayInputStream; @@ -29,10 +30,15 @@ import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.ServiceStopper; import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.HttpMethod; +import org.apache.commons.httpclient.HttpMethodRetryHandler; import org.apache.commons.httpclient.HttpStatus; +import org.apache.commons.httpclient.NoHttpResponseException; import org.apache.commons.httpclient.methods.GetMethod; import org.apache.commons.httpclient.methods.HeadMethod; +import org.apache.commons.httpclient.methods.InputStreamRequestEntity; import org.apache.commons.httpclient.methods.PostMethod; +import org.apache.commons.httpclient.params.HttpClientParams; +import org.apache.commons.httpclient.params.HttpMethodParams; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -54,7 +60,8 @@ public class HttpClientTransport extends HttpTransportSupport { private final String clientID = CLIENT_ID_GENERATOR.generateId(); private boolean trace; - + private GetMethod httpMethod; + public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) { super(wireFormat, remoteUrl); } @@ -68,23 +75,30 @@ public class HttpClientTransport extends HttpTransportSupport { if (isStopped()) { throw new IOException("stopped."); } - PostMethod httpMethod = new PostMethod(getRemoteUrl().toString()); configureMethod(httpMethod); String data = getTextWireFormat().marshalText(command); byte[] bytes = data.getBytes("UTF-8"); - httpMethod.setRequestBody(new ByteArrayInputStream(bytes)); + InputStreamRequestEntity entity = new InputStreamRequestEntity(new ByteArrayInputStream(bytes)); + httpMethod.setRequestEntity(entity); try { HttpClient client = getSendHttpClient(); - client.setTimeout(MAX_CLIENT_TIMEOUT); + HttpClientParams params = new HttpClientParams(); + params.setSoTimeout(MAX_CLIENT_TIMEOUT); + client.setParams(params); int answer = client.executeMethod(httpMethod); if (answer != HttpStatus.SC_OK) { throw new IOException("Failed to post command: " + command + " as response was: " + answer); } - - // checkSession(httpMethod); + if (command instanceof ShutdownInfo) { + try { + stop(); + } catch (Exception e) { + LOG.warn("Error trying to stop HTTP client: "+ e, e); + } + } } catch (IOException e) { throw IOExceptionSupport.create("Could not post command: " + command + " due to: " + e, e); } finally { @@ -105,7 +119,7 @@ public class HttpClientTransport extends HttpTransportSupport { while (!isStopped() && !isStopping()) { - GetMethod httpMethod = new GetMethod(remoteUrl.toString()); + httpMethod = new GetMethod(remoteUrl.toString()); configureMethod(httpMethod); try { @@ -124,7 +138,6 @@ public class HttpClientTransport extends HttpTransportSupport { break; } } else { - // checkSession(httpMethod); DataInputStream stream = new DataInputStream(httpMethod.getResponseBodyAsStream()); Object command = (Object)getTextWireFormat().unmarshal(stream); if (command == null) { @@ -137,7 +150,6 @@ public class HttpClientTransport extends HttpTransportSupport { onException(IOExceptionSupport.create("Failed to perform GET on: " + remoteUrl + " Reason: " + e.getMessage(), e)); break; } finally { - httpMethod.getResponseBody(); httpMethod.releaseConnection(); } } @@ -187,6 +199,7 @@ public class HttpClientTransport extends HttpTransportSupport { } protected void doStop(ServiceStopper stopper) throws Exception { + httpMethod.abort(); } protected HttpClient createHttpClient() { @@ -209,16 +222,4 @@ public class HttpClientTransport extends HttpTransportSupport { this.trace = trace; } - // protected void checkSession(HttpMethod client) { - // Header header = client.getRequestHeader("Set-Cookie"); - // if (header != null) { - // String set_cookie = header.getValue(); - // - // if (set_cookie != null && set_cookie.startsWith("JSESSIONID=")) { - // String[] bits = set_cookie.split("[=;]"); - // sessionID = bits[1]; - // } - // } - // } - } diff --git a/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java b/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java index 7203a2ea4d..bf44d44d75 100755 --- a/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java +++ b/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java @@ -175,12 +175,20 @@ public class HttpTunnelServlet extends HttpServlet { answer = createTransportChannel(); clients.put(clientID, answer); listener.onAccept(answer); + //wait for the transport to connect + while (!answer.isConnected()) { + try { + Thread.sleep(100); + } catch (InterruptedException ignore) { + } + } return answer; } } protected BlockingQueueTransport createTransportChannel() { - return new BlockingQueueTransport(new ArrayBlockingQueue(10)); + // return new BlockingQueueTransport(new LinkedBlockingQueue()); + return new BlockingQueueTransport(new ArrayBlockingQueue(10)); } protected TextWireFormat createWireFormat() { diff --git a/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpClientReconnectTest.java b/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpClientReconnectTest.java new file mode 100644 index 0000000000..33090fe683 --- /dev/null +++ b/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpClientReconnectTest.java @@ -0,0 +1,63 @@ +package org.apache.activemq.transport.http; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; + +public class HttpClientReconnectTest extends TestCase { + + BrokerService broker; + ActiveMQConnectionFactory factory; + + protected void setUp() throws Exception { + broker = new BrokerService(); + broker.addConnector("http://localhost:61666?trace=true"); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.deleteAllMessages(); + broker.start(); + factory = new ActiveMQConnectionFactory("http://localhost:61666?trace=true"); + } + + protected void tearDown() throws Exception { + broker.stop(); + } + + public void testReconnectClient() throws Exception { + for (int i = 0; i < 100; i++) { + sendAndReceiveMessage(i); + } + } + + private void sendAndReceiveMessage(int i) throws Exception { + Connection conn = factory.createConnection(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + conn.start(); + Destination dest = new ActiveMQQueue("test"); + MessageProducer producer = sess.createProducer(dest); + MessageConsumer consumer = sess.createConsumer(dest); + String messageText = "test " + i; + try { + producer.send(sess.createTextMessage(messageText)); + TextMessage msg = (TextMessage)consumer.receive(1000); + assertEquals(messageText, msg.getText()); + } finally { + producer.close(); + consumer.close(); + conn.close(); + sess.close(); + } + } + + + +} diff --git a/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpTransportBrokerTest.java b/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpTransportBrokerTest.java index 485d19d13a..877722df63 100755 --- a/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpTransportBrokerTest.java +++ b/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpTransportBrokerTest.java @@ -16,8 +16,13 @@ */ package org.apache.activemq.transport.http; +import java.net.URI; + import junit.framework.Test; import junit.textui.TestRunner; + +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.transport.TransportBrokerTestSupport; public class HttpTransportBrokerTest extends TransportBrokerTestSupport { @@ -29,12 +34,19 @@ public class HttpTransportBrokerTest extends TransportBrokerTestSupport { protected void setUp() throws Exception { maxWait = 2000; super.setUp(); + Thread.sleep(500); } + + protected BrokerService createBroker() throws Exception { + BrokerService broker = BrokerFactory.createBroker(new URI("broker:()/localhost?persistent=false&useJmx=false")); + connector = broker.addConnector(getBindLocation()); + return broker; + } - protected void tearDown() throws Exception { + protected void tearDown() throws Exception { super.tearDown(); // Give the jetty server enough time to shutdown before starting another one - Thread.sleep(300); + Thread.sleep(500); } public static Test suite() { diff --git a/activemq-optional/src/test/java/org/apache/activemq/transport/https/HttpsTransportBrokerTest.java b/activemq-optional/src/test/java/org/apache/activemq/transport/https/HttpsTransportBrokerTest.java index f417c31c31..92f3510f33 100644 --- a/activemq-optional/src/test/java/org/apache/activemq/transport/https/HttpsTransportBrokerTest.java +++ b/activemq-optional/src/test/java/org/apache/activemq/transport/https/HttpsTransportBrokerTest.java @@ -36,7 +36,7 @@ public class HttpsTransportBrokerTest extends HttpTransportBrokerTest { //System.setProperty("javax.net.debug", "ssl,handshake,data,trustmanager"); super.setUp(); - Thread.sleep(5000); + Thread.sleep(500); } public static Test suite() { diff --git a/pom.xml b/pom.xml index a87d4fd3fe..50fb218426 100755 --- a/pom.xml +++ b/pom.xml @@ -48,7 +48,7 @@ 3.2.1 1.2.0 1.2.2 - 2.0.1 + 3.1 1.1 1.4 1.0