From 4ecf9904d597aae95a7c38b01596649f22e089f6 Mon Sep 17 00:00:00 2001
From: Hendrik Muhs
Date: Thu, 26 Mar 2020 21:38:17 +0100
Subject: [PATCH] [Transform] Transform optimize date histogram (#54068)

Optimize transforms that group_by on a date_histogram by injecting an
additional range query. This limits the number of search and index requests
and avoids unnecessary updates: only recent buckets get rewritten.

Fixes #54254
---
 .../hlrc/DateHistogramGroupSourceTests.java | 9 +-
 .../core/transform/transforms/SyncConfig.java | 2 +
 .../transform/transforms/TimeSyncConfig.java | 30 +++--
 .../pivot/DateHistogramGroupSource.java | 54 ++++++--
 .../pivot/HistogramGroupSource.java | 6 +-
 .../transforms/pivot/SingleGroupSource.java | 6 +-
 .../transforms/pivot/TermsGroupSource.java | 11 +-
 .../TransformConfigUpdateTests.java | 7 +-
 .../pivot/DateHistogramGroupSourceTests.java | 69 +++++++++-
 .../transforms/TransformIndexer.java | 120 +++++++-----------
 .../transform/transforms/pivot/Pivot.java | 47 +++----
 11 files changed, 221 insertions(+), 140 deletions(-)

diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/hlrc/DateHistogramGroupSourceTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/hlrc/DateHistogramGroupSourceTests.java
index 3e119b79afb..6464cea4cf0 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/hlrc/DateHistogramGroupSourceTests.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/hlrc/DateHistogramGroupSourceTests.java
@@ -74,19 +74,18 @@ public class DateHistogramGroupSourceTests extends AbstractResponseTestCase< dateHistogramGroupSource = new DateHistogramGroupSource( field, scriptConfig, - new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomPositiveTimeValue())) + new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomPositiveTimeValue())), + randomBoolean() ? randomZone() : null ); } else { dateHistogramGroupSource = new DateHistogramGroupSource( field, scriptConfig, - new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w"))) + new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w"))), + randomBoolean() ?
randomZone() : null ); } - if (randomBoolean()) { - dateHistogramGroupSource.setTimeZone(randomZone()); - } return dateHistogramGroupSource; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SyncConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SyncConfig.java index 44452426d40..342c53e4bcb 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SyncConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SyncConfig.java @@ -19,6 +19,8 @@ public interface SyncConfig extends ToXContentObject, NamedWriteable { */ boolean isValid(); + String getField(); + QueryBuilder getRangeQuery(TransformCheckpoint newCheckpoint); QueryBuilder getRangeQuery(TransformCheckpoint oldCheckpoint, TransformCheckpoint newCheckpoint); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TimeSyncConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TimeSyncConfig.java index d659a1f3905..a54507dc661 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TimeSyncConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TimeSyncConfig.java @@ -25,7 +25,7 @@ import java.util.Objects; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; -public class TimeSyncConfig implements SyncConfig { +public class TimeSyncConfig implements SyncConfig { public static final TimeValue DEFAULT_DELAY = TimeValue.timeValueSeconds(60); private static final String NAME = "data_frame_transform_pivot_sync_time"; @@ -37,17 +37,18 @@ public class TimeSyncConfig implements SyncConfig { private static final ConstructingObjectParser LENIENT_PARSER = createParser(true); private static ConstructingObjectParser createParser(boolean lenient) { - ConstructingObjectParser parser = new ConstructingObjectParser<>(NAME, lenient, - args -> { - String field = (String) args[0]; - TimeValue delay = (TimeValue) args[1]; - return new TimeSyncConfig(field, delay); - }); + ConstructingObjectParser parser = new ConstructingObjectParser<>(NAME, lenient, args -> { + String field = (String) args[0]; + TimeValue delay = (TimeValue) args[1]; + return new TimeSyncConfig(field, delay); + }); parser.declareString(constructorArg(), TransformField.FIELD); - parser.declareField(optionalConstructorArg(), + parser.declareField( + optionalConstructorArg(), (p, c) -> TimeValue.parseTimeValue(p.text(), DEFAULT_DELAY, TransformField.DELAY.getPreferredName()), TransformField.DELAY, - ObjectParser.ValueType.STRING); + ObjectParser.ValueType.STRING + ); return parser; } @@ -65,6 +66,7 @@ public class TimeSyncConfig implements SyncConfig { this.delay = in.readTimeValue(); } + @Override public String getField() { return field; } @@ -105,12 +107,11 @@ public class TimeSyncConfig implements SyncConfig { final TimeSyncConfig that = (TimeSyncConfig) other; - return Objects.equals(this.field, that.field) - && Objects.equals(this.delay, that.delay); + return Objects.equals(this.field, that.field) && Objects.equals(this.delay, that.delay); } @Override - public int hashCode(){ + public int hashCode() { return Objects.hash(field, delay); } @@ -139,7 +140,8 @@ public class TimeSyncConfig implements SyncConfig { @Override 
public QueryBuilder getRangeQuery(TransformCheckpoint oldCheckpoint, TransformCheckpoint newCheckpoint) { - return new RangeQueryBuilder(field).gte(oldCheckpoint.getTimeUpperBound()).lt(newCheckpoint.getTimeUpperBound()) - .format("epoch_millis"); + return new RangeQueryBuilder(field).gte(oldCheckpoint.getTimeUpperBound()) + .lt(newCheckpoint.getTimeUpperBound()) + .format("epoch_millis"); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java index 6509546a2da..f61497e31d3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java @@ -7,15 +7,18 @@ package org.elasticsearch.xpack.core.transform.transforms.pivot; import org.elasticsearch.Version; import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Rounding; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; @@ -105,6 +108,11 @@ public class DateHistogramGroupSource extends SingleGroupSource { public int hashCode() { return Objects.hash(interval); } + + @Override + public String toString() { + return interval.toString(); + } } public static class CalendarInterval implements Interval { @@ -169,6 +177,11 @@ public class DateHistogramGroupSource extends SingleGroupSource { public int hashCode() { return Objects.hash(interval); } + + @Override + public String toString() { + return interval.toString(); + } } private Interval readInterval(StreamInput in) throws IOException { @@ -195,11 +208,26 @@ public class DateHistogramGroupSource extends SingleGroupSource { private static final ConstructingObjectParser LENIENT_PARSER = createParser(true); private final Interval interval; - private ZoneId timeZone; + private final ZoneId timeZone; + private Rounding rounding; - public DateHistogramGroupSource(String field, ScriptConfig scriptConfig, Interval interval) { + public DateHistogramGroupSource(String field, ScriptConfig scriptConfig, Interval interval, ZoneId timeZone) { super(field, scriptConfig); this.interval = interval; + this.timeZone = timeZone; + + Rounding.DateTimeUnit timeUnit = DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(interval.toString()); + final Rounding.Builder roundingBuilder; + if (timeUnit != null) { + roundingBuilder = new Rounding.Builder(timeUnit); + } else { + roundingBuilder = new Rounding.Builder(TimeValue.parseTimeValue(interval.toString(), interval.getName())); + } + + if (timeZone != null) { + roundingBuilder.timeZone(timeZone); + } + this.rounding = 
roundingBuilder.build(); } public DateHistogramGroupSource(StreamInput in) throws IOException { @@ -218,6 +246,7 @@ public class DateHistogramGroupSource extends SingleGroupSource { ScriptConfig scriptConfig = (ScriptConfig) args[1]; String fixedInterval = (String) args[2]; String calendarInterval = (String) args[3]; + ZoneId zoneId = (ZoneId) args[4]; Interval interval = null; @@ -231,7 +260,7 @@ public class DateHistogramGroupSource extends SingleGroupSource { throw new IllegalArgumentException("You must specify either fixed_interval or calendar_interval, found none"); } - return new DateHistogramGroupSource(field, scriptConfig, interval); + return new DateHistogramGroupSource(field, scriptConfig, interval, zoneId); }); declareValuesSourceFields(parser, lenient); @@ -239,7 +268,7 @@ public class DateHistogramGroupSource extends SingleGroupSource { parser.declareString(optionalConstructorArg(), new ParseField(FixedInterval.NAME)); parser.declareString(optionalConstructorArg(), new ParseField(CalendarInterval.NAME)); - parser.declareField(DateHistogramGroupSource::setTimeZone, p -> { + parser.declareField(optionalConstructorArg(), p -> { if (p.currentToken() == XContentParser.Token.VALUE_STRING) { return ZoneId.of(p.text()); } else { @@ -267,8 +296,8 @@ public class DateHistogramGroupSource extends SingleGroupSource { return timeZone; } - public void setTimeZone(ZoneId timeZone) { - this.timeZone = timeZone; + public Rounding getRounding() { + return rounding; } @Override @@ -315,9 +344,16 @@ public class DateHistogramGroupSource extends SingleGroupSource { } @Override - public QueryBuilder getIncrementalBucketUpdateFilterQuery(Set changedBuckets) { - // no need for an extra range filter as this is already done by checkpoints - return null; + public QueryBuilder getIncrementalBucketUpdateFilterQuery( + Set changedBuckets, + String synchronizationField, + long synchronizationTimestamp + ) { + if (synchronizationField != null && synchronizationField.equals(field) && synchronizationTimestamp > 0) { + return new RangeQueryBuilder(field).gte(rounding.round(synchronizationTimestamp)).format("epoch_millis"); + } else { + return null; + } } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSource.java index 0eba51cb512..4f62faa7cb0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSource.java @@ -101,7 +101,11 @@ public class HistogramGroupSource extends SingleGroupSource { } @Override - public QueryBuilder getIncrementalBucketUpdateFilterQuery(Set changedBuckets) { + public QueryBuilder getIncrementalBucketUpdateFilterQuery( + Set changedBuckets, + String synchronizationField, + long synchronizationTimestamp + ) { // histograms are simple and cheap, so we skip this optimization return null; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java index bd215126119..569bf08d820 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java +++ 
b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java @@ -116,7 +116,11 @@ public abstract class SingleGroupSource implements Writeable, ToXContentObject { public abstract boolean supportsIncrementalBucketUpdate(); - public abstract QueryBuilder getIncrementalBucketUpdateFilterQuery(Set changedBuckets); + public abstract QueryBuilder getIncrementalBucketUpdateFilterQuery( + Set changedBuckets, + String synchronizationField, + long synchronizationTimestamp + ); public String getField() { return field; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSource.java index e0d7c8a4f9c..17a173836cb 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSource.java @@ -54,8 +54,15 @@ public class TermsGroupSource extends SingleGroupSource { } @Override - public QueryBuilder getIncrementalBucketUpdateFilterQuery(Set changedBuckets) { - return new TermsQueryBuilder(field, changedBuckets); + public QueryBuilder getIncrementalBucketUpdateFilterQuery( + Set changedBuckets, + String synchronizationField, + long synchronizationTimestamp + ) { + if (changedBuckets != null && changedBuckets.isEmpty() == false) { + return new TermsQueryBuilder(field, changedBuckets); + } + return null; } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java index 59a7d1861d3..56886bf5bd9 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java @@ -21,9 +21,9 @@ import java.time.Instant; import java.util.Collections; import java.util.Map; -import static org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests.randomTransformConfig; import static org.elasticsearch.xpack.core.transform.transforms.DestConfigTests.randomDestConfig; import static org.elasticsearch.xpack.core.transform.transforms.SourceConfigTests.randomSourceConfig; +import static org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests.randomTransformConfig; import static org.hamcrest.Matchers.equalTo; public class TransformConfigUpdateTests extends AbstractSerializingTransformTestCase { @@ -184,6 +184,11 @@ public class TransformConfigUpdateTests extends AbstractSerializingTransformTest return "foo"; } + @Override + public String getField() { + return "foo"; + } + @Override public void writeTo(StreamOutput out) throws IOException {} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSourceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSourceTests.java index 6ce9a9373c7..265f2a94468 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSourceTests.java +++ 
b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSourceTests.java @@ -10,11 +10,17 @@ import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.time.DateFormatter; +import org.elasticsearch.common.time.DateFormatters; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.test.AbstractSerializingTestCase; import java.io.IOException; +import java.time.ZoneOffset; +import java.time.temporal.TemporalAccessor; + +import static org.hamcrest.Matchers.equalTo; public class DateHistogramGroupSourceTests extends AbstractSerializingTestCase { @@ -26,19 +32,20 @@ public class DateHistogramGroupSourceTests extends AbstractSerializingTestCase> changedBuckets; + private volatile Map> changedBuckets = Collections.emptyMap(); private volatile Map changedBucketsAfterKey; private volatile long lastCheckpointCleanup = 0L; @@ -146,7 +142,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer processBuckets(final CompositeAggregation agg) { // we reached the end if (agg.getBuckets().isEmpty()) { - return new IterationResult<>(Collections.emptyList(), null, true); + if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false || pivot.supportsIncrementalBucketUpdate() == false) { + return new IterationResult<>(Collections.emptyList(), null, true); + } + + // cleanup changed Buckets + changedBuckets = Collections.emptyMap(); + + // reset the runState to fetch changed buckets + runState = RunState.IDENTIFY_CHANGES; + + // advance the cursor for changed bucket detection + return new IterationResult<>(Collections.emptyList(), new TransformIndexerPosition(null, changedBucketsAfterKey), false); + } long docsBeforeProcess = getStats().getNumDocuments(); @@ -608,21 +614,6 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer processPartialBucketUpdates(final CompositeAggregation agg) { - // we reached the end - if (agg.getBuckets().isEmpty()) { - // cleanup changed Buckets - changedBuckets = null; - - // reset the runState to fetch changed buckets - runState = RunState.PARTIAL_RUN_IDENTIFY_CHANGES; - // advance the cursor for changed bucket detection - return new IterationResult<>(Collections.emptyList(), new TransformIndexerPosition(null, changedBucketsAfterKey), false); - } - - return processBuckets(agg); - } - private IterationResult processChangedBuckets(final CompositeAggregation agg) { // initialize the map of changed buckets, the map might be empty if source do not require/implement // changed bucket detection @@ -631,7 +622,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer(Collections.emptyList(), null, true); } @@ -644,7 +635,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer(Collections.emptyList(), getPosition(), false); } @@ -721,15 +712,12 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer fieldTypeMap, TransformIndexerStats transformIndexerStats ) { - GroupConfig groups = config.getGroupConfig(); Collection aggregationBuilders = config.getAggregationConfig().getAggregatorFactories(); Collection pipelineAggregationBuilders = config.getAggregationConfig().getPipelineAggregatorFactories(); @@ -189,44 +189,31 @@ public class Pivot { ); } - public 
QueryBuilder filterBuckets(Map> changedBuckets) { - - if (changedBuckets == null || changedBuckets.isEmpty()) { - return null; - } + public QueryBuilder filterBuckets( + Map> changedBuckets, + String synchronizationField, + long lastSynchronizationCheckpoint + ) { + assert changedBuckets != null; if (config.getGroupConfig().getGroups().size() == 1) { Entry entry = config.getGroupConfig().getGroups().entrySet().iterator().next(); - // it should not be possible to get into this code path - assert (entry.getValue().supportsIncrementalBucketUpdate()); - - logger.trace("filter by bucket: " + entry.getKey() + "/" + entry.getValue().getField()); - if (changedBuckets.containsKey(entry.getKey())) { - return entry.getValue().getIncrementalBucketUpdateFilterQuery(changedBuckets.get(entry.getKey())); - } else { - // should never happen - throw new RuntimeException("Could not find bucket value for key " + entry.getKey()); - } + logger.trace(() -> new ParameterizedMessage("filter by bucket: {}/{}", entry.getKey(), entry.getValue().getField())); + Set changedBucketsByGroup = changedBuckets.get(entry.getKey()); + return entry.getValue() + .getIncrementalBucketUpdateFilterQuery(changedBucketsByGroup, synchronizationField, lastSynchronizationCheckpoint); } // else: more than 1 group by, need to nest it BoolQueryBuilder filteredQuery = new BoolQueryBuilder(); for (Entry entry : config.getGroupConfig().getGroups().entrySet()) { - if (entry.getValue().supportsIncrementalBucketUpdate() == false) { - continue; + Set changedBucketsByGroup = changedBuckets.get(entry.getKey()); + QueryBuilder sourceQueryFilter = entry.getValue() + .getIncrementalBucketUpdateFilterQuery(changedBucketsByGroup, synchronizationField, lastSynchronizationCheckpoint); + // the source might not define a filter optimization + if (sourceQueryFilter != null) { + filteredQuery.filter(sourceQueryFilter); } - - if (changedBuckets.containsKey(entry.getKey())) { - QueryBuilder sourceQueryFilter = entry.getValue().getIncrementalBucketUpdateFilterQuery(changedBuckets.get(entry.getKey())); - // the source might not define an filter optimization - if (sourceQueryFilter != null) { - filteredQuery.filter(sourceQueryFilter); - } - } else { - // should never happen - throw new RuntimeException("Could not find bucket value for key " + entry.getKey()); - } - } return filteredQuery;
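
The core of this change is in DateHistogramGroupSource: the constructor now derives a Rounding from the configured fixed or calendar interval (plus the optional time zone), and getIncrementalBucketUpdateFilterQuery uses that rounding to narrow the update search whenever the transform synchronizes on the same date field it groups by. The stand-alone Java sketch below restates that logic for illustration only; the class and method names (DateHistogramChangeFilterSketch, roundingFor, incrementalUpdateFilter) are assumptions made for the sketch and do not appear in the patch.

// Stand-alone sketch (assumed helper names, not part of the patch): round the checkpoint
// timestamp down to the date_histogram bucket boundary and only update buckets from there on.
import java.time.ZoneId;

import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;

final class DateHistogramChangeFilterSketch {

    // Build the Rounding the same way the new DateHistogramGroupSource constructor does:
    // calendar intervals map to a DateTimeUnit, fixed intervals to a TimeValue-based rounding.
    static Rounding roundingFor(String interval, ZoneId timeZone) {
        Rounding.DateTimeUnit timeUnit = DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(interval);
        Rounding.Builder builder = timeUnit != null
            ? new Rounding.Builder(timeUnit)
            : new Rounding.Builder(TimeValue.parseTimeValue(interval, "interval"));
        if (timeZone != null) {
            builder.timeZone(timeZone);
        }
        return builder.build();
    }

    // Mirrors getIncrementalBucketUpdateFilterQuery: the extra range filter only applies when the
    // transform synchronizes on the very field it groups by and a checkpoint timestamp exists.
    static QueryBuilder incrementalUpdateFilter(String groupByField, Rounding rounding, String syncField, long syncTimestamp) {
        if (syncField != null && syncField.equals(groupByField) && syncTimestamp > 0) {
            return new RangeQueryBuilder(groupByField).gte(rounding.round(syncTimestamp)).format("epoch_millis");
        }
        return null;
    }
}

For example, with a calendar_interval of 1d the injected filter's lower bound is the last checkpoint timestamp rounded down to the start of that day, so a continuous transform re-reads and re-indexes only the buckets that can still change rather than rewriting every bucket on each checkpoint.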