test and start of node stress tests
This commit is contained in:
parent
8d3347bb5c
commit
eef6c02bf7
|
@ -100,6 +100,7 @@
|
||||||
<w>uptime</w>
|
<w>uptime</w>
|
||||||
<w>uuid</w>
|
<w>uuid</w>
|
||||||
<w>versioned</w>
|
<w>versioned</w>
|
||||||
|
<w>warmup</w>
|
||||||
<w>wildcards</w>
|
<w>wildcards</w>
|
||||||
<w>xcontent</w>
|
<w>xcontent</w>
|
||||||
<w>xson</w>
|
<w>xson</w>
|
||||||
|
|
|
@ -0,0 +1,268 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elastic Search and Shay Banon under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Elastic Search licenses this
|
||||||
|
* file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.benchmark.stress;
|
||||||
|
|
||||||
|
import org.elasticsearch.action.search.SearchResponse;
|
||||||
|
import org.elasticsearch.client.Client;
|
||||||
|
import org.elasticsearch.client.Requests;
|
||||||
|
import org.elasticsearch.index.query.xcontent.XContentQueryBuilder;
|
||||||
|
import org.elasticsearch.node.Node;
|
||||||
|
import org.elasticsearch.util.StopWatch;
|
||||||
|
import org.elasticsearch.util.TimeValue;
|
||||||
|
import org.elasticsearch.util.settings.Settings;
|
||||||
|
import org.elasticsearch.util.xcontent.XContentFactory;
|
||||||
|
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.CyclicBarrier;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
import static org.elasticsearch.client.Requests.*;
|
||||||
|
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
|
||||||
|
import static org.elasticsearch.index.query.xcontent.FilterBuilders.*;
|
||||||
|
import static org.elasticsearch.index.query.xcontent.QueryBuilders.*;
|
||||||
|
import static org.elasticsearch.node.NodeBuilder.*;
|
||||||
|
import static org.elasticsearch.search.builder.SearchSourceBuilder.*;
|
||||||
|
import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*;
|
||||||
|
import static org.elasticsearch.util.settings.ImmutableSettings.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author kimchy (shay.banon)
|
||||||
|
*/
|
||||||
|
public class NodesStressTest {
|
||||||
|
|
||||||
|
private Node[] nodes;
|
||||||
|
|
||||||
|
private int numberOfNodes = 2;
|
||||||
|
|
||||||
|
private Client[] clients;
|
||||||
|
|
||||||
|
private AtomicLong idGenerator = new AtomicLong();
|
||||||
|
|
||||||
|
private int fieldNumLimit = 50;
|
||||||
|
|
||||||
|
private long searcherIterations = 10;
|
||||||
|
private Searcher[] searcherThreads = new Searcher[1];
|
||||||
|
|
||||||
|
private long indexIterations = 10;
|
||||||
|
private Indexer[] indexThreads = new Indexer[1];
|
||||||
|
|
||||||
|
private TimeValue sleepAfterDone = TimeValue.timeValueMillis(0);
|
||||||
|
|
||||||
|
private CountDownLatch latch;
|
||||||
|
private CyclicBarrier barrier1;
|
||||||
|
private CyclicBarrier barrier2;
|
||||||
|
|
||||||
|
public NodesStressTest() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public NodesStressTest numberOfNodes(int numberOfNodes) {
|
||||||
|
this.numberOfNodes = numberOfNodes;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public NodesStressTest fieldNumLimit(int fieldNumLimit) {
|
||||||
|
this.fieldNumLimit = fieldNumLimit;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public NodesStressTest searchIterations(int searchIterations) {
|
||||||
|
this.searcherIterations = searchIterations;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public NodesStressTest searcherThreads(int numberOfSearcherThreads) {
|
||||||
|
searcherThreads = new Searcher[numberOfSearcherThreads];
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public NodesStressTest indexIterations(long indexIterations) {
|
||||||
|
this.indexIterations = indexIterations;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public NodesStressTest indexThreads(int numberOfWriterThreads) {
|
||||||
|
indexThreads = new Indexer[numberOfWriterThreads];
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public NodesStressTest sleepAfterDone(TimeValue time) {
|
||||||
|
this.sleepAfterDone = time;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public NodesStressTest build(Settings settings) throws Exception {
|
||||||
|
settings = settingsBuilder()
|
||||||
|
// .put("index.engine.robin.refreshInterval", 1, TimeUnit.SECONDS)
|
||||||
|
.put(SETTING_NUMBER_OF_SHARDS, 5)
|
||||||
|
.put(SETTING_NUMBER_OF_REPLICAS, 1)
|
||||||
|
.put(settings)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
nodes = new Node[numberOfNodes];
|
||||||
|
clients = new Client[numberOfNodes];
|
||||||
|
for (int i = 0; i < numberOfNodes; i++) {
|
||||||
|
nodes[i] = nodeBuilder().settings(settingsBuilder().put(settings).put("name", "node" + i)).node();
|
||||||
|
clients[i] = nodes[i].client();
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < searcherThreads.length; i++) {
|
||||||
|
searcherThreads[i] = new Searcher(i);
|
||||||
|
}
|
||||||
|
for (int i = 0; i < indexThreads.length; i++) {
|
||||||
|
indexThreads[i] = new Indexer(i);
|
||||||
|
}
|
||||||
|
|
||||||
|
latch = new CountDownLatch(1);
|
||||||
|
barrier1 = new CyclicBarrier(2);
|
||||||
|
barrier2 = new CyclicBarrier(2);
|
||||||
|
// warmup
|
||||||
|
StopWatch stopWatch = new StopWatch().start();
|
||||||
|
Indexer warmup = new Indexer(-1).max(10000);
|
||||||
|
warmup.start();
|
||||||
|
barrier1.await();
|
||||||
|
barrier2.await();
|
||||||
|
latch.await();
|
||||||
|
stopWatch.stop();
|
||||||
|
System.out.println("Done Warmup, took [" + stopWatch.totalTime() + "]");
|
||||||
|
|
||||||
|
latch = new CountDownLatch(searcherThreads.length + indexThreads.length);
|
||||||
|
barrier1 = new CyclicBarrier(searcherThreads.length + indexThreads.length + 1);
|
||||||
|
barrier2 = new CyclicBarrier(searcherThreads.length + indexThreads.length + 1);
|
||||||
|
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void start() throws Exception {
|
||||||
|
for (Thread t : searcherThreads) {
|
||||||
|
t.start();
|
||||||
|
}
|
||||||
|
for (Thread t : indexThreads) {
|
||||||
|
t.start();
|
||||||
|
}
|
||||||
|
barrier1.await();
|
||||||
|
|
||||||
|
StopWatch stopWatch = new StopWatch();
|
||||||
|
stopWatch.start();
|
||||||
|
|
||||||
|
barrier2.await();
|
||||||
|
|
||||||
|
latch.await();
|
||||||
|
stopWatch.stop();
|
||||||
|
|
||||||
|
for (Client client : clients) {
|
||||||
|
client.close();
|
||||||
|
}
|
||||||
|
for (Node node : nodes) {
|
||||||
|
node.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
System.out.println("Done, took [" + stopWatch.totalTime() + "]");
|
||||||
|
Thread.sleep(sleepAfterDone.millis());
|
||||||
|
}
|
||||||
|
|
||||||
|
class Searcher extends Thread {
|
||||||
|
final int id;
|
||||||
|
long counter = 0;
|
||||||
|
long max = searcherIterations;
|
||||||
|
|
||||||
|
Searcher(int id) {
|
||||||
|
super("Searcher" + id);
|
||||||
|
this.id = id;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void run() {
|
||||||
|
try {
|
||||||
|
barrier1.await();
|
||||||
|
barrier2.await();
|
||||||
|
for (; counter < max; counter++) {
|
||||||
|
Client client = client(counter);
|
||||||
|
XContentQueryBuilder query = termQuery("num", counter % fieldNumLimit);
|
||||||
|
query = constantScoreQuery(queryFilter(query));
|
||||||
|
|
||||||
|
SearchResponse search = client.search(searchRequest()
|
||||||
|
.source(searchSource().query(query)))
|
||||||
|
.actionGet();
|
||||||
|
// System.out.println("Got search response, hits [" + search.hits().totalHits() + "]");
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
System.err.println("Failed to search:");
|
||||||
|
e.printStackTrace();
|
||||||
|
} finally {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class Indexer extends Thread {
|
||||||
|
|
||||||
|
final int id;
|
||||||
|
long counter = 0;
|
||||||
|
long max = indexIterations;
|
||||||
|
|
||||||
|
Indexer(int id) {
|
||||||
|
super("Indexer" + id);
|
||||||
|
this.id = id;
|
||||||
|
}
|
||||||
|
|
||||||
|
Indexer max(int max) {
|
||||||
|
this.max = max;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void run() {
|
||||||
|
try {
|
||||||
|
barrier1.await();
|
||||||
|
barrier2.await();
|
||||||
|
for (; counter < max; counter++) {
|
||||||
|
Client client = client(counter);
|
||||||
|
long id = idGenerator.incrementAndGet();
|
||||||
|
client.index(Requests.indexRequest().index("test").type("type1").id(Long.toString(id))
|
||||||
|
.source(XContentFactory.jsonBuilder().startObject()
|
||||||
|
.field("num", id % fieldNumLimit)
|
||||||
|
.endObject()))
|
||||||
|
.actionGet();
|
||||||
|
}
|
||||||
|
System.out.println("Indexer [" + id + "]: Done");
|
||||||
|
} catch (Exception e) {
|
||||||
|
System.err.println("Failed to index:");
|
||||||
|
e.printStackTrace();
|
||||||
|
} finally {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Client client(long i) {
|
||||||
|
return clients[((int) (i % clients.length))];
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
NodesStressTest test = new NodesStressTest()
|
||||||
|
.numberOfNodes(2)
|
||||||
|
.indexThreads(5)
|
||||||
|
.indexIterations(10 * 1000)
|
||||||
|
.searcherThreads(5)
|
||||||
|
.searchIterations(10 * 1000)
|
||||||
|
.sleepAfterDone(TimeValue.timeValueMinutes(10))
|
||||||
|
.build(EMPTY_SETTINGS);
|
||||||
|
|
||||||
|
test.start();
|
||||||
|
}
|
||||||
|
}
|
|
@ -27,11 +27,13 @@ import org.apache.lucene.search.TopDocs;
|
||||||
import org.apache.lucene.store.Directory;
|
import org.apache.lucene.store.Directory;
|
||||||
import org.apache.lucene.store.RAMDirectory;
|
import org.apache.lucene.store.RAMDirectory;
|
||||||
import org.apache.lucene.util.NumericUtils;
|
import org.apache.lucene.util.NumericUtils;
|
||||||
|
import org.elasticsearch.util.collect.Lists;
|
||||||
import org.elasticsearch.util.lucene.Lucene;
|
import org.elasticsearch.util.lucene.Lucene;
|
||||||
import org.testng.annotations.Test;
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import static org.elasticsearch.util.lucene.DocumentBuilder.*;
|
import static org.elasticsearch.util.lucene.DocumentBuilder.*;
|
||||||
import static org.hamcrest.MatcherAssert.*;
|
import static org.hamcrest.MatcherAssert.*;
|
||||||
|
@ -121,6 +123,35 @@ public class SimpleLuceneTests {
|
||||||
indexWriter.close();
|
indexWriter.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test public void testNRT() throws Exception {
|
||||||
|
Directory dir = new RAMDirectory();
|
||||||
|
IndexWriter indexWriter = new IndexWriter(dir, Lucene.STANDARD_ANALYZER, true, IndexWriter.MaxFieldLength.UNLIMITED);
|
||||||
|
IndexReader reader = indexWriter.getReader();
|
||||||
|
|
||||||
|
List<IndexReader> readers = Lists.newArrayList();
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
readers.add(reader);
|
||||||
|
indexWriter.addDocument(doc()
|
||||||
|
.add(field("id", Integer.toString(i)))
|
||||||
|
.boost(i).build());
|
||||||
|
|
||||||
|
reader = refreshReader(reader);
|
||||||
|
}
|
||||||
|
reader.close();
|
||||||
|
|
||||||
|
// verify that all readers are closed
|
||||||
|
for (IndexReader reader1 : readers) {
|
||||||
|
assertThat(reader1.getRefCount(), equalTo(0));
|
||||||
|
if (reader1.getSequentialSubReaders() != null) {
|
||||||
|
for (IndexReader reader2 : reader1.getSequentialSubReaders()) {
|
||||||
|
assertThat(reader2.getRefCount(), equalTo(0));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
indexWriter.close();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Verify doc freqs update with refresh of readers.
|
* Verify doc freqs update with refresh of readers.
|
||||||
*/
|
*/
|
||||||
|
|
Loading…
Reference in New Issue