SOLR-5465: SolrCmdDistributor retry logic has a concurrency race bug.

SOLR-5464: ConcurrentSolrServer does not stream pure delete by id requests.
SOLR-5452: Do not attempt to proxy internal update requests.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1543299 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2013-11-19 03:13:33 +00:00
parent d6e1114002
commit 8822632048
7 changed files with 120 additions and 22 deletions

View File

@ -108,14 +108,20 @@ Bug Fixes
* SOLR-5451: SyncStrategy closes it's http connection manager before the * SOLR-5451: SyncStrategy closes it's http connection manager before the
executor that uses it in it's close method. (Mark Miller) executor that uses it in it's close method. (Mark Miller)
* SOLR-5452: Do not attempt to proxy internal update requests. (Mark Miller)
* SOLR-5460: SolrDispatchFilter#sendError can get a SolrCore that it does not * SOLR-5460: SolrDispatchFilter#sendError can get a SolrCore that it does not
close. (Mark Miller) close. (Mark Miller)
* SOLR-5461: Request proxying should only set con.setDoOutput(true) if the * SOLR-5461: Request proxying should only set con.setDoOutput(true) if the
request is a post. (Mark Miller) request is a post. (Mark Miller)
* SOLR-5465: SolrCmdDistributor retry logic has a concurrency race bug.
(Mark Miller)
* SOLR-5464: ConcurrentSolrServer does not stream pure delete by id requests.
(Mark Miller)
* SOLR-5452: Do not attempt to proxy internal update requests. (Mark Miller)
Other Changes Other Changes
--------------------- ---------------------

View File

