[TEST] Use search api to retrieve data counts instead of via get job stats api.

This to avoid to lose data counts when the job gets restarted on another node.
The job stats api returns live data counts, which may not have been persisted to an index,
so getting the data counts via search api will give us a better guarantee that when
the job gets restarted the datacounts are there too. During job restart a get call is being
done to get data counts in the order to initialize the job.

Original commit: elastic/x-pack-elasticsearch@901952da85
This commit is contained in:
Martijn van Groningen 2017-03-28 17:00:13 +02:00
parent 42e3eb7ba5
commit 2357afd91b
1 changed files with 30 additions and 1 deletions

View File

@ -5,10 +5,17 @@
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction;
import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction.Response.DatafeedStats;
@ -27,6 +34,7 @@ import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
@ -117,7 +125,7 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase {
StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request(config.getId(), 0L);
client().execute(StartDatafeedAction.INSTANCE, startDatafeedRequest).get();
assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId());
DataCounts dataCounts = getDataCountsFromIndex(job.getId());
assertEquals(numDocs1, dataCounts.getProcessedRecordCount());
assertEquals(0L, dataCounts.getOutOfOrderTimeStampCount());
});
@ -155,4 +163,25 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase {
}, 30, TimeUnit.SECONDS);
}
// Get datacounts from index instead of via job stats api,
// because then data counts have been persisted to an index (happens each 10s (DataCountsReporter)),
// so when restarting job on another node the data counts
// are what we expect them to be:
private static DataCounts getDataCountsFromIndex(String jobId) {
SearchResponse searchResponse = client().prepareSearch()
.setTypes(DataCounts.TYPE.getPreferredName())
.setQuery(QueryBuilders.idsQuery().addIds(jobId + "-data-counts"))
.get();
if (searchResponse.getHits().getTotalHits() != 1) {
return new DataCounts(jobId);
}
BytesReference source = searchResponse.getHits().getHits()[0].getSourceRef();
try (XContentParser parser = XContentHelper.createParser(NamedXContentRegistry.EMPTY, source, XContentType.JSON)) {
return DataCounts.PARSER.apply(parser, null);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}