Added test that compares concurrent facet execution results with a serial execution result

This commit is contained in:
Simon Willnauer 2013-04-05 10:36:53 +02:00
parent 5af6343697
commit 9fbe075aec
3 changed files with 297 additions and 29 deletions

View File

@ -48,7 +48,7 @@ public final class HashedScriptAggregator extends HashedAggregator {
this.excluded = excluded;
this.matcher = pattern != null ? pattern.matcher("") : null;
this.script = script;
this.convert = script == null || matcher == null;
this.convert = script != null || matcher != null;
}
@Override
@ -59,16 +59,16 @@ public final class HashedScriptAggregator extends HashedAggregator {
if (convert) {
// only convert if we need to and only once per doc...
UnicodeUtil.UTF8toUTF16(value, spare);
}
if (matcher != null) {
assert value.utf8ToString().equals(spare.toString());
assert convert : "regexp: [convert == false] but should be true";
assert value.utf8ToString().equals(spare.toString()) : "not converted";
if (!matcher.reset(spare).matches()) {
return;
}
}
if (script != null) {
assert value.utf8ToString().equals(spare.toString());
assert convert : "script: [convert == false] but should be true";
assert value.utf8ToString().equals(spare.toString()) : "not converted";
script.setNextDocId(docId);
// LUCENE 4 UPGRADE: needs optimization -- maybe a CharSequence does the job here?
// we only creat that string if we really need
@ -89,6 +89,8 @@ public final class HashedScriptAggregator extends HashedAggregator {
return;
}
}
}
assert convert || (matcher == null && script == null);
super.onValue(docId, value, hashCode, values);
}
}

View File

