From 87cc6a974c13c65c3136432aa4c6e56ba958ac34 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Mon, 3 Jun 2019 19:49:58 -0500 Subject: [PATCH] [ML] [Data Frame] adding and modifying auditor messages (#42722) (#42818) * [ML] [Data Frame] adding and modifying auditor messages * Update DataFrameTransformTask.java --- .../TransportDeleteDataFrameTransformAction.java | 10 ++++++++-- .../TransportPutDataFrameTransformAction.java | 11 +++++++++-- .../TransportStartDataFrameTransformAction.java | 5 ++--- .../transforms/DataFrameTransformTask.java | 16 +++++++++------- 4 files changed, 28 insertions(+), 14 deletions(-) diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameTransformAction.java index ac40334dfb4..1e0fcd31fb2 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameTransformAction.java @@ -23,6 +23,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction.Request; +import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import java.io.IOException; @@ -30,14 +31,16 @@ import java.io.IOException; public class TransportDeleteDataFrameTransformAction extends TransportMasterNodeAction { private final DataFrameTransformsConfigManager transformsConfigManager; + private final DataFrameAuditor auditor; @Inject public TransportDeleteDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters, ThreadPool threadPool, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, - DataFrameTransformsConfigManager transformsConfigManager) { + DataFrameTransformsConfigManager transformsConfigManager, DataFrameAuditor auditor) { super(DeleteDataFrameTransformAction.NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver); this.transformsConfigManager = transformsConfigManager; + this.auditor = auditor; } @Override @@ -65,7 +68,10 @@ public class TransportDeleteDataFrameTransformAction extends TransportMasterNode } else { // Task is not running, delete the configuration document transformsConfigManager.deleteTransform(request.getId(), ActionListener.wrap( - r -> listener.onResponse(new AcknowledgedResponse(r)), + r -> { + auditor.info(request.getId(), "Deleted data frame transform."); + listener.onResponse(new AcknowledgedResponse(r)); + }, listener::onFailure)); } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java index 997739b2407..b4d5957c0f5 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java @@ -46,6 +46,7 @@ import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesResponse; import org.elasticsearch.xpack.core.security.authz.RoleDescriptor; import org.elasticsearch.xpack.core.security.authz.permission.ResourcePrivileges; import org.elasticsearch.xpack.core.security.support.Exceptions; +import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot; @@ -65,12 +66,14 @@ public class TransportPutDataFrameTransformAction private final Client client; private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager; private final SecurityContext securityContext; + private final DataFrameAuditor auditor; @Inject public TransportPutDataFrameTransformAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, XPackLicenseState licenseState, - DataFrameTransformsConfigManager dataFrameTransformsConfigManager, Client client) { + DataFrameTransformsConfigManager dataFrameTransformsConfigManager, Client client, + DataFrameAuditor auditor) { super(PutDataFrameTransformAction.NAME, transportService, clusterService, threadPool, actionFilters, PutDataFrameTransformAction.Request::new, indexNameExpressionResolver); this.licenseState = licenseState; @@ -78,6 +81,7 @@ public class TransportPutDataFrameTransformAction this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager; this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings) ? new SecurityContext(settings, threadPool.getThreadContext()) : null; + this.auditor = auditor; } @Override @@ -234,7 +238,10 @@ public class TransportPutDataFrameTransformAction // <5> Return the listener, or clean up destination index on failure. ActionListener putTransformConfigurationListener = ActionListener.wrap( - putTransformConfigurationResult -> listener.onResponse(new AcknowledgedResponse(true)), + putTransformConfigurationResult -> { + auditor.info(config.getId(), "Created data frame transform."); + listener.onResponse(new AcknowledgedResponse(true)); + }, listener::onFailure ); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java index 8b7bcb8d764..e23e54d67b5 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java @@ -184,11 +184,10 @@ public class TransportStartDataFrameTransformAction extends if(dest.length == 0) { auditor.info(request.getId(), - "Could not find destination index [" + destinationIndex + "]." + - " Creating index with deduced mappings."); + "Creating destination index [" + destinationIndex + "] with deduced mappings."); createDestinationIndex(config, createOrGetIndexListener); } else { - auditor.info(request.getId(), "Destination index [" + destinationIndex + "] already exists."); + auditor.info(request.getId(), "Using existing destination index [" + destinationIndex + "]."); ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(), ClientHelper.DATA_FRAME_ORIGIN, client.admin() diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 575cd4c15bd..20ef5be09e8 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -213,7 +213,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S logger.info("Updating state for data frame transform [{}] to [{}]", transform.getId(), state.toString()); persistStateToClusterState(state, ActionListener.wrap( task -> { - auditor.info(transform.getId(), "Updated state to [" + state.getTaskState() + "]"); + auditor.info(transform.getId(), + "Updated data frame transform state to [" + state.getTaskState() + "]."); long now = System.currentTimeMillis(); // kick off the indexer triggered(new Event(schedulerJobName(), now, now)); @@ -293,10 +294,9 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S synchronized void markAsFailed(String reason, ActionListener listener) { taskState.set(DataFrameTransformTaskState.FAILED); stateReason.set(reason); + auditor.error(transform.getId(), reason); persistStateToClusterState(getState(), ActionListener.wrap( - r -> { - listener.onResponse(null); - }, + r -> listener.onResponse(null), listener::onFailure )); } @@ -560,6 +560,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S }, statsExc -> { logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc); + auditor.warning(getJobId(), + "Failure updating stats of transform: " + statsExc.getMessage()); next.run(); } )); @@ -588,7 +590,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S try { super.onFinish(listener); long checkpoint = transformTask.currentCheckpoint.incrementAndGet(); - auditor.info(transformTask.getTransformId(), "Finished indexing for data frame transform checkpoint [" + checkpoint + "]"); + auditor.info(transformTask.getTransformId(), "Finished indexing for data frame transform checkpoint [" + checkpoint + "]."); logger.info( "Finished indexing for data frame transform [" + transformTask.getTransformId() + "] checkpoint [" + checkpoint + "]"); listener.onResponse(null); @@ -599,14 +601,14 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S @Override protected void onStop() { - auditor.info(transformConfig.getId(), "Indexer has stopped"); + auditor.info(transformConfig.getId(), "Data frame transform has stopped."); logger.info("Data frame transform [{}] indexer has stopped", transformConfig.getId()); transformTask.shutdown(); } @Override protected void onAbort() { - auditor.info(transformConfig.getId(), "Received abort request, stopping indexer"); + auditor.info(transformConfig.getId(), "Received abort request, stopping data frame transform."); logger.info("Data frame transform [" + transformConfig.getId() + "] received abort request, stopping indexer"); transformTask.shutdown(); }