add flush to search 1 stress test

This commit is contained in:
kimchy 2011-02-10 16:47:43 +02:00
parent 57108c8575
commit 9ae9ab9553
1 changed files with 112 additions and 14 deletions

View File

@ -26,11 +26,11 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.jsr166y.ThreadLocalRandom;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.xcontent.QueryBuilders;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.search.SearchHit;
@ -39,6 +39,8 @@ import org.elasticsearch.search.sort.SortOrder;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.index.query.xcontent.QueryBuilders.*;
/**
* @author kimchy (shay.banon)
*/
@ -50,6 +52,7 @@ public class Search1StressTest {
private int numberOfNodes = 4;
private int indexers = 0;
private SizeValue preIndexDocs = new SizeValue(0);
private TimeValue indexerThrottle = TimeValue.timeValueMillis(100);
private int searchers = 0;
private TimeValue searcherThrottle = TimeValue.timeValueMillis(20);
@ -57,6 +60,8 @@ public class Search1StressTest {
private int numberOfTypes = 4;
private int numberOfValues = 20;
private int numberOfHits = 300;
private TimeValue flusherThrottle = TimeValue.timeValueMillis(1000);
private TimeValue deleteByQueryThrottle = TimeValue.timeValueMillis(5000);
private Settings settings = ImmutableSettings.Builder.EMPTY_SETTINGS;
@ -73,6 +78,11 @@ public class Search1StressTest {
return this;
}
public Search1StressTest setPreIndexDocs(SizeValue preIndexDocs) {
this.preIndexDocs = preIndexDocs;
return this;
}
public Search1StressTest setIndexers(int indexers) {
this.indexers = indexers;
return this;
@ -113,6 +123,16 @@ public class Search1StressTest {
return this;
}
public Search1StressTest setFlusherThrottle(TimeValue flusherThrottle) {
this.flusherThrottle = flusherThrottle;
return this;
}
public Search1StressTest setDeleteByQueryThrottle(TimeValue deleteByQueryThrottle) {
this.deleteByQueryThrottle = deleteByQueryThrottle;
return this;
}
public Search1StressTest setSettings(Settings settings) {
this.settings = settings;
return this;
@ -170,7 +190,7 @@ public class Search1StressTest {
builder.setFrom(size / 2);
}
String value = nextFieldValue();
builder.setQuery(QueryBuilders.termQuery("field", value));
builder.setQuery(termQuery("field", value));
searchCounter.incrementAndGet();
SearchResponse searchResponse = builder.execute().actionGet();
if (searchResponse.failedShards() > 0) {
@ -218,6 +238,48 @@ public class Search1StressTest {
}
}
private class Flusher extends Thread {
volatile boolean close = false;
volatile boolean closed = false;
@Override public void run() {
while (true) {
if (close) {
closed = true;
return;
}
try {
client.client().admin().indices().prepareFlush().execute().actionGet();
Thread.sleep(indexerThrottle.millis());
} catch (Exception e) {
logger.warn("failed to flush / sleep", e);
}
}
}
}
private class DeleteByQuery extends Thread {
volatile boolean close = false;
volatile boolean closed = false;
@Override public void run() {
while (true) {
if (close) {
closed = true;
return;
}
try {
client.client().prepareDeleteByQuery().setQuery(termQuery("num", nextNumValue())).execute().actionGet();
Thread.sleep(deleteByQueryThrottle.millis());
} catch (Exception e) {
logger.warn("failed to delete_by_query", e);
}
}
}
}
private void indexDoc() throws Exception {
XContentBuilder json = XContentFactory.jsonBuilder().startObject()
.field("num", nextNumValue())
@ -242,12 +304,18 @@ public class Search1StressTest {
client.client().admin().indices().prepareCreate("test" + i).execute().actionGet();
}
logger.info("Pre indexing docs [{}]...", preIndexDocs);
for (long i = 0; i < preIndexDocs.singles(); i++) {
indexDoc();
}
logger.info("Done pre indexing docs [{}]", preIndexDocs);
Indexer[] indexerThreads = new Indexer[indexers];
for (int i = 0; i < indexerThreads.length; i++) {
indexerThreads[i] = new Indexer();
}
for (int i = 0; i < indexerThreads.length; i++) {
indexerThreads[i].start();
for (Indexer indexerThread : indexerThreads) {
indexerThread.start();
}
Thread.sleep(10000);
@ -256,10 +324,23 @@ public class Search1StressTest {
for (int i = 0; i < searcherThreads.length; i++) {
searcherThreads[i] = new Searcher();
}
for (int i = 0; i < searcherThreads.length; i++) {
searcherThreads[i].start();
for (Searcher searcherThread : searcherThreads) {
searcherThread.start();
}
Flusher flusher = null;
if (flusherThrottle.millis() > 0) {
flusher = new Flusher();
flusher.start();
}
DeleteByQuery deleteByQuery = null;
if (deleteByQueryThrottle.millis() > 0) {
deleteByQuery = new DeleteByQuery();
deleteByQuery.start();
}
long testStart = System.currentTimeMillis();
while (true) {
@ -271,23 +352,37 @@ public class Search1StressTest {
System.out.println("DONE, closing .....");
for (int i = 0; i < searcherThreads.length; i++) {
searcherThreads[i].close = true;
if (flusher != null) {
flusher.close = true;
}
for (int i = 0; i < indexerThreads.length; i++) {
indexerThreads[i].close = true;
if (deleteByQuery != null) {
deleteByQuery.close = true;
}
for (Searcher searcherThread : searcherThreads) {
searcherThread.close = true;
}
for (Indexer indexerThread : indexerThreads) {
indexerThread.close = true;
}
Thread.sleep(indexerThrottle.millis() + 10000);
for (int i = 0; i < searcherThreads.length; i++) {
if (!searcherThreads[i].closed) {
if (flusher != null && !flusher.closed) {
logger.warn("flusher not closed!");
}
if (deleteByQuery != null && !deleteByQuery.closed) {
logger.warn("deleteByQuery not closed!");
}
for (Searcher searcherThread : searcherThreads) {
if (!searcherThread.closed) {
logger.warn("search thread not closed!");
}
}
for (int i = 0; i < indexerThreads.length; i++) {
if (!indexerThreads[i].closed) {
for (Indexer indexerThread : indexerThreads) {
if (!indexerThread.closed) {
logger.warn("index thread not closed!");
}
}
@ -309,10 +404,13 @@ public class Search1StressTest {
.setPeriod(TimeValue.timeValueMinutes(10))
.setSettings(settings)
.setNumberOfNodes(2)
.setPreIndexDocs(SizeValue.parseSizeValue("100"))
.setIndexers(2)
.setIndexerThrottle(TimeValue.timeValueMillis(100))
.setSearchers(10)
.setSearcherThrottle(TimeValue.timeValueMillis(10))
.setDeleteByQueryThrottle(TimeValue.timeValueMillis(-1))
.setFlusherThrottle(TimeValue.timeValueMillis(1000))
.setNumberOfIndices(10)
.setNumberOfTypes(5)
.setNumberOfValues(50)