diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java
index 2cdeb532902..2716399f522 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java
@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.job.persistence;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefIterator;
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceAlreadyExistsException;
@@ -17,12 +18,13 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
 import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.get.MultiGetItemResponse;
-import org.elasticsearch.action.get.MultiGetRequest;
 import org.elasticsearch.action.search.MultiSearchRequest;
+import org.elasticsearch.action.search.MultiSearchRequestBuilder;
 import org.elasticsearch.action.search.MultiSearchResponse;
 import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
@@ -61,6 +63,7 @@ import org.elasticsearch.xpack.ml.job.config.Job;
 import org.elasticsearch.xpack.ml.job.config.MlFilter;
 import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder.BucketsQuery;
 import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder.InfluencersQuery;
+import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState;
 import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
 import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
@@ -74,6 +77,7 @@ import org.elasticsearch.xpack.ml.job.results.Influencer;
 import org.elasticsearch.xpack.ml.job.results.ModelPlot;
 import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities;
 import org.elasticsearch.xpack.ml.job.results.Result;
+import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.security.support.Exceptions;
 
 import java.io.IOException;
@@ -82,13 +86,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
@@ -239,6 +241,98 @@ public class JobProvider {
                 DataCounts.PARSER, () -> new DataCounts(jobId));
     }
 
+    public void getAutodetectParams(Job job, Consumer<AutodetectParams> consumer, Consumer<Exception> errorHandler) {
+        AutodetectParams.Builder paramsBuilder = new AutodetectParams.Builder(job.getId());
+
+        String jobId = job.getId();
+        String resultsIndex = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
+        String stateIndex = AnomalyDetectorsIndex.jobStateIndexName();
+        MultiSearchRequestBuilder msearch = client.prepareMultiSearch()
+                .add(createDocIdSearch(resultsIndex, DataCounts.TYPE.getPreferredName(), DataCounts.documentId(jobId)))
+                .add(createDocIdSearch(resultsIndex, Result.TYPE.getPreferredName(), ModelSizeStats.documentId(jobId)))
+                .add(createDocIdSearch(resultsIndex, ModelSnapshot.TYPE.getPreferredName(),
+                        ModelSnapshot.documentId(jobId, job.getModelSnapshotId())))
+                .add(createDocIdSearch(stateIndex, Quantiles.TYPE.getPreferredName(), Quantiles.documentId(jobId)));
+
+        for (String filterId : job.getAnalysisConfig().extractReferencedFilters()) {
+            msearch.add(createDocIdSearch(ML_META_INDEX, MlFilter.TYPE.getPreferredName(), filterId));
+        }
+
+        msearch.execute(ActionListener.wrap(
+                response -> {
+                    for (MultiSearchResponse.Item itemResponse : response.getResponses()) {
+                        if (itemResponse.isFailure()) {
+                            if (itemResponse.getFailure() instanceof IndexNotFoundException == false) {
+                                throw itemResponse.getFailure();
+                            } else {
+                                // Ignore IndexNotFoundException; AutodetectParamsBuilder has defaults for new jobs
+                            }
+                        } else {
+                            SearchResponse searchResponse = itemResponse.getResponse();
+                            ShardSearchFailure[] shardFailures = searchResponse.getShardFailures();
+                            int unavailableShards = searchResponse.getTotalShards() - searchResponse.getSuccessfulShards();
+                            if (shardFailures != null && shardFailures.length > 0) {
+                                LOGGER.error("[{}] Search request returned shard failures: {}", jobId,
+                                        Arrays.toString(shardFailures));
+                                errorHandler.accept(new ElasticsearchException(
+                                        ExceptionsHelper.shardFailuresToErrorMsg(jobId, shardFailures)));
+                            } else if (unavailableShards > 0) {
+                                errorHandler.accept(new ElasticsearchException("[" + jobId
+                                        + "] Search request encountered [" + unavailableShards + "] unavailable shards"));
+                            } else {
+                                SearchHit[] hits = searchResponse.getHits().getHits();
+                                if (hits.length == 1) {
+                                    parseAutodetectParamSearchHit(paramsBuilder, hits[0], errorHandler);
+                                } else if (hits.length > 1) {
+                                    errorHandler.accept(new IllegalStateException("Got ["
+                                            + hits.length + "] even though search size was 1"));
+                                }
+                            }
+                        }
+                    }
+                    consumer.accept(paramsBuilder.build());
+                },
+                errorHandler
+        ));
+    }
+
+    private SearchRequestBuilder createDocIdSearch(String index, String type, String id) {
+        return client.prepareSearch(index).setSize(1)
+                .setQuery(QueryBuilders.idsQuery(type).addIds(id))
+                .setRouting(id);
+    }
+
+    private void parseAutodetectParamSearchHit(AutodetectParams.Builder paramsBuilder,
+                                               SearchHit hit, Consumer<Exception> errorHandler) {
+        String type = hit.getType();
+        if (DataCounts.TYPE.getPreferredName().equals(type)) {
+            paramsBuilder.setDataCounts(parseSearchHit(hit, DataCounts.PARSER, errorHandler));
+        } else if (Result.TYPE.getPreferredName().equals(type)) {
+            ModelSizeStats.Builder modelSizeStats = parseSearchHit(hit, ModelSizeStats.PARSER, errorHandler);
+            paramsBuilder.setModelSizeStats(modelSizeStats == null ? null : modelSizeStats.build());
+        } else if (ModelSnapshot.TYPE.getPreferredName().equals(type)) {
+            ModelSnapshot.Builder modelSnapshot = parseSearchHit(hit, ModelSnapshot.PARSER, errorHandler);
+            paramsBuilder.setModelSnapshot(modelSnapshot == null ? null : modelSnapshot.build());
+        } else if (Quantiles.TYPE.getPreferredName().equals(type)) {
+            paramsBuilder.setQuantiles(parseSearchHit(hit, Quantiles.PARSER, errorHandler));
+        } else if (MlFilter.TYPE.getPreferredName().equals(type)) {
+            paramsBuilder.addFilter(parseSearchHit(hit, MlFilter.PARSER, errorHandler));
+        } else {
+            errorHandler.accept(new IllegalStateException("Unexpected type [" + type + "]"));
+        }
+    }
+
+    private <T, U> T parseSearchHit(SearchHit hit, BiFunction<XContentParser, U, T> objectParser,
+                                    Consumer<Exception> errorHandler) {
+        BytesReference source = hit.getSourceRef();
+        try (XContentParser parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source)) {
+            return objectParser.apply(parser, null);
+        } catch (IOException e) {
+            errorHandler.accept(new ElasticsearchParseException("failed to parse " + hit.getType(), e));
+            return null;
+        }
+    }
+
     private <T, U> void get(String indexName, String type, String id, Consumer<T> handler, Consumer<Exception> errorHandler,
                             BiFunction<XContentParser, U, T> objectParser, Supplier<T> notFoundSupplier) {
         GetRequest getRequest = new GetRequest(indexName, type, id);
@@ -264,44 +358,6 @@ public class JobProvider {
         }));
     }
 
-    private <T, U> void mget(String indexName, String type, Set<String> ids, Consumer<Set<T>> handler, Consumer<Exception> errorHandler,
-                             BiFunction<XContentParser, U, T> objectParser) {
-        if (ids.isEmpty()) {
-            handler.accept(Collections.emptySet());
-            return;
-        }
-
-        MultiGetRequest multiGetRequest = new MultiGetRequest();
-        for (String id : ids) {
-            multiGetRequest.add(indexName, type, id);
-        }
-        client.multiGet(multiGetRequest, ActionListener.wrap(
-                mresponse -> {
-                    Set<T> objects = new HashSet<>();
-                    for (MultiGetItemResponse item : mresponse) {
-                        GetResponse response = item.getResponse();
-                        if (response.isExists()) {
-                            BytesReference source = response.getSourceAsBytesRef();
-                            try (XContentParser parser = XContentFactory.xContent(source)
-                                    .createParser(NamedXContentRegistry.EMPTY, source)) {
-                                objects.add(objectParser.apply(parser, null));
-                            } catch (IOException e) {
-                                throw new ElasticsearchParseException("failed to parse " + type, e);
-                            }
-                        }
-                    }
-                    handler.accept(objects);
-                },
-                e -> {
-                    if (e instanceof IndexNotFoundException == false) {
-                        errorHandler.accept(e);
-                    } else {
-                        handler.accept(new HashSet<>());
-                    }
-                })
-        );
-    }
-
     public static IndicesOptions addIgnoreUnavailable(IndicesOptions indicesOptions) {
         return IndicesOptions.fromOptions(true, indicesOptions.allowNoIndices(), indicesOptions.expandWildcardsOpen(),
                 indicesOptions.expandWildcardsClosed(), indicesOptions);
@@ -751,19 +807,6 @@ public class JobProvider {
         return new BatchedInfluencersIterator(client, jobId);
     }
 
-    /**
-     * Get the persisted quantiles state for the job
-     */
-    public void getQuantiles(String jobId, Consumer<Quantiles> handler, Consumer<Exception> errorHandler) {
-        String indexName = AnomalyDetectorsIndex.jobStateIndexName();
-        String quantilesId = Quantiles.documentId(jobId);
-        LOGGER.trace("ES API CALL: get ID {} type {} from index {}", quantilesId, Quantiles.TYPE.getPreferredName(), indexName);
-        get(indexName, Quantiles.TYPE.getPreferredName(), quantilesId, handler, errorHandler, Quantiles.PARSER, () -> {
-            LOGGER.info("There are currently no quantiles for job " + jobId);
-            return null;
-        });
-    }
-
     /**
      * Get a job's model snapshot by its id
      */
@@ -980,19 +1023,10 @@ public class JobProvider {
                 handler, errorHandler, (parser, context) -> ModelSizeStats.PARSER.apply(parser, null).build(),
                 () -> {
                     LOGGER.trace("No memory usage details for job with id {}", jobId);
-                    return null;
+                    return new ModelSizeStats.Builder(jobId).build();
                 });
     }
 
