mirror of https://github.com/apache/druid.git
timezone support in groupby query
This commit is contained in:
parent
35058786d9
commit
55ae4c87dd
|
@ -32,6 +32,7 @@ import com.metamx.druid.Query;
|
|||
import com.metamx.druid.aggregation.AggregatorFactory;
|
||||
import com.metamx.druid.index.v1.IncrementalIndex;
|
||||
import com.metamx.druid.initialization.Initialization;
|
||||
import com.metamx.druid.input.MapBasedRow;
|
||||
import com.metamx.druid.input.Row;
|
||||
import com.metamx.druid.input.Rows;
|
||||
import com.metamx.druid.query.CacheStrategy;
|
||||
|
@ -119,7 +120,19 @@ public class GroupByQueryQueryToolChest implements QueryToolChest<Row, GroupByQu
|
|||
}
|
||||
);
|
||||
|
||||
return Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs()));
|
||||
// convert millis back to timestamp according to granularity to preserve time zone information
|
||||
return Sequences.map(
|
||||
Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())),
|
||||
new Function<Row, Row>()
|
||||
{
|
||||
@Override
|
||||
public Row apply(@Nullable Row input)
|
||||
{
|
||||
final MapBasedRow row = (MapBasedRow) input;
|
||||
return new MapBasedRow(query.getGranularity().toDateTime(row.getTimestampFromEpoch()), row.getEvent());
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -37,7 +37,7 @@ import java.util.Map;
|
|||
*/
|
||||
public class MapBasedRow implements Row
|
||||
{
|
||||
private final long timestamp;
|
||||
private final DateTime timestamp;
|
||||
private final Map<String, Object> event;
|
||||
|
||||
@JsonCreator
|
||||
|
@ -46,22 +46,21 @@ public class MapBasedRow implements Row
|
|||
@JsonProperty("event") Map<String, Object> event
|
||||
)
|
||||
{
|
||||
this(timestamp.getMillis(), event);
|
||||
this.timestamp = timestamp;
|
||||
this.event = event;
|
||||
}
|
||||
|
||||
public MapBasedRow(
|
||||
long timestamp,
|
||||
Map<String, Object> event
|
||||
)
|
||||
{
|
||||
this.timestamp = timestamp;
|
||||
this.event = event;
|
||||
) {
|
||||
this(new DateTime(timestamp), event);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getTimestampFromEpoch()
|
||||
{
|
||||
return timestamp;
|
||||
return timestamp.getMillis();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -120,7 +119,7 @@ public class MapBasedRow implements Row
|
|||
@JsonProperty
|
||||
public DateTime getTimestamp()
|
||||
{
|
||||
return new DateTime(timestamp);
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
|
@ -133,9 +132,38 @@ public class MapBasedRow implements Row
|
|||
public String toString()
|
||||
{
|
||||
return "MapBasedRow{" +
|
||||
"timestamp=" + new DateTime(timestamp) +
|
||||
"timestamp=" + timestamp +
|
||||
", event=" + event +
|
||||
'}';
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
MapBasedRow that = (MapBasedRow) o;
|
||||
|
||||
if (!event.equals(that.event)) {
|
||||
return false;
|
||||
}
|
||||
if (!timestamp.equals(that.timestamp)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
int result = timestamp.hashCode();
|
||||
result = 31 * result + event.hashCode();
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,6 +43,7 @@ import com.metamx.druid.index.v1.processing.DimensionSelector;
|
|||
import com.metamx.druid.input.MapBasedRow;
|
||||
import com.metamx.druid.input.Row;
|
||||
import com.metamx.druid.query.dimension.DimensionSpec;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
@ -347,7 +348,7 @@ public class GroupByQueryEngine
|
|||
.transform(
|
||||
new Function<Map.Entry<ByteBuffer, Integer>, Row>()
|
||||
{
|
||||
private final long timestamp = cursor.getTime().getMillis();
|
||||
private final DateTime timestamp = cursor.getTime();
|
||||
private final int[] increments = positionMaintainer.getIncrements();
|
||||
|
||||
@Override
|
||||
|
|
|
@ -112,7 +112,7 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
|
|||
MapBasedRow row = (MapBasedRow) input;
|
||||
|
||||
return new Result<TimeseriesResultValue>(
|
||||
new DateTime(input.getTimestampFromEpoch()), new TimeseriesResultValue(row.getEvent())
|
||||
row.getTimestamp(), new TimeseriesResultValue(row.getEvent())
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue