From 8d01b119189e06f8abc4b5bd2a3b6c7532494fae Mon Sep 17 00:00:00 2001 From: David Roberts Date: Fri, 15 Mar 2019 09:25:55 +0000 Subject: [PATCH] [ML] Fix race condition when creating multiple jobs (#40049) If multiple jobs are created together and the anomaly results index does not exist then some of the jobs could fail to update the mappings of the results index. This lead them to fail to write their results correctly later. Although this scenario sounds rare, it is exactly what happens if the user creates their first jobs using the Nginx module in the ML UI. This change fixes the problem by updating the mappings of the results index if it is found to exist during a creation attempt. Fixes #38785 --- .../job/persistence/JobResultsProvider.java | 57 ++++++++++++----- .../ml/integration/JobResultsProviderIT.java | 64 +++++++++++++++++-- .../persistence/JobResultsProviderTests.java | 6 +- 3 files changed, 104 insertions(+), 23 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java index 6ce92d735b0..f2c53f38515 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java @@ -16,6 +16,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.bulk.BulkAction; import org.elasticsearch.action.bulk.BulkRequestBuilder; @@ -37,12 +39,12 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; @@ -301,30 +303,53 @@ public class JobResultsProvider { // so we need to handle that possibility if (e instanceof ResourceAlreadyExistsException) { LOGGER.info("Index already exists"); - // Create the alias - createAliasListener.onResponse(true); + // Add the term field mappings and alias. The complication is that the state at the + // beginning of the operation doesn't have any knowledge of the index, as it's only + // just been created. So we need yet another operation to get the mappings for it. + getLatestIndexMappings(indexName, ActionListener.wrap( + response -> { + // Expect one index and one type. If this is not the case then it means the + // index has been deleted almost immediately after being created, and this is + // so unlikely that it's reasonable to fail the whole operation. + ImmutableOpenMap indexMappings = + response.getMappings().iterator().next().value; + MappingMetaData typeMappings = indexMappings.iterator().next().value; + addTermsAndAliases(typeMappings, indexName, termFields, createAliasListener); + }, + finalListener::onFailure + )); } else { finalListener.onFailure(e); } } ), client.admin().indices()::create); } else { - long fieldCountLimit = MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.get(settings); - IndexMetaData indexMetaData = state.metaData().index(indexName); - - if (violatedFieldCountLimit(termFields.size(), fieldCountLimit, indexMetaData)) { - String message = "Cannot create job in index '" + indexName + "' as the " + - MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey() + " setting will be violated"; - finalListener.onFailure(new IllegalArgumentException(message)); - } else { - updateIndexMappingWithTermFields(indexName, indexMetaData.mapping().type(), termFields, - ActionListener.wrap(createAliasListener::onResponse, finalListener::onFailure)); - } + MappingMetaData mapping = state.metaData().index(indexName).mapping(); + addTermsAndAliases(mapping, indexName, termFields, createAliasListener); } } - public static boolean violatedFieldCountLimit(long additionalFieldCount, long fieldCountLimit, IndexMetaData indexMetaData) { - MappingMetaData mapping = indexMetaData.mapping(); + private void getLatestIndexMappings(final String indexName, final ActionListener listener) { + + GetMappingsRequest getMappingsRequest = client.admin().indices().prepareGetMappings(indexName).request(); + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, getMappingsRequest, listener, + client.admin().indices()::getMappings); + } + + private void addTermsAndAliases(final MappingMetaData mapping, final String indexName, final Collection termFields, + final ActionListener listener) { + long fieldCountLimit = MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.get(settings); + + if (violatedFieldCountLimit(termFields.size(), fieldCountLimit, mapping)) { + String message = "Cannot create job in index '" + indexName + "' as the " + + MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey() + " setting will be violated"; + listener.onFailure(new IllegalArgumentException(message)); + } else { + updateIndexMappingWithTermFields(indexName, mapping.type(), termFields, listener); + } + } + + public static boolean violatedFieldCountLimit(long additionalFieldCount, long fieldCountLimit, MappingMetaData mapping) { long numFields = countFields(mapping.sourceAsMap()); return numFields + additionalFieldCount > fieldCountLimit; } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java index c468736659f..cc1a857b5d8 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java @@ -5,13 +5,19 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; @@ -32,6 +38,7 @@ import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.config.RuleAction; import org.elasticsearch.xpack.core.ml.job.config.RuleScope; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCountsTests; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; @@ -55,6 +62,7 @@ import java.util.Collections; import java.util.Date; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; @@ -79,6 +87,54 @@ public class JobResultsProviderIT extends MlSingleNodeTestCase { waitForMlTemplates(); } + public void testMultipleSimultaneousJobCreations() { + + int numJobs = randomIntBetween(4, 7); + + // Each job should result in one extra field being added to the results index mappings: field1, field2, field3, etc. + // Due to all being created simultaneously this test may reveal race conditions in the code that updates the mappings. + List requests = new ArrayList<>(numJobs); + for (int i = 1; i <= numJobs; ++i) { + Job.Builder builder = new Job.Builder("job" + i); + AnalysisConfig.Builder ac = createAnalysisConfig("field" + i, Collections.emptyList()); + DataDescription.Builder dc = new DataDescription.Builder(); + builder.setAnalysisConfig(ac); + builder.setDataDescription(dc); + + requests.add(new PutJobAction.Request(builder)); + } + + // Start the requests as close together as possible, without waiting for each to complete before starting the next one. + List> futures = new ArrayList<>(numJobs); + for (PutJobAction.Request request : requests) { + futures.add(client().execute(PutJobAction.INSTANCE, request)); + } + + // Only after all requests are in-flight, wait for all the requests to complete. + for (ActionFuture future : futures) { + future.actionGet(); + } + + // Assert that the mappings contain all the additional fields: field1, field2, field3, etc. + String sharedResultsIndex = AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT; + GetMappingsRequest request = new GetMappingsRequest().indices(sharedResultsIndex); + GetMappingsResponse response = client().execute(GetMappingsAction.INSTANCE, request).actionGet(); + ImmutableOpenMap> indexMappings = response.getMappings(); + assertNotNull(indexMappings); + ImmutableOpenMap typeMappings = indexMappings.get(sharedResultsIndex); + assertNotNull("expected " + sharedResultsIndex + " in " + indexMappings, typeMappings); + assertEquals("expected 1 type in " + typeMappings, 1, typeMappings.size()); + Map mappings = typeMappings.iterator().next().value.getSourceAsMap(); + assertNotNull(mappings); + @SuppressWarnings("unchecked") + Map properties = (Map) mappings.get("properties"); + assertNotNull("expected 'properties' field in " + mappings, properties); + for (int i = 1; i <= numJobs; ++i) { + String fieldName = "field" + i; + assertNotNull("expected '" + fieldName + "' field in " + properties, properties.get(fieldName)); + } + } + public void testGetCalandarByJobId() throws Exception { List calendars = new ArrayList<>(); calendars.add(new Calendar("empty calendar", Collections.emptyList(), null)); @@ -473,7 +529,7 @@ public class JobResultsProviderIT extends MlSingleNodeTestCase { private Job.Builder createJob(String jobId, List filterIds, List jobGroups) { Job.Builder builder = new Job.Builder(jobId); builder.setGroups(jobGroups); - AnalysisConfig.Builder ac = createAnalysisConfig(filterIds); + AnalysisConfig.Builder ac = createAnalysisConfig("by_field", filterIds); DataDescription.Builder dc = new DataDescription.Builder(); builder.setAnalysisConfig(ac); builder.setDataDescription(dc); @@ -483,14 +539,14 @@ public class JobResultsProviderIT extends MlSingleNodeTestCase { return builder; } - private AnalysisConfig.Builder createAnalysisConfig(List filterIds) { + private AnalysisConfig.Builder createAnalysisConfig(String byFieldName, List filterIds) { Detector.Builder detector = new Detector.Builder("mean", "field"); - detector.setByFieldName("by_field"); + detector.setByFieldName(byFieldName); List rules = new ArrayList<>(); for (String filterId : filterIds) { RuleScope.Builder ruleScope = RuleScope.builder(); - ruleScope.include("by_field", filterId); + ruleScope.include(byFieldName, filterId); rules.add(new DetectionRule.Builder(ruleScope).setActions(RuleAction.SKIP_RESULT).build()); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java index b1020e1536c..2513aeac596 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java @@ -783,10 +783,10 @@ public class JobResultsProviderTests extends ESTestCase { .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)) .putMapping(new MappingMetaData("type1", Collections.singletonMap("properties", mapping))) .build(); - boolean result = JobResultsProvider.violatedFieldCountLimit(0, 10, indexMetaData1); + boolean result = JobResultsProvider.violatedFieldCountLimit(0, 10, indexMetaData1.mapping()); assertFalse(result); - result = JobResultsProvider.violatedFieldCountLimit(1, 10, indexMetaData1); + result = JobResultsProvider.violatedFieldCountLimit(1, 10, indexMetaData1.mapping()); assertTrue(result); for (; i < 20; i++) { @@ -801,7 +801,7 @@ public class JobResultsProviderTests extends ESTestCase { .putMapping(new MappingMetaData("type1", Collections.singletonMap("properties", mapping))) .build(); - result = JobResultsProvider.violatedFieldCountLimit(0, 19, indexMetaData2); + result = JobResultsProvider.violatedFieldCountLimit(0, 19, indexMetaData2.mapping()); assertTrue(result); }