diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java index 5fdefb0fc7d..0d2193e2859 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java @@ -56,6 +56,7 @@ import org.elasticsearch.xpack.prelert.job.manager.AutodetectProcessManager; import org.elasticsearch.xpack.prelert.job.manager.JobManager; import org.elasticsearch.xpack.prelert.job.metadata.JobAllocator; import org.elasticsearch.xpack.prelert.job.metadata.JobLifeCycleService; +import org.elasticsearch.xpack.prelert.job.metadata.PrelertInitializationService; import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata; import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchBulkDeleterFactory; import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister; @@ -180,7 +181,8 @@ public class PrelertPlugin extends Plugin implements ActionPlugin { new JobAllocator(settings, clusterService, threadPool), new JobLifeCycleService(settings, client, clusterService, scheduledJobService, dataProcessor, threadPool.generic()), new ElasticsearchBulkDeleterFactory(client), //NORELEASE: this should use Delete-by-query - dataProcessor + dataProcessor, + new PrelertInitializationService(settings, threadPool, clusterService, jobProvider) ); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocator.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocator.java index f07b4463301..8fe2e393a91 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocator.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/JobAllocator.java @@ -67,6 +67,10 @@ public class JobAllocator extends AbstractComponent implements ClusterStateListe boolean shouldAllocate(ClusterState current) { PrelertMetadata prelertMetadata = current.getMetaData().custom(PrelertMetadata.TYPE); + if (prelertMetadata == null) { + return false; + } + for (String jobId : prelertMetadata.getJobs().keySet()) { if (prelertMetadata.getAllocations().containsKey(jobId) == false) { return true; @@ -75,32 +79,10 @@ public class JobAllocator extends AbstractComponent implements ClusterStateListe return false; } - boolean prelertMetaDataMissing(ClusterState clusterState) { - return clusterState.getMetaData().custom(PrelertMetadata.TYPE) == null; - } - @Override public void clusterChanged(ClusterChangedEvent event) { if (event.localNodeMaster()) { - if (prelertMetaDataMissing(event.state())) { - threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { - clusterService.submitStateUpdateTask("install-prelert-metadata", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - ClusterState.Builder builder = new ClusterState.Builder(currentState); - MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData()); - metadataBuilder.putCustom(PrelertMetadata.TYPE, PrelertMetadata.PROTO); - builder.metaData(metadataBuilder.build()); - return builder.build(); - } - - @Override - public void onFailure(String source, Exception e) { - logger.error("unable to install prelert metadata upon startup", e); - } - }); - }); - } else if (shouldAllocate(event.state())) { + if (shouldAllocate(event.state())) { threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { clusterService.submitStateUpdateTask("allocate_jobs", new ClusterStateUpdateTask() { @Override diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertInitializationService.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertInitializationService.java new file mode 100644 index 00000000000..30166ffebdc --- /dev/null +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertInitializationService.java @@ -0,0 +1,70 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.prelert.job.metadata; + +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchJobProvider; + +public class PrelertInitializationService extends AbstractComponent implements ClusterStateListener { + + private final ThreadPool threadPool; + private final ClusterService clusterService; + private final ElasticsearchJobProvider jobProvider; + + public PrelertInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService, + ElasticsearchJobProvider jobProvider) { + super(settings); + this.threadPool = threadPool; + this.clusterService = clusterService; + this.jobProvider = jobProvider; + clusterService.add(this); + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + if (event.localNodeMaster()) { + MetaData metaData = event.state().metaData(); + if (metaData.custom(PrelertMetadata.TYPE) == null) { + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + clusterService.submitStateUpdateTask("install-prelert-metadata", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + ClusterState.Builder builder = new ClusterState.Builder(currentState); + MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData()); + metadataBuilder.putCustom(PrelertMetadata.TYPE, PrelertMetadata.PROTO); + builder.metaData(metadataBuilder.build()); + return builder.build(); + } + + @Override + public void onFailure(String source, Exception e) { + logger.error("unable to install prelert metadata upon startup", e); + } + }); + }); + } + if (metaData.hasIndex(ElasticsearchJobProvider.PRELERT_USAGE_INDEX) == false) { + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + jobProvider.createUsageMeteringIndex((result, error) -> { + if (result) { + logger.info("successfully created prelert-usage index"); + } else { + logger.error("not able to create prelert-usage index", error); + } + }); + }); + } + } + } +} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchJobProvider.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchJobProvider.java index 8cb14af6a78..5f5da1944f6 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchJobProvider.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchJobProvider.java @@ -9,20 +9,16 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefIterator; import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; -import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.Strings; @@ -80,10 +76,9 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.ExecutionException; +import java.util.function.BiConsumer; -public class ElasticsearchJobProvider implements JobProvider -{ +public class ElasticsearchJobProvider implements JobProvider { private static final Logger LOGGER = Loggers.getLogger(ElasticsearchJobProvider.class); /** @@ -110,7 +105,7 @@ public class ElasticsearchJobProvider implements JobProvider AnomalyRecord.BY_FIELD_VALUE.getPreferredName(), AnomalyRecord.FIELD_NAME.getPreferredName(), AnomalyRecord.FUNCTION.getPreferredName() - ); + ); private static final int RECORDS_SIZE_PARAM = 500; @@ -125,63 +120,32 @@ public class ElasticsearchJobProvider implements JobProvider this.numberOfReplicas = numberOfReplicas; } - public void initialize() { - LOGGER.info("Connecting to Elasticsearch cluster '" + client.settings().get("cluster.name") - + "'"); - - // This call was added because if we try to connect to Elasticsearch - // while it's doing the recovery operations it does at startup then we - // can get weird effects like indexes being reported as not existing - // when they do. See EL16-182 in Jira. - LOGGER.trace("ES API CALL: wait for yellow status on whole cluster"); - ClusterHealthResponse response = client.admin().cluster() - .prepareHealth() - .setWaitForYellowStatus() - .execute().actionGet(); - - // The wait call above can time out. - // Throw an error if in cluster health is red - if (response.getStatus() == ClusterHealthStatus.RED) { - String msg = "Waited for the Elasticsearch status to be YELLOW but is RED after wait timeout"; - LOGGER.error(msg); - throw new IllegalStateException(msg); - } - - LOGGER.info("Elasticsearch cluster '" + client.settings().get("cluster.name") - + "' now ready to use"); - - - createUsageMeteringIndex(); - } - /** * If the {@value ElasticsearchJobProvider#PRELERT_USAGE_INDEX} index does * not exist then create it here with the usage document mapping. */ - private void createUsageMeteringIndex() { + public void createUsageMeteringIndex(BiConsumer listener) { try { - LOGGER.trace("ES API CALL: index exists? {}", PRELERT_USAGE_INDEX); - boolean indexExists = client.admin().indices() - .exists(new IndicesExistsRequest(PRELERT_USAGE_INDEX)) - .get().isExists(); + LOGGER.info("Creating the internal '{}' index", PRELERT_USAGE_INDEX); + XContentBuilder usageMapping = ElasticsearchMappings.usageMapping(); + LOGGER.trace("ES API CALL: create index {}", PRELERT_USAGE_INDEX); + client.admin().indices().prepareCreate(PRELERT_USAGE_INDEX) + .setSettings(prelertIndexSettings()) + .addMapping(Usage.TYPE, usageMapping) + .execute(new ActionListener() { + @Override + public void onResponse(CreateIndexResponse createIndexResponse) { + listener.accept(true, null); + } - if (indexExists == false) { - LOGGER.info("Creating the internal '" + PRELERT_USAGE_INDEX + "' index"); + @Override + public void onFailure(Exception e) { + listener.accept(false, e); + } + }); - XContentBuilder usageMapping = ElasticsearchMappings.usageMapping(); - - LOGGER.trace("ES API CALL: create index {}", PRELERT_USAGE_INDEX); - client.admin().indices().prepareCreate(PRELERT_USAGE_INDEX) - .setSettings(prelertIndexSettings()) - .addMapping(Usage.TYPE, usageMapping) - .get(); - LOGGER.trace("ES API CALL: wait for yellow status {}", PRELERT_USAGE_INDEX); - client.admin().cluster().prepareHealth(PRELERT_USAGE_INDEX).setWaitForYellowStatus().execute().actionGet(); - } - } catch (InterruptedException | ExecutionException | IOException e) { + } catch (IOException e) { LOGGER.warn("Error checking the usage metering index", e); - } catch (ResourceAlreadyExistsException e) { - LOGGER.debug("Usage metering index already exists", e); } } @@ -191,11 +155,11 @@ public class ElasticsearchJobProvider implements JobProvider * because then the settings can be applied regardless of whether we're * using our own Elasticsearch to store results or a customer's pre-existing * Elasticsearch. + * * @return An Elasticsearch builder initialised with the desired settings * for Prelert indexes. */ - private Settings.Builder prelertIndexSettings() - { + private Settings.Builder prelertIndexSettings() { return Settings.builder() // Our indexes are small and one shard puts the // least possible burden on Elasticsearch @@ -478,22 +442,19 @@ public class ElasticsearchJobProvider implements JobProvider return new QueryPage<>(Collections.singletonList(bucket), 1, Bucket.RESULTS_FIELD); } - final class ScoreTimestamp - { + final class ScoreTimestamp { double score; Date timestamp; - public ScoreTimestamp(Date timestamp, double score) - { + public ScoreTimestamp(Date timestamp, double score) { this.score = score; this.timestamp = timestamp; } } private List partitionScores(String jobId, Object epochStart, - Object epochEnd, String partitionFieldValue) - throws ResourceNotFoundException - { + Object epochEnd, String partitionFieldValue) + throws ResourceNotFoundException { QueryBuilder qb = new ResultsFilterBuilder() .timeRange(ElasticsearchMappings.ES_TIMESTAMP, epochStart, epochEnd) .build(); @@ -517,18 +478,15 @@ public class ElasticsearchJobProvider implements JobProvider List results = new ArrayList<>(); // expect 1 document per bucket - if (searchResponse.getHits().totalHits() > 0) - { + if (searchResponse.getHits().totalHits() > 0) { - Map m = searchResponse.getHits().getAt(0).getSource(); + Map m = searchResponse.getHits().getAt(0).getSource(); @SuppressWarnings("unchecked") List> probs = (List>) - m.get(ReservedFieldNames.PARTITION_NORMALIZED_PROBS); - for (Map prob : probs) - { - if (partitionFieldValue.equals(prob.get(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName()))) - { + m.get(ReservedFieldNames.PARTITION_NORMALIZED_PROBS); + for (Map prob : probs) { + if (partitionFieldValue.equals(prob.get(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName()))) { Date ts = new Date(TimeUtils.dateStringToEpoch((String) m.get(ElasticsearchMappings.ES_TIMESTAMP))); results.add(new ScoreTimestamp(ts, (Double) prob.get(Bucket.MAX_NORMALIZED_PROBABILITY.getPreferredName()))); @@ -540,8 +498,7 @@ public class ElasticsearchJobProvider implements JobProvider } public int expandBucketForPartitionValue(String jobId, boolean includeInterim, Bucket bucket, - String partitionFieldValue) throws ResourceNotFoundException - { + String partitionFieldValue) throws ResourceNotFoundException { int from = 0; QueryPage page = bucketRecords( @@ -549,8 +506,7 @@ public class ElasticsearchJobProvider implements JobProvider AnomalyRecord.PROBABILITY.getPreferredName(), false, partitionFieldValue); bucket.setRecords(page.results()); - while (page.count() > from + RECORDS_SIZE_PARAM) - { + while (page.count() > from + RECORDS_SIZE_PARAM) { from += RECORDS_SIZE_PARAM; page = bucketRecords( jobId, bucket, from, RECORDS_SIZE_PARAM, includeInterim, @@ -563,8 +519,7 @@ public class ElasticsearchJobProvider implements JobProvider @Override - public BatchedDocumentsIterator newBatchedBucketsIterator(String jobId) - { + public BatchedDocumentsIterator newBatchedBucketsIterator(String jobId) { return new ElasticsearchBatchedBucketsIterator(client, jobId, parseFieldMatcher); } @@ -577,8 +532,7 @@ public class ElasticsearchJobProvider implements JobProvider AnomalyRecord.PROBABILITY.getPreferredName(), false, null); bucket.setRecords(page.results()); - while (page.count() > from + RECORDS_SIZE_PARAM) - { + while (page.count() > from + RECORDS_SIZE_PARAM) { from += RECORDS_SIZE_PARAM; page = bucketRecords( jobId, bucket, from, RECORDS_SIZE_PARAM, includeInterim, @@ -590,10 +544,9 @@ public class ElasticsearchJobProvider implements JobProvider } QueryPage bucketRecords(String jobId, - Bucket bucket, int from, int size, boolean includeInterim, - String sortField, boolean descending, String partitionFieldValue) - throws ResourceNotFoundException - { + Bucket bucket, int from, int size, boolean includeInterim, + String sortField, boolean descending, String partitionFieldValue) + throws ResourceNotFoundException { // Find the records using the time stamp rather than a parent-child // relationship. The parent-child filter involves two queries behind // the scenes, and Elasticsearch documentation claims it's significantly @@ -607,8 +560,7 @@ public class ElasticsearchJobProvider implements JobProvider .build(); FieldSortBuilder sb = null; - if (sortField != null) - { + if (sortField != null) { sb = new FieldSortBuilder(esSortField(sortField)) .missing("_last") .order(descending ? SortOrder.DESC : SortOrder.ASC); @@ -698,13 +650,11 @@ public class ElasticsearchJobProvider implements JobProvider private QueryPage records(String jobId, - int from, int size, QueryBuilder recordFilter, - String sortField, boolean descending) - throws ResourceNotFoundException - { + int from, int size, QueryBuilder recordFilter, + String sortField, boolean descending) + throws ResourceNotFoundException { FieldSortBuilder sb = null; - if (sortField != null) - { + if (sortField != null) { sb = new FieldSortBuilder(esSortField(sortField)) .missing("_last") .order(descending ? SortOrder.DESC : SortOrder.ASC); @@ -718,8 +668,8 @@ public class ElasticsearchJobProvider implements JobProvider * The returned records have their id set. */ private QueryPage records(String jobId, int from, int size, - QueryBuilder recordFilter, FieldSortBuilder sb, List secondarySort, - boolean descending) throws ResourceNotFoundException { + QueryBuilder recordFilter, FieldSortBuilder sb, List secondarySort, + boolean descending) throws ResourceNotFoundException { String indexName = JobResultsPersister.getJobIndexName(jobId); recordFilter = new BoolQueryBuilder() @@ -733,8 +683,7 @@ public class ElasticsearchJobProvider implements JobProvider .addSort(sb == null ? SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC) : sb) .setFetchSource(true); // the field option turns off source so request it explicitly - for (String sortField : secondarySort) - { + for (String sortField : secondarySort) { searchBuilder.addSort(esSortField(sortField), descending ? SortOrder.DESC : SortOrder.ASC); } @@ -750,8 +699,7 @@ public class ElasticsearchJobProvider implements JobProvider } List results = new ArrayList<>(); - for (SearchHit hit : searchResponse.getHits().getHits()) - { + for (SearchHit hit : searchResponse.getHits().getHits()) { BytesReference source = hit.getSourceRef(); XContentParser parser; try { @@ -770,9 +718,7 @@ public class ElasticsearchJobProvider implements JobProvider } @Override - public QueryPage influencers(String jobId, InfluencersQuery query) throws ResourceNotFoundException - { - + public QueryPage influencers(String jobId, InfluencersQuery query) throws ResourceNotFoundException { QueryBuilder fb = new ResultsFilterBuilder() .timeRange(ElasticsearchMappings.ES_TIMESTAMP, query.getEpochStart(), query.getEpochEnd()) .score(Bucket.ANOMALY_SCORE.getPreferredName(), query.getAnomalyScoreFilter()) @@ -784,7 +730,7 @@ public class ElasticsearchJobProvider implements JobProvider } private QueryPage influencers(String jobId, int from, int size, QueryBuilder filterBuilder, String sortField, - boolean sortDescending) throws ResourceNotFoundException { + boolean sortDescending) throws ResourceNotFoundException { String indexName = JobResultsPersister.getJobIndexName(jobId); LOGGER.trace("ES API CALL: search all of result type {} from index {}{} with filter from {} size {}", () -> Influencer.RESULT_TYPE_VALUE, () -> indexName, @@ -801,19 +747,15 @@ public class ElasticsearchJobProvider implements JobProvider : new FieldSortBuilder(esSortField(sortField)).order(sortDescending ? SortOrder.DESC : SortOrder.ASC); searchRequestBuilder.addSort(sb); - SearchResponse response = null; - try - { + SearchResponse response; + try { response = searchRequestBuilder.get(); - } - catch (IndexNotFoundException e) - { + } catch (IndexNotFoundException e) { throw new ResourceNotFoundException("job " + jobId + " not found"); } List influencers = new ArrayList<>(); - for (SearchHit hit : response.getHits().getHits()) - { + for (SearchHit hit : response.getHits().getHits()) { BytesReference source = hit.getSourceRef(); XContentParser parser; try { @@ -831,101 +773,83 @@ public class ElasticsearchJobProvider implements JobProvider } @Override - public Optional influencer(String jobId, String influencerId) - { + public Optional influencer(String jobId, String influencerId) { throw new IllegalStateException(); } @Override - public BatchedDocumentsIterator newBatchedInfluencersIterator(String jobId) - { + public BatchedDocumentsIterator newBatchedInfluencersIterator(String jobId) { return new ElasticsearchBatchedInfluencersIterator(client, jobId, parseFieldMatcher); } @Override - public BatchedDocumentsIterator newBatchedModelSnapshotIterator(String jobId) - { + public BatchedDocumentsIterator newBatchedModelSnapshotIterator(String jobId) { return new ElasticsearchBatchedModelSnapshotIterator(client, jobId, parseFieldMatcher); } @Override - public BatchedDocumentsIterator newBatchedModelDebugOutputIterator(String jobId) - { + public BatchedDocumentsIterator newBatchedModelDebugOutputIterator(String jobId) { return new ElasticsearchBatchedModelDebugOutputIterator(client, jobId, parseFieldMatcher); } @Override - public BatchedDocumentsIterator newBatchedModelSizeStatsIterator(String jobId) - { + public BatchedDocumentsIterator newBatchedModelSizeStatsIterator(String jobId) { return new ElasticsearchBatchedModelSizeStatsIterator(client, jobId, parseFieldMatcher); } @Override - public Optional getQuantiles(String jobId) - { + public Optional getQuantiles(String jobId) { String indexName = JobResultsPersister.getJobIndexName(jobId); - try - { + try { LOGGER.trace("ES API CALL: get ID " + Quantiles.QUANTILES_ID + " type " + Quantiles.TYPE + " from index " + indexName); GetResponse response = client.prepareGet( indexName, Quantiles.TYPE.getPreferredName(), Quantiles.QUANTILES_ID).get(); - if (!response.isExists()) - { + if (!response.isExists()) { LOGGER.info("There are currently no quantiles for job " + jobId); return Optional.empty(); } return Optional.of(createQuantiles(jobId, response)); - } - catch (IndexNotFoundException e) - { + } catch (IndexNotFoundException e) { LOGGER.error("Missing index when getting quantiles", e); throw e; } } @Override - public QueryPage modelSnapshots(String jobId, int from, int size) - { + public QueryPage modelSnapshots(String jobId, int from, int size) { return modelSnapshots(jobId, from, size, null, null, null, true, null, null); } @Override public QueryPage modelSnapshots(String jobId, int from, int size, String startEpochMs, String endEpochMs, String sortField, boolean sortDescending, - String snapshotId, String description) - { + String snapshotId, String description) { boolean haveId = snapshotId != null && !snapshotId.isEmpty(); boolean haveDescription = description != null && !description.isEmpty(); ResultsFilterBuilder fb; - if (haveId || haveDescription) - { + if (haveId || haveDescription) { BoolQueryBuilder query = QueryBuilders.boolQuery(); - if (haveId) - { + if (haveId) { query.must(QueryBuilders.termQuery(ModelSnapshot.SNAPSHOT_ID.getPreferredName(), snapshotId)); } - if (haveDescription) - { + if (haveDescription) { query.must(QueryBuilders.termQuery(ModelSnapshot.DESCRIPTION.getPreferredName(), description)); } fb = new ResultsFilterBuilder(query); - } - else - { + } else { fb = new ResultsFilterBuilder(); } return modelSnapshots(jobId, from, size, (sortField == null || sortField.isEmpty()) ? ModelSnapshot.RESTORE_PRIORITY.getPreferredName() : sortField, - sortDescending, fb.timeRange( - ElasticsearchMappings.ES_TIMESTAMP, startEpochMs, endEpochMs).build()); + sortDescending, fb.timeRange( + ElasticsearchMappings.ES_TIMESTAMP, startEpochMs, endEpochMs).build()); } private QueryPage modelSnapshots(String jobId, int from, int size, - String sortField, boolean sortDescending, QueryBuilder fb) - { + String sortField, boolean sortDescending, QueryBuilder fb) { FieldSortBuilder sb = new FieldSortBuilder(esSortField(sortField)) .order(sortDescending ? SortOrder.DESC : SortOrder.ASC); @@ -934,8 +858,7 @@ public class ElasticsearchJobProvider implements JobProvider fb = new ConstantScoreQueryBuilder(fb); SearchResponse searchResponse; - try - { + try { String indexName = JobResultsPersister.getJobIndexName(jobId); LOGGER.trace("ES API CALL: search all of type " + ModelSnapshot.TYPE + " from index " + indexName + " sort ascending " + esSortField(sortField) + @@ -946,27 +869,23 @@ public class ElasticsearchJobProvider implements JobProvider .setQuery(fb) .setFrom(from).setSize(size) .get(); - } - catch (IndexNotFoundException e) - { + } catch (IndexNotFoundException e) { LOGGER.error("Failed to read modelSnapshots", e); throw e; } List results = new ArrayList<>(); - for (SearchHit hit : searchResponse.getHits().getHits()) - { + for (SearchHit hit : searchResponse.getHits().getHits()) { // Remove the Kibana/Logstash '@timestamp' entry as stored in Elasticsearch, // and replace using the API 'timestamp' key. Object timestamp = hit.getSource().remove(ElasticsearchMappings.ES_TIMESTAMP); hit.getSource().put(ModelSnapshot.TIMESTAMP.getPreferredName(), timestamp); Object o = hit.getSource().get(ModelSizeStats.TYPE.getPreferredName()); - if (o instanceof Map) - { + if (o instanceof Map) { @SuppressWarnings("unchecked") - Map map = (Map)o; + Map map = (Map) o; Object ts = map.remove(ElasticsearchMappings.ES_TIMESTAMP); map.put(ModelSizeStats.TIMESTAMP_FIELD.getPreferredName(), ts); } @@ -1050,8 +969,7 @@ public class ElasticsearchJobProvider implements JobProvider throw new ElasticsearchParseException("failed to parser quantiles", e); } Quantiles quantiles = Quantiles.PARSER.apply(parser, () -> parseFieldMatcher); - if (quantiles.getQuantileState() == null) - { + if (quantiles.getQuantileState() == null) { LOGGER.error("Inconsistency - no " + Quantiles.QUANTILE_STATE + " field in quantiles for job " + jobId); } @@ -1092,8 +1010,7 @@ public class ElasticsearchJobProvider implements JobProvider @Override public Optional getList(String listId) { GetResponse response = client.prepareGet(PRELERT_INFO_INDEX, ListDocument.TYPE.getPreferredName(), listId).get(); - if (!response.isExists()) - { + if (!response.isExists()) { return Optional.empty(); } BytesReference source = response.getSourceAsBytesRef(); @@ -1108,8 +1025,7 @@ public class ElasticsearchJobProvider implements JobProvider } @Override - public Auditor audit(String jobId) - { + public Auditor audit(String jobId) { // NORELEASE Create proper auditor or remove // return new ElasticsearchAuditor(client, PRELERT_INFO_INDEX, jobId); return new Auditor() { @@ -1136,8 +1052,7 @@ public class ElasticsearchJobProvider implements JobProvider } - private String esSortField(String sortField) - { + private String esSortField(String sortField) { // Beware: There's an assumption here that Bucket.TIMESTAMP, // AnomalyRecord.TIMESTAMP, Influencer.TIMESTAMP and // ModelSnapshot.TIMESTAMP are all the same diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertInitializationServiceTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertInitializationServiceTests.java new file mode 100644 index 00000000000..9eafdb4428d --- /dev/null +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertInitializationServiceTests.java @@ -0,0 +1,120 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.prelert.job.metadata; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.LocalTransportAddress; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchJobProvider; + +import java.util.concurrent.ExecutorService; + +import static org.elasticsearch.mock.orig.Mockito.doAnswer; +import static org.elasticsearch.mock.orig.Mockito.times; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class PrelertInitializationServiceTests extends ESTestCase { + + public void testInitialize() { + ThreadPool threadPool = mock(ThreadPool.class); + ExecutorService executorService = mock(ExecutorService.class); + doAnswer(invocation -> { + ((Runnable) invocation.getArguments()[0]).run(); + return null; + }).when(executorService).execute(any(Runnable.class)); + when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService); + + ClusterService clusterService = mock(ClusterService.class); + ElasticsearchJobProvider jobProvider = mock(ElasticsearchJobProvider.class); + PrelertInitializationService initializationService = + new PrelertInitializationService(Settings.EMPTY, threadPool, clusterService, jobProvider); + + ClusterState cs = ClusterState.builder(new ClusterName("_name")) + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.CURRENT)) + .localNodeId("_node_id") + .masterNodeId("_node_id")) + .metaData(MetaData.builder()) + .build(); + initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); + + verify(clusterService, times(1)).submitStateUpdateTask(eq("install-prelert-metadata"), any()); + verify(jobProvider, times(1)).createUsageMeteringIndex(any()); + } + + public void testInitialize_noMasterNode() { + ThreadPool threadPool = mock(ThreadPool.class); + ExecutorService executorService = mock(ExecutorService.class); + doAnswer(invocation -> { + ((Runnable) invocation.getArguments()[0]).run(); + return null; + }).when(executorService).execute(any(Runnable.class)); + when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService); + + ClusterService clusterService = mock(ClusterService.class); + ElasticsearchJobProvider jobProvider = mock(ElasticsearchJobProvider.class); + PrelertInitializationService initializationService = + new PrelertInitializationService(Settings.EMPTY, threadPool, clusterService, jobProvider); + + ClusterState cs = ClusterState.builder(new ClusterName("_name")) + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.CURRENT))) + .metaData(MetaData.builder()) + .build(); + initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); + + verify(clusterService, times(0)).submitStateUpdateTask(eq("install-prelert-metadata"), any()); + verify(jobProvider, times(0)).createUsageMeteringIndex(any()); + } + + public void testInitialize_alreadyInitialized() { + ThreadPool threadPool = mock(ThreadPool.class); + ExecutorService executorService = mock(ExecutorService.class); + doAnswer(invocation -> { + ((Runnable) invocation.getArguments()[0]).run(); + return null; + }).when(executorService).execute(any(Runnable.class)); + when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService); + + ClusterService clusterService = mock(ClusterService.class); + ElasticsearchJobProvider jobProvider = mock(ElasticsearchJobProvider.class); + PrelertInitializationService initializationService = + new PrelertInitializationService(Settings.EMPTY, threadPool, clusterService, jobProvider); + + ClusterState cs = ClusterState.builder(new ClusterName("_name")) + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.CURRENT)) + .localNodeId("_node_id") + .masterNodeId("_node_id")) + .metaData(MetaData.builder() + .put(IndexMetaData.builder(ElasticsearchJobProvider.PRELERT_USAGE_INDEX).settings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + )) + .putCustom(PrelertMetadata.TYPE, new PrelertMetadata.Builder().build())) + .build(); + initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); + + verify(clusterService, times(0)).submitStateUpdateTask(eq("install-prelert-metadata"), any()); + verify(jobProvider, times(0)).createUsageMeteringIndex(any()); + } + +} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchJobProviderTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchJobProviderTests.java index 1dbf98f93f1..0980c40fd4a 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchJobProviderTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/ElasticsearchJobProviderTests.java @@ -133,7 +133,7 @@ public class ElasticsearchJobProviderTests extends ESTestCase { .addClusterStatusYellowResponse(ElasticsearchJobProvider.PRELERT_USAGE_INDEX); Client client = clientBuilder.build(); ElasticsearchJobProvider provider = createProvider(client); - provider.initialize(); + provider.createUsageMeteringIndex((result, error) -> logger.info("result={}", result)); clientBuilder.verifyIndexCreated(ElasticsearchJobProvider.PRELERT_USAGE_INDEX); }