From 184276154ce68cff172c642a8756af39541c44a1 Mon Sep 17 00:00:00 2001 From: kimchy Date: Tue, 26 Oct 2010 17:17:12 +0200 Subject: [PATCH] add search stress test --- .../stress/search1/Search1StressTest.java | 323 ++++++++++++++++++ 1 file changed, 323 insertions(+) create mode 100644 modules/test/integration/src/test/java/org/elasticsearch/test/stress/search1/Search1StressTest.java diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/stress/search1/Search1StressTest.java b/modules/test/integration/src/test/java/org/elasticsearch/test/stress/search1/Search1StressTest.java new file mode 100644 index 00000000000..74505623c7d --- /dev/null +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/stress/search1/Search1StressTest.java @@ -0,0 +1,323 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.stress.search1; + +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.client.action.search.SearchRequestBuilder; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.jsr166y.ThreadLocalRandom; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.query.xcontent.QueryBuilders; +import org.elasticsearch.node.Node; +import org.elasticsearch.node.NodeBuilder; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.sort.SortOrder; + +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicLong; + +/** + * @author kimchy (shay.banon) + */ +public class Search1StressTest { + + private final ESLogger logger = Loggers.getLogger(getClass()); + + + private int numberOfNodes = 4; + + private int indexers = 0; + private TimeValue indexerThrottle = TimeValue.timeValueMillis(100); + private int searchers = 0; + private TimeValue searcherThrottle = TimeValue.timeValueMillis(20); + private int numberOfIndices = 10; + private int numberOfTypes = 4; + private int numberOfValues = 20; + private int numberOfHits = 300; + + private Settings settings = ImmutableSettings.Builder.EMPTY_SETTINGS; + + private TimeValue period = TimeValue.timeValueMinutes(20); + + private AtomicLong indexCounter = new AtomicLong(); + private AtomicLong searchCounter = new AtomicLong(); + + + private Node client; + + public Search1StressTest setNumberOfNodes(int numberOfNodes) { + this.numberOfNodes = numberOfNodes; + return this; + } + + public Search1StressTest setIndexers(int indexers) { + this.indexers = indexers; + return this; + } + + public Search1StressTest setIndexerThrottle(TimeValue indexerThrottle) { + this.indexerThrottle = indexerThrottle; + return this; + } + + public Search1StressTest setSearchers(int searchers) { + this.searchers = searchers; + return this; + } + + public Search1StressTest setSearcherThrottle(TimeValue searcherThrottle) { + this.searcherThrottle = searcherThrottle; + return this; + } + + public Search1StressTest setNumberOfIndices(int numberOfIndices) { + this.numberOfIndices = numberOfIndices; + return this; + } + + public Search1StressTest setNumberOfTypes(int numberOfTypes) { + this.numberOfTypes = numberOfTypes; + return this; + } + + public Search1StressTest setNumberOfValues(int numberOfValues) { + this.numberOfValues = numberOfValues; + return this; + } + + public Search1StressTest setNumberOfHits(int numberOfHits) { + this.numberOfHits = numberOfHits; + return this; + } + + public Search1StressTest setSettings(Settings settings) { + this.settings = settings; + return this; + } + + public Search1StressTest setPeriod(TimeValue period) { + this.period = period; + return this; + } + + private String nextIndex() { + return "test" + Math.abs(ThreadLocalRandom.current().nextInt()) % numberOfIndices; + } + + private String nextType() { + return "type" + Math.abs(ThreadLocalRandom.current().nextInt()) % numberOfTypes; + } + + private int nextNumValue() { + return Math.abs(ThreadLocalRandom.current().nextInt()) % numberOfValues; + } + + private String nextFieldValue() { + return "value" + Math.abs(ThreadLocalRandom.current().nextInt()) % numberOfValues; + } + + private class Searcher extends Thread { + + volatile boolean close = false; + + volatile boolean closed = false; + + @Override public void run() { + while (true) { + if (close) { + closed = true; + return; + } + try { + String indexName = nextIndex(); + SearchRequestBuilder builder = client.client().prepareSearch(indexName); + if (ThreadLocalRandom.current().nextBoolean()) { + builder.addSort("num", SortOrder.DESC); + } else if (ThreadLocalRandom.current().nextBoolean()) { + // add a _score based sorting, won't do any sorting, just to test... + builder.addSort("_score", SortOrder.DESC); + } + if (ThreadLocalRandom.current().nextBoolean()) { + builder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH); + } + int size = Math.abs(ThreadLocalRandom.current().nextInt()) % numberOfHits; + builder.setSize(size); + if (ThreadLocalRandom.current().nextBoolean()) { + // update from + builder.setFrom(size / 2); + } + String value = nextFieldValue(); + builder.setQuery(QueryBuilders.termQuery("field", value)); + searchCounter.incrementAndGet(); + SearchResponse searchResponse = builder.execute().actionGet(); + if (searchResponse.failedShards() > 0) { + logger.warn("failed search " + Arrays.toString(searchResponse.shardFailures())); + } + // verify that all come from the requested index + for (SearchHit hit : searchResponse.hits()) { + if (!hit.shard().index().equals(indexName)) { + logger.warn("got wrong index, asked for [{}], got [{}]", indexName, hit.shard().index()); + } + } + // verify that all has the relevant value + for (SearchHit hit : searchResponse.hits()) { + if (!value.equals(hit.sourceAsMap().get("field"))) { + logger.warn("got wrong field, asked for [{}], got [{}]", value, hit.sourceAsMap().get("field")); + } + } + Thread.sleep(searcherThrottle.millis()); + } catch (Exception e) { + logger.warn("failed to search", e); + } + } + } + } + + private class Indexer extends Thread { + + volatile boolean close = false; + + volatile boolean closed = false; + + @Override public void run() { + while (true) { + if (close) { + closed = true; + return; + } + try { + indexDoc(); + Thread.sleep(indexerThrottle.millis()); + } catch (Exception e) { + logger.warn("failed to index / sleep", e); + } + } + } + } + + private void indexDoc() throws Exception { + XContentBuilder json = XContentFactory.jsonBuilder().startObject() + .field("num", nextNumValue()) + .field("field", nextFieldValue()); + + json.endObject(); + + client.client().prepareIndex(nextIndex(), nextType()) + .setSource(json) + .execute().actionGet(); + indexCounter.incrementAndGet(); + } + + public void run() throws Exception { + Node[] nodes = new Node[numberOfNodes]; + for (int i = 0; i < nodes.length; i++) { + nodes[i] = NodeBuilder.nodeBuilder().settings(settings).node(); + } + client = NodeBuilder.nodeBuilder().settings(settings).client(true).node(); + + for (int i = 0; i < numberOfIndices; i++) { + client.client().admin().indices().prepareCreate("test" + i).execute().actionGet(); + } + + Indexer[] indexerThreads = new Indexer[indexers]; + for (int i = 0; i < indexerThreads.length; i++) { + indexerThreads[i] = new Indexer(); + } + for (int i = 0; i < indexerThreads.length; i++) { + indexerThreads[i].start(); + } + + Thread.sleep(10000); + + Searcher[] searcherThreads = new Searcher[searchers]; + for (int i = 0; i < searcherThreads.length; i++) { + searcherThreads[i] = new Searcher(); + } + for (int i = 0; i < searcherThreads.length; i++) { + searcherThreads[i].start(); + } + + long testStart = System.currentTimeMillis(); + + while (true) { + Thread.sleep(5000); + if ((System.currentTimeMillis() - testStart) > period.millis()) { + break; + } + } + + System.out.println("DONE, closing ....."); + + for (int i = 0; i < searcherThreads.length; i++) { + searcherThreads[i].close = true; + } + + for (int i = 0; i < indexerThreads.length; i++) { + indexerThreads[i].close = true; + } + + Thread.sleep(indexerThrottle.millis() + 10000); + + for (int i = 0; i < searcherThreads.length; i++) { + if (!searcherThreads[i].closed) { + logger.warn("search thread not closed!"); + } + } + for (int i = 0; i < indexerThreads.length; i++) { + if (!indexerThreads[i].closed) { + logger.warn("index thread not closed!"); + } + } + + client.close(); + for (Node node : nodes) { + node.close(); + } + + System.out.println("********** DONE, indexed [" + indexCounter.get() + "], searched [" + searchCounter.get() + "]"); + } + + public static void main(String[] args) throws Exception { + Settings settings = ImmutableSettings.settingsBuilder() + .put("gateway.type", "none") + .build(); + + Search1StressTest test = new Search1StressTest() + .setPeriod(TimeValue.timeValueMinutes(10)) + .setSettings(settings) + .setNumberOfNodes(2) + .setIndexers(2) + .setIndexerThrottle(TimeValue.timeValueMillis(100)) + .setSearchers(10) + .setSearcherThrottle(TimeValue.timeValueMillis(10)) + .setNumberOfIndices(10) + .setNumberOfTypes(5) + .setNumberOfValues(50) + .setNumberOfHits(300); + + test.run(); + } +}