[ML] Use Index Templates (elastic/x-pack-elasticsearch#622)
* [ML] Index template for notifications, meta and state indices * Add ignore_unavailable option to searches so missing indices don’t throw errors * Test for job existence before returning results * Template mapping for results indices * Fix tests * Reinstate merge conflict dropped code * Fix and tidy up JobProviderTests Tests had badly named & duplicated tests * Don’t check if the job indices already exist when allocating Indices are created on demand from templates * No need to create indices in initialisation service Using templates now * Remove unused code and add comment Original commit: elastic/x-pack-elasticsearch@1ba115d4fc
This commit is contained in:
parent
7f6edbdd4b
commit
a09e8a260a
|
@ -35,9 +35,10 @@ public class MlInitializationService extends AbstractComponent implements Cluste
|
|||
private final JobProvider jobProvider;
|
||||
|
||||
private final AtomicBoolean installMlMetadataCheck = new AtomicBoolean(false);
|
||||
private final AtomicBoolean createMlAuditIndexCheck = new AtomicBoolean(false);
|
||||
private final AtomicBoolean createMlMetaIndexCheck = new AtomicBoolean(false);
|
||||
private final AtomicBoolean createStateIndexCheck = new AtomicBoolean(false);
|
||||
private final AtomicBoolean putMlNotificationsIndexTemplateCheck = new AtomicBoolean(false);
|
||||
private final AtomicBoolean putMlMetaIndexTemplateCheck = new AtomicBoolean(false);
|
||||
private final AtomicBoolean putStateIndexTemplateCheck = new AtomicBoolean(false);
|
||||
private final AtomicBoolean putResultsIndexTemplateCheck = new AtomicBoolean(false);
|
||||
|
||||
private volatile MlDailyManagementService mlDailyManagementService;
|
||||
|
||||
|
@ -62,9 +63,10 @@ public class MlInitializationService extends AbstractComponent implements Cluste
|
|||
if (event.localNodeMaster()) {
|
||||
MetaData metaData = event.state().metaData();
|
||||
installMlMetadata(metaData);
|
||||
createMlAuditIndex(metaData);
|
||||
createMlMetaIndex(metaData);
|
||||
createStateIndex(metaData);
|
||||
putMlNoficationsIndexTemplate(metaData);
|
||||
putMlMetaIndexTemplate(metaData);
|
||||
putStateIndexTemplate(metaData);
|
||||
putResultsIndexTemplate(metaData);
|
||||
installDailyManagementService();
|
||||
} else {
|
||||
uninstallDailyManagementService();
|
||||
|
@ -97,67 +99,80 @@ public class MlInitializationService extends AbstractComponent implements Cluste
|
|||
}
|
||||
}
|
||||
|
||||
private void createMlAuditIndex(MetaData metaData) {
|
||||
if (metaData.hasIndex(Auditor.NOTIFICATIONS_INDEX) == false) {
|
||||
if (createMlAuditIndexCheck.compareAndSet(false, true)) {
|
||||
private void putMlNoficationsIndexTemplate(MetaData metaData) {
|
||||
if (metaData.templates().containsKey(Auditor.NOTIFICATIONS_INDEX) == false) {
|
||||
if (putMlNotificationsIndexTemplateCheck.compareAndSet(false, true)) {
|
||||
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
|
||||
jobProvider.createNotificationMessageIndex((result, error) -> {
|
||||
jobProvider.putNotificationMessageIndexTemplate((result, error) -> {
|
||||
if (result) {
|
||||
logger.info("successfully created {} index", Auditor.NOTIFICATIONS_INDEX);
|
||||
logger.info("successfully created {} index template", Auditor.NOTIFICATIONS_INDEX);
|
||||
} else {
|
||||
if (error instanceof ResourceAlreadyExistsException) {
|
||||
logger.debug("not able to create {} index as it already exists", Auditor.NOTIFICATIONS_INDEX);
|
||||
} else {
|
||||
logger.error(
|
||||
new ParameterizedMessage("not able to create {} index", Auditor.NOTIFICATIONS_INDEX), error);
|
||||
}
|
||||
logger.error(
|
||||
new ParameterizedMessage("not able to create {} index template", Auditor.NOTIFICATIONS_INDEX), error);
|
||||
}
|
||||
createMlAuditIndexCheck.set(false);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
putMlNotificationsIndexTemplateCheck.set(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void createMlMetaIndex(MetaData metaData) {
|
||||
if (metaData.hasIndex(JobProvider.ML_META_INDEX) == false) {
|
||||
if (createMlMetaIndexCheck.compareAndSet(false, true)) {
|
||||
private void putMlMetaIndexTemplate(MetaData metaData) {
|
||||
if (metaData.templates().containsKey(JobProvider.ML_META_INDEX) == false) {
|
||||
if (putMlMetaIndexTemplateCheck.compareAndSet(false, true)) {
|
||||
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
|
||||
jobProvider.createMetaIndex((result, error) -> {
|
||||
jobProvider.putMetaIndexTemplate((result, error) -> {
|
||||
if (result) {
|
||||
logger.info("successfully created {} index", JobProvider.ML_META_INDEX);
|
||||
logger.info("successfully created {} index template", JobProvider.ML_META_INDEX);
|
||||
} else {
|
||||
if (error instanceof ResourceAlreadyExistsException) {
|
||||
logger.debug("not able to create {} index as it already exists", JobProvider.ML_META_INDEX);
|
||||
} else {
|
||||
logger.error(new ParameterizedMessage("not able to create {} index", JobProvider.ML_META_INDEX), error);
|
||||
}
|
||||
logger.error(
|
||||
new ParameterizedMessage("not able to create {} index template", JobProvider.ML_META_INDEX), error);
|
||||
}
|
||||
createMlMetaIndexCheck.set(false);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
putMlMetaIndexTemplateCheck.set(false);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
private void createStateIndex(MetaData metaData) {
|
||||
private void putStateIndexTemplate(MetaData metaData) {
|
||||
String stateIndexName = AnomalyDetectorsIndex.jobStateIndexName();
|
||||
if (metaData.hasIndex(stateIndexName) == false) {
|
||||
if (createStateIndexCheck.compareAndSet(false, true)) {
|
||||
if (metaData.templates().containsKey(stateIndexName) == false) {
|
||||
if (putStateIndexTemplateCheck.compareAndSet(false, true)) {
|
||||
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
|
||||
jobProvider.createJobStateIndex((result, error) -> {
|
||||
jobProvider.putJobStateIndexTemplate((result, error) -> {
|
||||
if (result) {
|
||||
logger.info("successfully created {} index", stateIndexName);
|
||||
logger.info("successfully created {} index template", stateIndexName);
|
||||
} else {
|
||||
if (error instanceof ResourceAlreadyExistsException) {
|
||||
logger.debug("not able to create {} index as it already exists", stateIndexName);
|
||||
} else {
|
||||
logger.error("not able to create " + stateIndexName + " index", error);
|
||||
}
|
||||
logger.error("not able to create " + stateIndexName + " index template", error);
|
||||
}
|
||||
createStateIndexCheck.set(false);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
putStateIndexTemplateCheck.set(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void putResultsIndexTemplate(MetaData metaData) {
|
||||
if (metaData.templates().containsKey(AnomalyDetectorsIndex.jobResultsIndexPrefix()) == false) {
|
||||
if (putResultsIndexTemplateCheck.compareAndSet(false, true)) {
|
||||
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
|
||||
jobProvider.putJobResultsIndexTemplate((result, error) -> {
|
||||
if (result) {
|
||||
logger.info("successfully created {} index template", AnomalyDetectorsIndex.jobResultsIndexPrefix());
|
||||
} else {
|
||||
logger.error(
|
||||
new ParameterizedMessage("not able to create {} index template",
|
||||
AnomalyDetectorsIndex.jobResultsIndexPrefix()), error);
|
||||
}
|
||||
});
|
||||
});
|
||||
} else {
|
||||
putResultsIndexTemplateCheck.set(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -196,3 +211,4 @@ public class MlInitializationService extends AbstractComponent implements Cluste
|
|||
mlDailyManagementService = service;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.ml.action.util.PageParams;
|
||||
import org.elasticsearch.xpack.ml.action.util.QueryPage;
|
||||
import org.elasticsearch.xpack.ml.job.JobManager;
|
||||
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
|
||||
|
@ -385,17 +386,21 @@ public class GetBucketsAction extends Action<GetBucketsAction.Request, GetBucket
|
|||
public static class TransportAction extends HandledTransportAction<Request, Response> {
|
||||
|
||||
private final JobProvider jobProvider;
|
||||
private final JobManager jobManager;
|
||||
|
||||
@Inject
|
||||
public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
JobProvider jobProvider) {
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
JobProvider jobProvider, JobManager jobManager) {
|
||||
super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new);
|
||||
this.jobProvider = jobProvider;
|
||||
this.jobManager = jobManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(Request request, ActionListener<Response> listener) {
|
||||
jobManager.getJobOrThrowIfUnknown(request.getJobId());
|
||||
|
||||
BucketsQueryBuilder query =
|
||||
new BucketsQueryBuilder().expand(request.expand)
|
||||
.includeInterim(request.includeInterim)
|
||||
|
|
|
@ -300,6 +300,7 @@ public class GetFiltersAction extends Action<GetFiltersAction.Request, GetFilter
|
|||
.size(pageParams.getSize());
|
||||
|
||||
SearchRequest searchRequest = new SearchRequest(new String[]{JobProvider.ML_META_INDEX}, sourceBuilder)
|
||||
.indicesOptions(JobProvider.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS))
|
||||
.types(MlFilter.TYPE.getPreferredName());
|
||||
|
||||
transportSearchAction.execute(searchRequest, new ActionListener<SearchResponse>() {
|
||||
|
|
|
@ -339,9 +339,13 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
|
|||
|
||||
@Override
|
||||
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
|
||||
MlMetadata mlMetadata = clusterService.state().metaData().custom(MlMetadata.TYPE);
|
||||
if (Job.ALL.equals(request.getJobId())) {
|
||||
MlMetadata mlMetadata = clusterService.state().metaData().custom(MlMetadata.TYPE);
|
||||
request.expandedJobsIds = mlMetadata.getJobs().keySet().stream().collect(Collectors.toList());
|
||||
} else {
|
||||
if (mlMetadata.getJobs().containsKey(request.getJobId()) == false) {
|
||||
throw ExceptionsHelper.missingJobException(request.getJobId());
|
||||
}
|
||||
}
|
||||
|
||||
ActionListener<Response> finalListener = listener;
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.ml.action.util.PageParams;
|
||||
import org.elasticsearch.xpack.ml.action.util.QueryPage;
|
||||
import org.elasticsearch.xpack.ml.job.JobManager;
|
||||
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.RecordsQueryBuilder;
|
||||
|
@ -336,17 +337,22 @@ public class GetRecordsAction extends Action<GetRecordsAction.Request, GetRecord
|
|||
public static class TransportAction extends HandledTransportAction<Request, Response> {
|
||||
|
||||
private final JobProvider jobProvider;
|
||||
private final JobManager jobManager;
|
||||
|
||||
@Inject
|
||||
public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
JobProvider jobProvider) {
|
||||
JobProvider jobProvider, JobManager jobManager) {
|
||||
super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new);
|
||||
this.jobProvider = jobProvider;
|
||||
this.jobManager = jobManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(Request request, ActionListener<Response> listener) {
|
||||
|
||||
jobManager.getJobOrThrowIfUnknown(request.getJobId());
|
||||
|
||||
RecordsQueryBuilder.RecordsQuery query = new RecordsQueryBuilder()
|
||||
.includeInterim(request.includeInterim)
|
||||
.epochStart(request.start)
|
||||
|
|
|
@ -15,6 +15,7 @@ import org.elasticsearch.action.bulk.byscroll.DeleteByQueryRequest;
|
|||
import org.elasticsearch.action.bulk.byscroll.ParentBulkByScrollTask;
|
||||
import org.elasticsearch.action.bulk.byscroll.WorkingBulkByScrollTask;
|
||||
import org.elasticsearch.action.search.SearchAction;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchRequestBuilder;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.HandledTransportAction;
|
||||
|
@ -30,6 +31,7 @@ import org.elasticsearch.script.ScriptService;
|
|||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
|
||||
|
||||
public class MlDeleteByQueryAction extends Action<DeleteByQueryRequest, BulkByScrollResponse,
|
||||
MlDeleteByQueryAction.MlDeleteByQueryRequestBuilder> {
|
||||
|
@ -54,7 +56,7 @@ public class MlDeleteByQueryAction extends Action<DeleteByQueryRequest, BulkBySc
|
|||
public static class MlDeleteByQueryRequestBuilder extends
|
||||
AbstractBulkByScrollRequestBuilder<DeleteByQueryRequest, MlDeleteByQueryRequestBuilder> {
|
||||
|
||||
public MlDeleteByQueryRequestBuilder(ElasticsearchClient client,
|
||||
private MlDeleteByQueryRequestBuilder(ElasticsearchClient client,
|
||||
Action<DeleteByQueryRequest, BulkByScrollResponse, MlDeleteByQueryRequestBuilder> action) {
|
||||
this(client, action, new SearchRequestBuilder(client, SearchAction.INSTANCE));
|
||||
}
|
||||
|
@ -62,7 +64,9 @@ public class MlDeleteByQueryAction extends Action<DeleteByQueryRequest, BulkBySc
|
|||
private MlDeleteByQueryRequestBuilder(ElasticsearchClient client,
|
||||
Action<DeleteByQueryRequest, BulkByScrollResponse, MlDeleteByQueryRequestBuilder> action,
|
||||
SearchRequestBuilder search) {
|
||||
super(client, action, search, new DeleteByQueryRequest(search.request()));
|
||||
super(client, action, search,
|
||||
new DeleteByQueryRequest(search.setIndicesOptions(
|
||||
JobProvider.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS)).request()));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -386,7 +386,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
|
|||
|
||||
static DiscoveryNode selectLeastLoadedMlNode(String jobId, ClusterState clusterState, int maxConcurrentJobAllocations,
|
||||
Logger logger) {
|
||||
if (verifyIndicesExistAndPrimaryShardsAreActive(logger, jobId, clusterState) == false) {
|
||||
if (verifyIndicesPrimaryShardsAreActive(logger, jobId, clusterState) == false) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -454,16 +454,16 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
|
|||
return minLoadedNode;
|
||||
}
|
||||
|
||||
static boolean verifyIndicesExistAndPrimaryShardsAreActive(Logger logger, String jobId, ClusterState clusterState) {
|
||||
static boolean verifyIndicesPrimaryShardsAreActive(Logger logger, String jobId, ClusterState clusterState) {
|
||||
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
|
||||
Job job = mlMetadata.getJobs().get(jobId);
|
||||
String jobResultIndex = AnomalyDetectorsIndex.jobResultsIndexName(job.getResultsIndexName());
|
||||
String[] indices = new String[]{AnomalyDetectorsIndex.jobStateIndexName(), jobResultIndex, JobProvider.ML_META_INDEX,
|
||||
Auditor.NOTIFICATIONS_INDEX};
|
||||
String[] indices = new String[]{AnomalyDetectorsIndex.jobStateIndexName(), jobResultIndex, JobProvider.ML_META_INDEX};
|
||||
for (String index : indices) {
|
||||
// Indices are created on demand from templates.
|
||||
// It is not an error if the index doesn't exist yet
|
||||
if (clusterState.metaData().hasIndex(index) == false) {
|
||||
logger.warn("Not opening job [{}], because [{}] index is missing.", jobId, index);
|
||||
return false;
|
||||
continue;
|
||||
}
|
||||
IndexRoutingTable routingTable = clusterState.getRoutingTable().index(index);
|
||||
if (routingTable == null || routingTable.allPrimaryShardsActive() == false) {
|
||||
|
|
|
@ -18,6 +18,10 @@ public final class AnomalyDetectorsIndex {
|
|||
private AnomalyDetectorsIndex() {
|
||||
}
|
||||
|
||||
public static String jobResultsIndexPrefix() {
|
||||
return RESULTS_INDEX_PREFIX;
|
||||
}
|
||||
|
||||
/**
|
||||
* The name of the default index where the job's results are stored
|
||||
* @param jobId Job Id
|
||||
|
|
|
@ -124,6 +124,7 @@ public abstract class BatchedDocumentsIterator<T> {
|
|||
isScrollInitialised = true;
|
||||
|
||||
SearchRequest searchRequest = new SearchRequest(index);
|
||||
searchRequest.indicesOptions(JobProvider.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS));
|
||||
searchRequest.types(getType());
|
||||
searchRequest.scroll(CONTEXT_ALIVE_DURATION);
|
||||
searchRequest.source(new SearchSourceBuilder()
|
||||
|
|
|
@ -115,13 +115,10 @@ public class ElasticsearchMappings {
|
|||
* <li>Influencer.influencer_field_value</li>
|
||||
* </ul>
|
||||
*
|
||||
* @param termFieldNames All the term fields (by, over, partition) and influencers
|
||||
* included in the mapping
|
||||
*
|
||||
* @return The mapping
|
||||
* @throws IOException On write error
|
||||
*/
|
||||
public static XContentBuilder resultsMapping(Collection<String> termFieldNames) throws IOException {
|
||||
public static XContentBuilder resultsMapping() throws IOException {
|
||||
XContentBuilder builder = jsonBuilder()
|
||||
.startObject()
|
||||
.startObject(Result.TYPE.getPreferredName())
|
||||
|
@ -258,8 +255,6 @@ public class ElasticsearchMappings {
|
|||
addInfluencerFieldsToMapping(builder);
|
||||
addModelSizeStatsFieldsToMapping(builder);
|
||||
|
||||
addTermFields(builder, termFieldNames);
|
||||
|
||||
// End result properties
|
||||
builder.endObject();
|
||||
// End result
|
||||
|
|
|
@ -16,6 +16,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
|||
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
|
||||
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
|
||||
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
|
||||
import org.elasticsearch.action.get.GetRequest;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.get.MultiGetItemResponse;
|
||||
|
@ -24,6 +25,7 @@ import org.elasticsearch.action.search.MultiSearchRequest;
|
|||
import org.elasticsearch.action.search.MultiSearchResponse;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
|
@ -147,39 +149,55 @@ public class JobProvider {
|
|||
}
|
||||
|
||||
/**
|
||||
* Create the Audit index with the audit message document mapping.
|
||||
* Index template for notifications
|
||||
*/
|
||||
public void createNotificationMessageIndex(BiConsumer<Boolean, Exception> listener) {
|
||||
public void putNotificationMessageIndexTemplate(BiConsumer<Boolean, Exception> listener) {
|
||||
try {
|
||||
LOGGER.info("Creating the internal '{}' index", Auditor.NOTIFICATIONS_INDEX);
|
||||
XContentBuilder auditMessageMapping = ElasticsearchMappings.auditMessageMapping();
|
||||
XContentBuilder auditActivityMapping = ElasticsearchMappings.auditActivityMapping();
|
||||
PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest(Auditor.NOTIFICATIONS_INDEX);
|
||||
templateRequest.patterns(Collections.singletonList(Auditor.NOTIFICATIONS_INDEX));
|
||||
templateRequest.settings(mlNotificationIndexSettings());
|
||||
templateRequest.mapping(AuditMessage.TYPE.getPreferredName(), ElasticsearchMappings.auditMessageMapping());
|
||||
templateRequest.mapping(AuditActivity.TYPE.getPreferredName(), ElasticsearchMappings.auditActivityMapping());
|
||||
|
||||
CreateIndexRequest createIndexRequest = new CreateIndexRequest(Auditor.NOTIFICATIONS_INDEX);
|
||||
createIndexRequest.settings(mlNotificationIndexSettings());
|
||||
createIndexRequest.mapping(AuditMessage.TYPE.getPreferredName(), auditMessageMapping);
|
||||
createIndexRequest.mapping(AuditActivity.TYPE.getPreferredName(), auditActivityMapping);
|
||||
|
||||
client.admin().indices().create(createIndexRequest,
|
||||
client.admin().indices().putTemplate(templateRequest,
|
||||
ActionListener.wrap(r -> listener.accept(true, null), e -> listener.accept(false, e)));
|
||||
} catch (IOException e) {
|
||||
LOGGER.warn("Error creating mappings for the audit message index", e);
|
||||
LOGGER.warn("Error putting the template for the notification message index", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the meta index with the filter list document mapping.
|
||||
* Index template for meta data
|
||||
*/
|
||||
public void createMetaIndex(BiConsumer<Boolean, Exception> listener) {
|
||||
LOGGER.info("Creating the internal '{}' index", ML_META_INDEX);
|
||||
public void putMetaIndexTemplate(BiConsumer<Boolean, Exception> listener) {
|
||||
PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest(ML_META_INDEX);
|
||||
templateRequest.patterns(Collections.singletonList(ML_META_INDEX));
|
||||
templateRequest.settings(mlNotificationIndexSettings());
|
||||
|
||||
CreateIndexRequest createIndexRequest = new CreateIndexRequest(ML_META_INDEX);
|
||||
createIndexRequest.settings(mlNotificationIndexSettings());
|
||||
|
||||
client.admin().indices().create(createIndexRequest,
|
||||
client.admin().indices().putTemplate(templateRequest,
|
||||
ActionListener.wrap(r -> listener.accept(true, null), e -> listener.accept(false, e)));
|
||||
}
|
||||
|
||||
public void putJobStateIndexTemplate(BiConsumer<Boolean, Exception> listener) {
|
||||
try {
|
||||
XContentBuilder categorizerStateMapping = ElasticsearchMappings.categorizerStateMapping();
|
||||
XContentBuilder quantilesMapping = ElasticsearchMappings.quantilesMapping();
|
||||
XContentBuilder modelStateMapping = ElasticsearchMappings.modelStateMapping();
|
||||
|
||||
PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest(AnomalyDetectorsIndex.jobStateIndexName());
|
||||
templateRequest.patterns(Collections.singletonList(AnomalyDetectorsIndex.jobStateIndexName()));
|
||||
templateRequest.settings(mlStateIndexSettings());
|
||||
templateRequest.mapping(CategorizerState.TYPE, categorizerStateMapping);
|
||||
templateRequest.mapping(Quantiles.TYPE.getPreferredName(), quantilesMapping);
|
||||
templateRequest.mapping(ModelState.TYPE.getPreferredName(), modelStateMapping);
|
||||
|
||||
client.admin().indices().putTemplate(templateRequest,
|
||||
ActionListener.wrap(r -> listener.accept(true, null), e -> listener.accept(false, e)));
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Error putting the template for the " + AnomalyDetectorsIndex.jobStateIndexName() + " index", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the Elasticsearch index settings that we want to apply to results
|
||||
* indexes. It's better to do this in code rather than in elasticsearch.yml
|
||||
|
@ -250,79 +268,88 @@ public class JobProvider {
|
|||
.put(MapperService.INDEX_MAPPER_DYNAMIC_SETTING.getKey(), true);
|
||||
}
|
||||
|
||||
public void putJobResultsIndexTemplate(BiConsumer<Boolean, Exception> listener) {
|
||||
try {
|
||||
XContentBuilder resultsMapping = ElasticsearchMappings.resultsMapping();
|
||||
XContentBuilder categoryDefinitionMapping = ElasticsearchMappings.categoryDefinitionMapping();
|
||||
XContentBuilder dataCountsMapping = ElasticsearchMappings.dataCountsMapping();
|
||||
XContentBuilder modelSnapshotMapping = ElasticsearchMappings.modelSnapshotMapping();
|
||||
|
||||
PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest(AnomalyDetectorsIndex.jobResultsIndexPrefix());
|
||||
templateRequest.patterns(Collections.singletonList(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*"));
|
||||
templateRequest.settings(mlResultsIndexSettings());
|
||||
templateRequest.mapping(Result.TYPE.getPreferredName(), resultsMapping);
|
||||
templateRequest.mapping(CategoryDefinition.TYPE.getPreferredName(), categoryDefinitionMapping);
|
||||
templateRequest.mapping(DataCounts.TYPE.getPreferredName(), dataCountsMapping);
|
||||
templateRequest.mapping(ModelSnapshot.TYPE.getPreferredName(), modelSnapshotMapping);
|
||||
|
||||
client.admin().indices().putTemplate(templateRequest,
|
||||
ActionListener.wrap(r -> listener.accept(true, null), e -> listener.accept(false, e)));
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Error putting the template for the " + AnomalyDetectorsIndex.jobResultsIndexPrefix() + " indices", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the Elasticsearch index and the mappings
|
||||
*/
|
||||
public void createJobResultIndex(Job job, ClusterState state, ActionListener<Boolean> listener) {
|
||||
Collection<String> termFields = (job.getAnalysisConfig() != null) ? job.getAnalysisConfig().termFields() : Collections.emptyList();
|
||||
try {
|
||||
XContentBuilder resultsMapping = ElasticsearchMappings.resultsMapping(termFields);
|
||||
XContentBuilder categoryDefinitionMapping = ElasticsearchMappings.categoryDefinitionMapping();
|
||||
XContentBuilder dataCountsMapping = ElasticsearchMappings.dataCountsMapping();
|
||||
XContentBuilder modelSnapshotMapping = ElasticsearchMappings.modelSnapshotMapping();
|
||||
String jobId = job.getId();
|
||||
boolean createIndexAlias = !job.getResultsIndexName().equals(job.getId());
|
||||
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(job.getResultsIndexName());
|
||||
|
||||
String jobId = job.getId();
|
||||
boolean createIndexAlias = !job.getResultsIndexName().equals(job.getId());
|
||||
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(job.getResultsIndexName());
|
||||
if (createIndexAlias) {
|
||||
final ActionListener<Boolean> responseListener = listener;
|
||||
listener = ActionListener.wrap(aBoolean -> {
|
||||
client.admin().indices().prepareAliases()
|
||||
.addAlias(indexName, AnomalyDetectorsIndex.jobResultsIndexName(jobId))
|
||||
.execute(ActionListener.wrap(r -> responseListener.onResponse(true), responseListener::onFailure));
|
||||
},
|
||||
listener::onFailure);
|
||||
}
|
||||
|
||||
if (createIndexAlias) {
|
||||
final ActionListener<Boolean> responseListener = listener;
|
||||
listener = ActionListener.wrap(aBoolean -> {
|
||||
client.admin().indices().prepareAliases()
|
||||
.addAlias(indexName, AnomalyDetectorsIndex.jobResultsIndexName(jobId))
|
||||
.execute(ActionListener.wrap(r -> responseListener.onResponse(true), responseListener::onFailure));
|
||||
},
|
||||
listener::onFailure);
|
||||
}
|
||||
// Indices can be shared, so only create if it doesn't exist already. Saves us a roundtrip if
|
||||
// already in the CS
|
||||
if (!state.getMetaData().hasIndex(indexName)) {
|
||||
LOGGER.trace("ES API CALL: create index {}", indexName);
|
||||
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
|
||||
|
||||
// Indices can be shared, so only create if it doesn't exist already. Saves us a roundtrip if
|
||||
// already in the CS
|
||||
if (!state.getMetaData().hasIndex(indexName)) {
|
||||
LOGGER.trace("ES API CALL: create index {}", indexName);
|
||||
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
|
||||
createIndexRequest.settings(mlResultsIndexSettings());
|
||||
createIndexRequest.mapping(Result.TYPE.getPreferredName(), resultsMapping);
|
||||
createIndexRequest.mapping(CategoryDefinition.TYPE.getPreferredName(), categoryDefinitionMapping);
|
||||
createIndexRequest.mapping(DataCounts.TYPE.getPreferredName(), dataCountsMapping);
|
||||
createIndexRequest.mapping(ModelSnapshot.TYPE.getPreferredName(), modelSnapshotMapping);
|
||||
|
||||
final ActionListener<Boolean> createdListener = listener;
|
||||
client.admin().indices().create(createIndexRequest,
|
||||
ActionListener.wrap(r -> createdListener.onResponse(true),
|
||||
e -> {
|
||||
// Possible that the index was created while the request was executing,
|
||||
// so we need to handle that possibility
|
||||
if (e instanceof ResourceAlreadyExistsException) {
|
||||
LOGGER.info("Index already exists");
|
||||
// Create the alias
|
||||
createdListener.onResponse(true);
|
||||
} else {
|
||||
createdListener.onFailure(e);
|
||||
}
|
||||
final ActionListener<Boolean> createdListener = listener;
|
||||
client.admin().indices().create(createIndexRequest,
|
||||
ActionListener.wrap(
|
||||
r -> {
|
||||
updateIndexMappingWithTermFields(indexName, termFields, createdListener);
|
||||
},
|
||||
e -> {
|
||||
// Possible that the index was created while the request was executing,
|
||||
// so we need to handle that possibility
|
||||
if (e instanceof ResourceAlreadyExistsException) {
|
||||
LOGGER.info("Index already exists");
|
||||
// Create the alias
|
||||
createdListener.onResponse(true);
|
||||
} else {
|
||||
createdListener.onFailure(e);
|
||||
}
|
||||
));
|
||||
} else {
|
||||
// Add the job's term fields to the index mapping
|
||||
updateIndexMappingWithTermFields(indexName, termFields, listener);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
));
|
||||
} else {
|
||||
// Add the job's term fields to the index mapping
|
||||
final ActionListener<Boolean> updateMappingListener = listener;
|
||||
checkNumberOfFieldsLimit(indexName, termFields.size(), ActionListener.wrap(
|
||||
r -> updateIndexMappingWithTermFields(indexName, termFields, updateMappingListener),
|
||||
updateMappingListener::onFailure));
|
||||
}
|
||||
}
|
||||
|
||||
private void updateIndexMappingWithTermFields(String indexName, Collection<String> termFields, ActionListener<Boolean> listener) {
|
||||
|
||||
checkNumberOfFieldsLimit(indexName, termFields.size(), new ActionListener<Boolean>() {
|
||||
@Override
|
||||
public void onResponse(Boolean aBoolean) {
|
||||
try {
|
||||
client.admin().indices().preparePutMapping(indexName).setType(Result.TYPE.getPreferredName())
|
||||
.setSource(ElasticsearchMappings.termFieldsMapping(termFields))
|
||||
.execute(new ActionListener<PutMappingResponse>() {
|
||||
try {
|
||||
client.admin().indices().preparePutMapping(indexName).setType(Result.TYPE.getPreferredName())
|
||||
.setSource(ElasticsearchMappings.termFieldsMapping(termFields))
|
||||
.execute(new ActionListener<PutMappingResponse>() {
|
||||
@Override
|
||||
public void onResponse(PutMappingResponse putMappingResponse) {
|
||||
listener.onResponse(putMappingResponse.isAcknowledged());
|
||||
listener.onResponse(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -330,16 +357,9 @@ public class JobProvider {
|
|||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
} catch (IOException e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void checkNumberOfFieldsLimit(String indexName, long additionalFieldCount, ActionListener<Boolean> listener) {
|
||||
|
@ -376,26 +396,6 @@ public class JobProvider {
|
|||
});
|
||||
}
|
||||
|
||||
public void createJobStateIndex(BiConsumer<Boolean, Exception> listener) {
|
||||
try {
|
||||
XContentBuilder categorizerStateMapping = ElasticsearchMappings.categorizerStateMapping();
|
||||
XContentBuilder quantilesMapping = ElasticsearchMappings.quantilesMapping();
|
||||
XContentBuilder modelStateMapping = ElasticsearchMappings.modelStateMapping();
|
||||
|
||||
LOGGER.trace("ES API CALL: create state index {}", AnomalyDetectorsIndex.jobStateIndexName());
|
||||
CreateIndexRequest createIndexRequest = new CreateIndexRequest(AnomalyDetectorsIndex.jobStateIndexName());
|
||||
createIndexRequest.settings(mlStateIndexSettings());
|
||||
createIndexRequest.mapping(CategorizerState.TYPE, categorizerStateMapping);
|
||||
createIndexRequest.mapping(Quantiles.TYPE.getPreferredName(), quantilesMapping);
|
||||
createIndexRequest.mapping(ModelState.TYPE.getPreferredName(), modelStateMapping);
|
||||
|
||||
client.admin().indices().create(createIndexRequest,
|
||||
ActionListener.wrap(r -> listener.accept(true, null), e -> listener.accept(false, e)));
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Error creating the " + AnomalyDetectorsIndex.jobStateIndexName() + " index", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete all the job related documents from the database.
|
||||
*/
|
||||
|
@ -427,9 +427,10 @@ public class JobProvider {
|
|||
private <T, U> void get(String jobId, String indexName, String type, String id, Consumer<T> handler, Consumer<Exception> errorHandler,
|
||||
BiFunction<XContentParser, U, T> objectParser, Supplier<T> notFoundSupplier) {
|
||||
GetRequest getRequest = new GetRequest(indexName, type, id);
|
||||
|
||||
client.get(getRequest, ActionListener.wrap(
|
||||
response -> {
|
||||
if (response.isExists() == false) {
|
||||
response -> {
|
||||
if (response.isExists() == false) {
|
||||
handler.accept(notFoundSupplier.get());
|
||||
} else {
|
||||
BytesReference source = response.getSourceAsBytesRef();
|
||||
|
@ -440,11 +441,11 @@ public class JobProvider {
|
|||
}
|
||||
}
|
||||
},
|
||||
e -> {
|
||||
if (e instanceof IndexNotFoundException) {
|
||||
errorHandler.accept(ExceptionsHelper.missingJobException(jobId));
|
||||
error -> {
|
||||
if (error instanceof IndexNotFoundException == false) {
|
||||
errorHandler.accept(error);
|
||||
} else {
|
||||
errorHandler.accept(e);
|
||||
handler.accept(notFoundSupplier.get());
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
@ -477,27 +478,19 @@ public class JobProvider {
|
|||
}
|
||||
handler.accept(objects);
|
||||
},
|
||||
errorHandler)
|
||||
e -> {
|
||||
if (e instanceof IndexNotFoundException == false) {
|
||||
errorHandler.accept(e);
|
||||
} else {
|
||||
handler.accept(new HashSet<>());
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
private <T, U> Optional<T> getBlocking(String indexName, String type, String id, BiFunction<XContentParser, U, T> objectParser) {
|
||||
GetRequest getRequest = new GetRequest(indexName, type, id);
|
||||
try {
|
||||
GetResponse response = client.get(getRequest).actionGet();
|
||||
if (!response.isExists()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
BytesReference source = response.getSourceAsBytesRef();
|
||||
try (XContentParser parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source)) {
|
||||
return Optional.of(objectParser.apply(parser, null));
|
||||
} catch (IOException e) {
|
||||
throw new ElasticsearchParseException("failed to parse " + type, e);
|
||||
}
|
||||
} catch (IndexNotFoundException e) {
|
||||
LOGGER.error("Missing index when getting " + type, e);
|
||||
throw e;
|
||||
}
|
||||
public static IndicesOptions addIgnoreUnavailable(IndicesOptions indicesOptions) {
|
||||
return IndicesOptions.fromOptions(true, indicesOptions.allowNoIndices(), indicesOptions.expandWildcardsOpen(),
|
||||
indicesOptions.expandWildcardsClosed(), indicesOptions);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -532,6 +525,7 @@ public class JobProvider {
|
|||
searchRequest.source(searchSourceBuilder);
|
||||
|
||||
MultiSearchRequest mrequest = new MultiSearchRequest();
|
||||
mrequest.indicesOptions(addIgnoreUnavailable(mrequest.indicesOptions()));
|
||||
mrequest.add(searchRequest);
|
||||
if (Strings.hasLength(query.getPartitionValue())) {
|
||||
mrequest.add(createPartitionMaxNormailizedProbabilitiesRequest(jobId, query.getStart(), query.getEnd(),
|
||||
|
@ -541,12 +535,7 @@ public class JobProvider {
|
|||
client.multiSearch(mrequest, ActionListener.wrap(mresponse -> {
|
||||
MultiSearchResponse.Item item1 = mresponse.getResponses()[0];
|
||||
if (item1.isFailure()) {
|
||||
Exception e = item1.getFailure();
|
||||
if (e instanceof IndexNotFoundException) {
|
||||
errorHandler.accept(ExceptionsHelper.missingJobException(jobId));
|
||||
} else {
|
||||
errorHandler.accept(e);
|
||||
}
|
||||
errorHandler.accept(item1.getFailure());
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -581,12 +570,7 @@ public class JobProvider {
|
|||
if (Strings.hasLength(query.getPartitionValue())) {
|
||||
MultiSearchResponse.Item item2 = mresponse.getResponses()[1];
|
||||
if (item2.isFailure()) {
|
||||
Exception e = item2.getFailure();
|
||||
if (e instanceof IndexNotFoundException) {
|
||||
errorHandler.accept(ExceptionsHelper.missingJobException(jobId));
|
||||
} else {
|
||||
errorHandler.accept(e);
|
||||
}
|
||||
errorHandler.accept(item2.getFailure());
|
||||
return;
|
||||
}
|
||||
List<PerPartitionMaxProbabilities> partitionProbs =
|
||||
|
@ -658,6 +642,7 @@ public class JobProvider {
|
|||
sourceBuilder.query(boolQuery);
|
||||
SearchRequest searchRequest = new SearchRequest(indexName);
|
||||
searchRequest.source(sourceBuilder);
|
||||
searchRequest.indicesOptions(addIgnoreUnavailable(searchRequest.indicesOptions()));
|
||||
return searchRequest;
|
||||
}
|
||||
|
||||
|
@ -788,6 +773,7 @@ public class JobProvider {
|
|||
CategoryDefinition.TYPE.getPreferredName(), indexName, CategoryDefinition.CATEGORY_ID.getPreferredName(), from, size);
|
||||
|
||||
SearchRequest searchRequest = new SearchRequest(indexName);
|
||||
searchRequest.indicesOptions(addIgnoreUnavailable(searchRequest.indicesOptions()));
|
||||
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
|
||||
if (categoryId != null) {
|
||||
String documentId = CategoryDefinition.documentId(jobId, categoryId);
|
||||
|
@ -817,13 +803,7 @@ public class JobProvider {
|
|||
QueryPage<CategoryDefinition> result =
|
||||
new QueryPage<>(results, searchResponse.getHits().getTotalHits(), CategoryDefinition.RESULTS_FIELD);
|
||||
handler.accept(result);
|
||||
}, e -> {
|
||||
if (e instanceof IndexNotFoundException) {
|
||||
errorHandler.accept(ExceptionsHelper.missingJobException(jobId));
|
||||
} else {
|
||||
errorHandler.accept(e);
|
||||
}
|
||||
}));
|
||||
}, errorHandler::accept));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -861,6 +841,7 @@ public class JobProvider {
|
|||
.filter(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), AnomalyRecord.RESULT_TYPE_VALUE));
|
||||
|
||||
SearchRequest searchRequest = new SearchRequest(indexName);
|
||||
searchRequest.indicesOptions(addIgnoreUnavailable(searchRequest.indicesOptions()));
|
||||
searchRequest.types(Result.TYPE.getPreferredName());
|
||||
searchRequest.source(new SearchSourceBuilder()
|
||||
.from(from)
|
||||
|
@ -890,13 +871,7 @@ public class JobProvider {
|
|||
QueryPage<AnomalyRecord> queryPage =
|
||||
new QueryPage<>(results, searchResponse.getHits().getTotalHits(), AnomalyRecord.RESULTS_FIELD);
|
||||
handler.accept(queryPage);
|
||||
}, e -> {
|
||||
if (e instanceof IndexNotFoundException) {
|
||||
errorHandler.accept(ExceptionsHelper.missingJobException(jobId));
|
||||
} else {
|
||||
errorHandler.accept(e);
|
||||
}
|
||||
}));
|
||||
}, errorHandler::accept));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -926,6 +901,7 @@ public class JobProvider {
|
|||
.filter(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), Influencer.RESULT_TYPE_VALUE));
|
||||
|
||||
SearchRequest searchRequest = new SearchRequest(indexName);
|
||||
searchRequest.indicesOptions(addIgnoreUnavailable(searchRequest.indicesOptions()));
|
||||
searchRequest.types(Result.TYPE.getPreferredName());
|
||||
FieldSortBuilder sb = query.getSortField() == null ? SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC)
|
||||
: new FieldSortBuilder(query.getSortField()).order(query.isSortDescending() ? SortOrder.DESC : SortOrder.ASC);
|
||||
|
@ -1061,6 +1037,7 @@ public class JobProvider {
|
|||
ModelSnapshot.TYPE, indexName, sortField, from, size);
|
||||
|
||||
SearchRequest searchRequest = new SearchRequest(indexName);
|
||||
searchRequest.indicesOptions(addIgnoreUnavailable(searchRequest.indicesOptions()));
|
||||
searchRequest.types(ModelSnapshot.TYPE.getPreferredName());
|
||||
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
|
||||
sourceBuilder.sort(sb);
|
||||
|
@ -1147,19 +1124,16 @@ public class JobProvider {
|
|||
|
||||
public QueryPage<ModelDebugOutput> modelDebugOutput(String jobId, int from, int size) {
|
||||
SearchResponse searchResponse;
|
||||
try {
|
||||
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
|
||||
LOGGER.trace("ES API CALL: search result type {} from index {} from {}, size {}",
|
||||
ModelDebugOutput.RESULT_TYPE_VALUE, indexName, from, size);
|
||||
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
|
||||
LOGGER.trace("ES API CALL: search result type {} from index {} from {}, size {}",
|
||||
ModelDebugOutput.RESULT_TYPE_VALUE, indexName, from, size);
|
||||
|
||||
searchResponse = client.prepareSearch(indexName)
|
||||
.setTypes(Result.TYPE.getPreferredName())
|
||||
.setQuery(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), ModelDebugOutput.RESULT_TYPE_VALUE))
|
||||
.setFrom(from).setSize(size)
|
||||
.get();
|
||||
} catch (IndexNotFoundException e) {
|
||||
throw ExceptionsHelper.missingJobException(jobId);
|
||||
}
|
||||
searchResponse = client.prepareSearch(indexName)
|
||||
.setIndicesOptions(addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS))
|
||||
.setTypes(Result.TYPE.getPreferredName())
|
||||
.setQuery(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), ModelDebugOutput.RESULT_TYPE_VALUE))
|
||||
.setFrom(from).setSize(size)
|
||||
.get();
|
||||
|
||||
List<ModelDebugOutput> results = new ArrayList<>();
|
||||
|
||||
|
|
|
@ -70,6 +70,7 @@ public class JobStorageDeletionTask extends Task {
|
|||
// Step 1. DeleteByQuery on the index, matching all docs with the right job_id
|
||||
// -------
|
||||
SearchRequest searchRequest = new SearchRequest(indexPattern);
|
||||
searchRequest.indicesOptions(JobProvider.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS));
|
||||
DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest);
|
||||
searchRequest.source(new SearchSourceBuilder().query(new TermQueryBuilder(Job.ID.getPreferredName(), jobId)));
|
||||
request.setSlices(5);
|
||||
|
@ -96,5 +97,4 @@ public class JobStorageDeletionTask extends Task {
|
|||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
|
|||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
|
@ -27,6 +28,7 @@ import org.junit.Before;
|
|||
import java.net.InetAddress;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
import static org.elasticsearch.mock.orig.Mockito.doAnswer;
|
||||
import static org.elasticsearch.mock.orig.Mockito.times;
|
||||
|
@ -64,6 +66,9 @@ public class MlInitializationServiceTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testInitialize() throws Exception {
|
||||
JobProvider jobProvider = mockJobProvider();
|
||||
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
MlInitializationService initializationService =
|
||||
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider);
|
||||
|
||||
|
@ -77,10 +82,11 @@ public class MlInitializationServiceTests extends ESTestCase {
|
|||
initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
|
||||
|
||||
verify(clusterService, times(1)).submitStateUpdateTask(eq("install-ml-metadata"), any());
|
||||
verify(jobProvider, times(1)).createNotificationMessageIndex(any());
|
||||
verify(jobProvider, times(1)).createMetaIndex(any());
|
||||
verify(jobProvider, times(1)).createJobStateIndex(any());
|
||||
assertThat(initializationService.getDailyManagementService().isStarted(), is(true));
|
||||
verify(jobProvider, times(1)).putNotificationMessageIndexTemplate(any());
|
||||
verify(jobProvider, times(1)).putMetaIndexTemplate(any());
|
||||
verify(jobProvider, times(1)).putJobStateIndexTemplate(any());
|
||||
verify(jobProvider, times(1)).putJobResultsIndexTemplate(any());
|
||||
}
|
||||
|
||||
public void testInitialize_noMasterNode() throws Exception {
|
||||
|
@ -95,13 +101,24 @@ public class MlInitializationServiceTests extends ESTestCase {
|
|||
initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
|
||||
|
||||
verify(clusterService, times(0)).submitStateUpdateTask(eq("install-ml-metadata"), any());
|
||||
verify(jobProvider, times(0)).createNotificationMessageIndex(any());
|
||||
verify(jobProvider, times(0)).createMetaIndex(any());
|
||||
verify(jobProvider, times(0)).createJobStateIndex(any());
|
||||
assertThat(initializationService.getDailyManagementService(), is(nullValue()));
|
||||
verify(jobProvider, times(0)).putNotificationMessageIndexTemplate(any());
|
||||
verify(jobProvider, times(0)).putMetaIndexTemplate(any());
|
||||
verify(jobProvider, times(0)).putJobStateIndexTemplate(any());
|
||||
verify(jobProvider, times(0)).putJobResultsIndexTemplate(any());
|
||||
}
|
||||
|
||||
public void testInitialize_alreadyInitialized() throws Exception {
|
||||
ThreadPool threadPool = mock(ThreadPool.class);
|
||||
ExecutorService executorService = mock(ExecutorService.class);
|
||||
doAnswer(invocation -> {
|
||||
((Runnable) invocation.getArguments()[0]).run();
|
||||
return null;
|
||||
}).when(executorService).execute(any(Runnable.class));
|
||||
when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService);
|
||||
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
JobProvider jobProvider = mockJobProvider();
|
||||
MlInitializationService initializationService =
|
||||
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider);
|
||||
|
||||
|
@ -126,6 +143,10 @@ public class MlInitializationServiceTests extends ESTestCase {
|
|||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
|
||||
))
|
||||
.put(IndexTemplateMetaData.builder(Auditor.NOTIFICATIONS_INDEX).build())
|
||||
.put(IndexTemplateMetaData.builder(JobProvider.ML_META_INDEX).build())
|
||||
.put(IndexTemplateMetaData.builder(AnomalyDetectorsIndex.jobStateIndexName()).build())
|
||||
.put(IndexTemplateMetaData.builder(AnomalyDetectorsIndex.jobResultsIndexPrefix()).build())
|
||||
.putCustom(MlMetadata.TYPE, new MlMetadata.Builder().build()))
|
||||
.build();
|
||||
MlDailyManagementService initialDailyManagementService = mock(MlDailyManagementService.class);
|
||||
|
@ -133,13 +154,24 @@ public class MlInitializationServiceTests extends ESTestCase {
|
|||
initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
|
||||
|
||||
verify(clusterService, times(0)).submitStateUpdateTask(eq("install-ml-metadata"), any());
|
||||
verify(jobProvider, times(0)).createNotificationMessageIndex(any());
|
||||
verify(jobProvider, times(0)).createMetaIndex(any());
|
||||
verify(jobProvider, times(0)).createJobStateIndex(any());
|
||||
assertSame(initialDailyManagementService, initializationService.getDailyManagementService());
|
||||
verify(jobProvider, times(0)).putNotificationMessageIndexTemplate(any());
|
||||
verify(jobProvider, times(0)).putMetaIndexTemplate(any());
|
||||
verify(jobProvider, times(0)).putJobStateIndexTemplate(any());
|
||||
verify(jobProvider, times(0)).putJobResultsIndexTemplate(any());
|
||||
}
|
||||
|
||||
public void testInitialize_onlyOnce() throws Exception {
|
||||
ThreadPool threadPool = mock(ThreadPool.class);
|
||||
ExecutorService executorService = mock(ExecutorService.class);
|
||||
doAnswer(invocation -> {
|
||||
((Runnable) invocation.getArguments()[0]).run();
|
||||
return null;
|
||||
}).when(executorService).execute(any(Runnable.class));
|
||||
when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService);
|
||||
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
JobProvider jobProvider = mockJobProvider();
|
||||
MlInitializationService initializationService =
|
||||
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider);
|
||||
|
||||
|
@ -154,9 +186,37 @@ public class MlInitializationServiceTests extends ESTestCase {
|
|||
initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
|
||||
|
||||
verify(clusterService, times(1)).submitStateUpdateTask(eq("install-ml-metadata"), any());
|
||||
verify(jobProvider, times(1)).createNotificationMessageIndex(any());
|
||||
verify(jobProvider, times(1)).createMetaIndex(any());
|
||||
verify(jobProvider, times(1)).createJobStateIndex(any());
|
||||
verify(jobProvider, times(1)).putNotificationMessageIndexTemplate(any());
|
||||
verify(jobProvider, times(1)).putMetaIndexTemplate(any());
|
||||
verify(jobProvider, times(1)).putJobStateIndexTemplate(any());
|
||||
verify(jobProvider, times(1)).putJobResultsIndexTemplate(any());
|
||||
}
|
||||
|
||||
private JobProvider mockJobProvider() {
|
||||
JobProvider jobProvider = mock(JobProvider.class);
|
||||
|
||||
doAnswer(invocation -> {
|
||||
BiConsumer<Boolean, Exception> listener = (BiConsumer<Boolean, Exception>)invocation.getArguments()[0];
|
||||
listener.accept(true, null);
|
||||
return null;
|
||||
}).when(jobProvider).putMetaIndexTemplate(any());
|
||||
doAnswer(invocation -> {
|
||||
BiConsumer<Boolean, Exception> listener = (BiConsumer<Boolean, Exception>)invocation.getArguments()[0];
|
||||
listener.accept(true, null);
|
||||
return null;
|
||||
}).when(jobProvider).putNotificationMessageIndexTemplate(any());
|
||||
doAnswer(invocation -> {
|
||||
BiConsumer<Boolean, Exception> listener = (BiConsumer<Boolean, Exception>)invocation.getArguments()[0];
|
||||
listener.accept(true, null);
|
||||
return null;
|
||||
}).when(jobProvider).putJobStateIndexTemplate(any());
|
||||
doAnswer(invocation -> {
|
||||
BiConsumer<Boolean, Exception> listener = (BiConsumer<Boolean, Exception>)invocation.getArguments()[0];
|
||||
listener.accept(true, null);
|
||||
return null;
|
||||
}).when(jobProvider).putJobResultsIndexTemplate(any());
|
||||
|
||||
return jobProvider;
|
||||
}
|
||||
|
||||
public void testNodeGoesFromMasterToNonMasterAndBack() throws Exception {
|
||||
|
|
|
@ -262,7 +262,7 @@ public class OpenJobActionTests extends ESTestCase {
|
|||
assertNull("no node selected, because null state", result);
|
||||
}
|
||||
|
||||
public void testVerifyIndicesExistAndPrimaryShardsAreActive() {
|
||||
public void testVerifyIndicesPrimaryShardsAreActive() {
|
||||
MetaData.Builder metaData = MetaData.builder();
|
||||
RoutingTable.Builder routingTable = RoutingTable.builder();
|
||||
addJobAndIndices(metaData, routingTable, "job_id");
|
||||
|
@ -272,14 +272,14 @@ public class OpenJobActionTests extends ESTestCase {
|
|||
csBuilder.metaData(metaData);
|
||||
|
||||
ClusterState cs = csBuilder.build();
|
||||
assertTrue(OpenJobAction.verifyIndicesExistAndPrimaryShardsAreActive(logger, "job_id", cs));
|
||||
assertTrue(OpenJobAction.verifyIndicesPrimaryShardsAreActive(logger, "job_id", cs));
|
||||
|
||||
metaData = new MetaData.Builder(cs.metaData());
|
||||
routingTable = new RoutingTable.Builder(cs.routingTable());
|
||||
String indexToRemove = randomFrom(cs.metaData().getConcreteAllIndices());
|
||||
if (randomBoolean()) {
|
||||
routingTable.remove(indexToRemove);
|
||||
} else if (randomBoolean()) {
|
||||
} else {
|
||||
Index index = new Index(indexToRemove, "_uuid");
|
||||
ShardId shardId = new ShardId(index, 0);
|
||||
ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, true, RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE,
|
||||
|
@ -287,12 +287,11 @@ public class OpenJobActionTests extends ESTestCase {
|
|||
shardRouting = shardRouting.initialize("node_id", null, 0L);
|
||||
routingTable.add(IndexRoutingTable.builder(index)
|
||||
.addIndexShard(new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build()));
|
||||
} else {
|
||||
metaData.remove(indexToRemove);
|
||||
}
|
||||
|
||||
csBuilder.routingTable(routingTable.build());
|
||||
csBuilder.metaData(metaData);
|
||||
assertFalse(OpenJobAction.verifyIndicesExistAndPrimaryShardsAreActive(logger, "job_id", csBuilder.build()));
|
||||
assertFalse(OpenJobAction.verifyIndicesPrimaryShardsAreActive(logger, "job_id", csBuilder.build()));
|
||||
}
|
||||
|
||||
public static PersistentTaskInProgress<OpenJobAction.Request> createJobTask(long id, String jobId, String nodeId, JobState jobState) {
|
||||
|
|
|
@ -5,14 +5,7 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.ml.integration;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
@ -21,11 +14,8 @@ import org.elasticsearch.plugins.Plugin;
|
|||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||
import org.elasticsearch.xpack.XPackPlugin;
|
||||
import org.elasticsearch.xpack.XPackSettings;
|
||||
import org.elasticsearch.xpack.ml.MachineLearning;
|
||||
import org.elasticsearch.xpack.ml.action.util.QueryPage;
|
||||
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
|
||||
import org.elasticsearch.xpack.ml.job.config.Detector;
|
||||
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
|
||||
|
@ -64,9 +54,6 @@ import java.util.Set;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
|
@ -83,6 +70,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
|
|||
protected Settings nodeSettings() {
|
||||
return Settings.builder().put(super.nodeSettings())
|
||||
.put(XPackSettings.SECURITY_ENABLED.getKey(), false)
|
||||
.put(MachineLearning.AUTODETECT_PROCESS.getKey(), false)
|
||||
.build();
|
||||
}
|
||||
|
||||
|
@ -108,7 +96,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
|
|||
}
|
||||
|
||||
public void testProcessResults() throws Exception {
|
||||
createJob();
|
||||
putResultsIndexMappingTemplate();
|
||||
|
||||
ResultsBuilder builder = new ResultsBuilder();
|
||||
Bucket bucket = createBucket(false);
|
||||
|
@ -168,7 +156,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
|
|||
}
|
||||
|
||||
public void testDeleteInterimResults() throws Exception {
|
||||
createJob();
|
||||
putResultsIndexMappingTemplate();
|
||||
Bucket nonInterimBucket = createBucket(false);
|
||||
Bucket interimBucket = createBucket(true);
|
||||
|
||||
|
@ -197,7 +185,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
|
|||
}
|
||||
|
||||
public void testMultipleFlushesBetweenPersisting() throws Exception {
|
||||
createJob();
|
||||
putResultsIndexMappingTemplate();
|
||||
Bucket finalBucket = createBucket(true);
|
||||
List<AnomalyRecord> finalAnomalyRecords = createRecords(true);
|
||||
|
||||
|
@ -227,7 +215,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
|
|||
}
|
||||
|
||||
public void testEndOfStreamTriggersPersisting() throws Exception {
|
||||
createJob();
|
||||
putResultsIndexMappingTemplate();
|
||||
Bucket bucket = createBucket(false);
|
||||
List<AnomalyRecord> firstSetOfRecords = createRecords(false);
|
||||
List<AnomalyRecord> secondSetOfRecords = createRecords(false);
|
||||
|
@ -249,36 +237,10 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
|
|||
assertResultsAreSame(allRecords, persistedRecords);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void createJob() {
|
||||
Detector.Builder detectorBuilder = new Detector.Builder("avg", "metric_field");
|
||||
detectorBuilder.setByFieldName("by_instance");
|
||||
Job.Builder jobBuilder = new Job.Builder(JOB_ID);
|
||||
AnalysisConfig.Builder analysisConfBuilder = new AnalysisConfig.Builder(Collections.singletonList(detectorBuilder.build()));
|
||||
analysisConfBuilder.setInfluencers(Collections.singletonList("influence_field"));
|
||||
jobBuilder.setAnalysisConfig(analysisConfBuilder);
|
||||
jobBuilder.setCreateTime(new Date());
|
||||
|
||||
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
|
||||
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA).indices(ImmutableOpenMap.of())).build();
|
||||
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
|
||||
doAnswer(invocationOnMock -> {
|
||||
AckedClusterStateUpdateTask<Boolean> task = (AckedClusterStateUpdateTask<Boolean>) invocationOnMock.getArguments()[1];
|
||||
task.execute(cs);
|
||||
return null;
|
||||
}).when(clusterService).submitStateUpdateTask(eq("put-job-" + JOB_ID), any(AckedClusterStateUpdateTask.class));
|
||||
|
||||
jobProvider.createJobResultIndex(jobBuilder.build(), cs, new ActionListener<Boolean>() {
|
||||
@Override
|
||||
public void onResponse(Boolean aBoolean) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
}
|
||||
});
|
||||
private void putResultsIndexMappingTemplate() throws InterruptedException {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
jobProvider.putJobResultsIndexTemplate((aBoolean, e) -> {latch.countDown();});
|
||||
latch.await();
|
||||
}
|
||||
|
||||
private Bucket createBucket(boolean isInterim) {
|
||||
|
|
|
@ -153,6 +153,8 @@ public class MlJobIT extends ESRestTestCase {
|
|||
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404));
|
||||
assertThat(e.getMessage(), containsString("No known job with id '1'"));
|
||||
|
||||
createFarequoteJob("1");
|
||||
|
||||
addBucketResult("1", "1234", 1);
|
||||
addBucketResult("1", "1235", 1);
|
||||
addBucketResult("1", "1236", 1);
|
||||
|
@ -192,6 +194,8 @@ public class MlJobIT extends ESRestTestCase {
|
|||
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404));
|
||||
assertThat(e.getMessage(), containsString("No known job with id '1'"));
|
||||
|
||||
createFarequoteJob("1");
|
||||
|
||||
addRecordResult("1", "1234", 1, 1);
|
||||
addRecordResult("1", "1235", 1, 2);
|
||||
addRecordResult("1", "1236", 1, 3);
|
||||
|
|
|
@ -84,7 +84,7 @@ public class ElasticsearchMappingsTests extends ESTestCase {
|
|||
|
||||
// Only the mappings for the results index should be added below. Do NOT add mappings for other indexes here.
|
||||
|
||||
XContentBuilder builder = ElasticsearchMappings.resultsMapping(Collections.emptyList());
|
||||
XContentBuilder builder = ElasticsearchMappings.resultsMapping();
|
||||
BufferedInputStream inputStream =
|
||||
new BufferedInputStream(new ByteArrayInputStream(builder.string().getBytes(StandardCharsets.UTF_8)));
|
||||
JsonParser parser = new JsonFactory().createParser(inputStream);
|
||||
|
@ -126,28 +126,6 @@ public class ElasticsearchMappingsTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testResultMapping_WithExtraTermFields() throws IOException {
|
||||
|
||||
XContentBuilder builder = ElasticsearchMappings.resultsMapping(
|
||||
Arrays.asList("instance", AnomalyRecord.ANOMALY_SCORE.getPreferredName()));
|
||||
XContentParser parser = createParser(builder);
|
||||
Map<String, Object> type = (Map<String, Object>) parser.map().get(Result.TYPE.getPreferredName());
|
||||
Map<String, Object> properties = (Map<String, Object>) type.get(ElasticsearchMappings.PROPERTIES);
|
||||
|
||||
// check a keyword mapping for the 'instance' field was created
|
||||
Map<String, Object> instanceMapping = (Map<String, Object>) properties.get("instance");
|
||||
assertNotNull(instanceMapping);
|
||||
String dataType = (String)instanceMapping.get(ElasticsearchMappings.TYPE);
|
||||
assertEquals(ElasticsearchMappings.KEYWORD, dataType);
|
||||
|
||||
// check anomaly score wasn't overwritten
|
||||
Map<String, Object> anomalyScoreMapping = (Map<String, Object>) properties.get(AnomalyRecord.ANOMALY_SCORE.getPreferredName());
|
||||
assertNotNull(anomalyScoreMapping);
|
||||
dataType = (String)anomalyScoreMapping.get(ElasticsearchMappings.TYPE);
|
||||
assertEquals(ElasticsearchMappings.DOUBLE, dataType);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testTermFieldMapping() throws IOException {
|
||||
|
||||
|
|
|
@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionListener;
|
|||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
|
||||
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
|
||||
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.search.MultiSearchRequest;
|
||||
import org.elasticsearch.action.search.MultiSearchResponse;
|
||||
|
@ -75,6 +76,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
import java.util.function.Consumer;
|
||||
|
||||
import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder;
|
||||
import static org.elasticsearch.xpack.ml.job.persistence.JobProvider.ML_META_INDEX;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.mockito.Matchers.any;
|
||||
|
@ -148,39 +150,50 @@ public class JobProviderTests extends ESTestCase {
|
|||
assertEquals("all_field_values", settings.get("index.query.default_field"));
|
||||
}
|
||||
|
||||
public void testPutJobResultsIndexTemplate() {
|
||||
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
|
||||
ArgumentCaptor<PutIndexTemplateRequest> captor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class);
|
||||
clientBuilder.putTemplate(captor);
|
||||
|
||||
Job.Builder job = buildJobBuilder("foo");
|
||||
JobProvider provider = createProvider(clientBuilder.build());
|
||||
|
||||
provider.putJobResultsIndexTemplate((result, error) -> {
|
||||
assertTrue(result);
|
||||
PutIndexTemplateRequest request = captor.getValue();
|
||||
assertNotNull(request);
|
||||
assertEquals(provider.mlResultsIndexSettings().build(), request.settings());
|
||||
assertTrue(request.mappings().containsKey(Result.TYPE.getPreferredName()));
|
||||
assertTrue(request.mappings().containsKey(CategoryDefinition.TYPE.getPreferredName()));
|
||||
assertTrue(request.mappings().containsKey(DataCounts.TYPE.getPreferredName()));
|
||||
assertTrue(request.mappings().containsKey(ModelSnapshot.TYPE.getPreferredName()));
|
||||
assertEquals(4, request.mappings().size());
|
||||
assertEquals(Collections.singletonList(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*"), request.patterns());
|
||||
});
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testCreateJobResultsIndex() {
|
||||
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
|
||||
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
|
||||
clientBuilder.createIndexRequest(AnomalyDetectorsIndex.jobResultsIndexName("foo"), captor);
|
||||
clientBuilder.preparePutMapping(mock(PutMappingResponse.class), Result.TYPE.getPreferredName());
|
||||
|
||||
Job.Builder job = buildJobBuilder("foo");
|
||||
JobProvider provider = createProvider(clientBuilder.build());
|
||||
AtomicReference<Boolean> resultHolder = new AtomicReference<>();
|
||||
|
||||
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
|
||||
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA).indices(ImmutableOpenMap.of())).build();
|
||||
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
|
||||
doAnswer(invocationOnMock -> {
|
||||
AckedClusterStateUpdateTask<Boolean> task = (AckedClusterStateUpdateTask<Boolean>) invocationOnMock.getArguments()[1];
|
||||
task.execute(cs);
|
||||
return null;
|
||||
}).when(clusterService).submitStateUpdateTask(eq("put-job-foo"), any(AckedClusterStateUpdateTask.class));
|
||||
|
||||
provider.createJobResultIndex(job.build(), cs, new ActionListener<Boolean>() {
|
||||
@Override
|
||||
public void onResponse(Boolean aBoolean) {
|
||||
CreateIndexRequest request = captor.getValue();
|
||||
assertNotNull(request);
|
||||
assertEquals(provider.mlResultsIndexSettings().build(), request.settings());
|
||||
assertTrue(request.mappings().containsKey(Result.TYPE.getPreferredName()));
|
||||
assertTrue(request.mappings().containsKey(CategoryDefinition.TYPE.getPreferredName()));
|
||||
assertTrue(request.mappings().containsKey(DataCounts.TYPE.getPreferredName()));
|
||||
assertTrue(request.mappings().containsKey(ModelSnapshot.TYPE.getPreferredName()));
|
||||
assertEquals(4, request.mappings().size());
|
||||
|
||||
assertEquals(AnomalyDetectorsIndex.jobResultsIndexName("foo"), request.index());
|
||||
clientBuilder.verifyIndexCreated(AnomalyDetectorsIndex.jobResultsIndexName("foo"));
|
||||
resultHolder.set(aBoolean);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -188,6 +201,9 @@ public class JobProviderTests extends ESTestCase {
|
|||
fail(e.toString());
|
||||
}
|
||||
});
|
||||
|
||||
assertNotNull(resultHolder.get());
|
||||
assertTrue(resultHolder.get());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -252,11 +268,12 @@ public class JobProviderTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testCreateJobRelatedIndicies_createsAliasIfIndexNameIsSet() {
|
||||
public void testCreateJobRelatedIndicies_createsAliasBecauseIndexNameIsSet() {
|
||||
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
|
||||
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
|
||||
clientBuilder.createIndexRequest(AnomalyDetectorsIndex.jobResultsIndexName("foo"), captor);
|
||||
clientBuilder.prepareAlias(AnomalyDetectorsIndex.jobResultsIndexName("bar"), AnomalyDetectorsIndex.jobResultsIndexName("foo"));
|
||||
clientBuilder.preparePutMapping(mock(PutMappingResponse.class), Result.TYPE.getPreferredName());
|
||||
|
||||
Job.Builder job = buildJobBuilder("foo");
|
||||
job.setResultsIndexName("bar");
|
||||
|
@ -276,14 +293,6 @@ public class JobProviderTests extends ESTestCase {
|
|||
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
|
||||
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA).indices(indexMap)).build();
|
||||
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
|
||||
doAnswer(invocationOnMock -> {
|
||||
AckedClusterStateUpdateTask<Boolean> task = (AckedClusterStateUpdateTask<Boolean>) invocationOnMock.getArguments()[1];
|
||||
task.execute(cs);
|
||||
return null;
|
||||
}).when(clusterService).submitStateUpdateTask(eq("put-job-foo"), any(AckedClusterStateUpdateTask.class));
|
||||
|
||||
provider.createJobResultIndex(job.build(), cs, new ActionListener<Boolean>() {
|
||||
@Override
|
||||
public void onResponse(Boolean aBoolean) {
|
||||
|
@ -330,14 +339,6 @@ public class JobProviderTests extends ESTestCase {
|
|||
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
|
||||
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA).indices(indexMap)).build();
|
||||
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
|
||||
doAnswer(invocationOnMock -> {
|
||||
AckedClusterStateUpdateTask<Boolean> task = (AckedClusterStateUpdateTask<Boolean>) invocationOnMock.getArguments()[1];
|
||||
task.execute(cs);
|
||||
return null;
|
||||
}).when(clusterService).submitStateUpdateTask(eq("put-job-foo"), any(AckedClusterStateUpdateTask.class));
|
||||
|
||||
provider.createJobResultIndex(job.build(), cs, new ActionListener<Boolean>() {
|
||||
@Override
|
||||
public void onResponse(Boolean aBoolean) {
|
||||
|
@ -363,41 +364,39 @@ public class JobProviderTests extends ESTestCase {
|
|||
assertEquals("true", settings.get("index.mapper.dynamic"));
|
||||
}
|
||||
|
||||
public void testCreateAuditMessageIndex() {
|
||||
public void testPutNotificationIndexTemplate() {
|
||||
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
|
||||
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
|
||||
clientBuilder.createIndexRequest(Auditor.NOTIFICATIONS_INDEX, captor);
|
||||
ArgumentCaptor<PutIndexTemplateRequest> captor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class);
|
||||
clientBuilder.putTemplate(captor);
|
||||
|
||||
JobProvider provider = createProvider(clientBuilder.build());
|
||||
|
||||
provider.createNotificationMessageIndex((result, error) -> {
|
||||
provider.putNotificationMessageIndexTemplate((result, error) -> {
|
||||
assertTrue(result);
|
||||
CreateIndexRequest request = captor.getValue();
|
||||
PutIndexTemplateRequest request = captor.getValue();
|
||||
assertNotNull(request);
|
||||
assertEquals(provider.mlNotificationIndexSettings().build(), request.settings());
|
||||
assertTrue(request.mappings().containsKey(AuditMessage.TYPE.getPreferredName()));
|
||||
assertTrue(request.mappings().containsKey(AuditActivity.TYPE.getPreferredName()));
|
||||
assertEquals(2, request.mappings().size());
|
||||
|
||||
clientBuilder.verifyIndexCreated(Auditor.NOTIFICATIONS_INDEX);
|
||||
assertEquals(Collections.singletonList(Auditor.NOTIFICATIONS_INDEX), request.patterns());
|
||||
});
|
||||
}
|
||||
|
||||
public void testCreateMetaIndex() {
|
||||
public void testPutMetaIndexTemplate() {
|
||||
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
|
||||
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
|
||||
clientBuilder.createIndexRequest(JobProvider.ML_META_INDEX, captor);
|
||||
ArgumentCaptor<PutIndexTemplateRequest> captor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class);
|
||||
clientBuilder.putTemplate(captor);
|
||||
|
||||
JobProvider provider = createProvider(clientBuilder.build());
|
||||
|
||||
provider.createMetaIndex((result, error) -> {
|
||||
provider.putMetaIndexTemplate((result, error) -> {
|
||||
assertTrue(result);
|
||||
CreateIndexRequest request = captor.getValue();
|
||||
PutIndexTemplateRequest request = captor.getValue();
|
||||
assertNotNull(request);
|
||||
assertEquals(provider.mlNotificationIndexSettings().build(), request.settings());
|
||||
assertEquals(0, request.mappings().size());
|
||||
|
||||
clientBuilder.verifyIndexCreated(JobProvider.ML_META_INDEX);
|
||||
assertEquals(Collections.singletonList(ML_META_INDEX), request.patterns());
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -411,68 +410,28 @@ public class JobProviderTests extends ESTestCase {
|
|||
assertEquals("async", settings.get("index.translog.durability"));
|
||||
}
|
||||
|
||||
public void testCreateJobStateIndex() {
|
||||
public void testPutJobStateIndexTemplate() {
|
||||
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
|
||||
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
|
||||
clientBuilder.createIndexRequest(AnomalyDetectorsIndex.jobStateIndexName(), captor);
|
||||
ArgumentCaptor<PutIndexTemplateRequest> captor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class);
|
||||
clientBuilder.putTemplate(captor);
|
||||
|
||||
Job.Builder job = buildJobBuilder("foo");
|
||||
JobProvider provider = createProvider(clientBuilder.build());
|
||||
|
||||
provider.createJobStateIndex((result, error) -> {
|
||||
provider.putJobStateIndexTemplate((result, error) -> {
|
||||
assertTrue(result);
|
||||
CreateIndexRequest request = captor.getValue();
|
||||
PutIndexTemplateRequest request = captor.getValue();
|
||||
assertNotNull(request);
|
||||
assertEquals(provider.mlStateIndexSettings().build(), request.settings());
|
||||
assertTrue(request.mappings().containsKey(CategorizerState.TYPE));
|
||||
assertTrue(request.mappings().containsKey(Quantiles.TYPE.getPreferredName()));
|
||||
assertTrue(request.mappings().containsKey(ModelState.TYPE.getPreferredName()));
|
||||
assertEquals(3, request.mappings().size());
|
||||
assertEquals(Collections.singletonList(AnomalyDetectorsIndex.jobStateIndexName()), request.patterns());
|
||||
});
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testCreateJob() throws InterruptedException, ExecutionException {
|
||||
Job.Builder job = buildJobBuilder("marscapone");
|
||||
job.setDescription("This is a very cheesy job");
|
||||
AnalysisLimits limits = new AnalysisLimits(9878695309134L, null);
|
||||
job.setAnalysisLimits(limits);
|
||||
|
||||
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
|
||||
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME)
|
||||
.createIndexRequest(AnomalyDetectorsIndex.jobResultsIndexName(job.getId()), captor);
|
||||
|
||||
Client client = clientBuilder.build();
|
||||
JobProvider provider = createProvider(client);
|
||||
AtomicReference<Boolean> resultHolder = new AtomicReference<>();
|
||||
|
||||
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
|
||||
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA)).build();
|
||||
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
|
||||
doAnswer(invocationOnMock -> {
|
||||
AckedClusterStateUpdateTask<Boolean> task = (AckedClusterStateUpdateTask<Boolean>) invocationOnMock.getArguments()[1];
|
||||
task.execute(cs);
|
||||
return null;
|
||||
}).when(clusterService).submitStateUpdateTask(eq("put-job-foo"), any(AckedClusterStateUpdateTask.class));
|
||||
|
||||
provider.createJobResultIndex(job.build(), cs, new ActionListener<Boolean>() {
|
||||
@Override
|
||||
public void onResponse(Boolean aBoolean) {
|
||||
resultHolder.set(aBoolean);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
|
||||
}
|
||||
});
|
||||
assertNotNull(resultHolder.get());
|
||||
assertTrue(resultHolder.get());
|
||||
}
|
||||
|
||||
public void testDeleteJob() throws InterruptedException, ExecutionException, IOException {
|
||||
public void testDeleteJobRelatedIndices() throws InterruptedException, ExecutionException, IOException {
|
||||
@SuppressWarnings("unchecked")
|
||||
ActionListener<DeleteJobAction.Response> actionListener = mock(ActionListener.class);
|
||||
String jobId = "ThisIsMyJob";
|
||||
|
@ -492,7 +451,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
assertTrue(responseCaptor.getValue().isAcknowledged());
|
||||
}
|
||||
|
||||
public void testDeleteJob_InvalidIndex() throws InterruptedException, ExecutionException, IOException {
|
||||
public void testDeleteJobRelatedIndices_InvalidIndex() throws InterruptedException, ExecutionException, IOException {
|
||||
@SuppressWarnings("unchecked")
|
||||
ActionListener<DeleteJobAction.Response> actionListener = mock(ActionListener.class);
|
||||
String jobId = "ThisIsMyJob";
|
||||
|
@ -525,7 +484,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
source.add(map);
|
||||
|
||||
QueryBuilder[] queryBuilderHolder = new QueryBuilder[1];
|
||||
SearchResponse response = createSearchResponse(true, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
int from = 0;
|
||||
int size = 10;
|
||||
Client client = getMockedClient(queryBuilder -> {queryBuilderHolder[0] = queryBuilder;}, response);
|
||||
|
@ -559,7 +518,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
source.add(map);
|
||||
|
||||
QueryBuilder[] queryBuilderHolder = new QueryBuilder[1];
|
||||
SearchResponse response = createSearchResponse(true, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
int from = 99;
|
||||
int size = 17;
|
||||
|
||||
|
@ -594,7 +553,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
source.add(map);
|
||||
|
||||
QueryBuilder[] queryBuilderHolder = new QueryBuilder[1];
|
||||
SearchResponse response = createSearchResponse(true, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
int from = 99;
|
||||
int size = 17;
|
||||
|
||||
|
@ -626,7 +585,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
Long timestamp = 98765432123456789L;
|
||||
List<Map<String, Object>> source = new ArrayList<>();
|
||||
|
||||
SearchResponse response = createSearchResponse(false, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
|
||||
Client client = getMockedClient(queryBuilder -> {}, response);
|
||||
JobProvider provider = createProvider(client);
|
||||
|
@ -650,7 +609,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
map.put("bucket_span", 22);
|
||||
source.add(map);
|
||||
|
||||
SearchResponse response = createSearchResponse(true, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
Client client = getMockedClient(queryBuilder -> {}, response);
|
||||
JobProvider provider = createProvider(client);
|
||||
|
||||
|
@ -678,7 +637,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
map.put("is_interim", true);
|
||||
source.add(map);
|
||||
|
||||
SearchResponse response = createSearchResponse(true, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
Client client = getMockedClient(queryBuilder -> {}, response);
|
||||
JobProvider provider = createProvider(client);
|
||||
|
||||
|
@ -717,7 +676,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
int from = 14;
|
||||
int size = 2;
|
||||
String sortfield = "minefield";
|
||||
SearchResponse response = createSearchResponse(true, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
Client client = getMockedClient(qb -> {}, response);
|
||||
JobProvider provider = createProvider(client);
|
||||
|
||||
|
@ -767,7 +726,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
int from = 14;
|
||||
int size = 2;
|
||||
String sortfield = "minefield";
|
||||
SearchResponse response = createSearchResponse(true, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
|
||||
Client client = getMockedClient(qb -> {}, response);
|
||||
JobProvider provider = createProvider(client);
|
||||
|
@ -825,7 +784,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
int from = 14;
|
||||
int size = 2;
|
||||
String sortfield = "minefield";
|
||||
SearchResponse response = createSearchResponse(true, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
Client client = getMockedClient(qb -> {}, response);
|
||||
JobProvider provider = createProvider(client);
|
||||
|
||||
|
@ -862,7 +821,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
source.add(recordMap);
|
||||
}
|
||||
|
||||
SearchResponse response = createSearchResponse(true, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
Client client = getMockedClient(qb -> {}, response);
|
||||
JobProvider provider = createProvider(client);
|
||||
|
||||
|
@ -891,7 +850,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
source.add(recordMap);
|
||||
}
|
||||
|
||||
SearchResponse response = createSearchResponse(true, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
Client client = getMockedClient(qb -> {}, response);
|
||||
JobProvider provider = createProvider(client);
|
||||
|
||||
|
@ -917,7 +876,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
|
||||
source.add(map);
|
||||
|
||||
SearchResponse response = createSearchResponse(true, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
int from = 0;
|
||||
int size = 10;
|
||||
Client client = getMockedClient(q -> {}, response);
|
||||
|
@ -943,7 +902,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
source.put("category_id", categoryId);
|
||||
source.put("terms", terms);
|
||||
|
||||
SearchResponse response = createSearchResponse(true, Collections.singletonList(source));
|
||||
SearchResponse response = createSearchResponse(Collections.singletonList(source));
|
||||
Client client = getMockedClient(q -> {}, response);
|
||||
JobProvider provider = createProvider(client);
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
|
@ -986,7 +945,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
int from = 4;
|
||||
int size = 3;
|
||||
QueryBuilder[] qbHolder = new QueryBuilder[1];
|
||||
SearchResponse response = createSearchResponse(true, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
Client client = getMockedClient(q -> qbHolder[0] = q, response);
|
||||
JobProvider provider = createProvider(client);
|
||||
|
||||
|
@ -1048,7 +1007,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
int from = 4;
|
||||
int size = 3;
|
||||
QueryBuilder[] qbHolder = new QueryBuilder[1];
|
||||
SearchResponse response = createSearchResponse(true, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
Client client = getMockedClient(q -> qbHolder[0] = q, response);
|
||||
JobProvider provider = createProvider(client);
|
||||
|
||||
|
@ -1103,7 +1062,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
|
||||
int from = 4;
|
||||
int size = 3;
|
||||
SearchResponse response = createSearchResponse(true, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
Client client = getMockedClient(qb -> {}, response);
|
||||
JobProvider provider = createProvider(client);
|
||||
|
||||
|
@ -1154,7 +1113,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
int from = 4;
|
||||
int size = 3;
|
||||
QueryBuilder[] qbHolder = new QueryBuilder[1];
|
||||
SearchResponse response = createSearchResponse(true, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
Client client = getMockedClient(qb -> qbHolder[0] = qb, response);
|
||||
JobProvider provider = createProvider(client);
|
||||
|
||||
|
@ -1312,7 +1271,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
return getResponse;
|
||||
}
|
||||
|
||||
private static SearchResponse createSearchResponse(boolean exists, List<Map<String, Object>> source) throws IOException {
|
||||
private static SearchResponse createSearchResponse(List<Map<String, Object>> source) throws IOException {
|
||||
SearchResponse response = mock(SearchResponse.class);
|
||||
List<SearchHit> list = new ArrayList<>();
|
||||
|
||||
|
|
|
@ -24,6 +24,8 @@ import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequestBuil
|
|||
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
|
||||
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
|
||||
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
|
||||
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
|
||||
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
|
@ -348,6 +350,19 @@ public class MockClientBuilder {
|
|||
return this;
|
||||
}
|
||||
|
||||
public MockClientBuilder putTemplate(ArgumentCaptor<PutIndexTemplateRequest> requestCaptor) {
|
||||
doAnswer(new Answer<Void>() {
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||
ActionListener<PutIndexTemplateResponse> listener =
|
||||
(ActionListener<PutIndexTemplateResponse>) invocationOnMock.getArguments()[1];
|
||||
listener.onResponse(mock(PutIndexTemplateResponse.class));
|
||||
return null;
|
||||
}
|
||||
}).when(indicesAdminClient).putTemplate(requestCaptor.capture(), any(ActionListener.class));
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
public Client build() {
|
||||
return client;
|
||||
|
|
|
@ -34,14 +34,6 @@
|
|||
}
|
||||
- match: { job_id: "farequote" }
|
||||
|
||||
- do:
|
||||
indices.get:
|
||||
index: ".ml-anomalies-farequote"
|
||||
|
||||
- do:
|
||||
indices.get:
|
||||
index: ".ml-state"
|
||||
|
||||
- do:
|
||||
xpack.ml.get_jobs:
|
||||
job_id: "farequote"
|
||||
|
|
|
@ -1,17 +1,17 @@
|
|||
setup:
|
||||
- do:
|
||||
indices.create:
|
||||
index: .ml-anomalies-farequote
|
||||
body:
|
||||
mappings:
|
||||
results:
|
||||
properties:
|
||||
"job_id":
|
||||
type: keyword
|
||||
"result_type":
|
||||
type: keyword
|
||||
"timestamp":
|
||||
type: date
|
||||
xpack.ml.put_job:
|
||||
job_id: farequote
|
||||
body: >
|
||||
{
|
||||
"analysis_config" : {
|
||||
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
|
||||
},
|
||||
"data_description" : {
|
||||
"format":"JSON",
|
||||
"time_field":"time"
|
||||
}
|
||||
}
|
||||
|
||||
- do:
|
||||
index:
|
||||
|
|
|
@ -1,31 +1,17 @@
|
|||
setup:
|
||||
- do:
|
||||
indices.create:
|
||||
index: .ml-anomalies-farequote
|
||||
body:
|
||||
mappings:
|
||||
record:
|
||||
properties:
|
||||
"job_id":
|
||||
type: keyword
|
||||
"result_type":
|
||||
type: keyword
|
||||
"timestamp":
|
||||
type: date
|
||||
"normalized_probability":
|
||||
type: float
|
||||
"anomaly_score":
|
||||
type: float
|
||||
"over_field_value":
|
||||
type: keyword
|
||||
"partition_field_value":
|
||||
type: keyword
|
||||
"by_field_value":
|
||||
type: keyword
|
||||
"field_name":
|
||||
type: keyword
|
||||
"function":
|
||||
type: keyword
|
||||
xpack.ml.put_job:
|
||||
job_id: farequote
|
||||
body: >
|
||||
{
|
||||
"analysis_config" : {
|
||||
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
|
||||
},
|
||||
"data_description" : {
|
||||
"format":"JSON",
|
||||
"time_field":"time"
|
||||
}
|
||||
}
|
||||
|
||||
- do:
|
||||
index:
|
||||
|
|
Loading…
Reference in New Issue