[ML-DataFrame] reset/clear the position after indexer is done (#41736)

reset/clear the position after indexer is done
This commit is contained in:
Hendrik Muhs 2019-05-06 09:39:58 +02:00
parent ee84038699
commit 0c03707704
4 changed files with 22 additions and 0 deletions

View File

@ -313,6 +313,7 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
if (iterationResult.isDone()) {
logger.debug("Finished indexing for job [" + getJobId() + "], saving state and shutting down.");
position.set(iterationResult.getPosition());
// execute finishing tasks
onFinish(ActionListener.wrap(
r -> doSaveState(finishAndSetState(), position.get(), () -> {}),

View File

@ -103,6 +103,14 @@ public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase {
stats = entityAsMap(client().performRequest(getRequest));
assertEquals(1, XContentMapValues.extractValue("count", stats));
transformsStats = (List<Map<String, Object>>)XContentMapValues.extractValue("transforms", stats);
assertEquals(1, transformsStats.size());
Map<String, Object> state = (Map<String, Object>) XContentMapValues.extractValue("state", transformsStats.get(0));
assertEquals(1, transformsStats.size());
assertEquals("started", XContentMapValues.extractValue("task_state", state));
assertEquals(null, XContentMapValues.extractValue("current_position", state));
assertEquals(1, XContentMapValues.extractValue("checkpoint", state));
// check all the different ways to retrieve all transforms
getRequest = createRequestWithAuth("GET", DATAFRAME_ENDPOINT, authHeader);
Map<String, Object> transforms = entityAsMap(client().performRequest(getRequest));

View File

@ -14,6 +14,7 @@ import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
@ -31,6 +32,7 @@ import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
@ -127,6 +129,12 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
@Override
protected IterationResult<Map<String, Object>> doProcess(SearchResponse searchResponse) {
final CompositeAggregation agg = searchResponse.getAggregations().get(COMPOSITE_AGGREGATION_NAME);
// we reached the end
if (agg.getBuckets().isEmpty()) {
return new IterationResult<>(Collections.emptyList(), null, true);
}
long docsBeforeProcess = getStats().getNumDocuments();
IterationResult<Map<String, Object>> result = new IterationResult<>(processBucketsToIndexRequests(agg).collect(Collectors.toList()),
agg.afterKey(),

View File

@ -143,6 +143,11 @@ public abstract class RollupIndexer extends AsyncTwoPhaseIndexer<Map<String, Obj
protected IterationResult<Map<String, Object>> doProcess(SearchResponse searchResponse) {
final CompositeAggregation response = searchResponse.getAggregations().get(AGGREGATION_NAME);
if (response.getBuckets().isEmpty()) {
// do not reset the position as we want to continue from where we stopped
return new IterationResult<>(Collections.emptyList(), getPosition(), true);
}
return new IterationResult<>(
IndexerUtils.processBuckets(response, job.getConfig().getRollupIndex(), getStats(),
job.getConfig().getGroupConfig(), job.getConfig().getId(), upgradedDocumentID.get()),