From dff3e636c2d6a32479269d9f18caf89d4efc54a1 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Thu, 22 Aug 2019 13:15:07 -0500 Subject: [PATCH] [ML][Transforms] unifying logging, adding some more logging (#45788) (#45859) * [ML][Transforms] unifying logging, adding some more logging * using parameterizedMessage instead of string concat * fixing bracket closure --- .../transforms/DataFrameTransformTask.java | 97 ++++++++++++------- .../ClientDataFrameIndexerTests.java | 28 +++--- 2 files changed, 74 insertions(+), 51 deletions(-) diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 190a3303520..973aa07b189 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.dataframe.transforms; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; @@ -21,7 +22,6 @@ import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.unit.TimeValue; @@ -248,7 +248,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S * @param listener Started listener */ public synchronized void start(Long startingCheckpoint, boolean force, ActionListener listener) { - logger.debug("[{}] start called with force [{}] and state [{}]", getTransformId(), force, getState()); + logger.debug("[{}] start called with force [{}] and state [{}].", getTransformId(), force, getState()); if (taskState.get() == DataFrameTransformTaskState.FAILED && force == false) { listener.onFailure(new ElasticsearchStatusException( DataFrameMessages.getMessage(DATA_FRAME_CANNOT_START_FAILED_TRANSFORM, @@ -290,7 +290,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S null, getIndexer().getProgress()); - logger.info("Updating state for data frame transform [{}] to [{}]", transform.getId(), state.toString()); + logger.info("[{}] updating state for data frame transform to [{}].", transform.getId(), state.toString()); // Even though the indexer information is persisted to an index, we still need DataFrameTransformTaskState in the clusterstate // This keeps track of STARTED, FAILED, STOPPED // This is because a FAILED state can occur because we cannot read the config from the internal index, which would imply that @@ -306,6 +306,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S listener.onResponse(new StartDataFrameTransformTaskAction.Response(true)); }, exc -> { + logger.error(new ParameterizedMessage("[{}] failed updating state to [{}].", getTransformId(), state), exc); getIndexer().stop(); listener.onFailure(new ElasticsearchException("Error while updating state for data frame 
transform [" + transform.getId() + "] to [" + state.getIndexerState() + "].", exc)); @@ -354,12 +355,12 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S } if (getIndexer() == null) { - logger.warn("Data frame task [{}] triggered with an unintialized indexer", getTransformId()); + logger.warn("[{}] data frame task triggered with an unintialized indexer.", getTransformId()); return; } if (taskState.get() == DataFrameTransformTaskState.FAILED) { - logger.debug("Schedule was triggered for transform [{}] but task is failed. Ignoring trigger.", getTransformId()); + logger.debug("[{}] schedule was triggered for transform but task is failed. Ignoring trigger.", getTransformId()); return; } @@ -368,15 +369,15 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S if (IndexerState.INDEXING.equals(indexerState) || IndexerState.STOPPING.equals(indexerState) || IndexerState.STOPPED.equals(indexerState)) { - logger.debug("Indexer for transform [{}] has state [{}], ignoring trigger", getTransformId(), indexerState); + logger.debug("[{}] indexer for transform has state [{}]. Ignoring trigger.", getTransformId(), indexerState); return; } - logger.debug("Data frame indexer [{}] schedule has triggered, state: [{}]", event.getJobName(), indexerState); + logger.debug("[{}] data frame indexer schedule has triggered, state: [{}].", event.getJobName(), indexerState); // if it runs for the 1st time we just do it, if not we check for changes if (currentCheckpoint.get() == 0) { - logger.debug("Trigger initial run"); + logger.debug("Trigger initial run."); getIndexer().maybeTriggerAsyncJob(System.currentTimeMillis()); } else if (getIndexer().isContinuous()) { getIndexer().maybeTriggerAsyncJob(System.currentTimeMillis()); @@ -407,12 +408,14 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S ActionListener> listener) { updatePersistentTaskState(state, ActionListener.wrap( success -> { - logger.debug("Successfully updated state for data frame transform [{}] to [{}]", transform.getId(), state.toString()); + logger.debug("[{}] successfully updated state for data frame transform to [{}].", transform.getId(), state.toString()); listener.onResponse(success); }, failure -> { auditor.warning(transform.getId(), "Failed to persist to state to cluster state: " + failure.getMessage()); - logger.error("Failed to update state for data frame transform [" + transform.getId() + "]", failure); + logger.error(new ParameterizedMessage("[{}] failed to update cluster state for data frame transform.", + transform.getId()), + failure); listener.onFailure(failure); } )); @@ -422,7 +425,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S // If we are already flagged as failed, this probably means that a second trigger started firing while we were attempting to // flag the previously triggered indexer as failed. Exit early as we are already flagged as failed. if (taskState.get() == DataFrameTransformTaskState.FAILED) { - logger.warn("[{}] is already failed but encountered new failure; reason [{}] ", getTransformId(), reason); + logger.warn("[{}] is already failed but encountered new failure; reason [{}].", getTransformId(), reason); listener.onResponse(null); return; } @@ -430,7 +433,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S // the indexer to fail. 
Since `ClientDataFrameIndexer#doSaveState` will persist the state to the index once the indexer stops, // it is probably best to NOT change the internal state of the task and allow the normal stopping logic to continue. if (getIndexer() != null && getIndexer().getState() == IndexerState.STOPPING) { - logger.info("Attempt to fail transform [" + getTransformId() + "] with reason [" + reason + "] while it was stopping."); + logger.info("[{}] attempt to fail transform with reason [{}] while it was stopping.", getTransformId(), reason); auditor.info(getTransformId(), "Attempted to fail transform with reason [" + reason + "] while in STOPPING state."); listener.onResponse(null); return; @@ -438,7 +441,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S // If we are stopped, this means that between the failure occurring and being handled, somebody called stop // We should just allow that stop to continue if (getIndexer() != null && getIndexer().getState() == IndexerState.STOPPED) { - logger.info("[{}] encountered a failure but indexer is STOPPED; reason [{}]", getTransformId(), reason); + logger.info("[{}] encountered a failure but indexer is STOPPED; reason [{}].", getTransformId(), reason); listener.onResponse(null); return; } @@ -456,7 +459,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S persistStateToClusterState(newState, ActionListener.wrap( r -> listener.onResponse(null), e -> { - logger.error("Failed to set task state as failed to cluster state", e); + logger.error(new ParameterizedMessage("[{}] failed to set task state as failed to cluster state.", getTransformId()), + e); listener.onFailure(e); } )); @@ -469,8 +473,9 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S */ @Override public synchronized void onCancelled() { - logger.info( - "Received cancellation request for data frame transform [" + transform.getId() + "], state: [" + taskState.get() + "]"); + logger.info("[{}] received cancellation request for data frame transform, state: [{}].", + getTransformId(), + taskState.get()); if (getIndexer() != null && getIndexer().abort()) { // there is no background transform running, we can shutdown safely shutdown(); @@ -695,13 +700,15 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S } TransformProgressGatherer.getInitialProgress(this.client, buildFilterQuery(), getConfig(), ActionListener.wrap( newProgress -> { - logger.trace("[{}] reset the progress from [{}] to [{}]", transformId, progress, newProgress); + logger.trace("[{}] reset the progress from [{}] to [{}].", transformId, progress, newProgress); progress = newProgress; super.onStart(now, listener); }, failure -> { progress = null; - logger.warn("Unable to load progress information for task [" + transformId + "]", failure); + logger.warn(new ParameterizedMessage("[{}] unable to load progress information for task.", + transformId), + failure); super.onStart(now, listener); } )); @@ -778,14 +785,14 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S @Override public synchronized boolean maybeTriggerAsyncJob(long now) { if (transformTask.taskState.get() == DataFrameTransformTaskState.FAILED) { - logger.debug("Schedule was triggered for transform [{}] but task is failed. Ignoring trigger.", getJobId()); + logger.debug("[{}] schedule was triggered for transform but task is failed. 
Ignoring trigger.", getJobId()); return false; } // ignore trigger if indexer is running, prevents log spam in A2P indexer IndexerState indexerState = getState(); if (IndexerState.INDEXING.equals(indexerState) || IndexerState.STOPPING.equals(indexerState)) { - logger.debug("Indexer for transform [{}] has state [{}], ignoring trigger", getJobId(), indexerState); + logger.debug("[{}] indexer for transform has state [{}]. Ignoring trigger.", getJobId(), indexerState); return false; } @@ -876,7 +883,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S indexerState = IndexerState.STOPPED; auditor.info(transformConfig.getId(), "Data frame finished indexing all data, initiating stop"); - logger.info("Data frame [{}] finished indexing all data, initiating stop", transformConfig.getId()); + logger.info("[{}] data frame transform finished indexing all data, initiating stop.", transformConfig.getId()); } final DataFrameTransformState state = new DataFrameTransformState( @@ -886,7 +893,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S transformTask.currentCheckpoint.get(), transformTask.stateReason.get(), getProgress()); - logger.debug("Updating persistent state of transform [{}] to [{}]", transformConfig.getId(), state.toString()); + logger.debug("[{}] updating persistent state of transform to [{}].", transformConfig.getId(), state.toString()); // Persist the current state and stats in the internal index. The interval of this method being // called is controlled by AsyncTwoPhaseIndexer#onBulkResponse which calls doSaveState every so @@ -918,7 +925,9 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S } }, statsExc -> { - logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc); + logger.error(new ParameterizedMessage("[{}] updating stats of transform failed.", + transformConfig.getId()), + statsExc); auditor.warning(getJobId(), "Failure updating stats of transform: " + statsExc.getMessage()); // for auto stop shutdown the task @@ -942,7 +951,9 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S } handleFailure(exc); } catch (Exception e) { - logger.error("Data frame transform encountered an unexpected internal exception: " ,e); + logger.error( + new ParameterizedMessage("[{}] data frame transform encountered an unexpected internal exception: ", transformId), + e); } } @@ -967,7 +978,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S if (progress != null && progress.getPercentComplete() != null && progress.getPercentComplete() < 100.0) { progress.incrementDocsProcessed(progress.getTotalDocs() - progress.getDocumentsProcessed()); } - logger.info("Last checkpoint for {} {}", getJobId(), Strings.toString(lastCheckpoint)); // If the last checkpoint is now greater than 1, that means that we have just processed the first // continuous checkpoint and should start recording the exponential averages if (lastCheckpoint != null && lastCheckpoint.getCheckpoint() > 1) { @@ -987,7 +997,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S "Finished indexing for data frame transform checkpoint [" + checkpoint + "]."); } logger.debug( - "Finished indexing for data frame transform [" + transformTask.getTransformId() + "] checkpoint [" + checkpoint + "]"); + "[{}] finished indexing for data frame transform checkpoint [{}].", getJobId(), checkpoint); auditBulkFailures = true; 
listener.onResponse(null); } catch (Exception e) { @@ -1009,7 +1019,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S if (++logCount % logEvery != 0) { return false; } - int log10Checkpoint = (int) Math.floor(Math.log10(completedCheckpoint + 1)); + if (completedCheckpoint == 0) { + return true; + } + int log10Checkpoint = (int) Math.floor(Math.log10(completedCheckpoint)); logEvery = log10Checkpoint >= 3 ? 1_000 : (int)Math.pow(10.0, log10Checkpoint); logCount = 0; return true; @@ -1018,13 +1031,13 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S @Override protected void onStop() { auditor.info(transformConfig.getId(), "Data frame transform has stopped."); - logger.info("Data frame transform [{}] has stopped", transformConfig.getId()); + logger.info("[{}] data frame transform has stopped.", transformConfig.getId()); } @Override protected void onAbort() { auditor.info(transformConfig.getId(), "Received abort request, stopping data frame transform."); - logger.info("Data frame transform [" + transformConfig.getId() + "] received abort request, stopping indexer"); + logger.info("[{}] data frame transform received abort request. Stopping indexer.", transformConfig.getId()); transformTask.shutdown(); } @@ -1034,11 +1047,17 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S checkpoint -> transformsConfigManager.putTransformCheckpoint(checkpoint, ActionListener.wrap( putCheckPointResponse -> listener.onResponse(checkpoint), - createCheckpointException -> - listener.onFailure(new RuntimeException("Failed to create checkpoint", createCheckpointException)) + createCheckpointException -> { + logger.warn(new ParameterizedMessage("[{}] failed to create checkpoint.", transformId), + createCheckpointException); + listener.onFailure(new RuntimeException("Failed to create checkpoint", createCheckpointException)); + } )), - getCheckPointException -> - listener.onFailure(new RuntimeException("Failed to retrieve checkpoint", getCheckPointException)) + getCheckPointException -> { + logger.warn(new ParameterizedMessage("[{}] failed to retrieve checkpoint.", transformId), + getCheckPointException); + listener.onFailure(new RuntimeException("Failed to retrieve checkpoint", getCheckPointException)); + } )); } @@ -1047,12 +1066,14 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S checkpointProvider.sourceHasChanged(getLastCheckpoint(), ActionListener.wrap( hasChanged -> { - logger.trace("[{}] change detected [{}]", transformId, hasChanged); + logger.trace("[{}] change detected [{}].", transformId, hasChanged); hasChangedListener.onResponse(hasChanged); }, e -> { logger.warn( - "Failed to detect changes for data frame transform [" + transformId + "], skipping update till next check.", + new ParameterizedMessage( + "[{}] failed to detect changes for data frame transform. Skipping update till next check.", + transformId), e); auditor.warning(transformId, "Failed to detect changes for data frame transform, skipping update till next check. 
Exception: " @@ -1068,7 +1089,9 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S } synchronized void handleFailure(Exception e) { - logger.warn("Data frame transform [" + transformTask.getTransformId() + "] encountered an exception: ", e); + logger.warn(new ParameterizedMessage("[{}] data frame transform encountered an exception: ", + transformTask.getTransformId()), + e); if (handleCircuitBreakingException(e)) { return; } @@ -1083,7 +1106,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S @Override protected void failIndexer(String failureMessage) { - logger.error("Data frame transform [" + getJobId() + "]: " + failureMessage); + logger.error("[{}] transform has failed; experienced: [{}].", getJobId(), failureMessage); auditor.error(transformTask.getTransformId(), failureMessage); transformTask.markAsFailed(failureMessage, ActionListener.wrap( r -> { diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/ClientDataFrameIndexerTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/ClientDataFrameIndexerTests.java index 2090e75ab45..4a23a57efcc 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/ClientDataFrameIndexerTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/ClientDataFrameIndexerTests.java @@ -74,30 +74,30 @@ public class ClientDataFrameIndexerTests extends ESTestCase { // Audit every checkpoint for the first 10 assertTrue(shouldAudit.get(0)); assertTrue(shouldAudit.get(1)); - assertTrue(shouldAudit.get(9)); + assertTrue(shouldAudit.get(10)); // Then audit every 10 while < 100 - assertFalse(shouldAudit.get(10)); assertFalse(shouldAudit.get(11)); - assertTrue(shouldAudit.get(19)); - assertTrue(shouldAudit.get(29)); - assertFalse(shouldAudit.get(30)); - assertTrue(shouldAudit.get(99)); + assertTrue(shouldAudit.get(20)); + assertFalse(shouldAudit.get(29)); + assertTrue(shouldAudit.get(30)); + assertFalse(shouldAudit.get(99)); // Then audit every 100 < 1000 - assertFalse(shouldAudit.get(100)); + assertTrue(shouldAudit.get(100)); assertFalse(shouldAudit.get(109)); assertFalse(shouldAudit.get(110)); - assertTrue(shouldAudit.get(199)); + assertFalse(shouldAudit.get(199)); // Then audit every 1000 for the rest of time - assertTrue(shouldAudit.get(1999)); + assertFalse(shouldAudit.get(1999)); assertFalse(shouldAudit.get(2199)); - assertTrue(shouldAudit.get(2999)); - assertTrue(shouldAudit.get(9999)); - assertTrue(shouldAudit.get(10_999)); - assertFalse(shouldAudit.get(11_000)); - assertTrue(shouldAudit.get(11_999)); + assertTrue(shouldAudit.get(3000)); + assertTrue(shouldAudit.get(10_000)); + assertFalse(shouldAudit.get(10_999)); + assertTrue(shouldAudit.get(11_000)); + assertFalse(shouldAudit.get(11_001)); + assertFalse(shouldAudit.get(11_999)); } }
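
A note on the "using parameterizedMessage instead of string concat" change in the commit message: wherever a log call also carries a Throwable, the patch wraps the message in org.apache.logging.log4j.message.ParameterizedMessage instead of concatenating the transform id into the string. The sketch below is illustrative only; the class, method, and variable names are hypothetical and not taken from the patch. It shows the before/after shape of such a call.

    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.apache.logging.log4j.message.ParameterizedMessage;

    class LoggingPatternSketch {
        private static final Logger logger = LogManager.getLogger(LoggingPatternSketch.class);

        void onStateUpdateFailure(String transformId, Exception exc) {
            // Before: the message string is built eagerly via concatenation, even when the
            // ERROR level is disabled for this logger.
            logger.error("Failed to update state for data frame transform [" + transformId + "]", exc);

            // After: the {} placeholder is only substituted if the event is actually written,
            // and the exception is still passed separately so its stack trace is preserved.
            logger.error(new ParameterizedMessage("[{}] failed to update state.", transformId), exc);
        }
    }

For calls that carry no Throwable, the patch keeps the plain placeholder form (for example logger.debug("[{}] ...", id, state)), which already defers formatting; the explicit Message-plus-Throwable overload is used where an exception is attached, so there is no ambiguity about which argument is the throwable.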
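
A note on the shouldAudit hunk and the updated ClientDataFrameIndexerTests expectations at the end of the patch: dropping the "+ 1" and short-circuiting checkpoint 0 (Math.log10(0) is negative infinity) puts the audit boundaries on round checkpoint numbers, so a transform audits every checkpoint up to 10, every 10th up to 100, every 100th up to 1000, and every 1000th after that. The following is a self-contained reconstruction of that cadence for illustration; the wrapping class, the method signature, and the starting values of the logCount/logEvery fields are assumptions, since the hunk only shows a fragment of the method body.

    // Reconstruction of the checkpoint-audit cadence implied by the hunk and the test.
    public class AuditCadenceSketch {

        private long logCount = 0;   // calls since the last audit (assumed initial value)
        private long logEvery = 1;   // current audit interval (assumed initial value)

        boolean shouldAuditOnFinish(long completedCheckpoint) {
            // Only audit when the running counter reaches the current interval.
            if (++logCount % logEvery != 0) {
                return false;
            }
            // Checkpoint 0 is always audited; without this guard Math.log10(0) is -Infinity.
            if (completedCheckpoint == 0) {
                return true;
            }
            // Widen the interval by one order of magnitude per digit, capped at every 1_000.
            int log10Checkpoint = (int) Math.floor(Math.log10(completedCheckpoint));
            logEvery = log10Checkpoint >= 3 ? 1_000 : (int) Math.pow(10.0, log10Checkpoint);
            logCount = 0;
            return true;
        }

        public static void main(String[] args) {
            AuditCadenceSketch cadence = new AuditCadenceSketch();
            // Walking checkpoints in ascending order prints the audited ones:
            // 0..10 all audit, then 20, 30, ..., 100, 200, ..., 1000, 2000, 3000.
            for (long checkpoint = 0; checkpoint <= 3_000; checkpoint++) {
                if (cadence.shouldAuditOnFinish(checkpoint)) {
                    System.out.println("audit at checkpoint " + checkpoint);
                }
            }
        }
    }

Called once per checkpoint in ascending order, as the test's shouldAudit.get(i) lookups suggest, this reproduces the new assertions: true at 10, 20, 30, 100, 3000, and 11_000; false at 11, 29, 99, 199, 1999, and 11_001.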