[7.x] [ML][Transforms] adjusting stats.progress for cont. transforms (#45361) (#45551)

* [ML][Transforms] adjusting stats.progress for cont. transforms (#45361)

* [ML][Transforms] adjusting stats.progress for cont. transforms

* addressing PR comments

* rename fix

* Adjusting bwc serialization versions
Benjamin Trent 2019-08-14 13:08:27 -05:00 committed by GitHub
parent 4af6d86c01
commit 0c343d8443
27 changed files with 712 additions and 221 deletions

View File

@ -36,16 +36,16 @@ public abstract class IndexerJobStats {
public static ParseField SEARCH_FAILURES = new ParseField("search_failures");
public static ParseField INDEX_FAILURES = new ParseField("index_failures");
private final long numPages;
private final long numInputDocuments;
private final long numOuputDocuments;
private final long numInvocations;
private final long indexTime;
private final long indexTotal;
private final long searchTime;
private final long searchTotal;
private final long indexFailures;
private final long searchFailures;
protected final long numPages;
protected final long numInputDocuments;
protected final long numOuputDocuments;
protected final long numInvocations;
protected final long indexTime;
protected final long indexTotal;
protected final long searchTime;
protected final long searchTotal;
protected final long indexFailures;
protected final long searchFailures;
public IndexerJobStats(long numPages, long numInputDocuments, long numOutputDocuments, long numInvocations,
long indexTime, long searchTime, long indexTotal, long searchTotal, long indexFailures, long searchFailures) {

View File

@ -20,18 +20,28 @@
package org.elasticsearch.client.dataframe.transforms;
import org.elasticsearch.client.core.IndexerJobStats;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
public class DataFrameIndexerTransformStats extends IndexerJobStats {
static ParseField EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS = new ParseField("exponential_avg_checkpoint_duration_ms");
static ParseField EXPONENTIAL_AVG_DOCUMENTS_INDEXED = new ParseField("exponential_avg_documents_indexed");
static ParseField EXPONENTIAL_AVG_DOCUMENTS_PROCESSED = new ParseField("exponential_avg_documents_processed");
public static final ConstructingObjectParser<DataFrameIndexerTransformStats, Void> LENIENT_PARSER = new ConstructingObjectParser<>(
NAME, true, args -> new DataFrameIndexerTransformStats((long) args[0], (long) args[1], (long) args[2],
(long) args[3], (long) args[4], (long) args[5], (long) args[6], (long) args[7], (long) args[8], (long) args[9]));
NAME,
true,
args -> new DataFrameIndexerTransformStats((long) args[0], (long) args[1], (long) args[2],
(long) args[3], (long) args[4], (long) args[5], (long) args[6], (long) args[7], (long) args[8], (long) args[9],
(Double) args[10], (Double) args[11], (Double) args[12]));
static {
LENIENT_PARSER.declareLong(constructorArg(), NUM_PAGES);
@ -44,16 +54,74 @@ public class DataFrameIndexerTransformStats extends IndexerJobStats {
LENIENT_PARSER.declareLong(constructorArg(), SEARCH_TOTAL);
LENIENT_PARSER.declareLong(constructorArg(), INDEX_FAILURES);
LENIENT_PARSER.declareLong(constructorArg(), SEARCH_FAILURES);
LENIENT_PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS);
LENIENT_PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_DOCUMENTS_INDEXED);
LENIENT_PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_DOCUMENTS_PROCESSED);
}
public static DataFrameIndexerTransformStats fromXContent(XContentParser parser) throws IOException {
return LENIENT_PARSER.parse(parser, null);
}
private final double expAvgCheckpointDurationMs;
private final double expAvgDocumentsIndexed;
private final double expAvgDocumentsProcessed;
public DataFrameIndexerTransformStats(long numPages, long numInputDocuments, long numOuputDocuments,
long numInvocations, long indexTime, long searchTime,
long indexTotal, long searchTotal, long indexFailures, long searchFailures) {
long indexTotal, long searchTotal, long indexFailures, long searchFailures,
Double expAvgCheckpointDurationMs, Double expAvgDocumentsIndexed,
Double expAvgDocumentsProcessed) {
super(numPages, numInputDocuments, numOuputDocuments, numInvocations, indexTime, searchTime,
indexTotal, searchTotal, indexFailures, searchFailures);
this.expAvgCheckpointDurationMs = expAvgCheckpointDurationMs == null ? 0.0 : expAvgCheckpointDurationMs;
this.expAvgDocumentsIndexed = expAvgDocumentsIndexed == null ? 0.0 : expAvgDocumentsIndexed;
this.expAvgDocumentsProcessed = expAvgDocumentsProcessed == null ? 0.0 : expAvgDocumentsProcessed;
}
public double getExpAvgCheckpointDurationMs() {
return expAvgCheckpointDurationMs;
}
public double getExpAvgDocumentsIndexed() {
return expAvgDocumentsIndexed;
}
public double getExpAvgDocumentsProcessed() {
return expAvgDocumentsProcessed;
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}
DataFrameIndexerTransformStats that = (DataFrameIndexerTransformStats) other;
return Objects.equals(this.numPages, that.numPages)
&& Objects.equals(this.numInputDocuments, that.numInputDocuments)
&& Objects.equals(this.numOuputDocuments, that.numOuputDocuments)
&& Objects.equals(this.numInvocations, that.numInvocations)
&& Objects.equals(this.indexTime, that.indexTime)
&& Objects.equals(this.searchTime, that.searchTime)
&& Objects.equals(this.indexFailures, that.indexFailures)
&& Objects.equals(this.searchFailures, that.searchFailures)
&& Objects.equals(this.indexTotal, that.indexTotal)
&& Objects.equals(this.searchTotal, that.searchTotal)
&& Objects.equals(this.expAvgCheckpointDurationMs, that.expAvgCheckpointDurationMs)
&& Objects.equals(this.expAvgDocumentsIndexed, that.expAvgDocumentsIndexed)
&& Objects.equals(this.expAvgDocumentsProcessed, that.expAvgDocumentsProcessed);
}
@Override
public int hashCode() {
return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations,
indexTime, searchTime, indexFailures, searchFailures, indexTotal, searchTotal,
expAvgCheckpointDurationMs, expAvgDocumentsIndexed, expAvgDocumentsProcessed);
}
}
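
The three trailing constructor arguments are boxed Doubles so that responses from servers older than 7.4, which omit the exponential-average fields, still construct cleanly: the optionalConstructorArg declarations pass null and the constructor falls back to 0.0. A minimal sketch with hypothetical counter values:

DataFrameIndexerTransformStats fromOldServer = new DataFrameIndexerTransformStats(
    1L, 10L, 5L, 2L, 100L, 200L, 5L, 2L, 0L, 0L,
    null, null, null); // exponential averages absent from a pre-7.4 response
assert fromOldServer.getExpAvgCheckpointDurationMs() == 0.0;
assert fromOldServer.getExpAvgDocumentsIndexed() == 0.0;
assert fromOldServer.getExpAvgDocumentsProcessed() == 0.0;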

View File

@ -19,10 +19,14 @@
package org.elasticsearch.client.dataframe.transforms;
import org.elasticsearch.client.dataframe.transforms.util.TimeUtil;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import java.time.Instant;
import java.util.Objects;
public class DataFrameTransformCheckpointingInfo {
@ -30,19 +34,25 @@ public class DataFrameTransformCheckpointingInfo {
public static final ParseField LAST_CHECKPOINT = new ParseField("last", "current");
public static final ParseField NEXT_CHECKPOINT = new ParseField("next", "in_progress");
public static final ParseField OPERATIONS_BEHIND = new ParseField("operations_behind");
public static final ParseField CHANGES_LAST_DETECTED_AT = new ParseField("changes_last_detected_at");
private final DataFrameTransformCheckpointStats last;
private final DataFrameTransformCheckpointStats next;
private final long operationsBehind;
private final Instant changesLastDetectedAt;
private static final ConstructingObjectParser<DataFrameTransformCheckpointingInfo, Void> LENIENT_PARSER =
new ConstructingObjectParser<>(
"data_frame_transform_checkpointing_info", true, a -> {
"data_frame_transform_checkpointing_info",
true,
a -> {
long behind = a[2] == null ? 0L : (Long) a[2];
Instant changesLastDetectedAt = (Instant)a[3];
return new DataFrameTransformCheckpointingInfo(
a[0] == null ? DataFrameTransformCheckpointStats.EMPTY : (DataFrameTransformCheckpointStats) a[0],
a[1] == null ? DataFrameTransformCheckpointStats.EMPTY : (DataFrameTransformCheckpointStats) a[1], behind);
a[1] == null ? DataFrameTransformCheckpointStats.EMPTY : (DataFrameTransformCheckpointStats) a[1],
behind,
changesLastDetectedAt);
});
static {
@ -51,13 +61,20 @@ public class DataFrameTransformCheckpointingInfo {
LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> DataFrameTransformCheckpointStats.fromXContent(p), NEXT_CHECKPOINT);
LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), OPERATIONS_BEHIND);
LENIENT_PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
p -> TimeUtil.parseTimeFieldToInstant(p, CHANGES_LAST_DETECTED_AT.getPreferredName()),
CHANGES_LAST_DETECTED_AT,
ObjectParser.ValueType.VALUE);
}
public DataFrameTransformCheckpointingInfo(DataFrameTransformCheckpointStats last, DataFrameTransformCheckpointStats next,
long operationsBehind) {
public DataFrameTransformCheckpointingInfo(DataFrameTransformCheckpointStats last,
DataFrameTransformCheckpointStats next,
long operationsBehind,
Instant changesLastDetectedAt) {
this.last = Objects.requireNonNull(last);
this.next = Objects.requireNonNull(next);
this.operationsBehind = operationsBehind;
this.changesLastDetectedAt = changesLastDetectedAt;
}
public DataFrameTransformCheckpointStats getLast() {
@ -72,13 +89,18 @@ public class DataFrameTransformCheckpointingInfo {
return operationsBehind;
}
@Nullable
public Instant getChangesLastDetectedAt() {
return changesLastDetectedAt;
}
public static DataFrameTransformCheckpointingInfo fromXContent(XContentParser p) {
return LENIENT_PARSER.apply(p, null);
}
@Override
public int hashCode() {
return Objects.hash(last, next, operationsBehind);
return Objects.hash(last, next, operationsBehind, changesLastDetectedAt);
}
@Override
@ -95,7 +117,8 @@ public class DataFrameTransformCheckpointingInfo {
return Objects.equals(this.last, that.last) &&
Objects.equals(this.next, that.next) &&
this.operationsBehind == that.operationsBehind;
this.operationsBehind == that.operationsBehind &&
Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt);
}
}
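
Because changes_last_detected_at is only reported once a continuous transform has actually checked its source for changes, the new getter is @Nullable and callers should guard against null. A hedged sketch, using the EMPTY checkpoint stats as stand-ins:

DataFrameTransformCheckpointingInfo info = new DataFrameTransformCheckpointingInfo(
    DataFrameTransformCheckpointStats.EMPTY,
    DataFrameTransformCheckpointStats.EMPTY,
    0L,
    null); // no change detection has run yet
Instant lastDetected = info.getChangesLastDetectedAt();
System.out.println(lastDetected == null ? "no changes detected yet" : "changes last detected at " + lastDetected);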

View File

@ -19,13 +19,13 @@
package org.elasticsearch.client.dataframe.transforms;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
public class DataFrameTransformProgress {
@ -33,44 +33,67 @@ public class DataFrameTransformProgress {
public static final ParseField TOTAL_DOCS = new ParseField("total_docs");
public static final ParseField DOCS_REMAINING = new ParseField("docs_remaining");
public static final ParseField PERCENT_COMPLETE = new ParseField("percent_complete");
public static final ParseField DOCS_PROCESSED = new ParseField("docs_processed");
public static final ParseField DOCS_INDEXED = new ParseField("docs_indexed");
public static final ConstructingObjectParser<DataFrameTransformProgress, Void> PARSER = new ConstructingObjectParser<>(
"data_frame_transform_progress",
true,
a -> new DataFrameTransformProgress((Long) a[0], (Long)a[1], (Double)a[2]));
a -> new DataFrameTransformProgress((Long) a[0], (Long)a[1], (Double)a[2], (Long)a[3], (Long)a[4]));
static {
PARSER.declareLong(constructorArg(), TOTAL_DOCS);
PARSER.declareLong(optionalConstructorArg(), TOTAL_DOCS);
PARSER.declareLong(optionalConstructorArg(), DOCS_REMAINING);
PARSER.declareDouble(optionalConstructorArg(), PERCENT_COMPLETE);
PARSER.declareLong(optionalConstructorArg(), DOCS_PROCESSED);
PARSER.declareLong(optionalConstructorArg(), DOCS_INDEXED);
}
public static DataFrameTransformProgress fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}
private final long totalDocs;
private final long remainingDocs;
private final double percentComplete;
private final Long totalDocs;
private final Long remainingDocs;
private final Double percentComplete;
private final long documentsProcessed;
private final long documentsIndexed;
public DataFrameTransformProgress(long totalDocs, Long remainingDocs, double percentComplete) {
public DataFrameTransformProgress(Long totalDocs,
Long remainingDocs,
Double percentComplete,
Long documentsProcessed,
Long documentsIndexed) {
this.totalDocs = totalDocs;
this.remainingDocs = remainingDocs == null ? totalDocs : remainingDocs;
this.percentComplete = percentComplete;
this.documentsProcessed = documentsProcessed == null ? 0 : documentsProcessed;
this.documentsIndexed = documentsIndexed == null ? 0 : documentsIndexed;
}
public double getPercentComplete() {
@Nullable
public Double getPercentComplete() {
return percentComplete;
}
public long getTotalDocs() {
@Nullable
public Long getTotalDocs() {
return totalDocs;
}
public long getRemainingDocs() {
@Nullable
public Long getRemainingDocs() {
return remainingDocs;
}
public long getDocumentsProcessed() {
return documentsProcessed;
}
public long getDocumentsIndexed() {
return documentsIndexed;
}
@Override
public boolean equals(Object other) {
if (other == this) {
@ -84,11 +107,13 @@ public class DataFrameTransformProgress {
DataFrameTransformProgress that = (DataFrameTransformProgress) other;
return Objects.equals(this.remainingDocs, that.remainingDocs)
&& Objects.equals(this.totalDocs, that.totalDocs)
&& Objects.equals(this.percentComplete, that.percentComplete);
&& Objects.equals(this.percentComplete, that.percentComplete)
&& Objects.equals(this.documentsIndexed, that.documentsIndexed)
&& Objects.equals(this.documentsProcessed, that.documentsProcessed);
}
@Override
public int hashCode(){
return Objects.hash(remainingDocs, totalDocs, percentComplete);
return Objects.hash(remainingDocs, totalDocs, percentComplete, documentsIndexed, documentsProcessed);
}
}
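
Since a continuous transform cannot know its final document count up front, total_docs, docs_remaining, and percent_complete are now boxed and may be null, while docs_processed and docs_indexed are always present. A small sketch with hypothetical values:

DataFrameTransformProgress progress = new DataFrameTransformProgress(null, null, null, 100L, 80L);
assert progress.getTotalDocs() == null;        // unknown for a continuous transform
assert progress.getPercentComplete() == null;  // cannot be derived without a total
assert progress.getDocumentsProcessed() == 100L;
assert progress.getDocumentsIndexed() == 80L;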

View File

@ -407,7 +407,20 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
DataFrameTransformStats stats = statsResponse.getTransformsStats().get(0);
assertEquals(DataFrameTransformStats.State.STOPPED, stats.getState());
DataFrameIndexerTransformStats zeroIndexerStats = new DataFrameIndexerTransformStats(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
DataFrameIndexerTransformStats zeroIndexerStats = new DataFrameIndexerTransformStats(
0L,
0L,
0L,
0L,
0L,
0L,
0L,
0L,
0L,
0L,
0.0,
0.0,
0.0);
assertEquals(zeroIndexerStats, stats.getIndexerStats());
// start the transform

View File

@ -42,7 +42,10 @@ public class DataFrameIndexerTransformStatsTests extends ESTestCase {
public static DataFrameIndexerTransformStats randomStats() {
return new DataFrameIndexerTransformStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble());
}
public static void toXContent(DataFrameIndexerTransformStats stats, XContentBuilder builder) throws IOException {
@ -57,6 +60,12 @@ public class DataFrameIndexerTransformStatsTests extends ESTestCase {
builder.field(IndexerJobStats.SEARCH_TIME_IN_MS.getPreferredName(), stats.getSearchTime());
builder.field(IndexerJobStats.SEARCH_TOTAL.getPreferredName(), stats.getSearchTotal());
builder.field(IndexerJobStats.SEARCH_FAILURES.getPreferredName(), stats.getSearchFailures());
builder.field(DataFrameIndexerTransformStats.EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS.getPreferredName(),
stats.getExpAvgCheckpointDurationMs());
builder.field(DataFrameIndexerTransformStats.EXPONENTIAL_AVG_DOCUMENTS_INDEXED.getPreferredName(),
stats.getExpAvgDocumentsIndexed());
builder.field(DataFrameIndexerTransformStats.EXPONENTIAL_AVG_DOCUMENTS_PROCESSED.getPreferredName(),
stats.getExpAvgDocumentsProcessed());
builder.endObject();
}
}

View File

@ -23,6 +23,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.time.Instant;
import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
@ -41,7 +42,8 @@ public class DataFrameTransformCheckpointingInfoTests extends ESTestCase {
return new DataFrameTransformCheckpointingInfo(
DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats(),
DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats(),
randomLongBetween(0, 10000));
randomLongBetween(0, 10000),
randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong()));
}
public static void toXContent(DataFrameTransformCheckpointingInfo info, XContentBuilder builder) throws IOException {
@ -55,6 +57,9 @@ public class DataFrameTransformCheckpointingInfoTests extends ESTestCase {
DataFrameTransformCheckpointStatsTests.toXContent(info.getNext(), builder);
}
builder.field(DataFrameTransformCheckpointingInfo.OPERATIONS_BEHIND.getPreferredName(), info.getOperationsBehind());
if (info.getChangesLastDetectedAt() != null) {
builder.field(DataFrameTransformCheckpointingInfo.CHANGES_LAST_DETECTED_AT.getPreferredName(), info.getChangesLastDetectedAt());
}
builder.endObject();
}
}

View File

@ -34,22 +34,31 @@ public class DataFrameTransformProgressTests extends ESTestCase {
DataFrameTransformProgressTests::toXContent,
DataFrameTransformProgress::fromXContent)
.supportsUnknownFields(true)
.randomFieldsExcludeFilter(field -> field.startsWith("state"))
.test();
}
public static DataFrameTransformProgress randomInstance() {
long totalDocs = randomNonNegativeLong();
Long docsRemaining = randomBoolean() ? null : randomLongBetween(0, totalDocs);
double percentComplete = totalDocs == 0 ? 1.0 : docsRemaining == null ? 0.0 : 100.0*(double)(totalDocs - docsRemaining)/totalDocs;
return new DataFrameTransformProgress(totalDocs, docsRemaining, percentComplete);
return new DataFrameTransformProgress(
randomBoolean() ? null : randomNonNegativeLong(),
randomBoolean() ? null : randomNonNegativeLong(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomNonNegativeLong(),
randomBoolean() ? null : randomNonNegativeLong());
}
public static void toXContent(DataFrameTransformProgress progress, XContentBuilder builder) throws IOException {
builder.startObject();
if (progress.getTotalDocs() != null) {
builder.field(DataFrameTransformProgress.TOTAL_DOCS.getPreferredName(), progress.getTotalDocs());
builder.field(DataFrameTransformProgress.DOCS_REMAINING.getPreferredName(), progress.getRemainingDocs());
}
if (progress.getPercentComplete() != null) {
builder.field(DataFrameTransformProgress.PERCENT_COMPLETE.getPreferredName(), progress.getPercentComplete());
}
if (progress.getRemainingDocs() != null) {
builder.field(DataFrameTransformProgress.DOCS_REMAINING.getPreferredName(), progress.getRemainingDocs());
}
builder.field(DataFrameTransformProgress.DOCS_INDEXED.getPreferredName(), progress.getDocumentsIndexed());
builder.field(DataFrameTransformProgress.DOCS_PROCESSED.getPreferredName(), progress.getDocumentsProcessed());
builder.endObject();
}
}

View File

@ -31,9 +31,20 @@ public class DataFrameIndexerTransformStatsTests extends AbstractHlrcXContentTes
public static DataFrameIndexerTransformStats fromHlrc(
org.elasticsearch.client.dataframe.transforms.DataFrameIndexerTransformStats instance) {
return new DataFrameIndexerTransformStats(instance.getNumPages(), instance.getNumDocuments(),
instance.getOutputDocuments(), instance.getNumInvocations(), instance.getIndexTime(), instance.getSearchTime(),
instance.getIndexTotal(), instance.getSearchTotal(), instance.getIndexFailures(), instance.getSearchFailures());
return new DataFrameIndexerTransformStats(
instance.getNumPages(),
instance.getNumDocuments(),
instance.getOutputDocuments(),
instance.getNumInvocations(),
instance.getIndexTime(),
instance.getSearchTime(),
instance.getIndexTotal(),
instance.getSearchTotal(),
instance.getIndexFailures(),
instance.getSearchFailures(),
instance.getExpAvgCheckpointDurationMs(),
instance.getExpAvgDocumentsIndexed(),
instance.getExpAvgDocumentsProcessed());
}
@Override
@ -52,7 +63,10 @@ public class DataFrameIndexerTransformStatsTests extends AbstractHlrcXContentTes
return new DataFrameIndexerTransformStats(randomLongBetween(10L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L));
randomLongBetween(0L, 10000L),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble());
}
@Override

View File

@ -19,14 +19,15 @@
package org.elasticsearch.client.dataframe.transforms.hlrc;
import org.elasticsearch.client.AbstractResponseTestCase;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.client.AbstractHlrcXContentTestCase;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointStats;
import java.io.IOException;
import java.util.function.Predicate;
public class DataFrameTransformCheckpointStatsTests extends AbstractHlrcXContentTestCase<
import static org.hamcrest.Matchers.equalTo;
public class DataFrameTransformCheckpointStatsTests extends AbstractResponseTestCase<
DataFrameTransformCheckpointStats,
org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointStats> {
@ -39,18 +40,6 @@ public class DataFrameTransformCheckpointStatsTests extends AbstractHlrcXContent
instance.getTimeUpperBoundMillis());
}
@Override
public org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointStats doHlrcParseInstance(XContentParser parser)
throws IOException {
return org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointStats.fromXContent(parser);
}
@Override
public DataFrameTransformCheckpointStats convertHlrcToInternal(
org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointStats instance) {
return fromHlrc(instance);
}
public static DataFrameTransformCheckpointStats randomDataFrameTransformCheckpointStats() {
return new DataFrameTransformCheckpointStats(randomLongBetween(1, 1_000_000),
DataFrameIndexerPositionTests.randomDataFrameIndexerPosition(),
@ -59,22 +48,33 @@ public class DataFrameTransformCheckpointStatsTests extends AbstractHlrcXContent
}
@Override
protected DataFrameTransformCheckpointStats createTestInstance() {
return DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats();
protected DataFrameTransformCheckpointStats createServerTestInstance() {
return randomDataFrameTransformCheckpointStats();
}
@Override
protected DataFrameTransformCheckpointStats doParseInstance(XContentParser parser) throws IOException {
return DataFrameTransformCheckpointStats.fromXContent(parser);
protected org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointStats doParseToClientInstance(XContentParser parser)
throws IOException {
return org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointStats.fromXContent(parser);
}
@Override
protected boolean supportsUnknownFields() {
return true;
protected void assertInstances(DataFrameTransformCheckpointStats serverTestInstance,
org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointStats clientInstance) {
assertThat(serverTestInstance.getCheckpoint(), equalTo(clientInstance.getCheckpoint()));
assertThat(serverTestInstance.getPosition().getBucketsPosition(), equalTo(clientInstance.getPosition().getBucketsPosition()));
assertThat(serverTestInstance.getPosition().getIndexerPosition(), equalTo(clientInstance.getPosition().getIndexerPosition()));
assertThat(serverTestInstance.getTimestampMillis(), equalTo(clientInstance.getTimestampMillis()));
assertThat(serverTestInstance.getTimeUpperBoundMillis(), equalTo(clientInstance.getTimeUpperBoundMillis()));
if (serverTestInstance.getCheckpointProgress() != null) {
assertThat(serverTestInstance.getCheckpointProgress().getDocumentsIndexed(),
equalTo(clientInstance.getCheckpointProgress().getDocumentsIndexed()));
assertThat(serverTestInstance.getCheckpointProgress().getDocumentsProcessed(),
equalTo(clientInstance.getCheckpointProgress().getDocumentsProcessed()));
assertThat(serverTestInstance.getCheckpointProgress().getPercentComplete(),
equalTo(clientInstance.getCheckpointProgress().getPercentComplete()));
assertThat(serverTestInstance.getCheckpointProgress().getTotalDocs(),
equalTo(clientInstance.getCheckpointProgress().getTotalDocs()));
}
@Override
protected Predicate<String> getRandomFieldsExcludeFilter() {
return field -> field.startsWith("position");
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.client.AbstractHlrcXContentTestCase;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo;
import java.io.IOException;
import java.time.Instant;
import java.util.function.Predicate;
public class DataFrameTransformCheckpointingInfoTests extends AbstractHlrcXContentTestCase<
@ -35,7 +36,8 @@ public class DataFrameTransformCheckpointingInfoTests extends AbstractHlrcXConte
return new DataFrameTransformCheckpointingInfo(
DataFrameTransformCheckpointStatsTests.fromHlrc(instance.getLast()),
DataFrameTransformCheckpointStatsTests.fromHlrc(instance.getNext()),
instance.getOperationsBehind());
instance.getOperationsBehind(),
instance.getChangesLastDetectedAt());
}
@Override
@ -50,8 +52,11 @@ public class DataFrameTransformCheckpointingInfoTests extends AbstractHlrcXConte
}
public static DataFrameTransformCheckpointingInfo randomDataFrameTransformCheckpointingInfo() {
return new DataFrameTransformCheckpointingInfo(DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats(),
DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats(), randomNonNegativeLong());
return new DataFrameTransformCheckpointingInfo(
DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats(),
DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats(),
randomNonNegativeLong(),
randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong()));
}
@Override

View File

@ -34,12 +34,20 @@ public class DataFrameTransformProgressTests extends AbstractResponseTestCase<
if (instance == null) {
return null;
}
return new DataFrameTransformProgress(instance.getTotalDocs(), instance.getRemainingDocs());
return new DataFrameTransformProgress(instance.getTotalDocs(),
instance.getRemainingDocs(),
instance.getDocumentsProcessed(),
instance.getDocumentsIndexed());
}
public static DataFrameTransformProgress randomDataFrameTransformProgress() {
long totalDocs = randomNonNegativeLong();
return new DataFrameTransformProgress(totalDocs, randomBoolean() ? null : randomLongBetween(0, totalDocs));
Long totalDocs = randomBoolean() ? null : randomNonNegativeLong();
Long docsRemaining = totalDocs != null ? randomLongBetween(0, totalDocs) : null;
return new DataFrameTransformProgress(
totalDocs,
docsRemaining,
totalDocs != null ? totalDocs - docsRemaining : randomNonNegativeLong(),
randomBoolean() ? null : randomNonNegativeLong());
}
@Override
@ -56,7 +64,8 @@ public class DataFrameTransformProgressTests extends AbstractResponseTestCase<
protected void assertInstances(DataFrameTransformProgress serverTestInstance,
org.elasticsearch.client.dataframe.transforms.DataFrameTransformProgress clientInstance) {
assertThat(serverTestInstance.getTotalDocs(), equalTo(clientInstance.getTotalDocs()));
assertThat(serverTestInstance.getRemainingDocs(), equalTo(clientInstance.getRemainingDocs()));
assertThat(serverTestInstance.getDocumentsProcessed(), equalTo(clientInstance.getDocumentsProcessed()));
assertThat(serverTestInstance.getPercentComplete(), equalTo(clientInstance.getPercentComplete()));
assertThat(serverTestInstance.getDocumentsIndexed(), equalTo(clientInstance.getDocumentsIndexed()));
}
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStats
import org.elasticsearch.xpack.core.dataframe.transforms.NodeAttributes;
import java.io.IOException;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;
@ -97,14 +98,19 @@ public class DataFrameTransformStatsTests extends AbstractHlrcXContentTestCase<D
}
public static DataFrameTransformProgress randomDataFrameTransformProgress() {
long totalDocs = randomNonNegativeLong();
Long remainingDocs = randomBoolean() ? null : randomLongBetween(0, totalDocs);
return new DataFrameTransformProgress(totalDocs, remainingDocs);
Long totalDocs = randomBoolean() ? null : randomNonNegativeLong();
Long docsRemaining = totalDocs != null ? randomLongBetween(0, totalDocs) : null;
return new DataFrameTransformProgress(
totalDocs,
docsRemaining,
totalDocs != null ? totalDocs - docsRemaining : randomNonNegativeLong(),
randomBoolean() ? null : randomNonNegativeLong());
}
public static DataFrameTransformCheckpointingInfo randomDataFrameTransformCheckpointingInfo() {
return new DataFrameTransformCheckpointingInfo(randomDataFrameTransformCheckpointStats(),
randomDataFrameTransformCheckpointStats(), randomNonNegativeLong());
randomDataFrameTransformCheckpointStats(), randomNonNegativeLong(),
randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong()));
}
public static DataFrameTransformCheckpointStats randomDataFrameTransformCheckpointStats() {
@ -132,7 +138,10 @@ public class DataFrameTransformStatsTests extends AbstractHlrcXContentTestCase<D
return new DataFrameIndexerTransformStats(randomLongBetween(10L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L));
randomLongBetween(0L, 10000L),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble());
}
@Override

View File

@ -137,7 +137,10 @@ The API returns the following results:
"index_failures" : 0,
"search_time_in_ms" : 52,
"search_total" : 2,
"search_failures" : 0
"search_failures" : 0,
"exponential_avg_checkpoint_duration_ms" : 77,
"exponential_avg_documents_indexed" : 2,
"exponential_avg_documents_processed" : 12
},
"checkpointing" : {
"last" : {
@ -156,10 +159,10 @@ The API returns the following results:
}
},
"checkpoint_progress" : {
"total_docs" : 1900883,
"docs_remaining" : 1722762,
"percent_complete" : 9.370434687458408
"documents_processed": 100,
"documents_indexed": 1
},
"changes_last_detected_at": 1561740629170,
"timestamp_millis" : 1561740629172,
"time_upper_bound_millis" : 1561740569172
},
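
For context, the response above comes from the transform stats API; a hedged high-level REST client sketch mirroring the integration test earlier in this commit ("ecommerce_transform" is a hypothetical transform id, and client is assumed to be an existing RestHighLevelClient):

GetDataFrameTransformStatsRequest request = new GetDataFrameTransformStatsRequest("ecommerce_transform");
GetDataFrameTransformStatsResponse response =
    client.dataFrame().getDataFrameTransformStats(request, RequestOptions.DEFAULT);
DataFrameIndexerTransformStats indexerStats = response.getTransformsStats().get(0).getIndexerStats();
double avgCheckpointMs = indexerStats.getExpAvgCheckpointDurationMs(); // 77.0 for the response above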

View File

@ -19,6 +19,7 @@ import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
public class DataFrameIndexerTransformStats extends IndexerJobStats {
@ -35,12 +36,23 @@ public class DataFrameIndexerTransformStats extends IndexerJobStats {
public static ParseField SEARCH_TOTAL = new ParseField("search_total");
public static ParseField SEARCH_FAILURES = new ParseField("search_failures");
public static ParseField INDEX_FAILURES = new ParseField("index_failures");
public static ParseField EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS =
new ParseField("exponential_avg_checkpoint_duration_ms");
public static ParseField EXPONENTIAL_AVG_DOCUMENTS_INDEXED =
new ParseField("exponential_avg_documents_indexed");
public static ParseField EXPONENTIAL_AVG_DOCUMENTS_PROCESSED =
new ParseField("exponential_avg_documents_processed");
// This changes how much "weight" past calculations have.
// The shorter the window, the less "smoothing" will occur.
private static final int EXP_AVG_WINDOW = 10;
private static final double ALPHA = 2.0/(EXP_AVG_WINDOW + 1);
private static final ConstructingObjectParser<DataFrameIndexerTransformStats, Void> LENIENT_PARSER = new ConstructingObjectParser<>(
NAME, true,
args -> new DataFrameIndexerTransformStats(
(long) args[0], (long) args[1], (long) args[2], (long) args[3], (long) args[4], (long) args[5], (long) args[6],
(long) args[7], (long) args[8], (long) args[9]));
(long) args[7], (long) args[8], (long) args[9], (Double) args[10], (Double) args[11], (Double) args[12]));
static {
LENIENT_PARSER.declareLong(constructorArg(), NUM_PAGES);
@ -53,8 +65,14 @@ public class DataFrameIndexerTransformStats extends IndexerJobStats {
LENIENT_PARSER.declareLong(constructorArg(), SEARCH_TOTAL);
LENIENT_PARSER.declareLong(constructorArg(), INDEX_FAILURES);
LENIENT_PARSER.declareLong(constructorArg(), SEARCH_FAILURES);
LENIENT_PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS);
LENIENT_PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_DOCUMENTS_INDEXED);
LENIENT_PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_DOCUMENTS_PROCESSED);
}
private double expAvgCheckpointDurationMs;
private double expAvgDocumentsIndexed;
private double expAvgDocumentsProcessed;
/**
* Create with all stats set to zero
*/
@ -64,14 +82,28 @@ public class DataFrameIndexerTransformStats extends IndexerJobStats {
public DataFrameIndexerTransformStats(long numPages, long numInputDocuments, long numOutputDocuments,
long numInvocations, long indexTime, long searchTime, long indexTotal, long searchTotal,
long indexFailures, long searchFailures) {
long indexFailures, long searchFailures, Double expAvgCheckpointDurationMs,
Double expAvgDocumentsIndexed, Double expAvgDocumentsProcessed) {
super(numPages, numInputDocuments, numOutputDocuments, numInvocations, indexTime, searchTime, indexTotal, searchTotal,
indexFailures, searchFailures);
this.expAvgCheckpointDurationMs = expAvgCheckpointDurationMs == null ? 0.0 : expAvgCheckpointDurationMs;
this.expAvgDocumentsIndexed = expAvgDocumentsIndexed == null ? 0.0 : expAvgDocumentsIndexed;
this.expAvgDocumentsProcessed = expAvgDocumentsProcessed == null ? 0.0 : expAvgDocumentsProcessed;
}
public DataFrameIndexerTransformStats(long numPages, long numInputDocuments, long numOutputDocuments,
long numInvocations, long indexTime, long searchTime, long indexTotal, long searchTotal,
long indexFailures, long searchFailures) {
this(numPages, numInputDocuments, numOutputDocuments, numInvocations, indexTime, searchTime, indexTotal, searchTotal,
indexFailures, searchFailures, 0.0, 0.0, 0.0);
}
public DataFrameIndexerTransformStats(DataFrameIndexerTransformStats other) {
this(other.numPages, other.numInputDocuments, other.numOuputDocuments, other.numInvocations,
other.indexTime, other.searchTime, other.indexTotal, other.searchTotal, other.indexFailures, other.searchFailures);
this.expAvgCheckpointDurationMs = other.expAvgCheckpointDurationMs;
this.expAvgDocumentsIndexed = other.expAvgDocumentsIndexed;
this.expAvgDocumentsProcessed = other.expAvgDocumentsProcessed;
}
public DataFrameIndexerTransformStats(StreamInput in) throws IOException {
@ -79,6 +111,11 @@ public class DataFrameIndexerTransformStats extends IndexerJobStats {
if (in.getVersion().before(Version.V_7_4_0)) {
in.readString(); // was transformId
}
if (in.getVersion().onOrAfter(Version.V_7_4_0)) {
this.expAvgCheckpointDurationMs = in.readDouble();
this.expAvgDocumentsIndexed = in.readDouble();
this.expAvgDocumentsProcessed = in.readDouble();
}
}
@Override
@ -87,6 +124,11 @@ public class DataFrameIndexerTransformStats extends IndexerJobStats {
if (out.getVersion().before(Version.V_7_4_0)) {
out.writeString(DEFAULT_TRANSFORM_ID);
}
if (out.getVersion().onOrAfter(Version.V_7_4_0)) {
out.writeDouble(this.expAvgCheckpointDurationMs);
out.writeDouble(this.expAvgDocumentsIndexed);
out.writeDouble(this.expAvgDocumentsProcessed);
}
}
@Override
@ -102,23 +144,43 @@ public class DataFrameIndexerTransformStats extends IndexerJobStats {
builder.field(SEARCH_TIME_IN_MS.getPreferredName(), searchTime);
builder.field(SEARCH_TOTAL.getPreferredName(), searchTotal);
builder.field(SEARCH_FAILURES.getPreferredName(), searchFailures);
builder.field(EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS.getPreferredName(), this.expAvgCheckpointDurationMs);
builder.field(EXPONENTIAL_AVG_DOCUMENTS_INDEXED.getPreferredName(), this.expAvgDocumentsIndexed);
builder.field(EXPONENTIAL_AVG_DOCUMENTS_PROCESSED.getPreferredName(), this.expAvgDocumentsProcessed);
builder.endObject();
return builder;
}
public DataFrameIndexerTransformStats merge(DataFrameIndexerTransformStats other) {
numPages += other.numPages;
numInputDocuments += other.numInputDocuments;
numOuputDocuments += other.numOuputDocuments;
numInvocations += other.numInvocations;
indexTime += other.indexTime;
searchTime += other.searchTime;
indexTotal += other.indexTotal;
searchTotal += other.searchTotal;
indexFailures += other.indexFailures;
searchFailures += other.searchFailures;
public double getExpAvgCheckpointDurationMs() {
return expAvgCheckpointDurationMs;
}
return this;
public double getExpAvgDocumentsIndexed() {
return expAvgDocumentsIndexed;
}
public double getExpAvgDocumentsProcessed() {
return expAvgDocumentsProcessed;
}
public void incrementCheckpointExponentialAverages(long checkpointDurationMs, long docsIndexed, long docsProcessed) {
assert checkpointDurationMs >= 0;
assert docsIndexed >= 0;
assert docsProcessed >= 0;
// If all our exp averages are 0.0, just assign the new values.
if (expAvgCheckpointDurationMs == 0.0 && expAvgDocumentsIndexed == 0.0 && expAvgDocumentsProcessed == 0.0) {
expAvgCheckpointDurationMs = checkpointDurationMs;
expAvgDocumentsIndexed = docsIndexed;
expAvgDocumentsProcessed = docsProcessed;
} else {
expAvgCheckpointDurationMs = calculateExpAvg(expAvgCheckpointDurationMs, ALPHA, checkpointDurationMs);
expAvgDocumentsIndexed = calculateExpAvg(expAvgDocumentsIndexed, ALPHA, docsIndexed);
expAvgDocumentsProcessed = calculateExpAvg(expAvgDocumentsProcessed, ALPHA, docsProcessed);
}
}
private double calculateExpAvg(double previousExpValue, double alpha, long observedValue) {
return alpha * observedValue + (1-alpha) * previousExpValue;
}
@Override
@ -142,13 +204,17 @@ public class DataFrameIndexerTransformStats extends IndexerJobStats {
&& Objects.equals(this.indexFailures, that.indexFailures)
&& Objects.equals(this.searchFailures, that.searchFailures)
&& Objects.equals(this.indexTotal, that.indexTotal)
&& Objects.equals(this.searchTotal, that.searchTotal);
&& Objects.equals(this.searchTotal, that.searchTotal)
&& Objects.equals(this.expAvgCheckpointDurationMs, that.expAvgCheckpointDurationMs)
&& Objects.equals(this.expAvgDocumentsIndexed, that.expAvgDocumentsIndexed)
&& Objects.equals(this.expAvgDocumentsProcessed, that.expAvgDocumentsProcessed);
}
@Override
public int hashCode() {
return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations,
indexTime, searchTime, indexFailures, searchFailures, indexTotal, searchTotal);
indexTime, searchTime, indexFailures, searchFailures, indexTotal, searchTotal,
expAvgCheckpointDurationMs, expAvgDocumentsIndexed, expAvgDocumentsProcessed);
}
public static DataFrameIndexerTransformStats fromXContent(XContentParser parser) {
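
A worked instance of calculateExpAvg above: with EXP_AVG_WINDOW = 10, alpha is 2/11. The first observation is assigned as-is, so after an initial checkpoint duration of 100 ms a second observation of 150 ms yields 2/11 * 150 + 9/11 * 100 = 1200/11 ≈ 109.0909, exactly the value asserted by testExpAvgIncrement further down in this commit:

double alpha = 2.0 / (10 + 1);                  // EXP_AVG_WINDOW = 10
double expAvg = 100.0;                          // first observation, assigned directly
expAvg = alpha * 150 + (1 - alpha) * expAvg;    // 1200/11 ≈ 109.0909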

View File

@ -6,17 +6,21 @@
package org.elasticsearch.xpack.core.dataframe.transforms;
import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.common.time.TimeUtils;
import java.io.IOException;
import java.time.Instant;
import java.util.Objects;
/**
@ -30,24 +34,30 @@ public class DataFrameTransformCheckpointingInfo implements Writeable, ToXConten
public static final DataFrameTransformCheckpointingInfo EMPTY = new DataFrameTransformCheckpointingInfo(
DataFrameTransformCheckpointStats.EMPTY,
DataFrameTransformCheckpointStats.EMPTY,
0L);
0L,
null);
public static final ParseField LAST_CHECKPOINT = new ParseField("last");
public static final ParseField NEXT_CHECKPOINT = new ParseField("next");
public static final ParseField OPERATIONS_BEHIND = new ParseField("operations_behind");
public static final ParseField CHANGES_LAST_DETECTED_AT = new ParseField("changes_last_detected_at");
private final DataFrameTransformCheckpointStats last;
private final DataFrameTransformCheckpointStats next;
private final long operationsBehind;
private Instant changesLastDetectedAt;
private static final ConstructingObjectParser<DataFrameTransformCheckpointingInfo, Void> LENIENT_PARSER =
new ConstructingObjectParser<>(
"data_frame_transform_checkpointing_info", true, a -> {
"data_frame_transform_checkpointing_info",
true,
a -> {
long behind = a[2] == null ? 0L : (Long) a[2];
Instant changesLastDetectedAt = (Instant)a[3];
return new DataFrameTransformCheckpointingInfo(
a[0] == null ? DataFrameTransformCheckpointStats.EMPTY : (DataFrameTransformCheckpointStats) a[0],
a[1] == null ? DataFrameTransformCheckpointStats.EMPTY : (DataFrameTransformCheckpointStats) a[1], behind);
a[1] == null ? DataFrameTransformCheckpointStats.EMPTY : (DataFrameTransformCheckpointStats) a[1],
behind,
changesLastDetectedAt);
});
static {
@ -56,6 +66,10 @@ public class DataFrameTransformCheckpointingInfo implements Writeable, ToXConten
LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
DataFrameTransformCheckpointStats.LENIENT_PARSER::apply, NEXT_CHECKPOINT);
LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), OPERATIONS_BEHIND);
LENIENT_PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
p -> TimeUtils.parseTimeFieldToInstant(p, CHANGES_LAST_DETECTED_AT.getPreferredName()),
CHANGES_LAST_DETECTED_AT,
ObjectParser.ValueType.VALUE);
}
/**
@ -65,18 +79,31 @@ public class DataFrameTransformCheckpointingInfo implements Writeable, ToXConten
* @param last stats of the last checkpoint
* @param next stats of the next checkpoint
* @param operationsBehind counter of operations the current checkpoint is behind source
* @param changesLastDetectedAt the last time the source indices were checked for changes
*/
public DataFrameTransformCheckpointingInfo(DataFrameTransformCheckpointStats last, DataFrameTransformCheckpointStats next,
long operationsBehind) {
public DataFrameTransformCheckpointingInfo(DataFrameTransformCheckpointStats last,
DataFrameTransformCheckpointStats next,
long operationsBehind,
Instant changesLastDetectedAt) {
this.last = Objects.requireNonNull(last);
this.next = Objects.requireNonNull(next);
this.operationsBehind = operationsBehind;
this.changesLastDetectedAt = changesLastDetectedAt == null ? null : Instant.ofEpochMilli(changesLastDetectedAt.toEpochMilli());
}
public DataFrameTransformCheckpointingInfo(DataFrameTransformCheckpointStats last,
DataFrameTransformCheckpointStats next,
long operationsBehind) {
this(last, next, operationsBehind, null);
}
public DataFrameTransformCheckpointingInfo(StreamInput in) throws IOException {
last = new DataFrameTransformCheckpointStats(in);
next = new DataFrameTransformCheckpointStats(in);
operationsBehind = in.readLong();
if (in.getVersion().onOrAfter(Version.V_7_4_0)) {
changesLastDetectedAt = in.readOptionalInstant();
}
}
public DataFrameTransformCheckpointStats getLast() {
@ -91,6 +118,15 @@ public class DataFrameTransformCheckpointingInfo implements Writeable, ToXConten
return operationsBehind;
}
public Instant getChangesLastDetectedAt() {
return changesLastDetectedAt;
}
public DataFrameTransformCheckpointingInfo setChangesLastDetectedAt(Instant changesLastDetectedAt) {
this.changesLastDetectedAt = Instant.ofEpochMilli(Objects.requireNonNull(changesLastDetectedAt).toEpochMilli());
return this;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@ -99,6 +135,11 @@ public class DataFrameTransformCheckpointingInfo implements Writeable, ToXConten
builder.field(NEXT_CHECKPOINT.getPreferredName(), next);
}
builder.field(OPERATIONS_BEHIND.getPreferredName(), operationsBehind);
if (changesLastDetectedAt != null) {
builder.timeField(CHANGES_LAST_DETECTED_AT.getPreferredName(),
CHANGES_LAST_DETECTED_AT.getPreferredName() + "_string",
changesLastDetectedAt.toEpochMilli());
}
builder.endObject();
return builder;
}
@ -108,6 +149,9 @@ public class DataFrameTransformCheckpointingInfo implements Writeable, ToXConten
last.writeTo(out);
next.writeTo(out);
out.writeLong(operationsBehind);
if (out.getVersion().onOrAfter(Version.V_7_4_0)) {
out.writeOptionalInstant(changesLastDetectedAt);
}
}
public static DataFrameTransformCheckpointingInfo fromXContent(XContentParser p) {
@ -116,7 +160,7 @@ public class DataFrameTransformCheckpointingInfo implements Writeable, ToXConten
@Override
public int hashCode() {
return Objects.hash(last, next, operationsBehind);
return Objects.hash(last, next, operationsBehind, changesLastDetectedAt);
}
@Override
@ -133,7 +177,8 @@ public class DataFrameTransformCheckpointingInfo implements Writeable, ToXConten
return Objects.equals(this.last, that.last) &&
Objects.equals(this.next, that.next) &&
this.operationsBehind == that.operationsBehind;
this.operationsBehind == that.operationsBehind &&
Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt);
}
@Override

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.core.dataframe.transforms;
import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
@ -18,79 +19,110 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
public class DataFrameTransformProgress implements Writeable, ToXContentObject {
public static final ParseField TOTAL_DOCS = new ParseField("total_docs");
public static final ParseField DOCS_REMAINING = new ParseField("docs_remaining");
public static final ParseField DOCS_PROCESSED = new ParseField("docs_processed");
public static final ParseField DOCS_INDEXED = new ParseField("docs_indexed");
public static final String PERCENT_COMPLETE = "percent_complete";
public static final ConstructingObjectParser<DataFrameTransformProgress, Void> PARSER = new ConstructingObjectParser<>(
"data_frame_transform_progress",
true,
a -> new DataFrameTransformProgress((Long) a[0], (Long)a[1]));
a -> new DataFrameTransformProgress((Long) a[0], (Long)a[1], (Long)a[2], (Long)a[3]));
static {
PARSER.declareLong(constructorArg(), TOTAL_DOCS);
PARSER.declareLong(optionalConstructorArg(), TOTAL_DOCS);
PARSER.declareLong(optionalConstructorArg(), DOCS_REMAINING);
PARSER.declareLong(optionalConstructorArg(), DOCS_PROCESSED);
PARSER.declareLong(optionalConstructorArg(), DOCS_INDEXED);
}
private final long totalDocs;
private long remainingDocs;
private final Long totalDocs;
private long documentsProcessed;
private long documentsIndexed;
public DataFrameTransformProgress(long totalDocs, Long remainingDocs) {
if (totalDocs < 0) {
public DataFrameTransformProgress() {
this(null, 0L, 0L);
}
// If we are reading from an old document we need to convert docsRemaining to docsProcessed
public DataFrameTransformProgress(Long totalDocs, Long docsRemaining, Long documentsProcessed, Long documentsIndexed) {
this(totalDocs,
documentsProcessed != null ?
documentsProcessed :
docsRemaining != null && totalDocs != null ? totalDocs - docsRemaining : 0L,
documentsIndexed);
}
public DataFrameTransformProgress(Long totalDocs, Long documentsProcessed, Long documentsIndexed) {
if (totalDocs != null && totalDocs < 0) {
throw new IllegalArgumentException("[total_docs] must be >0.");
}
this.totalDocs = totalDocs;
if (remainingDocs != null && remainingDocs < 0) {
throw new IllegalArgumentException("[docs_remaining] must be >0.");
if (documentsProcessed != null && documentsProcessed < 0) {
throw new IllegalArgumentException("[docs_processed] must be >0.");
}
this.remainingDocs = remainingDocs == null ? totalDocs : remainingDocs;
this.documentsProcessed = documentsProcessed == null ? 0 : documentsProcessed;
if (documentsIndexed != null && documentsIndexed < 0) {
throw new IllegalArgumentException("[docs_indexed] must be >0.");
}
this.documentsIndexed = documentsIndexed == null ? 0 : documentsIndexed;
}
public DataFrameTransformProgress(DataFrameTransformProgress otherProgress) {
this.totalDocs = otherProgress.totalDocs;
this.remainingDocs = otherProgress.remainingDocs;
this.documentsProcessed = otherProgress.documentsProcessed;
this.documentsIndexed = otherProgress.documentsIndexed;
}
public DataFrameTransformProgress(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_7_4_0)) {
this.totalDocs = in.readOptionalLong();
this.documentsProcessed = in.readVLong();
this.documentsIndexed = in.readVLong();
} else {
this.totalDocs = in.readLong();
this.remainingDocs = in.readLong();
long remainingDocs = in.readLong();
this.documentsProcessed = this.totalDocs - remainingDocs;
// was not previously tracked
this.documentsIndexed = 0;
}
}
public Double getPercentComplete() {
if (totalDocs == 0) {
if (totalDocs == null) {
return null;
}
if (documentsProcessed >= totalDocs) {
return 100.0;
}
long docsRead = totalDocs - remainingDocs;
if (docsRead < 0) {
return 100.0;
}
return 100.0*(double)docsRead/totalDocs;
return 100.0*(double)documentsProcessed/totalDocs;
}
public long getTotalDocs() {
public Long getTotalDocs() {
return totalDocs;
}
public long getRemainingDocs() {
return remainingDocs;
}
public void resetRemainingDocs() {
this.remainingDocs = totalDocs;
}
public void docsProcessed(long docsProcessed) {
public void incrementDocsProcessed(long docsProcessed) {
assert docsProcessed >= 0;
if (docsProcessed > remainingDocs) {
remainingDocs = 0;
} else {
remainingDocs -= docsProcessed;
this.documentsProcessed += docsProcessed;
}
public void incrementDocsIndexed(long documentsIndexed) {
assert documentsIndexed >= 0;
this.documentsIndexed += documentsIndexed;
}
public long getDocumentsProcessed() {
return documentsProcessed;
}
public long getDocumentsIndexed() {
return documentsIndexed;
}
@Override
@ -104,26 +136,42 @@ public class DataFrameTransformProgress implements Writeable, ToXContentObject {
}
DataFrameTransformProgress that = (DataFrameTransformProgress) other;
return Objects.equals(this.remainingDocs, that.remainingDocs) && Objects.equals(this.totalDocs, that.totalDocs);
return Objects.equals(this.documentsIndexed, that.documentsIndexed)
&& Objects.equals(this.totalDocs, that.totalDocs)
&& Objects.equals(this.documentsProcessed, that.documentsProcessed);
}
@Override
public int hashCode(){
return Objects.hash(remainingDocs, totalDocs);
return Objects.hash(documentsProcessed, documentsIndexed, totalDocs);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(totalDocs);
out.writeLong(remainingDocs);
if (out.getVersion().onOrAfter(Version.V_7_4_0)) {
out.writeOptionalLong(totalDocs);
out.writeVLong(documentsProcessed);
out.writeVLong(documentsIndexed);
} else {
// What if our total docs number is `null` because we are in a continuous checkpoint, but are serializing to an old version?
// totalDocs was always incorrect in past versions when in a continuous checkpoint. So, just write 0
// which will imply documentsRemaining == 0.
long unboxedTotalDocs = totalDocs == null ? 0 : totalDocs;
out.writeLong(unboxedTotalDocs);
out.writeLong(unboxedTotalDocs < documentsProcessed ? 0 : unboxedTotalDocs - documentsProcessed);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (totalDocs != null) {
builder.field(DOCS_REMAINING.getPreferredName(), documentsProcessed > totalDocs ? 0 : totalDocs - documentsProcessed);
builder.field(TOTAL_DOCS.getPreferredName(), totalDocs);
builder.field(DOCS_REMAINING.getPreferredName(), remainingDocs);
builder.field(PERCENT_COMPLETE, getPercentComplete());
}
builder.field(DOCS_INDEXED.getPreferredName(), documentsIndexed);
builder.field(DOCS_PROCESSED.getPreferredName(), documentsProcessed);
builder.endObject();
return builder;
}
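
A minimal round-trip sketch of the backwards-compatible wire handling above, following the BytesStreamOutput/StreamInput pattern the serialization tests in this commit use (Version.V_7_3_0 stands in for any pre-7.4 node):

DataFrameTransformProgress progress = new DataFrameTransformProgress(1000L, 750L, 120L);
try (BytesStreamOutput out = new BytesStreamOutput()) {
    out.setVersion(Version.V_7_3_0);                  // old wire format: total_docs + docs_remaining
    progress.writeTo(out);                            // writes 1000 and 1000 - 750 = 250
    try (StreamInput in = out.bytes().streamInput()) {
        in.setVersion(Version.V_7_3_0);
        DataFrameTransformProgress read = new DataFrameTransformProgress(in);
        assert read.getDocumentsProcessed() == 750L;  // reconstructed as total - remaining
        assert read.getDocumentsIndexed() == 0L;      // not tracked on old versions
    }
}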

View File

@ -10,7 +10,8 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.equalTo;
public class DataFrameIndexerTransformStatsTests extends AbstractSerializingTestCase<DataFrameIndexerTransformStats> {
@ -33,23 +34,29 @@ public class DataFrameIndexerTransformStatsTests extends AbstractSerializingTest
return new DataFrameIndexerTransformStats(randomLongBetween(10L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L));
randomLongBetween(0L, 10000L),
randomBoolean() ? randomDouble() : null,
randomBoolean() ? randomDouble() : null,
randomBoolean() ? randomDouble() : null);
}
public void testMerge() throws IOException {
DataFrameIndexerTransformStats emptyStats = new DataFrameIndexerTransformStats();
DataFrameIndexerTransformStats randomStats = randomStats();
public void testExpAvgIncrement() {
DataFrameIndexerTransformStats stats = new DataFrameIndexerTransformStats();
assertEquals(randomStats, emptyStats.merge(randomStats));
assertEquals(randomStats, randomStats.merge(emptyStats));
assertThat(stats.getExpAvgCheckpointDurationMs(), equalTo(0.0));
assertThat(stats.getExpAvgDocumentsIndexed(), equalTo(0.0));
assertThat(stats.getExpAvgDocumentsProcessed(), equalTo(0.0));
DataFrameIndexerTransformStats randomStatsClone = copyInstance(randomStats);
stats.incrementCheckpointExponentialAverages(100, 20, 50);
DataFrameIndexerTransformStats tripleRandomStats = new DataFrameIndexerTransformStats(3 * randomStats.getNumPages(),
3 * randomStats.getNumDocuments(), 3 * randomStats.getOutputDocuments(), 3 * randomStats.getNumInvocations(),
3 * randomStats.getIndexTime(), 3 * randomStats.getSearchTime(), 3 * randomStats.getIndexTotal(),
3 * randomStats.getSearchTotal(), 3 * randomStats.getIndexFailures(), 3 * randomStats.getSearchFailures());
assertThat(stats.getExpAvgCheckpointDurationMs(), equalTo(100.0));
assertThat(stats.getExpAvgDocumentsIndexed(), equalTo(20.0));
assertThat(stats.getExpAvgDocumentsProcessed(), equalTo(50.0));
assertEquals(tripleRandomStats, randomStats.merge(randomStatsClone).merge(randomStatsClone));
stats.incrementCheckpointExponentialAverages(150, 23, 100);
assertThat(stats.getExpAvgCheckpointDurationMs(), closeTo(109.090909, 0.0000001));
assertThat(stats.getExpAvgDocumentsIndexed(), closeTo(20.54545454, 0.0000001));
assertThat(stats.getExpAvgDocumentsProcessed(), closeTo(59.0909090, 0.0000001));
}
}

View File

@ -6,16 +6,23 @@
package org.elasticsearch.xpack.core.dataframe.transforms;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.time.Instant;
public class DataFrameTransformCheckpointingInfoTests extends AbstractSerializingDataFrameTestCase<DataFrameTransformCheckpointingInfo> {
public static DataFrameTransformCheckpointingInfo randomDataFrameTransformCheckpointingInfo() {
return new DataFrameTransformCheckpointingInfo(DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats(),
DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats(), randomNonNegativeLong());
return new DataFrameTransformCheckpointingInfo(
DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats(),
DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats(),
randomNonNegativeLong(),
randomBoolean() ? null : Instant.ofEpochMilli(randomLongBetween(1, 100000)));
}
@Override
@ -32,4 +39,22 @@ public class DataFrameTransformCheckpointingInfoTests extends AbstractSerializin
protected Reader<DataFrameTransformCheckpointingInfo> instanceReader() {
return DataFrameTransformCheckpointingInfo::new;
}
public void testBackwardsSerialization() throws IOException {
DataFrameTransformCheckpointingInfo checkpointingInfo = new DataFrameTransformCheckpointingInfo(
DataFrameTransformCheckpointStats.EMPTY,
DataFrameTransformCheckpointStats.EMPTY,
randomNonNegativeLong(),
// changesLastDetectedAt is not serialized to streams for older versions, so it is null when read back in
null);
try (BytesStreamOutput output = new BytesStreamOutput()) {
output.setVersion(Version.V_7_4_0);
checkpointingInfo.writeTo(output);
try (StreamInput in = output.bytes().streamInput()) {
in.setVersion(Version.V_7_4_0);
DataFrameTransformCheckpointingInfo streamedCheckpointingInfo = new DataFrameTransformCheckpointingInfo(in);
assertEquals(checkpointingInfo, streamedCheckpointingInfo);
}
}
}
}
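testBackwardsSerialization leans on the usual version-gated Writeable pattern: a field introduced in a newer release is only written when the destination stream is past the cutoff, so older nodes never receive it and it comes back null after a round trip. A sketch under assumed field names (last, next, operationsBehind) and an illustrative boolean-plus-epoch-millis encoding; the real class may encode the Instant differently:

@Override
public void writeTo(StreamOutput out) throws IOException {
    last.writeTo(out);
    next.writeTo(out);
    out.writeVLong(operationsBehind);
    if (out.getVersion().after(Version.V_7_4_0)) {
        // streams at or below 7.4.0 never carry changes_last_detected_at
        if (changesLastDetectedAt == null) {
            out.writeBoolean(false);
        } else {
            out.writeBoolean(true);
            out.writeLong(changesLastDetectedAt.toEpochMilli());
        }
    }
}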


@ -6,19 +6,26 @@
package org.elasticsearch.xpack.core.dataframe.transforms;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
public class DataFrameTransformProgressTests extends AbstractSerializingDataFrameTestCase<DataFrameTransformProgress> {
public static DataFrameTransformProgress randomDataFrameTransformProgress() {
long totalDocs = randomNonNegativeLong();
return new DataFrameTransformProgress(totalDocs, randomBoolean() ? null : randomLongBetween(0, totalDocs));
return new DataFrameTransformProgress(
randomBoolean() ? null : randomLongBetween(0, 10000),
randomBoolean() ? null : randomLongBetween(0, 10000),
randomBoolean() ? null : randomLongBetween(1, 10000));
}
@Override
@ -37,29 +44,63 @@ public class DataFrameTransformProgressTests extends AbstractSerializingDataFram
}
public void testPercentComplete() {
DataFrameTransformProgress progress = new DataFrameTransformProgress(0L, 100L);
DataFrameTransformProgress progress = new DataFrameTransformProgress(0L, 100L, null);
assertThat(progress.getPercentComplete(), equalTo(100.0));
progress = new DataFrameTransformProgress(100L, 0L);
assertThat(progress.getPercentComplete(), equalTo(100.0));
progress = new DataFrameTransformProgress(100L, 10000L);
assertThat(progress.getPercentComplete(), equalTo(100.0));
progress = new DataFrameTransformProgress(100L, null);
progress = new DataFrameTransformProgress(100L, 0L, null);
assertThat(progress.getPercentComplete(), equalTo(0.0));
progress = new DataFrameTransformProgress(100L, 50L);
progress = new DataFrameTransformProgress(100L, 10000L, null);
assertThat(progress.getPercentComplete(), equalTo(100.0));
progress = new DataFrameTransformProgress(100L, null, null);
assertThat(progress.getPercentComplete(), equalTo(0.0));
progress = new DataFrameTransformProgress(100L, 50L, null);
assertThat(progress.getPercentComplete(), closeTo(50.0, 0.000001));
progress = new DataFrameTransformProgress(null, 50L, 10L);
assertThat(progress.getPercentComplete(), is(nullValue()));
}
public void testConstructor() {
IllegalArgumentException ex =
expectThrows(IllegalArgumentException.class, () -> new DataFrameTransformProgress(-1, null));
expectThrows(IllegalArgumentException.class, () -> new DataFrameTransformProgress(-1L, null, null));
assertThat(ex.getMessage(), equalTo("[total_docs] must be >0."));
ex = expectThrows(IllegalArgumentException.class, () -> new DataFrameTransformProgress(1L, -1L));
assertThat(ex.getMessage(), equalTo("[docs_remaining] must be >0."));
ex = expectThrows(IllegalArgumentException.class, () -> new DataFrameTransformProgress(1L, -1L, null));
assertThat(ex.getMessage(), equalTo("[docs_processed] must be >0."));
ex = expectThrows(IllegalArgumentException.class, () -> new DataFrameTransformProgress(1L, 1L, -1L));
assertThat(ex.getMessage(), equalTo("[docs_indexed] must be >0."));
}
public void testBackwardsSerialization() throws IOException {
long totalDocs = 10_000;
long processedDocs = randomLongBetween(0, totalDocs);
// documentsIndexed is not serialized in past versions, so it comes back as zero
DataFrameTransformProgress progress = new DataFrameTransformProgress(totalDocs, processedDocs, 0L);
try (BytesStreamOutput output = new BytesStreamOutput()) {
output.setVersion(Version.V_7_2_0);
progress.writeTo(output);
try (StreamInput in = output.bytes().streamInput()) {
in.setVersion(Version.V_7_2_0);
DataFrameTransformProgress streamedProgress = new DataFrameTransformProgress(in);
assertEquals(progress, streamedProgress);
}
}
progress = new DataFrameTransformProgress(null, processedDocs, 0L);
try (BytesStreamOutput output = new BytesStreamOutput()) {
output.setVersion(Version.V_7_2_0);
progress.writeTo(output);
try (StreamInput in = output.bytes().streamInput()) {
in.setVersion(Version.V_7_2_0);
DataFrameTransformProgress streamedProgress = new DataFrameTransformProgress(in);
assertEquals(new DataFrameTransformProgress(0L, 0L, 0L), streamedProgress);
}
}
}
}
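Taken together, the testPercentComplete cases pin down the new semantics: a null total (the continuous case) yields no percentage at all, a zero total reads as complete, a null docs_processed counts as zero, and the ratio is capped at 100. A sketch inferred from those cases, not copied from the production class:

private final Long totalDocs;        // null once the transform is past its batch checkpoint
private long documentsProcessed;     // a null constructor argument collapses to 0

public Double getPercentComplete() {
    if (totalDocs == null) {
        return null;                 // no meaningful percentage without a total
    }
    if (totalDocs == 0) {
        return 100.0;                // nothing to do counts as complete
    }
    return Math.min(100.0, (documentsProcessed * 100.0) / totalDocs);
}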


@ -20,8 +20,10 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.oneOf;
public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase {
@ -211,7 +213,7 @@ public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase {
}
@SuppressWarnings("unchecked")
public void testGetProgressResetWithContinuous() throws Exception {
public void testGetStatsWithContinuous() throws Exception {
String transformId = "pivot_progress_continuous";
String transformDest = transformId + "_idx";
String transformSrc = "reviews_cont_pivot_test";
@ -244,18 +246,20 @@ public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase {
Map<String, Object> stats = entityAsMap(client().performRequest(getRequest));
List<Map<String, Object>> transformsStats = (List<Map<String, Object>>)XContentMapValues.extractValue("transforms", stats);
assertEquals(1, transformsStats.size());
/* TODO progress is now tracked per checkpoint, and no checkpoint may be in progress at this point
// Verify that the transforms progress
// No continuous checkpoints have been seen and thus all exponential averages should be 0.0
for (Map<String, Object> transformStats : transformsStats) {
Map<String, Object> progress =
(Map<String, Object>)XContentMapValues.extractValue("checkpointing.next.checkpoint_progress", transformStats);
assertThat("total_docs is not 1000", progress.get("total_docs"), equalTo(1000));
assertThat("docs_remaining is not 0", progress.get("docs_remaining"), equalTo(0));
assertThat("percent_complete is not 100.0", progress.get("percent_complete"), equalTo(100.0));
transformStats = (Map<String, Object>)transformStats.get("stats");
assertThat("exponential_avg_checkpoint_duration_ms is not 0.0",
transformStats.get("exponential_avg_checkpoint_duration_ms"),
equalTo(0.0));
assertThat("exponential_avg_documents_indexed is not 0.0",
transformStats.get("exponential_avg_documents_indexed"),
equalTo(0.0));
assertThat("exponential_avg_documents_processed is not 0.0",
transformStats.get("exponential_avg_documents_processed"),
equalTo(0.0));
}
*/
// add more docs to verify total_docs gets updated with continuous
int numDocs = 10;
final StringBuilder bulk = new StringBuilder();
long now = Instant.now().toEpochMilli() - 1_000;
@ -282,19 +286,27 @@ public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase {
waitForDataFrameCheckpoint(transformId, 2L);
// We should now have exp avgs since we have processed a continuous checkpoint
assertBusy(() -> {
Map<String, Object> statsResponse = entityAsMap(client().performRequest(getRequest));
List<Map<String, Object>> contStats = (List<Map<String, Object>>)XContentMapValues.extractValue("transforms", statsResponse);
assertEquals(1, contStats.size());
/* TODO progress is now tracked per checkpoint, and no checkpoint may be in progress at this point
for (Map<String, Object> transformStats : contStats) {
Map<String, Object> progress =
(Map<String, Object>)XContentMapValues.extractValue("checkpointing.next.checkpoint_progress", transformStats);
assertThat("total_docs is not 10", progress.get("total_docs"), equalTo(numDocs));
assertThat("docs_remaining is not 0", progress.get("docs_remaining"), equalTo(0));
assertThat("percent_complete is not 100.0", progress.get("percent_complete"), equalTo(100.0));
Map<String, Object> statsObj = (Map<String, Object>)transformStats.get("stats");
assertThat("exponential_avg_checkpoint_duration_ms is 0",
(Double)statsObj.get("exponential_avg_checkpoint_duration_ms"),
greaterThan(0.0));
assertThat("exponential_avg_documents_indexed is 0",
(Double)statsObj.get("exponential_avg_documents_indexed"),
greaterThan(0.0));
assertThat("exponential_avg_documents_processed is 0",
(Double)statsObj.get("exponential_avg_documents_processed"),
greaterThan(0.0));
Map<String, Object> checkpointing = (Map<String, Object>)transformStats.get("checkpointing");
assertThat("changes_last_detected_at is null",
checkpointing.get("changes_last_detected_at"),
is(notNullValue()));
}
*/
}, 60, TimeUnit.SECONDS);
}, 120, TimeUnit.SECONDS);
}
}


@ -145,7 +145,7 @@ public class DataFrameTransformProgressIT extends ESRestTestCase {
TransformProgressGatherer.searchResponseToDataFrameTransformProgressFunction().apply(response);
assertThat(progress.getTotalDocs(), equalTo(1000L));
assertThat(progress.getRemainingDocs(), equalTo(1000L));
assertThat(progress.getDocumentsProcessed(), equalTo(0L));
assertThat(progress.getPercentComplete(), equalTo(0.0));
@ -166,7 +166,7 @@ public class DataFrameTransformProgressIT extends ESRestTestCase {
progress = TransformProgressGatherer.searchResponseToDataFrameTransformProgressFunction().apply(response);
assertThat(progress.getTotalDocs(), equalTo(35L));
assertThat(progress.getRemainingDocs(), equalTo(35L));
assertThat(progress.getDocumentsProcessed(), equalTo(0L));
assertThat(progress.getPercentComplete(), equalTo(0.0));
histgramGroupConfig = new GroupConfig(Collections.emptyMap(),
@ -186,7 +186,7 @@ public class DataFrameTransformProgressIT extends ESRestTestCase {
progress = TransformProgressGatherer.searchResponseToDataFrameTransformProgressFunction().apply(response);
assertThat(progress.getTotalDocs(), equalTo(0L));
assertThat(progress.getRemainingDocs(), equalTo(0L));
assertThat(progress.getDocumentsProcessed(), equalTo(0L));
assertThat(progress.getPercentComplete(), equalTo(100.0));
deleteIndex(REVIEWS_INDEX_NAME);


@ -53,6 +53,7 @@ public final class DataFrameInternalIndex {
// data types
public static final String FLOAT = "float";
public static final String DOUBLE = "double";
public static final String LONG = "long";
public static final String KEYWORD = "keyword";
@ -173,6 +174,12 @@ public final class DataFrameInternalIndex {
.startObject(DataFrameTransformProgress.PERCENT_COMPLETE)
.field(TYPE, FLOAT)
.endObject()
.startObject(DataFrameTransformProgress.DOCS_INDEXED.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameTransformProgress.DOCS_PROCESSED.getPreferredName())
.field(TYPE, LONG)
.endObject()
.endObject()
.endObject()
.endObject()
@ -209,6 +216,15 @@ public final class DataFrameInternalIndex {
.startObject(DataFrameIndexerTransformStats.INDEX_FAILURES.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(DataFrameIndexerTransformStats.EXPONENTIAL_AVG_DOCUMENTS_INDEXED.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(DataFrameIndexerTransformStats.EXPONENTIAL_AVG_DOCUMENTS_PROCESSED.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.endObject()
.endObject()
// This is obsolete and can be removed for future versions of the index, but is left here as a warning/reminder that
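For reference, the new builder lines emit mapping fragments along these lines (JSON assembled from the calls above, with the surrounding progress and stats objects omitted):

"docs_indexed":   { "type": "long" },
"docs_processed": { "type": "long" },

"exponential_avg_checkpoint_duration_ms": { "type": "double" },
"exponential_avg_documents_indexed":      { "type": "double" },
"exponential_avg_documents_processed":    { "type": "double" }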


@ -226,7 +226,8 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<DataFrameInd
// NOTE: progress is also mutated in ClientDataFrameIndexer#onFinished
if (progress != null) {
progress.docsProcessed(getStats().getNumDocuments() - docsBeforeProcess);
progress.incrementDocsProcessed(getStats().getNumDocuments() - docsBeforeProcess);
progress.incrementDocsIndexed(result.getToIndex().size());
}
return result;
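The renamed call above splits progress into two counters: documents read and processed from the source, versus documents actually written to the destination (result.getToIndex().size()). A plausible shape for the two DataFrameTransformProgress mutators, with the names taken from the diff and the bodies assumed:

public void incrementDocsProcessed(long docs) {
    assert docs >= 0;
    this.documentsProcessed += docs;
}

public void incrementDocsIndexed(long docs) {
    assert docs >= 0;
    this.documentsIndexed += docs;
}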


@ -21,6 +21,7 @@ import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
@ -53,6 +54,7 @@ import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.transforms.pivot.AggregationResultUtils;
import java.time.Instant;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@ -86,6 +88,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
private final DataFrameAuditor auditor;
private final DataFrameIndexerPosition initialPosition;
private final IndexerState initialIndexerState;
private volatile Instant changesLastDetectedAt;
private final SetOnce<ClientDataFrameIndexer> indexer = new SetOnce<>();
@ -197,7 +200,16 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
indexer.getNextCheckpoint(),
indexer.getPosition(),
indexer.getProgress(),
listener);
ActionListener.wrap(
info -> {
if (changesLastDetectedAt == null) {
listener.onResponse(info);
} else {
listener.onResponse(info.setChangesLastDetectedAt(changesLastDetectedAt));
}
},
listener::onFailure
));
}
public DataFrameTransformCheckpoint getLastCheckpoint() {
@ -319,6 +331,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
logger.debug("Trigger initial run");
getIndexer().maybeTriggerAsyncJob(System.currentTimeMillis());
} else if (getIndexer().isContinuous() && getIndexer().sourceHasChanged()) {
changesLastDetectedAt = Instant.now();
logger.debug("Source has changed, triggering new indexer run");
getIndexer().maybeTriggerAsyncJob(System.currentTimeMillis());
}
@ -616,6 +629,13 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
if (initialRun()) {
createCheckpoint(ActionListener.wrap(cp -> {
nextCheckpoint = cp;
// If nextCheckpoint > 1, this means that we are now on the checkpoint AFTER the batch checkpoint
// Consequently, the idea of percent complete no longer makes sense.
if (nextCheckpoint.getCheckpoint() > 1) {
progress = new DataFrameTransformProgress(null, 0L, 0L);
super.onStart(now, listener);
return;
}
TransformProgressGatherer.getInitialProgress(this.client, buildFilterQuery(), getConfig(), ActionListener.wrap(
newProgress -> {
logger.trace("[{}] reset the progress from [{}] to [{}]", transformId, progress, newProgress);
@ -825,14 +845,29 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
// Reset our failure count as we have finished and may start again with a new checkpoint
failureCount.set(0);
// TODO: progress hack to get around bucket_selector filtering out buckets
// With bucket_selector we could have read all the buckets and completed the transform
// but not "see" all the buckets since they were filtered out. Consequently, progress would
// show less than 100% even though we are done.
// NOTE: this method is called in the same thread as the processing thread.
// Theoretically, there should not be a race condition with updating progress here.
if (progress != null && progress.getRemainingDocs() > 0) {
progress.docsProcessed(progress.getRemainingDocs());
// NOTE 2: getPercentComplete is non-null only on the first (batch) checkpoint
if (progress != null && progress.getPercentComplete() != null && progress.getPercentComplete() < 100.0) {
progress.incrementDocsProcessed(progress.getTotalDocs() - progress.getDocumentsProcessed());
}
logger.info("Last checkpoint for {} {}", getJobId(), Strings.toString(lastCheckpoint));
// If the last checkpoint is now greater than 1, that means that we have just processed the first
// continuous checkpoint and should start recording the exponential averages
if (lastCheckpoint != null && lastCheckpoint.getCheckpoint() > 1) {
long docsIndexed = 0;
long docsProcessed = 0;
// This should not happen as we simply create a new one when we reach continuous checkpoints
// but this is a paranoid `null` check
if (progress != null) {
docsIndexed = progress.getDocumentsIndexed();
docsProcessed = progress.getDocumentsProcessed();
}
long durationMs = System.currentTimeMillis() - lastCheckpoint.getTimestamp();
getStats().incrementCheckpointExponentialAverages(durationMs < 0 ? 0 : durationMs, docsIndexed, docsProcessed);
}
if (shouldAuditOnFinish(checkpoint)) {
auditor.info(transformTask.getTransformId(),


@ -73,6 +73,6 @@ public final class TransformProgressGatherer {
}
public static Function<SearchResponse, DataFrameTransformProgress> searchResponseToDataFrameTransformProgressFunction() {
return searchResponse -> new DataFrameTransformProgress(searchResponse.getHits().getTotalHits().value, null);
return searchResponse -> new DataFrameTransformProgress(searchResponse.getHits().getTotalHits().value, 0L, 0L);
}
}


@ -246,6 +246,9 @@ teardown:
- gte: { transforms.0.stats.search_time_in_ms: 0 }
- lte: { transforms.0.stats.search_total: 1 }
- match: { transforms.0.stats.search_failures: 0 }
- match: { transforms.0.stats.exponential_avg_checkpoint_duration_ms: 0.0 }
- match: { transforms.0.stats.exponential_avg_documents_indexed: 0.0 }
- match: { transforms.0.stats.exponential_avg_documents_processed: 0.0 }
- do:
data_frame.stop_data_frame_transform: