Make persisters node-level services and remove the notion of a job logger;

the job id should always be included in the log message itself

Original commit: elastic/x-pack-elasticsearch@7dc6464a9a
Martijn van Groningen 2016-11-24 11:09:43 +01:00
parent 26e3ca9155
commit 04968d3ee6
42 changed files with 798 additions and 1145 deletions
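
As a rough sketch of the pattern this commit moves to (the class below is illustrative, not one of the real persisters): each persister becomes a single node-level component built from Settings and a Client, and the job id is passed into every call and written into the log message itself rather than coming from a per-job logger.

import org.elasticsearch.client.Client;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;

public class ExamplePersister extends AbstractComponent {

    private final Client client;

    public ExamplePersister(Settings settings, Client client) {
        super(settings);   // AbstractComponent supplies the shared, node-level `logger`
        this.client = client;
    }

    public void persistSomething(String jobId, String docId) {
        // the job id is prefixed onto the message; there is no dedicated job logger
        logger.trace("[{}] persisting document {}", jobId, docId);
        // ... index the document into the job's index via `client` ...
    }
}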

View File

@ -58,8 +58,9 @@ import org.elasticsearch.xpack.prelert.job.metadata.JobAllocator;
import org.elasticsearch.xpack.prelert.job.metadata.JobLifeCycleService;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchBulkDeleterFactory;
import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchJobDataCountsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchJobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.prelert.job.process.NativeController;
import org.elasticsearch.xpack.prelert.job.process.ProcessCtrl;
import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectProcessFactory;
@ -151,10 +152,11 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
// `bind(Implementation.class).toInstance(INSTANCE);`
// For this reason we can't use interfaces in the constructor of transport actions.
// This is ok for now as we will remove Guice soon
JobResultsPersister jobResultsPersister = new JobResultsPersister(settings, client);
ElasticsearchJobProvider jobProvider = new ElasticsearchJobProvider(client, 0, parseFieldMatcherSupplier.getParseFieldMatcher());
ElasticsearchJobDataCountsPersister jobDataCountsPersister = new ElasticsearchJobDataCountsPersister(client);
JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(settings, client);
JobManager jobManager = new JobManager(env, settings, jobProvider, jobDataCountsPersister, clusterService);
JobManager jobManager = new JobManager(settings, jobProvider, jobResultsPersister, jobDataCountsPersister, clusterService);
AutodetectProcessFactory processFactory;
if (USE_NATIVE_PROCESS_OPTION.get(settings)) {
try {
@ -168,8 +170,8 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
processFactory = (JobDetails, ignoreDowntime, executorService) -> new BlackHoleAutodetectProcess();
}
AutodetectResultsParser autodetectResultsParser = new AutodetectResultsParser(settings, parseFieldMatcherSupplier);
DataProcessor dataProcessor = new AutodetectProcessManager(settings, client, env, threadPool,
jobManager, jobProvider, autodetectResultsParser, processFactory);
DataProcessor dataProcessor = new AutodetectProcessManager(settings, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobDataCountsPersister, autodetectResultsParser, processFactory);
ScheduledJobService scheduledJobService = new ScheduledJobService(threadPool, client, jobProvider, dataProcessor,
new HttpDataExtractorFactory(), System::currentTimeMillis);
return Arrays.asList(

View File

@ -32,6 +32,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
import org.elasticsearch.xpack.prelert.job.manager.JobManager;
import org.elasticsearch.xpack.prelert.job.messages.Messages;
import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchJobProvider;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
@ -240,12 +241,14 @@ PutModelSnapshotDescriptionAction.RequestBuilder> {
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final JobManager jobManager;
private final ElasticsearchJobProvider jobProvider;
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, ElasticsearchJobProvider jobProvider) {
IndexNameExpressionResolver indexNameExpressionResolver, JobManager jobManager, ElasticsearchJobProvider jobProvider) {
super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new);
this.jobManager = jobManager;
this.jobProvider = jobProvider;
}
@ -264,7 +267,7 @@ PutModelSnapshotDescriptionAction.RequestBuilder> {
}
ModelSnapshot modelSnapshot = changeCandidates.get(0);
modelSnapshot.setDescription(request.getDescriptionString());
jobProvider.updateModelSnapshot(request.getJobId(), modelSnapshot, false);
jobManager.updateModelSnapshot(request.getJobId(), modelSnapshot, false);
modelSnapshot.setDescription(request.getDescriptionString());

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.prelert.job.logging;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
@ -36,10 +37,11 @@ import java.util.concurrent.TimeoutException;
*/
public class CppLogMessageHandler implements Closeable {
private static final Logger LOGGER = Loggers.getLogger(CppLogMessageHandler.class);
private static final int DEFAULT_READBUF_SIZE = 1024;
private static final int DEFAULT_ERROR_STORE_SIZE = 5;
private final Logger logger;
private final String jobId;
private final InputStream inputStream;
private final int readBufSize;
private final int errorStoreSize;
@ -54,15 +56,14 @@ public class CppLogMessageHandler implements Closeable {
* @param inputStream May not be null.
*/
public CppLogMessageHandler(String jobId, InputStream inputStream) {
this(inputStream, Strings.isNullOrEmpty(jobId) ? Loggers.getLogger(CppLogMessageHandler.class) : Loggers.getLogger(jobId),
DEFAULT_READBUF_SIZE, DEFAULT_ERROR_STORE_SIZE);
this(inputStream, jobId, DEFAULT_READBUF_SIZE, DEFAULT_ERROR_STORE_SIZE);
}
/**
* For testing - allows meddling with the logger, read buffer size and error store size.
*/
CppLogMessageHandler(InputStream inputStream, Logger logger, int readBufSize, int errorStoreSize) {
this.logger = Objects.requireNonNull(logger);
CppLogMessageHandler(InputStream inputStream, String jobId, int readBufSize, int errorStoreSize) {
this.jobId = jobId;
this.inputStream = Objects.requireNonNull(inputStream);
this.readBufSize = readBufSize;
this.errorStoreSize = errorStoreSize;
@ -179,10 +180,20 @@ public class CppLogMessageHandler implements Closeable {
pidLatch.countDown();
}
// TODO: Is there a way to preserve the original timestamp when re-logging?
logger.log(level, "{}/{} {}@{} {}", msg.getLogger(), latestPid, msg.getFile(), msg.getLine(), msg.getMessage());
if (jobId != null) {
LOGGER.log(level, "[{}] {}/{} {}@{} {}", jobId, msg.getLogger(), latestPid, msg.getFile(), msg.getLine(),
msg.getMessage());
} else {
LOGGER.log(level, "{}/{} {}@{} {}", msg.getLogger(), latestPid, msg.getFile(), msg.getLine(), msg.getMessage());
}
// TODO: Could send the message for indexing instead of or as well as logging it
} catch (IOException e) {
logger.warn("Failed to parse C++ log message: " + bytesRef.utf8ToString(), e);
if (jobId != null) {
LOGGER.warn(new ParameterizedMessage("[{}] Failed to parse C++ log message: {}",
new Object[] {jobId, bytesRef.utf8ToString()}, e));
} else {
LOGGER.warn(new ParameterizedMessage("Failed to parse C++ log message: {}", new Object[] {bytesRef.utf8ToString()}, e));
}
}
}

View File

@ -5,15 +5,12 @@
*/
package org.elasticsearch.xpack.prelert.job.manager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.prelert.PrelertPlugin;
@ -23,10 +20,8 @@ import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
import org.elasticsearch.xpack.prelert.job.data.DataProcessor;
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchJobDataCountsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchPersister;
import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchUsagePersister;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.UsagePersister;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectCommunicator;
@ -59,8 +54,6 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
public static final Setting<Integer> MAX_RUNNING_JOBS_PER_NODE =
Setting.intSetting("max_running_jobs", 10, Setting.Property.NodeScope);
private final Client client;
private final Environment env;
private final int maxRunningJobs;
private final ThreadPool threadPool;
private final JobManager jobManager;
@ -68,20 +61,30 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
private final AutodetectResultsParser parser;
private final AutodetectProcessFactory autodetectProcessFactory;
private final UsagePersister usagePersister;
private final StateProcessor stateProcessor;
private final JobResultsPersister jobResultsPersister;
private final JobDataCountsPersister jobDataCountsPersister;
private final ConcurrentMap<String, AutodetectCommunicator> autoDetectCommunicatorByJob;
public AutodetectProcessManager(Settings settings, Client client, Environment env, ThreadPool threadPool, JobManager jobManager,
JobProvider jobProvider, AutodetectResultsParser parser,
public AutodetectProcessManager(Settings settings, Client client, ThreadPool threadPool, JobManager jobManager,
JobProvider jobProvider, JobResultsPersister jobResultsPersister,
JobDataCountsPersister jobDataCountsPersister, AutodetectResultsParser parser,
AutodetectProcessFactory autodetectProcessFactory) {
super(settings);
this.client = client;
this.env = env;
this.threadPool = threadPool;
this.maxRunningJobs = MAX_RUNNING_JOBS_PER_NODE.get(settings);
this.parser = parser;
this.autodetectProcessFactory = autodetectProcessFactory;
this.jobManager = jobManager;
this.jobProvider = jobProvider;
this.jobResultsPersister = jobResultsPersister;
this.stateProcessor = new StateProcessor(settings, jobResultsPersister);
this.usagePersister = new UsagePersister(settings, client);
this.jobDataCountsPersister = jobDataCountsPersister;
this.autoDetectCommunicatorByJob = new ConcurrentHashMap<>();
}
@ -114,27 +117,22 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
RestStatus.TOO_MANY_REQUESTS);
}
// TODO norelease, once we remove black hole process and all persisters are singletons then we can
// remove this method and move not enough threads logic to the auto detect process factory
// TODO norelease, once we remove black hole process
// then we can remove this method and move not enough threads logic to the auto detect process factory
Job job = jobManager.getJobOrThrowIfUnknown(jobId);
Logger jobLogger = Loggers.getLogger(job.getJobId());
// A TP with no queue, so that we fail immediately if there are no threads available
ExecutorService executorService = threadPool.executor(PrelertPlugin.AUTODETECT_PROCESS_THREAD_POOL_NAME);
ElasticsearchUsagePersister usagePersister = new ElasticsearchUsagePersister(client, jobLogger);
UsageReporter usageReporter = new UsageReporter(settings, job.getJobId(), usagePersister, jobLogger);
JobDataCountsPersister jobDataCountsPersister = new ElasticsearchJobDataCountsPersister(client);
JobResultsPersister persister = new ElasticsearchPersister(jobId, client);
StatusReporter statusReporter = new StatusReporter(env, settings, job.getJobId(), jobProvider.dataCounts(jobId),
usageReporter, jobDataCountsPersister, jobLogger, job.getAnalysisConfig().getBucketSpanOrDefault());
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(new NoOpRenormaliser(), persister, parser);
StateProcessor stateProcessor = new StateProcessor(settings, persister);
UsageReporter usageReporter = new UsageReporter(settings, job.getJobId(), usagePersister);
StatusReporter statusReporter =
new StatusReporter(settings, job.getJobId(), jobProvider.dataCounts(jobId), usageReporter, jobDataCountsPersister);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(new NoOpRenormaliser(), jobResultsPersister, parser);
AutodetectProcess process = null;
try {
process = autodetectProcessFactory.createAutodetectProcess(job, ignoreDowntime, executorService);
// TODO Port the normalizer from the old project
return new AutodetectCommunicator(executorService, job, process, jobLogger, statusReporter, processor, stateProcessor);
return new AutodetectCommunicator(executorService, job, process, statusReporter, processor, stateProcessor);
} catch (Exception e) {
try {
IOUtils.close(process);
@ -193,11 +191,11 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
}
}
public int numberOfRunningJobs() {
int numberOfRunningJobs() {
return autoDetectCommunicatorByJob.size();
}
public boolean jobHasActiveAutodetectProcess(String jobId) {
boolean jobHasActiveAutodetectProcess(String jobId) {
return autoDetectCommunicatorByJob.get(jobId) != null;
}

View File

@ -13,7 +13,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
@ -38,6 +38,7 @@ import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
@ -58,7 +59,7 @@ import java.util.stream.Collectors;
* <li>starting/stopping of scheduled jobs</li>
* </ul>
*/
public class JobManager {
public class JobManager extends AbstractComponent {
private static final Logger LOGGER = Loggers.getLogger(JobManager.class);
@ -69,21 +70,20 @@ public class JobManager {
public static final String DEFAULT_RECORD_SORT_FIELD = AnomalyRecord.PROBABILITY.getPreferredName();
private final JobProvider jobProvider;
private final JobDataCountsPersister jobDataCountsPersister;
private final ClusterService clusterService;
private final Environment env;
private final Settings settings;
private final JobResultsPersister jobResultsPersister;
private final JobDataCountsPersister jobDataCountsPersister;
/**
* Create a JobManager
*/
public JobManager(Environment env, Settings settings, JobProvider jobProvider,
public JobManager(Settings settings, JobProvider jobProvider, JobResultsPersister jobResultsPersister,
JobDataCountsPersister jobDataCountsPersister, ClusterService clusterService) {
this.env = env;
this.settings = settings;
super(settings);
this.jobProvider = Objects.requireNonNull(jobProvider);
this.clusterService = clusterService;
this.jobResultsPersister = jobResultsPersister;
this.jobDataCountsPersister = jobDataCountsPersister;
}
@ -542,6 +542,32 @@ public class JobManager {
});
}
/**
* Update a persisted model snapshot metadata document to match the
* argument supplied.
*
* @param jobId the job id
* @param modelSnapshot the updated model snapshot object to be stored
* @param restoreModelSizeStats should the model size stats in this
* snapshot be made the current ones for this job?
*/
public void updateModelSnapshot(String jobId, ModelSnapshot modelSnapshot, boolean restoreModelSizeStats) {
// For Elasticsearch the update can be done in exactly the same way as
// the original persist
jobResultsPersister.persistModelSnapshot(modelSnapshot);
if (restoreModelSizeStats) {
if (modelSnapshot.getModelSizeStats() != null) {
jobResultsPersister.persistModelSizeStats(modelSnapshot.getModelSizeStats());
}
if (modelSnapshot.getQuantiles() != null) {
jobResultsPersister.persistQuantiles(modelSnapshot.getQuantiles());
}
}
// Commit so that when the REST API call that triggered the update
// returns the updated document is searchable
jobResultsPersister.commitWrites(jobId);
}
private ClusterState innerSetJobStatus(String jobId, JobStatus newStatus, ClusterState currentState) {
Allocation allocation = getJobAllocation(jobId);
Allocation.Builder builder = new Allocation.Builder(allocation);

View File

@ -17,11 +17,10 @@ import java.io.IOException;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
class ElasticsearchBatchedBucketsIterator extends ElasticsearchBatchedDocumentsIterator<Bucket>
{
public ElasticsearchBatchedBucketsIterator(Client client, String jobId, ParseFieldMatcher parserFieldMatcher)
{
super(client, ElasticsearchPersister.getJobIndexName(jobId), parserFieldMatcher);
class ElasticsearchBatchedBucketsIterator extends ElasticsearchBatchedDocumentsIterator<Bucket> {
public ElasticsearchBatchedBucketsIterator(Client client, String jobId, ParseFieldMatcher parserFieldMatcher) {
super(client, JobResultsPersister.getJobIndexName(jobId), parserFieldMatcher);
}
@Override
@ -31,8 +30,7 @@ class ElasticsearchBatchedBucketsIterator extends ElasticsearchBatchedDocumentsI
}
@Override
protected Bucket map(SearchHit hit)
{
protected Bucket map(SearchHit hit) {
BytesReference source = hit.getSourceRef();
XContentParser parser;
try {

View File

@ -22,7 +22,7 @@ class ElasticsearchBatchedInfluencersIterator extends ElasticsearchBatchedDocume
public ElasticsearchBatchedInfluencersIterator(Client client, String jobId,
ParseFieldMatcher parserFieldMatcher)
{
super(client, ElasticsearchPersister.getJobIndexName(jobId), parserFieldMatcher);
super(client, JobResultsPersister.getJobIndexName(jobId), parserFieldMatcher);
}
@Override

View File

@ -15,13 +15,14 @@ import org.elasticsearch.search.SearchHit;
import java.io.IOException;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput;
class ElasticsearchBatchedModelDebugOutputIterator extends ElasticsearchBatchedDocumentsIterator<ModelDebugOutput>
{
public ElasticsearchBatchedModelDebugOutputIterator(Client client, String jobId, ParseFieldMatcher parserFieldMatcher)
{
super(client, ElasticsearchPersister.getJobIndexName(jobId), parserFieldMatcher);
super(client, JobResultsPersister.getJobIndexName(jobId), parserFieldMatcher);
}
@Override

View File

@ -21,7 +21,7 @@ public class ElasticsearchBatchedModelSizeStatsIterator extends ElasticsearchBat
{
public ElasticsearchBatchedModelSizeStatsIterator(Client client, String jobId, ParseFieldMatcher parserFieldMatcher)
{
super(client, ElasticsearchPersister.getJobIndexName(jobId), parserFieldMatcher);
super(client, JobResultsPersister.getJobIndexName(jobId), parserFieldMatcher);
}
@Override

View File

@ -21,7 +21,7 @@ class ElasticsearchBatchedModelSnapshotIterator extends ElasticsearchBatchedDocu
{
public ElasticsearchBatchedModelSnapshotIterator(Client client, String jobId, ParseFieldMatcher parserFieldMatcher)
{
super(client, ElasticsearchPersister.getJobIndexName(jobId), parserFieldMatcher);
super(client, JobResultsPersister.getJobIndexName(jobId), parserFieldMatcher);
}
@Override

View File

@ -72,7 +72,7 @@ public class ElasticsearchBulkDeleter implements JobDataDeleter {
deleteRecords(bucket);
deleteBucketInfluencers(bucket);
bulkRequestBuilder.add(
client.prepareDelete(ElasticsearchPersister.getJobIndexName(jobId), Bucket.TYPE.getPreferredName(), bucket.getId()));
client.prepareDelete(JobResultsPersister.getJobIndexName(jobId), Bucket.TYPE.getPreferredName(), bucket.getId()));
++deletedBucketCount;
}
@ -94,7 +94,7 @@ public class ElasticsearchBulkDeleter implements JobDataDeleter {
boolean finished = false;
while (finished == false) {
SearchResponse searchResponse = SearchAction.INSTANCE.newRequestBuilder(client)
.setIndices(ElasticsearchPersister.getJobIndexName(jobId))
.setIndices(JobResultsPersister.getJobIndexName(jobId))
.setTypes(type)
.setQuery(query)
.addSort(SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC))
@ -115,7 +115,7 @@ public class ElasticsearchBulkDeleter implements JobDataDeleter {
private void addDeleteRequest(SearchHit hit) {
DeleteRequestBuilder deleteRequest = DeleteAction.INSTANCE.newRequestBuilder(client)
.setIndex(ElasticsearchPersister.getJobIndexName(jobId))
.setIndex(JobResultsPersister.getJobIndexName(jobId))
.setType(hit.getType())
.setId(hit.getId());
SearchHitField parentField = hit.field(ElasticsearchMappings.PARENT);
@ -151,7 +151,7 @@ public class ElasticsearchBulkDeleter implements JobDataDeleter {
return;
}
bulkRequestBuilder.add(
client.prepareDelete(ElasticsearchPersister.getJobIndexName(jobId), Influencer.TYPE.getPreferredName(), id));
client.prepareDelete(JobResultsPersister.getJobIndexName(jobId), Influencer.TYPE.getPreferredName(), id));
++deletedInfluencerCount;
}
@ -159,7 +159,7 @@ public class ElasticsearchBulkDeleter implements JobDataDeleter {
public void deleteModelSnapshot(ModelSnapshot modelSnapshot) {
String snapshotId = modelSnapshot.getSnapshotId();
int docCount = modelSnapshot.getSnapshotDocCount();
String indexName = ElasticsearchPersister.getJobIndexName(jobId);
String indexName = JobResultsPersister.getJobIndexName(jobId);
// Deduce the document IDs of the state documents from the information
// in the snapshot document - we cannot query the state itself as it's
// too big and has no mappings
@ -179,19 +179,19 @@ public class ElasticsearchBulkDeleter implements JobDataDeleter {
public void deleteModelDebugOutput(ModelDebugOutput modelDebugOutput) {
String id = modelDebugOutput.getId();
bulkRequestBuilder.add(
client.prepareDelete(ElasticsearchPersister.getJobIndexName(jobId), ModelDebugOutput.TYPE.getPreferredName(), id));
client.prepareDelete(JobResultsPersister.getJobIndexName(jobId), ModelDebugOutput.TYPE.getPreferredName(), id));
}
@Override
public void deleteModelSizeStats(ModelSizeStats modelSizeStats) {
bulkRequestBuilder.add(client.prepareDelete(
ElasticsearchPersister.getJobIndexName(jobId), ModelSizeStats.TYPE.getPreferredName(), modelSizeStats.getId()));
JobResultsPersister.getJobIndexName(jobId), ModelSizeStats.TYPE.getPreferredName(), modelSizeStats.getId()));
}
public void deleteInterimResults() {
QueryBuilder qb = QueryBuilders.termQuery(Bucket.IS_INTERIM.getPreferredName(), true);
SearchResponse searchResponse = client.prepareSearch(ElasticsearchPersister.getJobIndexName(jobId))
SearchResponse searchResponse = client.prepareSearch(JobResultsPersister.getJobIndexName(jobId))
.setTypes(Bucket.TYPE.getPreferredName(), AnomalyRecord.TYPE.getPreferredName(), Influencer.TYPE.getPreferredName(),
BucketInfluencer.TYPE.getPreferredName())
.setQuery(qb)

View File

@ -1,53 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.persistence;
import java.io.IOException;
import java.util.Locale;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.xpack.prelert.job.DataCounts;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
public class ElasticsearchJobDataCountsPersister implements JobDataCountsPersister {
private static final Logger LOGGER = Loggers.getLogger(ElasticsearchJobDataCountsPersister.class);
private Client client;
public ElasticsearchJobDataCountsPersister(Client client) {
this.client = client;
}
private XContentBuilder serialiseCounts(DataCounts counts) throws IOException {
XContentBuilder builder = jsonBuilder();
return counts.toXContent(builder, ToXContent.EMPTY_PARAMS);
}
@Override
public void persistDataCounts(String jobId, DataCounts counts) {
try {
XContentBuilder content = serialiseCounts(counts);
client.prepareIndex(ElasticsearchPersister.getJobIndexName(jobId), DataCounts.TYPE.getPreferredName(),
jobId + DataCounts.DOCUMENT_SUFFIX)
.setSource(content).execute().actionGet();
}
catch (IOException ioe) {
LOGGER.warn("Error serialising DataCounts stats", ioe);
}
catch (IndexNotFoundException e) {
String msg = String.format(Locale.ROOT, "Error writing the job '%s' status stats.", jobId);
LOGGER.warn(msg, e);
}
}
}

View File

@ -237,7 +237,7 @@ public class ElasticsearchJobProvider implements JobProvider
String jobId = job.getId();
LOGGER.trace("ES API CALL: create index " + job.getId());
CreateIndexRequest createIndexRequest = new CreateIndexRequest(ElasticsearchPersister.getJobIndexName(jobId));
CreateIndexRequest createIndexRequest = new CreateIndexRequest(JobResultsPersister.getJobIndexName(jobId));
createIndexRequest.settings(prelertIndexSettings());
createIndexRequest.mapping(Bucket.TYPE.getPreferredName(), bucketMapping);
createIndexRequest.mapping(BucketInfluencer.TYPE.getPreferredName(), bucketInfluencerMapping);
@ -272,7 +272,7 @@ public class ElasticsearchJobProvider implements JobProvider
@Override
public void deleteJobRelatedIndices(String jobId, ActionListener<DeleteJobAction.Response> listener) {
String indexName = ElasticsearchPersister.getJobIndexName(jobId);
String indexName = JobResultsPersister.getJobIndexName(jobId);
LOGGER.trace("ES API CALL: delete index " + indexName);
try {
@ -295,7 +295,7 @@ public class ElasticsearchJobProvider implements JobProvider
@Override
public DataCounts dataCounts(String jobId) {
String indexName = ElasticsearchPersister.getJobIndexName(jobId);
String indexName = JobResultsPersister.getJobIndexName(jobId);
try {
GetResponse response = client.prepareGet(indexName, DataCounts.TYPE.getPreferredName(),
@ -378,7 +378,7 @@ public class ElasticsearchJobProvider implements JobProvider
QueryBuilder fb, SortBuilder<?> sb) throws ResourceNotFoundException {
SearchResponse searchResponse;
try {
String indexName = ElasticsearchPersister.getJobIndexName(jobId);
String indexName = JobResultsPersister.getJobIndexName(jobId);
LOGGER.trace("ES API CALL: search all of type " + Bucket.TYPE +
" from index " + indexName + " sort ascending " + ElasticsearchMappings.ES_TIMESTAMP +
" with filter after sort from " + from + " size " + size);
@ -416,7 +416,7 @@ public class ElasticsearchJobProvider implements JobProvider
@Override
public QueryPage<Bucket> bucket(String jobId, BucketQueryBuilder.BucketQuery query) throws ResourceNotFoundException {
String indexName = ElasticsearchPersister.getJobIndexName(jobId);
String indexName = JobResultsPersister.getJobIndexName(jobId);
SearchHits hits;
try {
LOGGER.trace("ES API CALL: get Bucket with timestamp " + query.getTimestamp() +
@ -502,7 +502,7 @@ public class ElasticsearchJobProvider implements JobProvider
FieldSortBuilder sb = new FieldSortBuilder(ElasticsearchMappings.ES_TIMESTAMP)
.order(SortOrder.ASC);
String indexName = ElasticsearchPersister.getJobIndexName(jobId);
String indexName = JobResultsPersister.getJobIndexName(jobId);
SearchRequestBuilder searchBuilder = client
.prepareSearch(indexName)
.setPostFilter(qb)
@ -623,7 +623,7 @@ public class ElasticsearchJobProvider implements JobProvider
@Override
public QueryPage<CategoryDefinition> categoryDefinitions(String jobId, int from, int size) {
String indexName = ElasticsearchPersister.getJobIndexName(jobId);
String indexName = JobResultsPersister.getJobIndexName(jobId);
LOGGER.trace("ES API CALL: search all of type " + CategoryDefinition.TYPE +
" from index " + indexName + " sort ascending " + CategoryDefinition.CATEGORY_ID +
" from " + from + " size " + size);
@ -658,7 +658,7 @@ public class ElasticsearchJobProvider implements JobProvider
@Override
public QueryPage<CategoryDefinition> categoryDefinition(String jobId, String categoryId) {
String indexName = ElasticsearchPersister.getJobIndexName(jobId);
String indexName = JobResultsPersister.getJobIndexName(jobId);
GetResponse response;
try {
@ -720,7 +720,7 @@ public class ElasticsearchJobProvider implements JobProvider
private QueryPage<AnomalyRecord> records(String jobId, int from, int size,
QueryBuilder recordFilter, FieldSortBuilder sb, List<String> secondarySort,
boolean descending) throws ResourceNotFoundException {
String indexName = ElasticsearchPersister.getJobIndexName(jobId);
String indexName = JobResultsPersister.getJobIndexName(jobId);
recordFilter = new BoolQueryBuilder()
.filter(recordFilter)
@ -785,7 +785,7 @@ public class ElasticsearchJobProvider implements JobProvider
private QueryPage<Influencer> influencers(String jobId, int from, int size, QueryBuilder filterBuilder, String sortField,
boolean sortDescending) throws ResourceNotFoundException {
String indexName = ElasticsearchPersister.getJobIndexName(jobId);
String indexName = JobResultsPersister.getJobIndexName(jobId);
LOGGER.trace("ES API CALL: search all of type " + Influencer.TYPE + " from index " + indexName
+ ((sortField != null)
? " with sort " + (sortDescending ? "descending" : "ascending") + " on field " + esSortField(sortField) : "")
@ -862,7 +862,7 @@ public class ElasticsearchJobProvider implements JobProvider
@Override
public Optional<Quantiles> getQuantiles(String jobId)
{
String indexName = ElasticsearchPersister.getJobIndexName(jobId);
String indexName = JobResultsPersister.getJobIndexName(jobId);
try
{
LOGGER.trace("ES API CALL: get ID " + Quantiles.QUANTILES_ID +
@ -935,7 +935,7 @@ public class ElasticsearchJobProvider implements JobProvider
SearchResponse searchResponse;
try
{
String indexName = ElasticsearchPersister.getJobIndexName(jobId);
String indexName = JobResultsPersister.getJobIndexName(jobId);
LOGGER.trace("ES API CALL: search all of type " + ModelSnapshot.TYPE +
" from index " + indexName + " sort ascending " + esSortField(sortField) +
" with filter after sort from " + from + " size " + size);
@ -984,34 +984,8 @@ public class ElasticsearchJobProvider implements JobProvider
return new QueryPage<>(results, searchResponse.getHits().getTotalHits(), ModelSnapshot.RESULTS_FIELD);
}
@Override
public void updateModelSnapshot(String jobId, ModelSnapshot modelSnapshot,
boolean restoreModelSizeStats)
{
// For Elasticsearch the update can be done in exactly the same way as
// the original persist
ElasticsearchPersister persister = new ElasticsearchPersister(jobId, client);
persister.persistModelSnapshot(modelSnapshot);
if (restoreModelSizeStats)
{
if (modelSnapshot.getModelSizeStats() != null)
{
persister.persistModelSizeStats(modelSnapshot.getModelSizeStats());
}
if (modelSnapshot.getQuantiles() != null)
{
persister.persistQuantiles(modelSnapshot.getQuantiles());
}
}
// Commit so that when the REST API call that triggered the update
// returns the updated document is searchable
persister.commitWrites();
}
public void restoreStateToStream(String jobId, ModelSnapshot modelSnapshot, OutputStream restoreStream) throws IOException {
String indexName = ElasticsearchPersister.getJobIndexName(jobId);
String indexName = JobResultsPersister.getJobIndexName(jobId);
// First try to restore categorizer state. There are no snapshots for this, so the IDs simply
// count up until a document is not found. It's NOT an error to have no categorizer state.
@ -1084,26 +1058,20 @@ public class ElasticsearchJobProvider implements JobProvider
}
@Override
public Optional<ModelSizeStats> modelSizeStats(String jobId)
{
String indexName = ElasticsearchPersister.getJobIndexName(jobId);
try
{
public Optional<ModelSizeStats> modelSizeStats(String jobId) {
String indexName = JobResultsPersister.getJobIndexName(jobId);
try {
LOGGER.trace("ES API CALL: get ID " + ModelSizeStats.TYPE +
" type " + ModelSizeStats.TYPE + " from index " + indexName);
GetResponse modelSizeStatsResponse = client.prepareGet(
indexName, ModelSizeStats.TYPE.getPreferredName(), ModelSizeStats.TYPE.getPreferredName()).get();
if (!modelSizeStatsResponse.isExists())
{
if (!modelSizeStatsResponse.isExists()) {
String msg = "No memory usage details for job with id " + jobId;
LOGGER.warn(msg);
return Optional.empty();
}
else
{
} else {
BytesReference source = modelSizeStatsResponse.getSourceAsBytesRef();
XContentParser parser;
try {
@ -1114,9 +1082,7 @@ public class ElasticsearchJobProvider implements JobProvider
ModelSizeStats modelSizeStats = ModelSizeStats.PARSER.apply(parser, () -> parseFieldMatcher).build();
return Optional.of(modelSizeStats);
}
}
catch (IndexNotFoundException e)
{
} catch (IndexNotFoundException e) {
LOGGER.warn("Missing index " + indexName, e);
return Optional.empty();
}

View File

@ -1,556 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.persistence;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles;
import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.results.BucketInfluencer;
import org.elasticsearch.xpack.prelert.job.results.CategoryDefinition;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput;
import org.elasticsearch.xpack.prelert.job.results.ReservedFieldNames;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.Map.Entry;
import java.util.function.Supplier;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
/**
* Saves result Buckets and Quantiles to Elasticsearch<br>
*
* <b>Buckets</b> are written with the following structure:
* <h2>Bucket</h2> The results of each job are stored in buckets, this is the
* top level structure for the results. A bucket contains multiple anomaly
* records. The anomaly score of the bucket may not match the summed score of
* all the records as all the records may not have been outputted for the
* bucket.
* <h2>Anomaly Record</h2> In Elasticsearch records have a parent &lt;-&lt;
child relationship with buckets and should only exist in relation to a parent
* bucket. Each record was generated by a detector which can be identified via
* the detectorIndex field.
* <h2>Detector</h2> The Job has a fixed number of detectors but there may not
* be output for every detector in each bucket. <br>
* <b>Quantiles</b> may contain model quantiles used in normalisation and are
* stored in documents of type {@link Quantiles#TYPE} <br>
* <h2>ModelSizeStats</h2> This is stored in a flat structure <br>
*
* @see org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchMappings
*/
public class ElasticsearchPersister implements JobResultsPersister, JobRenormaliser
{
private static final Logger LOGGER = Loggers.getLogger(ElasticsearchPersister.class);
public static final String INDEX_PREFIX = "prelertresults-";
public static String getJobIndexName(String jobId) {
return INDEX_PREFIX + jobId;
}
private final Client client;
// TODO norelease: remove this field, the job id can be inferred from most of the methods' parameters here and for cases
// where there are no parameters we can supply the job id. This way we don't have to create an instance of the class
// per job
private final String jobId;
/**
* Create with the Elasticsearch client. Data will be written to
* the index <code>jobId</code>
*
* @param jobId The job Id/Elasticsearch index
* @param client The Elasticsearch client
*/
public ElasticsearchPersister(String jobId, Client client)
{
this.jobId = jobId;
this.client = client;
}
@Override
public void persistBucket(Bucket bucket)
{
if (bucket.getRecords() == null)
{
return;
}
try
{
XContentBuilder content = serialiseWithJobId(Bucket.TYPE.getPreferredName(), bucket);
String indexName = getJobIndexName(jobId);
LOGGER.trace("ES API CALL: index type " + Bucket.TYPE +
" to index " + indexName + " at epoch " + bucket.getEpoch());
IndexResponse response = client.prepareIndex(indexName, Bucket.TYPE.getPreferredName())
.setSource(content)
.execute().actionGet();
bucket.setId(response.getId());
persistBucketInfluencersStandalone(bucket.getId(), bucket.getBucketInfluencers(),
bucket.getTimestamp(), bucket.isInterim());
if (bucket.getInfluencers() != null && bucket.getInfluencers().isEmpty() == false)
{
BulkRequestBuilder addInfluencersRequest = client.prepareBulk();
for (Influencer influencer : bucket.getInfluencers())
{
influencer.setTimestamp(bucket.getTimestamp());
influencer.setInterim(bucket.isInterim());
content = serialiseWithJobId(Influencer.TYPE.getPreferredName(), influencer);
LOGGER.trace("ES BULK ACTION: index type " + Influencer.TYPE +
" to index " + indexName + " with auto-generated ID");
addInfluencersRequest.add(
client.prepareIndex(indexName, Influencer.TYPE.getPreferredName())
.setSource(content));
}
LOGGER.trace("ES API CALL: bulk request with " + addInfluencersRequest.numberOfActions() + " actions");
BulkResponse addInfluencersResponse = addInfluencersRequest.execute().actionGet();
if (addInfluencersResponse.hasFailures())
{
LOGGER.error("Bulk index of Influencers has errors: "
+ addInfluencersResponse.buildFailureMessage());
}
}
if (bucket.getRecords().isEmpty() == false)
{
BulkRequestBuilder addRecordsRequest = client.prepareBulk();
for (AnomalyRecord record : bucket.getRecords())
{
record.setTimestamp(bucket.getTimestamp());
content = serialiseWithJobId(AnomalyRecord.TYPE.getPreferredName(), record);
LOGGER.trace("ES BULK ACTION: index type " + AnomalyRecord.TYPE +
" to index " + indexName + " with auto-generated ID, for bucket "
+ bucket.getId());
addRecordsRequest.add(client.prepareIndex(indexName, AnomalyRecord.TYPE.getPreferredName())
.setSource(content));
}
LOGGER.trace("ES API CALL: bulk request with " + addRecordsRequest.numberOfActions() + " actions");
BulkResponse addRecordsResponse = addRecordsRequest.execute().actionGet();
if (addRecordsResponse.hasFailures())
{
LOGGER.error("Bulk index of AnomalyRecord has errors: "
+ addRecordsResponse.buildFailureMessage());
}
}
persistPerPartitionMaxProbabilities(bucket);
}
catch (IOException e)
{
LOGGER.error("Error writing bucket state", e);
}
}
private void persistPerPartitionMaxProbabilities(Bucket bucket)
{
if (bucket.getPerPartitionMaxProbability().isEmpty())
{
return;
}
try
{
XContentBuilder builder = jsonBuilder();
builder.startObject()
.field(ElasticsearchMappings.ES_TIMESTAMP, bucket.getTimestamp())
.field(Job.ID.getPreferredName(), jobId);
builder.startArray(ReservedFieldNames.PARTITION_NORMALIZED_PROBS);
for (Entry<String, Double> entry : bucket.getPerPartitionMaxProbability().entrySet())
{
builder.startObject()
.field(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(), entry.getKey())
.field(Bucket.MAX_NORMALIZED_PROBABILITY.getPreferredName(), entry.getValue())
.endObject();
}
builder.endArray().endObject();
String indexName = getJobIndexName(jobId);
LOGGER.trace("ES API CALL: index type " + ReservedFieldNames.PARTITION_NORMALIZED_PROB_TYPE +
" to index " + indexName + " at epoch " + bucket.getEpoch());
client.prepareIndex(indexName, ReservedFieldNames.PARTITION_NORMALIZED_PROB_TYPE)
.setSource(builder)
.setId(bucket.getId())
.execute().actionGet();
}
catch (IOException e)
{
LOGGER.error("Error updating bucket per partition max normalized scores", e);
return;
}
}
@Override
public void persistCategoryDefinition(CategoryDefinition category)
{
Persistable persistable = new Persistable(category, () -> CategoryDefinition.TYPE.getPreferredName(),
() -> String.valueOf(category.getCategoryId()),
() -> serialiseCategoryDefinition(category));
persistable.persist();
// Don't commit as we expect masses of these updates and they're not
// read again by this process
}
/**
* The quantiles objects are written with a fixed ID, so that the
* latest quantiles will overwrite the previous ones. For each ES index,
* which corresponds to a job, there can only be one quantiles document.
* @param quantiles If <code>null</code> then returns straight away.
*/
@Override
public void persistQuantiles(Quantiles quantiles)
{
Persistable persistable = new Persistable(quantiles, () -> Quantiles.TYPE.getPreferredName(),
() -> Quantiles.QUANTILES_ID, () -> serialiseWithJobId(Quantiles.TYPE.getPreferredName(), quantiles));
if (persistable.persist())
{
// Refresh the index when persisting quantiles so that previously
// persisted results will be available for searching. Do this using the
// indices API rather than the index API (used to write the quantiles
// above), because this will refresh all shards rather than just the
// shard that the quantiles document itself was written to.
commitWrites();
}
}
/**
* Write a model snapshot description to Elasticsearch. Note that this is
* only the description - the actual model state is persisted separately.
* @param modelSnapshot If <code>null</code> then returns straight away.
*/
@Override
public void persistModelSnapshot(ModelSnapshot modelSnapshot)
{
Persistable persistable = new Persistable(modelSnapshot, () -> ModelSnapshot.TYPE.getPreferredName(),
() -> modelSnapshot.getSnapshotId(), () -> serialiseWithJobId(ModelSnapshot.TYPE.getPreferredName(), modelSnapshot));
persistable.persist();
}
/**
* Persist the memory usage data
* @param modelSizeStats If <code>null</code> then returns straight away.
*/
@Override
public void persistModelSizeStats(ModelSizeStats modelSizeStats)
{
LOGGER.trace("Persisting model size stats, for size " + modelSizeStats.getModelBytes());
Persistable persistable = new Persistable(modelSizeStats, () -> ModelSizeStats.TYPE.getPreferredName(),
() -> modelSizeStats.getId(),
() -> serialiseWithJobId(ModelSizeStats.TYPE.getPreferredName(), modelSizeStats));
persistable.persist();
persistable = new Persistable(modelSizeStats, () -> ModelSizeStats.TYPE.getPreferredName(),
() -> null,
() -> serialiseWithJobId(ModelSizeStats.TYPE.getPreferredName(), modelSizeStats));
persistable.persist();
// Don't commit as we expect masses of these updates and they're only
// for information at the API level
}
/**
* Persist model debug output
* @param modelDebugOutput If <code>null</code> then returns straight away.
*/
@Override
public void persistModelDebugOutput(ModelDebugOutput modelDebugOutput)
{
Persistable persistable = new Persistable(modelDebugOutput, () -> ModelDebugOutput.TYPE.getPreferredName(),
() -> null, () -> serialiseWithJobId(ModelDebugOutput.TYPE.getPreferredName(), modelDebugOutput));
persistable.persist();
// Don't commit as we expect masses of these updates and they're not
// read again by this process
}
@Override
public void persistInfluencer(Influencer influencer)
{
Persistable persistable = new Persistable(influencer, () -> Influencer.TYPE.getPreferredName(),
() -> influencer.getId(), () -> serialiseWithJobId(Influencer.TYPE.getPreferredName(), influencer));
persistable.persist();
// Don't commit as we expect masses of these updates and they're not
// read again by this process
}
@Override
public void persistBulkState(BytesReference bytesRef) {
try {
// No validation - assume the native process has formatted the state correctly
byte[] bytes = bytesRef.toBytesRef().bytes;
LOGGER.trace("ES API CALL: bulk index");
client.prepareBulk()
.add(bytes, 0, bytes.length)
.execute().actionGet();
} catch (Exception e) {
LOGGER.error("Error persisting bulk state", e);
}
}
/**
* Refreshes the Elasticsearch index.
* Blocks until results are searchable.
*/
@Override
public boolean commitWrites()
{
String indexName = getJobIndexName(jobId);
// Refresh should wait for Lucene to make the data searchable
LOGGER.trace("ES API CALL: refresh index " + indexName);
client.admin().indices().refresh(new RefreshRequest(indexName)).actionGet();
return true;
}
@Override
public void updateBucket(Bucket bucket)
{
try
{
String indexName = getJobIndexName(jobId);
LOGGER.trace("ES API CALL: index type " + Bucket.TYPE +
" to index " + indexName + " with ID " + bucket.getId());
client.prepareIndex(indexName, Bucket.TYPE.getPreferredName(), bucket.getId())
.setSource(serialiseWithJobId(Bucket.TYPE.getPreferredName(), bucket)).execute().actionGet();
}
catch (IOException e)
{
LOGGER.error("Error updating bucket state", e);
return;
}
// If the update to the bucket was successful, also update the
// standalone copies of the nested bucket influencers
try
{
persistBucketInfluencersStandalone(bucket.getId(), bucket.getBucketInfluencers(),
bucket.getTimestamp(), bucket.isInterim());
}
catch (IOException e)
{
LOGGER.error("Error updating standalone bucket influencer state", e);
return;
}
persistPerPartitionMaxProbabilities(bucket);
}
private void persistBucketInfluencersStandalone(String bucketId, List<BucketInfluencer> bucketInfluencers,
Date bucketTime, boolean isInterim) throws IOException
{
if (bucketInfluencers != null && bucketInfluencers.isEmpty() == false)
{
BulkRequestBuilder addBucketInfluencersRequest = client.prepareBulk();
for (BucketInfluencer bucketInfluencer : bucketInfluencers)
{
XContentBuilder content = serialiseBucketInfluencerStandalone(bucketInfluencer,
bucketTime, isInterim);
// Need consistent IDs to ensure overwriting on renormalisation
String id = bucketId + bucketInfluencer.getInfluencerFieldName();
String indexName = getJobIndexName(jobId);
LOGGER.trace("ES BULK ACTION: index type " + BucketInfluencer.TYPE +
" to index " + indexName + " with ID " + id);
addBucketInfluencersRequest.add(
client.prepareIndex(indexName, BucketInfluencer.TYPE.getPreferredName(), id)
.setSource(content));
}
LOGGER.trace("ES API CALL: bulk request with " + addBucketInfluencersRequest.numberOfActions() + " actions");
BulkResponse addBucketInfluencersResponse = addBucketInfluencersRequest.execute().actionGet();
if (addBucketInfluencersResponse.hasFailures())
{
LOGGER.error("Bulk index of Bucket Influencers has errors: "
+ addBucketInfluencersResponse.buildFailureMessage());
}
}
}
@Override
public void updateRecords(String bucketId, List<AnomalyRecord> records)
{
try
{
// Now bulk update the records within the bucket
BulkRequestBuilder bulkRequest = client.prepareBulk();
boolean addedAny = false;
for (AnomalyRecord record : records)
{
String recordId = record.getId();
String indexName = getJobIndexName(jobId);
LOGGER.trace("ES BULK ACTION: update ID " + recordId + " type " + AnomalyRecord.TYPE +
" in index " + indexName + " using map of new values, for bucket " +
bucketId);
bulkRequest.add(
client.prepareIndex(indexName, AnomalyRecord.TYPE.getPreferredName(), recordId)
.setSource(serialiseWithJobId(AnomalyRecord.TYPE.getPreferredName(), record))
// Need to specify the parent ID when updating a child
.setParent(bucketId));
addedAny = true;
}
if (addedAny)
{
LOGGER.trace("ES API CALL: bulk request with " +
bulkRequest.numberOfActions() + " actions");
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures())
{
LOGGER.error("BulkResponse has errors: " + bulkResponse.buildFailureMessage());
}
}
}
catch (IOException | ElasticsearchException e)
{
LOGGER.error("Error updating anomaly records", e);
}
}
@Override
public void updateInfluencer(Influencer influencer)
{
persistInfluencer(influencer);
}
@Override
public void deleteInterimResults()
{
ElasticsearchBulkDeleter deleter = new ElasticsearchBulkDeleter(client, jobId, true);
deleter.deleteInterimResults();
// NOCOMMIT This is called from AutodetectResultsParser, feels wrong...
deleter.commit(new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
// don't care?
}
@Override
public void onFailure(Exception e) {
// don't care?
}
});
}
private interface Serialiser
{
XContentBuilder serialise() throws IOException;
}
private class Persistable
{
private final Object object;
private final Supplier<String> typeSupplier;
private final Supplier<String> idSupplier;
private final Serialiser serialiser;
Persistable(Object object, Supplier<String> typeSupplier, Supplier<String> idSupplier,
Serialiser serialiser)
{
this.object = object;
this.typeSupplier = typeSupplier;
this.idSupplier = idSupplier;
this.serialiser = serialiser;
}
boolean persist()
{
String type = typeSupplier.get();
String id = idSupplier.get();
if (object == null)
{
LOGGER.warn("No " + type + " to persist for job " + jobId);
return false;
}
logCall(type, id);
try
{
String indexName = getJobIndexName(jobId);
client.prepareIndex(indexName, type, idSupplier.get())
.setSource(serialiser.serialise())
.execute().actionGet();
return true;
}
catch (IOException e)
{
LOGGER.error("Error writing " + typeSupplier.get(), e);
return false;
}
}
private void logCall(String type, String id)
{
String indexName = getJobIndexName(jobId);
String msg = "ES API CALL: index type " + type + " to index " + indexName;
if (id != null)
{
msg += " with ID " + idSupplier.get();
}
else
{
msg += " with auto-generated ID";
}
LOGGER.trace(msg);
}
}
private XContentBuilder serialiseWithJobId(String objField, ToXContent obj) throws IOException
{
XContentBuilder builder = jsonBuilder();
obj.toXContent(builder, ToXContent.EMPTY_PARAMS);
return builder;
}
private XContentBuilder serialiseCategoryDefinition(CategoryDefinition categoryDefinition)
throws IOException
{
XContentBuilder builder = jsonBuilder();
categoryDefinition.toXContent(builder, ToXContent.EMPTY_PARAMS);
return builder;
}
private XContentBuilder serialiseBucketInfluencerStandalone(BucketInfluencer bucketInfluencer,
Date bucketTime, boolean isInterim) throws IOException
{
BucketInfluencer influencer = new BucketInfluencer(bucketInfluencer);
influencer.setIsInterim(isInterim);
influencer.setTimestamp(bucketTime);
XContentBuilder builder = jsonBuilder();
influencer.toXContent(builder, ToXContent.EMPTY_PARAMS);
return builder;
}
}

View File

@ -1,86 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.persistence;
import static org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchJobProvider.PRELERT_USAGE_INDEX;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.xpack.prelert.job.usage.Usage;
public class ElasticsearchUsagePersister implements UsagePersister {
private static final String USAGE_DOC_ID_PREFIX = "usage-";
private final Client client;
private final Logger logger;
private final DateTimeFormatter dateTimeFormatter;
private final Map<String, Object> upsertMap;
private String docId;
public ElasticsearchUsagePersister(Client client, Logger logger) {
this.client = client;
this.logger = logger;
dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXX", Locale.ROOT);
upsertMap = new HashMap<>();
upsertMap.put(ElasticsearchMappings.ES_TIMESTAMP, "");
upsertMap.put(Usage.INPUT_BYTES, null);
}
@Override
public void persistUsage(String jobId, long bytesRead, long fieldsRead, long recordsRead) {
ZonedDateTime nowTruncatedToHour = ZonedDateTime.now().truncatedTo(ChronoUnit.HOURS);
String formattedNowTruncatedToHour = nowTruncatedToHour.format(dateTimeFormatter);
docId = USAGE_DOC_ID_PREFIX + formattedNowTruncatedToHour;
upsertMap.put(ElasticsearchMappings.ES_TIMESTAMP, formattedNowTruncatedToHour);
// update global count
updateDocument(PRELERT_USAGE_INDEX, docId, bytesRead, fieldsRead, recordsRead);
updateDocument(ElasticsearchPersister.getJobIndexName(jobId), docId, bytesRead,
fieldsRead, recordsRead);
}
/**
* Update the metering document in the given index/id.
* Uses a script to update the volume field and 'upsert'
* to create the doc if it doesn't exist.
*
* @param index the index to persist to
* @param id Doc id is also its timestamp
* @param additionalBytes Add this value to the running total
* @param additionalFields Add this value to the running total
* @param additionalRecords Add this value to the running total
*/
private void updateDocument(String index, String id, long additionalBytes, long additionalFields, long additionalRecords) {
upsertMap.put(Usage.INPUT_BYTES, additionalBytes);
upsertMap.put(Usage.INPUT_FIELD_COUNT, additionalFields);
upsertMap.put(Usage.INPUT_RECORD_COUNT, additionalRecords);
logger.trace("ES API CALL: upsert ID " + id +
" type " + Usage.TYPE + " in index " + index +
" by running Groovy script update-usage with arguments bytes=" + additionalBytes +
" fieldCount=" + additionalFields + " recordCount=" + additionalRecords);
try {
ElasticsearchScripts.upsertViaScript(client, index, Usage.TYPE, id,
ElasticsearchScripts.newUpdateUsage(additionalBytes, additionalFields,
additionalRecords),
upsertMap);
} catch (VersionConflictEngineException e) {
logger.error("Failed to update the Usage document [" + id +"] in index [" + index + "]", e);
}
}
}

View File

@ -5,19 +5,57 @@
*/
package org.elasticsearch.xpack.prelert.job.persistence;
import java.io.IOException;
import java.util.Locale;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.xpack.prelert.job.DataCounts;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister.getJobIndexName;
/**
* Update a job's dataCounts
* i.e. the number of processed records, fields etc.
*/
public interface JobDataCountsPersister
{
public class JobDataCountsPersister extends AbstractComponent {
private final Client client;
public JobDataCountsPersister(Settings settings, Client client) {
super(settings);
this.client = client;
}
private XContentBuilder serialiseCounts(DataCounts counts) throws IOException {
XContentBuilder builder = jsonBuilder();
return counts.toXContent(builder, ToXContent.EMPTY_PARAMS);
}
/**
* Update the job's data counts stats and figures.
*
* @param jobId Job to update
* @param counts The counts
*/
void persistDataCounts(String jobId, DataCounts counts);
public void persistDataCounts(String jobId, DataCounts counts) {
try {
XContentBuilder content = serialiseCounts(counts);
client.prepareIndex(getJobIndexName(jobId), DataCounts.TYPE.getPreferredName(),
jobId + DataCounts.DOCUMENT_SUFFIX)
.setSource(content).execute().actionGet();
} catch (IOException ioe) {
logger.warn((Supplier<?>)() -> new ParameterizedMessage("[{}] Error serialising DataCounts stats", jobId), ioe);
} catch (IndexNotFoundException e) {
String msg = String.format(Locale.ROOT, "[%s] Error writing status stats.", jobId);
logger.warn(msg, e);
}
}
}
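Not part of the commit: a minimal sketch of calling the new class-based data counts persister, assuming the Client is supplied by the caller and using a hypothetical job id.

import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister;

public class DataCountsPersistenceSketch {
    public static void persistExample(Client client) {
        // Node-level service: constructed once, reusable for any job.
        JobDataCountsPersister persister = new JobDataCountsPersister(Settings.EMPTY, client);
        DataCounts counts = new DataCounts("foo"); // "foo" is an illustrative job id
        // Indexes the counts document into the job's result index.
        persister.persistDataCounts("foo", counts);
    }
}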

View File

@ -55,18 +55,6 @@ public interface JobProvider extends JobResultsProvider {
String startEpochMs, String endEpochMs, String sortField, boolean sortDescending,
String snapshotId, String description);
/**
* Update a persisted model snapshot metadata document to match the
* argument supplied.
*
* @param jobId the job id
* @param modelSnapshot the updated model snapshot object to be stored
* @param restoreModelSizeStats should the model size stats in this
* snapshot be made the current ones for this job?
*/
void updateModelSnapshot(String jobId, ModelSnapshot modelSnapshot,
boolean restoreModelSizeStats);
/**
* Given a model snapshot, get the corresponding state and write it to the supplied
* stream. If there are multiple state documents they are separated using <code>'\0'</code>

View File

@ -5,10 +5,18 @@
*/
package org.elasticsearch.xpack.prelert.job.persistence;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
import java.io.IOException;
import java.util.List;
@ -17,15 +25,46 @@ import java.util.List;
* for a particular job with new normalised anomaly scores and
* unusual scores
*/
public interface JobRenormaliser
{
public class JobRenormaliser extends AbstractComponent {
private final Client client;
private final JobResultsPersister jobResultsPersister;
public JobRenormaliser(Settings settings, Client client, JobResultsPersister jobResultsPersister) {
super(settings);
this.client = client;
this.jobResultsPersister = jobResultsPersister;
}
/**
* Update the bucket with the changes that may result
* due to renormalisation.
*
* @param bucket the bucket to update
*/
void updateBucket(Bucket bucket);
public void updateBucket(Bucket bucket) {
String jobId = bucket.getJobId();
try {
String indexName = JobResultsPersister.getJobIndexName(jobId);
logger.trace("[{}] ES API CALL: index type {} to index {} with ID {}", jobId, Bucket.TYPE, indexName, bucket.getId());
client.prepareIndex(indexName, Bucket.TYPE.getPreferredName(), bucket.getId())
.setSource(jobResultsPersister.serialiseWithJobId(Bucket.TYPE.getPreferredName(), bucket)).execute().actionGet();
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error updating bucket state", new Object[]{jobId}, e));
return;
}
// If the update to the bucket was successful, also update the
// standalone copies of the nested bucket influencers
try {
jobResultsPersister.persistBucketInfluencersStandalone(bucket.getJobId(), bucket.getId(), bucket.getBucketInfluencers(),
bucket.getTimestamp(), bucket.isInterim());
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error updating standalone bucket influencer state", new Object[]{jobId}, e));
return;
}
jobResultsPersister.persistPerPartitionMaxProbabilities(bucket);
}
/**
@ -36,11 +75,41 @@ public interface JobRenormaliser
* @param bucketId Id of the bucket to update
* @param records The new record values
*/
void updateRecords(String bucketId, List<AnomalyRecord> records);
public void updateRecords(String jobId, String bucketId, List<AnomalyRecord> records) {
try {
// Now bulk update the records within the bucket
BulkRequestBuilder bulkRequest = client.prepareBulk();
boolean addedAny = false;
for (AnomalyRecord record : records) {
String recordId = record.getId();
String indexName = JobResultsPersister.getJobIndexName(jobId);
logger.trace("[{}] ES BULK ACTION: update ID {} type {} in index {} using map of new values, for bucket {}",
jobId, recordId, AnomalyRecord.TYPE, indexName, bucketId);
bulkRequest.add(
client.prepareIndex(indexName, AnomalyRecord.TYPE.getPreferredName(), recordId)
.setSource(jobResultsPersister.serialiseWithJobId(AnomalyRecord.TYPE.getPreferredName(), record)));
addedAny = true;
}
if (addedAny) {
logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, bulkRequest.numberOfActions());
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
logger.error("[{}] BulkResponse has errors: {}", jobId, bulkResponse.buildFailureMessage());
}
}
} catch (IOException | ElasticsearchException e) {
logger.error(new ParameterizedMessage("[{}] Error updating anomaly records", new Object[]{jobId}, e));
}
}
/**
* Update the influencer for a particular job
*/
void updateInfluencer(Influencer influencer);
public void updateInfluencer(Influencer influencer) {
jobResultsPersister.persistInfluencer(influencer);
}
}
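Not part of the commit: a brief sketch of driving the new JobRenormaliser, which now takes the job id explicitly instead of carrying a per-job logger; the ids and the single record are illustrative.

import java.util.Collections;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.prelert.job.persistence.JobRenormaliser;
import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;

public class RenormalisationSketch {
    public static void updateExample(Client client, AnomalyRecord rescoredRecord) {
        JobResultsPersister resultsPersister = new JobResultsPersister(Settings.EMPTY, client);
        JobRenormaliser renormaliser = new JobRenormaliser(Settings.EMPTY, client, resultsPersister);
        // Re-index a single rescored record for the hypothetical job "foo" and bucket "bucket-1".
        renormaliser.updateRecords("foo", "bucket-1", Collections.singletonList(rescoredRecord));
    }
}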

View File

@ -5,66 +5,237 @@
*/
package org.elasticsearch.xpack.prelert.job.persistence;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles;
import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.results.BucketInfluencer;
import org.elasticsearch.xpack.prelert.job.results.CategoryDefinition;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput;
import org.elasticsearch.xpack.prelert.job.results.ReservedFieldNames;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
/**
* Interface for classes that persist {@linkplain Bucket Buckets} and
* {@linkplain Quantiles Quantiles}
* Saves result Buckets and Quantiles to Elasticsearch<br>
*
* <b>Buckets</b> are written with the following structure:
* <h2>Bucket</h2> The results of each job are stored in buckets; this is the
* top-level structure for the results. A bucket contains multiple anomaly
* records. The anomaly score of the bucket may not match the summed score of
* all the records, as not all records may have been output for the
* bucket.
* <h2>Anomaly Record</h2> In Elasticsearch records have a parent &lt;-&gt;
* child relationship with buckets and should only exist in relation to a parent
* bucket. Each record was generated by a detector, which can be identified via
* the detectorIndex field.
* <h2>Detector</h2> The Job has a fixed number of detectors, but there may not
* be output for every detector in each bucket. <br>
* <b>Quantiles</b> may contain model quantiles used in normalisation and are
* stored in documents of type {@link Quantiles#TYPE} <br>
* <h2>ModelSizeStats</h2> This is stored in a flat structure <br>
*
* @see org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchMappings
*/
public interface JobResultsPersister
{
public class JobResultsPersister extends AbstractComponent {
private final Client client;
public JobResultsPersister(Settings settings, Client client) {
super(settings);
this.client = client;
}
/**
* Persist the result bucket
*/
void persistBucket(Bucket bucket);
public void persistBucket(Bucket bucket) {
if (bucket.getRecords() == null) {
return;
}
String jobId = bucket.getJobId();
try {
XContentBuilder content = serialiseWithJobId(Bucket.TYPE.getPreferredName(), bucket);
String indexName = getJobIndexName(jobId);
logger.trace("[{}] ES API CALL: index type {} to index {} at epoch {}", jobId, Bucket.TYPE, indexName, bucket.getEpoch());
IndexResponse response = client.prepareIndex(indexName, Bucket.TYPE.getPreferredName())
.setSource(content)
.execute().actionGet();
bucket.setId(response.getId());
persistBucketInfluencersStandalone(jobId, bucket.getId(), bucket.getBucketInfluencers(), bucket.getTimestamp(),
bucket.isInterim());
if (bucket.getInfluencers() != null && bucket.getInfluencers().isEmpty() == false) {
BulkRequestBuilder addInfluencersRequest = client.prepareBulk();
for (Influencer influencer : bucket.getInfluencers()) {
influencer.setTimestamp(bucket.getTimestamp());
influencer.setInterim(bucket.isInterim());
content = serialiseWithJobId(Influencer.TYPE.getPreferredName(), influencer);
logger.trace("[{}] ES BULK ACTION: index type {} to index {} with auto-generated ID",
jobId, Influencer.TYPE, indexName);
addInfluencersRequest.add(client.prepareIndex(indexName, Influencer.TYPE.getPreferredName())
.setSource(content));
}
logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, addInfluencersRequest.numberOfActions());
BulkResponse addInfluencersResponse = addInfluencersRequest.execute().actionGet();
if (addInfluencersResponse.hasFailures()) {
logger.error("[{}] Bulk index of Influencers has errors: {}", jobId, addInfluencersResponse.buildFailureMessage());
}
}
if (bucket.getRecords().isEmpty() == false) {
BulkRequestBuilder addRecordsRequest = client.prepareBulk();
for (AnomalyRecord record : bucket.getRecords()) {
record.setTimestamp(bucket.getTimestamp());
content = serialiseWithJobId(AnomalyRecord.TYPE.getPreferredName(), record);
logger.trace("[{}] ES BULK ACTION: index type {} to index {} with auto-generated ID, for bucket {}",
jobId, AnomalyRecord.TYPE, indexName, bucket.getId());
addRecordsRequest.add(client.prepareIndex(indexName, AnomalyRecord.TYPE.getPreferredName())
.setSource(content));
}
logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, addRecordsRequest.numberOfActions());
BulkResponse addRecordsResponse = addRecordsRequest.execute().actionGet();
if (addRecordsResponse.hasFailures()) {
logger.error("[{}] Bulk index of AnomalyRecord has errors: {}", jobId, addRecordsResponse.buildFailureMessage());
}
}
persistPerPartitionMaxProbabilities(bucket);
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error writing bucket state", new Object[] {jobId}, e));
}
}
/**
* Persist the category definition
* @param category The category to be persisted
*/
void persistCategoryDefinition(CategoryDefinition category);
public void persistCategoryDefinition(CategoryDefinition category) {
Persistable persistable = new Persistable(category.getJobId(), category, CategoryDefinition.TYPE::getPreferredName,
() -> String.valueOf(category.getCategoryId()), () -> serialiseCategoryDefinition(category));
persistable.persist();
// Don't commit as we expect masses of these updates and they're not
// read again by this process
}
/**
* Persist the quantiles
*/
void persistQuantiles(Quantiles quantiles);
public void persistQuantiles(Quantiles quantiles) {
Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.TYPE::getPreferredName,
() -> Quantiles.QUANTILES_ID, () -> serialiseWithJobId(Quantiles.TYPE.getPreferredName(), quantiles));
if (persistable.persist()) {
// Refresh the index when persisting quantiles so that previously
// persisted results will be available for searching. Do this using the
// indices API rather than the index API (used to write the quantiles
// above), because this will refresh all shards rather than just the
// shard that the quantiles document itself was written to.
commitWrites(quantiles.getJobId());
}
}
/**
* Persist a model snapshot description
*/
void persistModelSnapshot(ModelSnapshot modelSnapshot);
public void persistModelSnapshot(ModelSnapshot modelSnapshot) {
Persistable persistable = new Persistable(modelSnapshot.getJobId(), modelSnapshot, ModelSnapshot.TYPE::getPreferredName,
modelSnapshot::getSnapshotId, () -> serialiseWithJobId(ModelSnapshot.TYPE.getPreferredName(), modelSnapshot));
persistable.persist();
}
/**
* Persist the memory usage data
*/
void persistModelSizeStats(ModelSizeStats modelSizeStats);
public void persistModelSizeStats(ModelSizeStats modelSizeStats) {
String jobId = modelSizeStats.getJobId();
logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes());
Persistable persistable = new Persistable(modelSizeStats.getJobId(), modelSizeStats, ModelSizeStats.TYPE::getPreferredName,
() -> jobId, () -> serialiseWithJobId(ModelSizeStats.TYPE.getPreferredName(), modelSizeStats));
persistable.persist();
persistable = new Persistable(modelSizeStats.getJobId(), modelSizeStats, ModelSizeStats.TYPE::getPreferredName,
() -> null, () -> serialiseWithJobId(ModelSizeStats.TYPE.getPreferredName(), modelSizeStats));
persistable.persist();
// Don't commit as we expect masses of these updates and they're only
// for information at the API level
}
/**
* Persist model debug output
*/
void persistModelDebugOutput(ModelDebugOutput modelDebugOutput);
public void persistModelDebugOutput(ModelDebugOutput modelDebugOutput) {
Persistable persistable = new Persistable(modelDebugOutput.getJobId(), modelDebugOutput, ModelDebugOutput.TYPE::getPreferredName,
() -> null, () -> serialiseWithJobId(ModelDebugOutput.TYPE.getPreferredName(), modelDebugOutput));
persistable.persist();
// Don't commit as we expect masses of these updates and they're not
// read again by this process
}
/**
* Persist the influencer
*/
void persistInfluencer(Influencer influencer);
public void persistInfluencer(Influencer influencer) {
Persistable persistable = new Persistable(influencer.getJobId(), influencer, Influencer.TYPE::getPreferredName,
influencer::getId, () -> serialiseWithJobId(Influencer.TYPE.getPreferredName(), influencer));
persistable.persist();
// Don't commit as we expect masses of these updates and they're not
// read again by this process
}
/**
* Persist state sent from the native process
*/
void persistBulkState(BytesReference bytesRef);
public void persistBulkState(String jobId, BytesReference bytesRef) {
try {
// No validation - assume the native process has formatted the state correctly
byte[] bytes = bytesRef.toBytesRef().bytes;
logger.trace("[{}] ES API CALL: bulk index", jobId);
client.prepareBulk()
.add(bytes, 0, bytes.length)
.execute().actionGet();
} catch (Exception e) {
logger.error((org.apache.logging.log4j.util.Supplier<?>)
() -> new ParameterizedMessage("[{}] Error persisting bulk state", jobId), e);
}
}
/**
* Delete any existing interim results
*/
void deleteInterimResults();
public void deleteInterimResults(String jobId) {
ElasticsearchBulkDeleter deleter = new ElasticsearchBulkDeleter(client, jobId, true);
deleter.deleteInterimResults();
deleter.commit(new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
// don't care?
}
@Override
public void onFailure(Exception e) {
// don't care?
}
});
}
/**
* Once all the job data has been written this function will be
@ -73,5 +244,149 @@ public interface JobResultsPersister
*
* @return True if successful
*/
boolean commitWrites();
public boolean commitWrites(String jobId) {
String indexName = getJobIndexName(jobId);
// Refresh should wait for Lucene to make the data searchable
logger.trace("[{}] ES API CALL: refresh index {}", jobId, indexName);
client.admin().indices().refresh(new RefreshRequest(indexName)).actionGet();
return true;
}
XContentBuilder serialiseWithJobId(String objField, ToXContent obj) throws IOException {
XContentBuilder builder = jsonBuilder();
obj.toXContent(builder, ToXContent.EMPTY_PARAMS);
return builder;
}
private XContentBuilder serialiseCategoryDefinition(CategoryDefinition categoryDefinition) throws IOException {
XContentBuilder builder = jsonBuilder();
categoryDefinition.toXContent(builder, ToXContent.EMPTY_PARAMS);
return builder;
}
void persistBucketInfluencersStandalone(String jobId, String bucketId, List<BucketInfluencer> bucketInfluencers,
Date bucketTime, boolean isInterim) throws IOException {
if (bucketInfluencers != null && bucketInfluencers.isEmpty() == false) {
BulkRequestBuilder addBucketInfluencersRequest = client.prepareBulk();
for (BucketInfluencer bucketInfluencer : bucketInfluencers) {
XContentBuilder content = serialiseBucketInfluencerStandalone(bucketInfluencer, bucketTime, isInterim);
// Need consistent IDs to ensure overwriting on renormalisation
String id = bucketId + bucketInfluencer.getInfluencerFieldName();
String indexName = getJobIndexName(jobId);
logger.trace("[{}] ES BULK ACTION: index type {} to index {} with ID {}", jobId, BucketInfluencer.TYPE, indexName, id);
addBucketInfluencersRequest.add(
client.prepareIndex(indexName, BucketInfluencer.TYPE.getPreferredName(), id)
.setSource(content));
}
logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, addBucketInfluencersRequest.numberOfActions());
BulkResponse addBucketInfluencersResponse = addBucketInfluencersRequest.execute().actionGet();
if (addBucketInfluencersResponse.hasFailures()) {
logger.error("[{}] Bulk index of Bucket Influencers has errors: {}", jobId,
addBucketInfluencersResponse.buildFailureMessage());
}
}
}
private XContentBuilder serialiseBucketInfluencerStandalone(BucketInfluencer bucketInfluencer,
Date bucketTime, boolean isInterim) throws IOException {
BucketInfluencer influencer = new BucketInfluencer(bucketInfluencer);
influencer.setIsInterim(isInterim);
influencer.setTimestamp(bucketTime);
XContentBuilder builder = jsonBuilder();
influencer.toXContent(builder, ToXContent.EMPTY_PARAMS);
return builder;
}
void persistPerPartitionMaxProbabilities(Bucket bucket) {
String jobId = bucket.getJobId();
if (bucket.getPerPartitionMaxProbability().isEmpty()) {
return;
}
try {
XContentBuilder builder = jsonBuilder();
builder.startObject()
.field(ElasticsearchMappings.ES_TIMESTAMP, bucket.getTimestamp())
.field(Job.ID.getPreferredName(), bucket.getJobId());
builder.startArray(ReservedFieldNames.PARTITION_NORMALIZED_PROBS);
for (Map.Entry<String, Double> entry : bucket.getPerPartitionMaxProbability().entrySet()) {
builder.startObject()
.field(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(), entry.getKey())
.field(Bucket.MAX_NORMALIZED_PROBABILITY.getPreferredName(), entry.getValue())
.endObject();
}
builder.endArray().endObject();
String indexName = getJobIndexName(jobId);
logger.trace("[{}] ES API CALL: index type {} to index {} at epoch {}",
jobId, ReservedFieldNames.PARTITION_NORMALIZED_PROB_TYPE, indexName, bucket.getEpoch());
client.prepareIndex(indexName, ReservedFieldNames.PARTITION_NORMALIZED_PROB_TYPE)
.setSource(builder)
.setId(bucket.getId())
.execute().actionGet();
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] error updating bucket per partition max normalized scores",
new Object[]{jobId}, e));
}
}
private static final String INDEX_PREFIX = "prelertresults-";
public static String getJobIndexName(String jobId) {
return INDEX_PREFIX + jobId;
}
private class Persistable {
private final String jobId;
private final Object object;
private final Supplier<String> typeSupplier;
private final Supplier<String> idSupplier;
private final Serialiser serialiser;
Persistable(String jobId, Object object, Supplier<String> typeSupplier, Supplier<String> idSupplier,
Serialiser serialiser) {
this.jobId = jobId;
this.object = object;
this.typeSupplier = typeSupplier;
this.idSupplier = idSupplier;
this.serialiser = serialiser;
}
boolean persist() {
String type = typeSupplier.get();
String id = idSupplier.get();
if (object == null) {
logger.warn("[{}] No {} to persist for job ", jobId, type);
return false;
}
logCall(type, id);
try {
String indexName = getJobIndexName(jobId);
client.prepareIndex(indexName, type, idSupplier.get())
.setSource(serialiser.serialise())
.execute().actionGet();
return true;
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error writing {}", new Object[]{jobId, typeSupplier.get()}, e));
return false;
}
}
private void logCall(String type, String id) {
String indexName = getJobIndexName(jobId);
if (id != null) {
logger.trace("[{}] ES API CALL: index type {} to index {} with ID {}", jobId, type, indexName, id);
} else {
logger.trace("[{}] ES API CALL: index type {} to index {} with auto-generated ID", jobId, type, indexName);
}
}
}
private interface Serialiser {
XContentBuilder serialise() throws IOException;
}
}
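Not part of the commit: a minimal sketch of persisting a bucket and refreshing the job's index with the node-level persister; the bucket is assumed to be built elsewhere, and persistBucket above returns early when it carries no records.

import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.prelert.job.results.Bucket;

public class ResultsPersistenceSketch {
    public static void persistExample(Client client, Bucket bucketWithRecords) {
        JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client);
        // Writes the bucket, its bucket influencers, influencers and records to the job's index.
        persister.persistBucket(bucketWithRecords);
        // Refresh via the indices API so previously written results become searchable.
        persister.commitWrites(bucketWithRecords.getJobId());
    }
}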

View File

@ -5,13 +5,83 @@
*/
package org.elasticsearch.xpack.prelert.job.persistence;
/**
* Interface for classes that persist usage information
*/
public interface UsagePersister
{
/**
* Persist the usage info.
*/
void persistUsage(String jobId, long bytesRead, long fieldsRead, long recordsRead);
import static org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchJobProvider.PRELERT_USAGE_INDEX;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.xpack.prelert.job.usage.Usage;
public class UsagePersister extends AbstractComponent {
private static final String USAGE_DOC_ID_PREFIX = "usage-";
private final Client client;
private final DateTimeFormatter dateTimeFormatter;
private final Map<String, Object> upsertMap;
public UsagePersister(Settings settings, Client client) {
super(settings);
this.client = client;
dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXX", Locale.ROOT);
upsertMap = new HashMap<>();
upsertMap.put(ElasticsearchMappings.ES_TIMESTAMP, "");
upsertMap.put(Usage.INPUT_BYTES, null);
}
public void persistUsage(String jobId, long bytesRead, long fieldsRead, long recordsRead) {
ZonedDateTime nowTruncatedToHour = ZonedDateTime.now().truncatedTo(ChronoUnit.HOURS);
String formattedNowTruncatedToHour = nowTruncatedToHour.format(dateTimeFormatter);
String docId = USAGE_DOC_ID_PREFIX + formattedNowTruncatedToHour;
upsertMap.put(ElasticsearchMappings.ES_TIMESTAMP, formattedNowTruncatedToHour);
// update global count
updateDocument(jobId, PRELERT_USAGE_INDEX, docId, bytesRead, fieldsRead, recordsRead);
updateDocument(jobId, JobResultsPersister.getJobIndexName(jobId), docId, bytesRead,
fieldsRead, recordsRead);
}
/**
* Update the metering document in the given index/id.
* Uses a script to update the volume field and 'upsert'
* to create the doc if it doesn't exist.
*
* @param jobId The id of the job
* @param index the index to persist to
* @param id Doc id is also its timestamp
* @param additionalBytes Add this value to the running total
* @param additionalFields Add this value to the running total
* @param additionalRecords Add this value to the running total
*/
private void updateDocument(String jobId, String index, String id, long additionalBytes, long additionalFields,
long additionalRecords) {
upsertMap.put(Usage.INPUT_BYTES, additionalBytes);
upsertMap.put(Usage.INPUT_FIELD_COUNT, additionalFields);
upsertMap.put(Usage.INPUT_RECORD_COUNT, additionalRecords);
logger.trace("[{}] ES API CALL: upsert ID {} type {} in index {} by running painless script update-usage with " +
"arguments bytes={} fieldCount={} recordCount={}", jobId, id, Usage.TYPE, index, additionalBytes,
additionalFields, additionalRecords);
try {
ElasticsearchScripts.upsertViaScript(client, index, Usage.TYPE, id,
ElasticsearchScripts.newUpdateUsage(additionalBytes, additionalFields,
additionalRecords),
upsertMap);
} catch (VersionConflictEngineException e) {
logger.error(new ParameterizedMessage("[{}] Failed to update the Usage document [{}] in index [{}]",
new Object[]{jobId, id, index}, e));
}
}
}
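Not part of the commit: a short sketch mirroring the usage-reporting call; the job id and the byte/field/record totals are illustrative.

import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.prelert.job.persistence.UsagePersister;

public class UsagePersistenceSketch {
    public static void reportExample(Client client) {
        UsagePersister persister = new UsagePersister(Settings.EMPTY, client);
        // Upserts the hourly usage document in both the global usage index and the job's own index.
        persister.persistUsage("job1", 10L, 30L, 1L);
    }
}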

View File

@ -6,7 +6,9 @@
package org.elasticsearch.xpack.prelert.job.process.autodetect;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.prelert.job.AnalysisConfig;
import org.elasticsearch.xpack.prelert.job.DataCounts;
@ -36,11 +38,12 @@ import java.util.function.Supplier;
public class AutodetectCommunicator implements Closeable {
private static final Logger LOGGER = Loggers.getLogger(AutodetectCommunicator.class);
private static final int DEFAULT_TRY_COUNT = 5;
private static final int DEFAULT_TRY_TIMEOUT_SECS = 6;
private final String jobId;
private final Logger jobLogger;
private final StatusReporter statusReporter;
private final AutodetectProcess autodetectProcess;
private final DataToProcessWriter autoDetectWriter;
@ -48,19 +51,17 @@ public class AutodetectCommunicator implements Closeable {
final AtomicBoolean inUse = new AtomicBoolean(false);
public AutodetectCommunicator(ExecutorService autoDetectExecutor, Job job, AutodetectProcess process, Logger jobLogger,
StatusReporter statusReporter, AutoDetectResultProcessor autoDetectResultProcessor,
StateProcessor stateProcessor) {
public AutodetectCommunicator(ExecutorService autoDetectExecutor, Job job, AutodetectProcess process, StatusReporter statusReporter,
AutoDetectResultProcessor autoDetectResultProcessor, StateProcessor stateProcessor) {
this.jobId = job.getJobId();
this.autodetectProcess = process;
this.jobLogger = jobLogger;
this.statusReporter = statusReporter;
this.autoDetectResultProcessor = autoDetectResultProcessor;
AnalysisConfig analysisConfig = job.getAnalysisConfig();
boolean usePerPartitionNormalization = analysisConfig.getUsePerPartitionNormalization();
autoDetectExecutor.execute(() ->
autoDetectResultProcessor.process(jobLogger, process.getProcessOutStream(), usePerPartitionNormalization)
autoDetectResultProcessor.process(jobId, process.getProcessOutStream(), usePerPartitionNormalization)
);
autoDetectExecutor.execute(() ->
stateProcessor.process(job.getId(), process.getPersistStream())
@ -70,7 +71,7 @@ public class AutodetectCommunicator implements Closeable {
private DataToProcessWriter createProcessWriter(Job job, AutodetectProcess process, StatusReporter statusReporter) {
return DataToProcessWriterFactory.create(true, process, job.getDataDescription(), job.getAnalysisConfig(),
job.getSchedulerConfig(), new TransformConfigs(job.getTransforms()) , statusReporter, jobLogger);
job.getSchedulerConfig(), new TransformConfigs(job.getTransforms()) , statusReporter, LOGGER);
}
public DataCounts writeToJob(InputStream inputStream, DataLoadParams params) throws IOException {
@ -117,17 +118,18 @@ public class AutodetectCommunicator implements Closeable {
while (isFlushComplete == false && --tryCountCounter >= 0) {
// Check there wasn't an error in the flush
if (!autodetectProcess.isProcessAlive()) {
String msg = Messages.getMessage(Messages.AUTODETECT_FLUSH_UNEXPTECTED_DEATH) + " " + autodetectProcess.readError();
jobLogger.error(msg);
String msg = Messages.getMessage(Messages.AUTODETECT_FLUSH_UNEXPTECTED_DEATH, jobId) +
" " + autodetectProcess.readError();
LOGGER.error(msg);
throw ExceptionsHelper.serverError(msg);
}
isFlushComplete = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, intermittentTimeout);
jobLogger.info("isFlushComplete={}", isFlushComplete);
LOGGER.info("[{}] isFlushComplete={}", jobId, isFlushComplete);
}
if (!isFlushComplete) {
String msg = Messages.getMessage(Messages.AUTODETECT_FLUSH_TIMEOUT) + " " + autodetectProcess.readError();
jobLogger.error(msg);
String msg = Messages.getMessage(Messages.AUTODETECT_FLUSH_TIMEOUT, jobId) + " " + autodetectProcess.readError();
LOGGER.error(msg);
throw ExceptionsHelper.serverError(msg);
}
@ -143,9 +145,10 @@ public class AutodetectCommunicator implements Closeable {
*/
private void checkProcessIsAlive() {
if (!autodetectProcess.isProcessAlive()) {
String errorMsg = "Unexpected death of autodetect: " + autodetectProcess.readError();
jobLogger.error(errorMsg);
throw ExceptionsHelper.serverError(errorMsg);
ParameterizedMessage message =
new ParameterizedMessage("[{}] Unexpected death of autodetect: {}", jobId, autodetectProcess.readError());
LOGGER.error(message);
throw ExceptionsHelper.serverError(message.getFormattedMessage());
}
}

View File

@ -6,6 +6,9 @@
package org.elasticsearch.xpack.prelert.job.process.autodetect.output;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
@ -22,7 +25,6 @@ import java.time.Duration;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* A runnable class that reads the autodetect process output
@ -34,11 +36,13 @@ import java.util.concurrent.TimeUnit;
*/
public class AutoDetectResultProcessor {
private static final Logger LOGGER = Loggers.getLogger(AutoDetectResultProcessor.class);
private final Renormaliser renormaliser;
private final JobResultsPersister persister;
private final AutodetectResultsParser parser;
private final CountDownLatch completionLatch = new CountDownLatch(1);
final CountDownLatch completionLatch = new CountDownLatch(1);
private final FlushListener flushListener;
private volatile ModelSizeStats latestModelSizeStats;
@ -58,24 +62,24 @@ public class AutoDetectResultProcessor {
this.flushListener = flushListener;
}
public void process(Logger jobLogger, InputStream in, boolean isPerPartitionNormalisation) {
public void process(String jobId, InputStream in, boolean isPerPartitionNormalisation) {
try (CloseableIterator<AutodetectResult> iterator = parser.parseResults(in)) {
int bucketCount = 0;
Context context = new Context(jobLogger, isPerPartitionNormalisation);
Context context = new Context(jobId, isPerPartitionNormalisation);
while (iterator.hasNext()) {
AutodetectResult result = iterator.next();
processResult(context, result);
bucketCount++;
jobLogger.trace("Bucket number {} parsed from output", bucketCount);
LOGGER.trace("[{}]Bucket number {} parsed from output", jobId, bucketCount);
}
jobLogger.info(bucketCount + " buckets parsed from autodetect output - about to refresh indexes");
jobLogger.info("Parse results Complete");
LOGGER.info("[{}] {} buckets parsed from autodetect output - about to refresh indexes", jobId, bucketCount);
LOGGER.info("[{}] Parse results Complete", jobId);
} catch (Exception e) {
jobLogger.info("Error parsing autodetect output", e);
LOGGER.error((Supplier<?>) () -> new ParameterizedMessage("[{}] error parsing autodetect output", new Object[] {jobId}, e));
} finally {
completionLatch.countDown();
flushListener.clear();
renormaliser.shutdown(jobLogger);
renormaliser.shutdown();
}
}
@ -88,14 +92,9 @@ public class AutoDetectResultProcessor {
// these are generated by a Flush command, and will
// be replaced or
// superseded by new results
context.jobLogger.trace("Deleting interim results");
// NOCOMMIT: This feels like an odd side-effect to
// have in a parser,
// especially since it has to wire up to
// actionlisteners. Feels like it should
// be refactored out somewhere, after parsing?
persister.deleteInterimResults();
LOGGER.trace("[{}] Deleting interim results", context.jobId);
// TODO: Is this the right place to delete results?
persister.deleteInterimResults(context.jobId);
context.deleteInterimRequired = false;
}
if (context.isPerPartitionNormalization) {
@ -113,10 +112,10 @@ public class AutoDetectResultProcessor {
}
ModelSizeStats modelSizeStats = result.getModelSizeStats();
if (modelSizeStats != null) {
context.jobLogger.trace(String.format(Locale.ROOT, "Parsed ModelSizeStats: %d / %d / %d / %d / %d / %s",
modelSizeStats.getModelBytes(), modelSizeStats.getTotalByFieldCount(), modelSizeStats.getTotalOverFieldCount(),
modelSizeStats.getTotalPartitionFieldCount(), modelSizeStats.getBucketAllocationFailuresCount(),
modelSizeStats.getMemoryStatus()));
LOGGER.trace(String.format(Locale.ROOT, "[%s] Parsed ModelSizeStats: %d / %d / %d / %d / %d / %s",
context.jobId, modelSizeStats.getModelBytes(), modelSizeStats.getTotalByFieldCount(),
modelSizeStats.getTotalOverFieldCount(), modelSizeStats.getTotalPartitionFieldCount(),
modelSizeStats.getBucketAllocationFailuresCount(), modelSizeStats.getMemoryStatus()));
latestModelSizeStats = modelSizeStats;
persister.persistModelSizeStats(modelSizeStats);
@ -129,20 +128,20 @@ public class AutoDetectResultProcessor {
if (quantiles != null) {
persister.persistQuantiles(quantiles);
context.jobLogger.debug("Quantiles parsed from output - will " + "trigger renormalisation of scores");
LOGGER.debug("[{}] Quantiles parsed from output - will " + "trigger renormalisation of scores", context.jobId);
if (context.isPerPartitionNormalization) {
renormaliser.renormaliseWithPartition(quantiles, context.jobLogger);
renormaliser.renormaliseWithPartition(quantiles);
} else {
renormaliser.renormalise(quantiles, context.jobLogger);
renormaliser.renormalise(quantiles);
}
}
FlushAcknowledgement flushAcknowledgement = result.getFlushAcknowledgement();
if (flushAcknowledgement != null) {
context.jobLogger.debug("Flush acknowledgement parsed from output for ID " + flushAcknowledgement.getId());
LOGGER.debug("[{}] Flush acknowledgement parsed from output for ID {}", context.jobId, flushAcknowledgement.getId());
// Commit previous writes here, effectively continuing
// the flush from the C++ autodetect process right
// through to the data store
persister.commitWrites();
persister.commitWrites(context.jobId);
flushListener.acknowledgeFlush(flushAcknowledgement.getId());
// Interim results may have been produced by the flush,
// which need to be
@ -177,13 +176,13 @@ public class AutoDetectResultProcessor {
static class Context {
private final Logger jobLogger;
private final String jobId;
private final boolean isPerPartitionNormalization;
boolean deleteInterimRequired;
Context(Logger jobLogger, boolean isPerPartitionNormalization) {
this.jobLogger = jobLogger;
Context(String jobId, boolean isPerPartitionNormalization) {
this.jobId = jobId;
this.isPerPartitionNormalization = isPerPartitionNormalization;
this.deleteInterimRequired = true;
}
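Not part of the commit: a sketch of kicking off result processing with the job id passed through instead of a per-job logger; the collaborators are assumed to be built elsewhere, the parser's import location is an assumption, and the input stream content is a placeholder for the native process output.

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.prelert.job.process.autodetect.output.AutodetectResultsParser; // assumed package
import org.elasticsearch.xpack.prelert.job.process.normalizer.Renormaliser;

public class ResultProcessingSketch {
    public static void processExample(Renormaliser renormaliser, JobResultsPersister persister,
                                      AutodetectResultsParser parser) {
        AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, parser);
        // Placeholder bytes standing in for the autodetect output stream; "foo" is a hypothetical job id.
        InputStream autodetectOutput = new ByteArrayInputStream(new byte[0]);
        processor.process("foo", autodetectOutput, false);
    }
}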

View File

@ -39,7 +39,7 @@ public class StateProcessor extends AbstractComponent {
} else {
bytesRef = new CompositeBytesReference(bytesRef, new BytesArray(readBuf, 0, bytesRead));
}
bytesRef = splitAndPersist(bytesRef);
bytesRef = splitAndPersist(jobId, bytesRef);
readBuf = new byte[READ_BUF_SIZE];
}
} catch (IOException e) {
@ -53,7 +53,7 @@ public class StateProcessor extends AbstractComponent {
* data is expected to be a series of Elasticsearch bulk requests in UTF-8 JSON
* (as would be uploaded to the public REST API) separated by zero bytes ('\0').
*/
private BytesReference splitAndPersist(BytesReference bytesRef) {
private BytesReference splitAndPersist(String jobId, BytesReference bytesRef) {
int from = 0;
while (true) {
int nextZeroByte = findNextZeroByte(bytesRef, from);
@ -61,7 +61,7 @@ public class StateProcessor extends AbstractComponent {
// No more zero bytes in this block
break;
}
persister.persistBulkState(bytesRef.slice(from, nextZeroByte - from));
persister.persistBulkState(jobId, bytesRef.slice(from, nextZeroByte - from));
from = nextZeroByte + 1;
}
return bytesRef.slice(from, bytesRef.length() - from);
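Not part of the commit: the zero-byte framing described above, illustrated with placeholder payloads; the JSON bodies are stand-ins, not real bulk requests.

import java.nio.charset.StandardCharsets;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;

public class StateStreamSketch {
    // Two placeholder state payloads separated by the zero bytes that splitAndPersist looks for.
    public static BytesReference exampleStateStream() {
        byte[] first = "{\"state\":\"a\"}\0".getBytes(StandardCharsets.UTF_8);
        byte[] second = "{\"state\":\"b\"}\0".getBytes(StandardCharsets.UTF_8);
        return new CompositeBytesReference(new BytesArray(first), new BytesArray(second));
    }
}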

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles;
public interface Renormaliser {
@ -13,14 +12,14 @@ public interface Renormaliser {
* Update the anomaly score field on all previously persisted buckets
* and all contained records
*/
void renormalise(Quantiles quantiles, Logger logger);
void renormalise(Quantiles quantiles);
/**
* Update the anomaly score field on all previously persisted buckets
* and all contained records and aggregate records to the partition
* level
*/
void renormaliseWithPartition(Quantiles quantiles, Logger logger);
void renormaliseWithPartition(Quantiles quantiles);
/**
@ -31,5 +30,5 @@ public interface Renormaliser {
/**
* Shut down the renormaliser
*/
boolean shutdown(Logger logger);
boolean shutdown();
}

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer.noop;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.xpack.prelert.job.process.normalizer.Renormaliser;
import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles;
@ -16,12 +15,12 @@ import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles;
public class NoOpRenormaliser implements Renormaliser {
// NORELEASE Remove once the normaliser code is ported
@Override
public void renormalise(Quantiles quantiles, Logger logger) {
public void renormalise(Quantiles quantiles) {
}
@Override
public void renormaliseWithPartition(Quantiles quantiles, Logger logger) {
public void renormaliseWithPartition(Quantiles quantiles) {
}
@ -31,7 +30,7 @@ public class NoOpRenormaliser implements Renormaliser {
}
@Override
public boolean shutdown(Logger logger) {
public boolean shutdown() {
return true;
}
}

View File

@ -5,11 +5,10 @@
*/
package org.elasticsearch.xpack.prelert.job.status;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.prelert.job.usage.UsageReporter;
@ -25,7 +24,7 @@ import java.util.Locale;
* returns true then the count will be logged and the counts persisted
* via the {@linkplain JobDataCountsPersister}.
*/
public class StatusReporter {
public class StatusReporter extends AbstractComponent {
/**
* The max percentage of date parse errors allowed before
* an exception is thrown.
@ -43,7 +42,6 @@ public class StatusReporter {
private final String jobId;
private final UsageReporter usageReporter;
private final JobDataCountsPersister dataCountsPersister;
private final Logger logger;
private final DataCounts totalRecordStats;
private volatile DataCounts incrementalRecordStats;
@ -58,22 +56,22 @@ public class StatusReporter {
private final int acceptablePercentDateParseErrors;
private final int acceptablePercentOutOfOrderErrors;
public StatusReporter(Environment env, Settings settings, String jobId, UsageReporter usageReporter,
JobDataCountsPersister dataCountsPersister, Logger logger, long bucketSpan) {
this(env, settings, jobId, usageReporter, dataCountsPersister, logger, new DataCounts(jobId), bucketSpan);
public StatusReporter(Settings settings, String jobId, UsageReporter usageReporter,
JobDataCountsPersister dataCountsPersister) {
this(settings, jobId, usageReporter, dataCountsPersister, new DataCounts(jobId));
}
public StatusReporter(Environment env, Settings settings, String jobId, DataCounts counts, UsageReporter usageReporter,
JobDataCountsPersister dataCountsPersister, Logger logger, long bucketSpan) {
this(env, settings, jobId, usageReporter, dataCountsPersister, logger, new DataCounts(counts), bucketSpan);
public StatusReporter(Settings settings, String jobId, DataCounts counts, UsageReporter usageReporter,
JobDataCountsPersister dataCountsPersister) {
this(settings, jobId, usageReporter, dataCountsPersister, new DataCounts(counts));
}
private StatusReporter(Environment env, Settings settings, String jobId, UsageReporter usageReporter,
JobDataCountsPersister dataCountsPersister, Logger logger, DataCounts totalCounts, long bucketSpan) {
private StatusReporter(Settings settings, String jobId, UsageReporter usageReporter, JobDataCountsPersister dataCountsPersister,
DataCounts totalCounts) {
super(settings);
this.jobId = jobId;
this.usageReporter = usageReporter;
this.dataCountsPersister = dataCountsPersister;
this.logger = logger;
totalRecordStats = totalCounts;
incrementalRecordStats = new DataCounts(jobId);

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.prelert.job.usage;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
@ -19,11 +20,11 @@ import java.util.Locale;
* The main difference betweeen this and the {@linkplain org.elasticsearch.xpack.prelert.job.status.StatusReporter}
* is that this writes hourly reports i.e. how much data was read in an hour
*/
public class UsageReporter {
public class UsageReporter extends AbstractComponent {
public static final Setting<Long> UPDATE_INTERVAL_SETTING = Setting.longSetting("usage.update.interval", 300, 0, Property.NodeScope);
private final String jobId;
private final Logger logger;
private long bytesReadSinceLastReport;
private long fieldsReadSinceLastReport;
@ -34,15 +35,14 @@ public class UsageReporter {
private final UsagePersister persister;
public UsageReporter(Settings settings, String jobId, UsagePersister persister, Logger logger) {
public UsageReporter(Settings settings, String jobId, UsagePersister persister) {
super(settings);
bytesReadSinceLastReport = 0;
fieldsReadSinceLastReport = 0;
recordsReadSinceLastReport = 0;
this.jobId = jobId;
this.persister = persister;
this.logger = logger;
lastUpdateTimeMs = System.currentTimeMillis();
long interval = UPDATE_INTERVAL_SETTING.get(settings);

View File

@ -1,8 +1,8 @@
# Prelert Engine API messages
autodetect.flush.timeout = Timed out flushing job.
autodetect.flush.failed.unexpected.death = Flush failed: Unexpected death of the Autodetect process flushing job.
autodetect.flush.timeout =[{0}] Timed out flushing job.
autodetect.flush.failed.unexpected.death =[{0}] Flush failed: Unexpected death of the Autodetect process flushing job.
cpu.limit.jobs = Cannot start job with id ''{0}''. The maximum number of concurrently running jobs is limited as a function of the number of CPU cores see this error code''s help documentation for details of how to elevate the setting
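Not part of the commit: these properties appear to use MessageFormat-style placeholders, so the job id now lands inside the message text itself. A tiny illustration, with the template copied from above and a hypothetical job id:

import java.text.MessageFormat;

public class FlushTimeoutMessageSketch {
    public static void main(String[] args) {
        String template = "[{0}] Timed out flushing job.";
        // Prints: [foo] Timed out flushing job.
        System.out.println(MessageFormat.format(template, "foo"));
    }
}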

View File

@ -28,7 +28,7 @@ import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerConfig;
import org.elasticsearch.xpack.prelert.job.SchedulerState;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchPersister;
import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
import org.junit.After;
import java.io.IOException;
@ -175,7 +175,7 @@ public class ScheduledJobsIT extends ESIntegTestCase {
}
private DataCounts getDataCounts(String jobId) {
GetResponse getResponse = client().prepareGet(ElasticsearchPersister.getJobIndexName(jobId),
GetResponse getResponse = client().prepareGet(JobResultsPersister.getJobIndexName(jobId),
DataCounts.TYPE.getPreferredName(), jobId + "-data-counts").get();
if (getResponse.isExists() == false) {
return new DataCounts("_job_id");

View File

@ -5,9 +5,6 @@
*/
package org.elasticsearch.xpack.prelert.job.logging;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.test.ESTestCase;
import java.io.ByteArrayInputStream;
@ -39,11 +36,7 @@ public class CppLogMessageHandlerTests extends ESTestCase {
+ "\"message\":\"Prelert controller exiting\",\"method\":\"main\",\"file\":\"Main.cc\",\"line\":147}\n";
InputStream is = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8));
Logger logger = Loggers.getLogger(CppLogMessageHandlerTests.class);
Loggers.setLevel(logger, Level.DEBUG);
try (CppLogMessageHandler handler = new CppLogMessageHandler(is, logger, 100, 3)) {
try (CppLogMessageHandler handler = new CppLogMessageHandler(is, "_id", 100, 3)) {
handler.tailStream();
assertTrue(handler.hasLogStreamEnded());

View File

@ -9,7 +9,6 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.env.Environment;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.prelert.job.AnalysisConfig;
@ -19,7 +18,9 @@ import org.elasticsearch.xpack.prelert.job.Detector;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectCommunicator;
import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectProcessFactory;
@ -58,11 +59,15 @@ public class AutodetectProcessManagerTests extends ESTestCase {
private JobManager jobManager;
private JobProvider jobProvider;
private JobResultsPersister jobResultsPersister;
private JobDataCountsPersister jobDataCountsPersister;
@Before
public void initMocks() {
jobManager = Mockito.mock(JobManager.class);
jobProvider = Mockito.mock(JobProvider.class);
jobManager = mock(JobManager.class);
jobProvider = mock(JobProvider.class);
jobResultsPersister = mock(JobResultsPersister.class);
jobDataCountsPersister = mock(JobDataCountsPersister.class);
givenAllocationWithStatus(JobStatus.CLOSED);
}
@ -184,7 +189,6 @@ public class AutodetectProcessManagerTests extends ESTestCase {
public void testCreate_notEnoughThreads() throws IOException {
Client client = mock(Client.class);
Environment environment = mock(Environment.class);
ThreadPool threadPool = mock(ThreadPool.class);
ExecutorService executorService = mock(ExecutorService.class);
doThrow(new EsRejectedExecutionException("")).when(executorService).execute(any());
@ -195,8 +199,8 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectResultsParser parser = mock(AutodetectResultsParser.class);
AutodetectProcess autodetectProcess = mock(AutodetectProcess.class);
AutodetectProcessFactory autodetectProcessFactory = (j, i, e) -> autodetectProcess;
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, environment, threadPool,
jobManager, jobProvider, parser, autodetectProcessFactory);
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool,
jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory);
expectThrows(EsRejectedExecutionException.class, () -> manager.create("_id", false));
verify(autodetectProcess, times(1)).close();
@ -210,12 +214,11 @@ public class AutodetectProcessManagerTests extends ESTestCase {
private AutodetectProcessManager createManager(AutodetectCommunicator communicator) {
Client client = mock(Client.class);
Environment environment = mock(Environment.class);
ThreadPool threadPool = mock(ThreadPool.class);
AutodetectResultsParser parser = mock(AutodetectResultsParser.class);
AutodetectProcessFactory autodetectProcessFactory = mock(AutodetectProcessFactory.class);
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, environment, threadPool,
jobManager, jobProvider, parser, autodetectProcessFactory);
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager,
jobProvider, jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory);
manager = spy(manager);
doReturn(communicator).when(manager).create(any(), anyBoolean());
return manager;

View File

@ -17,14 +17,13 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.SchedulerState;
import org.elasticsearch.xpack.prelert.job.audit.Auditor;
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.junit.Before;
@ -217,9 +216,8 @@ public class JobManagerTests extends ESTestCase {
private JobManager createJobManager() {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
Environment env = new Environment(
settings);
return new JobManager(env, settings, jobProvider, jobDataCountsPersister, clusterService);
JobResultsPersister jobResultsPersister = mock(JobResultsPersister.class);
return new JobManager(settings, jobProvider, jobResultsPersister, jobDataCountsPersister, clusterService);
}
private ClusterState createClusterState() {

View File

@ -20,6 +20,7 @@ import java.util.Map;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.test.ESTestCase;
@ -39,7 +40,7 @@ public class ElasticsearchUsagePersisterTests extends ESTestCase {
when(client.prepareUpdate(anyString(), anyString(), anyString())).thenReturn(
updateRequestBuilder);
ElasticsearchUsagePersister persister = new ElasticsearchUsagePersister(client, logger);
UsagePersister persister = new UsagePersister(Settings.EMPTY, client);
persister.persistUsage("job1", 10L, 30L, 1L);

View File

@ -11,12 +11,13 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.test.ESTestCase;
import org.mockito.ArgumentCaptor;
@ -27,16 +28,16 @@ import org.elasticsearch.xpack.prelert.job.results.BucketInfluencer;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
public class ElasticsearchPersisterTests extends ESTestCase {
public class JobResultsPersisterTests extends ESTestCase {
private static final String CLUSTER_NAME = "myCluster";
private static final String JOB_ID = "testJobId";
private static final String JOB_ID = "foo";
public void testPersistBucket_NoRecords() {
Client client = mock(Client.class);
Bucket bucket = mock(Bucket.class);
when(bucket.getRecords()).thenReturn(null);
ElasticsearchPersister persister = new ElasticsearchPersister(JOB_ID, client);
JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client);
persister.persistBucket(bucket);
verifyNoMoreInteractions(client);
}
@ -53,7 +54,7 @@ public class ElasticsearchPersisterTests extends ESTestCase {
.prepareBulk(response);
Client client = clientBuilder.build();
Bucket bucket = getBucket(1, 0);
Bucket bucket = getBucket(1);
bucket.setId(responseId);
bucket.setAnomalyScore(99.9);
bucket.setBucketSpan(123456);
@ -63,7 +64,7 @@ public class ElasticsearchPersisterTests extends ESTestCase {
bucket.setProcessingTimeMs(8888);
bucket.setRecordCount(1);
BucketInfluencer bi = new BucketInfluencer("foo");
BucketInfluencer bi = new BucketInfluencer(JOB_ID);
bi.setAnomalyScore(14.15);
bi.setInfluencerFieldName("biOne");
bi.setInitialAnomalyScore(18.12);
@ -77,7 +78,7 @@ public class ElasticsearchPersisterTests extends ESTestCase {
inf.setInitialAnomalyScore(55.5);
inf.setProbability(0.4);
inf.setTimestamp(bucket.getTimestamp());
bucket.setInfluencers(Arrays.asList(inf));
bucket.setInfluencers(Collections.singletonList(inf));
AnomalyRecord record = bucket.getRecords().get(0);
List<Double> actuals = new ArrayList<>();
@ -105,7 +106,7 @@ public class ElasticsearchPersisterTests extends ESTestCase {
typicals.add(998765.3);
record.setTypical(typicals);
ElasticsearchPersister persister = new ElasticsearchPersister(JOB_ID, client);
JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client);
persister.persistBucket(bucket);
List<XContentBuilder> list = captor.getAllValues();
assertEquals(4, list.size());
@ -154,8 +155,8 @@ public class ElasticsearchPersisterTests extends ESTestCase {
assertTrue(s.matches(".*overFieldValue.:.overValue.*"));
}
private Bucket getBucket(int numRecords, int numInfluencers) {
Bucket b = new Bucket("foo");
private Bucket getBucket(int numRecords) {
Bucket b = new Bucket(JOB_ID);
b.setId("1");
b.setTimestamp(new Date());
List<AnomalyRecord> records = new ArrayList<>();

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.prelert.job.process.autodetect;
import org.apache.logging.log4j.core.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.test.ESTestCase;
@ -13,7 +12,6 @@ import org.elasticsearch.xpack.prelert.job.AnalysisConfig;
import org.elasticsearch.xpack.prelert.job.DataDescription;
import org.elasticsearch.xpack.prelert.job.Detector;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.prelert.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.prelert.job.process.autodetect.output.StateProcessor;
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.DataLoadParams;
@ -29,9 +27,6 @@ import java.util.Collections;
import java.util.concurrent.ExecutorService;
import static org.elasticsearch.mock.orig.Mockito.doAnswer;
import static org.elasticsearch.mock.orig.Mockito.never;
import static org.elasticsearch.mock.orig.Mockito.times;
import static org.elasticsearch.mock.orig.Mockito.verify;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
@ -77,7 +72,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class));
InterimResultsParams params = InterimResultsParams.builder().build();
ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, () -> communicator.flushJob(params));
assertEquals("Flush failed: Unexpected death of the Autodetect process flushing job. Mock process is dead", e.getMessage());
assertEquals("[foo] Flush failed: Unexpected death of the Autodetect process flushing job. Mock process is dead", e.getMessage());
}
public void testFlushJob_throwsOnTimeout() throws IOException {
@ -89,7 +84,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class))) {
InterimResultsParams params = InterimResultsParams.builder().build();
ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, () -> communicator.flushJob(params, 1, 1));
assertEquals("Timed out flushing job. Mock process has stalled", e.getMessage());
assertEquals("[foo] Timed out flushing job. Mock process has stalled", e.getMessage());
}
}
@ -132,11 +127,9 @@ public class AutodetectCommunicatorTests extends ESTestCase {
((Runnable) invocation.getArguments()[0]).run();
return null;
}).when(executorService).execute(any(Runnable.class));
Logger jobLogger = Mockito.mock(Logger.class);
JobResultsPersister resultsPersister = mock(JobResultsPersister.class);
StatusReporter statusReporter = mock(StatusReporter.class);
StateProcessor stateProcessor = mock(StateProcessor.class);
return new AutodetectCommunicator(executorService, createJobDetails(), autodetectProcess, jobLogger, statusReporter,
return new AutodetectCommunicator(executorService, createJobDetails(), autodetectProcess, statusReporter,
autoDetectResultProcessor, stateProcessor);
}
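The reworked assertions expect the job id to be embedded in the failure message itself ("[foo] ...") now that the per-job logger is gone. A minimal, self-contained sketch of that convention follows; the class and method names are illustrative, not the actual AutodetectCommunicator code:

    // Illustrative only: builds messages in the "[jobId] message" form the updated assertions expect.
    final class JobMessagePrefixSketch {

        // Prefix any message with the owning job's id so log lines and exception
        // messages stay traceable without a dedicated per-job logger.
        static String prefixed(String jobId, String message) {
            return "[" + jobId + "] " + message;
        }

        public static void main(String[] args) {
            // Prints: [foo] Timed out flushing job. Mock process has stalled
            System.out.println(prefixed("foo", "Timed out flushing job. Mock process has stalled"));
        }
    }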

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.prelert.job.process.autodetect.output;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
@ -44,10 +43,9 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
Renormaliser renormaliser = mock(Renormaliser.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, parser);
Logger jobLogger = mock(Logger.class);
processor.process(jobLogger, mock(InputStream.class), randomBoolean());
verify(jobLogger, times(1)).info("1 buckets parsed from autodetect output - about to refresh indexes");
verify(renormaliser, times(1)).shutdown(jobLogger);
processor.process("_id", mock(InputStream.class), randomBoolean());
verify(renormaliser, times(1)).shutdown();
assertEquals(0, processor.completionLatch.getCount());
}
public void testProcessResult_bucket() {
@ -55,8 +53,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister persister = mock(JobResultsPersister.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
Logger jobLogger = mock(Logger.class);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(jobLogger, false);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", false);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
Bucket bucket = mock(Bucket.class);
@ -64,7 +61,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
processor.processResult(context, result);
verify(persister, times(1)).persistBucket(bucket);
verify(persister, never()).deleteInterimResults();
verify(persister, never()).deleteInterimResults("_id");
verify(bucket, never()).calcMaxNormalizedProbabilityPerPartition();
verifyNoMoreInteractions(persister);
}
@ -74,8 +71,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister persister = mock(JobResultsPersister.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
Logger jobLogger = mock(Logger.class);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(jobLogger, true);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", true);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
Bucket bucket = mock(Bucket.class);
@ -83,7 +79,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
processor.processResult(context, result);
verify(persister, times(1)).persistBucket(bucket);
verify(persister, never()).deleteInterimResults();
verify(persister, never()).deleteInterimResults("_id");
verify(bucket, times(1)).calcMaxNormalizedProbabilityPerPartition();
verifyNoMoreInteractions(persister);
}
@ -93,15 +89,14 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister persister = mock(JobResultsPersister.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
Logger jobLogger = mock(Logger.class);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(jobLogger, false);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", false);
AutodetectResult result = mock(AutodetectResult.class);
Bucket bucket = mock(Bucket.class);
when(result.getBucket()).thenReturn(bucket);
processor.processResult(context, result);
verify(persister, times(1)).persistBucket(bucket);
verify(persister, times(1)).deleteInterimResults();
verify(persister, times(1)).deleteInterimResults("_id");
verify(bucket, never()).calcMaxNormalizedProbabilityPerPartition();
verifyNoMoreInteractions(persister);
assertFalse(context.deleteInterimRequired);
@ -112,8 +107,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister persister = mock(JobResultsPersister.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
Logger jobLogger = mock(Logger.class);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(jobLogger, false);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", false);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
CategoryDefinition categoryDefinition = mock(CategoryDefinition.class);
@ -130,8 +124,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
FlushListener flushListener = mock(FlushListener.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null, flushListener);
Logger jobLogger = mock(Logger.class);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(jobLogger, false);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", false);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
@ -140,7 +133,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
processor.processResult(context, result);
verify(flushListener, times(1)).acknowledgeFlush("_id");
verify(persister, times(1)).commitWrites();
verify(persister, times(1)).commitWrites("_id");
verifyNoMoreInteractions(persister);
assertTrue(context.deleteInterimRequired);
}
@ -151,8 +144,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
FlushListener flushListener = mock(FlushListener.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null, flushListener);
Logger jobLogger = mock(Logger.class);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(jobLogger, false);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", false);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
@ -165,7 +157,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
processor.processResult(context, result);
inOrder.verify(persister, times(1)).persistCategoryDefinition(categoryDefinition);
inOrder.verify(persister, times(1)).commitWrites();
inOrder.verify(persister, times(1)).commitWrites("_id");
inOrder.verify(flushListener, times(1)).acknowledgeFlush("_id");
verifyNoMoreInteractions(persister);
assertTrue(context.deleteInterimRequired);
@ -176,8 +168,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister persister = mock(JobResultsPersister.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
Logger jobLogger = mock(Logger.class);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(jobLogger, false);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", false);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
ModelDebugOutput modelDebugOutput = mock(ModelDebugOutput.class);
@ -193,8 +184,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister persister = mock(JobResultsPersister.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
Logger jobLogger = mock(Logger.class);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(jobLogger, false);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", false);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
ModelSizeStats modelSizeStats = mock(ModelSizeStats.class);
@ -211,8 +201,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister persister = mock(JobResultsPersister.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
Logger jobLogger = mock(Logger.class);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(jobLogger, false);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", false);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
ModelSnapshot modelSnapshot = mock(ModelSnapshot.class);
@ -228,8 +217,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister persister = mock(JobResultsPersister.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
Logger jobLogger = mock(Logger.class);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(jobLogger, false);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", false);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
Quantiles quantiles = mock(Quantiles.class);
@ -237,7 +225,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
processor.processResult(context, result);
verify(persister, times(1)).persistQuantiles(quantiles);
verify(renormaliser, times(1)).renormalise(quantiles, jobLogger);
verify(renormaliser, times(1)).renormalise(quantiles);
verifyNoMoreInteractions(persister);
verifyNoMoreInteractions(renormaliser);
}
@ -247,8 +235,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister persister = mock(JobResultsPersister.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
Logger jobLogger = mock(Logger.class);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(jobLogger, true);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("_id", true);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
Quantiles quantiles = mock(Quantiles.class);
@ -256,7 +243,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
processor.processResult(context, result);
verify(persister, times(1)).persistQuantiles(quantiles);
verify(renormaliser, times(1)).renormaliseWithPartition(quantiles, jobLogger);
verify(renormaliser, times(1)).renormaliseWithPartition(quantiles);
verifyNoMoreInteractions(persister);
verifyNoMoreInteractions(renormaliser);
}
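Across these hunks the processing context is constructed from the job id string rather than a mocked Logger, and the id-aware persister calls (deleteInterimResults(jobId), commitWrites(jobId)) replace the parameterless variants. A rough sketch of such a context object, with field meanings inferred from the tests (deleteInterimRequired, the per-partition flag); this is not the actual AutoDetectResultProcessor.Context:

    // Illustrative sketch: a per-job processing context keyed by the job id instead of a Logger.
    final class ResultContextSketch {
        final String jobId;                         // threaded into persister calls and log messages
        final boolean isPerPartitionNormalisation;  // assumed meaning of the boolean constructor flag
        boolean deleteInterimRequired = true;       // cleared once interim results have been deleted

        ResultContextSketch(String jobId, boolean isPerPartitionNormalisation) {
            this.jobId = jobId;
            this.isPerPartitionNormalisation = isPerPartitionNormalisation;
        }
    }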

View File

@ -8,7 +8,6 @@ package org.elasticsearch.xpack.prelert.job.process.autodetect.output;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchPersister;
import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
@ -18,6 +17,7 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -40,12 +40,12 @@ public class StateProcessorTests extends ESTestCase {
ByteArrayInputStream stream = new ByteArrayInputStream(STATE_SAMPLE.getBytes(StandardCharsets.UTF_8));
ArgumentCaptor<BytesReference> bytesRefCaptor = ArgumentCaptor.forClass(BytesReference.class);
JobResultsPersister persister = Mockito.mock(ElasticsearchPersister.class);
JobResultsPersister persister = Mockito.mock(JobResultsPersister.class);
StateProcessor stateParser = new StateProcessor(Settings.EMPTY, persister);
stateParser.process("_id", stream);
verify(persister, times(3)).persistBulkState(bytesRefCaptor.capture());
verify(persister, times(3)).persistBulkState(eq("_id"), bytesRefCaptor.capture());
String[] threeStates = STATE_SAMPLE.split("\0");
List<BytesReference> capturedBytes = bytesRefCaptor.getAllValues();
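The state processor now passes the job id through to the persister on every bulk write, which is what the eq("_id") verification checks. A hedged sketch of that call shape, using a hypothetical persister interface rather than the real JobResultsPersister signature:

    // Illustrative only: NUL-delimited state chunks are forwarded together with the owning job's id,
    // matching the test's expectation of three persistBulkState(eq("_id"), ...) calls.
    final class StateForwardSketch {

        interface StatePersister {                   // hypothetical stand-in for JobResultsPersister
            void persistBulkState(String jobId, String stateChunk);
        }

        static void forward(String jobId, String rawStream, StatePersister persister) {
            for (String chunk : rawStream.split("\0")) {
                if (chunk.isEmpty() == false) {
                    persister.persistBulkState(jobId, chunk);
                }
            }
        }
    }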

View File

@ -10,9 +10,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.job.persistence.UsagePersister;
import org.junit.Assert;
@ -22,20 +20,14 @@ import org.elasticsearch.xpack.prelert.job.usage.UsageReporter;
public class CountingInputStreamTests extends ESTestCase {
public void testRead_OneByteAtATime() throws IOException {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
Environment env = new Environment(
settings);
UsageReporter usageReporter = new UsageReporter(settings, "foo", Mockito.mock(UsagePersister.class), Mockito.mock(Logger.class));
DummyStatusReporter statusReporter = new DummyStatusReporter(env, usageReporter);
UsageReporter usageReporter = new UsageReporter(Settings.EMPTY, "foo", Mockito.mock(UsagePersister.class));
DummyStatusReporter statusReporter = new DummyStatusReporter(usageReporter);
final String TEXT = "123";
InputStream source = new ByteArrayInputStream(TEXT.getBytes(StandardCharsets.UTF_8));
try (CountingInputStream counting = new CountingInputStream(source,
statusReporter)) {
while (counting.read() >= 0) {
;
}
try (CountingInputStream counting = new CountingInputStream(source, statusReporter)) {
while (counting.read() >= 0) {}
// an extra byte is read because we don't check the return
// value of the read() method
Assert.assertEquals(TEXT.length() + 1, usageReporter.getBytesReadSinceLastReport());
@ -46,23 +38,17 @@ public class CountingInputStreamTests extends ESTestCase {
}
public void testRead_WithBuffer() throws IOException {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
Environment env = new Environment(
settings);
final String TEXT = "To the man who only has a hammer,"
+ " everything he encounters begins to look like a nail.";
UsageReporter usageReporter = new UsageReporter(settings, "foo", Mockito.mock(UsagePersister.class), Mockito.mock(Logger.class));
DummyStatusReporter statusReporter = new DummyStatusReporter(env, usageReporter);
UsageReporter usageReporter = new UsageReporter(Settings.EMPTY, "foo", Mockito.mock(UsagePersister.class));
DummyStatusReporter statusReporter = new DummyStatusReporter(usageReporter);
InputStream source = new ByteArrayInputStream(TEXT.getBytes(StandardCharsets.UTF_8));
try (CountingInputStream counting = new CountingInputStream(source,
statusReporter)) {
try (CountingInputStream counting = new CountingInputStream(source, statusReporter)) {
byte buf[] = new byte[256];
while (counting.read(buf) >= 0) {
;
}
while (counting.read(buf) >= 0) {}
// one less byte is reported because we don't check
// the return value of the read() method
Assert.assertEquals(TEXT.length() - 1, usageReporter.getBytesReadSinceLastReport());
@ -73,23 +59,17 @@ public class CountingInputStreamTests extends ESTestCase {
}
public void testRead_WithTinyBuffer() throws IOException {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
Environment env = new Environment(
settings);
final String TEXT = "To the man who only has a hammer,"
+ " everything he encounters begins to look like a nail.";
UsageReporter usageReporter = new UsageReporter(settings, "foo", Mockito.mock(UsagePersister.class), Mockito.mock(Logger.class));
DummyStatusReporter statusReporter = new DummyStatusReporter(env, usageReporter);
UsageReporter usageReporter = new UsageReporter(Settings.EMPTY, "foo", Mockito.mock(UsagePersister.class));
DummyStatusReporter statusReporter = new DummyStatusReporter(usageReporter);
InputStream source = new ByteArrayInputStream(TEXT.getBytes(StandardCharsets.UTF_8));
try (CountingInputStream counting = new CountingInputStream(source,
statusReporter)) {
try (CountingInputStream counting = new CountingInputStream(source, statusReporter)) {
byte buf[] = new byte[8];
while (counting.read(buf, 0, 8) >= 0) {
;
}
while (counting.read(buf, 0, 8) >= 0) {}
// an extra byte is read because we don't check the return
// value of the read() method
Assert.assertEquals(TEXT.length() - 1, usageReporter.getBytesReadSinceLastReport());
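The comments in these tests note that the reported counts are off by one because the return value of read() is never checked. A hypothetical reconstruction of a counting stream that behaves this way (the real CountingInputStream may differ; this only reproduces the counts the tests assert):

    import java.io.FilterInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    // Hypothetical: the return value of read() feeds the counter unchecked, so the final EOF
    // call still moves the count (+1 for single-byte reads, -1 for buffered reads).
    final class NaiveCountingStream extends FilterInputStream {
        long bytesRead;

        NaiveCountingStream(InputStream in) {
            super(in);
        }

        @Override
        public int read() throws IOException {
            int b = super.read();
            bytesRead++;        // also counts the terminating -1 call -> TEXT.length() + 1
            return b;
        }

        @Override
        public int read(byte[] buf, int off, int len) throws IOException {
            int n = super.read(buf, off, len);
            bytesRead += n;     // adds -1 on EOF -> TEXT.length() - 1
            return n;
        }
    }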

View File

@ -5,38 +5,19 @@
*/
package org.elasticsearch.xpack.prelert.job.status;
import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.prelert.job.usage.UsageReporter;
import static org.mockito.Mockito.mock;
/**
* Dummy StatusReporter for testing abstract class
*/
public class DummyStatusReporter extends StatusReporter {
boolean statusReported = false;
class DummyStatusReporter extends StatusReporter {
public DummyStatusReporter(Environment env, UsageReporter usageReporter) {
super(env, env.settings(), "DummyJobId", usageReporter, new JobDataCountsPersister() {
@Override
public void persistDataCounts(String jobId, DataCounts counts) {
}
}, null, 1);
DummyStatusReporter(UsageReporter usageReporter) {
super(Settings.EMPTY, "DummyJobId", usageReporter, mock(JobDataCountsPersister.class));
}
public DummyStatusReporter(Environment env, DataCounts counts,
UsageReporter usageReporter) {
super(env, env.settings(), "DummyJobId", counts, usageReporter, new JobDataCountsPersister() {
@Override
public void persistDataCounts(String jobId, DataCounts counts) {
}
}, null, 1);
}
public boolean isStatusReported() {
return statusReported;
}
}

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.prelert.job.status;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.test.ESTestCase;
@ -30,7 +29,6 @@ public class StatusReporterTests extends ESTestCase {
private UsageReporter usageReporter;
private JobDataCountsPersister jobDataCountsPersister;
private Logger mockLogger;
private StatusReporter statusReporter;
private Settings settings;
@ -40,11 +38,9 @@ public class StatusReporterTests extends ESTestCase {
settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.put(StatusReporter.ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING.getKey(), MAX_PERCENT_DATE_PARSE_ERRORS)
.put(StatusReporter.ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.getKey(), MAX_PERCENT_OUT_OF_ORDER_ERRORS).build();
Environment env = new Environment(settings);
usageReporter = Mockito.mock(UsageReporter.class);
jobDataCountsPersister = Mockito.mock(JobDataCountsPersister.class);
mockLogger = Mockito.mock(Logger.class);
statusReporter = new StatusReporter(env, settings, JOB_ID, usageReporter, jobDataCountsPersister, mockLogger, 10L);
statusReporter = new StatusReporter(settings, JOB_ID, usageReporter, jobDataCountsPersister);
}
public void testSettingAcceptablePercentages() {
@ -64,7 +60,7 @@ public class StatusReporterTests extends ESTestCase {
Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build());
DataCounts counts = new DataCounts("foo", 1L, 1L, 2L, 0L, 3L, 4L, 5L, new Date(), new Date());
statusReporter = new StatusReporter(env, settings, JOB_ID, counts, usageReporter, jobDataCountsPersister, mockLogger, 1);
statusReporter = new StatusReporter(settings, JOB_ID, counts, usageReporter, jobDataCountsPersister);
DataCounts stats = statusReporter.incrementalStats();
assertNotNull(stats);
assertAllCountFieldsEqualZero(stats);

View File

@ -1,66 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.usage;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.prelert.job.persistence.UsagePersister;
public class DummyUsageReporter extends UsageReporter {
long totalByteCount;
long totalFieldCount;
long totalRecordCount;
public DummyUsageReporter(Settings settings, String jobId, Logger logger) {
super(settings, jobId, new UsagePersister() {
@Override
public void persistUsage(String jobId, long bytesRead, long fieldsRead, long recordsRead) {
}
}, logger);
totalByteCount = 0;
totalFieldCount = 0;
totalRecordCount = 0;
}
public DummyUsageReporter(Settings settings, String jobId, UsagePersister persister, Logger logger) {
super(settings, jobId, persister, logger);
totalByteCount = 0;
totalFieldCount = 0;
totalRecordCount = 0;
}
@Override
public void addBytesRead(long bytesRead) {
super.addBytesRead(bytesRead);
totalByteCount += bytesRead;
}
@Override
public void addFieldsRecordsRead(long fieldsRead) {
super.addFieldsRecordsRead(fieldsRead);
totalFieldCount += fieldsRead;
++totalRecordCount;
}
public long getTotalBytesRead() {
return totalByteCount;
}
public long getTotalFieldsRead() {
return totalFieldCount;
}
public long getTotalRecordsRead() {
return totalRecordCount;
}
}

View File

@ -5,8 +5,6 @@
*/
package org.elasticsearch.xpack.prelert.job.usage;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.test.ESTestCase;
@ -19,7 +17,7 @@ public class UsageReporterTests extends ESTestCase {
.put(UsageReporter.UPDATE_INTERVAL_SETTING.getKey(), 1).build();
UsagePersister persister = Mockito.mock(UsagePersister.class);
UsageReporter usage = new UsageReporter(settings, "job1", persister, Mockito.mock(Logger.class));
UsageReporter usage = new UsageReporter(settings, "job1", persister);
usage.addBytesRead(10);
usage.addFieldsRecordsRead(5);