SOLR-2548: Simplified multi-threading of facet.threads logic

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1523677 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
David Wayne Smiley 2013-09-16 14:34:03 +00:00
parent ac38faf3da
commit 61dd81d787
2 changed files with 79 additions and 80 deletions

View File

@ -176,7 +176,7 @@ New Features
* SOLR-2548: Allow multiple threads to be specified for faceting. When threading, one
can specify facet.threads to parallelize loading the uninverted fields. In at least
one extreme case this reduced warmup time from 20 seconds to 3 seconds. (Janne Majaranta,
Gun Akkor via Erick Erickson)
Gun Akkor via Erick Erickson, David Smiley)
* SOLR-4816: CloudSolrServer can now route updates locally and no longer relies on inter-node
update forwarding. (Joel Bernstein, Shikhar Bhushan, Mark Miller)

View File

@ -17,26 +17,6 @@
package org.apache.solr.request;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.EnumSet;
import java.util.IdentityHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.index.DocsEnum;
import org.apache.lucene.index.Fields;
@ -100,6 +80,26 @@ import org.apache.solr.util.DateMathParser;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.LongPriorityQueue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.EnumSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* A class that generates simple Facet information for a request.
*
@ -525,6 +525,7 @@ public class SimpleFacets {
* @see #getFieldMissingCount
* @see #getFacetTermEnumCounts
*/
@SuppressWarnings("unchecked")
public NamedList<Object> getFacetFieldCounts()
throws IOException, SyntaxError {
@ -534,69 +535,67 @@ public class SimpleFacets {
return res;
}
// Passing a negative number for FACET_THREADS implies an unlimited number of threads is acceptable.
// Also, a subtlety of directExecutor is that no matter how many times you "submit" a job, it's really
// just a method call in that it's run by the calling thread.
int maxThreads = req.getParams().getInt(FacetParams.FACET_THREADS, 0);
Executor executor = maxThreads == 0 ? directExecutor : facetExecutor;
final Semaphore semaphore = new Semaphore((maxThreads <= 0) ? Integer.MAX_VALUE : maxThreads);
List<Future<NamedList>> futures = new ArrayList<Future<NamedList>>(facetFs.length);
// passing a negative number for FACET_THREADS implies an unlimited number of threads is acceptable.
// Also, a subtlety of directeExecutor is that no matter how many times you "submit" a job, it's really
// just a method call in that it's run by this thread.
maxThreads = (maxThreads <= 0) ? Integer.MAX_VALUE : maxThreads;
CompletionService completionService = new ExecutorCompletionService(executor);
LinkedList<Callable> pending = new LinkedList<Callable>();
for (String f : facetFs) {
parseParams(FacetParams.FACET_FIELD, f);
final String termList = localParams == null ? null : localParams.get(CommonParams.TERMS);
final String workerKey = key;
final String workerFacetValue = facetValue;
final DocSet workerBase = this.docs;
Callable worker = new Callable() {
@Override
public Object call() throws Exception {
NamedList<Object> result = new SimpleOrderedMap<Object>();
try {
if(termList != null) {
result.add(workerKey, getListedTermCounts(workerFacetValue, termList, workerBase));
} else {
result.add(workerKey, getTermCounts(workerFacetValue, workerBase));
try {
//Loop over fields; submit to executor, keeping the future
for (String f : facetFs) {
parseParams(FacetParams.FACET_FIELD, f);
final String termList = localParams == null ? null : localParams.get(CommonParams.TERMS);
final String workerKey = key;
final String workerFacetValue = facetValue;
final DocSet workerBase = this.docs;
Callable<NamedList> callable = new Callable<NamedList>() {
@Override
public NamedList call() throws Exception {
try {
NamedList<Object> result = new SimpleOrderedMap<Object>();
if(termList != null) {
result.add(workerKey, getListedTermCounts(workerFacetValue, termList, workerBase));
} else {
result.add(workerKey, getTermCounts(workerFacetValue, workerBase));
}
return result;
} catch (SolrException se) {
throw se;
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR,
"Exception during facet.field: " + workerFacetValue, e.getCause());
} finally {
semaphore.release();
}
} catch (SolrException se) {
throw se;
} catch (Exception e){
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Exception during facet.field: " + workerFacetValue, e.getCause());
}
return result;
}
};
if (--maxThreads >= 0) {
completionService.submit(worker);
} else {
pending.add(worker);
};
RunnableFuture<NamedList> runnableFuture = new FutureTask<NamedList>(callable);
semaphore.acquire();//may block and/or interrupt
executor.execute(runnableFuture);//releases semaphore when done
futures.add(runnableFuture);
}//facetFs loop
//Loop over futures to get the values. The order is the same as facetFs but shouldn't matter.
for (Future<NamedList> future : futures) {
res.addAll(future.get());
}
}
for (String f : facetFs) {
NamedList taskResult;
try {
Future future = completionService.take();
taskResult = (NamedList)future.get();
if (taskResult != null) {
res.addAll(taskResult);
}
if (pending.isEmpty() == false) {
completionService.submit(pending.removeFirst());
}
} catch (InterruptedException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Processing of facet fields InterruptedException", e);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof SolrException) {
throw (SolrException) cause;
}
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Processing of facet fields ExecutionException ", e);
assert semaphore.availablePermits() >= maxThreads;
} catch (InterruptedException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Error while processing facet fields: InterruptedException", e);
} catch (ExecutionException ee) {
Throwable e = ee.getCause();//unwrap
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
}
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Error while processing facet fields: " + e.toString(), e);
}
return res;
}
@ -1205,7 +1204,7 @@ public class SimpleFacets {
}
private <T extends Comparable<T>> NamedList getFacetRangeCounts
(final SchemaField sf,
(final SchemaField sf,
final RangeEndpointCalculator<T> calc) throws IOException {
final String f = sf.getName();
@ -1338,7 +1337,7 @@ public class SimpleFacets {
*/
protected int rangeCount(SchemaField sf, String low, String high,
boolean iLow, boolean iHigh) throws IOException {
Query rangeQ = sf.getType().getRangeQuery(null, sf,low,high,iLow,iHigh);
Query rangeQ = sf.getType().getRangeQuery(null, sf, low, high, iLow, iHigh);
if (params.getBool(GroupParams.GROUP_FACET, false)) {
return getGroupedFacetQueryCount(rangeQ);
} else {
@ -1352,8 +1351,8 @@ public class SimpleFacets {
@Deprecated
protected int rangeCount(SchemaField sf, Date low, Date high,
boolean iLow, boolean iHigh) throws IOException {
Query rangeQ = ((DateField)(sf.getType())).getRangeQuery(null, sf,low,high,iLow,iHigh);
return searcher.numDocs(rangeQ , docs);
Query rangeQ = ((DateField)(sf.getType())).getRangeQuery(null, sf, low, high, iLow, iHigh);
return searcher.numDocs(rangeQ, docs);
}
/**