From a98d78b3ae371c5b496d6cd851285c13d8f89c02 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Mon, 18 May 2015 16:05:26 +0200 Subject: [PATCH] Mappings: Make mapping updates atomic wrt document parsing. When mapping updates happen concurrently with document parsing, bad things can happen. For instance, when applying a mapping update we first update the Mapping object which is used for parsing and then FieldNameAnalyzer which is used by IndexWriter for analysis. So if you are unlucky, it could happen that a document was parsed successfully without introducing dynamic updates yet IndexWriter does not see its analyzer yet. In order to fix this issue, mapping updates are now protected by a write lock and document parsing is protected by the read lock associated with this write lock. This ensures that no documents will be parsed while a mapping update is being applied, so document parsing will either see none of the update or all of it. --- .../index/mapper/DocumentMapper.java | 61 ++++++++++--------- .../index/mapper/DocumentParser.java | 23 +++++-- .../index/mapper/MapperService.java | 52 +++++++++------- .../index/engine/InternalEngineTests.java | 2 +- 4 files changed, 80 insertions(+), 58 deletions(-) diff --git a/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java b/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java index 0717da2b8c1..eb11dcc1859 100644 --- a/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java +++ b/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java @@ -23,6 +23,7 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; + import org.apache.lucene.document.Field; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.DocIdSet; @@ -75,7 +76,7 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * @@ -140,7 +141,7 @@ public class DocumentMapper implements ToXContent { public DocumentMapper build(MapperService mapperService, DocumentMapperParser docMapperParser) { Preconditions.checkNotNull(rootObjectMapper, "Mapper builder must have the root object mapper set"); - return new DocumentMapper(mapperService, index, indexSettings, docMapperParser, rootObjectMapper, meta, rootMappers, sourceTransforms); + return new DocumentMapper(mapperService, index, indexSettings, docMapperParser, rootObjectMapper, meta, rootMappers, sourceTransforms, mapperService.mappingLock); } } @@ -163,12 +164,14 @@ public class DocumentMapper implements ToXContent { private final Query typeFilter; - private final Object mappersMutex = new Object(); + private final ReentrantReadWriteLock mappingLock; public DocumentMapper(MapperService mapperService, String index, @Nullable Settings indexSettings, DocumentMapperParser docMapperParser, RootObjectMapper rootObjectMapper, ImmutableMap meta, - Map, RootMapper> rootMappers, List sourceTransforms) { + Map, RootMapper> rootMappers, + List sourceTransforms, + ReentrantReadWriteLock mappingLock) { this.mapperService = mapperService; this.type = rootObjectMapper.name(); this.typeText = new StringAndBytesText(this.type); @@ -178,9 +181,10 @@ public class DocumentMapper implements ToXContent { rootMappers.values().toArray(new RootMapper[rootMappers.values().size()]), sourceTransforms.toArray(new SourceTransform[sourceTransforms.size()]), meta); - this.documentParser = new DocumentParser(index, indexSettings, docMapperParser, this); + this.documentParser = new DocumentParser(index, indexSettings, docMapperParser, this, mappingLock.readLock()); this.typeFilter = typeMapper().termQuery(type, null); + this.mappingLock = mappingLock; if (rootMapper(ParentFieldMapper.class).active()) { // mark the routing field mapper as required @@ -310,10 +314,6 @@ public class DocumentMapper implements ToXContent { return this.objectMappers; } - public ParsedDocument parse(BytesReference source) throws MapperParsingException { - return parse(SourceToParse.source(source)); - } - public ParsedDocument parse(String type, String id, BytesReference source) throws MapperParsingException { return parse(SourceToParse.source(source).type(type).id(id)); } @@ -384,24 +384,22 @@ public class DocumentMapper implements ToXContent { return DocumentParser.transformSourceAsMap(mapping, sourceAsMap); } - public void addFieldMappers(Collection> fieldMappers) { - synchronized (mappersMutex) { - this.fieldMappers = this.fieldMappers.copyAndAllAll(fieldMappers); - } + private void addFieldMappers(Collection> fieldMappers) { + assert mappingLock.isWriteLockedByCurrentThread(); + this.fieldMappers = this.fieldMappers.copyAndAllAll(fieldMappers); mapperService.addFieldMappers(fieldMappers); } private void addObjectMappers(Collection objectMappers) { - synchronized (mappersMutex) { - MapBuilder builder = MapBuilder.newMapBuilder(this.objectMappers); - for (ObjectMapper objectMapper : objectMappers) { - builder.put(objectMapper.fullPath(), objectMapper); - if (objectMapper.nested().isNested()) { - hasNestedObjects = true; - } + assert mappingLock.isWriteLockedByCurrentThread(); + MapBuilder builder = MapBuilder.newMapBuilder(this.objectMappers); + for (ObjectMapper objectMapper : objectMappers) { + builder.put(objectMapper.fullPath(), objectMapper); + if (objectMapper.nested().isNested()) { + hasNestedObjects = true; } - this.objectMappers = builder.immutableMap(); } + this.objectMappers = builder.immutableMap(); mapperService.addObjectMappers(objectMappers); } @@ -452,15 +450,20 @@ public class DocumentMapper implements ToXContent { }; } - public synchronized MergeResult merge(Mapping mapping, boolean simulate) { - final MergeResult mergeResult = newMergeContext(simulate); - this.mapping.merge(mapping, mergeResult); - if (simulate == false) { - addFieldMappers(mergeResult.getNewFieldMappers()); - addObjectMappers(mergeResult.getNewObjectMappers()); - refreshSource(); + public MergeResult merge(Mapping mapping, boolean simulate) { + mappingLock.writeLock().lock(); + try { + final MergeResult mergeResult = newMergeContext(simulate); + this.mapping.merge(mapping, mergeResult); + if (simulate == false) { + addFieldMappers(mergeResult.getNewFieldMappers()); + addObjectMappers(mergeResult.getNewObjectMappers()); + refreshSource(); + } + return mergeResult; + } finally { + mappingLock.writeLock().unlock(); } - return mergeResult; } private void refreshSource() throws ElasticsearchGenerationException { diff --git a/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java b/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java index 59210167f80..abdb4e48805 100644 --- a/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java +++ b/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java @@ -21,6 +21,7 @@ package org.elasticsearch.index.mapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; + import org.apache.lucene.document.Field; import org.apache.lucene.index.IndexOptions; import org.apache.lucene.index.IndexableField; @@ -44,6 +45,7 @@ import java.io.IOException; import java.util.Collections; import java.util.Map; import java.util.Set; +import java.util.concurrent.locks.Lock; /** A parser for documents, given mappings from a DocumentMapper */ class DocumentParser implements Closeable { @@ -55,19 +57,30 @@ class DocumentParser implements Closeable { } }; - private String index; - private Settings indexSettings; - private DocumentMapperParser docMapperParser; - private DocumentMapper docMapper; + private final String index; + private final Settings indexSettings; + private final DocumentMapperParser docMapperParser; + private final DocumentMapper docMapper; + private final Lock parseLock; - public DocumentParser(String index, Settings indexSettings, DocumentMapperParser docMapperParser, DocumentMapper docMapper) { + public DocumentParser(String index, Settings indexSettings, DocumentMapperParser docMapperParser, DocumentMapper docMapper, Lock parseLock) { this.index = index; this.indexSettings = indexSettings; this.docMapperParser = docMapperParser; this.docMapper = docMapper; + this.parseLock = parseLock; } public ParsedDocument parseDocument(SourceToParse source) throws MapperParsingException { + parseLock.lock(); + try { + return innerParseDocument(source); + } finally { + parseLock.unlock(); + } + } + + private ParsedDocument innerParseDocument(SourceToParse source) throws MapperParsingException { ParseContext.InternalParseContext context = cache.get(); final Mapping mapping = docMapper.mapping(); diff --git a/src/main/java/org/elasticsearch/index/mapper/MapperService.java b/src/main/java/org/elasticsearch/index/mapper/MapperService.java index fbaf6bbc885..8c36b3a7320 100755 --- a/src/main/java/org/elasticsearch/index/mapper/MapperService.java +++ b/src/main/java/org/elasticsearch/index/mapper/MapperService.java @@ -65,11 +65,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder; @@ -96,8 +96,10 @@ public class MapperService extends AbstractIndexComponent { private volatile Map mappers = ImmutableMap.of(); - private final Object typeMutex = new Object(); - private final Object mappersMutex = new Object(); + // A lock for mappings: modifications (put mapping) need to be performed + // under the write lock and read operations (document parsing) need to be + // performed under the read lock + final ReentrantReadWriteLock mappingLock = new ReentrantReadWriteLock(); private volatile FieldMappersLookup fieldMappers; private volatile ImmutableOpenMap fullPathObjectMappers = ImmutableOpenMap.of(); @@ -215,8 +217,11 @@ public class MapperService extends AbstractIndexComponent { DocumentMapper mapper = documentParser.parseCompressed(type, mappingSource); // still add it as a document mapper so we have it registered and, for example, persisted back into // the cluster meta data if needed, or checked for existence - synchronized (typeMutex) { + mappingLock.writeLock().lock(); + try { mappers = newMapBuilder(mappers).put(type, mapper).map(); + } finally { + mappingLock.writeLock().unlock(); } try { defaultMappingSource = mappingSource.string(); @@ -232,7 +237,8 @@ public class MapperService extends AbstractIndexComponent { // never expose this to the outside world, we need to reparse the doc mapper so we get fresh // instances of field mappers to properly remove existing doc mapper private DocumentMapper merge(DocumentMapper mapper) { - synchronized (typeMutex) { + mappingLock.writeLock().lock(); + try { if (mapper.type().length() == 0) { throw new InvalidTypeNameException("mapping type name is empty"); } @@ -281,33 +287,33 @@ public class MapperService extends AbstractIndexComponent { mappers = newMapBuilder(mappers).put(mapper.type(), mapper).map(); return mapper; } + } finally { + mappingLock.writeLock().unlock(); } } protected void addObjectMappers(Collection objectMappers) { - synchronized (mappersMutex) { - ImmutableOpenMap.Builder fullPathObjectMappers = ImmutableOpenMap.builder(this.fullPathObjectMappers); - for (ObjectMapper objectMapper : objectMappers) { - ObjectMappers mappers = fullPathObjectMappers.get(objectMapper.fullPath()); - if (mappers == null) { - mappers = new ObjectMappers(objectMapper); - } else { - mappers = mappers.concat(objectMapper); - } - fullPathObjectMappers.put(objectMapper.fullPath(), mappers); - // update the hasNested flag - if (objectMapper.nested().isNested()) { - hasNested = true; - } + assert mappingLock.isWriteLockedByCurrentThread(); + ImmutableOpenMap.Builder fullPathObjectMappers = ImmutableOpenMap.builder(this.fullPathObjectMappers); + for (ObjectMapper objectMapper : objectMappers) { + ObjectMappers mappers = fullPathObjectMappers.get(objectMapper.fullPath()); + if (mappers == null) { + mappers = new ObjectMappers(objectMapper); + } else { + mappers = mappers.concat(objectMapper); + } + fullPathObjectMappers.put(objectMapper.fullPath(), mappers); + // update the hasNested flag + if (objectMapper.nested().isNested()) { + hasNested = true; } - this.fullPathObjectMappers = fullPathObjectMappers.build(); } + this.fullPathObjectMappers = fullPathObjectMappers.build(); } protected void addFieldMappers(Collection> fieldMappers) { - synchronized (mappersMutex) { - this.fieldMappers = this.fieldMappers.copyAndAddAll(fieldMappers); - } + assert mappingLock.isWriteLockedByCurrentThread(); + this.fieldMappers = this.fieldMappers.copyAndAddAll(fieldMappers); } public DocumentMapper parse(String mappingType, CompressedString mappingSource, boolean applyDefault) throws MapperParsingException { diff --git a/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index fa8c0f3be70..96988646016 100644 --- a/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1830,7 +1830,7 @@ public class InternalEngineTests extends ElasticsearchTestCase { MapperService mapperService = new MapperService(index, settings, analysisService, null, similarityLookupService, null); DocumentMapper.Builder b = new DocumentMapper.Builder(indexName, settings, rootBuilder); DocumentMapperParser parser = new DocumentMapperParser(index, settings, mapperService, analysisService, similarityLookupService, null); - this.docMapper = b.build(null, parser); + this.docMapper = b.build(mapperService, parser); }