Adds audits for when the job starts reindexing, loading data, analyzing, and writing results. Also adds some info logging. Backport of #53179.
This commit is contained in:
parent
7ddbda4c20
commit
9abf537527
|
@ -65,9 +65,14 @@ public final class Messages {
|
|||
public static final String DATA_FRAME_ANALYTICS_AUDIT_ESTIMATED_MEMORY_USAGE = "Estimated memory usage for this analytics to be [{0}]";
|
||||
public static final String DATA_FRAME_ANALYTICS_AUDIT_CREATING_DEST_INDEX = "Creating destination index [{0}]";
|
||||
public static final String DATA_FRAME_ANALYTICS_AUDIT_REUSING_DEST_INDEX = "Using existing destination index [{0}]";
|
||||
public static final String DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING = "Finished reindexing to destination index [{0}]";
|
||||
public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_REINDEXING = "Started reindexing to destination index [{0}]";
|
||||
public static final String DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING =
|
||||
"Finished reindexing to destination index [{0}], took [{1}]";
|
||||
public static final String DATA_FRAME_ANALYTICS_AUDIT_FINISHED_ANALYSIS = "Finished analysis";
|
||||
public static final String DATA_FRAME_ANALYTICS_AUDIT_RESTORING_STATE = "Restoring from previous model state";
|
||||
public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_LOADING_DATA = "Started loading data";
|
||||
public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_ANALYZING = "Started analyzing";
|
||||
public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_WRITING_RESULTS = "Started writing results";
|
||||
|
||||
public static final String FILTER_CANNOT_DELETE = "Cannot delete filter [{0}] currently used by jobs {1}";
|
||||
public static final String FILTER_CONTAINS_TOO_MANY_ITEMS = "Filter [{0}] contains too many items; up to [{1}] items are allowed";
|
||||
|
|
|
@ -120,7 +120,11 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
|
|||
"Starting analytics on node",
|
||||
"Started analytics",
|
||||
expectedDestIndexAuditMessage(),
|
||||
"Started reindexing to destination index [" + destIndex + "]",
|
||||
"Finished reindexing to destination index [" + destIndex + "]",
|
||||
"Started loading data",
|
||||
"Started analyzing",
|
||||
"Started writing results",
|
||||
"Finished analysis");
|
||||
assertEvaluation(KEYWORD_FIELD, KEYWORD_FIELD_VALUES, "ml." + predictedClassField);
|
||||
}
|
||||
|
@ -161,7 +165,11 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
|
|||
"Starting analytics on node",
|
||||
"Started analytics",
|
||||
expectedDestIndexAuditMessage(),
|
||||
"Started reindexing to destination index [" + destIndex + "]",
|
||||
"Finished reindexing to destination index [" + destIndex + "]",
|
||||
"Started loading data",
|
||||
"Started analyzing",
|
||||
"Started writing results",
|
||||
"Finished analysis");
|
||||
assertEvaluation(KEYWORD_FIELD, KEYWORD_FIELD_VALUES, "ml." + predictedClassField);
|
||||
}
|
||||
|
@ -224,7 +232,11 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
|
|||
"Starting analytics on node",
|
||||
"Started analytics",
|
||||
expectedDestIndexAuditMessage(),
|
||||
"Started reindexing to destination index [" + destIndex + "]",
|
||||
"Finished reindexing to destination index [" + destIndex + "]",
|
||||
"Started loading data",
|
||||
"Started analyzing",
|
||||
"Started writing results",
|
||||
"Finished analysis");
|
||||
assertEvaluation(dependentVariable, dependentVariableValues, "ml." + predictedClassField);
|
||||
}
|
||||
|
|
|
@ -233,6 +233,7 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
|
|||
// Since calls to write to the AbstractAuditor are fire-and-forget (async), we could have returned from the start,
|
||||
// finished the job (as this is a very short analytics job), all without the audit being fully written.
|
||||
assertBusy(() -> assertTrue(indexExists(NotificationsIndex.NOTIFICATIONS_INDEX)));
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
Matcher<String>[] itemMatchers = Arrays.stream(expectedAuditMessagePrefixes).map(Matchers::startsWith).toArray(Matcher[]::new);
|
||||
assertBusy(() -> {
|
||||
|
@ -252,6 +253,7 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
|
|||
.setIndices(NotificationsIndex.NOTIFICATIONS_INDEX)
|
||||
.addSort("timestamp", SortOrder.ASC)
|
||||
.setQuery(QueryBuilders.termQuery("job_id", dataFrameAnalyticsId))
|
||||
.setSize(100)
|
||||
.request();
|
||||
SearchResponse searchResponse = client().execute(SearchAction.INSTANCE, searchRequest).actionGet();
|
||||
|
||||
|
|
|
@ -105,7 +105,11 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
|
|||
"Starting analytics on node",
|
||||
"Started analytics",
|
||||
"Creating destination index [" + destIndex + "]",
|
||||
"Started reindexing to destination index [" + destIndex + "]",
|
||||
"Finished reindexing to destination index [" + destIndex + "]",
|
||||
"Started loading data",
|
||||
"Started analyzing",
|
||||
"Started writing results",
|
||||
"Finished analysis");
|
||||
}
|
||||
|
||||
|
@ -144,7 +148,11 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
|
|||
"Starting analytics on node",
|
||||
"Started analytics",
|
||||
"Creating destination index [" + destIndex + "]",
|
||||
"Started reindexing to destination index [" + destIndex + "]",
|
||||
"Finished reindexing to destination index [" + destIndex + "]",
|
||||
"Started loading data",
|
||||
"Started analyzing",
|
||||
"Started writing results",
|
||||
"Finished analysis");
|
||||
}
|
||||
|
||||
|
@ -198,7 +206,11 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
|
|||
"Starting analytics on node",
|
||||
"Started analytics",
|
||||
"Creating destination index [" + destIndex + "]",
|
||||
"Started reindexing to destination index [" + destIndex + "]",
|
||||
"Finished reindexing to destination index [" + destIndex + "]",
|
||||
"Started loading data",
|
||||
"Started analyzing",
|
||||
"Started writing results",
|
||||
"Finished analysis");
|
||||
}
|
||||
|
||||
|
|
|
@ -126,7 +126,11 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
|
|||
"Starting analytics on node",
|
||||
"Started analytics",
|
||||
"Creating destination index [test-outlier-detection-with-few-docs-results]",
|
||||
"Started reindexing to destination index [test-outlier-detection-with-few-docs-results]",
|
||||
"Finished reindexing to destination index [test-outlier-detection-with-few-docs-results]",
|
||||
"Started loading data",
|
||||
"Started analyzing",
|
||||
"Started writing results",
|
||||
"Finished analysis");
|
||||
}
|
||||
|
||||
|
@ -181,7 +185,11 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
|
|||
"Starting analytics on node",
|
||||
"Started analytics",
|
||||
"Creating destination index [test-outlier-detection-with-enough-docs-to-scroll-results]",
|
||||
"Started reindexing to destination index [test-outlier-detection-with-enough-docs-to-scroll-results]",
|
||||
"Finished reindexing to destination index [test-outlier-detection-with-enough-docs-to-scroll-results]",
|
||||
"Started loading data",
|
||||
"Started analyzing",
|
||||
"Started writing results",
|
||||
"Finished analysis");
|
||||
}
|
||||
|
||||
|
@ -262,7 +270,11 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
|
|||
"Starting analytics on node",
|
||||
"Started analytics",
|
||||
"Creating destination index [test-outlier-detection-with-more-fields-than-docvalue-limit-results]",
|
||||
"Started reindexing to destination index [test-outlier-detection-with-more-fields-than-docvalue-limit-results]",
|
||||
"Finished reindexing to destination index [test-outlier-detection-with-more-fields-than-docvalue-limit-results]",
|
||||
"Started loading data",
|
||||
"Started analyzing",
|
||||
"Started writing results",
|
||||
"Finished analysis");
|
||||
}
|
||||
|
||||
|
@ -387,7 +399,11 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
|
|||
"Starting analytics on node",
|
||||
"Started analytics",
|
||||
"Creating destination index [test-outlier-detection-with-multiple-source-indices-results]",
|
||||
"Started reindexing to destination index [test-outlier-detection-with-multiple-source-indices-results]",
|
||||
"Finished reindexing to destination index [test-outlier-detection-with-multiple-source-indices-results]",
|
||||
"Started loading data",
|
||||
"Started analyzing",
|
||||
"Started writing results",
|
||||
"Finished analysis");
|
||||
}
|
||||
|
||||
|
@ -445,7 +461,11 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
|
|||
"Starting analytics on node",
|
||||
"Started analytics",
|
||||
"Using existing destination index [test-outlier-detection-with-pre-existing-dest-index-results]",
|
||||
"Started reindexing to destination index [test-outlier-detection-with-pre-existing-dest-index-results]",
|
||||
"Finished reindexing to destination index [test-outlier-detection-with-pre-existing-dest-index-results]",
|
||||
"Started loading data",
|
||||
"Started analyzing",
|
||||
"Started writing results",
|
||||
"Finished analysis");
|
||||
}
|
||||
|
||||
|
@ -699,7 +719,11 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
|
|||
"Starting analytics on node",
|
||||
"Started analytics",
|
||||
"Creating destination index [test-outlier-detection-with-custom-params-results]",
|
||||
"Started reindexing to destination index [test-outlier-detection-with-custom-params-results]",
|
||||
"Finished reindexing to destination index [test-outlier-detection-with-custom-params-results]",
|
||||
"Started loading data",
|
||||
"Started analyzing",
|
||||
"Started writing results",
|
||||
"Finished analysis");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -213,7 +213,8 @@ public class DataFrameAnalyticsManager {
|
|||
task.setReindexingFinished();
|
||||
auditor.info(
|
||||
config.getId(),
|
||||
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex()));
|
||||
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex(),
|
||||
reindexResponse.getTook()));
|
||||
startAnalytics(task, config);
|
||||
},
|
||||
error -> task.setFailed(ExceptionsHelper.unwrapCause(error).getMessage())
|
||||
|
@ -233,9 +234,12 @@ public class DataFrameAnalyticsManager {
|
|||
final ThreadContext threadContext = client.threadPool().getThreadContext();
|
||||
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
|
||||
try (ThreadContext.StoredContext ignore = threadContext.stashWithOrigin(ML_ORIGIN)) {
|
||||
LOGGER.info("[{}] Started reindexing", config.getId());
|
||||
Task reindexTask = client.executeLocally(ReindexAction.INSTANCE, reindexRequest,
|
||||
new ContextPreservingActionListener<>(supplier, reindexCompletedListener));
|
||||
task.setReindexingTaskId(reindexTask.getId());
|
||||
auditor.info(config.getId(),
|
||||
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_STARTED_REINDEXING, config.getDest().getIndex()));
|
||||
}
|
||||
},
|
||||
reindexCompletedListener::onFailure
|
||||
|
|
|
@ -147,6 +147,9 @@ public class AnalyticsProcessManager {
|
|||
}
|
||||
|
||||
private void processData(DataFrameAnalyticsTask task, ProcessContext processContext, BytesReference state) {
|
||||
LOGGER.info("[{}] Started loading data", processContext.config.getId());
|
||||
auditor.info(processContext.config.getId(), Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_STARTED_LOADING_DATA));
|
||||
|
||||
DataFrameAnalyticsConfig config = processContext.config;
|
||||
DataFrameDataExtractor dataExtractor = processContext.dataExtractor.get();
|
||||
AnalyticsProcess<AnalyticsResult> process = processContext.process.get();
|
||||
|
@ -159,6 +162,9 @@ public class AnalyticsProcessManager {
|
|||
|
||||
restoreState(task, config, state, process);
|
||||
|
||||
LOGGER.info("[{}] Started analyzing", processContext.config.getId());
|
||||
auditor.info(processContext.config.getId(), Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_STARTED_ANALYZING));
|
||||
|
||||
LOGGER.info("[{}] Waiting for result processor to complete", config.getId());
|
||||
resultProcessor.awaitForCompletion();
|
||||
processContext.setFailureReason(resultProcessor.getFailure());
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsage;
|
|||
import org.elasticsearch.xpack.core.ml.inference.TrainedModelConfig;
|
||||
import org.elasticsearch.xpack.core.ml.inference.TrainedModelDefinition;
|
||||
import org.elasticsearch.xpack.core.ml.inference.TrainedModelInput;
|
||||
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
|
||||
import org.elasticsearch.xpack.core.security.user.XPackUser;
|
||||
|
@ -120,6 +121,10 @@ public class AnalyticsResultProcessor {
|
|||
AnalyticsResult result = iterator.next();
|
||||
processResult(result, resultsJoiner);
|
||||
if (result.getRowResults() != null) {
|
||||
if (processedRows == 0) {
|
||||
LOGGER.info("[{}] Started writing results", analytics.getId());
|
||||
auditor.info(analytics.getId(), Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_STARTED_WRITING_RESULTS));
|
||||
}
|
||||
processedRows++;
|
||||
updateResultsProgress(processedRows >= totalRows ? 100 : (int) (processedRows * 100.0 / totalRows));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue