diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 8c44e00b24a..6ccdbcb2948 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -268,7 +268,7 @@ public class MachineLearning extends Plugin implements ActionPlugin { } JobResultsPersister jobResultsPersister = new JobResultsPersister(settings, client); - JobProvider jobProvider = new JobProvider(client, 1, settings); + JobProvider jobProvider = new JobProvider(client, settings); JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(settings, client); Auditor auditor = new Auditor(client, clusterService); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java index 61087f44d8f..e2f2e31d103 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java @@ -264,10 +264,22 @@ public class ElasticsearchMappings { return builder; } - public static XContentBuilder termFieldsMapping(Collection termFields) throws IOException { - XContentBuilder builder = jsonBuilder().startObject().startObject(PROPERTIES); - addTermFields(builder, termFields); - return builder.endObject().endObject(); + static XContentBuilder termFieldsMapping(String type, Collection termFields) { + try { + XContentBuilder builder = jsonBuilder().startObject(); + if (type != null) { + builder.startObject(type); + } + builder.startObject(PROPERTIES); + addTermFields(builder, termFields); + builder.endObject(); + if (type != null) { + builder.endObject(); + } + return builder.endObject(); + } catch (IOException e) { + throw new RuntimeException(e); + } } private static void addTermFields(XContentBuilder builder, Collection termFields) throws IOException { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index 7d91fe0cee5..def8dc5a19a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -14,7 +14,6 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; -import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; @@ -27,6 +26,8 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; @@ -108,7 +109,7 @@ public class JobProvider { private final Client client; private final Settings settings; - public JobProvider(Client client, int numberOfReplicas, Settings settings) { + public JobProvider(Client client, Settings settings) { this.client = Objects.requireNonNull(client); this.settings = settings; } @@ -116,95 +117,105 @@ public class JobProvider { /** * Create the Elasticsearch index and the mappings */ - public void createJobResultIndex(Job job, ClusterState state, ActionListener listener) { + public void createJobResultIndex(Job job, ClusterState state, final ActionListener finalListener) { Collection termFields = (job.getAnalysisConfig() != null) ? job.getAnalysisConfig().termFields() : Collections.emptyList(); String aliasName = AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()); String indexName = job.getResultsIndexName(); - final ActionListener responseListener = listener; - listener = ActionListener.wrap(aBoolean -> { + final ActionListener createAliasListener = ActionListener.wrap(success -> { client.admin().indices().prepareAliases() .addAlias(indexName, aliasName, QueryBuilders.termQuery(Job.ID.getPreferredName(), job.getId())) - .execute(ActionListener.wrap(r -> responseListener.onResponse(true), responseListener::onFailure)); + // we could return 'sucess && r.isAcknowledged()' instead of 'true', but that makes + // testing not possible as we can't create IndicesAliasesResponse instance or + // mock IndicesAliasesResponse#isAcknowledged() + .execute(ActionListener.wrap(r -> finalListener.onResponse(true), + finalListener::onFailure)); }, - listener::onFailure); + finalListener::onFailure); // Indices can be shared, so only create if it doesn't exist already. Saves us a roundtrip if // already in the CS if (!state.getMetaData().hasIndex(indexName)) { LOGGER.trace("ES API CALL: create index {}", indexName); CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName); - - final ActionListener createdListener = listener; + String type = Result.TYPE.getPreferredName(); + createIndexRequest.mapping(type, ElasticsearchMappings.termFieldsMapping(type, termFields)); client.admin().indices().create(createIndexRequest, ActionListener.wrap( - r -> updateIndexMappingWithTermFields(indexName, termFields, createdListener), + r -> createAliasListener.onResponse(r.isAcknowledged()), e -> { // Possible that the index was created while the request was executing, // so we need to handle that possibility if (e instanceof ResourceAlreadyExistsException) { LOGGER.info("Index already exists"); // Create the alias - createdListener.onResponse(true); + createAliasListener.onResponse(true); } else { - createdListener.onFailure(e); + finalListener.onFailure(e); } } )); } else { - // Add the job's term fields to the index mapping - final ActionListener updateMappingListener = listener; - checkNumberOfFieldsLimit(indexName, termFields.size(), ActionListener.wrap( - r -> updateIndexMappingWithTermFields(indexName, termFields, updateMappingListener), - updateMappingListener::onFailure)); + long fieldCountLimit = MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.get(settings); + if (violatedFieldCountLimit(indexName, termFields.size(), fieldCountLimit, state)) { + String message = "Cannot create job in index '" + indexName + "' as the " + + MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey() + " setting will be violated"; + finalListener.onFailure(new IllegalArgumentException(message)); + } else { + updateIndexMappingWithTermFields(indexName, termFields, + ActionListener.wrap(createAliasListener::onResponse, finalListener::onFailure)); + } } } + static boolean violatedFieldCountLimit(String indexName, long additionalFieldCount, long fieldCountLimit, ClusterState clusterState) { + long numFields = 0; + IndexMetaData indexMetaData = clusterState.metaData().index(indexName); + Iterator mappings = indexMetaData.getMappings().valuesIt(); + while (mappings.hasNext()) { + MappingMetaData mapping = mappings.next(); + try { + numFields += countFields(mapping.sourceAsMap()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + if (numFields + additionalFieldCount > fieldCountLimit) { + return true; + } else { + return false; + } + } + + @SuppressWarnings("unchecked") + static int countFields(Map mapping) { + Object propertiesNode = mapping.get("properties"); + if (propertiesNode != null && propertiesNode instanceof Map) { + mapping = (Map) propertiesNode; + } else { + return 0; + } + + int count = 0; + for (Map.Entry entry : mapping.entrySet()) { + if (entry.getValue() instanceof Map) { + Map fieldMapping = (Map) entry.getValue(); + // take into account object and nested fields: + count += countFields(fieldMapping); + } + count++; + } + return count; + } + private void updateIndexMappingWithTermFields(String indexName, Collection termFields, ActionListener listener) { - try { - client.admin().indices().preparePutMapping(indexName).setType(Result.TYPE.getPreferredName()) - .setSource(ElasticsearchMappings.termFieldsMapping(termFields)) - .execute(new ActionListener() { - @Override - public void onResponse(PutMappingResponse putMappingResponse) { - listener.onResponse(true); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); - } catch (IOException e) { - listener.onFailure(e); - } - } - - private void checkNumberOfFieldsLimit(String indexName, long additionalFieldCount, ActionListener listener) { - client.admin().indices().prepareGetFieldMappings(indexName).setTypes("*").setFields("*").execute( - new ActionListener() { + client.admin().indices().preparePutMapping(indexName).setType(Result.TYPE.getPreferredName()) + .setSource(ElasticsearchMappings.termFieldsMapping(null, termFields)) + .execute(new ActionListener() { @Override - public void onResponse(GetFieldMappingsResponse getFieldMappingsResponse) { - long numFields = 0; - Map>> indexMappings = - getFieldMappingsResponse.mappings(); - for (String index : indexMappings.keySet()) { - Map> typeMappings = indexMappings.get(index); - for (String type : typeMappings.keySet()) { - Map fieldMappings = typeMappings.get(type); - numFields += fieldMappings.size(); - } - } - - long fieldCountLimit = MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.get(settings); - if (numFields + additionalFieldCount > fieldCountLimit) { - String message = "Cannot create job in index '" + indexName + "' as the " + - MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey() + " setting will be violated"; - listener.onFailure(new IllegalArgumentException(message)); - } else { - listener.onResponse(true); - } + public void onResponse(PutMappingResponse putMappingResponse) { + listener.onResponse(putMappingResponse.isAcknowledged()); } @Override diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index 3eab323bc76..e2f98e17afb 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -76,7 +76,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase { jobResultsPersister = new JobResultsPersister(nodeSettings(), client()); Settings.Builder builder = Settings.builder() .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(1)); - jobProvider = new JobProvider(client(), 1, builder.build()); + jobProvider = new JobProvider(client(), builder.build()); capturedUpdateModelSnapshotOnJobRequests = new ArrayList<>(); resultProcessor = new AutoDetectResultProcessor(client(), JOB_ID, renormalizer, jobResultsPersister) { @Override diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappingsTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappingsTests.java index adc537c4fa1..ad8f8a81525 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappingsTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappingsTests.java @@ -5,6 +5,10 @@ */ package org.elasticsearch.xpack.ml.job.persistence; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.ESTestCase; @@ -17,18 +21,12 @@ import org.elasticsearch.xpack.ml.job.results.CategoryDefinition; import org.elasticsearch.xpack.ml.job.results.ReservedFieldNames; import org.elasticsearch.xpack.ml.job.results.Result; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonParseException; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; - import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.nio.charset.StandardCharsets; import java.util.Arrays; -import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -129,7 +127,7 @@ public class ElasticsearchMappingsTests extends ESTestCase { @SuppressWarnings("unchecked") public void testTermFieldMapping() throws IOException { - XContentBuilder builder = ElasticsearchMappings.termFieldsMapping(Arrays.asList("apple", "strawberry", + XContentBuilder builder = ElasticsearchMappings.termFieldsMapping(null, Arrays.asList("apple", "strawberry", AnomalyRecord.BUCKET_SPAN.getPreferredName())); XContentParser parser = createParser(builder); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java index a94dc4e417c..c4bcb43159b 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.job.persistence; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; @@ -137,7 +138,6 @@ public class JobProviderTests extends ESTestCase { ArgumentCaptor captor = ArgumentCaptor.forClass(CreateIndexRequest.class); clientBuilder.createIndexRequest(resultsIndexName, captor); clientBuilder.prepareAlias(resultsIndexName, AnomalyDetectorsIndex.jobResultsAliasedName("foo"), jobFilter); - clientBuilder.preparePutMapping(mock(PutMappingResponse.class), Result.TYPE.getPreferredName()); Job.Builder job = buildJobBuilder("foo"); JobProvider provider = createProvider(clientBuilder.build()); @@ -1100,6 +1100,63 @@ public class JobProviderTests extends ESTestCase { assertEquals("{\"modName\":\"modVal2\"}", restoreData[2]); } + public void testViolatedFieldCountLimit() throws Exception { + Map mapping = new HashMap<>(); + for (int i = 0; i < 10; i++) { + mapping.put("field" + i, Collections.singletonMap("type", "string")); + } + + IndexMetaData.Builder indexMetaData1 = new IndexMetaData.Builder("index1") + .settings(Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)) + .putMapping(new MappingMetaData("type1", Collections.singletonMap("properties", mapping))); + MetaData metaData = MetaData.builder() + .put(indexMetaData1) + .build(); + boolean result = JobProvider.violatedFieldCountLimit("index1", 0, 10, + ClusterState.builder(new ClusterName("_name")).metaData(metaData).build()); + assertFalse(result); + + result = JobProvider.violatedFieldCountLimit("index1", 1, 10, + ClusterState.builder(new ClusterName("_name")).metaData(metaData).build()); + assertTrue(result); + + IndexMetaData.Builder indexMetaData2 = new IndexMetaData.Builder("index1") + .settings(Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)) + .putMapping(new MappingMetaData("type1", Collections.singletonMap("properties", mapping))) + .putMapping(new MappingMetaData("type2", Collections.singletonMap("properties", mapping))); + metaData = MetaData.builder() + .put(indexMetaData2) + .build(); + result = JobProvider.violatedFieldCountLimit("index1", 0, 19, + ClusterState.builder(new ClusterName("_name")).metaData(metaData).build()); + assertTrue(result); + } + + public void testCountFields() { + Map mapping = new HashMap<>(); + mapping.put("field1", Collections.singletonMap("type", "string")); + mapping.put("field2", Collections.singletonMap("type", "string")); + mapping.put("field3", Collections.singletonMap("type", "string")); + assertEquals(3, JobProvider.countFields(Collections.singletonMap("properties", mapping))); + + Map objectProperties = new HashMap<>(); + objectProperties.put("field4", Collections.singletonMap("type", "string")); + objectProperties.put("field5", Collections.singletonMap("type", "string")); + objectProperties.put("field6", Collections.singletonMap("type", "string")); + Map objectField = new HashMap<>(); + objectField.put("type", "object"); + objectField.put("properties", objectProperties); + + mapping.put("field4", objectField); + assertEquals(7, JobProvider.countFields(Collections.singletonMap("properties", mapping))); + } + private Bucket createBucketAtEpochTime(long epoch) { Bucket b = new Bucket("foo", new Date(epoch), 123); b.setMaxNormalizedProbability(10.0); @@ -1107,7 +1164,7 @@ public class JobProviderTests extends ESTestCase { } private JobProvider createProvider(Client client) { - return new JobProvider(client, 0, Settings.EMPTY); + return new JobProvider(client, Settings.EMPTY); } private static GetResponse createGetResponse(boolean exists, Map source) throws IOException { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java index 72f1deeb198..a25b4b4e1c4 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java @@ -54,7 +54,6 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import java.io.IOException; -import java.util.Map; import java.util.concurrent.ExecutionException; import static org.junit.Assert.assertArrayEquals; @@ -179,7 +178,8 @@ public class MockClientBuilder { public MockClientBuilder createIndexRequest(String index, ArgumentCaptor requestCapture) { doAnswer(invocation -> { - ((ActionListener) invocation.getArguments()[1]).onResponse(mock(CreateIndexResponse.class)); + CreateIndexResponse response = new CreateIndexResponse(true, true) {}; + ((ActionListener) invocation.getArguments()[1]).onResponse(response); return null; }).when(indicesAdminClient).create(requestCapture.capture(), any(ActionListener.class)); return this;