[7.x][Transform][Rollup] add processing stats to record the ti… (#54027)

add 2 additional stats: processing time and processing total which capture the
time spent for processing results and how often it ran. The 2 new stats
correspond to the existing indexing and search stats. Together with indexing
and search this now allows the user to see the full picture, all 3 stages.
This commit is contained in:
Hendrik Muhs 2020-03-24 09:22:02 +01:00 committed by GitHub
parent cff10368b8
commit 7dcacf531f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 582 additions and 201 deletions

View File

@ -31,8 +31,10 @@ public abstract class IndexerJobStats {
public static ParseField NUM_INVOCATIONS = new ParseField("trigger_count");
public static ParseField INDEX_TIME_IN_MS = new ParseField("index_time_in_ms");
public static ParseField SEARCH_TIME_IN_MS = new ParseField("search_time_in_ms");
public static ParseField PROCESSING_TIME_IN_MS = new ParseField("processing_time_in_ms");
public static ParseField INDEX_TOTAL = new ParseField("index_total");
public static ParseField SEARCH_TOTAL = new ParseField("search_total");
public static ParseField PROCESSING_TOTAL = new ParseField("processing_total");
public static ParseField SEARCH_FAILURES = new ParseField("search_failures");
public static ParseField INDEX_FAILURES = new ParseField("index_failures");
@ -44,11 +46,14 @@ public abstract class IndexerJobStats {
protected final long indexTotal;
protected final long searchTime;
protected final long searchTotal;
protected final long processingTime;
protected final long processingTotal;
protected final long indexFailures;
protected final long searchFailures;
public IndexerJobStats(long numPages, long numInputDocuments, long numOutputDocuments, long numInvocations,
long indexTime, long searchTime, long indexTotal, long searchTotal, long indexFailures, long searchFailures) {
long indexTime, long searchTime, long processingTime, long indexTotal, long searchTotal, long processingTotal,
long indexFailures, long searchFailures) {
this.numPages = numPages;
this.numInputDocuments = numInputDocuments;
this.numOuputDocuments = numOutputDocuments;
@ -57,6 +62,8 @@ public abstract class IndexerJobStats {
this.indexTotal = indexTotal;
this.searchTime = searchTime;
this.searchTotal = searchTotal;
this.processingTime = processingTime;
this.processingTotal = processingTotal;
this.indexFailures = indexFailures;
this.searchFailures = searchFailures;
}
@ -117,6 +124,13 @@ public abstract class IndexerJobStats {
return searchTime;
}
/**
* Returns the time spent processing (cumulative) in milliseconds
*/
public long getProcessingTime() {
return processingTime;
}
/**
* Returns the total number of indexing requests that have been processed
* (Note: this is not the number of _documents_ that have been indexed)
@ -132,6 +146,14 @@ public abstract class IndexerJobStats {
return searchTotal;
}
/**
* Returns the total number of processing runs that have been made
*/
public long getProcessingTotal() {
return processingTotal;
}
@Override
public boolean equals(Object other) {
if (this == other) {
@ -149,16 +171,19 @@ public abstract class IndexerJobStats {
&& Objects.equals(this.numInvocations, that.numInvocations)
&& Objects.equals(this.indexTime, that.indexTime)
&& Objects.equals(this.searchTime, that.searchTime)
&& Objects.equals(this.processingTime, that.processingTime)
&& Objects.equals(this.indexFailures, that.indexFailures)
&& Objects.equals(this.searchFailures, that.searchFailures)
&& Objects.equals(this.searchTotal, that.searchTotal)
&& Objects.equals(this.processingTotal, that.processingTotal)
&& Objects.equals(this.indexTotal, that.indexTotal);
}
@Override
public int hashCode() {
return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations,
indexTime, searchTime, indexFailures, searchFailures, searchTotal, indexTotal);
indexTime, searchTime, processingTime, indexFailures, searchFailures, searchTotal,
indexTotal, processingTotal);
}
@Override
@ -172,6 +197,8 @@ public abstract class IndexerJobStats {
+ ", index_time_in_ms=" + indexTime
+ ", index_total=" + indexTotal
+ ", search_time_in_ms=" + searchTime
+ ", search_total=" + searchTotal+ "}";
+ ", search_total=" + searchTotal
+ ", processing_time_in_ms=" + processingTime
+ ", processing_total=" + processingTotal + "}";
}
}

View File

@ -177,16 +177,18 @@ public class GetRollupJobResponse {
public static class RollupIndexerJobStats extends IndexerJobStats {
RollupIndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations,
long indexTime, long indexTotal, long searchTime, long searchTotal, long indexFailures, long searchFailures) {
long indexTime, long indexTotal, long searchTime, long searchTotal, long processingTime,
long processingTotal, long indexFailures, long searchFailures) {
super(numPages, numInputDocuments, numOuputDocuments, numInvocations,
indexTime, searchTime, indexTotal, searchTotal, indexFailures, searchFailures);
indexTime, searchTime, processingTime, indexTotal, searchTotal, processingTotal, indexFailures, searchFailures);
}
private static final ConstructingObjectParser<RollupIndexerJobStats, Void> PARSER = new ConstructingObjectParser<>(
STATS.getPreferredName(),
true,
args -> new RollupIndexerJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3],
(long) args[4], (long) args[5], (long) args[6], (long) args[7], (long) args[8], (long) args[9]));
(long) args[4], (long) args[5], (long) args[6], (long) args[7], (long) args[8], (long) args[9],
(long) args[10], (long) args[11]));
static {
PARSER.declareLong(constructorArg(), NUM_PAGES);
PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS);
@ -196,6 +198,8 @@ public class GetRollupJobResponse {
PARSER.declareLong(constructorArg(), INDEX_TOTAL);
PARSER.declareLong(constructorArg(), SEARCH_TIME_IN_MS);
PARSER.declareLong(constructorArg(), SEARCH_TOTAL);
PARSER.declareLong(constructorArg(), PROCESSING_TIME_IN_MS);
PARSER.declareLong(constructorArg(), PROCESSING_TOTAL);
PARSER.declareLong(constructorArg(), INDEX_FAILURES);
PARSER.declareLong(constructorArg(), SEARCH_FAILURES);
}

View File

