From 9ae9ab955388759d73c6617e501c676ee74f78f5 Mon Sep 17 00:00:00 2001 From: kimchy Date: Thu, 10 Feb 2011 16:47:43 +0200 Subject: [PATCH] add flush to search 1 stress test --- .../stress/search1/Search1StressTest.java | 126 ++++++++++++++++-- 1 file changed, 112 insertions(+), 14 deletions(-) diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/stress/search1/Search1StressTest.java b/modules/test/integration/src/test/java/org/elasticsearch/test/stress/search1/Search1StressTest.java index 74505623c7d..62b645650a9 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/stress/search1/Search1StressTest.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/stress/search1/Search1StressTest.java @@ -26,11 +26,11 @@ import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.SizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.jsr166y.ThreadLocalRandom; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.index.query.xcontent.QueryBuilders; import org.elasticsearch.node.Node; import org.elasticsearch.node.NodeBuilder; import org.elasticsearch.search.SearchHit; @@ -39,6 +39,8 @@ import org.elasticsearch.search.sort.SortOrder; import java.util.Arrays; import java.util.concurrent.atomic.AtomicLong; +import static org.elasticsearch.index.query.xcontent.QueryBuilders.*; + /** * @author kimchy (shay.banon) */ @@ -50,6 +52,7 @@ public class Search1StressTest { private int numberOfNodes = 4; private int indexers = 0; + private SizeValue preIndexDocs = new SizeValue(0); private TimeValue indexerThrottle = TimeValue.timeValueMillis(100); private int searchers = 0; private TimeValue searcherThrottle = TimeValue.timeValueMillis(20); @@ -57,6 +60,8 @@ public class Search1StressTest { private int numberOfTypes = 4; private int numberOfValues = 20; private int numberOfHits = 300; + private TimeValue flusherThrottle = TimeValue.timeValueMillis(1000); + private TimeValue deleteByQueryThrottle = TimeValue.timeValueMillis(5000); private Settings settings = ImmutableSettings.Builder.EMPTY_SETTINGS; @@ -73,6 +78,11 @@ public class Search1StressTest { return this; } + public Search1StressTest setPreIndexDocs(SizeValue preIndexDocs) { + this.preIndexDocs = preIndexDocs; + return this; + } + public Search1StressTest setIndexers(int indexers) { this.indexers = indexers; return this; @@ -113,6 +123,16 @@ public class Search1StressTest { return this; } + public Search1StressTest setFlusherThrottle(TimeValue flusherThrottle) { + this.flusherThrottle = flusherThrottle; + return this; + } + + public Search1StressTest setDeleteByQueryThrottle(TimeValue deleteByQueryThrottle) { + this.deleteByQueryThrottle = deleteByQueryThrottle; + return this; + } + public Search1StressTest setSettings(Settings settings) { this.settings = settings; return this; @@ -170,7 +190,7 @@ public class Search1StressTest { builder.setFrom(size / 2); } String value = nextFieldValue(); - builder.setQuery(QueryBuilders.termQuery("field", value)); + builder.setQuery(termQuery("field", value)); searchCounter.incrementAndGet(); SearchResponse searchResponse = builder.execute().actionGet(); if (searchResponse.failedShards() > 0) { @@ -218,6 +238,48 @@ public class Search1StressTest { } } + private class Flusher extends Thread { + volatile boolean close = false; + + volatile boolean closed = false; + + @Override public void run() { + while (true) { + if (close) { + closed = true; + return; + } + try { + client.client().admin().indices().prepareFlush().execute().actionGet(); + Thread.sleep(indexerThrottle.millis()); + } catch (Exception e) { + logger.warn("failed to flush / sleep", e); + } + } + } + } + + private class DeleteByQuery extends Thread { + volatile boolean close = false; + + volatile boolean closed = false; + + @Override public void run() { + while (true) { + if (close) { + closed = true; + return; + } + try { + client.client().prepareDeleteByQuery().setQuery(termQuery("num", nextNumValue())).execute().actionGet(); + Thread.sleep(deleteByQueryThrottle.millis()); + } catch (Exception e) { + logger.warn("failed to delete_by_query", e); + } + } + } + } + private void indexDoc() throws Exception { XContentBuilder json = XContentFactory.jsonBuilder().startObject() .field("num", nextNumValue()) @@ -242,12 +304,18 @@ public class Search1StressTest { client.client().admin().indices().prepareCreate("test" + i).execute().actionGet(); } + logger.info("Pre indexing docs [{}]...", preIndexDocs); + for (long i = 0; i < preIndexDocs.singles(); i++) { + indexDoc(); + } + logger.info("Done pre indexing docs [{}]", preIndexDocs); + Indexer[] indexerThreads = new Indexer[indexers]; for (int i = 0; i < indexerThreads.length; i++) { indexerThreads[i] = new Indexer(); } - for (int i = 0; i < indexerThreads.length; i++) { - indexerThreads[i].start(); + for (Indexer indexerThread : indexerThreads) { + indexerThread.start(); } Thread.sleep(10000); @@ -256,10 +324,23 @@ public class Search1StressTest { for (int i = 0; i < searcherThreads.length; i++) { searcherThreads[i] = new Searcher(); } - for (int i = 0; i < searcherThreads.length; i++) { - searcherThreads[i].start(); + for (Searcher searcherThread : searcherThreads) { + searcherThread.start(); } + Flusher flusher = null; + if (flusherThrottle.millis() > 0) { + flusher = new Flusher(); + flusher.start(); + } + + DeleteByQuery deleteByQuery = null; + if (deleteByQueryThrottle.millis() > 0) { + deleteByQuery = new DeleteByQuery(); + deleteByQuery.start(); + } + + long testStart = System.currentTimeMillis(); while (true) { @@ -271,23 +352,37 @@ public class Search1StressTest { System.out.println("DONE, closing ....."); - for (int i = 0; i < searcherThreads.length; i++) { - searcherThreads[i].close = true; + if (flusher != null) { + flusher.close = true; } - for (int i = 0; i < indexerThreads.length; i++) { - indexerThreads[i].close = true; + if (deleteByQuery != null) { + deleteByQuery.close = true; + } + + for (Searcher searcherThread : searcherThreads) { + searcherThread.close = true; + } + + for (Indexer indexerThread : indexerThreads) { + indexerThread.close = true; } Thread.sleep(indexerThrottle.millis() + 10000); - for (int i = 0; i < searcherThreads.length; i++) { - if (!searcherThreads[i].closed) { + if (flusher != null && !flusher.closed) { + logger.warn("flusher not closed!"); + } + if (deleteByQuery != null && !deleteByQuery.closed) { + logger.warn("deleteByQuery not closed!"); + } + for (Searcher searcherThread : searcherThreads) { + if (!searcherThread.closed) { logger.warn("search thread not closed!"); } } - for (int i = 0; i < indexerThreads.length; i++) { - if (!indexerThreads[i].closed) { + for (Indexer indexerThread : indexerThreads) { + if (!indexerThread.closed) { logger.warn("index thread not closed!"); } } @@ -309,10 +404,13 @@ public class Search1StressTest { .setPeriod(TimeValue.timeValueMinutes(10)) .setSettings(settings) .setNumberOfNodes(2) + .setPreIndexDocs(SizeValue.parseSizeValue("100")) .setIndexers(2) .setIndexerThrottle(TimeValue.timeValueMillis(100)) .setSearchers(10) .setSearcherThrottle(TimeValue.timeValueMillis(10)) + .setDeleteByQueryThrottle(TimeValue.timeValueMillis(-1)) + .setFlusherThrottle(TimeValue.timeValueMillis(1000)) .setNumberOfIndices(10) .setNumberOfTypes(5) .setNumberOfValues(50)