[ML][Transforms] unifying logging, adding some more logging (#45788) (#45859)

* [ML][Transforms] unifying logging, adding some more logging

* using ParameterizedMessage instead of string concatenation (see the sketch below)

* fixing bracket closure
Benjamin Trent, 2019-08-22 13:15:07 -05:00, committed by GitHub
parent c60399c77f
commit dff3e636c2
2 changed files with 74 additions and 51 deletions
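The bulk of the change replaces string concatenation in log calls with log4j2's ParameterizedMessage and prefixes every message with the transform id. A minimal sketch of the pattern, assuming a plain log4j2 Logger (the method name, id, and exception below are placeholders, not values from this commit):

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;

class TransformLoggingSketch {
    private static final Logger logger = LogManager.getLogger(TransformLoggingSketch.class);

    void logStateUpdateFailure(String transformId, Exception failure) {
        // Old style: the message is concatenated eagerly, before the logger checks the level.
        logger.error("Failed to update state for data frame transform [" + transformId + "]", failure);

        // New style: the "[<id>] ..." prefix is expressed with placeholders; formatting is deferred
        // until the event is actually written, and the throwable still travels as a separate argument.
        logger.error(new ParameterizedMessage("[{}] failed to update state for data frame transform.", transformId),
            failure);
    }
}

Wrapping the pattern and arguments in a ParameterizedMessage keeps the trailing Throwable unambiguous when the message itself takes arguments, which appears to be why the PR rewrites the exception-carrying calls this way.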

DataFrameTransformTask.java

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.dataframe.transforms;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
@@ -21,7 +22,6 @@ import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.TimeValue;
@@ -248,7 +248,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
* @param listener Started listener
*/
public synchronized void start(Long startingCheckpoint, boolean force, ActionListener<Response> listener) {
logger.debug("[{}] start called with force [{}] and state [{}]", getTransformId(), force, getState());
logger.debug("[{}] start called with force [{}] and state [{}].", getTransformId(), force, getState());
if (taskState.get() == DataFrameTransformTaskState.FAILED && force == false) {
listener.onFailure(new ElasticsearchStatusException(
DataFrameMessages.getMessage(DATA_FRAME_CANNOT_START_FAILED_TRANSFORM,
@@ -290,7 +290,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
null,
getIndexer().getProgress());
logger.info("Updating state for data frame transform [{}] to [{}]", transform.getId(), state.toString());
logger.info("[{}] updating state for data frame transform to [{}].", transform.getId(), state.toString());
// Even though the indexer information is persisted to an index, we still need DataFrameTransformTaskState in the clusterstate
// This keeps track of STARTED, FAILED, STOPPED
// This is because a FAILED state can occur because we cannot read the config from the internal index, which would imply that
@@ -306,6 +306,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
listener.onResponse(new StartDataFrameTransformTaskAction.Response(true));
},
exc -> {
logger.error(new ParameterizedMessage("[{}] failed updating state to [{}].", getTransformId(), state), exc);
getIndexer().stop();
listener.onFailure(new ElasticsearchException("Error while updating state for data frame transform ["
+ transform.getId() + "] to [" + state.getIndexerState() + "].", exc));
@@ -354,12 +355,12 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
}
if (getIndexer() == null) {
logger.warn("Data frame task [{}] triggered with an unintialized indexer", getTransformId());
logger.warn("[{}] data frame task triggered with an unintialized indexer.", getTransformId());
return;
}
if (taskState.get() == DataFrameTransformTaskState.FAILED) {
logger.debug("Schedule was triggered for transform [{}] but task is failed. Ignoring trigger.", getTransformId());
logger.debug("[{}] schedule was triggered for transform but task is failed. Ignoring trigger.", getTransformId());
return;
}
@@ -368,15 +369,15 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
if (IndexerState.INDEXING.equals(indexerState) ||
IndexerState.STOPPING.equals(indexerState) ||
IndexerState.STOPPED.equals(indexerState)) {
logger.debug("Indexer for transform [{}] has state [{}], ignoring trigger", getTransformId(), indexerState);
logger.debug("[{}] indexer for transform has state [{}]. Ignoring trigger.", getTransformId(), indexerState);
return;
}
logger.debug("Data frame indexer [{}] schedule has triggered, state: [{}]", event.getJobName(), indexerState);
logger.debug("[{}] data frame indexer schedule has triggered, state: [{}].", event.getJobName(), indexerState);
// if it runs for the 1st time we just do it, if not we check for changes
if (currentCheckpoint.get() == 0) {
logger.debug("Trigger initial run");
logger.debug("Trigger initial run.");
getIndexer().maybeTriggerAsyncJob(System.currentTimeMillis());
} else if (getIndexer().isContinuous()) {
getIndexer().maybeTriggerAsyncJob(System.currentTimeMillis());
@@ -407,12 +408,14 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
updatePersistentTaskState(state, ActionListener.wrap(
success -> {
logger.debug("Successfully updated state for data frame transform [{}] to [{}]", transform.getId(), state.toString());
logger.debug("[{}] successfully updated state for data frame transform to [{}].", transform.getId(), state.toString());
listener.onResponse(success);
},
failure -> {
auditor.warning(transform.getId(), "Failed to persist to state to cluster state: " + failure.getMessage());
logger.error("Failed to update state for data frame transform [" + transform.getId() + "]", failure);
logger.error(new ParameterizedMessage("[{}] failed to update cluster state for data frame transform.",
transform.getId()),
failure);
listener.onFailure(failure);
}
));
@@ -422,7 +425,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
// If we are already flagged as failed, this probably means that a second trigger started firing while we were attempting to
// flag the previously triggered indexer as failed. Exit early as we are already flagged as failed.
if (taskState.get() == DataFrameTransformTaskState.FAILED) {
logger.warn("[{}] is already failed but encountered new failure; reason [{}] ", getTransformId(), reason);
logger.warn("[{}] is already failed but encountered new failure; reason [{}].", getTransformId(), reason);
listener.onResponse(null);
return;
}
@@ -430,7 +433,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
// the indexer to fail. Since `ClientDataFrameIndexer#doSaveState` will persist the state to the index once the indexer stops,
// it is probably best to NOT change the internal state of the task and allow the normal stopping logic to continue.
if (getIndexer() != null && getIndexer().getState() == IndexerState.STOPPING) {
logger.info("Attempt to fail transform [" + getTransformId() + "] with reason [" + reason + "] while it was stopping.");
logger.info("[{}] attempt to fail transform with reason [{}] while it was stopping.", getTransformId(), reason);
auditor.info(getTransformId(), "Attempted to fail transform with reason [" + reason + "] while in STOPPING state.");
listener.onResponse(null);
return;
@@ -438,7 +441,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
// If we are stopped, this means that between the failure occurring and being handled, somebody called stop
// We should just allow that stop to continue
if (getIndexer() != null && getIndexer().getState() == IndexerState.STOPPED) {
logger.info("[{}] encountered a failure but indexer is STOPPED; reason [{}]", getTransformId(), reason);
logger.info("[{}] encountered a failure but indexer is STOPPED; reason [{}].", getTransformId(), reason);
listener.onResponse(null);
return;
}
@@ -456,7 +459,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
persistStateToClusterState(newState, ActionListener.wrap(
r -> listener.onResponse(null),
e -> {
logger.error("Failed to set task state as failed to cluster state", e);
logger.error(new ParameterizedMessage("[{}] failed to set task state as failed to cluster state.", getTransformId()),
e);
listener.onFailure(e);
}
));
@@ -469,8 +473,9 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
*/
@Override
public synchronized void onCancelled() {
logger.info(
"Received cancellation request for data frame transform [" + transform.getId() + "], state: [" + taskState.get() + "]");
logger.info("[{}] received cancellation request for data frame transform, state: [{}].",
getTransformId(),
taskState.get());
if (getIndexer() != null && getIndexer().abort()) {
// there is no background transform running, we can shutdown safely
shutdown();
@@ -695,13 +700,15 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
}
TransformProgressGatherer.getInitialProgress(this.client, buildFilterQuery(), getConfig(), ActionListener.wrap(
newProgress -> {
logger.trace("[{}] reset the progress from [{}] to [{}]", transformId, progress, newProgress);
logger.trace("[{}] reset the progress from [{}] to [{}].", transformId, progress, newProgress);
progress = newProgress;
super.onStart(now, listener);
},
failure -> {
progress = null;
logger.warn("Unable to load progress information for task [" + transformId + "]", failure);
logger.warn(new ParameterizedMessage("[{}] unable to load progress information for task.",
transformId),
failure);
super.onStart(now, listener);
}
));
@@ -778,14 +785,14 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
@Override
public synchronized boolean maybeTriggerAsyncJob(long now) {
if (transformTask.taskState.get() == DataFrameTransformTaskState.FAILED) {
logger.debug("Schedule was triggered for transform [{}] but task is failed. Ignoring trigger.", getJobId());
logger.debug("[{}] schedule was triggered for transform but task is failed. Ignoring trigger.", getJobId());
return false;
}
// ignore trigger if indexer is running, prevents log spam in A2P indexer
IndexerState indexerState = getState();
if (IndexerState.INDEXING.equals(indexerState) || IndexerState.STOPPING.equals(indexerState)) {
logger.debug("Indexer for transform [{}] has state [{}], ignoring trigger", getJobId(), indexerState);
logger.debug("[{}] indexer for transform has state [{}]. Ignoring trigger.", getJobId(), indexerState);
return false;
}
@@ -876,7 +883,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
indexerState = IndexerState.STOPPED;
auditor.info(transformConfig.getId(), "Data frame finished indexing all data, initiating stop");
logger.info("Data frame [{}] finished indexing all data, initiating stop", transformConfig.getId());
logger.info("[{}] data frame transform finished indexing all data, initiating stop.", transformConfig.getId());
}
final DataFrameTransformState state = new DataFrameTransformState(
@@ -886,7 +893,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
transformTask.currentCheckpoint.get(),
transformTask.stateReason.get(),
getProgress());
logger.debug("Updating persistent state of transform [{}] to [{}]", transformConfig.getId(), state.toString());
logger.debug("[{}] updating persistent state of transform to [{}].", transformConfig.getId(), state.toString());
// Persist the current state and stats in the internal index. The interval of this method being
// called is controlled by AsyncTwoPhaseIndexer#onBulkResponse which calls doSaveState every so
@@ -918,7 +925,9 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
}
},
statsExc -> {
logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc);
logger.error(new ParameterizedMessage("[{}] updating stats of transform failed.",
transformConfig.getId()),
statsExc);
auditor.warning(getJobId(),
"Failure updating stats of transform: " + statsExc.getMessage());
// for auto stop shutdown the task
@@ -942,7 +951,9 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
}
handleFailure(exc);
} catch (Exception e) {
logger.error("Data frame transform encountered an unexpected internal exception: " ,e);
logger.error(
new ParameterizedMessage("[{}] data frame transform encountered an unexpected internal exception: ", transformId),
e);
}
}
@@ -967,7 +978,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
if (progress != null && progress.getPercentComplete() != null && progress.getPercentComplete() < 100.0) {
progress.incrementDocsProcessed(progress.getTotalDocs() - progress.getDocumentsProcessed());
}
logger.info("Last checkpoint for {} {}", getJobId(), Strings.toString(lastCheckpoint));
// If the last checkpoint is now greater than 1, that means that we have just processed the first
// continuous checkpoint and should start recording the exponential averages
if (lastCheckpoint != null && lastCheckpoint.getCheckpoint() > 1) {
@@ -987,7 +997,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
"Finished indexing for data frame transform checkpoint [" + checkpoint + "].");
}
logger.debug(
"Finished indexing for data frame transform [" + transformTask.getTransformId() + "] checkpoint [" + checkpoint + "]");
"[{}] finished indexing for data frame transform checkpoint [{}].", getJobId(), checkpoint);
auditBulkFailures = true;
listener.onResponse(null);
} catch (Exception e) {
@@ -1009,7 +1019,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
if (++logCount % logEvery != 0) {
return false;
}
int log10Checkpoint = (int) Math.floor(Math.log10(completedCheckpoint + 1));
if (completedCheckpoint == 0) {
return true;
}
int log10Checkpoint = (int) Math.floor(Math.log10(completedCheckpoint));
logEvery = log10Checkpoint >= 3 ? 1_000 : (int)Math.pow(10.0, log10Checkpoint);
logCount = 0;
return true;
@@ -1018,13 +1031,13 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
@Override
protected void onStop() {
auditor.info(transformConfig.getId(), "Data frame transform has stopped.");
logger.info("Data frame transform [{}] has stopped", transformConfig.getId());
logger.info("[{}] data frame transform has stopped.", transformConfig.getId());
}
@Override
protected void onAbort() {
auditor.info(transformConfig.getId(), "Received abort request, stopping data frame transform.");
logger.info("Data frame transform [" + transformConfig.getId() + "] received abort request, stopping indexer");
logger.info("[{}] data frame transform received abort request. Stopping indexer.", transformConfig.getId());
transformTask.shutdown();
}
@@ -1034,11 +1047,17 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
checkpoint -> transformsConfigManager.putTransformCheckpoint(checkpoint,
ActionListener.wrap(
putCheckPointResponse -> listener.onResponse(checkpoint),
createCheckpointException ->
listener.onFailure(new RuntimeException("Failed to create checkpoint", createCheckpointException))
createCheckpointException -> {
logger.warn(new ParameterizedMessage("[{}] failed to create checkpoint.", transformId),
createCheckpointException);
listener.onFailure(new RuntimeException("Failed to create checkpoint", createCheckpointException));
}
)),
getCheckPointException ->
listener.onFailure(new RuntimeException("Failed to retrieve checkpoint", getCheckPointException))
getCheckPointException -> {
logger.warn(new ParameterizedMessage("[{}] failed to retrieve checkpoint.", transformId),
getCheckPointException);
listener.onFailure(new RuntimeException("Failed to retrieve checkpoint", getCheckPointException));
}
));
}
@@ -1047,12 +1066,14 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
checkpointProvider.sourceHasChanged(getLastCheckpoint(),
ActionListener.wrap(
hasChanged -> {
logger.trace("[{}] change detected [{}]", transformId, hasChanged);
logger.trace("[{}] change detected [{}].", transformId, hasChanged);
hasChangedListener.onResponse(hasChanged);
},
e -> {
logger.warn(
"Failed to detect changes for data frame transform [" + transformId + "], skipping update till next check.",
new ParameterizedMessage(
"[{}] failed to detect changes for data frame transform. Skipping update till next check.",
transformId),
e);
auditor.warning(transformId,
"Failed to detect changes for data frame transform, skipping update till next check. Exception: "
@@ -1068,7 +1089,9 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
}
synchronized void handleFailure(Exception e) {
logger.warn("Data frame transform [" + transformTask.getTransformId() + "] encountered an exception: ", e);
logger.warn(new ParameterizedMessage("[{}] data frame transform encountered an exception: ",
transformTask.getTransformId()),
e);
if (handleCircuitBreakingException(e)) {
return;
}
@@ -1083,7 +1106,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
@Override
protected void failIndexer(String failureMessage) {
logger.error("Data frame transform [" + getJobId() + "]: " + failureMessage);
logger.error("[{}] transform has failed; experienced: [{}].", getJobId(), failureMessage);
auditor.error(transformTask.getTransformId(), failureMessage);
transformTask.markAsFailed(failureMessage, ActionListener.wrap(
r -> {

ClientDataFrameIndexerTests.java

@@ -74,30 +74,30 @@ public class ClientDataFrameIndexerTests extends ESTestCase {
// Audit every checkpoint for the first 10
assertTrue(shouldAudit.get(0));
assertTrue(shouldAudit.get(1));
assertTrue(shouldAudit.get(9));
assertTrue(shouldAudit.get(10));
// Then audit every 10 while < 100
assertFalse(shouldAudit.get(10));
assertFalse(shouldAudit.get(11));
assertTrue(shouldAudit.get(19));
assertTrue(shouldAudit.get(29));
assertFalse(shouldAudit.get(30));
assertTrue(shouldAudit.get(99));
assertTrue(shouldAudit.get(20));
assertFalse(shouldAudit.get(29));
assertTrue(shouldAudit.get(30));
assertFalse(shouldAudit.get(99));
// Then audit every 100 < 1000
assertFalse(shouldAudit.get(100));
assertTrue(shouldAudit.get(100));
assertFalse(shouldAudit.get(109));
assertFalse(shouldAudit.get(110));
assertTrue(shouldAudit.get(199));
assertFalse(shouldAudit.get(199));
// Then audit every 1000 for the rest of time
assertTrue(shouldAudit.get(1999));
assertFalse(shouldAudit.get(1999));
assertFalse(shouldAudit.get(2199));
assertTrue(shouldAudit.get(2999));
assertTrue(shouldAudit.get(9999));
assertTrue(shouldAudit.get(10_999));
assertFalse(shouldAudit.get(11_000));
assertTrue(shouldAudit.get(11_999));
assertTrue(shouldAudit.get(3000));
assertTrue(shouldAudit.get(10_000));
assertFalse(shouldAudit.get(10_999));
assertTrue(shouldAudit.get(11_000));
assertFalse(shouldAudit.get(11_001));
assertFalse(shouldAudit.get(11_999));
}
}
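
The reshuffled assertions above track the audit-throttling change in DataFrameTransformTask.java: after each audited checkpoint the interval is recomputed from the order of magnitude of that checkpoint, so checkpoints are audited one by one below 10, every 10th below 100, every 100th below 1,000, and every 1,000th from then on. A standalone sketch of that throttle, assuming starting values of logEvery = 1 and logCount = 0 (not shown in the diff) and illustrative class and method names:

public class AuditThrottleSketch {
    private int logEvery = 1;
    private int logCount = 0;

    boolean shouldAuditOnFinish(long completedCheckpoint) {
        // Skip until the counter wraps around the current interval.
        if (++logCount % logEvery != 0) {
            return false;
        }
        // Checkpoint 0 is always audited and leaves the interval unchanged.
        if (completedCheckpoint == 0) {
            return true;
        }
        // The interval grows with the order of magnitude of the checkpoint, capped at 1_000.
        int log10Checkpoint = (int) Math.floor(Math.log10(completedCheckpoint));
        logEvery = log10Checkpoint >= 3 ? 1_000 : (int) Math.pow(10.0, log10Checkpoint);
        logCount = 0;
        return true;
    }

    public static void main(String[] args) {
        AuditThrottleSketch throttle = new AuditThrottleSketch();
        // For a sequential run this prints audits at 0..10, then 20, 30, ..., 100,
        // then 200, ..., 1000, then 2000 and 3000.
        for (long checkpoint = 0; checkpoint <= 3_000; checkpoint++) {
            if (throttle.shouldAuditOnFinish(checkpoint)) {
                System.out.println("audit at checkpoint " + checkpoint);
            }
        }
    }
}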