partial fix for https://issues.apache.org/activemq/browse/AMQ-2238 - http transport improvements

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@774421 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2009-05-13 16:43:31 +00:00
parent dd1b847ad7
commit 09bae42a83
8 changed files with 110 additions and 31 deletions

View File

@ -217,7 +217,6 @@ public class TransportConnector implements Connector, BrokerServiceAware {
} }
} }
}; };
startThread.setPriority(4);
startThread.start(); startThread.start();
} catch (Exception e) { } catch (Exception e) {
String remoteHost = transport.getRemoteAddress(); String remoteHost = transport.getRemoteAddress();

View File

@ -159,10 +159,6 @@
<plugin> <plugin>
<artifactId>maven-surefire-plugin</artifactId> <artifactId>maven-surefire-plugin</artifactId>
<configuration> <configuration>
<excludes>
<exclude>**/http/*</exclude>
<exclude>**/https/*</exclude>
</excludes>
</configuration> </configuration>
</plugin> </plugin>

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.net.URI; import java.net.URI;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.util.TextWireFormat; import org.apache.activemq.transport.util.TextWireFormat;
import org.apache.activemq.util.ByteArrayInputStream; import org.apache.activemq.util.ByteArrayInputStream;
@ -29,10 +30,15 @@ import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpMethod; import org.apache.commons.httpclient.HttpMethod;
import org.apache.commons.httpclient.HttpMethodRetryHandler;
import org.apache.commons.httpclient.HttpStatus; import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.httpclient.NoHttpResponseException;
import org.apache.commons.httpclient.methods.GetMethod; import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.methods.HeadMethod; import org.apache.commons.httpclient.methods.HeadMethod;
import org.apache.commons.httpclient.methods.InputStreamRequestEntity;
import org.apache.commons.httpclient.methods.PostMethod; import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.httpclient.params.HttpClientParams;
import org.apache.commons.httpclient.params.HttpMethodParams;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -54,6 +60,7 @@ public class HttpClientTransport extends HttpTransportSupport {
private final String clientID = CLIENT_ID_GENERATOR.generateId(); private final String clientID = CLIENT_ID_GENERATOR.generateId();
private boolean trace; private boolean trace;
private GetMethod httpMethod;
public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) { public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) {
super(wireFormat, remoteUrl); super(wireFormat, remoteUrl);
@ -68,23 +75,30 @@ public class HttpClientTransport extends HttpTransportSupport {
if (isStopped()) { if (isStopped()) {
throw new IOException("stopped."); throw new IOException("stopped.");
} }
PostMethod httpMethod = new PostMethod(getRemoteUrl().toString()); PostMethod httpMethod = new PostMethod(getRemoteUrl().toString());
configureMethod(httpMethod); configureMethod(httpMethod);
String data = getTextWireFormat().marshalText(command); String data = getTextWireFormat().marshalText(command);
byte[] bytes = data.getBytes("UTF-8"); byte[] bytes = data.getBytes("UTF-8");
httpMethod.setRequestBody(new ByteArrayInputStream(bytes)); InputStreamRequestEntity entity = new InputStreamRequestEntity(new ByteArrayInputStream(bytes));
httpMethod.setRequestEntity(entity);
try { try {
HttpClient client = getSendHttpClient(); HttpClient client = getSendHttpClient();
client.setTimeout(MAX_CLIENT_TIMEOUT); HttpClientParams params = new HttpClientParams();
params.setSoTimeout(MAX_CLIENT_TIMEOUT);
client.setParams(params);
int answer = client.executeMethod(httpMethod); int answer = client.executeMethod(httpMethod);
if (answer != HttpStatus.SC_OK) { if (answer != HttpStatus.SC_OK) {
throw new IOException("Failed to post command: " + command + " as response was: " + answer); throw new IOException("Failed to post command: " + command + " as response was: " + answer);
} }
if (command instanceof ShutdownInfo) {
// checkSession(httpMethod); try {
stop();
} catch (Exception e) {
LOG.warn("Error trying to stop HTTP client: "+ e, e);
}
}
} catch (IOException e) { } catch (IOException e) {
throw IOExceptionSupport.create("Could not post command: " + command + " due to: " + e, e); throw IOExceptionSupport.create("Could not post command: " + command + " due to: " + e, e);
} finally { } finally {
@ -105,7 +119,7 @@ public class HttpClientTransport extends HttpTransportSupport {
while (!isStopped() && !isStopping()) { while (!isStopped() && !isStopping()) {
GetMethod httpMethod = new GetMethod(remoteUrl.toString()); httpMethod = new GetMethod(remoteUrl.toString());
configureMethod(httpMethod); configureMethod(httpMethod);
try { try {
@ -124,7 +138,6 @@ public class HttpClientTransport extends HttpTransportSupport {
break; break;
} }
} else { } else {
// checkSession(httpMethod);
DataInputStream stream = new DataInputStream(httpMethod.getResponseBodyAsStream()); DataInputStream stream = new DataInputStream(httpMethod.getResponseBodyAsStream());
Object command = (Object)getTextWireFormat().unmarshal(stream); Object command = (Object)getTextWireFormat().unmarshal(stream);
if (command == null) { if (command == null) {
@ -137,7 +150,6 @@ public class HttpClientTransport extends HttpTransportSupport {
onException(IOExceptionSupport.create("Failed to perform GET on: " + remoteUrl + " Reason: " + e.getMessage(), e)); onException(IOExceptionSupport.create("Failed to perform GET on: " + remoteUrl + " Reason: " + e.getMessage(), e));
break; break;
} finally { } finally {
httpMethod.getResponseBody();
httpMethod.releaseConnection(); httpMethod.releaseConnection();
} }
} }
@ -187,6 +199,7 @@ public class HttpClientTransport extends HttpTransportSupport {
} }
protected void doStop(ServiceStopper stopper) throws Exception { protected void doStop(ServiceStopper stopper) throws Exception {
httpMethod.abort();
} }
protected HttpClient createHttpClient() { protected HttpClient createHttpClient() {
@ -209,16 +222,4 @@ public class HttpClientTransport extends HttpTransportSupport {
this.trace = trace; this.trace = trace;
} }
// protected void checkSession(HttpMethod client) {
// Header header = client.getRequestHeader("Set-Cookie");
// if (header != null) {
// String set_cookie = header.getValue();
//
// if (set_cookie != null && set_cookie.startsWith("JSESSIONID=")) {
// String[] bits = set_cookie.split("[=;]");
// sessionID = bits[1];
// }
// }
// }
} }

View File

@ -175,12 +175,20 @@ public class HttpTunnelServlet extends HttpServlet {
answer = createTransportChannel(); answer = createTransportChannel();
clients.put(clientID, answer); clients.put(clientID, answer);
listener.onAccept(answer); listener.onAccept(answer);
//wait for the transport to connect
while (!answer.isConnected()) {
try {
Thread.sleep(100);
} catch (InterruptedException ignore) {
}
}
return answer; return answer;
} }
} }
protected BlockingQueueTransport createTransportChannel() { protected BlockingQueueTransport createTransportChannel() {
return new BlockingQueueTransport(new ArrayBlockingQueue(10)); // return new BlockingQueueTransport(new LinkedBlockingQueue<Object>());
return new BlockingQueueTransport(new ArrayBlockingQueue<Object>(10));
} }
protected TextWireFormat createWireFormat() { protected TextWireFormat createWireFormat() {

View File

@ -0,0 +1,63 @@
package org.apache.activemq.transport.http;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
public class HttpClientReconnectTest extends TestCase {
BrokerService broker;
ActiveMQConnectionFactory factory;
protected void setUp() throws Exception {
broker = new BrokerService();
broker.addConnector("http://localhost:61666?trace=true");
broker.setPersistent(false);
broker.setUseJmx(false);
broker.deleteAllMessages();
broker.start();
factory = new ActiveMQConnectionFactory("http://localhost:61666?trace=true");
}
protected void tearDown() throws Exception {
broker.stop();
}
public void testReconnectClient() throws Exception {
for (int i = 0; i < 100; i++) {
sendAndReceiveMessage(i);
}
}
private void sendAndReceiveMessage(int i) throws Exception {
Connection conn = factory.createConnection();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
Destination dest = new ActiveMQQueue("test");
MessageProducer producer = sess.createProducer(dest);
MessageConsumer consumer = sess.createConsumer(dest);
String messageText = "test " + i;
try {
producer.send(sess.createTextMessage(messageText));
TextMessage msg = (TextMessage)consumer.receive(1000);
assertEquals(messageText, msg.getText());
} finally {
producer.close();
consumer.close();
conn.close();
sess.close();
}
}
}

View File

@ -16,8 +16,13 @@
*/ */
package org.apache.activemq.transport.http; package org.apache.activemq.transport.http;
import java.net.URI;
import junit.framework.Test; import junit.framework.Test;
import junit.textui.TestRunner; import junit.textui.TestRunner;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.transport.TransportBrokerTestSupport; import org.apache.activemq.transport.TransportBrokerTestSupport;
public class HttpTransportBrokerTest extends TransportBrokerTestSupport { public class HttpTransportBrokerTest extends TransportBrokerTestSupport {
@ -29,12 +34,19 @@ public class HttpTransportBrokerTest extends TransportBrokerTestSupport {
protected void setUp() throws Exception { protected void setUp() throws Exception {
maxWait = 2000; maxWait = 2000;
super.setUp(); super.setUp();
Thread.sleep(500);
} }
protected void tearDown() throws Exception { protected BrokerService createBroker() throws Exception {
BrokerService broker = BrokerFactory.createBroker(new URI("broker:()/localhost?persistent=false&useJmx=false"));
connector = broker.addConnector(getBindLocation());
return broker;
}
protected void tearDown() throws Exception {
super.tearDown(); super.tearDown();
// Give the jetty server enough time to shutdown before starting another one // Give the jetty server enough time to shutdown before starting another one
Thread.sleep(300); Thread.sleep(500);
} }
public static Test suite() { public static Test suite() {

View File

@ -36,7 +36,7 @@ public class HttpsTransportBrokerTest extends HttpTransportBrokerTest {
//System.setProperty("javax.net.debug", "ssl,handshake,data,trustmanager"); //System.setProperty("javax.net.debug", "ssl,handshake,data,trustmanager");
super.setUp(); super.setUp();
Thread.sleep(5000); Thread.sleep(500);
} }
public static Test suite() { public static Test suite() {

View File

@ -48,7 +48,7 @@
<commons-collections-version>3.2.1</commons-collections-version> <commons-collections-version>3.2.1</commons-collections-version>
<openjpa-version>1.2.0</openjpa-version> <openjpa-version>1.2.0</openjpa-version>
<commons-dbcp-version>1.2.2</commons-dbcp-version> <commons-dbcp-version>1.2.2</commons-dbcp-version>
<commons-httpclient-version>2.0.1</commons-httpclient-version> <commons-httpclient-version>3.1</commons-httpclient-version>
<commons-logging-version>1.1</commons-logging-version> <commons-logging-version>1.1</commons-logging-version>
<commons-pool-version>1.4</commons-pool-version> <commons-pool-version>1.4</commons-pool-version>
<commons-primitives-version>1.0</commons-primitives-version> <commons-primitives-version>1.0</commons-primitives-version>