[Transform] Optimize date histogram (#54068)

Optimize transforms that group_by on a date_histogram by injecting an additional range query. This limits the number of search and index requests, avoids unnecessary updates, and ensures that only recent buckets get rewritten.

fixes #54254
Hendrik Muhs 2020-03-26 21:38:17 +01:00
parent e866b57353
commit 4ecf9904d5
11 changed files with 221 additions and 140 deletions
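
To illustrate the idea (a sketch, not code from this commit): for a continuous transform grouping by a daily date_histogram on a hypothetical "timestamp" field, the last synchronization timestamp is rounded down to the enclosing bucket boundary and injected as a gte range filter, so buckets older than that boundary are neither searched nor re-indexed.

```java
import org.elasticsearch.common.Rounding;
import org.elasticsearch.index.query.RangeQueryBuilder;

public class RecentBucketsFilterSketch {
    public static void main(String[] args) {
        // rounding for a "1d" date_histogram interval (UTC)
        Rounding daily = new Rounding.Builder(Rounding.DateTimeUnit.DAY_OF_MONTH).build();

        long lastSyncTimestamp = 1585266000000L;           // 2020-03-26T23:40:00Z, illustrative
        long bucketStart = daily.round(lastSyncTimestamp); // 1585180800000 = 2020-03-26T00:00:00Z

        // the injected filter: only buckets from this boundary onwards can have changed
        RangeQueryBuilder recentBuckets = new RangeQueryBuilder("timestamp").gte(bucketStart).format("epoch_millis");
        System.out.println(recentBuckets); // prints the range query as JSON
    }
}
```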


@@ -74,19 +74,18 @@ public class DateHistogramGroupSourceTests extends AbstractResponseTestCase<
             dateHistogramGroupSource = new DateHistogramGroupSource(
                 field,
                 scriptConfig,
-                new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomPositiveTimeValue()))
+                new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomPositiveTimeValue())),
+                randomBoolean() ? randomZone() : null
             );
         } else {
             dateHistogramGroupSource = new DateHistogramGroupSource(
                 field,
                 scriptConfig,
-                new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w")))
+                new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w"))),
+                randomBoolean() ? randomZone() : null
             );
         }
-        if (randomBoolean()) {
-            dateHistogramGroupSource.setTimeZone(randomZone());
-        }
         return dateHistogramGroupSource;
     }


@@ -19,6 +19,8 @@ public interface SyncConfig extends ToXContentObject, NamedWriteable {
      */
     boolean isValid();
 
+    String getField();
+
     QueryBuilder getRangeQuery(TransformCheckpoint newCheckpoint);
 
     QueryBuilder getRangeQuery(TransformCheckpoint oldCheckpoint, TransformCheckpoint newCheckpoint);


@@ -25,7 +25,7 @@ import java.util.Objects;
 import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
 import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
 
 public class TimeSyncConfig implements SyncConfig {
     public static final TimeValue DEFAULT_DELAY = TimeValue.timeValueSeconds(60);
     private static final String NAME = "data_frame_transform_pivot_sync_time";
@@ -37,17 +37,18 @@ public class TimeSyncConfig implements SyncConfig {
     private static final ConstructingObjectParser<TimeSyncConfig, Void> LENIENT_PARSER = createParser(true);
 
     private static ConstructingObjectParser<TimeSyncConfig, Void> createParser(boolean lenient) {
-        ConstructingObjectParser<TimeSyncConfig, Void> parser = new ConstructingObjectParser<>(NAME, lenient,
-            args -> {
-                String field = (String) args[0];
-                TimeValue delay = (TimeValue) args[1];
-                return new TimeSyncConfig(field, delay);
-            });
+        ConstructingObjectParser<TimeSyncConfig, Void> parser = new ConstructingObjectParser<>(NAME, lenient, args -> {
+            String field = (String) args[0];
+            TimeValue delay = (TimeValue) args[1];
+            return new TimeSyncConfig(field, delay);
+        });
 
         parser.declareString(constructorArg(), TransformField.FIELD);
-        parser.declareField(optionalConstructorArg(),
+        parser.declareField(
+            optionalConstructorArg(),
             (p, c) -> TimeValue.parseTimeValue(p.text(), DEFAULT_DELAY, TransformField.DELAY.getPreferredName()),
             TransformField.DELAY,
-            ObjectParser.ValueType.STRING);
+            ObjectParser.ValueType.STRING
+        );
         return parser;
     }
@@ -65,6 +66,7 @@ public class TimeSyncConfig implements SyncConfig {
         this.delay = in.readTimeValue();
     }
 
+    @Override
     public String getField() {
         return field;
     }
@@ -105,12 +107,11 @@ public class TimeSyncConfig implements SyncConfig {
         final TimeSyncConfig that = (TimeSyncConfig) other;
 
-        return Objects.equals(this.field, that.field)
-            && Objects.equals(this.delay, that.delay);
+        return Objects.equals(this.field, that.field) && Objects.equals(this.delay, that.delay);
     }
 
     @Override
-    public int hashCode(){
+    public int hashCode() {
         return Objects.hash(field, delay);
     }
@@ -139,7 +140,8 @@ public class TimeSyncConfig implements SyncConfig {
     @Override
     public QueryBuilder getRangeQuery(TransformCheckpoint oldCheckpoint, TransformCheckpoint newCheckpoint) {
-        return new RangeQueryBuilder(field).gte(oldCheckpoint.getTimeUpperBound()).lt(newCheckpoint.getTimeUpperBound())
-            .format("epoch_millis");
+        return new RangeQueryBuilder(field).gte(oldCheckpoint.getTimeUpperBound())
+            .lt(newCheckpoint.getTimeUpperBound())
+            .format("epoch_millis");
     }
 }


@@ -7,15 +7,18 @@ package org.elasticsearch.xpack.core.transform.transforms.pivot;
 
 import org.elasticsearch.Version;
 import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.Rounding;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.ConstructingObjectParser;
 import org.elasticsearch.common.xcontent.ObjectParser;
 import org.elasticsearch.common.xcontent.ToXContentFragment;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.RangeQueryBuilder;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
@@ -105,6 +108,11 @@ public class DateHistogramGroupSource extends SingleGroupSource {
         public int hashCode() {
             return Objects.hash(interval);
         }
+
+        @Override
+        public String toString() {
+            return interval.toString();
+        }
     }
 
     public static class CalendarInterval implements Interval {
@@ -169,6 +177,11 @@ public class DateHistogramGroupSource extends SingleGroupSource {
         public int hashCode() {
             return Objects.hash(interval);
         }
+
+        @Override
+        public String toString() {
+            return interval.toString();
+        }
     }
 
     private Interval readInterval(StreamInput in) throws IOException {
@@ -195,11 +208,26 @@ public class DateHistogramGroupSource extends SingleGroupSource {
     private static final ConstructingObjectParser<DateHistogramGroupSource, Void> LENIENT_PARSER = createParser(true);
 
     private final Interval interval;
-    private ZoneId timeZone;
+    private final ZoneId timeZone;
+    private Rounding rounding;
 
-    public DateHistogramGroupSource(String field, ScriptConfig scriptConfig, Interval interval) {
+    public DateHistogramGroupSource(String field, ScriptConfig scriptConfig, Interval interval, ZoneId timeZone) {
         super(field, scriptConfig);
         this.interval = interval;
+        this.timeZone = timeZone;
+
+        Rounding.DateTimeUnit timeUnit = DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(interval.toString());
+        final Rounding.Builder roundingBuilder;
+        if (timeUnit != null) {
+            roundingBuilder = new Rounding.Builder(timeUnit);
+        } else {
+            roundingBuilder = new Rounding.Builder(TimeValue.parseTimeValue(interval.toString(), interval.getName()));
+        }
+
+        if (timeZone != null) {
+            roundingBuilder.timeZone(timeZone);
+        }
+        this.rounding = roundingBuilder.build();
     }
 
     public DateHistogramGroupSource(StreamInput in) throws IOException {
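
As a standalone sketch of the constructor logic above: intervals that name a calendar unit (such as "1d") resolve through DATE_FIELD_UNITS to a calendar-aware rounding, while anything else (such as "90m") falls back to a fixed-interval rounding. Values below are illustrative.

```java
import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;

public class IntervalRoundingSketch {
    public static void main(String[] args) {
        // "1d" is a known calendar unit, so a DateTimeUnit rounding is used
        Rounding.DateTimeUnit unit = DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get("1d");
        Rounding daily = new Rounding.Builder(unit).build();
        System.out.println(daily.round(1585266000000L)); // 1585180800000 = 2020-03-26T00:00:00Z

        // "90m" is not a calendar unit, so it falls back to a fixed-interval rounding
        Rounding fixed = new Rounding.Builder(TimeValue.parseTimeValue("90m", "interval")).build();
        System.out.println(fixed.round(1585266000000L)); // start of the enclosing 90-minute bucket
    }
}
```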
@@ -218,6 +246,7 @@ public class DateHistogramGroupSource extends SingleGroupSource {
             ScriptConfig scriptConfig = (ScriptConfig) args[1];
             String fixedInterval = (String) args[2];
             String calendarInterval = (String) args[3];
+            ZoneId zoneId = (ZoneId) args[4];
 
             Interval interval = null;
@@ -231,7 +260,7 @@ public class DateHistogramGroupSource extends SingleGroupSource {
                 throw new IllegalArgumentException("You must specify either fixed_interval or calendar_interval, found none");
             }
 
-            return new DateHistogramGroupSource(field, scriptConfig, interval);
+            return new DateHistogramGroupSource(field, scriptConfig, interval, zoneId);
         });
 
         declareValuesSourceFields(parser, lenient);
@@ -239,7 +268,7 @@ public class DateHistogramGroupSource extends SingleGroupSource {
         parser.declareString(optionalConstructorArg(), new ParseField(FixedInterval.NAME));
         parser.declareString(optionalConstructorArg(), new ParseField(CalendarInterval.NAME));
 
-        parser.declareField(DateHistogramGroupSource::setTimeZone, p -> {
+        parser.declareField(optionalConstructorArg(), p -> {
             if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
                 return ZoneId.of(p.text());
             } else {
@@ -267,8 +296,8 @@ public class DateHistogramGroupSource extends SingleGroupSource {
         return timeZone;
     }
 
-    public void setTimeZone(ZoneId timeZone) {
-        this.timeZone = timeZone;
+    public Rounding getRounding() {
+        return rounding;
     }
 
     @Override
@@ -315,9 +344,16 @@ public class DateHistogramGroupSource extends SingleGroupSource {
     }
 
     @Override
-    public QueryBuilder getIncrementalBucketUpdateFilterQuery(Set<String> changedBuckets) {
-        // no need for an extra range filter as this is already done by checkpoints
-        return null;
+    public QueryBuilder getIncrementalBucketUpdateFilterQuery(
+        Set<String> changedBuckets,
+        String synchronizationField,
+        long synchronizationTimestamp
+    ) {
+        if (synchronizationField != null && synchronizationField.equals(field) && synchronizationTimestamp > 0) {
+            return new RangeQueryBuilder(field).gte(rounding.round(synchronizationTimestamp)).format("epoch_millis");
+        } else {
+            return null;
+        }
     }
 
     @Override
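
Restating the new method as a minimal standalone sketch: the range filter is only injected when the transform's synchronization field is the very field the date_histogram groups on; for any other field the method keeps returning null and the optimization is skipped. Field names are illustrative.

```java
import org.elasticsearch.common.Rounding;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;

public class DateHistogramChangeFilterSketch {
    static QueryBuilder changeFilter(String groupField, Rounding rounding, String syncField, long syncTimestamp) {
        if (syncField != null && syncField.equals(groupField) && syncTimestamp > 0) {
            // re-query from the start of the bucket containing the sync timestamp;
            // older buckets cannot have received new documents
            return new RangeQueryBuilder(groupField).gte(rounding.round(syncTimestamp)).format("epoch_millis");
        }
        return null; // different field: no optimization possible
    }

    public static void main(String[] args) {
        Rounding daily = new Rounding.Builder(Rounding.DateTimeUnit.DAY_OF_MONTH).build();
        System.out.println(changeFilter("timestamp", daily, "timestamp", 1585266000000L));   // range query
        System.out.println(changeFilter("timestamp", daily, "ingest_time", 1585266000000L)); // null
    }
}
```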


@@ -101,7 +101,11 @@ public class HistogramGroupSource extends SingleGroupSource {
     }
 
     @Override
-    public QueryBuilder getIncrementalBucketUpdateFilterQuery(Set<String> changedBuckets) {
+    public QueryBuilder getIncrementalBucketUpdateFilterQuery(
+        Set<String> changedBuckets,
+        String synchronizationField,
+        long synchronizationTimestamp
+    ) {
         // histograms are simple and cheap, so we skip this optimization
         return null;
     }


@@ -116,7 +116,11 @@ public abstract class SingleGroupSource implements Writeable, ToXContentObject {
 
     public abstract boolean supportsIncrementalBucketUpdate();
 
-    public abstract QueryBuilder getIncrementalBucketUpdateFilterQuery(Set<String> changedBuckets);
+    public abstract QueryBuilder getIncrementalBucketUpdateFilterQuery(
+        Set<String> changedBuckets,
+        String synchronizationField,
+        long synchronizationTimestamp
+    );
 
     public String getField() {
         return field;


@@ -54,8 +54,15 @@ public class TermsGroupSource extends SingleGroupSource {
     }
 
     @Override
-    public QueryBuilder getIncrementalBucketUpdateFilterQuery(Set<String> changedBuckets) {
-        return new TermsQueryBuilder(field, changedBuckets);
+    public QueryBuilder getIncrementalBucketUpdateFilterQuery(
+        Set<String> changedBuckets,
+        String synchronizationField,
+        long synchronizationTimestamp
+    ) {
+        if (changedBuckets != null && changedBuckets.isEmpty() == false) {
+            return new TermsQueryBuilder(field, changedBuckets);
+        }
+        return null;
     }
 
     @Override
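
The terms group now guards against a null or empty changed-bucket set instead of unconditionally building a terms query; a minimal sketch with illustrative field and values:

```java
import java.util.Set;

import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;

public class TermsChangeFilterSketch {
    static QueryBuilder changeFilter(String field, Set<String> changedBuckets) {
        if (changedBuckets != null && changedBuckets.isEmpty() == false) {
            return new TermsQueryBuilder(field, changedBuckets);
        }
        return null; // nothing changed, or change detection did not run
    }

    public static void main(String[] args) {
        System.out.println(changeFilter("user_id", Set.of("alice", "bob"))); // terms query
        System.out.println(changeFilter("user_id", Set.of()));               // null
    }
}
```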


@@ -21,9 +21,9 @@ import java.time.Instant;
 import java.util.Collections;
 import java.util.Map;
 
-import static org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests.randomTransformConfig;
 import static org.elasticsearch.xpack.core.transform.transforms.DestConfigTests.randomDestConfig;
 import static org.elasticsearch.xpack.core.transform.transforms.SourceConfigTests.randomSourceConfig;
+import static org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests.randomTransformConfig;
 import static org.hamcrest.Matchers.equalTo;
 
 public class TransformConfigUpdateTests extends AbstractSerializingTransformTestCase<TransformConfigUpdate> {
@@ -184,6 +184,11 @@ public class TransformConfigUpdateTests extends AbstractSerializingTransformTestCase<TransformConfigUpdate>
             return "foo";
         }
 
+        @Override
+        public String getField() {
+            return "foo";
+        }
+
         @Override
         public void writeTo(StreamOutput out) throws IOException {}


@@ -10,11 +10,17 @@ import org.elasticsearch.Version;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.Writeable.Reader;
+import org.elasticsearch.common.time.DateFormatter;
+import org.elasticsearch.common.time.DateFormatters;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
 import org.elasticsearch.test.AbstractSerializingTestCase;
 
 import java.io.IOException;
+import java.time.ZoneOffset;
+import java.time.temporal.TemporalAccessor;
+
+import static org.hamcrest.Matchers.equalTo;
 
 public class DateHistogramGroupSourceTests extends AbstractSerializingTestCase<DateHistogramGroupSource> {
@@ -26,19 +32,20 @@ public class DateHistogramGroupSourceTests extends AbstractSerializingTestCase<DateHistogramGroupSource>
             dateHistogramGroupSource = new DateHistogramGroupSource(
                 field,
                 scriptConfig,
-                new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomPositiveTimeValue()))
+                new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomTimeValue(1, 100, "d", "h", "ms", "s", "m"))),
+                randomBoolean() ? randomZone() : null
             );
         } else {
             dateHistogramGroupSource = new DateHistogramGroupSource(
                 field,
                 scriptConfig,
-                new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w")))
+                new DateHistogramGroupSource.CalendarInterval(
+                    new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w", "M", "q", "y"))
+                ),
+                randomBoolean() ? randomZone() : null
             );
         }
-        if (randomBoolean()) {
-            dateHistogramGroupSource.setTimeZone(randomZone());
-        }
         return dateHistogramGroupSource;
     }
@@ -70,4 +77,56 @@ public class DateHistogramGroupSourceTests extends AbstractSerializingTestCase<DateHistogramGroupSource>
         return DateHistogramGroupSource::new;
     }
 
+    public void testRoundingDateHistogramFixedInterval() {
+        String field = randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20);
+        DateHistogramGroupSource dateHistogramGroupSource = new DateHistogramGroupSource(
+            field,
+            null,
+            new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval("1d")),
+            null
+        );
+
+        // not meant to be complete rounding tests, see {@link RoundingTests} for more
+        assertNotNull(dateHistogramGroupSource.getRounding());
+
+        assertThat(
+            dateHistogramGroupSource.getRounding().round(time("2020-03-26T23:59:59.000Z")),
+            equalTo(time("2020-03-26T00:00:00.000Z"))
+        );
+        assertThat(
+            dateHistogramGroupSource.getRounding().round(time("2020-03-26T00:00:01.000Z")),
+            equalTo(time("2020-03-26T00:00:00.000Z"))
+        );
+    }
+
+    public void testRoundingDateHistogramCalendarInterval() {
+        String field = randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20);
+        DateHistogramGroupSource dateHistogramGroupSource = new DateHistogramGroupSource(
+            field,
+            null,
+            new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval("1w")),
+            null
+        );
+
+        // not meant to be complete rounding tests, see {@link RoundingTests} for more
+        assertNotNull(dateHistogramGroupSource.getRounding());
+
+        assertThat(
+            dateHistogramGroupSource.getRounding().round(time("2020-03-26T23:59:59.000Z")),
+            equalTo(time("2020-03-23T00:00:00.000Z"))
+        );
+        assertThat(
+            dateHistogramGroupSource.getRounding().round(time("2020-03-29T23:59:59.000Z")),
+            equalTo(time("2020-03-23T00:00:00.000Z"))
+        );
+        assertThat(
+            dateHistogramGroupSource.getRounding().round(time("2020-03-23T00:00:01.000Z")),
+            equalTo(time("2020-03-23T00:00:00.000Z"))
+        );
+    }
+
+    private static long time(String time) {
+        TemporalAccessor accessor = DateFormatter.forPattern("date_optional_time").withZone(ZoneOffset.UTC).parse(time);
+        return DateFormatters.from(accessor).toInstant().toEpochMilli();
+    }
 }


@@ -64,15 +64,11 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
      * which query filters to run and which index requests to send
      */
     private enum RunState {
-        // do a complete query/index, this is used for batch transforms and for bootstrapping (1st run)
-        FULL_RUN,
+        // apply bucket results
+        APPLY_BUCKET_RESULTS,
 
-        // Partial run modes in 2 stages:
-        // identify buckets that have changed
-        PARTIAL_RUN_IDENTIFY_CHANGES,
-
-        // recalculate buckets based on the update list
-        PARTIAL_RUN_APPLY_CHANGES
+        // identify buckets that have changed, used for continuous if terms is used in group_by
+        IDENTIFY_CHANGES,
     }
 
     public static final int MINIMUM_PAGE_SIZE = 10;
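
A compact sketch (not the real indexer) of how the simplified state machine is entered: batch transforms and first runs go straight to APPLY_BUCKET_RESULTS, while later runs of a continuous transform with change detection start in IDENTIFY_CHANGES, and the two states then alternate.

```java
public class RunStateSketch {
    enum RunState { APPLY_BUCKET_RESULTS, IDENTIFY_CHANGES }

    // mirrors the logic of determineRunStateAtStart further down in this file
    static RunState initialState(long checkpoint, boolean continuous, boolean supportsIncrementalUpdate) {
        if (checkpoint == 1 || continuous == false || supportsIncrementalUpdate == false) {
            return RunState.APPLY_BUCKET_RESULTS;
        }
        return RunState.IDENTIFY_CHANGES;
    }

    public static void main(String[] args) {
        System.out.println(initialState(1, true, true));  // APPLY_BUCKET_RESULTS (first run)
        System.out.println(initialState(2, false, true)); // APPLY_BUCKET_RESULTS (batch)
        System.out.println(initialState(2, true, true));  // IDENTIFY_CHANGES (continuous)
    }
}
```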
@@ -112,7 +108,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
     private volatile RunState runState;
 
     // hold information for continuous mode (partial updates)
-    private volatile Map<String, Set<String>> changedBuckets;
+    private volatile Map<String, Set<String>> changedBuckets = Collections.emptyMap();
     private volatile Map<String, Object> changedBucketsAfterKey;
 
     private volatile long lastCheckpointCleanup = 0L;
@@ -146,7 +142,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
         this.context = ExceptionsHelper.requireNonNull(context, "context");
 
         // give runState a default
-        this.runState = RunState.FULL_RUN;
+        this.runState = RunState.APPLY_BUCKET_RESULTS;
     }
 
     public int getPageSize() {
@@ -342,7 +338,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
         // reset the page size, so we do not memorize a low page size forever
         pageSize = pivot.getInitialPageSize();
         // reset the changed bucket to free memory
-        changedBuckets = null;
+        changedBuckets = Collections.emptyMap();
 
         long checkpoint = context.getAndIncrementCheckpoint();
         lastCheckpoint = getNextCheckpoint();
@@ -414,11 +410,9 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
         final CompositeAggregation agg = aggregations.get(COMPOSITE_AGGREGATION_NAME);
 
         switch (runState) {
-            case FULL_RUN:
+            case APPLY_BUCKET_RESULTS:
                 return processBuckets(agg);
-            case PARTIAL_RUN_APPLY_CHANGES:
-                return processPartialBucketUpdates(agg);
-            case PARTIAL_RUN_IDENTIFY_CHANGES:
+            case IDENTIFY_CHANGES:
                 return processChangedBuckets(agg);
 
             default:
@@ -582,7 +576,19 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
     private IterationResult<TransformIndexerPosition> processBuckets(final CompositeAggregation agg) {
         // we reached the end
         if (agg.getBuckets().isEmpty()) {
-            return new IterationResult<>(Collections.emptyList(), null, true);
+            if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false || pivot.supportsIncrementalBucketUpdate() == false) {
+                return new IterationResult<>(Collections.emptyList(), null, true);
+            }
+
+            // cleanup changed Buckets
+            changedBuckets = Collections.emptyMap();
+
+            // reset the runState to fetch changed buckets
+            runState = RunState.IDENTIFY_CHANGES;
+
+            // advance the cursor for changed bucket detection
+            return new IterationResult<>(Collections.emptyList(), new TransformIndexerPosition(null, changedBucketsAfterKey), false);
         }
 
         long docsBeforeProcess = getStats().getNumDocuments();
@@ -608,21 +614,6 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
         return result;
     }
 
-    private IterationResult<TransformIndexerPosition> processPartialBucketUpdates(final CompositeAggregation agg) {
-        // we reached the end
-        if (agg.getBuckets().isEmpty()) {
-            // cleanup changed Buckets
-            changedBuckets = null;
-
-            // reset the runState to fetch changed buckets
-            runState = RunState.PARTIAL_RUN_IDENTIFY_CHANGES;
-            // advance the cursor for changed bucket detection
-            return new IterationResult<>(Collections.emptyList(), new TransformIndexerPosition(null, changedBucketsAfterKey), false);
-        }
-
-        return processBuckets(agg);
-    }
-
     private IterationResult<TransformIndexerPosition> processChangedBuckets(final CompositeAggregation agg) {
         // initialize the map of changed buckets, the map might be empty if source do not require/implement
         // changed bucket detection
@@ -631,7 +622,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
         // reached the end?
         if (agg.getBuckets().isEmpty()) {
             // reset everything and return the end marker
-            changedBuckets = null;
+            changedBuckets = Collections.emptyMap();
             changedBucketsAfterKey = null;
             return new IterationResult<>(Collections.emptyList(), null, true);
         }
@@ -644,7 +635,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
         changedBucketsAfterKey = agg.afterKey();
 
         // reset the runState to fetch the partial updates next
-        runState = RunState.PARTIAL_RUN_APPLY_CHANGES;
+        runState = RunState.APPLY_BUCKET_RESULTS;
 
         return new IterationResult<>(Collections.emptyList(), getPosition(), false);
     }
@@ -721,15 +712,12 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
         SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().size(0);
 
         switch (runState) {
-            case FULL_RUN:
-                buildFullRunQuery(sourceBuilder);
+            case APPLY_BUCKET_RESULTS:
+                buildUpdateQuery(sourceBuilder);
                 break;
-            case PARTIAL_RUN_IDENTIFY_CHANGES:
+            case IDENTIFY_CHANGES:
                 buildChangedBucketsQuery(sourceBuilder);
                 break;
-            case PARTIAL_RUN_APPLY_CHANGES:
-                buildPartialUpdateQuery(sourceBuilder);
-                break;
             default:
                 // Any other state is a bug, should not happen
                 logger.warn("Encountered unexpected run state [" + runState + "]");
@@ -740,26 +728,6 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
         return searchRequest;
     }
 
-    private SearchSourceBuilder buildFullRunQuery(SearchSourceBuilder sourceBuilder) {
-        TransformIndexerPosition position = getPosition();
-
-        sourceBuilder.aggregation(pivot.buildAggregation(position != null ? position.getIndexerPosition() : null, pageSize));
-        TransformConfig config = getConfig();
-
-        QueryBuilder pivotQueryBuilder = config.getSource().getQueryConfig().getQuery();
-        if (isContinuous()) {
-            BoolQueryBuilder filteredQuery = new BoolQueryBuilder().filter(pivotQueryBuilder)
-                .filter(config.getSyncConfig().getRangeQuery(nextCheckpoint));
-            sourceBuilder.query(filteredQuery);
-        } else {
-            sourceBuilder.query(pivotQueryBuilder);
-        }
-
-        logger.trace("running full run query: {}", sourceBuilder);
-
-        return sourceBuilder;
-    }
-
     private SearchSourceBuilder buildChangedBucketsQuery(SearchSourceBuilder sourceBuilder) {
         assert isContinuous();
@@ -781,9 +749,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
         return sourceBuilder;
     }
 
-    private SearchSourceBuilder buildPartialUpdateQuery(SearchSourceBuilder sourceBuilder) {
-        assert isContinuous();
-
+    private SearchSourceBuilder buildUpdateQuery(SearchSourceBuilder sourceBuilder) {
         TransformIndexerPosition position = getPosition();
 
         sourceBuilder.aggregation(pivot.buildAggregation(position != null ? position.getIndexerPosition() : null, pageSize));
@@ -791,18 +757,28 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
         QueryBuilder pivotQueryBuilder = config.getSource().getQueryConfig().getQuery();
 
+        // if its either the 1st run or not continuous, do not apply extra filters
+        if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false) {
+            sourceBuilder.query(pivotQueryBuilder);
+            logger.trace("running query: {}", sourceBuilder);
+
+            return sourceBuilder;
+        }
+
         BoolQueryBuilder filteredQuery = new BoolQueryBuilder().filter(pivotQueryBuilder)
             .filter(config.getSyncConfig().getRangeQuery(nextCheckpoint));
 
-        if (changedBuckets != null && changedBuckets.isEmpty() == false) {
-            QueryBuilder pivotFilter = pivot.filterBuckets(changedBuckets);
-            if (pivotFilter != null) {
-                filteredQuery.filter(pivotFilter);
-            }
+        QueryBuilder pivotFilter = pivot.filterBuckets(
+            changedBuckets,
+            config.getSyncConfig().getField(),
+            lastCheckpoint.getTimeUpperBound()
+        );
+        if (pivotFilter != null) {
+            filteredQuery.filter(pivotFilter);
         }
 
         sourceBuilder.query(filteredQuery);
-        logger.trace("running partial update query: {}", sourceBuilder);
+        logger.trace("running query: {}", sourceBuilder);
 
         return sourceBuilder;
     }
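
Putting the pieces together, a hedged sketch of the query buildUpdateQuery assembles for a continuous transform past its first checkpoint; the match_all stands in for the transform's configured source query, and the field name and bounds are illustrative:

```java
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;

public class UpdateQuerySketch {
    public static void main(String[] args) {
        long lastUpperBound = 1585180800000L; // last checkpoint upper bound, illustrative
        long nextUpperBound = 1585267200000L; // next checkpoint upper bound, illustrative

        BoolQueryBuilder filteredQuery = new BoolQueryBuilder()
            // the transform's own source query
            .filter(QueryBuilders.matchAllQuery())
            // sync config range: everything visible up to the new checkpoint
            .filter(new RangeQueryBuilder("timestamp").lt(nextUpperBound).format("epoch_millis"))
            // pivot.filterBuckets(...) result, e.g. the rounded date_histogram range
            .filter(new RangeQueryBuilder("timestamp").gte(lastUpperBound).format("epoch_millis"));

        System.out.println(filteredQuery); // prints the combined bool query as JSON
    }
}
```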
@@ -903,16 +879,16 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
     private RunState determineRunStateAtStart() {
         // either 1st run or not a continuous transform
         if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false) {
-            return RunState.FULL_RUN;
+            return RunState.APPLY_BUCKET_RESULTS;
         }
 
-        // if incremental update is not supported, do a full run
+        // if incremental update is not supported, do a normal run
        if (pivot.supportsIncrementalBucketUpdate() == false) {
-            return RunState.FULL_RUN;
+            return RunState.APPLY_BUCKET_RESULTS;
        }
 
         // continuous mode: we need to get the changed buckets first
-        return RunState.PARTIAL_RUN_IDENTIFY_CHANGES;
+        return RunState.IDENTIFY_CHANGES;
     }
 
     /**


@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.transform.transforms.pivot;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.search.SearchAction;
@@ -174,7 +175,6 @@ public class Pivot {
         Map<String, String> fieldTypeMap,
         TransformIndexerStats transformIndexerStats
     ) {
-
         GroupConfig groups = config.getGroupConfig();
         Collection<AggregationBuilder> aggregationBuilders = config.getAggregationConfig().getAggregatorFactories();
         Collection<PipelineAggregationBuilder> pipelineAggregationBuilders = config.getAggregationConfig().getPipelineAggregatorFactories();
@@ -189,44 +189,31 @@ public class Pivot {
         );
     }
 
-    public QueryBuilder filterBuckets(Map<String, Set<String>> changedBuckets) {
-
-        if (changedBuckets == null || changedBuckets.isEmpty()) {
-            return null;
-        }
+    public QueryBuilder filterBuckets(
+        Map<String, Set<String>> changedBuckets,
+        String synchronizationField,
+        long lastSynchronizationCheckpoint
+    ) {
+        assert changedBuckets != null;
 
         if (config.getGroupConfig().getGroups().size() == 1) {
             Entry<String, SingleGroupSource> entry = config.getGroupConfig().getGroups().entrySet().iterator().next();
-            // it should not be possible to get into this code path
-            assert (entry.getValue().supportsIncrementalBucketUpdate());
-
-            logger.trace("filter by bucket: " + entry.getKey() + "/" + entry.getValue().getField());
-            if (changedBuckets.containsKey(entry.getKey())) {
-                return entry.getValue().getIncrementalBucketUpdateFilterQuery(changedBuckets.get(entry.getKey()));
-            } else {
-                // should never happen
-                throw new RuntimeException("Could not find bucket value for key " + entry.getKey());
-            }
+            logger.trace(() -> new ParameterizedMessage("filter by bucket: {}/{}", entry.getKey(), entry.getValue().getField()));
+            Set<String> changedBucketsByGroup = changedBuckets.get(entry.getKey());
+            return entry.getValue()
+                .getIncrementalBucketUpdateFilterQuery(changedBucketsByGroup, synchronizationField, lastSynchronizationCheckpoint);
         }
 
         // else: more than 1 group by, need to nest it
         BoolQueryBuilder filteredQuery = new BoolQueryBuilder();
         for (Entry<String, SingleGroupSource> entry : config.getGroupConfig().getGroups().entrySet()) {
-            if (entry.getValue().supportsIncrementalBucketUpdate() == false) {
-                continue;
-            }
-
-            if (changedBuckets.containsKey(entry.getKey())) {
-                QueryBuilder sourceQueryFilter = entry.getValue().getIncrementalBucketUpdateFilterQuery(changedBuckets.get(entry.getKey()));
-                // the source might not define an filter optimization
-                if (sourceQueryFilter != null) {
-                    filteredQuery.filter(sourceQueryFilter);
-                }
-            } else {
-                // should never happen
-                throw new RuntimeException("Could not find bucket value for key " + entry.getKey());
-            }
+            Set<String> changedBucketsByGroup = changedBuckets.get(entry.getKey());
+            QueryBuilder sourceQueryFilter = entry.getValue()
+                .getIncrementalBucketUpdateFilterQuery(changedBucketsByGroup, synchronizationField, lastSynchronizationCheckpoint);
+            // the source might not define a filter optimization
+            if (sourceQueryFilter != null) {
+                filteredQuery.filter(sourceQueryFilter);
+            }
         }
 
         return filteredQuery;
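
For more than one group_by, the rewritten filterBuckets nests whatever per-group filters exist into a single bool query and simply skips groups that return null; a sketch with hypothetical group fields:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;

import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;

public class FilterBucketsSketch {
    public static void main(String[] args) {
        // per-group change filters: a terms group, a date_histogram group, and a
        // histogram group that opts out of the optimization (null)
        List<QueryBuilder> perGroupFilters = Arrays.asList(
            new TermsQueryBuilder("user_id", Set.of("alice", "bob")),
            new RangeQueryBuilder("timestamp").gte(1585180800000L).format("epoch_millis"),
            null
        );

        BoolQueryBuilder filteredQuery = new BoolQueryBuilder();
        for (QueryBuilder filter : perGroupFilters) {
            if (filter != null) {
                filteredQuery.filter(filter); // null filters are simply skipped
            }
        }
        System.out.println(filteredQuery);
    }
}
```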