-    /**
-     * Retrieves the filter with the given {@code filterId} from the datastore.
-     *
-     * @param ids the id of the requested filter
-     */
-    public void getFilters(Consumer<Set<MlFilter>> handler, Consumer<Exception> errorHandler, Set<String> ids) {
-        mget(ML_META_INDEX, MlFilter.TYPE.getPreferredName(), ids, handler, errorHandler, MlFilter.PARSER);
-    }
-
     /**
      * Maps authorization failures when querying ML indexes to job-specific authorization failures attributed to the ML actions.
      * Works by replacing the action name with another provided by the caller, and appending the job ID.
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java
index c150a646985..71c58974fd0 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java
@@ -24,7 +24,6 @@ import org.elasticsearch.xpack.ml.job.JobManager;
 import org.elasticsearch.xpack.ml.job.config.Job;
 import org.elasticsearch.xpack.ml.job.config.JobState;
 import org.elasticsearch.xpack.ml.job.config.JobUpdate;
-import org.elasticsearch.xpack.ml.job.config.MlFilter;
 import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
 import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
 import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
@@ -32,12 +31,11 @@ import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersiste
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
 import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
 import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
+import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
 import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
-import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
-import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
 import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
 import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
 import org.elasticsearch.xpack.ml.job.process.normalizer.ScoresUpdater;
@@ -54,7 +52,6 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
@@ -211,7 +208,8 @@ public class AutodetectProcessManager extends AbstractComponent {
     }
 
     public void openJob(String jobId, long taskId, boolean ignoreDowntime, Consumer<Exception> handler) {
-        gatherRequiredInformation(jobId, (dataCounts, modelSnapshot, quantiles, filters) -> {
+        Job job = jobManager.getJobOrThrowIfUnknown(jobId);
+        jobProvider.getAutodetectParams(job, params -> {
             // We need to fork, otherwise we restore model state from a network thread (several GET api calls):
             threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {
                 @Override
@@ -223,7 +221,7 @@ public class AutodetectProcessManager extends AbstractComponent {
                 protected void doRun() throws Exception {
                     try {
                         AutodetectCommunicator communicator = autoDetectCommunicatorByJob.computeIfAbsent(jobId, id ->
-                                create(id, taskId, dataCounts, modelSnapshot, quantiles, filters, ignoreDowntime, handler));
+                                create(id, taskId, params, ignoreDowntime, handler));
                         communicator.writeJobInputHeader();
                         setJobState(taskId, jobId, JobState.OPENED);
                     } catch (Exception e1) {
@@ -243,48 +241,37 @@
         });
     }
 
-    // TODO: add a method on JobProvider that fetches all required info via 1 msearch call, so that we have a single lambda
-    // instead of 4 nested lambdas.
-    void gatherRequiredInformation(String jobId, MultiConsumer handler, Consumer<Exception> errorHandler) {
-        Job job = jobManager.getJobOrThrowIfUnknown(jobId);
-        jobProvider.dataCounts(jobId, dataCounts -> {
-            jobProvider.getModelSnapshot(jobId, job.getModelSnapshotId(), modelSnapshot -> {
-                jobProvider.getQuantiles(jobId, quantiles -> {
-                    Set<String> ids = job.getAnalysisConfig().extractReferencedFilters();
-                    jobProvider.getFilters(filterDocument -> handler.accept(dataCounts, modelSnapshot, quantiles, filterDocument),
-                            errorHandler, ids);
-                }, errorHandler);
-            }, errorHandler);
-        }, errorHandler);
-    }
-
-    interface MultiConsumer {
-
-        void accept(DataCounts dataCounts, ModelSnapshot modelSnapshot, Quantiles quantiles, Set<MlFilter> filters);
-
-    }
-
-    AutodetectCommunicator create(String jobId, long taskId, DataCounts dataCounts, ModelSnapshot modelSnapshot, Quantiles quantiles,
-                                  Set<MlFilter> filters, boolean ignoreDowntime, Consumer<Exception> handler) {
+    AutodetectCommunicator create(String jobId, long taskId, AutodetectParams autodetectParams,
+                                  boolean ignoreDowntime, Consumer<Exception> handler) {
         if (autoDetectCommunicatorByJob.size() == maxAllowedRunningJobs) {
             throw new ElasticsearchStatusException("max running job capacity [" + maxAllowedRunningJobs + "] reached",
                     RestStatus.TOO_MANY_REQUESTS);
         }
 
+        if (autodetectParams.dataCounts().getProcessedRecordCount() > 0) {
+            if (autodetectParams.modelSnapshot() == null) {
+                logger.error("[{}] No model snapshot could be found for a job with processed records", jobId);
+            }
+            if (autodetectParams.quantiles() == null) {
+                logger.error("[{}] No quantiles could be found for a job with processed records", jobId);
+            }
+        }
+
         Job job = jobManager.getJobOrThrowIfUnknown(jobId);
         // A TP with no queue, so that we fail immediately if there are no threads available
         ExecutorService executorService = threadPool.executor(MachineLearning.AUTODETECT_PROCESS_THREAD_POOL_NAME);
-        try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, dataCounts,
-                jobDataCountsPersister)) {
+        try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job,
+                autodetectParams.dataCounts(), jobDataCountsPersister)) {
             ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobProvider, new JobRenormalizedResultsPersister(settings, client),
                     normalizerFactory);
             Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater,
                     threadPool.executor(MachineLearning.THREAD_POOL_NAME), job.getAnalysisConfig().getUsePerPartitionNormalization());
 
-            AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, modelSnapshot, quantiles, filters,
-                    ignoreDowntime, executorService);
+            AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams.modelSnapshot(),
+                    autodetectParams.quantiles(), autodetectParams.filters(), ignoreDowntime, executorService);
             boolean usePerPartitionNormalization = job.getAnalysisConfig().getUsePerPartitionNormalization();
-            AutoDetectResultProcessor processor = new AutoDetectResultProcessor(client, jobId, renormalizer, jobResultsPersister);
+            AutoDetectResultProcessor processor = new AutoDetectResultProcessor(
+                    client, jobId, renormalizer, jobResultsPersister, autodetectParams.modelSizeStats());
             try {
                 executorService.submit(() -> processor.process(process, usePerPartitionNormalization));
             } catch (EsRejectedExecutionException e) {
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java
index e74c9679453..ebecf342946 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java
@@ -31,6 +31,7 @@ import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities;
 import java.time.Duration;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
 
 /**
@@ -62,22 +63,24 @@ public class AutoDetectResultProcessor {
     final CountDownLatch completionLatch = new CountDownLatch(1);
     private final FlushListener flushListener;
 
+    /**
+     * New model size stats are read as the process is running
+     */
     private volatile ModelSizeStats latestModelSizeStats;
 
