Fix for SOLR-4557, fixes for reload testing. Also made a series of changes to make reloading cores more robust

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1455606 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Erick Erickson 2013-03-12 16:42:26 +00:00
parent b25cc528d9
commit 22401bab04
2 changed files with 157 additions and 141 deletions

View File

@ -497,7 +497,6 @@ public class CoreContainer
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new DefaultSolrThreadFactory("coreLoadExecutor")); new DefaultSolrThreadFactory("coreLoadExecutor"));
try { try {
CompletionService<SolrCore> completionService = new ExecutorCompletionService<SolrCore>( CompletionService<SolrCore> completionService = new ExecutorCompletionService<SolrCore>(
coreLoadExecutor); coreLoadExecutor);
Set<Future<SolrCore>> pending = new HashSet<Future<SolrCore>>(); Set<Future<SolrCore>> pending = new HashSet<Future<SolrCore>>();
@ -588,7 +587,6 @@ public class CoreContainer
return c; return c;
} }
}; };
pending.add(completionService.submit(task)); pending.add(completionService.submit(task));
} else { } else {
@ -607,7 +605,7 @@ public class CoreContainer
Future<SolrCore> future = completionService.take(); Future<SolrCore> future = completionService.take();
if (future == null) return; if (future == null) return;
pending.remove(future); pending.remove(future);
try { try {
SolrCore c = future.get(); SolrCore c = future.get();
// track original names // track original names
@ -680,7 +678,7 @@ public class CoreContainer
try { try {
// First allow the closer thread to drain all the pending closes it can. // First wake up the closer thread, it'll terminate almost immediately since it checks isShutDown.
synchronized (coreMaps.getLocker()) { synchronized (coreMaps.getLocker()) {
coreMaps.getLocker().notifyAll(); // wake up anyone waiting coreMaps.getLocker().notifyAll(); // wake up anyone waiting
} }
@ -1032,53 +1030,55 @@ public class CoreContainer
*/ */
public void reload(String name) { public void reload(String name) {
try { try {
name = checkDefault(name);
name= checkDefault(name);
SolrCore core = coreMaps.getCore(name); SolrCore core = coreMaps.getCore(name);
if (core == null) if (core == null)
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, "No such core: " + name ); throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, "No such core: " + name );
CoreDescriptor cd = core.getCoreDescriptor(); try {
coreMaps.waitAddPendingCoreOps(name);
File instanceDir = new File(cd.getInstanceDir()); CoreDescriptor cd = core.getCoreDescriptor();
log.info("Reloading SolrCore '{}' using instanceDir: {}", File instanceDir = new File(cd.getInstanceDir());
cd.getName(), instanceDir.getAbsolutePath());
SolrResourceLoader solrLoader;
if(zkController == null) {
solrLoader = new SolrResourceLoader(instanceDir.getAbsolutePath(), libLoader, SolrProperties.getCoreProperties(instanceDir.getAbsolutePath(), cd));
} else {
try {
String collection = cd.getCloudDescriptor().getCollectionName();
zkController.createCollectionZkNode(cd.getCloudDescriptor());
String zkConfigName = zkController.readConfigName(collection); log.info("Reloading SolrCore '{}' using instanceDir: {}",
if (zkConfigName == null) { cd.getName(), instanceDir.getAbsolutePath());
log.error("Could not find config name for collection:" + collection); SolrResourceLoader solrLoader;
if(zkController == null) {
solrLoader = new SolrResourceLoader(instanceDir.getAbsolutePath(), libLoader, SolrProperties.getCoreProperties(instanceDir.getAbsolutePath(), cd));
} else {
try {
String collection = cd.getCloudDescriptor().getCollectionName();
zkController.createCollectionZkNode(cd.getCloudDescriptor());
String zkConfigName = zkController.readConfigName(collection);
if (zkConfigName == null) {
log.error("Could not find config name for collection:" + collection);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"Could not find config name for collection:" + collection);
}
solrLoader = new ZkSolrResourceLoader(instanceDir.getAbsolutePath(), zkConfigName, libLoader,
SolrProperties.getCoreProperties(instanceDir.getAbsolutePath(), cd), zkController);
} catch (KeeperException e) {
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"Could not find config name for collection:" + collection); "", e);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", e);
} }
solrLoader = new ZkSolrResourceLoader(instanceDir.getAbsolutePath(), zkConfigName, libLoader,
SolrProperties.getCoreProperties(instanceDir.getAbsolutePath(), cd), zkController);
} catch (KeeperException e) {
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", e);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", e);
} }
SolrCore newCore = core.reload(solrLoader, core);
// keep core to orig name link
coreMaps.removeCoreToOrigName(newCore, core);
registerCore(false, name, newCore, false);
} finally {
coreMaps.removeFromPendingOps(name);
} }
SolrCore newCore = core.reload(solrLoader, core);
// keep core to orig name link
coreMaps.removeCoreToOrigName(newCore, core);
registerCore(false, name, newCore, false);
// :TODO: Java7... // :TODO: Java7...
// http://docs.oracle.com/javase/7/docs/technotes/guides/language/catch-multiple.html // http://docs.oracle.com/javase/7/docs/technotes/guides/language/catch-multiple.html
} catch (Exception ex) { } catch (Exception ex) {
@ -1137,7 +1137,10 @@ public class CoreContainer
// Do this in two phases since we don't want to lock access to the cores over a load. // Do this in two phases since we don't want to lock access to the cores over a load.
SolrCore core = coreMaps.getCoreFromAnyList(name); SolrCore core = coreMaps.getCoreFromAnyList(name);
if (core != null) return core; if (core != null) {
core.open();
return core;
}
// OK, it's not presently in any list, is it in the list of dynamic cores but not loaded yet? If so, load it. // OK, it's not presently in any list, is it in the list of dynamic cores but not loaded yet? If so, load it.
CoreDescriptor desc = coreMaps.getDynamicDescriptor(name); CoreDescriptor desc = coreMaps.getDynamicDescriptor(name);
@ -1145,22 +1148,25 @@ public class CoreContainer
return null; return null;
} }
core = coreMaps.waitPendingCoreOps(name); // This will put an entry in pending core ops if the core isn't loaded // This will put an entry in pending core ops if the core isn't loaded
core = coreMaps.waitAddPendingCoreOps(name);
if (isShutDown) return null; // We're quitting, so stop. This needs to be after the wait above since we may come off if (isShutDown) return null; // We're quitting, so stop. This needs to be after the wait above since we may come off
// the wait as a consequence of shutting down. // the wait as a consequence of shutting down.
try {
if (core == null) { if (core == null) {
try {
core = create(desc); // This should throw an error if it fails. core = create(desc); // This should throw an error if it fails.
core.open(); core.open();
registerCore(desc.isTransient(), name, core, false); registerCore(desc.isTransient(), name, core, false);
} catch (Exception ex) { } else {
throw recordAndThrow(name, "Unable to create core: " + name, ex); core.open();
} finally {
coreMaps.releasePending(name);
} }
} catch(Exception ex){
throw recordAndThrow(name, "Unable to create core: " + name, ex);
} finally {
coreMaps.removeFromPendingOps(name);
} }
return core; return core;
} }
@ -1371,6 +1377,7 @@ public class CoreContainer
// dynamicDescriptors // dynamicDescriptors
// //
class CoreMaps { class CoreMaps {
private static Object locker = new Object(); // for locking around manipulating any of the core maps. private static Object locker = new Object(); // for locking around manipulating any of the core maps.
@ -1387,11 +1394,13 @@ class CoreMaps {
private final CoreContainer container; private final CoreContainer container;
// It's a little clumsy to have two, but closing requires a SolrCore, whereas pending loads don't have a core. // This map will hold objects that are being currently operated on. The core (value) may be null in the case of
private static final Set<String> pendingDynamicLoads = new TreeSet<String>(); // initial load. The rule is, never do any operation on a core that is currently being operated upon.
private static final Set<String> pendingCoreOps = new HashSet<String>();
// Holds cores from the time they're removed from the transient cache until after they're closed. // Due to the fact that closes happen potentially whenever anything is _added_ to the transient core list, we need
private static final List<SolrCore> pendingDynamicCloses = new ArrayList<SolrCore>(); // to essentially queue them up to be handled via pendingCoreOps.
private static final List<SolrCore> pendingCloses = new ArrayList<SolrCore>();
CoreMaps(CoreContainer container) { CoreMaps(CoreContainer container) {
this.container = container; this.container = container;
@ -1408,11 +1417,8 @@ class CoreMaps {
protected boolean removeEldestEntry(Map.Entry<String, SolrCore> eldest) { protected boolean removeEldestEntry(Map.Entry<String, SolrCore> eldest) {
if (size() > transientCacheSize) { if (size() > transientCacheSize) {
synchronized (locker) { synchronized (locker) {
SolrCore closeMe = eldest.getValue(); pendingCloses.add(eldest.getValue()); // Essentially just queue this core up for closing.
synchronized (locker) { locker.notifyAll(); // Wakes up closer thread too
pendingDynamicCloses.add(closeMe);
locker.notifyAll(); // Wakes up closer thread too
}
} }
return true; return true;
} }
@ -1433,11 +1439,11 @@ class CoreMaps {
protected void clearMaps(ConfigSolr cfg) { protected void clearMaps(ConfigSolr cfg) {
List<String> coreNames; List<String> coreNames;
List<String> transientNames; List<String> transientNames;
List<SolrCore> pendingClosers; List<SolrCore> pendingToClose;
synchronized (locker) { synchronized (locker) {
coreNames = new ArrayList(cores.keySet()); coreNames = new ArrayList(cores.keySet());
transientNames = new ArrayList(transientCores.keySet()); transientNames = new ArrayList(transientCores.keySet());
pendingClosers = new ArrayList(pendingDynamicCloses); pendingToClose = new ArrayList(pendingCloses);
} }
for (String coreName : coreNames) { for (String coreName : coreNames) {
SolrCore core = cores.get(coreName); SolrCore core = cores.get(coreName);
@ -1466,8 +1472,12 @@ class CoreMaps {
transientCores.clear(); transientCores.clear();
// We might have some cores that we were _thinking_ about shutting down, so take care of those too. // We might have some cores that we were _thinking_ about shutting down, so take care of those too.
for (SolrCore core : pendingClosers) { for (SolrCore core : pendingToClose) {
core.close(); try {
core.close();
} catch (Throwable t) {
SolrException.log(CoreContainer.log, "Error shutting down core", t);
}
} }
} }
@ -1610,10 +1620,9 @@ class CoreMaps {
protected SolrCore getCoreFromAnyList(String name) { protected SolrCore getCoreFromAnyList(String name) {
SolrCore core; SolrCore core;
synchronized (locker) { // This one's OK, the core.open is just an increment synchronized (locker) {
core = cores.get(name); core = cores.get(name);
if (core != null) { if (core != null) {
core.open(); // increment the ref count while still synchronized
return core; return core;
} }
@ -1621,15 +1630,8 @@ class CoreMaps {
return null; // Nobody even tried to define any transient cores, so we're done. return null; // Nobody even tried to define any transient cores, so we're done.
} }
// Now look for already loaded transient cores. // Now look for already loaded transient cores.
core = transientCores.get(name); return transientCores.get(name);
if (core != null) {
core.open(); // Just increments ref count, so it's ok that we're in a synch block
return core;
}
} }
return null;
} }
protected CoreDescriptor getDynamicDescriptor(String name) { protected CoreDescriptor getDynamicDescriptor(String name) {
@ -1719,29 +1721,22 @@ class CoreMaps {
} }
} }
} }
// We get here when we're being loaded, and the presumption is that we're not in the list yet. // Wait here until any pending operations (load, unload or reload) are completed on this core.
protected SolrCore waitPendingCoreOps(String name) { protected SolrCore waitAddPendingCoreOps(String name) {
// Keep multiple threads from opening or closing a core at one time.
SolrCore ret = null;
// Keep multiple threads from operating on a core at one time.
synchronized (locker) { synchronized (locker) {
boolean pending; boolean pending;
do { // We're either loading or unloading this core, do { // Are we currently doing anything to this core? Loading, unloading, reloading?
pending = pendingDynamicLoads.contains(name); // wait for the core to be loaded pending = pendingCoreOps.contains(name); // wait for the core to be done being operated upon
if (! pending) { if (! pending) { // Linear list, but shouldn't be too long
// Check pending closes. This is a linear search is inefficient, but maps don't work without a lot of complexity, for (SolrCore core : pendingCloses) {
// we'll live with it unless it proves to be a bottleneck. In the "usual" case, this list shouldn't be
// very long. In the stress test associated with SOLR-4196, this hovered around 0-3, occasionally spiking
// very briefly to around 30.
for (SolrCore core : pendingDynamicCloses) {
if (core.getName().equals(name)) { if (core.getName().equals(name)) {
pending = true; pending = true;
break; break;
} }
} }
} }
if (container.isShutDown()) return null; // Just stop already. if (container.isShutDown()) return null; // Just stop already.
if (pending) { if (pending) {
@ -1752,26 +1747,29 @@ class CoreMaps {
} }
} }
} while (pending); } while (pending);
// We _really_ need to do this within the synchronized block!
if (!container.isShutDown()) { if (! container.isShutDown()) {
ret = getCoreFromAnyList(name); // we might have been _unloading_ the core, so check. if (! pendingCoreOps.add(name)) {
if (ret == null) { CoreContainer.log.warn("Replaced an entry in pendingCoreOps {}, we should not be doing this", name);
pendingDynamicLoads.add(name); // the caller is going to load us. If we happen to be shutting down, we don't care.
} }
return getCoreFromAnyList(name); // we might have been _unloading_ the core, so return the core if it was loaded.
} }
} }
return null;
return ret;
} }
// The core is loaded, remove it from the pendin gloads // We should always be removing the first thing in the list with our name! The idea here is to NOT do anything on
protected void releasePending(String name) { // any core while some other operation is working on that core.
protected void removeFromPendingOps(String name) {
synchronized (locker) { synchronized (locker) {
pendingDynamicLoads.remove(name); if (! pendingCoreOps.remove(name)) {
CoreContainer.log.warn("Tried to remove core {} from pendingCoreOps and it wasn't there. ", name);
}
locker.notifyAll(); locker.notifyAll();
} }
} }
protected void persistCores(ConfigSolr cfg, Map<String, SolrCore> whichCores, SolrResourceLoader loader) { protected void persistCores(ConfigSolr cfg, Map<String, SolrCore> whichCores, SolrResourceLoader loader) {
for (SolrCore solrCore : whichCores.values()) { for (SolrCore solrCore : whichCores.values()) {
addPersistOneCore(cfg, solrCore, loader); addPersistOneCore(cfg, solrCore, loader);
@ -1880,13 +1878,14 @@ class CoreMaps {
// Be a little careful. We don't want to either open or close a core unless it's _not_ being opened or closed by // Be a little careful. We don't want to either open or close a core unless it's _not_ being opened or closed by
// another thread. So within this lock we'll walk along the list of pending closes until we find something NOT in // another thread. So within this lock we'll walk along the list of pending closes until we find something NOT in
// the list of threads currently being opened. The "usual" case will probably return the very first one anyway.. // the list of threads currently being loaded or reloaded. The "usual" case will probably return the very first
// one anyway..
protected SolrCore getCoreToClose() { protected SolrCore getCoreToClose() {
synchronized (locker) { synchronized (locker) {
if (pendingDynamicCloses.size() == 0) return null; // nothing to do. for (SolrCore core : pendingCloses) {
// Yes, a linear search but this is a pretty short list in the normal case and usually we'll take the first one. if (! pendingCoreOps.contains(core.getName())) {
for (SolrCore core : pendingDynamicCloses) { pendingCoreOps.add(core.getName());
if (! pendingDynamicLoads.contains(core.getName())) { // Don't try close a core if it's being opened. pendingCloses.remove(core);
return core; return core;
} }
} }
@ -1894,12 +1893,7 @@ class CoreMaps {
return null; return null;
} }
protected void removeClosedFromCloser(SolrCore core) {
synchronized (locker) {
pendingDynamicCloses.remove(core);
locker.notifyAll();
}
}
} }
class CloserThread extends Thread { class CloserThread extends Thread {
@ -1936,7 +1930,7 @@ class CloserThread extends Thread {
coreMaps.addPersistOneCore(cfg, removeMe, container.loader); coreMaps.addPersistOneCore(cfg, removeMe, container.loader);
removeMe.close(); removeMe.close();
} finally { } finally {
coreMaps.removeClosedFromCloser(removeMe); coreMaps.removeFromPendingOps(removeMe.getName());
} }
} }
} }

View File

@ -32,6 +32,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.SolrTestCaseJ4;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.xml.sax.SAXException; import org.xml.sax.SAXException;
@ -42,24 +43,28 @@ public class TestCoreContainer extends SolrTestCaseJ4 {
initCore("solrconfig.xml", "schema.xml"); initCore("solrconfig.xml", "schema.xml");
} }
private File solrHomeDirectory;
public void testShareSchema() throws IOException, ParserConfigurationException, SAXException { private CoreContainer init(String dirName) throws Exception {
final File solrHomeDirectory = new File(TEMP_DIR, this.getClass().getName() solrHomeDirectory = new File(TEMP_DIR, this.getClass().getName() + dirName);
+ "_shareSchema");
if (solrHomeDirectory.exists()) { if (solrHomeDirectory.exists()) {
FileUtils.deleteDirectory(solrHomeDirectory); FileUtils.deleteDirectory(solrHomeDirectory);
} }
assertTrue("Failed to mkdirs workDir", solrHomeDirectory.mkdirs()); assertTrue("Failed to mkdirs workDir", solrHomeDirectory.mkdirs());
FileUtils.copyDirectory(new File(SolrTestCaseJ4.TEST_HOME()), solrHomeDirectory);
File fconf = new File(solrHomeDirectory, "solr.xml");
final CoreContainer cores = new CoreContainer(solrHomeDirectory.getAbsolutePath()); FileUtils.copyDirectory(new File(SolrTestCaseJ4.TEST_HOME()), solrHomeDirectory);
CoreContainer ret = new CoreContainer(solrHomeDirectory.getAbsolutePath());
ret.load(solrHomeDirectory.getAbsolutePath(), new File(solrHomeDirectory, "solr.xml"));
return ret;
}
@Test
public void testShareSchema() throws Exception {
System.setProperty("shareSchema", "true"); System.setProperty("shareSchema", "true");
cores.load(solrHomeDirectory.getAbsolutePath(), fconf); final CoreContainer cores = init("_shareSchema");
try { try {
cores.setPersistent(false); cores.setPersistent(false);
assertTrue(cores.isShareSchema()); assertTrue(cores.isShareSchema());
@ -79,31 +84,47 @@ public class TestCoreContainer extends SolrTestCaseJ4 {
System.clearProperty("shareSchema"); System.clearProperty("shareSchema");
} }
} }
@Test @Test
public void testReload() throws Exception { public void testReloadSequential() throws Exception {
final CoreContainer cc = h.getCoreContainer(); final CoreContainer cc = init("_reloadSequential");
try {
class TestThread extends Thread { cc.reload("collection1");
@Override cc.reload("collection1");
public void run() { cc.reload("collection1");
cc.reload("collection1"); cc.reload("collection1");
} finally {
cc.shutdown();
}
}
@Test
public void testReloadThreaded() throws Exception {
final CoreContainer cc = init("_reloadThreaded");
class TestThread extends Thread {
@Override
public void run() {
cc.reload("collection1");
}
} }
List<Thread> threads = new ArrayList<Thread>();
int numThreads = 4;
for (int i = 0; i < numThreads; i++) {
threads.add(new TestThread());
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
thread.join();
} }
List<Thread> threads = new ArrayList<Thread>(); cc.shutdown();
int numThreads = 4;
for (int i = 0; i < numThreads; i++) {
threads.add(new TestThread());
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
} }
@ -117,6 +138,7 @@ public class TestCoreContainer extends SolrTestCaseJ4 {
assertTrue("Failed to mkdirs workDir", workDir.mkdirs()); assertTrue("Failed to mkdirs workDir", workDir.mkdirs());
final CoreContainer cores = h.getCoreContainer(); final CoreContainer cores = h.getCoreContainer();
cores.setPersistent(true); // is this needed since we make explicit calls? cores.setPersistent(true); // is this needed since we make explicit calls?
String instDir = null; String instDir = null;
@ -261,7 +283,7 @@ public class TestCoreContainer extends SolrTestCaseJ4 {
fail("CoreContainer not created" + e.getMessage()); fail("CoreContainer not created" + e.getMessage());
} }
try { try {
//assert cero cores //assert zero cores
assertEquals("There should not be cores", 0, cores.getCores().size()); assertEquals("There should not be cores", 0, cores.getCores().size());
FileUtils.copyDirectory(new File(SolrTestCaseJ4.TEST_HOME(), "collection1"), solrHomeDirectory); FileUtils.copyDirectory(new File(SolrTestCaseJ4.TEST_HOME(), "collection1"), solrHomeDirectory);