SOLR-462: Changes to CommonsHttpSolrServer.java

git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@613304 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Ryan McKinley 2008-01-18 22:46:58 +00:00
parent ef545d107f
commit 24006c12d2
2 changed files with 198 additions and 89 deletions

View File

@ -30,6 +30,13 @@ Changes in runtime behavior
The API to build documents has changed -- you need to pass a boost
(or null) with every field. (ryan)
3. SOLR-462: Changes to CommonsHttpSolrServer.java to add soTimeout (read
timeout), connection pool timeout, directive to not follow HTTP redirects,
configurable retries on NoHttpResponseException, compression, and not
creating a new HttpClient on each request. If your existing code overrides
getHttpConnection(), you will now need to override createHttpClient()
(Sean Timm via ryan)
Bug Fixes
Other Changes

View File

@ -18,6 +18,7 @@
package org.apache.solr.client.solrj.impl;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Reader;
@ -25,13 +26,17 @@ import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collection;
import java.util.Iterator;
import java.util.zip.GZIPInputStream;
import java.util.zip.InflaterInputStream;
import org.apache.commons.httpclient.Header;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpException;
import org.apache.commons.httpclient.HttpMethod;
import org.apache.commons.httpclient.HttpMethodBase;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.httpclient.NoHttpResponseException;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.methods.InputStreamRequestEntity;
import org.apache.commons.httpclient.methods.PostMethod;
@ -66,7 +71,11 @@ public class CommonsHttpSolrServer extends BaseSolrServer
protected String _baseURL;
protected ModifiableSolrParams _invariantParams;
protected ResponseParser _processor;
MultiThreadedHttpConnectionManager _connectionManager = new MultiThreadedHttpConnectionManager();
private final HttpClient _httpClient;
private boolean _followRedirects = false;
private boolean _allowCompression = false;
private int _maxRetries = 0;
/**
* @param solrServerUrl The URL of the Solr server. For
@ -91,6 +100,8 @@ public class CommonsHttpSolrServer extends BaseSolrServer
this._baseURL = this._baseURL.substring( 0, this._baseURL.length()-1 );
}
this._httpClient = createHttpClient();
// increase the default connections
this.setDefaultMaxConnectionsPerHost( 32 ); // 2
this.setMaxTotalConnections( 128 ); // 20
@ -103,6 +114,14 @@ public class CommonsHttpSolrServer extends BaseSolrServer
_invariantParams.set( CommonParams.WT, _processor.getWriterType() );
_invariantParams.set( CommonParams.VERSION, "2.2" );
}
/**
* This can be overridden to add certificates etc
*/
protected HttpClient createHttpClient()
{
return new HttpClient( new MultiThreadedHttpConnectionManager() );
}
//------------------------------------------------------------------------
//------------------------------------------------------------------------
@ -136,103 +155,121 @@ public class CommonsHttpSolrServer extends BaseSolrServer
params = new DefaultSolrParams( _invariantParams, params );
}
int tries = _maxRetries + 1;
try {
if( SolrRequest.METHOD.GET == request.getMethod() ) {
if( streams != null ) {
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, "GET can't send streams!" );
}
method = new GetMethod( _baseURL + path + ClientUtils.toQueryString( params, false ) );
}
else if( SolrRequest.METHOD.POST == request.getMethod() ) {
String url = _baseURL + path;
boolean isMultipart = ( streams != null && streams.size() > 1 );
if( streams == null || isMultipart ) {
// Without streams, just post the parameters
PostMethod post = new PostMethod( url );
Iterator<String> iter = params.getParameterNamesIterator();
while( iter.hasNext() ) {
String p = iter.next();
String[] vals = params.getParams( p );
if( vals != null && vals.length > 0 ) {
for( String v : vals ) {
post.addParameter( p, (v==null)?null:v );
}
}
else {
post.addParameter( p, null );
while( tries-- > 0 ) {
// Note: since we aren't do intermittent time keeping
// ourselves, the potential non-timeout latency could be as
// much as tries-times (plus scheduling effects) the given
// timeAllowed.
try {
if( SolrRequest.METHOD.GET == request.getMethod() ) {
if( streams != null ) {
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, "GET can't send streams!" );
}
method = new GetMethod( _baseURL + path + ClientUtils.toQueryString( params, false ) );
}
post.getParams().setContentCharset("UTF-8");
if( isMultipart ) {
int i=0;
Part[] parts = new Part[streams.size()];
for( ContentStream content : streams ) {
final ContentStream c = content;
String charSet = null;
String transferEncoding = null;
parts[i++] = new PartBase( c.getName(), c.getContentType(), charSet, transferEncoding ) {
@Override
protected long lengthOfData() throws IOException {
return c.getSize();
}
@Override
protected void sendData(OutputStream out) throws IOException {
IOUtils.copy( c.getReader(), out );
}
};
}
// Set the multi-part request
post.setRequestEntity(
new MultipartRequestEntity(
parts,
post.getParams() )
);
method = post;
}
method = post;
}
// It is has one stream, it is the post body, put the params in the URL
else {
String pstr = ClientUtils.toQueryString( params, false );
PostMethod post = new PostMethod( url+pstr );
else if( SolrRequest.METHOD.POST == request.getMethod() ) {
// Single stream as body
// Using a loop just to get the first one
for( ContentStream content : streams ) {
post.setRequestEntity(
new InputStreamRequestEntity(
content.getStream(),
content.getContentType()) );
break;
String url = _baseURL + path;
boolean isMultipart = ( streams != null && streams.size() > 1 );
if( streams == null || isMultipart ) {
// Without streams, just post the parameters
PostMethod post = new PostMethod( url );
Iterator<String> iter = params.getParameterNamesIterator();
while( iter.hasNext() ) {
String p = iter.next();
String[] vals = params.getParams( p );
if( vals != null && vals.length > 0 ) {
for( String v : vals ) {
post.addParameter( p, (v==null)?null:v );
}
}
else {
post.addParameter( p, null );
}
}
post.getParams().setContentCharset("UTF-8");
if( isMultipart ) {
int i=0;
Part[] parts = new Part[streams.size()];
for( ContentStream content : streams ) {
final ContentStream c = content;
String charSet = null;
String transferEncoding = null;
parts[i++] = new PartBase( c.getName(), c.getContentType(), charSet, transferEncoding ) {
@Override
protected long lengthOfData() throws IOException {
return c.getSize();
}
@Override
protected void sendData(OutputStream out) throws IOException {
IOUtils.copy( c.getReader(), out );
}
};
}
// Set the multi-part request
post.setRequestEntity( new MultipartRequestEntity( parts, post.getParams() ) );
method = post;
}
method = post;
}
// It is has one stream, it is the post body, put the params in the URL
else {
String pstr = ClientUtils.toQueryString( params, false );
PostMethod post = new PostMethod( url+pstr );
// Single stream as body
// Using a loop just to get the first one
for( ContentStream content : streams ) {
post.setRequestEntity(
new InputStreamRequestEntity( content.getStream(), content.getContentType())
);
break;
}
method = post;
}
}
else {
throw new SolrServerException("Unsupported method: "+request.getMethod() );
}
method = post;
}
}
else {
throw new SolrServerException("Unsupported method: "+request.getMethod() );
catch( NoHttpResponseException r ) {
// This is generally safe to retry on
method.releaseConnection();
method = null;
// If out of tries then just rethrow (as normal error).
if( ( tries < 1 ) ) {
throw r;
}
//log.warn( "Caught: " + r + ". Retrying..." );
}
}
}
catch( IOException ex ) {
throw new SolrServerException("error reading streams", ex );
}
method.setFollowRedirects( _followRedirects );
method.addRequestHeader( "User-Agent", AGENT );
if( _allowCompression ) {
method.setRequestHeader( new Header( "Accept-Encoding", "gzip,deflate" ) );
}
try {
// Execute the method.
//System.out.println( "EXECUTE:"+method.getURI() );
int statusCode = getHttpConnection().executeMethod(method);
int statusCode = _httpClient.executeMethod(method);
if (statusCode != HttpStatus.SC_OK) {
StringBuilder msg = new StringBuilder();
msg.append( method.getStatusLine().getReasonPhrase() );
@ -248,7 +285,41 @@ public class CommonsHttpSolrServer extends BaseSolrServer
if( method instanceof HttpMethodBase ) {
charset = ((HttpMethodBase)method).getResponseCharSet();
}
Reader reader = new InputStreamReader( method.getResponseBodyAsStream(), charset );
InputStream respBody = method.getResponseBodyAsStream();
// Jakarta Commons HTTPClient doesn't handle any
// compression natively. Handle gzip or deflate
// here if applicable.
if( _allowCompression ) {
Header contentEncodingHeader = method.getResponseHeader( "Content-Encoding" );
if( contentEncodingHeader != null ) {
String contentEncoding = contentEncodingHeader.getValue();
if( contentEncoding.contains( "gzip" ) ) {
//log.debug( "wrapping response in GZIPInputStream" );
respBody = new GZIPInputStream( respBody );
}
else if( contentEncoding.contains( "deflate" ) ) {
//log.debug( "wrapping response in InflaterInputStream" );
respBody = new InflaterInputStream(respBody);
}
}
else {
Header contentTypeHeader = method.getResponseHeader( "Content-Type" );
if( contentTypeHeader != null ) {
String contentType = contentTypeHeader.getValue();
if( contentType != null ) {
if( contentType.startsWith( "application/x-gzip-compressed" ) ) {
//log.debug( "wrapping response in GZIPInputStream" );
respBody = new GZIPInputStream( respBody );
}
else if ( contentType.startsWith("application/x-deflate") ) {
//log.debug( "wrapping response in InflaterInputStream" );
respBody = new InflaterInputStream(respBody);
}
}
}
}
}
Reader reader = new InputStreamReader( respBody, charset );
return _processor.processResponse( reader );
}
catch (HttpException e) {
@ -290,27 +361,58 @@ public class CommonsHttpSolrServer extends BaseSolrServer
_processor = processor;
}
protected HttpClient getHttpConnection() {
return new HttpClient(_connectionManager);
}
public MultiThreadedHttpConnectionManager getConnectionManager() {
return _connectionManager;
return (MultiThreadedHttpConnectionManager)_httpClient.getHttpConnectionManager();
}
/** set connectionTimeout on the underlying MultiThreadedHttpConnectionManager */
public void setConnectionTimeout(int timeout) {
_connectionManager.getParams().setConnectionTimeout(timeout);
getConnectionManager().getParams().setConnectionTimeout(timeout);
}
/** set connectionManagerTimeout on the HttpClient.**/
public void setConnectionManagerTimeout(int timeout) {
_httpClient.getParams().setConnectionManagerTimeout(timeout);
}
/** set soTimeout (read timeout) on the underlying MultiThreadedHttpConnectionManager. This is desirable for queries, but probably not for indexing. */
public void setSoTimeout(int timeout) {
getConnectionManager().getParams().setSoTimeout(timeout);
}
/** set maxConnectionsPerHost on the underlying MultiThreadedHttpConnectionManager */
public void setDefaultMaxConnectionsPerHost(int connections) {
_connectionManager.getParams().setDefaultMaxConnectionsPerHost(connections);
getConnectionManager().getParams().setDefaultMaxConnectionsPerHost(connections);
}
/** set maxTotalConnection on the underlying MultiThreadedHttpConnectionManager */
public void setMaxTotalConnections(int connections) {
_connectionManager.getParams().setMaxTotalConnections(connections);
getConnectionManager().getParams().setMaxTotalConnections(connections);
}
/**
* set followRedirects. This defaults to false under the
* assumption that if you are following a redirect to get to a Solr
* installation, something is misconfigured somewhere.
*/
public void setFollowRedirects( boolean followRedirects ) {
_followRedirects = followRedirects;
}
/**
* set allowCompression. If compression is enabled, both gzip and
* deflate compression will be accepted in the HTTP response.
*/
public void setAllowCompression( boolean allowCompression ) {
_allowCompression = allowCompression;
}
/**
* set maximum number of retries to attempt in the event of
* transient errors. Default: 0 (no) retries. No more than 1
* recommended.
*/
public void setMaxRetries( int maxRetries ) {
_maxRetries = maxRetries;
}
}