mirror of https://github.com/apache/druid.git
Merge pull request #61 from metamx/fix-cache-timezone
Cache did not preserve timezone information
This commit is contained in:
commit
5f18f368e3
|
@ -29,9 +29,11 @@ import com.metamx.common.guava.ConcatSequence;
|
||||||
import com.metamx.common.guava.Sequence;
|
import com.metamx.common.guava.Sequence;
|
||||||
import com.metamx.common.guava.Sequences;
|
import com.metamx.common.guava.Sequences;
|
||||||
import com.metamx.druid.Query;
|
import com.metamx.druid.Query;
|
||||||
|
import com.metamx.druid.QueryGranularity;
|
||||||
import com.metamx.druid.aggregation.AggregatorFactory;
|
import com.metamx.druid.aggregation.AggregatorFactory;
|
||||||
import com.metamx.druid.index.v1.IncrementalIndex;
|
import com.metamx.druid.index.v1.IncrementalIndex;
|
||||||
import com.metamx.druid.initialization.Initialization;
|
import com.metamx.druid.initialization.Initialization;
|
||||||
|
import com.metamx.druid.input.MapBasedRow;
|
||||||
import com.metamx.druid.input.Row;
|
import com.metamx.druid.input.Row;
|
||||||
import com.metamx.druid.input.Rows;
|
import com.metamx.druid.input.Rows;
|
||||||
import com.metamx.druid.query.CacheStrategy;
|
import com.metamx.druid.query.CacheStrategy;
|
||||||
|
@ -119,7 +121,21 @@ public class GroupByQueryQueryToolChest implements QueryToolChest<Row, GroupByQu
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
return Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs()));
|
// convert millis back to timestamp according to granularity to preserve time zone information
|
||||||
|
return Sequences.map(
|
||||||
|
Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())),
|
||||||
|
new Function<Row, Row>()
|
||||||
|
{
|
||||||
|
private final QueryGranularity granularity = query.getGranularity();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Row apply(Row input)
|
||||||
|
{
|
||||||
|
final MapBasedRow row = (MapBasedRow) input;
|
||||||
|
return new MapBasedRow(granularity.toDateTime(row.getTimestampFromEpoch()), row.getEvent());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,7 @@ import com.metamx.common.guava.MergeSequence;
|
||||||
import com.metamx.common.guava.Sequence;
|
import com.metamx.common.guava.Sequence;
|
||||||
import com.metamx.common.guava.nary.BinaryFn;
|
import com.metamx.common.guava.nary.BinaryFn;
|
||||||
import com.metamx.druid.Query;
|
import com.metamx.druid.Query;
|
||||||
|
import com.metamx.druid.QueryGranularity;
|
||||||
import com.metamx.druid.ResultGranularTimestampComparator;
|
import com.metamx.druid.ResultGranularTimestampComparator;
|
||||||
import com.metamx.druid.TimeseriesBinaryFn;
|
import com.metamx.druid.TimeseriesBinaryFn;
|
||||||
import com.metamx.druid.aggregation.AggregatorFactory;
|
import com.metamx.druid.aggregation.AggregatorFactory;
|
||||||
|
@ -49,6 +50,7 @@ import org.joda.time.DateTime;
|
||||||
import org.joda.time.Interval;
|
import org.joda.time.Interval;
|
||||||
import org.joda.time.Minutes;
|
import org.joda.time.Minutes;
|
||||||
import org.joda.time.Period;
|
import org.joda.time.Period;
|
||||||
|
import org.joda.time.format.ISODateTimeFormat;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
@ -206,6 +208,8 @@ public class TimeseriesQueryQueryToolChest implements QueryToolChest<Result<Time
|
||||||
{
|
{
|
||||||
return new Function<Object, Result<TimeseriesResultValue>>()
|
return new Function<Object, Result<TimeseriesResultValue>>()
|
||||||
{
|
{
|
||||||
|
private final QueryGranularity granularity = query.getGranularity();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Result<TimeseriesResultValue> apply(@Nullable Object input)
|
public Result<TimeseriesResultValue> apply(@Nullable Object input)
|
||||||
{
|
{
|
||||||
|
@ -215,7 +219,8 @@ public class TimeseriesQueryQueryToolChest implements QueryToolChest<Result<Time
|
||||||
Iterator<AggregatorFactory> aggsIter = aggs.iterator();
|
Iterator<AggregatorFactory> aggsIter = aggs.iterator();
|
||||||
Iterator<Object> resultIter = results.iterator();
|
Iterator<Object> resultIter = results.iterator();
|
||||||
|
|
||||||
DateTime timestamp = new DateTime(resultIter.next());
|
DateTime timestamp = granularity.toDateTime(((Number) resultIter.next()).longValue());
|
||||||
|
|
||||||
while (aggsIter.hasNext() && resultIter.hasNext()) {
|
while (aggsIter.hasNext() && resultIter.hasNext()) {
|
||||||
final AggregatorFactory factory = aggsIter.next();
|
final AggregatorFactory factory = aggsIter.next();
|
||||||
retVal.put(factory.getName(), factory.deserialize(resultIter.next()));
|
retVal.put(factory.getName(), factory.deserialize(resultIter.next()));
|
||||||
|
|
|
@ -71,7 +71,7 @@ public class Result<T> implements Comparable<Result<T>>
|
||||||
|
|
||||||
Result result = (Result) o;
|
Result result = (Result) o;
|
||||||
|
|
||||||
if (timestamp != null ? !timestamp.equals(result.timestamp) : result.timestamp != null) {
|
if (timestamp != null ? !(timestamp.isEqual(result.timestamp) && timestamp.getZone().getOffset(timestamp) == result.timestamp.getZone().getOffset(result.timestamp)) : result.timestamp != null) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (value != null ? !value.equals(result.value) : result.value != null) {
|
if (value != null ? !value.equals(result.value) : result.value != null) {
|
||||||
|
|
|
@ -37,7 +37,7 @@ import java.util.Map;
|
||||||
*/
|
*/
|
||||||
public class MapBasedRow implements Row
|
public class MapBasedRow implements Row
|
||||||
{
|
{
|
||||||
private final long timestamp;
|
private final DateTime timestamp;
|
||||||
private final Map<String, Object> event;
|
private final Map<String, Object> event;
|
||||||
|
|
||||||
@JsonCreator
|
@JsonCreator
|
||||||
|
@ -46,22 +46,21 @@ public class MapBasedRow implements Row
|
||||||
@JsonProperty("event") Map<String, Object> event
|
@JsonProperty("event") Map<String, Object> event
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this(timestamp.getMillis(), event);
|
this.timestamp = timestamp;
|
||||||
|
this.event = event;
|
||||||
}
|
}
|
||||||
|
|
||||||
public MapBasedRow(
|
public MapBasedRow(
|
||||||
long timestamp,
|
long timestamp,
|
||||||
Map<String, Object> event
|
Map<String, Object> event
|
||||||
)
|
) {
|
||||||
{
|
this(new DateTime(timestamp), event);
|
||||||
this.timestamp = timestamp;
|
|
||||||
this.event = event;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getTimestampFromEpoch()
|
public long getTimestampFromEpoch()
|
||||||
{
|
{
|
||||||
return timestamp;
|
return timestamp.getMillis();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -120,7 +119,7 @@ public class MapBasedRow implements Row
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
public DateTime getTimestamp()
|
public DateTime getTimestamp()
|
||||||
{
|
{
|
||||||
return new DateTime(timestamp);
|
return timestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
|
@ -133,9 +132,38 @@ public class MapBasedRow implements Row
|
||||||
public String toString()
|
public String toString()
|
||||||
{
|
{
|
||||||
return "MapBasedRow{" +
|
return "MapBasedRow{" +
|
||||||
"timestamp=" + new DateTime(timestamp) +
|
"timestamp=" + timestamp +
|
||||||
", event=" + event +
|
", event=" + event +
|
||||||
'}';
|
'}';
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o)
|
||||||
|
{
|
||||||
|
if (this == o) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (o == null || getClass() != o.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
MapBasedRow that = (MapBasedRow) o;
|
||||||
|
|
||||||
|
if (!event.equals(that.event)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!timestamp.equals(that.timestamp)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode()
|
||||||
|
{
|
||||||
|
int result = timestamp.hashCode();
|
||||||
|
result = 31 * result + event.hashCode();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,6 +43,7 @@ import com.metamx.druid.index.v1.processing.DimensionSelector;
|
||||||
import com.metamx.druid.input.MapBasedRow;
|
import com.metamx.druid.input.MapBasedRow;
|
||||||
import com.metamx.druid.input.Row;
|
import com.metamx.druid.input.Row;
|
||||||
import com.metamx.druid.query.dimension.DimensionSpec;
|
import com.metamx.druid.query.dimension.DimensionSpec;
|
||||||
|
import org.joda.time.DateTime;
|
||||||
import org.joda.time.Interval;
|
import org.joda.time.Interval;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
@ -347,7 +348,7 @@ public class GroupByQueryEngine
|
||||||
.transform(
|
.transform(
|
||||||
new Function<Map.Entry<ByteBuffer, Integer>, Row>()
|
new Function<Map.Entry<ByteBuffer, Integer>, Row>()
|
||||||
{
|
{
|
||||||
private final long timestamp = cursor.getTime().getMillis();
|
private final DateTime timestamp = cursor.getTime();
|
||||||
private final int[] increments = positionMaintainer.getIncrements();
|
private final int[] increments = positionMaintainer.getIncrements();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -112,7 +112,7 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
|
||||||
MapBasedRow row = (MapBasedRow) input;
|
MapBasedRow row = (MapBasedRow) input;
|
||||||
|
|
||||||
return new Result<TimeseriesResultValue>(
|
return new Result<TimeseriesResultValue>(
|
||||||
new DateTime(input.getTimestampFromEpoch()), new TimeseriesResultValue(row.getEvent())
|
row.getTimestamp(), new TimeseriesResultValue(row.getEvent())
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,6 +40,7 @@ import com.metamx.druid.query.segment.MultipleIntervalSegmentSpec;
|
||||||
import com.metamx.druid.result.Result;
|
import com.metamx.druid.result.Result;
|
||||||
import com.metamx.druid.result.TimeseriesResultValue;
|
import com.metamx.druid.result.TimeseriesResultValue;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
|
import org.joda.time.DateTimeZone;
|
||||||
import org.joda.time.Interval;
|
import org.joda.time.Interval;
|
||||||
import org.joda.time.Period;
|
import org.joda.time.Period;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -257,6 +258,46 @@ public class TimeseriesQueryRunnerTest
|
||||||
TestHelper.assertExpectedResults(expectedResults, results);
|
TestHelper.assertExpectedResults(expectedResults, results);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTimeseriesWithTimeZone()
|
||||||
|
{
|
||||||
|
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
|
||||||
|
.dataSource(QueryRunnerTestHelper.dataSource)
|
||||||
|
.intervals("2011-03-31T00:00:00-07:00/2011-04-02T00:00:00-07:00")
|
||||||
|
.aggregators(
|
||||||
|
Arrays.<AggregatorFactory>asList(
|
||||||
|
QueryRunnerTestHelper.rowsCount,
|
||||||
|
new LongSumAggregatorFactory(
|
||||||
|
"idx",
|
||||||
|
"index"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.granularity(new PeriodGranularity(new Period("P1D"), null, DateTimeZone.forID("America/Los_Angeles")))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
|
||||||
|
new Result<TimeseriesResultValue>(
|
||||||
|
new DateTime("2011-03-31", DateTimeZone.forID("America/Los_Angeles")),
|
||||||
|
new TimeseriesResultValue(
|
||||||
|
ImmutableMap.<String, Object>of("rows", 13L, "idx", 6619L)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
new Result<TimeseriesResultValue>(
|
||||||
|
new DateTime("2011-04-01T", DateTimeZone.forID("America/Los_Angeles")),
|
||||||
|
new TimeseriesResultValue(
|
||||||
|
ImmutableMap.<String, Object>of("rows", 13L, "idx", 5827L)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
|
||||||
|
runner.run(query),
|
||||||
|
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||||
|
);
|
||||||
|
|
||||||
|
TestHelper.assertExpectedResults(expectedResults, results);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testTimeseriesWithVaryingGran()
|
public void testTimeseriesWithVaryingGran()
|
||||||
|
|
Loading…
Reference in New Issue