SOLR-4063: simplify core loading executor

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1423963 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Yonik Seeley 2012-12-19 17:15:34 +00:00
parent 0cc46b3cae
commit a9356fc3ea
1 changed files with 16 additions and 32 deletions

View File

@ -41,8 +41,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -460,6 +459,9 @@ public class CoreContainer
if (fname != null) { if (fname != null) {
if ("JUL".equalsIgnoreCase(fname)) { if ("JUL".equalsIgnoreCase(fname)) {
logging = new JulWatcher(slf4jImpl); logging = new JulWatcher(slf4jImpl);
// else if( "Log4j".equals(fname) ) {
// logging = new Log4jWatcher(slf4jImpl);
// }
} else { } else {
try { try {
logging = loader.newInstance(fname, LogWatcher.class); logging = loader.newInstance(fname, LogWatcher.class);
@ -546,13 +548,10 @@ public class CoreContainer
XPathConstants.NODESET); XPathConstants.NODESET);
// setup executor to load cores in parallel // setup executor to load cores in parallel
coreLoadExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, coreLoadExecutor = new ThreadPoolExecutor(coreLoadThreads, coreLoadThreads, 1,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new DefaultSolrThreadFactory("coreLoadExecutor")); new DefaultSolrThreadFactory("coreLoadExecutor"));
try { try {
// 4 threads at a time max
final AdjustableSemaphore semaphore = new AdjustableSemaphore(
coreLoadThreads);
CompletionService<SolrCore> completionService = new ExecutorCompletionService<SolrCore>( CompletionService<SolrCore> completionService = new ExecutorCompletionService<SolrCore>(
coreLoadExecutor); coreLoadExecutor);
@ -618,8 +617,8 @@ public class CoreContainer
.equalsIgnoreCase(opt)) ? true : false); .equalsIgnoreCase(opt)) ? true : false);
} }
if (p.isLoadOnStartup()) { // Just like current if (p.isLoadOnStartup()) { // The normal case
// case.
Callable<SolrCore> task = new Callable<SolrCore>() { Callable<SolrCore> task = new Callable<SolrCore>() {
@Override @Override
public SolrCore call() { public SolrCore call() {
@ -638,26 +637,13 @@ public class CoreContainer
c.close(); c.close();
} }
} }
semaphore.release();
return c; return c;
} }
}; };
try {
semaphore.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SolrException(ErrorCode.SERVER_ERROR,
"Interrupted while loading SolrCore(s)", e);
}
try { pending.add(completionService.submit(task));
pending.add(completionService.submit(task));
} catch (RejectedExecutionException e) {
semaphore.release();
throw e;
}
} else { } else {
// Store it away for later use. includes non-swappable but not // Store it away for later use. includes non-swappable but not
@ -682,14 +668,12 @@ public class CoreContainer
coreToOrigName.put(c, c.getName()); coreToOrigName.put(c, c.getName());
} }
} catch (ExecutionException e) { } catch (ExecutionException e) {
// shouldn't happen since we catch exceptions ourselves SolrException.log(SolrCore.log, "error loading core", e);
SolrException.log(SolrCore.log,
"error sending update request to shard", e);
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
"interrupted waiting for shard update response", e); "interrupted while loading core", e);
} }
} }
} finally { } finally {
@ -1275,7 +1259,7 @@ public class CoreContainer
synchronized (cores) { synchronized (cores) {
core = cores.get(name); core = cores.get(name);
if (core != null) { if (core != null) {
core.open(); core.open(); // increment the ref count while still synchronized
return core; return core;
} }
} }