From 0c03707704097f9a225fa19f890340dfa003d43e Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 6 May 2019 09:39:58 +0200 Subject: [PATCH] [ML-DataFrame] reset/clear the position after indexer is done (#41736) reset/clear the position after indexer is done --- .../xpack/core/indexing/AsyncTwoPhaseIndexer.java | 1 + .../dataframe/integration/DataFrameGetAndGetStatsIT.java | 8 ++++++++ .../xpack/dataframe/transforms/DataFrameIndexer.java | 8 ++++++++ .../org/elasticsearch/xpack/rollup/job/RollupIndexer.java | 5 +++++ 4 files changed, 22 insertions(+) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index 636a3978443..ec7e0de9e34 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -313,6 +313,7 @@ public abstract class AsyncTwoPhaseIndexer doSaveState(finishAndSetState(), position.get(), () -> {}), diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java index 62101e4e120..d9927cd09ed 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java @@ -103,6 +103,14 @@ public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase { stats = entityAsMap(client().performRequest(getRequest)); assertEquals(1, XContentMapValues.extractValue("count", stats)); + transformsStats = (List>)XContentMapValues.extractValue("transforms", stats); + assertEquals(1, transformsStats.size()); + Map state = (Map) XContentMapValues.extractValue("state", transformsStats.get(0)); + assertEquals(1, transformsStats.size()); + assertEquals("started", XContentMapValues.extractValue("task_state", state)); + assertEquals(null, XContentMapValues.extractValue("current_position", state)); + assertEquals(1, XContentMapValues.extractValue("checkpoint", state)); + // check all the different ways to retrieve all transforms getRequest = createRequestWithAuth("GET", DATAFRAME_ENDPOINT, authHeader); Map transforms = entityAsMap(client().performRequest(getRequest)); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java index 142388cb10f..86c26af2994 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; @@ -31,6 +32,7 @@ import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.Collections; import java.util.Map; import java.util.Objects; import java.util.concurrent.Executor; @@ -127,6 +129,12 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer> doProcess(SearchResponse searchResponse) { final CompositeAggregation agg = searchResponse.getAggregations().get(COMPOSITE_AGGREGATION_NAME); + + // we reached the end + if (agg.getBuckets().isEmpty()) { + return new IterationResult<>(Collections.emptyList(), null, true); + } + long docsBeforeProcess = getStats().getNumDocuments(); IterationResult> result = new IterationResult<>(processBucketsToIndexRequests(agg).collect(Collectors.toList()), agg.afterKey(), diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java index daa888562e9..b60a37d3fa4 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java @@ -143,6 +143,11 @@ public abstract class RollupIndexer extends AsyncTwoPhaseIndexer> doProcess(SearchResponse searchResponse) { final CompositeAggregation response = searchResponse.getAggregations().get(AGGREGATION_NAME); + if (response.getBuckets().isEmpty()) { + // do not reset the position as we want to continue from where we stopped + return new IterationResult<>(Collections.emptyList(), getPosition(), true); + } + return new IterationResult<>( IndexerUtils.processBuckets(response, job.getConfig().getRollupIndex(), getStats(), job.getConfig().getGroupConfig(), job.getConfig().getId(), upgradedDocumentID.get()),