@ -52,6 +52,7 @@ import org.apache.solr.response.QueryResponseWriter;
import org.apache.solr.response.SolrQueryResponse; import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.servlet.cache.HttpCacheHeaderUtil; import org.apache.solr.servlet.cache.HttpCacheHeaderUtil;
import org.apache.solr.servlet.cache.Method; import org.apache.solr.servlet.cache.Method;
import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.update.processor.DistributingUpdateProcessorFactory; import org.apache.solr.update.processor.DistributingUpdateProcessorFactory;
import org.apache.solr.util.FastWriter; import org.apache.solr.util.FastWriter;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -315,10 +316,12 @@ public class SolrDispatchFilter implements Filter
if (core == null && idx > 0) { if (core == null && idx > 0) {
String coreUrl = getRemotCoreUrl(cores, corename, origCorename); String coreUrl = getRemotCoreUrl(cores, corename, origCorename);
// don't proxy for internal update requests // don't proxy for internal update requests
//solrReq = SolrRequestParsers.DEFAULT.parse(null,path, req); SolrParams queryParams = SolrRequestParsers.parseQueryString(req.getQueryString());
//if (coreUrl != null && solrReq.getParams().get(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM) == null) { if (coreUrl != null
if (coreUrl != null) { && queryParams
path = path.substring( idx ); .get(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM) == null
&& queryParams.get(DistributedUpdateProcessor.DISTRIB_FROM) == null) {
path = path.substring(idx);
remoteQuery(coreUrl + path, req, solrReq, resp); remoteQuery(coreUrl + path, req, solrReq, resp);
return; return;
} else { } else {

View File

@ -69,8 +69,8 @@ public class SolrCmdDistributor {
List<Error> errors = new ArrayList<Error>(this.errors); List<Error> errors = new ArrayList<Error>(this.errors);
errors.addAll(servers.getErrors()); errors.addAll(servers.getErrors());
List<Error> resubmitList = new ArrayList<Error>();
boolean blockUntilFinishedAgain = false;
for (Error err : errors) { for (Error err : errors) {
String oldNodeUrl = err.req.node.getUrl(); String oldNodeUrl = err.req.node.getUrl();
@ -108,8 +108,7 @@ public class SolrCmdDistributor {
log.warn(null, e); log.warn(null, e);
} }
submit(err.req); resubmitList.add(err);
blockUntilFinishedAgain = true;
} else { } else {
allErrors.add(err); allErrors.add(err);
} }
@ -120,8 +119,11 @@ public class SolrCmdDistributor {
servers.clearErrors(); servers.clearErrors();
this.errors.clear(); this.errors.clear();
for (Error err : resubmitList) {
submit(err.req);
}
if (blockUntilFinishedAgain) { if (resubmitList.size() > 0) {
servers.blockUntilFinished(); servers.blockUntilFinished();
doRetriesIfNeeded(); doRetriesIfNeeded();
} }

View File

@ -20,8 +20,10 @@ package org.apache.solr.update;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import org.apache.http.client.HttpClient; import org.apache.http.client.HttpClient;
@ -33,6 +35,8 @@ import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.update.SolrCmdDistributor.Error; import org.apache.solr.update.SolrCmdDistributor.Error;
import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.update.processor.DistributingUpdateProcessorFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -70,7 +74,7 @@ public class StreamingSolrServers {
String url = getFullUrl(req.node.getUrl()); String url = getFullUrl(req.node.getUrl());
ConcurrentUpdateSolrServer server = solrServers.get(url); ConcurrentUpdateSolrServer server = solrServers.get(url);
if (server == null) { if (server == null) {
server = new ConcurrentUpdateSolrServer(url, httpClient, 100, 1, updateExecutor) { server = new ConcurrentUpdateSolrServer(url, httpClient, 100, 1, updateExecutor, true) {
@Override @Override
public void handleError(Throwable ex) { public void handleError(Throwable ex) {
log.error("error", ex); log.error("error", ex);
@ -86,6 +90,10 @@ public class StreamingSolrServers {
server.setParser(new BinaryResponseParser()); server.setParser(new BinaryResponseParser());
server.setRequestWriter(new BinaryRequestWriter()); server.setRequestWriter(new BinaryRequestWriter());
server.setPollQueueTime(0); server.setPollQueueTime(0);
Set<String> queryParams = new HashSet<String>(2);
queryParams.add(DistributedUpdateProcessor.DISTRIB_FROM);
queryParams.add(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM);
server.setQueryParams(queryParams);
solrServers.put(url, server); solrServers.put(url, server);
} }

View File

@ -22,6 +22,7 @@ import java.io.OutputStream;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Locale; import java.util.Locale;
import java.util.Queue; import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -80,6 +81,7 @@ public class ConcurrentUpdateSolrServer extends SolrServer {
final int threadCount; final int threadCount;
boolean shutdownExecutor = false; boolean shutdownExecutor = false;
int pollQueueTime = 250; int pollQueueTime = 250;
private final boolean streamDeletes;
/** /**
* Uses an internally managed HttpClient instance. * Uses an internally managed HttpClient instance.
@ -109,12 +111,33 @@ public class ConcurrentUpdateSolrServer extends SolrServer {
*/ */
public ConcurrentUpdateSolrServer(String solrServerUrl, public ConcurrentUpdateSolrServer(String solrServerUrl,
HttpClient client, int queueSize, int threadCount, ExecutorService es) { HttpClient client, int queueSize, int threadCount, ExecutorService es) {
this(solrServerUrl, client, queueSize, threadCount, es, false);
}
/**
* Uses the supplied HttpClient to send documents to the Solr server.
*/
public ConcurrentUpdateSolrServer(String solrServerUrl,
HttpClient client, int queueSize, int threadCount, ExecutorService es, boolean streamDeletes) {
this.server = new HttpSolrServer(solrServerUrl, client); this.server = new HttpSolrServer(solrServerUrl, client);
this.server.setFollowRedirects(false); this.server.setFollowRedirects(false);
queue = new LinkedBlockingQueue<UpdateRequest>(queueSize); queue = new LinkedBlockingQueue<UpdateRequest>(queueSize);
this.threadCount = threadCount; this.threadCount = threadCount;
runners = new LinkedList<Runner>(); runners = new LinkedList<Runner>();
scheduler = es; scheduler = es;
this.streamDeletes = streamDeletes;
}
public Set<String> getQueryParams() {
return this.server.getQueryParams();
}
/**
* Expert Method.
* @param queryParams set of param keys to only send via the query string
*/
public void setQueryParams(Set<String> queryParams) {
this.server.setQueryParams(queryParams);
} }
/** /**
@ -261,10 +284,22 @@ public class ConcurrentUpdateSolrServer extends SolrServer {
UpdateRequest req = (UpdateRequest) request; UpdateRequest req = (UpdateRequest) request;
// this happens for commit... // this happens for commit...
if (req.getDocuments() == null || req.getDocuments().isEmpty()) { if (streamDeletes) {
if ((req.getDocuments() == null || req.getDocuments().isEmpty())
&& (req.getDeleteById() == null || req.getDeleteById().isEmpty())
&& (req.getDeleteByIdMap() == null || req.getDeleteByIdMap().isEmpty())) {
blockUntilFinished();
if (req.getDeleteQuery() == null) {
return server.request(request);
}
}
} else {
if ((req.getDocuments() == null || req.getDocuments().isEmpty())) {
blockUntilFinished(); blockUntilFinished();
return server.request(request); return server.request(request);
} }
}
SolrParams params = req.getParams(); SolrParams params = req.getParams();
if (params != null) { if (params != null) {

View File

@ -22,9 +22,11 @@ import java.net.ConnectException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Set;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.http.Header; import org.apache.http.Header;
@ -71,6 +73,7 @@ public class HttpSolrServer extends SolrServer {
private static final String UTF_8 = "UTF-8"; private static final String UTF_8 = "UTF-8";
private static final String DEFAULT_PATH = "/select"; private static final String DEFAULT_PATH = "/select";
private static final long serialVersionUID = -946812319974801896L; private static final long serialVersionUID = -946812319974801896L;
/** /**
* User-Agent String. * User-Agent String.
*/ */
@ -117,6 +120,7 @@ public class HttpSolrServer extends SolrServer {
private boolean useMultiPartPost; private boolean useMultiPartPost;
private final boolean internalClient; private final boolean internalClient;
private Set<String> queryParams = Collections.emptySet();
/** /**
* @param baseURL * @param baseURL
@ -158,6 +162,18 @@ public class HttpSolrServer extends SolrServer {
this.parser = parser; this.parser = parser;
} }
public Set<String> getQueryParams() {
return queryParams;
}
/**
* Expert Method.
* @param queryParams set of param keys to only send via the query string
*/
public void setQueryParams(Set<String> queryParams) {
this.queryParams = queryParams;
}
/** /**
* Process the request. If * Process the request. If
* {@link org.apache.solr.client.solrj.SolrRequest#getResponseParser()} is * {@link org.apache.solr.client.solrj.SolrRequest#getResponseParser()} is
@ -207,7 +223,6 @@ public class HttpSolrServer extends SolrServer {
if (invariantParams != null) { if (invariantParams != null) {
wparams.add(invariantParams); wparams.add(invariantParams);
} }
params = wparams;
int tries = maxRetries + 1; int tries = maxRetries + 1;
try { try {
@ -221,7 +236,7 @@ public class HttpSolrServer extends SolrServer {
if( streams != null ) { if( streams != null ) {
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, "GET can't send streams!" ); throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, "GET can't send streams!" );
} }
method = new HttpGet( baseUrl + path + ClientUtils.toQueryString( params, false ) ); method = new HttpGet( baseUrl + path + ClientUtils.toQueryString( wparams, false ) );
} }
else if( SolrRequest.METHOD.POST == request.getMethod() ) { else if( SolrRequest.METHOD.POST == request.getMethod() ) {
@ -237,9 +252,21 @@ public class HttpSolrServer extends SolrServer {
} }
boolean isMultipart = (this.useMultiPartPost || ( streams != null && streams.size() > 1 )) && !hasNullStreamName; boolean isMultipart = (this.useMultiPartPost || ( streams != null && streams.size() > 1 )) && !hasNullStreamName;
// only send this list of params as query string params
ModifiableSolrParams queryParams = new ModifiableSolrParams();
for (String param : this.queryParams) {
String[] value = wparams.getParams(param) ;
if (value != null) {
for (String v : value) {
queryParams.add(param, v);
}
wparams.remove(param);
}
}
LinkedList<NameValuePair> postParams = new LinkedList<NameValuePair>(); LinkedList<NameValuePair> postParams = new LinkedList<NameValuePair>();
if (streams == null || isMultipart) { if (streams == null || isMultipart) {
HttpPost post = new HttpPost(url); HttpPost post = new HttpPost(url + ClientUtils.toQueryString( queryParams, false ));
post.setHeader("Content-Charset", "UTF-8"); post.setHeader("Content-Charset", "UTF-8");
if (!isMultipart) { if (!isMultipart) {
post.addHeader("Content-Type", post.addHeader("Content-Type",
@ -247,10 +274,10 @@ public class HttpSolrServer extends SolrServer {
} }
List<FormBodyPart> parts = new LinkedList<FormBodyPart>(); List<FormBodyPart> parts = new LinkedList<FormBodyPart>();
Iterator<String> iter = params.getParameterNamesIterator(); Iterator<String> iter = wparams.getParameterNamesIterator();
while (iter.hasNext()) { while (iter.hasNext()) {
String p = iter.next(); String p = iter.next();
String[] vals = params.getParams(p); String[] vals = wparams.getParams(p);
if (vals != null) { if (vals != null) {
for (String v : vals) { for (String v : vals) {
if (isMultipart) { if (isMultipart) {
@ -295,7 +322,7 @@ public class HttpSolrServer extends SolrServer {
} }
// It is has one stream, it is the post body, put the params in the URL // It is has one stream, it is the post body, put the params in the URL
else { else {
String pstr = ClientUtils.toQueryString(params, false); String pstr = ClientUtils.toQueryString(wparams, false);
HttpPost post = new HttpPost(url + pstr); HttpPost post = new HttpPost(url + pstr);
// Single stream as body // Single stream as body

View File

@ -97,6 +97,8 @@ public class LBHttpSolrServer extends SolrServer {
private volatile ResponseParser parser; private volatile ResponseParser parser;
private volatile RequestWriter requestWriter; private volatile RequestWriter requestWriter;
private Set<String> queryParams;
static { static {
solrQuery.setRows(0); solrQuery.setRows(0);
} }
@ -214,6 +216,18 @@ public class LBHttpSolrServer extends SolrServer {
updateAliveList(); updateAliveList();
} }
public Set<String> getQueryParams() {
return queryParams;
}
/**
* Expert Method.
* @param queryParams set of param keys to only send via the query string
*/
public void setQueryParams(Set<String> queryParams) {
this.queryParams = queryParams;
}
public static String normalize(String server) { public static String normalize(String server) {
if (server.endsWith("/")) if (server.endsWith("/"))
server = server.substring(0, server.length() - 1); server = server.substring(0, server.length() - 1);
@ -225,6 +239,9 @@ public class LBHttpSolrServer extends SolrServer {
if (requestWriter != null) { if (requestWriter != null) {
s.setRequestWriter(requestWriter); s.setRequestWriter(requestWriter);
} }
if (queryParams != null) {
s.setQueryParams(queryParams);
}
return s; return s;
} }