improved benchmark

Original commit: elastic/x-pack-elasticsearch@70b2a8a042
This commit is contained in:
Martijn van Groningen 2015-04-02 14:43:15 +02:00
parent 17bc9442e1
commit a778f1978d
1 changed files with 155 additions and 43 deletions

View File

@ -6,6 +6,7 @@
package org.elasticsearch.watcher.test.bench;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.logging.Loggers;
@ -27,65 +28,176 @@ import java.util.List;
import static org.elasticsearch.watcher.actions.ActionBuilders.indexAction;
import static org.elasticsearch.watcher.condition.ConditionBuilders.scriptCondition;
import static org.elasticsearch.watcher.input.InputBuilders.httpInput;
import static org.elasticsearch.watcher.input.InputBuilders.searchInput;
import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval;
/**
* Starts a master only node with watcher and benchmarks it.
* A date node needs to be started outside this benchmark. This the removes non watcher noise like indexing.
*/
public class WatcherBenchmark {
public static void main(String[] args) throws Exception {
Settings settings = ImmutableSettings.builder()
.put("plugins.load_classpath_plugins", false)
.put("plugin.types", WatcherBenchmarkPlugin.class.getName())
.put("cluster.name", WatcherBenchmark.class.getSimpleName())
.put("http.cors.enabled", true)
.build();
InternalNode node = (InternalNode) NodeBuilder.nodeBuilder().settings(settings).data(false).node();
node.client().admin().cluster().prepareHealth().setWaitForGreenStatus().get();
private final static Settings SETTINGS = ImmutableSettings.builder()
.put("plugins.load_classpath_plugins", false)
.put("shield.enabled", false)
.put("plugin.types", WatcherBenchmarkPlugin.class.getName())
.put("cluster.name", WatcherBenchmark.class.getSimpleName())
.put("network.host", "localhost")
.put("script.disable_dynamic", false)
.put("discovery.zen.ping.unicast.hosts", "localhost")
.put("discovery.zen.ping.multicast.enabled", false)
.put("http.cors.enabled", true)
.put("cluster.routing.allocation.disk.threshold_enabled", false)
// .put("recycler.page.limit.heap", "60%")
.build();
private static Client client;
private static WatcherClient watcherClient;
private static ScheduleTriggerEngineMock scheduler;
protected static void start() throws Exception {
InternalNode node = (InternalNode) NodeBuilder.nodeBuilder().settings(SETTINGS).data(false).node();
client = node.client();
client.admin().cluster().prepareHealth("*").setWaitForGreenStatus().get();
Thread.sleep(5000);
WatcherClient watcherClient = node.injector().getInstance(WatcherClient.class);
watcherClient = node.injector().getInstance(WatcherClient.class);
scheduler = node.injector().getInstance(ScheduleTriggerEngineMock.class);
}
int numAlerts = 1000;
for (int i = 0; i < numAlerts; i++) {
final String name = "_name" + i;
PutWatchRequest putAlertRequest = new PutWatchRequest(name, new WatchSourceBuilder()
.trigger(schedule(interval("5s")))
.input(searchInput(new SearchRequest().source(
new SearchSourceBuilder()
)))
.condition(scriptCondition("1 == 1"))
.addAction(indexAction("index", "type")));
putAlertRequest.setName(name);
watcherClient.putWatch(putAlertRequest).actionGet();
}
public static final class SmallSearchInput extends WatcherBenchmark {
int numThreads = 50;
int watchersPerThread = numAlerts / numThreads;
final ScheduleTriggerEngineMock scheduler = node.injector().getInstance(ScheduleTriggerEngineMock.class);
Thread[] threads = new Thread[numThreads];
for (int i = 0; i < numThreads; i++) {
final int begin = i * watchersPerThread;
final int end = (i + 1) * watchersPerThread;
Runnable r = new Runnable() {
@Override
public void run() {
while (true) {
for (int j = begin; j < end; j++) {
scheduler.trigger("_name" + j);
public static void main(String[] args) throws Exception {
start();
client.admin().indices().prepareCreate("test").get();
client.prepareIndex("test", "test", "1").setSource("{}").get();
int numAlerts = 1000;
for (int i = 0; i < numAlerts; i++) {
final String name = "_name" + i;
PutWatchRequest putAlertRequest = new PutWatchRequest(name, new WatchSourceBuilder()
.trigger(schedule(interval("5s")))
.input(searchInput(new SearchRequest("test")
.source(new SearchSourceBuilder()))
)
.condition(scriptCondition("ctx.payload.hits.total > 0")));
putAlertRequest.setName(name);
watcherClient.putWatch(putAlertRequest).actionGet();
}
int numThreads = 50;
int watchersPerThread = numAlerts / numThreads;
Thread[] threads = new Thread[numThreads];
for (int i = 0; i < numThreads; i++) {
final int begin = i * watchersPerThread;
final int end = (i + 1) * watchersPerThread;
Runnable r = new Runnable() {
@Override
public void run() {
while (true) {
for (int j = begin; j < end; j++) {
scheduler.trigger("_name" + j);
}
}
}
}
};
threads[i] = new Thread(r);
threads[i].start();
};
threads[i] = new Thread(r);
threads[i].start();
}
for (Thread thread : threads) {
thread.join();
}
}
for (Thread thread : threads) {
thread.join();
}
public static final class BigSearchInput extends WatcherBenchmark {
public static void main(String[] args) throws Exception {
start();
int numAlerts = 1000;
for (int i = 0; i < numAlerts; i++) {
final String name = "_name" + i;
PutWatchRequest putAlertRequest = new PutWatchRequest(name, new WatchSourceBuilder()
.trigger(schedule(interval("5s")))
.input(searchInput(new SearchRequest()
.source(new SearchSourceBuilder()))
.addExtractKey("hits.total")
)
.condition(scriptCondition("1 == 1"))
.addAction(indexAction("index", "type")));
putAlertRequest.setName(name);
watcherClient.putWatch(putAlertRequest).actionGet();
}
int numThreads = 50;
int watchersPerThread = numAlerts / numThreads;
Thread[] threads = new Thread[numThreads];
for (int i = 0; i < numThreads; i++) {
final int begin = i * watchersPerThread;
final int end = (i + 1) * watchersPerThread;
Runnable r = new Runnable() {
@Override
public void run() {
while (true) {
for (int j = begin; j < end; j++) {
scheduler.trigger("_name" + j);
}
}
}
};
threads[i] = new Thread(r);
threads[i].start();
}
for (Thread thread : threads) {
thread.join();
}
}
}
public static final class HttpInput extends WatcherBenchmark {
public static void main(String[] args) throws Exception {
start();
int numAlerts = 1000;
for (int i = 0; i < numAlerts; i++) {
final String name = "_name" + i;
PutWatchRequest putAlertRequest = new PutWatchRequest(name, new WatchSourceBuilder()
.trigger(schedule(interval("5s")))
.input(httpInput().setHost("localhost").setPort(9200))
.condition(scriptCondition("ctx.payload.tagline == \"You Know, for Search\"")));
putAlertRequest.setName(name);
watcherClient.putWatch(putAlertRequest).actionGet();
}
int numThreads = 50;
int watchersPerThread = numAlerts / numThreads;
Thread[] threads = new Thread[numThreads];
for (int i = 0; i < numThreads; i++) {
final int begin = i * watchersPerThread;
final int end = (i + 1) * watchersPerThread;
Runnable r = new Runnable() {
@Override
public void run() {
while (true) {
for (int j = begin; j < end; j++) {
scheduler.trigger("_name" + j);
}
}
}
};
threads[i] = new Thread(r);
threads[i].start();
}
for (Thread thread : threads) {
thread.join();
}
}
}
public static final class WatcherBenchmarkPlugin extends WatcherPlugin {