Add support for timezone in segment granularity (#3528)

* Add support for timezone in segment granularity

* CR feedback. Handle null timezone during equals check.

* Include timezone in docs.
Add timezone for ArbitraryGranularitySpec.
This commit is contained in:
praveev 2016-10-03 08:15:42 -07:00 committed by Slim
parent 40f2fe7893
commit 43cdc675c7
8 changed files with 139 additions and 10 deletions

View File

@ -188,6 +188,7 @@ This spec is used to generate segments with uniform intervals.
| queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. | no (default == 'NONE') |
| rollup | boolean | rollup or not | no (default == true) |
| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for batch, no for real-time |
| timezone | string | The timezone to represent the interval offsets in. | no (default == 'UTC') |
### Arbitrary Granularity Spec
@ -199,6 +200,7 @@ This spec is used to generate segments with arbitrary intervals (it tries to cre
| queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. | no (default == 'NONE') |
| rollup | boolean | rollup or not | no (default == true) |
| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for batch, no for real-time |
| timezone | string | The timezone to represent the interval offsets in. | no (default == 'UTC') |
# IO Config

View File

@ -143,7 +143,8 @@ public class DetermineHashedPartitionsJob implements Jobby
config.getGranularitySpec().getSegmentGranularity(),
config.getGranularitySpec().getQueryGranularity(),
config.getGranularitySpec().isRollup(),
intervals
intervals,
config.getGranularitySpec().getTimezone()
)
);
log.info("Determined Intervals for Job [%s].", config.getSegmentGranularIntervals());

View File

@ -113,7 +113,9 @@ public class GranularUnprocessedPathSpec extends GranularityPathSpec
segmentGranularity,
config.getGranularitySpec().getQueryGranularity(),
config.getGranularitySpec().isRollup(),
Lists.newArrayList(bucketsToRun)
Lists.newArrayList(bucketsToRun),
config.getGranularitySpec().getTimezone()
)
);

View File

@ -31,6 +31,7 @@ import com.metamx.common.guava.Comparators;
import io.druid.common.utils.JodaUtils;
import io.druid.granularity.QueryGranularity;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import java.util.List;
@ -42,17 +43,22 @@ public class ArbitraryGranularitySpec implements GranularitySpec
private final TreeSet<Interval> intervals;
private final QueryGranularity queryGranularity;
private final Boolean rollup;
private final String timezone;
@JsonCreator
public ArbitraryGranularitySpec(
@JsonProperty("queryGranularity") QueryGranularity queryGranularity,
@JsonProperty("rollup") Boolean rollup,
@JsonProperty("intervals") List<Interval> inputIntervals
@JsonProperty("intervals") List<Interval> inputIntervals,
@JsonProperty("timezone") String timezone
)
{
this.queryGranularity = queryGranularity;
this.rollup = rollup == null ? Boolean.TRUE : rollup;
this.intervals = Sets.newTreeSet(Comparators.intervalsByStartThenEnd());
this.timezone = timezone;
final DateTimeZone timeZone = DateTimeZone.forID(this.timezone);
if (inputIntervals == null) {
inputIntervals = Lists.newArrayList();
@ -60,7 +66,11 @@ public class ArbitraryGranularitySpec implements GranularitySpec
// Insert all intervals
for (final Interval inputInterval : inputIntervals) {
intervals.add(inputInterval);
Interval adjustedInterval = inputInterval;
if (this.timezone != null) {
adjustedInterval = new Interval(inputInterval.getStartMillis(), inputInterval.getEndMillis(), timeZone);
}
intervals.add(adjustedInterval);
}
// Ensure intervals are non-overlapping (but they may abut each other)
@ -88,7 +98,7 @@ public class ArbitraryGranularitySpec implements GranularitySpec
List<Interval> inputIntervals
)
{
this(queryGranularity, true, inputIntervals);
this(queryGranularity, true, inputIntervals, null);
}
@Override
@ -131,6 +141,13 @@ public class ArbitraryGranularitySpec implements GranularitySpec
return queryGranularity;
}
/**
 * Returns the timezone ID string this spec was constructed with, exposed as the
 * JSON property "timezone".
 *
 * The value is returned exactly as it was passed to the constructor and may be
 * {@code null}; the three-argument convenience constructor passes {@code null}.
 * When it is non-null, the constructor re-creates every input interval in that
 * zone (same start/end millis) before storing it.
 *
 * @return the configured timezone ID, or {@code null} if none was supplied
 */
