Remove dataCounts and modelSizeStats from Job (elastic/elasticsearch#333)
* Remove DataCounts and ModelSizeStats from Job * Delete unused ElasticsearchJobDetailsMapper Conflicts: elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchJobDetailsMapper.java elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchJobDetailsMapperTests.java * Add missing mappings for DataCounts Fixes yaml test failures * Add test to assert revert a model snapshot sets DataCounts.latest_record_timestamp * Resolve merge errors * Add NORELEASE for persisting dataCounts in cluster update Original commit: elastic/x-pack-elasticsearch@46099d4db6
This commit is contained in:
parent
617a1b65d2
commit
7c66c3a553
|
@ -61,6 +61,7 @@ import org.elasticsearch.xpack.prelert.job.metadata.JobAllocator;
|
|||
import org.elasticsearch.xpack.prelert.job.metadata.JobLifeCycleService;
|
||||
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
|
||||
import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchBulkDeleterFactory;
|
||||
import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchJobDataCountsPersister;
|
||||
import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchJobProvider;
|
||||
import org.elasticsearch.xpack.prelert.job.process.NativeController;
|
||||
import org.elasticsearch.xpack.prelert.job.process.ProcessCtrl;
|
||||
|
@ -154,8 +155,9 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
|
|||
// For this reason we can't use interfaces in the constructor of transport actions.
|
||||
// This ok for now as we will remove Guice soon
|
||||
ElasticsearchJobProvider jobProvider = new ElasticsearchJobProvider(client, 0, parseFieldMatcherSupplier.getParseFieldMatcher());
|
||||
ElasticsearchJobDataCountsPersister jobDataCountsPersister = new ElasticsearchJobDataCountsPersister(client);
|
||||
|
||||
JobManager jobManager = new JobManager(env, settings, jobProvider, clusterService);
|
||||
JobManager jobManager = new JobManager(env, settings, jobProvider, jobDataCountsPersister, clusterService);
|
||||
AutodetectProcessFactory processFactory;
|
||||
if (USE_NATIVE_PROCESS_OPTION.get(settings)) {
|
||||
try {
|
||||
|
@ -169,8 +171,8 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
|
|||
processFactory = (JobDetails, ignoreDowntime) -> new BlackHoleAutodetectProcess();
|
||||
}
|
||||
AutodetectResultsParser autodetectResultsParser = new AutodetectResultsParser(settings, parseFieldMatcherSupplier);
|
||||
DataProcessor dataProcessor =
|
||||
new AutodetectProcessManager(settings, client, env, threadPool, jobManager, autodetectResultsParser, processFactory);
|
||||
DataProcessor dataProcessor = new AutodetectProcessManager(settings, client, env, threadPool,
|
||||
jobManager, jobProvider, autodetectResultsParser, processFactory);
|
||||
ScheduledJobService scheduledJobService = new ScheduledJobService(threadPool, client, jobProvider, dataProcessor,
|
||||
new HttpDataExtractorFactory(), System::currentTimeMillis);
|
||||
return Arrays.asList(
|
||||
|
|
|
@ -47,7 +47,7 @@ import java.util.regex.Pattern;
|
|||
public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent {
|
||||
|
||||
public static final Job PROTO = new Job(null, null, null, null, null, 0L, null, null, null, null, null,
|
||||
null, null, null, null, null, null, null, null, null, null, null);
|
||||
null, null, null, null, null, null, null, null, null);
|
||||
|
||||
public static final long DEFAULT_BUCKETSPAN = 300;
|
||||
|
||||
|
@ -59,7 +59,6 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
public static final ParseField ID = new ParseField("jobId");
|
||||
public static final ParseField ANALYSIS_CONFIG = new ParseField("analysisConfig");
|
||||
public static final ParseField ANALYSIS_LIMITS = new ParseField("analysisLimits");
|
||||
public static final ParseField COUNTS = new ParseField("counts");
|
||||
public static final ParseField CREATE_TIME = new ParseField("createTime");
|
||||
public static final ParseField CUSTOM_SETTINGS = new ParseField("customSettings");
|
||||
public static final ParseField DATA_DESCRIPTION = new ParseField("dataDescription");
|
||||
|
@ -75,7 +74,6 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
public static final ParseField RESULTS_RETENTION_DAYS = new ParseField("resultsRetentionDays");
|
||||
public static final ParseField TIMEOUT = new ParseField("timeout");
|
||||
public static final ParseField TRANSFORMS = new ParseField("transforms");
|
||||
public static final ParseField MODEL_SIZE_STATS = new ParseField("modelSizeStats");
|
||||
public static final ParseField AVERAGE_BUCKET_PROCESSING_TIME = new ParseField("averageBucketProcessingTimeMs");
|
||||
public static final ParseField MODEL_SNAPSHOT_ID = new ParseField("modelSnapshotId");
|
||||
|
||||
|
@ -114,10 +112,8 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
PARSER.declareObject(Builder::setAnalysisLimits, AnalysisLimits.PARSER, ANALYSIS_LIMITS);
|
||||
PARSER.declareObject(Builder::setSchedulerConfig, SchedulerConfig.PARSER, SCHEDULER_CONFIG);
|
||||
PARSER.declareObject(Builder::setDataDescription, DataDescription.PARSER, DATA_DESCRIPTION);
|
||||
PARSER.declareObject(Builder::setModelSizeStats, ModelSizeStats.PARSER, MODEL_SIZE_STATS);
|
||||
PARSER.declareObjectArray(Builder::setTransforms, TransformConfig.PARSER, TRANSFORMS);
|
||||
PARSER.declareObject(Builder::setModelDebugConfig, ModelDebugConfig.PARSER, MODEL_DEBUG_CONFIG);
|
||||
PARSER.declareObject(Builder::setCounts, DataCounts.PARSER, COUNTS);
|
||||
PARSER.declareField(Builder::setIgnoreDowntime, (p, c) -> IgnoreDowntime.fromString(p.text()), IGNORE_DOWNTIME, ValueType.STRING);
|
||||
PARSER.declareLong(Builder::setTimeout, TIMEOUT);
|
||||
PARSER.declareLong(Builder::setRenormalizationWindowDays, RENORMALIZATION_WINDOW_DAYS);
|
||||
|
@ -140,10 +136,8 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
private final AnalysisLimits analysisLimits;
|
||||
private final SchedulerConfig schedulerConfig;
|
||||
private final DataDescription dataDescription;
|
||||
private final ModelSizeStats modelSizeStats;
|
||||
private final List<TransformConfig> transforms;
|
||||
private final ModelDebugConfig modelDebugConfig;
|
||||
private final DataCounts counts;
|
||||
private final IgnoreDowntime ignoreDowntime;
|
||||
private final Long renormalizationWindowDays;
|
||||
private final Long backgroundPersistInterval;
|
||||
|
@ -155,8 +149,8 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
|
||||
public Job(String jobId, String description, Date createTime, Date finishedTime, Date lastDataTime, long timeout,
|
||||
AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, SchedulerConfig schedulerConfig,
|
||||
DataDescription dataDescription, ModelSizeStats modelSizeStats, List<TransformConfig> transforms,
|
||||
ModelDebugConfig modelDebugConfig, DataCounts counts, IgnoreDowntime ignoreDowntime, Long renormalizationWindowDays,
|
||||
DataDescription dataDescription, List<TransformConfig> transforms,
|
||||
ModelDebugConfig modelDebugConfig, IgnoreDowntime ignoreDowntime, Long renormalizationWindowDays,
|
||||
Long backgroundPersistInterval, Long modelSnapshotRetentionDays, Long resultsRetentionDays,
|
||||
Map<String, Object> customSettings, Double averageBucketProcessingTimeMs, String modelSnapshotId) {
|
||||
this.jobId = jobId;
|
||||
|
@ -169,10 +163,8 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
this.analysisLimits = analysisLimits;
|
||||
this.schedulerConfig = schedulerConfig;
|
||||
this.dataDescription = dataDescription;
|
||||
this.modelSizeStats = modelSizeStats;
|
||||
this.transforms = transforms;
|
||||
this.modelDebugConfig = modelDebugConfig;
|
||||
this.counts = counts;
|
||||
this.ignoreDowntime = ignoreDowntime;
|
||||
this.renormalizationWindowDays = renormalizationWindowDays;
|
||||
this.backgroundPersistInterval = backgroundPersistInterval;
|
||||
|
@ -194,10 +186,8 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
analysisLimits = in.readOptionalWriteable(AnalysisLimits::new);
|
||||
schedulerConfig = in.readOptionalWriteable(SchedulerConfig::new);
|
||||
dataDescription = in.readOptionalWriteable(DataDescription::new);
|
||||
modelSizeStats = in.readOptionalWriteable(ModelSizeStats::new);
|
||||
transforms = in.readList(TransformConfig::new);
|
||||
modelDebugConfig = in.readOptionalWriteable(ModelDebugConfig::new);
|
||||
counts = in.readOptionalWriteable(DataCounts::new);
|
||||
ignoreDowntime = in.readOptionalWriteable(IgnoreDowntime::fromStream);
|
||||
renormalizationWindowDays = in.readOptionalLong();
|
||||
backgroundPersistInterval = in.readOptionalLong();
|
||||
|
@ -321,15 +311,6 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
return modelDebugConfig;
|
||||
}
|
||||
|
||||
/**
|
||||
* The memory usage object
|
||||
*
|
||||
* @return The ModelSizeStats
|
||||
*/
|
||||
public ModelSizeStats getModelSizeStats() {
|
||||
return modelSizeStats;
|
||||
}
|
||||
|
||||
/**
|
||||
* If not set the input data is assumed to be csv with a '_time' field in
|
||||
* epoch format.
|
||||
|
@ -345,15 +326,6 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
return transforms;
|
||||
}
|
||||
|
||||
/**
|
||||
* Processed records count
|
||||
*
|
||||
* @return the processed records counts
|
||||
*/
|
||||
public DataCounts getCounts() {
|
||||
return counts;
|
||||
}
|
||||
|
||||
/**
|
||||
* The duration of the renormalization window in days
|
||||
*
|
||||
|
@ -452,10 +424,8 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
out.writeOptionalWriteable(analysisLimits);
|
||||
out.writeOptionalWriteable(schedulerConfig);
|
||||
out.writeOptionalWriteable(dataDescription);
|
||||
out.writeOptionalWriteable(modelSizeStats);
|
||||
out.writeList(transforms);
|
||||
out.writeOptionalWriteable(modelDebugConfig);
|
||||
out.writeOptionalWriteable(counts);
|
||||
out.writeOptionalWriteable(ignoreDowntime);
|
||||
out.writeOptionalLong(renormalizationWindowDays);
|
||||
out.writeOptionalLong(backgroundPersistInterval);
|
||||
|
@ -495,18 +465,12 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
if (dataDescription != null) {
|
||||
builder.field(DATA_DESCRIPTION.getPreferredName(), dataDescription, params);
|
||||
}
|
||||
if (modelSizeStats != null) {
|
||||
builder.field(MODEL_SIZE_STATS.getPreferredName(), modelSizeStats, params);
|
||||
}
|
||||
if (transforms != null) {
|
||||
builder.field(TRANSFORMS.getPreferredName(), transforms);
|
||||
}
|
||||
if (modelDebugConfig != null) {
|
||||
builder.field(MODEL_DEBUG_CONFIG.getPreferredName(), modelDebugConfig, params);
|
||||
}
|
||||
if (counts != null) {
|
||||
builder.field(COUNTS.getPreferredName(), counts, params);
|
||||
}
|
||||
if (ignoreDowntime != null) {
|
||||
builder.field(IGNORE_DOWNTIME.getPreferredName(), ignoreDowntime);
|
||||
}
|
||||
|
@ -550,8 +514,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
&& Objects.equals(this.finishedTime, that.finishedTime) && Objects.equals(this.lastDataTime, that.lastDataTime)
|
||||
&& (this.timeout == that.timeout) && Objects.equals(this.analysisConfig, that.analysisConfig)
|
||||
&& Objects.equals(this.analysisLimits, that.analysisLimits) && Objects.equals(this.dataDescription, that.dataDescription)
|
||||
&& Objects.equals(this.modelDebugConfig, that.modelDebugConfig) && Objects.equals(this.modelSizeStats, that.modelSizeStats)
|
||||
&& Objects.equals(this.transforms, that.transforms) && Objects.equals(this.counts, that.counts)
|
||||
&& Objects.equals(this.modelDebugConfig, that.modelDebugConfig) && Objects.equals(this.transforms, that.transforms)
|
||||
&& Objects.equals(this.ignoreDowntime, that.ignoreDowntime)
|
||||
&& Objects.equals(this.renormalizationWindowDays, that.renormalizationWindowDays)
|
||||
&& Objects.equals(this.backgroundPersistInterval, that.backgroundPersistInterval)
|
||||
|
@ -564,7 +527,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(jobId, description, createTime, finishedTime, lastDataTime, timeout, analysisConfig,
|
||||
analysisLimits, dataDescription, modelDebugConfig, modelSizeStats, transforms, counts, renormalizationWindowDays,
|
||||
analysisLimits, dataDescription, modelDebugConfig, transforms, renormalizationWindowDays,
|
||||
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, ignoreDowntime, customSettings,
|
||||
modelSnapshotId);
|
||||
}
|
||||
|
@ -601,7 +564,6 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
private AnalysisLimits analysisLimits;
|
||||
private SchedulerConfig schedulerConfig;
|
||||
private List<TransformConfig> transforms = new ArrayList<>();
|
||||
private ModelSizeStats modelSizeStats;
|
||||
private DataDescription dataDescription;
|
||||
private Date createTime;
|
||||
private Date finishedTime;
|
||||
|
@ -612,7 +574,6 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
private Long backgroundPersistInterval;
|
||||
private Long modelSnapshotRetentionDays;
|
||||
private Long resultsRetentionDays;
|
||||
private DataCounts counts;
|
||||
private IgnoreDowntime ignoreDowntime;
|
||||
private Map<String, Object> customSettings;
|
||||
private Double averageBucketProcessingTimeMs;
|
||||
|
@ -631,7 +592,6 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
this.analysisConfig = job.getAnalysisConfig();
|
||||
this.schedulerConfig = job.getSchedulerConfig();
|
||||
this.transforms = job.getTransforms();
|
||||
this.modelSizeStats = job.getModelSizeStats();
|
||||
this.dataDescription = job.getDataDescription();
|
||||
this.createTime = job.getCreateTime();
|
||||
this.finishedTime = job.getFinishedTime();
|
||||
|
@ -641,7 +601,6 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
this.renormalizationWindowDays = job.getRenormalizationWindowDays();
|
||||
this.backgroundPersistInterval = job.getBackgroundPersistInterval();
|
||||
this.resultsRetentionDays = job.getResultsRetentionDays();
|
||||
this.counts = job.getCounts();
|
||||
this.ignoreDowntime = job.getIgnoreDowntime();
|
||||
this.customSettings = job.getCustomSettings();
|
||||
this.averageBucketProcessingTimeMs = job.getAverageBucketProcessingTimeMs();
|
||||
|
@ -697,6 +656,10 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
this.finishedTime = finishedTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the wall clock time of the last data upload
|
||||
* @param lastDataTime Wall clock time
|
||||
*/
|
||||
public void setLastDataTime(Date lastDataTime) {
|
||||
this.lastDataTime = lastDataTime;
|
||||
}
|
||||
|
@ -705,10 +668,6 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
this.transforms = transforms;
|
||||
}
|
||||
|
||||
public void setModelSizeStats(ModelSizeStats.Builder modelSizeStats) {
|
||||
this.modelSizeStats = modelSizeStats.build();
|
||||
}
|
||||
|
||||
public void setDataDescription(DataDescription.Builder description) {
|
||||
dataDescription = description.build();
|
||||
}
|
||||
|
@ -737,10 +696,6 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
this.ignoreDowntime = ignoreDowntime;
|
||||
}
|
||||
|
||||
public void setCounts(DataCounts counts) {
|
||||
this.counts = counts;
|
||||
}
|
||||
|
||||
public void setAverageBucketProcessingTimeMs(Double averageBucketProcessingTimeMs) {
|
||||
this.averageBucketProcessingTimeMs = averageBucketProcessingTimeMs;
|
||||
}
|
||||
|
@ -802,8 +757,6 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
Date createTime;
|
||||
Date finishedTime;
|
||||
Date lastDataTime;
|
||||
DataCounts counts;
|
||||
ModelSizeStats modelSizeStats;
|
||||
Double averageBucketProcessingTimeMs;
|
||||
String modelSnapshotId;
|
||||
if (fromApi) {
|
||||
|
@ -811,8 +764,6 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
createTime = this.createTime == null ? new Date() : this.createTime;
|
||||
finishedTime = null;
|
||||
lastDataTime = null;
|
||||
counts = new DataCounts(id);
|
||||
modelSizeStats = null;
|
||||
averageBucketProcessingTimeMs = null;
|
||||
modelSnapshotId = null;
|
||||
} else {
|
||||
|
@ -820,8 +771,6 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
createTime = this.createTime;
|
||||
finishedTime = this.finishedTime;
|
||||
lastDataTime = this.lastDataTime;
|
||||
counts = this.counts;
|
||||
modelSizeStats = this.modelSizeStats;
|
||||
averageBucketProcessingTimeMs = this.averageBucketProcessingTimeMs;
|
||||
modelSnapshotId = this.modelSnapshotId;
|
||||
}
|
||||
|
@ -833,9 +782,9 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
}
|
||||
return new Job(
|
||||
id, description, createTime, finishedTime, lastDataTime, timeout, analysisConfig, analysisLimits,
|
||||
schedulerConfig, dataDescription, modelSizeStats, transforms, modelDebugConfig, counts,
|
||||
ignoreDowntime, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays,
|
||||
resultsRetentionDays, customSettings, averageBucketProcessingTimeMs, modelSnapshotId
|
||||
schedulerConfig, dataDescription, transforms, modelDebugConfig, ignoreDowntime, renormalizationWindowDays,
|
||||
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings,
|
||||
averageBucketProcessingTimeMs, modelSnapshotId
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchJobDataCount
|
|||
import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchPersister;
|
||||
import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchUsagePersister;
|
||||
import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister;
|
||||
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
|
||||
import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
|
||||
import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectCommunicator;
|
||||
import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectProcess;
|
||||
|
@ -51,13 +52,15 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
|
|||
private final Environment env;
|
||||
private final ThreadPool threadPool;
|
||||
private final JobManager jobManager;
|
||||
private final JobProvider jobProvider;
|
||||
private final AutodetectResultsParser parser;
|
||||
private final AutodetectProcessFactory autodetectProcessFactory;
|
||||
|
||||
private final ConcurrentMap<String, AutodetectCommunicator> autoDetectCommunicatorByJob;
|
||||
|
||||
public AutodetectProcessManager(Settings settings, Client client, Environment env, ThreadPool threadPool, JobManager jobManager,
|
||||
AutodetectResultsParser parser, AutodetectProcessFactory autodetectProcessFactory) {
|
||||
JobProvider jobProvider, AutodetectResultsParser parser,
|
||||
AutodetectProcessFactory autodetectProcessFactory) {
|
||||
super(settings);
|
||||
this.client = client;
|
||||
this.env = env;
|
||||
|
@ -65,6 +68,7 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
|
|||
this.parser = parser;
|
||||
this.autodetectProcessFactory = autodetectProcessFactory;
|
||||
this.jobManager = jobManager;
|
||||
this.jobProvider = jobProvider;
|
||||
this.autoDetectCommunicatorByJob = new ConcurrentHashMap<>();
|
||||
}
|
||||
|
||||
|
@ -105,9 +109,9 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
|
|||
ElasticsearchUsagePersister usagePersister = new ElasticsearchUsagePersister(client, jobLogger);
|
||||
UsageReporter usageReporter = new UsageReporter(settings, job.getJobId(), usagePersister, jobLogger);
|
||||
|
||||
JobDataCountsPersister jobDataCountsPersister = new ElasticsearchJobDataCountsPersister(client, jobLogger);
|
||||
StatusReporter statusReporter = new StatusReporter(env, settings, job.getJobId(), job.getCounts(), usageReporter,
|
||||
jobDataCountsPersister, jobLogger, job.getAnalysisConfig().getBucketSpanOrDefault());
|
||||
JobDataCountsPersister jobDataCountsPersister = new ElasticsearchJobDataCountsPersister(client);
|
||||
StatusReporter statusReporter = new StatusReporter(env, settings, job.getJobId(), jobProvider.dataCounts(jobId),
|
||||
usageReporter, jobDataCountsPersister, jobLogger, job.getAnalysisConfig().getBucketSpanOrDefault());
|
||||
|
||||
AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, ignoreDowntime);
|
||||
JobResultsPersister persister = new ElasticsearchPersister(jobId, client);
|
||||
|
|
|
@ -35,12 +35,12 @@ import org.elasticsearch.xpack.prelert.job.logs.JobLogs;
|
|||
import org.elasticsearch.xpack.prelert.job.messages.Messages;
|
||||
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
|
||||
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
|
||||
import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister;
|
||||
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
|
||||
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
|
||||
import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
|
||||
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
@ -52,10 +52,7 @@ import java.util.stream.Collectors;
|
|||
* <ul>
|
||||
* <li>creation</li>
|
||||
* <li>deletion</li>
|
||||
* <li>flushing</li>
|
||||
* <li>updating</li>
|
||||
* <li>sending of data</li>
|
||||
* <li>fetching jobs and results</li>
|
||||
* <li>starting/stopping of scheduled jobs</li>
|
||||
* </ul>
|
||||
*/
|
||||
|
@ -70,18 +67,22 @@ public class JobManager {
|
|||
|
||||
public static final String DEFAULT_RECORD_SORT_FIELD = AnomalyRecord.PROBABILITY.getPreferredName();
|
||||
private final JobProvider jobProvider;
|
||||
private final JobDataCountsPersister jobDataCountsPersister;
|
||||
private final ClusterService clusterService;
|
||||
private final Environment env;
|
||||
private final Settings settings;
|
||||
|
||||
|
||||
/**
|
||||
* Create a JobManager
|
||||
*/
|
||||
public JobManager(Environment env, Settings settings, JobProvider jobProvider, ClusterService clusterService) {
|
||||
public JobManager(Environment env, Settings settings, JobProvider jobProvider,
|
||||
JobDataCountsPersister jobDataCountsPersister, ClusterService clusterService) {
|
||||
this.env = env;
|
||||
this.settings = settings;
|
||||
this.jobProvider = Objects.requireNonNull(jobProvider);
|
||||
this.clusterService = clusterService;
|
||||
this.jobDataCountsPersister = jobDataCountsPersister;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -458,12 +459,11 @@ public class JobManager {
|
|||
builder.setModelSnapshotId(modelSnapshot.getSnapshotId());
|
||||
if (request.getDeleteInterveningResults()) {
|
||||
builder.setIgnoreDowntime(IgnoreDowntime.NEVER);
|
||||
Date latestRecordTime = modelSnapshot.getLatestResultTimeStamp();
|
||||
LOGGER.info("Resetting latest record time to '" + latestRecordTime + "'");
|
||||
builder.setLastDataTime(latestRecordTime);
|
||||
DataCounts counts = job.getCounts();
|
||||
counts.setLatestRecordTimeStamp(latestRecordTime);
|
||||
builder.setCounts(counts);
|
||||
DataCounts counts = jobProvider.dataCounts(request.getJobId());
|
||||
counts.setLatestRecordTimeStamp(modelSnapshot.getLatestRecordTimeStamp());
|
||||
// NORELEASE This update should be async. See #127
|
||||
jobDataCountsPersister.persistDataCounts(request.getJobId(), counts);
|
||||
|
||||
} else {
|
||||
builder.setIgnoreDowntime(IgnoreDowntime.ONCE);
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ import java.util.Locale;
|
|||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
|
@ -20,12 +21,11 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
|||
|
||||
public class ElasticsearchJobDataCountsPersister implements JobDataCountsPersister {
|
||||
|
||||
private static final Logger LOGGER = Loggers.getLogger(ElasticsearchJobDataCountsPersister.class);
|
||||
private Client client;
|
||||
private Logger logger;
|
||||
|
||||
public ElasticsearchJobDataCountsPersister(Client client, Logger logger) {
|
||||
public ElasticsearchJobDataCountsPersister(Client client) {
|
||||
this.client = client;
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
private XContentBuilder serialiseCounts(DataCounts counts) throws IOException {
|
||||
|
@ -33,12 +33,8 @@ public class ElasticsearchJobDataCountsPersister implements JobDataCountsPersist
|
|||
return counts.toXContent(builder, ToXContent.EMPTY_PARAMS);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void persistDataCounts(String jobId, DataCounts counts) {
|
||||
// NORELEASE - Should these stats be stored in memory?
|
||||
|
||||
|
||||
try {
|
||||
XContentBuilder content = serialiseCounts(counts);
|
||||
|
||||
|
@ -47,11 +43,11 @@ public class ElasticsearchJobDataCountsPersister implements JobDataCountsPersist
|
|||
.setSource(content).execute().actionGet();
|
||||
}
|
||||
catch (IOException ioe) {
|
||||
logger.warn("Error serialising DataCounts stats", ioe);
|
||||
LOGGER.warn("Error serialising DataCounts stats", ioe);
|
||||
}
|
||||
catch (IndexNotFoundException e) {
|
||||
String msg = String.format(Locale.ROOT, "Error writing the job '%s' status stats.", jobId);
|
||||
logger.warn(msg, e);
|
||||
LOGGER.warn(msg, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,100 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.prelert.job.persistence;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.ParseFieldMatcher;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.xpack.prelert.job.Job;
|
||||
import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
|
||||
import org.elasticsearch.xpack.prelert.job.results.ReservedFieldNames;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
class ElasticsearchJobDetailsMapper {
|
||||
private static final Logger LOGGER = Loggers.getLogger(ElasticsearchJobDetailsMapper.class);
|
||||
|
||||
private final Client client;
|
||||
private final ParseFieldMatcher parseFieldMatcher;
|
||||
|
||||
public ElasticsearchJobDetailsMapper(Client client, ParseFieldMatcher parseFieldMatcher) {
|
||||
this.client = Objects.requireNonNull(client);
|
||||
this.parseFieldMatcher = Objects.requireNonNull(parseFieldMatcher);
|
||||
}
|
||||
|
||||
/**
|
||||
* Maps an Elasticsearch source map to a {@link Job} object
|
||||
*
|
||||
* @param source The source of an Elasticsearch search response
|
||||
* @return the {@code Job} object
|
||||
*/
|
||||
public Job map(BytesReference source) {
|
||||
try (XContentParser parser = XContentFactory.xContent(source).createParser(source)) {
|
||||
Job.Builder builder = Job.PARSER.apply(parser, () -> parseFieldMatcher);
|
||||
addModelSizeStats(builder, builder.getId());
|
||||
addBucketProcessingTime(builder, builder.getId());
|
||||
return builder.build();
|
||||
} catch (IOException e) {
|
||||
throw new ElasticsearchParseException("failed to parser job", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void addModelSizeStats(Job.Builder job, String jobId) {
|
||||
String indexName = ElasticsearchPersister.getJobIndexName(jobId);
|
||||
// Pull out the modelSizeStats document, and add this to the Job
|
||||
LOGGER.trace("ES API CALL: get ID " + ModelSizeStats.TYPE +
|
||||
" type " + ModelSizeStats.TYPE + " from index " + indexName);
|
||||
GetResponse modelSizeStatsResponse = client.prepareGet(
|
||||
indexName, ModelSizeStats.TYPE.getPreferredName(), ModelSizeStats.TYPE.getPreferredName()).get();
|
||||
|
||||
if (!modelSizeStatsResponse.isExists()) {
|
||||
String msg = "No memory usage details for job with id " + jobId;
|
||||
LOGGER.warn(msg);
|
||||
} else {
|
||||
// Remove the Kibana/Logstash '@timestamp' entry as stored in Elasticsearch,
|
||||
// and replace using the API 'timestamp' key.
|
||||
Object timestamp = modelSizeStatsResponse.getSource().remove(ElasticsearchMappings.ES_TIMESTAMP);
|
||||
modelSizeStatsResponse.getSource().put(ModelSizeStats.TIMESTAMP_FIELD.getPreferredName(), timestamp);
|
||||
BytesReference source = modelSizeStatsResponse.getSourceAsBytesRef();
|
||||
XContentParser parser;
|
||||
try {
|
||||
parser = XContentFactory.xContent(source).createParser(source);
|
||||
} catch (IOException e) {
|
||||
throw new ElasticsearchParseException("failed to parser model size stats", e);
|
||||
}
|
||||
ModelSizeStats.Builder modelSizeStats = ModelSizeStats.PARSER.apply(parser, () -> parseFieldMatcher);
|
||||
job.setModelSizeStats(modelSizeStats);
|
||||
}
|
||||
}
|
||||
|
||||
private void addBucketProcessingTime(Job.Builder job, String jobId) {
|
||||
String indexName = ElasticsearchPersister.getJobIndexName(jobId);
|
||||
// Pull out the modelSizeStats document, and add this to the Job
|
||||
LOGGER.trace("ES API CALL: get ID " + ReservedFieldNames.BUCKET_PROCESSING_TIME_TYPE +
|
||||
" type " + ReservedFieldNames.AVERAGE_PROCESSING_TIME_MS + " from index " + indexName);
|
||||
GetResponse procTimeResponse = client.prepareGet(
|
||||
indexName, ReservedFieldNames.BUCKET_PROCESSING_TIME_TYPE,
|
||||
ReservedFieldNames.AVERAGE_PROCESSING_TIME_MS).get();
|
||||
|
||||
if (!procTimeResponse.isExists()) {
|
||||
String msg = "No average bucket processing time details for job with id " + jobId;
|
||||
LOGGER.warn(msg);
|
||||
} else {
|
||||
Object averageTime = procTimeResponse.getSource()
|
||||
.get(ReservedFieldNames.AVERAGE_PROCESSING_TIME_MS);
|
||||
if (averageTime instanceof Double) {
|
||||
job.setAverageBucketProcessingTimeMs((Double) averageTime);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -307,6 +307,7 @@ public class ElasticsearchJobProvider implements JobProvider
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataCounts dataCounts(String jobId) {
|
||||
String indexName = ElasticsearchPersister.getJobIndexName(jobId);
|
||||
|
||||
|
|
|
@ -107,6 +107,9 @@ public class ElasticsearchMappings {
|
|||
.field(ANALYZER, WHITESPACE)
|
||||
.endObject()
|
||||
.startObject(PROPERTIES)
|
||||
.startObject(Job.ID.getPreferredName())
|
||||
.field(TYPE, KEYWORD)
|
||||
.endObject()
|
||||
.startObject(DataCounts.PROCESSED_RECORD_COUNT.getPreferredName())
|
||||
.field(TYPE, LONG)
|
||||
.endObject()
|
||||
|
@ -131,6 +134,9 @@ public class ElasticsearchMappings {
|
|||
.startObject(DataCounts.OUT_OF_ORDER_TIME_COUNT.getPreferredName())
|
||||
.field(TYPE, LONG)
|
||||
.endObject()
|
||||
.startObject(DataCounts.EARLIEST_RECORD_TIME.getPreferredName())
|
||||
.field(TYPE, DATE)
|
||||
.endObject()
|
||||
.startObject(DataCounts.LATEST_RECORD_TIME.getPreferredName())
|
||||
.field(TYPE, DATE)
|
||||
.endObject()
|
||||
|
|
|
@ -7,6 +7,7 @@ package org.elasticsearch.xpack.prelert.job.persistence;
|
|||
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.xpack.prelert.job.DataCounts;
|
||||
import org.elasticsearch.xpack.prelert.job.Job;
|
||||
import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
|
||||
import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
|
||||
|
@ -80,6 +81,13 @@ public interface JobProvider extends JobResultsProvider {
|
|||
*/
|
||||
Optional<ModelSizeStats> modelSizeStats(String jobId);
|
||||
|
||||
/**
|
||||
* Get the job's data counts
|
||||
* @param jobId The job id
|
||||
* @return The dataCounts or default constructed object if not found
|
||||
*/
|
||||
DataCounts dataCounts(String jobId);
|
||||
|
||||
/**
|
||||
* Retrieves the list with the given {@code listId} from the datastore.
|
||||
*
|
||||
|
|
|
@ -122,6 +122,7 @@ public final class ReservedFieldNames {
|
|||
DataCounts.MISSING_FIELD_COUNT.getPreferredName(),
|
||||
DataCounts.OUT_OF_ORDER_TIME_COUNT.getPreferredName(),
|
||||
DataCounts.LATEST_RECORD_TIME.getPreferredName(),
|
||||
DataCounts.EARLIEST_RECORD_TIME.getPreferredName(),
|
||||
|
||||
Influence.INFLUENCER_FIELD_NAME.getPreferredName(),
|
||||
Influence.INFLUENCER_FIELD_VALUES.getPreferredName(),
|
||||
|
|
|
@ -15,6 +15,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.prelert.PrelertPlugin;
|
||||
import org.elasticsearch.xpack.prelert.action.UpdateJobSchedulerStatusAction;
|
||||
import org.elasticsearch.xpack.prelert.job.DataCounts;
|
||||
import org.elasticsearch.xpack.prelert.job.Job;
|
||||
import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
|
||||
import org.elasticsearch.xpack.prelert.job.SchedulerState;
|
||||
|
@ -176,7 +177,7 @@ public class ScheduledJobService extends AbstractComponent {
|
|||
DataExtractor dataExtractor = dataExtractorFactory.newExtractor(job);
|
||||
ScheduledJob scheduledJob = new ScheduledJob(job.getJobId(), frequency.toMillis(), queryDelay.toMillis(),
|
||||
dataExtractor, dataProcessor, auditor, currentTimeSupplier, getLatestFinalBucketEndTimeMs(job),
|
||||
getLatestRecordTimestamp(job));
|
||||
getLatestRecordTimestamp(job.getJobId()));
|
||||
return new Holder(scheduledJob, new ProblemTracker(() -> auditor));
|
||||
}
|
||||
|
||||
|
@ -200,10 +201,11 @@ public class ScheduledJobService extends AbstractComponent {
|
|||
return latestFinalBucketEndMs;
|
||||
}
|
||||
|
||||
private long getLatestRecordTimestamp(Job job) {
|
||||
private long getLatestRecordTimestamp(String jobId) {
|
||||
long latestRecordTimeMs = -1L;
|
||||
if (job.getCounts() != null && job.getCounts().getLatestRecordTimeStamp() != null) {
|
||||
latestRecordTimeMs = job.getCounts().getLatestRecordTimeStamp().getTime();
|
||||
DataCounts dataCounts = jobProvider.dataCounts(jobId);
|
||||
if (dataCounts.getLatestRecordTimeStamp() != null) {
|
||||
latestRecordTimeMs = dataCounts.getLatestRecordTimeStamp().getTime();
|
||||
}
|
||||
return latestRecordTimeMs;
|
||||
}
|
||||
|
|
|
@ -47,14 +47,12 @@ public class GetJobActionResponseTests extends AbstractStreamableTestCase<GetJob
|
|||
SchedulerConfig.Builder schedulerConfig = new SchedulerConfig.Builder(SchedulerConfig.DataSource.FILE);
|
||||
schedulerConfig.setFilePath("/file/path");
|
||||
DataDescription dataDescription = randomBoolean() ? new DataDescription.Builder().build() : null;
|
||||
ModelSizeStats modelSizeStats = randomBoolean() ? new ModelSizeStats.Builder("foo").build() : null;
|
||||
int numTransformers = randomIntBetween(0, 32);
|
||||
List<TransformConfig> transformConfigList = new ArrayList<>(numTransformers);
|
||||
for (int i = 0; i < numTransformers; i++) {
|
||||
transformConfigList.add(new TransformConfig(TransformType.UPPERCASE.prettyName()));
|
||||
}
|
||||
ModelDebugConfig modelDebugConfig = randomBoolean() ? new ModelDebugConfig(randomDouble(), randomAsciiOfLength(10)) : null;
|
||||
DataCounts counts = randomBoolean() ? new DataCounts(jobId) : null;
|
||||
IgnoreDowntime ignoreDowntime = randomFrom(IgnoreDowntime.values());
|
||||
Long normalizationWindowDays = randomBoolean() ? randomLong() : null;
|
||||
Long backgroundPersistInterval = randomBoolean() ? randomLong() : null;
|
||||
|
@ -65,8 +63,8 @@ public class GetJobActionResponseTests extends AbstractStreamableTestCase<GetJob
|
|||
Double averageBucketProcessingTimeMs = randomBoolean() ? randomDouble() : null;
|
||||
String modelSnapshotId = randomBoolean() ? randomAsciiOfLength(10) : null;
|
||||
Job job = new Job(jobId, description, createTime, finishedTime, lastDataTime,
|
||||
timeout, analysisConfig, analysisLimits, schedulerConfig.build(), dataDescription, modelSizeStats, transformConfigList,
|
||||
modelDebugConfig, counts, ignoreDowntime, normalizationWindowDays, backgroundPersistInterval,
|
||||
timeout, analysisConfig, analysisLimits, schedulerConfig.build(), dataDescription, transformConfigList,
|
||||
modelDebugConfig, ignoreDowntime, normalizationWindowDays, backgroundPersistInterval,
|
||||
modelSnapshotRetentionDays, resultsRetentionDays, customConfig, averageBucketProcessingTimeMs, modelSnapshotId);
|
||||
|
||||
|
||||
|
|
|
@ -60,7 +60,6 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
|
|||
assertNull(job.getIgnoreDowntime());
|
||||
assertNull(job.getLastDataTime());
|
||||
assertNull(job.getModelDebugConfig());
|
||||
assertNull(job.getModelSizeStats());
|
||||
assertNull(job.getRenormalizationWindowDays());
|
||||
assertNull(job.getBackgroundPersistInterval());
|
||||
assertNull(job.getModelSnapshotRetentionDays());
|
||||
|
@ -595,9 +594,6 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
|
|||
if (randomBoolean()) {
|
||||
builder.setDataDescription(new DataDescription.Builder());
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
builder.setModelSizeStats(new ModelSizeStats.Builder("foo"));
|
||||
}
|
||||
String[] outputs;
|
||||
TransformType[] transformTypes ;
|
||||
AnalysisConfig ac = analysisConfig.build();
|
||||
|
@ -619,7 +615,6 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
|
|||
if (randomBoolean()) {
|
||||
builder.setModelDebugConfig(new ModelDebugConfig(randomDouble(), randomAsciiOfLength(10)));
|
||||
}
|
||||
builder.setCounts(new DataCounts(jobId));
|
||||
builder.setIgnoreDowntime(randomFrom(IgnoreDowntime.values()));
|
||||
if (randomBoolean()) {
|
||||
builder.setRenormalizationWindowDays(randomPositiveLong());
|
||||
|
|
|
@ -18,6 +18,7 @@ import org.elasticsearch.xpack.prelert.job.Detector;
|
|||
import org.elasticsearch.xpack.prelert.job.Job;
|
||||
import org.elasticsearch.xpack.prelert.job.JobStatus;
|
||||
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
|
||||
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
|
||||
import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectCommunicator;
|
||||
import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectProcessFactory;
|
||||
import org.elasticsearch.xpack.prelert.job.process.autodetect.output.parsing.AutodetectResultsParser;
|
||||
|
@ -51,10 +52,12 @@ import static org.mockito.Mockito.spy;
|
|||
public class AutodetectProcessManagerTests extends ESTestCase {
|
||||
|
||||
private JobManager jobManager;
|
||||
private JobProvider jobProvider;
|
||||
|
||||
@Before
|
||||
public void initMocks() {
|
||||
jobManager = Mockito.mock(JobManager.class);
|
||||
jobProvider = Mockito.mock(JobProvider.class);
|
||||
givenAllocationWithStatus(JobStatus.CLOSED);
|
||||
}
|
||||
|
||||
|
@ -187,8 +190,8 @@ public class AutodetectProcessManagerTests extends ESTestCase {
|
|||
ThreadPool threadPool = mock(ThreadPool.class);
|
||||
AutodetectResultsParser parser = mock(AutodetectResultsParser.class);
|
||||
AutodetectProcessFactory autodetectProcessFactory = mock(AutodetectProcessFactory.class);
|
||||
AutodetectProcessManager manager =
|
||||
new AutodetectProcessManager(Settings.EMPTY, client, environment, threadPool, jobManager, parser, autodetectProcessFactory);
|
||||
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, environment, threadPool,
|
||||
jobManager, jobProvider, parser, autodetectProcessFactory);
|
||||
manager = spy(manager);
|
||||
doReturn(communicator).when(manager).create(any(), anyBoolean());
|
||||
return manager;
|
||||
|
|
|
@ -17,6 +17,7 @@ import org.elasticsearch.test.ESTestCase;
|
|||
import org.elasticsearch.xpack.prelert.job.Job;
|
||||
import org.elasticsearch.xpack.prelert.job.audit.Auditor;
|
||||
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
|
||||
import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister;
|
||||
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
|
||||
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
|
||||
import org.junit.Before;
|
||||
|
@ -39,12 +40,14 @@ public class JobManagerTests extends ESTestCase {
|
|||
|
||||
private ClusterService clusterService;
|
||||
private JobProvider jobProvider;
|
||||
private JobDataCountsPersister jobDataCountsPersister;
|
||||
private Auditor auditor;
|
||||
|
||||
@Before
|
||||
public void setupMocks() {
|
||||
clusterService = mock(ClusterService.class);
|
||||
jobProvider = mock(JobProvider.class);
|
||||
jobDataCountsPersister = mock(JobDataCountsPersister.class);
|
||||
auditor = mock(Auditor.class);
|
||||
when(jobProvider.audit(anyString())).thenReturn(auditor);
|
||||
}
|
||||
|
@ -188,7 +191,7 @@ public class JobManagerTests extends ESTestCase {
|
|||
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
|
||||
Environment env = new Environment(
|
||||
settings);
|
||||
return new JobManager(env, settings, jobProvider, clusterService);
|
||||
return new JobManager(env, settings, jobProvider, jobDataCountsPersister, clusterService);
|
||||
}
|
||||
|
||||
private ClusterState createClusterState() {
|
||||
|
|
|
@ -1,127 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.prelert.job.persistence;
|
||||
|
||||
import org.elasticsearch.action.get.GetRequestBuilder;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.ParseFieldMatcher;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.prelert.job.Job;
|
||||
import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
|
||||
import org.elasticsearch.xpack.prelert.job.results.ReservedFieldNames;
|
||||
import org.junit.Before;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.xpack.prelert.job.JobTests.buildJobBuilder;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class ElasticsearchJobDetailsMapperTests extends ESTestCase {
|
||||
private Client client;
|
||||
|
||||
@Before
|
||||
public void setUpMocks() {
|
||||
client = Mockito.mock(Client.class);
|
||||
}
|
||||
|
||||
public void testMap_GivenJobSourceCannotBeParsed() {
|
||||
BytesArray source = new BytesArray("{ \"invalidKey\": true }");
|
||||
|
||||
GetResponse getResponse = mock(GetResponse.class);
|
||||
when(getResponse.isExists()).thenReturn(false);
|
||||
GetRequestBuilder getRequestBuilder = mock(GetRequestBuilder.class);
|
||||
when(getRequestBuilder.get()).thenReturn(getResponse);
|
||||
when(client.prepareGet("prelertresults-foo", ModelSizeStats.TYPE.getPreferredName(), ModelSizeStats.TYPE.getPreferredName()))
|
||||
.thenReturn(getRequestBuilder);
|
||||
|
||||
ElasticsearchJobDetailsMapper mapper = new ElasticsearchJobDetailsMapper(client, ParseFieldMatcher.STRICT);
|
||||
|
||||
ESTestCase.expectThrows(IllegalArgumentException.class, () -> mapper.map(source));
|
||||
}
|
||||
|
||||
public void testMap_GivenModelSizeStatsExists() throws Exception {
|
||||
ModelSizeStats.Builder modelSizeStats = new ModelSizeStats.Builder("foo");
|
||||
modelSizeStats.setModelBytes(42L);
|
||||
Date now = new Date();
|
||||
modelSizeStats.setTimestamp(now);
|
||||
|
||||
Job originalJob = buildJobBuilder("foo").build();
|
||||
|
||||
BytesReference source = originalJob.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS).bytes();
|
||||
BytesReference modelSizeStatsSource = modelSizeStats.build().toXContent(
|
||||
XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS).bytes();
|
||||
|
||||
GetResponse getModelSizeResponse = mock(GetResponse.class);
|
||||
when(getModelSizeResponse.isExists()).thenReturn(true);
|
||||
when(getModelSizeResponse.getSourceAsBytesRef()).thenReturn(modelSizeStatsSource);
|
||||
GetRequestBuilder getModelSizeRequestBuilder = mock(GetRequestBuilder.class);
|
||||
when(getModelSizeRequestBuilder.get()).thenReturn(getModelSizeResponse);
|
||||
when(client.prepareGet("prelertresults-foo", ModelSizeStats.TYPE.getPreferredName(), ModelSizeStats.TYPE.getPreferredName()))
|
||||
.thenReturn(getModelSizeRequestBuilder);
|
||||
|
||||
|
||||
Map<String, Object> procTimeSource = new HashMap<>();
|
||||
procTimeSource.put(ReservedFieldNames.AVERAGE_PROCESSING_TIME_MS, 20.2);
|
||||
|
||||
GetResponse getProcTimeResponse = mock(GetResponse.class);
|
||||
when(getProcTimeResponse.isExists()).thenReturn(true);
|
||||
when(getProcTimeResponse.getSource()).thenReturn(procTimeSource);
|
||||
GetRequestBuilder getProcTimeRequestBuilder = mock(GetRequestBuilder.class);
|
||||
when(getProcTimeRequestBuilder.get()).thenReturn(getProcTimeResponse);
|
||||
when(client.prepareGet("prelertresults-foo", ReservedFieldNames.BUCKET_PROCESSING_TIME_TYPE,
|
||||
ReservedFieldNames.AVERAGE_PROCESSING_TIME_MS))
|
||||
.thenReturn(getProcTimeRequestBuilder);
|
||||
|
||||
|
||||
ElasticsearchJobDetailsMapper mapper = new ElasticsearchJobDetailsMapper(client, ParseFieldMatcher.STRICT);
|
||||
|
||||
Job mappedJob = mapper.map(source);
|
||||
|
||||
assertEquals("foo", mappedJob.getId());
|
||||
assertEquals(42L, mappedJob.getModelSizeStats().getModelBytes());
|
||||
assertEquals(now, mappedJob.getModelSizeStats().getTimestamp());
|
||||
assertEquals(20.2, mappedJob.getAverageBucketProcessingTimeMs(), 0.0001);
|
||||
}
|
||||
|
||||
public void testMap_GivenModelSizeStatsDoesNotExist() throws Exception {
|
||||
Job originalJob = buildJobBuilder("foo").build();
|
||||
|
||||
BytesReference source = originalJob.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS).bytes();
|
||||
|
||||
GetResponse getResponse = mock(GetResponse.class);
|
||||
when(getResponse.isExists()).thenReturn(false);
|
||||
GetRequestBuilder getRequestBuilder = mock(GetRequestBuilder.class);
|
||||
when(getRequestBuilder.get()).thenReturn(getResponse);
|
||||
when(client.prepareGet("prelertresults-foo", ModelSizeStats.TYPE.getPreferredName(), ModelSizeStats.TYPE.getPreferredName()))
|
||||
.thenReturn(getRequestBuilder);
|
||||
|
||||
|
||||
GetResponse getProcTimeResponse = mock(GetResponse.class);
|
||||
when(getProcTimeResponse.isExists()).thenReturn(false);
|
||||
GetRequestBuilder getProcTimeRequestBuilder = mock(GetRequestBuilder.class);
|
||||
when(getProcTimeRequestBuilder.get()).thenReturn(getProcTimeResponse);
|
||||
when(client.prepareGet("prelertresults-foo", ReservedFieldNames.BUCKET_PROCESSING_TIME_TYPE,
|
||||
ReservedFieldNames.AVERAGE_PROCESSING_TIME_MS))
|
||||
.thenReturn(getProcTimeRequestBuilder);
|
||||
|
||||
ElasticsearchJobDetailsMapper mapper = new ElasticsearchJobDetailsMapper(client, ParseFieldMatcher.STRICT);
|
||||
|
||||
Job mappedJob = mapper.map(source);
|
||||
|
||||
assertEquals("foo", mappedJob.getId());
|
||||
assertNull(mappedJob.getModelSizeStats());
|
||||
assertNull(mappedJob.getAverageBucketProcessingTimeMs());
|
||||
}
|
||||
}
|
|
@ -95,7 +95,7 @@ public class ScheduledJobServiceTests extends ESTestCase {
|
|||
Allocation allocation =
|
||||
new Allocation("_nodeId", "foo", JobStatus.RUNNING, new SchedulerState(JobSchedulerStatus.STARTING, 0L, 60000L));
|
||||
DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0));
|
||||
builder.setCounts(dataCounts);
|
||||
|
||||
when(jobManager.getJobAllocation("foo")).thenReturn(allocation);
|
||||
|
||||
DataExtractor dataExtractor = mock(DataExtractor.class);
|
||||
|
@ -118,7 +118,6 @@ public class ScheduledJobServiceTests extends ESTestCase {
|
|||
Allocation allocation =
|
||||
new Allocation("_nodeId", "foo", JobStatus.RUNNING, new SchedulerState(JobSchedulerStatus.STARTING, 0L, null));
|
||||
DataCounts dataCounts = new DataCounts("foo", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0));
|
||||
builder.setCounts(dataCounts);
|
||||
when(jobManager.getJobAllocation("foo")).thenReturn(allocation);
|
||||
|
||||
DataExtractor dataExtractor = mock(DataExtractor.class);
|
||||
|
@ -148,9 +147,6 @@ public class ScheduledJobServiceTests extends ESTestCase {
|
|||
Job.Builder builder = createScheduledJob();
|
||||
Allocation allocation1 =
|
||||
new Allocation("_nodeId", "foo", JobStatus.RUNNING, new SchedulerState(JobSchedulerStatus.STARTED, 0, null));
|
||||
DataCounts dataCounts = new DataCounts("foo");
|
||||
dataCounts.setLatestRecordTimeStamp(new Date(0));
|
||||
builder.setCounts(dataCounts);
|
||||
when(jobManager.getJobAllocation("foo")).thenReturn(allocation1);
|
||||
|
||||
DataExtractor dataExtractor = mock(DataExtractor.class);
|
||||
|
|
|
@ -216,3 +216,12 @@ setup:
|
|||
- match: { hitCount: 1 }
|
||||
- match: { hits.0.jobId: "foo" }
|
||||
- match: { hits.0.timestamp: 1462060800000 }
|
||||
|
||||
- do:
|
||||
xpack.prelert.get_job:
|
||||
job_id: foo
|
||||
metric: data_counts
|
||||
|
||||
- match: { document.data_counts.latest_record_timestamp: 1464739200000 }
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue