Rare race condition when introducing new fields into a mapping

Dynamic mapping allows new fields to be introduced dynamically into an existing mapping. There is a (pretty rare) race condition where a new field/object being introduced will not be immediately visible to another document that introduces it at the same time.

closes #3667, closes #3544
This commit is contained in:
Shay Banon 2013-09-11 07:08:44 -07:00
parent 012797a82c
commit bbce6e8588
6 changed files with 215 additions and 164 deletions

View File

@ -1,50 +0,0 @@
#!/bin/sh
# Launcher for org.elasticsearch.plugins.PluginManager: resolves the real
# script location through symlinks, derives ES_HOME from it, selects a JVM,
# separates -D system properties from plain arguments, and exec's the class.
# Clear CDPATH so the `cd` below is not affected by the user's CDPATH setting.
CDPATH=""
SCRIPT="$0"
# SCRIPT may be an arbitrarily deep series of symlinks. Loop until we have the concrete path.
while [ -h "$SCRIPT" ] ; do
ls=`ls -ld "$SCRIPT"`
# Drop everything prior to ->
link=`expr "$ls" : '.*-> \(.*\)$'`
# An absolute symlink target replaces SCRIPT wholesale; a relative target is
# resolved against the directory containing the current SCRIPT.
if expr "$link" : '/.*' > /dev/null; then
SCRIPT="$link"
else
SCRIPT=`dirname "$SCRIPT"`/"$link"
fi
done
# determine elasticsearch home
ES_HOME=`dirname "$SCRIPT"`/..
# make ELASTICSEARCH_HOME absolute
ES_HOME=`cd "$ES_HOME"; pwd`
# Prefer the JVM under JAVA_HOME when present; otherwise fall back to PATH lookup.
if [ -x "$JAVA_HOME/bin/java" ]; then
JAVA=$JAVA_HOME/bin/java
else
JAVA=`which java`
fi
# this is a poor mans getopt replacement
# real getopt cannot be used because we need to hand options over to the PluginManager
while [ $# -gt 0 ]; do
case $1 in
-D*=*)
# -Dname=value in a single token: forward as-is to the JVM.
properties="$properties $1"
;;
-D*)
# -Dname value split across two tokens: rejoin with '=' before forwarding.
var=$1
shift
properties="$properties $var=$1"
;;
*)
# Everything else is an argument for the PluginManager itself.
args="$args $1"
esac
shift
done
# A small heap suffices for the plugin manager; JVM properties go before -cp,
# PluginManager arguments after the main class.
exec $JAVA $JAVA_OPTS -Xmx64m -Xms16m -Delasticsearch -Des.path.home="$ES_HOME" $properties -cp "$ES_HOME/lib/*" org.elasticsearch.plugins.PluginManager $args

View File

