From 1fb5fc6707ae63f436b79ae3b3963dfe5994c38d Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 9 Dec 2013 14:59:44 -0800 Subject: [PATCH] tests --- .../io/druid/query/select/EventHolder.java | 44 ++ .../io/druid/query/select/PagingSpec.java | 40 +- .../io/druid/query/select/SelectQuery.java | 5 +- .../druid/query/select/SelectQueryEngine.java | 8 +- .../select/SelectQueryQueryToolChest.java | 74 +++- .../select/SelectQueryRunnerFactory.java | 7 +- .../druid/query/select/SelectResultValue.java | 51 ++- .../select/SelectResultValueBuilder.java | 2 +- .../io/druid/query/QueryRunnerTestHelper.java | 8 +- .../query/select/SelectBinaryFnTest.java | 224 ++++++++++ .../query/select/SelectQueryRunnerTest.java | 403 ++++++++++++++++++ 11 files changed, 832 insertions(+), 34 deletions(-) create mode 100644 processing/src/test/java/io/druid/query/select/SelectBinaryFnTest.java create mode 100644 processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java diff --git a/processing/src/main/java/io/druid/query/select/EventHolder.java b/processing/src/main/java/io/druid/query/select/EventHolder.java index 86a898a5d84..1ac3661d1f5 100644 --- a/processing/src/main/java/io/druid/query/select/EventHolder.java +++ b/processing/src/main/java/io/druid/query/select/EventHolder.java @@ -70,4 +70,48 @@ public class EventHolder { return event; } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + EventHolder that = (EventHolder) o; + + if (offset != that.offset) { + return false; + } + if (!Maps.difference(event, ((EventHolder) o).event).areEqual()) { + return false; + } + if (segmentId != null ? !segmentId.equals(that.segmentId) : that.segmentId != null) { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + int result = segmentId != null ? segmentId.hashCode() : 0; + result = 31 * result + offset; + result = 31 * result + (event != null ? event.hashCode() : 0); + return result; + } + + @Override + public String toString() + { + return "EventHolder{" + + "segmentId='" + segmentId + '\'' + + ", offset=" + offset + + ", event=" + event + + '}'; + } } diff --git a/processing/src/main/java/io/druid/query/select/PagingSpec.java b/processing/src/main/java/io/druid/query/select/PagingSpec.java index d57afb1589f..7be4cf62746 100644 --- a/processing/src/main/java/io/druid/query/select/PagingSpec.java +++ b/processing/src/main/java/io/druid/query/select/PagingSpec.java @@ -21,19 +21,22 @@ package io.druid.query.select; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.primitives.Ints; +import java.nio.ByteBuffer; +import java.util.LinkedHashMap; import java.util.Map; /** */ public class PagingSpec { - private final Map pagingIdentifiers; + private final LinkedHashMap pagingIdentifiers; private final int threshold; @JsonCreator public PagingSpec( - @JsonProperty("pagingIdentifiers") Map pagingIdentifiers, + @JsonProperty("pagingIdentifiers") LinkedHashMap pagingIdentifiers, @JsonProperty("threshold") int threshold ) { @@ -53,6 +56,39 @@ public class PagingSpec return threshold; } + public byte[] getCacheKey() + { + final byte[][] pagingKeys = new byte[pagingIdentifiers.size()][]; + final byte[][] pagingValues = new byte[pagingIdentifiers.size()][]; + + int index = 0; + int pagingKeysSize = 0; + int pagingValuesSize = 0; + for (Map.Entry entry : pagingIdentifiers.entrySet()) { + pagingKeys[index] = entry.getKey().getBytes(); + pagingValues[index] = ByteBuffer.allocate(Ints.BYTES).putInt(entry.getValue()).array(); + pagingKeysSize += pagingKeys[index].length; + pagingValuesSize += Ints.BYTES; + index++; + } + + final byte[] thresholdBytes = ByteBuffer.allocate(Ints.BYTES).putInt(threshold).array(); + + final ByteBuffer queryCacheKey = ByteBuffer.allocate(pagingKeysSize + pagingValuesSize + thresholdBytes.length); + + for (byte[] pagingKey : pagingKeys) { + queryCacheKey.put(pagingKey); + } + + for (byte[] pagingValue : pagingValues) { + queryCacheKey.put(pagingValue); + } + + queryCacheKey.put(thresholdBytes); + + return queryCacheKey.array(); + } + @Override public String toString() { diff --git a/processing/src/main/java/io/druid/query/select/SelectQuery.java b/processing/src/main/java/io/druid/query/select/SelectQuery.java index a4c924d6459..8c5eb2ba59f 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQuery.java +++ b/processing/src/main/java/io/druid/query/select/SelectQuery.java @@ -37,9 +37,6 @@ import java.util.Map; @JsonTypeName("select") public class SelectQuery extends BaseQuery> { - // TODO: remove this - private static final PagingSpec defaultPagingSpec = new PagingSpec(null, 5); - private final DimFilter dimFilter; private final QueryGranularity granularity; private final List dimensions; @@ -63,7 +60,7 @@ public class SelectQuery extends BaseQuery> this.granularity = granularity; this.dimensions = dimensions; this.metrics = metrics; - this.pagingSpec = pagingSpec == null ? defaultPagingSpec : pagingSpec; + this.pagingSpec = pagingSpec; } @Override diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java b/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java index bc957c25983..3238ac01f7a 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java @@ -29,7 +29,6 @@ import io.druid.query.Result; import io.druid.segment.Cursor; import io.druid.segment.DimensionSelector; import io.druid.segment.ObjectColumnSelector; -import io.druid.segment.QueryableIndex; import io.druid.segment.Segment; import io.druid.segment.StorageAdapter; import io.druid.segment.TimestampColumnSelector; @@ -79,13 +78,14 @@ public class SelectQueryEngine @Override public Result apply(Cursor cursor) { - final TimestampColumnSelector timestampColumnSelector = cursor.makeTimestampColumnSelector(); final SelectResultValueBuilder builder = new SelectResultValueBuilder( cursor.getTime(), query.getPagingSpec() .getThreshold() ); + final TimestampColumnSelector timestampColumnSelector = cursor.makeTimestampColumnSelector(); + final Map dimSelectors = Maps.newHashMap(); for (String dim : dims) { final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim); @@ -118,9 +118,7 @@ public class SelectQueryEngine final DimensionSelector selector = dimSelector.getValue(); final IndexedInts vals = selector.getRow(); - if (vals.size() == 0) { - continue; - } else if (vals.size() <= 1) { + if (vals.size() == 1) { final String dimVal = selector.lookupName(vals.get(0)); theEvent.put(dim, dimVal); } else { diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java index 181af647428..9e5a365479c 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java @@ -20,11 +20,12 @@ package io.druid.query.select; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Functions; import com.google.common.base.Joiner; -import com.google.common.collect.Lists; import com.google.common.collect.Ordering; +import com.google.common.collect.Sets; import com.google.inject.Inject; import com.metamx.common.guava.MergeSequence; import com.metamx.common.guava.Sequence; @@ -51,6 +52,8 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Set; /** */ @@ -70,11 +73,13 @@ public class SelectQueryQueryToolChest extends QueryToolChest dimensions = Sets.newTreeSet(); + if (query.getDimensions() != null) { + dimensions.addAll(query.getDimensions()); + } + + final byte[][] dimensionsBytes = new byte[dimensions.size()][]; + int dimensionsBytesSize = 0; + int index = 0; + for (String dimension : dimensions) { + dimensionsBytes[index] = dimension.getBytes(); + dimensionsBytesSize += dimensionsBytes[index].length; + ++index; + } + + + final Set metrics = Sets.newTreeSet(); + if (query.getMetrics() != null) { + dimensions.addAll(query.getMetrics()); + } + + final byte[][] metricBytes = new byte[metrics.size()][]; + int metricBytesSize = 0; + index = 0; + for (String metric : metrics) { + metricBytes[index] = metric.getBytes(); + metricBytesSize += metricBytes[index].length; + ++index; + } + + final ByteBuffer queryCacheKey = ByteBuffer + .allocate( + 1 + + granularityBytes.length + + filterBytes.length + + query.getPagingSpec().getCacheKey().length + + dimensionsBytesSize + + metricBytesSize + ) .put(SELECT_QUERY) .put(granularityBytes) .put(filterBytes) - .array(); + .put(query.getPagingSpec().getCacheKey()); + + for (byte[] dimensionsByte : dimensionsBytes) { + queryCacheKey.put(dimensionsByte); + } + + for (byte[] metricByte : metricBytes) { + queryCacheKey.put(metricByte); + } + + return queryCacheKey.array(); } @Override @@ -202,7 +253,18 @@ public class SelectQueryQueryToolChest extends QueryToolChest( timestamp, - new SelectResultValue(resultIter.next(), Lists.newArrayList(resultIter.next())) + new SelectResultValue( + (Map) jsonMapper.convertValue( + resultIter.next(), new TypeReference>() + { + } + ), + (List) jsonMapper.convertValue( + resultIter.next(), new TypeReference>() + { + } + ) + ) ); } }; diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java index c4e50358b3e..6e995b15f44 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java @@ -19,6 +19,7 @@ package io.druid.query.select; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.common.guava.Sequence; @@ -29,9 +30,7 @@ import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; import io.druid.query.Result; -import io.druid.segment.QueryableIndex; import io.druid.segment.Segment; -import io.druid.segment.StorageAdapter; import java.util.concurrent.ExecutorService; @@ -40,10 +39,10 @@ import java.util.concurrent.ExecutorService; public class SelectQueryRunnerFactory implements QueryRunnerFactory, SelectQuery> { - public static SelectQueryRunnerFactory create() + public static SelectQueryRunnerFactory create(ObjectMapper jsonMapper) { return new SelectQueryRunnerFactory( - new SelectQueryQueryToolChest(new QueryConfig()), + new SelectQueryQueryToolChest(new QueryConfig(), jsonMapper), new SelectQueryEngine() ); } diff --git a/processing/src/main/java/io/druid/query/select/SelectResultValue.java b/processing/src/main/java/io/druid/query/select/SelectResultValue.java index a5910a89a15..d3ce5d6ef68 100644 --- a/processing/src/main/java/io/druid/query/select/SelectResultValue.java +++ b/processing/src/main/java/io/druid/query/select/SelectResultValue.java @@ -26,6 +26,7 @@ import com.google.common.base.Function; import com.google.common.collect.Lists; import com.metamx.common.ISE; +import javax.annotation.Nullable; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -46,15 +47,6 @@ public class SelectResultValue implements Iterable this.events = events; } - public SelectResultValue( - Object pagingIdentifiers, - List events - ) - { - this.pagingIdentifiers = null; - this.events = null; - } - @JsonProperty public Map getPagingIdentifiers() { @@ -72,4 +64,45 @@ public class SelectResultValue implements Iterable { return events.iterator(); } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SelectResultValue that = (SelectResultValue) o; + + if (events != null ? !events.equals(that.events) : that.events != null) { + return false; + } + if (pagingIdentifiers != null + ? !pagingIdentifiers.equals(that.pagingIdentifiers) + : that.pagingIdentifiers != null) { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + int result = pagingIdentifiers != null ? pagingIdentifiers.hashCode() : 0; + result = 31 * result + (events != null ? events.hashCode() : 0); + return result; + } + + @Override + public String toString() + { + return "SelectResultValue{" + + "pagingIdentifiers=" + pagingIdentifiers + + ", events=" + events + + '}'; + } } diff --git a/processing/src/main/java/io/druid/query/select/SelectResultValueBuilder.java b/processing/src/main/java/io/druid/query/select/SelectResultValueBuilder.java index fe3ba09a946..d417bddae8e 100644 --- a/processing/src/main/java/io/druid/query/select/SelectResultValueBuilder.java +++ b/processing/src/main/java/io/druid/query/select/SelectResultValueBuilder.java @@ -78,7 +78,7 @@ public class SelectResultValueBuilder { // Pull out top aggregated values List values = Lists.newArrayListWithCapacity(pQueue.size()); - Map pagingIdentifiers = Maps.newHashMap(); + Map pagingIdentifiers = Maps.newLinkedHashMap(); while (!pQueue.isEmpty()) { EventHolder event = pQueue.remove(); pagingIdentifiers.put(event.getSegmentId(), event.getOffset()); diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index 747ab090da5..2f51162f076 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -48,11 +48,13 @@ import java.util.List; */ public class QueryRunnerTestHelper { + public static final String segmentId= "testSegment"; public static final String dataSource = "testing"; public static final QueryGranularity dayGran = QueryGranularity.DAY; public static final QueryGranularity allGran = QueryGranularity.ALL; public static final String providerDimension = "proVider"; public static final String qualityDimension = "quality"; + public static final String placementDimension = "placement"; public static final String placementishDimension = "placementish"; public static final String indexMetric = "index"; public static final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows"); @@ -110,13 +112,13 @@ public class QueryRunnerTestHelper return Arrays.asList( new Object[][]{ { - makeQueryRunner(factory, new IncrementalIndexSegment(rtIndex, null)) + makeQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId)) }, { - makeQueryRunner(factory, new QueryableIndexSegment(null, mMappedTestIndex)) + makeQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex)) }, { - makeQueryRunner(factory, new QueryableIndexSegment(null, mergedRealtimeIndex)) + makeQueryRunner(factory, new QueryableIndexSegment(segmentId, mergedRealtimeIndex)) } } ); diff --git a/processing/src/test/java/io/druid/query/select/SelectBinaryFnTest.java b/processing/src/test/java/io/druid/query/select/SelectBinaryFnTest.java new file mode 100644 index 00000000000..5eb059deb30 --- /dev/null +++ b/processing/src/test/java/io/druid/query/select/SelectBinaryFnTest.java @@ -0,0 +1,224 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.select; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.metamx.common.ISE; +import io.druid.granularity.QueryGranularity; +import io.druid.query.Result; +import junit.framework.Assert; +import org.joda.time.DateTime; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; + +/** + */ +public class SelectBinaryFnTest +{ + private static final String segmentId1 = "testSegment"; + + private static final String segmentId2 = "testSegment"; + + @Test + public void testApply() throws Exception + { + SelectBinaryFn binaryFn = new SelectBinaryFn(QueryGranularity.ALL, new PagingSpec(null, 5)); + + Result res1 = new Result<>( + new DateTime("2013-01-01"), + new SelectResultValue( + ImmutableMap.of(), + Arrays.asList( + new EventHolder( + segmentId1, + 0, + ImmutableMap.of( + EventHolder.timestampKey, + new DateTime("2013-01-01T00"), + "dim", + "first" + ) + ), + new EventHolder( + segmentId1, + 1, + ImmutableMap.of( + EventHolder.timestampKey, + new DateTime("2013-01-01T03"), + "dim", + "fourth" + ) + ), + new EventHolder( + segmentId1, + 2, + ImmutableMap.of( + EventHolder.timestampKey, + new DateTime("2013-01-01T05"), + "dim", + "sixth" + ) + ) + ) + ) + ); + + + Result res2 = new Result<>( + new DateTime("2013-01-01"), + new SelectResultValue( + ImmutableMap.of(), + Arrays.asList( + new EventHolder( + segmentId2, + 0, + ImmutableMap.of( + EventHolder.timestampKey, + new DateTime("2013-01-01T00"), + "dim", + "second" + ) + ), + new EventHolder( + segmentId2, + 1, + ImmutableMap.of( + EventHolder.timestampKey, + new DateTime("2013-01-01T02"), + "dim", + "third" + ) + ), + new EventHolder( + segmentId2, + 2, + ImmutableMap.of( + EventHolder.timestampKey, + new DateTime("2013-01-01T04"), + "dim", + "fifth" + ) + ) + ) + ) + ); + + Result merged = binaryFn.apply(res1, res2); + + Assert.assertEquals(res1.getTimestamp(), merged.getTimestamp()); + + LinkedHashMap expectedPageIds = Maps.newLinkedHashMap(); + expectedPageIds.put(segmentId1, 0); + expectedPageIds.put(segmentId2, 0); + expectedPageIds.put(segmentId2, 1); + expectedPageIds.put(segmentId1, 1); + expectedPageIds.put(segmentId2, 2); + + Iterator exSegmentIter = expectedPageIds.keySet().iterator(); + Iterator acSegmentIter = merged.getValue().getPagingIdentifiers().keySet().iterator(); + + verifyIters(exSegmentIter, acSegmentIter); + + Iterator exOffsetIter = expectedPageIds.values().iterator(); + Iterator acOffsetIter = merged.getValue().getPagingIdentifiers().values().iterator(); + + verifyIters(exOffsetIter, acOffsetIter); + + List exEvents = Arrays.asList( + new EventHolder( + segmentId1, + 0, + ImmutableMap.of( + EventHolder.timestampKey, + new DateTime("2013-01-01T00"), "dim", "first" + ) + ), + new EventHolder( + segmentId2, + 0, + ImmutableMap.of( + EventHolder.timestampKey, + new DateTime("2013-01-01T00"), + "dim", + "second" + ) + ), + new EventHolder( + segmentId2, + 1, + ImmutableMap.of( + EventHolder.timestampKey, + new DateTime("2013-01-01T02"), + "dim", + "third" + ) + ), + new EventHolder( + segmentId1, + 1, + ImmutableMap.of( + EventHolder.timestampKey, + new DateTime("2013-01-01T03"), + "dim", + "fourth" + ) + ), + new EventHolder( + segmentId2, + 2, + ImmutableMap.of( + EventHolder.timestampKey, + new DateTime("2013-01-01T04"), + "dim", + "fifth" + ) + ) + ); + + List acEvents = merged.getValue().getEvents(); + + + verifyEvents(exEvents, acEvents); + } + + private void verifyIters(Iterator iter1, Iterator iter2) + { + while (iter1.hasNext()) { + Assert.assertEquals(iter1.next(), iter2.next()); + } + + if (iter2.hasNext()) { + throw new ISE("This should be empty!"); + } + } + + private void verifyEvents(List events1, List events2) + { + Iterator ex = events1.iterator(); + Iterator ac = events2.iterator(); + + verifyIters(ex, ac); + } +} diff --git a/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java b/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java new file mode 100644 index 00000000000..6c7b26d6059 --- /dev/null +++ b/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java @@ -0,0 +1,403 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.select; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.metamx.common.ISE; +import com.metamx.common.guava.Sequences; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.Result; +import io.druid.query.filter.SelectorDimFilter; +import io.druid.query.spec.LegacySegmentSpec; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + */ +@RunWith(Parameterized.class) +public class SelectQueryRunnerTest +{ + @Parameterized.Parameters + public static Collection constructorFeeder() throws IOException + { + return QueryRunnerTestHelper.makeQueryRunners( + SelectQueryRunnerFactory.create(new DefaultObjectMapper()) + ); + } + + private static final String providerLowercase = "provider"; + + private final QueryRunner runner; + + public SelectQueryRunnerTest( + QueryRunner runner + ) + { + this.runner = runner; + } + + @Test + public void testFullOnSelect() + { + SelectQuery query = new SelectQuery( + QueryRunnerTestHelper.dataSource, + QueryRunnerTestHelper.fullOnInterval, + null, + QueryRunnerTestHelper.allGran, + Lists.newArrayList(), + Lists.newArrayList(), + new PagingSpec(null, 3), + null + ); + + Iterable> results = Sequences.toList( + runner.run(query), + Lists.>newArrayList() + ); + + List> expectedResults = Arrays.asList( + new Result( + new DateTime("2011-01-12T00:00:00.000Z"), + new SelectResultValue( + ImmutableMap.of(QueryRunnerTestHelper.segmentId, 2), + Arrays.asList( + new EventHolder( + QueryRunnerTestHelper.segmentId, + 0, + new ImmutableMap.Builder() + .put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z")) + .put(providerLowercase, "spot") + .put(QueryRunnerTestHelper.qualityDimension, "automotive") + .put(QueryRunnerTestHelper.placementDimension, "preferred") + .put(QueryRunnerTestHelper.placementishDimension, Lists.newArrayList("a", "preferred")) + .put(QueryRunnerTestHelper.indexMetric, 100.000000F) + .build() + ), + new EventHolder( + QueryRunnerTestHelper.segmentId, + 1, + new ImmutableMap.Builder() + .put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z")) + .put(providerLowercase, "spot") + .put(QueryRunnerTestHelper.qualityDimension, "business") + .put(QueryRunnerTestHelper.placementDimension, "preferred") + .put(QueryRunnerTestHelper.placementishDimension, Lists.newArrayList("b", "preferred")) + .put(QueryRunnerTestHelper.indexMetric, 100.000000F) + .build() + ), + new EventHolder( + QueryRunnerTestHelper.segmentId, + 2, + new ImmutableMap.Builder() + .put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z")) + .put(providerLowercase, "spot") + .put(QueryRunnerTestHelper.qualityDimension, "entertainment") + .put(QueryRunnerTestHelper.placementDimension, "preferred") + .put(QueryRunnerTestHelper.placementishDimension, Lists.newArrayList("e", "preferred")) + .put(QueryRunnerTestHelper.indexMetric, 100.000000F) + .build() + ) + ) + ) + ) + ); + + verify(expectedResults, results); + } + + @Test + public void testSelectWithDimsAndMets() + { + SelectQuery query = new SelectQuery( + QueryRunnerTestHelper.dataSource, + QueryRunnerTestHelper.fullOnInterval, + null, + QueryRunnerTestHelper.allGran, + Lists.newArrayList(providerLowercase), + Lists.newArrayList(QueryRunnerTestHelper.indexMetric), + new PagingSpec(null, 3), + null + ); + + Iterable> results = Sequences.toList( + runner.run(query), + Lists.>newArrayList() + ); + + List> expectedResults = Arrays.asList( + new Result( + new DateTime("2011-01-12T00:00:00.000Z"), + new SelectResultValue( + ImmutableMap.of(QueryRunnerTestHelper.segmentId, 2), + Arrays.asList( + new EventHolder( + QueryRunnerTestHelper.segmentId, + 0, + new ImmutableMap.Builder() + .put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z")) + .put(providerLowercase, "spot") + .put(QueryRunnerTestHelper.indexMetric, 100.000000F) + .build() + ), + new EventHolder( + QueryRunnerTestHelper.segmentId, + 1, + new ImmutableMap.Builder() + .put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z")) + .put(providerLowercase, "spot") + .put(QueryRunnerTestHelper.indexMetric, 100.000000F) + .build() + ), + new EventHolder( + QueryRunnerTestHelper.segmentId, + 2, + new ImmutableMap.Builder() + .put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z")) + .put(providerLowercase, "spot") + .put(QueryRunnerTestHelper.indexMetric, 100.000000F) + .build() + ) + ) + ) + ) + ); + + verify(expectedResults, results); + } + + @Test + public void testSelectPagination() + { + SelectQuery query = new SelectQuery( + QueryRunnerTestHelper.dataSource, + QueryRunnerTestHelper.fullOnInterval, + null, + QueryRunnerTestHelper.allGran, + Lists.newArrayList(QueryRunnerTestHelper.qualityDimension), + Lists.newArrayList(QueryRunnerTestHelper.indexMetric), + new PagingSpec(Maps.newLinkedHashMap(ImmutableMap.of(QueryRunnerTestHelper.segmentId, 3)), 3), + null + ); + + Iterable> results = Sequences.toList( + runner.run(query), + Lists.>newArrayList() + ); + + List> expectedResults = Arrays.asList( + new Result( + new DateTime("2011-01-12T00:00:00.000Z"), + new SelectResultValue( + ImmutableMap.of(QueryRunnerTestHelper.segmentId, 5), + Arrays.asList( + new EventHolder( + QueryRunnerTestHelper.segmentId, + 3, + new ImmutableMap.Builder() + .put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z")) + .put(QueryRunnerTestHelper.qualityDimension, "health") + .put(QueryRunnerTestHelper.indexMetric, 100.000000F) + .build() + ), + new EventHolder( + QueryRunnerTestHelper.segmentId, + 4, + new ImmutableMap.Builder() + .put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z")) + .put(QueryRunnerTestHelper.qualityDimension, "mezzanine") + .put(QueryRunnerTestHelper.indexMetric, 100.000000F) + .build() + ), + new EventHolder( + QueryRunnerTestHelper.segmentId, + 5, + new ImmutableMap.Builder() + .put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z")) + .put(QueryRunnerTestHelper.qualityDimension, "news") + .put(QueryRunnerTestHelper.indexMetric, 100.000000F) + .build() + ) + ) + ) + ) + ); + + verify(expectedResults, results); + } + + @Test + public void testFullOnSelectWithFilter() + { + SelectQuery query = new SelectQuery( + QueryRunnerTestHelper.dataSource, + new LegacySegmentSpec(new Interval("2011-01-12/2011-01-14")), + new SelectorDimFilter(QueryRunnerTestHelper.providerDimension, "spot"), + QueryRunnerTestHelper.dayGran, + Lists.newArrayList(QueryRunnerTestHelper.qualityDimension), + Lists.newArrayList(QueryRunnerTestHelper.indexMetric), + new PagingSpec(Maps.newLinkedHashMap(ImmutableMap.of(QueryRunnerTestHelper.segmentId, 3)), 3), + null + ); + + Iterable> results = Sequences.toList( + runner.run(query), + Lists.>newArrayList() + ); + + List> expectedResults = Arrays.asList( + new Result( + new DateTime("2011-01-12T00:00:00.000Z"), + new SelectResultValue( + ImmutableMap.of(QueryRunnerTestHelper.segmentId, 5), + Arrays.asList( + new EventHolder( + QueryRunnerTestHelper.segmentId, + 3, + new ImmutableMap.Builder() + .put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z")) + .put(QueryRunnerTestHelper.qualityDimension, "health") + .put(QueryRunnerTestHelper.indexMetric, 100.000000F) + .build() + ), + new EventHolder( + QueryRunnerTestHelper.segmentId, + 4, + new ImmutableMap.Builder() + .put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z")) + .put(QueryRunnerTestHelper.qualityDimension, "mezzanine") + .put(QueryRunnerTestHelper.indexMetric, 100.000000F) + .build() + ), + new EventHolder( + QueryRunnerTestHelper.segmentId, + 5, + new ImmutableMap.Builder() + .put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z")) + .put(QueryRunnerTestHelper.qualityDimension, "news") + .put(QueryRunnerTestHelper.indexMetric, 100.000000F) + .build() + ) + ) + ) + ), + new Result( + new DateTime("2011-01-13T00:00:00.000Z"), + new SelectResultValue( + ImmutableMap.of(QueryRunnerTestHelper.segmentId, 5), + Arrays.asList( + new EventHolder( + QueryRunnerTestHelper.segmentId, + 3, + new ImmutableMap.Builder() + .put(EventHolder.timestampKey, new DateTime("2011-01-13T00:00:00.000Z")) + .put(QueryRunnerTestHelper.qualityDimension, "health") + .put(QueryRunnerTestHelper.indexMetric, 114.947403F) + .build() + ), + new EventHolder( + QueryRunnerTestHelper.segmentId, + 4, + new ImmutableMap.Builder() + .put(EventHolder.timestampKey, new DateTime("2011-01-13T00:00:00.000Z")) + .put(QueryRunnerTestHelper.qualityDimension, "mezzanine") + .put(QueryRunnerTestHelper.indexMetric, 104.465767F) + .build() + ), + new EventHolder( + QueryRunnerTestHelper.segmentId, + 5, + new ImmutableMap.Builder() + .put(EventHolder.timestampKey, new DateTime("2011-01-13T00:00:00.000Z")) + .put(QueryRunnerTestHelper.qualityDimension, "news") + .put(QueryRunnerTestHelper.indexMetric, 102.851683F) + .build() + ) + ) + ) + ) + ); + + verify(expectedResults, results); + } + + private static void verify( + Iterable> expectedResults, + Iterable> actualResults + ) + { + Iterator> expectedIter = expectedResults.iterator(); + Iterator> actualIter = actualResults.iterator(); + + while (expectedIter.hasNext()) { + Result expected = expectedIter.next(); + Result actual = actualIter.next(); + + Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp()); + + for (Map.Entry entry : expected.getValue().getPagingIdentifiers().entrySet()) { + Assert.assertEquals(entry.getValue(), actual.getValue().getPagingIdentifiers().get(entry.getKey())); + } + + Iterator expectedEvts = expected.getValue().getEvents().iterator(); + Iterator actualEvts = actual.getValue().getEvents().iterator(); + + while (expectedEvts.hasNext()) { + EventHolder exHolder = expectedEvts.next(); + EventHolder acHolder = actualEvts.next(); + + Assert.assertEquals(exHolder.getTimestamp(), acHolder.getTimestamp()); + Assert.assertEquals(exHolder.getOffset(), acHolder.getOffset()); + + for (Map.Entry ex : exHolder.getEvent().entrySet()) { + Object actVal = acHolder.getEvent().get(ex.getKey()); + + // work around for current II limitations + if (acHolder.getEvent().get(ex.getKey()) instanceof Double) { + actVal = ((Double) actVal).floatValue(); + } + Assert.assertEquals(ex.getValue(), actVal); + } + } + + if (actualEvts.hasNext()) { + throw new ISE("This event iterator should be exhausted!"); + } + } + + if (actualIter.hasNext()) { + throw new ISE("This iterator should be exhausted!"); + } + } +}