Put model state in the .mlstate-anomalydetectors index (elastic/elasticsearch#589)

* Put model state in the .mlstate index

* Revert results index rename

* Put ModelSnapshots in the results index

* Change state index in C++

* Fix logging

* Rename state index to '.ml-state'

Original commit: elastic/x-pack-elasticsearch@dbe5f6b525
This commit is contained in:
David Kyle 2016-12-21 12:03:17 +00:00 committed by GitHub
parent 3e494070bf
commit 6de846d4c6
25 changed files with 319 additions and 181 deletions

View File

@ -278,8 +278,7 @@ PutModelSnapshotDescriptionAction.RequestBuilder> {
modelSnapshot.setDescription(request.getDescriptionString());
// The quantiles can be large, and totally dominate the output -
// it's
// clearer to remove them
// it's clearer to remove them
modelSnapshot.setQuantiles(null);
listener.onResponse(new Response(modelSnapshot));

View File

@ -166,7 +166,7 @@ public class JobManager extends AbstractComponent {
Job job = request.getJob();
ActionListener<Boolean> delegateListener = ActionListener.wrap(jobSaved ->
jobProvider.createJobRelatedIndices(job, new ActionListener<Boolean>() {
jobProvider.createJobResultIndex(job, new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean indicesCreated) {
audit(job.getId()).info(Messages.getMessage(Messages.JOB_AUDIT_CREATED));
@ -195,12 +195,11 @@ public class JobManager extends AbstractComponent {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
ClusterState cs = updateClusterState(job, request.isOverwrite(), currentState);
if (currentState.metaData().index(AnomalyDetectorsIndex.getJobIndexName(job.getIndexName())) != null) {
if (currentState.metaData().index(AnomalyDetectorsIndex.jobResultsIndexName(job.getIndexName())) != null) {
throw new ResourceAlreadyExistsException(Messages.getMessage(Messages.JOB_INDEX_ALREADY_EXISTS, job.getIndexName()));
}
return cs;
}
});
}
@ -390,7 +389,7 @@ public class JobManager extends AbstractComponent {
}
// Commit so that when the REST API call that triggered the update
// returns the updated document is searchable
jobResultsPersister.commitWrites(jobId);
jobResultsPersister.commitStateWrites(jobId);
}
private static PrelertMetadata.Builder createPrelertMetadataBuilder(ClusterState currentState) {

View File

@ -15,6 +15,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.prelert.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
public class PrelertInitializationService extends AbstractComponent implements ClusterStateListener {
@ -62,7 +63,7 @@ public class PrelertInitializationService extends AbstractComponent implements C
logger.info("successfully created prelert-usage index");
} else {
if (error instanceof ResourceAlreadyExistsException) {
logger.debug("not able to create prelert-usage index", error);
logger.debug("not able to create prelert-usage index as it already exists");
} else {
logger.error("not able to create prelert-usage index", error);
}
@ -70,6 +71,22 @@ public class PrelertInitializationService extends AbstractComponent implements C
});
});
}
String stateIndexName = AnomalyDetectorsIndex.jobStateIndexName();
if (metaData.hasIndex(stateIndexName) == false) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
jobProvider.createJobStateIndex((result, error) -> {
if (result) {
logger.info("successfully created {} index", stateIndexName);
} else {
if (error instanceof ResourceAlreadyExistsException) {
logger.debug("not able to create {} index as it already exists", stateIndexName);
} else {
logger.error("not able to create " + stateIndexName + " index", error);
}
}
});
});
}
}
}
}

View File

