Hadoop indexing: Fix NPE when intervals not provided (#4686)

* Fix #4647

* Protect bucketInterval against NPE as well

* Add test to verify timezone as well

* Also handle case when intervals are already present

* Fix checkstyle error

* Use factory method for DateTime instead

* Use Intervals factory method
Authored by praveev on 2017-10-05 22:46:07 -07:00; committed by Jonathan Wei
parent 716a5ec1a8
commit 4ff12e4394
4 changed files with 75 additions and 13 deletions
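
For context, the underlying bug (#4647): UniformGranularitySpec.bucketInterval() dereferenced its wrapped spec unconditionally, and that wrapped spec is null whenever the ingestion spec provides no input intervals. A minimal sketch of the failure mode, assuming the three-argument constructor used in the test below (illustrative, not part of this patch):

    import com.google.common.base.Optional;
    import io.druid.java.util.common.DateTimes;
    import io.druid.java.util.common.granularity.Granularities;
    import io.druid.segment.indexing.granularity.UniformGranularitySpec;
    import org.joda.time.Interval;

    class BucketIntervalNpeSketch
    {
      public static void main(String[] args)
      {
        // No input intervals: the indexing job is expected to determine them from the data.
        UniformGranularitySpec spec = new UniformGranularitySpec(
            Granularities.DAY,   // segmentGranularity
            Granularities.NONE,  // queryGranularity
            null                 // intervals not provided, so the wrapped spec stays null
        );

        // Before this patch: NullPointerException. After it: Optional.absent(),
        // letting callers fall back to bucketing by segment granularity.
        Optional<Interval> bucket = spec.bucketInterval(DateTimes.of("2011-04-10T00:00:00Z"));
        System.out.println(bucket.isPresent() ? bucket.get().toString() : "no configured bucket");
      }
    }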

DetermineHashedPartitionsJob.java

@@ -300,12 +300,14 @@ public class DetermineHashedPartitionsJob implements Jobby
   {
     private final List<Interval> intervals = Lists.newArrayList();
     protected HadoopDruidIndexerConfig config = null;
+    private boolean determineIntervals;

     @Override
     protected void setup(Context context)
         throws IOException, InterruptedException
     {
       config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
+      determineIntervals = !config.getSegmentGranularIntervals().isPresent();
     }

     @Override
@@ -321,12 +323,20 @@ public class DetermineHashedPartitionsJob implements Jobby
             HyperLogLogCollector.makeCollector(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()))
         );
       }
+      Interval interval;
+      if (determineIntervals) {
+        interval = config.getGranularitySpec().getSegmentGranularity().bucket(DateTimes.utc(key.get()));
+      } else {
         Optional<Interval> intervalOptional = config.getGranularitySpec().bucketInterval(DateTimes.utc(key.get()));
         if (!intervalOptional.isPresent()) {
           throw new ISE("WTF?! No bucket found for timestamp: %s", key.get());
         }
-        Interval interval = intervalOptional.get();
+        interval = intervalOptional.get();
+      }
       intervals.add(interval);
       final Path outPath = config.makeSegmentPartitionInfoPath(interval);
       final OutputStream out = Utils.makePathAndOutputStream(
@@ -353,7 +363,7 @@ public class DetermineHashedPartitionsJob implements Jobby
         throws IOException, InterruptedException
     {
       super.run(context);
-      if (!config.getSegmentGranularIntervals().isPresent()) {
+      if (determineIntervals) {
         final Path outPath = config.makeIntervalInfoPath();
         final OutputStream out = Utils.makePathAndOutputStream(
             context, outPath, config.isOverwriteFiles()
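
The new determineIntervals path derives the bucket from Granularity.bucket(DateTime) alone instead of looking it up among configured intervals. A standalone sketch of that behavior (illustrative, not code from this patch):

    import io.druid.java.util.common.DateTimes;
    import io.druid.java.util.common.granularity.Granularities;
    import org.joda.time.DateTime;
    import org.joda.time.Interval;

    class GranularityBucketSketch
    {
      public static void main(String[] args)
      {
        DateTime t = DateTimes.of("2011-04-10T13:45:00Z");

        // With no configured intervals, the mapper/reducer derives the bucket
        // from the segment granularity alone: DAY maps any instant to its UTC day.
        Interval bucket = Granularities.DAY.bucket(t);
        System.out.println(bucket); // 2011-04-10T00:00:00.000Z/2011-04-11T00:00:00.000Z
      }
    }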

DetermineHashedPartitionsJobTest.java

@@ -29,10 +29,15 @@ import io.druid.data.input.impl.TimestampSpec;
 import io.druid.indexer.partitions.HashedPartitionsSpec;
+import io.druid.java.util.common.Intervals;
 import io.druid.java.util.common.granularity.Granularities;
+import io.druid.java.util.common.granularity.Granularity;
+import io.druid.java.util.common.granularity.PeriodGranularity;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.DoubleSumAggregatorFactory;
 import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.joda.time.DateTimeZone;
 import org.joda.time.Interval;
+import org.joda.time.Period;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -53,7 +58,7 @@ public class DetermineHashedPartitionsJobTest
   private int[] expectedNumOfShards;
   private int errorMargin;

-  @Parameterized.Parameters(name = "File={0}, TargetPartitionSize={1}, Interval={2}, ErrorMargin={3}, NumTimeBuckets={4}, NumShards={5}")
+  @Parameterized.Parameters(name = "File={0}, TargetPartitionSize={1}, Interval={2}, ErrorMargin={3}, NumTimeBuckets={4}, NumShards={5}, SegmentGranularity={6}")
   public static Collection<?> data()
   {
     int[] first = new int[1];
@@ -73,7 +78,8 @@ public class DetermineHashedPartitionsJobTest
           "2011-04-10T00:00:00.000Z/2011-04-11T00:00:00.000Z",
           0,
           1,
-          first
+          first,
+          Granularities.DAY
         },
         {
           DetermineHashedPartitionsJobTest.class.getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(),
@@ -81,7 +87,8 @@ public class DetermineHashedPartitionsJobTest
           "2011-04-10T00:00:00.000Z/2011-04-16T00:00:00.000Z",
           0,
           6,
-          second
+          second,
+          Granularities.DAY
         },
         {
           DetermineHashedPartitionsJobTest.class.getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(),
@@ -89,7 +96,26 @@ public class DetermineHashedPartitionsJobTest
           "2011-04-10T00:00:00.000Z/2011-04-16T00:00:00.000Z",
           0,
           6,
-          third
+          third,
+          Granularities.DAY
+        },
+        {
+          DetermineHashedPartitionsJobTest.class.getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(),
+          1L,
+          null,
+          0,
+          6,
+          third,
+          Granularities.DAY
+        },
+        {
+          DetermineHashedPartitionsJobTest.class.getResource("/druid.test.data.with.rows.in.timezone.tsv").getPath(),
+          1L,
+          null,
+          0,
+          1,
+          first,
+          new PeriodGranularity(new Period("P1D"), null, DateTimeZone.forID("America/Los_Angeles"))
         }
       }
   );
@@ -101,7 +127,8 @@ public class DetermineHashedPartitionsJobTest
       String interval,
       int errorMargin,
       int expectedNumTimeBuckets,
-      int[] expectedNumOfShards
+      int[] expectedNumOfShards,
+      Granularity segmentGranularity
   ) throws IOException
   {
     this.expectedNumOfShards = expectedNumOfShards;
@@ -109,6 +136,11 @@ public class DetermineHashedPartitionsJobTest
     this.errorMargin = errorMargin;
     File tmpDir = Files.createTempDir();

+    ImmutableList<Interval> intervals = null;
+    if (interval != null) {
+      intervals = ImmutableList.of(Intervals.of(interval));
+    }
+
     HadoopIngestionSpec ingestionSpec = new HadoopIngestionSpec(
         new DataSchema(
             "test_schema",
@@ -145,9 +177,9 @@ public class DetermineHashedPartitionsJobTest
             ),
             new AggregatorFactory[]{new DoubleSumAggregatorFactory("index", "index")},
             new UniformGranularitySpec(
-                Granularities.DAY,
+                segmentGranularity,
                 Granularities.NONE,
-                ImmutableList.of(Intervals.of(interval))
+                intervals
             ),
             HadoopDruidIndexerConfig.JSON_MAPPER
         ),

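The new timezone test case uses a PeriodGranularity pinned to America/Los_Angeles, so day buckets align to LA midnights rather than UTC ones. A sketch of how such a granularity buckets the timestamps in the new test file (illustrative only):

    import io.druid.java.util.common.DateTimes;
    import io.druid.java.util.common.granularity.Granularity;
    import io.druid.java.util.common.granularity.PeriodGranularity;
    import org.joda.time.DateTimeZone;
    import org.joda.time.Interval;
    import org.joda.time.Period;

    class TimezoneBucketSketch
    {
      public static void main(String[] args)
      {
        // One-day buckets aligned to America/Los_Angeles midnights.
        Granularity laDay = new PeriodGranularity(
            new Period("P1D"),
            null, // default origin
            DateTimeZone.forID("America/Los_Angeles")
        );

        // Every row in the new test file is stamped 2011-04-10T00:00:00.000-07:00,
        // so all sixteen land in the same LA day -- hence the expected single time bucket.
        Interval bucket = laDay.bucket(DateTimes.of("2011-04-10T00:00:00.000-07:00"));
        System.out.println(bucket);
      }
    }
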
druid.test.data.with.rows.in.timezone.tsv (new file)

@@ -0,0 +1,16 @@
+2011-04-10T00:00:00.000-07:00 spot automotive preferred apreferred 113.221448
+2011-04-10T00:00:00.000-07:00 spot automotive preferred apreferred 11.221448
+2011-04-10T00:00:00.000-07:00 spot automotive preferred apreferred 103.221448
+2011-04-10T00:00:00.000-07:00 spot automotive preferred apreferred 53.221448
+2011-04-10T00:00:00.000-07:00 spot business preferred bpreferred 95.570457
+2011-04-10T00:00:00.000-07:00 spot entertainment preferred epreferred 131.766616
+2011-04-10T00:00:00.000-07:00 spot health preferred hpreferred 99.950855
+2011-04-10T00:00:00.000-07:00 spot mezzanine preferred mpreferred 91.470524
+2011-04-10T00:00:00.000-07:00 spot news preferred npreferred 99.393076
+2011-04-10T00:00:00.000-07:00 spot premium preferred ppreferred 123.207579
+2011-04-10T00:00:00.000-07:00 spot technology preferred tpreferred 84.898691
+2011-04-10T00:00:00.000-07:00 spot travel preferred tpreferred 114.353962
+2011-04-10T00:00:00.000-07:00 total_market mezzanine preferred mpreferred 1005.253077
+2011-04-10T00:00:00.000-07:00 total_market premium preferred ppreferred 1030.094757
+2011-04-10T00:00:00.000-07:00 upfront mezzanine preferred mpreferred 1031.741509
+2011-04-10T00:00:00.000-07:00 upfront premium preferred ppreferred 775.965555

UniformGranularitySpec.java

@@ -97,8 +97,12 @@ public class UniformGranularitySpec implements GranularitySpec
   @Override
   public Optional<Interval> bucketInterval(DateTime dt)
   {
+    if (wrappedSpec == null) {
+      return Optional.absent();
+    } else {
       return wrappedSpec.bucketInterval(dt);
+    }
   }

   @Override
   @JsonProperty("segmentGranularity")
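
With the null guard in place, an absent bucket now signals "intervals not configured" instead of crashing. A hypothetical caller mirroring how DetermineHashedPartitionsJob reacts to that (the helper name is invented for illustration):

    import com.google.common.base.Optional;
    import io.druid.segment.indexing.granularity.GranularitySpec;
    import org.joda.time.DateTime;
    import org.joda.time.Interval;

    class BucketFallbackSketch
    {
      // Invented helper: resolve a bucket, falling back to the segment
      // granularity when no intervals were configured, as the fixed job does.
      static Interval bucketFor(GranularitySpec spec, DateTime timestamp)
      {
        Optional<Interval> maybeBucket = spec.bucketInterval(timestamp);
        if (maybeBucket.isPresent()) {
          return maybeBucket.get();
        }
        // Optional.absent() no longer means an NPE: derive the bucket instead.
        return spec.getSegmentGranularity().bucket(timestamp);
      }
    }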