Speed up filter tests with adapter cache (#3103)

This commit is contained in:
Jonathan Wei 2016-06-08 07:41:10 -07:00 committed by Gian Merlino
parent 4faa298977
commit 37c8a8f186
8 changed files with 160 additions and 65 deletions

View File

@ -23,7 +23,6 @@ import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.Pair;
import com.metamx.common.guava.Sequence;
@ -53,7 +52,6 @@ import io.druid.segment.data.RoaringBitmapSerdeFactory;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
@ -61,7 +59,9 @@ import org.junit.runners.Parameterized;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -77,14 +77,31 @@ public abstract class BaseFilterTest
protected StorageAdapter adapter;
protected Closeable closeable;
protected boolean optimize;
protected final String testName;
// JUnit creates a new test instance for every test method call.
// For filter tests, the test setup creates a segment.
// Creating a new segment for every test method call is pretty slow, so cache the StorageAdapters.
// Each thread gets its own map.
protected static ThreadLocal<Map<String, Map<String, Pair<StorageAdapter, Closeable>>>> adapterCache =
new ThreadLocal<Map<String, Map<String, Pair<StorageAdapter, Closeable>>>>()
{
@Override
protected Map<String, Map<String, Pair<StorageAdapter, Closeable>>> initialValue()
{
return new HashMap<>();
}
};
public BaseFilterTest(
String testName,
List<InputRow> rows,
IndexBuilder indexBuilder,
Function<IndexBuilder, Pair<StorageAdapter, Closeable>> finisher,
boolean optimize
)
{
this.testName = testName;
this.rows = rows;
this.indexBuilder = indexBuilder;
this.finisher = finisher;
@ -94,17 +111,37 @@ public abstract class BaseFilterTest
@Before
public void setUp() throws Exception
{
final Pair<StorageAdapter, Closeable> pair = finisher.apply(
indexBuilder.tmpDir(temporaryFolder.newFolder()).add(rows)
);
String className = getClass().getName();
Map<String, Pair<StorageAdapter, Closeable>> adaptersForClass = adapterCache.get().get(className);
if (adaptersForClass == null) {
adaptersForClass = new HashMap<>();
adapterCache.get().put(className, adaptersForClass);
}
Pair<StorageAdapter, Closeable> pair = adaptersForClass.get(testName);
if (pair == null) {
pair = finisher.apply(
indexBuilder.tmpDir(temporaryFolder.newFolder()).add(rows)
);
adaptersForClass.put(testName, pair);
}
this.adapter = pair.lhs;
this.closeable = pair.rhs;
}
@After
public void tearDown() throws Exception
public static void tearDown(String className) throws Exception
{
closeable.close();
Map<String, Pair<StorageAdapter, Closeable>> adaptersForClass = adapterCache.get().get(className);
if (adaptersForClass != null) {
for (Map.Entry<String, Pair<StorageAdapter, Closeable>> entry : adaptersForClass.entrySet()) {
Closeable closeable = entry.getValue().rhs;
closeable.close();
}
adapterCache.get().put(className, null);
}
}
@Parameterized.Parameters(name = "{0}")
@ -215,41 +252,6 @@ public abstract class BaseFilterTest
return constructors;
}
/**
* Selects elements from "selectColumn" from rows matching a filter. selectColumn must be a single valued dimension.
*/
protected List<String> selectColumnValuesMatchingFilter(final DimFilter filter, final String selectColumn)
{
final Cursor cursor = makeCursor(Filters.toFilter(maybeOptimize(filter)));
final List<String> values = Lists.newArrayList();
final DimensionSelector selector = cursor.makeDimensionSelector(
new DefaultDimensionSpec(selectColumn, selectColumn)
);
for (; !cursor.isDone(); cursor.advance()) {
final IndexedInts row = selector.getRow();
Preconditions.checkState(row.size() == 1);
values.add(selector.lookupName(row.get(0)));
}
return values;
}
protected long selectCountUsingFilteredAggregator(final DimFilter filter)
{
final Cursor cursor = makeCursor(null);
final Aggregator agg = new FilteredAggregatorFactory(
new CountAggregatorFactory("count"),
maybeOptimize(filter)
).factorize(cursor);
for (; !cursor.isDone(); cursor.advance()) {
agg.aggregate();
}
return agg.getLong();
}
private DimFilter maybeOptimize(final DimFilter dimFilter)
{
if (dimFilter == null) {
@ -258,7 +260,7 @@ public abstract class BaseFilterTest
return optimize ? dimFilter.optimize() : dimFilter;
}
private Cursor makeCursor(final Filter filter)
private Sequence<Cursor> makeCursorSequence(final Filter filter)
{
final Sequence<Cursor> cursors = adapter.makeCursors(
filter,
@ -266,6 +268,66 @@ public abstract class BaseFilterTest
QueryGranularities.ALL,
false
);
return Iterables.getOnlyElement(Sequences.toList(cursors, Lists.<Cursor>newArrayList()));
return cursors;
}
/**
* Selects elements from "selectColumn" from rows matching a filter. selectColumn must be a single valued dimension.
*/
protected List<String> selectColumnValuesMatchingFilter(final DimFilter filter, final String selectColumn)
{
final Sequence<Cursor> cursors = makeCursorSequence(Filters.toFilter(maybeOptimize(filter)));
Sequence<List<String>> seq = Sequences.map(
cursors,
new Function<Cursor, List<String>>()
{
@Override
public List<String> apply(Cursor input)
{
final DimensionSelector selector = input.makeDimensionSelector(
new DefaultDimensionSpec(selectColumn, selectColumn)
);
final List<String> values = Lists.newArrayList();
while (!input.isDone()) {
IndexedInts row = selector.getRow();
Preconditions.checkState(row.size() == 1);
values.add(selector.lookupName(row.get(0)));
input.advance();
}
return values;
}
}
);
return Sequences.toList(seq, new ArrayList<List<String>>()).get(0);
}
protected long selectCountUsingFilteredAggregator(final DimFilter filter)
{
final Sequence<Cursor> cursors = makeCursorSequence(Filters.toFilter(maybeOptimize(filter)));
Sequence<Aggregator> aggSeq = Sequences.map(
cursors,
new Function<Cursor, Aggregator>()
{
@Override
public Aggregator apply(Cursor input)
{
Aggregator agg = new FilteredAggregatorFactory(
new CountAggregatorFactory("count"),
maybeOptimize(filter)
).factorize(input);
for (; !input.isDone(); input.advance()) {
agg.aggregate();
}
return agg;
}
}
);
return Sequences.toList(aggSeq, new ArrayList<Aggregator>()).get(0).getLong();
}
}

View File

@ -37,6 +37,7 @@ import io.druid.query.filter.DimFilter;
import io.druid.segment.IndexBuilder;
import io.druid.segment.StorageAdapter;
import org.joda.time.DateTime;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -74,7 +75,13 @@ public class BoundFilterTest extends BaseFilterTest
boolean optimize
)
{
super(ROWS, indexBuilder, finisher, optimize);
super(testName, ROWS, indexBuilder, finisher, optimize);
}
@AfterClass
public static void tearDown() throws Exception
{
BaseFilterTest.tearDown(BoundFilterTest.class.getName());
}
@Test

