[ML] Share job result indices by default (elastic/x-pack-elasticsearch#638)

This moves the index structure to using a single, shared index
(.ml-anomalies-shared).  Custom indices can still be used by manually
setting `results_index`.

An alias is always created which points from `.ml-anomalies-<jobid>`
to `.ml-anomalies-shared`.

User defined indices are prepended with "custom-"

Index helper functions have been renamed to make this clear.  Furthermore,
accessing an index should always be done either by fetching the
currently configured index/alias from the state, or using the preconfigured
alias.  Because the user can specify a custom physical index, it is
impossible to determine the physical index "by convention" now.
The helpers have been configured to reflect that.

Original commit: elastic/x-pack-elasticsearch@a5368eb230
This commit is contained in:
Zachary Tong 2017-03-01 13:28:12 -05:00 committed by GitHub
parent b20578b9f6
commit eed0e41a29
24 changed files with 708 additions and 293 deletions

View File

@ -245,7 +245,7 @@ public class MlMetadata implements MetaData.Custom {
JobState jobState = MlMetadata.getJobState(jobId, tasks);
if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) {
throw ExceptionsHelper.conflictStatusException("Unexpected job state [" + jobState + "], expected [" +
JobState.CLOSED + "]");
JobState.CLOSED + " or " + JobState.FAILED + "]");
}
Job job = jobs.remove(jobId);
if (job == null) {

View File

@ -481,15 +481,15 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
}
}
static String[] indicesOfInterest(Job job) {
String jobResultIndex = AnomalyDetectorsIndex.jobResultsIndexName(job.getResultsIndexName());
static String[] indicesOfInterest(ClusterState clusterState, String job) {
String jobResultIndex = AnomalyDetectorsIndex.getPhysicalIndexFromState(clusterState, job);
return new String[]{AnomalyDetectorsIndex.jobStateIndexName(), jobResultIndex, AnomalyDetectorsIndex.ML_META_INDEX};
}
static List<String> verifyIndicesPrimaryShardsAreActive(Logger logger, String jobId, ClusterState clusterState) {
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
Job job = mlMetadata.getJobs().get(jobId);
String[] indices = indicesOfInterest(job);
String[] indices = indicesOfInterest(clusterState, jobId);
List<String> unavailableIndices = new ArrayList<>(indices.length);
for (String index : indices) {
// Indices are created on demand from templates.

View File

@ -18,6 +18,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.action.RevertModelSnapshotAction;
@ -26,8 +27,6 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobStorageDeletionTask;
@ -253,7 +252,6 @@ public class JobManager extends AbstractComponent {
ActionListener<DeleteJobAction.Response> actionListener) {
String jobId = request.getJobId();
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
logger.debug("Deleting job '" + jobId + "'");
// Step 3. When the job has been removed from the cluster state, return a response

View File

@ -18,6 +18,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser.Token;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.utils.MlStrings;
import org.elasticsearch.xpack.ml.utils.time.TimeUtils;
@ -195,6 +196,15 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
* @return The job's index name
*/
public String getResultsIndexName() {
return AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + resultsIndexName;
}
/**
* Private version of getResultsIndexName so that a job can be built from another
* job and pass index name validation
* @return The job's index name, minus prefix
*/
private String getResultsIndexNameNoPrefix() {
return resultsIndexName;
}
@ -525,10 +535,9 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
this.backgroundPersistInterval = job.getBackgroundPersistInterval();
this.modelSnapshotRetentionDays = job.getModelSnapshotRetentionDays();
this.resultsRetentionDays = job.getResultsRetentionDays();
this.modelSnapshotRetentionDays = job.getModelSnapshotRetentionDays();
this.customSettings = job.getCustomSettings();
this.modelSnapshotId = job.getModelSnapshotId();
this.resultsIndexName = job.getResultsIndexName();
this.resultsIndexName = job.getResultsIndexNameNoPrefix();
this.deleted = job.isDeleted();
}
@ -667,15 +676,23 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
}
if (Strings.isNullOrEmpty(resultsIndexName)) {
resultsIndexName = id;
resultsIndexName = AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT;
} else if (!MlStrings.isValidId(resultsIndexName)) {
throw new IllegalArgumentException(Messages.getMessage(Messages.INVALID_ID, RESULTS_INDEX_NAME.getPreferredName()));
throw new IllegalArgumentException(
Messages.getMessage(Messages.INVALID_ID, RESULTS_INDEX_NAME.getPreferredName(), resultsIndexName));
} else if (!resultsIndexName.equals(AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT)) {
// User-defined names are prepended with "custom"
// Conditional guards against multiple prepending due to updates instead of first creation
resultsIndexName = resultsIndexName.startsWith("custom-")
? resultsIndexName
: "custom-" + resultsIndexName;
}
return new Job(
id, description, createTime, finishedTime, lastDataTime, analysisConfig, analysisLimits,
dataDescription, modelDebugConfig, renormalizationWindowDays, backgroundPersistInterval,
modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId, resultsIndexName, deleted);
modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId,
resultsIndexName, deleted);
}
}
}

View File