@ -0,0 +1,95 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.integration.search.facet;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
public class ConcurrentDuel<T> {
private final ExecutorService pool;
private final int numExecutorThreads;
public ConcurrentDuel(int numThreads) {
pool = Executors.newFixedThreadPool(numThreads);
this.numExecutorThreads = numThreads;
}
public void close() {
pool.shutdown();
}
public List<T> runDuel(final DuelExecutor<T> executor, int iterations, int numTasks) throws InterruptedException, ExecutionException {
List<T> results = new ArrayList<T>();
T firstRun = executor.run();
results.add(firstRun);
for (int i = 0; i < 3; i++) {
}
final CountDownLatch latch = new CountDownLatch(1);
final AtomicLong count = new AtomicLong(iterations);
List<Future<List<T>>> futures = new ArrayList<Future<List<T>>>();
for (int i = 0; i < numTasks; i++) {
futures.add(pool.submit(new Callable<List<T>>() {
@Override
public List<T> call() throws Exception {
List<T> results = new ArrayList<T>();
latch.await();
while(count.decrementAndGet() >= 0) {
results.add(executor.run());
}
return results;
}
}));
}
latch.countDown();
for (Future<List<T>> future : futures) {
results.addAll(future.get());
}
return results;
}
public void duel(DuelJudge<T> judge, final DuelExecutor<T> executor, int iterations) throws InterruptedException, ExecutionException {
duel(judge, executor, iterations, numExecutorThreads);
}
public void duel(DuelJudge<T> judge, final DuelExecutor<T> executor, int iterations, int threadCount) throws InterruptedException, ExecutionException {
T firstRun = executor.run();
List<T> runDuel = runDuel(executor, iterations, threadCount);
for (T t : runDuel) {
judge.judge(firstRun, t);
}
}
public static interface DuelExecutor<T> {
public T run();
}
public static interface DuelJudge<T> {
public void judge(T firstRun, T result);
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.test.integration.search.facet;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.search.ShardSearchFailure;
@ -29,7 +30,9 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.FacetBuilder;
import org.elasticsearch.search.facet.Facets;
import org.elasticsearch.search.facet.datehistogram.DateHistogramFacet;
import org.elasticsearch.search.facet.filter.FilterFacet;
import org.elasticsearch.search.facet.histogram.HistogramFacet;
@ -37,10 +40,13 @@ import org.elasticsearch.search.facet.query.QueryFacet;
import org.elasticsearch.search.facet.range.RangeFacet;
import org.elasticsearch.search.facet.statistical.StatisticalFacet;
import org.elasticsearch.search.facet.terms.TermsFacet;
import org.elasticsearch.search.facet.terms.TermsFacet.Entry;
import org.elasticsearch.search.facet.terms.TermsFacetBuilder;
import org.elasticsearch.search.facet.terms.doubles.InternalDoubleTermsFacet;
import org.elasticsearch.search.facet.terms.longs.InternalLongTermsFacet;
import org.elasticsearch.search.facet.termsstats.TermsStatsFacet;
import org.elasticsearch.test.integration.AbstractNodesTests;
import org.hamcrest.Matchers;
import org.joda.time.DateTimeZone;
import org.joda.time.format.ISODateTimeFormat;
import org.testng.annotations.AfterClass;
@ -48,7 +54,11 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.FilterBuilders.termFilter;
@ -236,9 +246,170 @@ public class SimpleFacetsTests extends AbstractNodesTests {
assertThat(facet.getTotalCount(), equalTo(100l));
assertThat(facet.getOtherCount(), equalTo(90l));
assertThat(facet.getMissingCount(), equalTo(10l));
}
@Test
public void testConcurrentFacets() throws ElasticSearchException, IOException, InterruptedException, ExecutionException {
try {
client.admin().indices().prepareDelete("test").execute().actionGet();
} catch (Exception e) {
// ignore
}
client.admin().indices().prepareCreate("test")
.addMapping("type", jsonBuilder().startObject().startObject("type").startObject("properties")
.startObject("byte").field("type", "byte").endObject()
.startObject("short").field("type", "short").endObject()
.startObject("integer").field("type", "integer").endObject()
.startObject("long").field("type", "long").endObject()
.startObject("float").field("type", "float").endObject()
.startObject("double").field("type", "double").endObject()
.endObject().endObject().endObject())
.execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
for (int i = 0; i < 100; i++) {
client.prepareIndex("test", "type", ""+i).setSource(jsonBuilder().startObject()
.field("name", ""+i)
.field("byte", i )
.field("short", i + Byte.MAX_VALUE)
.field("integer", i + Short.MAX_VALUE)
.field("long", i + Integer.MAX_VALUE)
.field("float", (float)i)
.field("double", (double)i)
.endObject()).execute().actionGet();
}
for (int i = 0; i < 10; i++) {
client.prepareIndex("test", "type", ""+(i + 100)).setSource(jsonBuilder().startObject()
.field("foo", ""+i)
.endObject()).execute().actionGet();
}
client.admin().indices().prepareFlush().setRefresh(true).execute().actionGet();
ConcurrentDuel<Facets> duel = new ConcurrentDuel<Facets>(5);
{
final Client cl = client;
duel.duel(new ConcurrentDuel.DuelJudge<Facets>() {
@Override
public void judge(Facets firstRun, Facets result) {
for (Facet f : result) {
TermsFacet facet = (TermsFacet) f;
assertThat(facet.getName(), isIn(new String[] {"short", "double", "byte", "float", "integer", "long", "termFacet"}));
TermsFacet firstRunFacet = (TermsFacet) firstRun.getFacets().get(facet.getName());
assertThat(facet.getEntries().size(), equalTo(firstRunFacet.getEntries().size()));
assertThat(facet.getEntries().size(), equalTo(10));
assertThat(facet.getTotalCount(), equalTo(100l));
assertThat(facet.getOtherCount(), equalTo(90l));
assertThat(facet.getMissingCount(), equalTo(10l));
List<? extends Entry> right = facet.getEntries();
List<? extends Entry> left = firstRunFacet.getEntries();
for (int i = 0; i < facet.getEntries().size(); i++) {
assertThat(left.get(i).getTerm(), equalTo(right.get(i).getTerm()));
assertThat(left.get(i).getCount(), equalTo(right.get(i).getCount()));
}
}
}
}, new ConcurrentDuel.DuelExecutor<Facets>() {
AtomicInteger count = new AtomicInteger();
@Override
public Facets run() {
final SearchRequestBuilder facetRequest;
if (count.incrementAndGet() % 2 == 0) { // every second request is mapped
facetRequest = cl.prepareSearch().setQuery(matchAllQuery())
.addFacet(termsFacet("double").field("double").size(10))
.addFacet(termsFacet("float").field("float").size(10))
.addFacet(termsFacet("integer").field("integer").size(10))
.addFacet(termsFacet("long").field("long").size(10))
.addFacet(termsFacet("short").field("short").size(10))
.addFacet(termsFacet("byte").field("byte").size(10))
.addFacet(termsFacet("termFacet").field("name").size(10));
} else {
facetRequest = cl.prepareSearch()
.setQuery(matchAllQuery())
.addFacet(termsFacet("double").executionHint("map").field("double").size(10))
.addFacet(termsFacet("float").executionHint("map").field("float").size(10))
.addFacet(termsFacet("integer").executionHint("map").field("integer").size(10))
.addFacet(termsFacet("long").executionHint("map").field("long").size(10))
.addFacet(termsFacet("short").executionHint("map").field("short").size(10))
.addFacet(termsFacet("byte").executionHint("map").field("byte").size(10))
.addFacet(termsFacet("termFacet").executionHint("map").field("name").size(10));
}
SearchResponse actionGet = facetRequest.execute().actionGet();
return actionGet.getFacets();
}
}, 5000);
}
{
duel.duel(new ConcurrentDuel.DuelJudge<Facets>() {
@Override
public void judge(Facets firstRun, Facets result) {
for (Facet f : result) {
TermsFacet facet = (TermsFacet) f;
assertThat(facet.getName(), equalTo("termFacet"));
TermsFacet firstRunFacet = (TermsFacet) firstRun.getFacets().get(facet.getName());
assertThat(facet.getEntries().size(), equalTo(firstRunFacet.getEntries().size()));
assertThat(facet.getEntries().size(), equalTo(10));
assertThat(facet.getTotalCount(), equalTo(100l));
assertThat(facet.getOtherCount(), equalTo(90l));
assertThat(facet.getMissingCount(), equalTo(10l));
List<? extends Entry> right = facet.getEntries();
List<? extends Entry> left = firstRunFacet.getEntries();
for (int i = 0; i < facet.getEntries().size(); i++) {
assertThat(left.get(i).getTerm(), equalTo(right.get(i).getTerm()));
assertThat(left.get(i).getCount(), equalTo(right.get(i).getCount()));
}
}
}
}, new ConcurrentDuel.DuelExecutor<Facets>() {
AtomicInteger count = new AtomicInteger();
@Override
public Facets run() {
final SearchRequestBuilder facetRequest;
switch(count.incrementAndGet() % 4) {
case 2:
facetRequest = client.prepareSearch()
.setQuery(matchAllQuery())
.addFacet(termsFacet("termFacet").executionHint("map").field("name").regex("\\d+").script("term").size(10));
break;
case 1:
facetRequest = client.prepareSearch()
.setQuery(matchAllQuery())
.addFacet(termsFacet("termFacet").field("name").regex("\\d+").script("term").size(10));
break;
case 0:
facetRequest = client.prepareSearch()
.setQuery(matchAllQuery())
.addFacet(termsFacet("termFacet").field("name").size(10));
break;
default:
facetRequest = client.prepareSearch()
.setQuery(matchAllQuery())
.addFacet(termsFacet("termFacet").executionHint("map").field("name").size(10));
break;
}
SearchResponse actionGet = facetRequest.execute().actionGet();
return actionGet.getFacets();
}
}, 5000);
}
duel.close();
}
@Test
public void testSearchFilter() throws Exception {
try {