@Override
@JsonProperty("timezone")
public String getTimezone()
{
return timezone;
}
@Override
public boolean equals(Object o)
{
@ -149,6 +166,10 @@ public class ArbitraryGranularitySpec implements GranularitySpec
if (!rollup.equals(that.rollup)) {
return false;
}
if (timezone != null ? !timezone.equals(that.timezone): that.timezone != null) {
return false;
}
return !(queryGranularity != null
? !queryGranularity.equals(that.queryGranularity)
: that.queryGranularity != null);
@ -161,6 +182,7 @@ public class ArbitraryGranularitySpec implements GranularitySpec
int result = intervals.hashCode();
result = 31 * result + rollup.hashCode();
result = 31 * result + (queryGranularity != null ? queryGranularity.hashCode() : 0);
result = 31 * result + (timezone != null ? timezone.hashCode() : 0);
return result;
}
}

View File

@ -61,4 +61,5 @@ public interface GranularitySpec
public QueryGranularity getQueryGranularity();
/**
 * Returns the timezone ID associated with this granularity spec.
 *
 * NOTE(review): the implementations visible alongside this interface return the
 * raw constructor argument, which is {@code null} when no timezone was
 * configured -- callers should be prepared for a {@code null} result.
 *
 * @return the timezone ID string, possibly {@code null}
 */
public String getTimezone();
}

View File

@ -30,6 +30,7 @@ import io.druid.granularity.QueryGranularities;
import io.druid.granularity.QueryGranularity;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.DateTimeZone;
import java.util.List;
import java.util.SortedSet;
@ -44,27 +45,35 @@ public class UniformGranularitySpec implements GranularitySpec
private final Boolean rollup;
private final List<Interval> inputIntervals;
private final ArbitraryGranularitySpec wrappedSpec;
private final String timezone;
@JsonCreator
public UniformGranularitySpec(
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
@JsonProperty("queryGranularity") QueryGranularity queryGranularity,
@JsonProperty("rollup") Boolean rollup,
@JsonProperty("intervals") List<Interval> inputIntervals
@JsonProperty("intervals") List<Interval> inputIntervals,
@JsonProperty("timezone") String timezone
)
{
this.segmentGranularity = segmentGranularity == null ? DEFAULT_SEGMENT_GRANULARITY : segmentGranularity;
this.queryGranularity = queryGranularity == null ? DEFAULT_QUERY_GRANULARITY : queryGranularity;
this.rollup = rollup == null ? Boolean.TRUE : rollup;
this.timezone = timezone;
final DateTimeZone timeZone = DateTimeZone.forID(this.timezone);
if (inputIntervals != null) {
List<Interval> granularIntervals = Lists.newArrayList();
for (Interval inputInterval : inputIntervals) {
if (this.timezone != null) {
inputInterval = new Interval(inputInterval.getStartMillis(), inputInterval.getEndMillis(), timeZone);
}
Iterables.addAll(granularIntervals, this.segmentGranularity.getIterable(inputInterval));
}
this.inputIntervals = ImmutableList.copyOf(inputIntervals);
this.wrappedSpec = new ArbitraryGranularitySpec(queryGranularity, rollup, granularIntervals);
this.wrappedSpec = new ArbitraryGranularitySpec(queryGranularity, rollup, granularIntervals, timezone);
} else {
this.inputIntervals = null;
this.wrappedSpec = null;
@ -77,7 +86,7 @@ public class UniformGranularitySpec implements GranularitySpec
List<Interval> inputIntervals
)
{
this(segmentGranularity, queryGranularity, true, inputIntervals);
this(segmentGranularity, queryGranularity, true, inputIntervals, null);
}
@Override
@ -123,6 +132,13 @@ public class UniformGranularitySpec implements GranularitySpec
return Optional.fromNullable(inputIntervals);
}
/**
 * Returns the timezone ID string this spec was constructed with, exposed as the
 * JSON property "timezone".
 *
 * The value is returned exactly as it was passed to the constructor and may be
 * {@code null}. When it is non-null, the constructor re-creates each input
 * interval in that zone (same start/end millis) before splitting it by the
 * segment granularity, and forwards the same ID to the wrapped spec.
 *
 * @return the configured timezone ID, or {@code null} if none was supplied
 */
