Mappings: Make mapping updates atomic wrt document parsing.

When mapping updates happen concurrently with document parsing, bad things can
happen. For instance, when applying a mapping update we first update the Mapping
object which is used for parsing and then FieldNameAnalyzer which is used by
IndexWriter for analysis. So if you are unlucky, it could happen that a document
was parsed successfully without introducing dynamic updates yet IndexWriter does
not see its analyzer yet.

In order to fix this issue, mapping updates are now protected by a write lock
and document parsing is protected by the read lock associated with this write
lock. This ensures that no documents will be parsed while a mapping update is
being applied, so document parsing will either see none of the update or all of
it.
This commit is contained in:
Adrien Grand 2015-05-18 16:05:26 +02:00
parent 6b63ea49c2
commit a98d78b3ae
4 changed files with 80 additions and 58 deletions

View File

@ -23,6 +23,7 @@ import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.DocIdSet;
@ -75,7 +76,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
*
@ -140,7 +141,7 @@ public class DocumentMapper implements ToXContent {
public DocumentMapper build(MapperService mapperService, DocumentMapperParser docMapperParser) {
Preconditions.checkNotNull(rootObjectMapper, "Mapper builder must have the root object mapper set");
return new DocumentMapper(mapperService, index, indexSettings, docMapperParser, rootObjectMapper, meta, rootMappers, sourceTransforms);
return new DocumentMapper(mapperService, index, indexSettings, docMapperParser, rootObjectMapper, meta, rootMappers, sourceTransforms, mapperService.mappingLock);
}
}
@ -163,12 +164,14 @@ public class DocumentMapper implements ToXContent {
private final Query typeFilter;
private final Object mappersMutex = new Object();
private final ReentrantReadWriteLock mappingLock;
public DocumentMapper(MapperService mapperService, String index, @Nullable Settings indexSettings, DocumentMapperParser docMapperParser,
RootObjectMapper rootObjectMapper,
ImmutableMap<String, Object> meta,
Map<Class<? extends RootMapper>, RootMapper> rootMappers, List<SourceTransform> sourceTransforms) {
Map<Class<? extends RootMapper>, RootMapper> rootMappers,
List<SourceTransform> sourceTransforms,
ReentrantReadWriteLock mappingLock) {
this.mapperService = mapperService;
this.type = rootObjectMapper.name();
this.typeText = new StringAndBytesText(this.type);
@ -178,9 +181,10 @@ public class DocumentMapper implements ToXContent {
rootMappers.values().toArray(new RootMapper[rootMappers.values().size()]),
sourceTransforms.toArray(new SourceTransform[sourceTransforms.size()]),
meta);
this.documentParser = new DocumentParser(index, indexSettings, docMapperParser, this);
this.documentParser = new DocumentParser(index, indexSettings, docMapperParser, this, mappingLock.readLock());
this.typeFilter = typeMapper().termQuery(type, null);
this.mappingLock = mappingLock;
if (rootMapper(ParentFieldMapper.class).active()) {
// mark the routing field mapper as required
@ -310,10 +314,6 @@ public class DocumentMapper implements ToXContent {
return this.objectMappers;
}
public ParsedDocument parse(BytesReference source) throws MapperParsingException {
return parse(SourceToParse.source(source));
}
public ParsedDocument parse(String type, String id, BytesReference source) throws MapperParsingException {
return parse(SourceToParse.source(source).type(type).id(id));
}
@ -384,24 +384,22 @@ public class DocumentMapper implements ToXContent {
return DocumentParser.transformSourceAsMap(mapping, sourceAsMap);
}
public void addFieldMappers(Collection<FieldMapper<?>> fieldMappers) {
synchronized (mappersMutex) {
this.fieldMappers = this.fieldMappers.copyAndAllAll(fieldMappers);
}
private void addFieldMappers(Collection<FieldMapper<?>> fieldMappers) {
assert mappingLock.isWriteLockedByCurrentThread();
this.fieldMappers = this.fieldMappers.copyAndAllAll(fieldMappers);
mapperService.addFieldMappers(fieldMappers);
}
private void addObjectMappers(Collection<ObjectMapper> objectMappers) {
synchronized (mappersMutex) {
MapBuilder<String, ObjectMapper> builder = MapBuilder.newMapBuilder(this.objectMappers);
for (ObjectMapper objectMapper : objectMappers) {
builder.put(objectMapper.fullPath(), objectMapper);
if (objectMapper.nested().isNested()) {
hasNestedObjects = true;
}
assert mappingLock.isWriteLockedByCurrentThread();
MapBuilder<String, ObjectMapper> builder = MapBuilder.newMapBuilder(this.objectMappers);
for (ObjectMapper objectMapper : objectMappers) {
builder.put(objectMapper.fullPath(), objectMapper);
if (objectMapper.nested().isNested()) {
hasNestedObjects = true;
}
this.objectMappers = builder.immutableMap();
}
this.objectMappers = builder.immutableMap();
mapperService.addObjectMappers(objectMappers);
}
@ -452,15 +450,20 @@ public class DocumentMapper implements ToXContent {
};
}
public synchronized MergeResult merge(Mapping mapping, boolean simulate) {
final MergeResult mergeResult = newMergeContext(simulate);
this.mapping.merge(mapping, mergeResult);
if (simulate == false) {
addFieldMappers(mergeResult.getNewFieldMappers());
addObjectMappers(mergeResult.getNewObjectMappers());
refreshSource();
public MergeResult merge(Mapping mapping, boolean simulate) {
mappingLock.writeLock().lock();
try {
final MergeResult mergeResult = newMergeContext(simulate);
this.mapping.merge(mapping, mergeResult);
if (simulate == false) {
addFieldMappers(mergeResult.getNewFieldMappers());
addObjectMappers(mergeResult.getNewObjectMappers());
refreshSource();
}
return mergeResult;
} finally {
mappingLock.writeLock().unlock();
}
return mergeResult;
}
private void refreshSource() throws ElasticsearchGenerationException {

View File

@ -21,6 +21,7 @@ package org.elasticsearch.index.mapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.IndexableField;
@ -44,6 +45,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
/** A parser for documents, given mappings from a DocumentMapper */
class DocumentParser implements Closeable {
@ -55,19 +57,30 @@ class DocumentParser implements Closeable {
}
};
private String index;
private Settings indexSettings;
private DocumentMapperParser docMapperParser;
private DocumentMapper docMapper;
private final String index;
private final Settings indexSettings;
private final DocumentMapperParser docMapperParser;
private final DocumentMapper docMapper;
private final Lock parseLock;
public DocumentParser(String index, Settings indexSettings, DocumentMapperParser docMapperParser, DocumentMapper docMapper) {
public DocumentParser(String index, Settings indexSettings, DocumentMapperParser docMapperParser, DocumentMapper docMapper, Lock parseLock) {
this.index = index;
this.indexSettings = indexSettings;
this.docMapperParser = docMapperParser;
this.docMapper = docMapper;
this.parseLock = parseLock;
}
public ParsedDocument parseDocument(SourceToParse source) throws MapperParsingException {
parseLock.lock();
try {
return innerParseDocument(source);
} finally {
parseLock.unlock();
}
}
private ParsedDocument innerParseDocument(SourceToParse source) throws MapperParsingException {
ParseContext.InternalParseContext context = cache.get();
final Mapping mapping = docMapper.mapping();

View File

@ -65,11 +65,11 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder;
@ -96,8 +96,10 @@ public class MapperService extends AbstractIndexComponent {
private volatile Map<String, DocumentMapper> mappers = ImmutableMap.of();
private final Object typeMutex = new Object();
private final Object mappersMutex = new Object();
// A lock for mappings: modifications (put mapping) need to be performed
// under the write lock and read operations (document parsing) need to be
// performed under the read lock
final ReentrantReadWriteLock mappingLock = new ReentrantReadWriteLock();
private volatile FieldMappersLookup fieldMappers;
private volatile ImmutableOpenMap<String, ObjectMappers> fullPathObjectMappers = ImmutableOpenMap.of();
@ -215,8 +217,11 @@ public class MapperService extends AbstractIndexComponent {
DocumentMapper mapper = documentParser.parseCompressed(type, mappingSource);
// still add it as a document mapper so we have it registered and, for example, persisted back into
// the cluster meta data if needed, or checked for existence
synchronized (typeMutex) {
mappingLock.writeLock().lock();
try {
mappers = newMapBuilder(mappers).put(type, mapper).map();
} finally {
mappingLock.writeLock().unlock();
}
try {
defaultMappingSource = mappingSource.string();
@ -232,7 +237,8 @@ public class MapperService extends AbstractIndexComponent {
// never expose this to the outside world, we need to reparse the doc mapper so we get fresh
// instances of field mappers to properly remove existing doc mapper
private DocumentMapper merge(DocumentMapper mapper) {
synchronized (typeMutex) {
mappingLock.writeLock().lock();
try {
if (mapper.type().length() == 0) {
throw new InvalidTypeNameException("mapping type name is empty");
}
@ -281,33 +287,33 @@ public class MapperService extends AbstractIndexComponent {
mappers = newMapBuilder(mappers).put(mapper.type(), mapper).map();
return mapper;
}
} finally {
mappingLock.writeLock().unlock();
}
}
protected void addObjectMappers(Collection<ObjectMapper> objectMappers) {
synchronized (mappersMutex) {
ImmutableOpenMap.Builder<String, ObjectMappers> fullPathObjectMappers = ImmutableOpenMap.builder(this.fullPathObjectMappers);
for (ObjectMapper objectMapper : objectMappers) {
ObjectMappers mappers = fullPathObjectMappers.get(objectMapper.fullPath());
if (mappers == null) {
mappers = new ObjectMappers(objectMapper);
} else {
mappers = mappers.concat(objectMapper);
}
fullPathObjectMappers.put(objectMapper.fullPath(), mappers);
// update the hasNested flag
if (objectMapper.nested().isNested()) {
hasNested = true;
}
assert mappingLock.isWriteLockedByCurrentThread();
ImmutableOpenMap.Builder<String, ObjectMappers> fullPathObjectMappers = ImmutableOpenMap.builder(this.fullPathObjectMappers);
for (ObjectMapper objectMapper : objectMappers) {
ObjectMappers mappers = fullPathObjectMappers.get(objectMapper.fullPath());
if (mappers == null) {
mappers = new ObjectMappers(objectMapper);
} else {
mappers = mappers.concat(objectMapper);
}
fullPathObjectMappers.put(objectMapper.fullPath(), mappers);
// update the hasNested flag
if (objectMapper.nested().isNested()) {
hasNested = true;
}
this.fullPathObjectMappers = fullPathObjectMappers.build();
}
this.fullPathObjectMappers = fullPathObjectMappers.build();
}
protected void addFieldMappers(Collection<FieldMapper<?>> fieldMappers) {
synchronized (mappersMutex) {
this.fieldMappers = this.fieldMappers.copyAndAddAll(fieldMappers);
}
assert mappingLock.isWriteLockedByCurrentThread();
this.fieldMappers = this.fieldMappers.copyAndAddAll(fieldMappers);
}
public DocumentMapper parse(String mappingType, CompressedString mappingSource, boolean applyDefault) throws MapperParsingException {

View File

@ -1830,7 +1830,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
MapperService mapperService = new MapperService(index, settings, analysisService, null, similarityLookupService, null);
DocumentMapper.Builder b = new DocumentMapper.Builder(indexName, settings, rootBuilder);
DocumentMapperParser parser = new DocumentMapperParser(index, settings, mapperService, analysisService, similarityLookupService, null);
this.docMapper = b.build(null, parser);
this.docMapper = b.build(mapperService, parser);
}