-    public AutoDetectResultProcessor(Client client, String jobId, Renormalizer renormalizer, JobResultsPersister persister) {
-        this(client, jobId, renormalizer, persister, new FlushListener());
+    public AutoDetectResultProcessor(Client client, String jobId, Renormalizer renormalizer, JobResultsPersister persister,
+                                     ModelSizeStats latestModelSizeStats) {
+        this(client, jobId, renormalizer, persister, latestModelSizeStats, new FlushListener());
     }
 
-    AutoDetectResultProcessor(Client client,String jobId, Renormalizer renormalizer, JobResultsPersister persister,
-                              FlushListener flushListener) {
-        this.client = client;
-        this.jobId = jobId;
-        this.renormalizer = renormalizer;
-        this.persister = persister;
-        this.flushListener = flushListener;
-
-        ModelSizeStats.Builder builder = new ModelSizeStats.Builder(jobId);
-        latestModelSizeStats = builder.build();
+    AutoDetectResultProcessor(Client client, String jobId, Renormalizer renormalizer, JobResultsPersister persister,
+                              ModelSizeStats latestModelSizeStats, FlushListener flushListener) {
+        this.client = Objects.requireNonNull(client);
+        this.jobId = Objects.requireNonNull(jobId);
+        this.renormalizer = Objects.requireNonNull(renormalizer);
+        this.persister = Objects.requireNonNull(persister);
+        this.flushListener = Objects.requireNonNull(flushListener);
+        this.latestModelSizeStats = Objects.requireNonNull(latestModelSizeStats);
     }
 
     public void process(AutodetectProcess process, boolean isPerPartitionNormalization) {
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/AutodetectParams.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/AutodetectParams.java
new file mode 100644
index 00000000000..00c146a2194
--- /dev/null
+++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/AutodetectParams.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.job.process.autodetect.params;
+
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.xpack.ml.job.config.MlFilter;
+import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
+import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
+import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
+import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
+
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+public class AutodetectParams {
+
+    private final DataCounts dataCounts;
+    private final ModelSizeStats modelSizeStats;
+
+    @Nullable
+    private final ModelSnapshot modelSnapshot;
+
+    @Nullable
+    private final Quantiles quantiles;
+
+    private final Set<MlFilter> filters;
+
+    private AutodetectParams(DataCounts dataCounts, ModelSizeStats modelSizeStats,
+                             @Nullable ModelSnapshot modelSnapshot,
+                             @Nullable Quantiles quantiles, Set<MlFilter> filters) {
+        this.dataCounts = Objects.requireNonNull(dataCounts);
+        this.modelSizeStats = Objects.requireNonNull(modelSizeStats);
+        this.modelSnapshot = modelSnapshot;
+        this.quantiles = quantiles;
+        this.filters = filters;
+    }
+
+    public DataCounts dataCounts() {
+        return dataCounts;
+    }
+
+    public ModelSizeStats modelSizeStats() {
+        return modelSizeStats;
+    }
+
+    @Nullable
+    public ModelSnapshot modelSnapshot() {
+        return modelSnapshot;
+    }
+
+    @Nullable
+    public Quantiles quantiles() {
+        return quantiles;
+    }
+
+    public Set<MlFilter> filters() {
+        return filters;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        if (other instanceof AutodetectParams == false) {
+            return false;
+        }
+
+        AutodetectParams that = (AutodetectParams) other;
+
+        return Objects.equals(this.dataCounts, that.dataCounts)
+                && Objects.equals(this.modelSizeStats, that.modelSizeStats)
+                && Objects.equals(this.modelSnapshot, that.modelSnapshot)
+                && Objects.equals(this.quantiles, that.quantiles)
+                && Objects.equals(this.filters, that.filters);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(dataCounts, modelSizeStats, modelSnapshot, quantiles, filters);
+    }
+
+    public static class Builder {
+
+        private DataCounts dataCounts;
+        private ModelSizeStats modelSizeStats;
+        private ModelSnapshot modelSnapshot;
+        private Quantiles quantiles;
+        private Set<MlFilter> filters;
+
+        public Builder(String jobId) {
+            dataCounts = new DataCounts(jobId);
+            modelSizeStats = new ModelSizeStats.Builder(jobId).build();
+            filters = new HashSet<>();
+        }
+
+        public Builder setDataCounts(DataCounts dataCounts) {
+            this.dataCounts = dataCounts;
+            return this;
+        }
+
+        public Builder setModelSizeStats(ModelSizeStats modelSizeStats) {
+            this.modelSizeStats = modelSizeStats;
+            return this;
+        }
+
+        public Builder setModelSnapshot(ModelSnapshot modelSnapshot) {
+            this.modelSnapshot = modelSnapshot;
+            return this;
+        }
+
+        public Builder setQuantiles(Quantiles quantiles) {
+            this.quantiles = quantiles;
+            return this;
+        }
+
+        public Builder addFilter(MlFilter filter) {
+            filters.add(filter);
+            return this;
+        }
+
+        public Builder setFilters(Set<MlFilter> filters) {
+            this.filters = Objects.requireNonNull(filters);
+            return this;
+        }
+
+        public AutodetectParams build() {
+            return new AutodetectParams(dataCounts, modelSizeStats, modelSnapshot, quantiles,
+                    filters);
+        }
+    }
+}
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/ExceptionsHelper.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/ExceptionsHelper.java
index e5b901c93d1..920e8560ba7 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/ExceptionsHelper.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/ExceptionsHelper.java
@@ -9,7 +9,9 @@ import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.xpack.ml.job.messages.Messages;
 
 public class ExceptionsHelper {
@@ -38,6 +40,20 @@ public class ExceptionsHelper {
         return new ElasticsearchStatusException(msg, RestStatus.CONFLICT);
     }
 
+    /**
+     * Creates an error message that explains there are shard failures, displays info
+     * for the first failure (shard/reason) and kindly asks to see more info in the logs
+     */
+    public static String shardFailuresToErrorMsg(String jobId, ShardSearchFailure[] shardFailures) {
+        if (shardFailures == null || shardFailures.length == 0) {
+            throw new IllegalStateException("Invalid call with null or empty shardFailures");
+        }
+        SearchShardTarget shardTarget = shardFailures[0].shard();
+        return "[" + jobId + "] Search request returned shard failures; first failure: shard ["
+                + (shardTarget == null ? "_na" : shardTarget) + "], reason ["
+                + shardFailures[0].reason() + "]; see logs for more info";
+    }
+
     /**
      * A more REST-friendly Object.requireNonNull()
      */
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java
index 60aad2e87af..5eab63fbb1c 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java
@@ -17,6 +17,7 @@ import org.elasticsearch.test.ESSingleNodeTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.ml.MachineLearningTemplateRegistry;
 import org.elasticsearch.xpack.ml.action.util.QueryPage;
+import org.elasticsearch.xpack.ml.job.config.JobTests;
 import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
 import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder;
 import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
@@ -77,7 +78,8 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
                 .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(1));
         jobProvider = new JobProvider(client(), builder.build());
         capturedUpdateModelSnapshotOnJobRequests = new ArrayList<>();
-        resultProcessor = new AutoDetectResultProcessor(client(), JOB_ID, renormalizer, jobResultsPersister) {
+        resultProcessor = new AutoDetectResultProcessor(client(), JOB_ID, renormalizer, jobResultsPersister,
+                new ModelSizeStats.Builder(JOB_ID).build()) {
             @Override
             protected void updateModelSnapshotIdOnJob(ModelSnapshot modelSnapshot) {
                 capturedUpdateModelSnapshotOnJobRequests.add(modelSnapshot);
@@ -490,12 +492,13 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
         }
         return resultHolder.get();
     }
+
     private Optional<Quantiles> getQuantiles() throws Exception {
         AtomicReference<Exception> errorHolder = new AtomicReference<>();
         AtomicReference<Optional<Quantiles>> resultHolder = new AtomicReference<>();
         CountDownLatch latch = new CountDownLatch(1);
-        jobProvider.getQuantiles(JOB_ID, q -> {
-            resultHolder.set(Optional.ofNullable(q));
+        jobProvider.getAutodetectParams(JobTests.buildJobBuilder(JOB_ID).build(), params -> {
+            resultHolder.set(Optional.ofNullable(params.quantiles()));
             latch.countDown();
         }, e -> {
             errorHolder.set(e);
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java
index 2ec8f78772f..40f7d358d5d 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java
@@ -44,12 +44,10 @@ import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder.Influe
 import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState;
 import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelState;
-import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
 import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
 import org.elasticsearch.xpack.ml.job.results.Bucket;
 import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
 import org.elasticsearch.xpack.ml.job.results.Influencer;
-import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities;
 import org.elasticsearch.xpack.ml.job.results.Result;
 import org.mockito.ArgumentCaptor;
 
@@ -81,52 +79,6 @@ public class JobProviderTests extends ESTestCase {
     private static final String CLUSTER_NAME = "myCluster";
     private static final String JOB_ID = "foo";
 
-    public void testGetQuantiles_GivenNoQuantilesForJob() throws Exception {
-        GetResponse getResponse = createGetResponse(false, null);
-
-        Client client = getMockedClient(getResponse);
-        JobProvider provider = createProvider(client);
-
-        Quantiles[] holder = new Quantiles[1];
-        provider.getQuantiles(JOB_ID, quantiles -> holder[0] = quantiles, RuntimeException::new);
-        Quantiles quantiles = holder[0];
-        assertNull(quantiles);
-    }
-
-    public void testGetQuantiles_GivenQuantilesHaveNonEmptyState() throws Exception {
-        Map<String, Object> source = new HashMap<>();
-        source.put(Job.ID.getPreferredName(), "foo");
-        source.put(Quantiles.TIMESTAMP.getPreferredName(), 0L);
-        source.put(Quantiles.QUANTILE_STATE.getPreferredName(), "state");
-        GetResponse getResponse = createGetResponse(true, source);
-
-        Client client = getMockedClient(getResponse);
-        JobProvider provider = createProvider(client);
-
-        Quantiles[] holder = new Quantiles[1];
-        provider.getQuantiles(JOB_ID, quantiles -> holder[0] = quantiles, RuntimeException::new);
-        Quantiles quantiles = holder[0];
-        assertNotNull(quantiles);
-        assertEquals("state", quantiles.getQuantileState());
-    }
-
-    public void testGetQuantiles_GivenQuantilesHaveEmptyState() throws Exception {
-        Map<String, Object> source = new HashMap<>();
-        source.put(Job.ID.getPreferredName(), "foo");
-        source.put(Quantiles.TIMESTAMP.getPreferredName(), new Date(0L).getTime());
-        source.put(Quantiles.QUANTILE_STATE.getPreferredName(), "");
-        GetResponse getResponse = createGetResponse(true, source);
-
-        Client client = getMockedClient(getResponse);
-        JobProvider provider = createProvider(client);
-
-        Quantiles[] holder = new Quantiles[1];
-        provider.getQuantiles(JOB_ID, quantiles -> holder[0] = quantiles, RuntimeException::new);
-        Quantiles quantiles = holder[0];
-        assertNotNull(quantiles);
-        assertEquals("", quantiles.getQuantileState());
-    }
-
     @SuppressWarnings("unchecked")
     public void testCreateJobResultsIndex() {
         String resultsIndexName = AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT;
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java
index db74717c24f..9199557d285 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java
@@ -28,10 +28,12 @@ import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
 import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
 import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
+import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
 import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
+import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
 import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
 import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
@@ -82,6 +84,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
     private NormalizerFactory normalizerFactory;
 
     private DataCounts dataCounts = new DataCounts("foo");
+    private ModelSizeStats modelSizeStats = new ModelSizeStats.Builder("foo").build();
     private ModelSnapshot modelSnapshot = new ModelSnapshot.Builder("foo").build();
     private Quantiles quantiles = new Quantiles("foo", new Date(), "state");
     private Set<MlFilter> filters = new HashSet<>();
@@ -98,28 +101,10 @@ public class AutodetectProcessManagerTests extends ESTestCase {
         when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo"));
         doAnswer(invocationOnMock -> {
             @SuppressWarnings("unchecked")
-            Consumer<DataCounts> handler = (Consumer<DataCounts>) invocationOnMock.getArguments()[1];
-            handler.accept(dataCounts);
+            Consumer<AutodetectParams> handler = (Consumer<AutodetectParams>) invocationOnMock.getArguments()[1];
+            handler.accept(buildAutodetectParams());
             return null;
-        }).when(jobProvider).dataCounts(any(), any(), any());
-        doAnswer(invocationOnMock -> {
-            @SuppressWarnings("unchecked")
-            Consumer<ModelSnapshot> handler = (Consumer<ModelSnapshot>) invocationOnMock.getArguments()[2];
-            handler.accept(modelSnapshot);
-            return null;
-        }).when(jobProvider).getModelSnapshot(anyString(), anyString(), any(), any());
-        doAnswer(invocationOnMock -> {
-            @SuppressWarnings("unchecked")
-            Consumer<Quantiles> handler = (Consumer<Quantiles>) invocationOnMock.getArguments()[1];
-            handler.accept(quantiles);
-            return null;
-        }).when(jobProvider).getQuantiles(any(), any(), any());
-        doAnswer(invocationOnMock -> {
-            @SuppressWarnings("unchecked")
-            Consumer<Set<MlFilter>> handler = (Consumer<Set<MlFilter>>) invocationOnMock.getArguments()[0];
-            handler.accept(filters);
-            return null;
-        }).when(jobProvider).getFilters(any(), any(), any());
+        }).when(jobProvider).getAutodetectParams(any(), any(), any());
     }
 
     public void testOpenJob() {
@@ -137,14 +122,6 @@ public class AutodetectProcessManagerTests extends ESTestCase {
 
     public void testOpenJob_exceedMaxNumJobs() {
         when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo"));
-        doAnswer(invocationOnMock -> {
-            String jobId = (String) invocationOnMock.getArguments()[0];
-            @SuppressWarnings("unchecked")
-            Consumer<DataCounts> handler = (Consumer<DataCounts>) invocationOnMock.getArguments()[1];
-            handler.accept(new DataCounts(jobId));
-            return null;
-        }).when(jobProvider).dataCounts(any(), any(), any());
-
         when(jobManager.getJobOrThrowIfUnknown("bar")).thenReturn(createJobDetails("bar"));
         when(jobManager.getJobOrThrowIfUnknown("baz")).thenReturn(createJobDetails("baz"));
         when(jobManager.getJobOrThrowIfUnknown("foobar")).thenReturn(createJobDetails("foobar"));
@@ -170,11 +147,6 @@ public class AutodetectProcessManagerTests extends ESTestCase {
         ModelSnapshot modelSnapshot = new ModelSnapshot.Builder("foo").build();
         Quantiles quantiles = new Quantiles("foo", new Date(), "state");
         Set<MlFilter> filters = new HashSet<>();
-        doAnswer(invocationOnMock -> {
-            AutodetectProcessManager.MultiConsumer consumer = (AutodetectProcessManager.MultiConsumer) invocationOnMock.getArguments()[1];
-            consumer.accept(dataCounts, modelSnapshot, quantiles, filters);
-            return null;
-        }).when(manager).gatherRequiredInformation(any(), any(), any());
         doAnswer(invocationOnMock -> {
             @SuppressWarnings("unchecked")
             CheckedConsumer consumer = (CheckedConsumer) invocationOnMock.getArguments()[2];
@@ -323,13 +295,6 @@ public class AutodetectProcessManagerTests extends ESTestCase {
         when(threadPool.executor(anyString())).thenReturn(executorService);
         when(threadPool.scheduleWithFixedDelay(any(), any(), any())).thenReturn(mock(ThreadPool.Cancellable.class));
         when(jobManager.getJobOrThrowIfUnknown("my_id")).thenReturn(createJobDetails("my_id"));
-        doAnswer(invocationOnMock -> {
-            String jobId = (String) invocationOnMock.getArguments()[0];
-            @SuppressWarnings("unchecked")
-            Consumer<DataCounts> handler = (Consumer<DataCounts>) invocationOnMock.getArguments()[1];
-            handler.accept(new DataCounts(jobId));
-            return null;
-        }).when(jobProvider).dataCounts(eq("my_id"), any(), any());
         PersistentTasksService persistentTasksService = mock(PersistentTasksService.class);
         AutodetectProcess autodetectProcess = mock(AutodetectProcess.class);
         AutodetectProcessFactory autodetectProcessFactory = (j, modelSnapshot, quantiles, filters, i, e) -> autodetectProcess;
@@ -339,10 +304,20 @@
                 new NamedXContentRegistry(Collections.emptyList()));
 
         expectThrows(EsRejectedExecutionException.class,
-                () -> manager.create("my_id", 1L, dataCounts, modelSnapshot, quantiles, filters, false, e -> {}));
+                () -> manager.create("my_id", 1L, buildAutodetectParams(), false, e -> {}));
         verify(autodetectProcess, times(1)).close();
     }
 
+    private AutodetectParams buildAutodetectParams() {
+        return new AutodetectParams.Builder("foo")
+                .setDataCounts(dataCounts)
+                .setModelSizeStats(modelSizeStats)
+                .setModelSnapshot(modelSnapshot)
+                .setQuantiles(quantiles)
+                .setFilters(filters)
+                .build();
+    }
+
     private AutodetectProcessManager createManager(AutodetectCommunicator communicator) {
         Client client = mock(Client.class);
         PersistentTasksService persistentTasksService = mock(PersistentTasksService.class);
@@ -359,8 +334,7 @@
                 autodetectProcessFactory, normalizerFactory,
                 persistentTasksService, new NamedXContentRegistry(Collections.emptyList()));
         manager = spy(manager);
-        doReturn(communicator).when(manager)
-                .create(any(), anyLong(), eq(dataCounts), eq(modelSnapshot), eq(quantiles), eq(filters), anyBoolean(), any());
+        doReturn(communicator).when(manager).create(any(), anyLong(), eq(buildAutodetectParams()), anyBoolean(), any());
         return manager;
     }
 
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java
index 7c058da06be..0c62c073b3e 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java
@@ -57,7 +57,8 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         renormalizer = mock(Renormalizer.class);
         persister = mock(JobResultsPersister.class);
         flushListener = mock(FlushListener.class);
-        processorUnderTest = new AutoDetectResultProcessor(client, JOB_ID, renormalizer, persister, flushListener);
+        processorUnderTest = new AutoDetectResultProcessor(client, JOB_ID, renormalizer, persister,
+                new ModelSizeStats.Builder(JOB_ID).build(), flushListener);
     }
 
     public void testProcess() {