[7.x][Transform] provide exponential_avg* stats for batch transforms (#52041) (#52323)

provide exponential_avg* stats for batch transforms, avoiding confusion
about why those values are otherwise all 0
This commit is contained in:
Hendrik Muhs 2020-02-14 07:48:23 +01:00 committed by GitHub
parent a66988281f
commit efd7542b2a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 222 additions and 120 deletions

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.core.transform.transforms;
import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
@ -36,12 +37,9 @@ public class TransformIndexerStats extends IndexerJobStats {
public static ParseField SEARCH_TOTAL = new ParseField("search_total");
public static ParseField SEARCH_FAILURES = new ParseField("search_failures");
public static ParseField INDEX_FAILURES = new ParseField("index_failures");
public static ParseField EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS =
new ParseField("exponential_avg_checkpoint_duration_ms");
public static ParseField EXPONENTIAL_AVG_DOCUMENTS_INDEXED =
new ParseField("exponential_avg_documents_indexed");
public static ParseField EXPONENTIAL_AVG_DOCUMENTS_PROCESSED =
new ParseField("exponential_avg_documents_processed");
public static ParseField EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS = new ParseField("exponential_avg_checkpoint_duration_ms");
public static ParseField EXPONENTIAL_AVG_DOCUMENTS_INDEXED = new ParseField("exponential_avg_documents_indexed");
public static ParseField EXPONENTIAL_AVG_DOCUMENTS_PROCESSED = new ParseField("exponential_avg_documents_processed");
// This changes how much "weight" past calculations have.
// The shorter the window, the less "smoothing" will occur.
@ -49,10 +47,24 @@ public class TransformIndexerStats extends IndexerJobStats {
private static final double ALPHA = 2.0 / (EXP_AVG_WINDOW + 1);
private static final ConstructingObjectParser<TransformIndexerStats, Void> LENIENT_PARSER = new ConstructingObjectParser<>(
NAME, true,
NAME,
true,
args -> new TransformIndexerStats(
(long) args[0], (long) args[1], (long) args[2], (long) args[3], (long) args[4], (long) args[5], (long) args[6],
(long) args[7], (long) args[8], (long) args[9], (Double) args[10], (Double) args[11], (Double) args[12]));
(long) args[0],
(long) args[1],
(long) args[2],
(long) args[3],
(long) args[4],
(long) args[5],
(long) args[6],
(long) args[7],
(long) args[8],
(long) args[9],
(Double) args[10],
(Double) args[11],
(Double) args[12]
)
);
static {
LENIENT_PARSER.declareLong(constructorArg(), NUM_PAGES);
@ -73,6 +85,7 @@ public class TransformIndexerStats extends IndexerJobStats {
private double expAvgCheckpointDurationMs;
private double expAvgDocumentsIndexed;
private double expAvgDocumentsProcessed;
/**
* Create with all stats set to zero
*/
@ -80,30 +93,54 @@ public class TransformIndexerStats extends IndexerJobStats {
super();
}
public TransformIndexerStats(long numPages, long numInputDocuments, long numOutputDocuments,
long numInvocations, long indexTime, long searchTime, long indexTotal, long searchTotal,
long indexFailures, long searchFailures, Double expAvgCheckpointDurationMs,
Double expAvgDocumentsIndexed, Double expAvgDocumentsProcessed ) {
super(numPages, numInputDocuments, numOutputDocuments, numInvocations, indexTime, searchTime, indexTotal, searchTotal,
indexFailures, searchFailures);
/**
 * Creates an immutable snapshot of transform indexer statistics.
 *
 * @param numPages                   number of search/index pages processed
 * @param numInputDocuments          number of documents read from the source
 * @param numOutputDocuments         number of documents written to the destination
 * @param numInvocations             number of times the indexer has been triggered
 * @param indexTime                  cumulative time spent indexing, in milliseconds
 * @param searchTime                 cumulative time spent searching, in milliseconds
 * @param indexTotal                 total number of index operations
 * @param searchTotal                total number of search operations
 * @param indexFailures              number of failed index operations
 * @param searchFailures             number of failed search operations
 * @param expAvgCheckpointDurationMs exponential moving average of checkpoint duration in ms;
 *                                   {@code null} is treated as 0.0
 * @param expAvgDocumentsIndexed     exponential moving average of documents indexed per checkpoint;
 *                                   {@code null} is treated as 0.0
 * @param expAvgDocumentsProcessed   exponential moving average of documents processed per checkpoint;
 *                                   {@code null} is treated as 0.0
 */
public TransformIndexerStats(
    long numPages,
    long numInputDocuments,
    long numOutputDocuments,
    long numInvocations,
    long indexTime,
    long searchTime,
    long indexTotal,
    long searchTotal,
    long indexFailures,
    long searchFailures,
    Double expAvgCheckpointDurationMs,
    Double expAvgDocumentsIndexed,
    Double expAvgDocumentsProcessed
) {
    super(
        numPages,
        numInputDocuments,
        numOutputDocuments,
        numInvocations,
        indexTime,
        searchTime,
        indexTotal,
        searchTotal,
        indexFailures,
        searchFailures
    );
    // Boxed Double parameters may be absent (e.g. older wire formats or lenient parsing);
    // normalize null to 0.0 so downstream arithmetic never NPEs.
    this.expAvgCheckpointDurationMs = expAvgCheckpointDurationMs == null ? 0.0 : expAvgCheckpointDurationMs;
    this.expAvgDocumentsIndexed = expAvgDocumentsIndexed == null ? 0.0 : expAvgDocumentsIndexed;
    this.expAvgDocumentsProcessed = expAvgDocumentsProcessed == null ? 0.0 : expAvgDocumentsProcessed;
}
/**
 * Convenience constructor for callers without exponential-average data;
 * all three exponential averages default to 0.0.
 */
public TransformIndexerStats(
    long numPages,
    long numInputDocuments,
    long numOutputDocuments,
    long numInvocations,
    long indexTime,
    long searchTime,
    long indexTotal,
    long searchTotal,
    long indexFailures,
    long searchFailures
) {
    this(
        numPages,
        numInputDocuments,
        numOutputDocuments,
        numInvocations,
        indexTime,
        searchTime,
        indexTotal,
        searchTotal,
        indexFailures,
        searchFailures,
        0.0,
        0.0,
        0.0
    );
}
public TransformIndexerStats(TransformIndexerStats other) {
this(other.numPages, other.numInputDocuments, other.numOuputDocuments, other.numInvocations,
other.indexTime, other.searchTime, other.indexTotal, other.searchTotal, other.indexFailures, other.searchFailures);
this.expAvgCheckpointDurationMs = other.expAvgCheckpointDurationMs;
this.expAvgDocumentsIndexed = other.expAvgDocumentsIndexed;
this.expAvgDocumentsProcessed = other.expAvgDocumentsProcessed;
this(
other.numPages,
other.numInputDocuments,
other.numOuputDocuments,
other.numInvocations,
other.indexTime,
other.searchTime,
other.indexTotal,
other.searchTotal,
other.indexFailures,
other.searchFailures,
other.expAvgCheckpointDurationMs,
other.expAvgDocumentsIndexed,
other.expAvgDocumentsProcessed
);
}
public TransformIndexerStats(StreamInput in) throws IOException {
@ -212,9 +249,26 @@ public class TransformIndexerStats extends IndexerJobStats {
@Override
public int hashCode() {
return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations,
indexTime, searchTime, indexFailures, searchFailures, indexTotal, searchTotal,
expAvgCheckpointDurationMs, expAvgDocumentsIndexed, expAvgDocumentsProcessed);
return Objects.hash(
numPages,
numInputDocuments,
numOuputDocuments,
numInvocations,
indexTime,
searchTime,
indexFailures,
searchFailures,
indexTotal,
searchTotal,
expAvgCheckpointDurationMs,
expAvgDocumentsIndexed,
expAvgDocumentsProcessed
);
}
@Override
public String toString() {
    // Delegate to the XContent rendering so logs show the full JSON form of these stats.
    final String asJson = Strings.toString(this);
    return asJson;
}
public static TransformIndexerStats fromXContent(XContentParser parser) {

View File

@ -31,13 +31,21 @@ public class TransformIndexerStatsTests extends AbstractSerializingTestCase<Tran
}
public static TransformIndexerStats randomStats() {
return new TransformIndexerStats(randomLongBetween(10L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
return new TransformIndexerStats(
randomLongBetween(10L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomBoolean() ? randomDouble() : null,
randomBoolean() ? randomDouble() : null,
randomBoolean() ? randomDouble() : null);
randomBoolean() ? randomDouble() : null
);
}
public void testExpAvgIncrement() {

View File

@ -69,7 +69,7 @@ public class TransformStatsTests extends AbstractSerializingTestCase<TransformSt
STARTED,
randomBoolean() ? null : randomAlphaOfLength(100),
randomBoolean() ? null : NodeAttributeTests.randomNodeAttributes(),
new TransformIndexerStats(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
new TransformIndexerStats(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 0.0, 0.0, 0.0),
new TransformCheckpointingInfo(
new TransformCheckpointStats(0, null, null, 10, 100),
new TransformCheckpointStats(0, null, null, 100, 1000),

View File

@ -247,7 +247,6 @@ teardown:
- gte: { transforms.0.stats.search_time_in_ms: 0 }
- lte: { transforms.0.stats.search_total: 1 }
- match: { transforms.0.stats.search_failures: 0 }
- match: { transforms.0.stats.exponential_avg_checkpoint_duration_ms: 0.0 }
- match: { transforms.0.stats.exponential_avg_documents_indexed: 0.0 }
- match: { transforms.0.stats.exponential_avg_documents_processed: 0.0 }

View File

@ -29,11 +29,9 @@ import static org.hamcrest.Matchers.oneOf;
public class TransformGetAndGetStatsIT extends TransformRestTestCase {
private static final String TEST_USER_NAME = "transform_user";
private static final String BASIC_AUTH_VALUE_TRANSFORM_USER =
basicAuthHeaderValue(TEST_USER_NAME, TEST_PASSWORD_SECURE_STRING);
private static final String BASIC_AUTH_VALUE_TRANSFORM_USER = basicAuthHeaderValue(TEST_USER_NAME, TEST_PASSWORD_SECURE_STRING);
private static final String TEST_ADMIN_USER_NAME = "transform_admin";
private static final String BASIC_AUTH_VALUE_TRANSFORM_ADMIN =
basicAuthHeaderValue(TEST_ADMIN_USER_NAME, TEST_PASSWORD_SECURE_STRING);
private static final String BASIC_AUTH_VALUE_TRANSFORM_ADMIN = basicAuthHeaderValue(TEST_ADMIN_USER_NAME, TEST_PASSWORD_SECURE_STRING);
private static boolean indicesCreated = false;
@ -138,7 +136,6 @@ public class TransformGetAndGetStatsIT extends TransformRestTestCase {
assertThat(XContentMapValues.extractValue("state", transformsStats.get(0)), oneOf("started", "indexing"));
assertEquals(1, XContentMapValues.extractValue("checkpointing.last.checkpoint", transformsStats.get(0)));
// check all the different ways to retrieve all transforms
getRequest = createRequestWithAuth("GET", getTransformEndpoint(), authHeader);
Map<String, Object> transforms = entityAsMap(client().performRequest(getRequest));
@ -168,8 +165,9 @@ public class TransformGetAndGetStatsIT extends TransformRestTestCase {
client().performRequest(new Request("POST", "_tasks/_cancel?actions=" + TransformField.TASK_NAME + "*"));
// Verify that the task is gone
Map<String, Object> tasks =
entityAsMap(client().performRequest(new Request("GET", "_tasks?actions="+TransformField.TASK_NAME+"*")));
Map<String, Object> tasks = entityAsMap(
client().performRequest(new Request("GET", "_tasks?actions=" + TransformField.TASK_NAME + "*"))
);
assertTrue(((Map<?, ?>) XContentMapValues.extractValue("nodes", tasks)).isEmpty());
createPivotReviewsTransform("pivot_stats_2", "pivot_reviews_stats_2", null);
@ -226,8 +224,12 @@ public class TransformGetAndGetStatsIT extends TransformRestTestCase {
String transformSrc = "reviews_cont_pivot_test";
createReviewsIndex(transformSrc);
final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, null);
String config = "{ \"dest\": {\"index\":\"" + transformDest + "\"},"
+ " \"source\": {\"index\":\"" + transformSrc + "\"},"
String config = "{ \"dest\": {\"index\":\""
+ transformDest
+ "\"},"
+ " \"source\": {\"index\":\""
+ transformSrc
+ "\"},"
+ " \"frequency\": \"1s\","
+ " \"sync\": {\"time\":{\"field\": \"timestamp\", \"delay\": \"1s\"}},"
+ " \"pivot\": {"
@ -253,18 +255,26 @@ public class TransformGetAndGetStatsIT extends TransformRestTestCase {
Map<String, Object> stats = entityAsMap(client().performRequest(getRequest));
List<Map<String, Object>> transformsStats = (List<Map<String, Object>>) XContentMapValues.extractValue("transforms", stats);
assertEquals(1, transformsStats.size());
// No continuous checkpoints have been seen and thus all exponential averages should be 0.0
// No continuous checkpoints have been seen and thus all exponential averages should be equal to the batch stats
for (Map<String, Object> transformStats : transformsStats) {
transformStats = (Map<String, Object>) transformStats.get("stats");
assertThat("exponential_avg_checkpoint_duration_ms is not 0.0",
transformStats.get("exponential_avg_checkpoint_duration_ms"),
equalTo(0.0));
assertThat("exponential_avg_documents_indexed is not 0.0",
transformStats.get("exponential_avg_documents_indexed"),
equalTo(0.0));
assertThat("exponential_avg_documents_processed is not 0.0",
assertThat(transformStats.get("documents_processed"), equalTo(1000));
assertThat(transformStats.get("documents_indexed"), equalTo(27));
assertThat(
"exponential_avg_checkpoint_duration_ms is not 0.0",
(Double) transformStats.get("exponential_avg_checkpoint_duration_ms"),
greaterThan(0.0)
);
assertThat(
"exponential_avg_documents_indexed does not match documents_indexed",
(Double) transformStats.get("exponential_avg_documents_indexed"),
equalTo(((Integer) transformStats.get("documents_indexed")).doubleValue())
);
assertThat(
"exponential_avg_documents_processed does not match documents_processed",
transformStats.get("exponential_avg_documents_processed"),
equalTo(0.0));
equalTo(((Integer) transformStats.get("documents_processed")).doubleValue())
);
}
int numDocs = 10;
@ -300,19 +310,23 @@ public class TransformGetAndGetStatsIT extends TransformRestTestCase {
assertEquals(1, contStats.size());
for (Map<String, Object> transformStats : contStats) {
Map<String, Object> statsObj = (Map<String, Object>) transformStats.get("stats");
assertThat("exponential_avg_checkpoint_duration_ms is 0",
assertThat(
"exponential_avg_checkpoint_duration_ms is 0",
(Double) statsObj.get("exponential_avg_checkpoint_duration_ms"),
greaterThan(0.0));
assertThat("exponential_avg_documents_indexed is 0",
greaterThan(0.0)
);
assertThat(
"exponential_avg_documents_indexed is 0",
(Double) statsObj.get("exponential_avg_documents_indexed"),
greaterThan(0.0));
assertThat("exponential_avg_documents_processed is 0",
greaterThan(0.0)
);
assertThat(
"exponential_avg_documents_processed is 0",
(Double) statsObj.get("exponential_avg_documents_processed"),
greaterThan(0.0));
greaterThan(0.0)
);
Map<String, Object> checkpointing = (Map<String, Object>) transformStats.get("checkpointing");
assertThat("changes_last_detected_at is null",
checkpointing.get("changes_last_detected_at"),
is(notNullValue()));
assertThat("changes_last_detected_at is null", checkpointing.get("changes_last_detected_at"), is(notNullValue()));
}
}, 120, TimeUnit.SECONDS);
}

View File

@ -70,13 +70,21 @@ public class TransformUsageIT extends TransformRestTestCase {
Request getRequest = new Request("GET", getTransformEndpoint() + "test_usage/_stats");
Map<String, Object> stats = entityAsMap(client().performRequest(getRequest));
Map<String, Integer> expectedStats = new HashMap<>();
Map<String, Double> expectedStats = new HashMap<>();
for (String statName : PROVIDED_STATS) {
@SuppressWarnings("unchecked")
List<Integer> specificStatistic = ((List<Integer>) XContentMapValues.extractValue("transforms.stats." + statName, stats));
List<Object> specificStatistic = (List<Object>) (XContentMapValues.extractValue("transforms.stats." + statName, stats));
assertNotNull(specificStatistic);
Integer statistic = (specificStatistic).get(0);
expectedStats.put(statName, statistic);
expectedStats.put(statName, extractStatsAsDouble(specificStatistic.get(0)));
}
getRequest = new Request("GET", getTransformEndpoint() + "test_usage_continuous/_stats");
stats = entityAsMap(client().performRequest(getRequest));
for (String statName : PROVIDED_STATS) {
@SuppressWarnings("unchecked")
List<Object> specificStatistic = (List<Object>) (XContentMapValues.extractValue("transforms.stats." + statName, stats));
assertNotNull(specificStatistic);
expectedStats.compute(statName, (key, value) -> value + extractStatsAsDouble(specificStatistic.get(0)));
}
// Simply because we wait for continuous to reach checkpoint 1, does not mean that the statistics are written yet.
@ -96,8 +104,9 @@ public class TransformUsageIT extends TransformRestTestCase {
}
assertEquals(
"Incorrect stat " + statName,
expectedStats.get(statName) * 2,
XContentMapValues.extractValue("transform.stats." + statName, statsMap)
expectedStats.get(statName).doubleValue(),
extractStatsAsDouble(XContentMapValues.extractValue("transform.stats." + statName, statsMap)),
0.0001
);
}
// Refresh the index so that statistics are searchable
@ -112,4 +121,14 @@ public class TransformUsageIT extends TransformRestTestCase {
assertEquals(3, XContentMapValues.extractValue("transform.transforms._all", usageAsMap));
assertEquals(3, XContentMapValues.extractValue("transform.transforms.stopped", usageAsMap));
}
/**
 * Converts a numeric stat value extracted from a parsed JSON response into a double.
 *
 * Generalized to accept any {@link Number}: JSON parsing may yield Integer, Long
 * (for values beyond the int range), or Double depending on magnitude, and all of
 * them should be folded into the same double-based accumulation. Fails the test
 * for any non-numeric value.
 *
 * @param statsObject a value pulled out of the stats response map
 * @return the value widened to a double
 */
private double extractStatsAsDouble(Object statsObject) {
    if (statsObject instanceof Number) {
        // Covers Integer, Long, Double, etc. — all widen safely via doubleValue().
        return ((Number) statsObject).doubleValue();
    }
    fail("unexpected value type for stats");
    return 0;
}
}

View File

@ -36,8 +36,8 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
import java.util.ArrayList;
import java.util.Arrays;
@ -67,7 +67,10 @@ public class TransformFeatureSet implements XPackFeatureSet {
TransformIndexerStats.INDEX_TOTAL.getPreferredName(),
TransformIndexerStats.SEARCH_TOTAL.getPreferredName(),
TransformIndexerStats.INDEX_FAILURES.getPreferredName(),
TransformIndexerStats.SEARCH_FAILURES.getPreferredName(), };
TransformIndexerStats.SEARCH_FAILURES.getPreferredName(),
TransformIndexerStats.EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS.getPreferredName(),
TransformIndexerStats.EXPONENTIAL_AVG_DOCUMENTS_INDEXED.getPreferredName(),
TransformIndexerStats.EXPONENTIAL_AVG_DOCUMENTS_PROCESSED.getPreferredName(), };
@Inject
public TransformFeatureSet(Settings settings, ClusterService clusterService, Client client, @Nullable XPackLicenseState licenseState) {
@ -166,29 +169,32 @@ public class TransformFeatureSet implements XPackFeatureSet {
}
static TransformIndexerStats parseSearchAggs(SearchResponse searchResponse) {
List<Long> statisticsList = new ArrayList<>(PROVIDED_STATS.length);
List<Double> statisticsList = new ArrayList<>(PROVIDED_STATS.length);
for (String statName : PROVIDED_STATS) {
Aggregation agg = searchResponse.getAggregations().get(statName);
if (agg instanceof NumericMetricsAggregation.SingleValue) {
statisticsList.add((long) ((NumericMetricsAggregation.SingleValue) agg).value());
statisticsList.add(((NumericMetricsAggregation.SingleValue) agg).value());
} else {
statisticsList.add(0L);
statisticsList.add(0.0);
}
}
return new TransformIndexerStats(
statisticsList.get(0), // numPages
statisticsList.get(1), // numInputDocuments
statisticsList.get(2), // numOutputDocuments
statisticsList.get(3), // numInvocations
statisticsList.get(4), // indexTime
statisticsList.get(5), // searchTime
statisticsList.get(6), // indexTotal
statisticsList.get(7), // searchTotal
statisticsList.get(8), // indexFailures
statisticsList.get(9)
); // searchFailures
statisticsList.get(0).longValue(), // numPages
statisticsList.get(1).longValue(), // numInputDocuments
statisticsList.get(2).longValue(), // numOutputDocuments
statisticsList.get(3).longValue(), // numInvocations
statisticsList.get(4).longValue(), // indexTime
statisticsList.get(5).longValue(), // searchTime
statisticsList.get(6).longValue(), // indexTotal
statisticsList.get(7).longValue(), // searchTotal
statisticsList.get(8).longValue(), // indexFailures
statisticsList.get(9).longValue(), // searchFailures
statisticsList.get(10), // exponential_avg_checkpoint_duration_ms
statisticsList.get(11), // exponential_avg_documents_indexed
statisticsList.get(12) // exponential_avg_documents_processed
);
}
static void getStatisticSummations(Client client, ActionListener<TransformIndexerStats> statsListener) {

View File

@ -359,9 +359,8 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
if (progress != null && progress.getPercentComplete() != null && progress.getPercentComplete() < 100.0) {
progress.incrementDocsProcessed(progress.getTotalDocs() - progress.getDocumentsProcessed());
}
// If the last checkpoint is now greater than 1, that means that we have just processed the first
// continuous checkpoint and should start recording the exponential averages
if (lastCheckpoint != null && lastCheckpoint.getCheckpoint() > 1) {
if (lastCheckpoint != null) {
long docsIndexed = 0;
long docsProcessed = 0;
// This should not happen as we simply create a new one when we reach continuous checkpoints

View File

@ -135,8 +135,11 @@ public class TransformFeatureSetTests extends ESTestCase {
7, // indexTotal
8, // searchTotal
9, // indexFailures
10
); // searchFailures
10, // searchFailures
11.0, // exponential_avg_checkpoint_duration_ms
12.0, // exponential_avg_documents_indexed
13.0 // exponential_avg_documents_processed
);
int currentStat = 1;
List<Aggregation> aggs = new ArrayList<>(PROVIDED_STATS.length);