Moves the HTTP client up to the newest v4.1.2 release. Update some additional dependencies as well.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1179226 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-10-05 13:49:50 +00:00
parent 71d4e2a2aa
commit 082fdc5627
5 changed files with 268 additions and 255 deletions

View File

@ -111,8 +111,9 @@
</dependency>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpclient-version}</version>
</dependency>
<dependency>

View File

@ -31,10 +31,13 @@ import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.transport.discovery.DiscoveryAgent;
import org.apache.activemq.transport.discovery.DiscoveryListener;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.DeleteMethod;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.methods.PutMethod;
import org.apache.http.client.HttpClient;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.impl.client.DefaultHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -43,19 +46,19 @@ public class HTTPDiscoveryAgent implements DiscoveryAgent {
private static final Logger LOG = LoggerFactory.getLogger(HTTPDiscoveryAgent.class);
private String registryURL = "http://localhost:8080/discovery-registry/default";
private HttpClient httpClient = new HttpClient();
private HttpClient httpClient = new DefaultHttpClient();
private AtomicBoolean running = new AtomicBoolean();
private final AtomicReference<DiscoveryListener> discoveryListener = new AtomicReference<DiscoveryListener>();
private final HashSet<String> registeredServices = new HashSet<String>();
private final HashMap<String, SimpleDiscoveryEvent> discoveredServices = new HashMap<String, SimpleDiscoveryEvent>();
private Thread thread;
private long updateInterval = 1000 * 10;
@SuppressWarnings("unused")
private String brokerName;
private boolean startEmbeddRegistry = false;
private Service jetty;
private AtomicInteger startCounter = new AtomicInteger(0);
private long initialReconnectDelay = 1000;
private long maxReconnectDelay = 1000 * 30;
private long backOffMultiplier = 2;
@ -75,10 +78,8 @@ public class HTTPDiscoveryAgent implements DiscoveryAgent {
public SimpleDiscoveryEvent(String service) {
super(service);
}
}
public String getGroup() {
return null;
}
@ -93,45 +94,39 @@ public class HTTPDiscoveryAgent implements DiscoveryAgent {
synchronized private void doRegister(String service) {
String url = registryURL;
try {
PutMethod method = new PutMethod(url);
// method.setParams(createParams());
method.setRequestHeader("service", service);
int responseCode = httpClient.executeMethod(method);
LOG.debug("PUT to "+url+" got a "+responseCode);
HttpPut method = new HttpPut(url);
method.addHeader("service", service);
ResponseHandler<String> handler = new BasicResponseHandler();
String responseBody = httpClient.execute(method, handler);
LOG.debug("PUT to " + url + " got a " + responseBody);
} catch (Exception e) {
LOG.debug("PUT to " + url + " failed with: " + e);
}
}
@SuppressWarnings("unused")
synchronized private void doUnRegister(String service) {
String url = registryURL;
try {
DeleteMethod method = new DeleteMethod(url);
// method.setParams(createParams());
method.setRequestHeader("service", service);
int responseCode = httpClient.executeMethod(method);
LOG.debug("DELETE to "+url+" got a "+responseCode);
HttpDelete method = new HttpDelete(url);
method.addHeader("service", service);
ResponseHandler<String> handler = new BasicResponseHandler();
String responseBody = httpClient.execute(method, handler);
LOG.debug("DELETE to " + url + " got a " + responseBody);
} catch (Exception e) {
LOG.debug("DELETE to " + url + " failed with: " + e);
}
}
// private HttpMethodParams createParams() {
// HttpMethodParams params = new HttpMethodParams();
// params.setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(0,false));
// return params;
// }
synchronized private Set<String> doLookup(long freshness) {
String url = registryURL + "?freshness=" + freshness;
try {
GetMethod method = new GetMethod(url);
// method.setParams(createParams());
int responseCode = httpClient.executeMethod(method);
LOG.debug("GET to "+url+" got a "+responseCode);
if( responseCode == 200 ) {
HttpGet method = new HttpGet(url);
ResponseHandler<String> handler = new BasicResponseHandler();
String response = httpClient.execute(method, handler);
LOG.debug("GET to " + url + " got a " + response);
Set<String> rc = new HashSet<String>();
Scanner scanner = new Scanner(method.getResponseBodyAsStream());
Scanner scanner = new Scanner(response);
while (scanner.hasNextLine()) {
String service = scanner.nextLine();
if (service.trim().length() != 0) {
@ -139,10 +134,6 @@ public class HTTPDiscoveryAgent implements DiscoveryAgent {
}
}
return rc;
} else {
LOG.debug("GET to "+url+" failed with response code: "+responseCode);
return null;
}
} catch (Exception e) {
LOG.debug("GET to " + url + " failed with: " + e);
return null;
@ -159,15 +150,18 @@ public class HTTPDiscoveryAgent implements DiscoveryAgent {
Thread thread = new Thread() {
public void run() {
// We detect a failed connection attempt because the service
// We detect a failed connection attempt because the
// service
// fails right away.
if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
LOG.debug("Failure occured soon after the discovery event was generated. It will be clasified as a connection failure: "+event);
LOG.debug("Failure occured soon after the discovery event was generated. " +
"It will be clasified as a connection failure: " + event);
event.connectFailures++;
if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) {
LOG.debug("Reconnect attempts exceeded "+maxReconnectAttempts+" tries. Reconnecting has been disabled.");
LOG.debug("Reconnect attempts exceeded " + maxReconnectAttempts +
" tries. Reconnecting has been disabled.");
return;
}
@ -176,7 +170,8 @@ public class HTTPDiscoveryAgent implements DiscoveryAgent {
if (!running.get() || event.removed.get()) {
return;
}
LOG.debug("Waiting "+event.reconnectDelay+" ms before attepting to reconnect.");
LOG.debug("Waiting " + event.reconnectDelay +
" ms before attepting to reconnect.");
sleepMutex.wait(event.reconnectDelay);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
@ -214,7 +209,6 @@ public class HTTPDiscoveryAgent implements DiscoveryAgent {
}
}
public void setBrokerName(String brokerName) {
this.brokerName = brokerName;
}
@ -230,7 +224,7 @@ public class HTTPDiscoveryAgent implements DiscoveryAgent {
if (startCounter.addAndGet(1) == 1) {
if (startEmbeddRegistry) {
jetty = createEmbeddedJettyServer();
Map props = new HashMap();
Map<String, Object> props = new HashMap<String, Object>();
props.put("agent", this);
IntrospectionSupport.setProperties(jetty, props);
jetty.start();
@ -256,14 +250,14 @@ public class HTTPDiscoveryAgent implements DiscoveryAgent {
}
/**
* Create the EmbeddedJettyServer instance via reflection so that we can avoid a hard runtime dependency on
* jetty.
* Create the EmbeddedJettyServer instance via reflection so that we can
* avoid a hard runtime dependency on jetty.
*
* @return
* @throws Exception
*/
private Service createEmbeddedJettyServer() throws Exception {
Class clazz = HTTPDiscoveryAgent.class.getClassLoader().loadClass("org.apache.activemq.transport.discovery.http.EmbeddedJettyServer");
Class<?> clazz = HTTPDiscoveryAgent.class.getClassLoader().loadClass("org.apache.activemq.transport.discovery.http.EmbeddedJettyServer");
return (Service) clazz.newInstance();
}
@ -279,7 +273,8 @@ public class HTTPDiscoveryAgent implements DiscoveryAgent {
DiscoveryListener discoveryListener = this.discoveryListener.get();
if (discoveryListener != null) {
Set<String> activeServices = doLookup(updateInterval * 3);
// If there is error talking the the central server, then activeServices == null
// If there is error talking the the central server, then
// activeServices == null
if (activeServices != null) {
synchronized (discoveredServices) {
@ -345,5 +340,4 @@ public class HTTPDiscoveryAgent implements DiscoveryAgent {
public void setStartEmbeddRegistry(boolean startEmbeddRegistry) {
this.startEmbeddRegistry = startEmbeddRegistry;
}
}

View File

@ -24,18 +24,25 @@ import java.net.URI;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.util.TextWireFormat;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpMethod;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.methods.HeadMethod;
import org.apache.commons.httpclient.methods.InputStreamRequestEntity;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.httpclient.params.HttpClientParams;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.params.ConnRoutePNames;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.message.AbstractHttpMessage;
import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -43,8 +50,6 @@ import org.slf4j.LoggerFactory;
* A HTTP {@link org.apache.activemq.transport.TransportChannel} which uses the
* <a href="http://jakarta.apache.org/commons/httpclient/">commons-httpclient</a>
* library
*
*
*/
public class HttpClientTransport extends HttpTransportSupport {
@ -57,7 +62,7 @@ public class HttpClientTransport extends HttpTransportSupport {
private final String clientID = CLIENT_ID_GENERATOR.generateId();
private boolean trace;
private GetMethod httpMethod;
private HttpGet httpMethod;
private volatile int receiveCounter;
private int soTimeout = MAX_CLIENT_TIMEOUT;
@ -75,21 +80,22 @@ public class HttpClientTransport extends HttpTransportSupport {
if (isStopped()) {
throw new IOException("stopped.");
}
PostMethod httpMethod = new PostMethod(getRemoteUrl().toString());
HttpPost httpMethod = new HttpPost(getRemoteUrl().toString());
configureMethod(httpMethod);
String data = getTextWireFormat().marshalText(command);
byte[] bytes = data.getBytes("UTF-8");
InputStreamRequestEntity entity = new InputStreamRequestEntity(new ByteArrayInputStream(bytes));
httpMethod.setRequestEntity(entity);
ByteArrayEntity entity = new ByteArrayEntity(bytes);
httpMethod.setEntity(entity);
HttpClient client = null;
HttpResponse answer = null;
try {
HttpClient client = getSendHttpClient();
HttpClientParams params = new HttpClientParams();
params.setSoTimeout(soTimeout);
client.setParams(params);
int answer = client.executeMethod(httpMethod);
if (answer != HttpStatus.SC_OK) {
client = getSendHttpClient();
HttpParams params = client.getParams();
HttpConnectionParams.setSoTimeout(params, soTimeout);
answer = client.execute(httpMethod);
int status = answer.getStatusLine().getStatusCode();
if (status != HttpStatus.SC_OK) {
throw new IOException("Failed to post command: " + command + " as response was: " + answer);
}
if (command instanceof ShutdownInfo) {
@ -102,8 +108,9 @@ public class HttpClientTransport extends HttpTransportSupport {
} catch (IOException e) {
throw IOExceptionSupport.create("Could not post command: " + command + " due to: " + e, e);
} finally {
httpMethod.getResponseBody();
httpMethod.releaseConnection();
if (answer != null) {
EntityUtils.consume(answer.getEntity());
}
}
}
@ -119,13 +126,15 @@ public class HttpClientTransport extends HttpTransportSupport {
while (!isStopped() && !isStopping()) {
httpMethod = new GetMethod(remoteUrl.toString());
httpMethod = new HttpGet(remoteUrl.toString());
configureMethod(httpMethod);
HttpResponse answer = null;
try {
int answer = httpClient.executeMethod(httpMethod);
if (answer != HttpStatus.SC_OK) {
if (answer == HttpStatus.SC_REQUEST_TIMEOUT) {
answer = httpClient.execute(httpMethod);
int status = answer.getStatusLine().getStatusCode();
if (status != HttpStatus.SC_OK) {
if (status == HttpStatus.SC_REQUEST_TIMEOUT) {
LOG.debug("GET timed out");
try {
Thread.sleep(1000);
@ -139,19 +148,25 @@ public class HttpClientTransport extends HttpTransportSupport {
}
} else {
receiveCounter++;
DataInputStream stream = new DataInputStream(httpMethod.getResponseBodyAsStream());
DataInputStream stream = new DataInputStream(answer.getEntity().getContent());
Object command = (Object)getTextWireFormat().unmarshal(stream);
if (command == null) {
LOG.debug("Received null command from url: " + remoteUrl);
} else {
doConsume(command);
}
stream.close();
}
} catch (IOException e) {
onException(IOExceptionSupport.create("Failed to perform GET on: " + remoteUrl + " Reason: " + e.getMessage(), e));
break;
} finally {
httpMethod.releaseConnection();
if (answer != null) {
try {
EntityUtils.consume(answer.getEntity());
} catch (IOException e) {
}
}
}
}
}
@ -188,12 +203,13 @@ public class HttpClientTransport extends HttpTransportSupport {
HttpClient httpClient = getReceiveHttpClient();
URI remoteUrl = getRemoteUrl();
HeadMethod httpMethod = new HeadMethod(remoteUrl.toString());
HttpHead httpMethod = new HttpHead(remoteUrl.toString());
configureMethod(httpMethod);
int answer = httpClient.executeMethod(httpMethod);
if (answer != HttpStatus.SC_OK) {
throw new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + answer);
ResponseHandler<String> handler = new BasicResponseHandler();
try {
httpClient.execute(httpMethod, handler);
} catch(Exception e) {
throw new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + e.getMessage());
}
super.doStart();
@ -206,15 +222,16 @@ public class HttpClientTransport extends HttpTransportSupport {
}
protected HttpClient createHttpClient() {
HttpClient client = new HttpClient();
HttpClient client = new DefaultHttpClient();
if (getProxyHost() != null) {
client.getHostConfiguration().setProxy(getProxyHost(), getProxyPort());
HttpHost proxy = new HttpHost(getProxyHost(), getProxyPort());
client.getParams().setParameter(ConnRoutePNames.DEFAULT_PROXY, proxy);
}
return client;
}
protected void configureMethod(HttpMethod method) {
method.setRequestHeader("clientID", clientID);
protected void configureMethod(AbstractHttpMessage method) {
method.setHeader("clientID", clientID);
}
public boolean isTrace() {

View File

@ -146,8 +146,8 @@
<!-- commons -->
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>commons-beanutils</groupId>

17
pom.xml
View File

@ -45,16 +45,16 @@
<aopalliance-version>1.0</aopalliance-version>
<aries-version>0.2-incubating</aries-version>
<axion-version>1.0-M3-dev</axion-version>
<axis-version>1.4</axis-version>
<axis-version>1.4.2</axis-version>
<camel-version>2.8.1</camel-version>
<camel-web-version>2.4.0</camel-web-version>
<cglib-version>2.0</cglib-version>
<openjpa-version>1.2.0</openjpa-version>
<commons-beanutils-version>1.8.3</commons-beanutils-version>
<commons-collections-version>3.2.1</commons-collections-version>
<commons-daemon-version>1.0.5</commons-daemon-version>
<commons-daemon-version>1.0.7</commons-daemon-version>
<commons-dbcp-version>1.4</commons-dbcp-version>
<commons-httpclient-version>3.1</commons-httpclient-version>
<httpclient-version>4.1.2</httpclient-version>
<commons-io-version>1.4</commons-io-version>
<commons-lang-version>2.6</commons-lang-version>
<commons-logging-version>1.1.1</commons-logging-version>
@ -64,6 +64,7 @@
<directory-version>1.5.7</directory-version>
<geronimo-version>1.0</geronimo-version>
<howl-version>0.1.8</howl-version>
<hawtbuf-version>1.6</hawtbuf-version>
<hsqldb-version>1.7.2.2</hsqldb-version>
<jasypt-version>1.8</jasypt-version>
<jdom-version>1.0</jdom-version>
@ -73,7 +74,7 @@
<jettison-version>1.3</jettison-version>
<jmock-version>2.5.1</jmock-version>
<josql-version>1.5</josql-version>
<junit-version>4.8.2</junit-version>
<junit-version>4.10</junit-version>
<jxta-version>2.0</jxta-version>
<karaf-version>2.2.0</karaf-version>
<log4j-version>1.2.16</log4j-version>
@ -87,7 +88,7 @@
<xmlbeans-version>2.2.0</xmlbeans-version>
<xpp3-version>1.1.4c</xpp3-version>
<xstream-version>1.4.1</xstream-version>
<xbean-version>3.7</xbean-version>
<xbean-version>3.8</xbean-version>
<velocity-version>1.6.4</velocity-version>
<ftpserver-version>1.0.0</ftpserver-version>
<activemq-protobuf-version>1.1</activemq-protobuf-version>
@ -811,9 +812,9 @@
</dependency>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>${commons-httpclient-version}</version>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpclient-version}</version>
</dependency>
<dependency>