diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index f0779b2f309..478f9b5f99c 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -176,7 +176,7 @@ New Features * SOLR-2548: Allow multiple threads to be specified for faceting. When threading, one can specify facet.threads to parallelize loading the uninverted fields. In at least one extreme case this reduced warmup time from 20 seconds to 3 seconds. (Janne Majaranta, - Gun Akkor via Erick Erickson) + Gun Akkor via Erick Erickson, David Smiley) * SOLR-4816: CloudSolrServer can now route updates locally and no longer relies on inter-node update forwarding. (Joel Bernstein, Shikhar Bhushan, Mark Miller) diff --git a/solr/core/src/java/org/apache/solr/request/SimpleFacets.java b/solr/core/src/java/org/apache/solr/request/SimpleFacets.java index dd2f8712310..f1d2965eafa 100644 --- a/solr/core/src/java/org/apache/solr/request/SimpleFacets.java +++ b/solr/core/src/java/org/apache/solr/request/SimpleFacets.java @@ -17,26 +17,6 @@ package org.apache.solr.request; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Date; -import java.util.EnumSet; -import java.util.IdentityHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.Future; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - import org.apache.lucene.index.AtomicReader; import org.apache.lucene.index.DocsEnum; import org.apache.lucene.index.Fields; @@ -100,6 +80,26 @@ import org.apache.solr.util.DateMathParser; import org.apache.solr.util.DefaultSolrThreadFactory; import org.apache.solr.util.LongPriorityQueue; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.EnumSet; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.Semaphore; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + /** * A class that generates simple Facet information for a request. * @@ -525,6 +525,7 @@ public class SimpleFacets { * @see #getFieldMissingCount * @see #getFacetTermEnumCounts */ + @SuppressWarnings("unchecked") public NamedList getFacetFieldCounts() throws IOException, SyntaxError { @@ -534,69 +535,67 @@ public class SimpleFacets { return res; } + // Passing a negative number for FACET_THREADS implies an unlimited number of threads is acceptable. + // Also, a subtlety of directExecutor is that no matter how many times you "submit" a job, it's really + // just a method call in that it's run by the calling thread. int maxThreads = req.getParams().getInt(FacetParams.FACET_THREADS, 0); Executor executor = maxThreads == 0 ? directExecutor : facetExecutor; + final Semaphore semaphore = new Semaphore((maxThreads <= 0) ? Integer.MAX_VALUE : maxThreads); + List> futures = new ArrayList>(facetFs.length); - // passing a negative number for FACET_THREADS implies an unlimited number of threads is acceptable. - // Also, a subtlety of directeExecutor is that no matter how many times you "submit" a job, it's really - // just a method call in that it's run by this thread. - maxThreads = (maxThreads <= 0) ? Integer.MAX_VALUE : maxThreads; - CompletionService completionService = new ExecutorCompletionService(executor); - LinkedList pending = new LinkedList(); - for (String f : facetFs) { - parseParams(FacetParams.FACET_FIELD, f); - final String termList = localParams == null ? null : localParams.get(CommonParams.TERMS); - final String workerKey = key; - final String workerFacetValue = facetValue; - final DocSet workerBase = this.docs; - Callable worker = new Callable() { - @Override - public Object call() throws Exception { - NamedList result = new SimpleOrderedMap(); - try { - if(termList != null) { - result.add(workerKey, getListedTermCounts(workerFacetValue, termList, workerBase)); - } else { - result.add(workerKey, getTermCounts(workerFacetValue, workerBase)); + try { + //Loop over fields; submit to executor, keeping the future + for (String f : facetFs) { + parseParams(FacetParams.FACET_FIELD, f); + final String termList = localParams == null ? null : localParams.get(CommonParams.TERMS); + final String workerKey = key; + final String workerFacetValue = facetValue; + final DocSet workerBase = this.docs; + Callable callable = new Callable() { + @Override + public NamedList call() throws Exception { + try { + NamedList result = new SimpleOrderedMap(); + if(termList != null) { + result.add(workerKey, getListedTermCounts(workerFacetValue, termList, workerBase)); + } else { + result.add(workerKey, getTermCounts(workerFacetValue, workerBase)); + } + return result; + } catch (SolrException se) { + throw se; + } catch (Exception e) { + throw new SolrException(ErrorCode.SERVER_ERROR, + "Exception during facet.field: " + workerFacetValue, e.getCause()); + } finally { + semaphore.release(); } - } catch (SolrException se) { - throw se; - } catch (Exception e){ - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, - "Exception during facet.field: " + workerFacetValue, e.getCause()); } - return result; - } - }; - if (--maxThreads >= 0) { - completionService.submit(worker); - } else { - pending.add(worker); + }; + + RunnableFuture runnableFuture = new FutureTask(callable); + semaphore.acquire();//may block and/or interrupt + executor.execute(runnableFuture);//releases semaphore when done + futures.add(runnableFuture); + }//facetFs loop + + //Loop over futures to get the values. The order is the same as facetFs but shouldn't matter. + for (Future future : futures) { + res.addAll(future.get()); } - } - for (String f : facetFs) { - NamedList taskResult; - try { - Future future = completionService.take(); - taskResult = (NamedList)future.get(); - if (taskResult != null) { - res.addAll(taskResult); - } - if (pending.isEmpty() == false) { - completionService.submit(pending.removeFirst()); - } - } catch (InterruptedException e) { - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, - "Processing of facet fields InterruptedException", e); - } catch (ExecutionException e) { - Throwable cause = e.getCause(); - if (cause instanceof SolrException) { - throw (SolrException) cause; - } - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, - "Processing of facet fields ExecutionException ", e); + assert semaphore.availablePermits() >= maxThreads; + } catch (InterruptedException e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, + "Error while processing facet fields: InterruptedException", e); + } catch (ExecutionException ee) { + Throwable e = ee.getCause();//unwrap + if (e instanceof RuntimeException) { + throw (RuntimeException) e; } + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, + "Error while processing facet fields: " + e.toString(), e); } + return res; } @@ -1205,7 +1204,7 @@ public class SimpleFacets { } private > NamedList getFacetRangeCounts - (final SchemaField sf, + (final SchemaField sf, final RangeEndpointCalculator calc) throws IOException { final String f = sf.getName(); @@ -1338,7 +1337,7 @@ public class SimpleFacets { */ protected int rangeCount(SchemaField sf, String low, String high, boolean iLow, boolean iHigh) throws IOException { - Query rangeQ = sf.getType().getRangeQuery(null, sf,low,high,iLow,iHigh); + Query rangeQ = sf.getType().getRangeQuery(null, sf, low, high, iLow, iHigh); if (params.getBool(GroupParams.GROUP_FACET, false)) { return getGroupedFacetQueryCount(rangeQ); } else { @@ -1352,8 +1351,8 @@ public class SimpleFacets { @Deprecated protected int rangeCount(SchemaField sf, Date low, Date high, boolean iLow, boolean iHigh) throws IOException { - Query rangeQ = ((DateField)(sf.getType())).getRangeQuery(null, sf,low,high,iLow,iHigh); - return searcher.numDocs(rangeQ , docs); + Query rangeQ = ((DateField)(sf.getType())).getRangeQuery(null, sf, low, high, iLow, iHigh); + return searcher.numDocs(rangeQ, docs); } /**