[ML][Transforms] adjusting when and what to audit (#45876) (#45916)

* [ML][Transforms] adjusting when and what to audit

* Update DataFrameTransformTask.java

* removing unnecessary audit message
Benjamin Trent authored 2019-08-23 13:53:02 -05:00, committed by GitHub
parent f3825767f4
commit b756e1b9be
1 changed file with 21 additions and 11 deletions

DataFrameTransformTask.java

@@ -306,6 +306,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                 listener.onResponse(new StartDataFrameTransformTaskAction.Response(true));
             },
             exc -> {
+                auditor.warning(transform.getId(),
+                    "Failed to persist to cluster state while marking task as started. Failure: " + exc.getMessage());
                 logger.error(new ParameterizedMessage("[{}] failed updating state to [{}].", getTransformId(), state), exc);
                 getIndexer().stop();
                 listener.onFailure(new ElasticsearchException("Error while updating state for data frame transform ["
@@ -412,7 +414,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                 listener.onResponse(success);
             },
             failure -> {
-                auditor.warning(transform.getId(), "Failed to persist to state to cluster state: " + failure.getMessage());
                 logger.error(new ParameterizedMessage("[{}] failed to update cluster state for data frame transform.",
                     transform.getId()),
                     failure);
@@ -434,7 +435,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         // it is probably best to NOT change the internal state of the task and allow the normal stopping logic to continue.
         if (getIndexer() != null && getIndexer().getState() == IndexerState.STOPPING) {
             logger.info("[{}] attempt to fail transform with reason [{}] while it was stopping.", getTransformId(), reason);
-            auditor.info(getTransformId(), "Attempted to fail transform with reason [" + reason + "] while in STOPPING state.");
             listener.onResponse(null);
             return;
         }
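For illustration only, the guard removed-from-audit above follows a plain early-return pattern: when the indexer is already stopping, the fail request is acknowledged without mutating internal state so the normal stop path can finish. A minimal standalone sketch of that pattern, assuming invented names (StoppingGuardSketch, TaskState, the Consumer listener) that are not part of the Elasticsearch codebase:

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Illustrative only: a simplified task that ignores fail requests while it is stopping.
class StoppingGuardSketch {
    enum TaskState { STARTED, STOPPING, FAILED }

    private final AtomicReference<TaskState> state = new AtomicReference<>(TaskState.STARTED);

    void stop() {
        // Normal shutdown path: move to STOPPING and let it complete on its own.
        state.compareAndSet(TaskState.STARTED, TaskState.STOPPING);
    }

    void fail(String reason, Consumer<TaskState> listener) {
        // Early-return guard: if the task is already stopping, acknowledge the caller
        // without changing internal state, so the normal stopping logic continues.
        if (state.get() == TaskState.STOPPING) {
            System.out.println("Ignoring fail(" + reason + "): task is already stopping");
            listener.accept(state.get());
            return;
        }
        state.set(TaskState.FAILED);
        listener.accept(state.get());
    }

    public static void main(String[] args) {
        StoppingGuardSketch task = new StoppingGuardSketch();
        task.stop();
        task.fail("demo failure", s -> System.out.println("state after fail(): " + s)); // prints STOPPING
    }
}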
@@ -459,7 +459,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         persistStateToClusterState(newState, ActionListener.wrap(
             r -> listener.onResponse(null),
             e -> {
-                logger.error(new ParameterizedMessage("[{}] failed to set task state as failed to cluster state.", getTransformId()),
+                String msg = "Failed to persist to cluster state while marking task as failed with reason [" + reason + "].";
+                auditor.warning(transform.getId(),
+                    msg + " Failure: " + e.getMessage());
+                logger.error(new ParameterizedMessage("[{}] {}", getTransformId(), msg),
                     e);
                 listener.onFailure(e);
             }
@@ -945,12 +948,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         protected void onFailure(Exception exc) {
             // the failure handler must not throw an exception due to internal problems
             try {
-                // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
-                // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
-                if (exc.getMessage().equals(lastAuditedExceptionMessage) == false) {
-                    auditor.warning(transformTask.getTransformId(), "Data frame transform encountered an exception: " + exc.getMessage());
-                    lastAuditedExceptionMessage = exc.getMessage();
-                }
                 handleFailure(exc);
             } catch (Exception e) {
                 logger.error(
@@ -1052,13 +1049,17 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                 createCheckpointException -> {
                     logger.warn(new ParameterizedMessage("[{}] failed to create checkpoint.", transformId),
                         createCheckpointException);
-                    listener.onFailure(new RuntimeException("Failed to create checkpoint", createCheckpointException));
+                    listener.onFailure(
+                        new RuntimeException("Failed to create checkpoint due to " + createCheckpointException.getMessage(),
+                            createCheckpointException));
                 }
             )),
             getCheckPointException -> {
                 logger.warn(new ParameterizedMessage("[{}] failed to retrieve checkpoint.", transformId),
                     getCheckPointException);
-                listener.onFailure(new RuntimeException("Failed to retrieve checkpoint", getCheckPointException));
+                listener.onFailure(
+                    new RuntimeException("Failed to retrieve checkpoint due to " + getCheckPointException.getMessage(),
+                        getCheckPointException));
             }
         ));
     }
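The two replacements in the hunk above append the cause's message to the wrapping RuntimeException, so a caller that only surfaces the outer exception's message still sees why the checkpoint operation failed. A minimal sketch of that wrapping pattern, assuming an invented FailureListener interface rather than the real Elasticsearch ActionListener:

// Illustrative wrapper pattern: keep the original exception as the cause, but also copy
// its message into the wrapper, so a caller that only reads wrapper.getMessage()
// still sees the underlying reason.
final class CheckpointFailureSketch {
    interface FailureListener { void onFailure(Exception e); }

    static void reportCreateFailure(Exception cause, FailureListener listener) {
        listener.onFailure(
            new RuntimeException("Failed to create checkpoint due to " + cause.getMessage(), cause));
    }

    public static void main(String[] args) {
        reportCreateFailure(new IllegalStateException("index is read-only"),
            e -> System.out.println(e.getMessage()));
        // prints: Failed to create checkpoint due to index is read-only
    }
}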
@@ -1103,6 +1104,15 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                     "task encountered irrecoverable failure: " + e.getMessage() :
                     "task encountered more than " + transformTask.getNumFailureRetries() + " failures; latest failure: " + e.getMessage();
                 failIndexer(failureMessage);
+            } else {
+                // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
+                // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
+                if (e.getMessage().equals(lastAuditedExceptionMessage) == false) {
+                    auditor.warning(transformTask.getTransformId(),
+                        "Data frame transform encountered an exception: " + e.getMessage() +
+                            " Will attempt again at next scheduled trigger.");
+                    lastAuditedExceptionMessage = e.getMessage();
+                }
             }
         }
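As the moved comment explains, the else branch audits a retryable failure only when its message differs from the last one audited, which keeps a rapidly retriggering schedule from flooding the audit log while the task keeps retrying. A standalone sketch of that de-duplication, with a hypothetical Auditor interface and class name standing in for the real ones:

// Illustrative de-duplication: audit only the first occurrence of a repeated failure
// message; subsequent identical failures are skipped until the message changes.
final class AuditDedupSketch {
    interface Auditor { void warning(String resourceId, String message); }

    private final Auditor auditor;
    private String lastAuditedExceptionMessage;

    AuditDedupSketch(Auditor auditor) {
        this.auditor = auditor;
    }

    void onRetryableFailure(String transformId, Exception e) {
        if (e.getMessage().equals(lastAuditedExceptionMessage) == false) {
            auditor.warning(transformId,
                "Data frame transform encountered an exception: " + e.getMessage()
                    + " Will attempt again at next scheduled trigger.");
            lastAuditedExceptionMessage = e.getMessage();
        }
    }

    public static void main(String[] args) {
        AuditDedupSketch sketch = new AuditDedupSketch((id, msg) -> System.out.println(id + ": " + msg));
        sketch.onRetryableFailure("my-transform", new RuntimeException("search phase failed"));
        sketch.onRetryableFailure("my-transform", new RuntimeException("search phase failed")); // skipped: same message
        sketch.onRetryableFailure("my-transform", new RuntimeException("mapping conflict"));     // audited: new message
    }
}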