From 4ff12e4394b8634b60a927ad4f1d4614b6b598c4 Mon Sep 17 00:00:00 2001
From: praveev
Date: Thu, 5 Oct 2017 22:46:07 -0700
Subject: [PATCH] Hadoop indexing: Fix NPE when intervals not provided (#4686)

* Fix #4647

* NPE protect bucketInterval as well

* Add test to verify timezone as well

* Also handle case when intervals are already present

* Fix checkstyle error

* Use factory method instead for Datetime

* Use Intervals factory method
---
 .../indexer/DetermineHashedPartitionsJob.java | 20 ++++++--
 .../DetermineHashedPartitionsJobTest.java     | 46 ++++++++++++++++---
 .../druid.test.data.with.rows.in.timezone.tsv | 16 +++++++
 .../granularity/UniformGranularitySpec.java   |  6 ++-
 4 files changed, 75 insertions(+), 13 deletions(-)
 create mode 100644 indexing-hadoop/src/test/resources/druid.test.data.with.rows.in.timezone.tsv

diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
index 6f88a0c3d09..b3ccee6b8ae 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
@@ -300,12 +300,14 @@ public class DetermineHashedPartitionsJob implements Jobby
   {
     private final List<Interval> intervals = Lists.newArrayList();
     protected HadoopDruidIndexerConfig config = null;
+    private boolean determineIntervals;
 
     @Override
     protected void setup(Context context)
         throws IOException, InterruptedException
     {
       config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
+      determineIntervals = !config.getSegmentGranularIntervals().isPresent();
     }
 
     @Override
@@ -321,12 +323,20 @@ public class DetermineHashedPartitionsJob implements Jobby
             HyperLogLogCollector.makeCollector(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()))
         );
       }
-      Optional<Interval> intervalOptional = config.getGranularitySpec().bucketInterval(DateTimes.utc(key.get()));
-      if (!intervalOptional.isPresent()) {
-        throw new ISE("WTF?! No bucket found for timestamp: %s", key.get());
+      Interval interval;
+
+      if (determineIntervals) {
+        interval = config.getGranularitySpec().getSegmentGranularity().bucket(DateTimes.utc(key.get()));
+      } else {
+        Optional<Interval> intervalOptional = config.getGranularitySpec().bucketInterval(DateTimes.utc(key.get()));
+
+        if (!intervalOptional.isPresent()) {
+          throw new ISE("WTF?! No bucket found for timestamp: %s", key.get());
+        }
+        interval = intervalOptional.get();
       }
-      Interval interval = intervalOptional.get();
+
+      intervals.add(interval);
       final Path outPath = config.makeSegmentPartitionInfoPath(interval);
       final OutputStream out = Utils.makePathAndOutputStream(
@@ -353,7 +363,7 @@ public class DetermineHashedPartitionsJob implements Jobby
         throws IOException, InterruptedException
     {
       super.run(context);
-      if (!config.getSegmentGranularIntervals().isPresent()) {
+      if (determineIntervals) {
         final Path outPath = config.makeIntervalInfoPath();
         final OutputStream out = Utils.makePathAndOutputStream(
             context, outPath, config.isOverwriteFiles()
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java
index 31219d64bb3..5ff2afdcd5d 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java
@@ -29,10 +29,15 @@ import io.druid.data.input.impl.TimestampSpec;
 import io.druid.indexer.partitions.HashedPartitionsSpec;
 import io.druid.java.util.common.Intervals;
 import io.druid.java.util.common.granularity.Granularities;
+import io.druid.java.util.common.granularity.Granularity;
+import io.druid.java.util.common.granularity.PeriodGranularity;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.DoubleSumAggregatorFactory;
 import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Interval;
+import org.joda.time.Period;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -53,7 +58,7 @@ public class DetermineHashedPartitionsJobTest
   private int[] expectedNumOfShards;
   private int errorMargin;
 
-  @Parameterized.Parameters(name = "File={0}, TargetPartitionSize={1}, Interval={2}, ErrorMargin={3}, NumTimeBuckets={4}, NumShards={5}")
+  @Parameterized.Parameters(name = "File={0}, TargetPartitionSize={1}, Interval={2}, ErrorMargin={3}, NumTimeBuckets={4}, NumShards={5}, SegmentGranularity={6}")
   public static Collection<?> data()
   {
     int[] first = new int[1];
@@ -73,7 +78,8 @@
                 "2011-04-10T00:00:00.000Z/2011-04-11T00:00:00.000Z",
                 0,
                 1,
-                first
+                first,
+                Granularities.DAY
             },
             {
                 DetermineHashedPartitionsJobTest.class.getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(),
@@ -81,7 +87,8 @@
                 "2011-04-10T00:00:00.000Z/2011-04-16T00:00:00.000Z",
                 0,
                 6,
-                second
+                second,
+                Granularities.DAY
             },
             {
                 DetermineHashedPartitionsJobTest.class.getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(),
@@ -89,7 +96,26 @@
                 "2011-04-10T00:00:00.000Z/2011-04-16T00:00:00.000Z",
                 0,
                 6,
-                third
+                third,
+                Granularities.DAY
+            },
+            {
+                DetermineHashedPartitionsJobTest.class.getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(),
+                1L,
+                null,
+                0,
+                6,
+                third,
+                Granularities.DAY
+            },
+            {
+                DetermineHashedPartitionsJobTest.class.getResource("/druid.test.data.with.rows.in.timezone.tsv").getPath(),
+                1L,
+                null,
+                0,
+                1,
+                first,
+                new PeriodGranularity(new Period("P1D"), null, DateTimeZone.forID("America/Los_Angeles"))
             }
         }
     );
@@ -101,7 +127,8 @@
       String interval,
      int errorMargin,
      int expectedNumTimeBuckets,
-      int[] expectedNumOfShards
+      int[] expectedNumOfShards,
+      Granularity segmentGranularity
   ) throws IOException
   {
     this.expectedNumOfShards = expectedNumOfShards;
@@ -109,6 +136,11 @@
     this.errorMargin = errorMargin;
     File tmpDir = Files.createTempDir();
 
+    ImmutableList<Interval> intervals = null;
+    if (interval != null) {
+      intervals = ImmutableList.of(Intervals.of(interval));
+    }
+
     HadoopIngestionSpec ingestionSpec = new HadoopIngestionSpec(
         new DataSchema(
             "test_schema",
@@ -145,9 +177,9 @@
             ),
             new AggregatorFactory[]{new DoubleSumAggregatorFactory("index", "index")},
             new UniformGranularitySpec(
-                Granularities.DAY,
+                segmentGranularity,
                 Granularities.NONE,
-                ImmutableList.of(Intervals.of(interval))
+                intervals
             ),
             HadoopDruidIndexerConfig.JSON_MAPPER
         ),
diff --git a/indexing-hadoop/src/test/resources/druid.test.data.with.rows.in.timezone.tsv b/indexing-hadoop/src/test/resources/druid.test.data.with.rows.in.timezone.tsv
new file mode 100644
index 00000000000..f855c19d096
--- /dev/null
+++ b/indexing-hadoop/src/test/resources/druid.test.data.with.rows.in.timezone.tsv
@@ -0,0 +1,16 @@
+2011-04-10T00:00:00.000-07:00 spot automotive preferred apreferred 113.221448
+2011-04-10T00:00:00.000-07:00 spot automotive preferred apreferred 11.221448
+2011-04-10T00:00:00.000-07:00 spot automotive preferred apreferred 103.221448
+2011-04-10T00:00:00.000-07:00 spot automotive preferred apreferred 53.221448
+2011-04-10T00:00:00.000-07:00 spot business preferred bpreferred 95.570457
+2011-04-10T00:00:00.000-07:00 spot entertainment preferred epreferred 131.766616
+2011-04-10T00:00:00.000-07:00 spot health preferred hpreferred 99.950855
+2011-04-10T00:00:00.000-07:00 spot mezzanine preferred mpreferred 91.470524
+2011-04-10T00:00:00.000-07:00 spot news preferred npreferred 99.393076
+2011-04-10T00:00:00.000-07:00 spot premium preferred ppreferred 123.207579
+2011-04-10T00:00:00.000-07:00 spot technology preferred tpreferred 84.898691
+2011-04-10T00:00:00.000-07:00 spot travel preferred tpreferred 114.353962
+2011-04-10T00:00:00.000-07:00 total_market mezzanine preferred mpreferred 1005.253077
+2011-04-10T00:00:00.000-07:00 total_market premium preferred ppreferred 1030.094757
+2011-04-10T00:00:00.000-07:00 upfront mezzanine preferred mpreferred 1031.741509
+2011-04-10T00:00:00.000-07:00 upfront premium preferred ppreferred 775.965555
\ No newline at end of file
diff --git a/server/src/main/java/io/druid/segment/indexing/granularity/UniformGranularitySpec.java b/server/src/main/java/io/druid/segment/indexing/granularity/UniformGranularitySpec.java
index 7205f552422..e459877d8ad 100644
--- a/server/src/main/java/io/druid/segment/indexing/granularity/UniformGranularitySpec.java
+++ b/server/src/main/java/io/druid/segment/indexing/granularity/UniformGranularitySpec.java
@@ -97,7 +97,11 @@ public class UniformGranularitySpec implements GranularitySpec
   @Override
   public Optional<Interval> bucketInterval(DateTime dt)
   {
-    return wrappedSpec.bucketInterval(dt);
+    if (wrappedSpec == null) {
+      return Optional.absent();
+    } else {
+      return wrappedSpec.bucketInterval(dt);
+    }
   }
 
   @Override
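
Reviewer note, not part of the patch: the last hunk is the heart of the fix. When a spec is built without intervals, the internal wrappedSpec stays null, so bucketInterval() previously dereferenced null and threw the NPE reported in #4647. A minimal sketch of the behavior the hunks above establish (constructor and method shapes are taken from the diff; the class name and timestamps are illustrative):

import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.joda.time.Interval;

public class BucketIntervalSketch
{
  public static void main(String[] args)
  {
    // No intervals provided: bucketInterval() now degrades to absent()
    // instead of throwing an NPE on the null wrappedSpec.
    UniformGranularitySpec withoutIntervals = new UniformGranularitySpec(
        Granularities.DAY,
        Granularities.NONE,
        null
    );
    System.out.println(withoutIntervals.bucketInterval(DateTimes.utc(0)).isPresent()); // false

    // Intervals provided: behavior is unchanged and the day bucket is found.
    Interval day = Intervals.of("2011-04-10T00:00:00.000Z/2011-04-11T00:00:00.000Z");
    UniformGranularitySpec withIntervals = new UniformGranularitySpec(
        Granularities.DAY,
        Granularities.NONE,
        ImmutableList.of(day)
    );
    Optional<Interval> bucket = withIntervals.bucketInterval(day.getStart());
    System.out.println(bucket.isPresent()); // true
  }
}

Returning Optional.absent() keeps the null handling in one place, which is what lets DetermineHashedPartitionsJob switch on determineIntervals and fall back to segment-granularity bucketing instead of crashing when no intervals were configured.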