From b4289c00048f89881718ea234137f6072c6cd446 Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Fri, 24 Mar 2017 10:28:54 -0700
Subject: [PATCH] Remove "granularity" from IngestSegmentFirehose. (#4110)

It wasn't doing anything useful (the sequences were being concatenated, and
cursor.getTime() wasn't being called) and it defaulted to Granularities.NONE.
Changing it to Granularities.ALL gave me a 400x+ performance boost on a small
dataset I was reindexing (2m27s to 365ms). Most of that was from avoiding
making a lot of unnecessary column selectors.
---
 .../content/ingestion/update-existing-data.md |  1 -
 .../hadoop/DatasourceIngestionSpec.java       | 20 -------------------
 .../hadoop/DatasourceRecordReader.java        |  4 +---
 .../indexer/BatchDeltaIngestionTest.java      |  3 +--
 ...cUpdateDatasourcePathSpecSegmentsTest.java |  6 +-----
 .../hadoop/DatasourceIngestionSpecTest.java   |  6 +-----
 .../hadoop/DatasourceRecordReaderTest.java    |  1 -
 .../indexer/path/DatasourcePathSpecTest.java  |  1 -
 .../IngestSegmentFirehoseFactory.java         |  8 ++------
 .../firehose/IngestSegmentFirehose.java       |  7 +++----
 .../firehose/IngestSegmentFirehoseTest.java   |  3 +--
 11 files changed, 10 insertions(+), 50 deletions(-)

diff --git a/docs/content/ingestion/update-existing-data.md b/docs/content/ingestion/update-existing-data.md
index 92da8a0ae49..0b33d02f02b 100644
--- a/docs/content/ingestion/update-existing-data.md
+++ b/docs/content/ingestion/update-existing-data.md
@@ -52,7 +52,6 @@ Here is what goes inside `ingestionSpec`:
 |dataSource|String|Druid dataSource name from which you are loading the data.|yes|
 |intervals|List|A list of strings representing ISO-8601 intervals.|yes|
 |segments|List|List of segments to read data from; by default it is obtained automatically. You can obtain the list of segments to put here by making a POST query to the coordinator at url /druid/coordinator/v1/metadata/datasources/segments?full, with the list of intervals specified in the request payload, e.g. ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"]. You may want to provide this list manually to ensure that the segments read are exactly the same as they were at the time of task submission; the task will fail if the list provided by the user does not match the state of the database when the task actually runs.|no|
-|granularity|String|Defines the granularity of the query while loading data. Default value is "none". See [Granularities](../querying/granularities.html).|no|
 |filter|JSON|See [Filters](../querying/filters.html)|no|
 |dimensions|Array of String|Names of the dimension columns to load. By default, the list will be constructed from the parseSpec. If the parseSpec does not have an explicit list of dimensions, then all the dimension columns present in the stored data will be read.|no|
 |metrics|Array of String|Names of the metric columns to load. By default, the list will be constructed from the "name" of all the configured aggregators.|no|
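For anyone updating callers, a minimal sketch of constructing the post-patch spec through its new eight-argument constructor; the shape matches the tests further down in this diff, and all values here are illustrative only:

```java
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import org.joda.time.Interval;

import java.util.Arrays;

public class ReindexSpecSketch
{
  public static void main(String[] args)
  {
    // The "granularity" argument is gone; the remaining arguments are unchanged.
    DatasourceIngestionSpec spec = new DatasourceIngestionSpec(
        "wikipedia",                      // dataSource (hypothetical name)
        Interval.parse("2014/2015"),      // single interval
        null,                             // intervals (list form, unused here)
        null,                             // segments: resolved automatically
        null,                             // filter
        Arrays.asList("host"),            // dimensions
        Arrays.asList("visited_sum"),     // metrics
        false                             // ignoreWhenNoSegments
    );
    System.out.println(spec);
  }
}
```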
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceIngestionSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceIngestionSpec.java
index 66546330fa4..63d10450c2c 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceIngestionSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceIngestionSpec.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import io.druid.common.utils.JodaUtils;
-import io.druid.java.util.common.granularity.Granularities;
 import io.druid.java.util.common.granularity.Granularity;
 import io.druid.query.filter.DimFilter;
 import io.druid.timeline.DataSegment;
@@ -38,7 +37,6 @@ public class DatasourceIngestionSpec
   private final List<Interval> intervals;
   private final List<DataSegment> segments;
   private final DimFilter filter;
-  private final Granularity granularity;
   private final List<String> dimensions;
   private final List<String> metrics;
   private final boolean ignoreWhenNoSegments;
@@ -50,7 +48,6 @@ public class DatasourceIngestionSpec
       @JsonProperty("intervals") List<Interval> intervals,
       @JsonProperty("segments") List<DataSegment> segments,
       @JsonProperty("filter") DimFilter filter,
-      @JsonProperty("granularity") Granularity granularity,
       @JsonProperty("dimensions") List<String> dimensions,
       @JsonProperty("metrics") List<String> metrics,
       @JsonProperty("ignoreWhenNoSegments") boolean ignoreWhenNoSegments
   )
@@ -77,8 +74,6 @@ public class DatasourceIngestionSpec
     this.segments = segments;
     this.filter = filter;
-    this.granularity = granularity == null ? Granularities.NONE : granularity;
-
     this.dimensions = dimensions;
     this.metrics = metrics;
@@ -109,12 +104,6 @@ public class DatasourceIngestionSpec
     return filter;
   }
 
-  @JsonProperty
-  public Granularity getGranularity()
-  {
-    return granularity;
-  }
-
   @JsonProperty
   public List<String> getDimensions()
   {
@@ -141,7 +130,6 @@ public class DatasourceIngestionSpec
         intervals,
         segments,
         filter,
-        granularity,
         dimensions,
         metrics,
         ignoreWhenNoSegments
@@ -156,7 +144,6 @@ public class DatasourceIngestionSpec
         intervals,
         segments,
         filter,
-        granularity,
         dimensions,
         metrics,
         ignoreWhenNoSegments
@@ -171,7 +158,6 @@ public class DatasourceIngestionSpec
         intervals,
         segments,
         filter,
-        granularity,
         dimensions,
         metrics,
         ignoreWhenNoSegments
@@ -186,7 +172,6 @@ public class DatasourceIngestionSpec
         intervals,
         segments,
         filter,
-        granularity,
         dimensions,
         metrics,
         ignoreWhenNoSegments
@@ -220,9 +205,6 @@ public class DatasourceIngestionSpec
     if (filter != null ? !filter.equals(that.filter) : that.filter != null) {
       return false;
     }
-    if (!granularity.equals(that.granularity)) {
-      return false;
-    }
     if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) {
       return false;
     }
@@ -237,7 +219,6 @@ public class DatasourceIngestionSpec
     result = 31 * result + intervals.hashCode();
     result = 31 * result + (segments != null ? segments.hashCode() : 0);
     result = 31 * result + (filter != null ? filter.hashCode() : 0);
-    result = 31 * result + granularity.hashCode();
     result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
     result = 31 * result + (metrics != null ? metrics.hashCode() : 0);
     result = 31 * result + (ignoreWhenNoSegments ? 1 : 0);
@@ -252,7 +233,6 @@ public class DatasourceIngestionSpec
            ", intervals=" + intervals +
            ", segments=" + segments +
            ", filter=" + filter +
-           ", granularity=" + granularity +
            ", dimensions=" + dimensions +
            ", metrics=" + metrics +
            ", ignoreWhenNoSegments=" + ignoreWhenNoSegments +
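One compatibility note the diff itself does not show: a stored spec that still carries a "granularity" key now depends on the ObjectMapper's unknown-property handling (Druid's DefaultObjectMapper is, as far as I know, lenient about unknown fields; treat that as an assumption). A self-contained toy illustrating both behaviors, with a hypothetical MiniSpec standing in for DatasourceIngestionSpec:

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

public class LegacySpecJson
{
  // Toy stand-in for DatasourceIngestionSpec (hypothetical, heavily trimmed).
  static class MiniSpec
  {
    private final String dataSource;

    @JsonCreator
    MiniSpec(@JsonProperty("dataSource") String dataSource)
    {
      this.dataSource = dataSource;
    }

    @JsonProperty
    public String getDataSource()
    {
      return dataSource;
    }
  }

  public static void main(String[] args) throws Exception
  {
    String legacyJson = "{\"dataSource\":\"test\",\"granularity\":\"day\"}";

    // A mapper configured to ignore unknown properties silently drops the stale key...
    ObjectMapper lenient = new ObjectMapper()
        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    System.out.println(lenient.readValue(legacyJson, MiniSpec.class).getDataSource());

    // ...while a strict mapper (Jackson's default) rejects the whole spec instead.
    ObjectMapper strict = new ObjectMapper();
    try {
      strict.readValue(legacyJson, MiniSpec.class);
    }
    catch (Exception e) {
      System.out.println("strict mapper rejected legacy key: " + e.getClass().getSimpleName());
    }
  }
}
```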
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceRecordReader.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceRecordReader.java
index f18929f8902..be6eb64a268 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceRecordReader.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceRecordReader.java
@@ -26,7 +26,6 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.io.Closeables;
 import com.google.common.io.Files;
-
 import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
 import io.druid.data.input.MapBasedRow;
@@ -111,8 +110,7 @@ public class DatasourceRecordReader extends RecordReader<NullWritable, InputRow>
           adapters,
           spec.getDimensions(),
           spec.getMetrics(),
-          spec.getFilter(),
-          spec.getGranularity()
+          spec.getFilter()
       );
     }
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java
index 00300c32ba4..7201bd11913 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java
@@ -322,8 +322,7 @@ public class BatchDeltaIngestionTest
         ImmutableList.of(new WindowedStorageAdapter(adapter, windowedDataSegment.getInterval())),
         ImmutableList.of("host"),
         ImmutableList.of("visited_sum", "unique_hosts"),
-        null,
-        Granularities.NONE
+        null
     );
 
     List<InputRow> rows = Lists.newArrayList();
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java
index 4ab0b9f30a3..9154410875c 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java
@@ -92,7 +92,7 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
     PathSpec pathSpec = new DatasourcePathSpec(
         jsonMapper,
         null,
-        new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null, null, false),
+        new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null, false),
         null
     );
     HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
@@ -119,7 +119,6 @@
             null,
             null,
             null,
-            null,
             false
         ),
         null
@@ -148,7 +147,6 @@
             null,
             null,
             null,
-            null,
             false
         ),
         null
@@ -174,7 +172,6 @@
             null,
             null,
             null,
-            null,
             false
         ),
         null
@@ -206,7 +203,6 @@
             null,
             null,
             null,
-            null,
             false
         ),
         null
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceIngestionSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceIngestionSpecTest.java
index 3c99203d933..e55b6309a20 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceIngestionSpecTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceIngestionSpecTest.java
@@ -22,7 +22,6 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
-import io.druid.java.util.common.granularity.Granularities;
 import io.druid.query.filter.SelectorDimFilter;
 import io.druid.segment.TestHelper;
 import io.druid.timeline.DataSegment;
@@ -49,7 +48,6 @@ public class DatasourceIngestionSpecTest
         null,
         null,
         new SelectorDimFilter("dim", "value", null),
-        Granularities.DAY,
         Lists.newArrayList("d1", "d2"),
         Lists.newArrayList("m1", "m2", "m3"),
         false
@@ -86,7 +84,6 @@ public class DatasourceIngestionSpecTest
         null,
         null,
         null,
-        null,
         false
     );
 
@@ -133,7 +130,6 @@ public class DatasourceIngestionSpecTest
             )
         ),
         new SelectorDimFilter("dim", "value", null),
-        Granularities.DAY,
         Lists.newArrayList("d1", "d2"),
         Lists.newArrayList("m1", "m2", "m3"),
         true
@@ -156,7 +152,7 @@ public class DatasourceIngestionSpecTest
     DatasourceIngestionSpec actual = MAPPER.readValue(jsonStr, DatasourceIngestionSpec.class);
 
     Assert.assertEquals(
-        new DatasourceIngestionSpec("test", Interval.parse("2014/2015"), null, null, null, null, null, null, false),
+        new DatasourceIngestionSpec("test", Interval.parse("2014/2015"), null, null, null, null, null, false),
         actual
     );
   }
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java
index c0ae5ecf69a..b910e9b49b0 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java
@@ -67,7 +67,6 @@ public class DatasourceRecordReaderTest
         null,
         null,
         null,
-        null,
         segment.getDimensions(),
         segment.getMetrics(),
         false
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java
index 1766c73b5d6..f1c7d558c90 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java
@@ -79,7 +79,6 @@ public class DatasourcePathSpecTest
         null,
         null,
         null,
-        null,
         false
     );
 
diff --git a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
index 8897e47af3b..25028f0f5c0 100644
--- a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
+++ b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
@@ -38,7 +38,6 @@ import io.druid.indexing.common.TaskToolbox;
 import io.druid.indexing.common.TaskToolboxFactory;
 import io.druid.indexing.common.actions.SegmentListUsedAction;
 import io.druid.indexing.common.task.NoopTask;
-import io.druid.java.util.common.granularity.Granularities;
 import io.druid.java.util.common.parsers.ParseException;
 import io.druid.query.filter.DimFilter;
 import io.druid.segment.IndexIO;
@@ -282,12 +281,9 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowParser>
-      return new IngestSegmentFirehose(
-          adapters,
-          dims,
-          metricsList,
-          dimFilter,
-          Granularities.NONE
-      );
+      return new IngestSegmentFirehose(adapters, dims, metricsList, dimFilter);
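The consumer side is untouched by this change; only the construction shrinks to four arguments. A hedged usage sketch under that assumption (the adapter list is left to the caller, and the dimension and metric names echo the tests in this patch):

```java
import com.google.common.collect.ImmutableList;
import io.druid.data.input.InputRow;
import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
import io.druid.segment.realtime.firehose.WindowedStorageAdapter;

import java.io.IOException;
import java.util.List;

public class FirehoseUsageSketch
{
  // Drains a firehose built with the new four-argument constructor and counts
  // the rows it yields. How the adapters are obtained is out of scope here.
  static int countRows(List<WindowedStorageAdapter> adapters) throws IOException
  {
    try (IngestSegmentFirehose firehose = new IngestSegmentFirehose(
        adapters,
        ImmutableList.of("host"),
        ImmutableList.of("visited_sum", "unique_hosts"),
        null // no filter; the granularity argument is simply gone
    )) {
      int count = 0;
      while (firehose.hasMore()) {
        InputRow row = firehose.nextRow();
        if (row != null) {
          count++;
        }
      }
      return count;
    }
  }
}
```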
diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java b/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java
--- a/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java
+++ b/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java
@@ ... @@ public class IngestSegmentFirehose implements Firehose
   public IngestSegmentFirehose(
       final List<WindowedStorageAdapter> adapters,
       final List<String> dims,
       final List<String> metrics,
-      final DimFilter dimFilter,
-      final Granularity granularity
+      final DimFilter dimFilter
   )
   {
     Sequence<InputRow> rows = Sequences.concat(
@@ -77,7 +76,7 @@ public class IngestSegmentFirehose implements Firehose
               Filters.toFilter(dimFilter),
               adapter.getInterval(),
               VirtualColumns.EMPTY,
-              granularity,
+              Granularities.ALL,
               false
           ),
           new Function<Cursor, Sequence<InputRow>>()
           {
diff --git a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java
index 6bdaf279407..ea8b5590701 100644
--- a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java
@@ -72,8 +72,7 @@ public class IngestSegmentFirehoseTest
         ImmutableList.of(wsa, wsa),
         ImmutableList.of("host"),
         ImmutableList.of("visited_sum", "unique_hosts"),
-        null,
-        Granularities.NONE
+        null
    );
 
     int count = 0;
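Why hard-coding Granularities.ALL helps: with Granularities.NONE the storage adapter emits one cursor per time bucket, effectively one per distinct row timestamp, and every cursor rebuilds its column selectors; with Granularities.ALL each adapter yields a single cursor over its whole interval, which is where the speedup in the commit message comes from. A hedged sketch of the call shape, mirroring the hunk above (the helper method itself is hypothetical):

```java
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Sequence;
import io.druid.segment.Cursor;
import io.druid.segment.VirtualColumns;
import io.druid.segment.realtime.firehose.WindowedStorageAdapter;

public class CursorCountSketch
{
  // Hypothetical helper: returns the cursor sequence the firehose iterates.
  // With Granularities.ALL there is exactly one cursor per adapter; with
  // Granularities.NONE there is one per populated time bucket, each of which
  // re-creates its column selectors.
  static Sequence<Cursor> cursorsFor(WindowedStorageAdapter adapter)
  {
    return adapter.getAdapter().makeCursors(
        null,                  // no filter
        adapter.getInterval(), // clip to the windowed interval
        VirtualColumns.EMPTY,
        Granularities.ALL,     // what the patch hard-codes
        false                  // ascending time order
    );
  }
}
```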