SOLR-4401 adding rapidly opening/closing cores to unit tests

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1451997 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Erick Erickson 2013-03-03 03:15:41 +00:00
parent 43f384e57f
commit ac389d7f07
6 changed files with 638 additions and 1 deletions

View File

@ -145,6 +145,10 @@ public abstract class FSDirectory extends Directory {
throw new NoSuchDirectoryException("file '" + directory + "' exists but is not a directory"); throw new NoSuchDirectoryException("file '" + directory + "' exists but is not a directory");
setLockFactory(lockFactory); setLockFactory(lockFactory);
if (path.getName().indexOf("index") != -1) {
int eoe = 32;
}
} }
/** Creates an FSDirectory instance, trying to pick the /** Creates an FSDirectory instance, trying to pick the

View File

@ -452,7 +452,7 @@ public class SolrProperties implements ConfigSolr {
if (props.getProperty(CoreDescriptor.CORE_NAME) == null) { if (props.getProperty(CoreDescriptor.CORE_NAME) == null) {
// Should default to this directory // Should default to this directory
props.setProperty(CoreDescriptor.CORE_NAME, file.getName()); props.setProperty(CoreDescriptor.CORE_NAME, childFile.getName());
} }
CoreDescriptor desc = new CoreDescriptor(container, props); CoreDescriptor desc = new CoreDescriptor(container, props);
CoreDescriptorPlus plus = new CoreDescriptorPlus(propFile.getAbsolutePath(), desc, propsOrig); CoreDescriptorPlus plus = new CoreDescriptorPlus(propFile.getAbsolutePath(), desc, propsOrig);

View File

@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
schema=schema-tiny.xml
config=solrconfig-minimal.xml
transient=true
loadOnStartup=false

View File

@ -0,0 +1,22 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
persistent=true
cores.defaultCoreName=collection1
cores.transientCacheSize=10
cores.adminPath=/admin/cores
host=127.0.0.1
hostPort=${hostPort:8983}
hostContext=${hostContext:solr}

View File

@ -0,0 +1,59 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!--
All (relative) paths are relative to the installation path
persistent: Save changes made via the API to this file
sharedLib: path to a lib directory that will be shared across all cores
-->
<solr persistent="${solr.xml.persist:true}">
<!--
adminPath: RequestHandler path to manage cores.
If 'null' (or absent), cores will not be manageable via request handler
-->
<cores adminPath="/admin/cores" defaultCoreName="collection1" host="127.0.0.1" hostPort="${hostPort:8983}"
hostContext="${hostContext:solr}">
<core name="00000_core" instanceDir="00000_core" schema="schema-tiny.xml" config="solrconfig-minimal.xml" transient="false" loadOnStartup="true" />
<core name="00001_core" instanceDir="00001_core" schema="schema-tiny.xml" config="solrconfig-minimal.xml" transient="true" loadOnStartup="false" />
<core name="00002_core" instanceDir="00002_core" schema="schema-tiny.xml" config="solrconfig-minimal.xml" transient="true" loadOnStartup="false" />
<core name="00003_core" instanceDir="00003_core" schema="schema-tiny.xml" config="solrconfig-minimal.xml" transient="true" loadOnStartup="false" />
<core name="00004_core" instanceDir="00004_core" schema="schema-tiny.xml" config="solrconfig-minimal.xml" transient="true" loadOnStartup="false" />
<core name="00005_core" instanceDir="00005_core" schema="schema-tiny.xml" config="solrconfig-minimal.xml" transient="true" loadOnStartup="false" />
<core name="00006_core" instanceDir="00006_core" schema="schema-tiny.xml" config="solrconfig-minimal.xml" transient="true" loadOnStartup="false" />
<core name="00007_core" instanceDir="00007_core" schema="schema-tiny.xml" config="solrconfig-minimal.xml" transient="true" loadOnStartup="false" />
<core name="00008_core" instanceDir="00008_core" schema="schema-tiny.xml" config="solrconfig-minimal.xml" transient="true" loadOnStartup="false" />
<core name="00009_core" instanceDir="00009_core" schema="schema-tiny.xml" config="solrconfig-minimal.xml" transient="true" loadOnStartup="false" />
<core name="00010_core" instanceDir="00010_core" schema="schema-tiny.xml" config="solrconfig-minimal.xml" transient="true" loadOnStartup="false" />
<core name="00011_core" instanceDir="00011_core" schema="schema-tiny.xml" config="solrconfig-minimal.xml" transient="true" loadOnStartup="false" />
<core name="00012_core" instanceDir="00012_core" schema="schema-tiny.xml" config="solrconfig-minimal.xml" transient="true" loadOnStartup="false" />
<core name="00013_core" instanceDir="00013_core" schema="schema-tiny.xml" config="solrconfig-minimal.xml" transient="true" loadOnStartup="false" />
<core name="00014_core" instanceDir="00014_core" schema="schema-tiny.xml" config="solrconfig-minimal.xml" transient="true" loadOnStartup="false" />
<core name="00015_core" instanceDir="00015_core" schema="schema-tiny.xml" config="solrconfig-minimal.xml" transient="true" loadOnStartup="false" />
<core name="00016_core" instanceDir="00016_core" schema="schema-tiny.xml" config="solrconfig-minimal.xml" transient="true" loadOnStartup="false" />
<core name="00017_core" instanceDir="00017_core" schema="schema-tiny.xml" config="solrconfig-minimal.xml" transient="true" loadOnStartup="false" />
<core name="00018_core" instanceDir="00018_core" schema="schema-tiny.xml" config="solrconfig-minimal.xml" transient="true" loadOnStartup="false" />
<core name="00019_core" instanceDir="00019_core" schema="schema-tiny.xml" config="solrconfig-minimal.xml" transient="true" loadOnStartup="false" />
<shardHandlerFactory name="shardHandlerFactory" class="HttpShardHandlerFactory">
<int name="socketTimeout">${socketTimeout:120000}</int>
<int name="connTimeout">${connTimeout:15000}</int>
</shardHandlerFactory>
</cores>
</solr>

View File

@ -0,0 +1,533 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.core;
import org.apache.commons.io.FileUtils;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.solr.core.SolrCore.verbose;
import static org.junit.Assert.fail;
/**
* Incorporate the open/close stress tests into junit.
*/
public class OpenCloseCoreStressTest extends SolrTestCaseJ4 {
private int numCores = 20;
private Map<String, Long> coreCounts;
private List<String> coreNames;
Random random = new Random(System.currentTimeMillis());
static final int COMMIT_WITHIN = 5000;
final int indexingThreads = 15;
final int queryThreads = 15;
final int resetInterval = 30 * 60; // minutes to report then delete everything
long cumulativeDocs = 0;
String url;
JettySolrRunner jetty = null;
File solrHomeDirectory;
List<HttpSolrServer> indexingServers = new ArrayList<HttpSolrServer>(indexingThreads);
List<HttpSolrServer> queryServers = new ArrayList<HttpSolrServer>(queryThreads);
static String savedFactory;
// Keep the indexes from being randomly generated.
@BeforeClass
public static void beforeClass() {
savedFactory = System.getProperty("solr.DirectoryFactory");
System.setProperty("solr.directoryFactory", "org.apache.solr.core.MockFSDirectoryFactory");
}
@AfterClass
public static void afterClass() {
if (savedFactory == null) {
System.clearProperty("solr.directoryFactory");
} else {
System.setProperty("solr.directoryFactory", savedFactory);
}
}
@Before
public void setupServer() throws Exception {
coreCounts = new TreeMap<String, Long>();
coreNames = new ArrayList<String>();
cumulativeDocs = 0;
solrHomeDirectory = new File(TEMP_DIR, "OpenCloseCoreStressTest_");
FileUtils.deleteDirectory(solrHomeDirectory); // Insure that a failed test didn't leave something lying around.
jetty = new JettySolrRunner(solrHomeDirectory.getAbsolutePath(), "/solr", 0);
}
@After
public void tearDownServer() throws Exception {
if (jetty != null) jetty.stop();
FileUtils.deleteDirectory(solrHomeDirectory);
}
@Test
@Slow
public void test30SecondsOld() throws Exception {
doStress(30, true);
}
@Test
@Slow
public void test30SecondsNew() throws Exception {
doStress(30, false);
}
@Test
@Nightly
public void test10MinutesOld() throws Exception {
doStress(300, true);
}
@Test
@Nightly
public void test10MinutesNew() throws Exception {
doStress(300, false);
}
@Test
@Weekly
public void test1HourOld() throws Exception {
doStress(1800, true);
}
@Test
@Weekly
public void test1HourNew() throws Exception {
doStress(1800, false);
}
private void getServers() throws Exception {
jetty.start();
url = "http://127.0.0.1:" + jetty.getLocalPort() + "/solr/";
// Mostly to keep annoying logging messages from being sent out all the time.
for (int idx = 0; idx < indexingThreads; ++idx) {
HttpSolrServer server = new HttpSolrServer(url);
server.setDefaultMaxConnectionsPerHost(25);
server.setConnectionTimeout(30000);
server.setSoTimeout(30000);
indexingServers.add(server);
}
for (int idx = 0; idx < queryThreads; ++idx) {
HttpSolrServer server = new HttpSolrServer(url);
server.setDefaultMaxConnectionsPerHost(25);
server.setConnectionTimeout(30000);
server.setSoTimeout(30000);
queryServers.add(server);
}
}
// Unless things go _really_ well, stop after you have the directories set up.
private void doStress(int secondsToRun, boolean oldStyle) throws Exception {
makeCores(solrHomeDirectory, oldStyle);
//MUST start the server after the cores are made.
getServers();
try {
verbose("Starting indexing and querying");
int secondsRun = 0;
int secondsRemaining = secondsToRun;
do {
int cycleSeconds = Math.min(resetInterval, secondsRemaining);
verbose(String.format(Locale.ROOT, "\n\n\n\n\nStarting a %,d second cycle, seconds left: %,d. Seconds run so far: %,d.",
cycleSeconds, secondsRemaining, secondsRun));
Indexer idxer = new Indexer(this, url, indexingServers, indexingThreads, cycleSeconds);
Queries queries = new Queries(this, url, queryServers, queryThreads);
idxer.waitOnThreads();
queries.waitOnThreads();
secondsRemaining = Math.max(secondsRemaining - resetInterval, 0);
checkResults(queryServers.get(0), queries, idxer);
secondsRun += cycleSeconds;
if (secondsRemaining > 0) {
deleteAllDocuments(queryServers.get(0), queries);
}
} while (secondsRemaining > 0);
assertTrue("We didn't index any documents, somethings really messsed up", cumulativeDocs > 0);
} catch (Exception e) {
e.printStackTrace();
}
}
private void makeCores(File home, boolean oldStyle) throws Exception {
File testSrcRoot = new File(SolrTestCaseJ4.TEST_HOME());
if (oldStyle) {
FileUtils.copyFile(new File(testSrcRoot, "solr-stress.xml"), new File(home, "solr.xml"));
} else {
FileUtils.copyFile(new File(testSrcRoot, "solr-stress.properties"), new File(home, "solr.properties"));
}
// create directories in groups of 100 until you have enough.
for (int idx = 0; idx < numCores; ++idx) {
String coreName = String.format(Locale.ROOT, "%05d_core", idx);
makeCore(new File(home, coreName), testSrcRoot, coreName);
coreCounts.put(coreName, 0L);
coreNames.add(coreName);
}
}
private void makeCore(File coreDir, File testSrcRoot, String coreName) throws IOException {
File conf = new File(coreDir, "conf");
conf.mkdirs();
File testConf = new File(testSrcRoot, "collection1/conf");
FileUtils.copyFile(new File(testConf, "schema-tiny.xml"), new File(conf, "schema-tiny.xml"));
FileUtils.copyFile(new File(testConf, "solrconfig-minimal.xml"), new File(conf, "solrconfig-minimal.xml"));
FileUtils.copyFile(new File(testSrcRoot, "conf/core.properties"), new File(coreDir, "core.properties"));
}
void deleteAllDocuments(HttpSolrServer server, Queries queries) {
verbose("Deleting data from last cycle, this may take a few minutes.");
for (String core : coreNames) {
try {
server.setBaseURL(url + core);
server.deleteByQuery("*:*");
server.optimize(true, true); // should be close to a no-op.
} catch (Exception e) {
e.printStackTrace();
}
}
// We're testing, after all. Let's be really sure things are as we expect.
verbose("Insuring all cores empty");
long foundDocs = 0;
for (String core : coreNames) {
try {
long found = queries.getCount(server, core);
assertEquals("Cores should be empty", found, 0L);
foundDocs += found;
} catch (Exception e) {
e.printStackTrace();
}
}
if (foundDocs > 0) {
verbose("Found docs after purging done, this is bad.");
}
// Reset counters for another go-round
coreCounts.clear();
for (String core : coreNames) {
coreCounts.put(core, 0L);
}
}
private void checkResults(HttpSolrServer server, Queries queries, Indexer idxer) throws InterruptedException {
verbose("Checking if indexes have all the documents they should...");
long totalDocsFound = 0;
for (Map.Entry<String, Long> ent : coreCounts.entrySet()) {
server.setBaseURL(url + ent.getKey());
try {
server.commit(true, true);
} catch (Exception e) {
fail("Exception when committing core " + ent.getKey() + " " + e.getMessage());
}
long numFound = queries.getCount(server, ent.getKey());
totalDocsFound += numFound;
assertEquals(String.format(Locale.ROOT, "Core %s bad!", ent.getKey()), (long) ent.getValue(), numFound);
}
verbose(String.format(Locale.ROOT, "\n\nDocs indexed (cumulative, all cycles): %,d, total docs: %,d: Cycle stats: updates: %,d: qtimes: %,d",
Indexer.idUnique.get(), totalDocsFound, idxer.getAccumUpdates(), idxer.getAccumQtimes()));
cumulativeDocs += totalDocsFound;
}
String getRandomCore() {
return coreNames.get(Math.abs(random.nextInt()) % coreNames.size());
}
void incrementCoreCount(String core) {
synchronized (coreCounts) {
coreCounts.put(core, coreCounts.get(core) + 1);
}
}
}
class Indexer {
static volatile long stopTime;
static AtomicInteger idUnique = new AtomicInteger(0);
static AtomicInteger errors = new AtomicInteger(0);
static AtomicInteger docsThisCycle = new AtomicInteger(0);
static AtomicLong qTimesAccum = new AtomicLong(0);
static AtomicInteger updateCounts = new AtomicInteger(0);
static volatile int lastCount;
static volatile long nextTime;
ArrayList<OneIndexer> _threads = new ArrayList<OneIndexer>();
public Indexer(OpenCloseCoreStressTest OCCST, String url, List<HttpSolrServer> servers, int numThreads, int secondsToRun) {
stopTime = System.currentTimeMillis() + (secondsToRun * 1000);
nextTime = System.currentTimeMillis() + 60000;
docsThisCycle.set(0);
qTimesAccum.set(0);
updateCounts.set(0);
for (int idx = 0; idx < numThreads; ++idx) {
OneIndexer one = new OneIndexer(OCCST, url, servers.get(idx));
_threads.add(one);
one.start();
}
}
public void waitOnThreads() {
for (Thread thread : _threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public long getAccumQtimes() {
return qTimesAccum.get();
}
public int getAccumUpdates() {
return updateCounts.get();
}
synchronized static void progress(int myId, String core) {
if (nextTime - System.currentTimeMillis() <= 0) {
verbose(String.format(Locale.ROOT, " s indexed: [run %,8d] [cycle %,8d] [last minute %,8d] Last core updated: %s. Seconds left in cycle %,4d",
myId, docsThisCycle.get(), myId - lastCount, core, stopTime - (System.currentTimeMillis() / 1000)));
lastCount = myId;
nextTime += (System.currentTimeMillis() / 1000) * 60;
}
}
}
class OneIndexer extends Thread {
private final OpenCloseCoreStressTest OCCST;
private final HttpSolrServer server;
private final String baseUrl;
private OpenCloseCoreStressTest OCSST;
OneIndexer(OpenCloseCoreStressTest OCCST, String url, HttpSolrServer server) {
this.OCCST = OCCST;
this.server = server;
this.baseUrl = url;
}
@Override
public void run() {
verbose(String.format(Locale.ROOT, "Starting indexing thread: " + getId()));
String core = OCCST.getRandomCore();
while (Indexer.stopTime > System.currentTimeMillis()) {
int myId = Indexer.idUnique.incrementAndGet();
Indexer.docsThisCycle.incrementAndGet();
core = OCCST.getRandomCore();
OCCST.incrementCoreCount(core);
Indexer.progress(myId, core);
for (int idx = 0; idx < 3; ++idx) {
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", "id" + Integer.toString(myId));
doc.addField("text", "text " + Integer.toString(myId));
UpdateRequest update = new UpdateRequest();
update.add(doc);
try {
server.setBaseURL(baseUrl + core);
UpdateResponse response = server.add(doc, OpenCloseCoreStressTest.COMMIT_WITHIN);
if (response.getStatus() != 0) {
verbose("Failed to index a document with status " + response.getStatus());
} else {
Indexer.qTimesAccum.addAndGet(response.getQTime());
Indexer.updateCounts.incrementAndGet();
}
server.commit(true, true);
Thread.sleep(100L); // Let's not go crazy here.
break; // try loop.
} catch (Exception e) {
if (e instanceof InterruptedException) return;
Indexer.errors.incrementAndGet();
if (idx == 2) {
fail("Could not reach server while querying for three tries, quitting " + e.getMessage());
} else {
verbose("Indexing thread " + Thread.currentThread().getId() + " swallowed one exception " + e.getMessage());
try {
Thread.sleep(100);
} catch (InterruptedException tex) {
return;
}
}
}
}
}
verbose("Leaving indexing thread " + getId());
}
}
class Queries {
static AtomicBoolean _keepon = new AtomicBoolean(true);
List<Thread> _threads = new ArrayList<Thread>();
static AtomicInteger _errors = new AtomicInteger(0);
static volatile boolean _verbose = false;
public Queries(OpenCloseCoreStressTest OCCST, String url, List<HttpSolrServer> servers, int numThreads) {
for (int idx = 0; idx < numThreads; ++idx) {
Thread one = new OneQuery(OCCST, url, servers.get(idx));
_threads.add(one);
one.start();
}
}
public void waitOnThreads() {
Queries._keepon.set(false);
for (Thread thread : _threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
}
public long getCount(HttpSolrServer server, String core) {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("qt", "/select");
params.set("q", "*:*");
long numFound = 0;
try {
QueryResponse response = server.query(params);
numFound = response.getResults().getNumFound();
} catch (SolrServerException e) {
e.printStackTrace();
}
return numFound;
}
}
class OneQuery extends Thread {
OpenCloseCoreStressTest OCCST;
private final HttpSolrServer server;
private final String baseUrl;
OneQuery(OpenCloseCoreStressTest OCCST, String url, HttpSolrServer server) {
this.OCCST = OCCST;
this.server = server;
this.baseUrl = url;
}
@Override
public void run() {
verbose(String.format(Locale.ROOT, "Starting query thread: " + getId()));
String core = OCCST.getRandomCore();
int repeated = 0;
while (Queries._keepon.get()) {
core = OCCST.getRandomCore();
for (int idx = 0; idx < 3; ++idx) {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("qt", "/select");
params.set("q", "*:*");
try {
// sleep between 250ms and 10000 ms
Thread.sleep(100L); // Let's not go crazy here.
server.setBaseURL(baseUrl + core);
QueryResponse response = server.query(params);
long numFound = response.getResults().getNumFound();
// Perhaps collect some stats here in future.
break; // retry loop
} catch (Exception e) {
if (e instanceof InterruptedException) return;
Queries._errors.incrementAndGet();
if (idx == 2) {
fail("Could not reach server while indexing for three tries, quitting " + e.getMessage());
} else {
verbose("Querying thread: " + Thread.currentThread().getId() + " swallowed exception: " + e.getMessage());
try {
Thread.sleep(250L);
} catch (InterruptedException tex) {
return;
}
}
}
}
}
verbose(String.format(Locale.ROOT, "Leaving query thread: " + getId()));
}
}