First and Last Aggregator (#3566)

* add first and last aggregator

* add test and fix

* moving around

* separate aggregator valueType

* address PR comment

* add finalize inner query and adjust v1 inner indexing

* better test and fixes

* java-util import fixes

* PR comments

* Add first/last aggs to ITWikipediaQueryTest
This commit is contained in:
Jonathan Wei 2016-12-16 15:26:40 -08:00 committed by Fangjin Yang
parent 93c34d3c3f
commit 2bfcc8a592
31 changed files with 3389 additions and 28 deletions

View File

@ -0,0 +1,45 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.collections;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.java.util.common.Pair;
public class SerializablePair<T1, T2> extends Pair<T1, T2>
{
@JsonCreator
public SerializablePair(@JsonProperty("lhs") T1 lhs, @JsonProperty("rhs") T2 rhs)
{
super(lhs, rhs);
}
@JsonProperty
public T1 getLhs()
{
return lhs;
}
@JsonProperty
public T2 getRhs()
{
return rhs;
}
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.collections;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
public class SerializablePairTest
{
private static final ObjectMapper jsonMapper = new ObjectMapper();
@Test
public void testBytesSerde() throws IOException
{
SerializablePair pair = new SerializablePair<>(5L, 9L);
byte[] bytes = jsonMapper.writeValueAsBytes(pair);
SerializablePair<Number, Number> deserializedPair = jsonMapper.readValue(bytes, SerializablePair.class);
Assert.assertEquals(pair.lhs, deserializedPair.lhs.longValue());
Assert.assertEquals(pair.rhs, deserializedPair.rhs.longValue());
}
@Test
public void testStringSerde() throws IOException
{
SerializablePair pair = new SerializablePair<>(5L, 9L);
String str = jsonMapper.writeValueAsString(pair);
SerializablePair<Number, Number> deserializedPair = jsonMapper.readValue(str, SerializablePair.class);
Assert.assertEquals(pair.lhs, deserializedPair.lhs.longValue());
Assert.assertEquals(pair.rhs, deserializedPair.rhs.longValue());
}
}

View File

@ -76,6 +76,60 @@ Computes the sum of values as 64-bit floating point value. Similar to `longSum`
{ "type" : "longMax", "name" : <output_name>, "fieldName" : <metric_name> } { "type" : "longMax", "name" : <output_name>, "fieldName" : <metric_name> }
``` ```
### First / Last aggregator
First and Last aggregator cannot be used in ingestion spec, and should only be specified as part of queries.
Note that queries with first/last aggregators on a segment created with rollup enabled will return the rolled up value, and not the last value within the raw ingested data.
#### `doubleFirst` aggregator
`doubleFirst` computes the metric value with the minimum timestamp or 0 if no row exist
```json
{
"type" : "doubleFirst",
"name" : <output_name>,
"fieldName" : <metric_name>
}
```
#### `doubleLast` aggregator
`doubleLast` computes the metric value with the maximum timestamp or 0 if no row exist
```json
{
"type" : "doubleLast",
"name" : <output_name>,
"fieldName" : <metric_name>
}
```
#### `longFirst` aggregator
`longFirst` computes the metric value with the minimum timestamp or 0 if no row exist
```json
{
"type" : "longFirst",
"name" : <output_name>,
"fieldName" : <metric_name>
}
```
#### `longLast` aggregator
`longLast` computes the metric value with the maximum timestamp or 0 if no row exist
```json
{
"type" : "longLast",
"name" : <output_name>,
"fieldName" : <metric_name>,
}
```
### JavaScript aggregator ### JavaScript aggregator
Computes an arbitrary JavaScript function over a set of columns (both metrics and dimensions are allowed). Your Computes an arbitrary JavaScript function over a set of columns (both metrics and dimensions are allowed). Your

View File

@ -73,6 +73,26 @@
"type": "hyperUnique", "type": "hyperUnique",
"fieldName": "unique_users", "fieldName": "unique_users",
"name": "unique_users" "name": "unique_users"
},
{
"type" : "doubleFirst",
"name" : "firstAdded",
"fieldName" : "added"
},
{
"type" : "doubleLast",
"name" : "lastAdded",
"fieldName" : "added"
},
{
"type" : "longFirst",
"name" : "firstCount",
"fieldName" : "count"
},
{
"type" : "longLast",
"name" : "lastCount",
"fieldName" : "count"
} }
], ],
"context": { "context": {
@ -87,6 +107,10 @@
"result": { "result": {
"added": 9.11526338E8, "added": 9.11526338E8,
"count": 2815650, "count": 2815650,
"firstAdded": 39.0,
"lastAdded": 210.0,
"firstCount": 1,
"lastCount": 1,
"delta": 5.48967603E8, "delta": 5.48967603E8,
"variation": 1.274085073E9, "variation": 1.274085073E9,
"delta_hist": { "delta_hist": {
@ -174,6 +198,26 @@
"type": "hyperUnique", "type": "hyperUnique",
"fieldName": "unique_users", "fieldName": "unique_users",
"name": "unique_users" "name": "unique_users"
},
{
"type" : "doubleFirst",
"name" : "firstAdded",
"fieldName" : "added"
},
{
"type" : "doubleLast",
"name" : "lastAdded",
"fieldName" : "added"
},
{
"type" : "longFirst",
"name" : "firstCount",
"fieldName" : "count"
},
{
"type" : "longLast",
"name" : "lastCount",
"fieldName" : "count"
} }
], ],
"context": { "context": {
@ -188,6 +232,10 @@
"result": { "result": {
"added": 3.49393993E8, "added": 3.49393993E8,
"count": 1829240, "count": 1829240,
"firstAdded": 39.0,
"lastAdded": 210.0,
"firstCount": 1,
"lastCount": 1,
"delta": 2.24089868E8, "delta": 2.24089868E8,
"variation": 4.74698118E8, "variation": 4.74698118E8,
"delta_hist": { "delta_hist": {
@ -365,6 +413,26 @@
"type": "hyperUnique", "type": "hyperUnique",
"fieldName": "unique_users", "fieldName": "unique_users",
"name": "unique_users" "name": "unique_users"
},
{
"type" : "doubleFirst",
"name" : "firstAdded",
"fieldName" : "added"
},
{
"type" : "doubleLast",
"name" : "lastAdded",
"fieldName" : "added"
},
{
"type" : "longFirst",
"name" : "firstCount",
"fieldName" : "count"
},
{
"type" : "longLast",
"name" : "lastCount",
"fieldName" : "count"
} }
], ],
"dimension": "page", "dimension": "page",
@ -383,6 +451,10 @@
{ {
"added": 1812960.0, "added": 1812960.0,
"count": 1697, "count": 1697,
"firstCount": 2,
"lastCount": 3,
"firstAdded": 462.0,
"lastAdded": 1871.0,
"page": "Wikipedia:Administrators'_noticeboard/Incidents", "page": "Wikipedia:Administrators'_noticeboard/Incidents",
"delta": 770071.0, "delta": 770071.0,
"variation": 2855849.0, "variation": 2855849.0,
@ -393,6 +465,10 @@
{ {
"added": 70162.0, "added": 70162.0,
"count": 967, "count": 967,
"firstCount": 1,
"lastCount": 1,
"firstAdded": 12.0,
"lastAdded": 129.0,
"page": "2013", "page": "2013",
"delta": 40872.0, "delta": 40872.0,
"variation": 99452.0, "variation": 99452.0,
@ -403,6 +479,10 @@
{ {
"added": 519152.0, "added": 519152.0,
"count": 1700, "count": 1700,
"firstCount": 1,
"lastCount": 5,
"firstAdded": 0.0,
"lastAdded": 2399.0,
"page": "Wikipedia:Vandalismusmeldung", "page": "Wikipedia:Vandalismusmeldung",
"delta": -5446.0, "delta": -5446.0,
"variation": 1043750.0, "variation": 1043750.0,
@ -480,6 +560,26 @@
"type": "hyperUnique", "type": "hyperUnique",
"fieldName": "unique_users", "fieldName": "unique_users",
"name": "unique_users" "name": "unique_users"
},
{
"type" : "doubleFirst",
"name" : "firstAdded",
"fieldName" : "added"
},
{
"type" : "doubleLast",
"name" : "lastAdded",
"fieldName" : "added"
},
{
"type" : "longFirst",
"name" : "firstCount",
"fieldName" : "count"
},
{
"type" : "longLast",
"name" : "lastCount",
"fieldName" : "count"
} }
], ],
"dimension": "page", "dimension": "page",
@ -498,6 +598,10 @@
{ {
"added": 61739.0, "added": 61739.0,
"count": 852, "count": 852,
"firstCount": 1,
"lastCount": 1,
"firstAdded": 12.0,
"lastAdded": 129.0,
"page": "2013", "page": "2013",
"delta": 35313.0, "delta": 35313.0,
"variation": 88165.0, "variation": 88165.0,
@ -508,6 +612,10 @@
{ {
"added": 28288.0, "added": 28288.0,
"count": 513, "count": 513,
"firstCount": 1,
"lastCount": 1,
"firstAdded": 29.0,
"lastAdded": 37.0,
"page": "Gérard_Depardieu", "page": "Gérard_Depardieu",
"delta": 7027.0, "delta": 7027.0,
"variation": 49549.0, "variation": 49549.0,
@ -518,6 +626,10 @@
{ {
"added": 10951.0, "added": 10951.0,
"count": 459, "count": 459,
"firstCount": 1,
"lastCount": 1,
"firstAdded": 29.0,
"lastAdded": 35.0,
"page": "Zichyújfalu", "page": "Zichyújfalu",
"delta": 9030.0, "delta": 9030.0,
"variation": 12872.0, "variation": 12872.0,
@ -570,6 +682,26 @@
"type": "hyperUnique", "type": "hyperUnique",
"fieldName": "unique_users", "fieldName": "unique_users",
"name": "unique_users" "name": "unique_users"
},
{
"type" : "doubleFirst",
"name" : "firstAdded",
"fieldName" : "added"
},
{
"type" : "doubleLast",
"name" : "lastAdded",
"fieldName" : "added"
},
{
"type" : "longFirst",
"name" : "firstCount",
"fieldName" : "count"
},
{
"type" : "longLast",
"name" : "lastCount",
"fieldName" : "count"
} }
], ],
"postAggregations": [ "postAggregations": [
@ -619,6 +751,10 @@
{ {
"added": 151409.0, "added": 151409.0,
"count": 1770, "count": 1770,
"firstCount": 9,
"lastCount": 9,
"firstAdded": 1612.0,
"lastAdded": 560.0,
"page": "User:Cyde/List_of_candidates_for_speedy_deletion/Subpage", "page": "User:Cyde/List_of_candidates_for_speedy_deletion/Subpage",
"delta": 670.0, "delta": 670.0,
"variation": 302148.0, "variation": 302148.0,
@ -630,6 +766,10 @@
{ {
"added": 519152.0, "added": 519152.0,
"count": 1700, "count": 1700,
"firstCount": 1,
"lastCount": 5,
"firstAdded": 0.0,
"lastAdded": 2399.0,
"page": "Wikipedia:Vandalismusmeldung", "page": "Wikipedia:Vandalismusmeldung",
"delta": -5446.0, "delta": -5446.0,
"variation": 1043750.0, "variation": 1043750.0,
@ -641,6 +781,10 @@
{ {
"added": 1812960.0, "added": 1812960.0,
"count": 1697, "count": 1697,
"firstCount": 2,
"lastCount": 3,
"firstAdded": 462.0,
"lastAdded": 1871.0,
"page": "Wikipedia:Administrators'_noticeboard/Incidents", "page": "Wikipedia:Administrators'_noticeboard/Incidents",
"delta": 770071.0, "delta": 770071.0,
"variation": 2855849.0, "variation": 2855849.0,
@ -865,7 +1009,7 @@
] ]
}, },
{ {
"description": "groupBy, two aggs, namespace + robot dim, postAggs", "description": "groupBy, six aggs, namespace + robot dim, postAggs",
"query": { "query": {
"queryType": "groupBy", "queryType": "groupBy",
"dataSource": "wikipedia_editstream", "dataSource": "wikipedia_editstream",
@ -880,6 +1024,26 @@
"type": "longSum", "type": "longSum",
"fieldName": "count", "fieldName": "count",
"name": "count" "name": "count"
},
{
"type" : "doubleFirst",
"name" : "firstAdded",
"fieldName" : "added"
},
{
"type" : "doubleLast",
"name" : "lastAdded",
"fieldName" : "added"
},
{
"type" : "longFirst",
"name" : "firstCount",
"fieldName" : "count"
},
{
"type" : "longLast",
"name" : "lastCount",
"fieldName" : "count"
} }
], ],
"postAggregations": [ "postAggregations": [
@ -920,6 +1084,10 @@
"event": { "event": {
"sumOfRowsAndCount": 2268154.0, "sumOfRowsAndCount": 2268154.0,
"count": 1286354, "count": 1286354,
"firstCount": 1,
"lastCount": 1,
"firstAdded": 70.0,
"lastAdded": 210.0,
"robot": "0", "robot": "0",
"rows": 981800, "rows": 981800,
"namespace": "article" "namespace": "article"
@ -931,6 +1099,10 @@
"event": { "event": {
"sumOfRowsAndCount": 1385233.0, "sumOfRowsAndCount": 1385233.0,
"count": 693711, "count": 693711,
"firstCount": 1,
"lastCount": 1,
"firstAdded": 39.0,
"lastAdded": 0.0,
"robot": "1", "robot": "1",
"rows": 691522, "rows": 691522,
"namespace": "article" "namespace": "article"
@ -942,6 +1114,10 @@
"event": { "event": {
"sumOfRowsAndCount": 878393.0, "sumOfRowsAndCount": 878393.0,
"count": 492643, "count": 492643,
"firstCount": 2,
"lastCount": 1,
"firstAdded": 431.0,
"lastAdded": 43.0,
"robot": "0", "robot": "0",
"rows": 385750, "rows": 385750,
"namespace": "wikipedia" "namespace": "wikipedia"

View File

@ -29,8 +29,11 @@ import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
import io.druid.query.aggregation.DoubleMinAggregatorFactory; import io.druid.query.aggregation.DoubleMinAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.FilteredAggregatorFactory; import io.druid.query.aggregation.FilteredAggregatorFactory;
import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
import io.druid.query.aggregation.HistogramAggregatorFactory; import io.druid.query.aggregation.HistogramAggregatorFactory;
import io.druid.query.aggregation.JavaScriptAggregatorFactory; import io.druid.query.aggregation.JavaScriptAggregatorFactory;
import io.druid.query.aggregation.first.LongFirstAggregatorFactory;
import io.druid.query.aggregation.last.DoubleLastAggregatorFactory;
import io.druid.query.aggregation.LongMaxAggregatorFactory; import io.druid.query.aggregation.LongMaxAggregatorFactory;
import io.druid.query.aggregation.LongMinAggregatorFactory; import io.druid.query.aggregation.LongMinAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory;
@ -39,6 +42,7 @@ import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator; import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde; import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.query.aggregation.last.LongLastAggregatorFactory;
import io.druid.query.aggregation.post.ArithmeticPostAggregator; import io.druid.query.aggregation.post.ArithmeticPostAggregator;
import io.druid.query.aggregation.post.ConstantPostAggregator; import io.druid.query.aggregation.post.ConstantPostAggregator;
import io.druid.query.aggregation.post.DoubleGreatestPostAggregator; import io.druid.query.aggregation.post.DoubleGreatestPostAggregator;
@ -79,7 +83,11 @@ public class AggregatorsModule extends SimpleModule
@JsonSubTypes.Type(name = "histogram", value = HistogramAggregatorFactory.class), @JsonSubTypes.Type(name = "histogram", value = HistogramAggregatorFactory.class),
@JsonSubTypes.Type(name = "hyperUnique", value = HyperUniquesAggregatorFactory.class), @JsonSubTypes.Type(name = "hyperUnique", value = HyperUniquesAggregatorFactory.class),
@JsonSubTypes.Type(name = "cardinality", value = CardinalityAggregatorFactory.class), @JsonSubTypes.Type(name = "cardinality", value = CardinalityAggregatorFactory.class),
@JsonSubTypes.Type(name = "filtered", value = FilteredAggregatorFactory.class) @JsonSubTypes.Type(name = "filtered", value = FilteredAggregatorFactory.class),
@JsonSubTypes.Type(name = "longFirst", value = LongFirstAggregatorFactory.class),
@JsonSubTypes.Type(name = "doubleFirst", value = DoubleFirstAggregatorFactory.class),
@JsonSubTypes.Type(name = "longLast", value = LongLastAggregatorFactory.class),
@JsonSubTypes.Type(name = "doubleLast", value = DoubleLastAggregatorFactory.class)
}) })
public static interface AggregatorFactoryMixin public static interface AggregatorFactoryMixin
{ {

View File

@ -87,7 +87,8 @@ public class GroupByMergedQueryRunner<T> implements QueryRunner<T>
final Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair( final Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
query, query,
querySpecificConfig, querySpecificConfig,
bufferPool bufferPool,
true
); );
final Pair<Queue, Accumulator<Queue, T>> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair(); final Pair<Queue, Accumulator<Queue, T>> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair();
final boolean bySegment = BaseQuery.getContextBySegment(query, false); final boolean bySegment = BaseQuery.getContextBySegment(query, false);

View File

@ -171,7 +171,7 @@ public class JavaScriptAggregatorFactory extends AggregatorFactory
@Override @Override
public AggregatorFactory apply(String input) public AggregatorFactory apply(String input)
{ {
return new JavaScriptAggregatorFactory(input, fieldNames, fnAggregate, fnReset, fnCombine, config); return new JavaScriptAggregatorFactory(input, Lists.newArrayList(input), fnCombine, fnReset, fnCombine, config);
} }
} }
) )

View File

@ -0,0 +1,97 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.first;
import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.Aggregator;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
public class DoubleFirstAggregator implements Aggregator
{
private final FloatColumnSelector valueSelector;
private final LongColumnSelector timeSelector;
private final String name;
protected long firstTime;
protected double firstValue;
public DoubleFirstAggregator(
String name,
LongColumnSelector timeSelector,
FloatColumnSelector valueSelector
)
{
this.name = name;
this.valueSelector = valueSelector;
this.timeSelector = timeSelector;
reset();
}
@Override
public void aggregate()
{
long time = timeSelector.get();
if (time < firstTime) {
firstTime = time;
firstValue = valueSelector.get();
}
}
@Override
public void reset()
{
firstTime = Long.MAX_VALUE;
firstValue = 0;
}
@Override
public Object get()
{
return new SerializablePair<>(firstTime, firstValue);
}
@Override
public float getFloat()
{
return (float) firstValue;
}
@Override
public String getName()
{
return name;
}
@Override
public void close()
{
}
@Override
public long getLong()
{
return (long) firstValue;
}
}

View File

@ -0,0 +1,265 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.first;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Doubles;
import com.google.common.primitives.Longs;
import com.metamx.common.StringUtils;
import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.column.Column;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
public class DoubleFirstAggregatorFactory extends AggregatorFactory
{
public static final Comparator VALUE_COMPARATOR = new Comparator()
{
@Override
public int compare(Object o1, Object o2)
{
return Doubles.compare(((SerializablePair<Long, Double>) o1).rhs, ((SerializablePair<Long, Double>) o2).rhs);
}
};
public static final Comparator TIME_COMPARATOR = new Comparator()
{
@Override
public int compare(Object o1, Object o2)
{
return Longs.compare(((SerializablePair<Long, Object>) o1).lhs, ((SerializablePair<Long, Object>) o2).lhs);
}
};
private static final byte CACHE_TYPE_ID = 16;
private final String fieldName;
private final String name;
@JsonCreator
public DoubleFirstAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") final String fieldName
)
{
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
this.name = name;
this.fieldName = fieldName;
}
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new DoubleFirstAggregator(
name,
metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME),
metricFactory.makeFloatColumnSelector(fieldName)
);
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new DoubleFirstBufferAggregator(
metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME),
metricFactory.makeFloatColumnSelector(fieldName)
);
}
@Override
public Comparator getComparator()
{
return VALUE_COMPARATOR;
}
@Override
public Object combine(Object lhs, Object rhs)
{
return TIME_COMPARATOR.compare(lhs, rhs) <= 0 ? lhs : rhs;
}
@Override
public AggregatorFactory getCombiningFactory()
{
return new DoubleFirstAggregatorFactory(name, name)
{
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name);
return new DoubleFirstAggregator(name, null, null)
{
@Override
public void aggregate()
{
SerializablePair<Long, Double> pair = (SerializablePair<Long, Double>) selector.get();
if (pair.lhs < firstTime) {
firstTime = pair.lhs;
firstValue = pair.rhs;
}
}
};
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name);
return new DoubleFirstBufferAggregator(null, null)
{
@Override
public void aggregate(ByteBuffer buf, int position)
{
SerializablePair<Long, Double> pair = (SerializablePair<Long, Double>) selector.get();
long firstTime = buf.getLong(position);
if (pair.lhs < firstTime) {
buf.putLong(position, pair.lhs);
buf.putDouble(position + Longs.BYTES, pair.rhs);
}
}
};
}
};
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Arrays.<AggregatorFactory>asList(new DoubleFirstAggregatorFactory(fieldName, fieldName));
}
@Override
public Object deserialize(Object object)
{
Map map = (Map) object;
return new SerializablePair<>(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).doubleValue());
}
@Override
public Object finalizeComputation(Object object)
{
return ((SerializablePair<Long, Double>) object).rhs;
}
@Override
@JsonProperty
public String getName()
{
return name;
}
@JsonProperty
public String getFieldName()
{
return fieldName;
}
@Override
public List<String> requiredFields()
{
return Arrays.asList(Column.TIME_COLUMN_NAME, fieldName);
}
@Override
public byte[] getCacheKey()
{
byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
return ByteBuffer.allocate(2 + fieldNameBytes.length)
.put(CACHE_TYPE_ID)
.put(fieldNameBytes)
.put((byte)0xff)
.array();
}
@Override
public String getTypeName()
{
return "float";
}
@Override
public int getMaxIntermediateSize()
{
return Longs.BYTES + Doubles.BYTES;
}
@Override
public Object getAggregatorStartValue()
{
throw new UnsupportedOperationException();
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DoubleFirstAggregatorFactory that = (DoubleFirstAggregatorFactory) o;
return fieldName.equals(that.fieldName) && name.equals(that.name);
}
@Override
public int hashCode()
{
int result = name.hashCode();
result = 31 * result + fieldName.hashCode();
return result;
}
@Override
public String toString()
{
return "DoubleFirstAggregatorFactory{" +
"name='" + name + '\'' +
", fieldName='" + fieldName + '\'' +
'}';
}
}

View File

@ -0,0 +1,82 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.first;
import com.google.common.primitives.Longs;
import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
import java.nio.ByteBuffer;
public class DoubleFirstBufferAggregator implements BufferAggregator
{
private final LongColumnSelector timeSelector;
private final FloatColumnSelector valueSelector;
public DoubleFirstBufferAggregator(LongColumnSelector timeSelector, FloatColumnSelector valueSelector)
{
this.timeSelector = timeSelector;
this.valueSelector = valueSelector;
}
@Override
public void init(ByteBuffer buf, int position)
{
buf.putLong(position, Long.MAX_VALUE);
buf.putDouble(position + Longs.BYTES, 0);
}
@Override
public void aggregate(ByteBuffer buf, int position)
{
long time = timeSelector.get();
long firstTime = buf.getLong(position);
if (time < firstTime) {
buf.putLong(position, time);
buf.putDouble(position + Longs.BYTES, valueSelector.get());
}
}
@Override
public Object get(ByteBuffer buf, int position)
{
return new SerializablePair<>(buf.getLong(position), buf.getDouble(position + Longs.BYTES));
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
return (float) buf.getDouble(position + Longs.BYTES);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return (long) buf.getDouble(position + Longs.BYTES);
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -0,0 +1,95 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.first;
import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.Aggregator;
import io.druid.segment.LongColumnSelector;
public class LongFirstAggregator implements Aggregator
{
private final LongColumnSelector valueSelector;
private final LongColumnSelector timeSelector;
private final String name;
protected long firstTime;
protected long firstValue;
public LongFirstAggregator(
String name,
LongColumnSelector timeSelector,
LongColumnSelector valueSelector
)
{
this.name = name;
this.valueSelector = valueSelector;
this.timeSelector = timeSelector;
reset();
}
@Override
public void aggregate()
{
long time = timeSelector.get();
if (time < firstTime) {
firstTime = time;
firstValue = valueSelector.get();
}
}
@Override
public void reset()
{
firstTime = Long.MAX_VALUE;
firstValue = 0;
}
@Override
public Object get()
{
return new SerializablePair<>(firstTime, firstValue);
}
@Override
public float getFloat()
{
return (float) firstValue;
}
@Override
public String getName()
{
return name;
}
@Override
public void close()
{
}
@Override
public long getLong()
{
return firstValue;
}
}

View File

@ -0,0 +1,255 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.first;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import com.metamx.common.StringUtils;
import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.column.Column;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
public class LongFirstAggregatorFactory extends AggregatorFactory
{
public static final Comparator VALUE_COMPARATOR = new Comparator()
{
@Override
public int compare(Object o1, Object o2)
{
return Longs.compare(((SerializablePair<Long, Long>) o1).rhs, ((SerializablePair<Long, Long>) o2).rhs);
}
};
private static final byte CACHE_TYPE_ID = 17;
private final String fieldName;
private final String name;
@JsonCreator
public LongFirstAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") final String fieldName
)
{
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
this.name = name;
this.fieldName = fieldName;
}
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new LongFirstAggregator(
name,
metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME),
metricFactory.makeLongColumnSelector(fieldName)
);
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new LongFirstBufferAggregator(
metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME),
metricFactory.makeLongColumnSelector(fieldName)
);
}
@Override
public Comparator getComparator()
{
return VALUE_COMPARATOR;
}
@Override
public Object combine(Object lhs, Object rhs)
{
return DoubleFirstAggregatorFactory.TIME_COMPARATOR.compare(lhs, rhs) <= 0 ? lhs : rhs;
}
@Override
public AggregatorFactory getCombiningFactory()
{
return new LongFirstAggregatorFactory(name, name)
{
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name);
return new LongFirstAggregator(name, null, null)
{
@Override
public void aggregate()
{
SerializablePair<Long, Long> pair = (SerializablePair<Long, Long>) selector.get();
if (pair.lhs < firstTime) {
firstTime = pair.lhs;
firstValue = pair.rhs;
}
}
};
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name);
return new LongFirstBufferAggregator(null, null)
{
@Override
public void aggregate(ByteBuffer buf, int position)
{
SerializablePair<Long, Long> pair = (SerializablePair<Long, Long>) selector.get();
long firstTime = buf.getLong(position);
if (pair.lhs < firstTime) {
buf.putLong(position, pair.lhs);
buf.putLong(position + Longs.BYTES, pair.rhs);
}
}
};
}
};
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Arrays.<AggregatorFactory>asList(new LongFirstAggregatorFactory(fieldName, fieldName));
}
@Override
public Object deserialize(Object object)
{
Map map = (Map) object;
return new SerializablePair<>(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).longValue());
}
@Override
public Object finalizeComputation(Object object)
{
return ((SerializablePair<Long, Long>) object).rhs;
}
@Override
@JsonProperty
public String getName()
{
return name;
}
@JsonProperty
public String getFieldName()
{
return fieldName;
}
@Override
public List<String> requiredFields()
{
return Arrays.asList(Column.TIME_COLUMN_NAME, fieldName);
}
@Override
public byte[] getCacheKey()
{
byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
return ByteBuffer.allocate(2 + fieldNameBytes.length)
.put(CACHE_TYPE_ID)
.put(fieldNameBytes)
.put((byte)0xff)
.array();
}
@Override
public String getTypeName()
{
return "long";
}
@Override
public int getMaxIntermediateSize()
{
return Longs.BYTES * 2;
}
@Override
public Object getAggregatorStartValue()
{
throw new UnsupportedOperationException();
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
LongFirstAggregatorFactory that = (LongFirstAggregatorFactory) o;
return fieldName.equals(that.fieldName) && name.equals(that.name);
}
@Override
public int hashCode()
{
int result = name.hashCode();
result = 31 * result + fieldName.hashCode();
return result;
}
@Override
public String toString()
{
return "LongFirstAggregatorFactory{" +
"name='" + name + '\'' +
", fieldName='" + fieldName + '\'' +
'}';
}
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.first;
import com.google.common.primitives.Longs;
import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.LongColumnSelector;
import java.nio.ByteBuffer;
public class LongFirstBufferAggregator implements BufferAggregator
{
private final LongColumnSelector timeSelector;
private final LongColumnSelector valueSelector;
public LongFirstBufferAggregator(LongColumnSelector timeSelector, LongColumnSelector valueSelector)
{
this.timeSelector = timeSelector;
this.valueSelector = valueSelector;
}
@Override
public void init(ByteBuffer buf, int position)
{
buf.putLong(position, Long.MAX_VALUE);
buf.putLong(position + Longs.BYTES, 0);
}
@Override
public void aggregate(ByteBuffer buf, int position)
{
long time = timeSelector.get();
long firstTime = buf.getLong(position);
if (time < firstTime) {
buf.putLong(position, time);
buf.putLong(position + Longs.BYTES, valueSelector.get());
}
}
@Override
public Object get(ByteBuffer buf, int position)
{
return new SerializablePair<>(buf.getLong(position), buf.getLong(position + Longs.BYTES));
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
return (float) buf.getLong(position + Longs.BYTES);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return buf.getLong(position + Longs.BYTES);
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -0,0 +1,96 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.last;
import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.Aggregator;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
public class DoubleLastAggregator implements Aggregator
{
private final FloatColumnSelector valueSelector;
private final LongColumnSelector timeSelector;
private final String name;
protected long lastTime;
protected double lastValue;
public DoubleLastAggregator(
String name,
LongColumnSelector timeSelector,
FloatColumnSelector valueSelector
)
{
this.name = name;
this.valueSelector = valueSelector;
this.timeSelector = timeSelector;
reset();
}
@Override
public void aggregate()
{
long time = timeSelector.get();
if (time >= lastTime) {
lastTime = timeSelector.get();
lastValue = valueSelector.get();
}
}
@Override
public void reset()
{
lastTime = Long.MIN_VALUE;
lastValue = 0;
}
@Override
public Object get()
{
return new SerializablePair<>(lastTime, lastValue);
}
@Override
public float getFloat()
{
return (float) lastValue;
}
@Override
public String getName()
{
return name;
}
@Override
public void close()
{
}
@Override
public long getLong()
{
return (long) lastValue;
}
}

View File

@ -0,0 +1,248 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.last;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Doubles;
import com.google.common.primitives.Longs;
import com.metamx.common.StringUtils;
import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
import io.druid.query.aggregation.first.LongFirstAggregatorFactory;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.column.Column;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
public class DoubleLastAggregatorFactory extends AggregatorFactory
{
private static final byte CACHE_TYPE_ID = 18;
private final String fieldName;
private final String name;
@JsonCreator
public DoubleLastAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") final String fieldName
)
{
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
this.name = name;
this.fieldName = fieldName;
}
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new DoubleLastAggregator(
name,
metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME),
metricFactory.makeFloatColumnSelector(fieldName)
);
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new DoubleLastBufferAggregator(
metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME),
metricFactory.makeFloatColumnSelector(fieldName)
);
}
@Override
public Comparator getComparator()
{
return DoubleFirstAggregatorFactory.VALUE_COMPARATOR;
}
@Override
public Object combine(Object lhs, Object rhs)
{
return DoubleFirstAggregatorFactory.TIME_COMPARATOR.compare(lhs, rhs) > 0 ? lhs : rhs;
}
@Override
public AggregatorFactory getCombiningFactory()
{
return new DoubleLastAggregatorFactory(name, name)
{
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name);
return new DoubleLastAggregator(name, null, null)
{
@Override
public void aggregate()
{
SerializablePair<Long, Double> pair = (SerializablePair<Long, Double>) selector.get();
if (pair.lhs >= lastTime) {
lastTime = pair.lhs;
lastValue = pair.rhs;
}
}
};
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name);
return new DoubleLastBufferAggregator(null, null)
{
@Override
public void aggregate(ByteBuffer buf, int position)
{
SerializablePair<Long, Double> pair = (SerializablePair<Long, Double>) selector.get();
long lastTime = buf.getLong(position);
if (pair.lhs >= lastTime) {
buf.putLong(position, pair.lhs);
buf.putDouble(position + Longs.BYTES, pair.rhs);
}
}
};
}
};
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Arrays.<AggregatorFactory>asList(new LongFirstAggregatorFactory(fieldName, fieldName));
}
@Override
public Object deserialize(Object object)
{
Map map = (Map) object;
return new SerializablePair<>(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).doubleValue());
}
@Override
public Object finalizeComputation(Object object)
{
return ((SerializablePair<Long, Double>) object).rhs;
}
@Override
@JsonProperty
public String getName()
{
return name;
}
@JsonProperty
public String getFieldName()
{
return fieldName;
}
@Override
public List<String> requiredFields()
{
return Arrays.asList(Column.TIME_COLUMN_NAME, fieldName);
}
@Override
public byte[] getCacheKey()
{
byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
return ByteBuffer.allocate(2 + fieldNameBytes.length)
.put(CACHE_TYPE_ID)
.put(fieldNameBytes)
.put((byte)0xff)
.array();
}
@Override
public String getTypeName()
{
return "float";
}
@Override
public int getMaxIntermediateSize()
{
return Longs.BYTES + Doubles.BYTES;
}
@Override
public Object getAggregatorStartValue()
{
throw new UnsupportedOperationException();
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DoubleLastAggregatorFactory that = (DoubleLastAggregatorFactory) o;
return fieldName.equals(that.fieldName) && name.equals(that.name);
}
@Override
public int hashCode()
{
int result = name.hashCode();
result = 31 * result + fieldName.hashCode();
return result;
}
@Override
public String toString()
{
return "DoubleLastAggregatorFactory{" +
"name='" + name + '\'' +
", fieldName='" + fieldName + '\'' +
'}';
}
}

View File

@ -0,0 +1,82 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.last;
import com.google.common.primitives.Longs;
import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
import java.nio.ByteBuffer;
public class DoubleLastBufferAggregator implements BufferAggregator
{
private final LongColumnSelector timeSelector;
private final FloatColumnSelector valueSelector;
public DoubleLastBufferAggregator(LongColumnSelector timeSelector, FloatColumnSelector valueSelector)
{
this.timeSelector = timeSelector;
this.valueSelector = valueSelector;
}
@Override
public void init(ByteBuffer buf, int position)
{
buf.putLong(position, Long.MIN_VALUE);
buf.putDouble(position + Longs.BYTES, 0);
}
@Override
public void aggregate(ByteBuffer buf, int position)
{
long time = timeSelector.get();
long lastTime = buf.getLong(position);
if (time >= lastTime) {
buf.putLong(position, time);
buf.putDouble(position + Longs.BYTES, valueSelector.get());
}
}
@Override
public Object get(ByteBuffer buf, int position)
{
return new SerializablePair<>(buf.getLong(position), buf.getDouble(position + Longs.BYTES));
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
return (float) buf.getDouble(position + Longs.BYTES);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return (long) buf.getDouble(position + Longs.BYTES);
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -0,0 +1,94 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.last;
import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.Aggregator;
import io.druid.segment.LongColumnSelector;
public class LongLastAggregator implements Aggregator
{
private final LongColumnSelector valueSelector;
private final LongColumnSelector timeSelector;
private final String name;
protected long lastTime;
protected long lastValue;
public LongLastAggregator(
String name,
LongColumnSelector timeSelector,
LongColumnSelector valueSelector
)
{
this.name = name;
this.valueSelector = valueSelector;
this.timeSelector = timeSelector;
reset();
}
@Override
public void aggregate()
{
long time = timeSelector.get();
if (time >= lastTime) {
lastTime = timeSelector.get();
lastValue = valueSelector.get();
}
}
@Override
public void reset()
{
lastTime = Long.MIN_VALUE;
lastValue = 0;
}
@Override
public Object get()
{
return new SerializablePair<>(lastTime, lastValue);
}
@Override
public float getFloat()
{
return (float) lastValue;
}
@Override
public String getName()
{
return name;
}
@Override
public void close()
{
}
@Override
public long getLong()
{
return lastValue;
}
}

View File

@ -0,0 +1,248 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.last;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import com.metamx.common.StringUtils;
import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
import io.druid.query.aggregation.first.LongFirstAggregatorFactory;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.column.Column;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
public class LongLastAggregatorFactory extends AggregatorFactory
{
private static final byte CACHE_TYPE_ID = 19;
private final String fieldName;
private final String name;
@JsonCreator
public LongLastAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") final String fieldName
)
{
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
this.name = name;
this.fieldName = fieldName;
}
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new LongLastAggregator(
name,
metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME),
metricFactory.makeLongColumnSelector(fieldName)
);
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new LongLastBufferAggregator(
metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME),
metricFactory.makeLongColumnSelector(fieldName)
);
}
@Override
public Comparator getComparator()
{
return LongFirstAggregatorFactory.VALUE_COMPARATOR;
}
@Override
public Object combine(Object lhs, Object rhs)
{
return DoubleFirstAggregatorFactory.TIME_COMPARATOR.compare(lhs, rhs) > 0 ? lhs : rhs;
}
@Override
public AggregatorFactory getCombiningFactory()
{
return new LongLastAggregatorFactory(name, name)
{
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name);
return new LongLastAggregator(name, null, null)
{
@Override
public void aggregate()
{
SerializablePair<Long, Long> pair = (SerializablePair<Long, Long>) selector.get();
if (pair.lhs >= lastTime) {
lastTime = pair.lhs;
lastValue = pair.rhs;
}
}
};
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name);
return new LongLastBufferAggregator(null, null)
{
@Override
public void aggregate(ByteBuffer buf, int position)
{
SerializablePair<Long, Long> pair = (SerializablePair<Long, Long>) selector.get();
long lastTime = buf.getLong(position);
if (pair.lhs >= lastTime) {
buf.putLong(position, pair.lhs);
buf.putLong(position + Longs.BYTES, pair.rhs);
}
}
};
}
};
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Arrays.<AggregatorFactory>asList(new LongLastAggregatorFactory(fieldName, fieldName));
}
@Override
public Object deserialize(Object object)
{
Map map = (Map) object;
return new SerializablePair<>(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).longValue());
}
@Override
public Object finalizeComputation(Object object)
{
return ((SerializablePair<Long, Long>) object).rhs;
}
@Override
@JsonProperty
public String getName()
{
return name;
}
@JsonProperty
public String getFieldName()
{
return fieldName;
}
@Override
public List<String> requiredFields()
{
return Arrays.asList(Column.TIME_COLUMN_NAME, fieldName);
}
@Override
public byte[] getCacheKey()
{
byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
return ByteBuffer.allocate(2 + fieldNameBytes.length)
.put(CACHE_TYPE_ID)
.put(fieldNameBytes)
.put((byte)0xff)
.array();
}
@Override
public String getTypeName()
{
return "long";
}
@Override
public int getMaxIntermediateSize()
{
return Longs.BYTES * 2;
}
@Override
public Object getAggregatorStartValue()
{
throw new UnsupportedOperationException();
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
LongLastAggregatorFactory that = (LongLastAggregatorFactory) o;
return name.equals(that.name) && fieldName.equals(that.fieldName);
}
@Override
public int hashCode()
{
int result = name.hashCode();
result = 31 * result + fieldName.hashCode();
return result;
}
@Override
public String toString()
{
return "LongLastAggregatorFactory{" +
"name='" + name + '\'' +
", fieldName='" + fieldName + '\'' +
'}';
}
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.last;
import com.google.common.primitives.Longs;
import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.LongColumnSelector;
import java.nio.ByteBuffer;
public class LongLastBufferAggregator implements BufferAggregator
{
private final LongColumnSelector timeSelector;
private final LongColumnSelector valueSelector;
public LongLastBufferAggregator(LongColumnSelector timeSelector, LongColumnSelector valueSelector)
{
this.timeSelector = timeSelector;
this.valueSelector = valueSelector;
}
@Override
public void init(ByteBuffer buf, int position)
{
buf.putLong(position, Long.MIN_VALUE);
buf.putLong(position + Longs.BYTES, 0);
}
@Override
public void aggregate(ByteBuffer buf, int position)
{
long time = timeSelector.get();
long lastTime = buf.getLong(position);
if (time >= lastTime) {
buf.putLong(position, time);
buf.putLong(position + Longs.BYTES, valueSelector.get());
}
}
@Override
public Object get(ByteBuffer buf, int position)
{
return new SerializablePair<>(buf.getLong(position), buf.getLong(position + Longs.BYTES));
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
return (float) buf.getLong(position + Longs.BYTES);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return buf.getLong(position + Longs.BYTES);
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -55,7 +55,8 @@ public class GroupByQueryHelper
public static <T> Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> createIndexAccumulatorPair( public static <T> Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> createIndexAccumulatorPair(
final GroupByQuery query, final GroupByQuery query,
final GroupByQueryConfig config, final GroupByQueryConfig config,
StupidPool<ByteBuffer> bufferPool StupidPool<ByteBuffer> bufferPool,
final boolean combine
) )
{ {
final GroupByQueryConfig querySpecificConfig = config.withOverrides(query); final GroupByQueryConfig querySpecificConfig = config.withOverrides(query);
@ -66,17 +67,23 @@ public class GroupByQueryHelper
// AllGranularity returns timeStart instead of Long.MIN_VALUE // AllGranularity returns timeStart instead of Long.MIN_VALUE
final long granTimeStart = gran.iterable(timeStart, timeStart + 1).iterator().next(); final long granTimeStart = gran.iterable(timeStart, timeStart + 1).iterator().next();
final List<AggregatorFactory> aggs = Lists.transform( final List<AggregatorFactory> aggs;
query.getAggregatorSpecs(), if (combine) {
new Function<AggregatorFactory, AggregatorFactory>() aggs = Lists.transform(
{ query.getAggregatorSpecs(),
@Override new Function<AggregatorFactory, AggregatorFactory>()
public AggregatorFactory apply(AggregatorFactory input)
{ {
return input.getCombiningFactory(); @Override
public AggregatorFactory apply(AggregatorFactory input)
{
return input.getCombiningFactory();
}
} }
} );
); } else {
aggs = query.getAggregatorSpecs();
}
final List<String> dimensions = Lists.transform( final List<String> dimensions = Lists.transform(
query.getDimensions(), query.getDimensions(),
new Function<DimensionSpec, String>() new Function<DimensionSpec, String>()
@ -178,13 +185,15 @@ public class GroupByQueryHelper
GroupByQuery query, GroupByQuery query,
GroupByQueryConfig config, GroupByQueryConfig config,
StupidPool<ByteBuffer> bufferPool, StupidPool<ByteBuffer> bufferPool,
Sequence<Row> rows Sequence<Row> rows,
boolean combine
) )
{ {
Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair( Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
query, query,
config, config,
bufferPool bufferPool,
combine
); );
return rows.accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs); return rows.accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);

View File

@ -38,6 +38,7 @@ import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Global;
import io.druid.java.util.common.ISE; import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.MappedSequence;
import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequence;
import io.druid.query.BaseQuery; import io.druid.query.BaseQuery;
import io.druid.query.CacheStrategy; import io.druid.query.CacheStrategy;
@ -172,7 +173,21 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
runner, runner,
context context
); );
return strategySelector.strategize(query).processSubqueryResult(subquery, query, subqueryResult);
final Sequence<Row> finalizingResults;
if (GroupByQuery.getContextFinalize(subquery, false)) {
finalizingResults = new MappedSequence<>(
subqueryResult,
makePreComputeManipulatorFn(
subquery,
MetricManipulatorFns.finalizing()
)
);
} else {
finalizingResults = subqueryResult;
}
return strategySelector.strategize(query).processSubqueryResult(subquery, query, finalizingResults);
} else { } else {
return strategySelector.strategize(query).mergeResults(runner, query, context); return strategySelector.strategize(query).mergeResults(runner, query, context);
} }

