Remove "granularity" from IngestSegmentFirehose. (#4110)
It wasn't doing anything useful (the sequences were being concatenated, and cursor.getTime() was never being called) and it defaulted to Granularities.NONE. Changing it to Granularities.ALL gave a roughly 400x performance boost on a small dataset I was reindexing (2m27s down to 365ms). Most of that came from avoiding the creation of a large number of unnecessary column selectors.
This commit is contained in:
parent 23f77ebd20
commit b4289c0004
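To see why the default mattered: as the hunks below show, the firehose scans each adapter through StorageAdapter.makeCursors(...), which yields one cursor per granularity bucket, and every cursor builds its own set of column selectors. The following is a minimal sketch of the difference, not code from the patch; it assumes the 0.10-era CursorFactory signature visible in the diff, and GranularityCostSketch/scan are illustrative names:

```java
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Sequence;
import io.druid.segment.Cursor;
import io.druid.segment.StorageAdapter;
import io.druid.segment.VirtualColumns;

class GranularityCostSketch
{
  // With Granularities.NONE the interval is carved into many small buckets,
  // each getting its own cursor and its own freshly built column selectors.
  // With Granularities.ALL the whole interval is one bucket: a single cursor
  // and a single set of selectors per adapter.
  static Sequence<Cursor> scan(StorageAdapter adapter, boolean singleCursor)
  {
    return adapter.makeCursors(
        null,                      // no filter
        adapter.getInterval(),     // scan the adapter's whole interval
        VirtualColumns.EMPTY,
        singleCursor ? Granularities.ALL : Granularities.NONE,
        false                      // ascending time order
    );
  }
}
```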
@@ -52,7 +52,6 @@ Here is what goes inside `ingestionSpec`:

 |dataSource|String|Druid dataSource name from which you are loading the data.|yes|
 |intervals|List|A list of strings representing ISO-8601 intervals.|yes|
 |segments|List|List of segments from which to read data. By default it is obtained automatically. You can obtain the list of segments to put here by making a POST query to the coordinator at the URL /druid/coordinator/v1/metadata/datasources/segments?full with the list of intervals specified in the request payload, e.g. ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"]. You may want to provide this list manually to ensure that the segments read are exactly the same as they were at the time of task submission; the task will fail if the provided list does not match the state of the database when the task actually runs.|no|
-|granularity|String|Defines the granularity of the query while loading data. Default value is "none". See [Granularities](../querying/granularities.html).|no|
 |filter|JSON|See [Filters](../querying/filters.html)|no|
 |dimensions|Array of String|Names of the dimension columns to load. By default, the list is constructed from the parseSpec. If the parseSpec does not have an explicit list of dimensions, all the dimension columns present in the stored data are read.|no|
 |metrics|Array of String|Names of the metric columns to load. By default, the list is constructed from the "name" of all the configured aggregators.|no|
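For context, this spec is deserialized from JSON via Jackson, mirroring the MAPPER.readValue(...) round-trip exercised in DatasourceIngestionSpecTest further down. A minimal sketch follows; the dataSource name and interval are made-up values, and a Druid-configured mapper such as io.druid.jackson.DefaultObjectMapper is assumed, since a vanilla ObjectMapper cannot handle the Joda-Time intervals:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.jackson.DefaultObjectMapper;

public class IngestionSpecSerdeSketch
{
  public static void main(String[] args) throws Exception
  {
    // Only dataSource and intervals are required; every other field in the
    // table above falls back to its documented default.
    final String json =
        "{\"dataSource\": \"wikipedia\","
        + " \"intervals\": [\"2014-10-20T00:00:00Z/2014-10-27T00:00:00Z\"]}";
    final ObjectMapper mapper = new DefaultObjectMapper();
    final DatasourceIngestionSpec spec =
        mapper.readValue(json, DatasourceIngestionSpec.class);
    System.out.println(spec);  // toString() no longer prints a granularity
  }
}
```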
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import io.druid.common.utils.JodaUtils;
-import io.druid.java.util.common.granularity.Granularities;
 import io.druid.java.util.common.granularity.Granularity;
 import io.druid.query.filter.DimFilter;
 import io.druid.timeline.DataSegment;
@@ -38,7 +37,6 @@ public class DatasourceIngestionSpec
   private final List<Interval> intervals;
   private final List<DataSegment> segments;
   private final DimFilter filter;
-  private final Granularity granularity;
   private final List<String> dimensions;
   private final List<String> metrics;
   private final boolean ignoreWhenNoSegments;

@@ -50,7 +48,6 @@ public class DatasourceIngestionSpec
       @JsonProperty("intervals") List<Interval> intervals,
       @JsonProperty("segments") List<DataSegment> segments,
       @JsonProperty("filter") DimFilter filter,
-      @JsonProperty("granularity") Granularity granularity,
       @JsonProperty("dimensions") List<String> dimensions,
       @JsonProperty("metrics") List<String> metrics,
       @JsonProperty("ignoreWhenNoSegments") boolean ignoreWhenNoSegments
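After the two hunks above, callers construct the spec with eight arguments instead of nine. A sketch of the new argument order, grounded in the test call sites further down; the parameter-name comments for the first two positions are inferred from those call sites rather than shown in the diff:

```java
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import org.joda.time.Interval;

class ConstructorOrderSketch
{
  static DatasourceIngestionSpec example()
  {
    return new DatasourceIngestionSpec(
        "test",                        // dataSource
        Interval.parse("2014/2015"),   // interval
        null,                          // intervals
        null,                          // segments
        null,                          // filter  (granularity used to follow here)
        null,                          // dimensions
        null,                          // metrics
        false                          // ignoreWhenNoSegments
    );
  }
}
```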
@@ -77,8 +74,6 @@ public class DatasourceIngestionSpec
     this.segments = segments;

     this.filter = filter;
-    this.granularity = granularity == null ? Granularities.NONE : granularity;
-
     this.dimensions = dimensions;
     this.metrics = metrics;

@@ -109,12 +104,6 @@ public class DatasourceIngestionSpec
     return filter;
   }

-  @JsonProperty
-  public Granularity getGranularity()
-  {
-    return granularity;
-  }
-
   @JsonProperty
   public List<String> getDimensions()
   {
@@ -141,7 +130,6 @@ public class DatasourceIngestionSpec
         intervals,
         segments,
         filter,
-        granularity,
         dimensions,
         metrics,
         ignoreWhenNoSegments

@@ -156,7 +144,6 @@ public class DatasourceIngestionSpec
         intervals,
         segments,
         filter,
-        granularity,
         dimensions,
         metrics,
         ignoreWhenNoSegments

@@ -171,7 +158,6 @@ public class DatasourceIngestionSpec
         intervals,
         segments,
         filter,
-        granularity,
         dimensions,
         metrics,
         ignoreWhenNoSegments

@@ -186,7 +172,6 @@ public class DatasourceIngestionSpec
         intervals,
         segments,
         filter,
-        granularity,
         dimensions,
         metrics,
         ignoreWhenNoSegments
@@ -220,9 +205,6 @@ public class DatasourceIngestionSpec
     if (filter != null ? !filter.equals(that.filter) : that.filter != null) {
       return false;
     }
-    if (!granularity.equals(that.granularity)) {
-      return false;
-    }
     if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) {
       return false;
     }

@@ -237,7 +219,6 @@ public class DatasourceIngestionSpec
     result = 31 * result + intervals.hashCode();
     result = 31 * result + (segments != null ? segments.hashCode() : 0);
     result = 31 * result + (filter != null ? filter.hashCode() : 0);
-    result = 31 * result + granularity.hashCode();
     result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
     result = 31 * result + (metrics != null ? metrics.hashCode() : 0);
     result = 31 * result + (ignoreWhenNoSegments ? 1 : 0);

@@ -252,7 +233,6 @@ public class DatasourceIngestionSpec
            ", intervals=" + intervals +
            ", segments=" + segments +
            ", filter=" + filter +
-           ", granularity=" + granularity +
            ", dimensions=" + dimensions +
            ", metrics=" + metrics +
            ", ignoreWhenNoSegments=" + ignoreWhenNoSegments +
@@ -26,7 +26,6 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.io.Closeables;
 import com.google.common.io.Files;
-
 import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
 import io.druid.data.input.MapBasedRow;
@@ -111,8 +110,7 @@ public class DatasourceRecordReader extends RecordReader<NullWritable, InputRow>
           adapters,
           spec.getDimensions(),
           spec.getMetrics(),
-          spec.getFilter(),
-          spec.getGranularity()
+          spec.getFilter()
       );

   }
@@ -322,8 +322,7 @@ public class BatchDeltaIngestionTest
         ImmutableList.of(new WindowedStorageAdapter(adapter, windowedDataSegment.getInterval())),
         ImmutableList.of("host"),
         ImmutableList.of("visited_sum", "unique_hosts"),
-        null,
-        Granularities.NONE
+        null
     );

     List<InputRow> rows = Lists.newArrayList();
@@ -92,7 +92,7 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
     PathSpec pathSpec = new DatasourcePathSpec(
         jsonMapper,
         null,
-        new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null, null, false),
+        new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null, false),
         null
     );
     HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
@@ -119,7 +119,6 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
             null,
             null,
             null,
-            null,
             false
         ),
         null

@@ -148,7 +147,6 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
             null,
             null,
             null,
-            null,
             false
         ),
         null

@@ -174,7 +172,6 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
             null,
             null,
             null,
-            null,
             false
         ),
         null

@@ -206,7 +203,6 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
             null,
             null,
             null,
-            null,
             false
         ),
         null
@@ -22,7 +22,6 @@ package io.druid.indexer.hadoop;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
-import io.druid.java.util.common.granularity.Granularities;
 import io.druid.query.filter.SelectorDimFilter;
 import io.druid.segment.TestHelper;
 import io.druid.timeline.DataSegment;
@@ -49,7 +48,6 @@ public class DatasourceIngestionSpecTest
         null,
         null,
         new SelectorDimFilter("dim", "value", null),
-        Granularities.DAY,
         Lists.newArrayList("d1", "d2"),
         Lists.newArrayList("m1", "m2", "m3"),
         false

@@ -86,7 +84,6 @@ public class DatasourceIngestionSpecTest
         null,
         null,
         null,
-        null,
         false
     );

@@ -133,7 +130,6 @@ public class DatasourceIngestionSpecTest
             )
         ),
         new SelectorDimFilter("dim", "value", null),
-        Granularities.DAY,
         Lists.newArrayList("d1", "d2"),
         Lists.newArrayList("m1", "m2", "m3"),
         true

@@ -156,7 +152,7 @@ public class DatasourceIngestionSpecTest
     DatasourceIngestionSpec actual = MAPPER.readValue(jsonStr, DatasourceIngestionSpec.class);

     Assert.assertEquals(
-        new DatasourceIngestionSpec("test", Interval.parse("2014/2015"), null, null, null, null, null, null, false),
+        new DatasourceIngestionSpec("test", Interval.parse("2014/2015"), null, null, null, null, null, false),
         actual
     );
   }
@@ -67,7 +67,6 @@ public class DatasourceRecordReaderTest
         null,
         null,
         null,
-        null,
         segment.getDimensions(),
         segment.getMetrics(),
         false
@@ -79,7 +79,6 @@ public class DatasourcePathSpecTest
         null,
         null,
         null,
-        null,
         false
     );

@@ -38,7 +38,6 @@ import io.druid.indexing.common.TaskToolbox;
 import io.druid.indexing.common.TaskToolboxFactory;
 import io.druid.indexing.common.actions.SegmentListUsedAction;
 import io.druid.indexing.common.task.NoopTask;
-import io.druid.java.util.common.granularity.Granularities;
 import io.druid.java.util.common.parsers.ParseException;
 import io.druid.query.filter.DimFilter;
 import io.druid.segment.IndexIO;
@@ -282,12 +281,9 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
               )
       );

-      return new IngestSegmentFirehose(adapters, dims, metricsList, dimFilter, Granularities.NONE);
+      return new IngestSegmentFirehose(adapters, dims, metricsList, dimFilter);
     }
-    catch (IOException e) {
-      throw Throwables.propagate(e);
-    }
-    catch (SegmentLoadingException e) {
+    catch (IOException | SegmentLoadingException e) {
       throw Throwables.propagate(e);
     }

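Besides dropping the granularity argument, this hunk also folds the two identical catch blocks into a single Java 7 multi-catch, which behaves the same way. A self-contained illustration of the idiom; SegmentLoadingException is swapped for another checked exception so the sketch compiles on its own:

```java
import com.google.common.base.Throwables;
import java.io.IOException;

class MultiCatchSketch
{
  // Stand-in for the factory's connect() body.
  static void mayThrow() throws IOException, InterruptedException {}

  static void run()
  {
    try {
      mayThrow();
    }
    catch (IOException | InterruptedException e) {
      // One handler for both checked exceptions, same as the patched code.
      throw Throwables.propagate(e);
    }
  }
}
```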
@@ -26,7 +26,7 @@ import com.google.common.collect.Maps;
 import io.druid.data.input.Firehose;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
-import io.druid.java.util.common.granularity.Granularity;
+import io.druid.java.util.common.granularity.Granularities;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.guava.Sequences;
 import io.druid.java.util.common.guava.Yielder;
@@ -59,8 +59,7 @@ public class IngestSegmentFirehose implements Firehose
       final List<WindowedStorageAdapter> adapters,
       final List<String> dims,
       final List<String> metrics,
-      final DimFilter dimFilter,
-      final Granularity granularity
+      final DimFilter dimFilter
   )
   {
     Sequence<InputRow> rows = Sequences.concat(
@@ -77,7 +76,7 @@ public class IngestSegmentFirehose implements Firehose
                     Filters.toFilter(dimFilter),
                     adapter.getInterval(),
                     VirtualColumns.EMPTY,
-                    granularity,
+                    Granularities.ALL,
                     false
                 ), new Function<Cursor, Sequence<InputRow>>()
                 {
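The new four-argument constructor is exercised by the test below. Reading from the firehose looks roughly like this sketch, with the dimension and metric names taken from that test and the adapter assumed to come from a real segment (package locations as of this era of the codebase):

```java
import com.google.common.collect.ImmutableList;
import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
import io.druid.segment.realtime.firehose.WindowedStorageAdapter;
import java.io.IOException;

class FirehoseUsageSketch
{
  static int countRows(WindowedStorageAdapter adapter) throws IOException
  {
    // The granularity argument is gone; the firehose now always scans each
    // adapter with a single Granularities.ALL cursor internally.
    IngestSegmentFirehose firehose = new IngestSegmentFirehose(
        ImmutableList.of(adapter),
        ImmutableList.of("host"),                         // dimensions
        ImmutableList.of("visited_sum", "unique_hosts"),  // metrics
        null                                              // no filter
    );
    int count = 0;
    while (firehose.hasMore()) {
      firehose.nextRow();
      count++;
    }
    firehose.close();
    return count;
  }
}
```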
@@ -72,8 +72,7 @@ public class IngestSegmentFirehoseTest
         ImmutableList.of(wsa, wsa),
         ImmutableList.of("host"),
         ImmutableList.of("visited_sum", "unique_hosts"),
-        null,
-        Granularities.NONE
+        null
     );

     int count = 0;