View File

@ -45,6 +45,7 @@ import io.druid.query.lookup.LookupExtractor;
import io.druid.segment.IndexBuilder;
import io.druid.segment.StorageAdapter;
import org.joda.time.DateTime;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -79,12 +80,6 @@ public class InFilterTest extends BaseFilterTest
PARSER.parse(ImmutableMap.<String, Object>of("dim0", "f", "dim1", "abc"))
);
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> constructorFeeder() throws IOException
{
return makeConstructors();
}
public InFilterTest(
String testName,
IndexBuilder indexBuilder,
@ -92,17 +87,13 @@ public class InFilterTest extends BaseFilterTest
boolean optimize
)
{
super(ROWS, indexBuilder, finisher, optimize);
super(testName, ROWS, indexBuilder, finisher, optimize);
}
@Before
public void setUp() throws IOException
@AfterClass
public static void tearDown() throws Exception
{
final Pair<StorageAdapter, Closeable> pair = finisher.apply(
indexBuilder.tmpDir(temporaryFolder.newFolder()).add(ROWS)
);
this.adapter = pair.lhs;
this.closeable = pair.rhs;
BaseFilterTest.tearDown(InFilterTest.class.getName());
}
@Test

View File

@ -39,6 +39,7 @@ import io.druid.query.lookup.LookupExtractor;
import io.druid.segment.IndexBuilder;
import io.druid.segment.StorageAdapter;
import org.joda.time.DateTime;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -80,7 +81,13 @@ public class JavaScriptFilterTest extends BaseFilterTest
boolean optimize
)
{
super(ROWS, indexBuilder, finisher, optimize);
super(testName, ROWS, indexBuilder, finisher, optimize);
}
@AfterClass
public static void tearDown() throws Exception
{
BaseFilterTest.tearDown(JavaScriptFilterTest.class.getName());
}
private final String jsNullFilter = "function(x) { return(x === null) }";

