[ML] Fix race condition when creating multiple jobs (#40049)

If multiple jobs are created together and the anomaly
results index does not exist then some of the jobs could
fail to update the mappings of the results index. This
lead them to fail to write their results correctly later.

Although this scenario sounds rare, it is exactly what
happens if the user creates their first jobs using the
Nginx module in the ML UI.

This change fixes the problem by updating the mappings
of the results index if it is found to exist during a
creation attempt.

Fixes #38785
This commit is contained in:
David Roberts 2019-03-15 09:25:55 +00:00
parent 78a9754318
commit 8d01b11918
3 changed files with 104 additions and 23 deletions

View File

@ -16,6 +16,8 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.bulk.BulkAction; import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkRequestBuilder;
@ -37,12 +39,12 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
@ -301,30 +303,53 @@ public class JobResultsProvider {
// so we need to handle that possibility // so we need to handle that possibility
if (e instanceof ResourceAlreadyExistsException) { if (e instanceof ResourceAlreadyExistsException) {
LOGGER.info("Index already exists"); LOGGER.info("Index already exists");
// Create the alias // Add the term field mappings and alias. The complication is that the state at the
createAliasListener.onResponse(true); // beginning of the operation doesn't have any knowledge of the index, as it's only
// just been created. So we need yet another operation to get the mappings for it.
getLatestIndexMappings(indexName, ActionListener.wrap(
response -> {
// Expect one index and one type. If this is not the case then it means the
// index has been deleted almost immediately after being created, and this is
// so unlikely that it's reasonable to fail the whole operation.
ImmutableOpenMap<String, MappingMetaData> indexMappings =
response.getMappings().iterator().next().value;
MappingMetaData typeMappings = indexMappings.iterator().next().value;
addTermsAndAliases(typeMappings, indexName, termFields, createAliasListener);
},
finalListener::onFailure
));
} else { } else {
finalListener.onFailure(e); finalListener.onFailure(e);
} }
} }
), client.admin().indices()::create); ), client.admin().indices()::create);
} else { } else {
long fieldCountLimit = MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.get(settings); MappingMetaData mapping = state.metaData().index(indexName).mapping();
IndexMetaData indexMetaData = state.metaData().index(indexName); addTermsAndAliases(mapping, indexName, termFields, createAliasListener);
if (violatedFieldCountLimit(termFields.size(), fieldCountLimit, indexMetaData)) {
String message = "Cannot create job in index '" + indexName + "' as the " +
MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey() + " setting will be violated";
finalListener.onFailure(new IllegalArgumentException(message));
} else {
updateIndexMappingWithTermFields(indexName, indexMetaData.mapping().type(), termFields,
ActionListener.wrap(createAliasListener::onResponse, finalListener::onFailure));
}
} }
} }
public static boolean violatedFieldCountLimit(long additionalFieldCount, long fieldCountLimit, IndexMetaData indexMetaData) { private void getLatestIndexMappings(final String indexName, final ActionListener<GetMappingsResponse> listener) {
MappingMetaData mapping = indexMetaData.mapping();
GetMappingsRequest getMappingsRequest = client.admin().indices().prepareGetMappings(indexName).request();
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, getMappingsRequest, listener,
client.admin().indices()::getMappings);
}
private void addTermsAndAliases(final MappingMetaData mapping, final String indexName, final Collection<String> termFields,
final ActionListener<Boolean> listener) {
long fieldCountLimit = MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.get(settings);
if (violatedFieldCountLimit(termFields.size(), fieldCountLimit, mapping)) {
String message = "Cannot create job in index '" + indexName + "' as the " +
MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey() + " setting will be violated";
listener.onFailure(new IllegalArgumentException(message));
} else {
updateIndexMappingWithTermFields(indexName, mapping.type(), termFields, listener);
}
}
public static boolean violatedFieldCountLimit(long additionalFieldCount, long fieldCountLimit, MappingMetaData mapping) {
long numFields = countFields(mapping.sourceAsMap()); long numFields = countFields(mapping.sourceAsMap());
return numFields + additionalFieldCount > fieldCountLimit; return numFields + additionalFieldCount > fieldCountLimit;
} }

View File

