From 0c343d844343e12694333a0d9f53c890bb07028a Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Wed, 14 Aug 2019 13:08:27 -0500 Subject: [PATCH] [7.x] [ML][Transforms] adjusting stats.progress for cont. transforms (#45361) (#45551) * [ML][Transforms] adjusting stats.progress for cont. transforms (#45361) * [ML][Transforms] adjusting stats.progress for cont. transforms * addressing PR comments * rename fix * Adjusting bwc serialization versions --- .../client/core/IndexerJobStats.java | 20 +-- .../DataFrameIndexerTransformStats.java | 74 +++++++++- .../DataFrameTransformCheckpointingInfo.java | 41 ++++-- .../DataFrameTransformProgress.java | 49 +++++-- .../client/DataFrameTransformIT.java | 15 +- .../DataFrameIndexerTransformStatsTests.java | 11 +- ...aFrameTransformCheckpointingInfoTests.java | 7 +- .../DataFrameTransformProgressTests.java | 25 ++-- .../DataFrameIndexerTransformStatsTests.java | 22 ++- ...ataFrameTransformCheckpointStatsTests.java | 52 +++---- ...aFrameTransformCheckpointingInfoTests.java | 15 +- .../hlrc/DataFrameTransformProgressTests.java | 17 ++- .../hlrc/DataFrameTransformStatsTests.java | 19 ++- .../apis/get-transform-stats.asciidoc | 11 +- .../DataFrameIndexerTransformStats.java | 100 +++++++++++--- .../DataFrameTransformCheckpointingInfo.java | 73 ++++++++-- .../DataFrameTransformProgress.java | 130 ++++++++++++------ .../DataFrameIndexerTransformStatsTests.java | 33 +++-- ...aFrameTransformCheckpointingInfoTests.java | 29 +++- .../DataFrameTransformProgressTests.java | 69 ++++++++-- .../DataFrameGetAndGetStatsIT.java | 48 ++++--- .../DataFrameTransformProgressIT.java | 6 +- .../persistence/DataFrameInternalIndex.java | 16 +++ .../transforms/DataFrameIndexer.java | 3 +- .../transforms/DataFrameTransformTask.java | 43 +++++- .../transforms/TransformProgressGatherer.java | 2 +- .../test/data_frame/transforms_stats.yml | 3 + 27 files changed, 712 insertions(+), 221 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/core/IndexerJobStats.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/core/IndexerJobStats.java index a358f53d988..5e59b4b19db 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/core/IndexerJobStats.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/core/IndexerJobStats.java @@ -36,16 +36,16 @@ public abstract class IndexerJobStats { public static ParseField SEARCH_FAILURES = new ParseField("search_failures"); public static ParseField INDEX_FAILURES = new ParseField("index_failures"); - private final long numPages; - private final long numInputDocuments; - private final long numOuputDocuments; - private final long numInvocations; - private final long indexTime; - private final long indexTotal; - private final long searchTime; - private final long searchTotal; - private final long indexFailures; - private final long searchFailures; + protected final long numPages; + protected final long numInputDocuments; + protected final long numOuputDocuments; + protected final long numInvocations; + protected final long indexTime; + protected final long indexTotal; + protected final long searchTime; + protected final long searchTotal; + protected final long indexFailures; + protected final long searchFailures; public IndexerJobStats(long numPages, long numInputDocuments, long numOutputDocuments, long numInvocations, long indexTime, long searchTime, long indexTotal, long searchTotal, long indexFailures, long searchFailures) { diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameIndexerTransformStats.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameIndexerTransformStats.java index 356c3d98916..f184810609a 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameIndexerTransformStats.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameIndexerTransformStats.java @@ -20,18 +20,28 @@ package org.elasticsearch.client.dataframe.transforms; import org.elasticsearch.client.core.IndexerJobStats; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; +import java.util.Objects; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; public class DataFrameIndexerTransformStats extends IndexerJobStats { + static ParseField EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS = new ParseField("exponential_avg_checkpoint_duration_ms"); + static ParseField EXPONENTIAL_AVG_DOCUMENTS_INDEXED = new ParseField("exponential_avg_documents_indexed"); + static ParseField EXPONENTIAL_AVG_DOCUMENTS_PROCESSED = new ParseField("exponential_avg_documents_processed"); + public static final ConstructingObjectParser LENIENT_PARSER = new ConstructingObjectParser<>( - NAME, true, args -> new DataFrameIndexerTransformStats((long) args[0], (long) args[1], (long) args[2], - (long) args[3], (long) args[4], (long) args[5], (long) args[6], (long) args[7], (long) args[8], (long) args[9])); + NAME, + true, + args -> new DataFrameIndexerTransformStats((long) args[0], (long) args[1], (long) args[2], + (long) args[3], (long) args[4], (long) args[5], (long) args[6], (long) args[7], (long) args[8], (long) args[9], + (Double) args[10], (Double) args[11], (Double) args[12])); static { LENIENT_PARSER.declareLong(constructorArg(), NUM_PAGES); @@ -44,16 +54,74 @@ public class DataFrameIndexerTransformStats extends IndexerJobStats { LENIENT_PARSER.declareLong(constructorArg(), SEARCH_TOTAL); LENIENT_PARSER.declareLong(constructorArg(), INDEX_FAILURES); LENIENT_PARSER.declareLong(constructorArg(), SEARCH_FAILURES); + LENIENT_PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS); + LENIENT_PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_DOCUMENTS_INDEXED); + LENIENT_PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_DOCUMENTS_PROCESSED); } public static DataFrameIndexerTransformStats fromXContent(XContentParser parser) throws IOException { return LENIENT_PARSER.parse(parser, null); } + private final double expAvgCheckpointDurationMs; + private final double expAvgDocumentsIndexed; + private final double expAvgDocumentsProcessed; + public DataFrameIndexerTransformStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations, long indexTime, long searchTime, - long indexTotal, long searchTotal, long indexFailures, long searchFailures) { + long indexTotal, long searchTotal, long indexFailures, long searchFailures, + Double expAvgCheckpointDurationMs, Double expAvgDocumentsIndexed, + Double expAvgDocumentsProcessed) { super(numPages, numInputDocuments, numOuputDocuments, numInvocations, indexTime, searchTime, indexTotal, searchTotal, indexFailures, searchFailures); + this.expAvgCheckpointDurationMs = expAvgCheckpointDurationMs == null ? 0.0 : expAvgCheckpointDurationMs; + this.expAvgDocumentsIndexed = expAvgDocumentsIndexed == null ? 0.0 : expAvgDocumentsIndexed; + this.expAvgDocumentsProcessed = expAvgDocumentsProcessed == null ? 0.0 : expAvgDocumentsProcessed; + } + + public double getExpAvgCheckpointDurationMs() { + return expAvgCheckpointDurationMs; + } + + public double getExpAvgDocumentsIndexed() { + return expAvgDocumentsIndexed; + } + + public double getExpAvgDocumentsProcessed() { + return expAvgDocumentsProcessed; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + DataFrameIndexerTransformStats that = (DataFrameIndexerTransformStats) other; + + return Objects.equals(this.numPages, that.numPages) + && Objects.equals(this.numInputDocuments, that.numInputDocuments) + && Objects.equals(this.numOuputDocuments, that.numOuputDocuments) + && Objects.equals(this.numInvocations, that.numInvocations) + && Objects.equals(this.indexTime, that.indexTime) + && Objects.equals(this.searchTime, that.searchTime) + && Objects.equals(this.indexFailures, that.indexFailures) + && Objects.equals(this.searchFailures, that.searchFailures) + && Objects.equals(this.indexTotal, that.indexTotal) + && Objects.equals(this.searchTotal, that.searchTotal) + && Objects.equals(this.expAvgCheckpointDurationMs, that.expAvgCheckpointDurationMs) + && Objects.equals(this.expAvgDocumentsIndexed, that.expAvgDocumentsIndexed) + && Objects.equals(this.expAvgDocumentsProcessed, that.expAvgDocumentsProcessed); + } + + @Override + public int hashCode() { + return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations, + indexTime, searchTime, indexFailures, searchFailures, indexTotal, searchTotal, + expAvgCheckpointDurationMs, expAvgDocumentsIndexed, expAvgDocumentsProcessed); } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformCheckpointingInfo.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformCheckpointingInfo.java index a5ef7a1ce0e..659863c3cf3 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformCheckpointingInfo.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformCheckpointingInfo.java @@ -19,10 +19,14 @@ package org.elasticsearch.client.dataframe.transforms; +import org.elasticsearch.client.dataframe.transforms.util.TimeUtil; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.XContentParser; +import java.time.Instant; import java.util.Objects; public class DataFrameTransformCheckpointingInfo { @@ -30,19 +34,25 @@ public class DataFrameTransformCheckpointingInfo { public static final ParseField LAST_CHECKPOINT = new ParseField("last", "current"); public static final ParseField NEXT_CHECKPOINT = new ParseField("next", "in_progress"); public static final ParseField OPERATIONS_BEHIND = new ParseField("operations_behind"); + public static final ParseField CHANGES_LAST_DETECTED_AT = new ParseField("changes_last_detected_at"); private final DataFrameTransformCheckpointStats last; private final DataFrameTransformCheckpointStats next; private final long operationsBehind; + private final Instant changesLastDetectedAt; private static final ConstructingObjectParser LENIENT_PARSER = new ConstructingObjectParser<>( - "data_frame_transform_checkpointing_info", true, a -> { + "data_frame_transform_checkpointing_info", + true, + a -> { long behind = a[2] == null ? 0L : (Long) a[2]; - + Instant changesLastDetectedAt = (Instant)a[3]; return new DataFrameTransformCheckpointingInfo( - a[0] == null ? DataFrameTransformCheckpointStats.EMPTY : (DataFrameTransformCheckpointStats) a[0], - a[1] == null ? DataFrameTransformCheckpointStats.EMPTY : (DataFrameTransformCheckpointStats) a[1], behind); + a[0] == null ? DataFrameTransformCheckpointStats.EMPTY : (DataFrameTransformCheckpointStats) a[0], + a[1] == null ? DataFrameTransformCheckpointStats.EMPTY : (DataFrameTransformCheckpointStats) a[1], + behind, + changesLastDetectedAt); }); static { @@ -51,13 +61,20 @@ public class DataFrameTransformCheckpointingInfo { LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> DataFrameTransformCheckpointStats.fromXContent(p), NEXT_CHECKPOINT); LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), OPERATIONS_BEHIND); + LENIENT_PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), + p -> TimeUtil.parseTimeFieldToInstant(p, CHANGES_LAST_DETECTED_AT.getPreferredName()), + CHANGES_LAST_DETECTED_AT, + ObjectParser.ValueType.VALUE); } - public DataFrameTransformCheckpointingInfo(DataFrameTransformCheckpointStats last, DataFrameTransformCheckpointStats next, - long operationsBehind) { + public DataFrameTransformCheckpointingInfo(DataFrameTransformCheckpointStats last, + DataFrameTransformCheckpointStats next, + long operationsBehind, + Instant changesLastDetectedAt) { this.last = Objects.requireNonNull(last); this.next = Objects.requireNonNull(next); this.operationsBehind = operationsBehind; + this.changesLastDetectedAt = changesLastDetectedAt; } public DataFrameTransformCheckpointStats getLast() { @@ -72,13 +89,18 @@ public class DataFrameTransformCheckpointingInfo { return operationsBehind; } + @Nullable + public Instant getChangesLastDetectedAt() { + return changesLastDetectedAt; + } + public static DataFrameTransformCheckpointingInfo fromXContent(XContentParser p) { return LENIENT_PARSER.apply(p, null); } @Override public int hashCode() { - return Objects.hash(last, next, operationsBehind); + return Objects.hash(last, next, operationsBehind, changesLastDetectedAt); } @Override @@ -94,8 +116,9 @@ public class DataFrameTransformCheckpointingInfo { DataFrameTransformCheckpointingInfo that = (DataFrameTransformCheckpointingInfo) other; return Objects.equals(this.last, that.last) && - Objects.equals(this.next, that.next) && - this.operationsBehind == that.operationsBehind; + Objects.equals(this.next, that.next) && + this.operationsBehind == that.operationsBehind && + Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt); } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformProgress.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformProgress.java index a4177a33487..96920651035 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformProgress.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformProgress.java @@ -19,13 +19,13 @@ package org.elasticsearch.client.dataframe.transforms; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.XContentParser; import java.util.Objects; -import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; public class DataFrameTransformProgress { @@ -33,44 +33,67 @@ public class DataFrameTransformProgress { public static final ParseField TOTAL_DOCS = new ParseField("total_docs"); public static final ParseField DOCS_REMAINING = new ParseField("docs_remaining"); public static final ParseField PERCENT_COMPLETE = new ParseField("percent_complete"); + public static final ParseField DOCS_PROCESSED = new ParseField("docs_processed"); + public static final ParseField DOCS_INDEXED = new ParseField("docs_indexed"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "data_frame_transform_progress", true, - a -> new DataFrameTransformProgress((Long) a[0], (Long)a[1], (Double)a[2])); + a -> new DataFrameTransformProgress((Long) a[0], (Long)a[1], (Double)a[2], (Long)a[3], (Long)a[4])); static { - PARSER.declareLong(constructorArg(), TOTAL_DOCS); + PARSER.declareLong(optionalConstructorArg(), TOTAL_DOCS); PARSER.declareLong(optionalConstructorArg(), DOCS_REMAINING); PARSER.declareDouble(optionalConstructorArg(), PERCENT_COMPLETE); + PARSER.declareLong(optionalConstructorArg(), DOCS_PROCESSED); + PARSER.declareLong(optionalConstructorArg(), DOCS_INDEXED); } public static DataFrameTransformProgress fromXContent(XContentParser parser) { return PARSER.apply(parser, null); } - private final long totalDocs; - private final long remainingDocs; - private final double percentComplete; + private final Long totalDocs; + private final Long remainingDocs; + private final Double percentComplete; + private final long documentsProcessed; + private final long documentsIndexed; - public DataFrameTransformProgress(long totalDocs, Long remainingDocs, double percentComplete) { + public DataFrameTransformProgress(Long totalDocs, + Long remainingDocs, + Double percentComplete, + Long documentsProcessed, + Long documentsIndexed) { this.totalDocs = totalDocs; this.remainingDocs = remainingDocs == null ? totalDocs : remainingDocs; this.percentComplete = percentComplete; + this.documentsProcessed = documentsProcessed == null ? 0 : documentsProcessed; + this.documentsIndexed = documentsIndexed == null ? 0 : documentsIndexed; } - public double getPercentComplete() { + @Nullable + public Double getPercentComplete() { return percentComplete; } - public long getTotalDocs() { + @Nullable + public Long getTotalDocs() { return totalDocs; } - public long getRemainingDocs() { + @Nullable + public Long getRemainingDocs() { return remainingDocs; } + public long getDocumentsProcessed() { + return documentsProcessed; + } + + public long getDocumentsIndexed() { + return documentsIndexed; + } + @Override public boolean equals(Object other) { if (other == this) { @@ -84,11 +107,13 @@ public class DataFrameTransformProgress { DataFrameTransformProgress that = (DataFrameTransformProgress) other; return Objects.equals(this.remainingDocs, that.remainingDocs) && Objects.equals(this.totalDocs, that.totalDocs) - && Objects.equals(this.percentComplete, that.percentComplete); + && Objects.equals(this.percentComplete, that.percentComplete) + && Objects.equals(this.documentsIndexed, that.documentsIndexed) + && Objects.equals(this.documentsProcessed, that.documentsProcessed); } @Override public int hashCode(){ - return Objects.hash(remainingDocs, totalDocs, percentComplete); + return Objects.hash(remainingDocs, totalDocs, percentComplete, documentsIndexed, documentsProcessed); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java index 2e0175f4164..7633aef964a 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java @@ -407,7 +407,20 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase { DataFrameTransformStats stats = statsResponse.getTransformsStats().get(0); assertEquals(DataFrameTransformStats.State.STOPPED, stats.getState()); - DataFrameIndexerTransformStats zeroIndexerStats = new DataFrameIndexerTransformStats(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L); + DataFrameIndexerTransformStats zeroIndexerStats = new DataFrameIndexerTransformStats( + 0L, + 0L, + 0L, + 0L, + 0L, + 0L, + 0L, + 0L, + 0L, + 0L, + 0.0, + 0.0, + 0.0); assertEquals(zeroIndexerStats, stats.getIndexerStats()); // start the transform diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameIndexerTransformStatsTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameIndexerTransformStatsTests.java index f5c05ae47fe..75557caea2f 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameIndexerTransformStatsTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameIndexerTransformStatsTests.java @@ -42,7 +42,10 @@ public class DataFrameIndexerTransformStatsTests extends ESTestCase { public static DataFrameIndexerTransformStats randomStats() { return new DataFrameIndexerTransformStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), - randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); + randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), + randomBoolean() ? null : randomDouble(), + randomBoolean() ? null : randomDouble(), + randomBoolean() ? null : randomDouble()); } public static void toXContent(DataFrameIndexerTransformStats stats, XContentBuilder builder) throws IOException { @@ -57,6 +60,12 @@ public class DataFrameIndexerTransformStatsTests extends ESTestCase { builder.field(IndexerJobStats.SEARCH_TIME_IN_MS.getPreferredName(), stats.getSearchTime()); builder.field(IndexerJobStats.SEARCH_TOTAL.getPreferredName(), stats.getSearchTotal()); builder.field(IndexerJobStats.SEARCH_FAILURES.getPreferredName(), stats.getSearchFailures()); + builder.field(DataFrameIndexerTransformStats.EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS.getPreferredName(), + stats.getExpAvgCheckpointDurationMs()); + builder.field(DataFrameIndexerTransformStats.EXPONENTIAL_AVG_DOCUMENTS_INDEXED.getPreferredName(), + stats.getExpAvgDocumentsIndexed()); + builder.field(DataFrameIndexerTransformStats.EXPONENTIAL_AVG_DOCUMENTS_PROCESSED.getPreferredName(), + stats.getExpAvgDocumentsProcessed()); builder.endObject(); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformCheckpointingInfoTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformCheckpointingInfoTests.java index f70a853784a..23ac6748898 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformCheckpointingInfoTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformCheckpointingInfoTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.test.ESTestCase; import java.io.IOException; +import java.time.Instant; import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester; @@ -41,7 +42,8 @@ public class DataFrameTransformCheckpointingInfoTests extends ESTestCase { return new DataFrameTransformCheckpointingInfo( DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats(), DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats(), - randomLongBetween(0, 10000)); + randomLongBetween(0, 10000), + randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong())); } public static void toXContent(DataFrameTransformCheckpointingInfo info, XContentBuilder builder) throws IOException { @@ -55,6 +57,9 @@ public class DataFrameTransformCheckpointingInfoTests extends ESTestCase { DataFrameTransformCheckpointStatsTests.toXContent(info.getNext(), builder); } builder.field(DataFrameTransformCheckpointingInfo.OPERATIONS_BEHIND.getPreferredName(), info.getOperationsBehind()); + if (info.getChangesLastDetectedAt() != null) { + builder.field(DataFrameTransformCheckpointingInfo.CHANGES_LAST_DETECTED_AT.getPreferredName(), info.getChangesLastDetectedAt()); + } builder.endObject(); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformProgressTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformProgressTests.java index 573e2ffdbb9..3792d4855db 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformProgressTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformProgressTests.java @@ -34,22 +34,31 @@ public class DataFrameTransformProgressTests extends ESTestCase { DataFrameTransformProgressTests::toXContent, DataFrameTransformProgress::fromXContent) .supportsUnknownFields(true) - .randomFieldsExcludeFilter(field -> field.startsWith("state")) .test(); } public static DataFrameTransformProgress randomInstance() { - long totalDocs = randomNonNegativeLong(); - Long docsRemaining = randomBoolean() ? null : randomLongBetween(0, totalDocs); - double percentComplete = totalDocs == 0 ? 1.0 : docsRemaining == null ? 0.0 : 100.0*(double)(totalDocs - docsRemaining)/totalDocs; - return new DataFrameTransformProgress(totalDocs, docsRemaining, percentComplete); + return new DataFrameTransformProgress( + randomBoolean() ? null : randomNonNegativeLong(), + randomBoolean() ? null : randomNonNegativeLong(), + randomBoolean() ? null : randomDouble(), + randomBoolean() ? null : randomNonNegativeLong(), + randomBoolean() ? null : randomNonNegativeLong()); } public static void toXContent(DataFrameTransformProgress progress, XContentBuilder builder) throws IOException { builder.startObject(); - builder.field(DataFrameTransformProgress.TOTAL_DOCS.getPreferredName(), progress.getTotalDocs()); - builder.field(DataFrameTransformProgress.DOCS_REMAINING.getPreferredName(), progress.getRemainingDocs()); - builder.field(DataFrameTransformProgress.PERCENT_COMPLETE.getPreferredName(), progress.getPercentComplete()); + if (progress.getTotalDocs() != null) { + builder.field(DataFrameTransformProgress.TOTAL_DOCS.getPreferredName(), progress.getTotalDocs()); + } + if (progress.getPercentComplete() != null) { + builder.field(DataFrameTransformProgress.PERCENT_COMPLETE.getPreferredName(), progress.getPercentComplete()); + } + if (progress.getRemainingDocs() != null) { + builder.field(DataFrameTransformProgress.DOCS_REMAINING.getPreferredName(), progress.getRemainingDocs()); + } + builder.field(DataFrameTransformProgress.DOCS_INDEXED.getPreferredName(), progress.getDocumentsIndexed()); + builder.field(DataFrameTransformProgress.DOCS_PROCESSED.getPreferredName(), progress.getDocumentsProcessed()); builder.endObject(); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameIndexerTransformStatsTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameIndexerTransformStatsTests.java index e42aaa97b22..9aa58105311 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameIndexerTransformStatsTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameIndexerTransformStatsTests.java @@ -31,9 +31,20 @@ public class DataFrameIndexerTransformStatsTests extends AbstractHlrcXContentTes public static DataFrameIndexerTransformStats fromHlrc( org.elasticsearch.client.dataframe.transforms.DataFrameIndexerTransformStats instance) { - return new DataFrameIndexerTransformStats(instance.getNumPages(), instance.getNumDocuments(), - instance.getOutputDocuments(), instance.getNumInvocations(), instance.getIndexTime(), instance.getSearchTime(), - instance.getIndexTotal(), instance.getSearchTotal(), instance.getIndexFailures(), instance.getSearchFailures()); + return new DataFrameIndexerTransformStats( + instance.getNumPages(), + instance.getNumDocuments(), + instance.getOutputDocuments(), + instance.getNumInvocations(), + instance.getIndexTime(), + instance.getSearchTime(), + instance.getIndexTotal(), + instance.getSearchTotal(), + instance.getIndexFailures(), + instance.getSearchFailures(), + instance.getExpAvgCheckpointDurationMs(), + instance.getExpAvgDocumentsIndexed(), + instance.getExpAvgDocumentsProcessed()); } @Override @@ -52,7 +63,10 @@ public class DataFrameIndexerTransformStatsTests extends AbstractHlrcXContentTes return new DataFrameIndexerTransformStats(randomLongBetween(10L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), - randomLongBetween(0L, 10000L)); + randomLongBetween(0L, 10000L), + randomBoolean() ? null : randomDouble(), + randomBoolean() ? null : randomDouble(), + randomBoolean() ? null : randomDouble()); } @Override diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformCheckpointStatsTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformCheckpointStatsTests.java index e0976f040de..90dec41467a 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformCheckpointStatsTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformCheckpointStatsTests.java @@ -19,14 +19,15 @@ package org.elasticsearch.client.dataframe.transforms.hlrc; +import org.elasticsearch.client.AbstractResponseTestCase; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.client.AbstractHlrcXContentTestCase; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointStats; import java.io.IOException; -import java.util.function.Predicate; -public class DataFrameTransformCheckpointStatsTests extends AbstractHlrcXContentTestCase< +import static org.hamcrest.Matchers.equalTo; + +public class DataFrameTransformCheckpointStatsTests extends AbstractResponseTestCase< DataFrameTransformCheckpointStats, org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointStats> { @@ -39,18 +40,6 @@ public class DataFrameTransformCheckpointStatsTests extends AbstractHlrcXContent instance.getTimeUpperBoundMillis()); } - @Override - public org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointStats doHlrcParseInstance(XContentParser parser) - throws IOException { - return org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointStats.fromXContent(parser); - } - - @Override - public DataFrameTransformCheckpointStats convertHlrcToInternal( - org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointStats instance) { - return fromHlrc(instance); - } - public static DataFrameTransformCheckpointStats randomDataFrameTransformCheckpointStats() { return new DataFrameTransformCheckpointStats(randomLongBetween(1, 1_000_000), DataFrameIndexerPositionTests.randomDataFrameIndexerPosition(), @@ -59,22 +48,33 @@ public class DataFrameTransformCheckpointStatsTests extends AbstractHlrcXContent } @Override - protected DataFrameTransformCheckpointStats createTestInstance() { - return DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats(); + protected DataFrameTransformCheckpointStats createServerTestInstance() { + return randomDataFrameTransformCheckpointStats(); } @Override - protected DataFrameTransformCheckpointStats doParseInstance(XContentParser parser) throws IOException { - return DataFrameTransformCheckpointStats.fromXContent(parser); + protected org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointStats doParseToClientInstance(XContentParser parser) + throws IOException { + return org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointStats.fromXContent(parser); } @Override - protected boolean supportsUnknownFields() { - return true; - } - - @Override - protected Predicate getRandomFieldsExcludeFilter() { - return field -> field.startsWith("position"); + protected void assertInstances(DataFrameTransformCheckpointStats serverTestInstance, + org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointStats clientInstance) { + assertThat(serverTestInstance.getCheckpoint(), equalTo(clientInstance.getCheckpoint())); + assertThat(serverTestInstance.getPosition().getBucketsPosition(), equalTo(clientInstance.getPosition().getBucketsPosition())); + assertThat(serverTestInstance.getPosition().getIndexerPosition(), equalTo(clientInstance.getPosition().getIndexerPosition())); + assertThat(serverTestInstance.getTimestampMillis(), equalTo(clientInstance.getTimestampMillis())); + assertThat(serverTestInstance.getTimeUpperBoundMillis(), equalTo(clientInstance.getTimeUpperBoundMillis())); + if (serverTestInstance.getCheckpointProgress() != null) { + assertThat(serverTestInstance.getCheckpointProgress().getDocumentsIndexed(), + equalTo(clientInstance.getCheckpointProgress().getDocumentsIndexed())); + assertThat(serverTestInstance.getCheckpointProgress().getDocumentsProcessed(), + equalTo(clientInstance.getCheckpointProgress().getDocumentsProcessed())); + assertThat(serverTestInstance.getCheckpointProgress().getPercentComplete(), + equalTo(clientInstance.getCheckpointProgress().getPercentComplete())); + assertThat(serverTestInstance.getCheckpointProgress().getTotalDocs(), + equalTo(clientInstance.getCheckpointProgress().getTotalDocs())); + } } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformCheckpointingInfoTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformCheckpointingInfoTests.java index e95eb7bdedc..d9cd95326e4 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformCheckpointingInfoTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformCheckpointingInfoTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.client.AbstractHlrcXContentTestCase; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo; import java.io.IOException; +import java.time.Instant; import java.util.function.Predicate; public class DataFrameTransformCheckpointingInfoTests extends AbstractHlrcXContentTestCase< @@ -33,9 +34,10 @@ public class DataFrameTransformCheckpointingInfoTests extends AbstractHlrcXConte public static DataFrameTransformCheckpointingInfo fromHlrc( org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointingInfo instance) { return new DataFrameTransformCheckpointingInfo( - DataFrameTransformCheckpointStatsTests.fromHlrc(instance.getLast()), - DataFrameTransformCheckpointStatsTests.fromHlrc(instance.getNext()), - instance.getOperationsBehind()); + DataFrameTransformCheckpointStatsTests.fromHlrc(instance.getLast()), + DataFrameTransformCheckpointStatsTests.fromHlrc(instance.getNext()), + instance.getOperationsBehind(), + instance.getChangesLastDetectedAt()); } @Override @@ -50,8 +52,11 @@ public class DataFrameTransformCheckpointingInfoTests extends AbstractHlrcXConte } public static DataFrameTransformCheckpointingInfo randomDataFrameTransformCheckpointingInfo() { - return new DataFrameTransformCheckpointingInfo(DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats(), - DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats(), randomNonNegativeLong()); + return new DataFrameTransformCheckpointingInfo( + DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats(), + DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats(), + randomNonNegativeLong(), + randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong())); } @Override diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformProgressTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformProgressTests.java index a9dd1eaecab..c3f8479dd41 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformProgressTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformProgressTests.java @@ -34,12 +34,20 @@ public class DataFrameTransformProgressTests extends AbstractResponseTestCase< if (instance == null) { return null; } - return new DataFrameTransformProgress(instance.getTotalDocs(), instance.getRemainingDocs()); + return new DataFrameTransformProgress(instance.getTotalDocs(), + instance.getRemainingDocs(), + instance.getDocumentsProcessed(), + instance.getDocumentsIndexed()); } public static DataFrameTransformProgress randomDataFrameTransformProgress() { - long totalDocs = randomNonNegativeLong(); - return new DataFrameTransformProgress(totalDocs, randomBoolean() ? null : randomLongBetween(0, totalDocs)); + Long totalDocs = randomBoolean() ? null : randomNonNegativeLong(); + Long docsRemaining = totalDocs != null ? randomLongBetween(0, totalDocs) : null; + return new DataFrameTransformProgress( + totalDocs, + docsRemaining, + totalDocs != null ? totalDocs - docsRemaining : randomNonNegativeLong(), + randomBoolean() ? null : randomNonNegativeLong()); } @Override @@ -56,7 +64,8 @@ public class DataFrameTransformProgressTests extends AbstractResponseTestCase< protected void assertInstances(DataFrameTransformProgress serverTestInstance, org.elasticsearch.client.dataframe.transforms.DataFrameTransformProgress clientInstance) { assertThat(serverTestInstance.getTotalDocs(), equalTo(clientInstance.getTotalDocs())); - assertThat(serverTestInstance.getRemainingDocs(), equalTo(clientInstance.getRemainingDocs())); + assertThat(serverTestInstance.getDocumentsProcessed(), equalTo(clientInstance.getDocumentsProcessed())); assertThat(serverTestInstance.getPercentComplete(), equalTo(clientInstance.getPercentComplete())); + assertThat(serverTestInstance.getDocumentsIndexed(), equalTo(clientInstance.getDocumentsIndexed())); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStatsTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStatsTests.java index e65ecf10d96..42c22cbecc7 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStatsTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStatsTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStats import org.elasticsearch.xpack.core.dataframe.transforms.NodeAttributes; import java.io.IOException; +import java.time.Instant; import java.util.HashMap; import java.util.Map; import java.util.function.Predicate; @@ -97,14 +98,19 @@ public class DataFrameTransformStatsTests extends AbstractHlrcXContentTestCase LENIENT_PARSER = new ConstructingObjectParser<>( NAME, true, args -> new DataFrameIndexerTransformStats( - (long) args[0], (long) args[1], (long) args[2], (long) args[3], (long) args[4], (long) args[5], (long) args[6], - (long) args[7], (long) args[8], (long) args[9])); + (long) args[0], (long) args[1], (long) args[2], (long) args[3], (long) args[4], (long) args[5], (long) args[6], + (long) args[7], (long) args[8], (long) args[9], (Double) args[10], (Double) args[11], (Double) args[12])); static { LENIENT_PARSER.declareLong(constructorArg(), NUM_PAGES); @@ -53,8 +65,14 @@ public class DataFrameIndexerTransformStats extends IndexerJobStats { LENIENT_PARSER.declareLong(constructorArg(), SEARCH_TOTAL); LENIENT_PARSER.declareLong(constructorArg(), INDEX_FAILURES); LENIENT_PARSER.declareLong(constructorArg(), SEARCH_FAILURES); + LENIENT_PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS); + LENIENT_PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_DOCUMENTS_INDEXED); + LENIENT_PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_DOCUMENTS_PROCESSED); } + private double expAvgCheckpointDurationMs; + private double expAvgDocumentsIndexed; + private double expAvgDocumentsProcessed; /** * Create with all stats set to zero */ @@ -64,14 +82,28 @@ public class DataFrameIndexerTransformStats extends IndexerJobStats { public DataFrameIndexerTransformStats(long numPages, long numInputDocuments, long numOutputDocuments, long numInvocations, long indexTime, long searchTime, long indexTotal, long searchTotal, - long indexFailures, long searchFailures) { + long indexFailures, long searchFailures, Double expAvgCheckpointDurationMs, + Double expAvgDocumentsIndexed, Double expAvgDocumentsProcessed ) { super(numPages, numInputDocuments, numOutputDocuments, numInvocations, indexTime, searchTime, indexTotal, searchTotal, indexFailures, searchFailures); + this.expAvgCheckpointDurationMs = expAvgCheckpointDurationMs == null ? 0.0 : expAvgCheckpointDurationMs; + this.expAvgDocumentsIndexed = expAvgDocumentsIndexed == null ? 0.0 : expAvgDocumentsIndexed; + this.expAvgDocumentsProcessed = expAvgDocumentsProcessed == null ? 0.0 : expAvgDocumentsProcessed; + } + + public DataFrameIndexerTransformStats(long numPages, long numInputDocuments, long numOutputDocuments, + long numInvocations, long indexTime, long searchTime, long indexTotal, long searchTotal, + long indexFailures, long searchFailures) { + this(numPages, numInputDocuments, numOutputDocuments, numInvocations, indexTime, searchTime, indexTotal, searchTotal, + indexFailures, searchFailures, 0.0, 0.0, 0.0); } public DataFrameIndexerTransformStats(DataFrameIndexerTransformStats other) { this(other.numPages, other.numInputDocuments, other.numOuputDocuments, other.numInvocations, other.indexTime, other.searchTime, other.indexTotal, other.searchTotal, other.indexFailures, other.searchFailures); + this.expAvgCheckpointDurationMs = other.expAvgCheckpointDurationMs; + this.expAvgDocumentsIndexed = other.expAvgDocumentsIndexed; + this.expAvgDocumentsProcessed = other.expAvgDocumentsProcessed; } public DataFrameIndexerTransformStats(StreamInput in) throws IOException { @@ -79,6 +111,11 @@ public class DataFrameIndexerTransformStats extends IndexerJobStats { if (in.getVersion().before(Version.V_7_4_0)) { in.readString(); // was transformId } + if (in.getVersion().onOrAfter(Version.V_7_4_0)) { + this.expAvgCheckpointDurationMs = in.readDouble(); + this.expAvgDocumentsIndexed = in.readDouble(); + this.expAvgDocumentsProcessed = in.readDouble(); + } } @Override @@ -87,6 +124,11 @@ public class DataFrameIndexerTransformStats extends IndexerJobStats { if (out.getVersion().before(Version.V_7_4_0)) { out.writeString(DEFAULT_TRANSFORM_ID); } + if (out.getVersion().onOrAfter(Version.V_7_4_0)) { + out.writeDouble(this.expAvgCheckpointDurationMs); + out.writeDouble(this.expAvgDocumentsIndexed); + out.writeDouble(this.expAvgDocumentsProcessed); + } } @Override @@ -102,23 +144,43 @@ public class DataFrameIndexerTransformStats extends IndexerJobStats { builder.field(SEARCH_TIME_IN_MS.getPreferredName(), searchTime); builder.field(SEARCH_TOTAL.getPreferredName(), searchTotal); builder.field(SEARCH_FAILURES.getPreferredName(), searchFailures); + builder.field(EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS.getPreferredName(), this.expAvgCheckpointDurationMs); + builder.field(EXPONENTIAL_AVG_DOCUMENTS_INDEXED.getPreferredName(), this.expAvgDocumentsIndexed); + builder.field(EXPONENTIAL_AVG_DOCUMENTS_PROCESSED.getPreferredName(), this.expAvgDocumentsProcessed); builder.endObject(); return builder; } - public DataFrameIndexerTransformStats merge(DataFrameIndexerTransformStats other) { - numPages += other.numPages; - numInputDocuments += other.numInputDocuments; - numOuputDocuments += other.numOuputDocuments; - numInvocations += other.numInvocations; - indexTime += other.indexTime; - searchTime += other.searchTime; - indexTotal += other.indexTotal; - searchTotal += other.searchTotal; - indexFailures += other.indexFailures; - searchFailures += other.searchFailures; + public double getExpAvgCheckpointDurationMs() { + return expAvgCheckpointDurationMs; + } - return this; + public double getExpAvgDocumentsIndexed() { + return expAvgDocumentsIndexed; + } + + public double getExpAvgDocumentsProcessed() { + return expAvgDocumentsProcessed; + } + + public void incrementCheckpointExponentialAverages(long checkpointDurationMs, long docsIndexed, long docsProcessed) { + assert checkpointDurationMs >= 0; + assert docsIndexed >= 0; + assert docsProcessed >= 0; + // If all our exp averages are 0.0, just assign the new values. + if (expAvgCheckpointDurationMs == 0.0 && expAvgDocumentsIndexed == 0.0 && expAvgDocumentsProcessed == 0.0) { + expAvgCheckpointDurationMs = checkpointDurationMs; + expAvgDocumentsIndexed = docsIndexed; + expAvgDocumentsProcessed = docsProcessed; + } else { + expAvgCheckpointDurationMs = calculateExpAvg(expAvgCheckpointDurationMs, ALPHA, checkpointDurationMs); + expAvgDocumentsIndexed = calculateExpAvg(expAvgDocumentsIndexed, ALPHA, docsIndexed); + expAvgDocumentsProcessed = calculateExpAvg(expAvgDocumentsProcessed, ALPHA, docsProcessed); + } + } + + private double calculateExpAvg(double previousExpValue, double alpha, long observedValue) { + return alpha * observedValue + (1-alpha) * previousExpValue; } @Override @@ -142,13 +204,17 @@ public class DataFrameIndexerTransformStats extends IndexerJobStats { && Objects.equals(this.indexFailures, that.indexFailures) && Objects.equals(this.searchFailures, that.searchFailures) && Objects.equals(this.indexTotal, that.indexTotal) - && Objects.equals(this.searchTotal, that.searchTotal); + && Objects.equals(this.searchTotal, that.searchTotal) + && Objects.equals(this.expAvgCheckpointDurationMs, that.expAvgCheckpointDurationMs) + && Objects.equals(this.expAvgDocumentsIndexed, that.expAvgDocumentsIndexed) + && Objects.equals(this.expAvgDocumentsProcessed, that.expAvgDocumentsProcessed); } @Override public int hashCode() { return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations, - indexTime, searchTime, indexFailures, searchFailures, indexTotal, searchTotal); + indexTime, searchTime, indexFailures, searchFailures, indexTotal, searchTotal, + expAvgCheckpointDurationMs, expAvgDocumentsIndexed, expAvgDocumentsProcessed); } public static DataFrameIndexerTransformStats fromXContent(XContentParser parser) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointingInfo.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointingInfo.java index 4e33c178649..004f89f977d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointingInfo.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointingInfo.java @@ -6,17 +6,21 @@ package org.elasticsearch.xpack.core.dataframe.transforms; +import org.elasticsearch.Version; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.core.common.time.TimeUtils; import java.io.IOException; +import java.time.Instant; import java.util.Objects; /** @@ -28,27 +32,33 @@ import java.util.Objects; public class DataFrameTransformCheckpointingInfo implements Writeable, ToXContentObject { public static final DataFrameTransformCheckpointingInfo EMPTY = new DataFrameTransformCheckpointingInfo( - DataFrameTransformCheckpointStats.EMPTY, - DataFrameTransformCheckpointStats.EMPTY, - 0L); + DataFrameTransformCheckpointStats.EMPTY, + DataFrameTransformCheckpointStats.EMPTY, + 0L, + null); public static final ParseField LAST_CHECKPOINT = new ParseField("last"); public static final ParseField NEXT_CHECKPOINT = new ParseField("next"); public static final ParseField OPERATIONS_BEHIND = new ParseField("operations_behind"); - + public static final ParseField CHANGES_LAST_DETECTED_AT = new ParseField("changes_last_detected_at"); private final DataFrameTransformCheckpointStats last; private final DataFrameTransformCheckpointStats next; private final long operationsBehind; + private Instant changesLastDetectedAt; private static final ConstructingObjectParser LENIENT_PARSER = new ConstructingObjectParser<>( - "data_frame_transform_checkpointing_info", true, a -> { + "data_frame_transform_checkpointing_info", + true, + a -> { long behind = a[2] == null ? 0L : (Long) a[2]; - + Instant changesLastDetectedAt = (Instant)a[3]; return new DataFrameTransformCheckpointingInfo( - a[0] == null ? DataFrameTransformCheckpointStats.EMPTY : (DataFrameTransformCheckpointStats) a[0], - a[1] == null ? DataFrameTransformCheckpointStats.EMPTY : (DataFrameTransformCheckpointStats) a[1], behind); - }); + a[0] == null ? DataFrameTransformCheckpointStats.EMPTY : (DataFrameTransformCheckpointStats) a[0], + a[1] == null ? DataFrameTransformCheckpointStats.EMPTY : (DataFrameTransformCheckpointStats) a[1], + behind, + changesLastDetectedAt); + }); static { LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), @@ -56,6 +66,10 @@ public class DataFrameTransformCheckpointingInfo implements Writeable, ToXConten LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), DataFrameTransformCheckpointStats.LENIENT_PARSER::apply, NEXT_CHECKPOINT); LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), OPERATIONS_BEHIND); + LENIENT_PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), + p -> TimeUtils.parseTimeFieldToInstant(p, CHANGES_LAST_DETECTED_AT.getPreferredName()), + CHANGES_LAST_DETECTED_AT, + ObjectParser.ValueType.VALUE); } /** @@ -65,18 +79,31 @@ public class DataFrameTransformCheckpointingInfo implements Writeable, ToXConten * @param last stats of the last checkpoint * @param next stats of the next checkpoint * @param operationsBehind counter of operations the current checkpoint is behind source + * @param changesLastDetectedAt the last time the source indices were checked for changes */ - public DataFrameTransformCheckpointingInfo(DataFrameTransformCheckpointStats last, DataFrameTransformCheckpointStats next, - long operationsBehind) { + public DataFrameTransformCheckpointingInfo(DataFrameTransformCheckpointStats last, + DataFrameTransformCheckpointStats next, + long operationsBehind, + Instant changesLastDetectedAt) { this.last = Objects.requireNonNull(last); this.next = Objects.requireNonNull(next); this.operationsBehind = operationsBehind; + this.changesLastDetectedAt = changesLastDetectedAt == null ? null : Instant.ofEpochMilli(changesLastDetectedAt.toEpochMilli()); + } + + public DataFrameTransformCheckpointingInfo(DataFrameTransformCheckpointStats last, + DataFrameTransformCheckpointStats next, + long operationsBehind) { + this(last, next, operationsBehind, null); } public DataFrameTransformCheckpointingInfo(StreamInput in) throws IOException { last = new DataFrameTransformCheckpointStats(in); next = new DataFrameTransformCheckpointStats(in); operationsBehind = in.readLong(); + if (in.getVersion().onOrAfter(Version.V_7_4_0)) { + changesLastDetectedAt = in.readOptionalInstant(); + } } public DataFrameTransformCheckpointStats getLast() { @@ -91,6 +118,15 @@ public class DataFrameTransformCheckpointingInfo implements Writeable, ToXConten return operationsBehind; } + public Instant getChangesLastDetectedAt() { + return changesLastDetectedAt; + } + + public DataFrameTransformCheckpointingInfo setChangesLastDetectedAt(Instant changesLastDetectedAt) { + this.changesLastDetectedAt = Instant.ofEpochMilli(Objects.requireNonNull(changesLastDetectedAt).toEpochMilli()); + return this; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -99,6 +135,11 @@ public class DataFrameTransformCheckpointingInfo implements Writeable, ToXConten builder.field(NEXT_CHECKPOINT.getPreferredName(), next); } builder.field(OPERATIONS_BEHIND.getPreferredName(), operationsBehind); + if (changesLastDetectedAt != null) { + builder.timeField(CHANGES_LAST_DETECTED_AT.getPreferredName(), + CHANGES_LAST_DETECTED_AT.getPreferredName() + "_string", + changesLastDetectedAt.toEpochMilli()); + } builder.endObject(); return builder; } @@ -108,6 +149,9 @@ public class DataFrameTransformCheckpointingInfo implements Writeable, ToXConten last.writeTo(out); next.writeTo(out); out.writeLong(operationsBehind); + if (out.getVersion().onOrAfter(Version.V_7_4_0)) { + out.writeOptionalInstant(changesLastDetectedAt); + } } public static DataFrameTransformCheckpointingInfo fromXContent(XContentParser p) { @@ -116,7 +160,7 @@ public class DataFrameTransformCheckpointingInfo implements Writeable, ToXConten @Override public int hashCode() { - return Objects.hash(last, next, operationsBehind); + return Objects.hash(last, next, operationsBehind, changesLastDetectedAt); } @Override @@ -132,8 +176,9 @@ public class DataFrameTransformCheckpointingInfo implements Writeable, ToXConten DataFrameTransformCheckpointingInfo that = (DataFrameTransformCheckpointingInfo) other; return Objects.equals(this.last, that.last) && - Objects.equals(this.next, that.next) && - this.operationsBehind == that.operationsBehind; + Objects.equals(this.next, that.next) && + this.operationsBehind == that.operationsBehind && + Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformProgress.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformProgress.java index 0741be296ed..d7e409f4a52 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformProgress.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformProgress.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.core.dataframe.transforms; +import org.elasticsearch.Version; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; @@ -18,79 +19,110 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; import java.util.Objects; -import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; public class DataFrameTransformProgress implements Writeable, ToXContentObject { public static final ParseField TOTAL_DOCS = new ParseField("total_docs"); public static final ParseField DOCS_REMAINING = new ParseField("docs_remaining"); + public static final ParseField DOCS_PROCESSED = new ParseField("docs_processed"); + public static final ParseField DOCS_INDEXED = new ParseField("docs_indexed"); public static final String PERCENT_COMPLETE = "percent_complete"; public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "data_frame_transform_progress", true, - a -> new DataFrameTransformProgress((Long) a[0], (Long)a[1])); + a -> new DataFrameTransformProgress((Long) a[0], (Long)a[1], (Long)a[2], (Long)a[3])); static { - PARSER.declareLong(constructorArg(), TOTAL_DOCS); + PARSER.declareLong(optionalConstructorArg(), TOTAL_DOCS); PARSER.declareLong(optionalConstructorArg(), DOCS_REMAINING); + PARSER.declareLong(optionalConstructorArg(), DOCS_PROCESSED); + PARSER.declareLong(optionalConstructorArg(), DOCS_INDEXED); } - private final long totalDocs; - private long remainingDocs; + private final Long totalDocs; + private long documentsProcessed; + private long documentsIndexed; - public DataFrameTransformProgress(long totalDocs, Long remainingDocs) { - if (totalDocs < 0) { + public DataFrameTransformProgress() { + this(null, 0L, 0L); + } + + // If we are reading from an old document we need to convert docsRemaining to docsProcessed + public DataFrameTransformProgress(Long totalDocs, Long docsRemaining, Long documentsProcessed, Long documentsIndexed) { + this(totalDocs, + documentsProcessed != null ? + documentsProcessed : + docsRemaining != null && totalDocs != null ? totalDocs - docsRemaining : 0L, + documentsIndexed); + } + + public DataFrameTransformProgress(Long totalDocs, Long documentsProcessed, Long documentsIndexed) { + if (totalDocs != null && totalDocs < 0) { throw new IllegalArgumentException("[total_docs] must be >0."); } this.totalDocs = totalDocs; - if (remainingDocs != null && remainingDocs < 0) { - throw new IllegalArgumentException("[docs_remaining] must be >0."); + if (documentsProcessed != null && documentsProcessed < 0) { + throw new IllegalArgumentException("[docs_processed] must be >0."); } - this.remainingDocs = remainingDocs == null ? totalDocs : remainingDocs; + this.documentsProcessed = documentsProcessed == null ? 0 : documentsProcessed; + if (documentsIndexed != null && documentsIndexed < 0) { + throw new IllegalArgumentException("[docs_indexed] must be >0."); + } + this.documentsIndexed = documentsIndexed == null ? 0 : documentsIndexed; } public DataFrameTransformProgress(DataFrameTransformProgress otherProgress) { this.totalDocs = otherProgress.totalDocs; - this.remainingDocs = otherProgress.remainingDocs; + this.documentsProcessed = otherProgress.documentsProcessed; + this.documentsIndexed = otherProgress.documentsIndexed; } public DataFrameTransformProgress(StreamInput in) throws IOException { - this.totalDocs = in.readLong(); - this.remainingDocs = in.readLong(); + if (in.getVersion().onOrAfter(Version.V_7_4_0)) { + this.totalDocs = in.readOptionalLong(); + this.documentsProcessed = in.readVLong(); + this.documentsIndexed = in.readVLong(); + } else { + this.totalDocs = in.readLong(); + long remainingDocs = in.readLong(); + this.documentsProcessed = this.totalDocs - remainingDocs; + // was not previously tracked + this.documentsIndexed = 0; + } } public Double getPercentComplete() { - if (totalDocs == 0) { + if (totalDocs == null) { + return null; + } + if (documentsProcessed >= totalDocs) { return 100.0; } - long docsRead = totalDocs - remainingDocs; - if (docsRead < 0) { - return 100.0; - } - return 100.0*(double)docsRead/totalDocs; + return 100.0*(double)documentsProcessed/totalDocs; } - public long getTotalDocs() { + public Long getTotalDocs() { return totalDocs; } - public long getRemainingDocs() { - return remainingDocs; - } - - public void resetRemainingDocs() { - this.remainingDocs = totalDocs; - } - - public void docsProcessed(long docsProcessed) { + public void incrementDocsProcessed(long docsProcessed) { assert docsProcessed >= 0; - if (docsProcessed > remainingDocs) { - remainingDocs = 0; - } else { - remainingDocs -= docsProcessed; - } + this.documentsProcessed += docsProcessed; + } + + public void incrementDocsIndexed(long documentsIndexed) { + assert documentsIndexed >= 0; + this.documentsIndexed += documentsIndexed; + } + + public long getDocumentsProcessed() { + return documentsProcessed; + } + + public long getDocumentsIndexed() { + return documentsIndexed; } @Override @@ -104,26 +136,42 @@ public class DataFrameTransformProgress implements Writeable, ToXContentObject { } DataFrameTransformProgress that = (DataFrameTransformProgress) other; - return Objects.equals(this.remainingDocs, that.remainingDocs) && Objects.equals(this.totalDocs, that.totalDocs); + return Objects.equals(this.documentsIndexed, that.documentsIndexed) + && Objects.equals(this.totalDocs, that.totalDocs) + && Objects.equals(this.documentsProcessed, that.documentsProcessed); } @Override public int hashCode(){ - return Objects.hash(remainingDocs, totalDocs); + return Objects.hash(documentsProcessed, documentsIndexed, totalDocs); } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeLong(totalDocs); - out.writeLong(remainingDocs); + if (out.getVersion().onOrAfter(Version.V_7_4_0)) { + out.writeOptionalLong(totalDocs); + out.writeVLong(documentsProcessed); + out.writeVLong(documentsIndexed); + } else { + // What if our total docs number is `null` because we are in a continuous checkpoint, but are serializing to an old version? + // totalDocs was always incorrect in past versions when in a continuous checkpoint. So, just write 0 + // which will imply documentsRemaining == 0. + long unboxedTotalDocs = totalDocs == null ? 0 : totalDocs; + out.writeLong(unboxedTotalDocs); + out.writeLong(unboxedTotalDocs < documentsProcessed ? 0 : unboxedTotalDocs - documentsProcessed); + } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field(TOTAL_DOCS.getPreferredName(), totalDocs); - builder.field(DOCS_REMAINING.getPreferredName(), remainingDocs); - builder.field(PERCENT_COMPLETE, getPercentComplete()); + if (totalDocs != null) { + builder.field(DOCS_REMAINING.getPreferredName(), documentsProcessed > totalDocs ? 0 : totalDocs - documentsProcessed); + builder.field(TOTAL_DOCS.getPreferredName(), totalDocs); + builder.field(PERCENT_COMPLETE, getPercentComplete()); + } + builder.field(DOCS_INDEXED.getPreferredName(), documentsIndexed); + builder.field(DOCS_PROCESSED.getPreferredName(), documentsProcessed); builder.endObject(); return builder; } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameIndexerTransformStatsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameIndexerTransformStatsTests.java index 057e41e2354..3313ae8d145 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameIndexerTransformStatsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameIndexerTransformStatsTests.java @@ -10,7 +10,8 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.AbstractSerializingTestCase; -import java.io.IOException; +import static org.hamcrest.Matchers.closeTo; +import static org.hamcrest.Matchers.equalTo; public class DataFrameIndexerTransformStatsTests extends AbstractSerializingTestCase { @@ -33,23 +34,29 @@ public class DataFrameIndexerTransformStatsTests extends AbstractSerializingTest return new DataFrameIndexerTransformStats(randomLongBetween(10L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), - randomLongBetween(0L, 10000L)); + randomLongBetween(0L, 10000L), + randomBoolean() ? randomDouble() : null, + randomBoolean() ? randomDouble() : null, + randomBoolean() ? randomDouble() : null); } - public void testMerge() throws IOException { - DataFrameIndexerTransformStats emptyStats = new DataFrameIndexerTransformStats(); - DataFrameIndexerTransformStats randomStats = randomStats(); + public void testExpAvgIncrement() { + DataFrameIndexerTransformStats stats = new DataFrameIndexerTransformStats(); - assertEquals(randomStats, emptyStats.merge(randomStats)); - assertEquals(randomStats, randomStats.merge(emptyStats)); + assertThat(stats.getExpAvgCheckpointDurationMs(), equalTo(0.0)); + assertThat(stats.getExpAvgDocumentsIndexed(), equalTo(0.0)); + assertThat(stats.getExpAvgDocumentsProcessed(), equalTo(0.0)); - DataFrameIndexerTransformStats randomStatsClone = copyInstance(randomStats); + stats.incrementCheckpointExponentialAverages(100, 20, 50); - DataFrameIndexerTransformStats tripleRandomStats = new DataFrameIndexerTransformStats(3 * randomStats.getNumPages(), - 3 * randomStats.getNumDocuments(), 3 * randomStats.getOutputDocuments(), 3 * randomStats.getNumInvocations(), - 3 * randomStats.getIndexTime(), 3 * randomStats.getSearchTime(), 3 * randomStats.getIndexTotal(), - 3 * randomStats.getSearchTotal(), 3 * randomStats.getIndexFailures(), 3 * randomStats.getSearchFailures()); + assertThat(stats.getExpAvgCheckpointDurationMs(), equalTo(100.0)); + assertThat(stats.getExpAvgDocumentsIndexed(), equalTo(20.0)); + assertThat(stats.getExpAvgDocumentsProcessed(), equalTo(50.0)); - assertEquals(tripleRandomStats, randomStats.merge(randomStatsClone).merge(randomStatsClone)); + stats.incrementCheckpointExponentialAverages(150, 23, 100); + + assertThat(stats.getExpAvgCheckpointDurationMs(), closeTo(109.090909, 0.0000001)); + assertThat(stats.getExpAvgDocumentsIndexed(), closeTo(20.54545454, 0.0000001)); + assertThat(stats.getExpAvgDocumentsProcessed(), closeTo(59.0909090, 0.0000001)); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointingInfoTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointingInfoTests.java index b1c28073c75..18ccc142bd6 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointingInfoTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointingInfoTests.java @@ -6,16 +6,23 @@ package org.elasticsearch.xpack.core.dataframe.transforms; +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; +import java.time.Instant; public class DataFrameTransformCheckpointingInfoTests extends AbstractSerializingDataFrameTestCase { public static DataFrameTransformCheckpointingInfo randomDataFrameTransformCheckpointingInfo() { - return new DataFrameTransformCheckpointingInfo(DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats(), - DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats(), randomNonNegativeLong()); + return new DataFrameTransformCheckpointingInfo( + DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats(), + DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats(), + randomNonNegativeLong(), + randomBoolean() ? null : Instant.ofEpochMilli(randomLongBetween(1, 100000))); } @Override @@ -32,4 +39,22 @@ public class DataFrameTransformCheckpointingInfoTests extends AbstractSerializin protected Reader instanceReader() { return DataFrameTransformCheckpointingInfo::new; } + + public void testBackwardsSerialization() throws IOException { + DataFrameTransformCheckpointingInfo checkpointingInfo = new DataFrameTransformCheckpointingInfo( + DataFrameTransformCheckpointStats.EMPTY, + DataFrameTransformCheckpointStats.EMPTY, + randomNonNegativeLong(), + // changesLastDetectedAt is not serialized to past values, so when it is pulled back in, it will be null + null); + try (BytesStreamOutput output = new BytesStreamOutput()) { + output.setVersion(Version.V_7_4_0); + checkpointingInfo.writeTo(output); + try (StreamInput in = output.bytes().streamInput()) { + in.setVersion(Version.V_7_4_0); + DataFrameTransformCheckpointingInfo streamedCheckpointingInfo = new DataFrameTransformCheckpointingInfo(in); + assertEquals(checkpointingInfo, streamedCheckpointingInfo); + } + } + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformProgressTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformProgressTests.java index 1d4c308e86b..ccd61a6dd44 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformProgressTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformProgressTests.java @@ -6,19 +6,26 @@ package org.elasticsearch.xpack.core.dataframe.transforms; +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; public class DataFrameTransformProgressTests extends AbstractSerializingDataFrameTestCase { public static DataFrameTransformProgress randomDataFrameTransformProgress() { - long totalDocs = randomNonNegativeLong(); - return new DataFrameTransformProgress(totalDocs, randomBoolean() ? null : randomLongBetween(0, totalDocs)); + return new DataFrameTransformProgress( + randomBoolean() ? null : randomLongBetween(0, 10000), + randomBoolean() ? null : randomLongBetween(0, 10000), + randomBoolean() ? null : randomLongBetween(1, 10000)); } @Override @@ -37,29 +44,63 @@ public class DataFrameTransformProgressTests extends AbstractSerializingDataFram } public void testPercentComplete() { - DataFrameTransformProgress progress = new DataFrameTransformProgress(0L, 100L); + DataFrameTransformProgress progress = new DataFrameTransformProgress(0L, 100L, null); assertThat(progress.getPercentComplete(), equalTo(100.0)); - progress = new DataFrameTransformProgress(100L, 0L); - assertThat(progress.getPercentComplete(), equalTo(100.0)); - - progress = new DataFrameTransformProgress(100L, 10000L); - assertThat(progress.getPercentComplete(), equalTo(100.0)); - - progress = new DataFrameTransformProgress(100L, null); + progress = new DataFrameTransformProgress(100L, 0L, null); assertThat(progress.getPercentComplete(), equalTo(0.0)); - progress = new DataFrameTransformProgress(100L, 50L); + progress = new DataFrameTransformProgress(100L, 10000L, null); + assertThat(progress.getPercentComplete(), equalTo(100.0)); + + progress = new DataFrameTransformProgress(100L, null, null); + assertThat(progress.getPercentComplete(), equalTo(0.0)); + + progress = new DataFrameTransformProgress(100L, 50L, null); assertThat(progress.getPercentComplete(), closeTo(50.0, 0.000001)); + + progress = new DataFrameTransformProgress(null, 50L, 10L); + assertThat(progress.getPercentComplete(), is(nullValue())); } public void testConstructor() { IllegalArgumentException ex = - expectThrows(IllegalArgumentException.class, () -> new DataFrameTransformProgress(-1, null)); + expectThrows(IllegalArgumentException.class, () -> new DataFrameTransformProgress(-1L, null, null)); assertThat(ex.getMessage(), equalTo("[total_docs] must be >0.")); - ex = expectThrows(IllegalArgumentException.class, () -> new DataFrameTransformProgress(1L, -1L)); - assertThat(ex.getMessage(), equalTo("[docs_remaining] must be >0.")); + ex = expectThrows(IllegalArgumentException.class, () -> new DataFrameTransformProgress(1L, -1L, null)); + assertThat(ex.getMessage(), equalTo("[docs_processed] must be >0.")); + + ex = expectThrows(IllegalArgumentException.class, () -> new DataFrameTransformProgress(1L, 1L, -1L)); + assertThat(ex.getMessage(), equalTo("[docs_indexed] must be >0.")); + } + + public void testBackwardsSerialization() throws IOException { + long totalDocs = 10_000; + long processedDocs = randomLongBetween(0, totalDocs); + // documentsIndexed are not in past versions, so it would be zero coming in + DataFrameTransformProgress progress = new DataFrameTransformProgress(totalDocs, processedDocs, 0L); + try (BytesStreamOutput output = new BytesStreamOutput()) { + output.setVersion(Version.V_7_2_0); + progress.writeTo(output); + try (StreamInput in = output.bytes().streamInput()) { + in.setVersion(Version.V_7_2_0); + DataFrameTransformProgress streamedProgress = new DataFrameTransformProgress(in); + assertEquals(progress, streamedProgress); + } + } + + progress = new DataFrameTransformProgress(null, processedDocs, 0L); + try (BytesStreamOutput output = new BytesStreamOutput()) { + output.setVersion(Version.V_7_2_0); + progress.writeTo(output); + try (StreamInput in = output.bytes().streamInput()) { + in.setVersion(Version.V_7_2_0); + DataFrameTransformProgress streamedProgress = new DataFrameTransformProgress(in); + assertEquals(new DataFrameTransformProgress(0L, 0L, 0L), streamedProgress); + } + } + } } diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java index 2476e634f85..3a7809125c7 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java @@ -20,8 +20,10 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.oneOf; public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase { @@ -211,7 +213,7 @@ public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase { } @SuppressWarnings("unchecked") - public void testGetProgressResetWithContinuous() throws Exception { + public void testGetStatsWithContinuous() throws Exception { String transformId = "pivot_progress_continuous"; String transformDest = transformId + "_idx"; String transformSrc = "reviews_cont_pivot_test"; @@ -244,18 +246,20 @@ public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase { Map stats = entityAsMap(client().performRequest(getRequest)); List> transformsStats = (List>)XContentMapValues.extractValue("transforms", stats); assertEquals(1, transformsStats.size()); - /* TODO progress is now checkpoint progress and it may be that no checkpoint is in progress here - // Verify that the transforms progress + // No continuous checkpoints have been seen and thus all exponential averages should be 0.0 for (Map transformStats : transformsStats) { - Map progress = - (Map)XContentMapValues.extractValue("checkpointing.next.checkpoint_progress", transformStats); - assertThat("total_docs is not 1000", progress.get("total_docs"), equalTo(1000)); - assertThat("docs_remaining is not 0", progress.get("docs_remaining"), equalTo(0)); - assertThat("percent_complete is not 100.0", progress.get("percent_complete"), equalTo(100.0)); + transformStats = (Map)transformStats.get("stats"); + assertThat("exponential_avg_checkpoint_duration_ms is not 0.0", + transformStats.get("exponential_avg_checkpoint_duration_ms"), + equalTo(0.0)); + assertThat("exponential_avg_documents_indexed is not 0.0", + transformStats.get("exponential_avg_documents_indexed"), + equalTo(0.0)); + assertThat("exponential_avg_documents_processed is not 0.0", + transformStats.get("exponential_avg_documents_processed"), + equalTo(0.0)); } - */ - // add more docs to verify total_docs gets updated with continuous int numDocs = 10; final StringBuilder bulk = new StringBuilder(); long now = Instant.now().toEpochMilli() - 1_000; @@ -282,19 +286,27 @@ public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase { waitForDataFrameCheckpoint(transformId, 2L); + // We should now have exp avgs since we have processed a continuous checkpoint assertBusy(() -> { Map statsResponse = entityAsMap(client().performRequest(getRequest)); List> contStats = (List>)XContentMapValues.extractValue("transforms", statsResponse); assertEquals(1, contStats.size()); - /* TODO progress is now checkpoint progress and it may be that no checkpoint is in progress here for (Map transformStats : contStats) { - Map progress = - (Map)XContentMapValues.extractValue("checkpointing.next.checkpoint_progress", transformStats); - assertThat("total_docs is not 10", progress.get("total_docs"), equalTo(numDocs)); - assertThat("docs_remaining is not 0", progress.get("docs_remaining"), equalTo(0)); - assertThat("percent_complete is not 100.0", progress.get("percent_complete"), equalTo(100.0)); + Map statsObj = (Map)transformStats.get("stats"); + assertThat("exponential_avg_checkpoint_duration_ms is 0", + (Double)statsObj.get("exponential_avg_checkpoint_duration_ms"), + greaterThan(0.0)); + assertThat("exponential_avg_documents_indexed is 0", + (Double)statsObj.get("exponential_avg_documents_indexed"), + greaterThan(0.0)); + assertThat("exponential_avg_documents_processed is 0", + (Double)statsObj.get("exponential_avg_documents_processed"), + greaterThan(0.0)); + Map checkpointing = (Map)transformStats.get("checkpointing"); + assertThat("changes_last_detected_at is null", + checkpointing.get("changes_last_detected_at"), + is(notNullValue())); } - */ - }, 60, TimeUnit.SECONDS); + }, 120, TimeUnit.SECONDS); } } diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java index b2db7783cfd..2df5963f264 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java @@ -145,7 +145,7 @@ public class DataFrameTransformProgressIT extends ESRestTestCase { TransformProgressGatherer.searchResponseToDataFrameTransformProgressFunction().apply(response); assertThat(progress.getTotalDocs(), equalTo(1000L)); - assertThat(progress.getRemainingDocs(), equalTo(1000L)); + assertThat(progress.getDocumentsProcessed(), equalTo(0L)); assertThat(progress.getPercentComplete(), equalTo(0.0)); @@ -166,7 +166,7 @@ public class DataFrameTransformProgressIT extends ESRestTestCase { progress = TransformProgressGatherer.searchResponseToDataFrameTransformProgressFunction().apply(response); assertThat(progress.getTotalDocs(), equalTo(35L)); - assertThat(progress.getRemainingDocs(), equalTo(35L)); + assertThat(progress.getDocumentsProcessed(), equalTo(0L)); assertThat(progress.getPercentComplete(), equalTo(0.0)); histgramGroupConfig = new GroupConfig(Collections.emptyMap(), @@ -186,7 +186,7 @@ public class DataFrameTransformProgressIT extends ESRestTestCase { progress = TransformProgressGatherer.searchResponseToDataFrameTransformProgressFunction().apply(response); assertThat(progress.getTotalDocs(), equalTo(0L)); - assertThat(progress.getRemainingDocs(), equalTo(0L)); + assertThat(progress.getDocumentsProcessed(), equalTo(0L)); assertThat(progress.getPercentComplete(), equalTo(100.0)); deleteIndex(REVIEWS_INDEX_NAME); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameInternalIndex.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameInternalIndex.java index 6121b511b39..0307f4458a3 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameInternalIndex.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameInternalIndex.java @@ -53,6 +53,7 @@ public final class DataFrameInternalIndex { // data types public static final String FLOAT = "float"; + public static final String DOUBLE = "double"; public static final String LONG = "long"; public static final String KEYWORD = "keyword"; @@ -173,6 +174,12 @@ public final class DataFrameInternalIndex { .startObject(DataFrameTransformProgress.PERCENT_COMPLETE) .field(TYPE, FLOAT) .endObject() + .startObject(DataFrameTransformProgress.DOCS_INDEXED.getPreferredName()) + .field(TYPE, LONG) + .endObject() + .startObject(DataFrameTransformProgress.DOCS_PROCESSED.getPreferredName()) + .field(TYPE, LONG) + .endObject() .endObject() .endObject() .endObject() @@ -209,6 +216,15 @@ public final class DataFrameInternalIndex { .startObject(DataFrameIndexerTransformStats.INDEX_FAILURES.getPreferredName()) .field(TYPE, LONG) .endObject() + .startObject(DataFrameIndexerTransformStats.EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS.getPreferredName()) + .field(TYPE, DOUBLE) + .endObject() + .startObject(DataFrameIndexerTransformStats.EXPONENTIAL_AVG_DOCUMENTS_INDEXED.getPreferredName()) + .field(TYPE, DOUBLE) + .endObject() + .startObject(DataFrameIndexerTransformStats.EXPONENTIAL_AVG_DOCUMENTS_PROCESSED.getPreferredName()) + .field(TYPE, DOUBLE) + .endObject() .endObject() .endObject() // This is obsolete and can be removed for future versions of the index, but is left here as a warning/reminder that diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java index f9c6cac9490..861855abbad 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java @@ -226,7 +226,8 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer indexer = new SetOnce<>(); @@ -197,7 +200,16 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S indexer.getNextCheckpoint(), indexer.getPosition(), indexer.getProgress(), - listener); + ActionListener.wrap( + info -> { + if (changesLastDetectedAt == null) { + listener.onResponse(info); + } else { + listener.onResponse(info.setChangesLastDetectedAt(changesLastDetectedAt)); + } + }, + listener::onFailure + )); } public DataFrameTransformCheckpoint getLastCheckpoint() { @@ -319,6 +331,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S logger.debug("Trigger initial run"); getIndexer().maybeTriggerAsyncJob(System.currentTimeMillis()); } else if (getIndexer().isContinuous() && getIndexer().sourceHasChanged()) { + changesLastDetectedAt = Instant.now(); logger.debug("Source has changed, triggering new indexer run"); getIndexer().maybeTriggerAsyncJob(System.currentTimeMillis()); } @@ -616,6 +629,13 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S if (initialRun()) { createCheckpoint(ActionListener.wrap(cp -> { nextCheckpoint = cp; + // If nextCheckpoint > 1, this means that we are now on the checkpoint AFTER the batch checkpoint + // Consequently, the idea of percent complete no longer makes sense. + if (nextCheckpoint.getCheckpoint() > 1) { + progress = new DataFrameTransformProgress(null, 0L, 0L); + super.onStart(now, listener); + return; + } TransformProgressGatherer.getInitialProgress(this.client, buildFilterQuery(), getConfig(), ActionListener.wrap( newProgress -> { logger.trace("[{}] reset the progress from [{}] to [{}]", transformId, progress, newProgress); @@ -825,14 +845,29 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S // Reset our failure count as we have finished and may start again with a new checkpoint failureCount.set(0); - // TODO: progress hack to get around bucket_selector filtering out buckets // With bucket_selector we could have read all the buckets and completed the transform // but not "see" all the buckets since they were filtered out. Consequently, progress would // show less than 100% even though we are done. // NOTE: this method is called in the same thread as the processing thread. // Theoretically, there should not be a race condition with updating progress here. - if (progress != null && progress.getRemainingDocs() > 0) { - progress.docsProcessed(progress.getRemainingDocs()); + // NOTE 2: getPercentComplete should only NOT be null on the first (batch) checkpoint + if (progress != null && progress.getPercentComplete() != null && progress.getPercentComplete() < 100.0) { + progress.incrementDocsProcessed(progress.getTotalDocs() - progress.getDocumentsProcessed()); + } + logger.info("Last checkpoint for {} {}", getJobId(), Strings.toString(lastCheckpoint)); + // If the last checkpoint is now greater than 1, that means that we have just processed the first + // continuous checkpoint and should start recording the exponential averages + if (lastCheckpoint != null && lastCheckpoint.getCheckpoint() > 1) { + long docsIndexed = 0; + long docsProcessed = 0; + // This should not happen as we simply create a new one when we reach continuous checkpoints + // but this is a paranoid `null` check + if (progress != null) { + docsIndexed = progress.getDocumentsIndexed(); + docsProcessed = progress.getDocumentsProcessed(); + } + long durationMs = System.currentTimeMillis() - lastCheckpoint.getTimestamp(); + getStats().incrementCheckpointExponentialAverages(durationMs < 0 ? 0 : durationMs, docsIndexed, docsProcessed); } if (shouldAuditOnFinish(checkpoint)) { auditor.info(transformTask.getTransformId(), diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/TransformProgressGatherer.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/TransformProgressGatherer.java index 6850f8a5e94..a7c37f7c2e4 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/TransformProgressGatherer.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/TransformProgressGatherer.java @@ -73,6 +73,6 @@ public final class TransformProgressGatherer { } public static Function searchResponseToDataFrameTransformProgressFunction() { - return searchResponse -> new DataFrameTransformProgress(searchResponse.getHits().getTotalHits().value, null); + return searchResponse -> new DataFrameTransformProgress(searchResponse.getHits().getTotalHits().value, 0L, 0L); } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml index 08ab6ce969c..b4699898d48 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml @@ -246,6 +246,9 @@ teardown: - gte: { transforms.0.stats.search_time_in_ms: 0 } - lte: { transforms.0.stats.search_total: 1 } - match: { transforms.0.stats.search_failures: 0 } + - match: { transforms.0.stats.exponential_avg_checkpoint_duration_ms: 0.0 } + - match: { transforms.0.stats.exponential_avg_documents_indexed: 0.0 } + - match: { transforms.0.stats.exponential_avg_documents_processed: 0.0 } - do: data_frame.stop_data_frame_transform: