[ML] Correctly initialise model size stats on job reopen (elastic/x-pack-elasticsearch#822)

This commit ensures that upon reopening a job, the in-memory
model size stats are correctly initialized from the ones
last persisted in the results index.

This fixes the bug that could be seen upon opening a job
that has processed data and immediately calling its _stats
API only to see the model size stats are zero.

In addition, this PR refactors getting the parameters needed to
open an autodetect job:

- Previously, there was a method chaining together multiple
callbacks to the job provider.
- These methods were retrieving data via GETs which is not
going to work with index rollover.

Note, this PR is not eliminating all GETs. More work is needed
to fully support index rollover.

relates elastic/x-pack-elasticsearch#801

Original commit: elastic/x-pack-elasticsearch@1ef1d44b32
This commit is contained in:
Dimitris Athanasiou 2017-03-27 13:17:44 +01:00 committed by GitHub
parent 9cd21cca4b
commit 27a313c07d
9 changed files with 314 additions and 207 deletions

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.job.persistence;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
@ -17,12 +18,13 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchRequestBuilder;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
@ -61,6 +63,7 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder.BucketsQuery;
import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder.InfluencersQuery;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
@ -74,6 +77,7 @@ import org.elasticsearch.xpack.ml.job.results.Influencer;
import org.elasticsearch.xpack.ml.job.results.ModelPlot;
import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.security.support.Exceptions;
import java.io.IOException;
@ -82,13 +86,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;
@ -239,6 +241,98 @@ public class JobProvider {
DataCounts.PARSER, () -> new DataCounts(jobId));
}
public void getAutodetectParams(Job job, Consumer<AutodetectParams> consumer, Consumer<Exception> errorHandler) {
AutodetectParams.Builder paramsBuilder = new AutodetectParams.Builder(job.getId());
String jobId = job.getId();
String resultsIndex = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
String stateIndex = AnomalyDetectorsIndex.jobStateIndexName();
MultiSearchRequestBuilder msearch = client.prepareMultiSearch()
.add(createDocIdSearch(resultsIndex, DataCounts.TYPE.getPreferredName(), DataCounts.documentId(jobId)))
.add(createDocIdSearch(resultsIndex, Result.TYPE.getPreferredName(), ModelSizeStats.documentId(jobId)))
.add(createDocIdSearch(resultsIndex, ModelSnapshot.TYPE.getPreferredName(),
ModelSnapshot.documentId(jobId, job.getModelSnapshotId())))
.add(createDocIdSearch(stateIndex, Quantiles.TYPE.getPreferredName(), Quantiles.documentId(jobId)));
for (String filterId : job.getAnalysisConfig().extractReferencedFilters()) {
msearch.add(createDocIdSearch(ML_META_INDEX, MlFilter.TYPE.getPreferredName(), filterId));
}
msearch.execute(ActionListener.wrap(
response -> {
for (MultiSearchResponse.Item itemResponse : response.getResponses()) {
if (itemResponse.isFailure()) {
if (itemResponse.getFailure() instanceof IndexNotFoundException == false) {
throw itemResponse.getFailure();
} else {
// Ignore IndexNotFoundException; AutodetectParamsBuilder has defaults for new jobs
}
} else {
SearchResponse searchResponse = itemResponse.getResponse();
ShardSearchFailure[] shardFailures = searchResponse.getShardFailures();
int unavailableShards = searchResponse.getTotalShards() - searchResponse.getSuccessfulShards();
if (shardFailures != null && shardFailures.length > 0) {
LOGGER.error("[{}] Search request returned shard failures: {}", jobId,
Arrays.toString(shardFailures));
errorHandler.accept(new ElasticsearchException(
ExceptionsHelper.shardFailuresToErrorMsg(jobId, shardFailures)));
} else if (unavailableShards > 0) {
errorHandler.accept(new ElasticsearchException("[" + jobId
+ "] Search request encountered [" + unavailableShards + "] unavailable shards"));
} else {
SearchHit[] hits = searchResponse.getHits().getHits();
if (hits.length == 1) {
parseAutodetectParamSearchHit(paramsBuilder, hits[0], errorHandler);
} else if (hits.length > 1) {
errorHandler.accept(new IllegalStateException("Got ["
+ hits.length + "] even though search size was 1"));
}
}
}
}
consumer.accept(paramsBuilder.build());
},
errorHandler
));
}
private SearchRequestBuilder createDocIdSearch(String index, String type, String id) {
return client.prepareSearch(index).setSize(1)
.setQuery(QueryBuilders.idsQuery(type).addIds(id))
.setRouting(id);
}
private void parseAutodetectParamSearchHit(AutodetectParams.Builder paramsBuilder,
SearchHit hit, Consumer<Exception> errorHandler) {
String type = hit.getType();
if (DataCounts.TYPE.getPreferredName().equals(type)) {
paramsBuilder.setDataCounts(parseSearchHit(hit, DataCounts.PARSER, errorHandler));
} else if (Result.TYPE.getPreferredName().equals(type)) {
ModelSizeStats.Builder modelSizeStats = parseSearchHit(hit, ModelSizeStats.PARSER, errorHandler);
paramsBuilder.setModelSizeStats(modelSizeStats == null ? null : modelSizeStats.build());
} else if (ModelSnapshot.TYPE.getPreferredName().equals(type)) {
ModelSnapshot.Builder modelSnapshot = parseSearchHit(hit, ModelSnapshot.PARSER, errorHandler);
paramsBuilder.setModelSnapshot(modelSnapshot == null ? null : modelSnapshot.build());
} else if (Quantiles.TYPE.getPreferredName().equals(type)) {
paramsBuilder.setQuantiles(parseSearchHit(hit, Quantiles.PARSER, errorHandler));
} else if (MlFilter.TYPE.getPreferredName().equals(type)) {
paramsBuilder.addFilter(parseSearchHit(hit, MlFilter.PARSER, errorHandler));
} else {
errorHandler.accept(new IllegalStateException("Unexpected type [" + type + "]"));
}
}
private <T, U> T parseSearchHit(SearchHit hit, BiFunction<XContentParser, U, T> objectParser,
Consumer<Exception> errorHandler) {
BytesReference source = hit.getSourceRef();
try (XContentParser parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source)) {
return objectParser.apply(parser, null);
} catch (IOException e) {
errorHandler.accept(new ElasticsearchParseException("failed to parse " + hit.getType(), e));
return null;
}
}
private <T, U> void get(String indexName, String type, String id, Consumer<T> handler, Consumer<Exception> errorHandler,
BiFunction<XContentParser, U, T> objectParser, Supplier<T> notFoundSupplier) {
GetRequest getRequest = new GetRequest(indexName, type, id);
@ -264,44 +358,6 @@ public class JobProvider {
}));
}
private <T, U> void mget(String indexName, String type, Set<String> ids, Consumer<Set<T>> handler, Consumer<Exception> errorHandler,
BiFunction<XContentParser, U, T> objectParser) {
if (ids.isEmpty()) {
handler.accept(Collections.emptySet());
return;
}
MultiGetRequest multiGetRequest = new MultiGetRequest();
for (String id : ids) {
multiGetRequest.add(indexName, type, id);
}
client.multiGet(multiGetRequest, ActionListener.wrap(
mresponse -> {
Set<T> objects = new HashSet<>();
for (MultiGetItemResponse item : mresponse) {
GetResponse response = item.getResponse();
if (response.isExists()) {
BytesReference source = response.getSourceAsBytesRef();
try (XContentParser parser = XContentFactory.xContent(source)
.createParser(NamedXContentRegistry.EMPTY, source)) {
objects.add(objectParser.apply(parser, null));
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse " + type, e);
}
}
}
handler.accept(objects);
},
e -> {
if (e instanceof IndexNotFoundException == false) {
errorHandler.accept(e);
} else {
handler.accept(new HashSet<>());
}
})
);
}
public static IndicesOptions addIgnoreUnavailable(IndicesOptions indicesOptions) {
return IndicesOptions.fromOptions(true, indicesOptions.allowNoIndices(), indicesOptions.expandWildcardsOpen(),
indicesOptions.expandWildcardsClosed(), indicesOptions);
@ -751,19 +807,6 @@ public class JobProvider {
return new BatchedInfluencersIterator(client, jobId);
}
/**
* Get the persisted quantiles state for the job
*/
public void getQuantiles(String jobId, Consumer<Quantiles> handler, Consumer<Exception> errorHandler) {
String indexName = AnomalyDetectorsIndex.jobStateIndexName();
String quantilesId = Quantiles.documentId(jobId);
LOGGER.trace("ES API CALL: get ID {} type {} from index {}", quantilesId, Quantiles.TYPE.getPreferredName(), indexName);
get(indexName, Quantiles.TYPE.getPreferredName(), quantilesId, handler, errorHandler, Quantiles.PARSER, () -> {
LOGGER.info("There are currently no quantiles for job " + jobId);
return null;
});
}
/**
* Get a job's model snapshot by its id
*/
@ -980,19 +1023,10 @@ public class JobProvider {
handler, errorHandler, (parser, context) -> ModelSizeStats.PARSER.apply(parser, null).build(),
() -> {
LOGGER.trace("No memory usage details for job with id {}", jobId);
return null;
return new ModelSizeStats.Builder(jobId).build();
});
}
/**
* Retrieves the filter with the given {@code filterId} from the datastore.
*
* @param ids the id of the requested filter
*/
public void getFilters(Consumer<Set<MlFilter>> handler, Consumer<Exception> errorHandler, Set<String> ids) {
mget(ML_META_INDEX, MlFilter.TYPE.getPreferredName(), ids, handler, errorHandler, MlFilter.PARSER);
}
/**
* Maps authorization failures when querying ML indexes to job-specific authorization failures attributed to the ML actions.
* Works by replacing the action name with another provided by the caller, and appending the job ID.

View File

@ -24,7 +24,6 @@ import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
@ -32,12 +31,11 @@ import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersiste
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
import org.elasticsearch.xpack.ml.job.process.normalizer.ScoresUpdater;
@ -54,7 +52,6 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
@ -211,7 +208,8 @@ public class AutodetectProcessManager extends AbstractComponent {
}
public void openJob(String jobId, long taskId, boolean ignoreDowntime, Consumer<Exception> handler) {
gatherRequiredInformation(jobId, (dataCounts, modelSnapshot, quantiles, filters) -> {
Job job = jobManager.getJobOrThrowIfUnknown(jobId);
jobProvider.getAutodetectParams(job, params -> {
// We need to fork, otherwise we restore model state from a network thread (several GET api calls):
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {
@Override
@ -223,7 +221,7 @@ public class AutodetectProcessManager extends AbstractComponent {
protected void doRun() throws Exception {
try {
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.computeIfAbsent(jobId, id ->
create(id, taskId, dataCounts, modelSnapshot, quantiles, filters, ignoreDowntime, handler));
create(id, taskId, params, ignoreDowntime, handler));
communicator.writeJobInputHeader();
setJobState(taskId, jobId, JobState.OPENED);
} catch (Exception e1) {
@ -243,48 +241,37 @@ public class AutodetectProcessManager extends AbstractComponent {
});
}
// TODO: add a method on JobProvider that fetches all required info via 1 msearch call, so that we have a single lambda
// instead of 4 nested lambdas.
void gatherRequiredInformation(String jobId, MultiConsumer handler, Consumer<Exception> errorHandler) {
Job job = jobManager.getJobOrThrowIfUnknown(jobId);
jobProvider.dataCounts(jobId, dataCounts -> {
jobProvider.getModelSnapshot(jobId, job.getModelSnapshotId(), modelSnapshot -> {
jobProvider.getQuantiles(jobId, quantiles -> {
Set<String> ids = job.getAnalysisConfig().extractReferencedFilters();
jobProvider.getFilters(filterDocument -> handler.accept(dataCounts, modelSnapshot, quantiles, filterDocument),
errorHandler, ids);
}, errorHandler);
}, errorHandler);
}, errorHandler);
}
interface MultiConsumer {
void accept(DataCounts dataCounts, ModelSnapshot modelSnapshot, Quantiles quantiles, Set<MlFilter> filters);
}
AutodetectCommunicator create(String jobId, long taskId, DataCounts dataCounts, ModelSnapshot modelSnapshot, Quantiles quantiles,
Set<MlFilter> filters, boolean ignoreDowntime, Consumer<Exception> handler) {
AutodetectCommunicator create(String jobId, long taskId, AutodetectParams autodetectParams,
boolean ignoreDowntime, Consumer<Exception> handler) {
if (autoDetectCommunicatorByJob.size() == maxAllowedRunningJobs) {
throw new ElasticsearchStatusException("max running job capacity [" + maxAllowedRunningJobs + "] reached",
RestStatus.TOO_MANY_REQUESTS);
}
if (autodetectParams.dataCounts().getProcessedRecordCount() > 0) {
if (autodetectParams.modelSnapshot() == null) {
logger.error("[{}] No model snapshot could be found for a job with processed records", jobId);
}
if (autodetectParams.quantiles() == null) {
logger.error("[{}] No quantiles could be found for a job with processed records", jobId);
}
}
Job job = jobManager.getJobOrThrowIfUnknown(jobId);
// A TP with no queue, so that we fail immediately if there are no threads available
ExecutorService executorService = threadPool.executor(MachineLearning.AUTODETECT_PROCESS_THREAD_POOL_NAME);
try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, dataCounts,
jobDataCountsPersister)) {
try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job,
autodetectParams.dataCounts(), jobDataCountsPersister)) {
ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobProvider, new JobRenormalizedResultsPersister(settings, client),
normalizerFactory);
Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater,
threadPool.executor(MachineLearning.THREAD_POOL_NAME), job.getAnalysisConfig().getUsePerPartitionNormalization());
AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, modelSnapshot, quantiles, filters,
ignoreDowntime, executorService);
AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams.modelSnapshot(),
autodetectParams.quantiles(), autodetectParams.filters(), ignoreDowntime, executorService);
boolean usePerPartitionNormalization = job.getAnalysisConfig().getUsePerPartitionNormalization();
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(client, jobId, renormalizer, jobResultsPersister);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(
client, jobId, renormalizer, jobResultsPersister, autodetectParams.modelSizeStats());
try {
executorService.submit(() -> processor.process(process, usePerPartitionNormalization));
} catch (EsRejectedExecutionException e) {

View File

@ -31,6 +31,7 @@ import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
/**
@ -62,22 +63,24 @@ public class AutoDetectResultProcessor {
final CountDownLatch completionLatch = new CountDownLatch(1);
private final FlushListener flushListener;
/**
* New model size stats are read as the process is running
*/
private volatile ModelSizeStats latestModelSizeStats;
public AutoDetectResultProcessor(Client client, String jobId, Renormalizer renormalizer, JobResultsPersister persister) {
this(client, jobId, renormalizer, persister, new FlushListener());
public AutoDetectResultProcessor(Client client, String jobId, Renormalizer renormalizer, JobResultsPersister persister,
ModelSizeStats latestModelSizeStats) {
this(client, jobId, renormalizer, persister, latestModelSizeStats, new FlushListener());
}
AutoDetectResultProcessor(Client client,String jobId, Renormalizer renormalizer, JobResultsPersister persister,
FlushListener flushListener) {
this.client = client;
this.jobId = jobId;
this.renormalizer = renormalizer;
this.persister = persister;
this.flushListener = flushListener;
ModelSizeStats.Builder builder = new ModelSizeStats.Builder(jobId);
latestModelSizeStats = builder.build();
AutoDetectResultProcessor(Client client, String jobId, Renormalizer renormalizer, JobResultsPersister persister,
ModelSizeStats latestModelSizeStats, FlushListener flushListener) {
this.client = Objects.requireNonNull(client);
this.jobId = Objects.requireNonNull(jobId);
this.renormalizer = Objects.requireNonNull(renormalizer);
this.persister = Objects.requireNonNull(persister);
this.flushListener = Objects.requireNonNull(flushListener);
this.latestModelSizeStats = Objects.requireNonNull(latestModelSizeStats);
}
public void process(AutodetectProcess process, boolean isPerPartitionNormalization) {

View File

@ -0,0 +1,137 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.process.autodetect.params;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
public class AutodetectParams {
private final DataCounts dataCounts;
private final ModelSizeStats modelSizeStats;
@Nullable
private final ModelSnapshot modelSnapshot;
@Nullable
private final Quantiles quantiles;
private final Set<MlFilter> filters;
private AutodetectParams(DataCounts dataCounts, ModelSizeStats modelSizeStats,
@Nullable ModelSnapshot modelSnapshot,
@Nullable Quantiles quantiles, Set<MlFilter> filters) {
this.dataCounts = Objects.requireNonNull(dataCounts);
this.modelSizeStats = Objects.requireNonNull(modelSizeStats);
this.modelSnapshot = modelSnapshot;
this.quantiles = quantiles;
this.filters = filters;
}
public DataCounts dataCounts() {
return dataCounts;
}
public ModelSizeStats modelSizeStats() {
return modelSizeStats;
}
@Nullable
public ModelSnapshot modelSnapshot() {
return modelSnapshot;
}
@Nullable
public Quantiles quantiles() {
return quantiles;
}
public Set<MlFilter> filters() {
return filters;
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other instanceof AutodetectParams == false) {
return false;
}
AutodetectParams that = (AutodetectParams) other;
return Objects.equals(this.dataCounts, that.dataCounts)
&& Objects.equals(this.modelSizeStats, that.modelSizeStats)
&& Objects.equals(this.modelSnapshot, that.modelSnapshot)
&& Objects.equals(this.quantiles, that.quantiles)
&& Objects.equals(this.filters, that.filters);
}
@Override
public int hashCode() {
return Objects.hash(dataCounts, modelSizeStats, modelSnapshot, quantiles, filters);
}
public static class Builder {
private DataCounts dataCounts;
private ModelSizeStats modelSizeStats;
private ModelSnapshot modelSnapshot;
private Quantiles quantiles;
private Set<MlFilter> filters;
public Builder(String jobId) {
dataCounts = new DataCounts(jobId);
modelSizeStats = new ModelSizeStats.Builder(jobId).build();
filters = new HashSet<>();
}
public Builder setDataCounts(DataCounts dataCounts) {
this.dataCounts = dataCounts;
return this;
}
public Builder setModelSizeStats(ModelSizeStats modelSizeStats) {
this.modelSizeStats = modelSizeStats;
return this;
}
public Builder setModelSnapshot(ModelSnapshot modelSnapshot) {
this.modelSnapshot = modelSnapshot;
return this;
}
public Builder setQuantiles(Quantiles quantiles) {
this.quantiles = quantiles;
return this;
}
public Builder addFilter(MlFilter filter) {
filters.add(filter);
return this;
}
public Builder setFilters(Set<MlFilter> filters) {
filters = Objects.requireNonNull(filters);
return this;
}
public AutodetectParams build() {
return new AutodetectParams(dataCounts, modelSizeStats, modelSnapshot, quantiles,
filters);
}
}
}

View File

@ -9,7 +9,9 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.xpack.ml.job.messages.Messages;
public class ExceptionsHelper {
@ -38,6 +40,20 @@ public class ExceptionsHelper {
return new ElasticsearchStatusException(msg, RestStatus.CONFLICT);
}
/**
* Creates an error message that explains there are shard failures, displays info
* for the first failure (shard/reason) and kindly asks to see more info in the logs
*/
public static String shardFailuresToErrorMsg(String jobId, ShardSearchFailure[] shardFailures) {
if (shardFailures == null || shardFailures.length == 0) {
throw new IllegalStateException("Invalid call with null or empty shardFailures");
}
SearchShardTarget shardTarget = shardFailures[0].shard();
return "[" + jobId + "] Search request returned shard failures; first failure: shard ["
+ (shardTarget == null ? "_na" : shardTarget) + "], reason ["
+ shardFailures[0].reason() + "]; see logs for more info";
}
/**
* A more REST-friendly Object.requireNonNull()
*/

View File

@ -17,6 +17,7 @@ import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MachineLearningTemplateRegistry;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.config.JobTests;
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
@ -77,7 +78,8 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(1));
jobProvider = new JobProvider(client(), builder.build());
capturedUpdateModelSnapshotOnJobRequests = new ArrayList<>();
resultProcessor = new AutoDetectResultProcessor(client(), JOB_ID, renormalizer, jobResultsPersister) {
resultProcessor = new AutoDetectResultProcessor(client(), JOB_ID, renormalizer, jobResultsPersister,
new ModelSizeStats.Builder(JOB_ID).build()) {
@Override
protected void updateModelSnapshotIdOnJob(ModelSnapshot modelSnapshot) {
capturedUpdateModelSnapshotOnJobRequests.add(modelSnapshot);
@ -490,12 +492,13 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
}
return resultHolder.get();
}
private Optional<Quantiles> getQuantiles() throws Exception {
AtomicReference<Exception> errorHolder = new AtomicReference<>();
AtomicReference<Optional<Quantiles>> resultHolder = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
jobProvider.getQuantiles(JOB_ID, q -> {
resultHolder.set(Optional.ofNullable(q));
jobProvider.getAutodetectParams(JobTests.buildJobBuilder(JOB_ID).build(),params -> {
resultHolder.set(Optional.ofNullable(params.quantiles()));
latch.countDown();
}, e -> {
errorHolder.set(e);

View File

@ -44,12 +44,10 @@ import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder.Influe
import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
import org.elasticsearch.xpack.ml.job.results.Influencer;
import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.mockito.ArgumentCaptor;
@ -81,52 +79,6 @@ public class JobProviderTests extends ESTestCase {
private static final String CLUSTER_NAME = "myCluster";
private static final String JOB_ID = "foo";
public void testGetQuantiles_GivenNoQuantilesForJob() throws Exception {
GetResponse getResponse = createGetResponse(false, null);
Client client = getMockedClient(getResponse);
JobProvider provider = createProvider(client);
Quantiles[] holder = new Quantiles[1];
provider.getQuantiles(JOB_ID, quantiles -> holder[0] = quantiles, RuntimeException::new);
Quantiles quantiles = holder[0];
assertNull(quantiles);
}
public void testGetQuantiles_GivenQuantilesHaveNonEmptyState() throws Exception {
Map<String, Object> source = new HashMap<>();
source.put(Job.ID.getPreferredName(), "foo");
source.put(Quantiles.TIMESTAMP.getPreferredName(), 0L);
source.put(Quantiles.QUANTILE_STATE.getPreferredName(), "state");
GetResponse getResponse = createGetResponse(true, source);
Client client = getMockedClient(getResponse);
JobProvider provider = createProvider(client);
Quantiles[] holder = new Quantiles[1];
provider.getQuantiles(JOB_ID, quantiles -> holder[0] = quantiles, RuntimeException::new);
Quantiles quantiles = holder[0];
assertNotNull(quantiles);
assertEquals("state", quantiles.getQuantileState());
}
public void testGetQuantiles_GivenQuantilesHaveEmptyState() throws Exception {
Map<String, Object> source = new HashMap<>();
source.put(Job.ID.getPreferredName(), "foo");
source.put(Quantiles.TIMESTAMP.getPreferredName(), new Date(0L).getTime());
source.put(Quantiles.QUANTILE_STATE.getPreferredName(), "");
GetResponse getResponse = createGetResponse(true, source);
Client client = getMockedClient(getResponse);
JobProvider provider = createProvider(client);
Quantiles[] holder = new Quantiles[1];
provider.getQuantiles(JOB_ID, quantiles -> holder[0] = quantiles, RuntimeException::new);
Quantiles quantiles = holder[0];
assertNotNull(quantiles);
assertEquals("", quantiles.getQuantileState());
}
@SuppressWarnings("unchecked")
public void testCreateJobResultsIndex() {
String resultsIndexName = AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT;

View File

@ -28,10 +28,12 @@ import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
@ -82,6 +84,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
private NormalizerFactory normalizerFactory;
private DataCounts dataCounts = new DataCounts("foo");
private ModelSizeStats modelSizeStats = new ModelSizeStats.Builder("foo").build();
private ModelSnapshot modelSnapshot = new ModelSnapshot.Builder("foo").build();
private Quantiles quantiles = new Quantiles("foo", new Date(), "state");
private Set<MlFilter> filters = new HashSet<>();
@ -98,28 +101,10 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo"));
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
Consumer<DataCounts> handler = (Consumer<DataCounts>) invocationOnMock.getArguments()[1];
handler.accept(dataCounts);
Consumer<AutodetectParams> handler = (Consumer<AutodetectParams>) invocationOnMock.getArguments()[1];
handler.accept(buildAutodetectParams());
return null;
}).when(jobProvider).dataCounts(any(), any(), any());
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
Consumer<ModelSnapshot> handler = (Consumer<ModelSnapshot>) invocationOnMock.getArguments()[2];
handler.accept(modelSnapshot);
return null;
}).when(jobProvider).getModelSnapshot(anyString(), anyString(), any(), any());
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
Consumer<Quantiles> handler = (Consumer<Quantiles>) invocationOnMock.getArguments()[1];
handler.accept(quantiles);
return null;
}).when(jobProvider).getQuantiles(any(), any(), any());
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
Consumer<Set<MlFilter>> handler = (Consumer<Set<MlFilter>>) invocationOnMock.getArguments()[0];
handler.accept(filters);
return null;
}).when(jobProvider).getFilters(any(), any(), any());
}).when(jobProvider).getAutodetectParams(any(), any(), any());
}
public void testOpenJob() {
@ -137,14 +122,6 @@ public class AutodetectProcessManagerTests extends ESTestCase {
public void testOpenJob_exceedMaxNumJobs() {
when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo"));
doAnswer(invocationOnMock -> {
String jobId = (String) invocationOnMock.getArguments()[0];
@SuppressWarnings("unchecked")
Consumer<DataCounts> handler = (Consumer<DataCounts>) invocationOnMock.getArguments()[1];
handler.accept(new DataCounts(jobId));
return null;
}).when(jobProvider).dataCounts(any(), any(), any());
when(jobManager.getJobOrThrowIfUnknown("bar")).thenReturn(createJobDetails("bar"));
when(jobManager.getJobOrThrowIfUnknown("baz")).thenReturn(createJobDetails("baz"));
when(jobManager.getJobOrThrowIfUnknown("foobar")).thenReturn(createJobDetails("foobar"));
@ -170,11 +147,6 @@ public class AutodetectProcessManagerTests extends ESTestCase {
ModelSnapshot modelSnapshot = new ModelSnapshot.Builder("foo").build();
Quantiles quantiles = new Quantiles("foo", new Date(), "state");
Set<MlFilter> filters = new HashSet<>();
doAnswer(invocationOnMock -> {
AutodetectProcessManager.MultiConsumer consumer = (AutodetectProcessManager.MultiConsumer) invocationOnMock.getArguments()[1];
consumer.accept(dataCounts, modelSnapshot, quantiles, filters);
return null;
}).when(manager).gatherRequiredInformation(any(), any(), any());
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
CheckedConsumer<Exception, IOException> consumer = (CheckedConsumer<Exception, IOException>) invocationOnMock.getArguments()[2];
@ -323,13 +295,6 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(threadPool.executor(anyString())).thenReturn(executorService);
when(threadPool.scheduleWithFixedDelay(any(), any(), any())).thenReturn(mock(ThreadPool.Cancellable.class));
when(jobManager.getJobOrThrowIfUnknown("my_id")).thenReturn(createJobDetails("my_id"));
doAnswer(invocationOnMock -> {
String jobId = (String) invocationOnMock.getArguments()[0];
@SuppressWarnings("unchecked")
Consumer<DataCounts> handler = (Consumer<DataCounts>) invocationOnMock.getArguments()[1];
handler.accept(new DataCounts(jobId));
return null;
}).when(jobProvider).dataCounts(eq("my_id"), any(), any());
PersistentTasksService persistentTasksService = mock(PersistentTasksService.class);
AutodetectProcess autodetectProcess = mock(AutodetectProcess.class);
AutodetectProcessFactory autodetectProcessFactory = (j, modelSnapshot, quantiles, filters, i, e) -> autodetectProcess;
@ -339,10 +304,20 @@ public class AutodetectProcessManagerTests extends ESTestCase {
new NamedXContentRegistry(Collections.emptyList()));
expectThrows(EsRejectedExecutionException.class,
() -> manager.create("my_id", 1L, dataCounts, modelSnapshot, quantiles, filters, false, e -> {}));
() -> manager.create("my_id", 1L, buildAutodetectParams(), false, e -> {}));
verify(autodetectProcess, times(1)).close();
}
private AutodetectParams buildAutodetectParams() {
return new AutodetectParams.Builder("foo")
.setDataCounts(dataCounts)
.setModelSizeStats(modelSizeStats)
.setModelSnapshot(modelSnapshot)
.setQuantiles(quantiles)
.setFilters(filters)
.build();
}
private AutodetectProcessManager createManager(AutodetectCommunicator communicator) {
Client client = mock(Client.class);
PersistentTasksService persistentTasksService = mock(PersistentTasksService.class);
@ -359,8 +334,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
autodetectProcessFactory, normalizerFactory, persistentTasksService,
new NamedXContentRegistry(Collections.emptyList()));
manager = spy(manager);
doReturn(communicator).when(manager)
.create(any(), anyLong(), eq(dataCounts), eq(modelSnapshot), eq(quantiles), eq(filters), anyBoolean(), any());
doReturn(communicator).when(manager).create(any(), anyLong(), eq(buildAutodetectParams()), anyBoolean(), any());
return manager;
}

View File

@ -57,7 +57,8 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
renormalizer = mock(Renormalizer.class);
persister = mock(JobResultsPersister.class);
flushListener = mock(FlushListener.class);
processorUnderTest = new AutoDetectResultProcessor(client, JOB_ID, renormalizer, persister, flushListener);
processorUnderTest = new AutoDetectResultProcessor(client, JOB_ID, renormalizer, persister,
new ModelSizeStats.Builder(JOB_ID).build(), flushListener);
}
public void testProcess() {