@Override
@JsonProperty("timezone")
public String getTimezone()
{
return timezone;
}
@Override
public boolean equals(Object o)
{
@ -144,6 +160,9 @@ public class UniformGranularitySpec implements GranularitySpec
if (!rollup.equals(that.rollup)) {
return false;
}
if (timezone != null ? !timezone.equals(that.timezone): that.timezone != null) {
return false;
}
if (inputIntervals != null ? !inputIntervals.equals(that.inputIntervals) : that.inputIntervals != null) {
return false;
}
@ -157,6 +176,7 @@ public class UniformGranularitySpec implements GranularitySpec
int result = segmentGranularity.hashCode();
result = 31 * result + queryGranularity.hashCode();
result = 31 * result + rollup.hashCode();
result = 31 * result + (timezone != null ? timezone.hashCode() : 0);
result = 31 * result + (inputIntervals != null ? inputIntervals.hashCode() : 0);
result = 31 * result + (wrappedSpec != null ? wrappedSpec.hashCode() : 0);
return result;

View File

@ -26,11 +26,15 @@ import com.google.common.collect.Lists;
import io.druid.granularity.QueryGranularities;
import io.druid.jackson.DefaultObjectMapper;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
public class ArbitraryGranularityTest
{
@ -134,7 +138,7 @@ public class ArbitraryGranularityTest
new Interval("2012-01-03T00Z/2012-01-04T00Z"),
new Interval("2012-01-01T00Z/2012-01-03T00Z")
);
final GranularitySpec spec = new ArbitraryGranularitySpec(QueryGranularities.NONE, false, intervals);
final GranularitySpec spec = new ArbitraryGranularitySpec(QueryGranularities.NONE, false, intervals, null);
Assert.assertFalse(spec.isRollup());
}
@ -175,4 +179,37 @@ public class ArbitraryGranularityTest
throw Throwables.propagate(e);
}
}
/**
 * Verifies that an ArbitraryGranularitySpec built with an explicit timezone
 * reports its bucket intervals, sorted, expressed in that timezone.
 */
@Test
public void testTimezone()
{
  final String zoneId = "America/Los_Angeles";

  // Input is deliberately unsorted and written with fixed offsets (PST in
  // January, PDT in September) that correspond to local midnight in the zone.
  final List<Interval> unsortedInput = Lists.newArrayList(
      new Interval("2012-01-08T00-08:00/2012-01-11T00-08:00"),
      new Interval("2012-01-07T00-08:00/2012-01-08T00-08:00"),
      new Interval("2012-01-03T00-08:00/2012-01-04T00-08:00"),
      new Interval("2012-01-01T00-08:00/2012-01-03T00-08:00"),
      new Interval("2012-09-01T00-07:00/2012-09-03T00-07:00")
  );
  final GranularitySpec granularitySpec = new ArbitraryGranularitySpec(null, true, unsortedInput, zoneId);

  Assert.assertTrue(granularitySpec.bucketIntervals().isPresent());

  final Optional<SortedSet<Interval>> maybeBuckets = granularitySpec.bucketIntervals();
  final SortedSet<Interval> buckets = maybeBuckets.get();

  // Expected intervals are parsed as local dates under the zone's chronology.
  final ISOChronology zonedChronology = ISOChronology.getInstance(DateTimeZone.forID(zoneId));
  final List<Interval> expected = new ArrayList<Interval>();
  expected.add(new Interval("2012-01-01/2012-01-03", zonedChronology));
  expected.add(new Interval("2012-01-03/2012-01-04", zonedChronology));
  expected.add(new Interval("2012-01-07/2012-01-08", zonedChronology));
  expected.add(new Interval("2012-01-08/2012-01-11", zonedChronology));
  expected.add(new Interval("2012-09-01/2012-09-03", zonedChronology));

  final List<Interval> actual = new ArrayList<Interval>(buckets);
  Assert.assertEquals(expected, actual);
}
}

