SOLR-4063: Allow CoreContainer to load multiple SolrCores in parallel rather than just serially.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1408088 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2012-11-11 19:18:03 +00:00
parent a53ecb1d09
commit e92475e10e
3 changed files with 216 additions and 115 deletions

View File

@ -77,6 +77,9 @@ Optimizations
* SOLR-3941: The "commitOnLeader" part of distributed recovery can use
openSearcher=false. (Tomas Fernandez Lobbe via Mark Miller)
* SOLR-4063: Allow CoreContainer to load multiple SolrCores in parallel rather
than just serially. (Mark Miller)
Bug Fixes
----------------------

View File

@ -28,12 +28,23 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.xml.transform.Transformer;
@ -55,6 +66,7 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.core.SolrXMLSerializer.SolrCoreXMLDef;
import org.apache.solr.core.SolrXMLSerializer.SolrXMLDef;
import org.apache.solr.handler.admin.CollectionsHandler;
@ -66,7 +78,9 @@ import org.apache.solr.logging.LogWatcher;
import org.apache.solr.logging.jul.JulWatcher;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.update.SolrCoreState;
import org.apache.solr.util.AdjustableSemaphore;
import org.apache.solr.util.DOMUtil;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.FileUtils;
import org.apache.solr.util.SystemIdResolver;
import org.apache.zookeeper.KeeperException;
@ -86,6 +100,7 @@ import org.xml.sax.InputSource;
public class CoreContainer
{
private static final String LEADER_VOTE_WAIT = "180000"; // 3 minutes
private static final int CORE_LOAD_THREADS = 3;
private static final String DEFAULT_HOST_CONTEXT = "solr";
private static final String DEFAULT_HOST_PORT = "8983";
private static final int DEFAULT_ZK_CLIENT_TIMEOUT = 15000;
@ -145,6 +160,7 @@ public class CoreContainer
private Map<SolrCore,String> coreToOrigName = new ConcurrentHashMap<SolrCore,String>();
private String leaderVoteWait;
protected int swappableCacheSize = Integer.MAX_VALUE; // Use as a flag too, if swappableCacheSize set in solr.xml this will be changed
private int coreLoadThreads;
{
log.info("New CoreContainer " + System.identityHashCode(this));
@ -382,13 +398,13 @@ public class CoreContainer
* @param cfgis the configuration file InputStream
*/
public void load(String dir, InputSource cfgis) {
ThreadPoolExecutor coreLoadExecutor = null;
if (null == dir) {
// don't rely on SolrResourceLoader(), determine explicitly first
dir = SolrResourceLoader.locateSolrHome();
}
log.info("Loading CoreContainer using Solr Home: '{}'", dir);
this.loader = new SolrResourceLoader(dir);
solrHome = loader.getInstanceDir();
@ -401,89 +417,97 @@ public class CoreContainer
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "", e);
}
// Since the cores var is now initialized to null, let's set it up right now.
// Since the cores var is now initialized to null, let's set it up right
// now.
cfg.substituteProperties();
allocateLazyCores(cfg);
// Initialize Logging
if(cfg.getBool("solr/logging/@enabled",true)) {
if (cfg.getBool("solr/logging/@enabled", true)) {
String slf4jImpl = null;
String fname = cfg.get("solr/logging/watcher/@class", null);
try {
slf4jImpl = StaticLoggerBinder.getSingleton().getLoggerFactoryClassStr();
if(fname==null) {
if( slf4jImpl.indexOf("Log4j") > 0) {
log.warn("Log watching is not yet implemented for log4j" );
}
else if( slf4jImpl.indexOf("JDK") > 0) {
slf4jImpl = StaticLoggerBinder.getSingleton()
.getLoggerFactoryClassStr();
if (fname == null) {
if (slf4jImpl.indexOf("Log4j") > 0) {
log.warn("Log watching is not yet implemented for log4j");
} else if (slf4jImpl.indexOf("JDK") > 0) {
fname = "JUL";
}
}
}
catch(Throwable ex) {
log.warn("Unable to read SLF4J version. LogWatcher will be disabled: "+ex);
} catch (Throwable ex) {
log.warn("Unable to read SLF4J version. LogWatcher will be disabled: "
+ ex);
}
// Now load the framework
if(fname!=null) {
if("JUL".equalsIgnoreCase(fname)) {
if (fname != null) {
if ("JUL".equalsIgnoreCase(fname)) {
logging = new JulWatcher(slf4jImpl);
}
else {
} else {
try {
logging = loader.newInstance(fname, LogWatcher.class);
}
catch (Throwable e) {
} catch (Throwable e) {
log.warn("Unable to load LogWatcher", e);
}
}
if( logging != null ) {
if (logging != null) {
ListenerConfig v = new ListenerConfig();
v.size = cfg.getInt("solr/logging/watcher/@size",50);
v.threshold = cfg.get("solr/logging/watcher/@threshold",null);
if(v.size>0) {
v.size = cfg.getInt("solr/logging/watcher/@size", 50);
v.threshold = cfg.get("solr/logging/watcher/@threshold", null);
if (v.size > 0) {
log.info("Registering Log Listener");
logging.registerListener(v, this);
}
}
}
}
String dcoreName = cfg.get("solr/cores/@defaultCoreName", null);
if(dcoreName != null && !dcoreName.isEmpty()) {
if (dcoreName != null && !dcoreName.isEmpty()) {
defaultCoreName = dcoreName;
}
persistent = cfg.getBool("solr/@persistent", false);
libDir = cfg.get("solr/@sharedLib", null);
zkHost = cfg.get("solr/@zkHost" , null);
zkHost = cfg.get("solr/@zkHost", null);
coreLoadThreads = cfg.getInt("solr/@coreLoadThreads", CORE_LOAD_THREADS);
adminPath = cfg.get("solr/cores/@adminPath", null);
shareSchema = cfg.getBool("solr/cores/@shareSchema", DEFAULT_SHARE_SCHEMA);
zkClientTimeout = cfg.getInt("solr/cores/@zkClientTimeout", DEFAULT_ZK_CLIENT_TIMEOUT);
zkClientTimeout = cfg.getInt("solr/cores/@zkClientTimeout",
DEFAULT_ZK_CLIENT_TIMEOUT);
hostPort = cfg.get("solr/cores/@hostPort", DEFAULT_HOST_PORT);
hostContext = cfg.get("solr/cores/@hostContext", DEFAULT_HOST_CONTEXT);
host = cfg.get("solr/cores/@host", null);
leaderVoteWait = cfg.get("solr/cores/@leaderVoteWait", LEADER_VOTE_WAIT);
if(shareSchema){
indexSchemaCache = new ConcurrentHashMap<String ,IndexSchema>();
}
adminHandler = cfg.get("solr/cores/@adminHandler", null );
managementPath = cfg.get("solr/cores/@managementPath", null );
zkClientTimeout = Integer.parseInt(System.getProperty("zkClientTimeout", Integer.toString(zkClientTimeout)));
if (shareSchema) {
indexSchemaCache = new ConcurrentHashMap<String,IndexSchema>();
}
adminHandler = cfg.get("solr/cores/@adminHandler", null);
managementPath = cfg.get("solr/cores/@managementPath", null);
zkClientTimeout = Integer.parseInt(System.getProperty("zkClientTimeout",
Integer.toString(zkClientTimeout)));
initZooKeeper(zkHost, zkClientTimeout);
if (isZooKeeperAware() && coreLoadThreads <= 1) {
throw new SolrException(ErrorCode.SERVER_ERROR,
"SolrCloud requires a value of at least 2 in solr.xml for coreLoadThreads");
}
if (libDir != null) {
File f = FileUtils.resolvePath(new File(dir), libDir);
log.info( "loading shared library: "+f.getAbsolutePath() );
log.info("loading shared library: " + f.getAbsolutePath());
libLoader = SolrResourceLoader.createClassLoader(f, null);
}
if (adminPath != null) {
if (adminHandler == null) {
coreAdminHandler = new CoreAdminHandler(this);
@ -491,91 +515,165 @@ public class CoreContainer
coreAdminHandler = this.createMultiCoreHandler(adminHandler);
}
}
collectionsHandler = new CollectionsHandler(this);
try {
containerProperties = readProperties(cfg, ((NodeList) cfg.evaluate(DEFAULT_HOST_CONTEXT, XPathConstants.NODESET)).item(0));
containerProperties = readProperties(cfg, ((NodeList) cfg.evaluate(
DEFAULT_HOST_CONTEXT, XPathConstants.NODESET)).item(0));
} catch (Throwable e) {
SolrException.log(log,null,e);
SolrException.log(log, null, e);
}
NodeList nodes = (NodeList)cfg.evaluate("solr/cores/core", XPathConstants.NODESET);
for (int i=0; i<nodes.getLength(); i++) {
Node node = nodes.item(i);
SolrCore core = null;
try {
String rawName = DOMUtil.getAttr(node, CORE_NAME, null);
if (null == rawName) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Each core in solr.xml must have a 'name'");
}
String name = rawName;
CoreDescriptor p = new CoreDescriptor(this, name, DOMUtil.getAttr(node, CORE_INSTDIR, null));
// deal with optional settings
String opt = DOMUtil.getAttr(node, CORE_CONFIG, null);
if (opt != null) {
p.setConfigName(opt);
}
opt = DOMUtil.getAttr(node, CORE_SCHEMA, null);
if (opt != null) {
p.setSchemaName(opt);
}
if (zkController != null) {
opt = DOMUtil.getAttr(node, CORE_SHARD, null);
if (opt != null && opt.length() > 0) {
p.getCloudDescriptor().setShardId(opt);
NodeList nodes = (NodeList) cfg.evaluate("solr/cores/core",
XPathConstants.NODESET);
// setup executor to load cores in parallel
coreLoadExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new DefaultSolrThreadFactory("coreLoadExecutor"));
try {
// 4 threads at a time max
final AdjustableSemaphore semaphore = new AdjustableSemaphore(
coreLoadThreads);
CompletionService<SolrCore> completionService = new ExecutorCompletionService<SolrCore>(
coreLoadExecutor);
Set<Future<SolrCore>> pending = new HashSet<Future<SolrCore>>();
for (int i = 0; i < nodes.getLength(); i++) {
Node node = nodes.item(i);
SolrCore core = null;
try {
String rawName = DOMUtil.getAttr(node, CORE_NAME, null);
if (null == rawName) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Each core in solr.xml must have a 'name'");
}
opt = DOMUtil.getAttr(node, CORE_COLLECTION, null);
final String name = rawName;
final CoreDescriptor p = new CoreDescriptor(this, name,
DOMUtil.getAttr(node, CORE_INSTDIR, null));
// deal with optional settings
String opt = DOMUtil.getAttr(node, CORE_CONFIG, null);
if (opt != null) {
p.getCloudDescriptor().setCollectionName(opt);
p.setConfigName(opt);
}
opt = DOMUtil.getAttr(node, CORE_ROLES, null);
if(opt != null){
p.getCloudDescriptor().setRoles(opt);
opt = DOMUtil.getAttr(node, CORE_SCHEMA, null);
if (opt != null) {
p.setSchemaName(opt);
}
}
opt = DOMUtil.getAttr(node, CORE_PROPERTIES, null);
if (opt != null) {
p.setPropertiesName(opt);
}
opt = DOMUtil.getAttr(node, CORE_DATADIR, null);
if (opt != null) {
p.setDataDir(opt);
}
p.setCoreProperties(readProperties(cfg, node));
opt = DOMUtil.getAttr(node, CORE_LOADONSTARTUP, null);
if (opt != null) {
p.setLoadOnStartup(("true".equalsIgnoreCase(opt) || "on".equalsIgnoreCase(opt)) ? true : false);
}
opt = DOMUtil.getAttr(node, CORE_SWAPPABLE, null);
if (opt != null) {
p.setSwappable(("true".equalsIgnoreCase(opt) || "on".equalsIgnoreCase(opt)) ? true : false);
}
if (! p.isSwappable() && p.isLoadOnStartup()) { // Just like current case.
core = create(p);
register(name, core, false);
// track original names
coreToOrigName.put(core, rawName);
} else {
// Store it away for later use. includes non-swappable but not loaded at startup cores.
dynamicDescriptors.put(rawName, p);
if (zkController != null) {
opt = DOMUtil.getAttr(node, CORE_SHARD, null);
if (opt != null && opt.length() > 0) {
p.getCloudDescriptor().setShardId(opt);
}
opt = DOMUtil.getAttr(node, CORE_COLLECTION, null);
if (opt != null) {
p.getCloudDescriptor().setCollectionName(opt);
}
opt = DOMUtil.getAttr(node, CORE_ROLES, null);
if (opt != null) {
p.getCloudDescriptor().setRoles(opt);
}
}
opt = DOMUtil.getAttr(node, CORE_PROPERTIES, null);
if (opt != null) {
p.setPropertiesName(opt);
}
opt = DOMUtil.getAttr(node, CORE_DATADIR, null);
if (opt != null) {
p.setDataDir(opt);
}
p.setCoreProperties(readProperties(cfg, node));
opt = DOMUtil.getAttr(node, CORE_LOADONSTARTUP, null);
if (opt != null) {
p.setLoadOnStartup(("true".equalsIgnoreCase(opt) || "on"
.equalsIgnoreCase(opt)) ? true : false);
}
opt = DOMUtil.getAttr(node, CORE_SWAPPABLE, null);
if (opt != null) {
p.setSwappable(("true".equalsIgnoreCase(opt) || "on"
.equalsIgnoreCase(opt)) ? true : false);
}
if (!p.isSwappable() && p.isLoadOnStartup()) { // Just like current
// case.
Callable<SolrCore> task = new Callable<SolrCore>() {
public SolrCore call() {
SolrCore c = null;
try {
c = create(p);
register(name, c, false);
} catch (Throwable t) {
SolrException.log(log, null, t);
if (c != null) {
c.close();
}
}
semaphore.release();
return c;
}
};
try {
semaphore.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SolrException(ErrorCode.SERVER_ERROR,
"Interrupted while loading SolrCore(s)", e);
}
try {
pending.add(completionService.submit(task));
} catch (RejectedExecutionException e) {
semaphore.release();
throw e;
}
} else {
// Store it away for later use. includes non-swappable but not
// loaded at startup cores.
dynamicDescriptors.put(rawName, p);
}
} catch (Throwable ex) {
SolrException.log(log, null, ex);
}
}
catch (Throwable ex) {
SolrException.log(log,null,ex);
if (core != null) {
core.close();
while (pending != null && pending.size() > 0) {
try {
Future<SolrCore> future = completionService.take();
if (future == null) return;
pending.remove(future);
try {
SolrCore c = future.get();
// track original names
if (c != null) {
coreToOrigName.put(c, c.getName());
}
} catch (ExecutionException e) {
// shouldn't happen since we catch exceptions ourselves
SolrException.log(SolrCore.log,
"error sending update request to shard", e);
}
} catch (InterruptedException e) {
throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
"interrupted waiting for shard update response", e);
}
}
} finally {
if (coreLoadExecutor != null) {
ExecutorUtil.shutdownNowAndAwaitTermination(coreLoadExecutor);
}
}
}
@ -1276,7 +1374,7 @@ public class CoreContainer
Integer.toString(DEFAULT_ZK_CLIENT_TIMEOUT));
addCoresAttrib(coresAttribs, "hostContext", this.hostContext, DEFAULT_HOST_CONTEXT);
addCoresAttrib(coresAttribs, "leaderVoteWait", this.leaderVoteWait, LEADER_VOTE_WAIT);
addCoresAttrib(coresAttribs, "coreLoadThreads", Integer.toString(this.coreLoadThreads), Integer.toString(CORE_LOAD_THREADS));
List<SolrCoreXMLDef> solrCoreXMLDefs = new ArrayList<SolrCoreXMLDef>();

View File

@ -198,7 +198,7 @@ public class TestSolrProperties extends AbstractEmbeddedSolrServerTestCase {
CoreAdminRequest.renameCore(name, "renamed_core", coreadmin);
mcr = CoreAdminRequest.persist(SOLR_PERSIST_XML, getRenamedSolrAdmin());
// fis = new FileInputStream(new File(solrXml.getParent(), SOLR_PERSIST_XML));
// fis = new FileInputStream(new File(tempDir, SOLR_PERSIST_XML));
// String solrPersistXml = IOUtils.toString(fis);
// System.out.println("xml:" + solrPersistXml);
// fis.close();