From ac389d7f07b3bbd29fa3d7cd8a00ac64b000b7a5 Mon Sep 17 00:00:00 2001
From: Erick Erickson
Date: Sun, 3 Mar 2013 03:15:41 +0000
Subject: [PATCH] SOLR-4401 adding rapidly opening/closing cores to unit tests

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1451997 13f79535-47bb-0310-9956-ffa450edef68
---
 .../org/apache/lucene/store/FSDirectory.java  |   4 +
 .../org/apache/solr/core/SolrProperties.java  |   2 +-
 .../src/test-files/solr/conf/core.properties  |  19 +
 .../test-files/solr/solr-stress.properties    |  22 +
 solr/core/src/test-files/solr/solr-stress.xml |  59 ++
 .../solr/core/OpenCloseCoreStressTest.java    | 533 ++++++++++++++++++
 6 files changed, 638 insertions(+), 1 deletion(-)
 create mode 100644 solr/core/src/test-files/solr/conf/core.properties
 create mode 100644 solr/core/src/test-files/solr/solr-stress.properties
 create mode 100644 solr/core/src/test-files/solr/solr-stress.xml
 create mode 100644 solr/core/src/test/org/apache/solr/core/OpenCloseCoreStressTest.java

diff --git a/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java b/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java
index 5b277fd5349..335d044591e 100644
--- a/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java
+++ b/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java
@@ -145,6 +145,10 @@ public abstract class FSDirectory extends Directory {
       throw new NoSuchDirectoryException("file '" + directory + "' exists but is not a directory");
 
     setLockFactory(lockFactory);
+    if (path.getName().indexOf("index") != -1) {
+      int eoe = 32;
+    }
+
   }
 
   /** Creates an FSDirectory instance, trying to pick the
diff --git a/solr/core/src/java/org/apache/solr/core/SolrProperties.java b/solr/core/src/java/org/apache/solr/core/SolrProperties.java
index 240fd1362ae..c71261760fd 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrProperties.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrProperties.java
@@ -452,7 +452,7 @@ public class SolrProperties implements ConfigSolr {
 
       if (props.getProperty(CoreDescriptor.CORE_NAME) == null) {
         // Should default to this directory
-        props.setProperty(CoreDescriptor.CORE_NAME, file.getName());
+        props.setProperty(CoreDescriptor.CORE_NAME, childFile.getName());
       }
       CoreDescriptor desc = new CoreDescriptor(container, props);
       CoreDescriptorPlus plus = new CoreDescriptorPlus(propFile.getAbsolutePath(), desc, propsOrig);
diff --git a/solr/core/src/test-files/solr/conf/core.properties b/solr/core/src/test-files/solr/conf/core.properties
new file mode 100644
index 00000000000..65df5e6114f
--- /dev/null
+++ b/solr/core/src/test-files/solr/conf/core.properties
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+schema=schema-tiny.xml
+config=solrconfig-minimal.xml
+transient=true
+loadOnStartup=false
+
diff --git a/solr/core/src/test-files/solr/solr-stress.properties b/solr/core/src/test-files/solr/solr-stress.properties
new file mode 100644
index 00000000000..b3488d22f64
--- /dev/null
+++ b/solr/core/src/test-files/solr/solr-stress.properties
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+persistent=true
+cores.defaultCoreName=collection1
+cores.transientCacheSize=10
+cores.adminPath=/admin/cores
+host=127.0.0.1
+hostPort=${hostPort:8983}
+hostContext=${hostContext:solr}
+
diff --git a/solr/core/src/test-files/solr/solr-stress.xml b/solr/core/src/test-files/solr/solr-stress.xml
new file mode 100644
index 00000000000..6bc1c35e888
--- /dev/null
+++ b/solr/core/src/test-files/solr/solr-stress.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<solr persistent="true">
+  <cores adminPath="/admin/cores" defaultCoreName="collection1" transientCacheSize="10"
+         host="127.0.0.1" hostPort="${hostPort:8983}" hostContext="${hostContext:solr}">
+    <shardHandlerFactory name="shardHandlerFactory" class="HttpShardHandlerFactory">
+      <int name="socketTimeout">${socketTimeout:120000}</int>
+      <int name="connTimeout">${connTimeout:15000}</int>
+    </shardHandlerFactory>
+  </cores>
+</solr>
+
diff --git a/solr/core/src/test/org/apache/solr/core/OpenCloseCoreStressTest.java b/solr/core/src/test/org/apache/solr/core/OpenCloseCoreStressTest.java
new file mode 100644
index 00000000000..27de97194bf
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/core/OpenCloseCoreStressTest.java
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.core;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.HttpSolrServer;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.solr.core.SolrCore.verbose;
+import static org.junit.Assert.fail;
+
+/**
+ * Incorporate the open/close stress tests into junit.
+ */
+public class OpenCloseCoreStressTest extends SolrTestCaseJ4 {
+
+  private int numCores = 20;
+  private Map<String, Long> coreCounts;
+  private List<String> coreNames;
+  Random random = new Random(System.currentTimeMillis());
+
+  static final int COMMIT_WITHIN = 5000;
+
+  final int indexingThreads = 15;
+  final int queryThreads = 15;
+
+  final int resetInterval = 30 * 60; // seconds to run before reporting and deleting everything (30 minutes)
+  long cumulativeDocs = 0;
+
+  String url;
+
+  JettySolrRunner jetty = null;
+
+  File solrHomeDirectory;
+
+  List<HttpSolrServer> indexingServers = new ArrayList<HttpSolrServer>(indexingThreads);
+  List<HttpSolrServer> queryServers = new ArrayList<HttpSolrServer>(queryThreads);
+
+  static String savedFactory;
+
+  // Pin the directory factory so the test framework doesn't pick a random one.
+  @BeforeClass
+  public static void beforeClass() {
+    savedFactory = System.getProperty("solr.directoryFactory");
+    System.setProperty("solr.directoryFactory", "org.apache.solr.core.MockFSDirectoryFactory");
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    if (savedFactory == null) {
+      System.clearProperty("solr.directoryFactory");
+    } else {
+      System.setProperty("solr.directoryFactory", savedFactory);
+    }
+  }
+
+  @Before
+  public void setupServer() throws Exception {
+    coreCounts = new TreeMap<String, Long>();
+    coreNames = new ArrayList<String>();
+    cumulativeDocs = 0;
+
+    solrHomeDirectory = new File(TEMP_DIR, "OpenCloseCoreStressTest_");
+    FileUtils.deleteDirectory(solrHomeDirectory); // Ensure that a failed test didn't leave something lying around.
+
+    jetty = new JettySolrRunner(solrHomeDirectory.getAbsolutePath(), "/solr", 0);
+
+  }
+
+  @After
+  public void tearDownServer() throws Exception {
+    if (jetty != null) jetty.stop();
+    FileUtils.deleteDirectory(solrHomeDirectory);
+  }
+
+  @Test
+  @Slow
+  public void test30SecondsOld() throws Exception {
+    doStress(30, true);
+  }
+
+  @Test
+  @Slow
+  public void test30SecondsNew() throws Exception {
+    doStress(30, false);
+  }
+
+  @Test
+  @Nightly
+  public void test10MinutesOld() throws Exception {
+    doStress(300, true);
+  }
+
+  @Test
+  @Nightly
+  public void test10MinutesNew() throws Exception {
+    doStress(300, false);
+  }
+
+  @Test
+  @Weekly
+  public void test1HourOld() throws Exception {
+    doStress(1800, true);
+  }
+
+  @Test
+  @Weekly
+  public void test1HourNew() throws Exception {
+    doStress(1800, false);
+  }
+
+  private void getServers() throws Exception {
+    jetty.start();
+    url = "http://127.0.0.1:" + jetty.getLocalPort() + "/solr/";
+
+    // Mostly to keep annoying logging messages from being sent out all the time.
+
+    for (int idx = 0; idx < indexingThreads; ++idx) {
+      HttpSolrServer server = new HttpSolrServer(url);
+      server.setDefaultMaxConnectionsPerHost(25);
+      server.setConnectionTimeout(30000);
+      server.setSoTimeout(30000);
+      indexingServers.add(server);
+    }
+    for (int idx = 0; idx < queryThreads; ++idx) {
+      HttpSolrServer server = new HttpSolrServer(url);
+      server.setDefaultMaxConnectionsPerHost(25);
+      server.setConnectionTimeout(30000);
+      server.setSoTimeout(30000);
+      queryServers.add(server);
+    }
+  }
+
+  // Unless things go _really_ well, stop after you have the directories set up.
+  private void doStress(int secondsToRun, boolean oldStyle) throws Exception {
+
+    makeCores(solrHomeDirectory, oldStyle);
+
+    // MUST start the server after the cores are made.
+    getServers();
+
+    try {
+
+      verbose("Starting indexing and querying");
+
+      int secondsRun = 0;
+      int secondsRemaining = secondsToRun;
+      do {
+
+        int cycleSeconds = Math.min(resetInterval, secondsRemaining);
+        verbose(String.format(Locale.ROOT, "\n\n\n\n\nStarting a %,d second cycle, seconds left: %,d. Seconds run so far: %,d.",
+            cycleSeconds, secondsRemaining, secondsRun));
+
+        Indexer idxer = new Indexer(this, url, indexingServers, indexingThreads, cycleSeconds);
+
+        Queries queries = new Queries(this, url, queryServers, queryThreads);
+
+        idxer.waitOnThreads();
+
+        queries.waitOnThreads();
+
+        secondsRemaining = Math.max(secondsRemaining - resetInterval, 0);
+
+        checkResults(queryServers.get(0), queries, idxer);
+
+        secondsRun += cycleSeconds;
+
+        if (secondsRemaining > 0) {
+          deleteAllDocuments(queryServers.get(0), queries);
+        }
+      } while (secondsRemaining > 0);
+
+      assertTrue("We didn't index any documents, something's really messed up", cumulativeDocs > 0);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  private void makeCores(File home, boolean oldStyle) throws Exception {
+    File testSrcRoot = new File(SolrTestCaseJ4.TEST_HOME());
+    if (oldStyle) {
+      FileUtils.copyFile(new File(testSrcRoot, "solr-stress.xml"), new File(home, "solr.xml"));
+    } else {
+      FileUtils.copyFile(new File(testSrcRoot, "solr-stress.properties"), new File(home, "solr.properties"));
+    }
+
+    // Create the cores, one directory (with conf/ and core.properties) per core.
+    for (int idx = 0; idx < numCores; ++idx) {
+      String coreName = String.format(Locale.ROOT, "%05d_core", idx);
+      makeCore(new File(home, coreName), testSrcRoot, coreName);
+      coreCounts.put(coreName, 0L);
+      coreNames.add(coreName);
+    }
+  }
+
+  private void makeCore(File coreDir, File testSrcRoot, String coreName) throws IOException {
+    File conf = new File(coreDir, "conf");
+    conf.mkdirs();
+
+    File testConf = new File(testSrcRoot, "collection1/conf");
+
+    FileUtils.copyFile(new File(testConf, "schema-tiny.xml"), new File(conf, "schema-tiny.xml"));
+
+    FileUtils.copyFile(new File(testConf, "solrconfig-minimal.xml"), new File(conf, "solrconfig-minimal.xml"));
+
+    FileUtils.copyFile(new File(testSrcRoot, "conf/core.properties"), new File(coreDir, "core.properties"));
+  }
+
+  void deleteAllDocuments(HttpSolrServer server, Queries queries) {
+    verbose("Deleting data from last cycle, this may take a few minutes.");
+
+    for (String core : coreNames) {
+      try {
+        server.setBaseURL(url + core);
+        server.deleteByQuery("*:*");
+        server.optimize(true, true); // should be close to a no-op.
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+
+    // We're testing, after all. Let's be really sure things are as we expect.
+    verbose("Ensuring all cores are empty");
+    long foundDocs = 0;
+    for (String core : coreNames) {
+      try {
+        long found = queries.getCount(server, core);
+        assertEquals("Cores should be empty", 0L, found);
+        foundDocs += found;
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+
+    if (foundDocs > 0) {
+      verbose("Found docs after purging done, this is bad.");
+    }
+    // Reset counters for another go-round.
+    coreCounts.clear();
+    for (String core : coreNames) {
+      coreCounts.put(core, 0L);
+    }
+  }
+
+  private void checkResults(HttpSolrServer server, Queries queries, Indexer idxer) throws InterruptedException {
+    verbose("Checking if indexes have all the documents they should...");
+    long totalDocsFound = 0;
+    for (Map.Entry<String, Long> ent : coreCounts.entrySet()) {
+      server.setBaseURL(url + ent.getKey());
+      try {
+        server.commit(true, true);
+      } catch (Exception e) {
+        fail("Exception when committing core " + ent.getKey() + " " + e.getMessage());
+      }
+      long numFound = queries.getCount(server, ent.getKey());
+      totalDocsFound += numFound;
+      assertEquals(String.format(Locale.ROOT, "Core %s bad!", ent.getKey()), (long) ent.getValue(), numFound);
+    }
+
+    verbose(String.format(Locale.ROOT, "\n\nDocs indexed (cumulative, all cycles): %,d, total docs: %,d: Cycle stats: updates: %,d: qtimes: %,d",
+        Indexer.idUnique.get(), totalDocsFound, idxer.getAccumUpdates(), idxer.getAccumQtimes()));
+
+    cumulativeDocs += totalDocsFound;
+  }
+
+  String getRandomCore() {
+    return coreNames.get(Math.abs(random.nextInt()) % coreNames.size());
+  }
+
+  void incrementCoreCount(String core) {
+    synchronized (coreCounts) {
+      coreCounts.put(core, coreCounts.get(core) + 1);
+    }
+  }
+}
+
+class Indexer {
+  static volatile long stopTime;
+
+  static AtomicInteger idUnique = new AtomicInteger(0);
+
+  static AtomicInteger errors = new AtomicInteger(0);
+
+  static AtomicInteger docsThisCycle = new AtomicInteger(0);
+
+  static AtomicLong qTimesAccum = new AtomicLong(0);
+
+  static AtomicInteger updateCounts = new AtomicInteger(0);
+
+  static volatile int lastCount;
+  static volatile long nextTime;
+
+  ArrayList<Thread> _threads = new ArrayList<Thread>();
+
+  public Indexer(OpenCloseCoreStressTest OCCST, String url, List<HttpSolrServer> servers, int numThreads, int secondsToRun) {
+    stopTime = System.currentTimeMillis() + (secondsToRun * 1000);
+    nextTime = System.currentTimeMillis() + 60000;
+    docsThisCycle.set(0);
+    qTimesAccum.set(0);
+    updateCounts.set(0);
+    for (int idx = 0; idx < numThreads; ++idx) {
+      OneIndexer one = new OneIndexer(OCCST, url, servers.get(idx));
+      _threads.add(one);
+      one.start();
+    }
+  }
+
+  public void waitOnThreads() {
+    for (Thread thread : _threads) {
+      try {
+        thread.join();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  public long getAccumQtimes() {
+    return qTimesAccum.get();
+  }
+
+  public int getAccumUpdates() {
+    return updateCounts.get();
+  }
+
+  // Report progress roughly once a minute.
+  synchronized static void progress(int myId, String core) {
+    if (nextTime - System.currentTimeMillis() <= 0) {
+      verbose(String.format(Locale.ROOT, "Docs indexed: [run %,8d] [cycle %,8d] [last minute %,8d] Last core updated: %s. Seconds left in cycle %,4d",
+          myId, docsThisCycle.get(), myId - lastCount, core, (stopTime - System.currentTimeMillis()) / 1000));
+      lastCount = myId;
+      nextTime = System.currentTimeMillis() + 60000;
+    }
+  }
+
+}
+
+class OneIndexer extends Thread {
+  private final OpenCloseCoreStressTest OCCST;
+  private final HttpSolrServer server;
+  private final String baseUrl;
+
+  OneIndexer(OpenCloseCoreStressTest OCCST, String url, HttpSolrServer server) {
+    this.OCCST = OCCST;
+    this.server = server;
+    this.baseUrl = url;
+  }
+
+  @Override
+  public void run() {
+    verbose("Starting indexing thread: " + getId());
+
+    String core = OCCST.getRandomCore();
+
+    while (Indexer.stopTime > System.currentTimeMillis()) {
+      int myId = Indexer.idUnique.incrementAndGet();
+      Indexer.docsThisCycle.incrementAndGet();
+      core = OCCST.getRandomCore();
+      OCCST.incrementCoreCount(core);
+      Indexer.progress(myId, core);
+      for (int idx = 0; idx < 3; ++idx) {
+        SolrInputDocument doc = new SolrInputDocument();
+        doc.addField("id", "id" + Integer.toString(myId));
+        doc.addField("text", "text " + Integer.toString(myId));
+        UpdateRequest update = new UpdateRequest();
+        update.add(doc);
+
+        try {
+          server.setBaseURL(baseUrl + core);
+          UpdateResponse response = server.add(doc, OpenCloseCoreStressTest.COMMIT_WITHIN);
+          if (response.getStatus() != 0) {
+            verbose("Failed to index a document with status " + response.getStatus());
+          } else {
+            Indexer.qTimesAccum.addAndGet(response.getQTime());
+            Indexer.updateCounts.incrementAndGet();
+          }
+          server.commit(true, true);
+          Thread.sleep(100L); // Let's not go crazy here.
+          break; // try loop.
+        } catch (Exception e) {
+          if (e instanceof InterruptedException) return;
+          Indexer.errors.incrementAndGet();
+          if (idx == 2) {
+            fail("Could not reach server while indexing for three tries, quitting " + e.getMessage());
+          } else {
+            verbose("Indexing thread " + Thread.currentThread().getId() + " swallowed one exception " + e.getMessage());
+            try {
+              Thread.sleep(100);
+            } catch (InterruptedException tex) {
+              return;
+            }
+          }
+        }
+      }
+    }
+    verbose("Leaving indexing thread " + getId());
+  }
+}
+
+class Queries {
+  static AtomicBoolean _keepon = new AtomicBoolean(true);
+
+  List<Thread> _threads = new ArrayList<Thread>();
+  static AtomicInteger _errors = new AtomicInteger(0);
+  static volatile boolean _verbose = false;
+
+  public Queries(OpenCloseCoreStressTest OCCST, String url, List<HttpSolrServer> servers, int numThreads) {
+    for (int idx = 0; idx < numThreads; ++idx) {
+      Thread one = new OneQuery(OCCST, url, servers.get(idx));
+      _threads.add(one);
+      one.start();
+    }
+  }
+
+  public void waitOnThreads() {
+    Queries._keepon.set(false);
+    for (Thread thread : _threads) {
+      try {
+        thread.join();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  public long getCount(HttpSolrServer server, String core) {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set("qt", "/select");
+    params.set("q", "*:*");
+    long numFound = 0;
+    try {
+      QueryResponse response = server.query(params);
+      numFound = response.getResults().getNumFound();
+    } catch (SolrServerException e) {
+      e.printStackTrace();
+    }
+    return numFound;
+  }
+}
+
+class OneQuery extends Thread {
+  OpenCloseCoreStressTest OCCST;
+  private final HttpSolrServer server;
+  private final String baseUrl;
+
+  OneQuery(OpenCloseCoreStressTest OCCST, String url, HttpSolrServer server) {
+    this.OCCST = OCCST;
+    this.server = server;
+    this.baseUrl = url;
+  }
+
+  @Override
+  public void run() {
+    verbose("Starting query thread: " + getId());
+    String core = OCCST.getRandomCore();
+    int repeated = 0;
+    while (Queries._keepon.get()) {
+      core = OCCST.getRandomCore();
+      for (int idx = 0; idx < 3; ++idx) {
+        ModifiableSolrParams params = new ModifiableSolrParams();
+        params.set("qt", "/select");
+        params.set("q", "*:*");
+
+        try {
+          // Pause briefly so we don't hammer the server with back-to-back queries.
+          Thread.sleep(100L);
+          server.setBaseURL(baseUrl + core);
+          QueryResponse response = server.query(params);
+          long numFound = response.getResults().getNumFound();
+          // Perhaps collect some stats here in future.
+          break; // retry loop
+        } catch (Exception e) {
+          if (e instanceof InterruptedException) return;
+          Queries._errors.incrementAndGet();
+          if (idx == 2) {
+            fail("Could not reach server while querying for three tries, quitting " + e.getMessage());
+          } else {
+            verbose("Querying thread: " + Thread.currentThread().getId() + " swallowed exception: " + e.getMessage());
+            try {
+              Thread.sleep(250L);
+            } catch (InterruptedException tex) {
+              return;
+            }
+          }
+        }
+      }
+    }
+    verbose("Leaving query thread: " + getId());
+  }
+}
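
Note (not part of the patch): the per-core round trip that each worker thread above performs reduces to the standalone SolrJ sketch below. It only reuses client calls that already appear in the test (HttpSolrServer, setBaseURL, add with commitWithin, commit, query, deleteByQuery); the base URL and core name are hypothetical stand-ins, assuming a Solr 4.x server started from the solr home layout this patch creates.

import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;

public class SingleCoreRoundTrip {
  public static void main(String[] args) throws Exception {
    // Hypothetical URL and core name; the test itself generates cores named 00000_core .. 00019_core
    // on a randomly chosen Jetty port.
    HttpSolrServer server = new HttpSolrServer("http://127.0.0.1:8983/solr/");
    server.setBaseURL("http://127.0.0.1:8983/solr/" + "00000_core");

    // Index one document; commitWithin would make it visible within 5 seconds on its own.
    SolrInputDocument doc = new SolrInputDocument();
    doc.addField("id", "id1");
    doc.addField("text", "text 1");
    server.add(doc, 5000);
    server.commit(true, true); // hard commit so the query below sees the document immediately

    // Query the core for everything and report the hit count.
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set("qt", "/select");
    params.set("q", "*:*");
    QueryResponse response = server.query(params);
    System.out.println("numFound: " + response.getResults().getNumFound());

    // Clean up the core and release the underlying HttpClient resources.
    server.deleteByQuery("*:*");
    server.shutdown();
  }
}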