@ -5,13 +5,19 @@
*/ */
package org.elasticsearch.xpack.ml.integration; package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
@ -32,6 +38,7 @@ import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.job.config.RuleAction; import org.elasticsearch.xpack.core.ml.job.config.RuleAction;
import org.elasticsearch.xpack.core.ml.job.config.RuleScope; import org.elasticsearch.xpack.core.ml.job.config.RuleScope;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCountsTests; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCountsTests;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
@ -55,6 +62,7 @@ import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -79,6 +87,54 @@ public class JobResultsProviderIT extends MlSingleNodeTestCase {
waitForMlTemplates(); waitForMlTemplates();
} }
public void testMultipleSimultaneousJobCreations() {
int numJobs = randomIntBetween(4, 7);
// Each job should result in one extra field being added to the results index mappings: field1, field2, field3, etc.
// Due to all being created simultaneously this test may reveal race conditions in the code that updates the mappings.
List<PutJobAction.Request> requests = new ArrayList<>(numJobs);
for (int i = 1; i <= numJobs; ++i) {
Job.Builder builder = new Job.Builder("job" + i);
AnalysisConfig.Builder ac = createAnalysisConfig("field" + i, Collections.emptyList());
DataDescription.Builder dc = new DataDescription.Builder();
builder.setAnalysisConfig(ac);
builder.setDataDescription(dc);
requests.add(new PutJobAction.Request(builder));
}
// Start the requests as close together as possible, without waiting for each to complete before starting the next one.
List<ActionFuture<PutJobAction.Response>> futures = new ArrayList<>(numJobs);
for (PutJobAction.Request request : requests) {
futures.add(client().execute(PutJobAction.INSTANCE, request));
}
// Only after all requests are in-flight, wait for all the requests to complete.
for (ActionFuture<PutJobAction.Response> future : futures) {
future.actionGet();
}
// Assert that the mappings contain all the additional fields: field1, field2, field3, etc.
String sharedResultsIndex = AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT;
GetMappingsRequest request = new GetMappingsRequest().indices(sharedResultsIndex);
GetMappingsResponse response = client().execute(GetMappingsAction.INSTANCE, request).actionGet();
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> indexMappings = response.getMappings();
assertNotNull(indexMappings);
ImmutableOpenMap<String, MappingMetaData> typeMappings = indexMappings.get(sharedResultsIndex);
assertNotNull("expected " + sharedResultsIndex + " in " + indexMappings, typeMappings);
assertEquals("expected 1 type in " + typeMappings, 1, typeMappings.size());
Map<String, Object> mappings = typeMappings.iterator().next().value.getSourceAsMap();
assertNotNull(mappings);
@SuppressWarnings("unchecked")
Map<String, Object> properties = (Map<String, Object>) mappings.get("properties");
assertNotNull("expected 'properties' field in " + mappings, properties);
for (int i = 1; i <= numJobs; ++i) {
String fieldName = "field" + i;
assertNotNull("expected '" + fieldName + "' field in " + properties, properties.get(fieldName));
}
}
public void testGetCalandarByJobId() throws Exception { public void testGetCalandarByJobId() throws Exception {
List<Calendar> calendars = new ArrayList<>(); List<Calendar> calendars = new ArrayList<>();
calendars.add(new Calendar("empty calendar", Collections.emptyList(), null)); calendars.add(new Calendar("empty calendar", Collections.emptyList(), null));
@ -473,7 +529,7 @@ public class JobResultsProviderIT extends MlSingleNodeTestCase {
private Job.Builder createJob(String jobId, List<String> filterIds, List<String> jobGroups) { private Job.Builder createJob(String jobId, List<String> filterIds, List<String> jobGroups) {
Job.Builder builder = new Job.Builder(jobId); Job.Builder builder = new Job.Builder(jobId);
builder.setGroups(jobGroups); builder.setGroups(jobGroups);
AnalysisConfig.Builder ac = createAnalysisConfig(filterIds); AnalysisConfig.Builder ac = createAnalysisConfig("by_field", filterIds);
DataDescription.Builder dc = new DataDescription.Builder(); DataDescription.Builder dc = new DataDescription.Builder();
builder.setAnalysisConfig(ac); builder.setAnalysisConfig(ac);
builder.setDataDescription(dc); builder.setDataDescription(dc);
@ -483,14 +539,14 @@ public class JobResultsProviderIT extends MlSingleNodeTestCase {
return builder; return builder;
} }
private AnalysisConfig.Builder createAnalysisConfig(List<String> filterIds) { private AnalysisConfig.Builder createAnalysisConfig(String byFieldName, List<String> filterIds) {
Detector.Builder detector = new Detector.Builder("mean", "field"); Detector.Builder detector = new Detector.Builder("mean", "field");
detector.setByFieldName("by_field"); detector.setByFieldName(byFieldName);
List<DetectionRule> rules = new ArrayList<>(); List<DetectionRule> rules = new ArrayList<>();
for (String filterId : filterIds) { for (String filterId : filterIds) {
RuleScope.Builder ruleScope = RuleScope.builder(); RuleScope.Builder ruleScope = RuleScope.builder();
ruleScope.include("by_field", filterId); ruleScope.include(byFieldName, filterId);
rules.add(new DetectionRule.Builder(ruleScope).setActions(RuleAction.SKIP_RESULT).build()); rules.add(new DetectionRule.Builder(ruleScope).setActions(RuleAction.SKIP_RESULT).build());
} }

View File

@ -783,10 +783,10 @@ public class JobResultsProviderTests extends ESTestCase {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0))
.putMapping(new MappingMetaData("type1", Collections.singletonMap("properties", mapping))) .putMapping(new MappingMetaData("type1", Collections.singletonMap("properties", mapping)))
.build(); .build();
boolean result = JobResultsProvider.violatedFieldCountLimit(0, 10, indexMetaData1); boolean result = JobResultsProvider.violatedFieldCountLimit(0, 10, indexMetaData1.mapping());
assertFalse(result); assertFalse(result);
result = JobResultsProvider.violatedFieldCountLimit(1, 10, indexMetaData1); result = JobResultsProvider.violatedFieldCountLimit(1, 10, indexMetaData1.mapping());
assertTrue(result); assertTrue(result);
for (; i < 20; i++) { for (; i < 20; i++) {
@ -801,7 +801,7 @@ public class JobResultsProviderTests extends ESTestCase {
.putMapping(new MappingMetaData("type1", Collections.singletonMap("properties", mapping))) .putMapping(new MappingMetaData("type1", Collections.singletonMap("properties", mapping)))
.build(); .build();
result = JobResultsProvider.violatedFieldCountLimit(0, 19, indexMetaData2); result = JobResultsProvider.violatedFieldCountLimit(0, 19, indexMetaData2.mapping());
assertTrue(result); assertTrue(result);
} }