Support Min/Max for Timestamp (#3299)

* Min/Max aggregator for Timestamp

* remove unused imports and method

* rebase and zip the test data

* add docs
This commit is contained in:
Keuntae Park 2016-11-15 16:00:21 +09:00 committed by Fangjin Yang
parent 2df98bcaa6
commit 094f5b851b
18 changed files with 1289 additions and 10 deletions

View File

@ -88,15 +88,19 @@ public class TimestampSpec
public DateTime extractTimestamp(Map<String, Object> input)
{
final Object o = input.get(timestampColumn);
return parseDateTime(input.get(timestampColumn));
}
public DateTime parseDateTime(Object input)
{
DateTime extracted = missingValue;
if (o != null) {
if (o.equals(parseCtx.lastTimeObject)) {
if (input != null) {
if (input.equals(parseCtx.lastTimeObject)) {
extracted = parseCtx.lastDateTime;
} else {
ParseCtx newCtx = new ParseCtx();
newCtx.lastTimeObject = o;
extracted = timestampConverter.apply(o);
newCtx.lastTimeObject = input;
extracted = timestampConverter.apply(input);
newCtx.lastDateTime = extracted;
parseCtx = newCtx;
}

View File

@ -0,0 +1,85 @@
---
layout: doc_page
---
# Timestamp Min/Max aggregators
To use this extension, make sure to [include](../../operations/including-extensions.html) `druid-time-min-max`.
These aggregators compute the minimum and maximum timestamps of the given events more precisely than the `__time` column, which is truncated to the query granularity and is therefore too coarse for this purpose.
To use this feature, a "timeMin" or "timeMax" aggregator must be included at indexing time.
They can apply to any columns that can be converted to timestamp, which include Long, DateTime, Timestamp, and String types.
For example, consider a data set that consists of a timestamp, a dimension, and a metric value, like the following:
```
2015-07-28T01:00:00.000Z A 1
2015-07-28T02:00:00.000Z A 1
2015-07-28T03:00:00.000Z A 1
2015-07-28T04:00:00.000Z B 1
2015-07-28T05:00:00.000Z A 1
2015-07-28T06:00:00.000Z B 1
2015-07-29T01:00:00.000Z C 1
2015-07-29T02:00:00.000Z C 1
2015-07-29T03:00:00.000Z A 1
2015-07-29T04:00:00.000Z A 1
```
At ingestion time, the timeMin and timeMax aggregators can be included like any other aggregator.
```json
{
"type": "timeMin",
"name": "tmin",
"fieldName": "<field_name, typically column specified in timestamp spec>"
}
```
```json
{
"type": "timeMax",
"name": "tmax",
"fieldName": "<field_name, typically column specified in timestamp spec>"
}
```
`name` is the output name of the aggregator and can be any string. `fieldName` is typically the column specified in the timestamp spec, but it can be any column that can be converted to a timestamp.
To query for results, the same "timeMin" and "timeMax" aggregators are used.
```json
{
"queryType": "groupBy",
"dataSource": "timeMinMax",
"granularity": "DAY",
"dimensions": ["product"],
"aggregations": [
{
"type": "count",
"name": "count"
},
{
"type": "timeMin",
"name": "<output_name of timeMin>",
"fieldName": "tmin"
},
{
"type": "timeMax",
"name": "<output_name of timeMax>",
"fieldName": "tmax"
}
],
"intervals": [
"2010-01-01T00:00:00.000Z/2020-01-01T00:00:00.000Z"
]
}
```
The result then contains the min and max timestamps, which are finer than the query granularity.
```
2015-07-28T00:00:00.000Z A 4 2015-07-28T01:00:00.000Z 2015-07-28T05:00:00.000Z
2015-07-28T00:00:00.000Z B 2 2015-07-28T04:00:00.000Z 2015-07-28T06:00:00.000Z
2015-07-29T00:00:00.000Z A 2 2015-07-29T03:00:00.000Z 2015-07-29T04:00:00.000Z
2015-07-29T00:00:00.000Z C 2 2015-07-29T01:00:00.000Z 2015-07-29T02:00:00.000Z
```

View File

@ -58,6 +58,7 @@ All of these community extensions can be downloaded using *pull-deps* with the c
|druid-parquet-extensions|Support for data in Apache Parquet data format. Requires druid-avro-extensions to be loaded.|[link](../development/extensions-contrib/parquet.html)|
|druid-rabbitmq|RabbitMQ firehose.|[link](../development/extensions-contrib/rabbitmq.html)|
|druid-rocketmq|RocketMQ firehose.|[link](../development/extensions-contrib/rocketmq.html)|
|druid-time-min-max|Min/Max aggregator for timestamp.|[link](../development/extensions-contrib/time-min-max.html)|
|sqlserver-metadata-storage|Microsoft SqlServer deep storage.|[link](../development/extensions-contrib/sqlserver.html)|
|graphite-emitter|Graphite metrics emitter|[link](../development/extensions-contrib/graphite.html)|
|statsd-emitter|StatsD metrics emitter|[link](../development/extensions-contrib/statsd.html)|

View File

@ -0,0 +1,79 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to Metamarkets Group Inc. (Metamarkets) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. Metamarkets licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>druid</artifactId>
<groupId>io.druid</groupId>
<version>0.9.3-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions.contrib</groupId>
<artifactId>druid-time-min-max</artifactId>
<name>druid-time-min-max</name>
<description>Min/Max of timestamp</description>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-common</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,111 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.segment.ObjectColumnSelector;
import java.util.Comparator;
/**
 * On-heap aggregator that keeps a single "winning" timestamp, stored as a long.
 *
 * Each row's value is read from the {@link ObjectColumnSelector}, converted to a
 * Long through {@link TimestampAggregatorFactory#convertLong}, and compared with
 * the current winner using the supplied comparator. Whether this behaves as a
 * minimum or a maximum is decided entirely by the comparator and the initial
 * value handed to the constructor.
 */
public class TimestampAggregator implements Aggregator
{
  static final Comparator COMPARATOR = LongMaxAggregator.COMPARATOR;

  /**
   * Returns the larger of the two arguments' long values.
   *
   * Both arguments must be {@link Number} instances. Note that this always picks
   * the larger value, regardless of any comparator in use elsewhere.
   */
  static long combineValues(Object lhs, Object rhs)
  {
    final long left = ((Number) lhs).longValue();
    final long right = ((Number) rhs).longValue();
    return Math.max(left, right);
  }

  private final ObjectColumnSelector selector;
  private final String name;
  private final TimestampSpec timestampSpec;
  private final Comparator<Long> comparator;
  private final Long initValue;

  // current winner; restored to initValue by reset()
  private long most;

  /**
   * @param name          name reported by {@link #getName()}
   * @param selector      source of the per-row raw value
   * @param timestampSpec spec passed to the conversion helper for parsing values
   * @param comparator    ordering; the value that compares greater is kept
   * @param initValue     starting value used by {@link #reset()}
   */
  public TimestampAggregator(
      String name,
      ObjectColumnSelector selector,
      TimestampSpec timestampSpec,
      Comparator<Long> comparator,
      Long initValue
  )
  {
    this.name = name;
    this.selector = selector;
    this.timestampSpec = timestampSpec;
    this.comparator = comparator;
    this.initValue = initValue;

    reset();
  }

  /**
   * Folds the selector's current value into the running result. Values that the
   * conversion helper turns into null are skipped.
   */
  @Override
  public void aggregate()
  {
    final Long candidate = TimestampAggregatorFactory.convertLong(timestampSpec, selector.get());
    if (candidate == null) {
      return;
    }
    // keep the current value only when it compares strictly greater
    if (comparator.compare(most, candidate) <= 0) {
      most = candidate;
    }
  }

  /** Restores the running result to the configured initial value. */
  @Override
  public void reset()
  {
    most = initValue;
  }

  /** Returns the running result (boxed). */
  @Override
  public Object get()
  {
    return most;
  }

  /** Returns the running result narrowed to a float. */
  @Override
  public float getFloat()
  {
    return (float) most;
  }

  @Override
  public String getName()
  {
    return name;
  }

  @Override
  public void close()
  {
    // no resource to cleanup
  }

  /** Returns the running result as a long. */
  @Override
  public long getLong()
  {
    return most;
  }

  /** Creates a fresh aggregator with the same configuration, starting from the initial value. */
  @Override
  public Aggregator clone()
  {
    return new TimestampAggregator(name, selector, timestampSpec, comparator, initValue);
  }
}

View File

@ -0,0 +1,227 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.primitives.Longs;
import io.druid.common.utils.StringUtils;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.segment.ColumnSelectorFactory;
import org.joda.time.DateTime;
import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
/**
 * Base factory for timestamp min/max aggregators.
 *
 * The direction of the aggregation (min or max) is determined by the
 * comparator and the initial value supplied by the subclass: the value that
 * compares greater under the comparator is the one that is kept.
 */
public class TimestampAggregatorFactory extends AggregatorFactory
{
  private static final byte CACHE_TYPE_ID = 31;
  // separates the variable-length parts of the cache key
  private static final byte CACHE_KEY_SEPARATOR = (byte) 0xFF;

  final String name;
  final String fieldName;
  final String timeFormat;
  private final Comparator<Long> comparator;
  private final Long initValue;

  private final TimestampSpec timestampSpec;

  /**
   * @param name       output name of the aggregator
   * @param fieldName  input column to read values from
   * @param timeFormat format handed to the {@link TimestampSpec} used for parsing String values
   * @param comparator ordering; the value that compares greater is retained
   * @param initValue  starting value of every aggregator created by this factory
   */
  TimestampAggregatorFactory(
      String name,
      String fieldName,
      String timeFormat,
      Comparator<Long> comparator,
      Long initValue
  )
  {
    this.name = name;
    this.fieldName = fieldName;
    this.timeFormat = timeFormat;
    this.comparator = comparator;
    this.initValue = initValue;
    this.timestampSpec = new TimestampSpec(fieldName, timeFormat, null);
  }

  @Override
  public Aggregator factorize(ColumnSelectorFactory metricFactory)
  {
    return new TimestampAggregator(
        name,
        metricFactory.makeObjectColumnSelector(fieldName),
        timestampSpec,
        comparator,
        initValue
    );
  }

  @Override
  public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
  {
    return new TimestampBufferAggregator(
        metricFactory.makeObjectColumnSelector(fieldName),
        timestampSpec,
        comparator,
        initValue
    );
  }

  @Override
  public Comparator getComparator()
  {
    return TimestampAggregator.COMPARATOR;
  }

  /**
   * Merges two partial results using this factory's comparator, so that a
   * "min" factory keeps the smaller value and a "max" factory keeps the larger
   * one. A null side is treated as "no value" and the other side is returned.
   */
  @Override
  public Object combine(Object lhs, Object rhs)
  {
    if (lhs == null) {
      return rhs;
    }
    if (rhs == null) {
      return lhs;
    }
    final long left = ((Number) lhs).longValue();
    final long right = ((Number) rhs).longValue();
    // same selection rule the aggregators apply row by row
    return comparator.compare(left, right) > 0 ? left : right;
  }

  @Override
  public AggregatorFactory getCombiningFactory()
  {
    return new TimestampAggregatorFactory(name, name, timeFormat, comparator, initValue);
  }

  @Override
  public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
  {
    if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
      return getCombiningFactory();
    } else {
      throw new AggregatorFactoryNotMergeableException(this, other);
    }
  }

  @Override
  public List<AggregatorFactory> getRequiredColumns()
  {
    return Arrays.<AggregatorFactory>asList(
        new TimestampAggregatorFactory(fieldName, fieldName, timeFormat, comparator, initValue)
    );
  }

  @Override
  public Object deserialize(Object object)
  {
    return object;
  }

  /**
   * Converts the aggregated epoch-millis value into a {@link DateTime}.
   *
   * Any {@link Number} is accepted (not only Long), because {@link #deserialize}
   * passes values through unchanged. A null input yields null.
   */
  @Override
  public Object finalizeComputation(Object object)
  {
    if (object == null) {
      return null;
    }
    return new DateTime(((Number) object).longValue());
  }

  @Override
  @JsonProperty
  public String getName()
  {
    return name;
  }

  @JsonProperty
  public String getFieldName()
  {
    return fieldName;
  }

  @JsonProperty
  public String getTimeFormat()
  {
    return timeFormat;
  }

  @Override
  public List<String> requiredFields()
  {
    return Arrays.asList(fieldName);
  }

  /**
   * Builds the cache key from the type id, the initial value, the field name
   * and the time format.
   *
   * The initial value is part of the key because factories that share
   * {@link #CACHE_TYPE_ID} but aggregate in opposite directions start from
   * different values; without it their keys would be identical for the same
   * field. The time format is included because it affects how String inputs
   * are parsed.
   */
  @Override
  public byte[] getCacheKey()
  {
    final byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
    final byte[] timeFormatBytes = timeFormat == null ? new byte[0] : StringUtils.toUtf8(timeFormat);

    return ByteBuffer.allocate(1 + Longs.BYTES + fieldNameBytes.length + 1 + timeFormatBytes.length)
                     .put(CACHE_TYPE_ID)
                     .putLong(initValue)
                     .put(fieldNameBytes)
                     .put(CACHE_KEY_SEPARATOR)
                     .put(timeFormatBytes)
                     .array();
  }

  @Override
  public String getTypeName()
  {
    return "long";
  }

  @Override
  public int getMaxIntermediateSize()
  {
    return Longs.BYTES;
  }

  @Override
  public Object getAggregatorStartValue()
  {
    return initValue;
  }

  /**
   * Two factories are equal when they have the same concrete class, field
   * name, output name, time format and initial value.
   *
   * The comparator is deliberately not compared: comparator instances do not
   * necessarily define value equality, and the aggregation direction is
   * already captured by the concrete class and the initial value.
   */
  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }

    TimestampAggregatorFactory that = (TimestampAggregatorFactory) o;

    return Objects.equals(fieldName, that.fieldName)
           && Objects.equals(name, that.name)
           && Objects.equals(timeFormat, that.timeFormat)
           && Objects.equals(initValue, that.initValue);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(fieldName, name, timeFormat, initValue);
  }

  /**
   * Converts a raw column value into epoch milliseconds.
   *
   * @param timestampSpec spec used to parse String inputs
   * @param input         raw value; Number, DateTime, java.sql.Timestamp and String are supported
   * @return the epoch-millis value, or null when the input is null or of an unsupported type
   */
  static Long convertLong(TimestampSpec timestampSpec, Object input)
  {
    if (input instanceof Number) {
      return ((Number) input).longValue();
    } else if (input instanceof DateTime) {
      return ((DateTime) input).getMillis();
    } else if (input instanceof Timestamp) {
      return ((Timestamp) input).getTime();
    } else if (input instanceof String) {
      return timestampSpec.parseDateTime(input).getMillis();
    }

    return null;
  }
}

View File

@ -0,0 +1,85 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.segment.ObjectColumnSelector;
import java.nio.ByteBuffer;
import java.util.Comparator;
/**
 * Buffer-backed counterpart of the timestamp aggregator: the running result is
 * a single long stored at the given position of the supplied {@link ByteBuffer}.
 *
 * The comparator decides which of two values survives (the one that compares
 * greater), and {@code initValue} is what each slot is initialised to.
 */
public class TimestampBufferAggregator implements BufferAggregator
{
  private final ObjectColumnSelector selector;
  private final TimestampSpec timestampSpec;
  private final Comparator<Long> comparator;
  private final Long initValue;

  /**
   * @param selector      source of the per-row raw value
   * @param timestampSpec spec passed to the conversion helper for parsing values
   * @param comparator    ordering; the value that compares greater is kept
   * @param initValue     value written by {@link #init}
   */
  public TimestampBufferAggregator(
      ObjectColumnSelector selector,
      TimestampSpec timestampSpec,
      Comparator<Long> comparator,
      Long initValue
  )
  {
    this.selector = selector;
    this.timestampSpec = timestampSpec;
    this.comparator = comparator;
    this.initValue = initValue;
  }

  /** Writes the initial value into the slot at {@code position}. */
  @Override
  public void init(ByteBuffer buf, int position)
  {
    buf.putLong(position, initValue);
  }

  /**
   * Reads the selector's current value and, unless it converts to null, stores
   * the winner of (stored value, new value) back into the slot.
   */
  @Override
  public void aggregate(ByteBuffer buf, int position)
  {
    final Long candidate = TimestampAggregatorFactory.convertLong(timestampSpec, selector.get());
    if (candidate == null) {
      return;
    }

    final Long stored = buf.getLong(position);
    final Long winner;
    if (comparator.compare(stored, candidate) > 0) {
      winner = stored;
    } else {
      winner = candidate;
    }
    buf.putLong(position, winner);
  }

  /** Returns the stored long (boxed). */
  @Override
  public Object get(ByteBuffer buf, int position)
  {
    return buf.getLong(position);
  }

  /** Returns the stored long narrowed to a float. */
  @Override
  public float getFloat(ByteBuffer buf, int position)
  {
    final long stored = buf.getLong(position);
    return (float) stored;
  }

  /** Returns the stored long. */
  @Override
  public long getLong(ByteBuffer buf, int position)
  {
    return buf.getLong(position);
  }

  @Override
  public void close()
  {
    // nothing to release
  }
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import java.util.Comparator;
/**
 * Factory for the "max" flavour of the timestamp aggregator: uses natural long
 * ordering and starts from {@link Long#MIN_VALUE}.
 */
public class TimestampMaxAggregatorFactory extends TimestampAggregatorFactory
{
  /**
   * Natural ordering of longs. Held as a single shared instance so that every
   * factory of this type hands the very same comparator object to its parent,
   * instead of allocating a new, identity-distinct comparator per factory.
   */
  private static final Comparator<Long> MAX_COMPARATOR = new Comparator<Long>()
  {
    @Override
    public int compare(Long o1, Long o2)
    {
      return Long.compare(o1, o2);
    }
  };

  /**
   * @param name       output name of the aggregator; must not be null
   * @param fieldName  input column; must not be null
   * @param timeFormat optional format used to parse String values
   * @throws NullPointerException if {@code name} or {@code fieldName} is null
   */
  @JsonCreator
  public TimestampMaxAggregatorFactory(
      @JsonProperty("name") String name,
      @JsonProperty("fieldName") String fieldName,
      @JsonProperty("timeFormat") String timeFormat
  )
  {
    super(name, fieldName, timeFormat, MAX_COMPARATOR, Long.MIN_VALUE);
    Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
    Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
  }

  @Override
  public String toString()
  {
    return "TimestampMaxAggregatorFactory{" +
           "fieldName='" + fieldName + '\'' +
           ", name='" + name + '\'' +
           '}';
  }
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import java.util.Comparator;
/**
 * Factory for the "min" flavour of the timestamp aggregator: uses reversed long
 * ordering (so the smaller value "wins") and starts from {@link Long#MAX_VALUE}.
 */
public class TimestampMinAggregatorFactory extends TimestampAggregatorFactory
{
  /**
   * Reverse of the natural ordering of longs. Held as a single shared instance
   * so that every factory of this type hands the very same comparator object to
   * its parent, instead of allocating a new, identity-distinct comparator per
   * factory.
   */
  private static final Comparator<Long> MIN_COMPARATOR = new Comparator<Long>()
  {
    @Override
    public int compare(Long o1, Long o2)
    {
      // arguments swapped: the smaller long compares as "greater"
      return Long.compare(o2, o1);
    }
  };

  /**
   * @param name       output name of the aggregator; must not be null
   * @param fieldName  input column; must not be null
   * @param timeFormat optional format used to parse String values
   * @throws NullPointerException if {@code name} or {@code fieldName} is null
   */
  @JsonCreator
  public TimestampMinAggregatorFactory(
      @JsonProperty("name") String name,
      @JsonProperty("fieldName") String fieldName,
      @JsonProperty("timeFormat") String timeFormat
  )
  {
    super(name, fieldName, timeFormat, MIN_COMPARATOR, Long.MAX_VALUE);
    Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
    Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
  }

  @Override
  public String toString()
  {
    return "TimestampMinAggregatorFactory{" +
           "fieldName='" + fieldName + '\'' +
           ", name='" + name + '\'' +
           '}';
  }
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import io.druid.initialization.DruidModule;
import java.util.Arrays;
import java.util.List;
/**
 * Druid extension module that registers the timestamp min/max aggregator
 * factories with Jackson under the type names "timeMax" and "timeMin".
 */
public class TimestampMinMaxModule implements DruidModule
{
  /**
   * Returns a single Jackson module carrying the two aggregator subtypes.
   */
  @Override
  public List<? extends Module> getJacksonModules()
  {
    final NamedType maxType = new NamedType(TimestampMaxAggregatorFactory.class, "timeMax");
    final NamedType minType = new NamedType(TimestampMinAggregatorFactory.class, "timeMin");

    final SimpleModule jacksonModule = new SimpleModule("TimestampMinMaxModule");
    return Arrays.asList(jacksonModule.registerSubtypes(maxType, minType));
  }

  @Override
  public void configure(Binder binder)
  {
    // no Guice bindings are needed by this extension
  }
}

View File

@ -0,0 +1 @@
io.druid.query.aggregation.TimestampMinMaxModule

View File

@ -0,0 +1,161 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.Resources;
import io.druid.granularity.QueryGranularities;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.Result;
import io.druid.query.select.SelectResultValue;
import io.druid.segment.ColumnSelectorFactory;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.File;
import java.sql.Timestamp;
import java.util.List;
import java.util.zip.ZipFile;
/**
 * Ingests the zipped sample TSV with a timeMin / timeMax aggregator and checks,
 * through a select query, the aggregated value stored for the first event.
 *
 * Parameterized over (aggregator type, output field, factory class, expected
 * epoch millis).
 */
@RunWith(Parameterized.class)
public class TimestampAggregationSelectTest
{
  private AggregationTestHelper helper;

  @Rule
  public final TemporaryFolder temporaryFolder = new TemporaryFolder();

  private ColumnSelectorFactory selectorFactory;
  private TestObjectColumnSelector selector;

  private Timestamp[] values = new Timestamp[10];

  @Parameterized.Parameters(name="{index}: Test for {0}")
  public static Iterable<Object[]> constructorFeeder()
  {
    return Iterables.transform(
        ImmutableList.of(
            ImmutableList.of("timeMin", "tmin", TimestampMinAggregatorFactory.class, DateTime.parse("2011-01-12T01:00:00.000Z").getMillis()),
            ImmutableList.of("timeMax", "tmax", TimestampMaxAggregatorFactory.class, DateTime.parse("2011-01-31T01:00:00.000Z").getMillis())
        ),
        new Function<List<?>, Object[]>()
        {
          @Nullable
          @Override
          public Object[] apply(List<?> input)
          {
            return input.toArray();
          }
        }
    );
  }

  private String aggType;
  private String aggField;
  private Class<? extends TimestampAggregatorFactory> aggClass;
  private Long expected;

  public TimestampAggregationSelectTest(String aggType, String aggField, Class<? extends TimestampAggregatorFactory> aggClass, Long expected)
  {
    this.aggType = aggType;
    this.aggField = aggField;
    this.aggClass = aggClass;
    this.expected = expected;
  }

  @Before
  public void setup() throws Exception
  {
    helper = AggregationTestHelper.createSelectQueryAggregationTestHelper(
        new TimestampMinMaxModule().getJacksonModules(),
        temporaryFolder
    );

    selector = new TestObjectColumnSelector(values);
    selectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
    EasyMock.expect(selectorFactory.makeObjectColumnSelector("test")).andReturn(selector);
    EasyMock.replay(selectorFactory);
  }

  @Test
  public void testSimpleDataIngestionAndSelectTest() throws Exception
  {
    String recordParser = "{\n" +
        " \"type\": \"string\",\n" +
        " \"parseSpec\": {\n" +
        " \"format\": \"tsv\",\n" +
        " \"timestampSpec\": {\n" +
        " \"column\": \"timestamp\",\n" +
        " \"format\": \"auto\"\n" +
        " },\n" +
        " \"dimensionsSpec\": {\n" +
        " \"dimensions\": [\n" +
        " \"product\"\n" +
        " ],\n" +
        " \"dimensionExclusions\": [],\n" +
        " \"spatialDimensions\": []\n" +
        " },\n" +
        " \"columns\": [\n" +
        " \"timestamp\",\n" +
        " \"cat\",\n" +
        " \"product\",\n" +
        " \"prefer\",\n" +
        " \"prefer2\",\n" +
        " \"pty_country\"\n" +
        " ]\n" +
        " }\n" +
        "}";
    String aggregator = "[\n" +
        " {\n" +
        " \"type\": \"" + aggType + "\",\n" +
        " \"name\": \"" + aggField + "\",\n" +
        " \"fieldName\": \"timestamp\"\n" +
        " }\n" +
        "]";

    // try-with-resources so the zip archive (and the entry stream opened from it) is always released
    try (ZipFile zip = new ZipFile(new File(this.getClass().getClassLoader().getResource("druid.sample.tsv.zip").toURI()))) {
      Sequence seq = helper.createIndexAndRunQueryOnSegment(
          zip.getInputStream(zip.getEntry("druid.sample.tsv")),
          recordParser,
          aggregator,
          0,
          QueryGranularities.MONTH,
          100,
          Resources.toString(Resources.getResource("select.json"), Charsets.UTF_8)
      );

      Result<SelectResultValue> result = (Result<SelectResultValue>) Iterables.getOnlyElement(Sequences.toList(seq, Lists.newArrayList()));
      Assert.assertEquals(36, result.getValue().getEvents().size());
      Assert.assertEquals(expected, result.getValue().getEvents().get(0).getEvent().get(aggField));
    }
  }
}

View File

@ -0,0 +1,177 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularities;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.segment.ColumnSelectorFactory;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.File;
import java.sql.Timestamp;
import java.util.List;
import java.util.zip.ZipFile;
/**
 * Ingests the zipped sample TSV with a timeMin / timeMax aggregator and checks,
 * through a groupBy query that re-aggregates the ingested metric, the value
 * reported for the first result row.
 *
 * Parameterized over (aggregator type, ingested field, query output field,
 * factory class, expected DateTime).
 */
@RunWith(Parameterized.class)
public class TimestampGroupByAggregationTest
{
  private AggregationTestHelper helper;

  @Rule
  public final TemporaryFolder temporaryFolder = new TemporaryFolder();

  private ColumnSelectorFactory selectorFactory;
  private TestObjectColumnSelector selector;

  private Timestamp[] values = new Timestamp[10];

  @Parameterized.Parameters(name="{index}: Test for {0}")
  public static Iterable<Object[]> constructorFeeder()
  {
    return Iterables.transform(
        ImmutableList.of(
            ImmutableList.of("timeMin", "tmin", "time_min", TimestampMinAggregatorFactory.class, DateTime.parse("2011-01-12T01:00:00.000Z")),
            ImmutableList.of("timeMax", "tmax", "time_max", TimestampMaxAggregatorFactory.class, DateTime.parse("2011-01-31T01:00:00.000Z"))
        ),
        new Function<List<?>, Object[]>()
        {
          @Nullable
          @Override
          public Object[] apply(List<?> input)
          {
            return input.toArray();
          }
        }
    );
  }

  private String aggType;
  private String aggField;
  private String groupByField;
  private Class<? extends TimestampAggregatorFactory> aggClass;
  private DateTime expected;

  public TimestampGroupByAggregationTest(String aggType, String aggField, String groupByField, Class<? extends TimestampAggregatorFactory> aggClass, DateTime expected)
  {
    this.aggType = aggType;
    this.aggField = aggField;
    this.groupByField = groupByField;
    this.aggClass = aggClass;
    this.expected = expected;
  }

  @Before
  public void setup() throws Exception
  {
    helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
        new TimestampMinMaxModule().getJacksonModules(),
        temporaryFolder
    );

    selector = new TestObjectColumnSelector(values);
    selectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
    EasyMock.expect(selectorFactory.makeObjectColumnSelector("test")).andReturn(selector);
    EasyMock.replay(selectorFactory);
  }

  @Test
  public void testSimpleDataIngestionAndGroupByTest() throws Exception
  {
    String recordParser = "{\n" +
        " \"type\": \"string\",\n" +
        " \"parseSpec\": {\n" +
        " \"format\": \"tsv\",\n" +
        " \"timestampSpec\": {\n" +
        " \"column\": \"timestamp\",\n" +
        " \"format\": \"auto\"\n" +
        " },\n" +
        " \"dimensionsSpec\": {\n" +
        " \"dimensions\": [\n" +
        " \"product\"\n" +
        " ],\n" +
        " \"dimensionExclusions\": [],\n" +
        " \"spatialDimensions\": []\n" +
        " },\n" +
        " \"columns\": [\n" +
        " \"timestamp\",\n" +
        " \"cat\",\n" +
        " \"product\",\n" +
        " \"prefer\",\n" +
        " \"prefer2\",\n" +
        " \"pty_country\"\n" +
        " ]\n" +
        " }\n" +
        "}";
    String aggregator = "[\n" +
        " {\n" +
        " \"type\": \"" + aggType + "\",\n" +
        " \"name\": \"" + aggField + "\",\n" +
        " \"fieldName\": \"timestamp\"\n" +
        " }\n" +
        "]";
    String groupBy = "{\n" +
        " \"queryType\": \"groupBy\",\n" +
        " \"dataSource\": \"test_datasource\",\n" +
        " \"granularity\": \"MONTH\",\n" +
        " \"dimensions\": [\"product\"],\n" +
        " \"aggregations\": [\n" +
        " {\n" +
        " \"type\": \"" + aggType + "\",\n" +
        " \"name\": \"" + groupByField + "\",\n" +
        " \"fieldName\": \"" + aggField + "\"\n" +
        " }\n" +
        " ],\n" +
        " \"intervals\": [\n" +
        " \"2011-01-01T00:00:00.000Z/2011-05-01T00:00:00.000Z\"\n" +
        " ]\n" +
        "}";

    // try-with-resources so the zip archive (and the entry stream opened from it) is always released
    try (ZipFile zip = new ZipFile(new File(this.getClass().getClassLoader().getResource("druid.sample.tsv.zip").toURI()))) {
      Sequence<Row> seq = helper.createIndexAndRunQueryOnSegment(
          zip.getInputStream(zip.getEntry("druid.sample.tsv")),
          recordParser,
          aggregator,
          0,
          QueryGranularities.MONTH,
          100,
          groupBy
      );

      List<Row> results = Sequences.toList(seq, Lists.<Row>newArrayList());
      Assert.assertEquals(36, results.size());
      Assert.assertEquals(expected, ((MapBasedRow)results.get(0)).getEvent().get(groupByField));
    }
  }
}

View File

@ -0,0 +1,179 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.primitives.Longs;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.name.Names;
import io.druid.guice.GuiceInjectors;
import io.druid.initialization.Initialization;
import io.druid.segment.ColumnSelectorFactory;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.util.List;
/**
 * Parameterized unit test for the timestamp min and max aggregators.
 *
 * Each parameter set supplies the aggregator type name ("timeMin" / "timeMax"), the factory
 * class to deserialize into, the value the aggregator is expected to report after being
 * reset/initialized, and the timestamp expected after aggregating every entry of {@link #values}.
 */
@RunWith(Parameterized.class)
public class TimestampMinMaxAggregatorTest
{
  Injector injector;
  ObjectMapper mapper;

  private TimestampAggregatorFactory aggregatorFactory;
  private ColumnSelectorFactory selectorFactory;
  private TestObjectColumnSelector selector;

  // Input rows fed to the aggregators; the earliest is 01:00:00 and the latest is 17:00:00.
  private Timestamp[] values = {
      Timestamp.valueOf("2014-01-02 11:00:00"),
      Timestamp.valueOf("2014-01-02 01:00:00"),
      Timestamp.valueOf("2014-01-02 05:00:00"),
      Timestamp.valueOf("2014-01-02 12:00:00"),
      Timestamp.valueOf("2014-01-02 12:00:00"),
      Timestamp.valueOf("2014-01-02 13:00:00"),
      Timestamp.valueOf("2014-01-02 06:00:00"),
      Timestamp.valueOf("2014-01-02 17:00:00"),
      Timestamp.valueOf("2014-01-02 12:00:00"),
      Timestamp.valueOf("2014-01-02 02:00:00")
  };

  /**
   * Supplies one constructor-argument array per aggregator flavour, in the order
   * (aggType, aggClass, initValue, expected).
   */
  @Parameterized.Parameters(name="{index}: Test for {0}")
  public static Iterable<Object[]> constructorFeeder()
  {
    final List<?> minCase = ImmutableList.of(
        "timeMin",
        TimestampMinAggregatorFactory.class,
        Long.MAX_VALUE,
        Timestamp.valueOf("2014-01-02 01:00:00")
    );
    final List<?> maxCase = ImmutableList.of(
        "timeMax",
        TimestampMaxAggregatorFactory.class,
        Long.MIN_VALUE,
        Timestamp.valueOf("2014-01-02 17:00:00")
    );
    final Function<List<?>, Object[]> toArgumentArray = new Function<List<?>, Object[]>()
    {
      @Nullable
      @Override
      public Object[] apply(List<?> input)
      {
        return input.toArray();
      }
    };
    return Iterables.transform(ImmutableList.<List<?>>of(minCase, maxCase), toArgumentArray);
  }

  private String aggType;
  private Class<? extends TimestampAggregatorFactory> aggClass;
  private Long initValue;
  private Timestamp expected;

  public TimestampMinMaxAggregatorTest(String aggType, Class<? extends TimestampAggregatorFactory> aggClass, Long initValue, Timestamp expected)
  {
    this.aggType = aggType;
    this.aggClass = aggClass;
    this.initValue = initValue;
    this.expected = expected;
  }

  /**
   * Builds an injector that includes {@code TimestampMinMaxModule}, deserializes the factory
   * under test from JSON, and prepares a mocked selector factory that hands out a selector
   * over {@link #values} for the "test" column.
   */
  @Before
  public void setup() throws Exception
  {
    final Injector startupInjector = GuiceInjectors.makeStartupInjector();
    final Module serviceConstants = new Module()
    {
      @Override
      public void configure(Binder binder)
      {
        binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
        binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
      }
    };
    injector = Initialization.makeInjectorWithModules(
        startupInjector,
        ImmutableList.of(serviceConstants, new TimestampMinMaxModule())
    );
    mapper = injector.getInstance(ObjectMapper.class);

    // The aggregator's "name" is deliberately the same as its type; testAggregator relies on that.
    final String factoryJson = "{\"type\":\"" + aggType + "\",\"name\":\"" + aggType + "\",\"fieldName\":\"test\"}";
    aggregatorFactory = mapper.readValue(factoryJson, aggClass);

    selector = new TestObjectColumnSelector(values);
    selectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
    EasyMock.expect(selectorFactory.makeObjectColumnSelector("test")).andReturn(selector);
    EasyMock.replay(selectorFactory);
  }

  /**
   * Aggregates once per input value with the on-heap aggregator, checks the result, then
   * checks that {@code reset()} brings {@code get()} back to {@code initValue}.
   */
  @Test
  public void testAggregator()
  {
    final TimestampAggregator onHeap = (TimestampAggregator) aggregatorFactory.factorize(selectorFactory);
    Assert.assertEquals(aggType, onHeap.getName());

    for (int row = 0; row < values.length; row++) {
      onHeap.aggregate();
      // presumably moves the selector on to the next value - confirm in TestObjectColumnSelector
      selector.increment();
    }

    final Timestamp aggregated = new Timestamp(onHeap.getLong());
    Assert.assertEquals(expected, aggregated);

    onHeap.reset();
    Assert.assertEquals(initValue, onHeap.get());
  }

  /**
   * Same scenario as {@link #testAggregator()} but for the buffer aggregator, using a
   * {@code Longs.BYTES}-sized buffer at position 0; re-running {@code init} is expected to
   * bring {@code get} back to {@code initValue}.
   */
  @Test
  public void testBufferAggregator()
  {
    final TimestampBufferAggregator buffered =
        (TimestampBufferAggregator) aggregatorFactory.factorizeBuffered(selectorFactory);
    final ByteBuffer storage = ByteBuffer.wrap(new byte[Longs.BYTES]);
    final int position = 0;
    buffered.init(storage, position);

    for (int row = 0; row < values.length; row++) {
      buffered.aggregate(storage, position);
      selector.increment();
    }

    final Timestamp aggregated = new Timestamp(buffered.getLong(storage, position));
    Assert.assertEquals(expected, aggregated);

    buffered.init(storage, position);
    Assert.assertEquals(initValue, buffered.get(storage, position));
  }
}

View File

@ -0,0 +1,11 @@
{
"queryType": "select",
"dataSource": "test_datasource",
"dimensions":[],
"metrics":[],
"granularity": "ALL",
"intervals": [
"2011-01-01T00:00:00.000Z/2011-05-01T00:00:00.000Z"
],
"pagingSpec":{"pagingIdentifiers": {}, "threshold":100}
}

View File

@ -111,6 +111,7 @@
<module>extensions-contrib/parquet-extensions</module>
<module>extensions-contrib/statsd-emitter</module>
<module>extensions-contrib/orc-extensions</module>
<module>extensions-contrib/time-min-max</module>
</modules>
<dependencyManagement>

View File

@ -20,6 +20,7 @@
package io.druid.segment.incremental;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Enums;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
@ -172,13 +173,10 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
}
};
if (!deserializeComplexMetrics) {
if ((Enums.getIfPresent(ValueType.class, typeName.toUpperCase()).isPresent() && !typeName.equalsIgnoreCase(ValueType.COMPLEX.name()))
|| !deserializeComplexMetrics) {
return rawColumnSelector;
} else {
if (typeName.equals("float")) {
return rawColumnSelector;
}
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
if (serde == null) {
throw new ISE("Don't know how to handle type[%s]", typeName);