diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index bd159e6fc33..c10b69ed17d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -154,7 +154,12 @@ public abstract class AsyncTwoPhaseIndexer { onStart(now, ActionListener.wrap(r -> { - nextSearch(ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure)); + assert r != null; + if (r) { + nextSearch(ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure)); + } else { + finishAndSetState(); + } }, e -> { finishAndSetState(); onFailure(e); @@ -200,9 +205,11 @@ public abstract class AsyncTwoPhaseIndexer listener); + protected abstract void onStart(long now, ActionListener listener); /** * Executes the {@link SearchRequest} and calls nextPhase with the diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java index 883ac7c0248..423ae1297a5 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java @@ -88,10 +88,10 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase { } @Override - protected void onStart(long now, ActionListener listener) { + protected void onStart(long now, ActionListener listener) { assertThat(step, equalTo(0)); ++step; - listener.onResponse(null); + listener.onResponse(true); } @Override @@ -186,9 +186,9 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase { } @Override - protected void onStart(long now, ActionListener listener) { + protected void onStart(long now, ActionListener listener) { started = true; - listener.onResponse(null); + listener.onResponse(true); } @Override @@ -270,10 +270,10 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase { } @Override - protected void onStart(long now, ActionListener listener) { + protected void onStart(long now, ActionListener listener) { assertThat(step, equalTo(0)); ++step; - listener.onResponse(null); + listener.onResponse(true); } @Override diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java index 861855abbad..6076c6dd15b 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java @@ -147,7 +147,7 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer listener); @Override - protected void onStart(long now, ActionListener listener) { + protected void onStart(long now, ActionListener listener) { try { pivot = new Pivot(getConfig().getPivotConfig()); @@ -157,7 +157,7 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer hasChangedListener); } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 611c50b636f..974566a491a 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -12,7 +12,6 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.bulk.BulkAction; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; @@ -57,8 +56,6 @@ import org.elasticsearch.xpack.dataframe.transforms.pivot.AggregationResultUtils import java.time.Instant; import java.util.Arrays; import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -324,15 +321,27 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S return; } - logger.debug("Data frame indexer [{}] schedule has triggered, state: [{}]", event.getJobName(), getIndexer().getState()); + if (taskState.get() == DataFrameTransformTaskState.FAILED) { + logger.debug("Schedule was triggered for transform [{}] but task is failed. Ignoring trigger.", getTransformId()); + return; + } + + // ignore trigger if indexer is running or completely stopped + IndexerState indexerState = getIndexer().getState(); + if (IndexerState.INDEXING.equals(indexerState) || + IndexerState.STOPPING.equals(indexerState) || + IndexerState.STOPPED.equals(indexerState)) { + logger.debug("Indexer for transform [{}] has state [{}], ignoring trigger", getTransformId(), indexerState); + return; + } + + logger.debug("Data frame indexer [{}] schedule has triggered, state: [{}]", event.getJobName(), indexerState); // if it runs for the 1st time we just do it, if not we check for changes - if (currentCheckpoint.get() == 0 ) { + if (currentCheckpoint.get() == 0) { logger.debug("Trigger initial run"); getIndexer().maybeTriggerAsyncJob(System.currentTimeMillis()); - } else if (getIndexer().isContinuous() && getIndexer().sourceHasChanged()) { - changesLastDetectedAt = Instant.now(); - logger.debug("Source has changed, triggering new indexer run"); + } else if (getIndexer().isContinuous()) { getIndexer().maybeTriggerAsyncJob(System.currentTimeMillis()); } } @@ -620,7 +629,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S } @Override - protected void onStart(long now, ActionListener listener) { + protected void onStart(long now, ActionListener listener) { // On each run, we need to get the total number of docs and reset the count of processed docs // Since multiple checkpoints can be executed in the task while it is running on the same node, we need to gather // the progress here, and not in the executor. @@ -657,30 +666,55 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S ); // If we are continuous, we will want to verify we have the latest stored configuration - if (isContinuous()) { - transformsConfigManager.getTransformConfiguration(getJobId(), ActionListener.wrap( - config -> { - transformConfig = config; - logger.debug("[" + getJobId() + "] successfully refreshed data frame transform config from index."); + ActionListener changedSourceListener = ActionListener.wrap( + r -> { + if (isContinuous()) { + transformsConfigManager.getTransformConfiguration(getJobId(), ActionListener.wrap( + config -> { + transformConfig = config; + logger.debug("[{}] successfully refreshed data frame transform config from index.", transformId); + updateConfigListener.onResponse(null); + }, + failure -> { + String msg = DataFrameMessages.getMessage( + DataFrameMessages.FAILED_TO_RELOAD_TRANSFORM_CONFIGURATION, + getJobId()); + logger.error(msg, failure); + // If the transform config index or the transform config is gone, something serious occurred + // We are in an unknown state and should fail out + if (failure instanceof ResourceNotFoundException) { + updateConfigListener.onFailure(new TransformConfigReloadingException(msg, failure)); + } else { + auditor.warning(getJobId(), msg); + updateConfigListener.onResponse(null); + } + } + )); + } else { updateConfigListener.onResponse(null); - }, - failure -> { - String msg = DataFrameMessages.getMessage( - DataFrameMessages.FAILED_TO_RELOAD_TRANSFORM_CONFIGURATION, - getJobId()); - logger.error(msg, failure); - // If the transform config index or the transform config is gone, something serious occurred - // We are in an unknown state and should fail out - if (failure instanceof ResourceNotFoundException) { - updateConfigListener.onFailure(new TransformConfigReloadingException(msg, failure)); - } else { - auditor.warning(getJobId(), msg); - updateConfigListener.onResponse(null); - } } + }, + listener::onFailure + ); + + // If we are not on the initial batch checkpoint and its the first pass of whatever continuous checkpoint we are on, + // we should verify if there are local changes based on the sync config. If not, do not proceed further and exit. + if (transformTask.currentCheckpoint.get() > 0 && initialRun()) { + sourceHasChanged(ActionListener.wrap( + hasChanged -> { + if (hasChanged) { + transformTask.changesLastDetectedAt = Instant.now(); + logger.debug("[{}] source has changed, triggering new indexer run.", transformId); + changedSourceListener.onResponse(null); + } else { + // No changes, stop executing + listener.onResponse(false); + } + }, + listener::onFailure )); } else { - updateConfigListener.onResponse(null); + changedSourceListener.onResponse(null); } } @@ -930,41 +964,22 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S } @Override - public boolean sourceHasChanged() { - if (getState() == IndexerState.INDEXING) { - logger.trace("Indexer is still running, ignore"); - return false; - } - - CountDownLatch latch = new CountDownLatch(1); - SetOnce changed = new SetOnce<>(); - + protected void sourceHasChanged(ActionListener hasChangedListener) { checkpointProvider.sourceHasChanged(getLastCheckpoint(), - new LatchedActionListener<>(ActionListener.wrap(changed::set, e -> { - changed.set(false); + ActionListener.wrap( + hasChanged -> { + logger.trace("[{}] change detected [{}]", transformId, hasChanged); + hasChangedListener.onResponse(hasChanged); + }, + e -> { logger.warn( - "Failed to detect changes for data frame transform [" + transformId + "], skipping update till next check", - e); - + "Failed to detect changes for data frame transform [" + transformId + "], skipping update till next check.", + e); auditor.warning(transformId, - "Failed to detect changes for data frame transform, skipping update till next check. Exception: " - + e.getMessage()); - }), latch)); - - try { - if (latch.await(5, TimeUnit.SECONDS)) { - logger.trace("Change detected:" + changed.get()); - return changed.get(); - } - } catch (InterruptedException e) { - logger.warn("Failed to detect changes for data frame transform [" + transformId + "], skipping update till next check", e); - - auditor.warning(transformId, - "Failed to detect changes for data frame transform, skipping update till next check. Exception: " + "Failed to detect changes for data frame transform, skipping update till next check. Exception: " + e.getMessage()); - } - - return false; + hasChangedListener.onResponse(false); + })); } private boolean isIrrecoverableFailure(Exception e) { diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/DataFrameSingleNodeTestCase.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/DataFrameSingleNodeTestCase.java index b3c087983e8..2c2ad5ba0b3 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/DataFrameSingleNodeTestCase.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/DataFrameSingleNodeTestCase.java @@ -63,6 +63,7 @@ public abstract class DataFrameSingleNodeTestCase extends ESSingleNodeTestCase { } }, e -> { if (onException == null) { + logger.error("got unexpected exception", e); fail("got unexpected exception: " + e.getMessage()); } else { onException.accept(e); diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java index c049e34ec69..7de80b12a03 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java @@ -181,8 +181,8 @@ public class DataFrameIndexerTests extends ESTestCase { } @Override - protected boolean sourceHasChanged() { - return false; + protected void sourceHasChanged(ActionListener listener) { + listener.onResponse(false); } } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java index 83ec16e85ff..ddfa2bc1dd6 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java @@ -108,7 +108,7 @@ public abstract class RollupIndexer extends AsyncTwoPhaseIndexer listener) { + protected void onStart(long now, ActionListener listener) { try { // this is needed to exclude buckets that can still receive new documents DateHistogramGroupConfig dateHisto = job.getConfig().getGroupConfig().getDateHistogram(); @@ -116,7 +116,7 @@ public abstract class RollupIndexer extends AsyncTwoPhaseIndexer