@ -272,7 +272,7 @@ public class DocumentMapper implements ToXContent {
private final Filter typeFilter;
private final Object mutex = new Object();
private final Object mappersMutex = new Object();
private boolean initMappersAdded = true;
@ -512,16 +512,6 @@ public class DocumentMapper implements ToXContent {
parser.nextToken();
}
// fire up any new mappers if exists
if (!context.newFieldMappers().mappers.isEmpty()) {
addFieldMappers(context.newFieldMappers().mappers);
context.newFieldMappers().mappers.clear();
}
if (!context.newObjectMappers().mappers.isEmpty()) {
addObjectMappers(context.newObjectMappers().mappers);
context.newObjectMappers().mappers.clear();
}
for (RootMapper rootMapper : rootMappersOrdered) {
rootMapper.postParse(context);
}
@ -530,18 +520,6 @@ public class DocumentMapper implements ToXContent {
rootMapper.validate(context);
}
} catch (Throwable e) {
// we have to fire up any new mappers even on a failure, because they
// have been added internally to each compound mapper...
// ... we have no option to "rollback" a change, which is very tricky in our copy on change system...
if (!context.newFieldMappers().mappers.isEmpty()) {
addFieldMappers(context.newFieldMappers().mappers);
context.newFieldMappers().mappers.clear();
}
if (!context.newObjectMappers().mappers.isEmpty()) {
addObjectMappers(context.newObjectMappers().mappers);
context.newObjectMappers().mappers.clear();
}
// if its already a mapper parsing exception, no need to wrap it...
if (e instanceof MapperParsingException) {
throw (MapperParsingException) e;
@ -586,12 +564,12 @@ public class DocumentMapper implements ToXContent {
return doc;
}
private void addFieldMappers(Collection<FieldMapper> fieldMappers) {
public void addFieldMappers(Collection<FieldMapper> fieldMappers) {
addFieldMappers(fieldMappers.toArray(new FieldMapper[fieldMappers.size()]));
}
private void addFieldMappers(FieldMapper... fieldMappers) {
synchronized (mutex) {
synchronized (mappersMutex) {
this.fieldMappers = this.fieldMappers.concat(this, fieldMappers);
}
for (FieldMapperListener listener : fieldMapperListeners) {
@ -615,12 +593,12 @@ public class DocumentMapper implements ToXContent {
rootObjectMapper.traverse(listener);
}
private void addObjectMappers(Collection<ObjectMapper> objectMappers) {
public void addObjectMappers(Collection<ObjectMapper> objectMappers) {
addObjectMappers(objectMappers.toArray(new ObjectMapper[objectMappers.size()]));
}
private void addObjectMappers(ObjectMapper... objectMappers) {
synchronized (mutex) {
synchronized (mappersMutex) {
MapBuilder<String, ObjectMapper> builder = MapBuilder.newMapBuilder(this.objectMappers);
for (ObjectMapper objectMapper : objectMappers) {
builder.put(objectMapper.fullPath(), objectMapper);

View File

@ -84,7 +84,8 @@ public class MapperService extends AbstractIndexComponent implements Iterable<Do
private volatile Map<String, DocumentMapper> mappers = ImmutableMap.of();
private final Object mutex = new Object();
private final Object typeMutex = new Object();
private final Object mappersMutex = new Object();
private volatile Map<String, FieldMappers> nameFieldMappers = ImmutableMap.of();
private volatile Map<String, FieldMappers> indexNameFieldMappers = ImmutableMap.of();
@ -225,7 +226,7 @@ public class MapperService extends AbstractIndexComponent implements Iterable<Do
DocumentMapper mapper = documentParser.parse(type, mappingSource);
// still add it as a document mapper so we have it registered and, for example, persisted back into
// the cluster meta data if needed, or checked for existence
synchronized (mutex) {
synchronized (typeMutex) {
mappers = newMapBuilder(mappers).put(type, mapper).map();
}
defaultMappingSource = mappingSource;
@ -238,7 +239,7 @@ public class MapperService extends AbstractIndexComponent implements Iterable<Do
// never expose this to the outside world, we need to reparse the doc mapper so we get fresh
// instances of field mappers to properly remove existing doc mapper
private DocumentMapper merge(DocumentMapper mapper) {
synchronized (mutex) {
synchronized (typeMutex) {
if (mapper.type().length() == 0) {
throw new InvalidTypeNameException("mapping type name is empty");
}
@ -289,7 +290,7 @@ public class MapperService extends AbstractIndexComponent implements Iterable<Do
}
private void addObjectMappers(ObjectMapper[] objectMappers) {
synchronized (mutex) {
synchronized (mappersMutex) {
MapBuilder<String, ObjectMappers> fullPathObjectMappers = newMapBuilder(this.fullPathObjectMappers);
for (ObjectMapper objectMapper : objectMappers) {
ObjectMappers mappers = fullPathObjectMappers.get(objectMapper.fullPath());
@ -309,7 +310,7 @@ public class MapperService extends AbstractIndexComponent implements Iterable<Do
}
private void addFieldMappers(FieldMapper[] fieldMappers) {
synchronized (mutex) {
synchronized (mappersMutex) {
MapBuilder<String, FieldMappers> nameFieldMappers = newMapBuilder(this.nameFieldMappers);
MapBuilder<String, FieldMappers> indexNameFieldMappers = newMapBuilder(this.indexNameFieldMappers);
MapBuilder<String, FieldMappers> fullNameFieldMappers = newMapBuilder(this.fullNameFieldMappers);
@ -348,7 +349,7 @@ public class MapperService extends AbstractIndexComponent implements Iterable<Do
}
public void remove(String type) {
synchronized (mutex) {
synchronized (typeMutex) {
DocumentMapper docMapper = mappers.get(type);
if (docMapper == null) {
return;
@ -363,60 +364,62 @@ public class MapperService extends AbstractIndexComponent implements Iterable<Do
}
private void removeObjectAndFieldMappers(DocumentMapper docMapper) {
// we need to remove those mappers
MapBuilder<String, FieldMappers> nameFieldMappers = newMapBuilder(this.nameFieldMappers);
MapBuilder<String, FieldMappers> indexNameFieldMappers = newMapBuilder(this.indexNameFieldMappers);
MapBuilder<String, FieldMappers> fullNameFieldMappers = newMapBuilder(this.fullNameFieldMappers);
synchronized (mappersMutex) {
// we need to remove those mappers
MapBuilder<String, FieldMappers> nameFieldMappers = newMapBuilder(this.nameFieldMappers);
MapBuilder<String, FieldMappers> indexNameFieldMappers = newMapBuilder(this.indexNameFieldMappers);
MapBuilder<String, FieldMappers> fullNameFieldMappers = newMapBuilder(this.fullNameFieldMappers);
for (FieldMapper mapper : docMapper.mappers()) {
FieldMappers mappers = nameFieldMappers.get(mapper.names().name());
if (mappers != null) {
mappers = mappers.remove(mapper);
if (mappers.isEmpty()) {
nameFieldMappers.remove(mapper.names().name());
} else {
nameFieldMappers.put(mapper.names().name(), mappers);
for (FieldMapper mapper : docMapper.mappers()) {
FieldMappers mappers = nameFieldMappers.get(mapper.names().name());
if (mappers != null) {
mappers = mappers.remove(mapper);
if (mappers.isEmpty()) {
nameFieldMappers.remove(mapper.names().name());
} else {
nameFieldMappers.put(mapper.names().name(), mappers);
}
}
mappers = indexNameFieldMappers.get(mapper.names().indexName());
if (mappers != null) {
mappers = mappers.remove(mapper);
if (mappers.isEmpty()) {
indexNameFieldMappers.remove(mapper.names().indexName());
} else {
indexNameFieldMappers.put(mapper.names().indexName(), mappers);
}
}
mappers = fullNameFieldMappers.get(mapper.names().fullName());
if (mappers != null) {
mappers = mappers.remove(mapper);
if (mappers.isEmpty()) {
fullNameFieldMappers.remove(mapper.names().fullName());
} else {
fullNameFieldMappers.put(mapper.names().fullName(), mappers);
}
}
}
this.nameFieldMappers = nameFieldMappers.map();
this.indexNameFieldMappers = indexNameFieldMappers.map();
this.fullNameFieldMappers = fullNameFieldMappers.map();
MapBuilder<String, ObjectMappers> fullPathObjectMappers = newMapBuilder(this.fullPathObjectMappers);
for (ObjectMapper mapper : docMapper.objectMappers().values()) {
ObjectMappers mappers = fullPathObjectMappers.get(mapper.fullPath());
if (mappers != null) {
mappers = mappers.remove(mapper);
if (mappers.isEmpty()) {
fullPathObjectMappers.remove(mapper.fullPath());
} else {
fullPathObjectMappers.put(mapper.fullPath(), mappers);
}
}
}
mappers = indexNameFieldMappers.get(mapper.names().indexName());
if (mappers != null) {
mappers = mappers.remove(mapper);
if (mappers.isEmpty()) {
indexNameFieldMappers.remove(mapper.names().indexName());
} else {
indexNameFieldMappers.put(mapper.names().indexName(), mappers);
}
}
mappers = fullNameFieldMappers.get(mapper.names().fullName());
if (mappers != null) {
mappers = mappers.remove(mapper);
if (mappers.isEmpty()) {
fullNameFieldMappers.remove(mapper.names().fullName());
} else {
fullNameFieldMappers.put(mapper.names().fullName(), mappers);
}
}
this.fullPathObjectMappers = fullPathObjectMappers.map();
}
this.nameFieldMappers = nameFieldMappers.map();
this.indexNameFieldMappers = indexNameFieldMappers.map();
this.fullNameFieldMappers = fullNameFieldMappers.map();
MapBuilder<String, ObjectMappers> fullPathObjectMappers = newMapBuilder(this.fullPathObjectMappers);
for (ObjectMapper mapper : docMapper.objectMappers().values()) {
ObjectMappers mappers = fullPathObjectMappers.get(mapper.fullPath());
if (mappers != null) {
mappers = mappers.remove(mapper);
if (mappers.isEmpty()) {
fullPathObjectMappers.remove(mapper.fullPath());
} else {
fullPathObjectMappers.put(mapper.fullPath(), mappers);
}
}
}
this.fullPathObjectMappers = fullPathObjectMappers.map();
}
/**
@ -457,7 +460,7 @@ public class MapperService extends AbstractIndexComponent implements Iterable<Do
throw new TypeMissingException(index, type, "trying to auto create mapping, but dynamic mapping is disabled");
}
// go ahead and dynamically create it
synchronized (mutex) {
synchronized (typeMutex) {
mapper = mappers.get(type);
if (mapper != null) {
return mapper;

View File

@ -73,6 +73,7 @@ public class ParseContext {
private Map<String, String> ignoredValues = new HashMap<String, String>();
private boolean mappingsModified = false;
private boolean withinNewMapper = false;
private boolean externalValueSet;
@ -82,9 +83,6 @@ public class ParseContext {
private float docBoost = 1.0f;
private FieldMapperListener.Aggregator newFieldMappers = new FieldMapperListener.Aggregator();
private ObjectMapperListener.Aggregator newObjectMappers = new ObjectMapperListener.Aggregator();
public ParseContext(String index, @Nullable Settings indexSettings, DocumentMapperParser docMapperParser, DocumentMapper docMapper, ContentPath path) {
this.index = index;
this.indexSettings = indexSettings;
@ -110,26 +108,17 @@ public class ParseContext {
this.source = source == null ? null : sourceToParse.source();
this.path.reset();
this.mappingsModified = false;
this.withinNewMapper = false;
this.listener = listener == null ? DocumentMapper.ParseListener.EMPTY : listener;
this.allEntries = new AllEntries();
this.ignoredValues.clear();
this.docBoost = 1.0f;
this.newFieldMappers.mappers.clear();
this.newObjectMappers.mappers.clear();
}
/**
 * Returns the flyweight flag of the source being parsed, delegating
 * straight to the underlying {@code sourceToParse}.
 */
public boolean flyweight() {
    return this.sourceToParse.flyweight();
}
public FieldMapperListener.Aggregator newFieldMappers() {
return newFieldMappers;
}
public ObjectMapperListener.Aggregator newObjectMappers() {
return newObjectMappers;
}
public DocumentMapperParser docMapperParser() {
return this.docMapperParser;
}
@ -142,6 +131,18 @@ public class ParseContext {
this.mappingsModified = true;
}
/**
 * Marks this context as currently parsing inside a newly introduced mapper,
 * so nested dynamic mappers can skip the per-mapper traversal/callback step.
 */
public void setWithinNewMapper() {
    withinNewMapper = true;
}
/**
 * Clears the "within new mapper" marker once the outermost new mapper has
 * finished parsing and its mappers have been propagated.
 */
public void clearWithinNewMapper() {
    withinNewMapper = false;
}
/**
 * @return {@code true} when parsing is currently happening inside a newly
 *         introduced (dynamic) mapper.
 */
public boolean isWithinNewMapper() {
    return this.withinNewMapper;
}
/**
 * @return the index this parse context is operating on.
 */
public String index() {
    return index;
}

View File

@ -518,11 +518,9 @@ public class ObjectMapper implements Mapper, AllFieldMapper.IncludeInAll {
} else if (dynamic == Dynamic.TRUE) {
// we sync here just so we won't add it twice. Its not the end of the world
// to sync here since next operations will get it before
boolean newMapper = false;
synchronized (mutex) {
objectMapper = mappers.get(currentFieldName);
if (objectMapper == null) {
newMapper = true;
// remove the current field name from path, since template search and the object builder add it as well...
context.path().remove();
Mapper.Builder builder = context.root().findTemplateBuilder(context, currentFieldName, "object");
@ -535,21 +533,38 @@ public class ObjectMapper implements Mapper, AllFieldMapper.IncludeInAll {
}
BuilderContext builderContext = new BuilderContext(context.indexSettings(), context.path());
objectMapper = builder.build(builderContext);
putMapper(objectMapper);
// ...now re add it
context.path().add(currentFieldName);
context.setMappingsModified();
if (context.isWithinNewMapper()) {
// within a new mapper, no need to traverse, just parse
objectMapper.parse(context);
} else {
// create a context of new mapper, so we batch aggregate all the changes within
// this object mapper once, and traverse all of them to add them in a single go
context.setWithinNewMapper();
try {
objectMapper.parse(context);
FieldMapperListener.Aggregator newFields = new FieldMapperListener.Aggregator();
ObjectMapperListener.Aggregator newObjects = new ObjectMapperListener.Aggregator();
objectMapper.traverse(newFields);
objectMapper.traverse(newObjects);
// callback on adding those fields!
context.docMapper().addFieldMappers(newFields.mappers);
context.docMapper().addObjectMappers(newObjects.mappers);
} finally {
context.clearWithinNewMapper();
}
}
// only put after we traversed and did the callbacks, so other parsing won't see it only after we
// properly traversed it and adding the mappers
putMapper(objectMapper);
} else {
objectMapper.parse(context);
}
}
// traverse and parse outside of the mutex
if (newMapper) {
// we need to traverse in case we have a dynamic template and need to add field mappers
// introduced by it
objectMapper.traverse(context.newFieldMappers());
objectMapper.traverse(context.newObjectMappers());
}
// now, parse it
objectMapper.parse(context);
} else {
// not dynamic, read everything up to end object
context.parser().skipChildren();
@ -607,11 +622,9 @@ public class ObjectMapper implements Mapper, AllFieldMapper.IncludeInAll {
// we sync here since we don't want to add this field twice to the document mapper
// its not the end of the world, since we add it to the mappers once we create it
// so next time we won't even get here for this field
boolean newMapper = false;
synchronized (mutex) {
mapper = mappers.get(currentFieldName);
if (mapper == null) {
newMapper = true;
BuilderContext builderContext = new BuilderContext(context.indexSettings(), context.path());
if (token == XContentParser.Token.VALUE_STRING) {
boolean resolved = false;
@ -765,14 +778,29 @@ public class ObjectMapper implements Mapper, AllFieldMapper.IncludeInAll {
throw new ElasticSearchIllegalStateException("Can't handle serializing a dynamic type with content token [" + token + "] and field name [" + currentFieldName + "]");
}
}
if (context.isWithinNewMapper()) {
mapper.parse(context);
} else {
context.setWithinNewMapper();
try {
mapper.parse(context);
FieldMapperListener.Aggregator newFields = new FieldMapperListener.Aggregator();
mapper.traverse(newFields);
context.docMapper().addFieldMappers(newFields.mappers);
} finally {
context.clearWithinNewMapper();
}
}
// only put after we traversed and did the callbacks, so other parsing won't see it only after we
// properly traversed it and adding the mappers
putMapper(mapper);
context.setMappingsModified();
} else {
mapper.parse(context);
}
}
if (newMapper) {
mapper.traverse(context.newFieldMappers());
}
mapper.parse(context);
}
@Override

View File

@ -0,0 +1,91 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.integration.indices.mapping;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.test.integration.AbstractSharedClusterTest;
import org.junit.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import static org.hamcrest.Matchers.emptyIterable;
public class ConcurrentDynamicTemplateTests extends AbstractSharedClusterTest {

    /** Document type used by every index request in this test. */
    private final String mappingType = "test-mapping";

    @Test // see #3544
    public void testConcurrentDynamicMapping() throws Exception {
        // Mapping with a catch-all dynamic template ("path_match": "*"), so the
        // field used below is introduced dynamically on first use.
        final String mapping = "{" + mappingType + ": {" + "\"properties\": {" + "\"an_id\": {"
                + "\"type\": \"string\"," + "\"store\": \"yes\"," + "\"index\": \"not_analyzed\"" + "}" + "}," + "\"dynamic_templates\": ["
                + "{" + "\"participants\": {" + "\"path_match\": \"*\"," + "\"mapping\": {" + "\"type\": \"string\"," + "\"store\": \"yes\","
                + "\"index\": \"analyzed\"," + "\"analyzer\": \"whitespace\"" + "}" + "}" + "}" + "]" + "}" + "}";
        // The 'fieldNames' array is used to help with retrieval of index terms
        // after testing
        final String fieldName = "participants.ACCEPTED";
        int iters = atLeast(5);
        for (int i = 0; i < iters; i++) {
            wipeIndex("test");
            client().admin().indices().prepareCreate("test").addMapping(mappingType, mapping).execute().actionGet();
            ensureYellow();
            int numDocs = atLeast(5);
            final CountDownLatch latch = new CountDownLatch(numDocs);
            final List<Throwable> throwable = new CopyOnWriteArrayList<Throwable>();
            // Index numDocs documents concurrently; each one introduces the same
            // dynamic field, racing to add it to the mapping.
            for (int j = 0; j < numDocs; j++) {
                Map<String, Object> source = new HashMap<String, Object>();
                source.put("an_id", UUID.randomUUID().toString());
                source.put(fieldName, "test-user");
                client().prepareIndex("test", mappingType).setSource(source).setConsistencyLevel(WriteConsistencyLevel.QUORUM).execute(new ActionListener<IndexResponse>() {
                    @Override
                    public void onResponse(IndexResponse response) {
                        latch.countDown();
                    }

                    @Override
                    public void onFailure(Throwable e) {
                        // Record the failure but still count down, so await()
                        // below cannot hang on a failed request.
                        throwable.add(e);
                        latch.countDown();
                    }
                });
            }
            latch.await();
            assertThat(throwable, emptyIterable());
            refresh();
            // Every document must be searchable on the dynamically added field.
            // Fixed: assertEquals takes (expected, actual) — the original had the
            // arguments reversed, which produces misleading failure messages.
            MatchQueryBuilder builder = QueryBuilders.matchQuery(fieldName, "test-user");
            SearchHits sh = client().prepareSearch("test").setQuery(builder).execute().actionGet().getHits();
            assertEquals(numDocs, sh.getTotalHits());
            // "test user" (two whitespace tokens) must not match the single
            // "test-user" term produced by the whitespace analyzer.
            assertEquals(0, client().prepareSearch("test").setQuery(QueryBuilders.matchQuery(fieldName, "test user")).execute().actionGet().getHits().getTotalHits());
        }
    }
}