diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java index 1589623bc22..09e564f2eaf 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java @@ -61,6 +61,7 @@ import org.elasticsearch.xpack.prelert.job.metadata.JobAllocator; import org.elasticsearch.xpack.prelert.job.metadata.JobLifeCycleService; import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata; import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchBulkDeleterFactory; +import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchJobDataCountsPersister; import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchJobProvider; import org.elasticsearch.xpack.prelert.job.process.NativeController; import org.elasticsearch.xpack.prelert.job.process.ProcessCtrl; @@ -154,8 +155,9 @@ public class PrelertPlugin extends Plugin implements ActionPlugin { // For this reason we can't use interfaces in the constructor of transport actions. // This ok for now as we will remove Guice soon ElasticsearchJobProvider jobProvider = new ElasticsearchJobProvider(client, 0, parseFieldMatcherSupplier.getParseFieldMatcher()); + ElasticsearchJobDataCountsPersister jobDataCountsPersister = new ElasticsearchJobDataCountsPersister(client); - JobManager jobManager = new JobManager(env, settings, jobProvider, clusterService); + JobManager jobManager = new JobManager(env, settings, jobProvider, jobDataCountsPersister, clusterService); AutodetectProcessFactory processFactory; if (USE_NATIVE_PROCESS_OPTION.get(settings)) { try { @@ -169,8 +171,8 @@ public class PrelertPlugin extends Plugin implements ActionPlugin { processFactory = (JobDetails, ignoreDowntime) -> new BlackHoleAutodetectProcess(); } AutodetectResultsParser autodetectResultsParser = new AutodetectResultsParser(settings, parseFieldMatcherSupplier); - DataProcessor dataProcessor = - new AutodetectProcessManager(settings, client, env, threadPool, jobManager, autodetectResultsParser, processFactory); + DataProcessor dataProcessor = new AutodetectProcessManager(settings, client, env, threadPool, + jobManager, jobProvider, autodetectResultsParser, processFactory); ScheduledJobService scheduledJobService = new ScheduledJobService(threadPool, client, jobProvider, dataProcessor, new HttpDataExtractorFactory(), System::currentTimeMillis); return Arrays.asList( diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/Job.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/Job.java index f818ddb7808..aa6317f22b6 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/Job.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/Job.java @@ -47,7 +47,7 @@ import java.util.regex.Pattern; public class Job extends AbstractDiffable implements Writeable, ToXContent { public static final Job PROTO = new Job(null, null, null, null, null, 0L, null, null, null, null, null, - null, null, null, null, null, null, null, null, null, null, null); + null, null, null, null, null, null, null, null, null); public static final long DEFAULT_BUCKETSPAN = 300; @@ -59,7 +59,6 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent public static final ParseField ID = new ParseField("jobId"); public static final ParseField ANALYSIS_CONFIG = new ParseField("analysisConfig"); public static final ParseField ANALYSIS_LIMITS = new ParseField("analysisLimits"); - public static final ParseField COUNTS = new ParseField("counts"); public static final ParseField CREATE_TIME = new ParseField("createTime"); public static final ParseField CUSTOM_SETTINGS = new ParseField("customSettings"); public static final ParseField DATA_DESCRIPTION = new ParseField("dataDescription"); @@ -75,7 +74,6 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent public static final ParseField RESULTS_RETENTION_DAYS = new ParseField("resultsRetentionDays"); public static final ParseField TIMEOUT = new ParseField("timeout"); public static final ParseField TRANSFORMS = new ParseField("transforms"); - public static final ParseField MODEL_SIZE_STATS = new ParseField("modelSizeStats"); public static final ParseField AVERAGE_BUCKET_PROCESSING_TIME = new ParseField("averageBucketProcessingTimeMs"); public static final ParseField MODEL_SNAPSHOT_ID = new ParseField("modelSnapshotId"); @@ -114,10 +112,8 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent PARSER.declareObject(Builder::setAnalysisLimits, AnalysisLimits.PARSER, ANALYSIS_LIMITS); PARSER.declareObject(Builder::setSchedulerConfig, SchedulerConfig.PARSER, SCHEDULER_CONFIG); PARSER.declareObject(Builder::setDataDescription, DataDescription.PARSER, DATA_DESCRIPTION); - PARSER.declareObject(Builder::setModelSizeStats, ModelSizeStats.PARSER, MODEL_SIZE_STATS); PARSER.declareObjectArray(Builder::setTransforms, TransformConfig.PARSER, TRANSFORMS); PARSER.declareObject(Builder::setModelDebugConfig, ModelDebugConfig.PARSER, MODEL_DEBUG_CONFIG); - PARSER.declareObject(Builder::setCounts, DataCounts.PARSER, COUNTS); PARSER.declareField(Builder::setIgnoreDowntime, (p, c) -> IgnoreDowntime.fromString(p.text()), IGNORE_DOWNTIME, ValueType.STRING); PARSER.declareLong(Builder::setTimeout, TIMEOUT); PARSER.declareLong(Builder::setRenormalizationWindowDays, RENORMALIZATION_WINDOW_DAYS); @@ -140,10 +136,8 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent private final AnalysisLimits analysisLimits; private final SchedulerConfig schedulerConfig; private final DataDescription dataDescription; - private final ModelSizeStats modelSizeStats; private final List transforms; private final ModelDebugConfig modelDebugConfig; - private final DataCounts counts; private final IgnoreDowntime ignoreDowntime; private final Long renormalizationWindowDays; private final Long backgroundPersistInterval; @@ -155,8 +149,8 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent public Job(String jobId, String description, Date createTime, Date finishedTime, Date lastDataTime, long timeout, AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, SchedulerConfig schedulerConfig, - DataDescription dataDescription, ModelSizeStats modelSizeStats, List transforms, - ModelDebugConfig modelDebugConfig, DataCounts counts, IgnoreDowntime ignoreDowntime, Long renormalizationWindowDays, + DataDescription dataDescription, List transforms, + ModelDebugConfig modelDebugConfig, IgnoreDowntime ignoreDowntime, Long renormalizationWindowDays, Long backgroundPersistInterval, Long modelSnapshotRetentionDays, Long resultsRetentionDays, Map customSettings, Double averageBucketProcessingTimeMs, String modelSnapshotId) { this.jobId = jobId; @@ -169,10 +163,8 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent this.analysisLimits = analysisLimits; this.schedulerConfig = schedulerConfig; this.dataDescription = dataDescription; - this.modelSizeStats = modelSizeStats; this.transforms = transforms; this.modelDebugConfig = modelDebugConfig; - this.counts = counts; this.ignoreDowntime = ignoreDowntime; this.renormalizationWindowDays = renormalizationWindowDays; this.backgroundPersistInterval = backgroundPersistInterval; @@ -194,10 +186,8 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent analysisLimits = in.readOptionalWriteable(AnalysisLimits::new); schedulerConfig = in.readOptionalWriteable(SchedulerConfig::new); dataDescription = in.readOptionalWriteable(DataDescription::new); - modelSizeStats = in.readOptionalWriteable(ModelSizeStats::new); transforms = in.readList(TransformConfig::new); modelDebugConfig = in.readOptionalWriteable(ModelDebugConfig::new); - counts = in.readOptionalWriteable(DataCounts::new); ignoreDowntime = in.readOptionalWriteable(IgnoreDowntime::fromStream); renormalizationWindowDays = in.readOptionalLong(); backgroundPersistInterval = in.readOptionalLong(); @@ -321,15 +311,6 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent return modelDebugConfig; } - /** - * The memory usage object - * - * @return The ModelSizeStats - */ - public ModelSizeStats getModelSizeStats() { - return modelSizeStats; - } - /** * If not set the input data is assumed to be csv with a '_time' field in * epoch format. @@ -345,15 +326,6 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent return transforms; } - /** - * Processed records count - * - * @return the processed records counts - */ - public DataCounts getCounts() { - return counts; - } - /** * The duration of the renormalization window in days * @@ -452,10 +424,8 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent out.writeOptionalWriteable(analysisLimits); out.writeOptionalWriteable(schedulerConfig); out.writeOptionalWriteable(dataDescription); - out.writeOptionalWriteable(modelSizeStats); out.writeList(transforms); out.writeOptionalWriteable(modelDebugConfig); - out.writeOptionalWriteable(counts); out.writeOptionalWriteable(ignoreDowntime); out.writeOptionalLong(renormalizationWindowDays); out.writeOptionalLong(backgroundPersistInterval); @@ -495,18 +465,12 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent if (dataDescription != null) { builder.field(DATA_DESCRIPTION.getPreferredName(), dataDescription, params); } - if (modelSizeStats != null) { - builder.field(MODEL_SIZE_STATS.getPreferredName(), modelSizeStats, params); - } if (transforms != null) { builder.field(TRANSFORMS.getPreferredName(), transforms); } if (modelDebugConfig != null) { builder.field(MODEL_DEBUG_CONFIG.getPreferredName(), modelDebugConfig, params); } - if (counts != null) { - builder.field(COUNTS.getPreferredName(), counts, params); - } if (ignoreDowntime != null) { builder.field(IGNORE_DOWNTIME.getPreferredName(), ignoreDowntime); } @@ -550,8 +514,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent && Objects.equals(this.finishedTime, that.finishedTime) && Objects.equals(this.lastDataTime, that.lastDataTime) && (this.timeout == that.timeout) && Objects.equals(this.analysisConfig, that.analysisConfig) && Objects.equals(this.analysisLimits, that.analysisLimits) && Objects.equals(this.dataDescription, that.dataDescription) - && Objects.equals(this.modelDebugConfig, that.modelDebugConfig) && Objects.equals(this.modelSizeStats, that.modelSizeStats) - && Objects.equals(this.transforms, that.transforms) && Objects.equals(this.counts, that.counts) + && Objects.equals(this.modelDebugConfig, that.modelDebugConfig) && Objects.equals(this.transforms, that.transforms) && Objects.equals(this.ignoreDowntime, that.ignoreDowntime) && Objects.equals(this.renormalizationWindowDays, that.renormalizationWindowDays) && Objects.equals(this.backgroundPersistInterval, that.backgroundPersistInterval) @@ -564,7 +527,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent @Override public int hashCode() { return Objects.hash(jobId, description, createTime, finishedTime, lastDataTime, timeout, analysisConfig, - analysisLimits, dataDescription, modelDebugConfig, modelSizeStats, transforms, counts, renormalizationWindowDays, + analysisLimits, dataDescription, modelDebugConfig, transforms, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, ignoreDowntime, customSettings, modelSnapshotId); } @@ -601,7 +564,6 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent private AnalysisLimits analysisLimits; private SchedulerConfig schedulerConfig; private List transforms = new ArrayList<>(); - private ModelSizeStats modelSizeStats; private DataDescription dataDescription; private Date createTime; private Date finishedTime; @@ -612,7 +574,6 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent private Long backgroundPersistInterval; private Long modelSnapshotRetentionDays; private Long resultsRetentionDays; - private DataCounts counts; private IgnoreDowntime ignoreDowntime; private Map customSettings; private Double averageBucketProcessingTimeMs; @@ -631,7 +592,6 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent this.analysisConfig = job.getAnalysisConfig(); this.schedulerConfig = job.getSchedulerConfig(); this.transforms = job.getTransforms(); - this.modelSizeStats = job.getModelSizeStats(); this.dataDescription = job.getDataDescription(); this.createTime = job.getCreateTime(); this.finishedTime = job.getFinishedTime(); @@ -641,7 +601,6 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent this.renormalizationWindowDays = job.getRenormalizationWindowDays(); this.backgroundPersistInterval = job.getBackgroundPersistInterval(); this.resultsRetentionDays = job.getResultsRetentionDays(); - this.counts = job.getCounts(); this.ignoreDowntime = job.getIgnoreDowntime(); this.customSettings = job.getCustomSettings(); this.averageBucketProcessingTimeMs = job.getAverageBucketProcessingTimeMs(); @@ -697,6 +656,10 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent this.finishedTime = finishedTime; } + /** + * Set the wall clock time of the last data upload + * @param lastDataTime Wall clock time + */ public void setLastDataTime(Date lastDataTime) { this.lastDataTime = lastDataTime; } @@ -705,10 +668,6 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent this.transforms = transforms; } - public void setModelSizeStats(ModelSizeStats.Builder modelSizeStats) { - this.modelSizeStats = modelSizeStats.build(); - } - public void setDataDescription(DataDescription.Builder description) { dataDescription = description.build(); } @@ -737,10 +696,6 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent this.ignoreDowntime = ignoreDowntime; } - public void setCounts(DataCounts counts) { - this.counts = counts; - } - public void setAverageBucketProcessingTimeMs(Double averageBucketProcessingTimeMs) { this.averageBucketProcessingTimeMs = averageBucketProcessingTimeMs; } @@ -802,8 +757,6 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent Date createTime; Date finishedTime; Date lastDataTime; - DataCounts counts; - ModelSizeStats modelSizeStats; Double averageBucketProcessingTimeMs; String modelSnapshotId; if (fromApi) { @@ -811,8 +764,6 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent createTime = this.createTime == null ? new Date() : this.createTime; finishedTime = null; lastDataTime = null; - counts = new DataCounts(id); - modelSizeStats = null; averageBucketProcessingTimeMs = null; modelSnapshotId = null; } else { @@ -820,8 +771,6 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent createTime = this.createTime; finishedTime = this.finishedTime; lastDataTime = this.lastDataTime; - counts = this.counts; - modelSizeStats = this.modelSizeStats; averageBucketProcessingTimeMs = this.averageBucketProcessingTimeMs; modelSnapshotId = this.modelSnapshotId; } @@ -833,9 +782,9 @@ public class Job extends AbstractDiffable implements Writeable, ToXContent } return new Job( id, description, createTime, finishedTime, lastDataTime, timeout, analysisConfig, analysisLimits, - schedulerConfig, dataDescription, modelSizeStats, transforms, modelDebugConfig, counts, - ignoreDowntime, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, - resultsRetentionDays, customSettings, averageBucketProcessingTimeMs, modelSnapshotId + schedulerConfig, dataDescription, transforms, modelDebugConfig, ignoreDowntime, renormalizationWindowDays, + backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, + averageBucketProcessingTimeMs, modelSnapshotId ); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManager.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManager.java index 6f4658a014a..921f028ffea 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManager.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManager.java @@ -22,6 +22,7 @@ import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchJobDataCount import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchPersister; import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchUsagePersister; import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister; +import org.elasticsearch.xpack.prelert.job.persistence.JobProvider; import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectCommunicator; import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectProcess; @@ -51,13 +52,15 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP private final Environment env; private final ThreadPool threadPool; private final JobManager jobManager; + private final JobProvider jobProvider; private final AutodetectResultsParser parser; private final AutodetectProcessFactory autodetectProcessFactory; private final ConcurrentMap autoDetectCommunicatorByJob; public AutodetectProcessManager(Settings settings, Client client, Environment env, ThreadPool threadPool, JobManager jobManager, - AutodetectResultsParser parser, AutodetectProcessFactory autodetectProcessFactory) { + JobProvider jobProvider, AutodetectResultsParser parser, + AutodetectProcessFactory autodetectProcessFactory) { super(settings); this.client = client; this.env = env; @@ -65,6 +68,7 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP this.parser = parser; this.autodetectProcessFactory = autodetectProcessFactory; this.jobManager = jobManager; + this.jobProvider = jobProvider; this.autoDetectCommunicatorByJob = new ConcurrentHashMap<>(); } @@ -105,9 +109,9 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP ElasticsearchUsagePersister usagePersister = new ElasticsearchUsagePersister(client, jobLogger); UsageReporter usageReporter = new UsageReporter(settings, job.getJobId(), usagePersister, jobLogger); - JobDataCountsPersister jobDataCountsPersister = new ElasticsearchJobDataCountsPersister(client, jobLogger); - StatusReporter statusReporter = new StatusReporter(env, settings, job.getJobId(), job.getCounts(), usageReporter, - jobDataCountsPersister, jobLogger, job.getAnalysisConfig().getBucketSpanOrDefault()); + JobDataCountsPersister jobDataCountsPersister = new ElasticsearchJobDataCountsPersister(client); + StatusReporter statusReporter = new StatusReporter(env, settings, job.getJobId(), jobProvider.dataCounts(jobId), + usageReporter, jobDataCountsPersister, jobLogger, job.getAnalysisConfig().getBucketSpanOrDefault()); AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, ignoreDowntime); JobResultsPersister persister = new ElasticsearchPersister(jobId, client); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java index eba9aa94754..136a2e8a188 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/JobManager.java @@ -35,12 +35,12 @@ import org.elasticsearch.xpack.prelert.job.logs.JobLogs; import org.elasticsearch.xpack.prelert.job.messages.Messages; import org.elasticsearch.xpack.prelert.job.metadata.Allocation; import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata; +import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.prelert.job.persistence.JobProvider; import org.elasticsearch.xpack.prelert.job.persistence.QueryPage; import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord; import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper; -import java.util.Date; import java.util.List; import java.util.Map; import java.util.Objects; @@ -52,10 +52,7 @@ import java.util.stream.Collectors; *
    *
  • creation
  • *
  • deletion
  • - *
  • flushing
  • *
  • updating
  • - *
  • sending of data
  • - *
  • fetching jobs and results
  • *
  • starting/stopping of scheduled jobs
  • *
*/ @@ -70,18 +67,22 @@ public class JobManager { public static final String DEFAULT_RECORD_SORT_FIELD = AnomalyRecord.PROBABILITY.getPreferredName(); private final JobProvider jobProvider; + private final JobDataCountsPersister jobDataCountsPersister; private final ClusterService clusterService; private final Environment env; private final Settings settings; + /** * Create a JobManager */ - public JobManager(Environment env, Settings settings, JobProvider jobProvider, ClusterService clusterService) { + public JobManager(Environment env, Settings settings, JobProvider jobProvider, + JobDataCountsPersister jobDataCountsPersister, ClusterService clusterService) { this.env = env; this.settings = settings; this.jobProvider = Objects.requireNonNull(jobProvider); this.clusterService = clusterService; + this.jobDataCountsPersister = jobDataCountsPersister; } /** @@ -458,12 +459,11 @@ public class JobManager { builder.setModelSnapshotId(modelSnapshot.getSnapshotId()); if (request.getDeleteInterveningResults()) { builder.setIgnoreDowntime(IgnoreDowntime.NEVER); - Date latestRecordTime = modelSnapshot.getLatestResultTimeStamp(); - LOGGER.info("Resetting latest record time to '" + latestRecordTime + "'"); - builder.setLastDataTime(latestRecordTime); - DataCounts counts = job.getCounts(); - counts.setLatestRecordTimeStamp(latestRecordTime); - builder.setCounts(counts); + DataCounts counts = jobProvider.dataCounts(request.getJobId()); + counts.setLatestRecordTimeStamp(modelSnapshot.getLatestRecordTimeStamp()); + // NORELEASE This update should be async. See #127 + jobDataCountsPersister.persistDataCounts(request.getJobId(), counts); + } else { builder.setIgnoreDowntime(IgnoreDowntime.ONCE); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchJobDataCountsPersister.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchJobDataCountsPersister.java index 747e7edf7e7..c0a12434c97 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchJobDataCountsPersister.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchJobDataCountsPersister.java @@ -10,6 +10,7 @@ import java.util.Locale; import org.apache.logging.log4j.Logger; import org.elasticsearch.client.Client; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.IndexNotFoundException; @@ -20,12 +21,11 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; public class ElasticsearchJobDataCountsPersister implements JobDataCountsPersister { + private static final Logger LOGGER = Loggers.getLogger(ElasticsearchJobDataCountsPersister.class); private Client client; - private Logger logger; - public ElasticsearchJobDataCountsPersister(Client client, Logger logger) { + public ElasticsearchJobDataCountsPersister(Client client) { this.client = client; - this.logger = logger; } private XContentBuilder serialiseCounts(DataCounts counts) throws IOException { @@ -33,12 +33,8 @@ public class ElasticsearchJobDataCountsPersister implements JobDataCountsPersist return counts.toXContent(builder, ToXContent.EMPTY_PARAMS); } - @Override public void persistDataCounts(String jobId, DataCounts counts) { - // NORELEASE - Should these stats be stored in memory? - - try { XContentBuilder content = serialiseCounts(counts); @@ -47,11 +43,11 @@ public class ElasticsearchJobDataCountsPersister implements JobDataCountsPersist .setSource(content).execute().actionGet(); } catch (IOException ioe) { - logger.warn("Error serialising DataCounts stats", ioe); + LOGGER.warn("Error serialising DataCounts stats", ioe); } catch (IndexNotFoundException e) { String msg = String.format(Locale.ROOT, "Error writing the job '%s' status stats.", jobId); - logger.warn(msg, e); + LOGGER.warn(msg, e); } } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchJobDetailsMapper.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchJobDetailsMapper.java deleted file mode 100644 index f3cbf38cabf..00000000000 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchJobDetailsMapper.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.prelert.job.persistence; - -import org.apache.logging.log4j.Logger; -import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.ParseFieldMatcher; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.xpack.prelert.job.Job; -import org.elasticsearch.xpack.prelert.job.ModelSizeStats; -import org.elasticsearch.xpack.prelert.job.results.ReservedFieldNames; - -import java.io.IOException; -import java.util.Objects; - -class ElasticsearchJobDetailsMapper { - private static final Logger LOGGER = Loggers.getLogger(ElasticsearchJobDetailsMapper.class); - - private final Client client; - private final ParseFieldMatcher parseFieldMatcher; - - public ElasticsearchJobDetailsMapper(Client client, ParseFieldMatcher parseFieldMatcher) { - this.client = Objects.requireNonNull(client); - this.parseFieldMatcher = Objects.requireNonNull(parseFieldMatcher); - } - - /** - * Maps an Elasticsearch source map to a {@link Job} object - * - * @param source The source of an Elasticsearch search response - * @return the {@code Job} object - */ - public Job map(BytesReference source) { - try (XContentParser parser = XContentFactory.xContent(source).createParser(source)) { - Job.Builder builder = Job.PARSER.apply(parser, () -> parseFieldMatcher); - addModelSizeStats(builder, builder.getId()); - addBucketProcessingTime(builder, builder.getId()); - return builder.build(); - } catch (IOException e) { - throw new ElasticsearchParseException("failed to parser job", e); - } - } - - private void addModelSizeStats(Job.Builder job, String jobId) { - String indexName = ElasticsearchPersister.getJobIndexName(jobId); - // Pull out the modelSizeStats document, and add this to the Job - LOGGER.trace("ES API CALL: get ID " + ModelSizeStats.TYPE + - " type " + ModelSizeStats.TYPE + " from index " + indexName); - GetResponse modelSizeStatsResponse = client.prepareGet( - indexName, ModelSizeStats.TYPE.getPreferredName(), ModelSizeStats.TYPE.getPreferredName()).get(); - - if (!modelSizeStatsResponse.isExists()) { - String msg = "No memory usage details for job with id " + jobId; - LOGGER.warn(msg); - } else { - // Remove the Kibana/Logstash '@timestamp' entry as stored in Elasticsearch, - // and replace using the API 'timestamp' key. - Object timestamp = modelSizeStatsResponse.getSource().remove(ElasticsearchMappings.ES_TIMESTAMP); - modelSizeStatsResponse.getSource().put(ModelSizeStats.TIMESTAMP_FIELD.getPreferredName(), timestamp); - BytesReference source = modelSizeStatsResponse.getSourceAsBytesRef(); - XContentParser parser; - try { - parser = XContentFactory.xContent(source).createParser(source); - } catch (IOException e) { - throw new ElasticsearchParseException("failed to parser model size stats", e); - } - ModelSizeStats.Builder modelSizeStats = ModelSizeStats.PARSER.apply(parser, () -> parseFieldMatcher); - job.setModelSizeStats(modelSizeStats); - } - } - - private void addBucketProcessingTime(Job.Builder job, String jobId) { - String indexName = ElasticsearchPersister.getJobIndexName(jobId); - // Pull out the modelSizeStats document, and add this to the Job - LOGGER.trace("ES API CALL: get ID " + ReservedFieldNames.BUCKET_PROCESSING_TIME_TYPE + - " type " + ReservedFieldNames.AVERAGE_PROCESSING_TIME_MS + " from index " + indexName); - GetResponse procTimeResponse = client.prepareGet( - indexName, ReservedFieldNames.BUCKET_PROCESSING_TIME_TYPE, - ReservedFieldNames.AVERAGE_PROCESSING_TIME_MS).get(); - - if (!procTimeResponse.isExists()) { - String msg = "No average bucket processing time details for job with id " + jobId; - LOGGER.warn(msg); - } else { - Object averageTime = procTimeResponse.getSource() - .get(ReservedFieldNames.AVERAGE_PROCESSING_TIME_MS); - if (averageTime instanceof Double) { - job.setAverageBucketProcessingTimeMs((Double) averageTime); - } - } - } -} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchJobProvider.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchJobProvider.java index 5c42c591bd7..e20c43500eb 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchJobProvider.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchJobProvider.java @@ -307,6 +307,7 @@ public class ElasticsearchJobProvider implements JobProvider } } + @Override public DataCounts dataCounts(String jobId) { String indexName = ElasticsearchPersister.getJobIndexName(jobId); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchMappings.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchMappings.java index f9c5e017f70..a080e81ffc4 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchMappings.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchMappings.java @@ -107,6 +107,9 @@ public class ElasticsearchMappings { .field(ANALYZER, WHITESPACE) .endObject() .startObject(PROPERTIES) + .startObject(Job.ID.getPreferredName()) + .field(TYPE, KEYWORD) + .endObject() .startObject(DataCounts.PROCESSED_RECORD_COUNT.getPreferredName()) .field(TYPE, LONG) .endObject() @@ -131,6 +134,9 @@ public class ElasticsearchMappings { .startObject(DataCounts.OUT_OF_ORDER_TIME_COUNT.getPreferredName()) .field(TYPE, LONG) .endObject() + .startObject(DataCounts.EARLIEST_RECORD_TIME.getPreferredName()) + .field(TYPE, DATE) + .endObject() .startObject(DataCounts.LATEST_RECORD_TIME.getPreferredName()) .field(TYPE, DATE) .endObject() diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobProvider.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobProvider.java index 0b1dd5bf1a5..3443d2b6270 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobProvider.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobProvider.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.prelert.job.persistence; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.xpack.prelert.job.DataCounts; import org.elasticsearch.xpack.prelert.job.Job; import org.elasticsearch.xpack.prelert.job.ModelSizeStats; import org.elasticsearch.xpack.prelert.job.ModelSnapshot; @@ -80,6 +81,13 @@ public interface JobProvider extends JobResultsProvider { */ Optional modelSizeStats(String jobId); + /** + * Get the job's data counts + * @param jobId The job id + * @return The dataCounts or default constructed object if not found + */ + DataCounts dataCounts(String jobId); + /** * Retrieves the list with the given {@code listId} from the datastore. * diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/ReservedFieldNames.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/ReservedFieldNames.java index e2f001d459c..e3dc116276f 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/ReservedFieldNames.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/ReservedFieldNames.java @@ -122,6 +122,7 @@ public final class ReservedFieldNames { DataCounts.MISSING_FIELD_COUNT.getPreferredName(), DataCounts.OUT_OF_ORDER_TIME_COUNT.getPreferredName(), DataCounts.LATEST_RECORD_TIME.getPreferredName(), + DataCounts.EARLIEST_RECORD_TIME.getPreferredName(), Influence.INFLUENCER_FIELD_NAME.getPreferredName(), Influence.INFLUENCER_FIELD_VALUES.getPreferredName(), diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobService.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobService.java index 6d17f4e5d8c..b28dad99f88 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobService.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobService.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.prelert.PrelertPlugin; import org.elasticsearch.xpack.prelert.action.UpdateJobSchedulerStatusAction; +import org.elasticsearch.xpack.prelert.job.DataCounts; import org.elasticsearch.xpack.prelert.job.Job; import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus; import org.elasticsearch.xpack.prelert.job.SchedulerState; @@ -176,7 +177,7 @@ public class ScheduledJobService extends AbstractComponent { DataExtractor dataExtractor = dataExtractorFactory.newExtractor(job); ScheduledJob scheduledJob = new ScheduledJob(job.getJobId(), frequency.toMillis(), queryDelay.toMillis(), dataExtractor, dataProcessor, auditor, currentTimeSupplier, getLatestFinalBucketEndTimeMs(job), - getLatestRecordTimestamp(job)); + getLatestRecordTimestamp(job.getJobId())); return new Holder(scheduledJob, new ProblemTracker(() -> auditor)); } @@ -200,10 +201,11 @@ public class ScheduledJobService extends AbstractComponent { return latestFinalBucketEndMs; } - private long getLatestRecordTimestamp(Job job) { + private long getLatestRecordTimestamp(String jobId) { long latestRecordTimeMs = -1L; - if (job.getCounts() != null && job.getCounts().getLatestRecordTimeStamp() != null) { - latestRecordTimeMs = job.getCounts().getLatestRecordTimeStamp().getTime(); + DataCounts dataCounts = jobProvider.dataCounts(jobId); + if (dataCounts.getLatestRecordTimeStamp() != null) { + latestRecordTimeMs = dataCounts.getLatestRecordTimeStamp().getTime(); } return latestRecordTimeMs; } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetJobActionResponseTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetJobActionResponseTests.java index 819259507dc..3ff530fa6ce 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetJobActionResponseTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/GetJobActionResponseTests.java @@ -47,14 +47,12 @@ public class GetJobActionResponseTests extends AbstractStreamableTestCase transformConfigList = new ArrayList<>(numTransformers); for (int i = 0; i < numTransformers; i++) { transformConfigList.add(new TransformConfig(TransformType.UPPERCASE.prettyName())); } ModelDebugConfig modelDebugConfig = randomBoolean() ? new ModelDebugConfig(randomDouble(), randomAsciiOfLength(10)) : null; - DataCounts counts = randomBoolean() ? new DataCounts(jobId) : null; IgnoreDowntime ignoreDowntime = randomFrom(IgnoreDowntime.values()); Long normalizationWindowDays = randomBoolean() ? randomLong() : null; Long backgroundPersistInterval = randomBoolean() ? randomLong() : null; @@ -65,8 +63,8 @@ public class GetJobActionResponseTests extends AbstractStreamableTestCase { assertNull(job.getIgnoreDowntime()); assertNull(job.getLastDataTime()); assertNull(job.getModelDebugConfig()); - assertNull(job.getModelSizeStats()); assertNull(job.getRenormalizationWindowDays()); assertNull(job.getBackgroundPersistInterval()); assertNull(job.getModelSnapshotRetentionDays()); @@ -595,9 +594,6 @@ public class JobTests extends AbstractSerializingTestCase { if (randomBoolean()) { builder.setDataDescription(new DataDescription.Builder()); } - if (randomBoolean()) { - builder.setModelSizeStats(new ModelSizeStats.Builder("foo")); - } String[] outputs; TransformType[] transformTypes ; AnalysisConfig ac = analysisConfig.build(); @@ -619,7 +615,6 @@ public class JobTests extends AbstractSerializingTestCase { if (randomBoolean()) { builder.setModelDebugConfig(new ModelDebugConfig(randomDouble(), randomAsciiOfLength(10))); } - builder.setCounts(new DataCounts(jobId)); builder.setIgnoreDowntime(randomFrom(IgnoreDowntime.values())); if (randomBoolean()) { builder.setRenormalizationWindowDays(randomPositiveLong()); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManagerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManagerTests.java index 0be789b5539..eef2f3a5a7d 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManagerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManagerTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.xpack.prelert.job.Detector; import org.elasticsearch.xpack.prelert.job.Job; import org.elasticsearch.xpack.prelert.job.JobStatus; import org.elasticsearch.xpack.prelert.job.metadata.Allocation; +import org.elasticsearch.xpack.prelert.job.persistence.JobProvider; import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectCommunicator; import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectProcessFactory; import org.elasticsearch.xpack.prelert.job.process.autodetect.output.parsing.AutodetectResultsParser; @@ -51,10 +52,12 @@ import static org.mockito.Mockito.spy; public class AutodetectProcessManagerTests extends ESTestCase { private JobManager jobManager; + private JobProvider jobProvider; @Before public void initMocks() { jobManager = Mockito.mock(JobManager.class); + jobProvider = Mockito.mock(JobProvider.class); givenAllocationWithStatus(JobStatus.CLOSED); } @@ -187,8 +190,8 @@ public class AutodetectProcessManagerTests extends ESTestCase { ThreadPool threadPool = mock(ThreadPool.class); AutodetectResultsParser parser = mock(AutodetectResultsParser.class); AutodetectProcessFactory autodetectProcessFactory = mock(AutodetectProcessFactory.class); - AutodetectProcessManager manager = - new AutodetectProcessManager(Settings.EMPTY, client, environment, threadPool, jobManager, parser, autodetectProcessFactory); + AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, environment, threadPool, + jobManager, jobProvider, parser, autodetectProcessFactory); manager = spy(manager); doReturn(communicator).when(manager).create(any(), anyBoolean()); return manager; diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java index 8ee3c867732..9b0d0d13d05 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/manager/JobManagerTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.prelert.job.Job; import org.elasticsearch.xpack.prelert.job.audit.Auditor; import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata; +import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.prelert.job.persistence.JobProvider; import org.elasticsearch.xpack.prelert.job.persistence.QueryPage; import org.junit.Before; @@ -39,12 +40,14 @@ public class JobManagerTests extends ESTestCase { private ClusterService clusterService; private JobProvider jobProvider; + private JobDataCountsPersister jobDataCountsPersister; private Auditor auditor; @Before public void setupMocks() { clusterService = mock(ClusterService.class); jobProvider = mock(JobProvider.class); + jobDataCountsPersister = mock(JobDataCountsPersister.class); auditor = mock(Auditor.class); when(jobProvider.audit(anyString())).thenReturn(auditor); } @@ -188,7 +191,7 @@ public class JobManagerTests extends ESTestCase { Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); Environment env = new Environment( settings); - return new JobManager(env, settings, jobProvider, clusterService); + return new JobManager(env, settings, jobProvider, jobDataCountsPersister, clusterService); } private ClusterState createClusterState() { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchJobDetailsMapperTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchJobDetailsMapperTests.java deleted file mode 100644 index ee43c2ca83f..00000000000 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchJobDetailsMapperTests.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.prelert.job.persistence; - -import org.elasticsearch.action.get.GetRequestBuilder; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.ParseFieldMatcher; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.prelert.job.Job; -import org.elasticsearch.xpack.prelert.job.ModelSizeStats; -import org.elasticsearch.xpack.prelert.job.results.ReservedFieldNames; -import org.junit.Before; -import org.mockito.Mockito; - -import java.util.Date; -import java.util.HashMap; -import java.util.Map; - -import static org.elasticsearch.xpack.prelert.job.JobTests.buildJobBuilder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class ElasticsearchJobDetailsMapperTests extends ESTestCase { - private Client client; - - @Before - public void setUpMocks() { - client = Mockito.mock(Client.class); - } - - public void testMap_GivenJobSourceCannotBeParsed() { - BytesArray source = new BytesArray("{ \"invalidKey\": true }"); - - GetResponse getResponse = mock(GetResponse.class); - when(getResponse.isExists()).thenReturn(false); - GetRequestBuilder getRequestBuilder = mock(GetRequestBuilder.class); - when(getRequestBuilder.get()).thenReturn(getResponse); - when(client.prepareGet("prelertresults-foo", ModelSizeStats.TYPE.getPreferredName(), ModelSizeStats.TYPE.getPreferredName())) - .thenReturn(getRequestBuilder); - - ElasticsearchJobDetailsMapper mapper = new ElasticsearchJobDetailsMapper(client, ParseFieldMatcher.STRICT); - - ESTestCase.expectThrows(IllegalArgumentException.class, () -> mapper.map(source)); - } - - public void testMap_GivenModelSizeStatsExists() throws Exception { - ModelSizeStats.Builder modelSizeStats = new ModelSizeStats.Builder("foo"); - modelSizeStats.setModelBytes(42L); - Date now = new Date(); - modelSizeStats.setTimestamp(now); - - Job originalJob = buildJobBuilder("foo").build(); - - BytesReference source = originalJob.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS).bytes(); - BytesReference modelSizeStatsSource = modelSizeStats.build().toXContent( - XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS).bytes(); - - GetResponse getModelSizeResponse = mock(GetResponse.class); - when(getModelSizeResponse.isExists()).thenReturn(true); - when(getModelSizeResponse.getSourceAsBytesRef()).thenReturn(modelSizeStatsSource); - GetRequestBuilder getModelSizeRequestBuilder = mock(GetRequestBuilder.class); - when(getModelSizeRequestBuilder.get()).thenReturn(getModelSizeResponse); - when(client.prepareGet("prelertresults-foo", ModelSizeStats.TYPE.getPreferredName(), ModelSizeStats.TYPE.getPreferredName())) - .thenReturn(getModelSizeRequestBuilder); - - - Map procTimeSource = new HashMap<>(); - procTimeSource.put(ReservedFieldNames.AVERAGE_PROCESSING_TIME_MS, 20.2); - - GetResponse getProcTimeResponse = mock(GetResponse.class); - when(getProcTimeResponse.isExists()).thenReturn(true); - when(getProcTimeResponse.getSource()).thenReturn(procTimeSource); - GetRequestBuilder getProcTimeRequestBuilder = mock(GetRequestBuilder.class); - when(getProcTimeRequestBuilder.get()).thenReturn(getProcTimeResponse); - when(client.prepareGet("prelertresults-foo", ReservedFieldNames.BUCKET_PROCESSING_TIME_TYPE, - ReservedFieldNames.AVERAGE_PROCESSING_TIME_MS)) - .thenReturn(getProcTimeRequestBuilder); - - - ElasticsearchJobDetailsMapper mapper = new ElasticsearchJobDetailsMapper(client, ParseFieldMatcher.STRICT); - - Job mappedJob = mapper.map(source); - - assertEquals("foo", mappedJob.getId()); - assertEquals(42L, mappedJob.getModelSizeStats().getModelBytes()); - assertEquals(now, mappedJob.getModelSizeStats().getTimestamp()); - assertEquals(20.2, mappedJob.getAverageBucketProcessingTimeMs(), 0.0001); - } - - public void testMap_GivenModelSizeStatsDoesNotExist() throws Exception { - Job originalJob = buildJobBuilder("foo").build(); - - BytesReference source = originalJob.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS).bytes(); - - GetResponse getResponse = mock(GetResponse.class); - when(getResponse.isExists()).thenReturn(false); - GetRequestBuilder getRequestBuilder = mock(GetRequestBuilder.class); - when(getRequestBuilder.get()).thenReturn(getResponse); - when(client.prepareGet("prelertresults-foo", ModelSizeStats.TYPE.getPreferredName(), ModelSizeStats.TYPE.getPreferredName())) - .thenReturn(getRequestBuilder); - - - GetResponse getProcTimeResponse = mock(GetResponse.class); - when(getProcTimeResponse.isExists()).thenReturn(false); - GetRequestBuilder getProcTimeRequestBuilder = mock(GetRequestBuilder.class); - when(getProcTimeRequestBuilder.get()).thenReturn(getProcTimeResponse); - when(client.prepareGet("prelertresults-foo", ReservedFieldNames.BUCKET_PROCESSING_TIME_TYPE, - ReservedFieldNames.AVERAGE_PROCESSING_TIME_MS)) - .thenReturn(getProcTimeRequestBuilder); - - ElasticsearchJobDetailsMapper mapper = new ElasticsearchJobDetailsMapper(client, ParseFieldMatcher.STRICT); - - Job mappedJob = mapper.map(source); - - assertEquals("foo", mappedJob.getId()); - assertNull(mappedJob.getModelSizeStats()); - assertNull(mappedJob.getAverageBucketProcessingTimeMs()); - } -} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobServiceTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobServiceTests.java index 505e3dd3df5..6873d74c703 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobServiceTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/scheduler/ScheduledJobServiceTests.java @@ -95,7 +95,7 @@ public class ScheduledJobServiceTests extends ESTestCase { Allocation allocation = new Allocation("_nodeId", "foo", JobStatus.RUNNING, new SchedulerState(JobSchedulerStatus.STARTING, 0L, 60000L)); DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0)); - builder.setCounts(dataCounts); + when(jobManager.getJobAllocation("foo")).thenReturn(allocation); DataExtractor dataExtractor = mock(DataExtractor.class); @@ -118,7 +118,6 @@ public class ScheduledJobServiceTests extends ESTestCase { Allocation allocation = new Allocation("_nodeId", "foo", JobStatus.RUNNING, new SchedulerState(JobSchedulerStatus.STARTING, 0L, null)); DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0)); - builder.setCounts(dataCounts); when(jobManager.getJobAllocation("foo")).thenReturn(allocation); DataExtractor dataExtractor = mock(DataExtractor.class); @@ -148,9 +147,6 @@ public class ScheduledJobServiceTests extends ESTestCase { Job.Builder builder = createScheduledJob(); Allocation allocation1 = new Allocation("_nodeId", "foo", JobStatus.RUNNING, new SchedulerState(JobSchedulerStatus.STARTED, 0, null)); - DataCounts dataCounts = new DataCounts("foo"); - dataCounts.setLatestRecordTimeStamp(new Date(0)); - builder.setCounts(dataCounts); when(jobManager.getJobAllocation("foo")).thenReturn(allocation1); DataExtractor dataExtractor = mock(DataExtractor.class); diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/revert_model_snapshot.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/revert_model_snapshot.yaml index bc3772cb72e..65c433a4e98 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/test/revert_model_snapshot.yaml +++ b/elasticsearch/src/test/resources/rest-api-spec/test/revert_model_snapshot.yaml @@ -216,3 +216,12 @@ setup: - match: { hitCount: 1 } - match: { hits.0.jobId: "foo" } - match: { hits.0.timestamp: 1462060800000 } + + - do: + xpack.prelert.get_job: + job_id: foo + metric: data_counts + + - match: { document.data_counts.latest_record_timestamp: 1464739200000 } + +