From 044a4e127a20d7c1337685cc0d76e2c0634c77f5 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 24 Feb 2020 15:21:51 +0000 Subject: [PATCH] [ML] Add reason to DataFrameAnalyticsTask setFailed log message (#52659) (#52707) --- .../xpack/core/ml/job/messages/Messages.java | 3 ++- .../dataframe/DataFrameAnalyticsManager.java | 18 +++++++++--------- .../ml/dataframe/DataFrameAnalyticsTask.java | 13 ++++++++----- .../process/AnalyticsProcessManager.java | 8 +++----- .../process/AnalyticsProcessManagerTests.java | 3 +-- 5 files changed, 23 insertions(+), 22 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index 80d5c4d02a2..2acdcf3c3b0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -60,7 +60,8 @@ public final class Messages { public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED = "Started analytics"; public static final String DATA_FRAME_ANALYTICS_AUDIT_STOPPED = "Stopped analytics"; public static final String DATA_FRAME_ANALYTICS_AUDIT_DELETED = "Deleted analytics"; - public static final String DATA_FRAME_ANALYTICS_AUDIT_UPDATED_STATE = "Successfully updated analytics task state to [{0}]"; + public static final String DATA_FRAME_ANALYTICS_AUDIT_UPDATED_STATE_WITH_REASON = + "Updated analytics task state to [{0}] with reason [{1}]"; public static final String DATA_FRAME_ANALYTICS_AUDIT_ESTIMATED_MEMORY_USAGE = "Estimated memory usage for this analytics to be [{0}]"; public static final String DATA_FRAME_ANALYTICS_AUDIT_CREATING_DEST_INDEX = "Creating destination index [{0}]"; public static final String DATA_FRAME_ANALYTICS_AUDIT_REUSING_DEST_INDEX = "Using existing destination index [{0}]"; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java index 8b150f4fbba..65c8e2066db 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java @@ -93,12 +93,12 @@ public class DataFrameAnalyticsManager { executeJobInMiddleOfReindexing(task, config); break; default: - task.updateState(DataFrameAnalyticsState.FAILED, "Cannot execute analytics task [" + config.getId() + + task.setFailed("Cannot execute analytics task [" + config.getId() + "] as it is in unknown state [" + currentState + "]. Must be one of [STARTED, REINDEXING, ANALYZING]"); } }, - error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage()) + error -> task.setFailed(error.getMessage()) ); // Retrieve configuration @@ -122,13 +122,13 @@ public class DataFrameAnalyticsManager { case FIRST_TIME: task.updatePersistentTaskState(reindexingState, ActionListener.wrap( updatedTask -> reindexDataframeAndStartAnalysis(task, config), - error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage()) + error -> task.setFailed(error.getMessage()) )); break; case RESUMING_REINDEXING: task.updatePersistentTaskState(reindexingState, ActionListener.wrap( updatedTask -> executeJobInMiddleOfReindexing(task, config), - error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage()) + error -> task.setFailed(error.getMessage()) )); break; case RESUMING_ANALYZING: @@ -136,7 +136,7 @@ public class DataFrameAnalyticsManager { break; case FINISHED: default: - task.updateState(DataFrameAnalyticsState.FAILED, "Unexpected starting state [" + startingState + "]"); + task.setFailed("Unexpected starting state [" + startingState + "]"); } } @@ -151,7 +151,7 @@ public class DataFrameAnalyticsManager { if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) { reindexDataframeAndStartAnalysis(task, config); } else { - task.updateState(DataFrameAnalyticsState.FAILED, e.getMessage()); + task.setFailed(e.getMessage()); } } )); @@ -178,7 +178,7 @@ public class DataFrameAnalyticsManager { Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex())); startAnalytics(task, config); }, - error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage()) + error -> task.setFailed(error.getMessage()) ); // Reindex @@ -244,12 +244,12 @@ public class DataFrameAnalyticsManager { if (ExceptionsHelper.unwrapCause(error) instanceof ResourceNotFoundException) { // Task has stopped } else { - task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage()); + task.setFailed(error.getMessage()); } } )); }, - error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage()) + error -> task.setFailed(error.getMessage()) ); ActionListener refreshListener = ActionListener.wrap( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java index 585987dc60d..06653d1eb84 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java @@ -177,17 +177,20 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S } } - public void updateState(DataFrameAnalyticsState state, @Nullable String reason) { - DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(state, getAllocationId(), reason); + public void setFailed(String reason) { + DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.FAILED, + getAllocationId(), reason); updatePersistentTaskState( newTaskState, ActionListener.wrap( updatedTask -> { - auditor.info(getParams().getId(), Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_UPDATED_STATE, state)); - LOGGER.info("[{}] Successfully update task state to [{}]", getParams().getId(), state); + String message = Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_UPDATED_STATE_WITH_REASON, + DataFrameAnalyticsState.FAILED, reason); + auditor.info(getParams().getId(), message); + LOGGER.info("[{}] {}", getParams().getId(), message); }, e -> LOGGER.error(new ParameterizedMessage("[{}] Could not update task state to [{}] with reason [{}]", - getParams().getId(), state, reason), e) + getParams().getId(), DataFrameAnalyticsState.FAILED, reason), e) ) ); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java index c1c8d95bd71..da65cd768e5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java @@ -23,7 +23,6 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; -import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.core.ml.dataframe.analyses.DataFrameAnalysis; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; @@ -110,8 +109,7 @@ public class AnalyticsProcessManager { return; } if (processContextByAllocation.putIfAbsent(task.getAllocationId(), processContext) != null) { - task.updateState( - DataFrameAnalyticsState.FAILED, "[" + config.getId() + "] Could not create process as one already exists"); + task.setFailed("[" + config.getId() + "] Could not create process as one already exists"); return; } } @@ -193,7 +191,7 @@ public class AnalyticsProcessManager { task.markAsCompleted(); } else { LOGGER.error("[{}] Marking task failed; {}", config.getId(), processContext.getFailureReason()); - task.updateState(DataFrameAnalyticsState.FAILED, processContext.getFailureReason()); + task.setFailed(processContext.getFailureReason()); // Note: We are not marking the task as failed here as we want the user to be able to inspect the failure reason. } } @@ -265,7 +263,7 @@ public class AnalyticsProcessManager { process.restoreState(state); } catch (Exception e) { LOGGER.error(new ParameterizedMessage("[{}] Failed to restore state", process.getConfig().jobId()), e); - task.updateState(DataFrameAnalyticsState.FAILED, "Failed to restore state: " + e.getMessage()); + task.setFailed("Failed to restore state: " + e.getMessage()); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java index 185042ba5c1..8b1f4ea5bd3 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java @@ -14,7 +14,6 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigTests; -import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetectionTests; import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask; import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractor; @@ -131,7 +130,7 @@ public class AnalyticsProcessManagerTests extends ESTestCase { inOrder.verify(task).getStatsHolder(); inOrder.verify(task).isStopping(); inOrder.verify(task).getAllocationId(); - inOrder.verify(task).updateState(DataFrameAnalyticsState.FAILED, "[config-id] Could not create process as one already exists"); + inOrder.verify(task).setFailed("[config-id] Could not create process as one already exists"); verifyNoMoreInteractions(task); }