From bbce6e85883ef812a80f5713e001f4ad1de47f22 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Wed, 11 Sep 2013 07:08:44 -0700 Subject: [PATCH] Rare race condition when introducing new fields into a mapping Dynamic mapping allow to dynamically introduce new fields into an existing mapping. There is a (pretty rare) race condition, where a new field/object being introduced will not be immediately visible for another document that introduces it at the same time. closes #3667, closes #3544 --- bin/plugin | 50 -------- .../index/mapper/DocumentMapper.java | 32 +---- .../index/mapper/MapperService.java | 115 +++++++++--------- .../index/mapper/ParseContext.java | 27 ++-- .../index/mapper/object/ObjectMapper.java | 64 +++++++--- .../ConcurrentDynamicTemplateTests.java | 91 ++++++++++++++ 6 files changed, 215 insertions(+), 164 deletions(-) delete mode 100644 bin/plugin create mode 100644 src/test/java/org/elasticsearch/test/integration/indices/mapping/ConcurrentDynamicTemplateTests.java diff --git a/bin/plugin b/bin/plugin deleted file mode 100644 index 42727f9fc15..00000000000 --- a/bin/plugin +++ /dev/null @@ -1,50 +0,0 @@ -#!/bin/sh - -CDPATH="" -SCRIPT="$0" - -# SCRIPT may be an arbitrarily deep series of symlinks. Loop until we have the concrete path. -while [ -h "$SCRIPT" ] ; do - ls=`ls -ld "$SCRIPT"` - # Drop everything prior to -> - link=`expr "$ls" : '.*-> \(.*\)$'` - if expr "$link" : '/.*' > /dev/null; then - SCRIPT="$link" - else - SCRIPT=`dirname "$SCRIPT"`/"$link" - fi -done - -# determine elasticsearch home -ES_HOME=`dirname "$SCRIPT"`/.. - -# make ELASTICSEARCH_HOME absolute -ES_HOME=`cd "$ES_HOME"; pwd` - - -if [ -x "$JAVA_HOME/bin/java" ]; then - JAVA=$JAVA_HOME/bin/java -else - JAVA=`which java` -fi - -# this is a poor mans getopt replacement -# real getopt cannot be used because we need to hand options over to the PluginManager -while [ $# -gt 0 ]; do - case $1 in - -D*=*) - properties="$properties $1" - ;; - -D*) - var=$1 - shift - properties="$properties $var=$1" - ;; - *) - args="$args $1" - esac - shift -done - -exec $JAVA $JAVA_OPTS -Xmx64m -Xms16m -Delasticsearch -Des.path.home="$ES_HOME" $properties -cp "$ES_HOME/lib/*" org.elasticsearch.plugins.PluginManager $args - diff --git a/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java b/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java index cf53473f68f..0bd3856c6d3 100644 --- a/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java +++ b/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java @@ -272,7 +272,7 @@ public class DocumentMapper implements ToXContent { private final Filter typeFilter; - private final Object mutex = new Object(); + private final Object mappersMutex = new Object(); private boolean initMappersAdded = true; @@ -512,16 +512,6 @@ public class DocumentMapper implements ToXContent { parser.nextToken(); } - // fire up any new mappers if exists - if (!context.newFieldMappers().mappers.isEmpty()) { - addFieldMappers(context.newFieldMappers().mappers); - context.newFieldMappers().mappers.clear(); - } - if (!context.newObjectMappers().mappers.isEmpty()) { - addObjectMappers(context.newObjectMappers().mappers); - context.newObjectMappers().mappers.clear(); - } - for (RootMapper rootMapper : rootMappersOrdered) { rootMapper.postParse(context); } @@ -530,18 +520,6 @@ public class DocumentMapper implements ToXContent { rootMapper.validate(context); } } catch (Throwable e) { - // we have to fire up any new mappers even on a failure, because they - // have been added internally to each compound mapper... - // ... we have no option to "rollback" a change, which is very tricky in our copy on change system... - if (!context.newFieldMappers().mappers.isEmpty()) { - addFieldMappers(context.newFieldMappers().mappers); - context.newFieldMappers().mappers.clear(); - } - if (!context.newObjectMappers().mappers.isEmpty()) { - addObjectMappers(context.newObjectMappers().mappers); - context.newObjectMappers().mappers.clear(); - } - // if its already a mapper parsing exception, no need to wrap it... if (e instanceof MapperParsingException) { throw (MapperParsingException) e; @@ -586,12 +564,12 @@ public class DocumentMapper implements ToXContent { return doc; } - private void addFieldMappers(Collection fieldMappers) { + public void addFieldMappers(Collection fieldMappers) { addFieldMappers(fieldMappers.toArray(new FieldMapper[fieldMappers.size()])); } private void addFieldMappers(FieldMapper... fieldMappers) { - synchronized (mutex) { + synchronized (mappersMutex) { this.fieldMappers = this.fieldMappers.concat(this, fieldMappers); } for (FieldMapperListener listener : fieldMapperListeners) { @@ -615,12 +593,12 @@ public class DocumentMapper implements ToXContent { rootObjectMapper.traverse(listener); } - private void addObjectMappers(Collection objectMappers) { + public void addObjectMappers(Collection objectMappers) { addObjectMappers(objectMappers.toArray(new ObjectMapper[objectMappers.size()])); } private void addObjectMappers(ObjectMapper... objectMappers) { - synchronized (mutex) { + synchronized (mappersMutex) { MapBuilder builder = MapBuilder.newMapBuilder(this.objectMappers); for (ObjectMapper objectMapper : objectMappers) { builder.put(objectMapper.fullPath(), objectMapper); diff --git a/src/main/java/org/elasticsearch/index/mapper/MapperService.java b/src/main/java/org/elasticsearch/index/mapper/MapperService.java index 50a3f9b98f5..d1b1af06574 100644 --- a/src/main/java/org/elasticsearch/index/mapper/MapperService.java +++ b/src/main/java/org/elasticsearch/index/mapper/MapperService.java @@ -84,7 +84,8 @@ public class MapperService extends AbstractIndexComponent implements Iterable mappers = ImmutableMap.of(); - private final Object mutex = new Object(); + private final Object typeMutex = new Object(); + private final Object mappersMutex = new Object(); private volatile Map nameFieldMappers = ImmutableMap.of(); private volatile Map indexNameFieldMappers = ImmutableMap.of(); @@ -225,7 +226,7 @@ public class MapperService extends AbstractIndexComponent implements Iterable fullPathObjectMappers = newMapBuilder(this.fullPathObjectMappers); for (ObjectMapper objectMapper : objectMappers) { ObjectMappers mappers = fullPathObjectMappers.get(objectMapper.fullPath()); @@ -309,7 +310,7 @@ public class MapperService extends AbstractIndexComponent implements Iterable nameFieldMappers = newMapBuilder(this.nameFieldMappers); MapBuilder indexNameFieldMappers = newMapBuilder(this.indexNameFieldMappers); MapBuilder fullNameFieldMappers = newMapBuilder(this.fullNameFieldMappers); @@ -348,7 +349,7 @@ public class MapperService extends AbstractIndexComponent implements Iterable nameFieldMappers = newMapBuilder(this.nameFieldMappers); - MapBuilder indexNameFieldMappers = newMapBuilder(this.indexNameFieldMappers); - MapBuilder fullNameFieldMappers = newMapBuilder(this.fullNameFieldMappers); + synchronized (mappersMutex) { + // we need to remove those mappers + MapBuilder nameFieldMappers = newMapBuilder(this.nameFieldMappers); + MapBuilder indexNameFieldMappers = newMapBuilder(this.indexNameFieldMappers); + MapBuilder fullNameFieldMappers = newMapBuilder(this.fullNameFieldMappers); - for (FieldMapper mapper : docMapper.mappers()) { - FieldMappers mappers = nameFieldMappers.get(mapper.names().name()); - if (mappers != null) { - mappers = mappers.remove(mapper); - if (mappers.isEmpty()) { - nameFieldMappers.remove(mapper.names().name()); - } else { - nameFieldMappers.put(mapper.names().name(), mappers); + for (FieldMapper mapper : docMapper.mappers()) { + FieldMappers mappers = nameFieldMappers.get(mapper.names().name()); + if (mappers != null) { + mappers = mappers.remove(mapper); + if (mappers.isEmpty()) { + nameFieldMappers.remove(mapper.names().name()); + } else { + nameFieldMappers.put(mapper.names().name(), mappers); + } + } + + mappers = indexNameFieldMappers.get(mapper.names().indexName()); + if (mappers != null) { + mappers = mappers.remove(mapper); + if (mappers.isEmpty()) { + indexNameFieldMappers.remove(mapper.names().indexName()); + } else { + indexNameFieldMappers.put(mapper.names().indexName(), mappers); + } + } + + mappers = fullNameFieldMappers.get(mapper.names().fullName()); + if (mappers != null) { + mappers = mappers.remove(mapper); + if (mappers.isEmpty()) { + fullNameFieldMappers.remove(mapper.names().fullName()); + } else { + fullNameFieldMappers.put(mapper.names().fullName(), mappers); + } + } + } + this.nameFieldMappers = nameFieldMappers.map(); + this.indexNameFieldMappers = indexNameFieldMappers.map(); + this.fullNameFieldMappers = fullNameFieldMappers.map(); + + MapBuilder fullPathObjectMappers = newMapBuilder(this.fullPathObjectMappers); + for (ObjectMapper mapper : docMapper.objectMappers().values()) { + ObjectMappers mappers = fullPathObjectMappers.get(mapper.fullPath()); + if (mappers != null) { + mappers = mappers.remove(mapper); + if (mappers.isEmpty()) { + fullPathObjectMappers.remove(mapper.fullPath()); + } else { + fullPathObjectMappers.put(mapper.fullPath(), mappers); + } } } - mappers = indexNameFieldMappers.get(mapper.names().indexName()); - if (mappers != null) { - mappers = mappers.remove(mapper); - if (mappers.isEmpty()) { - indexNameFieldMappers.remove(mapper.names().indexName()); - } else { - indexNameFieldMappers.put(mapper.names().indexName(), mappers); - } - } - - mappers = fullNameFieldMappers.get(mapper.names().fullName()); - if (mappers != null) { - mappers = mappers.remove(mapper); - if (mappers.isEmpty()) { - fullNameFieldMappers.remove(mapper.names().fullName()); - } else { - fullNameFieldMappers.put(mapper.names().fullName(), mappers); - } - } + this.fullPathObjectMappers = fullPathObjectMappers.map(); } - this.nameFieldMappers = nameFieldMappers.map(); - this.indexNameFieldMappers = indexNameFieldMappers.map(); - this.fullNameFieldMappers = fullNameFieldMappers.map(); - - MapBuilder fullPathObjectMappers = newMapBuilder(this.fullPathObjectMappers); - for (ObjectMapper mapper : docMapper.objectMappers().values()) { - ObjectMappers mappers = fullPathObjectMappers.get(mapper.fullPath()); - if (mappers != null) { - mappers = mappers.remove(mapper); - if (mappers.isEmpty()) { - fullPathObjectMappers.remove(mapper.fullPath()); - } else { - fullPathObjectMappers.put(mapper.fullPath(), mappers); - } - } - } - - this.fullPathObjectMappers = fullPathObjectMappers.map(); } /** @@ -457,7 +460,7 @@ public class MapperService extends AbstractIndexComponent implements Iterable ignoredValues = new HashMap(); private boolean mappingsModified = false; + private boolean withinNewMapper = false; private boolean externalValueSet; @@ -82,9 +83,6 @@ public class ParseContext { private float docBoost = 1.0f; - private FieldMapperListener.Aggregator newFieldMappers = new FieldMapperListener.Aggregator(); - private ObjectMapperListener.Aggregator newObjectMappers = new ObjectMapperListener.Aggregator(); - public ParseContext(String index, @Nullable Settings indexSettings, DocumentMapperParser docMapperParser, DocumentMapper docMapper, ContentPath path) { this.index = index; this.indexSettings = indexSettings; @@ -110,26 +108,17 @@ public class ParseContext { this.source = source == null ? null : sourceToParse.source(); this.path.reset(); this.mappingsModified = false; + this.withinNewMapper = false; this.listener = listener == null ? DocumentMapper.ParseListener.EMPTY : listener; this.allEntries = new AllEntries(); this.ignoredValues.clear(); this.docBoost = 1.0f; - this.newFieldMappers.mappers.clear(); - this.newObjectMappers.mappers.clear(); } public boolean flyweight() { return sourceToParse.flyweight(); } - public FieldMapperListener.Aggregator newFieldMappers() { - return newFieldMappers; - } - - public ObjectMapperListener.Aggregator newObjectMappers() { - return newObjectMappers; - } - public DocumentMapperParser docMapperParser() { return this.docMapperParser; } @@ -142,6 +131,18 @@ public class ParseContext { this.mappingsModified = true; } + public void setWithinNewMapper() { + this.withinNewMapper = true; + } + + public void clearWithinNewMapper() { + this.withinNewMapper = false; + } + + public boolean isWithinNewMapper() { + return withinNewMapper; + } + public String index() { return this.index; } diff --git a/src/main/java/org/elasticsearch/index/mapper/object/ObjectMapper.java b/src/main/java/org/elasticsearch/index/mapper/object/ObjectMapper.java index 1d2a0e3a067..b21ea3d62a3 100644 --- a/src/main/java/org/elasticsearch/index/mapper/object/ObjectMapper.java +++ b/src/main/java/org/elasticsearch/index/mapper/object/ObjectMapper.java @@ -518,11 +518,9 @@ public class ObjectMapper implements Mapper, AllFieldMapper.IncludeInAll { } else if (dynamic == Dynamic.TRUE) { // we sync here just so we won't add it twice. Its not the end of the world // to sync here since next operations will get it before - boolean newMapper = false; synchronized (mutex) { objectMapper = mappers.get(currentFieldName); if (objectMapper == null) { - newMapper = true; // remove the current field name from path, since template search and the object builder add it as well... context.path().remove(); Mapper.Builder builder = context.root().findTemplateBuilder(context, currentFieldName, "object"); @@ -535,21 +533,38 @@ public class ObjectMapper implements Mapper, AllFieldMapper.IncludeInAll { } BuilderContext builderContext = new BuilderContext(context.indexSettings(), context.path()); objectMapper = builder.build(builderContext); - putMapper(objectMapper); // ...now re add it context.path().add(currentFieldName); context.setMappingsModified(); + + if (context.isWithinNewMapper()) { + // within a new mapper, no need to traverse, just parse + objectMapper.parse(context); + } else { + // create a context of new mapper, so we batch aggregate all the changes within + // this object mapper once, and traverse all of them to add them in a single go + context.setWithinNewMapper(); + try { + objectMapper.parse(context); + FieldMapperListener.Aggregator newFields = new FieldMapperListener.Aggregator(); + ObjectMapperListener.Aggregator newObjects = new ObjectMapperListener.Aggregator(); + objectMapper.traverse(newFields); + objectMapper.traverse(newObjects); + // callback on adding those fields! + context.docMapper().addFieldMappers(newFields.mappers); + context.docMapper().addObjectMappers(newObjects.mappers); + } finally { + context.clearWithinNewMapper(); + } + } + + // only put after we traversed and did the callbacks, so other parsing won't see it only after we + // properly traversed it and adding the mappers + putMapper(objectMapper); + } else { + objectMapper.parse(context); } } - // traverse and parse outside of the mutex - if (newMapper) { - // we need to traverse in case we have a dynamic template and need to add field mappers - // introduced by it - objectMapper.traverse(context.newFieldMappers()); - objectMapper.traverse(context.newObjectMappers()); - } - // now, parse it - objectMapper.parse(context); } else { // not dynamic, read everything up to end object context.parser().skipChildren(); @@ -607,11 +622,9 @@ public class ObjectMapper implements Mapper, AllFieldMapper.IncludeInAll { // we sync here since we don't want to add this field twice to the document mapper // its not the end of the world, since we add it to the mappers once we create it // so next time we won't even get here for this field - boolean newMapper = false; synchronized (mutex) { mapper = mappers.get(currentFieldName); if (mapper == null) { - newMapper = true; BuilderContext builderContext = new BuilderContext(context.indexSettings(), context.path()); if (token == XContentParser.Token.VALUE_STRING) { boolean resolved = false; @@ -765,14 +778,29 @@ public class ObjectMapper implements Mapper, AllFieldMapper.IncludeInAll { throw new ElasticSearchIllegalStateException("Can't handle serializing a dynamic type with content token [" + token + "] and field name [" + currentFieldName + "]"); } } + + if (context.isWithinNewMapper()) { + mapper.parse(context); + } else { + context.setWithinNewMapper(); + try { + mapper.parse(context); + FieldMapperListener.Aggregator newFields = new FieldMapperListener.Aggregator(); + mapper.traverse(newFields); + context.docMapper().addFieldMappers(newFields.mappers); + } finally { + context.clearWithinNewMapper(); + } + } + + // only put after we traversed and did the callbacks, so other parsing won't see it only after we + // properly traversed it and adding the mappers putMapper(mapper); context.setMappingsModified(); + } else { + mapper.parse(context); } } - if (newMapper) { - mapper.traverse(context.newFieldMappers()); - } - mapper.parse(context); } @Override diff --git a/src/test/java/org/elasticsearch/test/integration/indices/mapping/ConcurrentDynamicTemplateTests.java b/src/test/java/org/elasticsearch/test/integration/indices/mapping/ConcurrentDynamicTemplateTests.java new file mode 100644 index 00000000000..47389af6b0d --- /dev/null +++ b/src/test/java/org/elasticsearch/test/integration/indices/mapping/ConcurrentDynamicTemplateTests.java @@ -0,0 +1,91 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.integration.indices.mapping; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.WriteConsistencyLevel; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.index.query.MatchQueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.test.integration.AbstractSharedClusterTest; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; + +import static org.hamcrest.Matchers.emptyIterable; + +public class ConcurrentDynamicTemplateTests extends AbstractSharedClusterTest { + + private final String mappingType = "test-mapping"; + + @Test // see #3544 + public void testConcurrentDynamicMapping() throws Exception { + final String mapping = "{" + mappingType + ": {" + "\"properties\": {" + "\"an_id\": {" + + "\"type\": \"string\"," + "\"store\": \"yes\"," + "\"index\": \"not_analyzed\"" + "}" + "}," + "\"dynamic_templates\": [" + + "{" + "\"participants\": {" + "\"path_match\": \"*\"," + "\"mapping\": {" + "\"type\": \"string\"," + "\"store\": \"yes\"," + + "\"index\": \"analyzed\"," + "\"analyzer\": \"whitespace\"" + "}" + "}" + "}" + "]" + "}" + "}"; + // The 'fieldNames' array is used to help with retrieval of index terms + // after testing + + final String fieldName = "participants.ACCEPTED"; + int iters = atLeast(5); + for (int i = 0; i < iters; i++) { + wipeIndex("test"); + client().admin().indices().prepareCreate("test").addMapping(mappingType, mapping).execute().actionGet(); + ensureYellow(); + int numDocs = atLeast(5); + final CountDownLatch latch = new CountDownLatch(numDocs); + final List throwable = new CopyOnWriteArrayList(); + for (int j = 0; j < numDocs; j++) { + Map source = new HashMap(); + source.put("an_id", UUID.randomUUID().toString()); + source.put(fieldName, "test-user"); + client().prepareIndex("test", mappingType).setSource(source).setConsistencyLevel(WriteConsistencyLevel.QUORUM).execute(new ActionListener() { + @Override + public void onResponse(IndexResponse response) { + latch.countDown(); + } + + @Override + public void onFailure(Throwable e) { + throwable.add(e); + latch.countDown(); + } + }); + } + latch.await(); + assertThat(throwable, emptyIterable()); + refresh(); + MatchQueryBuilder builder = QueryBuilders.matchQuery(fieldName, "test-user"); + SearchHits sh = client().prepareSearch("test").setQuery(builder).execute().actionGet().getHits(); + assertEquals(sh.getTotalHits(), numDocs); + + assertEquals(client().prepareSearch("test").setQuery(QueryBuilders.matchQuery(fieldName, "test user")).execute().actionGet().getHits().getTotalHits(), 0); + + } + } + +} \ No newline at end of file