From 08b7cb6bca6e6cef9d042b1a0f7f77674404786f Mon Sep 17 00:00:00 2001 From: David Kyle Date: Fri, 24 Feb 2017 15:56:38 +0000 Subject: [PATCH] [ML] Refactor Auditor (elastic/x-pack-elasticsearch#631) * Refactor Auditor * Inject Auditor Original commit: elastic/x-pack-elasticsearch@d9efe5fcef0769f49251444dbf4ee097ac295565 --- .../xpack/ml/MachineLearning.java | 11 +-- .../xpack/ml/MlInitializationService.java | 9 +-- .../ml/action/DeleteModelSnapshotAction.java | 7 +- .../xpack/ml/datafeed/DatafeedJob.java | 8 +-- .../xpack/ml/datafeed/DatafeedJobRunner.java | 9 +-- .../xpack/ml/datafeed/ProblemTracker.java | 16 ++--- .../xpack/ml/job/JobManager.java | 24 ++----- .../persistence/ElasticsearchMappings.java | 3 + .../xpack/ml/job/persistence/JobProvider.java | 10 --- .../job/retention/ExpiredResultsRemover.java | 8 +-- .../xpack/ml/notifications/AuditMessage.java | 39 ++++++++--- .../xpack/ml/notifications/Auditor.java | 24 +++---- .../ml/MlInitializationServiceTests.java | 12 ++-- .../ml/datafeed/DatafeedJobRunnerTests.java | 37 +++++++--- .../ml/datafeed/ProblemTrackerTests.java | 22 +++--- .../xpack/ml/job/JobManagerTests.java | 6 +- .../retention/ExpiredResultsRemoverTests.java | 2 +- .../ml/notifications/AuditMessageTests.java | 32 ++------- .../xpack/ml/notifications/AuditorTests.java | 69 +++++++------------ 19 files changed, 165 insertions(+), 183 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index ab2f4b3a7f2..c0991a7c9da 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -87,6 +87,7 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.MultiplyingNormalizerPr import org.elasticsearch.xpack.ml.job.process.normalizer.NativeNormalizerProcessFactory; import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory; import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerProcessFactory; +import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.rest.datafeeds.RestDeleteDatafeedAction; import org.elasticsearch.xpack.ml.rest.datafeeds.RestGetDatafeedStatsAction; import org.elasticsearch.xpack.ml.rest.datafeeds.RestGetDatafeedsAction; @@ -252,7 +253,8 @@ public class MachineLearning extends Plugin implements ActionPlugin { JobProvider jobProvider = new JobProvider(client, 1, settings); JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(settings, client); - JobManager jobManager = new JobManager(settings, jobProvider, jobResultsPersister, clusterService); + Auditor auditor = new Auditor(client, clusterService); + JobManager jobManager = new JobManager(settings, jobProvider, jobResultsPersister, clusterService, auditor); AutodetectProcessFactory autodetectProcessFactory; NormalizerProcessFactory normalizerProcessFactory; if (AUTODETECT_PROCESS.get(settings)) { @@ -281,7 +283,7 @@ public class MachineLearning extends Plugin implements ActionPlugin { AutodetectProcessManager dataProcessor = new AutodetectProcessManager(settings, client, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory); DatafeedJobRunner datafeedJobRunner = new DatafeedJobRunner(threadPool, client, clusterService, jobProvider, - System::currentTimeMillis); + System::currentTimeMillis, auditor); PersistentActionService persistentActionService = new PersistentActionService(Settings.EMPTY, threadPool, clusterService, client); PersistentActionRegistry persistentActionRegistry = new PersistentActionRegistry(Settings.EMPTY); @@ -289,12 +291,13 @@ public class MachineLearning extends Plugin implements ActionPlugin { jobProvider, jobManager, dataProcessor, - new MlInitializationService(settings, threadPool, clusterService, client, jobProvider), + new MlInitializationService(settings, threadPool, clusterService, client, jobProvider, auditor), jobDataCountsPersister, datafeedJobRunner, persistentActionService, persistentActionRegistry, - new PersistentTaskClusterService(Settings.EMPTY, persistentActionRegistry, clusterService) + new PersistentTaskClusterService(Settings.EMPTY, persistentActionRegistry, clusterService), + auditor ); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java index 57988e9d391..bdc951604bc 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java @@ -33,6 +33,7 @@ public class MlInitializationService extends AbstractComponent implements Cluste private final ClusterService clusterService; private final Client client; private final JobProvider jobProvider; + private final Auditor auditor; private final AtomicBoolean installMlMetadataCheck = new AtomicBoolean(false); private final AtomicBoolean putMlNotificationsIndexTemplateCheck = new AtomicBoolean(false); @@ -43,12 +44,13 @@ public class MlInitializationService extends AbstractComponent implements Cluste private volatile MlDailyManagementService mlDailyManagementService; public MlInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService, Client client, - JobProvider jobProvider) { + JobProvider jobProvider, Auditor auditor) { super(settings); this.threadPool = threadPool; this.clusterService = clusterService; this.client = client; this.jobProvider = jobProvider; + this.auditor = auditor; clusterService.addListener(this); clusterService.addLifecycleListener(new LifecycleListener() { @Override @@ -179,9 +181,8 @@ public class MlInitializationService extends AbstractComponent implements Cluste private void installDailyManagementService() { if (mlDailyManagementService == null) { - mlDailyManagementService = new MlDailyManagementService(threadPool, - Arrays.asList((MlDailyManagementService.Listener) - new ExpiredResultsRemover(client, clusterService, jobId -> jobProvider.audit(jobId)), + mlDailyManagementService = new MlDailyManagementService(threadPool, Arrays.asList( + new ExpiredResultsRemover(client, clusterService, auditor), new ExpiredModelSnapshotsRemover(client, clusterService) )); mlDailyManagementService.start(); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteModelSnapshotAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteModelSnapshotAction.java index 69f38f99938..615e2a5b77d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteModelSnapshotAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteModelSnapshotAction.java @@ -32,6 +32,7 @@ import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.action.util.QueryPage; +import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import java.io.IOException; @@ -133,17 +134,19 @@ public class DeleteModelSnapshotAction extends Action currentTimeSupplier; + private final Auditor auditor; public DatafeedJobRunner(ThreadPool threadPool, Client client, ClusterService clusterService, JobProvider jobProvider, - Supplier currentTimeSupplier) { + Supplier currentTimeSupplier, Auditor auditor) { super(Settings.EMPTY); this.client = Objects.requireNonNull(client); this.clusterService = Objects.requireNonNull(clusterService); this.jobProvider = Objects.requireNonNull(jobProvider); this.threadPool = threadPool; this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier); + this.auditor = auditor; } public void run(StartDatafeedAction.DatafeedTask task, Consumer handler) { @@ -174,13 +176,12 @@ public class DatafeedJobRunner extends AbstractComponent { Holder createJobDatafeed(DatafeedConfig datafeed, Job job, long finalBucketEndMs, long latestRecordTimeMs, Consumer handler, StartDatafeedAction.DatafeedTask task) { - Auditor auditor = jobProvider.audit(job.getId()); Duration frequency = getFrequencyOrDefault(datafeed, job); Duration queryDelay = Duration.ofSeconds(datafeed.getQueryDelay()); DataExtractorFactory dataExtractorFactory = createDataExtractorFactory(datafeed, job); DatafeedJob datafeedJob = new DatafeedJob(job.getId(), buildDataDescription(job), frequency.toMillis(), queryDelay.toMillis(), dataExtractorFactory, client, auditor, currentTimeSupplier, finalBucketEndMs, latestRecordTimeMs); - Holder holder = new Holder(datafeed, datafeedJob, task.isLookbackOnly(), new ProblemTracker(() -> auditor), handler); + Holder holder = new Holder(datafeed, datafeedJob, task.isLookbackOnly(), new ProblemTracker(auditor, job.getId()), handler); task.setHolder(holder); return holder; } @@ -284,7 +285,7 @@ public class DatafeedJobRunner extends AbstractComponent { if (datafeedJob.stop()) { FutureUtils.cancel(future); handler.accept(e); - jobProvider.audit(datafeed.getJobId()).info(Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED)); + auditor.info(datafeed.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED)); logger.info("[{}] datafeed [{}] for job [{}] has been stopped", source, datafeed.getId(), datafeed.getJobId()); if (autoCloseJob) { closeJob(); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/ProblemTracker.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/ProblemTracker.java index 79dc342815a..4d3537e1de7 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/ProblemTracker.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/ProblemTracker.java @@ -9,7 +9,6 @@ import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.notifications.Auditor; import java.util.Objects; -import java.util.function.Supplier; /** *

@@ -27,16 +26,17 @@ class ProblemTracker { private static final int EMPTY_DATA_WARN_COUNT = 10; - private final Supplier auditor; + private final Auditor auditor; + private final String jobId; private volatile boolean hasProblems; private volatile boolean hadProblems; private volatile String previousProblem; - private volatile int emptyDataCount; - ProblemTracker(Supplier auditor) { + ProblemTracker(Auditor auditor, String jobId) { this.auditor = Objects.requireNonNull(auditor); + this.jobId = Objects.requireNonNull(jobId); } /** @@ -66,7 +66,7 @@ class ProblemTracker { hasProblems = true; if (!Objects.equals(previousProblem, problemMessage)) { previousProblem = problemMessage; - auditor.get().error(Messages.getMessage(template, problemMessage)); + auditor.error(jobId, Messages.getMessage(template, problemMessage)); } } @@ -78,14 +78,14 @@ class ProblemTracker { if (emptyDataCount < EMPTY_DATA_WARN_COUNT) { emptyDataCount++; if (emptyDataCount == EMPTY_DATA_WARN_COUNT) { - auditor.get().warning(Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_NO_DATA)); + auditor.warning(jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_NO_DATA)); } } } public void reportNoneEmptyCount() { if (emptyDataCount >= EMPTY_DATA_WARN_COUNT) { - auditor.get().info(Messages.getMessage(Messages.JOB_AUDIR_DATAFEED_DATA_SEEN_AGAIN)); + auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIR_DATAFEED_DATA_SEEN_AGAIN)); } emptyDataCount = 0; } @@ -99,7 +99,7 @@ class ProblemTracker { */ public void finishReport() { if (!hasProblems && hadProblems) { - auditor.get().info(Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_RECOVERED)); + auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_RECOVERED)); } hadProblems = hasProblems; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index f54be3d22a1..25eaba2f0ed 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -32,7 +32,6 @@ import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobStorageDeletionTask; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; -import org.elasticsearch.xpack.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; @@ -56,27 +55,21 @@ import java.util.stream.Collectors; */ public class JobManager extends AbstractComponent { - /** - * Field name in which to store the API version in the usage info - */ - public static final String APP_VER_FIELDNAME = "appVer"; - - public static final String DEFAULT_RECORD_SORT_FIELD = AnomalyRecord.PROBABILITY.getPreferredName(); - private final JobProvider jobProvider; private final ClusterService clusterService; private final JobResultsPersister jobResultsPersister; - + private final Auditor auditor; /** * Create a JobManager */ public JobManager(Settings settings, JobProvider jobProvider, JobResultsPersister jobResultsPersister, - ClusterService clusterService) { + ClusterService clusterService, Auditor auditor) { super(settings); this.jobProvider = Objects.requireNonNull(jobProvider); this.clusterService = clusterService; this.jobResultsPersister = jobResultsPersister; + this.auditor = auditor; } /** @@ -169,7 +162,7 @@ public class JobManager extends AbstractComponent { jobProvider.createJobResultIndex(job, state, new ActionListener() { @Override public void onResponse(Boolean indicesCreated) { - audit(job.getId()).info(Messages.getMessage(Messages.JOB_AUDIT_CREATED)); + auditor.info(job.getId(), Messages.getMessage(Messages.JOB_AUDIT_CREATED)); // Also I wonder if we need to audit log infra // structure in ml as when we merge into xpack @@ -269,7 +262,7 @@ public class JobManager extends AbstractComponent { if (jobDeleted) { logger.info("Job [" + jobId + "] deleted."); actionListener.onResponse(new DeleteJobAction.Response(true)); - audit(jobId).info(Messages.getMessage(Messages.JOB_AUDIT_DELETED)); + auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_DELETED)); } else { actionListener.onResponse(new DeleteJobAction.Response(false)); } @@ -336,10 +329,6 @@ public class JobManager extends AbstractComponent { }); } - public Auditor audit(String jobId) { - return jobProvider.audit(jobId); - } - public void revertSnapshot(RevertModelSnapshotAction.Request request, ActionListener actionListener, ModelSnapshot modelSnapshot) { @@ -349,8 +338,7 @@ public class JobManager extends AbstractComponent { @Override protected RevertModelSnapshotAction.Response newResponse(boolean acknowledged) { if (acknowledged) { - audit(request.getJobId()) - .info(Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription())); + auditor.info(request.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription())); return new RevertModelSnapshotAction.Response(modelSnapshot); } throw new IllegalStateException("Could not revert modelSnapshot on job [" diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java index 0761c82c46a..bb12eef1a55 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java @@ -661,6 +661,9 @@ public class ElasticsearchMappings { .startObject(AuditMessage.TIMESTAMP.getPreferredName()) .field(TYPE, DATE) .endObject() + .startObject(AuditMessage.NODE_NAME.getPreferredName()) + .field(TYPE, KEYWORD) + .endObject() .endObject() .endObject() .endObject(); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index 5aa8dd8f32c..b24f1d932af 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -1174,14 +1174,4 @@ public class JobProvider { public void getFilters(Consumer> handler, Consumer errorHandler, Set ids) { mget(ML_META_INDEX, MlFilter.TYPE.getPreferredName(), ids, handler, errorHandler, MlFilter.PARSER); } - - /** - * Get an auditor for the given job - * - * @param jobId the job id - * @return the {@code Auditor} - */ - public Auditor audit(String jobId) { - return new Auditor(client, jobId); - } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java index d20dac047e1..48c53792577 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java @@ -42,12 +42,12 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover { private static final Logger LOGGER = Loggers.getLogger(ExpiredResultsRemover.class); private final Client client; - private final Function auditorSupplier; + private final Auditor auditor; - public ExpiredResultsRemover(Client client, ClusterService clusterService, Function auditorSupplier) { + public ExpiredResultsRemover(Client client, ClusterService clusterService, Auditor auditor) { super(clusterService); this.client = Objects.requireNonNull(client); - this.auditorSupplier = Objects.requireNonNull(auditorSupplier); + this.auditor = Objects.requireNonNull(auditor); } @Override @@ -99,6 +99,6 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover { ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(instant, ZoneOffset.systemDefault()); String formatted = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(zonedDateTime); String msg = Messages.getMessage(Messages.JOB_AUDIT_OLD_RESULTS_DELETED, formatted); - auditorSupplier.apply(jobId).info(msg); + auditor.info(jobId, msg); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/notifications/AuditMessage.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/notifications/AuditMessage.java index 4e4af11af5d..bf5a0a7b6ae 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/notifications/AuditMessage.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/notifications/AuditMessage.java @@ -28,6 +28,7 @@ public class AuditMessage extends ToXContentToBytes implements Writeable { public static final ParseField MESSAGE = new ParseField("message"); public static final ParseField LEVEL = new ParseField("level"); public static final ParseField TIMESTAMP = new ParseField("timestamp"); + public static final ParseField NODE_NAME = new ParseField("node_name"); public static final ObjectParser PARSER = new ObjectParser<>(TYPE.getPreferredName(), AuditMessage::new); @@ -49,22 +50,25 @@ public class AuditMessage extends ToXContentToBytes implements Writeable { } throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]"); }, TIMESTAMP, ValueType.VALUE); + PARSER.declareString(AuditMessage::setNodeName, NODE_NAME); } private String jobId; private String message; private Level level; private Date timestamp; + private String nodeName; + + private AuditMessage() { - public AuditMessage() { - // Default constructor } - private AuditMessage(String jobId, String message, Level level) { + AuditMessage(String jobId, String message, Level level, String nodeName) { this.jobId = jobId; this.message = message; this.level = level; timestamp = new Date(); + this.nodeName = nodeName; } public AuditMessage(StreamInput in) throws IOException { @@ -76,6 +80,7 @@ public class AuditMessage extends ToXContentToBytes implements Writeable { if (in.readBoolean()) { timestamp = new Date(in.readLong()); } + nodeName = in.readOptionalString(); } @Override @@ -92,6 +97,7 @@ public class AuditMessage extends ToXContentToBytes implements Writeable { if (hasTimestamp) { out.writeLong(timestamp.getTime()); } + out.writeOptionalString(nodeName); } public String getJobId() { @@ -126,20 +132,28 @@ public class AuditMessage extends ToXContentToBytes implements Writeable { this.timestamp = timestamp; } - public static AuditMessage newInfo(String jobId, String message) { - return new AuditMessage(jobId, message, Level.INFO); + public String getNodeName() { + return nodeName; } - public static AuditMessage newWarning(String jobId, String message) { - return new AuditMessage(jobId, message, Level.WARNING); + public void setNodeName(String nodeName) { + this.nodeName = nodeName; } - public static AuditMessage newActivity(String jobId, String message) { - return new AuditMessage(jobId, message, Level.ACTIVITY); + public static AuditMessage newInfo(String jobId, String message, String nodeName) { + return new AuditMessage(jobId, message, Level.INFO, nodeName); } - public static AuditMessage newError(String jobId, String message) { - return new AuditMessage(jobId, message, Level.ERROR); + public static AuditMessage newWarning(String jobId, String message, String nodeName) { + return new AuditMessage(jobId, message, Level.WARNING, nodeName); + } + + public static AuditMessage newActivity(String jobId, String message, String nodeName) { + return new AuditMessage(jobId, message, Level.ACTIVITY, nodeName); + } + + public static AuditMessage newError(String jobId, String message, String nodeName) { + return new AuditMessage(jobId, message, Level.ERROR, nodeName); } @Override @@ -157,6 +171,9 @@ public class AuditMessage extends ToXContentToBytes implements Writeable { if (timestamp != null) { builder.field(TIMESTAMP.getPreferredName(), timestamp.getTime()); } + if (nodeName != null) { + builder.field(NODE_NAME.getPreferredName(), nodeName); + } builder.endObject(); return builder; } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/notifications/Auditor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/notifications/Auditor.java index 00ea92ffa80..ad7f2f64fe7 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/notifications/Auditor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/notifications/Auditor.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.xcontent.ToXContent; @@ -26,28 +27,23 @@ public class Auditor { private static final Logger LOGGER = Loggers.getLogger(Auditor.class); private final Client client; - private final String jobId; + private final ClusterService clusterService; - public Auditor(Client client, String jobId) { + public Auditor(Client client, ClusterService clusterService) { this.client = Objects.requireNonNull(client); - this.jobId = jobId; + this.clusterService = clusterService; } - public void info(String message) { - indexDoc(AuditMessage.TYPE.getPreferredName(), AuditMessage.newInfo(jobId, message)); + public void info(String jobId, String message) { + indexDoc(AuditMessage.TYPE.getPreferredName(), AuditMessage.newInfo(jobId, message, clusterService.localNode().getName())); } - public void warning(String message) { - indexDoc(AuditMessage.TYPE.getPreferredName(), AuditMessage.newWarning(jobId, message)); + public void warning(String jobId, String message) { + indexDoc(AuditMessage.TYPE.getPreferredName(), AuditMessage.newWarning(jobId, message, clusterService.localNode().getName())); } - public void error(String message) { - indexDoc(AuditMessage.TYPE.getPreferredName(), AuditMessage.newError(jobId, message)); - } - - public void activity(int totalJobs, int totalDetectors, int runningJobs, int runningDetectors) { - String type = AuditActivity.TYPE.getPreferredName(); - indexDoc(type, AuditActivity.newActivity(totalJobs, totalDetectors, runningJobs, runningDetectors)); + public void error(String jobId, String message) { + indexDoc(AuditMessage.TYPE.getPreferredName(), AuditMessage.newError(jobId, message, clusterService.localNode().getName())); } private void indexDoc(String type, ToXContent toXContent) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java index 9dd7fd5c927..d384c88c98e 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java @@ -47,6 +47,7 @@ public class MlInitializationServiceTests extends ESTestCase { private ClusterService clusterService; private Client client; private JobProvider jobProvider; + private Auditor auditor; @Before public void setUpMocks() { @@ -55,6 +56,7 @@ public class MlInitializationServiceTests extends ESTestCase { clusterService = mock(ClusterService.class); client = mock(Client.class); jobProvider = mock(JobProvider.class); + auditor = mock(Auditor.class); doAnswer(invocation -> { ((Runnable) invocation.getArguments()[0]).run(); @@ -70,7 +72,7 @@ public class MlInitializationServiceTests extends ESTestCase { ClusterService clusterService = mock(ClusterService.class); MlInitializationService initializationService = - new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider); + new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider, auditor); ClusterState cs = ClusterState.builder(new ClusterName("_name")) .nodes(DiscoveryNodes.builder() @@ -91,7 +93,7 @@ public class MlInitializationServiceTests extends ESTestCase { public void testInitialize_noMasterNode() throws Exception { MlInitializationService initializationService = - new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider); + new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider, auditor); ClusterState cs = ClusterState.builder(new ClusterName("_name")) .nodes(DiscoveryNodes.builder() @@ -120,7 +122,7 @@ public class MlInitializationServiceTests extends ESTestCase { ClusterService clusterService = mock(ClusterService.class); JobProvider jobProvider = mockJobProvider(); MlInitializationService initializationService = - new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider); + new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider, auditor); ClusterState cs = ClusterState.builder(new ClusterName("_name")) .nodes(DiscoveryNodes.builder() @@ -173,7 +175,7 @@ public class MlInitializationServiceTests extends ESTestCase { ClusterService clusterService = mock(ClusterService.class); JobProvider jobProvider = mockJobProvider(); MlInitializationService initializationService = - new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider); + new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider, auditor); ClusterState cs = ClusterState.builder(new ClusterName("_name")) .nodes(DiscoveryNodes.builder() @@ -221,7 +223,7 @@ public class MlInitializationServiceTests extends ESTestCase { public void testNodeGoesFromMasterToNonMasterAndBack() throws Exception { MlInitializationService initializationService = - new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider); + new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider, auditor); MlDailyManagementService initialDailyManagementService = mock(MlDailyManagementService.class); initializationService.setDailyManagementService(initialDailyManagementService); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java index 30a1b1f6df8..8914540315c 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.mock.orig.Mockito; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.AggregatorFactories; @@ -40,13 +41,16 @@ import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.MockClientBuilder; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; +import org.elasticsearch.xpack.ml.notifications.AuditMessage; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction; import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction.Response; import org.junit.Before; +import org.mockito.ArgumentCaptor; import java.io.ByteArrayInputStream; import java.io.InputStream; @@ -77,7 +81,6 @@ import static org.mockito.Mockito.when; public class DatafeedJobRunnerTests extends ESTestCase { private Client client; - private Auditor auditor; private ActionFuture jobDataFuture; private ActionFuture flushJobFuture; private ClusterService clusterService; @@ -85,6 +88,7 @@ public class DatafeedJobRunnerTests extends ESTestCase { private DataExtractorFactory dataExtractorFactory; private DatafeedJobRunner datafeedJobRunner; private long currentTime = 120000; + private Auditor auditor; @Before @SuppressWarnings("unchecked") @@ -107,9 +111,18 @@ public class DatafeedJobRunnerTests extends ESTestCase { clusterService = mock(ClusterService.class); when(clusterService.state()).thenReturn(cs.build()); - client = mock(Client.class); + + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(XContentBuilder.class); + client = new MockClientBuilder("foo") + .prepareIndex(Auditor.NOTIFICATIONS_INDEX, AuditMessage.TYPE.getPreferredName(), "responseId", argumentCaptor) + .build(); + jobDataFuture = mock(ActionFuture.class); flushJobFuture = mock(ActionFuture.class); + DiscoveryNode dNode = mock(DiscoveryNode.class); + when(dNode.getName()).thenReturn("this_node_has_a_name"); + when(clusterService.localNode()).thenReturn(dNode); + auditor = mock(Auditor.class); JobProvider jobProvider = mock(JobProvider.class); Mockito.doAnswer(invocationOnMock -> { @@ -132,14 +145,13 @@ public class DatafeedJobRunnerTests extends ESTestCase { when(client.execute(same(PostDataAction.INSTANCE), any())).thenReturn(jobDataFuture); when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture); - datafeedJobRunner = new DatafeedJobRunner(threadPool, client, clusterService, jobProvider, () -> currentTime) { + datafeedJobRunner = new DatafeedJobRunner(threadPool, client, clusterService, jobProvider, () -> currentTime, auditor) { @Override DataExtractorFactory createDataExtractorFactory(DatafeedConfig datafeedConfig, Job job) { return dataExtractorFactory; } }; - when(jobProvider.audit(anyString())).thenReturn(auditor); doAnswer(invocationOnMock -> { @SuppressWarnings("rawtypes") Consumer consumer = (Consumer) invocationOnMock.getArguments()[3]; @@ -230,7 +242,7 @@ public class DatafeedJobRunnerTests extends ESTestCase { datafeedJobRunner.doDatafeedRealtime(10L, "foo", holder); verify(threadPool, times(11)).schedule(any(), eq(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME), any()); - verify(auditor, times(1)).warning(anyString()); + verify(auditor, times(1)).warning(eq("job_id"), anyString()); verify(client, never()).execute(same(PostDataAction.INSTANCE), any()); verify(client, never()).execute(same(FlushJobAction.INSTANCE), any()); } @@ -266,7 +278,8 @@ public class DatafeedJobRunnerTests extends ESTestCase { Job.Builder jobBuilder = createDatafeedJob(); jobBuilder.setDataDescription(dataDescription); DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed1", "foo").build(); - DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class), () -> currentTime); + DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class), + () -> currentTime, auditor); DataExtractorFactory dataExtractorFactory = runner.createDataExtractorFactory(datafeedConfig, jobBuilder.build()); @@ -280,7 +293,8 @@ public class DatafeedJobRunnerTests extends ESTestCase { jobBuilder.setDataDescription(dataDescription); DatafeedConfig.Builder datafeedConfig = createDatafeedConfig("datafeed1", "foo"); datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto()); - DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class), () -> currentTime); + DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class), + () -> currentTime, auditor); DataExtractorFactory dataExtractorFactory = runner.createDataExtractorFactory(datafeedConfig.build(), jobBuilder.build()); @@ -294,7 +308,8 @@ public class DatafeedJobRunnerTests extends ESTestCase { jobBuilder.setDataDescription(dataDescription); DatafeedConfig.Builder datafeedConfig = createDatafeedConfig("datafeed1", "foo"); datafeedConfig.setChunkingConfig(ChunkingConfig.newOff()); - DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class), () -> currentTime); + DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class), + () -> currentTime, auditor); DataExtractorFactory dataExtractorFactory = runner.createDataExtractorFactory(datafeedConfig.build(), jobBuilder.build()); @@ -308,7 +323,8 @@ public class DatafeedJobRunnerTests extends ESTestCase { jobBuilder.setDataDescription(dataDescription); DatafeedConfig.Builder datafeedConfig = createDatafeedConfig("datafeed1", "foo"); datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(AggregationBuilders.avg("a"))); - DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class), () -> currentTime); + DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class), + () -> currentTime, auditor); DataExtractorFactory dataExtractorFactory = runner.createDataExtractorFactory(datafeedConfig.build(), jobBuilder.build()); @@ -323,7 +339,8 @@ public class DatafeedJobRunnerTests extends ESTestCase { DatafeedConfig.Builder datafeedConfig = createDatafeedConfig("datafeed1", "foo"); datafeedConfig.setAggregations(AggregatorFactories.builder()); datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto()); - DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class), () -> currentTime); + DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class), + () -> currentTime, auditor); DataExtractorFactory dataExtractorFactory = runner.createDataExtractorFactory(datafeedConfig.build(), jobBuilder.build()); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/ProblemTrackerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/ProblemTrackerTests.java index fb2eea13975..4d3458d8577 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/ProblemTrackerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/ProblemTrackerTests.java @@ -23,20 +23,20 @@ public class ProblemTrackerTests extends ESTestCase { @Before public void setUpTests() { auditor = mock(Auditor.class); - problemTracker = new ProblemTracker(() -> auditor); + problemTracker = new ProblemTracker(auditor, "foo"); } public void testReportExtractionProblem() { problemTracker.reportExtractionProblem("foo"); - verify(auditor).error("Datafeed is encountering errors extracting data: foo"); + verify(auditor).error("foo", "Datafeed is encountering errors extracting data: foo"); assertTrue(problemTracker.hasProblems()); } public void testReportAnalysisProblem() { problemTracker.reportAnalysisProblem("foo"); - verify(auditor).error("Datafeed is encountering errors submitting data for analysis: foo"); + verify(auditor).error("foo", "Datafeed is encountering errors submitting data for analysis: foo"); assertTrue(problemTracker.hasProblems()); } @@ -44,7 +44,7 @@ public class ProblemTrackerTests extends ESTestCase { problemTracker.reportExtractionProblem("foo"); problemTracker.reportAnalysisProblem("foo"); - verify(auditor, times(1)).error("Datafeed is encountering errors extracting data: foo"); + verify(auditor, times(1)).error("foo", "Datafeed is encountering errors extracting data: foo"); assertTrue(problemTracker.hasProblems()); } @@ -53,7 +53,7 @@ public class ProblemTrackerTests extends ESTestCase { problemTracker.finishReport(); problemTracker.reportExtractionProblem("foo"); - verify(auditor, times(1)).error("Datafeed is encountering errors extracting data: foo"); + verify(auditor, times(1)).error("foo", "Datafeed is encountering errors extracting data: foo"); assertTrue(problemTracker.hasProblems()); } @@ -70,7 +70,7 @@ public class ProblemTrackerTests extends ESTestCase { problemTracker.reportEmptyDataCount(); } - verify(auditor).warning("Datafeed has been retrieving no data for a while"); + verify(auditor).warning("foo", "Datafeed has been retrieving no data for a while"); } public void testUpdateEmptyDataCount_GivenEmptyElevenTimes() { @@ -78,7 +78,7 @@ public class ProblemTrackerTests extends ESTestCase { problemTracker.reportEmptyDataCount(); } - verify(auditor, times(1)).warning("Datafeed has been retrieving no data for a while"); + verify(auditor, times(1)).warning("foo", "Datafeed has been retrieving no data for a while"); } public void testUpdateEmptyDataCount_GivenNonEmptyAfterNineEmpty() { @@ -96,8 +96,8 @@ public class ProblemTrackerTests extends ESTestCase { } problemTracker.reportNoneEmptyCount(); - verify(auditor).warning("Datafeed has been retrieving no data for a while"); - verify(auditor).info("Datafeed has started retrieving data again"); + verify(auditor).warning("foo", "Datafeed has been retrieving no data for a while"); + verify(auditor).info("foo", "Datafeed has started retrieving data again"); } public void testFinishReport_GivenNoProblems() { @@ -112,8 +112,8 @@ public class ProblemTrackerTests extends ESTestCase { problemTracker.finishReport(); problemTracker.finishReport(); - verify(auditor).error("Datafeed is encountering errors extracting data: bar"); - verify(auditor).info("Datafeed has recovered data extraction and analysis"); + verify(auditor).error("foo", "Datafeed is encountering errors extracting data: bar"); + verify(auditor).info("foo", "Datafeed has recovered data extraction and analysis"); assertFalse(problemTracker.hasProblems()); } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index 0fe53ea983b..7054d91e5ff 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -36,13 +36,13 @@ public class JobManagerTests extends ESTestCase { private ClusterService clusterService; private JobProvider jobProvider; + private Auditor auditor; @Before public void setupMocks() { clusterService = mock(ClusterService.class); jobProvider = mock(JobProvider.class); - Auditor auditor = mock(Auditor.class); - when(jobProvider.audit(anyString())).thenReturn(auditor); + auditor = mock(Auditor.class); } public void testGetJob() { @@ -121,7 +121,7 @@ public class JobManagerTests extends ESTestCase { private JobManager createJobManager() { Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); JobResultsPersister jobResultsPersister = mock(JobResultsPersister.class); - return new JobManager(settings, jobProvider, jobResultsPersister, clusterService); + return new JobManager(settings, jobProvider, jobResultsPersister, clusterService, auditor); } private ClusterState createClusterState() { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java index 8bc40327d52..cd5eff0e53b 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java @@ -156,6 +156,6 @@ public class ExpiredResultsRemoverTests extends ESTestCase { } private ExpiredResultsRemover createExpiredResultsRemover() { - return new ExpiredResultsRemover(client, clusterService, jobId -> mock(Auditor.class)); + return new ExpiredResultsRemover(client, clusterService, mock(Auditor.class)); } } \ No newline at end of file diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/notifications/AuditMessageTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/notifications/AuditMessageTests.java index 3bbd84e3c1a..da9c80e4c4e 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/notifications/AuditMessageTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/notifications/AuditMessageTests.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.notifications; import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase; -import org.elasticsearch.xpack.ml.utils.time.TimeUtils; import org.junit.Before; import java.util.Date; @@ -21,15 +20,8 @@ public class AuditMessageTests extends AbstractSerializingTestCase startMillis = System.currentTimeMillis(); } - public void testDefaultConstructor() { - AuditMessage auditMessage = new AuditMessage(); - assertNull(auditMessage.getMessage()); - assertNull(auditMessage.getLevel()); - assertNull(auditMessage.getTimestamp()); - } - public void testNewInfo() { - AuditMessage info = AuditMessage.newInfo("foo", "some info"); + AuditMessage info = AuditMessage.newInfo("foo", "some info", "some_node"); assertEquals("foo", info.getJobId()); assertEquals("some info", info.getMessage()); assertEquals(Level.INFO, info.getLevel()); @@ -37,7 +29,7 @@ public class AuditMessageTests extends AbstractSerializingTestCase } public void testNewWarning() { - AuditMessage warning = AuditMessage.newWarning("bar", "some warning"); + AuditMessage warning = AuditMessage.newWarning("bar", "some warning", "some_node"); assertEquals("bar", warning.getJobId()); assertEquals("some warning", warning.getMessage()); assertEquals(Level.WARNING, warning.getLevel()); @@ -46,7 +38,7 @@ public class AuditMessageTests extends AbstractSerializingTestCase public void testNewError() { - AuditMessage error = AuditMessage.newError("foo", "some error"); + AuditMessage error = AuditMessage.newError("foo", "some error", "some_node"); assertEquals("foo", error.getJobId()); assertEquals("some error", error.getMessage()); assertEquals(Level.ERROR, error.getLevel()); @@ -54,7 +46,7 @@ public class AuditMessageTests extends AbstractSerializingTestCase } public void testNewActivity() { - AuditMessage error = AuditMessage.newActivity("foo", "some error"); + AuditMessage error = AuditMessage.newActivity("foo", "some error", "some_node"); assertEquals("foo", error.getJobId()); assertEquals("some error", error.getMessage()); assertEquals(Level.ACTIVITY, error.getLevel()); @@ -74,20 +66,8 @@ public class AuditMessageTests extends AbstractSerializingTestCase @Override protected AuditMessage createTestInstance() { - AuditMessage message = new AuditMessage(); - if (randomBoolean()) { - message.setJobId(randomAsciiOfLengthBetween(1, 20)); - } - if (randomBoolean()) { - message.setMessage(randomAsciiOfLengthBetween(1, 200)); - } - if (randomBoolean()) { - message.setLevel(randomFrom(Level.values())); - } - if (randomBoolean()) { - message.setTimestamp(new Date(TimeUtils.dateStringToEpoch(randomTimeValue()))); - } - return message; + return new AuditMessage(randomAsciiOfLengthBetween(1, 20), randomAsciiOfLengthBetween(1, 200), + randomFrom(Level.values()), randomAsciiOfLengthBetween(1, 20)); } @Override diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/notifications/AuditorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/notifications/AuditorTests.java index dad1307c923..8eb475f55cb 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/notifications/AuditorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/notifications/AuditorTests.java @@ -9,6 +9,8 @@ import org.elasticsearch.action.ListenableActionFuture; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; @@ -16,15 +18,16 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.ESTestCase; import org.junit.Before; import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; import java.io.IOException; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class AuditorTests extends ESTestCase { private Client client; + private ClusterService clusterService; private ListenableActionFuture indexResponse; private ArgumentCaptor indexCaptor; private ArgumentCaptor typeCaptor; @@ -33,17 +36,22 @@ public class AuditorTests extends ESTestCase { @SuppressWarnings("unchecked") @Before public void setUpMocks() { - client = Mockito.mock(Client.class); - indexResponse = Mockito.mock(ListenableActionFuture.class); + client = mock(Client.class); + clusterService = mock(ClusterService.class); + DiscoveryNode dNode = mock(DiscoveryNode.class); + when(dNode.getName()).thenReturn("this_node_has_a_name"); + when(clusterService.localNode()).thenReturn(dNode); + + indexResponse = mock(ListenableActionFuture.class); indexCaptor = ArgumentCaptor.forClass(String.class); typeCaptor = ArgumentCaptor.forClass(String.class); jsonCaptor = ArgumentCaptor.forClass(XContentBuilder.class); } - public void testInfo() { + public void testInfo() throws IOException { givenClientPersistsSuccessfully(); - Auditor auditor = new Auditor(client, "foo"); - auditor.info("Here is my info"); + Auditor auditor = new Auditor(client, clusterService); + auditor.info("foo", "Here is my info"); assertEquals(".ml-notifications", indexCaptor.getValue()); assertEquals("audit_message", typeCaptor.getValue()); AuditMessage auditMessage = parseAuditMessage(); @@ -52,10 +60,10 @@ public class AuditorTests extends ESTestCase { assertEquals(Level.INFO, auditMessage.getLevel()); } - public void testWarning() { + public void testWarning() throws IOException { givenClientPersistsSuccessfully(); - Auditor auditor = new Auditor(client, "bar"); - auditor.warning("Here is my warning"); + Auditor auditor = new Auditor(client, clusterService); + auditor.warning("bar", "Here is my warning"); assertEquals(".ml-notifications", indexCaptor.getValue()); assertEquals("audit_message", typeCaptor.getValue()); AuditMessage auditMessage = parseAuditMessage(); @@ -64,10 +72,10 @@ public class AuditorTests extends ESTestCase { assertEquals(Level.WARNING, auditMessage.getLevel()); } - public void testError() { + public void testError() throws IOException { givenClientPersistsSuccessfully(); - Auditor auditor = new Auditor(client, "foobar"); - auditor.error("Here is my error"); + Auditor auditor = new Auditor(client, clusterService); + auditor.error("foobar", "Here is my error"); assertEquals(".ml-notifications", indexCaptor.getValue()); assertEquals("audit_message", typeCaptor.getValue()); AuditMessage auditMessage = parseAuditMessage(); @@ -76,21 +84,8 @@ public class AuditorTests extends ESTestCase { assertEquals(Level.ERROR, auditMessage.getLevel()); } - public void testActivity_GivenNumbers() { - givenClientPersistsSuccessfully(); - Auditor auditor = new Auditor(client, ""); - auditor.activity(10, 100, 5, 50); - assertEquals(".ml-notifications", indexCaptor.getValue()); - assertEquals("audit_activity", typeCaptor.getValue()); - AuditActivity auditActivity = parseAuditActivity(); - assertEquals(10, auditActivity.getTotalJobs()); - assertEquals(100, auditActivity.getTotalDetectors()); - assertEquals(5, auditActivity.getRunningJobs()); - assertEquals(50, auditActivity.getRunningDetectors()); - } - private void givenClientPersistsSuccessfully() { - IndexRequestBuilder indexRequestBuilder = Mockito.mock(IndexRequestBuilder.class); + IndexRequestBuilder indexRequestBuilder = mock(IndexRequestBuilder.class); when(indexRequestBuilder.setSource(jsonCaptor.capture())).thenReturn(indexRequestBuilder); when(indexRequestBuilder.execute()).thenReturn(indexResponse); when(client.prepareIndex(indexCaptor.capture(), typeCaptor.capture(), any())) @@ -99,23 +94,9 @@ public class AuditorTests extends ESTestCase { .thenReturn(indexRequestBuilder); } - private AuditMessage parseAuditMessage() { - try { - String json = jsonCaptor.getValue().string(); - XContentParser parser = XContentFactory.xContent(json).createParser(NamedXContentRegistry.EMPTY, json); - return AuditMessage.PARSER.apply(parser, null); - } catch (IOException e) { - return new AuditMessage(); - } - } - - private AuditActivity parseAuditActivity() { - try { - String json = jsonCaptor.getValue().string(); - XContentParser parser = XContentFactory.xContent(json).createParser(NamedXContentRegistry.EMPTY, json); - return AuditActivity.PARSER.apply(parser, null); - } catch (IOException e) { - return new AuditActivity(); - } + private AuditMessage parseAuditMessage() throws IOException { + String json = jsonCaptor.getValue().string(); + XContentParser parser = XContentFactory.xContent(json).createParser(NamedXContentRegistry.EMPTY, json); + return AuditMessage.PARSER.apply(parser, null); } }