diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 39c89762219..a8251665b8e 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -108,14 +108,20 @@ Bug Fixes * SOLR-5451: SyncStrategy closes it's http connection manager before the executor that uses it in it's close method. (Mark Miller) -* SOLR-5452: Do not attempt to proxy internal update requests. (Mark Miller) - * SOLR-5460: SolrDispatchFilter#sendError can get a SolrCore that it does not close. (Mark Miller) * SOLR-5461: Request proxying should only set con.setDoOutput(true) if the request is a post. (Mark Miller) +* SOLR-5465: SolrCmdDistributor retry logic has a concurrency race bug. + (Mark Miller) + +* SOLR-5464: ConcurrentSolrServer does not stream pure delete by id requests. + (Mark Miller) + +* SOLR-5452: Do not attempt to proxy internal update requests. (Mark Miller) + Other Changes --------------------- diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java index 41c1916ac37..3437245cd8c 100644 --- a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java +++ b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java @@ -52,6 +52,7 @@ import org.apache.solr.response.QueryResponseWriter; import org.apache.solr.response.SolrQueryResponse; import org.apache.solr.servlet.cache.HttpCacheHeaderUtil; import org.apache.solr.servlet.cache.Method; +import org.apache.solr.update.processor.DistributedUpdateProcessor; import org.apache.solr.update.processor.DistributingUpdateProcessorFactory; import org.apache.solr.util.FastWriter; import org.slf4j.Logger; @@ -315,10 +316,12 @@ public class SolrDispatchFilter implements Filter if (core == null && idx > 0) { String coreUrl = getRemotCoreUrl(cores, corename, origCorename); // don't proxy for internal update requests - //solrReq = SolrRequestParsers.DEFAULT.parse(null,path, req); - //if (coreUrl != null && solrReq.getParams().get(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM) == null) { - if (coreUrl != null) { - path = path.substring( idx ); + SolrParams queryParams = SolrRequestParsers.parseQueryString(req.getQueryString()); + if (coreUrl != null + && queryParams + .get(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM) == null + && queryParams.get(DistributedUpdateProcessor.DISTRIB_FROM) == null) { + path = path.substring(idx); remoteQuery(coreUrl + path, req, solrReq, resp); return; } else { diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java index 0e4d9fbfaa9..f77a774ea39 100644 --- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java +++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java @@ -69,8 +69,8 @@ public class SolrCmdDistributor { List errors = new ArrayList(this.errors); errors.addAll(servers.getErrors()); + List resubmitList = new ArrayList(); - boolean blockUntilFinishedAgain = false; for (Error err : errors) { String oldNodeUrl = err.req.node.getUrl(); @@ -108,8 +108,7 @@ public class SolrCmdDistributor { log.warn(null, e); } - submit(err.req); - blockUntilFinishedAgain = true; + resubmitList.add(err); } else { allErrors.add(err); } @@ -120,8 +119,11 @@ public class SolrCmdDistributor { servers.clearErrors(); this.errors.clear(); + for (Error err : resubmitList) { + submit(err.req); + } - if (blockUntilFinishedAgain) { + if (resubmitList.size() > 0) { servers.blockUntilFinished(); doRetriesIfNeeded(); } diff --git a/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java b/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java index 3af7c328b8a..4d460ef4c0e 100644 --- a/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java +++ b/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java @@ -20,8 +20,10 @@ package org.apache.solr.update; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutorService; import org.apache.http.client.HttpClient; @@ -33,6 +35,8 @@ import org.apache.solr.client.solrj.impl.HttpClientUtil; import org.apache.solr.common.SolrException; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.update.SolrCmdDistributor.Error; +import org.apache.solr.update.processor.DistributedUpdateProcessor; +import org.apache.solr.update.processor.DistributingUpdateProcessorFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,7 +74,7 @@ public class StreamingSolrServers { String url = getFullUrl(req.node.getUrl()); ConcurrentUpdateSolrServer server = solrServers.get(url); if (server == null) { - server = new ConcurrentUpdateSolrServer(url, httpClient, 100, 1, updateExecutor) { + server = new ConcurrentUpdateSolrServer(url, httpClient, 100, 1, updateExecutor, true) { @Override public void handleError(Throwable ex) { log.error("error", ex); @@ -86,6 +90,10 @@ public class StreamingSolrServers { server.setParser(new BinaryResponseParser()); server.setRequestWriter(new BinaryRequestWriter()); server.setPollQueueTime(0); + Set queryParams = new HashSet(2); + queryParams.add(DistributedUpdateProcessor.DISTRIB_FROM); + queryParams.add(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM); + server.setQueryParams(queryParams); solrServers.put(url, server); } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java index 24e46566a26..34cf01e729b 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java @@ -22,6 +22,7 @@ import java.io.OutputStream; import java.util.LinkedList; import java.util.Locale; import java.util.Queue; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -80,6 +81,7 @@ public class ConcurrentUpdateSolrServer extends SolrServer { final int threadCount; boolean shutdownExecutor = false; int pollQueueTime = 250; + private final boolean streamDeletes; /** * Uses an internally managed HttpClient instance. @@ -109,14 +111,35 @@ public class ConcurrentUpdateSolrServer extends SolrServer { */ public ConcurrentUpdateSolrServer(String solrServerUrl, HttpClient client, int queueSize, int threadCount, ExecutorService es) { + this(solrServerUrl, client, queueSize, threadCount, es, false); + } + + /** + * Uses the supplied HttpClient to send documents to the Solr server. + */ + public ConcurrentUpdateSolrServer(String solrServerUrl, + HttpClient client, int queueSize, int threadCount, ExecutorService es, boolean streamDeletes) { this.server = new HttpSolrServer(solrServerUrl, client); this.server.setFollowRedirects(false); queue = new LinkedBlockingQueue(queueSize); this.threadCount = threadCount; runners = new LinkedList(); scheduler = es; + this.streamDeletes = streamDeletes; } + public Set getQueryParams() { + return this.server.getQueryParams(); + } + + /** + * Expert Method. + * @param queryParams set of param keys to only send via the query string + */ + public void setQueryParams(Set queryParams) { + this.server.setQueryParams(queryParams); + } + /** * Opens a connection and sends everything... */ @@ -261,11 +284,23 @@ public class ConcurrentUpdateSolrServer extends SolrServer { UpdateRequest req = (UpdateRequest) request; // this happens for commit... - if (req.getDocuments() == null || req.getDocuments().isEmpty()) { - blockUntilFinished(); - return server.request(request); + if (streamDeletes) { + if ((req.getDocuments() == null || req.getDocuments().isEmpty()) + && (req.getDeleteById() == null || req.getDeleteById().isEmpty()) + && (req.getDeleteByIdMap() == null || req.getDeleteByIdMap().isEmpty())) { + blockUntilFinished(); + if (req.getDeleteQuery() == null) { + return server.request(request); + } + } + } else { + if ((req.getDocuments() == null || req.getDocuments().isEmpty())) { + blockUntilFinished(); + return server.request(request); + } } + SolrParams params = req.getParams(); if (params != null) { // check if it is waiting for the searcher diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java index 7401bb82af1..ff4a5aa429a 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java @@ -22,9 +22,11 @@ import java.net.ConnectException; import java.net.SocketTimeoutException; import java.nio.charset.Charset; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Set; import org.apache.commons.io.IOUtils; import org.apache.http.Header; @@ -71,6 +73,7 @@ public class HttpSolrServer extends SolrServer { private static final String UTF_8 = "UTF-8"; private static final String DEFAULT_PATH = "/select"; private static final long serialVersionUID = -946812319974801896L; + /** * User-Agent String. */ @@ -117,7 +120,8 @@ public class HttpSolrServer extends SolrServer { private boolean useMultiPartPost; private final boolean internalClient; - + private Set queryParams = Collections.emptySet(); + /** * @param baseURL * The URL of the Solr server. For example, " @@ -158,6 +162,18 @@ public class HttpSolrServer extends SolrServer { this.parser = parser; } + public Set getQueryParams() { + return queryParams; + } + + /** + * Expert Method. + * @param queryParams set of param keys to only send via the query string + */ + public void setQueryParams(Set queryParams) { + this.queryParams = queryParams; + } + /** * Process the request. If * {@link org.apache.solr.client.solrj.SolrRequest#getResponseParser()} is @@ -207,7 +223,6 @@ public class HttpSolrServer extends SolrServer { if (invariantParams != null) { wparams.add(invariantParams); } - params = wparams; int tries = maxRetries + 1; try { @@ -221,7 +236,7 @@ public class HttpSolrServer extends SolrServer { if( streams != null ) { throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, "GET can't send streams!" ); } - method = new HttpGet( baseUrl + path + ClientUtils.toQueryString( params, false ) ); + method = new HttpGet( baseUrl + path + ClientUtils.toQueryString( wparams, false ) ); } else if( SolrRequest.METHOD.POST == request.getMethod() ) { @@ -236,10 +251,22 @@ public class HttpSolrServer extends SolrServer { } } boolean isMultipart = (this.useMultiPartPost || ( streams != null && streams.size() > 1 )) && !hasNullStreamName; - + + // only send this list of params as query string params + ModifiableSolrParams queryParams = new ModifiableSolrParams(); + for (String param : this.queryParams) { + String[] value = wparams.getParams(param) ; + if (value != null) { + for (String v : value) { + queryParams.add(param, v); + } + wparams.remove(param); + } + } + LinkedList postParams = new LinkedList(); if (streams == null || isMultipart) { - HttpPost post = new HttpPost(url); + HttpPost post = new HttpPost(url + ClientUtils.toQueryString( queryParams, false )); post.setHeader("Content-Charset", "UTF-8"); if (!isMultipart) { post.addHeader("Content-Type", @@ -247,10 +274,10 @@ public class HttpSolrServer extends SolrServer { } List parts = new LinkedList(); - Iterator iter = params.getParameterNamesIterator(); + Iterator iter = wparams.getParameterNamesIterator(); while (iter.hasNext()) { String p = iter.next(); - String[] vals = params.getParams(p); + String[] vals = wparams.getParams(p); if (vals != null) { for (String v : vals) { if (isMultipart) { @@ -295,7 +322,7 @@ public class HttpSolrServer extends SolrServer { } // It is has one stream, it is the post body, put the params in the URL else { - String pstr = ClientUtils.toQueryString(params, false); + String pstr = ClientUtils.toQueryString(wparams, false); HttpPost post = new HttpPost(url + pstr); // Single stream as body diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java index c68662ee2f9..830b79e5dcb 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java @@ -97,6 +97,8 @@ public class LBHttpSolrServer extends SolrServer { private volatile ResponseParser parser; private volatile RequestWriter requestWriter; + private Set queryParams; + static { solrQuery.setRows(0); } @@ -213,6 +215,18 @@ public class LBHttpSolrServer extends SolrServer { } updateAliveList(); } + + public Set getQueryParams() { + return queryParams; + } + + /** + * Expert Method. + * @param queryParams set of param keys to only send via the query string + */ + public void setQueryParams(Set queryParams) { + this.queryParams = queryParams; + } public static String normalize(String server) { if (server.endsWith("/")) @@ -225,6 +239,9 @@ public class LBHttpSolrServer extends SolrServer { if (requestWriter != null) { s.setRequestWriter(requestWriter); } + if (queryParams != null) { + s.setQueryParams(queryParams); + } return s; }