View File

@ -116,7 +116,8 @@ public class GroupByStrategyV1 implements GroupByStrategy
) )
), ),
responseContext responseContext
) ),
true
); );
return new ResourceClosingSequence<>(query.applyLimit(GroupByQueryHelper.postAggregate(query, index)), index); return new ResourceClosingSequence<>(query.applyLimit(GroupByQueryHelper.postAggregate(query, index)), index);
@ -178,21 +179,26 @@ public class GroupByStrategyV1 implements GroupByStrategy
.setLimitSpec(query.getLimitSpec().merge(subquery.getLimitSpec())) .setLimitSpec(query.getLimitSpec().merge(subquery.getLimitSpec()))
.build(); .build();
final IncrementalIndex innerQueryResultIndex = makeIncrementalIndex( final IncrementalIndex innerQueryResultIndex = GroupByQueryHelper.makeIncrementalIndex(
innerQuery.withOverriddenContext( innerQuery.withOverriddenContext(
ImmutableMap.<String, Object>of( ImmutableMap.<String, Object>of(
GroupByQueryHelper.CTX_KEY_SORT_RESULTS, true GroupByQueryHelper.CTX_KEY_SORT_RESULTS, true
) )
), ),
subqueryResult configSupplier.get(),
bufferPool,
subqueryResult,
false
); );
//Outer query might have multiple intervals, but they are expected to be non-overlapping and sorted which //Outer query might have multiple intervals, but they are expected to be non-overlapping and sorted which
//is ensured by QuerySegmentSpec. //is ensured by QuerySegmentSpec.
//GroupByQueryEngine can only process one interval at a time, so we need to call it once per interval //GroupByQueryEngine can only process one interval at a time, so we need to call it once per interval
//and concatenate the results. //and concatenate the results.
final IncrementalIndex outerQueryResultIndex = makeIncrementalIndex( final IncrementalIndex outerQueryResultIndex = GroupByQueryHelper.makeIncrementalIndex(
outerQuery, outerQuery,
configSupplier.get(),
bufferPool,
Sequences.concat( Sequences.concat(
Sequences.map( Sequences.map(
Sequences.simple(outerQuery.getIntervals()), Sequences.simple(outerQuery.getIntervals()),
@ -210,7 +216,8 @@ public class GroupByStrategyV1 implements GroupByStrategy
} }
} }
) )
) ),
true
); );
innerQueryResultIndex.close(); innerQueryResultIndex.close();
@ -221,11 +228,6 @@ public class GroupByStrategyV1 implements GroupByStrategy
); );
} }
private IncrementalIndex makeIncrementalIndex(GroupByQuery query, Sequence<Row> rows)
{
return GroupByQueryHelper.makeIncrementalIndex(query, configSupplier.get(), bufferPool, rows);
}
@Override @Override
public QueryRunner<Row> mergeRunners( public QueryRunner<Row> mergeRunners(
final ListeningExecutorService exec, final ListeningExecutorService exec,

View File

@ -106,6 +106,7 @@ public class QueryRunnerTestHelper
public static final DateTime minTime = new DateTime("2011-01-12T00:00:00.000Z"); public static final DateTime minTime = new DateTime("2011-01-12T00:00:00.000Z");
public static final QueryGranularity dayGran = QueryGranularities.DAY; public static final QueryGranularity dayGran = QueryGranularities.DAY;
public static final QueryGranularity monthGran = QueryGranularities.MONTH;
public static final QueryGranularity allGran = QueryGranularities.ALL; public static final QueryGranularity allGran = QueryGranularities.ALL;
public static final String timeDimension = "__time"; public static final String timeDimension = "__time";
public static final String marketDimension = "market"; public static final String marketDimension = "market";

View File

@ -0,0 +1,201 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.first;
import io.druid.collections.SerializablePair;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.Pair;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.TestFloatColumnSelector;
import io.druid.query.aggregation.TestLongColumnSelector;
import io.druid.query.aggregation.TestObjectColumnSelector;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.column.Column;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
public class DoubleFirstAggregationTest
{
private DoubleFirstAggregatorFactory doubleFirstAggFactory;
private DoubleFirstAggregatorFactory combiningAggFactory;
private ColumnSelectorFactory colSelectorFactory;
private TestLongColumnSelector timeSelector;
private TestFloatColumnSelector valueSelector;
private TestObjectColumnSelector objectSelector;
private float[] floatValues = {1.1f, 2.7f, 3.5f, 1.3f};
private long[] times = {12, 10, 5344, 7899999};
private SerializablePair[] pairs = {
new SerializablePair<>(1467225096L, 134.3d),
new SerializablePair<>(23163L, 1232.212d),
new SerializablePair<>(742L, 18d),
new SerializablePair<>(111111L, 233.5232d)
};
@Before
public void setup()
{
doubleFirstAggFactory = new DoubleFirstAggregatorFactory("billy", "nilly");
combiningAggFactory = (DoubleFirstAggregatorFactory) doubleFirstAggFactory.getCombiningFactory();
timeSelector = new TestLongColumnSelector(times);
valueSelector = new TestFloatColumnSelector(floatValues);
objectSelector = new TestObjectColumnSelector(pairs);
colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
EasyMock.expect(colSelectorFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME)).andReturn(timeSelector);
EasyMock.expect(colSelectorFactory.makeFloatColumnSelector("nilly")).andReturn(valueSelector);
EasyMock.expect(colSelectorFactory.makeObjectColumnSelector("billy")).andReturn(objectSelector);
EasyMock.replay(colSelectorFactory);
}
@Test
public void testDoubleFirstAggregator()
{
DoubleFirstAggregator agg = (DoubleFirstAggregator) doubleFirstAggFactory.factorize(colSelectorFactory);
Assert.assertEquals("billy", agg.getName());
aggregate(agg);
aggregate(agg);
aggregate(agg);
aggregate(agg);
Pair<Long, Double> result = (Pair<Long, Double>) agg.get();
Assert.assertEquals(times[1], result.lhs.longValue());
Assert.assertEquals(floatValues[1], result.rhs, 0.0001);
Assert.assertEquals((long) floatValues[1], agg.getLong());
Assert.assertEquals(floatValues[1], agg.getFloat(), 0.0001);
agg.reset();
Assert.assertEquals(0, ((Pair<Long, Double>) agg.get()).rhs, 0.0001);
}
@Test
public void testDoubleFirstBufferAggregator()
{
DoubleFirstBufferAggregator agg = (DoubleFirstBufferAggregator) doubleFirstAggFactory.factorizeBuffered(
colSelectorFactory);
ByteBuffer buffer = ByteBuffer.wrap(new byte[doubleFirstAggFactory.getMaxIntermediateSize()]);
agg.init(buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
Pair<Long, Double> result = (Pair<Long, Double>) agg.get(buffer, 0);
Assert.assertEquals(times[1], result.lhs.longValue());
Assert.assertEquals(floatValues[1], result.rhs, 0.0001);
Assert.assertEquals((long) floatValues[1], agg.getLong(buffer, 0));
Assert.assertEquals(floatValues[1], agg.getFloat(buffer, 0), 0.0001);
}
@Test
public void testCombine()
{
SerializablePair pair1 = new SerializablePair<>(1467225000L, 3.621);
SerializablePair pair2 = new SerializablePair<>(1467240000L, 785.4);
Assert.assertEquals(pair1, doubleFirstAggFactory.combine(pair1, pair2));
}
@Test
public void testDoubleFirstCombiningAggregator()
{
DoubleFirstAggregator agg = (DoubleFirstAggregator) combiningAggFactory.factorize(colSelectorFactory);
Assert.assertEquals("billy", agg.getName());
aggregate(agg);
aggregate(agg);
aggregate(agg);
aggregate(agg);
Pair<Long, Double> result = (Pair<Long, Double>) agg.get();
Pair<Long, Double> expected = (Pair<Long, Double>)pairs[2];
Assert.assertEquals(expected.lhs, result.lhs);
Assert.assertEquals(expected.rhs, result.rhs, 0.0001);
Assert.assertEquals(expected.rhs.longValue(), agg.getLong());
Assert.assertEquals(expected.rhs, agg.getFloat(), 0.0001);
agg.reset();
Assert.assertEquals(0, ((Pair<Long, Double>) agg.get()).rhs, 0.0001);
}
@Test
public void testDoubleFirstCombiningBufferAggregator()
{
DoubleFirstBufferAggregator agg = (DoubleFirstBufferAggregator) combiningAggFactory.factorizeBuffered(
colSelectorFactory);
ByteBuffer buffer = ByteBuffer.wrap(new byte[doubleFirstAggFactory.getMaxIntermediateSize()]);
agg.init(buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
Pair<Long, Double> result = (Pair<Long, Double>) agg.get(buffer, 0);
Pair<Long, Double> expected = (Pair<Long, Double>)pairs[2];
Assert.assertEquals(expected.lhs, result.lhs);
Assert.assertEquals(expected.rhs, result.rhs, 0.0001);
Assert.assertEquals(expected.rhs.longValue(), agg.getLong(buffer, 0));
Assert.assertEquals(expected.rhs, agg.getFloat(buffer, 0), 0.0001);
}
@Test
public void testSerde() throws Exception
{
DefaultObjectMapper mapper = new DefaultObjectMapper();
String doubleSpecJson = "{\"type\":\"doubleFirst\",\"name\":\"billy\",\"fieldName\":\"nilly\"}";
Assert.assertEquals(doubleFirstAggFactory, mapper.readValue(doubleSpecJson, AggregatorFactory.class));
}
private void aggregate(
DoubleFirstAggregator agg
)
{
agg.aggregate();
timeSelector.increment();
valueSelector.increment();
objectSelector.increment();
}
private void aggregate(
DoubleFirstBufferAggregator agg,
ByteBuffer buff,
int position
)
{
agg.aggregate(buff, position);
timeSelector.increment();
valueSelector.increment();
objectSelector.increment();
}
}

View File

@ -0,0 +1,200 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.first;
import io.druid.collections.SerializablePair;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.Pair;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.TestLongColumnSelector;
import io.druid.query.aggregation.TestObjectColumnSelector;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.column.Column;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
public class LongFirstAggregationTest
{
private LongFirstAggregatorFactory longFirstAggFactory;
private LongFirstAggregatorFactory combiningAggFactory;
private ColumnSelectorFactory colSelectorFactory;
private TestLongColumnSelector timeSelector;
private TestLongColumnSelector valueSelector;
private TestObjectColumnSelector objectSelector;
private long[] longValues = {185, -216, -128751132, Long.MIN_VALUE};
private long[] times = {1123126751, 1784247991, 1854329816, 1000000000};
private SerializablePair[] pairs = {
new SerializablePair<>(1L, 113267L),
new SerializablePair<>(1L, 5437384L),
new SerializablePair<>(6L, 34583458L),
new SerializablePair<>(88L, 34583452L)
};
@Before
public void setup()
{
longFirstAggFactory = new LongFirstAggregatorFactory("billy", "nilly");
combiningAggFactory = (LongFirstAggregatorFactory) longFirstAggFactory.getCombiningFactory();
timeSelector = new TestLongColumnSelector(times);
valueSelector = new TestLongColumnSelector(longValues);
objectSelector = new TestObjectColumnSelector(pairs);
colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
EasyMock.expect(colSelectorFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME)).andReturn(timeSelector);
EasyMock.expect(colSelectorFactory.makeLongColumnSelector("nilly")).andReturn(valueSelector);
EasyMock.expect(colSelectorFactory.makeObjectColumnSelector("billy")).andReturn(objectSelector);
EasyMock.replay(colSelectorFactory);
}
@Test
public void testLongFirstAggregator()
{
LongFirstAggregator agg = (LongFirstAggregator) longFirstAggFactory.factorize(colSelectorFactory);
Assert.assertEquals("billy", agg.getName());
aggregate(agg);
aggregate(agg);
aggregate(agg);
aggregate(agg);
Pair<Long, Long> result = (Pair<Long, Long>) agg.get();
Assert.assertEquals(times[3], result.lhs.longValue());
Assert.assertEquals(longValues[3], result.rhs.longValue());
Assert.assertEquals(longValues[3], agg.getLong());
Assert.assertEquals(longValues[3], agg.getFloat(), 0.0001);
agg.reset();
Assert.assertEquals(0, ((Pair<Long, Long>) agg.get()).rhs, 0.0001);
}
@Test
public void testLongFirstBufferAggregator()
{
LongFirstBufferAggregator agg = (LongFirstBufferAggregator) longFirstAggFactory.factorizeBuffered(
colSelectorFactory);
ByteBuffer buffer = ByteBuffer.wrap(new byte[longFirstAggFactory.getMaxIntermediateSize()]);
agg.init(buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
Pair<Long, Long> result = (Pair<Long, Long>) agg.get(buffer, 0);
Assert.assertEquals(times[3], result.lhs.longValue());
Assert.assertEquals(longValues[3], result.rhs.longValue());
Assert.assertEquals(longValues[3], agg.getLong(buffer, 0));
Assert.assertEquals(longValues[3], agg.getFloat(buffer, 0), 0.0001);
}
@Test
public void testCombine()
{
SerializablePair pair1 = new SerializablePair<>(1467225000L, 1263L);
SerializablePair pair2 = new SerializablePair<>(1467240000L, 752713L);
Assert.assertEquals(pair1, longFirstAggFactory.combine(pair1, pair2));
}
@Test
public void testLongFirstCombiningAggregator()
{
LongFirstAggregator agg = (LongFirstAggregator) combiningAggFactory.factorize(colSelectorFactory);
Assert.assertEquals("billy", agg.getName());
aggregate(agg);
aggregate(agg);
aggregate(agg);
aggregate(agg);
Pair<Long, Long> result = (Pair<Long, Long>) agg.get();
Pair<Long, Long> expected = (Pair<Long, Long>)pairs[0];
Assert.assertEquals(expected.lhs, result.lhs);
Assert.assertEquals(expected.rhs, result.rhs);
Assert.assertEquals(expected.rhs.longValue(), agg.getLong());
Assert.assertEquals(expected.rhs, agg.getFloat(), 0.0001);
agg.reset();
Assert.assertEquals(0, ((Pair<Long, Long>) agg.get()).rhs, 0.0001);
}
@Test
public void testLongFirstCombiningBufferAggregator()
{
LongFirstBufferAggregator agg = (LongFirstBufferAggregator) combiningAggFactory.factorizeBuffered(
colSelectorFactory);
ByteBuffer buffer = ByteBuffer.wrap(new byte[longFirstAggFactory.getMaxIntermediateSize()]);
agg.init(buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
Pair<Long, Long> result = (Pair<Long, Long>) agg.get(buffer, 0);
Pair<Long, Long> expected = (Pair<Long, Long>)pairs[0];
Assert.assertEquals(expected.lhs, result.lhs);
Assert.assertEquals(expected.rhs, result.rhs);
Assert.assertEquals(expected.rhs.longValue(), agg.getLong(buffer, 0));
Assert.assertEquals(expected.rhs, agg.getFloat(buffer, 0), 0.0001);
}
@Test
public void testSerde() throws Exception
{
DefaultObjectMapper mapper = new DefaultObjectMapper();
String longSpecJson = "{\"type\":\"longFirst\",\"name\":\"billy\",\"fieldName\":\"nilly\"}";
Assert.assertEquals(longFirstAggFactory, mapper.readValue(longSpecJson, AggregatorFactory.class));
}
private void aggregate(
LongFirstAggregator agg
)
{
agg.aggregate();
timeSelector.increment();
valueSelector.increment();
objectSelector.increment();
}
private void aggregate(
LongFirstBufferAggregator agg,
ByteBuffer buff,
int position
)
{
agg.aggregate(buff, position);
timeSelector.increment();
valueSelector.increment();
objectSelector.increment();
}
}

View File

@ -0,0 +1,201 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.last;
import io.druid.collections.SerializablePair;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.Pair;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.TestFloatColumnSelector;
import io.druid.query.aggregation.TestLongColumnSelector;
import io.druid.query.aggregation.TestObjectColumnSelector;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.column.Column;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
public class DoubleLastAggregationTest
{
private DoubleLastAggregatorFactory doubleLastAggFactory;
private DoubleLastAggregatorFactory combiningAggFactory;
private ColumnSelectorFactory colSelectorFactory;
private TestLongColumnSelector timeSelector;
private TestFloatColumnSelector valueSelector;
private TestObjectColumnSelector objectSelector;
private float[] floatValues = {1.1897f, 0.001f, 86.23f, 166.228f};
private long[] times = {8224, 6879, 2436, 7888};
private SerializablePair[] pairs = {
new SerializablePair<>(52782L, 134.3d),
new SerializablePair<>(65492L, 1232.212d),
new SerializablePair<>(69134L, 18.1233d),
new SerializablePair<>(11111L, 233.5232d)
};
@Before
public void setup()
{
doubleLastAggFactory = new DoubleLastAggregatorFactory("billy", "nilly");
combiningAggFactory = (DoubleLastAggregatorFactory) doubleLastAggFactory.getCombiningFactory();
timeSelector = new TestLongColumnSelector(times);
valueSelector = new TestFloatColumnSelector(floatValues);
objectSelector = new TestObjectColumnSelector(pairs);
colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
EasyMock.expect(colSelectorFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME)).andReturn(timeSelector);
EasyMock.expect(colSelectorFactory.makeFloatColumnSelector("nilly")).andReturn(valueSelector);
EasyMock.expect(colSelectorFactory.makeObjectColumnSelector("billy")).andReturn(objectSelector);
EasyMock.replay(colSelectorFactory);
}
@Test
public void testDoubleLastAggregator()
{
DoubleLastAggregator agg = (DoubleLastAggregator) doubleLastAggFactory.factorize(colSelectorFactory);
Assert.assertEquals("billy", agg.getName());
aggregate(agg);
aggregate(agg);
aggregate(agg);
aggregate(agg);
Pair<Long, Double> result = (Pair<Long, Double>) agg.get();
Assert.assertEquals(times[0], result.lhs.longValue());
Assert.assertEquals(floatValues[0], result.rhs, 0.0001);
Assert.assertEquals((long) floatValues[0], agg.getLong());
Assert.assertEquals(floatValues[0], agg.getFloat(), 0.0001);
agg.reset();
Assert.assertEquals(0, ((Pair<Long, Double>) agg.get()).rhs, 0.0001);
}
@Test
public void testDoubleLastBufferAggregator()
{
DoubleLastBufferAggregator agg = (DoubleLastBufferAggregator) doubleLastAggFactory.factorizeBuffered(
colSelectorFactory);
ByteBuffer buffer = ByteBuffer.wrap(new byte[doubleLastAggFactory.getMaxIntermediateSize()]);
agg.init(buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
Pair<Long, Double> result = (Pair<Long, Double>) agg.get(buffer, 0);
Assert.assertEquals(times[0], result.lhs.longValue());
Assert.assertEquals(floatValues[0], result.rhs, 0.0001);
Assert.assertEquals((long) floatValues[0], agg.getLong(buffer, 0));
Assert.assertEquals(floatValues[0], agg.getFloat(buffer, 0), 0.0001);
}
@Test
public void testCombine()
{
SerializablePair pair1 = new SerializablePair<>(1467225000L, 3.621);
SerializablePair pair2 = new SerializablePair<>(1467240000L, 785.4);
Assert.assertEquals(pair2, doubleLastAggFactory.combine(pair1, pair2));
}
@Test
public void testDoubleLastCombiningAggregator()
{
DoubleLastAggregator agg = (DoubleLastAggregator) combiningAggFactory.factorize(colSelectorFactory);
Assert.assertEquals("billy", agg.getName());
aggregate(agg);
aggregate(agg);
aggregate(agg);
aggregate(agg);
Pair<Long, Double> result = (Pair<Long, Double>) agg.get();
Pair<Long, Double> expected = (Pair<Long, Double>)pairs[2];
Assert.assertEquals(expected.lhs, result.lhs);
Assert.assertEquals(expected.rhs, result.rhs, 0.0001);
Assert.assertEquals(expected.rhs.longValue(), agg.getLong());
Assert.assertEquals(expected.rhs, agg.getFloat(), 0.0001);
agg.reset();
Assert.assertEquals(0, ((Pair<Long, Double>) agg.get()).rhs, 0.0001);
}
@Test
public void testDoubleLastCombiningBufferAggregator()
{
DoubleLastBufferAggregator agg = (DoubleLastBufferAggregator) combiningAggFactory.factorizeBuffered(
colSelectorFactory);
ByteBuffer buffer = ByteBuffer.wrap(new byte[doubleLastAggFactory.getMaxIntermediateSize()]);
agg.init(buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
Pair<Long, Double> result = (Pair<Long, Double>) agg.get(buffer, 0);
Pair<Long, Double> expected = (Pair<Long, Double>)pairs[2];
Assert.assertEquals(expected.lhs, result.lhs);
Assert.assertEquals(expected.rhs, result.rhs, 0.0001);
Assert.assertEquals(expected.rhs.longValue(), agg.getLong(buffer, 0));
Assert.assertEquals(expected.rhs, agg.getFloat(buffer, 0), 0.0001);
}
@Test
public void testSerde() throws Exception
{
DefaultObjectMapper mapper = new DefaultObjectMapper();
String doubleSpecJson = "{\"type\":\"doubleLast\",\"name\":\"billy\",\"fieldName\":\"nilly\"}";
Assert.assertEquals(doubleLastAggFactory, mapper.readValue(doubleSpecJson, AggregatorFactory.class));
}
private void aggregate(
DoubleLastAggregator agg
)
{
agg.aggregate();
timeSelector.increment();
valueSelector.increment();
objectSelector.increment();
}
private void aggregate(
DoubleLastBufferAggregator agg,
ByteBuffer buff,
int position
)
{
agg.aggregate(buff, position);
timeSelector.increment();
valueSelector.increment();
objectSelector.increment();
}
}

View File

@ -0,0 +1,200 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.last;
import io.druid.collections.SerializablePair;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.Pair;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.TestLongColumnSelector;
import io.druid.query.aggregation.TestObjectColumnSelector;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.column.Column;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
public class LongLastAggregationTest
{
private LongLastAggregatorFactory longLastAggFactory;
private LongLastAggregatorFactory combiningAggFactory;
private ColumnSelectorFactory colSelectorFactory;
private TestLongColumnSelector timeSelector;
private TestLongColumnSelector valueSelector;
private TestObjectColumnSelector objectSelector;
private long[] longValues = {23216, 8635, 1547123, Long.MAX_VALUE};
private long[] times = {1467935723, 1467225653, 1601848932, 72515};
private SerializablePair[] pairs = {
new SerializablePair<>(12531L, 113267L),
new SerializablePair<>(123L, 5437384L),
new SerializablePair<>(125755L, 34583458L),
new SerializablePair<>(124L, 34283452L)
};
@Before
public void setup()
{
longLastAggFactory = new LongLastAggregatorFactory("billy", "nilly");
combiningAggFactory = (LongLastAggregatorFactory) longLastAggFactory.getCombiningFactory();
timeSelector = new TestLongColumnSelector(times);
valueSelector = new TestLongColumnSelector(longValues);
objectSelector = new TestObjectColumnSelector(pairs);
colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
EasyMock.expect(colSelectorFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME)).andReturn(timeSelector);
EasyMock.expect(colSelectorFactory.makeLongColumnSelector("nilly")).andReturn(valueSelector);
EasyMock.expect(colSelectorFactory.makeObjectColumnSelector("billy")).andReturn(objectSelector);
EasyMock.replay(colSelectorFactory);
}
@Test
public void testLongLastAggregator()
{
LongLastAggregator agg = (LongLastAggregator) longLastAggFactory.factorize(colSelectorFactory);
Assert.assertEquals("billy", agg.getName());
aggregate(agg);
aggregate(agg);
aggregate(agg);
aggregate(agg);
Pair<Long, Long> result = (Pair<Long, Long>) agg.get();
Assert.assertEquals(times[2], result.lhs.longValue());
Assert.assertEquals(longValues[2], result.rhs.longValue());
Assert.assertEquals(longValues[2], agg.getLong());
Assert.assertEquals(longValues[2], agg.getFloat(), 1);
agg.reset();
Assert.assertEquals(0, ((Pair<Long, Long>) agg.get()).rhs.longValue());
}
@Test
public void testLongLastBufferAggregator()
{
LongLastBufferAggregator agg = (LongLastBufferAggregator) longLastAggFactory.factorizeBuffered(
colSelectorFactory);
ByteBuffer buffer = ByteBuffer.wrap(new byte[longLastAggFactory.getMaxIntermediateSize()]);
agg.init(buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
Pair<Long, Long> result = (Pair<Long, Long>) agg.get(buffer, 0);
Assert.assertEquals(times[2], result.lhs.longValue());
Assert.assertEquals(longValues[2], result.rhs.longValue());
Assert.assertEquals(longValues[2], agg.getLong(buffer, 0));
Assert.assertEquals(longValues[2], agg.getFloat(buffer, 0), 1);
}
@Test
public void testCombine()
{
SerializablePair pair1 = new SerializablePair<>(1467225000L, 64432L);
SerializablePair pair2 = new SerializablePair<>(1467240000L, 99999L);
Assert.assertEquals(pair2, longLastAggFactory.combine(pair1, pair2));
}
@Test
public void testLongLastCombiningAggregator()
{
LongLastAggregator agg = (LongLastAggregator) combiningAggFactory.factorize(colSelectorFactory);
Assert.assertEquals("billy", agg.getName());
aggregate(agg);
aggregate(agg);
aggregate(agg);
aggregate(agg);
Pair<Long, Long> result = (Pair<Long, Long>) agg.get();
Pair<Long, Long> expected = (Pair<Long, Long>)pairs[2];
Assert.assertEquals(expected.lhs, result.lhs);
Assert.assertEquals(expected.rhs, result.rhs);
Assert.assertEquals(expected.rhs.longValue(), agg.getLong());
Assert.assertEquals(expected.rhs, agg.getFloat(), 1);
agg.reset();
Assert.assertEquals(0, ((Pair<Long, Long>) agg.get()).rhs.longValue());
}
@Test
public void testLongLastCombiningBufferAggregator()
{
LongLastBufferAggregator agg = (LongLastBufferAggregator) combiningAggFactory.factorizeBuffered(
colSelectorFactory);
ByteBuffer buffer = ByteBuffer.wrap(new byte[longLastAggFactory.getMaxIntermediateSize()]);
agg.init(buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
Pair<Long, Long> result = (Pair<Long, Long>) agg.get(buffer, 0);
Pair<Long, Long> expected = (Pair<Long, Long>)pairs[2];
Assert.assertEquals(expected.lhs, result.lhs);
Assert.assertEquals(expected.rhs, result.rhs);
Assert.assertEquals(expected.rhs.longValue(), agg.getLong(buffer, 0));
Assert.assertEquals(expected.rhs, agg.getFloat(buffer, 0), 1);
}
@Test
public void testSerde() throws Exception
{
DefaultObjectMapper mapper = new DefaultObjectMapper();
String longSpecJson = "{\"type\":\"longLast\",\"name\":\"billy\",\"fieldName\":\"nilly\"}";
Assert.assertEquals(longLastAggFactory, mapper.readValue(longSpecJson, AggregatorFactory.class));
}
private void aggregate(
LongLastAggregator agg
)
{
agg.aggregate();
timeSelector.increment();
valueSelector.increment();
objectSelector.increment();
}
private void aggregate(
LongLastBufferAggregator agg,
ByteBuffer buff,
int position
)
{
agg.aggregate(buff, position);
timeSelector.increment();
valueSelector.increment();
objectSelector.increment();
}
}

View File

@ -60,11 +60,13 @@ import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.FilteredAggregatorFactory; import io.druid.query.aggregation.FilteredAggregatorFactory;
import io.druid.query.aggregation.JavaScriptAggregatorFactory; import io.druid.query.aggregation.JavaScriptAggregatorFactory;
import io.druid.query.aggregation.first.LongFirstAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.PostAggregator; import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator; import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.last.LongLastAggregatorFactory;
import io.druid.query.aggregation.post.ArithmeticPostAggregator; import io.druid.query.aggregation.post.ArithmeticPostAggregator;
import io.druid.query.aggregation.post.ConstantPostAggregator; import io.druid.query.aggregation.post.ConstantPostAggregator;
import io.druid.query.aggregation.post.ExpressionPostAggregator; import io.druid.query.aggregation.post.ExpressionPostAggregator;
@ -1703,6 +1705,67 @@ public class GroupByQueryRunnerTest
TestHelper.assertExpectedObjects(expectedResults, results, ""); TestHelper.assertExpectedObjects(expectedResults, results, "");
} }
@Test
public void testGroupByWithFirstLast()
{
GroupByQuery query = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.fullOnInterval)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("market", "market")))
.setAggregatorSpecs(
Arrays.asList(
new LongFirstAggregatorFactory("first", "index"),
new LongLastAggregatorFactory("last", "index")
)
)
.setGranularity(QueryRunnerTestHelper.monthGran)
.build();
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("2011-01-01", "market", "spot", "first", 100L, "last", 155L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-01-01", "market", "total_market", "first", 1000L, "last", 1127L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-01-01", "market", "upfront", "first", 800L, "last", 943L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-02-01", "market", "spot", "first", 132L, "last", 114L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-02-01", "market", "total_market", "first", 1203L, "last", 1292L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-02-01", "market", "upfront", "first", 1667L, "last", 1101L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-01", "market", "spot", "first", 153L, "last", 125L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-01", "market", "total_market", "first", 1124L, "last", 1366L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-01", "market", "upfront", "first", 1166L, "last", 1063L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "market", "spot", "first", 135L, "last", 120L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "market", "total_market", "first", 1314L, "last", 1029L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "market", "upfront", "first", 1447L, "last", 780L)
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@Test
public void testGroupByWithNoResult()
{
GroupByQuery query = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.emptyInterval)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("market", "market")))
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
QueryRunnerTestHelper.indexLongSum,
QueryRunnerTestHelper.qualityCardinality,
new LongFirstAggregatorFactory("first", "index"),
new LongLastAggregatorFactory("last", "index")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
List<Row> expectedResults = ImmutableList.of();
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
Assert.assertEquals(expectedResults, results);
}
@Test @Test
public void testGroupByWithNullProducingDimExtractionFn() public void testGroupByWithNullProducingDimExtractionFn()
{ {
@ -5195,7 +5258,7 @@ public class GroupByQueryRunnerTest
} }
@Test @Test
public void testSubqueryWithOuterJavascriptAggregators() public void testSubqueryWithOuterDimJavascriptAggregators()
{ {
final GroupByQuery subquery = GroupByQuery final GroupByQuery subquery = GroupByQuery
.builder() .builder()
@ -5257,6 +5320,69 @@ public class GroupByQueryRunnerTest
TestHelper.assertExpectedObjects(expectedResults, results, ""); TestHelper.assertExpectedObjects(expectedResults, results, "");
} }
@Test
public void testSubqueryWithOuterJavascriptAggregators()
{
final GroupByQuery subquery = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("market", "market"),
new DefaultDimensionSpec("quality", "quality")))
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("index", "index")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
final GroupByQuery query = GroupByQuery
.builder()
.setDataSource(subquery)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "quality")))
.setAggregatorSpecs(
Arrays.<AggregatorFactory>asList(
new JavaScriptAggregatorFactory(
"js_agg",
Arrays.asList("index", "rows"),
"function(current, index, rows){return current + index + rows;}",
"function(){return 0;}",
"function(a,b){return a + b;}",
JavaScriptConfig.getDefault()
)
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "automotive", "js_agg", 136D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "business", "js_agg", 119D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "entertainment", "js_agg", 159D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "health", "js_agg", 121D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "mezzanine", "js_agg", 2873D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "news", "js_agg", 122D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "premium", "js_agg", 2903D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "technology", "js_agg", 79D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "travel", "js_agg", 120D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "automotive", "js_agg", 148D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "business", "js_agg", 113D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "entertainment", "js_agg", 167D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "health", "js_agg", 114D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "mezzanine", "js_agg", 2450D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "news", "js_agg", 115D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "premium", "js_agg", 2508D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "technology", "js_agg", 98D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "travel", "js_agg", 127D)
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@Test @Test
public void testSubqueryWithHyperUniques() public void testSubqueryWithHyperUniques()
{ {
@ -5459,6 +5585,50 @@ public class GroupByQueryRunnerTest
TestHelper.assertExpectedObjects(expectedResults, results, ""); TestHelper.assertExpectedObjects(expectedResults, results, "");
} }
@Test
public void testSubqueryWithFirstLast()
{
GroupByQuery subquery = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.fullOnInterval)
.setDimensions(ImmutableList.<DimensionSpec>of(new DefaultDimensionSpec("market", "market")))
.setAggregatorSpecs(
ImmutableList.<AggregatorFactory>of(
QueryRunnerTestHelper.rowsCount,
new LongFirstAggregatorFactory("innerfirst", "index"),
new LongLastAggregatorFactory("innerlast", "index")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.setContext(ImmutableMap.<String, Object>of("finalize", true))
.build();
GroupByQuery query = GroupByQuery
.builder()
.setDataSource(subquery)
.setQuerySegmentSpec(QueryRunnerTestHelper.fullOnInterval)
.setDimensions(Lists.<DimensionSpec>newArrayList())
.setAggregatorSpecs(
ImmutableList.<AggregatorFactory>of(
new LongFirstAggregatorFactory("first", "innerfirst"),
new LongLastAggregatorFactory("last", "innerlast")
)
)
.setGranularity(QueryRunnerTestHelper.monthGran)
.build();
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("2011-01-01", "first", 100L, "last", 943L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-02-01", "first", 132L, "last", 1101L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-01", "first", 153L, "last", 1063L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "first", 135L, "last", 780L)
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@Test @Test
public void testGroupByWithTimeColumn() public void testGroupByWithTimeColumn()
{ {

View File

@ -112,6 +112,13 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
super(runner, false); super(runner, false);
} }
@Override
public void testEmptyTimeseries()
{
// Skip this test because the timeseries test expects the empty range to have one entry, but group by
// does not expect anything
}
@Override @Override
public void testFullOnTimeseries() public void testFullOnTimeseries()
{ {

View File

@ -36,6 +36,8 @@ import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleMaxAggregatorFactory; import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
import io.druid.query.aggregation.DoubleMinAggregatorFactory; import io.druid.query.aggregation.DoubleMinAggregatorFactory;
import io.druid.query.aggregation.FilteredAggregatorFactory; import io.druid.query.aggregation.FilteredAggregatorFactory;
import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
import io.druid.query.aggregation.last.DoubleLastAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.PostAggregator; import io.druid.query.aggregation.PostAggregator;
import io.druid.query.extraction.MapLookupExtractor; import io.druid.query.extraction.MapLookupExtractor;
@ -111,6 +113,44 @@ public class TimeseriesQueryRunnerTest
this.descending = descending; this.descending = descending;
} }
@Test
public void testEmptyTimeseries()
{
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.intervals(QueryRunnerTestHelper.emptyInterval)
.aggregators(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
QueryRunnerTestHelper.indexDoubleSum,
new DoubleFirstAggregatorFactory("first", "index")
)
)
.descending(descending)
.build();
List<Result<TimeseriesResultValue>> expectedResults = ImmutableList.of(
new Result<>(
new DateTime("2020-04-02"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"rows", 0L,
"index", 0D,
"first", 0D
)
)
)
);
Iterable<Result<TimeseriesResultValue>> actualResults = Sequences.toList(
runner.run(query, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults, actualResults);
}
@Test @Test
public void testFullOnTimeseries() public void testFullOnTimeseries()
{ {
@ -1732,6 +1772,114 @@ public class TimeseriesQueryRunnerTest
assertExpectedResults(expectedResults, actualResults); assertExpectedResults(expectedResults, actualResults);
} }
@Test
public void testTimeseriesWithFirstLastAggregator()
{
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.monthGran)
.intervals(QueryRunnerTestHelper.fullOnInterval)
.aggregators(
ImmutableList.of(
new DoubleFirstAggregatorFactory("first", "index"),
new DoubleLastAggregatorFactory("last", "index")
)
)
.descending(descending)
.build();
// There's a difference between ascending and descending results since granularity of druid.sample.tsv is days,
// with multiple first and last times. The traversal order difference cause the first and last aggregator
// to select different value from the list of first and last dates
List<Result<TimeseriesResultValue>> expectedAscendingResults = ImmutableList.of(
new Result<>(
new DateTime("2011-01-01"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"first", new Float(100.000000).doubleValue(),
"last", new Float(943.497198).doubleValue()
)
)
),
new Result<>(
new DateTime("2011-02-01"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"first", new Float(132.123776).doubleValue(),
"last", new Float(1101.918270).doubleValue()
)
)
),
new Result<>(
new DateTime("2011-03-01"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"first", new Float(153.059937).doubleValue(),
"last", new Float(1063.201156).doubleValue()
)
)
),
new Result<>(
new DateTime("2011-04-01"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"first", new Float(135.885094).doubleValue(),
"last", new Float(780.271977).doubleValue()
)
)
)
);
List<Result<TimeseriesResultValue>> expectedDescendingResults = ImmutableList.of(
new Result<>(
new DateTime("2011-04-01"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"first", new Float(1234.247546).doubleValue(),
"last", new Float(106.793700).doubleValue()
)
)
),
new Result<>(
new DateTime("2011-03-01"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"first", new Float(1004.940887).doubleValue(),
"last", new Float(151.752485).doubleValue()
)
)
),
new Result<>(
new DateTime("2011-02-01"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"first", new Float(913.561076).doubleValue(),
"last", new Float(122.258195).doubleValue()
)
)
),
new Result<>(
new DateTime("2011-01-01"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"first", new Float(800.000000).doubleValue(),
"last", new Float(133.740047).doubleValue()
)
)
)
);
Iterable<Result<TimeseriesResultValue>> actualResults = Sequences.toList(
runner.run(query, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
if (descending) {
TestHelper.assertExpectedResults(expectedDescendingResults, actualResults);
} else {
TestHelper.assertExpectedResults(expectedAscendingResults, actualResults);
}
}
@Test @Test
public void testTimeseriesWithMultiValueDimFilter1() public void testTimeseriesWithMultiValueDimFilter1()
{ {

View File

@ -47,12 +47,15 @@ import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
import io.druid.query.aggregation.DoubleMinAggregatorFactory; import io.druid.query.aggregation.DoubleMinAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.FilteredAggregatorFactory; import io.druid.query.aggregation.FilteredAggregatorFactory;
import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
import io.druid.query.aggregation.first.LongFirstAggregatorFactory;
import io.druid.query.aggregation.PostAggregator; import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator; import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec; import io.druid.query.dimension.DimensionSpec;
import io.druid.query.aggregation.last.LongLastAggregatorFactory;
import io.druid.query.dimension.ExtractionDimensionSpec; import io.druid.query.dimension.ExtractionDimensionSpec;
import io.druid.query.extraction.DimExtractionFn; import io.druid.query.extraction.DimExtractionFn;
import io.druid.query.extraction.ExtractionFn; import io.druid.query.extraction.ExtractionFn;
@ -172,6 +175,40 @@ public class TopNQueryRunnerTest
return mergeRunner.run(query, context); return mergeRunner.run(query, context);
} }
@Test
public void testEmptyTopN()
{
TopNQuery query = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.dimension(QueryRunnerTestHelper.marketDimension)
.metric(QueryRunnerTestHelper.indexMetric)
.threshold(4)
.intervals(QueryRunnerTestHelper.emptyInterval)
.aggregators(
Lists.<AggregatorFactory>newArrayList(
Iterables.concat(
QueryRunnerTestHelper.commonAggregators,
Lists.newArrayList(
new DoubleMaxAggregatorFactory("maxIndex", "index"),
new DoubleMinAggregatorFactory("minIndex", "index"),
new DoubleFirstAggregatorFactory("first", "index")
)
)
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.build();
List<Result<TopNResultValue>> expectedResults = ImmutableList.of(
new Result<>(
new DateTime("2020-04-02T00:00:00.000Z"),
new TopNResultValue(ImmutableList.of())
)
);
assertExpectedResults(expectedResults, query);
}
@Test @Test
public void testFullOnTopN() public void testFullOnTopN()
{ {
@ -449,6 +486,117 @@ public class TopNQueryRunnerTest
assertExpectedResults(expectedResults, query); assertExpectedResults(expectedResults, query);
} }
@Test
public void testTopNOverFirstLastAggregator()
{
TopNQuery query = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.monthGran)
.dimension(QueryRunnerTestHelper.marketDimension)
.metric("last")
.threshold(3)
.intervals(QueryRunnerTestHelper.fullOnInterval)
.aggregators(
Arrays.<AggregatorFactory>asList(
new LongFirstAggregatorFactory("first", "index"),
new LongLastAggregatorFactory("last", "index")
)
)
.build();
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
new Result<>(
new DateTime("2011-01-01T00:00:00.000Z"),
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "total_market")
.put("first", 1000L)
.put("last", 1127L)
.build(),
ImmutableMap.<String, Object>builder()
.put("market", "upfront")
.put("first", 800L)
.put("last", 943L)
.build(),
ImmutableMap.<String, Object>builder()
.put("market", "spot")
.put("first", 100L)
.put("last", 155L)
.build()
)
)
),
new Result<>(
new DateTime("2011-02-01T00:00:00.000Z"),
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "total_market")
.put("first", 1203L)
.put("last", 1292L)
.build(),
ImmutableMap.<String, Object>builder()
.put("market", "upfront")
.put("first", 1667L)
.put("last", 1101L)
.build(),
ImmutableMap.<String, Object>builder()
.put("market", "spot")
.put("first", 132L)
.put("last", 114L)
.build()
)
)
),
new Result<>(
new DateTime("2011-03-01T00:00:00.000Z"),
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "total_market")
.put("first", 1124L)
.put("last", 1366L)
.build(),
ImmutableMap.<String, Object>builder()
.put("market", "upfront")
.put("first", 1166L)
.put("last", 1063L)
.build(),
ImmutableMap.<String, Object>builder()
.put("market", "spot")
.put("first", 153L)
.put("last", 125L)
.build()
)
)
),
new Result<>(
new DateTime("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "total_market")
.put("first", 1314L)
.put("last", 1029L)
.build(),
ImmutableMap.<String, Object>builder()
.put("market", "upfront")
.put("first", 1447L)
.put("last", 780L)
.build(),
ImmutableMap.<String, Object>builder()
.put("market", "spot")
.put("first", 135L)
.put("last", 120L)
.build()
)
)
)
);
assertExpectedResults(expectedResults, query);
}
@Test @Test
public void testTopNBySegment() public void testTopNBySegment()
{ {