View File

@ -27,11 +27,15 @@ import com.metamx.common.Granularity;
import io.druid.granularity.QueryGranularities;
import io.druid.jackson.DefaultObjectMapper;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
public class UniformGranularityTest
{
@ -106,7 +110,7 @@ public class UniformGranularityTest
new Interval("2012-01-03T00Z/2012-01-04T00Z"),
new Interval("2012-01-01T00Z/2012-01-03T00Z")
);
final GranularitySpec spec = new UniformGranularitySpec(Granularity.DAY, QueryGranularities.NONE, false, intervals);
final GranularitySpec spec = new UniformGranularitySpec(Granularity.DAY, QueryGranularities.NONE, false, intervals, null);
Assert.assertFalse(spec.isRollup());
}
@ -229,6 +233,46 @@ public class UniformGranularityTest
);
}
/**
 * Verifies that a DAY-granularity UniformGranularitySpec built with an explicit
 * timezone splits its input into day buckets, sorted, expressed in that timezone.
 */
@Test
public void testTimezone()
{
  final String zoneId = "America/Los_Angeles";

  // Input is deliberately unsorted and written with fixed offsets (PST in
  // January, PDT in September) that correspond to local midnight in the zone.
  final List<Interval> unsortedInput = Lists.newArrayList(
      new Interval("2012-01-08T00-08:00/2012-01-11T00-08:00"),
      new Interval("2012-01-07T00-08:00/2012-01-08T00-08:00"),
      new Interval("2012-01-03T00-08:00/2012-01-04T00-08:00"),
      new Interval("2012-01-01T00-08:00/2012-01-03T00-08:00"),
      new Interval("2012-09-01T00-07:00/2012-09-03T00-07:00")
  );
  final GranularitySpec granularitySpec = new UniformGranularitySpec(
      Granularity.DAY,
      null,
      true,
      unsortedInput,
      zoneId
  );

  Assert.assertTrue(granularitySpec.bucketIntervals().isPresent());

  final Optional<SortedSet<Interval>> maybeBuckets = granularitySpec.bucketIntervals();
  final SortedSet<Interval> buckets = maybeBuckets.get();

  // Expected: one bucket per local day covered by the input intervals.
  final ISOChronology zonedChronology = ISOChronology.getInstance(DateTimeZone.forID(zoneId));
  final List<Interval> expected = new ArrayList<Interval>();
  expected.add(new Interval("2012-01-01/2012-01-02", zonedChronology));
  expected.add(new Interval("2012-01-02/2012-01-03", zonedChronology));
  expected.add(new Interval("2012-01-03/2012-01-04", zonedChronology));
  expected.add(new Interval("2012-01-07/2012-01-08", zonedChronology));
  expected.add(new Interval("2012-01-08/2012-01-09", zonedChronology));
  expected.add(new Interval("2012-01-09/2012-01-10", zonedChronology));
  expected.add(new Interval("2012-01-10/2012-01-11", zonedChronology));
  expected.add(new Interval("2012-09-01/2012-09-02", zonedChronology));
  expected.add(new Interval("2012-09-02/2012-09-03", zonedChronology));

  final List<Interval> actual = new ArrayList<Interval>(buckets);
  Assert.assertEquals(expected, actual);
}
private void notEqualsCheck(GranularitySpec spec1, GranularitySpec spec2) {
Assert.assertNotEquals(spec1, spec2);
Assert.assertNotEquals(spec1.hashCode(), spec2.hashCode());