@ -18,8 +18,9 @@ public final class AnomalyDetectorsIndex {
*/
public static final String ML_META_INDEX = ".ml-meta";
private static final String RESULTS_INDEX_PREFIX = ".ml-anomalies-";
public static final String RESULTS_INDEX_PREFIX = ".ml-anomalies-";
private static final String STATE_INDEX_NAME = ".ml-state";
public static final String RESULTS_INDEX_DEFAULT = "shared";
private AnomalyDetectorsIndex() {
}
@ -33,18 +34,18 @@ public final class AnomalyDetectorsIndex {
* @param jobId Job Id
* @return The index name
*/
public static String jobResultsIndexName(String jobId) {
public static String jobResultsAliasedName(String jobId) {
return RESULTS_INDEX_PREFIX + jobId;
}
/**
* The default index pattern for rollover index results
* Retrieves the currently defined physical index from the job state
* @param jobId Job Id
* @return The index name
*/
public static String getCurrentResultsIndex(ClusterState state, String jobId) {
public static String getPhysicalIndexFromState(ClusterState state, String jobId) {
MlMetadata meta = state.getMetaData().custom(MlMetadata.TYPE);
return RESULTS_INDEX_PREFIX + meta.getJobs().get(jobId).getResultsIndexName();
return meta.getJobs().get(jobId).getResultsIndexName();
}
/**

View File

@ -13,7 +13,7 @@ public abstract class BatchedResultsIterator<T>
extends BatchedDocumentsIterator<BatchedResultsIterator.ResultWithIndex<T>> {
public BatchedResultsIterator(Client client, String jobId, String resultType) {
super(client, AnomalyDetectorsIndex.jobResultsIndexName(jobId),
super(client, AnomalyDetectorsIndex.jobResultsAliasedName(jobId),
new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), resultType));
}

View File

@ -426,7 +426,6 @@ public class ElasticsearchMappings {
return jsonBuilder()
.startObject()
.startObject(DataCounts.TYPE.getPreferredName())
.field(ENABLED, false)
.startObject(PROPERTIES)
.startObject(Job.ID.getPreferredName())
.field(TYPE, KEYWORD)

View File

@ -48,7 +48,7 @@ public class JobDataCountsPersister extends AbstractComponent {
public void persistDataCounts(String jobId, DataCounts counts, ActionListener<Boolean> listener) {
try {
XContentBuilder content = serialiseCounts(counts);
client.prepareIndex(AnomalyDetectorsIndex.jobResultsIndexName(jobId), DataCounts.TYPE.getPreferredName(),
client.prepareIndex(AnomalyDetectorsIndex.jobResultsAliasedName(jobId), DataCounts.TYPE.getPreferredName(),
DataCounts.documentId(jobId))
.setSource(content).execute(new ActionListener<IndexResponse>() {
@Override

View File

@ -67,7 +67,7 @@ public class JobDataDeleter {
* @param listener Response listener
*/
public void deleteResultsFromTime(long cutoffEpochMs, ActionListener<Boolean> listener) {
String index = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
String index = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
RangeQueryBuilder timeRange = QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName());
timeRange.gte(cutoffEpochMs);
@ -119,7 +119,7 @@ public class JobDataDeleter {
++deletedModelStateCount;
}
bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobResultsIndexName(modelSnapshot.getJobId()),
bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()),
ModelSnapshot.TYPE.getPreferredName(), snapshotDocId));
++deletedModelSnapshotCount;
}
@ -128,7 +128,7 @@ public class JobDataDeleter {
* Delete all results marked as interim
*/
public void deleteInterimResults() {
String index = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
String index = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
QueryBuilder qb = QueryBuilders.termQuery(Bucket.IS_INTERIM.getPreferredName(), true);

View File

@ -28,14 +28,12 @@ import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
@ -54,7 +52,6 @@ import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.config.Job;
@ -124,19 +121,19 @@ public class JobProvider {
*/
public void createJobResultIndex(Job job, ClusterState state, ActionListener<Boolean> listener) {
Collection<String> termFields = (job.getAnalysisConfig() != null) ? job.getAnalysisConfig().termFields() : Collections.emptyList();
String jobId = job.getId();
boolean createIndexAlias = !job.getResultsIndexName().equals(job.getId());
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(job.getResultsIndexName());
if (createIndexAlias) {
String aliasName = AnomalyDetectorsIndex.jobResultsAliasedName(job.getId());
String indexName = job.getResultsIndexName();
final ActionListener<Boolean> responseListener = listener;
listener = ActionListener.wrap(aBoolean -> {
client.admin().indices().prepareAliases()
.addAlias(indexName, AnomalyDetectorsIndex.jobResultsIndexName(jobId))
.addAlias(indexName, aliasName)
.execute(ActionListener.wrap(r -> responseListener.onResponse(true), responseListener::onFailure));
},
listener::onFailure);
}
// Indices can be shared, so only create if it doesn't exist already. Saves us a roundtrip if
// already in the CS
@ -147,9 +144,7 @@ public class JobProvider {
final ActionListener<Boolean> createdListener = listener;
client.admin().indices().create(createIndexRequest,
ActionListener.wrap(
r -> {
updateIndexMappingWithTermFields(indexName, termFields, createdListener);
},
r -> updateIndexMappingWithTermFields(indexName, termFields, createdListener),
e -> {
// Possible that the index was created while the request was executing,
// so we need to handle that possibility
@ -169,6 +164,7 @@ public class JobProvider {
r -> updateIndexMappingWithTermFields(indexName, termFields, updateMappingListener),
updateMappingListener::onFailure));
}
}
private void updateIndexMappingWithTermFields(String indexName, Collection<String> termFields, ActionListener<Boolean> listener) {
@ -230,7 +226,7 @@ public class JobProvider {
*/
// TODO: should live together with createJobRelatedIndices (in case it moves)?
public void deleteJobRelatedIndices(String jobId, ActionListener<DeleteJobAction.Response> listener) {
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
LOGGER.trace("ES API CALL: delete index {}", indexName);
try {
@ -248,7 +244,7 @@ public class JobProvider {
* @param jobId The job id
*/
public void dataCounts(String jobId, Consumer<DataCounts> handler, Consumer<Exception> errorHandler) {
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
get(jobId, indexName, DataCounts.TYPE.getPreferredName(), DataCounts.documentId(jobId), handler, errorHandler,
DataCounts.PARSER, () -> new DataCounts(jobId));
}
@ -256,7 +252,6 @@ public class JobProvider {
private <T, U> void get(String jobId, String indexName, String type, String id, Consumer<T> handler, Consumer<Exception> errorHandler,
BiFunction<XContentParser, U, T> objectParser, Supplier<T> notFoundSupplier) {
GetRequest getRequest = new GetRequest(indexName, type, id);
client.get(getRequest, ActionListener.wrap(
response -> {
if (response.isExists() == false) {
@ -343,7 +338,7 @@ public class JobProvider {
QueryBuilder boolQuery = new BoolQueryBuilder()
.filter(rfb.build())
.filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), Bucket.RESULT_TYPE_VALUE));
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.types(Result.TYPE.getPreferredName());
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
@ -465,7 +460,7 @@ public class JobProvider {
.filter(new TermsQueryBuilder(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(), partitionFieldValue));
FieldSortBuilder sb = new FieldSortBuilder(Result.TIMESTAMP.getPreferredName()).order(SortOrder.ASC);
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.sort(sb);
sourceBuilder.query(boolQuery);
@ -597,7 +592,7 @@ public class JobProvider {
throw new IllegalStateException("Both categoryId and pageParams are specified");
}
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
LOGGER.trace("ES API CALL: search all of type {} from index {} sort ascending {} from {} size {}",
CategoryDefinition.TYPE.getPreferredName(), indexName, CategoryDefinition.CATEGORY_ID.getPreferredName(), from, size);
@ -663,7 +658,7 @@ public class JobProvider {
QueryBuilder recordFilter, FieldSortBuilder sb, List<String> secondarySort,
boolean descending, Consumer<QueryPage<AnomalyRecord>> handler,
Consumer<Exception> errorHandler) {
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
recordFilter = new BoolQueryBuilder()
.filter(recordFilter)
@ -718,7 +713,7 @@ public class JobProvider {
.interim(Bucket.IS_INTERIM.getPreferredName(), query.isIncludeInterim())
.build();
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
LOGGER.trace("ES API CALL: search all of result type {} from index {}{} with filter from {} size {}",
() -> Influencer.RESULT_TYPE_VALUE, () -> indexName,
() -> (query.getSortField() != null) ?
@ -785,7 +780,7 @@ public class JobProvider {
handler.accept(null);
return;
}
get(jobId, AnomalyDetectorsIndex.jobResultsIndexName(jobId), ModelSnapshot.TYPE.getPreferredName(),
get(jobId, AnomalyDetectorsIndex.jobResultsAliasedName(jobId), ModelSnapshot.TYPE.getPreferredName(),
ModelSnapshot.documentId(jobId, modelSnapshotId), handler, errorHandler, ModelSnapshot.PARSER, () -> null);
}
@ -861,7 +856,7 @@ public class JobProvider {
FieldSortBuilder sb = new FieldSortBuilder(sortField)
.order(sortDescending ? SortOrder.DESC : SortOrder.ASC);
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
LOGGER.trace("ES API CALL: search all of type {} from index {} sort ascending {} with filter after sort from {} size {}",
ModelSnapshot.TYPE, indexName, sortField, from, size);
@ -953,7 +948,7 @@ public class JobProvider {
public QueryPage<ModelDebugOutput> modelDebugOutput(String jobId, int from, int size) {
SearchResponse searchResponse;
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
LOGGER.trace("ES API CALL: search result type {} from index {} from {}, size {}",
ModelDebugOutput.RESULT_TYPE_VALUE, indexName, from, size);
@ -986,7 +981,7 @@ public class JobProvider {
LOGGER.trace("ES API CALL: get result type {} ID {} for job {}",
ModelSizeStats.RESULT_TYPE_VALUE, ModelSizeStats.RESULT_TYPE_FIELD, jobId);
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
get(jobId, indexName, Result.TYPE.getPreferredName(), ModelSizeStats.documentId(jobId),
handler, errorHandler, (parser, context) -> ModelSizeStats.PARSER.apply(parser, null).build(),
() -> {

View File

@ -73,7 +73,7 @@ public class JobResultsPersister extends AbstractComponent {
private Builder(String jobId) {
this.jobId = Objects.requireNonNull(jobId);
indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
bulkRequest = new BulkRequest();
}
@ -212,7 +212,7 @@ public class JobResultsPersister extends AbstractComponent {
public void persistCategoryDefinition(CategoryDefinition category) {
Persistable persistable = new Persistable(category.getJobId(), category, CategoryDefinition.TYPE.getPreferredName(),
CategoryDefinition.documentId(category.getJobId(), Long.toString(category.getCategoryId())));
persistable.persist(AnomalyDetectorsIndex.jobResultsIndexName(category.getJobId()));
persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(category.getJobId()));
// Don't commit as we expect masses of these updates and they're not
// read again by this process
}
@ -239,11 +239,11 @@ public class JobResultsPersister extends AbstractComponent {
public void persistModelSnapshot(ModelSnapshot modelSnapshot) {
Persistable persistable = new Persistable(modelSnapshot.getJobId(), modelSnapshot, ModelSnapshot.TYPE.getPreferredName(),
ModelSnapshot.documentId(modelSnapshot));
persistable.persist(AnomalyDetectorsIndex.jobResultsIndexName(modelSnapshot.getJobId()));
persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()));
}
public void updateModelSnapshot(ModelSnapshot modelSnapshot, Consumer<Boolean> handler, Consumer<Exception> errorHandler) {
String index = AnomalyDetectorsIndex.jobResultsIndexName(modelSnapshot.getJobId());
String index = AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId());
IndexRequest indexRequest = new IndexRequest(index, ModelSnapshot.TYPE.getPreferredName(), ModelSnapshot.documentId(modelSnapshot));
try {
indexRequest.source(toXContentBuilder(modelSnapshot));
@ -261,9 +261,9 @@ public class JobResultsPersister extends AbstractComponent {
logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes());
Persistable persistable = new Persistable(modelSizeStats.getJobId(), modelSizeStats, Result.TYPE.getPreferredName(),
ModelSizeStats.documentId(jobId));
persistable.persist(AnomalyDetectorsIndex.jobResultsIndexName(jobId));
persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(jobId));
persistable = new Persistable(modelSizeStats.getJobId(), modelSizeStats, Result.TYPE.getPreferredName(), null);
persistable.persist(AnomalyDetectorsIndex.jobResultsIndexName(jobId));
persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(jobId));
// Don't commit as we expect masses of these updates and they're only
// for information at the API level
}
@ -273,7 +273,7 @@ public class JobResultsPersister extends AbstractComponent {
*/
public void persistModelDebugOutput(ModelDebugOutput modelDebugOutput) {
Persistable persistable = new Persistable(modelDebugOutput.getJobId(), modelDebugOutput, Result.TYPE.getPreferredName(), null);
persistable.persist(AnomalyDetectorsIndex.jobResultsIndexName(modelDebugOutput.getJobId()));
persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(modelDebugOutput.getJobId()));
// Don't commit as we expect masses of these updates and they're not
// read again by this process
}
@ -295,7 +295,7 @@ public class JobResultsPersister extends AbstractComponent {
* @return True if successful
*/
public boolean commitResultWrites(String jobId) {
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
// Refresh should wait for Lucene to make the data searchable
logger.trace("[{}] ES API CALL: refresh index {}", jobId, indexName);
client.admin().indices().refresh(new RefreshRequest(indexName)).actionGet();

View File

@ -7,13 +7,16 @@ package org.elasticsearch.xpack.ml.job.persistence;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.Task;
@ -38,54 +41,38 @@ public class JobStorageDeletionTask extends Task {
CheckedConsumer<Boolean, Exception> finishedHandler,
Consumer<Exception> failureHandler) {
String indexName = AnomalyDetectorsIndex.getCurrentResultsIndex(state, jobId);
String indexPattern = indexName + "-*";
final String indexName = AnomalyDetectorsIndex.getPhysicalIndexFromState(state, jobId);
final String indexPattern = indexName + "-*";
final String aliasName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
// Step 2. Regardless of if the DBQ succeeds, we delete the physical index
CheckedConsumer<IndicesAliasesResponse, Exception> deleteAliasHandler = indicesAliasesResponse -> {
if (!indicesAliasesResponse.isAcknowledged()) {
logger.warn("Delete Alias request not acknowledged for alias [" + aliasName + "].");
} else {
logger.info("Done deleting alias [" + aliasName + "]");
}
finishedHandler.accept(true);
};
// Step 2. DBQ done, delete the alias
// -------
// TODO this will be removed once shared indices are used
// TODO norelease more robust handling of failures?
CheckedConsumer<BulkByScrollResponse, Exception> dbqHandler = bulkByScrollResponse -> {
if (bulkByScrollResponse.isTimedOut()) {
logger.warn("DeleteByQuery for index [" + indexPattern + "] timed out. Continuing to delete index.");
logger.warn("DeleteByQuery for indices [" + indexName + ", " + indexPattern + "] timed out.");
}
if (!bulkByScrollResponse.getBulkFailures().isEmpty()) {
logger.warn("[" + bulkByScrollResponse.getBulkFailures().size()
+ "] failures encountered while running DeleteByQuery on index [" + indexPattern + "]. "
+ "Continuing to delete index");
+ "] failures encountered while running DeleteByQuery on indices [" + indexName + ", "
+ indexPattern + "]. ");
}
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
client.admin().indices().delete(deleteIndexRequest, ActionListener.wrap(deleteIndexResponse -> {
logger.info("Deleting index [" + indexName + "] successful");
if (deleteIndexResponse.isAcknowledged()) {
logger.info("Index deletion acknowledged");
} else {
logger.warn("Index deletion not acknowledged");
}
finishedHandler.accept(deleteIndexResponse.isAcknowledged());
}, missingIndexHandler(indexName, finishedHandler, failureHandler)));
};
// Step 1. DeleteByQuery on the index, matching all docs with the right job_id
// -------
SearchRequest searchRequest = new SearchRequest(indexPattern);
searchRequest.indicesOptions(JobProvider.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS));
DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest);
searchRequest.source(new SearchSourceBuilder().query(new TermQueryBuilder(Job.ID.getPreferredName(), jobId)));
request.setSlices(5);
client.execute(MlDeleteByQueryAction.INSTANCE, request,
ActionListener.wrap(dbqHandler, missingIndexHandler(indexName, finishedHandler, failureHandler)));
}
// If the index doesn't exist, we need to catch the exception and carry onwards so that the cluster
// state is properly updated
private Consumer<Exception> missingIndexHandler(String indexName, CheckedConsumer<Boolean, Exception> finishedHandler,
Consumer<Exception> failureHandler) {
return e -> {
IndicesAliasesRequest request = new IndicesAliasesRequest()
.addAliasAction(IndicesAliasesRequest.AliasActions.remove().alias(aliasName).index(indexName));
client.admin().indices().aliases(request, ActionListener.wrap(deleteAliasHandler,
e -> {
if (e instanceof IndexNotFoundException) {
logger.warn("Physical index [" + indexName + "] not found. Continuing to delete job.");
logger.warn("Alias [" + aliasName + "] not found. Continuing to delete job.");
try {
finishedHandler.accept(false);
} catch (Exception e1) {
@ -95,6 +82,19 @@ public class JobStorageDeletionTask extends Task {
// all other exceptions should die
failureHandler.accept(e);
}
}));
};
// Step 1. DeleteByQuery on the index, matching all docs with the right job_id
// -------
logger.info("Running DBQ on [" + indexName + "," + indexPattern + "] for job [" + jobId + "]");
SearchRequest searchRequest = new SearchRequest(indexName, indexPattern);
DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest);
ConstantScoreQueryBuilder query = new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId));
searchRequest.source(new SearchSourceBuilder().query(query));
searchRequest.indicesOptions(JobProvider.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
request.setSlices(5);
client.execute(MlDeleteByQueryAction.INSTANCE, request, ActionListener.wrap(dbqHandler, failureHandler));
}
}

View File

@ -66,7 +66,7 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
LOGGER.info("Removing model snapshots of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs);
QueryBuilder excludeFilter = QueryBuilders.termQuery(ModelSnapshot.SNAPSHOT_ID.getPreferredName(), job.getModelSnapshotId());
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(AnomalyDetectorsIndex.jobResultsIndexName(job.getId()));
searchRequest.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()));
searchRequest.types(ModelSnapshot.TYPE.getPreferredName());
QueryBuilder query = createQuery(job.getId(), cutoffEpochMs).mustNot(excludeFilter);
searchRequest.source(new SearchSourceBuilder().query(query).size(MODEL_SNAPSHOT_SEARCH_SIZE));

View File

@ -87,7 +87,7 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest);
request.setSlices(5);
searchRequest.indices(AnomalyDetectorsIndex.jobResultsIndexName(job.getId()));
searchRequest.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()));
searchRequest.types(type);
QueryBuilder query = createQuery(job.getId(), cutoffEpochMs).mustNot(excludeFilter);
searchRequest.source(new SearchSourceBuilder().query(query));

View File

@ -283,8 +283,7 @@ public class OpenJobActionTests extends ESTestCase {
routingTable = new RoutingTable.Builder(cs.routingTable());
MlMetadata mlMetadata = cs.metaData().custom(MlMetadata.TYPE);
Job job = mlMetadata.getJobs().get("job_id");
String indexToRemove = randomFrom(OpenJobAction.indicesOfInterest(job));
String indexToRemove = randomFrom(OpenJobAction.indicesOfInterest(cs, "job_id"));
if (randomBoolean()) {
routingTable.remove(indexToRemove);
} else {
@ -317,9 +316,10 @@ public class OpenJobActionTests extends ESTestCase {
indices.add(AnomalyDetectorsIndex.jobStateIndexName());
indices.add(AnomalyDetectorsIndex.ML_META_INDEX);
indices.add(Auditor.NOTIFICATIONS_INDEX);
for (String jobId : jobIds) {
indices.add(AnomalyDetectorsIndex.jobResultsIndexName(jobId));
}
// norelease: randomizing this throws an NPE in the test due to verifyIndicesExistAndPrimaryShardsAreActive()
// returning false. Needs fixing, deferring to a followup PR
indices.add(AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT);
for (String indexName : indices) {
IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName);
indexMetaData.settings(Settings.builder()

View File

@ -341,7 +341,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
Exception e = expectThrows(ElasticsearchStatusException.class,
() -> client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet());
assertTrue(e.getMessage().startsWith("cannot open job [job_id], no suitable nodes found, allocation explanation"));
assertTrue(e.getMessage().endsWith("because not all primary shards are active for the following indices [.ml-anomalies-job_id]]"));
assertTrue(e.getMessage().endsWith("because not all primary shards are active for the following indices [.ml-anomalies-shared]]"));
logger.info("Start data node");
internalCluster().startNode(Settings.builder()

View File

@ -220,16 +220,15 @@ public class MlJobIT extends ESRestTestCase {
String jobConfig = String.format(Locale.ROOT, jobTemplate, "index-1");
Response response = client().performRequest("put", MachineLearning.BASE_PATH
+ "anomaly_detectors/repeated-id" , Collections.emptyMap(),
Response response = client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/repeated-id" ,
Collections.emptyMap(),
new StringEntity(jobConfig, ContentType.APPLICATION_JSON));
assertEquals(200, response.getStatusLine().getStatusCode());
final String jobConfig2 = String.format(Locale.ROOT, jobTemplate, "index-2");
ResponseException e = expectThrows(ResponseException.class,
() ->client().performRequest("put", MachineLearning.BASE_PATH
+ "anomaly_detectors/repeated-id" , Collections.emptyMap(),
new StringEntity(jobConfig2, ContentType.APPLICATION_JSON)));
() ->client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/repeated-id" ,
Collections.emptyMap(), new StringEntity(jobConfig2, ContentType.APPLICATION_JSON)));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400));
assertThat(e.getMessage(), containsString("The job cannot be created with the Id 'repeated-id'. The Id is already used."));
@ -259,27 +258,43 @@ public class MlJobIT extends ESRestTestCase {
assertEquals(200, response.getStatusLine().getStatusCode());
String responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"" + AnomalyDetectorsIndex.jobResultsIndexName(indexName)
+ "\":{\"aliases\":{\"" + AnomalyDetectorsIndex.jobResultsIndexName(jobId1) + "\":{},\"" +
AnomalyDetectorsIndex.jobResultsIndexName(jobId2)));
assertThat(responseAsString, containsString("\"" + AnomalyDetectorsIndex.jobResultsAliasedName("custom-" + indexName)
+ "\":{\"aliases\":{\"" + AnomalyDetectorsIndex.jobResultsAliasedName(jobId1) + "\":{},\"" +
AnomalyDetectorsIndex.jobResultsAliasedName(jobId2)));
response = client().performRequest("get", "_cat/indices");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString(indexName));
assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsIndexName(jobId1))));
assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsIndexName(jobId2))));
assertThat(responseAsString, containsString(AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + "custom-" + indexName));
assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsAliasedName(jobId1))));
assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsAliasedName(jobId2))));
addBucketResult(indexName, "1234", 1);
addBucketResult(indexName, "1236", 1);
response = client().performRequest("get", MachineLearning.BASE_PATH
+ "anomaly_detectors/" + jobId1 + "/results/buckets");
String bucketResult = String.format(Locale.ROOT,
"{\"job_id\":\"%s\", \"timestamp\": \"%s\", \"result_type\":\"bucket\", \"bucket_span\": \"%s\"}",
jobId1, "1234", 1);
String id = String.format(Locale.ROOT,
"%s_%s_%s", jobId1, "1234", 1);
response = client().performRequest("put", AnomalyDetectorsIndex.jobResultsAliasedName(jobId1) + "/result/" + id,
Collections.emptyMap(), new StringEntity(bucketResult, ContentType.APPLICATION_JSON));
assertEquals(201, response.getStatusLine().getStatusCode());
bucketResult = String.format(Locale.ROOT,
"{\"job_id\":\"%s\", \"timestamp\": \"%s\", \"result_type\":\"bucket\", \"bucket_span\": \"%s\"}",
jobId1, "1236", 1);
id = String.format(Locale.ROOT,
"%s_%s_%s", jobId1, "1236", 1);
response = client().performRequest("put", AnomalyDetectorsIndex.jobResultsAliasedName(jobId1) + "/result/" + id,
Collections.emptyMap(), new StringEntity(bucketResult, ContentType.APPLICATION_JSON));
assertEquals(201, response.getStatusLine().getStatusCode());
client().performRequest("post", "_refresh");
response = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId1 + "/results/buckets");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"count\":2"));
response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName(indexName)
+ "/result/_search");
response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsAliasedName(jobId1) + "/result/_search");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"total\":2"));
@ -287,20 +302,66 @@ public class MlJobIT extends ESRestTestCase {
response = client().performRequest("delete", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId1);
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
// check index and alias were deleted
// check that indices still exist, but are empty and aliases are gone
response = client().performRequest("get", "_aliases");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsIndexName(jobId1))));
assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsIndexName(jobId2))));
assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsAliasedName(jobId1))));
assertThat(responseAsString, containsString(AnomalyDetectorsIndex.jobResultsAliasedName(jobId2))); //job2 still exists
response = client().performRequest("get", "_cat/indices");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, not(containsString(indexName)));
assertThat(responseAsString, containsString(AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + "custom-" + indexName));
client().performRequest("post", "_refresh");
response = client().performRequest("get", AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + "custom-" + indexName + "/_count");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"count\":0"));
}
public void testCreateJobInSharedIndexUpdatesMapping() throws Exception {
String jobTemplate = "{\n" +
" \"analysis_config\" : {\n" +
" \"detectors\" :[{\"function\":\"metric\",\"field_name\":\"metric\", \"by_field_name\":\"%s\"}]\n" +
" }\n" +
"}";
String jobId1 = "job-1";
String byFieldName1 = "responsetime";
String jobId2 = "job-2";
String byFieldName2 = "cpu-usage";
String jobConfig = String.format(Locale.ROOT, jobTemplate, byFieldName1);
Response response = client().performRequest("put", MachineLearning.BASE_PATH
+ "anomaly_detectors/" + jobId1, Collections.emptyMap(), new StringEntity(jobConfig, ContentType.APPLICATION_JSON));
assertEquals(200, response.getStatusLine().getStatusCode());
// Check the index mapping contains the first by_field_name
response = client().performRequest("get", AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX
+ AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT + "/_mapping?pretty");
assertEquals(200, response.getStatusLine().getStatusCode());
String responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString(byFieldName1));
assertThat(responseAsString, not(containsString(byFieldName2)));
jobConfig = String.format(Locale.ROOT, jobTemplate, byFieldName2);
response = client().performRequest("put", MachineLearning.BASE_PATH
+ "anomaly_detectors/" + jobId2, Collections.emptyMap(), new StringEntity(jobConfig, ContentType.APPLICATION_JSON));
assertEquals(200, response.getStatusLine().getStatusCode());
// Check the index mapping now contains both fields
response = client().performRequest("get", AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX
+ AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT + "/_mapping?pretty");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString(byFieldName1));
assertThat(responseAsString, containsString(byFieldName2));
}
public void testCreateJobInCustomSharedIndexUpdatesMapping() throws Exception {
String jobTemplate = "{\n" +
" \"analysis_config\" : {\n" +
" \"detectors\" :[{\"function\":\"metric\",\"field_name\":\"metric\", \"by_field_name\":\"%s\"}]\n" +
@ -318,7 +379,7 @@ public class MlJobIT extends ESRestTestCase {
assertEquals(200, response.getStatusLine().getStatusCode());
// Check the index mapping contains the first by_field_name
response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName("shared-index") + "/_mapping?pretty");
response = client().performRequest("get", AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + "custom-shared-index" + "/_mapping?pretty");
assertEquals(200, response.getStatusLine().getStatusCode());
String responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString(byFieldName1));
@ -330,7 +391,7 @@ public class MlJobIT extends ESRestTestCase {
assertEquals(200, response.getStatusLine().getStatusCode());
// Check the index mapping now contains both fields
response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName("shared-index") + "/_mapping?pretty");
response = client().performRequest("get", AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + "custom-shared-index" + "/_mapping?pretty");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString(byFieldName1));
@ -339,7 +400,7 @@ public class MlJobIT extends ESRestTestCase {
public void testDeleteJob() throws Exception {
String jobId = "foo";
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
String indexName = AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT;
createFarequoteJob(jobId);
Response response = client().performRequest("get", "_cat/indices");
@ -350,11 +411,23 @@ public class MlJobIT extends ESRestTestCase {
response = client().performRequest("delete", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
// check index was deleted
// check that the index still exists (it's shared by default)
response = client().performRequest("get", "_cat/indices");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, not(containsString(indexName)));
assertThat(responseAsString, containsString(indexName));
assertBusy(() -> {
try {
Response r = client().performRequest("get", indexName + "/_count");
assertEquals(200, r.getStatusLine().getStatusCode());
String responseString = responseEntityToString(r);
assertThat(responseString, containsString("\"count\":0"));
} catch (Exception e) {
fail(e.getMessage());
}
});
// check that the job itself is gone
expectThrows(ResponseException.class, () ->
@ -363,7 +436,8 @@ public class MlJobIT extends ESRestTestCase {
public void testDeleteJobAfterMissingIndex() throws Exception {
String jobId = "foo";
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
String aliasName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
String indexName = AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT;
createFarequoteJob(jobId);
Response response = client().performRequest("get", "_cat/indices");
@ -383,6 +457,7 @@ public class MlJobIT extends ESRestTestCase {
response = client().performRequest("get", "_cat/indices");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, not(containsString(aliasName)));
assertThat(responseAsString, not(containsString(indexName)));
expectThrows(ResponseException.class, () ->
@ -391,7 +466,7 @@ public class MlJobIT extends ESRestTestCase {
public void testMultiIndexDelete() throws Exception {
String jobId = "foo";
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
String indexName = AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT;
createFarequoteJob(jobId);
Response response = client().performRequest("put", indexName + "-001");
@ -412,28 +487,32 @@ public class MlJobIT extends ESRestTestCase {
String.format(Locale.ROOT,
"{\"job_id\":\"%s\", \"timestamp\": \"%s\", \"bucket_span\":%d, \"sequence_num\": %d, \"result_type\":\"record\"}",
jobId, 123, 1, 1);
client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "/result/" + 123,
client().performRequest("put", indexName + "/result/" + 123,
Collections.singletonMap("refresh", "true"), new StringEntity(recordResult, ContentType.APPLICATION_JSON));
client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "-001/result/" + 123,
client().performRequest("put", indexName + "-001/result/" + 123,
Collections.singletonMap("refresh", "true"), new StringEntity(recordResult, ContentType.APPLICATION_JSON));
client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "-002/result/" + 123,
client().performRequest("put", indexName + "-002/result/" + 123,
Collections.singletonMap("refresh", "true"), new StringEntity(recordResult, ContentType.APPLICATION_JSON));
// Also index a few through the alias for the first job
client().performRequest("put", indexName + "/result/" + 456,
Collections.singletonMap("refresh", "true"), new StringEntity(recordResult, ContentType.APPLICATION_JSON));
client().performRequest("post", "_refresh");
// check for the documents
response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "/_count");
response = client().performRequest("get", indexName+ "/_count");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"count\":2"));
response = client().performRequest("get", indexName + "-001/_count");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"count\":1"));
response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "-001/_count");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"count\":1"));
response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "-002/_count");
response = client().performRequest("get", indexName + "-002/_count");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"count\":1"));
@ -444,30 +523,38 @@ public class MlJobIT extends ESRestTestCase {
client().performRequest("post", "_refresh");
// check index was deleted
// check that the indices still exist but are empty
response = client().performRequest("get", "_cat/indices");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, not(containsString("\t" + indexName + "\t")));
assertThat(responseAsString, containsString(indexName));
assertThat(responseAsString, containsString(indexName + "-001"));
assertThat(responseAsString, containsString(indexName + "-002"));
// The other two indices won't be deleted, but the DBQ should have cleared them out
response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "-001/_count");
response = client().performRequest("get", indexName + "/_count");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"count\":0"));
response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "-002/_count");
response = client().performRequest("get", indexName + "-001/_count");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"count\":0"));
response = client().performRequest("get", indexName + "-002/_count");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"count\":0"));
expectThrows(ResponseException.class, () ->
client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"));
}
private Response addBucketResult(String jobId, String timestamp, long bucketSpan) throws Exception {
try {
client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId),
client().performRequest("put",
AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT,
Collections.emptyMap(), new StringEntity(RESULT_MAPPING, ContentType.APPLICATION_JSON));
} catch (ResponseException e) {
// it is ok: the index already exists
@ -480,14 +567,15 @@ public class MlJobIT extends ESRestTestCase {
jobId, timestamp, bucketSpan);
String id = String.format(Locale.ROOT,
"%s_%s_%s", jobId, timestamp, bucketSpan);
return client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "/result/" + id,
return client().performRequest("put", AnomalyDetectorsIndex.jobResultsAliasedName(jobId) + "/result/" + id,
Collections.singletonMap("refresh", "true"), new StringEntity(bucketResult, ContentType.APPLICATION_JSON));
}
private Response addRecordResult(String jobId, String timestamp, long bucketSpan, int sequenceNum) throws Exception {
try {
client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId), Collections.emptyMap(),
new StringEntity(RESULT_MAPPING, ContentType.APPLICATION_JSON));
client().performRequest("put",
AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT,
Collections.emptyMap(), new StringEntity(RESULT_MAPPING, ContentType.APPLICATION_JSON));
} catch (ResponseException e) {
// it is ok: the index already exists
assertThat(e.getMessage(), containsString("resource_already_exists_exception"));
@ -498,7 +586,7 @@ public class MlJobIT extends ESRestTestCase {
String.format(Locale.ROOT,
"{\"job_id\":\"%s\", \"timestamp\": \"%s\", \"bucket_span\":%d, \"sequence_num\": %d, \"result_type\":\"record\"}",
jobId, timestamp, bucketSpan, sequenceNum);
return client().performRequest("put", AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "/result/" + timestamp,
return client().performRequest("put", AnomalyDetectorsIndex.jobResultsAliasedName(jobId) + "/result/" + timestamp,
Collections.singletonMap("refresh", "true"), new StringEntity(recordResult, ContentType.APPLICATION_JSON));
}

View File

@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import java.io.IOException;
import java.util.Collections;
@ -30,6 +31,7 @@ public class MlRestTestStateCleaner {
public void clearMlMetadata() throws IOException {
deleteAllDatafeeds();
deleteAllJobs();
deleteDotML();
}
@SuppressWarnings("unchecked")
@ -82,4 +84,8 @@ public class MlRestTestStateCleaner {
client.performRequest("DELETE", "/_xpack/ml/anomaly_detectors/" + jobId);
}
}
private void deleteDotML() throws IOException {
client.performRequest("DELETE", ".ml-*?ignore_unavailable=true");
}
}

View File

@ -10,6 +10,7 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase;
import java.util.Arrays;
@ -323,21 +324,21 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
public void testBuilder_setsDefaultIndexName() {
Job.Builder builder = buildJobBuilder("foo");
Job job = builder.build();
assertEquals("foo", job.getResultsIndexName());
assertEquals(AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT, job.getResultsIndexName());
}
public void testBuilder_setsIndexName() {
Job.Builder builder = buildJobBuilder("foo");
builder.setResultsIndexName("carol");
Job job = builder.build();
assertEquals("carol", job.getResultsIndexName());
assertEquals(AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + "custom-carol", job.getResultsIndexName());
}
public void testBuilder_withInvalidIndexNameThrows () {
Job.Builder builder = buildJobBuilder("foo");
builder.setResultsIndexName("_bad^name");
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> builder.build());
assertEquals(Messages.getMessage(Messages.INVALID_ID, Job.RESULTS_INDEX_NAME.getPreferredName()), e.getMessage());
assertEquals(Messages.getMessage(Messages.INVALID_ID, Job.RESULTS_INDEX_NAME.getPreferredName(), "_bad^name"), e.getMessage());
}
public static Job.Builder buildJobBuilder(String id, Date date) {

View File

@ -40,7 +40,7 @@ public class JobDataDeleterTests extends ESTestCase {
BulkResponse bulkResponse = Mockito.mock(BulkResponse.class);
Client client = new MockClientBuilder("myCluster")
.prepareSearchExecuteListener(AnomalyDetectorsIndex.jobResultsIndexName("foo"), response)
.prepareSearchExecuteListener(AnomalyDetectorsIndex.jobResultsAliasedName("foo"), response)
.prepareSearchScrollExecuteListener(response)
.prepareBulk(bulkResponse).build();
@ -92,7 +92,7 @@ public class JobDataDeleterTests extends ESTestCase {
verify(client, times(5))
.prepareDelete(eq(AnomalyDetectorsIndex.jobStateIndexName()), eq(ModelState.TYPE.getPreferredName()), anyString());
verify(client, times(1))
.prepareDelete(eq(AnomalyDetectorsIndex.jobResultsIndexName(jobId)), eq(ModelSnapshot.TYPE.getPreferredName()),
.prepareDelete(eq(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)), eq(ModelSnapshot.TYPE.getPreferredName()),
eq("foo-snap-1"));
}

View File

@ -24,15 +24,12 @@ import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
@ -133,9 +130,12 @@ public class JobProviderTests extends ESTestCase {
@SuppressWarnings("unchecked")
public void testCreateJobResultsIndex() {
String resultsIndexName = AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT;
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
clientBuilder.createIndexRequest(AnomalyDetectorsIndex.jobResultsIndexName("foo"), captor);
clientBuilder.createIndexRequest(resultsIndexName, captor);
clientBuilder.prepareAlias(resultsIndexName, AnomalyDetectorsIndex.jobResultsAliasedName("foo"));
clientBuilder.preparePutMapping(mock(PutMappingResponse.class), Result.TYPE.getPreferredName());
Job.Builder job = buildJobBuilder("foo");
@ -145,13 +145,21 @@ public class JobProviderTests extends ESTestCase {
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA).indices(ImmutableOpenMap.of())).build();
ClusterService clusterService = mock(ClusterService.class);
doAnswer(invocationOnMock -> {
AckedClusterStateUpdateTask<Boolean> task = (AckedClusterStateUpdateTask<Boolean>) invocationOnMock.getArguments()[1];
task.execute(cs);
return null;
}).when(clusterService).submitStateUpdateTask(eq("put-job-foo"), any(AckedClusterStateUpdateTask.class));
provider.createJobResultIndex(job.build(), cs, new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean aBoolean) {
CreateIndexRequest request = captor.getValue();
assertNotNull(request);
assertEquals(AnomalyDetectorsIndex.jobResultsIndexName("foo"), request.index());
clientBuilder.verifyIndexCreated(AnomalyDetectorsIndex.jobResultsIndexName("foo"));
assertEquals(resultsIndexName, request.index());
clientBuilder.verifyIndexCreated(resultsIndexName);
resultHolder.set(aBoolean);
}
@ -168,7 +176,8 @@ public class JobProviderTests extends ESTestCase {
@SuppressWarnings("unchecked")
public void testCreateJobWithExistingIndex() {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
clientBuilder.prepareAlias(AnomalyDetectorsIndex.jobResultsIndexName("foo"), AnomalyDetectorsIndex.jobResultsIndexName("foo123"));
clientBuilder.prepareAlias(AnomalyDetectorsIndex.jobResultsAliasedName("foo"),
AnomalyDetectorsIndex.jobResultsAliasedName("foo123"));
clientBuilder.preparePutMapping(mock(PutMappingResponse.class), Result.TYPE.getPreferredName());
GetMappingsResponse getMappingsResponse = mock(GetMappingsResponse.class);
@ -176,7 +185,7 @@ public class JobProviderTests extends ESTestCase {
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings =
ImmutableOpenMap.<String, ImmutableOpenMap<String, MappingMetaData>>builder()
.fPut(AnomalyDetectorsIndex.jobResultsIndexName("foo"), typeMappings).build();
.fPut(AnomalyDetectorsIndex.jobResultsAliasedName("foo"), typeMappings).build();
when(getMappingsResponse.mappings()).thenReturn(mappings);
clientBuilder.prepareGetMapping(getMappingsResponse);
@ -185,7 +194,7 @@ public class JobProviderTests extends ESTestCase {
JobProvider provider = createProvider(clientBuilder.build());
Index index = mock(Index.class);
when(index.getName()).thenReturn(AnomalyDetectorsIndex.jobResultsIndexName("foo"));
when(index.getName()).thenReturn(AnomalyDetectorsIndex.jobResultsAliasedName("foo"));
IndexMetaData indexMetaData = mock(IndexMetaData.class);
when(indexMetaData.getIndex()).thenReturn(index);
@ -193,26 +202,26 @@ public class JobProviderTests extends ESTestCase {
when(indexMetaData.getAliases()).thenReturn(aliases);
ImmutableOpenMap<String, IndexMetaData> indexMap = ImmutableOpenMap.<String, IndexMetaData>builder()
.fPut(AnomalyDetectorsIndex.jobResultsIndexName("foo"), indexMetaData).build();
.fPut(AnomalyDetectorsIndex.jobResultsAliasedName("foo"), indexMetaData).build();
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
ClusterState cs2 = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA).indices(indexMap)).build();
ClusterService clusterService = mock(ClusterService.class);
doAnswer(invocationOnMock -> {
AckedClusterStateUpdateTask<Boolean> task = (AckedClusterStateUpdateTask<Boolean>) invocationOnMock.getArguments()[1];
task.execute(cs);
task.execute(cs2);
return null;
}).when(clusterService).submitStateUpdateTask(eq("put-job-foo123"), any(AckedClusterStateUpdateTask.class));
doAnswer(invocationOnMock -> {
AckedClusterStateUpdateTask<Boolean> task = (AckedClusterStateUpdateTask<Boolean>) invocationOnMock.getArguments()[1];
task.execute(cs);
task.execute(cs2);
return null;
}).when(clusterService).submitStateUpdateTask(eq("index-aliases"), any(AckedClusterStateUpdateTask.class));
provider.createJobResultIndex(job.build(), cs, new ActionListener<Boolean>() {
provider.createJobResultIndex(job.build(), cs2, new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean aBoolean) {
assertTrue(aBoolean);
@ -228,10 +237,13 @@ public class JobProviderTests extends ESTestCase {
@SuppressWarnings("unchecked")
public void testCreateJobRelatedIndicies_createsAliasBecauseIndexNameIsSet() {
String indexName = AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + "custom-bar";
String aliasName = AnomalyDetectorsIndex.jobResultsAliasedName("foo");
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
clientBuilder.createIndexRequest(AnomalyDetectorsIndex.jobResultsIndexName("foo"), captor);
clientBuilder.prepareAlias(AnomalyDetectorsIndex.jobResultsIndexName("bar"), AnomalyDetectorsIndex.jobResultsIndexName("foo"));
clientBuilder.createIndexRequest(indexName, captor);
clientBuilder.prepareAlias(indexName, aliasName);
clientBuilder.preparePutMapping(mock(PutMappingResponse.class), Result.TYPE.getPreferredName());
Job.Builder job = buildJobBuilder("foo");
@ -239,19 +251,19 @@ public class JobProviderTests extends ESTestCase {
Client client = clientBuilder.build();
JobProvider provider = createProvider(client);
Index index = mock(Index.class);
when(index.getName()).thenReturn(AnomalyDetectorsIndex.jobResultsIndexName("foo"));
IndexMetaData indexMetaData = mock(IndexMetaData.class);
when(indexMetaData.getIndex()).thenReturn(index);
ImmutableOpenMap<String, AliasMetaData> aliases = ImmutableOpenMap.of();
when(indexMetaData.getAliases()).thenReturn(aliases);
ImmutableOpenMap<String, IndexMetaData> indexMap = ImmutableOpenMap.<String, IndexMetaData>builder()
.fPut(AnomalyDetectorsIndex.jobResultsIndexName("foo"), indexMetaData).build();
ImmutableOpenMap<String, IndexMetaData> indexMap = ImmutableOpenMap.<String, IndexMetaData>builder().build();
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA).indices(indexMap)).build();
ClusterService clusterService = mock(ClusterService.class);
doAnswer(invocationOnMock -> {
AckedClusterStateUpdateTask<Boolean> task = (AckedClusterStateUpdateTask<Boolean>) invocationOnMock.getArguments()[1];
task.execute(cs);
return null;
}).when(clusterService).submitStateUpdateTask(eq("put-job-foo"), any(AckedClusterStateUpdateTask.class));
provider.createJobResultIndex(job.build(), cs, new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean aBoolean) {
@ -265,53 +277,6 @@ public class JobProviderTests extends ESTestCase {
});
}
@SuppressWarnings("unchecked")
public void testCreateJobRelatedIndicies_doesntCreateAliasIfIndexNameIsSameAsJobId() {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
clientBuilder.createIndexRequest(AnomalyDetectorsIndex.jobResultsIndexName("foo"), captor);
clientBuilder.preparePutMapping(mock(PutMappingResponse.class), Result.TYPE.getPreferredName());
GetMappingsResponse getMappingsResponse = mock(GetMappingsResponse.class);
ImmutableOpenMap<String, MappingMetaData> typeMappings = ImmutableOpenMap.of();
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings =
ImmutableOpenMap.<String, ImmutableOpenMap<String, MappingMetaData>>builder()
.fPut(AnomalyDetectorsIndex.jobResultsIndexName("foo"), typeMappings).build();
when(getMappingsResponse.mappings()).thenReturn(mappings);
clientBuilder.prepareGetMapping(getMappingsResponse);
Job.Builder job = buildJobBuilder("foo");
job.setResultsIndexName("foo");
Client client = clientBuilder.build();
JobProvider provider = createProvider(client);
Index index = mock(Index.class);
when(index.getName()).thenReturn(AnomalyDetectorsIndex.jobResultsIndexName("foo"));
IndexMetaData indexMetaData = mock(IndexMetaData.class);
when(indexMetaData.getIndex()).thenReturn(index);
ImmutableOpenMap<String, AliasMetaData> aliases = ImmutableOpenMap.of();
when(indexMetaData.getAliases()).thenReturn(aliases);
ImmutableOpenMap<String, IndexMetaData> indexMap = ImmutableOpenMap.<String, IndexMetaData>builder()
.fPut(AnomalyDetectorsIndex.jobResultsIndexName("foo"), indexMetaData).build();
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA).indices(indexMap)).build();
provider.createJobResultIndex(job.build(), cs, new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean aBoolean) {
verify(client.admin().indices(), never()).prepareAliases();
verify(clientBuilder.build().admin().indices(), times(1)).preparePutMapping(any());
}
@Override
public void onFailure(Exception e) {
fail(e.toString());
}
});
}
public void testDeleteJobRelatedIndices() throws InterruptedException, ExecutionException, IOException {
@SuppressWarnings("unchecked")
ActionListener<DeleteJobAction.Response> actionListener = mock(ActionListener.class);
@ -320,8 +285,8 @@ public class JobProviderTests extends ESTestCase {
Client client = clientBuilder.build();
JobProvider provider = createProvider(client);
clientBuilder.resetIndices();
clientBuilder.addIndicesExistsResponse(AnomalyDetectorsIndex.jobResultsIndexName(jobId), true)
.addIndicesDeleteResponse(AnomalyDetectorsIndex.jobResultsIndexName(jobId), true,
clientBuilder.addIndicesExistsResponse(AnomalyDetectorsIndex.jobResultsAliasedName(jobId), true)
.addIndicesDeleteResponse(AnomalyDetectorsIndex.jobResultsAliasedName(jobId), true,
false, actionListener);
clientBuilder.build();
@ -340,8 +305,8 @@ public class JobProviderTests extends ESTestCase {
Client client = clientBuilder.build();
JobProvider provider = createProvider(client);
clientBuilder.resetIndices();
clientBuilder.addIndicesExistsResponse(AnomalyDetectorsIndex.jobResultsIndexName(jobId), true)
.addIndicesDeleteResponse(AnomalyDetectorsIndex.jobResultsIndexName(jobId), true,
clientBuilder.addIndicesExistsResponse(AnomalyDetectorsIndex.jobResultsAliasedName(jobId), true)
.addIndicesDeleteResponse(AnomalyDetectorsIndex.jobResultsAliasedName(jobId), true,
true, actionListener);
clientBuilder.build();
@ -365,7 +330,7 @@ public class JobProviderTests extends ESTestCase {
source.add(map);
QueryBuilder[] queryBuilderHolder = new QueryBuilder[1];
SearchResponse response = createSearchResponse(source);
SearchResponse response = createSearchResponse(true, source);
int from = 0;
int size = 10;
Client client = getMockedClient(queryBuilder -> {queryBuilderHolder[0] = queryBuilder;}, response);
@ -399,7 +364,7 @@ public class JobProviderTests extends ESTestCase {
source.add(map);
QueryBuilder[] queryBuilderHolder = new QueryBuilder[1];
SearchResponse response = createSearchResponse(source);
SearchResponse response = createSearchResponse(true, source);
int from = 99;
int size = 17;
@ -434,7 +399,7 @@ public class JobProviderTests extends ESTestCase {
source.add(map);
QueryBuilder[] queryBuilderHolder = new QueryBuilder[1];
SearchResponse response = createSearchResponse(source);
SearchResponse response = createSearchResponse(true, source);
int from = 99;
int size = 17;
@ -466,7 +431,7 @@ public class JobProviderTests extends ESTestCase {
Long timestamp = 98765432123456789L;
List<Map<String, Object>> source = new ArrayList<>();
SearchResponse response = createSearchResponse(source);
SearchResponse response = createSearchResponse(false, source);
Client client = getMockedClient(queryBuilder -> {}, response);
JobProvider provider = createProvider(client);
@ -490,7 +455,7 @@ public class JobProviderTests extends ESTestCase {
map.put("bucket_span", 22);
source.add(map);
SearchResponse response = createSearchResponse(source);
SearchResponse response = createSearchResponse(true, source);
Client client = getMockedClient(queryBuilder -> {}, response);
JobProvider provider = createProvider(client);
@ -518,7 +483,7 @@ public class JobProviderTests extends ESTestCase {
map.put("is_interim", true);
source.add(map);
SearchResponse response = createSearchResponse(source);
SearchResponse response = createSearchResponse(true, source);
Client client = getMockedClient(queryBuilder -> {}, response);
JobProvider provider = createProvider(client);
@ -557,7 +522,7 @@ public class JobProviderTests extends ESTestCase {
int from = 14;
int size = 2;
String sortfield = "minefield";
SearchResponse response = createSearchResponse(source);
SearchResponse response = createSearchResponse(true, source);
Client client = getMockedClient(qb -> {}, response);
JobProvider provider = createProvider(client);
@ -607,7 +572,7 @@ public class JobProviderTests extends ESTestCase {
int from = 14;
int size = 2;
String sortfield = "minefield";
SearchResponse response = createSearchResponse(source);
SearchResponse response = createSearchResponse(true, source);
Client client = getMockedClient(qb -> {}, response);
JobProvider provider = createProvider(client);
@ -665,7 +630,7 @@ public class JobProviderTests extends ESTestCase {
int from = 14;
int size = 2;
String sortfield = "minefield";
SearchResponse response = createSearchResponse(source);
SearchResponse response = createSearchResponse(true, source);
Client client = getMockedClient(qb -> {}, response);
JobProvider provider = createProvider(client);
@ -702,7 +667,7 @@ public class JobProviderTests extends ESTestCase {
source.add(recordMap);
}
SearchResponse response = createSearchResponse(source);
SearchResponse response = createSearchResponse(true, source);
Client client = getMockedClient(qb -> {}, response);
JobProvider provider = createProvider(client);
@ -731,7 +696,7 @@ public class JobProviderTests extends ESTestCase {
source.add(recordMap);
}
SearchResponse response = createSearchResponse(source);
SearchResponse response = createSearchResponse(true, source);
Client client = getMockedClient(qb -> {}, response);
JobProvider provider = createProvider(client);
@ -757,7 +722,7 @@ public class JobProviderTests extends ESTestCase {
source.add(map);
SearchResponse response = createSearchResponse(source);
SearchResponse response = createSearchResponse(true, source);
int from = 0;
int size = 10;
Client client = getMockedClient(q -> {}, response);
@ -783,7 +748,7 @@ public class JobProviderTests extends ESTestCase {
source.put("category_id", categoryId);
source.put("terms", terms);
SearchResponse response = createSearchResponse(Collections.singletonList(source));
SearchResponse response = createSearchResponse(true, Collections.singletonList(source));
Client client = getMockedClient(q -> {}, response);
JobProvider provider = createProvider(client);
@SuppressWarnings({"unchecked", "rawtypes"})
@ -826,7 +791,7 @@ public class JobProviderTests extends ESTestCase {
int from = 4;
int size = 3;
QueryBuilder[] qbHolder = new QueryBuilder[1];
SearchResponse response = createSearchResponse(source);
SearchResponse response = createSearchResponse(true, source);
Client client = getMockedClient(q -> qbHolder[0] = q, response);
JobProvider provider = createProvider(client);
@ -888,7 +853,7 @@ public class JobProviderTests extends ESTestCase {
int from = 4;
int size = 3;
QueryBuilder[] qbHolder = new QueryBuilder[1];
SearchResponse response = createSearchResponse(source);
SearchResponse response = createSearchResponse(true, source);
Client client = getMockedClient(q -> qbHolder[0] = q, response);
JobProvider provider = createProvider(client);
@ -943,7 +908,7 @@ public class JobProviderTests extends ESTestCase {
int from = 4;
int size = 3;
SearchResponse response = createSearchResponse(source);
SearchResponse response = createSearchResponse(true, source);
Client client = getMockedClient(qb -> {}, response);
JobProvider provider = createProvider(client);
@ -994,7 +959,7 @@ public class JobProviderTests extends ESTestCase {
int from = 4;
int size = 3;
QueryBuilder[] qbHolder = new QueryBuilder[1];
SearchResponse response = createSearchResponse(source);
SearchResponse response = createSearchResponse(true, source);
Client client = getMockedClient(qb -> qbHolder[0] = qb, response);
JobProvider provider = createProvider(client);
@ -1149,7 +1114,7 @@ public class JobProviderTests extends ESTestCase {
return getResponse;
}
private static SearchResponse createSearchResponse(List<Map<String, Object>> source) throws IOException {
private static SearchResponse createSearchResponse(boolean exists, List<Map<String, Object>> source) throws IOException {
SearchResponse response = mock(SearchResponse.class);
List<SearchHit> list = new ArrayList<>();

View File

@ -103,9 +103,9 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
assertThat(capturedSearchRequests.size(), equalTo(2));
SearchRequest searchRequest = capturedSearchRequests.get(0);
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("snapshots-1")}));
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")}));
searchRequest = capturedSearchRequests.get(1);
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("snapshots-2")}));
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-2")}));
assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(3));
DeleteModelSnapshotAction.Request deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(0);
@ -137,9 +137,9 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
assertThat(capturedSearchRequests.size(), equalTo(2));
SearchRequest searchRequest = capturedSearchRequests.get(0);
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("snapshots-1")}));
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")}));
searchRequest = capturedSearchRequests.get(1);
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("snapshots-2")}));
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-2")}));
assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(0));
}
@ -162,9 +162,9 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
assertThat(capturedSearchRequests.size(), equalTo(2));
SearchRequest searchRequest = capturedSearchRequests.get(0);
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("snapshots-1")}));
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")}));
searchRequest = capturedSearchRequests.get(1);
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("snapshots-2")}));
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-2")}));
assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(3));
DeleteModelSnapshotAction.Request deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(0);

View File

@ -98,9 +98,9 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
assertThat(capturedDeleteByQueryRequests.size(), equalTo(2));
DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0);
assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("results-1")}));
assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-1")}));
dbqRequest = capturedDeleteByQueryRequests.get(1);
assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("results-2")}));
assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-2")}));
}
public void testOnTrigger_GivenClientRequestsFailed_StillIteratesThroughJobs() throws IOException {
@ -115,9 +115,9 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
assertThat(capturedDeleteByQueryRequests.size(), equalTo(2));
DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0);
assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("results-1")}));
assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-1")}));
dbqRequest = capturedDeleteByQueryRequests.get(1);
assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsIndexName("results-2")}));
assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-2")}));
}
private void givenClientRequestsSucceed() {

View File

@ -0,0 +1,345 @@
setup:
- skip:
features: ["headers"]
---
"Test CRUD on two jobs in shared index":
- do:
xpack.ml.put_job:
job_id: farequote
body: >
{
"job_id":"farequote",
"description":"Analysis of response time by airline",
"analysis_config" : {
"bucket_span":3600,
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"format":"JSON",
"time_field":"time",
"time_format":"epoch"
}
}
- match: { job_id: "farequote" }
- do:
xpack.ml.put_job:
job_id: farequote2
body: >
{
"job_id":"farequote2",
"description":"Analysis of response time by airline",
"analysis_config" : {
"bucket_span":3600,
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"format":"JSON",
"time_field":"time",
"time_format":"epoch"
}
}
- match: { job_id: "farequote2" }
- do:
xpack.ml.open_job:
job_id: farequote
- do:
xpack.ml.open_job:
job_id: farequote2
- do:
#set the header so we won't randomize it
headers:
Content-Type: application/json
xpack.ml.post_data:
job_id: farequote
body: >
{"airline":"AAL","responsetime":"132.2046","sourcetype":"farequote","time":"1403481600"}
{"airline":"JZA","responsetime":"990.4628","sourcetype":"farequote","time":"1403481700"}
- do:
#set the header so we won't randomize it
headers:
Content-Type: application/json
xpack.ml.post_data:
job_id: farequote2
body: >
{"airline":"AAL","responsetime":"132.2046","sourcetype":"farequote","time":"1403481600"}
- do:
xpack.ml.flush_job:
job_id: farequote
- match: { flushed: true }
- do:
xpack.ml.flush_job:
job_id: farequote2
- match: { flushed: true }
- do:
xpack.ml.close_job:
job_id: farequote
- match: { closed: true }
- do:
xpack.ml.close_job:
job_id: farequote2
- match: { closed: true }
- do:
indices.refresh: {}
- do:
indices.exists:
index: ".ml-anomalies-farequote"
- is_true: ''
- do:
indices.exists:
index: ".ml-anomalies-farequote"
- is_true: ''
- do:
count:
index: .ml-anomalies-shared
- match: {count: 8}
- do:
count:
index: .ml-anomalies-farequote
body:
query:
constant_score:
filter:
term:
job_id: farequote
- match: {count: 4}
- do:
count:
index: .ml-anomalies-shared
body:
query:
constant_score:
filter:
term:
job_id: farequote
- match: {count: 4}
- do:
count:
index: .ml-anomalies-farequote2
body:
query:
constant_score:
filter:
term:
job_id: farequote2
- match: {count: 4}
- do:
count:
index: .ml-anomalies-shared
body:
query:
constant_score:
filter:
term:
job_id: farequote2
- match: {count: 4}
- do:
xpack.ml.delete_job:
job_id: "farequote"
- match: { acknowledged: true }
- do:
indices.refresh: {}
- do:
indices.exists:
index: ".ml-anomalies-shared"
- is_true: ''
- do:
count:
index: .ml-anomalies-shared
body:
query:
constant_score:
filter:
term:
job_id: farequote
- match: {count: 0}
- do:
count:
index: .ml-anomalies-shared
- match: {count: 4}
- do:
count:
index: .ml-anomalies-farequote2
body:
query:
constant_score:
filter:
term:
job_id: farequote2
- match: {count: 4}
- do:
count:
index: .ml-anomalies-shared
body:
query:
constant_score:
filter:
term:
job_id: farequote2
- match: {count: 4}
- do:
xpack.ml.delete_job:
job_id: "farequote2"
- match: { acknowledged: true }
- do:
indices.refresh: {}
- do:
indices.exists:
index: ".ml-anomalies-shared"
- is_true: ''
- do:
indices.exists:
index: ".ml-anomalies-farequote"
- is_false: ''
- do:
indices.exists:
index: ".ml-anomalies-farequote2"
- is_false: ''
- do:
count:
index: .ml-anomalies-shared
- match: {count: 0}
---
"Test unrelated index":
- do:
xpack.ml.put_job:
job_id: farequote
body: >
{
"job_id":"farequote",
"description":"Analysis of response time by airline",
"analysis_config" : {
"bucket_span":3600,
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"format":"JSON",
"time_field":"time",
"time_format":"epoch"
}
}
- match: { job_id: "farequote" }
- do:
xpack.ml.open_job:
job_id: farequote
- do:
#set the header so we won't randomize it
headers:
Content-Type: application/json
xpack.ml.post_data:
job_id: farequote
body: >
{"airline":"AAL","responsetime":"132.2046","sourcetype":"farequote","time":"1403481600"}
{"airline":"JZA","responsetime":"990.4628","sourcetype":"farequote","time":"1403481700"}
- do:
indices.create:
index: foo
- do:
indices.create:
index: .ml-anomalies-foo
- do:
index:
index: foo
type: foo
body:
key: value
- do:
index:
index: .ml-anomalies-foo
type: foo
body:
key: value
- do:
index:
index: .ml-anomalies-foo
type: foo
body:
key: value
job_id: foo
- do:
xpack.ml.flush_job:
job_id: farequote
- match: { flushed: true }
- do:
xpack.ml.close_job:
job_id: farequote
- match: { closed: true }
- do:
xpack.ml.delete_job:
job_id: "farequote"
- match: { acknowledged: true }
- do:
indices.refresh: {}
- do:
count:
index: .ml-anomalies-shared
- match: {count: 0}
- do:
count:
index: foo
- match: {count: 1}
- do:
count:
index: .ml-anomalies-foo
- match: {count: 2}