Create usage index upon startup.
Also moved all creation logic that is required to run at cluster startup into PrelertInitializationService. Original commit: elastic/x-pack-elasticsearch@453ba3efa3
This commit is contained in:
parent
3362d5c965
commit
2e78706a3f
|
@ -56,6 +56,7 @@ import org.elasticsearch.xpack.prelert.job.manager.AutodetectProcessManager;
|
|||
import org.elasticsearch.xpack.prelert.job.manager.JobManager;
|
||||
import org.elasticsearch.xpack.prelert.job.metadata.JobAllocator;
|
||||
import org.elasticsearch.xpack.prelert.job.metadata.JobLifeCycleService;
|
||||
import org.elasticsearch.xpack.prelert.job.metadata.PrelertInitializationService;
|
||||
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
|
||||
import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchBulkDeleterFactory;
|
||||
import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister;
|
||||
|
@ -180,7 +181,8 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
|
|||
new JobAllocator(settings, clusterService, threadPool),
|
||||
new JobLifeCycleService(settings, client, clusterService, scheduledJobService, dataProcessor, threadPool.generic()),
|
||||
new ElasticsearchBulkDeleterFactory(client), //NORELEASE: this should use Delete-by-query
|
||||
dataProcessor
|
||||
dataProcessor,
|
||||
new PrelertInitializationService(settings, threadPool, clusterService, jobProvider)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -67,6 +67,10 @@ public class JobAllocator extends AbstractComponent implements ClusterStateListe
|
|||
|
||||
boolean shouldAllocate(ClusterState current) {
|
||||
PrelertMetadata prelertMetadata = current.getMetaData().custom(PrelertMetadata.TYPE);
|
||||
if (prelertMetadata == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (String jobId : prelertMetadata.getJobs().keySet()) {
|
||||
if (prelertMetadata.getAllocations().containsKey(jobId) == false) {
|
||||
return true;
|
||||
|
@ -75,32 +79,10 @@ public class JobAllocator extends AbstractComponent implements ClusterStateListe
|
|||
return false;
|
||||
}
|
||||
|
||||
boolean prelertMetaDataMissing(ClusterState clusterState) {
|
||||
return clusterState.getMetaData().custom(PrelertMetadata.TYPE) == null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
if (event.localNodeMaster()) {
|
||||
if (prelertMetaDataMissing(event.state())) {
|
||||
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
|
||||
clusterService.submitStateUpdateTask("install-prelert-metadata", new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
ClusterState.Builder builder = new ClusterState.Builder(currentState);
|
||||
MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData());
|
||||
metadataBuilder.putCustom(PrelertMetadata.TYPE, PrelertMetadata.PROTO);
|
||||
builder.metaData(metadataBuilder.build());
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
logger.error("unable to install prelert metadata upon startup", e);
|
||||
}
|
||||
});
|
||||
});
|
||||
} else if (shouldAllocate(event.state())) {
|
||||
if (shouldAllocate(event.state())) {
|
||||
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
|
||||
clusterService.submitStateUpdateTask("allocate_jobs", new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,70 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.prelert.job.metadata;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchJobProvider;
|
||||
|
||||
public class PrelertInitializationService extends AbstractComponent implements ClusterStateListener {
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
private final ClusterService clusterService;
|
||||
private final ElasticsearchJobProvider jobProvider;
|
||||
|
||||
public PrelertInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService,
|
||||
ElasticsearchJobProvider jobProvider) {
|
||||
super(settings);
|
||||
this.threadPool = threadPool;
|
||||
this.clusterService = clusterService;
|
||||
this.jobProvider = jobProvider;
|
||||
clusterService.add(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
if (event.localNodeMaster()) {
|
||||
MetaData metaData = event.state().metaData();
|
||||
if (metaData.custom(PrelertMetadata.TYPE) == null) {
|
||||
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
|
||||
clusterService.submitStateUpdateTask("install-prelert-metadata", new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
ClusterState.Builder builder = new ClusterState.Builder(currentState);
|
||||
MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData());
|
||||
metadataBuilder.putCustom(PrelertMetadata.TYPE, PrelertMetadata.PROTO);
|
||||
builder.metaData(metadataBuilder.build());
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
logger.error("unable to install prelert metadata upon startup", e);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
if (metaData.hasIndex(ElasticsearchJobProvider.PRELERT_USAGE_INDEX) == false) {
|
||||
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
|
||||
jobProvider.createUsageMeteringIndex((result, error) -> {
|
||||
if (result) {
|
||||
logger.info("successfully created prelert-usage index");
|
||||
} else {
|
||||
logger.error("not able to create prelert-usage index", error);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -9,20 +9,16 @@ import org.apache.logging.log4j.Logger;
|
|||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.BytesRefIterator;
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.ResourceAlreadyExistsException;
|
||||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
||||
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
|
||||
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.search.SearchRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.ParseFieldMatcher;
|
||||
import org.elasticsearch.common.Strings;
|
||||
|
@ -80,10 +76,9 @@ import java.util.Locale;
|
|||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
public class ElasticsearchJobProvider implements JobProvider
|
||||
{
|
||||
public class ElasticsearchJobProvider implements JobProvider {
|
||||
private static final Logger LOGGER = Loggers.getLogger(ElasticsearchJobProvider.class);
|
||||
|
||||
/**
|
||||
|
@ -125,63 +120,32 @@ public class ElasticsearchJobProvider implements JobProvider
|
|||
this.numberOfReplicas = numberOfReplicas;
|
||||
}
|
||||
|
||||
public void initialize() {
|
||||
LOGGER.info("Connecting to Elasticsearch cluster '" + client.settings().get("cluster.name")
|
||||
+ "'");
|
||||
|
||||
// This call was added because if we try to connect to Elasticsearch
|
||||
// while it's doing the recovery operations it does at startup then we
|
||||
// can get weird effects like indexes being reported as not existing
|
||||
// when they do. See EL16-182 in Jira.
|
||||
LOGGER.trace("ES API CALL: wait for yellow status on whole cluster");
|
||||
ClusterHealthResponse response = client.admin().cluster()
|
||||
.prepareHealth()
|
||||
.setWaitForYellowStatus()
|
||||
.execute().actionGet();
|
||||
|
||||
// The wait call above can time out.
|
||||
// Throw an error if in cluster health is red
|
||||
if (response.getStatus() == ClusterHealthStatus.RED) {
|
||||
String msg = "Waited for the Elasticsearch status to be YELLOW but is RED after wait timeout";
|
||||
LOGGER.error(msg);
|
||||
throw new IllegalStateException(msg);
|
||||
}
|
||||
|
||||
LOGGER.info("Elasticsearch cluster '" + client.settings().get("cluster.name")
|
||||
+ "' now ready to use");
|
||||
|
||||
|
||||
createUsageMeteringIndex();
|
||||
}
|
||||
|
||||
/**
|
||||
* If the {@value ElasticsearchJobProvider#PRELERT_USAGE_INDEX} index does
|
||||
* not exist then create it here with the usage document mapping.
|
||||
*/
|
||||
private void createUsageMeteringIndex() {
|
||||
public void createUsageMeteringIndex(BiConsumer<Boolean, Exception> listener) {
|
||||
try {
|
||||
LOGGER.trace("ES API CALL: index exists? {}", PRELERT_USAGE_INDEX);
|
||||
boolean indexExists = client.admin().indices()
|
||||
.exists(new IndicesExistsRequest(PRELERT_USAGE_INDEX))
|
||||
.get().isExists();
|
||||
|
||||
if (indexExists == false) {
|
||||
LOGGER.info("Creating the internal '" + PRELERT_USAGE_INDEX + "' index");
|
||||
|
||||
LOGGER.info("Creating the internal '{}' index", PRELERT_USAGE_INDEX);
|
||||
XContentBuilder usageMapping = ElasticsearchMappings.usageMapping();
|
||||
|
||||
LOGGER.trace("ES API CALL: create index {}", PRELERT_USAGE_INDEX);
|
||||
client.admin().indices().prepareCreate(PRELERT_USAGE_INDEX)
|
||||
.setSettings(prelertIndexSettings())
|
||||
.addMapping(Usage.TYPE, usageMapping)
|
||||
.get();
|
||||
LOGGER.trace("ES API CALL: wait for yellow status {}", PRELERT_USAGE_INDEX);
|
||||
client.admin().cluster().prepareHealth(PRELERT_USAGE_INDEX).setWaitForYellowStatus().execute().actionGet();
|
||||
.execute(new ActionListener<CreateIndexResponse>() {
|
||||
@Override
|
||||
public void onResponse(CreateIndexResponse createIndexResponse) {
|
||||
listener.accept(true, null);
|
||||
}
|
||||
} catch (InterruptedException | ExecutionException | IOException e) {
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.accept(false, e);
|
||||
}
|
||||
});
|
||||
|
||||
} catch (IOException e) {
|
||||
LOGGER.warn("Error checking the usage metering index", e);
|
||||
} catch (ResourceAlreadyExistsException e) {
|
||||
LOGGER.debug("Usage metering index already exists", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -191,11 +155,11 @@ public class ElasticsearchJobProvider implements JobProvider
|
|||
* because then the settings can be applied regardless of whether we're
|
||||
* using our own Elasticsearch to store results or a customer's pre-existing
|
||||
* Elasticsearch.
|
||||
*
|
||||
* @return An Elasticsearch builder initialised with the desired settings
|
||||
* for Prelert indexes.
|
||||
*/
|
||||
private Settings.Builder prelertIndexSettings()
|
||||
{
|
||||
private Settings.Builder prelertIndexSettings() {
|
||||
return Settings.builder()
|
||||
// Our indexes are small and one shard puts the
|
||||
// least possible burden on Elasticsearch
|
||||
|
@ -478,13 +442,11 @@ public class ElasticsearchJobProvider implements JobProvider
|
|||
return new QueryPage<>(Collections.singletonList(bucket), 1, Bucket.RESULTS_FIELD);
|
||||
}
|
||||
|
||||
final class ScoreTimestamp
|
||||
{
|
||||
final class ScoreTimestamp {
|
||||
double score;
|
||||
Date timestamp;
|
||||
|
||||
public ScoreTimestamp(Date timestamp, double score)
|
||||
{
|
||||
public ScoreTimestamp(Date timestamp, double score) {
|
||||
this.score = score;
|
||||
this.timestamp = timestamp;
|
||||
}
|
||||
|
@ -492,8 +454,7 @@ public class ElasticsearchJobProvider implements JobProvider
|
|||
|
||||
private List<ScoreTimestamp> partitionScores(String jobId, Object epochStart,
|
||||
Object epochEnd, String partitionFieldValue)
|
||||
throws ResourceNotFoundException
|
||||
{
|
||||
throws ResourceNotFoundException {
|
||||
QueryBuilder qb = new ResultsFilterBuilder()
|
||||
.timeRange(ElasticsearchMappings.ES_TIMESTAMP, epochStart, epochEnd)
|
||||
.build();
|
||||
|
@ -517,18 +478,15 @@ public class ElasticsearchJobProvider implements JobProvider
|
|||
List<ScoreTimestamp> results = new ArrayList<>();
|
||||
|
||||
// expect 1 document per bucket
|
||||
if (searchResponse.getHits().totalHits() > 0)
|
||||
{
|
||||
if (searchResponse.getHits().totalHits() > 0) {
|
||||
|
||||
Map<String, Object> m = searchResponse.getHits().getAt(0).getSource();
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
List<Map<String, Object>> probs = (List<Map<String, Object>>)
|
||||
m.get(ReservedFieldNames.PARTITION_NORMALIZED_PROBS);
|
||||
for (Map<String, Object> prob : probs)
|
||||
{
|
||||
if (partitionFieldValue.equals(prob.get(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName())))
|
||||
{
|
||||
for (Map<String, Object> prob : probs) {
|
||||
if (partitionFieldValue.equals(prob.get(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName()))) {
|
||||
Date ts = new Date(TimeUtils.dateStringToEpoch((String) m.get(ElasticsearchMappings.ES_TIMESTAMP)));
|
||||
results.add(new ScoreTimestamp(ts,
|
||||
(Double) prob.get(Bucket.MAX_NORMALIZED_PROBABILITY.getPreferredName())));
|
||||
|
@ -540,8 +498,7 @@ public class ElasticsearchJobProvider implements JobProvider
|
|||
}
|
||||
|
||||
public int expandBucketForPartitionValue(String jobId, boolean includeInterim, Bucket bucket,
|
||||
String partitionFieldValue) throws ResourceNotFoundException
|
||||
{
|
||||
String partitionFieldValue) throws ResourceNotFoundException {
|
||||
int from = 0;
|
||||
|
||||
QueryPage<AnomalyRecord> page = bucketRecords(
|
||||
|
@ -549,8 +506,7 @@ public class ElasticsearchJobProvider implements JobProvider
|
|||
AnomalyRecord.PROBABILITY.getPreferredName(), false, partitionFieldValue);
|
||||
bucket.setRecords(page.results());
|
||||
|
||||
while (page.count() > from + RECORDS_SIZE_PARAM)
|
||||
{
|
||||
while (page.count() > from + RECORDS_SIZE_PARAM) {
|
||||
from += RECORDS_SIZE_PARAM;
|
||||
page = bucketRecords(
|
||||
jobId, bucket, from, RECORDS_SIZE_PARAM, includeInterim,
|
||||
|
@ -563,8 +519,7 @@ public class ElasticsearchJobProvider implements JobProvider
|
|||
|
||||
|
||||
@Override
|
||||
public BatchedDocumentsIterator<Bucket> newBatchedBucketsIterator(String jobId)
|
||||
{
|
||||
public BatchedDocumentsIterator<Bucket> newBatchedBucketsIterator(String jobId) {
|
||||
return new ElasticsearchBatchedBucketsIterator(client, jobId, parseFieldMatcher);
|
||||
}
|
||||
|
||||
|
@ -577,8 +532,7 @@ public class ElasticsearchJobProvider implements JobProvider
|
|||
AnomalyRecord.PROBABILITY.getPreferredName(), false, null);
|
||||
bucket.setRecords(page.results());
|
||||
|
||||
while (page.count() > from + RECORDS_SIZE_PARAM)
|
||||
{
|
||||
while (page.count() > from + RECORDS_SIZE_PARAM) {
|
||||
from += RECORDS_SIZE_PARAM;
|
||||
page = bucketRecords(
|
||||
jobId, bucket, from, RECORDS_SIZE_PARAM, includeInterim,
|
||||
|
@ -592,8 +546,7 @@ public class ElasticsearchJobProvider implements JobProvider
|
|||
QueryPage<AnomalyRecord> bucketRecords(String jobId,
|
||||
Bucket bucket, int from, int size, boolean includeInterim,
|
||||
String sortField, boolean descending, String partitionFieldValue)
|
||||
throws ResourceNotFoundException
|
||||
{
|
||||
throws ResourceNotFoundException {
|
||||
// Find the records using the time stamp rather than a parent-child
|
||||
// relationship. The parent-child filter involves two queries behind
|
||||
// the scenes, and Elasticsearch documentation claims it's significantly
|
||||
|
@ -607,8 +560,7 @@ public class ElasticsearchJobProvider implements JobProvider
|
|||
.build();
|
||||
|
||||
FieldSortBuilder sb = null;
|
||||
if (sortField != null)
|
||||
{
|
||||
if (sortField != null) {
|
||||
sb = new FieldSortBuilder(esSortField(sortField))
|
||||
.missing("_last")
|
||||
.order(descending ? SortOrder.DESC : SortOrder.ASC);
|
||||
|
@ -700,11 +652,9 @@ public class ElasticsearchJobProvider implements JobProvider
|
|||
private QueryPage<AnomalyRecord> records(String jobId,
|
||||
int from, int size, QueryBuilder recordFilter,
|
||||
String sortField, boolean descending)
|
||||
throws ResourceNotFoundException
|
||||
{
|
||||
throws ResourceNotFoundException {
|
||||
FieldSortBuilder sb = null;
|
||||
if (sortField != null)
|
||||
{
|
||||
if (sortField != null) {
|
||||
sb = new FieldSortBuilder(esSortField(sortField))
|
||||
.missing("_last")
|
||||
.order(descending ? SortOrder.DESC : SortOrder.ASC);
|
||||
|
@ -733,8 +683,7 @@ public class ElasticsearchJobProvider implements JobProvider
|
|||
.addSort(sb == null ? SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC) : sb)
|
||||
.setFetchSource(true); // the field option turns off source so request it explicitly
|
||||
|
||||
for (String sortField : secondarySort)
|
||||
{
|
||||
for (String sortField : secondarySort) {
|
||||
searchBuilder.addSort(esSortField(sortField), descending ? SortOrder.DESC : SortOrder.ASC);
|
||||
}
|
||||
|
||||
|
@ -750,8 +699,7 @@ public class ElasticsearchJobProvider implements JobProvider
|
|||
}
|
||||
|
||||
List<AnomalyRecord> results = new ArrayList<>();
|
||||
for (SearchHit hit : searchResponse.getHits().getHits())
|
||||
{
|
||||
for (SearchHit hit : searchResponse.getHits().getHits()) {
|
||||
BytesReference source = hit.getSourceRef();
|
||||
XContentParser parser;
|
||||
try {
|
||||
|
@ -770,9 +718,7 @@ public class ElasticsearchJobProvider implements JobProvider
|
|||
}
|
||||
|
||||
@Override
|
||||
public QueryPage<Influencer> influencers(String jobId, InfluencersQuery query) throws ResourceNotFoundException
|
||||
{
|
||||
|
||||
public QueryPage<Influencer> influencers(String jobId, InfluencersQuery query) throws ResourceNotFoundException {
|
||||
QueryBuilder fb = new ResultsFilterBuilder()
|
||||
.timeRange(ElasticsearchMappings.ES_TIMESTAMP, query.getEpochStart(), query.getEpochEnd())
|
||||
.score(Bucket.ANOMALY_SCORE.getPreferredName(), query.getAnomalyScoreFilter())
|
||||
|
@ -801,19 +747,15 @@ public class ElasticsearchJobProvider implements JobProvider
|
|||
: new FieldSortBuilder(esSortField(sortField)).order(sortDescending ? SortOrder.DESC : SortOrder.ASC);
|
||||
searchRequestBuilder.addSort(sb);
|
||||
|
||||
SearchResponse response = null;
|
||||
try
|
||||
{
|
||||
SearchResponse response;
|
||||
try {
|
||||
response = searchRequestBuilder.get();
|
||||
}
|
||||
catch (IndexNotFoundException e)
|
||||
{
|
||||
} catch (IndexNotFoundException e) {
|
||||
throw new ResourceNotFoundException("job " + jobId + " not found");
|
||||
}
|
||||
|
||||
List<Influencer> influencers = new ArrayList<>();
|
||||
for (SearchHit hit : response.getHits().getHits())
|
||||
{
|
||||
for (SearchHit hit : response.getHits().getHits()) {
|
||||
BytesReference source = hit.getSourceRef();
|
||||
XContentParser parser;
|
||||
try {
|
||||
|
@ -831,89 +773,72 @@ public class ElasticsearchJobProvider implements JobProvider
|
|||
}
|
||||
|
||||
@Override
|
||||
public Optional<Influencer> influencer(String jobId, String influencerId)
|
||||
{
|
||||
public Optional<Influencer> influencer(String jobId, String influencerId) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public BatchedDocumentsIterator<Influencer> newBatchedInfluencersIterator(String jobId)
|
||||
{
|
||||
public BatchedDocumentsIterator<Influencer> newBatchedInfluencersIterator(String jobId) {
|
||||
return new ElasticsearchBatchedInfluencersIterator(client, jobId, parseFieldMatcher);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BatchedDocumentsIterator<ModelSnapshot> newBatchedModelSnapshotIterator(String jobId)
|
||||
{
|
||||
public BatchedDocumentsIterator<ModelSnapshot> newBatchedModelSnapshotIterator(String jobId) {
|
||||
return new ElasticsearchBatchedModelSnapshotIterator(client, jobId, parseFieldMatcher);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BatchedDocumentsIterator<ModelDebugOutput> newBatchedModelDebugOutputIterator(String jobId)
|
||||
{
|
||||
public BatchedDocumentsIterator<ModelDebugOutput> newBatchedModelDebugOutputIterator(String jobId) {
|
||||
return new ElasticsearchBatchedModelDebugOutputIterator(client, jobId, parseFieldMatcher);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BatchedDocumentsIterator<ModelSizeStats> newBatchedModelSizeStatsIterator(String jobId)
|
||||
{
|
||||
public BatchedDocumentsIterator<ModelSizeStats> newBatchedModelSizeStatsIterator(String jobId) {
|
||||
return new ElasticsearchBatchedModelSizeStatsIterator(client, jobId, parseFieldMatcher);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<Quantiles> getQuantiles(String jobId)
|
||||
{
|
||||
public Optional<Quantiles> getQuantiles(String jobId) {
|
||||
String indexName = JobResultsPersister.getJobIndexName(jobId);
|
||||
try
|
||||
{
|
||||
try {
|
||||
LOGGER.trace("ES API CALL: get ID " + Quantiles.QUANTILES_ID +
|
||||
" type " + Quantiles.TYPE + " from index " + indexName);
|
||||
GetResponse response = client.prepareGet(
|
||||
indexName, Quantiles.TYPE.getPreferredName(), Quantiles.QUANTILES_ID).get();
|
||||
if (!response.isExists())
|
||||
{
|
||||
if (!response.isExists()) {
|
||||
LOGGER.info("There are currently no quantiles for job " + jobId);
|
||||
return Optional.empty();
|
||||
}
|
||||
return Optional.of(createQuantiles(jobId, response));
|
||||
}
|
||||
catch (IndexNotFoundException e)
|
||||
{
|
||||
} catch (IndexNotFoundException e) {
|
||||
LOGGER.error("Missing index when getting quantiles", e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public QueryPage<ModelSnapshot> modelSnapshots(String jobId, int from, int size)
|
||||
{
|
||||
public QueryPage<ModelSnapshot> modelSnapshots(String jobId, int from, int size) {
|
||||
return modelSnapshots(jobId, from, size, null, null, null, true, null, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public QueryPage<ModelSnapshot> modelSnapshots(String jobId, int from, int size,
|
||||
String startEpochMs, String endEpochMs, String sortField, boolean sortDescending,
|
||||
String snapshotId, String description)
|
||||
{
|
||||
String snapshotId, String description) {
|
||||
boolean haveId = snapshotId != null && !snapshotId.isEmpty();
|
||||
boolean haveDescription = description != null && !description.isEmpty();
|
||||
ResultsFilterBuilder fb;
|
||||
if (haveId || haveDescription)
|
||||
{
|
||||
if (haveId || haveDescription) {
|
||||
BoolQueryBuilder query = QueryBuilders.boolQuery();
|
||||
if (haveId)
|
||||
{
|
||||
if (haveId) {
|
||||
query.must(QueryBuilders.termQuery(ModelSnapshot.SNAPSHOT_ID.getPreferredName(), snapshotId));
|
||||
}
|
||||
if (haveDescription)
|
||||
{
|
||||
if (haveDescription) {
|
||||
query.must(QueryBuilders.termQuery(ModelSnapshot.DESCRIPTION.getPreferredName(), description));
|
||||
}
|
||||
|
||||
fb = new ResultsFilterBuilder(query);
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
fb = new ResultsFilterBuilder();
|
||||
}
|
||||
|
||||
|
@ -924,8 +849,7 @@ public class ElasticsearchJobProvider implements JobProvider
|
|||
}
|
||||
|
||||
private QueryPage<ModelSnapshot> modelSnapshots(String jobId, int from, int size,
|
||||
String sortField, boolean sortDescending, QueryBuilder fb)
|
||||
{
|
||||
String sortField, boolean sortDescending, QueryBuilder fb) {
|
||||
FieldSortBuilder sb = new FieldSortBuilder(esSortField(sortField))
|
||||
.order(sortDescending ? SortOrder.DESC : SortOrder.ASC);
|
||||
|
||||
|
@ -934,8 +858,7 @@ public class ElasticsearchJobProvider implements JobProvider
|
|||
fb = new ConstantScoreQueryBuilder(fb);
|
||||
|
||||
SearchResponse searchResponse;
|
||||
try
|
||||
{
|
||||
try {
|
||||
String indexName = JobResultsPersister.getJobIndexName(jobId);
|
||||
LOGGER.trace("ES API CALL: search all of type " + ModelSnapshot.TYPE +
|
||||
" from index " + indexName + " sort ascending " + esSortField(sortField) +
|
||||
|
@ -946,27 +869,23 @@ public class ElasticsearchJobProvider implements JobProvider
|
|||
.setQuery(fb)
|
||||
.setFrom(from).setSize(size)
|
||||
.get();
|
||||
}
|
||||
catch (IndexNotFoundException e)
|
||||
{
|
||||
} catch (IndexNotFoundException e) {
|
||||
LOGGER.error("Failed to read modelSnapshots", e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
List<ModelSnapshot> results = new ArrayList<>();
|
||||
|
||||
for (SearchHit hit : searchResponse.getHits().getHits())
|
||||
{
|
||||
for (SearchHit hit : searchResponse.getHits().getHits()) {
|
||||
// Remove the Kibana/Logstash '@timestamp' entry as stored in Elasticsearch,
|
||||
// and replace using the API 'timestamp' key.
|
||||
Object timestamp = hit.getSource().remove(ElasticsearchMappings.ES_TIMESTAMP);
|
||||
hit.getSource().put(ModelSnapshot.TIMESTAMP.getPreferredName(), timestamp);
|
||||
|
||||
Object o = hit.getSource().get(ModelSizeStats.TYPE.getPreferredName());
|
||||
if (o instanceof Map)
|
||||
{
|
||||
if (o instanceof Map) {
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> map = (Map<String, Object>)o;
|
||||
Map<String, Object> map = (Map<String, Object>) o;
|
||||
Object ts = map.remove(ElasticsearchMappings.ES_TIMESTAMP);
|
||||
map.put(ModelSizeStats.TIMESTAMP_FIELD.getPreferredName(), ts);
|
||||
}
|
||||
|
@ -1050,8 +969,7 @@ public class ElasticsearchJobProvider implements JobProvider
|
|||
throw new ElasticsearchParseException("failed to parser quantiles", e);
|
||||
}
|
||||
Quantiles quantiles = Quantiles.PARSER.apply(parser, () -> parseFieldMatcher);
|
||||
if (quantiles.getQuantileState() == null)
|
||||
{
|
||||
if (quantiles.getQuantileState() == null) {
|
||||
LOGGER.error("Inconsistency - no " + Quantiles.QUANTILE_STATE
|
||||
+ " field in quantiles for job " + jobId);
|
||||
}
|
||||
|
@ -1092,8 +1010,7 @@ public class ElasticsearchJobProvider implements JobProvider
|
|||
@Override
|
||||
public Optional<ListDocument> getList(String listId) {
|
||||
GetResponse response = client.prepareGet(PRELERT_INFO_INDEX, ListDocument.TYPE.getPreferredName(), listId).get();
|
||||
if (!response.isExists())
|
||||
{
|
||||
if (!response.isExists()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
BytesReference source = response.getSourceAsBytesRef();
|
||||
|
@ -1108,8 +1025,7 @@ public class ElasticsearchJobProvider implements JobProvider
|
|||
}
|
||||
|
||||
@Override
|
||||
public Auditor audit(String jobId)
|
||||
{
|
||||
public Auditor audit(String jobId) {
|
||||
// NORELEASE Create proper auditor or remove
|
||||
// return new ElasticsearchAuditor(client, PRELERT_INFO_INDEX, jobId);
|
||||
return new Auditor() {
|
||||
|
@ -1136,8 +1052,7 @@ public class ElasticsearchJobProvider implements JobProvider
|
|||
|
||||
}
|
||||
|
||||
private String esSortField(String sortField)
|
||||
{
|
||||
private String esSortField(String sortField) {
|
||||
// Beware: There's an assumption here that Bucket.TIMESTAMP,
|
||||
// AnomalyRecord.TIMESTAMP, Influencer.TIMESTAMP and
|
||||
// ModelSnapshot.TIMESTAMP are all the same
|
||||
|
|
|
@ -0,0 +1,120 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.prelert.job.metadata;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.LocalTransportAddress;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchJobProvider;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import static org.elasticsearch.mock.orig.Mockito.doAnswer;
|
||||
import static org.elasticsearch.mock.orig.Mockito.times;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class PrelertInitializationServiceTests extends ESTestCase {
|
||||
|
||||
public void testInitialize() {
|
||||
ThreadPool threadPool = mock(ThreadPool.class);
|
||||
ExecutorService executorService = mock(ExecutorService.class);
|
||||
doAnswer(invocation -> {
|
||||
((Runnable) invocation.getArguments()[0]).run();
|
||||
return null;
|
||||
}).when(executorService).execute(any(Runnable.class));
|
||||
when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService);
|
||||
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
ElasticsearchJobProvider jobProvider = mock(ElasticsearchJobProvider.class);
|
||||
PrelertInitializationService initializationService =
|
||||
new PrelertInitializationService(Settings.EMPTY, threadPool, clusterService, jobProvider);
|
||||
|
||||
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
|
||||
.nodes(DiscoveryNodes.builder()
|
||||
.add(new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.CURRENT))
|
||||
.localNodeId("_node_id")
|
||||
.masterNodeId("_node_id"))
|
||||
.metaData(MetaData.builder())
|
||||
.build();
|
||||
initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
|
||||
|
||||
verify(clusterService, times(1)).submitStateUpdateTask(eq("install-prelert-metadata"), any());
|
||||
verify(jobProvider, times(1)).createUsageMeteringIndex(any());
|
||||
}
|
||||
|
||||
public void testInitialize_noMasterNode() {
|
||||
ThreadPool threadPool = mock(ThreadPool.class);
|
||||
ExecutorService executorService = mock(ExecutorService.class);
|
||||
doAnswer(invocation -> {
|
||||
((Runnable) invocation.getArguments()[0]).run();
|
||||
return null;
|
||||
}).when(executorService).execute(any(Runnable.class));
|
||||
when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService);
|
||||
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
ElasticsearchJobProvider jobProvider = mock(ElasticsearchJobProvider.class);
|
||||
PrelertInitializationService initializationService =
|
||||
new PrelertInitializationService(Settings.EMPTY, threadPool, clusterService, jobProvider);
|
||||
|
||||
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
|
||||
.nodes(DiscoveryNodes.builder()
|
||||
.add(new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.CURRENT)))
|
||||
.metaData(MetaData.builder())
|
||||
.build();
|
||||
initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
|
||||
|
||||
verify(clusterService, times(0)).submitStateUpdateTask(eq("install-prelert-metadata"), any());
|
||||
verify(jobProvider, times(0)).createUsageMeteringIndex(any());
|
||||
}
|
||||
|
||||
public void testInitialize_alreadyInitialized() {
|
||||
ThreadPool threadPool = mock(ThreadPool.class);
|
||||
ExecutorService executorService = mock(ExecutorService.class);
|
||||
doAnswer(invocation -> {
|
||||
((Runnable) invocation.getArguments()[0]).run();
|
||||
return null;
|
||||
}).when(executorService).execute(any(Runnable.class));
|
||||
when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService);
|
||||
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
ElasticsearchJobProvider jobProvider = mock(ElasticsearchJobProvider.class);
|
||||
PrelertInitializationService initializationService =
|
||||
new PrelertInitializationService(Settings.EMPTY, threadPool, clusterService, jobProvider);
|
||||
|
||||
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
|
||||
.nodes(DiscoveryNodes.builder()
|
||||
.add(new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.CURRENT))
|
||||
.localNodeId("_node_id")
|
||||
.masterNodeId("_node_id"))
|
||||
.metaData(MetaData.builder()
|
||||
.put(IndexMetaData.builder(ElasticsearchJobProvider.PRELERT_USAGE_INDEX).settings(Settings.builder()
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
|
||||
))
|
||||
.putCustom(PrelertMetadata.TYPE, new PrelertMetadata.Builder().build()))
|
||||
.build();
|
||||
initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
|
||||
|
||||
verify(clusterService, times(0)).submitStateUpdateTask(eq("install-prelert-metadata"), any());
|
||||
verify(jobProvider, times(0)).createUsageMeteringIndex(any());
|
||||
}
|
||||
|
||||
}
|
|
@ -133,7 +133,7 @@ public class ElasticsearchJobProviderTests extends ESTestCase {
|
|||
.addClusterStatusYellowResponse(ElasticsearchJobProvider.PRELERT_USAGE_INDEX);
|
||||
Client client = clientBuilder.build();
|
||||
ElasticsearchJobProvider provider = createProvider(client);
|
||||
provider.initialize();
|
||||
provider.createUsageMeteringIndex((result, error) -> logger.info("result={}", result));
|
||||
clientBuilder.verifyIndexCreated(ElasticsearchJobProvider.PRELERT_USAGE_INDEX);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue