From b756e1b9be92123528dc6e46a3ef7ee7743e9a08 Mon Sep 17 00:00:00 2001
From: Benjamin Trent
Date: Fri, 23 Aug 2019 13:53:02 -0500
Subject: [PATCH] [ML][Transforms] adjusting when and what to audit (#45876)
 (#45916)

* [ML][Transforms] adjusting when and what to audit

* Update DataFrameTransformTask.java

* removing unnecessary audit message
---
 .../transforms/DataFrameTransformTask.java | 32 ++++++++++++-------
 1 file changed, 21 insertions(+), 11 deletions(-)

diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java
index f2976dc25d4..7a4162ad6e5 100644
--- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java
+++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java
@@ -306,6 +306,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                 listener.onResponse(new StartDataFrameTransformTaskAction.Response(true));
             },
             exc -> {
+                auditor.warning(transform.getId(),
+                    "Failed to persist to cluster state while marking task as started. Failure: " + exc.getMessage());
                 logger.error(new ParameterizedMessage("[{}] failed updating state to [{}].", getTransformId(), state), exc);
                 getIndexer().stop();
                 listener.onFailure(new ElasticsearchException("Error while updating state for data frame transform ["
@@ -412,7 +414,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                 listener.onResponse(success);
             },
             failure -> {
-                auditor.warning(transform.getId(), "Failed to persist to state to cluster state: " + failure.getMessage());
                 logger.error(new ParameterizedMessage("[{}] failed to update cluster state for data frame transform.",
                     transform.getId()),
                     failure);
@@ -434,7 +435,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         // it is probably best to NOT change the internal state of the task and allow the normal stopping logic to continue.
         if (getIndexer() != null && getIndexer().getState() == IndexerState.STOPPING) {
             logger.info("[{}] attempt to fail transform with reason [{}] while it was stopping.", getTransformId(), reason);
-            auditor.info(getTransformId(), "Attempted to fail transform with reason [" + reason + "] while in STOPPING state.");
             listener.onResponse(null);
             return;
         }
@@ -459,7 +459,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         persistStateToClusterState(newState, ActionListener.wrap(
             r -> listener.onResponse(null),
             e -> {
-                logger.error(new ParameterizedMessage("[{}] failed to set task state as failed to cluster state.", getTransformId()),
+                String msg = "Failed to persist to cluster state while marking task as failed with reason [" + reason + "].";
+                auditor.warning(transform.getId(),
+                    msg + " Failure: " + e.getMessage());
+                logger.error(new ParameterizedMessage("[{}] {}", getTransformId(), msg),
                     e);
                 listener.onFailure(e);
             }
@@ -945,12 +948,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         protected void onFailure(Exception exc) {
             // the failure handler must not throw an exception due to internal problems
             try {
-                // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
-                // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
-                if (exc.getMessage().equals(lastAuditedExceptionMessage) == false) {
-                    auditor.warning(transformTask.getTransformId(), "Data frame transform encountered an exception: " + exc.getMessage());
-                    lastAuditedExceptionMessage = exc.getMessage();
-                }
                 handleFailure(exc);
             } catch (Exception e) {
                 logger.error(
@@ -1052,13 +1049,17 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                         createCheckpointException -> {
                             logger.warn(new ParameterizedMessage("[{}] failed to create checkpoint.", transformId),
                                 createCheckpointException);
-                            listener.onFailure(new RuntimeException("Failed to create checkpoint", createCheckpointException));
+                            listener.onFailure(
+                                new RuntimeException("Failed to create checkpoint due to " + createCheckpointException.getMessage(),
+                                    createCheckpointException));
                         }
                     )),
                 getCheckPointException -> {
                     logger.warn(new ParameterizedMessage("[{}] failed to retrieve checkpoint.", transformId),
                         getCheckPointException);
-                    listener.onFailure(new RuntimeException("Failed to retrieve checkpoint", getCheckPointException));
+                    listener.onFailure(
+                        new RuntimeException("Failed to retrieve checkpoint due to " + getCheckPointException.getMessage(),
+                            getCheckPointException));
                 }
             ));
         }
@@ -1103,6 +1104,15 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                     "task encountered irrecoverable failure: " + e.getMessage() :
                     "task encountered more than " + transformTask.getNumFailureRetries() + " failures; latest failure: " + e.getMessage();
                 failIndexer(failureMessage);
+            } else {
+                // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
+                // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
+                if (e.getMessage().equals(lastAuditedExceptionMessage) == false) {
+                    auditor.warning(transformTask.getTransformId(),
+                        "Data frame transform encountered an exception: " + e.getMessage() +
+                            " Will attempt again at next scheduled trigger.");
+                    lastAuditedExceptionMessage = e.getMessage();
+                }
             }
         }
 
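
The behavioral core of the patch is the new else branch in handleFailure: a failure that still has retries left is audited only on its first occurrence (tracked via lastAuditedExceptionMessage), while exceeding the retry limit fails the indexer permanently. The standalone Java sketch below reproduces that flow under simplified assumptions; FailureAuditSketch, audit, and failTask are illustrative stand-ins, not the Elasticsearch classes touched by the diff above.

// FailureAuditSketch.java -- simplified model of the de-duplicated failure auditing
// introduced by this patch. Not Elasticsearch code; all names here are hypothetical.
import java.util.concurrent.atomic.AtomicInteger;

public class FailureAuditSketch {

    private final int numFailureRetries = 3;            // stand-in for transformTask.getNumFailureRetries()
    private final AtomicInteger failureCount = new AtomicInteger(0);
    private String lastAuditedExceptionMessage = null;   // last message written to the audit log

    synchronized void handleFailure(Exception e) {
        if (failureCount.incrementAndGet() > numFailureRetries) {
            // Too many consecutive failures: fail the task permanently, stop retrying.
            failTask("task encountered more than " + numFailureRetries
                + " failures; latest failure: " + e.getMessage());
        } else {
            // The schedule fires again quickly after a failure, so the same exception can
            // recur many times in a row. Audit only the first occurrence to avoid spam.
            if (e.getMessage().equals(lastAuditedExceptionMessage) == false) {
                audit("Transform encountered an exception: " + e.getMessage()
                    + " Will attempt again at next scheduled trigger.");
                lastAuditedExceptionMessage = e.getMessage();
            }
        }
    }

    private void audit(String message) {
        System.out.println("[audit] " + message);
    }

    private void failTask(String reason) {
        System.out.println("[failed] " + reason);
    }

    public static void main(String[] args) {
        FailureAuditSketch sketch = new FailureAuditSketch();
        Exception sameError = new RuntimeException("shard not available");
        sketch.handleFailure(sameError);   // audited once
        sketch.handleFailure(sameError);   // suppressed: same message as last audit
        sketch.handleFailure(sameError);   // suppressed
        sketch.handleFailure(sameError);   // exceeds retry limit -> task failed
    }
}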