[ML] [Data Frame] adding and modifying auditor messages (#42722) (#42818)

* [ML] [Data Frame] adding and modifying auditor messages

* Update DataFrameTransformTask.java
This commit is contained in:
Benjamin Trent 2019-06-03 19:49:58 -05:00 committed by GitHub
parent eab88354f2
commit 87cc6a974c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 28 additions and 14 deletions

View File

@ -23,6 +23,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction.Request;
import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import java.io.IOException;
@ -30,14 +31,16 @@ import java.io.IOException;
public class TransportDeleteDataFrameTransformAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> {
private final DataFrameTransformsConfigManager transformsConfigManager;
private final DataFrameAuditor auditor;
@Inject
public TransportDeleteDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters, ThreadPool threadPool,
ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
DataFrameTransformsConfigManager transformsConfigManager) {
DataFrameTransformsConfigManager transformsConfigManager, DataFrameAuditor auditor) {
super(DeleteDataFrameTransformAction.NAME, transportService, clusterService, threadPool, actionFilters,
Request::new, indexNameExpressionResolver);
this.transformsConfigManager = transformsConfigManager;
this.auditor = auditor;
}
@Override
@ -65,7 +68,10 @@ public class TransportDeleteDataFrameTransformAction extends TransportMasterNode
} else {
// Task is not running, delete the configuration document
transformsConfigManager.deleteTransform(request.getId(), ActionListener.wrap(
r -> listener.onResponse(new AcknowledgedResponse(r)),
r -> {
auditor.info(request.getId(), "Deleted data frame transform.");
listener.onResponse(new AcknowledgedResponse(r));
},
listener::onFailure));
}
}

View File

@ -46,6 +46,7 @@ import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesResponse;
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
import org.elasticsearch.xpack.core.security.authz.permission.ResourcePrivileges;
import org.elasticsearch.xpack.core.security.support.Exceptions;
import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
@ -65,12 +66,14 @@ public class TransportPutDataFrameTransformAction
private final Client client;
private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;
private final SecurityContext securityContext;
private final DataFrameAuditor auditor;
@Inject
public TransportPutDataFrameTransformAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, XPackLicenseState licenseState,
DataFrameTransformsConfigManager dataFrameTransformsConfigManager, Client client) {
DataFrameTransformsConfigManager dataFrameTransformsConfigManager, Client client,
DataFrameAuditor auditor) {
super(PutDataFrameTransformAction.NAME, transportService, clusterService, threadPool, actionFilters,
PutDataFrameTransformAction.Request::new, indexNameExpressionResolver);
this.licenseState = licenseState;
@ -78,6 +81,7 @@ public class TransportPutDataFrameTransformAction
this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager;
this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings) ?
new SecurityContext(settings, threadPool.getThreadContext()) : null;
this.auditor = auditor;
}
@Override
@ -234,7 +238,10 @@ public class TransportPutDataFrameTransformAction
// <5> Return the listener, or clean up destination index on failure.
ActionListener<Boolean> putTransformConfigurationListener = ActionListener.wrap(
putTransformConfigurationResult -> listener.onResponse(new AcknowledgedResponse(true)),
putTransformConfigurationResult -> {
auditor.info(config.getId(), "Created data frame transform.");
listener.onResponse(new AcknowledgedResponse(true));
},
listener::onFailure
);

View File

@ -184,11 +184,10 @@ public class TransportStartDataFrameTransformAction extends
if(dest.length == 0) {
auditor.info(request.getId(),
"Could not find destination index [" + destinationIndex + "]." +
" Creating index with deduced mappings.");
"Creating destination index [" + destinationIndex + "] with deduced mappings.");
createDestinationIndex(config, createOrGetIndexListener);
} else {
auditor.info(request.getId(), "Destination index [" + destinationIndex + "] already exists.");
auditor.info(request.getId(), "Using existing destination index [" + destinationIndex + "].");
ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ClientHelper.DATA_FRAME_ORIGIN,
client.admin()

View File

@ -213,7 +213,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
logger.info("Updating state for data frame transform [{}] to [{}]", transform.getId(), state.toString());
persistStateToClusterState(state, ActionListener.wrap(
task -> {
auditor.info(transform.getId(), "Updated state to [" + state.getTaskState() + "]");
auditor.info(transform.getId(),
"Updated data frame transform state to [" + state.getTaskState() + "].");
long now = System.currentTimeMillis();
// kick off the indexer
triggered(new Event(schedulerJobName(), now, now));
@ -293,10 +294,9 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
synchronized void markAsFailed(String reason, ActionListener<Void> listener) {
taskState.set(DataFrameTransformTaskState.FAILED);
stateReason.set(reason);
auditor.error(transform.getId(), reason);
persistStateToClusterState(getState(), ActionListener.wrap(
r -> {
listener.onResponse(null);
},
r -> listener.onResponse(null),
listener::onFailure
));
}
@ -560,6 +560,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
},
statsExc -> {
logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc);
auditor.warning(getJobId(),
"Failure updating stats of transform: " + statsExc.getMessage());
next.run();
}
));
@ -588,7 +590,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
try {
super.onFinish(listener);
long checkpoint = transformTask.currentCheckpoint.incrementAndGet();
auditor.info(transformTask.getTransformId(), "Finished indexing for data frame transform checkpoint [" + checkpoint + "]");
auditor.info(transformTask.getTransformId(), "Finished indexing for data frame transform checkpoint [" + checkpoint + "].");
logger.info(
"Finished indexing for data frame transform [" + transformTask.getTransformId() + "] checkpoint [" + checkpoint + "]");
listener.onResponse(null);
@ -599,14 +601,14 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
@Override
protected void onStop() {
auditor.info(transformConfig.getId(), "Indexer has stopped");
auditor.info(transformConfig.getId(), "Data frame transform has stopped.");
logger.info("Data frame transform [{}] indexer has stopped", transformConfig.getId());
transformTask.shutdown();
}
@Override
protected void onAbort() {
auditor.info(transformConfig.getId(), "Received abort request, stopping indexer");
auditor.info(transformConfig.getId(), "Received abort request, stopping data frame transform.");
logger.info("Data frame transform [" + transformConfig.getId() + "] received abort request, stopping indexer");
transformTask.shutdown();
}