This commit is contained in:
fjy 2013-12-09 14:59:44 -08:00
parent f17e08324d
commit 1fb5fc6707
11 changed files with 832 additions and 34 deletions

View File

@ -70,4 +70,48 @@ public class EventHolder
{
return event;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
EventHolder that = (EventHolder) o;
if (offset != that.offset) {
return false;
}
if (!Maps.difference(event, ((EventHolder) o).event).areEqual()) {
return false;
}
if (segmentId != null ? !segmentId.equals(that.segmentId) : that.segmentId != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = segmentId != null ? segmentId.hashCode() : 0;
result = 31 * result + offset;
result = 31 * result + (event != null ? event.hashCode() : 0);
return result;
}
@Override
public String toString()
{
return "EventHolder{" +
"segmentId='" + segmentId + '\'' +
", offset=" + offset +
", event=" + event +
'}';
}
}

View File

@ -21,19 +21,22 @@ package io.druid.query.select;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.primitives.Ints;
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;
/**
*/
public class PagingSpec
{
private final Map<String, Integer> pagingIdentifiers;
private final LinkedHashMap<String, Integer> pagingIdentifiers;
private final int threshold;
@JsonCreator
public PagingSpec(
@JsonProperty("pagingIdentifiers") Map<String, Integer> pagingIdentifiers,
@JsonProperty("pagingIdentifiers") LinkedHashMap<String, Integer> pagingIdentifiers,
@JsonProperty("threshold") int threshold
)
{
@ -53,6 +56,39 @@ public class PagingSpec
return threshold;
}
public byte[] getCacheKey()
{
final byte[][] pagingKeys = new byte[pagingIdentifiers.size()][];
final byte[][] pagingValues = new byte[pagingIdentifiers.size()][];
int index = 0;
int pagingKeysSize = 0;
int pagingValuesSize = 0;
for (Map.Entry<String, Integer> entry : pagingIdentifiers.entrySet()) {
pagingKeys[index] = entry.getKey().getBytes();
pagingValues[index] = ByteBuffer.allocate(Ints.BYTES).putInt(entry.getValue()).array();
pagingKeysSize += pagingKeys[index].length;
pagingValuesSize += Ints.BYTES;
index++;
}
final byte[] thresholdBytes = ByteBuffer.allocate(Ints.BYTES).putInt(threshold).array();
final ByteBuffer queryCacheKey = ByteBuffer.allocate(pagingKeysSize + pagingValuesSize + thresholdBytes.length);
for (byte[] pagingKey : pagingKeys) {
queryCacheKey.put(pagingKey);
}
for (byte[] pagingValue : pagingValues) {
queryCacheKey.put(pagingValue);
}
queryCacheKey.put(thresholdBytes);
return queryCacheKey.array();
}
@Override
public String toString()
{

View File

@ -37,9 +37,6 @@ import java.util.Map;
@JsonTypeName("select")
public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
{
// TODO: remove this
private static final PagingSpec defaultPagingSpec = new PagingSpec(null, 5);
private final DimFilter dimFilter;
private final QueryGranularity granularity;
private final List<String> dimensions;
@ -63,7 +60,7 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
this.granularity = granularity;
this.dimensions = dimensions;
this.metrics = metrics;
this.pagingSpec = pagingSpec == null ? defaultPagingSpec : pagingSpec;
this.pagingSpec = pagingSpec;
}
@Override

View File

@ -29,7 +29,6 @@ import io.druid.query.Result;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.QueryableIndex;
import io.druid.segment.Segment;
import io.druid.segment.StorageAdapter;
import io.druid.segment.TimestampColumnSelector;
@ -79,13 +78,14 @@ public class SelectQueryEngine
@Override
public Result<SelectResultValue> apply(Cursor cursor)
{
final TimestampColumnSelector timestampColumnSelector = cursor.makeTimestampColumnSelector();
final SelectResultValueBuilder builder = new SelectResultValueBuilder(
cursor.getTime(),
query.getPagingSpec()
.getThreshold()
);
final TimestampColumnSelector timestampColumnSelector = cursor.makeTimestampColumnSelector();
final Map<String, DimensionSelector> dimSelectors = Maps.newHashMap();
for (String dim : dims) {
final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim);
@ -118,9 +118,7 @@ public class SelectQueryEngine
final DimensionSelector selector = dimSelector.getValue();
final IndexedInts vals = selector.getRow();
if (vals.size() == 0) {
continue;
} else if (vals.size() <= 1) {
if (vals.size() == 1) {
final String dimVal = selector.lookupName(vals.get(0));
theEvent.put(dim, dimVal);
} else {

View File

@ -20,11 +20,12 @@
package io.druid.query.select;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence;
@ -51,6 +52,8 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
*/
@ -70,11 +73,13 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
};
private final QueryConfig config;
private final ObjectMapper jsonMapper;
@Inject
public SelectQueryQueryToolChest(QueryConfig config)
public SelectQueryQueryToolChest(QueryConfig config, ObjectMapper jsonMapper)
{
this.config = config;
this.jsonMapper = jsonMapper;
}
@Override
@ -154,12 +159,58 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
final byte[] filterBytes = dimFilter == null ? new byte[]{} : dimFilter.getCacheKey();
final byte[] granularityBytes = query.getGranularity().cacheKey();
return ByteBuffer
.allocate(1 + granularityBytes.length + filterBytes.length)
final Set<String> dimensions = Sets.newTreeSet();
if (query.getDimensions() != null) {
dimensions.addAll(query.getDimensions());
}
final byte[][] dimensionsBytes = new byte[dimensions.size()][];
int dimensionsBytesSize = 0;
int index = 0;
for (String dimension : dimensions) {
dimensionsBytes[index] = dimension.getBytes();
dimensionsBytesSize += dimensionsBytes[index].length;
++index;
}
final Set<String> metrics = Sets.newTreeSet();
if (query.getMetrics() != null) {
dimensions.addAll(query.getMetrics());
}
final byte[][] metricBytes = new byte[metrics.size()][];
int metricBytesSize = 0;
index = 0;
for (String metric : metrics) {
metricBytes[index] = metric.getBytes();
metricBytesSize += metricBytes[index].length;
++index;
}
final ByteBuffer queryCacheKey = ByteBuffer
.allocate(
1
+ granularityBytes.length
+ filterBytes.length
+ query.getPagingSpec().getCacheKey().length
+ dimensionsBytesSize
+ metricBytesSize
)
.put(SELECT_QUERY)
.put(granularityBytes)
.put(filterBytes)
.array();
.put(query.getPagingSpec().getCacheKey());
for (byte[] dimensionsByte : dimensionsBytes) {
queryCacheKey.put(dimensionsByte);
}
for (byte[] metricByte : metricBytes) {
queryCacheKey.put(metricByte);
}
return queryCacheKey.array();
}
@Override
@ -202,7 +253,18 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
return new Result<SelectResultValue>(
timestamp,
new SelectResultValue(resultIter.next(), Lists.newArrayList(resultIter.next()))
new SelectResultValue(
(Map<String, Integer>) jsonMapper.convertValue(
resultIter.next(), new TypeReference<Map<String, Integer>>()
{
}
),
(List<EventHolder>) jsonMapper.convertValue(
resultIter.next(), new TypeReference<List<EventHolder>>()
{
}
)
)
);
}
};

View File

@ -19,6 +19,7 @@
package io.druid.query.select;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.guava.Sequence;
@ -29,9 +30,7 @@ import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
import io.druid.segment.QueryableIndex;
import io.druid.segment.Segment;
import io.druid.segment.StorageAdapter;
import java.util.concurrent.ExecutorService;
@ -40,10 +39,10 @@ import java.util.concurrent.ExecutorService;
public class SelectQueryRunnerFactory
implements QueryRunnerFactory<Result<SelectResultValue>, SelectQuery>
{
public static SelectQueryRunnerFactory create()
public static SelectQueryRunnerFactory create(ObjectMapper jsonMapper)
{
return new SelectQueryRunnerFactory(
new SelectQueryQueryToolChest(new QueryConfig()),
new SelectQueryQueryToolChest(new QueryConfig(), jsonMapper),
new SelectQueryEngine()
);
}

View File

@ -26,6 +26,7 @@ import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -46,15 +47,6 @@ public class SelectResultValue implements Iterable<EventHolder>
this.events = events;
}
public SelectResultValue(
Object pagingIdentifiers,
List<?> events
)
{
this.pagingIdentifiers = null;
this.events = null;
}
@JsonProperty
public Map<String, Integer> getPagingIdentifiers()
{
@ -72,4 +64,45 @@ public class SelectResultValue implements Iterable<EventHolder>
{
return events.iterator();
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SelectResultValue that = (SelectResultValue) o;
if (events != null ? !events.equals(that.events) : that.events != null) {
return false;
}
if (pagingIdentifiers != null
? !pagingIdentifiers.equals(that.pagingIdentifiers)
: that.pagingIdentifiers != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = pagingIdentifiers != null ? pagingIdentifiers.hashCode() : 0;
result = 31 * result + (events != null ? events.hashCode() : 0);
return result;
}
@Override
public String toString()
{
return "SelectResultValue{" +
"pagingIdentifiers=" + pagingIdentifiers +
", events=" + events +
'}';
}
}

View File

@ -78,7 +78,7 @@ public class SelectResultValueBuilder
{
// Pull out top aggregated values
List<EventHolder> values = Lists.newArrayListWithCapacity(pQueue.size());
Map<String, Integer> pagingIdentifiers = Maps.newHashMap();
Map<String, Integer> pagingIdentifiers = Maps.newLinkedHashMap();
while (!pQueue.isEmpty()) {
EventHolder event = pQueue.remove();
pagingIdentifiers.put(event.getSegmentId(), event.getOffset());

View File

@ -48,11 +48,13 @@ import java.util.List;
*/
public class QueryRunnerTestHelper
{
public static final String segmentId= "testSegment";
public static final String dataSource = "testing";
public static final QueryGranularity dayGran = QueryGranularity.DAY;
public static final QueryGranularity allGran = QueryGranularity.ALL;
public static final String providerDimension = "proVider";
public static final String qualityDimension = "quality";
public static final String placementDimension = "placement";
public static final String placementishDimension = "placementish";
public static final String indexMetric = "index";
public static final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows");
@ -110,13 +112,13 @@ public class QueryRunnerTestHelper
return Arrays.asList(
new Object[][]{
{
makeQueryRunner(factory, new IncrementalIndexSegment(rtIndex, null))
makeQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId))
},
{
makeQueryRunner(factory, new QueryableIndexSegment(null, mMappedTestIndex))
makeQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex))
},
{
makeQueryRunner(factory, new QueryableIndexSegment(null, mergedRealtimeIndex))
makeQueryRunner(factory, new QueryableIndexSegment(segmentId, mergedRealtimeIndex))
}
}
);

View File

@ -0,0 +1,224 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.select;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.metamx.common.ISE;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Result;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.junit.Test;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
/**
*/
public class SelectBinaryFnTest
{
private static final String segmentId1 = "testSegment";
private static final String segmentId2 = "testSegment";
@Test
public void testApply() throws Exception
{
SelectBinaryFn binaryFn = new SelectBinaryFn(QueryGranularity.ALL, new PagingSpec(null, 5));
Result<SelectResultValue> res1 = new Result<>(
new DateTime("2013-01-01"),
new SelectResultValue(
ImmutableMap.<String, Integer>of(),
Arrays.asList(
new EventHolder(
segmentId1,
0,
ImmutableMap.<String, Object>of(
EventHolder.timestampKey,
new DateTime("2013-01-01T00"),
"dim",
"first"
)
),
new EventHolder(
segmentId1,
1,
ImmutableMap.<String, Object>of(
EventHolder.timestampKey,
new DateTime("2013-01-01T03"),
"dim",
"fourth"
)
),
new EventHolder(
segmentId1,
2,
ImmutableMap.<String, Object>of(
EventHolder.timestampKey,
new DateTime("2013-01-01T05"),
"dim",
"sixth"
)
)
)
)
);
Result<SelectResultValue> res2 = new Result<>(
new DateTime("2013-01-01"),
new SelectResultValue(
ImmutableMap.<String, Integer>of(),
Arrays.asList(
new EventHolder(
segmentId2,
0,
ImmutableMap.<String, Object>of(
EventHolder.timestampKey,
new DateTime("2013-01-01T00"),
"dim",
"second"
)
),
new EventHolder(
segmentId2,
1,
ImmutableMap.<String, Object>of(
EventHolder.timestampKey,
new DateTime("2013-01-01T02"),
"dim",
"third"
)
),
new EventHolder(
segmentId2,
2,
ImmutableMap.<String, Object>of(
EventHolder.timestampKey,
new DateTime("2013-01-01T04"),
"dim",
"fifth"
)
)
)
)
);
Result<SelectResultValue> merged = binaryFn.apply(res1, res2);
Assert.assertEquals(res1.getTimestamp(), merged.getTimestamp());
LinkedHashMap<String, Integer> expectedPageIds = Maps.newLinkedHashMap();
expectedPageIds.put(segmentId1, 0);
expectedPageIds.put(segmentId2, 0);
expectedPageIds.put(segmentId2, 1);
expectedPageIds.put(segmentId1, 1);
expectedPageIds.put(segmentId2, 2);
Iterator<String> exSegmentIter = expectedPageIds.keySet().iterator();
Iterator<String> acSegmentIter = merged.getValue().getPagingIdentifiers().keySet().iterator();
verifyIters(exSegmentIter, acSegmentIter);
Iterator<Integer> exOffsetIter = expectedPageIds.values().iterator();
Iterator<Integer> acOffsetIter = merged.getValue().getPagingIdentifiers().values().iterator();
verifyIters(exOffsetIter, acOffsetIter);
List<EventHolder> exEvents = Arrays.<EventHolder>asList(
new EventHolder(
segmentId1,
0,
ImmutableMap.<String, Object>of(
EventHolder.timestampKey,
new DateTime("2013-01-01T00"), "dim", "first"
)
),
new EventHolder(
segmentId2,
0,
ImmutableMap.<String, Object>of(
EventHolder.timestampKey,
new DateTime("2013-01-01T00"),
"dim",
"second"
)
),
new EventHolder(
segmentId2,
1,
ImmutableMap.<String, Object>of(
EventHolder.timestampKey,
new DateTime("2013-01-01T02"),
"dim",
"third"
)
),
new EventHolder(
segmentId1,
1,
ImmutableMap.<String, Object>of(
EventHolder.timestampKey,
new DateTime("2013-01-01T03"),
"dim",
"fourth"
)
),
new EventHolder(
segmentId2,
2,
ImmutableMap.<String, Object>of(
EventHolder.timestampKey,
new DateTime("2013-01-01T04"),
"dim",
"fifth"
)
)
);
List<EventHolder> acEvents = merged.getValue().getEvents();
verifyEvents(exEvents, acEvents);
}
private void verifyIters(Iterator iter1, Iterator iter2)
{
while (iter1.hasNext()) {
Assert.assertEquals(iter1.next(), iter2.next());
}
if (iter2.hasNext()) {
throw new ISE("This should be empty!");
}
}
private void verifyEvents(List<EventHolder> events1, List<EventHolder> events2)
{
Iterator<EventHolder> ex = events1.iterator();
Iterator<EventHolder> ac = events2.iterator();
verifyIters(ex, ac);
}
}

View File

@ -0,0 +1,403 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.select;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.metamx.common.ISE;
import com.metamx.common.guava.Sequences;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.query.spec.LegacySegmentSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
*/
@RunWith(Parameterized.class)
public class SelectQueryRunnerTest
{
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.makeQueryRunners(
SelectQueryRunnerFactory.create(new DefaultObjectMapper())
);
}
private static final String providerLowercase = "provider";
private final QueryRunner runner;
public SelectQueryRunnerTest(
QueryRunner runner
)
{
this.runner = runner;
}
@Test
public void testFullOnSelect()
{
SelectQuery query = new SelectQuery(
QueryRunnerTestHelper.dataSource,
QueryRunnerTestHelper.fullOnInterval,
null,
QueryRunnerTestHelper.allGran,
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new PagingSpec(null, 3),
null
);
Iterable<Result<SelectResultValue>> results = Sequences.toList(
runner.run(query),
Lists.<Result<SelectResultValue>>newArrayList()
);
List<Result<SelectResultValue>> expectedResults = Arrays.asList(
new Result<SelectResultValue>(
new DateTime("2011-01-12T00:00:00.000Z"),
new SelectResultValue(
ImmutableMap.of(QueryRunnerTestHelper.segmentId, 2),
Arrays.asList(
new EventHolder(
QueryRunnerTestHelper.segmentId,
0,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
.put(providerLowercase, "spot")
.put(QueryRunnerTestHelper.qualityDimension, "automotive")
.put(QueryRunnerTestHelper.placementDimension, "preferred")
.put(QueryRunnerTestHelper.placementishDimension, Lists.newArrayList("a", "preferred"))
.put(QueryRunnerTestHelper.indexMetric, 100.000000F)
.build()
),
new EventHolder(
QueryRunnerTestHelper.segmentId,
1,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
.put(providerLowercase, "spot")
.put(QueryRunnerTestHelper.qualityDimension, "business")
.put(QueryRunnerTestHelper.placementDimension, "preferred")
.put(QueryRunnerTestHelper.placementishDimension, Lists.newArrayList("b", "preferred"))
.put(QueryRunnerTestHelper.indexMetric, 100.000000F)
.build()
),
new EventHolder(
QueryRunnerTestHelper.segmentId,
2,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
.put(providerLowercase, "spot")
.put(QueryRunnerTestHelper.qualityDimension, "entertainment")
.put(QueryRunnerTestHelper.placementDimension, "preferred")
.put(QueryRunnerTestHelper.placementishDimension, Lists.newArrayList("e", "preferred"))
.put(QueryRunnerTestHelper.indexMetric, 100.000000F)
.build()
)
)
)
)
);
verify(expectedResults, results);
}
@Test
public void testSelectWithDimsAndMets()
{
SelectQuery query = new SelectQuery(
QueryRunnerTestHelper.dataSource,
QueryRunnerTestHelper.fullOnInterval,
null,
QueryRunnerTestHelper.allGran,
Lists.<String>newArrayList(providerLowercase),
Lists.<String>newArrayList(QueryRunnerTestHelper.indexMetric),
new PagingSpec(null, 3),
null
);
Iterable<Result<SelectResultValue>> results = Sequences.toList(
runner.run(query),
Lists.<Result<SelectResultValue>>newArrayList()
);
List<Result<SelectResultValue>> expectedResults = Arrays.asList(
new Result<SelectResultValue>(
new DateTime("2011-01-12T00:00:00.000Z"),
new SelectResultValue(
ImmutableMap.of(QueryRunnerTestHelper.segmentId, 2),
Arrays.asList(
new EventHolder(
QueryRunnerTestHelper.segmentId,
0,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
.put(providerLowercase, "spot")
.put(QueryRunnerTestHelper.indexMetric, 100.000000F)
.build()
),
new EventHolder(
QueryRunnerTestHelper.segmentId,
1,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
.put(providerLowercase, "spot")
.put(QueryRunnerTestHelper.indexMetric, 100.000000F)
.build()
),
new EventHolder(
QueryRunnerTestHelper.segmentId,
2,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
.put(providerLowercase, "spot")
.put(QueryRunnerTestHelper.indexMetric, 100.000000F)
.build()
)
)
)
)
);
verify(expectedResults, results);
}
@Test
public void testSelectPagination()
{
SelectQuery query = new SelectQuery(
QueryRunnerTestHelper.dataSource,
QueryRunnerTestHelper.fullOnInterval,
null,
QueryRunnerTestHelper.allGran,
Lists.<String>newArrayList(QueryRunnerTestHelper.qualityDimension),
Lists.<String>newArrayList(QueryRunnerTestHelper.indexMetric),
new PagingSpec(Maps.newLinkedHashMap(ImmutableMap.of(QueryRunnerTestHelper.segmentId, 3)), 3),
null
);
Iterable<Result<SelectResultValue>> results = Sequences.toList(
runner.run(query),
Lists.<Result<SelectResultValue>>newArrayList()
);
List<Result<SelectResultValue>> expectedResults = Arrays.asList(
new Result<SelectResultValue>(
new DateTime("2011-01-12T00:00:00.000Z"),
new SelectResultValue(
ImmutableMap.of(QueryRunnerTestHelper.segmentId, 5),
Arrays.asList(
new EventHolder(
QueryRunnerTestHelper.segmentId,
3,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
.put(QueryRunnerTestHelper.qualityDimension, "health")
.put(QueryRunnerTestHelper.indexMetric, 100.000000F)
.build()
),
new EventHolder(
QueryRunnerTestHelper.segmentId,
4,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
.put(QueryRunnerTestHelper.qualityDimension, "mezzanine")
.put(QueryRunnerTestHelper.indexMetric, 100.000000F)
.build()
),
new EventHolder(
QueryRunnerTestHelper.segmentId,
5,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
.put(QueryRunnerTestHelper.qualityDimension, "news")
.put(QueryRunnerTestHelper.indexMetric, 100.000000F)
.build()
)
)
)
)
);
verify(expectedResults, results);
}
@Test
public void testFullOnSelectWithFilter()
{
SelectQuery query = new SelectQuery(
QueryRunnerTestHelper.dataSource,
new LegacySegmentSpec(new Interval("2011-01-12/2011-01-14")),
new SelectorDimFilter(QueryRunnerTestHelper.providerDimension, "spot"),
QueryRunnerTestHelper.dayGran,
Lists.<String>newArrayList(QueryRunnerTestHelper.qualityDimension),
Lists.<String>newArrayList(QueryRunnerTestHelper.indexMetric),
new PagingSpec(Maps.newLinkedHashMap(ImmutableMap.of(QueryRunnerTestHelper.segmentId, 3)), 3),
null
);
Iterable<Result<SelectResultValue>> results = Sequences.toList(
runner.run(query),
Lists.<Result<SelectResultValue>>newArrayList()
);
List<Result<SelectResultValue>> expectedResults = Arrays.asList(
new Result<SelectResultValue>(
new DateTime("2011-01-12T00:00:00.000Z"),
new SelectResultValue(
ImmutableMap.of(QueryRunnerTestHelper.segmentId, 5),
Arrays.asList(
new EventHolder(
QueryRunnerTestHelper.segmentId,
3,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
.put(QueryRunnerTestHelper.qualityDimension, "health")
.put(QueryRunnerTestHelper.indexMetric, 100.000000F)
.build()
),
new EventHolder(
QueryRunnerTestHelper.segmentId,
4,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
.put(QueryRunnerTestHelper.qualityDimension, "mezzanine")
.put(QueryRunnerTestHelper.indexMetric, 100.000000F)
.build()
),
new EventHolder(
QueryRunnerTestHelper.segmentId,
5,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
.put(QueryRunnerTestHelper.qualityDimension, "news")
.put(QueryRunnerTestHelper.indexMetric, 100.000000F)
.build()
)
)
)
),
new Result<SelectResultValue>(
new DateTime("2011-01-13T00:00:00.000Z"),
new SelectResultValue(
ImmutableMap.of(QueryRunnerTestHelper.segmentId, 5),
Arrays.asList(
new EventHolder(
QueryRunnerTestHelper.segmentId,
3,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-13T00:00:00.000Z"))
.put(QueryRunnerTestHelper.qualityDimension, "health")
.put(QueryRunnerTestHelper.indexMetric, 114.947403F)
.build()
),
new EventHolder(
QueryRunnerTestHelper.segmentId,
4,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-13T00:00:00.000Z"))
.put(QueryRunnerTestHelper.qualityDimension, "mezzanine")
.put(QueryRunnerTestHelper.indexMetric, 104.465767F)
.build()
),
new EventHolder(
QueryRunnerTestHelper.segmentId,
5,
new ImmutableMap.Builder<String, Object>()
.put(EventHolder.timestampKey, new DateTime("2011-01-13T00:00:00.000Z"))
.put(QueryRunnerTestHelper.qualityDimension, "news")
.put(QueryRunnerTestHelper.indexMetric, 102.851683F)
.build()
)
)
)
)
);
verify(expectedResults, results);
}
private static void verify(
Iterable<Result<SelectResultValue>> expectedResults,
Iterable<Result<SelectResultValue>> actualResults
)
{
Iterator<Result<SelectResultValue>> expectedIter = expectedResults.iterator();
Iterator<Result<SelectResultValue>> actualIter = actualResults.iterator();
while (expectedIter.hasNext()) {
Result<SelectResultValue> expected = expectedIter.next();
Result<SelectResultValue> actual = actualIter.next();
Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
for (Map.Entry<String, Integer> entry : expected.getValue().getPagingIdentifiers().entrySet()) {
Assert.assertEquals(entry.getValue(), actual.getValue().getPagingIdentifiers().get(entry.getKey()));
}
Iterator<EventHolder> expectedEvts = expected.getValue().getEvents().iterator();
Iterator<EventHolder> actualEvts = actual.getValue().getEvents().iterator();
while (expectedEvts.hasNext()) {
EventHolder exHolder = expectedEvts.next();
EventHolder acHolder = actualEvts.next();
Assert.assertEquals(exHolder.getTimestamp(), acHolder.getTimestamp());
Assert.assertEquals(exHolder.getOffset(), acHolder.getOffset());
for (Map.Entry<String, Object> ex : exHolder.getEvent().entrySet()) {
Object actVal = acHolder.getEvent().get(ex.getKey());
// work around for current II limitations
if (acHolder.getEvent().get(ex.getKey()) instanceof Double) {
actVal = ((Double) actVal).floatValue();
}
Assert.assertEquals(ex.getValue(), actVal);
}
}
if (actualEvts.hasNext()) {
throw new ISE("This event iterator should be exhausted!");
}
}
if (actualIter.hasNext()) {
throw new ISE("This iterator should be exhausted!");
}
}
}