View File

@ -35,6 +35,7 @@ import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.IndexBuilder;
import io.druid.segment.StorageAdapter;
import org.joda.time.DateTime;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -72,7 +73,13 @@ public class NotFilterTest extends BaseFilterTest
boolean optimize
)
{
super(ROWS, indexBuilder, finisher, optimize);
super(testName, ROWS, indexBuilder, finisher, optimize);
}
@AfterClass
public static void tearDown() throws Exception
{
BaseFilterTest.tearDown(NotFilterTest.class.getName());
}
@Test

View File

@ -37,6 +37,7 @@ import io.druid.query.filter.RegexDimFilter;
import io.druid.segment.IndexBuilder;
import io.druid.segment.StorageAdapter;
import org.joda.time.DateTime;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -78,7 +79,13 @@ public class RegexFilterTest extends BaseFilterTest
boolean optimize
)
{
super(ROWS, indexBuilder, finisher, optimize);
super(testName, ROWS, indexBuilder, finisher, optimize);
}
@AfterClass
public static void tearDown() throws Exception
{
BaseFilterTest.tearDown(RegexFilterTest.class.getName());
}
@Test

View File

@ -40,6 +40,7 @@ import io.druid.query.search.search.SearchQuerySpec;
import io.druid.segment.IndexBuilder;
import io.druid.segment.StorageAdapter;
import org.joda.time.DateTime;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -81,7 +82,13 @@ public class SearchQueryFilterTest extends BaseFilterTest
boolean optimize
)
{
super(ROWS, indexBuilder, finisher, optimize);
super(testName, ROWS, indexBuilder, finisher, optimize);
}
@AfterClass
public static void tearDown() throws Exception
{
BaseFilterTest.tearDown(SearchQueryFilterTest.class.getName());
}
private SearchQuerySpec specForValue(String value)

View File

@ -40,6 +40,7 @@ import io.druid.query.lookup.LookupExtractor;
import io.druid.segment.IndexBuilder;
import io.druid.segment.StorageAdapter;
import org.joda.time.DateTime;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -82,7 +83,13 @@ public class SelectorFilterTest extends BaseFilterTest
boolean optimize
)
{
super(ROWS, indexBuilder, finisher, optimize);
super(testName, ROWS, indexBuilder, finisher, optimize);
}
@AfterClass
public static void tearDown() throws Exception
{
BaseFilterTest.tearDown(SelectorFilterTest.class.getName());
}
@Test