From 43cdc675c7780c5dafe3903b51e589cee4c7d776 Mon Sep 17 00:00:00 2001 From: praveev Date: Mon, 3 Oct 2016 08:15:42 -0700 Subject: [PATCH] Add support for timezone in segment granularity (#3528) * Add support for timezone in segment granularity * CR feedback. Handle null timezone during equals check. * Include timezone in docs. Add timezone for ArbitraryGranularitySpec. --- docs/content/ingestion/index.md | 2 + .../indexer/DetermineHashedPartitionsJob.java | 3 +- .../path/GranularUnprocessedPathSpec.java | 4 +- .../granularity/ArbitraryGranularitySpec.java | 28 +++++++++-- .../indexing/granularity/GranularitySpec.java | 1 + .../granularity/UniformGranularitySpec.java | 26 +++++++++-- .../granularity/ArbitraryGranularityTest.java | 39 +++++++++++++++- .../granularity/UniformGranularityTest.java | 46 ++++++++++++++++++- 8 files changed, 139 insertions(+), 10 deletions(-) diff --git a/docs/content/ingestion/index.md b/docs/content/ingestion/index.md index cff399205a6..d178e088bed 100644 --- a/docs/content/ingestion/index.md +++ b/docs/content/ingestion/index.md @@ -188,6 +188,7 @@ This spec is used to generated segments with uniform intervals. | queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. | no (default == 'NONE') | | rollup | boolean | rollup or not | no (default == true) | | intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for batch, no for real-time | +| timezone | string | The timezone to represent the interval offsets in. | no (default == 'UTC') ### Arbitrary Granularity Spec @@ -199,6 +200,7 @@ This spec is used to generate segments with arbitrary intervals (it tries to cre | queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. | no (default == 'NONE') | | rollup | boolean | rollup or not | no (default == true) | | intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for batch, no for real-time | +| timezone | string | The timezone to represent the interval offsets in. | no (default == 'UTC') # IO Config diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java index 2562c3573a6..65fc52d5e74 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java @@ -143,7 +143,8 @@ public class DetermineHashedPartitionsJob implements Jobby config.getGranularitySpec().getSegmentGranularity(), config.getGranularitySpec().getQueryGranularity(), config.getGranularitySpec().isRollup(), - intervals + intervals, + config.getGranularitySpec().getTimezone() ) ); log.info("Determined Intervals for Job [%s].", config.getSegmentGranularIntervals()); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularUnprocessedPathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularUnprocessedPathSpec.java index 2d11ebe3521..0ad2353b1f6 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularUnprocessedPathSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularUnprocessedPathSpec.java @@ -113,7 +113,9 @@ public class GranularUnprocessedPathSpec extends GranularityPathSpec segmentGranularity, config.getGranularitySpec().getQueryGranularity(), config.getGranularitySpec().isRollup(), - Lists.newArrayList(bucketsToRun) + Lists.newArrayList(bucketsToRun), + config.getGranularitySpec().getTimezone() + ) ); diff --git a/server/src/main/java/io/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java b/server/src/main/java/io/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java index e7294b1bcee..0f542bb95a3 100644 --- a/server/src/main/java/io/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java +++ b/server/src/main/java/io/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java @@ -31,6 +31,7 @@ import com.metamx.common.guava.Comparators; import io.druid.common.utils.JodaUtils; import io.druid.granularity.QueryGranularity; import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.joda.time.Interval; import java.util.List; @@ -42,17 +43,22 @@ public class ArbitraryGranularitySpec implements GranularitySpec private final TreeSet intervals; private final QueryGranularity queryGranularity; private final Boolean rollup; + private final String timezone; @JsonCreator public ArbitraryGranularitySpec( @JsonProperty("queryGranularity") QueryGranularity queryGranularity, @JsonProperty("rollup") Boolean rollup, - @JsonProperty("intervals") List inputIntervals + @JsonProperty("intervals") List inputIntervals, + @JsonProperty("timezone") String timezone + ) { this.queryGranularity = queryGranularity; this.rollup = rollup == null ? Boolean.TRUE : rollup; this.intervals = Sets.newTreeSet(Comparators.intervalsByStartThenEnd()); + this.timezone = timezone; + final DateTimeZone timeZone = DateTimeZone.forID(this.timezone); if (inputIntervals == null) { inputIntervals = Lists.newArrayList(); @@ -60,7 +66,11 @@ public class ArbitraryGranularitySpec implements GranularitySpec // Insert all intervals for (final Interval inputInterval : inputIntervals) { - intervals.add(inputInterval); + Interval adjustedInterval = inputInterval; + if (this.timezone != null) { + adjustedInterval = new Interval(inputInterval.getStartMillis(), inputInterval.getEndMillis(), timeZone); + } + intervals.add(adjustedInterval); } // Ensure intervals are non-overlapping (but they may abut each other) @@ -88,7 +98,7 @@ public class ArbitraryGranularitySpec implements GranularitySpec List inputIntervals ) { - this(queryGranularity, true, inputIntervals); + this(queryGranularity, true, inputIntervals, null); } @Override @@ -131,6 +141,13 @@ public class ArbitraryGranularitySpec implements GranularitySpec return queryGranularity; } + @Override + @JsonProperty("timezone") + public String getTimezone() + { + return timezone; + } + @Override public boolean equals(Object o) { @@ -149,6 +166,10 @@ public class ArbitraryGranularitySpec implements GranularitySpec if (!rollup.equals(that.rollup)) { return false; } + if (timezone != null ? !timezone.equals(that.timezone): that.timezone != null) { + return false; + } + return !(queryGranularity != null ? !queryGranularity.equals(that.queryGranularity) : that.queryGranularity != null); @@ -161,6 +182,7 @@ public class ArbitraryGranularitySpec implements GranularitySpec int result = intervals.hashCode(); result = 31 * result + rollup.hashCode(); result = 31 * result + (queryGranularity != null ? queryGranularity.hashCode() : 0); + result = 31 * result + (timezone != null ? timezone.hashCode() : 0); return result; } } diff --git a/server/src/main/java/io/druid/segment/indexing/granularity/GranularitySpec.java b/server/src/main/java/io/druid/segment/indexing/granularity/GranularitySpec.java index e7e0c09eedb..331ae068204 100644 --- a/server/src/main/java/io/druid/segment/indexing/granularity/GranularitySpec.java +++ b/server/src/main/java/io/druid/segment/indexing/granularity/GranularitySpec.java @@ -61,4 +61,5 @@ public interface GranularitySpec public QueryGranularity getQueryGranularity(); + public String getTimezone(); } diff --git a/server/src/main/java/io/druid/segment/indexing/granularity/UniformGranularitySpec.java b/server/src/main/java/io/druid/segment/indexing/granularity/UniformGranularitySpec.java index 2b5366a104f..de0aeeef4fa 100644 --- a/server/src/main/java/io/druid/segment/indexing/granularity/UniformGranularitySpec.java +++ b/server/src/main/java/io/druid/segment/indexing/granularity/UniformGranularitySpec.java @@ -30,6 +30,7 @@ import io.druid.granularity.QueryGranularities; import io.druid.granularity.QueryGranularity; import org.joda.time.DateTime; import org.joda.time.Interval; +import org.joda.time.DateTimeZone; import java.util.List; import java.util.SortedSet; @@ -44,27 +45,35 @@ public class UniformGranularitySpec implements GranularitySpec private final Boolean rollup; private final List inputIntervals; private final ArbitraryGranularitySpec wrappedSpec; + private final String timezone; @JsonCreator public UniformGranularitySpec( @JsonProperty("segmentGranularity") Granularity segmentGranularity, @JsonProperty("queryGranularity") QueryGranularity queryGranularity, @JsonProperty("rollup") Boolean rollup, - @JsonProperty("intervals") List inputIntervals + @JsonProperty("intervals") List inputIntervals, + @JsonProperty("timezone") String timezone ) { this.segmentGranularity = segmentGranularity == null ? DEFAULT_SEGMENT_GRANULARITY : segmentGranularity; this.queryGranularity = queryGranularity == null ? DEFAULT_QUERY_GRANULARITY : queryGranularity; this.rollup = rollup == null ? Boolean.TRUE : rollup; + this.timezone = timezone; + final DateTimeZone timeZone = DateTimeZone.forID(this.timezone); if (inputIntervals != null) { List granularIntervals = Lists.newArrayList(); for (Interval inputInterval : inputIntervals) { + if (this.timezone != null) { + inputInterval = new Interval(inputInterval.getStartMillis(), inputInterval.getEndMillis(), timeZone); + } + Iterables.addAll(granularIntervals, this.segmentGranularity.getIterable(inputInterval)); } this.inputIntervals = ImmutableList.copyOf(inputIntervals); - this.wrappedSpec = new ArbitraryGranularitySpec(queryGranularity, rollup, granularIntervals); + this.wrappedSpec = new ArbitraryGranularitySpec(queryGranularity, rollup, granularIntervals, timezone); } else { this.inputIntervals = null; this.wrappedSpec = null; @@ -77,7 +86,7 @@ public class UniformGranularitySpec implements GranularitySpec List inputIntervals ) { - this(segmentGranularity, queryGranularity, true, inputIntervals); + this(segmentGranularity, queryGranularity, true, inputIntervals, null); } @Override @@ -123,6 +132,13 @@ public class UniformGranularitySpec implements GranularitySpec return Optional.fromNullable(inputIntervals); } + @Override + @JsonProperty("timezone") + public String getTimezone() + { + return timezone; + } + @Override public boolean equals(Object o) { @@ -144,6 +160,9 @@ public class UniformGranularitySpec implements GranularitySpec if (!rollup.equals(that.rollup)) { return false; } + if (timezone != null ? !timezone.equals(that.timezone): that.timezone != null) { + return false; + } if (inputIntervals != null ? !inputIntervals.equals(that.inputIntervals) : that.inputIntervals != null) { return false; } @@ -157,6 +176,7 @@ public class UniformGranularitySpec implements GranularitySpec int result = segmentGranularity.hashCode(); result = 31 * result + queryGranularity.hashCode(); result = 31 * result + rollup.hashCode(); + result = 31 * result + (timezone != null ? timezone.hashCode() : 0); result = 31 * result + (inputIntervals != null ? inputIntervals.hashCode() : 0); result = 31 * result + (wrappedSpec != null ? wrappedSpec.hashCode() : 0); return result; diff --git a/server/src/test/java/io/druid/segment/indexing/granularity/ArbitraryGranularityTest.java b/server/src/test/java/io/druid/segment/indexing/granularity/ArbitraryGranularityTest.java index cdb02c71bcc..e2fb8bfa1dc 100644 --- a/server/src/test/java/io/druid/segment/indexing/granularity/ArbitraryGranularityTest.java +++ b/server/src/test/java/io/druid/segment/indexing/granularity/ArbitraryGranularityTest.java @@ -26,11 +26,15 @@ import com.google.common.collect.Lists; import io.druid.granularity.QueryGranularities; import io.druid.jackson.DefaultObjectMapper; import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.joda.time.Interval; +import org.joda.time.chrono.ISOChronology; import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; import java.util.List; +import java.util.SortedSet; public class ArbitraryGranularityTest { @@ -134,7 +138,7 @@ public class ArbitraryGranularityTest new Interval("2012-01-03T00Z/2012-01-04T00Z"), new Interval("2012-01-01T00Z/2012-01-03T00Z") ); - final GranularitySpec spec = new ArbitraryGranularitySpec(QueryGranularities.NONE, false, intervals); + final GranularitySpec spec = new ArbitraryGranularitySpec(QueryGranularities.NONE, false, intervals, null); Assert.assertFalse(spec.isRollup()); } @@ -175,4 +179,37 @@ public class ArbitraryGranularityTest throw Throwables.propagate(e); } } + + @Test + public void testTimezone() { + final GranularitySpec spec = new ArbitraryGranularitySpec( + null, + true, + Lists.newArrayList( + new Interval("2012-01-08T00-08:00/2012-01-11T00-08:00"), + new Interval("2012-01-07T00-08:00/2012-01-08T00-08:00"), + new Interval("2012-01-03T00-08:00/2012-01-04T00-08:00"), + new Interval("2012-01-01T00-08:00/2012-01-03T00-08:00"), + new Interval("2012-09-01T00-07:00/2012-09-03T00-07:00") + ), + "America/Los_Angeles" + ); + + Assert.assertTrue(spec.bucketIntervals().isPresent()); + + final Optional> sortedSetOptional = spec.bucketIntervals(); + final SortedSet intervals = sortedSetOptional.get(); + + final ISOChronology chrono = ISOChronology.getInstance(DateTimeZone.forID("America/Los_Angeles")); + + final ArrayList expectedIntervals = Lists.newArrayList( + new Interval("2012-01-01/2012-01-03", chrono), + new Interval("2012-01-03/2012-01-04", chrono), + new Interval("2012-01-07/2012-01-08", chrono), + new Interval("2012-01-08/2012-01-11", chrono), + new Interval("2012-09-01/2012-09-03", chrono) + ); + + Assert.assertEquals(expectedIntervals, new ArrayList(intervals)); + } } diff --git a/server/src/test/java/io/druid/segment/indexing/granularity/UniformGranularityTest.java b/server/src/test/java/io/druid/segment/indexing/granularity/UniformGranularityTest.java index c0f616c06e9..7ab9ca2acd8 100644 --- a/server/src/test/java/io/druid/segment/indexing/granularity/UniformGranularityTest.java +++ b/server/src/test/java/io/druid/segment/indexing/granularity/UniformGranularityTest.java @@ -27,11 +27,15 @@ import com.metamx.common.Granularity; import io.druid.granularity.QueryGranularities; import io.druid.jackson.DefaultObjectMapper; import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.joda.time.Interval; +import org.joda.time.chrono.ISOChronology; import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; import java.util.List; +import java.util.SortedSet; public class UniformGranularityTest { @@ -106,7 +110,7 @@ public class UniformGranularityTest new Interval("2012-01-03T00Z/2012-01-04T00Z"), new Interval("2012-01-01T00Z/2012-01-03T00Z") ); - final GranularitySpec spec = new UniformGranularitySpec(Granularity.DAY, QueryGranularities.NONE, false, intervals); + final GranularitySpec spec = new UniformGranularitySpec(Granularity.DAY, QueryGranularities.NONE, false, intervals, null); Assert.assertFalse(spec.isRollup()); } @@ -229,6 +233,46 @@ public class UniformGranularityTest ); } + + @Test + public void testTimezone() { + final GranularitySpec spec = new UniformGranularitySpec( + Granularity.DAY, + null, + true, + Lists.newArrayList( + new Interval("2012-01-08T00-08:00/2012-01-11T00-08:00"), + new Interval("2012-01-07T00-08:00/2012-01-08T00-08:00"), + new Interval("2012-01-03T00-08:00/2012-01-04T00-08:00"), + new Interval("2012-01-01T00-08:00/2012-01-03T00-08:00"), + new Interval("2012-09-01T00-07:00/2012-09-03T00-07:00") + ), + "America/Los_Angeles" + ); + + Assert.assertTrue(spec.bucketIntervals().isPresent()); + + final Optional> sortedSetOptional = spec.bucketIntervals(); + final SortedSet intervals = sortedSetOptional.get(); + + final ISOChronology chrono = ISOChronology.getInstance(DateTimeZone.forID("America/Los_Angeles")); + + final ArrayList expectedIntervals = Lists.newArrayList( + new Interval("2012-01-01/2012-01-02", chrono), + new Interval("2012-01-02/2012-01-03", chrono), + new Interval("2012-01-03/2012-01-04", chrono), + new Interval("2012-01-07/2012-01-08", chrono), + new Interval("2012-01-08/2012-01-09", chrono), + new Interval("2012-01-09/2012-01-10", chrono), + new Interval("2012-01-10/2012-01-11", chrono), + new Interval("2012-09-01/2012-09-02", chrono), + new Interval("2012-09-02/2012-09-03", chrono) + ); + + Assert.assertEquals(expectedIntervals, new ArrayList(intervals)); + } + + private void notEqualsCheck(GranularitySpec spec1, GranularitySpec spec2) { Assert.assertNotEquals(spec1, spec2); Assert.assertNotEquals(spec1.hashCode(), spec2.hashCode());