@ -9,13 +9,26 @@ package org.elasticsearch.xpack.prelert.job.persistence;
* Methods for handling index naming related functions
*/
public final class AnomalyDetectorsIndex {
private static final String INDEX_PREFIX = "prelertresults-";
private static final String RESULTS_INDEX_PREFIX = "prelertresults-";
private static final String STATE_INDEX_NAME = ".ml-state";
private AnomalyDetectorsIndex() {
}
public static String getJobIndexName(String jobId) {
return INDEX_PREFIX + jobId;
/**
* The name of the default index where the job's results are stored
* @param jobId Job Id
* @return The index name
*/
public static String jobResultsIndexName(String jobId) {
return RESULTS_INDEX_PREFIX + jobId;
}
/**
* The name of the default index where a job's state is stored
* @return The index name
*/
public static String jobStateIndexName() {
return STATE_INDEX_NAME;
}
}

View File

@ -19,7 +19,7 @@ import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
class ElasticsearchBatchedModelSnapshotIterator extends ElasticsearchBatchedDocumentsIterator<ModelSnapshot> {
public ElasticsearchBatchedModelSnapshotIterator(Client client, String jobId, ParseFieldMatcher parserFieldMatcher) {
super(client, AnomalyDetectorsIndex.getJobIndexName(jobId), parserFieldMatcher);
super(client, AnomalyDetectorsIndex.jobStateIndexName(), parserFieldMatcher);
}
@Override

View File

@ -13,7 +13,7 @@ import org.elasticsearch.xpack.prelert.job.results.Result;
abstract class ElasticsearchBatchedResultsIterator<T> extends ElasticsearchBatchedDocumentsIterator<T> {
public ElasticsearchBatchedResultsIterator(Client client, String jobId, String resultType, ParseFieldMatcher parseFieldMatcher) {
super(client, AnomalyDetectorsIndex.getJobIndexName(jobId), parseFieldMatcher,
super(client, AnomalyDetectorsIndex.jobResultsIndexName(jobId), parseFieldMatcher,
new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), resultType));
}

View File

@ -48,7 +48,7 @@ public class JobDataCountsPersister extends AbstractComponent {
public void persistDataCounts(String jobId, DataCounts counts, ActionListener<Boolean> listener) {
try {
XContentBuilder content = serialiseCounts(counts);
client.prepareIndex(AnomalyDetectorsIndex.getJobIndexName(jobId), DataCounts.TYPE.getPreferredName(),
client.prepareIndex(AnomalyDetectorsIndex.jobResultsIndexName(jobId), DataCounts.TYPE.getPreferredName(),
jobId + DataCounts.DOCUMENT_SUFFIX)
.setSource(content).execute(new ActionListener<IndexResponse>() {
@Override

View File

@ -67,7 +67,7 @@ public class JobDataDeleter {
* @param listener Response listener
*/
public void deleteResultsFromTime(long cutoffEpochMs, ActionListener<Boolean> listener) {
String index = AnomalyDetectorsIndex.getJobIndexName(jobId);
String index = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
RangeQueryBuilder timeRange = QueryBuilders.rangeQuery(Bucket.TIMESTAMP.getPreferredName());
timeRange.gte(cutoffEpochMs);
@ -108,17 +108,18 @@ public class JobDataDeleter {
public void deleteModelSnapshot(ModelSnapshot modelSnapshot) {
String snapshotId = modelSnapshot.getSnapshotId();
int docCount = modelSnapshot.getSnapshotDocCount();
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
String stateIndexName = AnomalyDetectorsIndex.jobStateIndexName();
// Deduce the document IDs of the state documents from the information
// in the snapshot document - we cannot query the state itself as it's
// too big and has no mappings
for (int i = 0; i < docCount; ++i) {
String stateId = snapshotId + '_' + i;
bulkRequestBuilder.add(client.prepareDelete(indexName, ModelState.TYPE.getPreferredName(), stateId));
bulkRequestBuilder.add(client.prepareDelete(stateIndexName, ModelState.TYPE.getPreferredName(), stateId));
++deletedModelStateCount;
}
bulkRequestBuilder.add(client.prepareDelete(indexName, ModelSnapshot.TYPE.getPreferredName(), snapshotId));
bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobResultsIndexName(modelSnapshot.getJobId()),
ModelSnapshot.TYPE.getPreferredName(), snapshotId));
++deletedModelSnapshotCount;
}
@ -126,7 +127,7 @@ public class JobDataDeleter {
* Delete all results marked as interim
*/
public void deleteInterimResults() {
String index = AnomalyDetectorsIndex.getJobIndexName(jobId);
String index = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
QueryBuilder qb = QueryBuilders.termQuery(Bucket.IS_INTERIM.getPreferredName(), true);
@ -192,7 +193,7 @@ public class JobDataDeleter {
}
/**
* Repeats a scroll search adding the hits a bulk delete request
* Repeats a scroll search adding the hits to the bulk delete request
*/
private class RepeatingSearchScrollListener implements ActionListener<SearchResponse> {

View File

@ -130,7 +130,7 @@ public class JobProvider {
XContentBuilder usageMapping = ElasticsearchMappings.usageMapping();
LOGGER.trace("ES API CALL: create index {}", PRELERT_USAGE_INDEX);
client.admin().indices().prepareCreate(PRELERT_USAGE_INDEX)
.setSettings(prelertIndexSettings())
.setSettings(mlResultsIndexSettings())
.addMapping(Usage.TYPE, usageMapping)
.execute(new ActionListener<CreateIndexResponse>() {
@Override
@ -145,12 +145,12 @@ public class JobProvider {
});
} catch (IOException e) {
LOGGER.warn("Error checking the usage metering index", e);
LOGGER.warn("Error creating the usage metering index", e);
}
}
/**
* Build the Elasticsearch index settings that we want to apply to Prelert
* Build the Elasticsearch index settings that we want to apply to results
* indexes. It's better to do this in code rather than in elasticsearch.yml
* because then the settings can be applied regardless of whether we're
* using our own Elasticsearch to store results or a customer's pre-existing
@ -159,7 +159,7 @@ public class JobProvider {
* @return An Elasticsearch builder initialised with the desired settings
* for Prelert indexes.
*/
Settings.Builder prelertIndexSettings() {
Settings.Builder mlResultsIndexSettings() {
return Settings.builder()
// Our indexes are small and one shard puts the
// least possible burden on Elasticsearch
@ -176,39 +176,54 @@ public class JobProvider {
.put(IndexSettings.DEFAULT_FIELD_SETTING.getKey(), ElasticsearchMappings.ALL_FIELD_VALUES);
}
/**
 * Build the Elasticsearch index settings that we want to apply to the state
 * index. It's better to do this in code rather than in elasticsearch.yml
 * because then the settings can be applied regardless of whether we're
 * using our own Elasticsearch to store state or a customer's pre-existing
 * Elasticsearch.
 *
 * @return An Elasticsearch builder initialised with the desired settings
 * for the Prelert state index.
 */
Settings.Builder mlStateIndexSettings() {
// TODO review these settings — currently copied from the results index;
// the state index holds few, very large documents, so they may not be optimal
return Settings.builder()
// State documents are few in number, so one shard puts the
// least possible burden on Elasticsearch
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
// Sacrifice durability for performance: in the event of power
// failure we can lose the last 5 seconds of changes, but it's
// much faster
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), ASYNC);
}
/**
* Create the Elasticsearch index and the mappings
*/
// TODO: rename and move?
public void createJobRelatedIndices(Job job, ActionListener<Boolean> listener) {
public void createJobResultIndex(Job job, ActionListener<Boolean> listener) {
Collection<String> termFields = (job.getAnalysisConfig() != null) ? job.getAnalysisConfig().termFields() : Collections.emptyList();
try {
XContentBuilder resultsMapping = ElasticsearchMappings.resultsMapping(termFields);
XContentBuilder categorizerStateMapping = ElasticsearchMappings.categorizerStateMapping();
XContentBuilder categoryDefinitionMapping = ElasticsearchMappings.categoryDefinitionMapping();
XContentBuilder quantilesMapping = ElasticsearchMappings.quantilesMapping();
XContentBuilder modelStateMapping = ElasticsearchMappings.modelStateMapping();
XContentBuilder modelSnapshotMapping = ElasticsearchMappings.modelSnapshotMapping();
XContentBuilder dataCountsMapping = ElasticsearchMappings.dataCountsMapping();
XContentBuilder usageMapping = ElasticsearchMappings.usageMapping();
XContentBuilder auditMessageMapping = ElasticsearchMappings.auditMessageMapping();
XContentBuilder auditActivityMapping = ElasticsearchMappings.auditActivityMapping();
XContentBuilder modelSnapshotMapping = ElasticsearchMappings.modelSnapshotMapping();
String jobId = job.getId();
boolean createIndexAlias = !job.getIndexName().equals(job.getId());
String indexName = AnomalyDetectorsIndex.getJobIndexName(job.getIndexName());
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(job.getIndexName());
LOGGER.trace("ES API CALL: create index {}", indexName);
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
Settings.Builder settingsBuilder = prelertIndexSettings();
createIndexRequest.settings(settingsBuilder);
createIndexRequest.settings(mlResultsIndexSettings());
createIndexRequest.mapping(Result.TYPE.getPreferredName(), resultsMapping);
createIndexRequest.mapping(CategorizerState.TYPE, categorizerStateMapping);
createIndexRequest.mapping(CategoryDefinition.TYPE.getPreferredName(), categoryDefinitionMapping);
createIndexRequest.mapping(Quantiles.TYPE.getPreferredName(), quantilesMapping);
createIndexRequest.mapping(ModelState.TYPE.getPreferredName(), modelStateMapping);
createIndexRequest.mapping(ModelSnapshot.TYPE.getPreferredName(), modelSnapshotMapping);
createIndexRequest.mapping(DataCounts.TYPE.getPreferredName(), dataCountsMapping);
createIndexRequest.mapping(ModelSnapshot.TYPE.getPreferredName(), modelSnapshotMapping);
// NORELASE These mappings shouldn't go in the results index once the index
// strategy has been reworked
createIndexRequest.mapping(Usage.TYPE, usageMapping);
@ -220,7 +235,7 @@ public class JobProvider {
final ActionListener<Boolean> responseListener = listener;
listener = ActionListener.wrap(aBoolean -> {
client.admin().indices().prepareAliases()
.addAlias(indexName, AnomalyDetectorsIndex.getJobIndexName(jobId))
.addAlias(indexName, AnomalyDetectorsIndex.jobResultsIndexName(jobId))
.execute(new ActionListener<IndicesAliasesResponse>() {
@Override
public void onResponse(IndicesAliasesResponse indicesAliasesResponse) {
@ -254,12 +269,42 @@ public class JobProvider {
}
}
/**
 * Create the Elasticsearch state index ({@code AnomalyDetectorsIndex#jobStateIndexName()})
 * with the mappings for categorizer state, quantiles and model state.
 * The listener is always invoked exactly once: {@code (true, null)} on success,
 * {@code (false, exception)} on any failure, including failures building the
 * mappings before the create request is sent.
 *
 * @param listener Callback receiving (success flag, error or null)
 */
public void createJobStateIndex(BiConsumer<Boolean, Exception> listener) {
    try {
        XContentBuilder categorizerStateMapping = ElasticsearchMappings.categorizerStateMapping();
        XContentBuilder quantilesMapping = ElasticsearchMappings.quantilesMapping();
        XContentBuilder modelStateMapping = ElasticsearchMappings.modelStateMapping();

        LOGGER.trace("ES API CALL: create state index {}", AnomalyDetectorsIndex.jobStateIndexName());

        CreateIndexRequest createIndexRequest = new CreateIndexRequest(AnomalyDetectorsIndex.jobStateIndexName());
        createIndexRequest.settings(mlStateIndexSettings());
        createIndexRequest.mapping(CategorizerState.TYPE, categorizerStateMapping);
        createIndexRequest.mapping(Quantiles.TYPE.getPreferredName(), quantilesMapping);
        createIndexRequest.mapping(ModelState.TYPE.getPreferredName(), modelStateMapping);

        client.admin().indices().create(createIndexRequest, new ActionListener<CreateIndexResponse>() {
            @Override
            public void onResponse(CreateIndexResponse createIndexResponse) {
                listener.accept(true, null);
            }

            @Override
            public void onFailure(Exception e) {
                listener.accept(false, e);
            }
        });
    } catch (Exception e) {
        // Fixed copy-paste: previously logged "usage metering index" and, worse,
        // never notified the listener, leaving callers waiting forever.
        LOGGER.warn("Error creating the " + AnomalyDetectorsIndex.jobStateIndexName() + " index", e);
        listener.accept(false, e);
    }
}
/**
* Delete all the job related documents from the database.
*/
// TODO: should live together with createJobRelatedIndices (in case it moves)?
public void deleteJobRelatedIndices(String jobId, ActionListener<DeleteJobAction.Response> listener) {
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
LOGGER.trace("ES API CALL: delete index {}", indexName);
try {
@ -286,7 +331,7 @@ public class JobProvider {
* @return The dataCounts or default constructed object if not found
*/
public DataCounts dataCounts(String jobId) {
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
try {
GetResponse response = client.prepareGet(indexName, DataCounts.TYPE.getPreferredName(),
@ -378,7 +423,7 @@ public class JobProvider {
SearchResponse searchResponse;
try {
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
LOGGER.trace("ES API CALL: search all of result type {} from index {} with filter from {} size {}",
Bucket.RESULT_TYPE_VALUE, indexName, from, size);
@ -421,7 +466,7 @@ public class JobProvider {
* @throws ResourceNotFoundException If the job id is not recognised
*/
public QueryPage<Bucket> bucket(String jobId, BucketQueryBuilder.BucketQuery query) throws ResourceNotFoundException {
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
SearchHits hits;
try {
LOGGER.trace("ES API CALL: get Bucket with timestamp {} from index {}", query.getTimestamp(), indexName);
@ -504,7 +549,7 @@ public class JobProvider {
.filter(new TermsQueryBuilder(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(), partitionFieldValue));
FieldSortBuilder sb = new FieldSortBuilder(Bucket.TIMESTAMP.getPreferredName()).order(SortOrder.ASC);
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
SearchRequestBuilder searchBuilder = client
.prepareSearch(indexName)
.setQuery(boolQuery)
@ -630,7 +675,7 @@ public class JobProvider {
* @return QueryPage of CategoryDefinition
*/
public QueryPage<CategoryDefinition> categoryDefinitions(String jobId, int from, int size) {
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
LOGGER.trace("ES API CALL: search all of type {} from index {} sort ascending {} from {} size {}",
CategoryDefinition.TYPE.getPreferredName(), indexName, CategoryDefinition.CATEGORY_ID.getPreferredName(), from, size);
@ -670,7 +715,7 @@ public class JobProvider {
* @return QueryPage CategoryDefinition
*/
public QueryPage<CategoryDefinition> categoryDefinition(String jobId, String categoryId) {
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
GetResponse response;
try {
@ -737,7 +782,7 @@ public class JobProvider {
private QueryPage<AnomalyRecord> records(String jobId, int from, int size,
QueryBuilder recordFilter, FieldSortBuilder sb, List<String> secondarySort,
boolean descending) throws ResourceNotFoundException {
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
recordFilter = new BoolQueryBuilder()
.filter(recordFilter)
@ -804,7 +849,7 @@ public class JobProvider {
private QueryPage<Influencer> influencers(String jobId, int from, int size, QueryBuilder filterBuilder, String sortField,
boolean sortDescending) throws ResourceNotFoundException {
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
LOGGER.trace("ES API CALL: search all of result type {} from index {}{} with filter from {} size {}",
() -> Influencer.RESULT_TYPE_VALUE, () -> indexName,
() -> (sortField != null) ?
@ -884,13 +929,12 @@ public class JobProvider {
* Get the persisted quantiles state for the job
*/
public Optional<Quantiles> getQuantiles(String jobId) {
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
String indexName = AnomalyDetectorsIndex.jobStateIndexName();
try {
String quantilesId = Quantiles.quantilesId(jobId);
LOGGER.trace("ES API CALL: get ID {} type {} from index {}", quantilesId, Quantiles.TYPE.getPreferredName(), indexName);
GetResponse response = client.prepareGet(
indexName, Quantiles.TYPE.getPreferredName(), quantilesId).get();
GetResponse response = client.prepareGet(indexName, Quantiles.TYPE.getPreferredName(), quantilesId).get();
if (!response.isExists()) {
LOGGER.info("There are currently no quantiles for job " + jobId);
return Optional.empty();
@ -965,7 +1009,7 @@ public class JobProvider {
SearchResponse searchResponse;
try {
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
LOGGER.trace("ES API CALL: search all of type {} from index {} sort ascending {} with filter after sort from {} size {}",
ModelSnapshot.TYPE, indexName, sortField, from, size);
@ -1006,7 +1050,7 @@ public class JobProvider {
* @param restoreStream the stream to write the state to
*/
public void restoreStateToStream(String jobId, ModelSnapshot modelSnapshot, OutputStream restoreStream) throws IOException {
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
String indexName = AnomalyDetectorsIndex.jobStateIndexName();
// First try to restore categorizer state. There are no snapshots for this, so the IDs simply
// count up until a document is not found. It's NOT an error to have no categorizer state.
@ -1081,7 +1125,7 @@ public class JobProvider {
SearchResponse searchResponse;
try {
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
LOGGER.trace("ES API CALL: search result type {} from index {} from {}, size {}",
ModelDebugOutput.RESULT_TYPE_VALUE, indexName, from, size);
@ -1115,7 +1159,7 @@ public class JobProvider {
* Get the job's model size stats.
*/
public Optional<ModelSizeStats> modelSizeStats(String jobId) {
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
try {
LOGGER.trace("ES API CALL: get result type {} ID {} from index {}",
ModelSizeStats.RESULT_TYPE_VALUE, ModelSizeStats.RESULT_TYPE_FIELD, indexName);
@ -1173,6 +1217,6 @@ public class JobProvider {
* @return the {@code Auditor}
*/
public Auditor audit(String jobId) {
return new Auditor(client, AnomalyDetectorsIndex.getJobIndexName(jobId), jobId);
return new Auditor(client, AnomalyDetectorsIndex.jobResultsIndexName(jobId), jobId);
}
}

View File

@ -71,7 +71,7 @@ public class JobResultsPersister extends AbstractComponent {
private Builder(String jobId) {
this.jobId = Objects.requireNonNull(jobId);
indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
bulkRequest = client.prepareBulk();
}
@ -208,7 +208,7 @@ public class JobResultsPersister extends AbstractComponent {
public void persistCategoryDefinition(CategoryDefinition category) {
Persistable persistable = new Persistable(category.getJobId(), category, CategoryDefinition.TYPE.getPreferredName(),
String.valueOf(category.getCategoryId()));
persistable.persist();
persistable.persist(AnomalyDetectorsIndex.jobResultsIndexName(category.getJobId()));
// Don't commit as we expect masses of these updates and they're not
// read again by this process
}
@ -219,13 +219,13 @@ public class JobResultsPersister extends AbstractComponent {
public void persistQuantiles(Quantiles quantiles) {
Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.TYPE.getPreferredName(),
Quantiles.quantilesId(quantiles.getJobId()));
if (persistable.persist()) {
if (persistable.persist(AnomalyDetectorsIndex.jobStateIndexName())) {
// Refresh the index when persisting quantiles so that previously
// persisted results will be available for searching. Do this using the
// indices API rather than the index API (used to write the quantiles
// above), because this will refresh all shards rather than just the
// shard that the quantiles document itself was written to.
commitWrites(quantiles.getJobId());
commitStateWrites(quantiles.getJobId());
}
}
@ -235,7 +235,7 @@ public class JobResultsPersister extends AbstractComponent {
public void persistModelSnapshot(ModelSnapshot modelSnapshot) {
Persistable persistable = new Persistable(modelSnapshot.getJobId(), modelSnapshot, ModelSnapshot.TYPE.getPreferredName(),
modelSnapshot.getSnapshotId());
persistable.persist();
persistable.persist(AnomalyDetectorsIndex.jobResultsIndexName(modelSnapshot.getJobId()));
}
/**
@ -246,9 +246,9 @@ public class JobResultsPersister extends AbstractComponent {
logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes());
Persistable persistable = new Persistable(modelSizeStats.getJobId(), modelSizeStats, Result.TYPE.getPreferredName(),
ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName());
persistable.persist();
persistable.persist(AnomalyDetectorsIndex.jobResultsIndexName(jobId));
persistable = new Persistable(modelSizeStats.getJobId(), modelSizeStats, Result.TYPE.getPreferredName(), null);
persistable.persist();
persistable.persist(AnomalyDetectorsIndex.jobResultsIndexName(jobId));
// Don't commit as we expect masses of these updates and they're only
// for information at the API level
}
@ -258,7 +258,7 @@ public class JobResultsPersister extends AbstractComponent {
*/
public void persistModelDebugOutput(ModelDebugOutput modelDebugOutput) {
Persistable persistable = new Persistable(modelDebugOutput.getJobId(), modelDebugOutput, Result.TYPE.getPreferredName(), null);
persistable.persist();
persistable.persist(AnomalyDetectorsIndex.jobResultsIndexName(modelDebugOutput.getJobId()));
// Don't commit as we expect masses of these updates and they're not
// read again by this process
}
@ -293,10 +293,26 @@ public class JobResultsPersister extends AbstractComponent {
* Once all the job data has been written this function will be
* called to commit the writes to the datastore.
*
* @param jobId The job Id
* @return True if successful
*/
public boolean commitWrites(String jobId) {
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
public boolean commitResultWrites(String jobId) {
    String index = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
    logger.trace("[{}] ES API CALL: refresh index {}", jobId, index);
    // A refresh blocks until Lucene has made the pending writes searchable,
    // so on return the results are visible to queries.
    client.admin().indices().refresh(new RefreshRequest(index)).actionGet();
    return true;
}
/**
* Once the job state has been written calling this function makes it
* immediately searchable.
*
* @param jobId The job Id
* @return True if successful
* */
public boolean commitStateWrites(String jobId) {
String indexName = AnomalyDetectorsIndex.jobStateIndexName();
// Refresh should wait for Lucene to make the data searchable
logger.trace("[{}] ES API CALL: refresh index {}", jobId, indexName);
client.admin().indices().refresh(new RefreshRequest(indexName)).actionGet();
@ -329,16 +345,15 @@ public class JobResultsPersister extends AbstractComponent {
this.id = id;
}
boolean persist() {
boolean persist(String indexName) {
if (object == null) {
logger.warn("[{}] No {} to persist for job ", jobId, type);
return false;
}
logCall();
logCall(indexName);
try {
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
client.prepareIndex(indexName, type, id)
.setSource(toXContentBuilder(object))
.execute().actionGet();
@ -349,8 +364,7 @@ public class JobResultsPersister extends AbstractComponent {
}
}
private void logCall() {
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
private void logCall(String indexName) {
if (id != null) {
logger.trace("[{}] ES API CALL: index type {} to index {} with ID {}", jobId, type, indexName, id);
} else {

View File

@ -48,7 +48,7 @@ public class UsagePersister extends AbstractComponent {
// update global count
updateDocument(jobId, PRELERT_USAGE_INDEX, docId, bytesRead, fieldsRead, recordsRead);
updateDocument(jobId, AnomalyDetectorsIndex.getJobIndexName(jobId), docId, bytesRead,
updateDocument(jobId, AnomalyDetectorsIndex.jobResultsIndexName(jobId), docId, bytesRead,
fieldsRead, recordsRead);
}

View File

@ -84,6 +84,8 @@ public class AutoDetectResultProcessor {
LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, bucketCount);
}
}
context.bulkResultsPersister.executeRequest();
LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, bucketCount);
LOGGER.info("[{}] Parse results Complete", jobId);
} catch (Exception e) {
@ -157,7 +159,7 @@ public class AutoDetectResultProcessor {
// Commit previous writes here, effectively continuing
// the flush from the C++ autodetect process right
// through to the data store
persister.commitWrites(context.jobId);
persister.commitResultWrites(context.jobId);
flushListener.acknowledgeFlush(flushAcknowledgement.getId());
// Interim results may have been produced by the flush,
// which need to be

View File

@ -191,7 +191,7 @@ public class ScheduledJobsIT extends ESIntegTestCase {
}
private DataCounts getDataCounts(String jobId) {
GetResponse getResponse = client().prepareGet(AnomalyDetectorsIndex.getJobIndexName(jobId),
GetResponse getResponse = client().prepareGet(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
DataCounts.TYPE.getPreferredName(), jobId + "-data-counts").get();
if (getResponse.isExists() == false) {
return new DataCounts(jobId);

View File

@ -111,7 +111,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
}).start();
resultProcessor.process(JOB_ID, inputStream, false);
jobResultsPersister.commitWrites(JOB_ID);
jobResultsPersister.commitResultWrites(JOB_ID);
QueryPage<Bucket> persistedBucket = jobProvider.buckets(JOB_ID, new BucketsQueryBuilder().includeInterim(true).build());
assertEquals(1, persistedBucket.count());
@ -178,7 +178,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
}).start();
resultProcessor.process(JOB_ID, inputStream, false);
jobResultsPersister.commitWrites(JOB_ID);
jobResultsPersister.commitResultWrites(JOB_ID);
QueryPage<Bucket> persistedBucket = jobProvider.buckets(JOB_ID, new BucketsQueryBuilder().includeInterim(true).build());
assertEquals(1, persistedBucket.count());
@ -227,7 +227,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
}).start();
resultProcessor.process(JOB_ID, inputStream, false);
jobResultsPersister.commitWrites(JOB_ID);
jobResultsPersister.commitResultWrites(JOB_ID);
QueryPage<Bucket> persistedBucket = jobProvider.buckets(JOB_ID, new BucketsQueryBuilder().includeInterim(true).build());
assertEquals(1, persistedBucket.count());
@ -249,7 +249,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
Job.Builder jobBuilder = new Job.Builder(JOB_ID);
jobBuilder.setAnalysisConfig(new AnalysisConfig.Builder(Collections.singletonList(detector)));
jobProvider.createJobRelatedIndices(jobBuilder.build(), new ActionListener<Boolean>() {
jobProvider.createJobResultIndex(jobBuilder.build(), new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean aBoolean) {
}

View File

@ -155,14 +155,14 @@ public class JobManagerTests extends ESTestCase {
PutJobAction.Request request = new PutJobAction.Request(jobBuilder.build());
Index index = mock(Index.class);
when(index.getName()).thenReturn(AnomalyDetectorsIndex.getJobIndexName("my-special-place"));
when(index.getName()).thenReturn(AnomalyDetectorsIndex.jobResultsIndexName("my-special-place"));
IndexMetaData indexMetaData = mock(IndexMetaData.class);
when(indexMetaData.getIndex()).thenReturn(index);
ImmutableOpenMap<String, AliasMetaData> aliases = ImmutableOpenMap.of();
when(indexMetaData.getAliases()).thenReturn(aliases);
ImmutableOpenMap<String, IndexMetaData> indexMap = ImmutableOpenMap.<String, IndexMetaData>builder()
.fPut(AnomalyDetectorsIndex.getJobIndexName("my-special-place"), indexMetaData).build();
.fPut(AnomalyDetectorsIndex.jobResultsIndexName("my-special-place"), indexMetaData).build();
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(PrelertMetadata.TYPE, PrelertMetadata.PROTO).indices(indexMap)).build();

View File

@ -18,6 +18,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.prelert.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import java.util.concurrent.ExecutorService;
@ -57,6 +58,7 @@ public class PrelertInitializationServiceTests extends ESTestCase {
verify(clusterService, times(1)).submitStateUpdateTask(eq("install-prelert-metadata"), any());
verify(jobProvider, times(1)).createUsageMeteringIndex(any());
verify(jobProvider, times(1)).createJobStateIndex(any());
}
public void testInitialize_noMasterNode() {
@ -109,12 +111,17 @@ public class PrelertInitializationServiceTests extends ESTestCase {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
))
.put(IndexMetaData.builder(AnomalyDetectorsIndex.jobStateIndexName()).settings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
))
.putCustom(PrelertMetadata.TYPE, new PrelertMetadata.Builder().build()))
.build();
initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
verify(clusterService, times(0)).submitStateUpdateTask(eq("install-prelert-metadata"), any());
verify(jobProvider, times(0)).createUsageMeteringIndex(any());
verify(jobProvider, times(0)).createJobStateIndex(any());
}
}

View File

@ -92,8 +92,8 @@ public class ElasticsearchMappingsTests extends ESTestCase {
overridden.add(Quantiles.TYPE.getPreferredName());
overridden.add(Usage.TYPE);
// These are not reserved because they're in the prelert-int index, not
// prelertresults-*
// These are not reserved because they're in the prelert-int index
// not the job indices
overridden.add(ListDocument.ID.getPreferredName());
overridden.add(ListDocument.ITEMS.getPreferredName());

View File

@ -13,15 +13,20 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
import org.elasticsearch.xpack.prelert.job.ModelState;
import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import static org.elasticsearch.mock.orig.Mockito.mock;
import static org.elasticsearch.mock.orig.Mockito.times;
import static org.elasticsearch.mock.orig.Mockito.verify;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.when;
public class JobDataDeleterTests extends ESTestCase {
@ -35,7 +40,7 @@ public class JobDataDeleterTests extends ESTestCase {
BulkResponse bulkResponse = Mockito.mock(BulkResponse.class);
Client client = new MockClientBuilder("myCluster")
.prepareSearchExecuteListener(AnomalyDetectorsIndex.getJobIndexName("foo"), response)
.prepareSearchExecuteListener(AnomalyDetectorsIndex.jobResultsIndexName("foo"), response)
.prepareSearchScrollExecuteListener(response)
.prepareBulk(bulkResponse).build();
@ -73,6 +78,24 @@ public class JobDataDeleterTests extends ESTestCase {
verify(client.prepareBulk(), times(1)).execute(bulkListener);
}
public void testDeleteModelSnapShot() {
String jobId = "foo";
ModelSnapshot snapshot = new ModelSnapshot(jobId);
snapshot.setSnapshotDocCount(5);
snapshot.setSnapshotId("snap-1");
BulkResponse bulkResponse = Mockito.mock(BulkResponse.class);
Client client = new MockClientBuilder("myCluster").prepareBulk(bulkResponse).build();
JobDataDeleter bulkDeleter = new JobDataDeleter(client, jobId);
bulkDeleter.deleteModelSnapshot(snapshot);
verify(client, times(5))
.prepareDelete(eq(AnomalyDetectorsIndex.jobStateIndexName()), eq(ModelState.TYPE.getPreferredName()), anyString());
verify(client, times(1))
.prepareDelete(eq(AnomalyDetectorsIndex.jobResultsIndexName(jobId)), eq(ModelSnapshot.TYPE.getPreferredName()),
eq("snap-1"));
}
private SearchResponse createSearchResponseWithHits(long totalHitCount, int hitsPerSearchResult) {
SearchHits hits = mockSearchHits(totalHitCount, hitsPerSearchResult);
SearchResponse searchResponse = Mockito.mock(SearchResponse.class);

View File

@ -14,7 +14,6 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
@ -67,28 +66,16 @@ import static org.mockito.Mockito.when;
public class JobProviderTests extends ESTestCase {
private static final String CLUSTER_NAME = "myCluster";
private static final String JOB_ID = "foo";
private static final String INDEX_NAME = "prelertresults-foo";
private static final String STATE_INDEX_NAME = ".ml-state";
@Captor
private ArgumentCaptor<Map<String, Object>> mapCaptor;
public void testGetQuantiles_GivenNoIndexForJob() throws InterruptedException, ExecutionException {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true)
.throwMissingIndexOnPrepareGet(INDEX_NAME, Quantiles.TYPE.getPreferredName(), Quantiles.quantilesId(JOB_ID));
JobProvider provider = createProvider(clientBuilder.build());
ESTestCase.expectThrows(IndexNotFoundException.class, () -> provider.getQuantiles(JOB_ID));
}
public void testGetQuantiles_GivenNoQuantilesForJob() throws Exception {
GetResponse getResponse = createGetResponse(false, null);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true)
.prepareGet(INDEX_NAME, Quantiles.TYPE.getPreferredName(), Quantiles.quantilesId(JOB_ID), getResponse);
.prepareGet(STATE_INDEX_NAME, Quantiles.TYPE.getPreferredName(), Quantiles.quantilesId(JOB_ID), getResponse);
JobProvider provider = createProvider(clientBuilder.build());
@ -105,8 +92,7 @@ public class JobProviderTests extends ESTestCase {
GetResponse getResponse = createGetResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true)
.prepareGet(INDEX_NAME, Quantiles.TYPE.getPreferredName(), Quantiles.quantilesId(JOB_ID), getResponse);
.prepareGet(STATE_INDEX_NAME, Quantiles.TYPE.getPreferredName(), Quantiles.quantilesId(JOB_ID), getResponse);
JobProvider provider = createProvider(clientBuilder.build());
@ -124,8 +110,7 @@ public class JobProviderTests extends ESTestCase {
GetResponse getResponse = createGetResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true)
.prepareGet(INDEX_NAME, Quantiles.TYPE.getPreferredName(), Quantiles.quantilesId("foo"), getResponse);
.prepareGet(STATE_INDEX_NAME, Quantiles.TYPE.getPreferredName(), Quantiles.quantilesId("foo"), getResponse);
JobProvider provider = createProvider(clientBuilder.build());
@ -146,10 +131,10 @@ public class JobProviderTests extends ESTestCase {
clientBuilder.verifyIndexCreated(JobProvider.PRELERT_USAGE_INDEX);
}
public void testIndexSettings() {
public void testMlResultsIndexSettings() {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
JobProvider provider = createProvider(clientBuilder.build());
Settings settings = provider.prelertIndexSettings().build();
Settings settings = provider.mlResultsIndexSettings().build();
assertEquals("1", settings.get("index.number_of_shards"));
assertEquals("0", settings.get("index.number_of_replicas"));
@ -158,31 +143,28 @@ public class JobProviderTests extends ESTestCase {
assertEquals("all_field_values", settings.get("index.query.default_field"));
}
public void testCreateJobRelatedIndicies() {
public void testCreateJobResultsIndex() {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
clientBuilder.createIndexRequest(AnomalyDetectorsIndex.getJobIndexName("foo"), captor);
clientBuilder.createIndexRequest(AnomalyDetectorsIndex.jobResultsIndexName("foo"), captor);
Job.Builder job = buildJobBuilder("foo");
JobProvider provider = createProvider(clientBuilder.build());
provider.createJobRelatedIndices(job.build(), new ActionListener<Boolean>() {
provider.createJobResultIndex(job.build(), new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean aBoolean) {
CreateIndexRequest request = captor.getValue();
assertNotNull(request);
assertEquals(provider.prelertIndexSettings().build(), request.settings());
assertEquals(provider.mlResultsIndexSettings().build(), request.settings());
assertTrue(request.mappings().containsKey(Result.TYPE.getPreferredName()));
assertTrue(request.mappings().containsKey(CategorizerState.TYPE));
assertTrue(request.mappings().containsKey(CategoryDefinition.TYPE.getPreferredName()));
assertTrue(request.mappings().containsKey(Quantiles.TYPE.getPreferredName()));
assertTrue(request.mappings().containsKey(ModelState.TYPE.getPreferredName()));
assertTrue(request.mappings().containsKey(ModelSnapshot.TYPE.getPreferredName()));
assertTrue(request.mappings().containsKey(DataCounts.TYPE.getPreferredName()));
assertTrue(request.mappings().containsKey(Usage.TYPE));
assertTrue(request.mappings().containsKey(AuditMessage.TYPE.getPreferredName()));
assertTrue(request.mappings().containsKey(AuditActivity.TYPE.getPreferredName()));
assertEquals(10, request.mappings().size());
assertTrue(request.mappings().containsKey(ModelSnapshot.TYPE.getPreferredName()));
assertEquals(7, request.mappings().size());
}
@Override
@ -195,15 +177,15 @@ public class JobProviderTests extends ESTestCase {
public void testCreateJobRelatedIndicies_createsAliasIfIndexNameIsSet() {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
clientBuilder.createIndexRequest(AnomalyDetectorsIndex.getJobIndexName("foo"), captor);
clientBuilder.prepareAlias(AnomalyDetectorsIndex.getJobIndexName("bar"), AnomalyDetectorsIndex.getJobIndexName("foo"));
clientBuilder.createIndexRequest(AnomalyDetectorsIndex.jobResultsIndexName("foo"), captor);
clientBuilder.prepareAlias(AnomalyDetectorsIndex.jobResultsIndexName("bar"), AnomalyDetectorsIndex.jobResultsIndexName("foo"));
Job.Builder job = buildJobBuilder("foo");
job.setIndexName("bar");
Client client = clientBuilder.build();
JobProvider provider = createProvider(client);
provider.createJobRelatedIndices(job.build(), new ActionListener<Boolean>() {
provider.createJobResultIndex(job.build(), new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean aBoolean) {
verify(client.admin().indices(), times(1)).prepareAliases();
@ -220,14 +202,14 @@ public class JobProviderTests extends ESTestCase {
public void testCreateJobRelatedIndicies_doesntCreateAliasIfIndexNameIsSameAsJobId() {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
clientBuilder.createIndexRequest(AnomalyDetectorsIndex.getJobIndexName("foo"), captor);
clientBuilder.createIndexRequest(AnomalyDetectorsIndex.jobResultsIndexName("foo"), captor);
Job.Builder job = buildJobBuilder("foo");
job.setIndexName("foo");
Client client = clientBuilder.build();
JobProvider provider = createProvider(client);
provider.createJobRelatedIndices(job.build(), new ActionListener<Boolean>() {
provider.createJobResultIndex(job.build(), new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean aBoolean) {
verify(client.admin().indices(), never()).prepareAliases();
@ -240,6 +222,36 @@ public class JobProviderTests extends ESTestCase {
});
}
public void testMlStateIndexSettings() {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
JobProvider provider = createProvider(clientBuilder.build());
Settings settings = provider.mlResultsIndexSettings().build();
assertEquals("1", settings.get("index.number_of_shards"));
assertEquals("0", settings.get("index.number_of_replicas"));
assertEquals("async", settings.get("index.translog.durability"));
}
public void testCreateJobStateIndex() {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
clientBuilder.createIndexRequest(AnomalyDetectorsIndex.jobStateIndexName(), captor);
Job.Builder job = buildJobBuilder("foo");
JobProvider provider = createProvider(clientBuilder.build());
provider.createJobStateIndex((result, error) -> {
assertTrue(result);
CreateIndexRequest request = captor.getValue();
assertNotNull(request);
assertEquals(provider.mlStateIndexSettings().build(), request.settings());
assertTrue(request.mappings().containsKey(CategorizerState.TYPE));
assertTrue(request.mappings().containsKey(Quantiles.TYPE.getPreferredName()));
assertTrue(request.mappings().containsKey(ModelState.TYPE.getPreferredName()));
assertEquals(3, request.mappings().size());
});
}
public void testCreateJob() throws InterruptedException, ExecutionException {
Job.Builder job = buildJobBuilder("marscapone");
job.setDescription("This is a very cheesy job");
@ -247,12 +259,13 @@ public class JobProviderTests extends ESTestCase {
job.setAnalysisLimits(limits);
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).createIndexRequest("prelertresults-" + job.getId(), captor);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME)
.createIndexRequest(AnomalyDetectorsIndex.jobResultsIndexName(job.getId()), captor);
Client client = clientBuilder.build();
JobProvider provider = createProvider(client);
AtomicReference<Boolean> resultHolder = new AtomicReference<>();
provider.createJobRelatedIndices(job.build(), new ActionListener<Boolean>() {
provider.createJobResultIndex(job.build(), new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean aBoolean) {
resultHolder.set(aBoolean);
@ -271,12 +284,12 @@ public class JobProviderTests extends ESTestCase {
@SuppressWarnings("unchecked")
ActionListener<DeleteJobAction.Response> actionListener = mock(ActionListener.class);
String jobId = "ThisIsMyJob";
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse();
Client client = clientBuilder.build();
JobProvider provider = createProvider(client);
clientBuilder.resetIndices();
clientBuilder.addIndicesExistsResponse("prelertresults-" + jobId, true).addIndicesDeleteResponse("prelertresults-" + jobId, true,
clientBuilder.addIndicesExistsResponse(AnomalyDetectorsIndex.jobResultsIndexName(jobId), true)
.addIndicesDeleteResponse(AnomalyDetectorsIndex.jobResultsIndexName(jobId), true,
false, actionListener);
clientBuilder.build();
@ -291,12 +304,12 @@ public class JobProviderTests extends ESTestCase {
@SuppressWarnings("unchecked")
ActionListener<DeleteJobAction.Response> actionListener = mock(ActionListener.class);
String jobId = "ThisIsMyJob";
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse();
Client client = clientBuilder.build();
JobProvider provider = createProvider(client);
clientBuilder.resetIndices();
clientBuilder.addIndicesExistsResponse("prelertresults-" + jobId, true).addIndicesDeleteResponse("prelertresults-" + jobId, true,
clientBuilder.addIndicesExistsResponse(AnomalyDetectorsIndex.jobResultsIndexName(jobId), true)
.addIndicesDeleteResponse(AnomalyDetectorsIndex.jobResultsIndexName(jobId), true,
true, actionListener);
clientBuilder.build();
@ -324,8 +337,8 @@ public class JobProviderTests extends ESTestCase {
int from = 0;
int size = 10;
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true)
.prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), from, size, response, queryBuilder);
.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
Result.TYPE.getPreferredName(), from, size, response, queryBuilder);
Client client = clientBuilder.build();
JobProvider provider = createProvider(client);
@ -359,8 +372,8 @@ public class JobProviderTests extends ESTestCase {
int from = 99;
int size = 17;
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true)
.prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), from, size, response, queryBuilder);
.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
Result.TYPE.getPreferredName(), from, size, response, queryBuilder);
Client client = clientBuilder.build();
JobProvider provider = createProvider(client);
@ -394,8 +407,8 @@ public class JobProviderTests extends ESTestCase {
int from = 99;
int size = 17;
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true)
.prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), from, size, response, queryBuilder);
.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
Result.TYPE.getPreferredName(), from, size, response, queryBuilder);
Client client = clientBuilder.build();
JobProvider provider = createProvider(client);
@ -423,15 +436,11 @@ public class JobProviderTests extends ESTestCase {
Date now = new Date();
List<Map<String, Object>> source = new ArrayList<>();
Map<String, Object> map = new HashMap<>();
map.put("timestamp", now.getTime());
// source.add(map);
ArgumentCaptor<QueryBuilder> queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class);
SearchResponse response = createSearchResponse(false, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true)
.prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), 0, 0, response, queryBuilder);
.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
Result.TYPE.getPreferredName(), 0, 0, response, queryBuilder);
Client client = clientBuilder.build();
JobProvider provider = createProvider(client);
@ -457,8 +466,8 @@ public class JobProviderTests extends ESTestCase {
ArgumentCaptor<QueryBuilder> queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class);
SearchResponse response = createSearchResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true)
.prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), 0, 0, response, queryBuilder);
.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
Result.TYPE.getPreferredName(), 0, 0, response, queryBuilder);
Client client = clientBuilder.build();
JobProvider provider = createProvider(client);
@ -487,8 +496,8 @@ public class JobProviderTests extends ESTestCase {
ArgumentCaptor<QueryBuilder> queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class);
SearchResponse response = createSearchResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true)
.prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), 0, 0, response, queryBuilder);
.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
Result.TYPE.getPreferredName(), 0, 0, response, queryBuilder);
Client client = clientBuilder.build();
JobProvider provider = createProvider(client);
@ -529,8 +538,8 @@ public class JobProviderTests extends ESTestCase {
ArgumentCaptor<QueryBuilder> queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class);
SearchResponse response = createSearchResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true)
.prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), from, size, response, queryBuilder);
.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
Result.TYPE.getPreferredName(), from, size, response, queryBuilder);
Client client = clientBuilder.build();
JobProvider provider = createProvider(client);
@ -581,8 +590,8 @@ public class JobProviderTests extends ESTestCase {
ArgumentCaptor<QueryBuilder> queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class);
SearchResponse response = createSearchResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true)
.prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), from, size, response, queryBuilder);
.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
Result.TYPE.getPreferredName(), from, size, response, queryBuilder);
Client client = clientBuilder.build();
JobProvider provider = createProvider(client);
@ -640,8 +649,8 @@ public class JobProviderTests extends ESTestCase {
ArgumentCaptor<QueryBuilder> queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class);
SearchResponse response = createSearchResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true)
.prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), from, size, response, queryBuilder);
.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
Result.TYPE.getPreferredName(), from, size, response, queryBuilder);
Client client = clientBuilder.build();
JobProvider provider = createProvider(client);
@ -679,8 +688,8 @@ public class JobProviderTests extends ESTestCase {
ArgumentCaptor<QueryBuilder> queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class);
SearchResponse response = createSearchResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true)
.prepareSearchAnySize("prelertresults-" + jobId, Result.TYPE.getPreferredName(), response, queryBuilder);
.prepareSearchAnySize(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
Result.TYPE.getPreferredName(), response, queryBuilder);
Client client = clientBuilder.build();
JobProvider provider = createProvider(client);
@ -711,8 +720,8 @@ public class JobProviderTests extends ESTestCase {
ArgumentCaptor<QueryBuilder> queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class);
SearchResponse response = createSearchResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true)
.prepareSearchAnySize("prelertresults-" + jobId, Result.TYPE.getPreferredName(), response, queryBuilder);
.prepareSearchAnySize(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
Result.TYPE.getPreferredName(), response, queryBuilder);
Client client = clientBuilder.build();
JobProvider provider = createProvider(client);
@ -742,8 +751,8 @@ public class JobProviderTests extends ESTestCase {
int from = 0;
int size = 10;
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true)
.prepareSearch("prelertresults-" + jobId, CategoryDefinition.TYPE.getPreferredName(), from, size, response, queryBuilder);
.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
CategoryDefinition.TYPE.getPreferredName(), from, size, response, queryBuilder);
Client client = clientBuilder.build();
JobProvider provider = createProvider(client);
@ -766,8 +775,8 @@ public class JobProviderTests extends ESTestCase {
GetResponse getResponse = createGetResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true)
.prepareGet("prelertresults-" + jobId, CategoryDefinition.TYPE.getPreferredName(), categoryId, getResponse);
.prepareGet(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
CategoryDefinition.TYPE.getPreferredName(), categoryId, getResponse);
Client client = clientBuilder.build();
JobProvider provider = createProvider(client);
@ -810,8 +819,7 @@ public class JobProviderTests extends ESTestCase {
ArgumentCaptor<QueryBuilder> queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class);
SearchResponse response = createSearchResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true)
.prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(),
.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId), Result.TYPE.getPreferredName(),
from, size, response, queryBuilder);
Client client = clientBuilder.build();
@ -875,8 +883,7 @@ public class JobProviderTests extends ESTestCase {
ArgumentCaptor<QueryBuilder> queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class);
SearchResponse response = createSearchResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true)
.prepareSearch("prelertresults-" + jobId, Result.TYPE.getPreferredName(), from, size, response,
.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId), Result.TYPE.getPreferredName(), from, size, response,
queryBuilder);
Client client = clientBuilder.build();
@ -910,8 +917,7 @@ public class JobProviderTests extends ESTestCase {
String jobId = "TestJobIdentificationForInfluencers";
String influencerId = "ThisIsAnInfluencerId";
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse();
Client client = clientBuilder.build();
JobProvider provider = createProvider(client);
@ -952,8 +958,8 @@ public class JobProviderTests extends ESTestCase {
ArgumentCaptor<QueryBuilder> queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class);
SearchResponse response = createSearchResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true)
.prepareSearch("prelertresults-" + jobId, ModelSnapshot.TYPE.getPreferredName(), from, size, response, queryBuilder);
.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
ModelSnapshot.TYPE.getPreferredName(), from, size, response, queryBuilder);
Client client = clientBuilder.build();
JobProvider provider = createProvider(client);
@ -1008,8 +1014,8 @@ public class JobProviderTests extends ESTestCase {
ArgumentCaptor<QueryBuilder> queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class);
SearchResponse response = createSearchResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true)
.prepareSearch("prelertresults-" + jobId, ModelSnapshot.TYPE.getPreferredName(), from, size, response, queryBuilder);
.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
ModelSnapshot.TYPE.getPreferredName(), from, size, response, queryBuilder);
Client client = clientBuilder.build();
JobProvider provider = createProvider(client);
@ -1038,8 +1044,7 @@ public class JobProviderTests extends ESTestCase {
}
public void testMergePartitionScoresIntoBucket() throws InterruptedException, ExecutionException {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME)
.addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true).addClusterStatusYellowResponse();
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
JobProvider provider = createProvider(clientBuilder.build());
@ -1093,8 +1098,7 @@ public class JobProviderTests extends ESTestCase {
}
public void testMergePartitionScoresIntoBucket_WithEmptyScoresList() throws InterruptedException, ExecutionException {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME)
.addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true).addClusterStatusYellowResponse();
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
JobProvider provider = createProvider(clientBuilder.build());
@ -1125,11 +1129,10 @@ public class JobProviderTests extends ESTestCase {
GetResponse modelStateGetResponse2 = createGetResponse(true, modelState);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.addIndicesExistsResponse(JobProvider.PRELERT_USAGE_INDEX, true)
.prepareGet(INDEX_NAME, CategorizerState.TYPE, JOB_ID + "_1", categorizerStateGetResponse1)
.prepareGet(INDEX_NAME, CategorizerState.TYPE, JOB_ID + "_2", categorizerStateGetResponse2)
.prepareGet(INDEX_NAME, ModelState.TYPE.getPreferredName(), "123_1", modelStateGetResponse1)
.prepareGet(INDEX_NAME, ModelState.TYPE.getPreferredName(), "123_2", modelStateGetResponse2);
.prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), CategorizerState.TYPE, JOB_ID + "_1", categorizerStateGetResponse1)
.prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), CategorizerState.TYPE, JOB_ID + "_2", categorizerStateGetResponse2)
.prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ModelState.TYPE.getPreferredName(), "123_1", modelStateGetResponse1)
.prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ModelState.TYPE.getPreferredName(), "123_2", modelStateGetResponse2);
JobProvider provider = createProvider(clientBuilder.build());

View File

@ -204,7 +204,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
processor.processResult(context, result);
verify(flushListener, times(1)).acknowledgeFlush(JOB_ID);
verify(persister, times(1)).commitWrites(JOB_ID);
verify(persister, times(1)).commitResultWrites(JOB_ID);
verify(bulkBuilder, never()).executeRequest();
verifyNoMoreInteractions(persister);
assertTrue(context.deleteInterimRequired);
@ -230,7 +230,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
processor.processResult(context, result);
inOrder.verify(persister, times(1)).persistCategoryDefinition(categoryDefinition);
inOrder.verify(persister, times(1)).commitWrites(JOB_ID);
inOrder.verify(persister, times(1)).commitResultWrites(JOB_ID);
inOrder.verify(flushListener, times(1)).acknowledgeFlush(JOB_ID);
verify(bulkBuilder, never()).executeRequest();
verifyNoMoreInteractions(persister);

View File

@ -43,7 +43,7 @@ setup:
- do:
index:
index: prelertresults-foo
index: .ml-state
type: model_state
id: "foo1_0"
body: >
@ -53,7 +53,7 @@ setup:
- do:
index:
index: prelertresults-foo
index: .ml-state
type: model_state
id: "foo1_1"
body: >
@ -77,6 +77,10 @@ setup:
"latest_result_time_stamp": "2016-06-01T00:00:00Z"
}
- do:
indices.refresh:
index: .ml-state
- do:
indices.refresh:
index: prelertresults-foo
@ -104,7 +108,7 @@ setup:
- do:
count:
index: prelertresults-foo
index: .ml-state
type: model_state
- match: { count: 2 }
@ -119,6 +123,10 @@ setup:
indices.refresh:
index: prelertresults-foo
- do:
indices.refresh:
index: .ml-state
- do:
xpack.prelert.get_model_snapshots:
job_id: "foo"
@ -127,7 +135,7 @@ setup:
- do:
count:
index: prelertresults-foo
index: .ml-state
type: model_state
- match: { count: 0 }

View File

@ -10,6 +10,7 @@ setup:
type: date
"restore_priority":
type: integer
- do:
index:
index: prelertresults-foo

View File

@ -21,7 +21,10 @@
- do:
indices.get:
index: "prelertresults-farequote"
- is_true: "prelertresults-farequote"
- do:
indices.get:
index: ".ml-state"
- do:
xpack.prelert.get_jobs:

View File

@ -10,6 +10,7 @@ setup:
type: date
"restore_priority":
type: integer
- do:
index:
index: prelertresults-foo

View File

@ -128,6 +128,9 @@ setup:
indices.refresh:
index: prelertresults-foo
- do:
indices.refresh:
index: .ml-state
---
"Test revert model with only job_id":
- do: