[ML][Data Frame] adjusting change detection workflow (#45511) (#45580)

* [ML][Data Frame] adjusting change detection workflow

* adjusting for PR comment

* disallowing null as an argument value
This commit is contained in:
Benjamin Trent 2019-08-14 17:26:24 -05:00 committed by GitHub
parent 647a8308c3
commit fde5dae387
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 99 additions and 76 deletions

View File

@@ -154,7 +154,12 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
// fire off the search. Note this is async, the method will return from here
executor.execute(() -> {
onStart(now, ActionListener.wrap(r -> {
assert r != null;
if (r) {
nextSearch(ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure));
} else {
finishAndSetState();
}
}, e -> {
finishAndSetState();
onFailure(e);
@@ -200,9 +205,11 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
* internal state is {@link IndexerState#STARTED}.
*
* @param now The current time in milliseconds passed through from {@link #maybeTriggerAsyncJob(long)}
* @param listener listener to call after done
* @param listener listener to call after done. The argument passed to the listener indicates if the indexer should continue or not
* true: continue execution as normal
* false: cease execution. This does NOT call onFinish
*/
protected abstract void onStart(long now, ActionListener<Void> listener);
protected abstract void onStart(long now, ActionListener<Boolean> listener);
/**
* Executes the {@link SearchRequest} and calls <code>nextPhase</code> with the

View File

@@ -88,10 +88,10 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
}
@Override
protected void onStart(long now, ActionListener<Void> listener) {
protected void onStart(long now, ActionListener<Boolean> listener) {
assertThat(step, equalTo(0));
++step;
listener.onResponse(null);
listener.onResponse(true);
}
@Override
@@ -186,9 +186,9 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
}
@Override
protected void onStart(long now, ActionListener<Void> listener) {
protected void onStart(long now, ActionListener<Boolean> listener) {
started = true;
listener.onResponse(null);
listener.onResponse(true);
}
@Override
@@ -270,10 +270,10 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
}
@Override
protected void onStart(long now, ActionListener<Void> listener) {
protected void onStart(long now, ActionListener<Boolean> listener) {
assertThat(step, equalTo(0));
++step;
listener.onResponse(null);
listener.onResponse(true);
}
@Override

View File

@@ -147,7 +147,7 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<DataFrameInd
protected abstract void createCheckpoint(ActionListener<DataFrameTransformCheckpoint> listener);
@Override
protected void onStart(long now, ActionListener<Void> listener) {
protected void onStart(long now, ActionListener<Boolean> listener) {
try {
pivot = new Pivot(getConfig().getPivotConfig());
@@ -157,7 +157,7 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<DataFrameInd
}
runState = determineRunStateAtStart();
listener.onResponse(null);
listener.onResponse(true);
} catch (Exception e) {
listener.onFailure(e);
}
@@ -525,5 +525,5 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<DataFrameInd
return null;
}
protected abstract boolean sourceHasChanged();
protected abstract void sourceHasChanged(ActionListener<Boolean> hasChangedListener);
}

View File

@@ -12,7 +12,6 @@ import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
@@ -57,8 +56,6 @@ import org.elasticsearch.xpack.dataframe.transforms.pivot.AggregationResultUtils
import java.time.Instant;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -324,15 +321,27 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
return;
}
logger.debug("Data frame indexer [{}] schedule has triggered, state: [{}]", event.getJobName(), getIndexer().getState());
if (taskState.get() == DataFrameTransformTaskState.FAILED) {
logger.debug("Schedule was triggered for transform [{}] but task is failed. Ignoring trigger.", getTransformId());
return;
}
// ignore trigger if indexer is running or completely stopped
IndexerState indexerState = getIndexer().getState();
if (IndexerState.INDEXING.equals(indexerState) ||
IndexerState.STOPPING.equals(indexerState) ||
IndexerState.STOPPED.equals(indexerState)) {
logger.debug("Indexer for transform [{}] has state [{}], ignoring trigger", getTransformId(), indexerState);
return;
}
logger.debug("Data frame indexer [{}] schedule has triggered, state: [{}]", event.getJobName(), indexerState);
// if it runs for the 1st time we just do it, if not we check for changes
if (currentCheckpoint.get() == 0 ) {
if (currentCheckpoint.get() == 0) {
logger.debug("Trigger initial run");
getIndexer().maybeTriggerAsyncJob(System.currentTimeMillis());
} else if (getIndexer().isContinuous() && getIndexer().sourceHasChanged()) {
changesLastDetectedAt = Instant.now();
logger.debug("Source has changed, triggering new indexer run");
} else if (getIndexer().isContinuous()) {
getIndexer().maybeTriggerAsyncJob(System.currentTimeMillis());
}
}
@@ -620,7 +629,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
}
@Override
protected void onStart(long now, ActionListener<Void> listener) {
protected void onStart(long now, ActionListener<Boolean> listener) {
// On each run, we need to get the total number of docs and reset the count of processed docs
// Since multiple checkpoints can be executed in the task while it is running on the same node, we need to gather
// the progress here, and not in the executor.
@@ -657,11 +666,13 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
);
// If we are continuous, we will want to verify we have the latest stored configuration
ActionListener<Void> changedSourceListener = ActionListener.wrap(
r -> {
if (isContinuous()) {
transformsConfigManager.getTransformConfiguration(getJobId(), ActionListener.wrap(
config -> {
transformConfig = config;
logger.debug("[" + getJobId() + "] successfully refreshed data frame transform config from index.");
logger.debug("[{}] successfully refreshed data frame transform config from index.", transformId);
updateConfigListener.onResponse(null);
},
failure -> {
@@ -682,6 +693,29 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
} else {
updateConfigListener.onResponse(null);
}
},
listener::onFailure
);
// If we are not on the initial batch checkpoint and its the first pass of whatever continuous checkpoint we are on,
// we should verify if there are local changes based on the sync config. If not, do not proceed further and exit.
if (transformTask.currentCheckpoint.get() > 0 && initialRun()) {
sourceHasChanged(ActionListener.wrap(
hasChanged -> {
if (hasChanged) {
transformTask.changesLastDetectedAt = Instant.now();
logger.debug("[{}] source has changed, triggering new indexer run.", transformId);
changedSourceListener.onResponse(null);
} else {
// No changes, stop executing
listener.onResponse(false);
}
},
listener::onFailure
));
} else {
changedSourceListener.onResponse(null);
}
}
@Override
@@ -930,41 +964,22 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
}
@Override
public boolean sourceHasChanged() {
if (getState() == IndexerState.INDEXING) {
logger.trace("Indexer is still running, ignore");
return false;
}
CountDownLatch latch = new CountDownLatch(1);
SetOnce<Boolean> changed = new SetOnce<>();
protected void sourceHasChanged(ActionListener<Boolean> hasChangedListener) {
checkpointProvider.sourceHasChanged(getLastCheckpoint(),
new LatchedActionListener<>(ActionListener.wrap(changed::set, e -> {
changed.set(false);
ActionListener.wrap(
hasChanged -> {
logger.trace("[{}] change detected [{}]", transformId, hasChanged);
hasChangedListener.onResponse(hasChanged);
},
e -> {
logger.warn(
"Failed to detect changes for data frame transform [" + transformId + "], skipping update till next check",
"Failed to detect changes for data frame transform [" + transformId + "], skipping update till next check.",
e);
auditor.warning(transformId,
"Failed to detect changes for data frame transform, skipping update till next check. Exception: "
+ e.getMessage());
}), latch));
try {
if (latch.await(5, TimeUnit.SECONDS)) {
logger.trace("Change detected:" + changed.get());
return changed.get();
}
} catch (InterruptedException e) {
logger.warn("Failed to detect changes for data frame transform [" + transformId + "], skipping update till next check", e);
auditor.warning(transformId,
"Failed to detect changes for data frame transform, skipping update till next check. Exception: "
+ e.getMessage());
}
return false;
hasChangedListener.onResponse(false);
}));
}
private boolean isIrrecoverableFailure(Exception e) {

View File

@@ -63,6 +63,7 @@ public abstract class DataFrameSingleNodeTestCase extends ESSingleNodeTestCase {
}
}, e -> {
if (onException == null) {
logger.error("got unexpected exception", e);
fail("got unexpected exception: " + e.getMessage());
} else {
onException.accept(e);

View File

@@ -181,8 +181,8 @@ public class DataFrameIndexerTests extends ESTestCase {
}
@Override
protected boolean sourceHasChanged() {
return false;
protected void sourceHasChanged(ActionListener<Boolean> listener) {
listener.onResponse(false);
}
}

View File

@@ -108,7 +108,7 @@ public abstract class RollupIndexer extends AsyncTwoPhaseIndexer<Map<String, Obj
}
@Override
protected void onStart(long now, ActionListener<Void> listener) {
protected void onStart(long now, ActionListener<Boolean> listener) {
try {
// this is needed to exclude buckets that can still receive new documents
DateHistogramGroupConfig dateHisto = job.getConfig().getGroupConfig().getDateHistogram();
@@ -116,7 +116,7 @@ public abstract class RollupIndexer extends AsyncTwoPhaseIndexer<Map<String, Obj
long delay = dateHisto.getDelay() != null ?
TimeValue.parseTimeValue(dateHisto.getDelay().toString(), "").millis() : 0;
maxBoundary = dateHisto.createRounding().round(now - delay);
listener.onResponse(null);
listener.onResponse(true);
} catch (Exception e) {
listener.onFailure(e);
}