@ -27,7 +27,6 @@ import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
public class TransformIndexerStats extends IndexerJobStats {
@ -39,21 +38,38 @@ public class TransformIndexerStats extends IndexerJobStats {
public static final ConstructingObjectParser<TransformIndexerStats, Void> LENIENT_PARSER = new ConstructingObjectParser<>(
NAME,
true,
args -> new TransformIndexerStats((long) args[0], (long) args[1], (long) args[2],
(long) args[3], (long) args[4], (long) args[5], (long) args[6], (long) args[7], (long) args[8], (long) args[9],
(Double) args[10], (Double) args[11], (Double) args[12]));
args -> new TransformIndexerStats(
unboxSafe(args[0], 0L),
unboxSafe(args[1], 0L),
unboxSafe(args[2], 0L),
unboxSafe(args[3], 0L),
unboxSafe(args[4], 0L),
unboxSafe(args[5], 0L),
unboxSafe(args[6], 0L),
unboxSafe(args[7], 0L),
unboxSafe(args[8], 0L),
unboxSafe(args[9], 0L),
unboxSafe(args[10], 0L),
unboxSafe(args[11], 0L),
unboxSafe(args[12], 0.0),
unboxSafe(args[13], 0.0),
unboxSafe(args[14], 0.0)
)
);
static {
LENIENT_PARSER.declareLong(constructorArg(), NUM_PAGES);
LENIENT_PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS);
LENIENT_PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS);
LENIENT_PARSER.declareLong(constructorArg(), NUM_INVOCATIONS);
LENIENT_PARSER.declareLong(constructorArg(), INDEX_TIME_IN_MS);
LENIENT_PARSER.declareLong(constructorArg(), SEARCH_TIME_IN_MS);
LENIENT_PARSER.declareLong(constructorArg(), INDEX_TOTAL);
LENIENT_PARSER.declareLong(constructorArg(), SEARCH_TOTAL);
LENIENT_PARSER.declareLong(constructorArg(), INDEX_FAILURES);
LENIENT_PARSER.declareLong(constructorArg(), SEARCH_FAILURES);
LENIENT_PARSER.declareLong(optionalConstructorArg(), NUM_PAGES);
LENIENT_PARSER.declareLong(optionalConstructorArg(), NUM_INPUT_DOCUMENTS);
LENIENT_PARSER.declareLong(optionalConstructorArg(), NUM_OUTPUT_DOCUMENTS);
LENIENT_PARSER.declareLong(optionalConstructorArg(), NUM_INVOCATIONS);
LENIENT_PARSER.declareLong(optionalConstructorArg(), INDEX_TIME_IN_MS);
LENIENT_PARSER.declareLong(optionalConstructorArg(), SEARCH_TIME_IN_MS);
LENIENT_PARSER.declareLong(optionalConstructorArg(), PROCESSING_TIME_IN_MS);
LENIENT_PARSER.declareLong(optionalConstructorArg(), INDEX_TOTAL);
LENIENT_PARSER.declareLong(optionalConstructorArg(), SEARCH_TOTAL);
LENIENT_PARSER.declareLong(optionalConstructorArg(), PROCESSING_TOTAL);
LENIENT_PARSER.declareLong(optionalConstructorArg(), INDEX_FAILURES);
LENIENT_PARSER.declareLong(optionalConstructorArg(), SEARCH_FAILURES);
LENIENT_PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS);
LENIENT_PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_DOCUMENTS_INDEXED);
LENIENT_PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_DOCUMENTS_PROCESSED);
@ -67,16 +83,40 @@ public class TransformIndexerStats extends IndexerJobStats {
private final double expAvgDocumentsIndexed;
private final double expAvgDocumentsProcessed;
public TransformIndexerStats(long numPages, long numInputDocuments, long numOuputDocuments,
long numInvocations, long indexTime, long searchTime,
long indexTotal, long searchTotal, long indexFailures, long searchFailures,
Double expAvgCheckpointDurationMs, Double expAvgDocumentsIndexed,
Double expAvgDocumentsProcessed) {
super(numPages, numInputDocuments, numOuputDocuments, numInvocations, indexTime, searchTime,
indexTotal, searchTotal, indexFailures, searchFailures);
this.expAvgCheckpointDurationMs = expAvgCheckpointDurationMs == null ? 0.0 : expAvgCheckpointDurationMs;
this.expAvgDocumentsIndexed = expAvgDocumentsIndexed == null ? 0.0 : expAvgDocumentsIndexed;
this.expAvgDocumentsProcessed = expAvgDocumentsProcessed == null ? 0.0 : expAvgDocumentsProcessed;
public TransformIndexerStats(
long numPages,
long numInputDocuments,
long numOuputDocuments,
long numInvocations,
long indexTime,
long searchTime,
long processingTime,
long indexTotal,
long searchTotal,
long processingTotal,
long indexFailures,
long searchFailures,
double expAvgCheckpointDurationMs,
double expAvgDocumentsIndexed,
double expAvgDocumentsProcessed
) {
super(
numPages,
numInputDocuments,
numOuputDocuments,
numInvocations,
indexTime,
searchTime,
processingTime,
indexTotal,
searchTotal,
processingTotal,
indexFailures,
searchFailures
);
this.expAvgCheckpointDurationMs = expAvgCheckpointDurationMs;
this.expAvgDocumentsIndexed = expAvgDocumentsIndexed;
this.expAvgDocumentsProcessed = expAvgDocumentsProcessed;
}
public double getExpAvgCheckpointDurationMs() {
@ -109,10 +149,12 @@ public class TransformIndexerStats extends IndexerJobStats {
&& Objects.equals(this.numInvocations, that.numInvocations)
&& Objects.equals(this.indexTime, that.indexTime)
&& Objects.equals(this.searchTime, that.searchTime)
&& Objects.equals(this.processingTime, that.processingTime)
&& Objects.equals(this.indexFailures, that.indexFailures)
&& Objects.equals(this.searchFailures, that.searchFailures)
&& Objects.equals(this.indexTotal, that.indexTotal)
&& Objects.equals(this.searchTotal, that.searchTotal)
&& Objects.equals(this.processingTotal, that.processingTotal)
&& Objects.equals(this.expAvgCheckpointDurationMs, that.expAvgCheckpointDurationMs)
&& Objects.equals(this.expAvgDocumentsIndexed, that.expAvgDocumentsIndexed)
&& Objects.equals(this.expAvgDocumentsProcessed, that.expAvgDocumentsProcessed);
@ -120,8 +162,31 @@ public class TransformIndexerStats extends IndexerJobStats {
@Override
public int hashCode() {
return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations,
indexTime, searchTime, indexFailures, searchFailures, indexTotal, searchTotal,
expAvgCheckpointDurationMs, expAvgDocumentsIndexed, expAvgDocumentsProcessed);
return Objects.hash(
numPages,
numInputDocuments,
numOuputDocuments,
numInvocations,
indexTime,
searchTime,
processingTime,
indexFailures,
searchFailures,
indexTotal,
searchTotal,
processingTotal,
expAvgCheckpointDurationMs,
expAvgDocumentsIndexed,
expAvgDocumentsProcessed
);
}
@SuppressWarnings("unchecked")
private static <T> T unboxSafe(Object l, T default_value) {
if (l == null) {
return default_value;
} else {
return (T) l;
}
}
}

View File

@ -442,6 +442,8 @@ public class TransformIT extends ESRestHighLevelClientTestCase {
0L,
0L,
0L,
0L,
0L,
0.0,
0.0,
0.0);

View File

@ -64,8 +64,9 @@ public class GetRollupJobResponseTests extends ESTestCase {
private RollupIndexerJobStats randomStats() {
return new RollupIndexerJobStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong());
}
private RollupJobStatus randomStatus() {
@ -124,6 +125,8 @@ public class GetRollupJobResponseTests extends ESTestCase {
builder.field(IndexerJobStats.SEARCH_TIME_IN_MS.getPreferredName(), stats.getSearchTime());
builder.field(IndexerJobStats.SEARCH_TOTAL.getPreferredName(), stats.getSearchTotal());
builder.field(IndexerJobStats.SEARCH_FAILURES.getPreferredName(), stats.getSearchFailures());
builder.field(IndexerJobStats.PROCESSING_TIME_IN_MS.getPreferredName(), stats.getProcessingTime());
builder.field(IndexerJobStats.PROCESSING_TOTAL.getPreferredName(), stats.getProcessingTotal());
builder.endObject();
}

View File

@ -31,41 +31,103 @@ public class TransformIndexerStatsTests extends ESTestCase {
public void testFromXContent() throws IOException {
xContentTester(
this::createParser,
TransformIndexerStatsTests::randomStats,
TransformIndexerStatsTests::toXContent,
TransformIndexerStats::fromXContent)
.supportsUnknownFields(true)
.test();
this::createParser,
TransformIndexerStatsTests::randomStats,
TransformIndexerStatsTests::toXContent,
TransformIndexerStats::fromXContent
).supportsUnknownFields(true).test();
}
public static TransformIndexerStats randomStats() {
return new TransformIndexerStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble());
return new TransformIndexerStats(
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomDouble(),
randomDouble(),
randomDouble()
);
}
public static void toXContent(TransformIndexerStats stats, XContentBuilder builder) throws IOException {
builder.startObject();
builder.field(IndexerJobStats.NUM_PAGES.getPreferredName(), stats.getNumPages());
builder.field(IndexerJobStats.NUM_INPUT_DOCUMENTS.getPreferredName(), stats.getNumDocuments());
builder.field(IndexerJobStats.NUM_OUTPUT_DOCUMENTS.getPreferredName(), stats.getOutputDocuments());
builder.field(IndexerJobStats.NUM_INVOCATIONS.getPreferredName(), stats.getNumInvocations());
builder.field(IndexerJobStats.INDEX_TIME_IN_MS.getPreferredName(), stats.getIndexTime());
builder.field(IndexerJobStats.INDEX_TOTAL.getPreferredName(), stats.getIndexTotal());
builder.field(IndexerJobStats.INDEX_FAILURES.getPreferredName(), stats.getIndexFailures());
builder.field(IndexerJobStats.SEARCH_TIME_IN_MS.getPreferredName(), stats.getSearchTime());
builder.field(IndexerJobStats.SEARCH_TOTAL.getPreferredName(), stats.getSearchTotal());
builder.field(IndexerJobStats.SEARCH_FAILURES.getPreferredName(), stats.getSearchFailures());
builder.field(TransformIndexerStats.EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS.getPreferredName(),
stats.getExpAvgCheckpointDurationMs());
builder.field(TransformIndexerStats.EXPONENTIAL_AVG_DOCUMENTS_INDEXED.getPreferredName(),
stats.getExpAvgDocumentsIndexed());
builder.field(TransformIndexerStats.EXPONENTIAL_AVG_DOCUMENTS_PROCESSED.getPreferredName(),
stats.getExpAvgDocumentsProcessed());
if (randomBoolean()) {
builder.field(IndexerJobStats.NUM_PAGES.getPreferredName(), stats.getNumPages());
builder.field(IndexerJobStats.NUM_INPUT_DOCUMENTS.getPreferredName(), stats.getNumDocuments());
builder.field(IndexerJobStats.NUM_OUTPUT_DOCUMENTS.getPreferredName(), stats.getOutputDocuments());
builder.field(IndexerJobStats.NUM_INVOCATIONS.getPreferredName(), stats.getNumInvocations());
builder.field(IndexerJobStats.INDEX_TIME_IN_MS.getPreferredName(), stats.getIndexTime());
builder.field(IndexerJobStats.INDEX_TOTAL.getPreferredName(), stats.getIndexTotal());
builder.field(IndexerJobStats.INDEX_FAILURES.getPreferredName(), stats.getIndexFailures());
builder.field(IndexerJobStats.SEARCH_TIME_IN_MS.getPreferredName(), stats.getSearchTime());
builder.field(IndexerJobStats.SEARCH_TOTAL.getPreferredName(), stats.getSearchTotal());
builder.field(IndexerJobStats.PROCESSING_TIME_IN_MS.getPreferredName(), stats.getProcessingTime());
builder.field(IndexerJobStats.PROCESSING_TOTAL.getPreferredName(), stats.getProcessingTotal());
builder.field(IndexerJobStats.SEARCH_FAILURES.getPreferredName(), stats.getSearchFailures());
builder.field(
TransformIndexerStats.EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS.getPreferredName(),
stats.getExpAvgCheckpointDurationMs()
);
builder.field(TransformIndexerStats.EXPONENTIAL_AVG_DOCUMENTS_INDEXED.getPreferredName(), stats.getExpAvgDocumentsIndexed());
builder.field(
TransformIndexerStats.EXPONENTIAL_AVG_DOCUMENTS_PROCESSED.getPreferredName(),
stats.getExpAvgDocumentsProcessed()
);
} else {
// a toXContent version which leaves out field with value 0 (simulating the case that an older version misses a field)
xContentFieldIfNotZero(builder, IndexerJobStats.NUM_PAGES.getPreferredName(), stats.getNumPages());
xContentFieldIfNotZero(builder, IndexerJobStats.NUM_INPUT_DOCUMENTS.getPreferredName(), stats.getNumDocuments());
xContentFieldIfNotZero(builder, IndexerJobStats.NUM_OUTPUT_DOCUMENTS.getPreferredName(), stats.getOutputDocuments());
xContentFieldIfNotZero(builder, IndexerJobStats.NUM_INVOCATIONS.getPreferredName(), stats.getNumInvocations());
xContentFieldIfNotZero(builder, IndexerJobStats.INDEX_TIME_IN_MS.getPreferredName(), stats.getIndexTime());
xContentFieldIfNotZero(builder, IndexerJobStats.INDEX_TOTAL.getPreferredName(), stats.getIndexTotal());
xContentFieldIfNotZero(builder, IndexerJobStats.INDEX_FAILURES.getPreferredName(), stats.getIndexFailures());
xContentFieldIfNotZero(builder, IndexerJobStats.SEARCH_TIME_IN_MS.getPreferredName(), stats.getSearchTime());
xContentFieldIfNotZero(builder, IndexerJobStats.SEARCH_TOTAL.getPreferredName(), stats.getSearchTotal());
xContentFieldIfNotZero(builder, IndexerJobStats.PROCESSING_TIME_IN_MS.getPreferredName(), stats.getProcessingTime());
xContentFieldIfNotZero(builder, IndexerJobStats.PROCESSING_TOTAL.getPreferredName(), stats.getProcessingTotal());
xContentFieldIfNotZero(builder, IndexerJobStats.SEARCH_FAILURES.getPreferredName(), stats.getSearchFailures());
xContentFieldIfNotZero(
builder,
TransformIndexerStats.EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS.getPreferredName(),
stats.getExpAvgCheckpointDurationMs()
);
xContentFieldIfNotZero(
builder,
TransformIndexerStats.EXPONENTIAL_AVG_DOCUMENTS_INDEXED.getPreferredName(),
stats.getExpAvgDocumentsIndexed()
);
xContentFieldIfNotZero(
builder,
TransformIndexerStats.EXPONENTIAL_AVG_DOCUMENTS_PROCESSED.getPreferredName(),
stats.getExpAvgDocumentsProcessed()
);
}
builder.endObject();
}
private static XContentBuilder xContentFieldIfNotZero(XContentBuilder builder, String name, long value) throws IOException {
if (value > 0) {
builder.field(name, value);
}
return builder;
}
private static XContentBuilder xContentFieldIfNotZero(XContentBuilder builder, String name, double value) throws IOException {
if (value > 0.0) {
builder.field(name, value);
}
return builder;
}
}

View File

@ -30,17 +30,31 @@ import static org.elasticsearch.client.transform.transforms.hlrc.TransformStatsT
public class TransformIndexerStatsTests extends AbstractResponseTestCase<
org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats,
TransformIndexerStats> {
TransformIndexerStats> {
public static org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats randomStats() {
return new org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats(
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomDouble(),
randomDouble(),
randomDouble()
);
}
@Override
protected org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats createServerTestInstance(XContentType xContentType) {
return new org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats(randomLongBetween(10L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble());
return randomStats();
}
@Override
@ -49,8 +63,10 @@ public class TransformIndexerStatsTests extends AbstractResponseTestCase<
}
@Override
protected void assertInstances(org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats serverTestInstance,
TransformIndexerStats clientInstance) {
protected void assertInstances(
org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats serverTestInstance,
TransformIndexerStats clientInstance
) {
assertTransformIndexerStats(serverTestInstance, clientInstance);
}
}

View File

@ -36,40 +36,36 @@ import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
public class TransformStatsTests extends AbstractResponseTestCase<org.elasticsearch.xpack.core.transform.transforms.TransformStats,
public class TransformStatsTests extends AbstractResponseTestCase<
org.elasticsearch.xpack.core.transform.transforms.TransformStats,
org.elasticsearch.client.transform.transforms.TransformStats> {
public static org.elasticsearch.xpack.core.transform.transforms.NodeAttributes randomNodeAttributes() {
int numberOfAttributes = randomIntBetween(1, 10);
Map<String, String> attributes = new HashMap<>(numberOfAttributes);
for(int i = 0; i < numberOfAttributes; i++) {
for (int i = 0; i < numberOfAttributes; i++) {
String val = randomAlphaOfLength(10);
attributes.put("key-"+i, val);
attributes.put("key-" + i, val);
}
return new org.elasticsearch.xpack.core.transform.transforms.NodeAttributes(randomAlphaOfLength(10),
return new org.elasticsearch.xpack.core.transform.transforms.NodeAttributes(
randomAlphaOfLength(10),
randomAlphaOfLength(10),
randomAlphaOfLength(10),
attributes);
randomAlphaOfLength(10),
attributes
);
}
public static org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats randomStats() {
return new org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats(randomLongBetween(10L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble());
}
@Override
protected org.elasticsearch.xpack.core.transform.transforms.TransformStats createServerTestInstance(XContentType xContentType) {
return new org.elasticsearch.xpack.core.transform.transforms.TransformStats(randomAlphaOfLength(10),
return new org.elasticsearch.xpack.core.transform.transforms.TransformStats(
randomAlphaOfLength(10),
randomFrom(org.elasticsearch.xpack.core.transform.transforms.TransformStats.State.values()),
randomBoolean() ? null : randomAlphaOfLength(100),
randomBoolean() ? null : randomNodeAttributes(),
randomStats(),
TransformCheckpointingInfoTests.randomTransformCheckpointingInfo());
TransformIndexerStatsTests.randomStats(),
TransformCheckpointingInfoTests.randomTransformCheckpointingInfo()
);
}
@Override
@ -78,8 +74,10 @@ public class TransformStatsTests extends AbstractResponseTestCase<org.elasticsea
}
@Override
protected void assertInstances(org.elasticsearch.xpack.core.transform.transforms.TransformStats serverTestInstance,
TransformStats clientInstance) {
protected void assertInstances(
org.elasticsearch.xpack.core.transform.transforms.TransformStats serverTestInstance,
TransformStats clientInstance
) {
assertThat(serverTestInstance.getId(), equalTo(clientInstance.getId()));
assertThat(serverTestInstance.getState().value(), equalTo(clientInstance.getState().value()));
assertTransformIndexerStats(serverTestInstance.getIndexerStats(), clientInstance.getIndexerStats());
@ -88,8 +86,10 @@ public class TransformStatsTests extends AbstractResponseTestCase<org.elasticsea
assertThat(serverTestInstance.getReason(), equalTo(clientInstance.getReason()));
}
private void assertNodeAttributes(org.elasticsearch.xpack.core.transform.transforms.NodeAttributes serverTestInstance,
NodeAttributes clientInstance) {
private void assertNodeAttributes(
org.elasticsearch.xpack.core.transform.transforms.NodeAttributes serverTestInstance,
NodeAttributes clientInstance
) {
if (serverTestInstance == null || clientInstance == null) {
assertNull(serverTestInstance);
assertNull(clientInstance);
@ -102,8 +102,10 @@ public class TransformStatsTests extends AbstractResponseTestCase<org.elasticsea
assertThat(serverTestInstance.getTransportAddress(), equalTo(clientInstance.getTransportAddress()));
}
public static void assertTransformProgress(org.elasticsearch.xpack.core.transform.transforms.TransformProgress serverTestInstance,
TransformProgress clientInstance) {
public static void assertTransformProgress(
org.elasticsearch.xpack.core.transform.transforms.TransformProgress serverTestInstance,
TransformProgress clientInstance
) {
if (serverTestInstance == null || clientInstance == null) {
assertNull(serverTestInstance);
assertNull(clientInstance);
@ -115,16 +117,18 @@ public class TransformStatsTests extends AbstractResponseTestCase<org.elasticsea
assertThat(serverTestInstance.getDocumentsIndexed(), equalTo(clientInstance.getDocumentsIndexed()));
}
public static void assertPosition(org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition serverTestInstance,
TransformIndexerPosition clientInstance) {
public static void assertPosition(
org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition serverTestInstance,
TransformIndexerPosition clientInstance
) {
assertThat(serverTestInstance.getIndexerPosition(), equalTo(clientInstance.getIndexerPosition()));
assertThat(serverTestInstance.getBucketsPosition(), equalTo(clientInstance.getBucketsPosition()));
}
public static void assertTransformCheckpointStats(
org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointStats serverTestInstance,
TransformCheckpointStats clientInstance) {
org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointStats serverTestInstance,
TransformCheckpointStats clientInstance
) {
assertTransformProgress(serverTestInstance.getCheckpointProgress(), clientInstance.getCheckpointProgress());
assertThat(serverTestInstance.getCheckpoint(), equalTo(clientInstance.getCheckpoint()));
assertPosition(serverTestInstance.getPosition(), clientInstance.getPosition());
@ -133,8 +137,9 @@ public class TransformStatsTests extends AbstractResponseTestCase<org.elasticsea
}
public static void assertTransformCheckpointInfo(
org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo serverTestInstance,
TransformCheckpointingInfo clientInstance) {
org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo serverTestInstance,
TransformCheckpointingInfo clientInstance
) {
assertTransformCheckpointStats(serverTestInstance.getNext(), clientInstance.getNext());
assertTransformCheckpointStats(serverTestInstance.getLast(), clientInstance.getLast());
assertThat(serverTestInstance.getChangesLastDetectedAt(), equalTo(clientInstance.getChangesLastDetectedAt()));
@ -142,8 +147,9 @@ public class TransformStatsTests extends AbstractResponseTestCase<org.elasticsea
}
public static void assertTransformIndexerStats(
org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats serverTestInstance,
TransformIndexerStats clientInstance) {
org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats serverTestInstance,
TransformIndexerStats clientInstance
) {
assertThat(serverTestInstance.getExpAvgCheckpointDurationMs(), equalTo(clientInstance.getExpAvgCheckpointDurationMs()));
assertThat(serverTestInstance.getExpAvgDocumentsProcessed(), equalTo(clientInstance.getExpAvgDocumentsProcessed()));
assertThat(serverTestInstance.getExpAvgDocumentsIndexed(), equalTo(clientInstance.getExpAvgDocumentsIndexed()));

View File

@ -143,14 +143,16 @@ The API yields the following response:
"index_total": 0,
"search_failures": 0,
"search_time_in_ms": 0,
"search_total": 0
"search_total": 0,
"processing_time_in_ms": 0,
"processing_total": 0
}
}
]
}
----
The `jobs` array contains a single job (`id: sensor`) since we requested a single job in the endpoint's URL.
The `jobs` array contains a single job (`id: sensor`) since we requested a single job in the endpoint's URL.
If we add another job, we can see how multi-job responses are handled:
[source,console]
@ -247,7 +249,9 @@ Which will yield the following response:
"index_total": 0,
"search_failures": 0,
"search_time_in_ms": 0,
"search_total": 0
"search_total": 0,
"processing_time_in_ms": 0,
"processing_total": 0
}
},
{
@ -302,7 +306,9 @@ Which will yield the following response:
"index_total": 0,
"search_failures": 0,
"search_time_in_ms": 0,
"search_total": 0
"search_total": 0,
"processing_time_in_ms": 0,
"processing_total": 0
}
}
]

View File

@ -347,7 +347,7 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
// allowPartialSearchResults is set to false, so we should never see shard failures here
assert (searchResponse.getShardFailures().length == 0);
stats.markStartProcessing();
stats.incrementNumPages(1);
IterationResult<JobPosition> iterationResult = doProcess(searchResponse);
@ -355,11 +355,11 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
logger.debug("Finished indexing for job [" + getJobId() + "], saving state and shutting down.");
position.set(iterationResult.getPosition());
stats.markEndProcessing();
// execute finishing tasks
onFinish(ActionListener.wrap(
r -> doSaveState(finishAndSetState(), position.get(), () -> {}),
e -> doSaveState(finishAndSetState(), position.get(), () -> {})));
return;
}
@ -369,7 +369,7 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
if (docs.isEmpty() == false) {
final BulkRequest bulkRequest = new BulkRequest();
docs.forEach(bulkRequest::add);
stats.markEndProcessing();
stats.markStartIndexing();
doNextBulk(bulkRequest, ActionListener.wrap(bulkResponse -> {
// TODO we should check items in the response and move after accordingly to
@ -390,6 +390,7 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
onBulkResponse(bulkResponse, newPosition);
}, this::finishWithIndexingFailure));
} else {
stats.markEndProcessing();
// no documents need to be indexed, continue with search
try {
JobPosition newPosition = iterationResult.getPosition();

View File

@ -32,26 +32,31 @@ public abstract class IndexerJobStats implements ToXContentObject, Writeable {
protected long searchTime = 0;
protected long indexTotal = 0;
protected long searchTotal = 0;
protected long processingTime = 0;
protected long processingTotal = 0;
protected long indexFailures = 0;
protected long searchFailures = 0;
private long startIndexTime;
private long startSearchTime;
private long startProcessingTime;
public IndexerJobStats() {
}
public IndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations,
long indexTime, long searchTime, long indexTotal, long searchTotal,
long indexFailures, long searchFailures) {
long indexTime, long searchTime, long processingTime, long indexTotal, long searchTotal,
long processingTotal, long indexFailures, long searchFailures) {
this.numPages = numPages;
this.numInputDocuments = numInputDocuments;
this.numOuputDocuments = numOuputDocuments;
this.numInvocations = numInvocations;
this.indexTime = indexTime;
this.searchTime = searchTime;
this.processingTime = processingTime;
this.indexTotal = indexTotal;
this.searchTotal = searchTotal;
this.processingTotal = processingTotal;
this.indexFailures = indexFailures;
this.searchFailures = searchFailures;
}
@ -68,6 +73,11 @@ public abstract class IndexerJobStats implements ToXContentObject, Writeable {
this.searchTotal = in.readVLong();
this.indexFailures = in.readVLong();
this.searchFailures = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_7_7_0)) {
this.processingTime = in.readVLong();
this.processingTotal = in.readVLong();
}
}
}
@ -103,6 +113,10 @@ public abstract class IndexerJobStats implements ToXContentObject, Writeable {
return searchTime;
}
public long getProcessingTime() {
return processingTime;
}
public long getIndexTotal() {
return indexTotal;
}
@ -111,6 +125,10 @@ public abstract class IndexerJobStats implements ToXContentObject, Writeable {
return searchTotal;
}
public long getProcessingTotal() {
return processingTotal;
}
public void incrementNumPages(long n) {
assert(n >= 0);
numPages += n;
@ -157,6 +175,15 @@ public abstract class IndexerJobStats implements ToXContentObject, Writeable {
searchTotal += 1;
}
public void markStartProcessing() {
this.startProcessingTime = System.nanoTime();
}
public void markEndProcessing() {
processingTime += ((System.nanoTime() - startProcessingTime) / 1000000);
processingTotal += 1;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(numPages);
@ -171,6 +198,10 @@ public abstract class IndexerJobStats implements ToXContentObject, Writeable {
out.writeVLong(indexFailures);
out.writeVLong(searchFailures);
}
if (out.getVersion().onOrAfter(Version.V_7_7_0)) {
out.writeVLong(processingTime);
out.writeVLong(processingTotal);
}
}
@Override
@ -191,15 +222,17 @@ public abstract class IndexerJobStats implements ToXContentObject, Writeable {
&& Objects.equals(this.numInvocations, that.numInvocations)
&& Objects.equals(this.indexTime, that.indexTime)
&& Objects.equals(this.searchTime, that.searchTime)
&& Objects.equals(this.processingTime, that.processingTime)
&& Objects.equals(this.indexFailures, that.indexFailures)
&& Objects.equals(this.searchFailures, that.searchFailures)
&& Objects.equals(this.indexTotal, that.indexTotal)
&& Objects.equals(this.searchTotal, that.searchTotal);
&& Objects.equals(this.searchTotal, that.searchTotal)
&& Objects.equals(this.processingTotal, that.processingTotal);
}
@Override
public int hashCode() {
return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations,
indexTime, searchTime, indexFailures, searchFailures, indexTotal, searchTotal);
indexTime, searchTime, processingTime, indexFailures, searchFailures, indexTotal, searchTotal, processingTotal);
}
}

View File

@ -27,15 +27,18 @@ public class RollupIndexerJobStats extends IndexerJobStats {
private static ParseField NUM_INVOCATIONS = new ParseField("trigger_count");
private static ParseField INDEX_TIME_IN_MS = new ParseField("index_time_in_ms");
private static ParseField SEARCH_TIME_IN_MS = new ParseField("search_time_in_ms");
private static ParseField PROCESSING_TIME_IN_MS = new ParseField("processing_time_in_ms");
private static ParseField INDEX_TOTAL = new ParseField("index_total");
private static ParseField SEARCH_TOTAL = new ParseField("search_total");
private static ParseField PROCESSING_TOTAL = new ParseField("processing_total");
private static ParseField SEARCH_FAILURES = new ParseField("search_failures");
private static ParseField INDEX_FAILURES = new ParseField("index_failures");
public static final ConstructingObjectParser<RollupIndexerJobStats, Void> PARSER =
new ConstructingObjectParser<>(NAME.getPreferredName(),
args -> new RollupIndexerJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3],
(long) args[4], (long) args[5], (long) args[6], (long) args[7], (long) args[8], (long) args[9]));
(long) args[4], (long) args[5], (long) args[6], (long) args[7], (long) args[8], (long) args[9],
(long) args[10], (long) args[11]));
static {
PARSER.declareLong(constructorArg(), NUM_PAGES);
@ -44,8 +47,10 @@ public class RollupIndexerJobStats extends IndexerJobStats {
PARSER.declareLong(constructorArg(), NUM_INVOCATIONS);
PARSER.declareLong(constructorArg(), INDEX_TIME_IN_MS);
PARSER.declareLong(constructorArg(), SEARCH_TIME_IN_MS);
PARSER.declareLong(constructorArg(), PROCESSING_TIME_IN_MS);
PARSER.declareLong(constructorArg(), INDEX_TOTAL);
PARSER.declareLong(constructorArg(), SEARCH_TOTAL);
PARSER.declareLong(constructorArg(), PROCESSING_TOTAL);
PARSER.declareLong(constructorArg(), INDEX_FAILURES);
PARSER.declareLong(constructorArg(), SEARCH_FAILURES);
}
@ -55,10 +60,10 @@ public class RollupIndexerJobStats extends IndexerJobStats {
}
public RollupIndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations,
long indexTime, long searchTime, long indexTotal, long searchTotal, long indexFailures,
long searchFailures) {
super(numPages, numInputDocuments, numOuputDocuments, numInvocations, indexTime, searchTime,
indexTotal, searchTotal, indexFailures, searchFailures);
long indexTime, long searchTime, long processingTime, long indexTotal, long searchTotal,
long processingTotal, long indexFailures, long searchFailures) {
super(numPages, numInputDocuments, numOuputDocuments, numInvocations, indexTime, searchTime, processingTime,
indexTotal, searchTotal, processingTotal, indexFailures, searchFailures);
}
public RollupIndexerJobStats(StreamInput in) throws IOException {
@ -78,6 +83,8 @@ public class RollupIndexerJobStats extends IndexerJobStats {
builder.field(SEARCH_TIME_IN_MS.getPreferredName(), searchTime);
builder.field(SEARCH_TOTAL.getPreferredName(), searchTotal);
builder.field(SEARCH_FAILURES.getPreferredName(), searchFailures);
builder.field(PROCESSING_TIME_IN_MS.getPreferredName(), processingTime);
builder.field(PROCESSING_TOTAL.getPreferredName(), processingTotal);
builder.endObject();
return builder;
}

View File

@ -19,7 +19,6 @@ import org.elasticsearch.xpack.core.indexing.IndexerJobStats;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
public class TransformIndexerStats extends IndexerJobStats {
@ -33,8 +32,10 @@ public class TransformIndexerStats extends IndexerJobStats {
public static ParseField NUM_INVOCATIONS = new ParseField("trigger_count");
public static ParseField INDEX_TIME_IN_MS = new ParseField("index_time_in_ms");
public static ParseField SEARCH_TIME_IN_MS = new ParseField("search_time_in_ms");
public static ParseField PROCESSING_TIME_IN_MS = new ParseField("processing_time_in_ms");
public static ParseField INDEX_TOTAL = new ParseField("index_total");
public static ParseField SEARCH_TOTAL = new ParseField("search_total");
public static ParseField PROCESSING_TOTAL = new ParseField("processing_total");
public static ParseField SEARCH_FAILURES = new ParseField("search_failures");
public static ParseField INDEX_FAILURES = new ParseField("index_failures");
public static ParseField EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS = new ParseField("exponential_avg_checkpoint_duration_ms");
@ -50,33 +51,37 @@ public class TransformIndexerStats extends IndexerJobStats {
NAME,
true,
args -> new TransformIndexerStats(
(long) args[0],
(long) args[1],
(long) args[2],
(long) args[3],
(long) args[4],
(long) args[5],
(long) args[6],
(long) args[7],
(long) args[8],
(long) args[9],
(Double) args[10],
(Double) args[11],
(Double) args[12]
unboxSafe(args[0], 0L),
unboxSafe(args[1], 0L),
unboxSafe(args[2], 0L),
unboxSafe(args[3], 0L),
unboxSafe(args[4], 0L),
unboxSafe(args[5], 0L),
unboxSafe(args[6], 0L),
unboxSafe(args[7], 0L),
unboxSafe(args[8], 0L),
unboxSafe(args[9], 0L),
unboxSafe(args[10], 0L),
unboxSafe(args[11], 0L),
unboxSafe(args[12], 0.0),
unboxSafe(args[13], 0.0),
unboxSafe(args[14], 0.0)
)
);
static {
LENIENT_PARSER.declareLong(constructorArg(), NUM_PAGES);
LENIENT_PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS);
LENIENT_PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS);
LENIENT_PARSER.declareLong(constructorArg(), NUM_INVOCATIONS);
LENIENT_PARSER.declareLong(constructorArg(), INDEX_TIME_IN_MS);
LENIENT_PARSER.declareLong(constructorArg(), SEARCH_TIME_IN_MS);
LENIENT_PARSER.declareLong(constructorArg(), INDEX_TOTAL);
LENIENT_PARSER.declareLong(constructorArg(), SEARCH_TOTAL);
LENIENT_PARSER.declareLong(constructorArg(), INDEX_FAILURES);
LENIENT_PARSER.declareLong(constructorArg(), SEARCH_FAILURES);
LENIENT_PARSER.declareLong(optionalConstructorArg(), NUM_PAGES);
LENIENT_PARSER.declareLong(optionalConstructorArg(), NUM_INPUT_DOCUMENTS);
LENIENT_PARSER.declareLong(optionalConstructorArg(), NUM_OUTPUT_DOCUMENTS);
LENIENT_PARSER.declareLong(optionalConstructorArg(), NUM_INVOCATIONS);
LENIENT_PARSER.declareLong(optionalConstructorArg(), INDEX_TIME_IN_MS);
LENIENT_PARSER.declareLong(optionalConstructorArg(), SEARCH_TIME_IN_MS);
LENIENT_PARSER.declareLong(optionalConstructorArg(), PROCESSING_TIME_IN_MS);
LENIENT_PARSER.declareLong(optionalConstructorArg(), INDEX_TOTAL);
LENIENT_PARSER.declareLong(optionalConstructorArg(), SEARCH_TOTAL);
LENIENT_PARSER.declareLong(optionalConstructorArg(), PROCESSING_TOTAL);
LENIENT_PARSER.declareLong(optionalConstructorArg(), INDEX_FAILURES);
LENIENT_PARSER.declareLong(optionalConstructorArg(), SEARCH_FAILURES);
LENIENT_PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS);
LENIENT_PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_DOCUMENTS_INDEXED);
LENIENT_PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_DOCUMENTS_PROCESSED);
@ -100,13 +105,15 @@ public class TransformIndexerStats extends IndexerJobStats {
long numInvocations,
long indexTime,
long searchTime,
long processingTime,
long indexTotal,
long searchTotal,
long processingTotal,
long indexFailures,
long searchFailures,
Double expAvgCheckpointDurationMs,
Double expAvgDocumentsIndexed,
Double expAvgDocumentsProcessed
double expAvgCheckpointDurationMs,
double expAvgDocumentsIndexed,
double expAvgDocumentsProcessed
) {
super(
numPages,
@ -115,14 +122,16 @@ public class TransformIndexerStats extends IndexerJobStats {
numInvocations,
indexTime,
searchTime,
processingTime,
indexTotal,
searchTotal,
processingTotal,
indexFailures,
searchFailures
);
this.expAvgCheckpointDurationMs = expAvgCheckpointDurationMs == null ? 0.0 : expAvgCheckpointDurationMs;
this.expAvgDocumentsIndexed = expAvgDocumentsIndexed == null ? 0.0 : expAvgDocumentsIndexed;
this.expAvgDocumentsProcessed = expAvgDocumentsProcessed == null ? 0.0 : expAvgDocumentsProcessed;
this.expAvgCheckpointDurationMs = expAvgCheckpointDurationMs;
this.expAvgDocumentsIndexed = expAvgDocumentsIndexed;
this.expAvgDocumentsProcessed = expAvgDocumentsProcessed;
}
public TransformIndexerStats(TransformIndexerStats other) {
@ -133,8 +142,10 @@ public class TransformIndexerStats extends IndexerJobStats {
other.numInvocations,
other.indexTime,
other.searchTime,
other.processingTime,
other.indexTotal,
other.searchTotal,
other.processingTotal,
other.indexFailures,
other.searchFailures,
other.expAvgCheckpointDurationMs,
@ -181,6 +192,8 @@ public class TransformIndexerStats extends IndexerJobStats {
builder.field(SEARCH_TIME_IN_MS.getPreferredName(), searchTime);
builder.field(SEARCH_TOTAL.getPreferredName(), searchTotal);
builder.field(SEARCH_FAILURES.getPreferredName(), searchFailures);
builder.field(PROCESSING_TIME_IN_MS.getPreferredName(), processingTime);
builder.field(PROCESSING_TOTAL.getPreferredName(), processingTotal);
builder.field(EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS.getPreferredName(), this.expAvgCheckpointDurationMs);
builder.field(EXPONENTIAL_AVG_DOCUMENTS_INDEXED.getPreferredName(), this.expAvgDocumentsIndexed);
builder.field(EXPONENTIAL_AVG_DOCUMENTS_PROCESSED.getPreferredName(), this.expAvgDocumentsProcessed);
@ -238,10 +251,12 @@ public class TransformIndexerStats extends IndexerJobStats {
&& Objects.equals(this.numInvocations, that.numInvocations)
&& Objects.equals(this.indexTime, that.indexTime)
&& Objects.equals(this.searchTime, that.searchTime)
&& Objects.equals(this.processingTime, that.processingTime)
&& Objects.equals(this.indexFailures, that.indexFailures)
&& Objects.equals(this.searchFailures, that.searchFailures)
&& Objects.equals(this.indexTotal, that.indexTotal)
&& Objects.equals(this.searchTotal, that.searchTotal)
&& Objects.equals(this.processingTotal, that.processingTotal)
&& Objects.equals(this.expAvgCheckpointDurationMs, that.expAvgCheckpointDurationMs)
&& Objects.equals(this.expAvgDocumentsIndexed, that.expAvgDocumentsIndexed)
&& Objects.equals(this.expAvgDocumentsProcessed, that.expAvgDocumentsProcessed);
@ -256,10 +271,12 @@ public class TransformIndexerStats extends IndexerJobStats {
numInvocations,
indexTime,
searchTime,
processingTime,
indexFailures,
searchFailures,
indexTotal,
searchTotal,
processingTotal,
expAvgCheckpointDurationMs,
expAvgDocumentsIndexed,
expAvgDocumentsProcessed
@ -278,4 +295,13 @@ public class TransformIndexerStats extends IndexerJobStats {
throw new RuntimeException(e);
}
}
@SuppressWarnings("unchecked")
private static <T> T unboxSafe(Object l, T default_value) {
if (l == null) {
return default_value;
} else {
return (T) l;
}
}
}

View File

@ -24,7 +24,7 @@ public final class TransformInternalIndexConstants {
// internal index
// version is not a rollover pattern, however padded because sort is string based
public static final String INDEX_VERSION = "004";
public static final String INDEX_VERSION = "005";
public static final String INDEX_PATTERN = ".transform-internal-";
public static final String LATEST_INDEX_VERSIONED_NAME = INDEX_PATTERN + INDEX_VERSION;
public static final String LATEST_INDEX_NAME = LATEST_INDEX_VERSIONED_NAME;

View File

@ -43,7 +43,8 @@ public class JobWrapperSerializingTests extends AbstractSerializingTestCase<GetR
return new GetRollupJobsAction.JobWrapper(ConfigTestHelpers.randomRollupJobConfig(random()),
new RollupIndexerJobStats(randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong()),
new RollupJobStatus(state, Collections.emptyMap(), randomBoolean()));
}
}

View File

@ -29,7 +29,8 @@ public class RollupIndexerJobStatsTests extends AbstractSerializingTestCase<Roll
public static RollupIndexerJobStats randomStats() {
return new RollupIndexerJobStats(randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong());
}
@Override

View File

@ -7,14 +7,43 @@
package org.elasticsearch.xpack.core.transform.transforms;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException;
import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.equalTo;
public class TransformIndexerStatsTests extends AbstractSerializingTestCase<TransformIndexerStats> {
public static TransformIndexerStats randomStats() {
return new TransformIndexerStats(
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomDouble(),
randomDouble(),
randomDouble()
);
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
@Override
protected TransformIndexerStats createTestInstance() {
return randomStats();
@ -30,24 +59,6 @@ public class TransformIndexerStatsTests extends AbstractSerializingTestCase<Tran
return TransformIndexerStats.fromXContent(parser);
}
public static TransformIndexerStats randomStats() {
return new TransformIndexerStats(
randomLongBetween(10L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomBoolean() ? randomDouble() : null,
randomBoolean() ? randomDouble() : null,
randomBoolean() ? randomDouble() : null
);
}
public void testExpAvgIncrement() {
TransformIndexerStats stats = new TransformIndexerStats();
@ -67,4 +78,66 @@ public class TransformIndexerStatsTests extends AbstractSerializingTestCase<Tran
assertThat(stats.getExpAvgDocumentsIndexed(), closeTo(20.54545454, 0.0000001));
assertThat(stats.getExpAvgDocumentsProcessed(), closeTo(59.0909090, 0.0000001));
}
public void testXContentLeniencyForMissingFields() throws IOException {
// this is essentially the same test as done in the super class, but with the difference of a custom toXContent method that leaves
// out fields if the value is 0, this allow us to test successful parsing if fields are not available, e.g. on older versions
xContentTester(this::createParser, this::createTestInstance, TransformIndexerStatsTests::toXContentIfNotZero, this::doParseInstance)
.numberOfTestRuns(NUMBER_OF_TEST_RUNS)
.supportsUnknownFields(supportsUnknownFields())
.shuffleFieldsExceptions(getShuffleFieldsExceptions())
.randomFieldsExcludeFilter(getRandomFieldsExcludeFilter())
.assertEqualsConsumer(this::assertEqualInstances)
.assertToXContentEquivalence(assertToXContentEquivalence())
.test();
}
public static void toXContentIfNotZero(TransformIndexerStats stats, XContentBuilder builder) throws IOException {
// a toXContent version which leaves out field with value 0
builder.startObject();
xContentFieldIfNotZero(builder, TransformIndexerStats.NUM_PAGES.getPreferredName(), stats.getNumPages());
xContentFieldIfNotZero(builder, TransformIndexerStats.NUM_INPUT_DOCUMENTS.getPreferredName(), stats.getNumDocuments());
xContentFieldIfNotZero(builder, TransformIndexerStats.NUM_OUTPUT_DOCUMENTS.getPreferredName(), stats.getOutputDocuments());
xContentFieldIfNotZero(builder, TransformIndexerStats.NUM_INVOCATIONS.getPreferredName(), stats.getNumInvocations());
xContentFieldIfNotZero(builder, TransformIndexerStats.INDEX_TIME_IN_MS.getPreferredName(), stats.getIndexTime());
xContentFieldIfNotZero(builder, TransformIndexerStats.INDEX_TOTAL.getPreferredName(), stats.getIndexTotal());
xContentFieldIfNotZero(builder, TransformIndexerStats.INDEX_FAILURES.getPreferredName(), stats.getIndexFailures());
xContentFieldIfNotZero(builder, TransformIndexerStats.SEARCH_TIME_IN_MS.getPreferredName(), stats.getSearchTime());
xContentFieldIfNotZero(builder, TransformIndexerStats.SEARCH_TOTAL.getPreferredName(), stats.getSearchTotal());
xContentFieldIfNotZero(builder, TransformIndexerStats.PROCESSING_TIME_IN_MS.getPreferredName(), stats.getProcessingTime());
xContentFieldIfNotZero(builder, TransformIndexerStats.PROCESSING_TOTAL.getPreferredName(), stats.getProcessingTotal());
xContentFieldIfNotZero(builder, TransformIndexerStats.SEARCH_FAILURES.getPreferredName(), stats.getSearchFailures());
xContentFieldIfNotZero(
builder,
TransformIndexerStats.EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS.getPreferredName(),
stats.getExpAvgCheckpointDurationMs()
);
xContentFieldIfNotZero(
builder,
TransformIndexerStats.EXPONENTIAL_AVG_DOCUMENTS_INDEXED.getPreferredName(),
stats.getExpAvgDocumentsIndexed()
);
xContentFieldIfNotZero(
builder,
TransformIndexerStats.EXPONENTIAL_AVG_DOCUMENTS_PROCESSED.getPreferredName(),
stats.getExpAvgDocumentsProcessed()
);
builder.endObject();
}
private static XContentBuilder xContentFieldIfNotZero(XContentBuilder builder, String name, long value) throws IOException {
if (value > 0) {
builder.field(name, value);
}
return builder;
}
private static XContentBuilder xContentFieldIfNotZero(XContentBuilder builder, String name, double value) throws IOException {
if (value > 0.0) {
builder.field(name, value);
}
return builder;
}
}

View File

@ -69,7 +69,7 @@ public class TransformStatsTests extends AbstractSerializingTestCase<TransformSt
STARTED,
randomBoolean() ? null : randomAlphaOfLength(100),
randomBoolean() ? null : NodeAttributeTests.randomNodeAttributes(),
new TransformIndexerStats(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 0.0, 0.0, 0.0),
new TransformIndexerStats(1, 2, 3, 4, 5, 6, 0, 8, 9, 0, 11, 12, 0.0, 0.0, 0.0),
new TransformCheckpointingInfo(
new TransformCheckpointStats(0, null, null, 10, 100),
new TransformCheckpointStats(0, null, null, 100, 1000),
@ -89,4 +89,32 @@ public class TransformStatsTests extends AbstractSerializingTestCase<TransformSt
}
}
}
public void testBwcWith76() throws IOException {
for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) {
TransformStats stats = new TransformStats(
"bwc-id",
STARTED,
randomBoolean() ? null : randomAlphaOfLength(100),
randomBoolean() ? null : NodeAttributeTests.randomNodeAttributes(),
new TransformIndexerStats(1, 2, 3, 4, 5, 6, 0, 8, 9, 0, 11, 12, 13.0, 14.0, 15.0),
new TransformCheckpointingInfo(
new TransformCheckpointStats(0, null, null, 10, 100),
new TransformCheckpointStats(0, null, null, 100, 1000),
// changesLastDetectedAt aren't serialized back
100,
null
)
);
try (BytesStreamOutput output = new BytesStreamOutput()) {
output.setVersion(Version.V_7_6_0);
stats.writeTo(output);
try (StreamInput in = output.bytes().streamInput()) {
in.setVersion(Version.V_7_6_0);
TransformStats statsFromOld = new TransformStats(in);
assertThat(statsFromOld, equalTo(stats));
}
}
}
}
}

View File

@ -67,7 +67,7 @@ import static org.mockito.Mockito.when;
public class IndexerUtilsTests extends AggregatorTestCase {
public void testMissingFields() throws IOException {
String indexName = randomAlphaOfLengthBetween(1, 10);
RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
String timestampField = "the_histo";
String valueField = "the_avg";
@ -131,7 +131,7 @@ public class IndexerUtilsTests extends AggregatorTestCase {
public void testCorrectFields() throws IOException {
String indexName = randomAlphaOfLengthBetween(1, 10);
RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
String timestampField = "the_histo";
String valueField = "the_avg";
@ -199,7 +199,7 @@ public class IndexerUtilsTests extends AggregatorTestCase {
public void testNumericTerms() throws IOException {
String indexName = randomAlphaOfLengthBetween(1, 10);
RollupIndexerJobStats stats= new RollupIndexerJobStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
RollupIndexerJobStats stats= new RollupIndexerJobStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
String valueField = "the_avg";
@ -255,7 +255,7 @@ public class IndexerUtilsTests extends AggregatorTestCase {
public void testEmptyCounts() throws IOException {
String indexName = randomAlphaOfLengthBetween(1, 10);
RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
String timestampField = "ts";
String valueField = "the_avg";
@ -491,7 +491,7 @@ public class IndexerUtilsTests extends AggregatorTestCase {
public void testMissingBuckets() throws IOException {
String indexName = randomAlphaOfLengthBetween(1, 10);
RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
String metricField = "metric_field";
String valueField = "value_field";
@ -564,7 +564,7 @@ public class IndexerUtilsTests extends AggregatorTestCase {
public void testTimezone() throws IOException {
String indexName = randomAlphaOfLengthBetween(1, 10);
RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
String timestampField = "the_histo";
String valueField = "the_avg";

View File

@ -75,6 +75,8 @@ setup:
index_total: 0
search_time_in_ms: 0
search_total: 0
processing_time_in_ms: 0
processing_total: 0
status:
job_state: "stopped"
upgraded_doc_id: true
@ -128,6 +130,8 @@ setup:
index_total: 0
search_time_in_ms: 0
search_total: 0
processing_time_in_ms: 0
processing_total: 0
status:
job_state: "stopped"
upgraded_doc_id: true
@ -181,6 +185,8 @@ setup:
index_total: 0
search_time_in_ms: 0
search_total: 0
processing_time_in_ms: 0
processing_total: 0
status:
job_state: "stopped"
upgraded_doc_id: true

View File

@ -76,6 +76,8 @@ setup:
index_total: 0
search_time_in_ms: 0
search_total: 0
processing_time_in_ms: 0
processing_total: 0
status:
job_state: "stopped"
upgraded_doc_id: true
@ -219,6 +221,8 @@ setup:
index_total: 0
search_time_in_ms: 0
search_total: 0
processing_time_in_ms: 0
processing_total: 0
status:
job_state: "stopped"
upgraded_doc_id: true

View File

@ -76,6 +76,8 @@ setup:
index_total: 0
search_time_in_ms: 0
search_total: 0
processing_time_in_ms: 0
processing_total: 0
status:
job_state: "stopped"
upgraded_doc_id: true

View File

@ -100,11 +100,6 @@ public class TransformUsageIT extends TransformRestTestCase {
assertEquals(2, XContentMapValues.extractValue("transform.transforms.stopped", statsMap));
assertEquals(1, XContentMapValues.extractValue("transform.transforms.started", statsMap));
for (String statName : PROVIDED_STATS) {
if (statName.equals(TransformIndexerStats.INDEX_TIME_IN_MS.getPreferredName())
|| statName.equals(TransformIndexerStats.SEARCH_TIME_IN_MS.getPreferredName())) {
continue;
}
// the trigger count can be higher if the scheduler kicked before usage has been called, therefore check for gte
if (statName.equals(TransformIndexerStats.NUM_INVOCATIONS.getPreferredName())) {
assertThat(

View File

@ -64,8 +64,10 @@ public class TransformFeatureSet implements XPackFeatureSet {
TransformIndexerStats.NUM_INVOCATIONS.getPreferredName(),
TransformIndexerStats.INDEX_TIME_IN_MS.getPreferredName(),
TransformIndexerStats.SEARCH_TIME_IN_MS.getPreferredName(),
TransformIndexerStats.PROCESSING_TIME_IN_MS.getPreferredName(),
TransformIndexerStats.INDEX_TOTAL.getPreferredName(),
TransformIndexerStats.SEARCH_TOTAL.getPreferredName(),
TransformIndexerStats.PROCESSING_TOTAL.getPreferredName(),
TransformIndexerStats.INDEX_FAILURES.getPreferredName(),
TransformIndexerStats.SEARCH_FAILURES.getPreferredName(),
TransformIndexerStats.EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS.getPreferredName(),
@ -187,13 +189,15 @@ public class TransformFeatureSet implements XPackFeatureSet {
statisticsList.get(3).longValue(), // numInvocations
statisticsList.get(4).longValue(), // indexTime
statisticsList.get(5).longValue(), // searchTime
statisticsList.get(6).longValue(), // indexTotal
statisticsList.get(7).longValue(), // searchTotal
statisticsList.get(8).longValue(), // indexFailures
statisticsList.get(9).longValue(), // searchFailures
statisticsList.get(10), // exponential_avg_checkpoint_duration_ms
statisticsList.get(11), // exponential_avg_documents_indexed
statisticsList.get(12) // exponential_avg_documents_processed
statisticsList.get(6).longValue(), // processingTime
statisticsList.get(7).longValue(), // indexTotal
statisticsList.get(8).longValue(), // searchTotal
statisticsList.get(9).longValue(), // processingTotal
statisticsList.get(10).longValue(), // indexFailures
statisticsList.get(11).longValue(), // searchFailures
statisticsList.get(12), // exponential_avg_checkpoint_duration_ms
statisticsList.get(13), // exponential_avg_documents_indexed
statisticsList.get(14) // exponential_avg_documents_processed
);
}

View File

@ -59,6 +59,7 @@ public final class TransformInternalIndex {
* version 3 (7.5): rename to .transform-internal-xxx
* version 4 (7.6): state::should_stop_at_checkpoint
* checkpoint::checkpoint
* version 5 (7.7): stats::processing_time_in_ms, stats::processing_total
*/
// constants for mappings
@ -241,10 +242,16 @@ public final class TransformInternalIndex {
.startObject(TransformIndexerStats.SEARCH_TIME_IN_MS.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(TransformIndexerStats.PROCESSING_TIME_IN_MS.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(TransformIndexerStats.INDEX_TOTAL.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(TransformIndexerStats.SEARCH_TOTAL.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(TransformIndexerStats.PROCESSING_TOTAL.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(TransformIndexerStats.SEARCH_FAILURES.getPreferredName())

View File

@ -132,13 +132,15 @@ public class TransformFeatureSetTests extends ESTestCase {
4, // numInvocations
5, // indexTime
6, // searchTime
7, // indexTotal
8, // searchTotal
9, // indexFailures
10, // searchFailures
11.0, // exponential_avg_checkpoint_duration_ms
12.0, // exponential_avg_documents_indexed
13.0 // exponential_avg_documents_processed
7, // processingTime
8, // indexTotal
9, // searchTotal
10, // processingTotal
11, // indexFailures
12, // searchFailures
13.0, // exponential_avg_checkpoint_duration_ms
14.0, // exponential_avg_documents_indexed
15.0 // exponential_avg_documents_processed
);
int currentStat = 1;

View File

@ -280,6 +280,6 @@ setup:
- do:
indices.get_mapping:
index: .transform-internal-004
- match: { \.transform-internal-004.mappings.dynamic: "false" }
- match: { \.transform-internal-004.mappings.properties.id.type: "keyword" }
index: .transform-internal-005
- match: { \.transform-internal-005.mappings.dynamic: "false" }
- match: { \.transform-internal-005.mappings.properties.id.type: "keyword" }