* [ML][Transforms] Fix the stop-on-changes check bug
* Add new method finishAndCheckState to cover race conditions in early terminations
* Change the stopping conditions in `onStart`
* Allow the indexer to finish when exiting early
This commit is contained in:
parent
2383acaa89
commit
53df54c703
|
@ -158,7 +158,9 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
|
||||||
if (r) {
|
if (r) {
|
||||||
nextSearch(ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure));
|
nextSearch(ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure));
|
||||||
} else {
|
} else {
|
||||||
finishAndSetState();
|
onFinish(ActionListener.wrap(
|
||||||
|
onFinishResponse -> doSaveState(finishAndSetState(), position.get(), () -> {}),
|
||||||
|
onFinishFailure -> doSaveState(finishAndSetState(), position.get(), () -> {})));
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
this::finishWithFailure));
|
this::finishWithFailure));
|
||||||
|
|
|
@ -6,7 +6,6 @@
|
||||||
|
|
||||||
package org.elasticsearch.xpack.dataframe.integration;
|
package org.elasticsearch.xpack.dataframe.integration;
|
||||||
|
|
||||||
import org.apache.lucene.util.LuceneTestCase;
|
|
||||||
import org.elasticsearch.client.Request;
|
import org.elasticsearch.client.Request;
|
||||||
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
||||||
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
|
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
|
||||||
|
@ -27,7 +26,6 @@ import static org.hamcrest.Matchers.greaterThan;
|
||||||
import static org.hamcrest.Matchers.notNullValue;
|
import static org.hamcrest.Matchers.notNullValue;
|
||||||
import static org.hamcrest.Matchers.oneOf;
|
import static org.hamcrest.Matchers.oneOf;
|
||||||
|
|
||||||
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/45610")
|
|
||||||
public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase {
|
public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase {
|
||||||
|
|
||||||
private static final String TEST_USER_NAME = "df_user";
|
private static final String TEST_USER_NAME = "df_user";
|
||||||
|
|
|
@ -640,6 +640,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
|
||||||
private final DataFrameTransformTask transformTask;
|
private final DataFrameTransformTask transformTask;
|
||||||
private final AtomicInteger failureCount;
|
private final AtomicInteger failureCount;
|
||||||
private volatile boolean auditBulkFailures = true;
|
private volatile boolean auditBulkFailures = true;
|
||||||
|
// Indicates that the source has changed for the current run
|
||||||
|
private volatile boolean hasSourceChanged = true;
|
||||||
// Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index
|
// Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index
|
||||||
private volatile String lastAuditedExceptionMessage = null;
|
private volatile String lastAuditedExceptionMessage = null;
|
||||||
private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false);
|
private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false);
|
||||||
|
@ -760,18 +762,26 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
|
||||||
if (transformTask.currentCheckpoint.get() > 0 && initialRun()) {
|
if (transformTask.currentCheckpoint.get() > 0 && initialRun()) {
|
||||||
sourceHasChanged(ActionListener.wrap(
|
sourceHasChanged(ActionListener.wrap(
|
||||||
hasChanged -> {
|
hasChanged -> {
|
||||||
|
hasSourceChanged = hasChanged;
|
||||||
if (hasChanged) {
|
if (hasChanged) {
|
||||||
transformTask.changesLastDetectedAt = Instant.now();
|
transformTask.changesLastDetectedAt = Instant.now();
|
||||||
logger.debug("[{}] source has changed, triggering new indexer run.", transformId);
|
logger.debug("[{}] source has changed, triggering new indexer run.", transformId);
|
||||||
changedSourceListener.onResponse(null);
|
changedSourceListener.onResponse(null);
|
||||||
} else {
|
} else {
|
||||||
|
logger.trace("[{}] source has not changed, finish indexer early.", transformId);
|
||||||
// No changes, stop executing
|
// No changes, stop executing
|
||||||
listener.onResponse(false);
|
listener.onResponse(false);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
listener::onFailure
|
failure -> {
|
||||||
|
// If we failed determining if the source changed, it's safer to assume there were changes.
|
||||||
|
// We should allow the failure path to complete as normal
|
||||||
|
hasSourceChanged = true;
|
||||||
|
listener.onFailure(failure);
|
||||||
|
}
|
||||||
));
|
));
|
||||||
} else {
|
} else {
|
||||||
|
hasSourceChanged = true;
|
||||||
changedSourceListener.onResponse(null);
|
changedSourceListener.onResponse(null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -869,6 +879,13 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
|
||||||
next.run();
|
next.run();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
// This means that the indexer was triggered to discover changes, found none, and exited early.
|
||||||
|
// If the state is `STOPPED` this means that DataFrameTransformTask#stop was called while we were checking for changes.
|
||||||
|
// Allow the stop call path to continue
|
||||||
|
if (hasSourceChanged == false && indexerState.equals(IndexerState.STOPPED) == false) {
|
||||||
|
next.run();
|
||||||
|
return;
|
||||||
|
}
|
||||||
// If we are `STOPPED` on a `doSaveState` call, that indicates we transitioned to `STOPPED` from `STOPPING`
|
// If we are `STOPPED` on a `doSaveState` call, that indicates we transitioned to `STOPPED` from `STOPPING`
|
||||||
// OR we called `doSaveState` manually as the indexer was not actively running.
|
// OR we called `doSaveState` manually as the indexer was not actively running.
|
||||||
// Since we save the state to an index, we should make sure that our task state is in parity with the indexer state
|
// Since we save the state to an index, we should make sure that our task state is in parity with the indexer state
|
||||||
|
@ -959,6 +976,11 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
|
||||||
@Override
|
@Override
|
||||||
protected void onFinish(ActionListener<Void> listener) {
|
protected void onFinish(ActionListener<Void> listener) {
|
||||||
try {
|
try {
|
||||||
|
// This indicates an early exit since no changes were found.
|
||||||
|
// So, don't treat this like a checkpoint being completed, as no work was done.
|
||||||
|
if (hasSourceChanged == false) {
|
||||||
|
listener.onResponse(null);
|
||||||
|
}
|
||||||
// TODO: needs cleanup super is called with a listener, but listener.onResponse is called below
|
// TODO: needs cleanup super is called with a listener, but listener.onResponse is called below
|
||||||
// super.onFinish() fortunately ignores the listener
|
// super.onFinish() fortunately ignores the listener
|
||||||
super.onFinish(listener);
|
super.onFinish(listener);
|
||||||
|
|
Loading…
Reference in New Issue