diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index 671c6ef9969..7d91fe0cee5 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -14,7 +14,7 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; -import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; @@ -27,11 +27,9 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -107,7 +105,6 @@ public class JobProvider { private static final int RECORDS_SIZE_PARAM = 500; - private final Client client; private final Settings settings; @@ -122,49 +119,46 @@ public class JobProvider { public void createJobResultIndex(Job job, ClusterState state, ActionListener listener) { Collection termFields = (job.getAnalysisConfig() != null) ? job.getAnalysisConfig().termFields() : Collections.emptyList(); + String aliasName = AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()); + String indexName = job.getResultsIndexName(); - String aliasName = AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()); - String indexName = job.getResultsIndexName(); + final ActionListener responseListener = listener; + listener = ActionListener.wrap(aBoolean -> { + client.admin().indices().prepareAliases() + .addAlias(indexName, aliasName, QueryBuilders.termQuery(Job.ID.getPreferredName(), job.getId())) + .execute(ActionListener.wrap(r -> responseListener.onResponse(true), responseListener::onFailure)); + }, + listener::onFailure); + // Indices can be shared, so only create if it doesn't exist already. Saves us a roundtrip if + // already in the CS + if (!state.getMetaData().hasIndex(indexName)) { + LOGGER.trace("ES API CALL: create index {}", indexName); + CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName); - final ActionListener responseListener = listener; - listener = ActionListener.wrap(aBoolean -> { - client.admin().indices().prepareAliases() - .addAlias(indexName, aliasName) - .execute(ActionListener.wrap(r -> responseListener.onResponse(true), responseListener::onFailure)); - }, - listener::onFailure); - - // Indices can be shared, so only create if it doesn't exist already. Saves us a roundtrip if - // already in the CS - if (!state.getMetaData().hasIndex(indexName)) { - LOGGER.trace("ES API CALL: create index {}", indexName); - CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName); - - final ActionListener createdListener = listener; - client.admin().indices().create(createIndexRequest, - ActionListener.wrap( - r -> updateIndexMappingWithTermFields(indexName, termFields, createdListener), - e -> { - // Possible that the index was created while the request was executing, - // so we need to handle that possibility - if (e instanceof ResourceAlreadyExistsException) { - LOGGER.info("Index already exists"); - // Create the alias - createdListener.onResponse(true); - } else { - createdListener.onFailure(e); - } + final ActionListener createdListener = listener; + client.admin().indices().create(createIndexRequest, + ActionListener.wrap( + r -> updateIndexMappingWithTermFields(indexName, termFields, createdListener), + e -> { + // Possible that the index was created while the request was executing, + // so we need to handle that possibility + if (e instanceof ResourceAlreadyExistsException) { + LOGGER.info("Index already exists"); + // Create the alias + createdListener.onResponse(true); + } else { + createdListener.onFailure(e); } - )); - } else { - // Add the job's term fields to the index mapping - final ActionListener updateMappingListener = listener; - checkNumberOfFieldsLimit(indexName, termFields.size(), ActionListener.wrap( - r -> updateIndexMappingWithTermFields(indexName, termFields, updateMappingListener), - updateMappingListener::onFailure)); - } - + } + )); + } else { + // Add the job's term fields to the index mapping + final ActionListener updateMappingListener = listener; + checkNumberOfFieldsLimit(indexName, termFields.size(), ActionListener.wrap( + r -> updateIndexMappingWithTermFields(indexName, termFields, updateMappingListener), + updateMappingListener::onFailure)); + } } private void updateIndexMappingWithTermFields(String indexName, Collection termFields, ActionListener listener) { @@ -188,37 +182,36 @@ public class JobProvider { } private void checkNumberOfFieldsLimit(String indexName, long additionalFieldCount, ActionListener listener) { - client.admin().indices().prepareGetMappings(indexName).execute(new ActionListener() { - @Override - public void onResponse(GetMappingsResponse getMappingsResponse) { - ImmutableOpenMap typeMappings = getMappingsResponse.mappings().get(indexName); - Iterator> iter = typeMappings.iterator(); - long numFields = 0; - try { - while (iter.hasNext()) { - Map props = (Map)iter.next().value.getSourceAsMap().get("properties"); - numFields += props.size(); + client.admin().indices().prepareGetFieldMappings(indexName).setTypes("*").setFields("*").execute( + new ActionListener() { + @Override + public void onResponse(GetFieldMappingsResponse getFieldMappingsResponse) { + long numFields = 0; + Map>> indexMappings = + getFieldMappingsResponse.mappings(); + for (String index : indexMappings.keySet()) { + Map> typeMappings = indexMappings.get(index); + for (String type : typeMappings.keySet()) { + Map fieldMappings = typeMappings.get(type); + numFields += fieldMappings.size(); + } + } + + long fieldCountLimit = MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.get(settings); + if (numFields + additionalFieldCount > fieldCountLimit) { + String message = "Cannot create job in index '" + indexName + "' as the " + + MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey() + " setting will be violated"; + listener.onFailure(new IllegalArgumentException(message)); + } else { + listener.onResponse(true); + } } - } - catch (IOException e) { - listener.onFailure(e); - } - long fieldCountLimit = MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.get(settings); - if (numFields + additionalFieldCount > fieldCountLimit) { - String message = "Cannot create job in index '" + indexName + "' as the " + - MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey() + " setting will be violated"; - listener.onFailure(new IllegalArgumentException(message)); - } else { - listener.onResponse(true); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); } /** @@ -792,7 +785,7 @@ public class JobProvider { * @param size number of snapshots to retrieve */ public void modelSnapshots(String jobId, int from, int size, Consumer> handler, - Consumer errorHandler) { + Consumer errorHandler) { modelSnapshots(jobId, from, size, null, true, QueryBuilders.matchAllQuery(), handler, errorHandler); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java index 0d659d07d98..3393c3c5365 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java @@ -259,7 +259,8 @@ public class MlJobIT extends ESRestTestCase { String responseAsString = responseEntityToString(response); assertThat(responseAsString, containsString("\"" + AnomalyDetectorsIndex.jobResultsAliasedName("custom-" + indexName) - + "\":{\"aliases\":{\"" + AnomalyDetectorsIndex.jobResultsAliasedName(jobId1) + "\":{},\"" + + + "\":{\"aliases\":{\"" + AnomalyDetectorsIndex.jobResultsAliasedName(jobId1) + + "\":{\"filter\":{\"term\":{\"job_id\":{\"value\":\"" + jobId1 + "\",\"boost\":1.0}}}},\"" + AnomalyDetectorsIndex.jobResultsAliasedName(jobId2))); response = client().performRequest("get", "_cat/indices"); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java index 240d01c6fda..a94dc4e417c 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java @@ -31,14 +31,15 @@ import org.elasticsearch.common.text.Text; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.Index; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHitField; import org.elasticsearch.search.SearchHits; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.action.DeleteJobAction; import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.job.config.Job; -import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder.InfluencersQuery; import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; @@ -73,7 +74,6 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -131,11 +131,12 @@ public class JobProviderTests extends ESTestCase { @SuppressWarnings("unchecked") public void testCreateJobResultsIndex() { String resultsIndexName = AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT; + QueryBuilder jobFilter = QueryBuilders.termQuery("job_id", "foo"); MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); ArgumentCaptor captor = ArgumentCaptor.forClass(CreateIndexRequest.class); clientBuilder.createIndexRequest(resultsIndexName, captor); - clientBuilder.prepareAlias(resultsIndexName, AnomalyDetectorsIndex.jobResultsAliasedName("foo")); + clientBuilder.prepareAlias(resultsIndexName, AnomalyDetectorsIndex.jobResultsAliasedName("foo"), jobFilter); clientBuilder.preparePutMapping(mock(PutMappingResponse.class), Result.TYPE.getPreferredName()); Job.Builder job = buildJobBuilder("foo"); @@ -175,9 +176,10 @@ public class JobProviderTests extends ESTestCase { @SuppressWarnings("unchecked") public void testCreateJobWithExistingIndex() { + QueryBuilder jobFilter = QueryBuilders.termQuery("job_id", "foo"); MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); clientBuilder.prepareAlias(AnomalyDetectorsIndex.jobResultsAliasedName("foo"), - AnomalyDetectorsIndex.jobResultsAliasedName("foo123")); + AnomalyDetectorsIndex.jobResultsAliasedName("foo123"), jobFilter); clientBuilder.preparePutMapping(mock(PutMappingResponse.class), Result.TYPE.getPreferredName()); GetMappingsResponse getMappingsResponse = mock(GetMappingsResponse.class); @@ -239,11 +241,12 @@ public class JobProviderTests extends ESTestCase { public void testCreateJobRelatedIndicies_createsAliasBecauseIndexNameIsSet() { String indexName = AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + "custom-bar"; String aliasName = AnomalyDetectorsIndex.jobResultsAliasedName("foo"); + QueryBuilder jobFilter = QueryBuilders.termQuery("job_id", "foo"); MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); ArgumentCaptor captor = ArgumentCaptor.forClass(CreateIndexRequest.class); clientBuilder.createIndexRequest(indexName, captor); - clientBuilder.prepareAlias(indexName, aliasName); + clientBuilder.prepareAlias(indexName, aliasName, jobFilter); clientBuilder.preparePutMapping(mock(PutMappingResponse.class), Result.TYPE.getPreferredName()); Job.Builder job = buildJobBuilder("foo"); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java index b6fc209bd1a..72f1deeb198 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java @@ -281,9 +281,9 @@ public class MockClientBuilder { } @SuppressWarnings("unchecked") - public MockClientBuilder prepareAlias(String indexName, String alias) { + public MockClientBuilder prepareAlias(String indexName, String alias, QueryBuilder filter) { IndicesAliasesRequestBuilder aliasesRequestBuilder = mock(IndicesAliasesRequestBuilder.class); - when(aliasesRequestBuilder.addAlias(eq(indexName), eq(alias))).thenReturn(aliasesRequestBuilder); + when(aliasesRequestBuilder.addAlias(eq(indexName), eq(alias), eq(filter))).thenReturn(aliasesRequestBuilder); when(indicesAdminClient.prepareAliases()).thenReturn(aliasesRequestBuilder); doAnswer(new Answer() { @Override