From 53df54c7036a34b54b6cdacce0e5dc473cd8a1db Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Tue, 3 Sep 2019 11:04:18 -0500 Subject: [PATCH] [ML][Transforms] fixing stop on changes check bug (#46162) (#46273) * [ML][Transforms] fixing stop on changes check bug * Adding new method finishAndCheckState to cover race conditions in early terminations * changing stopping conditions in `onStart` * allow indexer to finish when exiting early --- .../core/indexing/AsyncTwoPhaseIndexer.java | 4 +++- .../DataFrameGetAndGetStatsIT.java | 2 -- .../transforms/DataFrameTransformTask.java | 24 ++++++++++++++++++- 3 files changed, 26 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index ac5bbb14494..52aa9304ce2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -158,7 +158,9 @@ public abstract class AsyncTwoPhaseIndexer doSaveState(finishAndSetState(), position.get(), () -> {}), + onFinishFailure -> doSaveState(finishAndSetState(), position.get(), () -> {}))); } }, this::finishWithFailure)); diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java index 76bb90d300a..3a7809125c7 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java @@ -6,7 +6,6 @@ package 
org.elasticsearch.xpack.dataframe.integration; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.client.Request; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.xpack.core.dataframe.DataFrameField; @@ -27,7 +26,6 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.oneOf; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/45610") public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase { private static final String TEST_USER_NAME = "df_user"; diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 7a4162ad6e5..0515640f2ba 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -640,6 +640,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S private final DataFrameTransformTask transformTask; private final AtomicInteger failureCount; private volatile boolean auditBulkFailures = true; + // Indicates that the source has changed for the current run + private volatile boolean hasSourceChanged = true; // Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index private volatile String lastAuditedExceptionMessage = null; private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false); @@ -760,18 +762,26 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S if (transformTask.currentCheckpoint.get() > 0 && initialRun()) { sourceHasChanged(ActionListener.wrap( hasChanged -> { + 
hasSourceChanged = hasChanged; if (hasChanged) { transformTask.changesLastDetectedAt = Instant.now(); logger.debug("[{}] source has changed, triggering new indexer run.", transformId); changedSourceListener.onResponse(null); } else { + logger.trace("[{}] source has not changed, finish indexer early.", transformId); // No changes, stop executing listener.onResponse(false); } }, - listener::onFailure + failure -> { + // If we failed determining if the source changed, it's safer to assume there were changes. + // We should allow the failure path to complete as normal + hasSourceChanged = true; + listener.onFailure(failure); + } )); } else { + hasSourceChanged = true; changedSourceListener.onResponse(null); } } @@ -869,6 +879,13 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S next.run(); return; } + // This means that the indexer was triggered to discover changes, found none, and exited early. + // If the state is `STOPPED` this means that DataFrameTransformTask#stop was called while we were checking for changes. + // Allow the stop call path to continue + if (hasSourceChanged == false && indexerState.equals(IndexerState.STOPPED) == false) { + next.run(); + return; + } // If we are `STOPPED` on a `doSaveState` call, that indicates we transitioned to `STOPPED` from `STOPPING` // OR we called `doSaveState` manually as the indexer was not actively running. // Since we save the state to an index, we should make sure that our task state is in parity with the indexer state @@ -959,6 +976,13 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S @Override protected void onFinish(ActionListener listener) { try { + // This indicates an early exit since no changes were found. + // So, don't treat this like a checkpoint being completed, as no work was done. 
+ if (hasSourceChanged == false) { + listener.onResponse(null); + // Return here: falling through would run the checkpoint-completion path and notify the listener a second time. + return; + } // TODO: needs cleanup super is called with a listener, but listener.onResponse is called below // super.onFinish() fortunately ignores the listener super.onFinish(listener);