From 2357afd91bf1051075985b15d71db7d13f136860 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 28 Mar 2017 17:00:13 +0200 Subject: [PATCH] [TEST] Use search api to retrieve data counts instead of via get job stats api. This to avoid to lose data counts when the job gets restarted on another node. The job stats api returns live data counts, which may not have been persisted to an index, so getting the data counts via search api will give us a better guarantee that when the job gets restarted the datacounts are there too. During job restart a get call is being done to get data counts in the order to initialize the job. Original commit: elastic/x-pack-elasticsearch@901952da8509eb4cb7660d4a2c71512961e6e3d0 --- .../integration/MlDistributedFailureIT.java | 31 ++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java index 323df5207d6..eae9158f021 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java @@ -5,10 +5,17 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.CheckedRunnable; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction; import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction.Response.DatafeedStats; @@ -27,6 +34,7 @@ import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; +import java.io.IOException; import java.util.Collections; import java.util.concurrent.TimeUnit; @@ -117,7 +125,7 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase { StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request(config.getId(), 0L); client().execute(StartDatafeedAction.INSTANCE, startDatafeedRequest).get(); assertBusy(() -> { - DataCounts dataCounts = getDataCounts(job.getId()); + DataCounts dataCounts = getDataCountsFromIndex(job.getId()); assertEquals(numDocs1, dataCounts.getProcessedRecordCount()); assertEquals(0L, dataCounts.getOutOfOrderTimeStampCount()); }); @@ -155,4 +163,25 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase { }, 30, TimeUnit.SECONDS); } + // Get datacounts from index instead of via job stats api, + // because then data counts have been persisted to an index (happens each 10s (DataCountsReporter)), + // so when restarting job on another node the data counts + // are what we expect them to be: + private static DataCounts getDataCountsFromIndex(String jobId) { + SearchResponse searchResponse = client().prepareSearch() + .setTypes(DataCounts.TYPE.getPreferredName()) + .setQuery(QueryBuilders.idsQuery().addIds(jobId + "-data-counts")) + .get(); + if (searchResponse.getHits().getTotalHits() != 1) { + return new DataCounts(jobId); + } + + BytesReference source = searchResponse.getHits().getHits()[0].getSourceRef(); + try (XContentParser parser = XContentHelper.createParser(NamedXContentRegistry.EMPTY, source, XContentType.JSON)) { + return DataCounts.PARSER.apply(parser, null); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }