mirror of https://github.com/apache/druid.git
fix timewarp with different bounds
This commit is contained in:
parent
b0b39d6ec0
commit
9f742a87a6
|
@ -22,7 +22,6 @@ package io.druid.query;
|
||||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import com.google.common.base.Function;
|
import com.google.common.base.Function;
|
||||||
import com.google.common.collect.ImmutableMap;
|
|
||||||
import com.metamx.common.guava.Sequence;
|
import com.metamx.common.guava.Sequence;
|
||||||
import com.metamx.common.guava.Sequences;
|
import com.metamx.common.guava.Sequences;
|
||||||
import io.druid.data.input.MapBasedRow;
|
import io.druid.data.input.MapBasedRow;
|
||||||
|
@ -80,7 +79,7 @@ public class TimewarpOperator<T> implements PostProcessingOperator<T>
|
||||||
return new QueryRunner<T>()
|
return new QueryRunner<T>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public Sequence<T> run(Query<T> query)
|
public Sequence<T> run(final Query<T> query)
|
||||||
{
|
{
|
||||||
final long offset = computeOffset(now);
|
final long offset = computeOffset(now);
|
||||||
|
|
||||||
|
@ -101,16 +100,22 @@ public class TimewarpOperator<T> implements PostProcessingOperator<T>
|
||||||
if (input instanceof Result) {
|
if (input instanceof Result) {
|
||||||
Result res = (Result) input;
|
Result res = (Result) input;
|
||||||
Object value = res.getValue();
|
Object value = res.getValue();
|
||||||
|
final DateTime timestamp = res.getTimestamp().minus(offset);
|
||||||
if (value instanceof TimeBoundaryResultValue) {
|
if (value instanceof TimeBoundaryResultValue) {
|
||||||
TimeBoundaryResultValue boundary = (TimeBoundaryResultValue) value;
|
TimeBoundaryResultValue boundary = (TimeBoundaryResultValue) value;
|
||||||
value = new TimeBoundaryResultValue(
|
|
||||||
ImmutableMap.of(
|
DateTime minTime = null;
|
||||||
TimeBoundaryQuery.MIN_TIME, boundary.getMinTime().minus(offset),
|
try{
|
||||||
TimeBoundaryQuery.MAX_TIME, new DateTime(Math.min(boundary.getMaxTime().getMillis() - offset, now))
|
minTime = boundary.getMinTime();
|
||||||
)
|
} catch(IllegalArgumentException e) {}
|
||||||
);
|
|
||||||
|
return (T) ((TimeBoundaryQuery) query).buildResult(
|
||||||
|
timestamp,
|
||||||
|
minTime,
|
||||||
|
boundary.getMaxTime()
|
||||||
|
).iterator().next();
|
||||||
}
|
}
|
||||||
return (T) new Result(res.getTimestamp().minus(offset), value);
|
return (T) new Result(timestamp, value);
|
||||||
} else if (input instanceof MapBasedRow) {
|
} else if (input instanceof MapBasedRow) {
|
||||||
MapBasedRow row = (MapBasedRow) input;
|
MapBasedRow row = (MapBasedRow) input;
|
||||||
return (T) new MapBasedRow(row.getTimestamp().minus(offset), row.getEvent());
|
return (T) new MapBasedRow(row.getTimestamp().minus(offset), row.getEvent());
|
||||||
|
|
Loading…
Reference in New Issue