[ML] Refactor Auditor (elastic/x-pack-elasticsearch#631)

* Refactor Auditor
* Inject Auditor

Original commit: elastic/x-pack-elasticsearch@d9efe5fcef
parent a189e1b759
commit 08b7cb6bca
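Reviewer note on the shape of the change: instead of each caller building a per-job Auditor via jobProvider.audit(jobId), a single Auditor is now constructed once in the MachineLearning plugin and injected into JobManager, DatafeedJobRunner, MlInitializationService, DeleteModelSnapshotAction and the retention removers. Its methods take the job id explicitly, and each audit message is stamped with the local node name. A minimal sketch of the new call pattern, assuming the wiring shown in the diff below (the job id "my_job" and the error text are illustrative only):

    // One Auditor per node, created at plugin startup and injected wherever it is needed.
    Auditor auditor = new Auditor(client, clusterService);

    // Callers pass the job id on every call instead of holding a job-scoped Auditor.
    auditor.info("my_job", Messages.getMessage(Messages.JOB_AUDIT_CREATED));
    auditor.warning("my_job", Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_NO_DATA));
    auditor.error("my_job", "something went wrong");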
MachineLearning.java

@@ -87,6 +87,7 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.MultiplyingNormalizerPr
import org.elasticsearch.xpack.ml.job.process.normalizer.NativeNormalizerProcessFactory;
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerProcessFactory;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.rest.datafeeds.RestDeleteDatafeedAction;
import org.elasticsearch.xpack.ml.rest.datafeeds.RestGetDatafeedStatsAction;
import org.elasticsearch.xpack.ml.rest.datafeeds.RestGetDatafeedsAction;

@@ -252,7 +253,8 @@ public class MachineLearning extends Plugin implements ActionPlugin {
JobProvider jobProvider = new JobProvider(client, 1, settings);
JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(settings, client);
JobManager jobManager = new JobManager(settings, jobProvider, jobResultsPersister, clusterService);
Auditor auditor = new Auditor(client, clusterService);
JobManager jobManager = new JobManager(settings, jobProvider, jobResultsPersister, clusterService, auditor);
AutodetectProcessFactory autodetectProcessFactory;
NormalizerProcessFactory normalizerProcessFactory;
if (AUTODETECT_PROCESS.get(settings)) {

@@ -281,7 +283,7 @@ public class MachineLearning extends Plugin implements ActionPlugin {
AutodetectProcessManager dataProcessor = new AutodetectProcessManager(settings, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory);
DatafeedJobRunner datafeedJobRunner = new DatafeedJobRunner(threadPool, client, clusterService, jobProvider,
System::currentTimeMillis);
System::currentTimeMillis, auditor);
PersistentActionService persistentActionService = new PersistentActionService(Settings.EMPTY, threadPool, clusterService, client);
PersistentActionRegistry persistentActionRegistry = new PersistentActionRegistry(Settings.EMPTY);

@@ -289,12 +291,13 @@ public class MachineLearning extends Plugin implements ActionPlugin {
jobProvider,
jobManager,
dataProcessor,
new MlInitializationService(settings, threadPool, clusterService, client, jobProvider),
new MlInitializationService(settings, threadPool, clusterService, client, jobProvider, auditor),
jobDataCountsPersister,
datafeedJobRunner,
persistentActionService,
persistentActionRegistry,
new PersistentTaskClusterService(Settings.EMPTY, persistentActionRegistry, clusterService)
new PersistentTaskClusterService(Settings.EMPTY, persistentActionRegistry, clusterService),
auditor
);
}
MlInitializationService.java

@@ -33,6 +33,7 @@ public class MlInitializationService extends AbstractComponent implements Cluste
private final ClusterService clusterService;
private final Client client;
private final JobProvider jobProvider;
private final Auditor auditor;
private final AtomicBoolean installMlMetadataCheck = new AtomicBoolean(false);
private final AtomicBoolean putMlNotificationsIndexTemplateCheck = new AtomicBoolean(false);

@@ -43,12 +44,13 @@ public class MlInitializationService extends AbstractComponent implements Cluste
private volatile MlDailyManagementService mlDailyManagementService;
public MlInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService, Client client,
JobProvider jobProvider) {
JobProvider jobProvider, Auditor auditor) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.client = client;
this.jobProvider = jobProvider;
this.auditor = auditor;
clusterService.addListener(this);
clusterService.addLifecycleListener(new LifecycleListener() {
@Override

@@ -179,9 +181,8 @@ public class MlInitializationService extends AbstractComponent implements Cluste
private void installDailyManagementService() {
if (mlDailyManagementService == null) {
mlDailyManagementService = new MlDailyManagementService(threadPool,
Arrays.asList((MlDailyManagementService.Listener)
new ExpiredResultsRemover(client, clusterService, jobId -> jobProvider.audit(jobId)),
mlDailyManagementService = new MlDailyManagementService(threadPool, Arrays.asList(
new ExpiredResultsRemover(client, clusterService, auditor),
new ExpiredModelSnapshotsRemover(client, clusterService)
));
mlDailyManagementService.start();
DeleteModelSnapshotAction.java

@@ -32,6 +32,7 @@ import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException;

@@ -133,17 +134,19 @@ public class DeleteModelSnapshotAction extends Action<DeleteModelSnapshotAction.
private final JobProvider jobProvider;
private final JobManager jobManager;
private final ClusterService clusterService;
private final Auditor auditor;
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
JobProvider jobProvider, JobManager jobManager, ClusterService clusterService,
Client client) {
Client client, Auditor auditor) {
super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new);
this.client = client;
this.jobProvider = jobProvider;
this.jobManager = jobManager;
this.clusterService = clusterService;
this.auditor = auditor;
}
@Override

@@ -193,7 +196,7 @@ public class DeleteModelSnapshotAction extends Action<DeleteModelSnapshotAction.
}
});
jobManager.audit(request.getJobId()).info(Messages.getMessage(Messages.JOB_AUDIT_SNAPSHOT_DELETED,
auditor.info(request.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_SNAPSHOT_DELETED,
deleteCandidate.getDescription()));
}, listener::onFailure);
}
DatafeedJob.java

@@ -73,7 +73,7 @@ class DatafeedJob {
if (isLookbackOnly) {
return null;
} else {
auditor.info(Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STARTED_REALTIME));
auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STARTED_REALTIME));
return nextRealtimeTimestamp();
}
}

@@ -81,17 +81,17 @@ class DatafeedJob {
String msg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STARTED_FROM_TO,
DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(lookbackStartTimeMs),
DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(lookbackEnd));
auditor.info(msg);
auditor.info(jobId, msg);
FlushJobAction.Request request = new FlushJobAction.Request(jobId);
request.setCalcInterim(true);
run(lookbackStartTimeMs, lookbackEnd, request);
auditor.info(Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_LOOKBACK_COMPLETED));
auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_LOOKBACK_COMPLETED));
LOGGER.info("[{}] Lookback has finished", jobId);
if (isLookbackOnly) {
return null;
} else {
auditor.info(Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_CONTINUED_REALTIME));
auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_CONTINUED_REALTIME));
return nextRealtimeTimestamp();
}
}
DatafeedJobRunner.java

@@ -57,15 +57,17 @@ public class DatafeedJobRunner extends AbstractComponent {
private final JobProvider jobProvider;
private final ThreadPool threadPool;
private final Supplier<Long> currentTimeSupplier;
private final Auditor auditor;
public DatafeedJobRunner(ThreadPool threadPool, Client client, ClusterService clusterService, JobProvider jobProvider,
Supplier<Long> currentTimeSupplier) {
Supplier<Long> currentTimeSupplier, Auditor auditor) {
super(Settings.EMPTY);
this.client = Objects.requireNonNull(client);
this.clusterService = Objects.requireNonNull(clusterService);
this.jobProvider = Objects.requireNonNull(jobProvider);
this.threadPool = threadPool;
this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier);
this.auditor = auditor;
}
public void run(StartDatafeedAction.DatafeedTask task, Consumer<Exception> handler) {

@@ -174,13 +176,12 @@ public class DatafeedJobRunner extends AbstractComponent {
Holder createJobDatafeed(DatafeedConfig datafeed, Job job, long finalBucketEndMs, long latestRecordTimeMs,
Consumer<Exception> handler, StartDatafeedAction.DatafeedTask task) {
Auditor auditor = jobProvider.audit(job.getId());
Duration frequency = getFrequencyOrDefault(datafeed, job);
Duration queryDelay = Duration.ofSeconds(datafeed.getQueryDelay());
DataExtractorFactory dataExtractorFactory = createDataExtractorFactory(datafeed, job);
DatafeedJob datafeedJob = new DatafeedJob(job.getId(), buildDataDescription(job), frequency.toMillis(), queryDelay.toMillis(),
dataExtractorFactory, client, auditor, currentTimeSupplier, finalBucketEndMs, latestRecordTimeMs);
Holder holder = new Holder(datafeed, datafeedJob, task.isLookbackOnly(), new ProblemTracker(() -> auditor), handler);
Holder holder = new Holder(datafeed, datafeedJob, task.isLookbackOnly(), new ProblemTracker(auditor, job.getId()), handler);
task.setHolder(holder);
return holder;
}

@@ -284,7 +285,7 @@ public class DatafeedJobRunner extends AbstractComponent {
if (datafeedJob.stop()) {
FutureUtils.cancel(future);
handler.accept(e);
jobProvider.audit(datafeed.getJobId()).info(Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED));
auditor.info(datafeed.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED));
logger.info("[{}] datafeed [{}] for job [{}] has been stopped", source, datafeed.getId(), datafeed.getJobId());
if (autoCloseJob) {
closeJob();
ProblemTracker.java

@@ -9,7 +9,6 @@ import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import java.util.Objects;
import java.util.function.Supplier;
/**
* <p>

@@ -27,16 +26,17 @@ class ProblemTracker {
private static final int EMPTY_DATA_WARN_COUNT = 10;
private final Supplier<Auditor> auditor;
private final Auditor auditor;
private final String jobId;
private volatile boolean hasProblems;
private volatile boolean hadProblems;
private volatile String previousProblem;
private volatile int emptyDataCount;
ProblemTracker(Supplier<Auditor> auditor) {
ProblemTracker(Auditor auditor, String jobId) {
this.auditor = Objects.requireNonNull(auditor);
this.jobId = Objects.requireNonNull(jobId);
}
/**

@@ -66,7 +66,7 @@ class ProblemTracker {
hasProblems = true;
if (!Objects.equals(previousProblem, problemMessage)) {
previousProblem = problemMessage;
auditor.get().error(Messages.getMessage(template, problemMessage));
auditor.error(jobId, Messages.getMessage(template, problemMessage));
}
}

@@ -78,14 +78,14 @@ class ProblemTracker {
if (emptyDataCount < EMPTY_DATA_WARN_COUNT) {
emptyDataCount++;
if (emptyDataCount == EMPTY_DATA_WARN_COUNT) {
auditor.get().warning(Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_NO_DATA));
auditor.warning(jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_NO_DATA));
}
}
}
public void reportNoneEmptyCount() {
if (emptyDataCount >= EMPTY_DATA_WARN_COUNT) {
auditor.get().info(Messages.getMessage(Messages.JOB_AUDIR_DATAFEED_DATA_SEEN_AGAIN));
auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIR_DATAFEED_DATA_SEEN_AGAIN));
}
emptyDataCount = 0;
}

@@ -99,7 +99,7 @@ class ProblemTracker {
*/
public void finishReport() {
if (!hasProblems && hadProblems) {
auditor.get().info(Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_RECOVERED));
auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_RECOVERED));
}
hadProblems = hasProblems;
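With the Supplier<Auditor> indirection gone, ProblemTracker is now built from the shared Auditor plus the job id it reports on, which is what the updated ProblemTrackerTests later in this diff exercise. A small sketch of the new usage, assuming the job id "foo" used in the tests (the problem text is illustrative):

    // The tracker forwards problems to the injected Auditor, tagged with its job id.
    ProblemTracker problemTracker = new ProblemTracker(auditor, "foo");
    problemTracker.reportExtractionProblem("disk full"); // audited via auditor.error("foo", ...)
    problemTracker.finishReport();                       // remembers that this reporting cycle had problems
    problemTracker.finishReport();                       // a later problem-free cycle audits a recovery message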
JobManager.java

@@ -32,7 +32,6 @@ import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobStorageDeletionTask;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;

@@ -56,27 +55,21 @@ import java.util.stream.Collectors;
*/
public class JobManager extends AbstractComponent {
/**
* Field name in which to store the API version in the usage info
*/
public static final String APP_VER_FIELDNAME = "appVer";
public static final String DEFAULT_RECORD_SORT_FIELD = AnomalyRecord.PROBABILITY.getPreferredName();
private final JobProvider jobProvider;
private final ClusterService clusterService;
private final JobResultsPersister jobResultsPersister;
private final Auditor auditor;
/**
* Create a JobManager
*/
public JobManager(Settings settings, JobProvider jobProvider, JobResultsPersister jobResultsPersister,
ClusterService clusterService) {
ClusterService clusterService, Auditor auditor) {
super(settings);
this.jobProvider = Objects.requireNonNull(jobProvider);
this.clusterService = clusterService;
this.jobResultsPersister = jobResultsPersister;
this.auditor = auditor;
}
/**

@@ -169,7 +162,7 @@ public class JobManager extends AbstractComponent {
jobProvider.createJobResultIndex(job, state, new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean indicesCreated) {
audit(job.getId()).info(Messages.getMessage(Messages.JOB_AUDIT_CREATED));
auditor.info(job.getId(), Messages.getMessage(Messages.JOB_AUDIT_CREATED));
// Also I wonder if we need to audit log infra
// structure in ml as when we merge into xpack

@@ -269,7 +262,7 @@ public class JobManager extends AbstractComponent {
if (jobDeleted) {
logger.info("Job [" + jobId + "] deleted.");
actionListener.onResponse(new DeleteJobAction.Response(true));
audit(jobId).info(Messages.getMessage(Messages.JOB_AUDIT_DELETED));
auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_DELETED));
} else {
actionListener.onResponse(new DeleteJobAction.Response(false));
}

@@ -336,10 +329,6 @@ public class JobManager extends AbstractComponent {
});
}
public Auditor audit(String jobId) {
return jobProvider.audit(jobId);
}
public void revertSnapshot(RevertModelSnapshotAction.Request request, ActionListener<RevertModelSnapshotAction.Response> actionListener,
ModelSnapshot modelSnapshot) {

@@ -349,8 +338,7 @@ public class JobManager extends AbstractComponent {
@Override
protected RevertModelSnapshotAction.Response newResponse(boolean acknowledged) {
if (acknowledged) {
audit(request.getJobId())
.info(Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription()));
auditor.info(request.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription()));
return new RevertModelSnapshotAction.Response(modelSnapshot);
}
throw new IllegalStateException("Could not revert modelSnapshot on job ["
ElasticsearchMappings.java

@@ -661,6 +661,9 @@ public class ElasticsearchMappings {
.startObject(AuditMessage.TIMESTAMP.getPreferredName())
.field(TYPE, DATE)
.endObject()
.startObject(AuditMessage.NODE_NAME.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.endObject()
.endObject()
.endObject();
JobProvider.java

@@ -1174,14 +1174,4 @@ public class JobProvider {
public void getFilters(Consumer<Set<MlFilter>> handler, Consumer<Exception> errorHandler, Set<String> ids) {
mget(ML_META_INDEX, MlFilter.TYPE.getPreferredName(), ids, handler, errorHandler, MlFilter.PARSER);
}
/**
* Get an auditor for the given job
*
* @param jobId the job id
* @return the {@code Auditor}
*/
public Auditor audit(String jobId) {
return new Auditor(client, jobId);
}
}
ExpiredResultsRemover.java

@@ -42,12 +42,12 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
private static final Logger LOGGER = Loggers.getLogger(ExpiredResultsRemover.class);
private final Client client;
private final Function<String, Auditor> auditorSupplier;
private final Auditor auditor;
public ExpiredResultsRemover(Client client, ClusterService clusterService, Function<String, Auditor> auditorSupplier) {
public ExpiredResultsRemover(Client client, ClusterService clusterService, Auditor auditor) {
super(clusterService);
this.client = Objects.requireNonNull(client);
this.auditorSupplier = Objects.requireNonNull(auditorSupplier);
this.auditor = Objects.requireNonNull(auditor);
}
@Override

@@ -99,6 +99,6 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(instant, ZoneOffset.systemDefault());
String formatted = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(zonedDateTime);
String msg = Messages.getMessage(Messages.JOB_AUDIT_OLD_RESULTS_DELETED, formatted);
auditorSupplier.apply(jobId).info(msg);
auditor.info(jobId, msg);
}
}
AuditMessage.java

@@ -28,6 +28,7 @@ public class AuditMessage extends ToXContentToBytes implements Writeable {
public static final ParseField MESSAGE = new ParseField("message");
public static final ParseField LEVEL = new ParseField("level");
public static final ParseField TIMESTAMP = new ParseField("timestamp");
public static final ParseField NODE_NAME = new ParseField("node_name");
public static final ObjectParser<AuditMessage, Void> PARSER = new ObjectParser<>(TYPE.getPreferredName(),
AuditMessage::new);

@@ -49,22 +50,25 @@ public class AuditMessage extends ToXContentToBytes implements Writeable {
}
throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]");
}, TIMESTAMP, ValueType.VALUE);
PARSER.declareString(AuditMessage::setNodeName, NODE_NAME);
}
private String jobId;
private String message;
private Level level;
private Date timestamp;
private String nodeName;
private AuditMessage() {
public AuditMessage() {
// Default constructor
}
private AuditMessage(String jobId, String message, Level level) {
AuditMessage(String jobId, String message, Level level, String nodeName) {
this.jobId = jobId;
this.message = message;
this.level = level;
timestamp = new Date();
this.nodeName = nodeName;
}
public AuditMessage(StreamInput in) throws IOException {

@@ -76,6 +80,7 @@ public class AuditMessage extends ToXContentToBytes implements Writeable {
if (in.readBoolean()) {
timestamp = new Date(in.readLong());
}
nodeName = in.readOptionalString();
}
@Override

@@ -92,6 +97,7 @@ public class AuditMessage extends ToXContentToBytes implements Writeable {
if (hasTimestamp) {
out.writeLong(timestamp.getTime());
}
out.writeOptionalString(nodeName);
}
public String getJobId() {

@@ -126,20 +132,28 @@ public class AuditMessage extends ToXContentToBytes implements Writeable {
this.timestamp = timestamp;
}
public static AuditMessage newInfo(String jobId, String message) {
return new AuditMessage(jobId, message, Level.INFO);
public String getNodeName() {
return nodeName;
}
public static AuditMessage newWarning(String jobId, String message) {
return new AuditMessage(jobId, message, Level.WARNING);
public void setNodeName(String nodeName) {
this.nodeName = nodeName;
}
public static AuditMessage newActivity(String jobId, String message) {
return new AuditMessage(jobId, message, Level.ACTIVITY);
public static AuditMessage newInfo(String jobId, String message, String nodeName) {
return new AuditMessage(jobId, message, Level.INFO, nodeName);
}
public static AuditMessage newError(String jobId, String message) {
return new AuditMessage(jobId, message, Level.ERROR);
public static AuditMessage newWarning(String jobId, String message, String nodeName) {
return new AuditMessage(jobId, message, Level.WARNING, nodeName);
}
public static AuditMessage newActivity(String jobId, String message, String nodeName) {
return new AuditMessage(jobId, message, Level.ACTIVITY, nodeName);
}
public static AuditMessage newError(String jobId, String message, String nodeName) {
return new AuditMessage(jobId, message, Level.ERROR, nodeName);
}
@Override

@@ -157,6 +171,9 @@ public class AuditMessage extends ToXContentToBytes implements Writeable {
if (timestamp != null) {
builder.field(TIMESTAMP.getPreferredName(), timestamp.getTime());
}
if (nodeName != null) {
builder.field(NODE_NAME.getPreferredName(), nodeName);
}
builder.endObject();
return builder;
}
Auditor.java

@@ -10,6 +10,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.ToXContent;

@@ -26,28 +27,23 @@ public class Auditor {
private static final Logger LOGGER = Loggers.getLogger(Auditor.class);
private final Client client;
private final String jobId;
private final ClusterService clusterService;
public Auditor(Client client, String jobId) {
public Auditor(Client client, ClusterService clusterService) {
this.client = Objects.requireNonNull(client);
this.jobId = jobId;
this.clusterService = clusterService;
}
public void info(String message) {
indexDoc(AuditMessage.TYPE.getPreferredName(), AuditMessage.newInfo(jobId, message));
public void info(String jobId, String message) {
indexDoc(AuditMessage.TYPE.getPreferredName(), AuditMessage.newInfo(jobId, message, clusterService.localNode().getName()));
}
public void warning(String message) {
indexDoc(AuditMessage.TYPE.getPreferredName(), AuditMessage.newWarning(jobId, message));
public void warning(String jobId, String message) {
indexDoc(AuditMessage.TYPE.getPreferredName(), AuditMessage.newWarning(jobId, message, clusterService.localNode().getName()));
}
public void error(String message) {
indexDoc(AuditMessage.TYPE.getPreferredName(), AuditMessage.newError(jobId, message));
}
public void activity(int totalJobs, int totalDetectors, int runningJobs, int runningDetectors) {
String type = AuditActivity.TYPE.getPreferredName();
indexDoc(type, AuditActivity.newActivity(totalJobs, totalDetectors, runningJobs, runningDetectors));
public void error(String jobId, String message) {
indexDoc(AuditMessage.TYPE.getPreferredName(), AuditMessage.newError(jobId, message, clusterService.localNode().getName()));
}
private void indexDoc(String type, ToXContent toXContent) {
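The new node_name field threads through the whole stack: ElasticsearchMappings adds it to the notifications mapping as a keyword, AuditMessage parses, serializes and renders it, and Auditor fills it in from ClusterService.localNode(). A minimal sketch of what a message built through the new factory methods carries (the job id and message text are illustrative):

    // The Auditor resolves the node name itself; callers never pass it.
    String nodeName = clusterService.localNode().getName();
    AuditMessage msg = AuditMessage.newInfo("my_job", "Job created", nodeName);
    // The message carries its level, timestamp and node_name, and Auditor indexes it
    // into the .ml-notifications index with type "audit_message" (see AuditorTests below).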
MlInitializationServiceTests.java

@@ -47,6 +47,7 @@ public class MlInitializationServiceTests extends ESTestCase {
private ClusterService clusterService;
private Client client;
private JobProvider jobProvider;
private Auditor auditor;
@Before
public void setUpMocks() {

@@ -55,6 +56,7 @@ public class MlInitializationServiceTests extends ESTestCase {
clusterService = mock(ClusterService.class);
client = mock(Client.class);
jobProvider = mock(JobProvider.class);
auditor = mock(Auditor.class);
doAnswer(invocation -> {
((Runnable) invocation.getArguments()[0]).run();

@@ -70,7 +72,7 @@ public class MlInitializationServiceTests extends ESTestCase {
ClusterService clusterService = mock(ClusterService.class);
MlInitializationService initializationService =
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider);
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider, auditor);
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()

@@ -91,7 +93,7 @@ public class MlInitializationServiceTests extends ESTestCase {
public void testInitialize_noMasterNode() throws Exception {
MlInitializationService initializationService =
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider);
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider, auditor);
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()

@@ -120,7 +122,7 @@ public class MlInitializationServiceTests extends ESTestCase {
ClusterService clusterService = mock(ClusterService.class);
JobProvider jobProvider = mockJobProvider();
MlInitializationService initializationService =
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider);
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider, auditor);
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()

@@ -173,7 +175,7 @@ public class MlInitializationServiceTests extends ESTestCase {
ClusterService clusterService = mock(ClusterService.class);
JobProvider jobProvider = mockJobProvider();
MlInitializationService initializationService =
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider);
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider, auditor);
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()

@@ -221,7 +223,7 @@ public class MlInitializationServiceTests extends ESTestCase {
public void testNodeGoesFromMasterToNonMasterAndBack() throws Exception {
MlInitializationService initializationService =
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider);
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider, auditor);
MlDailyManagementService initialDailyManagementService = mock(MlDailyManagementService.class);
initializationService.setDailyManagementService(initialDailyManagementService);
DatafeedJobRunnerTests.java

@@ -18,6 +18,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.mock.orig.Mockito;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;

@@ -40,13 +41,16 @@ import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.MockClientBuilder;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.notifications.AuditMessage;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction;
import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction.Response;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import java.io.ByteArrayInputStream;
import java.io.InputStream;

@@ -77,7 +81,6 @@ import static org.mockito.Mockito.when;
public class DatafeedJobRunnerTests extends ESTestCase {
private Client client;
private Auditor auditor;
private ActionFuture<PostDataAction.Response> jobDataFuture;
private ActionFuture<FlushJobAction.Response> flushJobFuture;
private ClusterService clusterService;

@@ -85,6 +88,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
private DataExtractorFactory dataExtractorFactory;
private DatafeedJobRunner datafeedJobRunner;
private long currentTime = 120000;
private Auditor auditor;
@Before
@SuppressWarnings("unchecked")

@@ -107,9 +111,18 @@ public class DatafeedJobRunnerTests extends ESTestCase {
clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(cs.build());
client = mock(Client.class);
ArgumentCaptor<XContentBuilder> argumentCaptor = ArgumentCaptor.forClass(XContentBuilder.class);
client = new MockClientBuilder("foo")
.prepareIndex(Auditor.NOTIFICATIONS_INDEX, AuditMessage.TYPE.getPreferredName(), "responseId", argumentCaptor)
.build();
jobDataFuture = mock(ActionFuture.class);
flushJobFuture = mock(ActionFuture.class);
DiscoveryNode dNode = mock(DiscoveryNode.class);
when(dNode.getName()).thenReturn("this_node_has_a_name");
when(clusterService.localNode()).thenReturn(dNode);
auditor = mock(Auditor.class);
JobProvider jobProvider = mock(JobProvider.class);
Mockito.doAnswer(invocationOnMock -> {

@@ -132,14 +145,13 @@ public class DatafeedJobRunnerTests extends ESTestCase {
when(client.execute(same(PostDataAction.INSTANCE), any())).thenReturn(jobDataFuture);
when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture);
datafeedJobRunner = new DatafeedJobRunner(threadPool, client, clusterService, jobProvider, () -> currentTime) {
datafeedJobRunner = new DatafeedJobRunner(threadPool, client, clusterService, jobProvider, () -> currentTime, auditor) {
@Override
DataExtractorFactory createDataExtractorFactory(DatafeedConfig datafeedConfig, Job job) {
return dataExtractorFactory;
}
};
when(jobProvider.audit(anyString())).thenReturn(auditor);
doAnswer(invocationOnMock -> {
@SuppressWarnings("rawtypes")
Consumer consumer = (Consumer) invocationOnMock.getArguments()[3];

@@ -230,7 +242,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
datafeedJobRunner.doDatafeedRealtime(10L, "foo", holder);
verify(threadPool, times(11)).schedule(any(), eq(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME), any());
verify(auditor, times(1)).warning(anyString());
verify(auditor, times(1)).warning(eq("job_id"), anyString());
verify(client, never()).execute(same(PostDataAction.INSTANCE), any());
verify(client, never()).execute(same(FlushJobAction.INSTANCE), any());
}

@@ -266,7 +278,8 @@ public class DatafeedJobRunnerTests extends ESTestCase {
Job.Builder jobBuilder = createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed1", "foo").build();
DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class), () -> currentTime);
DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class),
() -> currentTime, auditor);
DataExtractorFactory dataExtractorFactory = runner.createDataExtractorFactory(datafeedConfig, jobBuilder.build());

@@ -280,7 +293,8 @@ public class DatafeedJobRunnerTests extends ESTestCase {
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto());
DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class), () -> currentTime);
DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class),
() -> currentTime, auditor);
DataExtractorFactory dataExtractorFactory = runner.createDataExtractorFactory(datafeedConfig.build(), jobBuilder.build());

@@ -294,7 +308,8 @@ public class DatafeedJobRunnerTests extends ESTestCase {
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setChunkingConfig(ChunkingConfig.newOff());
DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class), () -> currentTime);
DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class),
() -> currentTime, auditor);
DataExtractorFactory dataExtractorFactory = runner.createDataExtractorFactory(datafeedConfig.build(), jobBuilder.build());

@@ -308,7 +323,8 @@ public class DatafeedJobRunnerTests extends ESTestCase {
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(AggregationBuilders.avg("a")));
DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class), () -> currentTime);
DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class),
() -> currentTime, auditor);
DataExtractorFactory dataExtractorFactory = runner.createDataExtractorFactory(datafeedConfig.build(), jobBuilder.build());

@@ -323,7 +339,8 @@ public class DatafeedJobRunnerTests extends ESTestCase {
DatafeedConfig.Builder datafeedConfig = createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setAggregations(AggregatorFactories.builder());
datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto());
DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class), () -> currentTime);
DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class),
() -> currentTime, auditor);
DataExtractorFactory dataExtractorFactory = runner.createDataExtractorFactory(datafeedConfig.build(), jobBuilder.build());
ProblemTrackerTests.java

@@ -23,20 +23,20 @@ public class ProblemTrackerTests extends ESTestCase {
@Before
public void setUpTests() {
auditor = mock(Auditor.class);
problemTracker = new ProblemTracker(() -> auditor);
problemTracker = new ProblemTracker(auditor, "foo");
}
public void testReportExtractionProblem() {
problemTracker.reportExtractionProblem("foo");
verify(auditor).error("Datafeed is encountering errors extracting data: foo");
verify(auditor).error("foo", "Datafeed is encountering errors extracting data: foo");
assertTrue(problemTracker.hasProblems());
}
public void testReportAnalysisProblem() {
problemTracker.reportAnalysisProblem("foo");
verify(auditor).error("Datafeed is encountering errors submitting data for analysis: foo");
verify(auditor).error("foo", "Datafeed is encountering errors submitting data for analysis: foo");
assertTrue(problemTracker.hasProblems());
}

@@ -44,7 +44,7 @@ public class ProblemTrackerTests extends ESTestCase {
problemTracker.reportExtractionProblem("foo");
problemTracker.reportAnalysisProblem("foo");
verify(auditor, times(1)).error("Datafeed is encountering errors extracting data: foo");
verify(auditor, times(1)).error("foo", "Datafeed is encountering errors extracting data: foo");
assertTrue(problemTracker.hasProblems());
}

@@ -53,7 +53,7 @@ public class ProblemTrackerTests extends ESTestCase {
problemTracker.finishReport();
problemTracker.reportExtractionProblem("foo");
verify(auditor, times(1)).error("Datafeed is encountering errors extracting data: foo");
verify(auditor, times(1)).error("foo", "Datafeed is encountering errors extracting data: foo");
assertTrue(problemTracker.hasProblems());
}

@@ -70,7 +70,7 @@ public class ProblemTrackerTests extends ESTestCase {
problemTracker.reportEmptyDataCount();
}
verify(auditor).warning("Datafeed has been retrieving no data for a while");
verify(auditor).warning("foo", "Datafeed has been retrieving no data for a while");
}
public void testUpdateEmptyDataCount_GivenEmptyElevenTimes() {

@@ -78,7 +78,7 @@ public class ProblemTrackerTests extends ESTestCase {
problemTracker.reportEmptyDataCount();
}
verify(auditor, times(1)).warning("Datafeed has been retrieving no data for a while");
verify(auditor, times(1)).warning("foo", "Datafeed has been retrieving no data for a while");
}
public void testUpdateEmptyDataCount_GivenNonEmptyAfterNineEmpty() {

@@ -96,8 +96,8 @@ public class ProblemTrackerTests extends ESTestCase {
}
problemTracker.reportNoneEmptyCount();
verify(auditor).warning("Datafeed has been retrieving no data for a while");
verify(auditor).info("Datafeed has started retrieving data again");
verify(auditor).warning("foo", "Datafeed has been retrieving no data for a while");
verify(auditor).info("foo", "Datafeed has started retrieving data again");
}
public void testFinishReport_GivenNoProblems() {

@@ -112,8 +112,8 @@ public class ProblemTrackerTests extends ESTestCase {
problemTracker.finishReport();
problemTracker.finishReport();
verify(auditor).error("Datafeed is encountering errors extracting data: bar");
verify(auditor).info("Datafeed has recovered data extraction and analysis");
verify(auditor).error("foo", "Datafeed is encountering errors extracting data: bar");
verify(auditor).info("foo", "Datafeed has recovered data extraction and analysis");
assertFalse(problemTracker.hasProblems());
}
}
JobManagerTests.java

@@ -36,13 +36,13 @@ public class JobManagerTests extends ESTestCase {
private ClusterService clusterService;
private JobProvider jobProvider;
private Auditor auditor;
@Before
public void setupMocks() {
clusterService = mock(ClusterService.class);
jobProvider = mock(JobProvider.class);
Auditor auditor = mock(Auditor.class);
when(jobProvider.audit(anyString())).thenReturn(auditor);
auditor = mock(Auditor.class);
}
public void testGetJob() {

@@ -121,7 +121,7 @@ public class JobManagerTests extends ESTestCase {
private JobManager createJobManager() {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
JobResultsPersister jobResultsPersister = mock(JobResultsPersister.class);
return new JobManager(settings, jobProvider, jobResultsPersister, clusterService);
return new JobManager(settings, jobProvider, jobResultsPersister, clusterService, auditor);
}
private ClusterState createClusterState() {
ExpiredResultsRemoverTests.java

@@ -156,6 +156,6 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
}
private ExpiredResultsRemover createExpiredResultsRemover() {
return new ExpiredResultsRemover(client, clusterService, jobId -> mock(Auditor.class));
return new ExpiredResultsRemover(client, clusterService, mock(Auditor.class));
}
}
AuditMessageTests.java

@@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.notifications;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase;
import org.elasticsearch.xpack.ml.utils.time.TimeUtils;
import org.junit.Before;
import java.util.Date;

@@ -21,15 +20,8 @@ public class AuditMessageTests extends AbstractSerializingTestCase<AuditMessage>
startMillis = System.currentTimeMillis();
}
public void testDefaultConstructor() {
AuditMessage auditMessage = new AuditMessage();
assertNull(auditMessage.getMessage());
assertNull(auditMessage.getLevel());
assertNull(auditMessage.getTimestamp());
}
public void testNewInfo() {
AuditMessage info = AuditMessage.newInfo("foo", "some info");
AuditMessage info = AuditMessage.newInfo("foo", "some info", "some_node");
assertEquals("foo", info.getJobId());
assertEquals("some info", info.getMessage());
assertEquals(Level.INFO, info.getLevel());

@@ -37,7 +29,7 @@ public class AuditMessageTests extends AbstractSerializingTestCase<AuditMessage>
}
public void testNewWarning() {
AuditMessage warning = AuditMessage.newWarning("bar", "some warning");
AuditMessage warning = AuditMessage.newWarning("bar", "some warning", "some_node");
assertEquals("bar", warning.getJobId());
assertEquals("some warning", warning.getMessage());
assertEquals(Level.WARNING, warning.getLevel());

@@ -46,7 +38,7 @@ public class AuditMessageTests extends AbstractSerializingTestCase<AuditMessage>
public void testNewError() {
AuditMessage error = AuditMessage.newError("foo", "some error");
AuditMessage error = AuditMessage.newError("foo", "some error", "some_node");
assertEquals("foo", error.getJobId());
assertEquals("some error", error.getMessage());
assertEquals(Level.ERROR, error.getLevel());

@@ -54,7 +46,7 @@ public class AuditMessageTests extends AbstractSerializingTestCase<AuditMessage>
}
public void testNewActivity() {
AuditMessage error = AuditMessage.newActivity("foo", "some error");
AuditMessage error = AuditMessage.newActivity("foo", "some error", "some_node");
assertEquals("foo", error.getJobId());
assertEquals("some error", error.getMessage());
assertEquals(Level.ACTIVITY, error.getLevel());

@@ -74,20 +66,8 @@ public class AuditMessageTests extends AbstractSerializingTestCase<AuditMessage>
@Override
protected AuditMessage createTestInstance() {
AuditMessage message = new AuditMessage();
if (randomBoolean()) {
message.setJobId(randomAsciiOfLengthBetween(1, 20));
}
if (randomBoolean()) {
message.setMessage(randomAsciiOfLengthBetween(1, 200));
}
if (randomBoolean()) {
message.setLevel(randomFrom(Level.values()));
}
if (randomBoolean()) {
message.setTimestamp(new Date(TimeUtils.dateStringToEpoch(randomTimeValue())));
}
return message;
return new AuditMessage(randomAsciiOfLengthBetween(1, 20), randomAsciiOfLengthBetween(1, 200),
randomFrom(Level.values()), randomAsciiOfLengthBetween(1, 20));
}
@Override
AuditorTests.java

@@ -9,6 +9,8 @@ import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;

@@ -16,15 +18,16 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.io.IOException;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class AuditorTests extends ESTestCase {
private Client client;
private ClusterService clusterService;
private ListenableActionFuture<IndexResponse> indexResponse;
private ArgumentCaptor<String> indexCaptor;
private ArgumentCaptor<String> typeCaptor;

@@ -33,17 +36,22 @@ public class AuditorTests extends ESTestCase {
@SuppressWarnings("unchecked")
@Before
public void setUpMocks() {
client = Mockito.mock(Client.class);
indexResponse = Mockito.mock(ListenableActionFuture.class);
client = mock(Client.class);
clusterService = mock(ClusterService.class);
DiscoveryNode dNode = mock(DiscoveryNode.class);
when(dNode.getName()).thenReturn("this_node_has_a_name");
when(clusterService.localNode()).thenReturn(dNode);
indexResponse = mock(ListenableActionFuture.class);
indexCaptor = ArgumentCaptor.forClass(String.class);
typeCaptor = ArgumentCaptor.forClass(String.class);
jsonCaptor = ArgumentCaptor.forClass(XContentBuilder.class);
}
public void testInfo() {
public void testInfo() throws IOException {
givenClientPersistsSuccessfully();
Auditor auditor = new Auditor(client, "foo");
auditor.info("Here is my info");
Auditor auditor = new Auditor(client, clusterService);
auditor.info("foo", "Here is my info");
assertEquals(".ml-notifications", indexCaptor.getValue());
assertEquals("audit_message", typeCaptor.getValue());
AuditMessage auditMessage = parseAuditMessage();

@@ -52,10 +60,10 @@ public class AuditorTests extends ESTestCase {
assertEquals(Level.INFO, auditMessage.getLevel());
}
public void testWarning() {
public void testWarning() throws IOException {
givenClientPersistsSuccessfully();
Auditor auditor = new Auditor(client, "bar");
auditor.warning("Here is my warning");
Auditor auditor = new Auditor(client, clusterService);
auditor.warning("bar", "Here is my warning");
assertEquals(".ml-notifications", indexCaptor.getValue());
assertEquals("audit_message", typeCaptor.getValue());
AuditMessage auditMessage = parseAuditMessage();

@@ -64,10 +72,10 @@ public class AuditorTests extends ESTestCase {
assertEquals(Level.WARNING, auditMessage.getLevel());
}
public void testError() {
public void testError() throws IOException {
givenClientPersistsSuccessfully();
Auditor auditor = new Auditor(client, "foobar");
auditor.error("Here is my error");
Auditor auditor = new Auditor(client, clusterService);
auditor.error("foobar", "Here is my error");
assertEquals(".ml-notifications", indexCaptor.getValue());
assertEquals("audit_message", typeCaptor.getValue());
AuditMessage auditMessage = parseAuditMessage();

@@ -76,21 +84,8 @@ public class AuditorTests extends ESTestCase {
assertEquals(Level.ERROR, auditMessage.getLevel());
}
public void testActivity_GivenNumbers() {
givenClientPersistsSuccessfully();
Auditor auditor = new Auditor(client, "");
auditor.activity(10, 100, 5, 50);
assertEquals(".ml-notifications", indexCaptor.getValue());
assertEquals("audit_activity", typeCaptor.getValue());
AuditActivity auditActivity = parseAuditActivity();
assertEquals(10, auditActivity.getTotalJobs());
assertEquals(100, auditActivity.getTotalDetectors());
assertEquals(5, auditActivity.getRunningJobs());
assertEquals(50, auditActivity.getRunningDetectors());
}
private void givenClientPersistsSuccessfully() {
IndexRequestBuilder indexRequestBuilder = Mockito.mock(IndexRequestBuilder.class);
IndexRequestBuilder indexRequestBuilder = mock(IndexRequestBuilder.class);
when(indexRequestBuilder.setSource(jsonCaptor.capture())).thenReturn(indexRequestBuilder);
when(indexRequestBuilder.execute()).thenReturn(indexResponse);
when(client.prepareIndex(indexCaptor.capture(), typeCaptor.capture(), any()))

@@ -99,23 +94,9 @@ public class AuditorTests extends ESTestCase {
.thenReturn(indexRequestBuilder);
}
private AuditMessage parseAuditMessage() {
try {
String json = jsonCaptor.getValue().string();
XContentParser parser = XContentFactory.xContent(json).createParser(NamedXContentRegistry.EMPTY, json);
return AuditMessage.PARSER.apply(parser, null);
} catch (IOException e) {
return new AuditMessage();
}
}
private AuditActivity parseAuditActivity() {
try {
String json = jsonCaptor.getValue().string();
XContentParser parser = XContentFactory.xContent(json).createParser(NamedXContentRegistry.EMPTY, json);
return AuditActivity.PARSER.apply(parser, null);
} catch (IOException e) {
return new AuditActivity();
}
private AuditMessage parseAuditMessage() throws IOException {
String json = jsonCaptor.getValue().string();
XContentParser parser = XContentFactory.xContent(json).createParser(NamedXContentRegistry.EMPTY, json);
return AuditMessage.PARSER.apply(parser, null);
}
}