SOLR-7408: Let SolrCore be the only thing which registers/unregisters a config directory listener

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1675274 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shai Erera 2015-04-22 08:25:26 +00:00
parent ede96a0dd1
commit 5afa59c004
7 changed files with 425 additions and 318 deletions

View File

@ -155,6 +155,9 @@ Bug Fixes
* SOLR-7440: DebugComponent does not return the right requestPurpose for pivot facet refinements. * SOLR-7440: DebugComponent does not return the right requestPurpose for pivot facet refinements.
(shalin) (shalin)
* SOLR-7408: Listeners set by SolrCores on config directories in ZK could be removed if collections
are created/deleted in paralle against the same config set. (Shai Erera, Anshum Gupta)
Optimizations Optimizations
---------------------- ----------------------

View File

@ -96,6 +96,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.slf4j.MDC; import org.slf4j.MDC;
import com.google.common.base.Strings;
/** /**
* Handle ZooKeeper interactions. * Handle ZooKeeper interactions.
* <p> * <p>
@ -1209,14 +1211,13 @@ public final class ZkController {
return true; return true;
} }
public void unregister(String coreName, CoreDescriptor cd, String configLocation) public void unregister(String coreName, CoreDescriptor cd) throws InterruptedException, KeeperException {
throws InterruptedException, KeeperException {
final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName(); final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
final String collection = cd.getCloudDescriptor().getCollectionName(); final String collection = cd.getCloudDescriptor().getCollectionName();
assert collection != null;
if (collection == null || collection.trim().length() == 0) { if (Strings.isNullOrEmpty(collection)) {
log.error("No collection was specified."); log.error("No collection was specified.");
assert false : "No collection was specified [" + collection + "]";
return; return;
} }
@ -1225,38 +1226,29 @@ public final class ZkController {
if (context != null) { if (context != null) {
context.cancelElection(); context.cancelElection();
} }
final Collection<SolrCore> cores = cc.getCores();
// if there is no SolrCore which is a member of this collection, remove the watch
CloudDescriptor cloudDescriptor = cd.getCloudDescriptor(); CloudDescriptor cloudDescriptor = cd.getCloudDescriptor();
boolean removeWatch = true; boolean removeWatch = true;
// if there is no SolrCore which is a member of this collection, remove the watch for (SolrCore solrCore : cores) {
for (SolrCore solrCore : cc.getCores()) { final CloudDescriptor cloudDesc = solrCore.getCoreDescriptor().getCloudDescriptor();
if (((ZkSolrResourceLoader) solrCore.getResourceLoader()).getConfigSetZkPath().equals(configLocation)) if (cloudDesc != null && cloudDescriptor.getCollectionName().equals(cloudDesc.getCollectionName())) {
configLocation = null; //if a core uses this config dir , then set it to null
CloudDescriptor cloudDesc = solrCore.getCoreDescriptor()
.getCloudDescriptor();
if (cloudDesc != null
&& cloudDescriptor.getCollectionName().equals(
cloudDesc.getCollectionName())) {
removeWatch = false; removeWatch = false;
break; break;
} }
} }
if (removeWatch) zkStateReader.removeZKWatch(collection);
if (removeWatch) {
zkStateReader.removeZKWatch(collection);
}
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
OverseerAction.DELETECORE.toLower(), ZkStateReader.CORE_NAME_PROP, coreName, OverseerAction.DELETECORE.toLower(), ZkStateReader.CORE_NAME_PROP, coreName,
ZkStateReader.NODE_NAME_PROP, getNodeName(), ZkStateReader.NODE_NAME_PROP, getNodeName(),
ZkStateReader.COLLECTION_PROP, cloudDescriptor.getCollectionName(), ZkStateReader.COLLECTION_PROP, cloudDescriptor.getCollectionName(),
ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName); ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
overseerJobQueue.offer(ZkStateReader.toJSON(m)); overseerJobQueue.offer(ZkStateReader.toJSON(m));
if (configLocation != null) {
synchronized (confDirectoryListeners) {
log.info("This conf directory is no more watched {}", configLocation);
confDirectoryListeners.remove(configLocation);
}
}
} }
public void createCollection(String collection) throws KeeperException, public void createCollection(String collection) throws KeeperException,
@ -2254,18 +2246,19 @@ public final class ZkController {
} }
} }
public void unRegisterConfListener(Runnable listener) { private void unregisterConfListener(String confDir, Runnable listener) {
if (listener == null) return;
synchronized (confDirectoryListeners) { synchronized (confDirectoryListeners) {
for (Set<Runnable> listeners : confDirectoryListeners.values()) { final Set<Runnable> listeners = confDirectoryListeners.get(confDir);
if (listeners != null) { assert listeners != null : confDir + " has no more registered listeners, but a live one attempts to unregister!";
if (listeners.remove(listener)) { if (listeners.remove(listener)) {
log.info(" a listener was removed because of core close"); log.info("removed listener for config directory [{}]", confDir);
} }
} if (listeners.isEmpty()) {
// no more listeners for this confDir, remove it from the map
log.info("No more listeners for config directory [{}]", confDir);
confDirectoryListeners.remove(confDir);
} }
} }
} }
/** /**
@ -2274,37 +2267,41 @@ public final class ZkController {
* item of interest has been modified. When the last core which was interested in * item of interest has been modified. When the last core which was interested in
* this conf directory is gone the listeners will be removed automatically. * this conf directory is gone the listeners will be removed automatically.
*/ */
public void registerConfListenerForCore(String confDir, SolrCore core, final Runnable listener) { public void registerConfListenerForCore(final String confDir, SolrCore core, final Runnable listener) {
if (listener == null) throw new NullPointerException("listener cannot be null"); if (listener == null) {
synchronized (confDirectoryListeners) { throw new NullPointerException("listener cannot be null");
if (confDirectoryListeners.containsKey(confDir)) {
confDirectoryListeners.get(confDir).add(listener);
core.addCloseHook(new CloseHook() {
@Override
public void preClose(SolrCore core) {
unRegisterConfListener(listener);
}
@Override
public void postClose(SolrCore core) {
}
});
} else {
throw new SolrException(ErrorCode.SERVER_ERROR, "This conf directory is not valid "+ confDir);
}
} }
synchronized (confDirectoryListeners) {
final Set<Runnable> confDirListeners = getConfDirListeners(confDir);
confDirListeners.add(listener);
core.addCloseHook(new CloseHook() {
@Override
public void preClose(SolrCore core) {
unregisterConfListener(confDir, listener);
}
@Override
public void postClose(SolrCore core) {
}
});
}
}
// this method is called in a protected confDirListeners block
private Set<Runnable> getConfDirListeners(final String confDir) {
assert Thread.holdsLock(confDirectoryListeners) : "confDirListeners lock not held by thread";
Set<Runnable> confDirListeners = confDirectoryListeners.get(confDir);
if (confDirListeners == null) {
log.info("watch zkdir {}" , confDir);
confDirListeners = new HashSet<>();
confDirectoryListeners.put(confDir, confDirListeners);
setConfWatcher(confDir, new WatcherImpl(confDir), null);
}
return confDirListeners;
} }
private final Map<String, Set<Runnable>> confDirectoryListeners = new HashMap<>(); private final Map<String, Set<Runnable>> confDirectoryListeners = new HashMap<>();
void watchZKConfDir(final String zkDir) {
log.info("watch zkdir {}" , zkDir);
if (!confDirectoryListeners.containsKey(zkDir)) {
confDirectoryListeners.put(zkDir, new HashSet<>());
setConfWatcher(zkDir, new WatcherImpl(zkDir), null);
}
}
private class WatcherImpl implements Watcher { private class WatcherImpl implements Watcher {
private final String zkDir; private final String zkDir;
@ -2335,7 +2332,6 @@ public final class ZkController {
} }
} }
} }
} }
private boolean fireEventListeners(String zkDir) { private boolean fireEventListeners(String zkDir) {
@ -2345,11 +2341,12 @@ public final class ZkController {
log.info("Watcher on {} is removed ", zkDir); log.info("Watcher on {} is removed ", zkDir);
return false; return false;
} }
Set<Runnable> listeners = confDirectoryListeners.get(zkDir); final Set<Runnable> listeners = confDirectoryListeners.get(zkDir);
if (listeners != null && !listeners.isEmpty()) { if (listeners != null && !listeners.isEmpty()) {
final Set<Runnable> listenersCopy = new HashSet<>(listeners); final Set<Runnable> listenersCopy = new HashSet<>(listeners);
new Thread() { new Thread() {
//run these in a separate thread because this can be long running // run these in a separate thread because this can be long running
@Override
public void run() { public void run() {
log.info("Running listeners for {}", zkDir); log.info("Running listeners for {}", zkDir);
for (final Runnable listener : listenersCopy) { for (final Runnable listener : listenersCopy) {
@ -2362,7 +2359,6 @@ public final class ZkController {
} }
}.start(); }.start();
} }
} }
return true; return true;
} }
@ -2371,7 +2367,7 @@ public final class ZkController {
try { try {
Stat newStat = zkClient.exists(zkDir, watcher, true); Stat newStat = zkClient.exists(zkDir, watcher, true);
if (stat != null && newStat.getVersion() > stat.getVersion()) { if (stat != null && newStat.getVersion() > stat.getVersion()) {
//a race condition where a we missed an even fired //a race condition where a we missed an event fired
//so fire the event listeners //so fire the event listeners
fireEventListeners(zkDir); fireEventListeners(zkDir);
} }
@ -2389,7 +2385,7 @@ public final class ZkController {
public void command() { public void command() {
synchronized (confDirectoryListeners) { synchronized (confDirectoryListeners) {
for (String s : confDirectoryListeners.keySet()) { for (String s : confDirectoryListeners.keySet()) {
watchZKConfDir(s); setConfWatcher(s, new WatcherImpl(s), null);
fireEventListeners(s); fireEventListeners(s);
} }
} }

View File

@ -48,7 +48,6 @@ public class ZkSolrResourceLoader extends SolrResourceLoader {
super(instanceDir); super(instanceDir);
this.zkController = zooKeeperController; this.zkController = zooKeeperController;
configSetZkPath = ZkConfigManager.CONFIGS_ZKNODE + "/" + configSet; configSetZkPath = ZkConfigManager.CONFIGS_ZKNODE + "/" + configSet;
zkController.watchZKConfDir(configSetZkPath);
} }
/** /**
@ -63,7 +62,6 @@ public class ZkSolrResourceLoader extends SolrResourceLoader {
super(instanceDir, parent, coreProperties); super(instanceDir, parent, coreProperties);
this.zkController = zooKeeperController; this.zkController = zooKeeperController;
configSetZkPath = ZkConfigManager.CONFIGS_ZKNODE + "/" + configSet; configSetZkPath = ZkConfigManager.CONFIGS_ZKNODE + "/" + configSet;
zkController.watchZKConfDir(configSetZkPath);
} }
/** /**

View File

@ -17,13 +17,25 @@
package org.apache.solr.core; package org.apache.solr.core;
import com.google.common.collect.ImmutableMap; import static com.google.common.base.Preconditions.*;
import com.google.common.collect.Maps;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import org.apache.solr.cloud.ZkController; import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkSolrResourceLoader; import org.apache.solr.cloud.ZkSolrResourceLoader;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.handler.RequestHandlerBase; import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.handler.admin.CollectionsHandler; import org.apache.solr.handler.admin.CollectionsHandler;
import org.apache.solr.handler.admin.CoreAdminHandler; import org.apache.solr.handler.admin.CoreAdminHandler;
@ -37,21 +49,9 @@ import org.apache.solr.util.FileUtils;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import java.io.File; import com.google.common.collect.ImmutableMap;
import java.util.ArrayList; import com.google.common.collect.Maps;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
/** /**
@ -508,6 +508,7 @@ public class CoreContainer {
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Solr has close."); throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Solr has close.");
} }
SolrCore core = null;
try { try {
if (zkSys.getZkController() != null) { if (zkSys.getZkController() != null) {
@ -516,7 +517,7 @@ public class CoreContainer {
ConfigSet coreConfig = coreConfigService.getConfig(dcore); ConfigSet coreConfig = coreConfigService.getConfig(dcore);
log.info("Creating SolrCore '{}' using configuration from {}", dcore.getName(), coreConfig.getName()); log.info("Creating SolrCore '{}' using configuration from {}", dcore.getName(), coreConfig.getName());
SolrCore core = new SolrCore(dcore, coreConfig); core = new SolrCore(dcore, coreConfig);
solrCores.addCreated(core); solrCores.addCreated(core);
// always kick off recovery if we are in non-Cloud mode // always kick off recovery if we are in non-Cloud mode
@ -527,15 +528,17 @@ public class CoreContainer {
registerCore(dcore.getName(), core, publishState); registerCore(dcore.getName(), core, publishState);
return core; return core;
} catch (Exception e) { } catch (Exception e) {
coreInitFailures.put(dcore.getName(), new CoreLoadFailure(dcore, e)); coreInitFailures.put(dcore.getName(), new CoreLoadFailure(dcore, e));
log.error("Error creating core [{}]: {}", dcore.getName(), e.getMessage(), e); log.error("Error creating core [{}]: {}", dcore.getName(), e.getMessage(), e);
throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to create core [" + dcore.getName() + "]", e); final SolrException solrException = new SolrException(ErrorCode.SERVER_ERROR, "Unable to create core [" + dcore.getName() + "]", e);
IOUtils.closeQuietly(core);
throw solrException;
} catch (Throwable t) { } catch (Throwable t) {
SolrException e = new SolrException(ErrorCode.SERVER_ERROR, "JVM Error creating core [" + dcore.getName() + "]: " + t.getMessage(), t); SolrException e = new SolrException(ErrorCode.SERVER_ERROR, "JVM Error creating core [" + dcore.getName() + "]: " + t.getMessage(), t);
log.error("Error creating core [{}]: {}", dcore.getName(), t.getMessage(), t); log.error("Error creating core [{}]: {}", dcore.getName(), t.getMessage(), t);
coreInitFailures.put(dcore.getName(), new CoreLoadFailure(dcore, e)); coreInitFailures.put(dcore.getName(), new CoreLoadFailure(dcore, e));
IOUtils.closeQuietly(core);
throw t; throw t;
} }
@ -694,7 +697,7 @@ public class CoreContainer {
if (zkSys.getZkController() != null) { if (zkSys.getZkController() != null) {
try { try {
zkSys.getZkController().unregister(name, cd, configSetZkPath); zkSys.getZkController().unregister(name, cd);
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted while unregistering core [" + name + "] from cloud state"); throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted while unregistering core [" + name + "] from cloud state");

View File

@ -17,7 +17,9 @@
package org.apache.solr.core; package org.apache.solr.core;
import javax.xml.parsers.ParserConfigurationException; import static com.google.common.base.Preconditions.*;
import static org.apache.solr.common.params.CommonParams.*;
import java.io.Closeable; import java.io.Closeable;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
@ -50,7 +52,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -143,9 +144,6 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;
import static org.apache.solr.common.params.CommonParams.PATH;
/** /**
* *
@ -184,8 +182,8 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
private final PluginBag<UpdateRequestProcessorFactory> updateProcessors = new PluginBag<>(UpdateRequestProcessorFactory.class, this); private final PluginBag<UpdateRequestProcessorFactory> updateProcessors = new PluginBag<>(UpdateRequestProcessorFactory.class, this);
private final Map<String,UpdateRequestProcessorChain> updateProcessorChains; private final Map<String,UpdateRequestProcessorChain> updateProcessorChains;
private final Map<String, SolrInfoMBean> infoRegistry; private final Map<String, SolrInfoMBean> infoRegistry;
private IndexDeletionPolicyWrapper solrDelPolicy; private final IndexDeletionPolicyWrapper solrDelPolicy;
private DirectoryFactory directoryFactory; private final DirectoryFactory directoryFactory;
private IndexReaderFactory indexReaderFactory; private IndexReaderFactory indexReaderFactory;
private final Codec codec; private final Codec codec;
private final MemClassLoader memClassLoader; private final MemClassLoader memClassLoader;
@ -196,13 +194,14 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
public long getStartTime() { return startTime; } public long getStartTime() { return startTime; }
private RestManager restManager; private final RestManager restManager;
public RestManager getRestManager() { public RestManager getRestManager() {
return restManager; return restManager;
} }
static int boolean_query_max_clause_count = Integer.MIN_VALUE; static int boolean_query_max_clause_count = Integer.MIN_VALUE;
// only change the BooleanQuery maxClauseCount once for ALL cores... // only change the BooleanQuery maxClauseCount once for ALL cores...
void booleanQueryMaxClauseCount() { void booleanQueryMaxClauseCount() {
synchronized(SolrCore.class) { synchronized(SolrCore.class) {
@ -210,7 +209,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
boolean_query_max_clause_count = solrConfig.booleanQueryMaxClauseCount; boolean_query_max_clause_count = solrConfig.booleanQueryMaxClauseCount;
BooleanQuery.setMaxClauseCount(boolean_query_max_clause_count); BooleanQuery.setMaxClauseCount(boolean_query_max_clause_count);
} else if (boolean_query_max_clause_count != solrConfig.booleanQueryMaxClauseCount ) { } else if (boolean_query_max_clause_count != solrConfig.booleanQueryMaxClauseCount ) {
log.debug("BooleanQuery.maxClauseCount= " +boolean_query_max_clause_count+ ", ignoring " +solrConfig.booleanQueryMaxClauseCount); log.debug("BooleanQuery.maxClauseCount={}, ignoring {}", boolean_query_max_clause_count, solrConfig.booleanQueryMaxClauseCount);
} }
} }
} }
@ -367,33 +366,37 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
return infoRegistry; return infoRegistry;
} }
private void initDeletionPolicy() { private IndexDeletionPolicyWrapper initDeletionPolicy(IndexDeletionPolicyWrapper delPolicyWrapper) {
PluginInfo info = solrConfig.getPluginInfo(IndexDeletionPolicy.class.getName()); if (delPolicyWrapper != null) {
IndexDeletionPolicy delPolicy = null; return delPolicyWrapper;
if(info != null){ }
delPolicy = createInstance(info.className, IndexDeletionPolicy.class, "Deletion Policy for SOLR", this, getResourceLoader());
if (delPolicy instanceof NamedListInitializedPlugin) { final PluginInfo info = solrConfig.getPluginInfo(IndexDeletionPolicy.class.getName());
((NamedListInitializedPlugin) delPolicy).init(info.initArgs); final IndexDeletionPolicy delPolicy;
} if (info != null) {
} else { delPolicy = createInstance(info.className, IndexDeletionPolicy.class, "Deletion Policy for SOLR", this, getResourceLoader());
delPolicy = new SolrDeletionPolicy(); if (delPolicy instanceof NamedListInitializedPlugin) {
} ((NamedListInitializedPlugin) delPolicy).init(info.initArgs);
solrDelPolicy = new IndexDeletionPolicyWrapper(delPolicy); }
} } else {
delPolicy = new SolrDeletionPolicy();
}
return new IndexDeletionPolicyWrapper(delPolicy);
}
private void initListeners() { private void initListeners() {
final Class<SolrEventListener> clazz = SolrEventListener.class; final Class<SolrEventListener> clazz = SolrEventListener.class;
final String label = "Event Listener"; final String label = "Event Listener";
for (PluginInfo info : solrConfig.getPluginInfos(SolrEventListener.class.getName())) { for (PluginInfo info : solrConfig.getPluginInfos(SolrEventListener.class.getName())) {
String event = info.attributes.get("event"); final String event = info.attributes.get("event");
if("firstSearcher".equals(event) ){ if ("firstSearcher".equals(event)) {
SolrEventListener obj = createInitInstance(info,clazz,label,null); SolrEventListener obj = createInitInstance(info, clazz, label, null);
firstSearcherListeners.add(obj); firstSearcherListeners.add(obj);
log.info(logid + "Added SolrEventListener for firstSearcher: " + obj); log.info("[{}] Added SolrEventListener for firstSearcher: [{}]", logid, obj);
} else if("newSearcher".equals(event) ){ } else if ("newSearcher".equals(event)) {
SolrEventListener obj = createInitInstance(info,clazz,label,null); SolrEventListener obj = createInitInstance(info, clazz, label, null);
newSearcherListeners.add(obj); newSearcherListeners.add(obj);
log.info(logid + "Added SolrEventListener for newSearcher: " + obj); log.info("[{}] Added SolrEventListener for newSearcher: [{}]", logid, obj);
} }
} }
} }
@ -436,36 +439,39 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
return responseWriters.put(name, responseWriter); return responseWriters.put(name, responseWriter);
} }
public SolrCore reload(ConfigSet coreConfig) throws IOException, public SolrCore reload(ConfigSet coreConfig) throws IOException {
ParserConfigurationException, SAXException {
solrCoreState.increfSolrCoreState(); solrCoreState.increfSolrCoreState();
SolrCore currentCore; final SolrCore currentCore;
boolean indexDirChange = !getNewIndexDir().equals(getIndexDir()); if (!getNewIndexDir().equals(getIndexDir())) {
if (indexDirChange) {
// the directory is changing, don't pass on state // the directory is changing, don't pass on state
currentCore = null; currentCore = null;
} else { } else {
currentCore = this; currentCore = this;
} }
SolrCore core = new SolrCore(getName(), getDataDir(), coreConfig.getSolrConfig(), boolean success = false;
coreConfig.getIndexSchema(), coreDescriptor, updateHandler, this.solrDelPolicy, currentCore); SolrCore core = null;
core.solrDelPolicy = this.solrDelPolicy; try {
core = new SolrCore(getName(), getDataDir(), coreConfig.getSolrConfig(),
coreConfig.getIndexSchema(), coreDescriptor, updateHandler, solrDelPolicy, currentCore);
// we open a new indexwriter to pick up the latest config
core.getUpdateHandler().getSolrCoreState().newIndexWriter(core, false); // we open a new IndexWriter to pick up the latest config
core.getUpdateHandler().getSolrCoreState().newIndexWriter(core, false);
core.getSearcher(true, false, null, true);
core.getSearcher(true, false, null, true);
return core; success = true;
return core;
} finally {
// close the new core on any errors that have occurred.
if (!success) {
IOUtils.closeQuietly(core);
}
}
} }
private DirectoryFactory initDirectoryFactory() {
private void initDirectoryFactory() { final PluginInfo info = solrConfig.getPluginInfo(DirectoryFactory.class.getName());
DirectoryFactory dirFactory; final DirectoryFactory dirFactory;
PluginInfo info = solrConfig.getPluginInfo(DirectoryFactory.class.getName());
if (info != null) { if (info != null) {
log.info(info.className); log.info(info.className);
dirFactory = getResourceLoader().newInstance(info.className, DirectoryFactory.class); dirFactory = getResourceLoader().newInstance(info.className, DirectoryFactory.class);
@ -474,8 +480,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
log.info("solr.NRTCachingDirectoryFactory"); log.info("solr.NRTCachingDirectoryFactory");
dirFactory = new NRTCachingDirectoryFactory(); dirFactory = new NRTCachingDirectoryFactory();
} }
// And set it return dirFactory;
directoryFactory = dirFactory;
} }
private void initIndexReaderFactory() { private void initIndexReaderFactory() {
@ -645,6 +650,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
* @param schema a solr schema instance * @param schema a solr schema instance
* *
* @since solr 1.3 * @since solr 1.3
* @deprecated will be removed in the next release
*/ */
public SolrCore(String name, String dataDir, SolrConfig config, IndexSchema schema, CoreDescriptor cd) { public SolrCore(String name, String dataDir, SolrConfig config, IndexSchema schema, CoreDescriptor cd) {
this(name, dataDir, config, schema, cd, null, null, null); this(name, dataDir, config, schema, cd, null, null, null);
@ -654,13 +660,14 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
this(cd.getName(), null, coreConfig.getSolrConfig(), coreConfig.getIndexSchema(), cd, null, null, null); this(cd.getName(), null, coreConfig.getSolrConfig(), coreConfig.getIndexSchema(), cd, null, null, null);
} }
/** /**
* Creates a new core that is to be loaded lazily. i.e. lazyLoad="true" in solr.xml * Creates a new core that is to be loaded lazily. i.e. lazyLoad="true" in solr.xml
*
* @since solr 4.1 * @since solr 4.1
* @deprecated will be removed in the next release
*/ */
public SolrCore(String name, CoreDescriptor cd) { public SolrCore(String name, CoreDescriptor coreDescriptor) {
coreDescriptor = cd; this.coreDescriptor = coreDescriptor;
this.setName(name); this.setName(name);
this.schema = null; this.schema = null;
this.dataDir = null; this.dataDir = null;
@ -678,97 +685,59 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
this.codec = null; this.codec = null;
this.ruleExpiryLock = null; this.ruleExpiryLock = null;
this.memClassLoader = null; this.memClassLoader = null;
this.directoryFactory = null;
solrCoreState = null; this.solrCoreState = null;
this.restManager = null;
this.solrDelPolicy = null;
} }
/** /**
* Creates a new core and register it in the list of cores. * Creates a new core and register it in the list of cores. If a core with the
* If a core with the same name already exists, it will be stopped and replaced by this one. * same name already exists, it will be stopped and replaced by this one.
*@param dataDir the index directory
*@param config a solr config instance
*@param schema a solr schema instance
* *
*@since solr 1.3 * @param dataDir
* the index directory
* @param config
* a solr config instance
* @param schema
* a solr schema instance
*
* @since solr 1.3
*/ */
public SolrCore(String name, String dataDir, SolrConfig config, IndexSchema schema, CoreDescriptor cd, UpdateHandler updateHandler, IndexDeletionPolicyWrapper delPolicy, SolrCore prev) { public SolrCore(String name, String dataDir, SolrConfig config,
coreDescriptor = cd; IndexSchema schema, CoreDescriptor coreDescriptor, UpdateHandler updateHandler,
this.setName( name ); IndexDeletionPolicyWrapper delPolicy, SolrCore prev) {
checkNotNull(coreDescriptor, "coreDescriptor cannot be null");
this.coreDescriptor = coreDescriptor;
setName(name);
MDCUtils.setCore(name); // show the core name in the error logs MDCUtils.setCore(name); // show the core name in the error logs
resourceLoader = config.getResourceLoader(); resourceLoader = config.getResourceLoader();
this.solrConfig = config; this.solrConfig = config;
if (updateHandler == null) { if (updateHandler == null) {
initDirectoryFactory(); directoryFactory = initDirectoryFactory();
solrCoreState = new DefaultSolrCoreState(directoryFactory);
} else {
solrCoreState = updateHandler.getSolrCoreState();
directoryFactory = solrCoreState.getDirectoryFactory();
isReloaded = true;
} }
if (dataDir == null) { this.dataDir = initDataDir(dataDir, config, coreDescriptor);
if (cd.usingDefaultDataDir()) dataDir = config.getDataDir(); this.ulogDir = initUpdateLogDir(coreDescriptor);
if (dataDir == null) {
try {
dataDir = cd.getDataDir();
if (!directoryFactory.isAbsolute(dataDir)) {
dataDir = directoryFactory.getDataHome(cd);
}
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, null, e);
}
}
}
dataDir = SolrResourceLoader.normalizeDir(dataDir);
String updateLogDir = cd.getUlogDir(); log.info("[{}] Opening new SolrCore at [{}], dataDir=[{}]", logid, resourceLoader.getInstanceDir(), dataDir);
if (updateLogDir == null) {
updateLogDir = dataDir;
if (new File(updateLogDir).isAbsolute() == false) {
updateLogDir = SolrResourceLoader.normalizeDir(cd.getInstanceDir()) + updateLogDir;
}
}
ulogDir = updateLogDir;
checkVersionFieldExistsInSchema(schema, coreDescriptor);
log.info(logid+"Opening new SolrCore at " + resourceLoader.getInstanceDir() + ", dataDir="+dataDir); // Initialize JMX
this.infoRegistry = initInfoRegistry(name, config);
if (null != cd && null != cd.getCloudDescriptor()) {
// we are evidently running in cloud mode.
//
// In cloud mode, version field is required for correct consistency
// ideally this check would be more fine grained, and individual features
// would assert it when they initialize, but DistributedUpdateProcessor
// is currently a big ball of wax that does more then just distributing
// updates (ie: partial document updates), so it needs to work in no cloud
// mode as well, and can't assert version field support on init.
try {
VersionInfo.getAndCheckVersionField(schema);
} catch (SolrException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Schema will not work with SolrCloud mode: " +
e.getMessage(), e);
}
}
//Initialize JMX
if (config.jmxConfig.enabled) {
infoRegistry = new JmxMonitoredMap<String, SolrInfoMBean>(name, String.valueOf(this.hashCode()), config.jmxConfig);
} else {
log.info("JMX monitoring not detected for core: " + name);
infoRegistry = new ConcurrentHashMap<>();
}
infoRegistry.put("fieldCache", new SolrFieldCacheMBean()); infoRegistry.put("fieldCache", new SolrFieldCacheMBean());
if (schema==null) { this.schema = initSchema(config, schema);
schema = IndexSchemaFactory.buildIndexSchema(IndexSchema.DEFAULT_SCHEMA_FILE, config);
}
this.schema = schema;
final SimilarityFactory similarityFactory = schema.getSimilarityFactory();
if (similarityFactory instanceof SolrCoreAware) {
// Similarity needs SolrCore before inform() is called on all registered SolrCoreAware listeners below
((SolrCoreAware)similarityFactory).inform(this);
}
this.dataDir = dataDir;
this.startTime = System.currentTimeMillis(); this.startTime = System.currentTimeMillis();
this.maxWarmingSearchers = config.maxWarmingSearchers; this.maxWarmingSearchers = config.maxWarmingSearchers;
this.slowQueryThresholdMillis = config.slowQueryThresholdMillis; this.slowQueryThresholdMillis = config.slowQueryThresholdMillis;
@ -781,21 +750,10 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
initListeners(); initListeners();
if (delPolicy == null) { this.solrDelPolicy = initDeletionPolicy(delPolicy);
initDeletionPolicy();
} else {
this.solrDelPolicy = delPolicy;
}
this.codec = initCodec(solrConfig, schema); this.codec = initCodec(solrConfig, this.schema);
if (updateHandler == null) {
solrCoreState = new DefaultSolrCoreState(getDirectoryFactory());
} else {
solrCoreState = updateHandler.getSolrCoreState();
directoryFactory = solrCoreState.getDirectoryFactory();
this.isReloaded = true;
}
memClassLoader = new MemClassLoader(PluginBag.RuntimeLib.getLibObjects(this, solrConfig.getPluginInfos(PluginBag.RuntimeLib.class.getName())), getResourceLoader()); memClassLoader = new MemClassLoader(PluginBag.RuntimeLib.getLibObjects(this, solrConfig.getPluginInfos(PluginBag.RuntimeLib.class.getName())), getResourceLoader());
initIndex(prev != null); initIndex(prev != null);
@ -804,7 +762,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
valueSourceParsers.init(ValueSourceParser.standardValueSourceParsers, this); valueSourceParsers.init(ValueSourceParser.standardValueSourceParsers, this);
transformerFactories.init(TransformerFactory.defaultFactories, this); transformerFactories.init(TransformerFactory.defaultFactories, this);
loadSearchComponents(); loadSearchComponents();
updateProcessors.init(Collections.EMPTY_MAP, this); updateProcessors.init(Collections.emptyMap(), this);
// Processors initialized before the handlers // Processors initialized before the handlers
updateProcessorChains = loadUpdateProcessorChains(); updateProcessorChains = loadUpdateProcessorChains();
@ -827,42 +785,9 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
} }
}); });
// use the (old) writer to open the first searcher this.updateHandler = initUpdateHandler(updateHandler);
RefCounted<IndexWriter> iwRef = null;
if (prev != null) { initSearcher(prev);
iwRef = prev.getUpdateHandler().getSolrCoreState().getIndexWriter(null);
if (iwRef != null) {
final IndexWriter iw = iwRef.get();
final SolrCore core = this;
newReaderCreator = new Callable<DirectoryReader>() {
// this is used during a core reload
@Override
public DirectoryReader call() throws Exception {
return indexReaderFactory.newReader(iw, core);
}
};
}
}
String updateHandlerClass = solrConfig.getUpdateHandlerInfo().className;
if (updateHandler == null) {
this.updateHandler = createUpdateHandler(updateHandlerClass == null ? DirectUpdateHandler2.class
.getName() : updateHandlerClass);
} else {
this.updateHandler = createUpdateHandler(
updateHandlerClass == null ? DirectUpdateHandler2.class.getName()
: updateHandlerClass, updateHandler);
}
infoRegistry.put("updateHandler", this.updateHandler);
try {
getSearcher(false, false, null, true);
} finally {
newReaderCreator = null;
if (iwRef != null) iwRef.decref();
}
// Initialize the RestManager // Initialize the RestManager
restManager = initRestManager(); restManager = initRestManager();
@ -871,23 +796,25 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
resourceLoader.inform(resourceLoader); resourceLoader.inform(resourceLoader);
resourceLoader.inform(this); // last call before the latch is released. resourceLoader.inform(this); // last call before the latch is released.
} catch (Throwable e) { } catch (Throwable e) {
latch.countDown();//release the latch, otherwise we block trying to do the close. This should be fine, since counting down on a latch of 0 is still fine // release the latch, otherwise we block trying to do the close. This
//close down the searcher and any other resources, if it exists, as this is not recoverable // should be fine, since counting down on a latch of 0 is still fine
latch.countDown();
if (e instanceof OutOfMemoryError) { if (e instanceof OutOfMemoryError) {
throw (OutOfMemoryError)e; throw (OutOfMemoryError)e;
} }
try { try {
this.close(); // close down the searcher and any other resources, if it exists, as this
// is not recoverable
close();
} catch (Throwable t) { } catch (Throwable t) {
if (t instanceof OutOfMemoryError) { if (t instanceof OutOfMemoryError) {
throw (OutOfMemoryError)t; throw (OutOfMemoryError) t;
} }
log.error("Error while closing", t); log.error("Error while closing", t);
} }
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e.getMessage(), e);
e.getMessage(), e);
} finally { } finally {
// allow firstSearcher events to fire and make sure it is released // allow firstSearcher events to fire and make sure it is released
latch.countDown(); latch.countDown();
@ -903,32 +830,153 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
// from the core. // from the core.
resourceLoader.inform(infoRegistry); resourceLoader.inform(infoRegistry);
CoreContainer cc = cd.getCoreContainer(); bufferUpdatesIfConstructing(coreDescriptor);
// For debugging
// numOpens.incrementAndGet();
// openHandles.put(this, new RuntimeException("unclosed core - name:" + getName() + " refs: " + refCount.get()));
this.ruleExpiryLock = new ReentrantLock();
registerConfListener();
}
/** Set UpdateLog to buffer updates if the slice is in construction. */
private void bufferUpdatesIfConstructing(CoreDescriptor coreDescriptor) {
final CoreContainer cc = coreDescriptor.getCoreContainer();
if (cc != null && cc.isZooKeeperAware()) { if (cc != null && cc.isZooKeeperAware()) {
SolrRequestHandler realtimeGetHandler = reqHandlers.get("/get"); if (reqHandlers.get("/get") == null) {
if (realtimeGetHandler == null) {
log.warn("WARNING: RealTimeGetHandler is not registered at /get. " + log.warn("WARNING: RealTimeGetHandler is not registered at /get. " +
"SolrCloud will always use full index replication instead of the more efficient PeerSync method."); "SolrCloud will always use full index replication instead of the more efficient PeerSync method.");
} }
// ZK pre-Register would have already happened so we read slice properties now // ZK pre-register would have already happened so we read slice properties now
ClusterState clusterState = cc.getZkController().getClusterState(); final ClusterState clusterState = cc.getZkController().getClusterState();
Slice slice = clusterState.getSlice(cd.getCloudDescriptor().getCollectionName(), final Slice slice = clusterState.getSlice(coreDescriptor.getCloudDescriptor().getCollectionName(),
cd.getCloudDescriptor().getShardId()); coreDescriptor.getCloudDescriptor().getShardId());
if (slice.getState() == Slice.State.CONSTRUCTION) { if (slice.getState() == Slice.State.CONSTRUCTION) {
// set update log to buffer before publishing the core // set update log to buffer before publishing the core
getUpdateHandler().getUpdateLog().bufferUpdates(); getUpdateHandler().getUpdateLog().bufferUpdates();
} }
} }
// For debugging
// numOpens.incrementAndGet();
// openHandles.put(this, new RuntimeException("unclosed core - name:" + getName() + " refs: " + refCount.get()));
ruleExpiryLock = new ReentrantLock();
registerConfListener();
} }
private void initSearcher(SolrCore prev) throws IOException {
// use the (old) writer to open the first searcher
RefCounted<IndexWriter> iwRef = null;
if (prev != null) {
iwRef = prev.getUpdateHandler().getSolrCoreState().getIndexWriter(null);
if (iwRef != null) {
final IndexWriter iw = iwRef.get();
final SolrCore core = this;
newReaderCreator = new Callable<DirectoryReader>() {
// this is used during a core reload
@Override
public DirectoryReader call() throws Exception {
return indexReaderFactory.newReader(iw, core);
}
};
}
}
try {
getSearcher(false, false, null, true);
} finally {
newReaderCreator = null;
if (iwRef != null) {
iwRef.decref();
}
}
}
private UpdateHandler initUpdateHandler(UpdateHandler updateHandler) {
String updateHandlerClass = solrConfig.getUpdateHandlerInfo().className;
if (updateHandlerClass == null) {
updateHandlerClass = DirectUpdateHandler2.class.getName();
}
final UpdateHandler newUpdateHandler;
if (updateHandler == null) {
newUpdateHandler = createUpdateHandler(updateHandlerClass);
} else {
newUpdateHandler = createUpdateHandler(updateHandlerClass, updateHandler);
}
infoRegistry.put("updateHandler", newUpdateHandler);
return newUpdateHandler;
}
private IndexSchema initSchema(SolrConfig config, IndexSchema schema) {
if (schema == null) {
schema = IndexSchemaFactory.buildIndexSchema(IndexSchema.DEFAULT_SCHEMA_FILE, config);
}
final SimilarityFactory similarityFactory = schema.getSimilarityFactory();
if (similarityFactory instanceof SolrCoreAware) {
// Similarity needs SolrCore before inform() is called on all registered SolrCoreAware listeners below
((SolrCoreAware) similarityFactory).inform(this);
}
return schema;
}
private Map<String,SolrInfoMBean> initInfoRegistry(String name, SolrConfig config) {
if (config.jmxConfig.enabled) {
return new JmxMonitoredMap<String, SolrInfoMBean>(name, String.valueOf(this.hashCode()), config.jmxConfig);
} else {
log.info("JMX monitoring not detected for core: " + name);
return new ConcurrentHashMap<>();
}
}
private void checkVersionFieldExistsInSchema(IndexSchema schema, CoreDescriptor coreDescriptor) {
if (null != coreDescriptor.getCloudDescriptor()) {
// we are evidently running in cloud mode.
//
// In cloud mode, version field is required for correct consistency
// ideally this check would be more fine grained, and individual features
// would assert it when they initialize, but DistributedUpdateProcessor
// is currently a big ball of wax that does more then just distributing
// updates (ie: partial document updates), so it needs to work in no cloud
// mode as well, and can't assert version field support on init.
try {
VersionInfo.getAndCheckVersionField(schema);
} catch (SolrException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Schema will not work with SolrCloud mode: " +
e.getMessage(), e);
}
}
}
private String initDataDir(String dataDir, SolrConfig config, CoreDescriptor coreDescriptor) {
if (dataDir == null) {
if (coreDescriptor.usingDefaultDataDir()) {
dataDir = config.getDataDir();
}
if (dataDir == null) {
try {
dataDir = coreDescriptor.getDataDir();
if (!directoryFactory.isAbsolute(dataDir)) {
dataDir = directoryFactory.getDataHome(coreDescriptor);
}
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
}
}
return SolrResourceLoader.normalizeDir(dataDir);
}
private String initUpdateLogDir(CoreDescriptor coreDescriptor) {
String updateLogDir = coreDescriptor.getUlogDir();
if (updateLogDir == null) {
updateLogDir = dataDir;
if (new File(updateLogDir).isAbsolute() == false) {
updateLogDir = SolrResourceLoader.normalizeDir(coreDescriptor.getInstanceDir()) + updateLogDir;
}
}
return updateLogDir;
}
private Codec initCodec(SolrConfig solrConfig, final IndexSchema schema) { private Codec initCodec(SolrConfig solrConfig, final IndexSchema schema) {
final PluginInfo info = solrConfig.getPluginInfo(CodecFactory.class.getName()); final PluginInfo info = solrConfig.getPluginInfo(CodecFactory.class.getName());
final CodecFactory factory; final CodecFactory factory;
@ -1070,6 +1118,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
* </ul> * </ul>
* @see #isClosed() * @see #isClosed()
*/ */
@Override
public void close() { public void close() {
int count = refCount.decrementAndGet(); int count = refCount.decrementAndGet();
if (count > 0) return; // close is called often, and only actually closes if nothing is using it. if (count > 0) return; // close is called often, and only actually closes if nothing is using it.
@ -1080,7 +1129,6 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
} }
log.info(logid+" CLOSING SolrCore " + this); log.info(logid+" CLOSING SolrCore " + this);
if( closeHooks != null ) { if( closeHooks != null ) {
for( CloseHook hook : closeHooks ) { for( CloseHook hook : closeHooks ) {
try { try {

View File

@ -605,7 +605,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
catch (Exception ex) { catch (Exception ex) {
if (coreContainer.isZooKeeperAware() && dcore != null && !preExisitingZkEntry) { if (coreContainer.isZooKeeperAware() && dcore != null && !preExisitingZkEntry) {
try { try {
coreContainer.getZkController().unregister(dcore.getName(), dcore,null); coreContainer.getZkController().unregister(dcore.getName(), dcore);
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
SolrException.log(log, null, e); SolrException.log(log, null, e);

View File

@ -22,10 +22,12 @@ import java.io.IOException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.lucene.util.LuceneTestCase.Nightly;
import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse; import org.apache.solr.client.solrj.response.CollectionAdminResponse;
@ -33,6 +35,7 @@ import org.apache.zookeeper.KeeperException;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@Nightly
public class ConcurrentDeleteAndCreateCollectionTest extends SolrTestCaseJ4 { public class ConcurrentDeleteAndCreateCollectionTest extends SolrTestCaseJ4 {
private MiniSolrCloudCluster solrCluster; private MiniSolrCloudCluster solrCluster;
@ -61,7 +64,8 @@ public class ConcurrentDeleteAndCreateCollectionTest extends SolrTestCaseJ4 {
final String collectionName = "collection" + i; final String collectionName = "collection" + i;
uploadConfig(configDir, collectionName); uploadConfig(configDir, collectionName);
final SolrClient solrClient = new HttpSolrClient(solrCluster.getJettySolrRunners().get(0).getBaseUrl().toString()); final SolrClient solrClient = new HttpSolrClient(solrCluster.getJettySolrRunners().get(0).getBaseUrl().toString());
threads[i] = new CreateDeleteCollectionThread("create-delete-" + i, collectionName, timeToRunSec, solrClient, failure); threads[i] = new CreateDeleteSearchCollectionThread("create-delete-search-" + i, collectionName, collectionName,
timeToRunSec, solrClient, failure);
} }
startAll(threads); startAll(threads);
@ -70,6 +74,33 @@ public class ConcurrentDeleteAndCreateCollectionTest extends SolrTestCaseJ4 {
assertNull("concurrent create and delete collection failed: " + failure.get(), failure.get()); assertNull("concurrent create and delete collection failed: " + failure.get(), failure.get());
} }
public void testConcurrentCreateAndDeleteOverTheSameConfig() {
Logger.getLogger("org.apache.solr").setLevel(Level.WARN);
final String configName = "testconfig";
final File configDir = getFile("solr").toPath().resolve("configsets/configset-2/conf").toFile();
uploadConfig(configDir, configName); // upload config once, to be used by all collections
final SolrClient solrClient = new HttpSolrClient(solrCluster.getJettySolrRunners().get(0).getBaseUrl().toString());
final AtomicReference<Exception> failure = new AtomicReference<>();
final int timeToRunSec = 30;
final Thread[] threads = new Thread[2];
for (int i = 0; i < threads.length; i++) {
final String collectionName = "collection" + i;
threads[i] = new CreateDeleteCollectionThread("create-delete-" + i, collectionName, configName,
timeToRunSec, solrClient, failure);
}
startAll(threads);
joinAll(threads);
assertNull("concurrent create and delete collection failed: " + failure.get(), failure.get());
try {
solrClient.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private void uploadConfig(File configDir, String configName) { private void uploadConfig(File configDir, String configName) {
try { try {
solrCluster.uploadConfigDir(configDir, configName); solrCluster.uploadConfigDir(configDir, configName);
@ -96,58 +127,59 @@ public class ConcurrentDeleteAndCreateCollectionTest extends SolrTestCaseJ4 {
} }
private static class CreateDeleteCollectionThread extends Thread { private static class CreateDeleteCollectionThread extends Thread {
private final String collectionName; protected final String collectionName;
private final long timeToRunSec; protected final String configName;
private final SolrClient solrClient; protected final long timeToRunSec;
private final AtomicReference<Exception> failure; protected final SolrClient solrClient;
protected final AtomicReference<Exception> failure;
public CreateDeleteCollectionThread(String name, String collectionName, public CreateDeleteCollectionThread(String name, String collectionName, String configName, long timeToRunSec,
long timeToRunSec, SolrClient solrClient, AtomicReference<Exception> failure) { SolrClient solrClient, AtomicReference<Exception> failure) {
super(name); super(name);
this.collectionName = collectionName; this.collectionName = collectionName;
this.timeToRunSec = timeToRunSec; this.timeToRunSec = timeToRunSec;
this.solrClient = solrClient; this.solrClient = solrClient;
this.failure = failure; this.failure = failure;
this.configName = configName;
} }
@Override @Override
public void run() { public void run() {
final long timeToStop = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(timeToRunSec); final long timeToStop = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(timeToRunSec);
while (System.currentTimeMillis() < timeToStop && failure.get() == null) { while (System.currentTimeMillis() < timeToStop && failure.get() == null) {
createCollection(collectionName); doWork();
deleteCollection();
searchNonExistingCollection();
} }
} }
private void searchNonExistingCollection() { protected void doWork() {
try { createCollection();
solrClient.query(collectionName, new SolrQuery("*")); deleteCollection();
} catch (Exception e) { }
if (!e.getMessage().contains("not found") && !e.getMessage().contains("Can not find")) {
synchronized (failure) { protected void addFailure(Exception e) {
if (failure.get() != null) { synchronized (failure) {
failure.get().addSuppressed(e); if (failure.get() != null) {
} else { failure.get().addSuppressed(e);
failure.set(e); } else {
} failure.set(e);
}
} }
} }
} }
private void createCollection(String collectionName) { private void createCollection() {
try { try {
final CollectionAdminRequest.Create createCollectionRequest = new CollectionAdminRequest.Create(); final CollectionAdminRequest.Create createCollectionRequest = new CollectionAdminRequest.Create();
createCollectionRequest.setCollectionName(collectionName); createCollectionRequest.setCollectionName(collectionName);
createCollectionRequest.setNumShards(1); createCollectionRequest.setNumShards(1);
createCollectionRequest.setReplicationFactor(1); createCollectionRequest.setReplicationFactor(1);
createCollectionRequest.setConfigName(collectionName); createCollectionRequest.setConfigName(configName);
final CollectionAdminResponse response = createCollectionRequest.process(solrClient); final CollectionAdminResponse response = createCollectionRequest.process(solrClient);
assertEquals(0, response.getStatus()); if (response.getStatus() != 0) {
} catch (IOException | SolrServerException e) { addFailure(new RuntimeException("failed to create collection " + collectionName));
throw new RuntimeException(e); }
} catch (Exception e) {
addFailure(e);
} }
} }
@ -158,11 +190,38 @@ public class ConcurrentDeleteAndCreateCollectionTest extends SolrTestCaseJ4 {
deleteCollectionRequest.setCollectionName(collectionName); deleteCollectionRequest.setCollectionName(collectionName);
final CollectionAdminResponse response = deleteCollectionRequest.process(solrClient); final CollectionAdminResponse response = deleteCollectionRequest.process(solrClient);
assertEquals(0, response.getStatus()); if (response.getStatus() != 0) {
} catch (IOException | SolrServerException e) { addFailure(new RuntimeException("failed to delete collection " + collectionName));
throw new RuntimeException(e); }
} catch (Exception e) {
addFailure(e);
} }
} }
} }
private static class CreateDeleteSearchCollectionThread extends CreateDeleteCollectionThread {
public CreateDeleteSearchCollectionThread(String name, String collectionName, String configName, long timeToRunSec,
SolrClient solrClient, AtomicReference<Exception> failure) {
super(name, collectionName, configName, timeToRunSec, solrClient, failure);
}
@Override
protected void doWork() {
super.doWork();
searchNonExistingCollection();
}
private void searchNonExistingCollection() {
try {
solrClient.query(collectionName, new SolrQuery("*"));
} catch (Exception e) {
if (!e.getMessage().contains("not found") && !e.getMessage().contains("Can not find")) {
addFailure(e);
}
}
}
}
} }