From c8f39dd2b94694d81b4abb11a0b369306014f50e Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 21 May 2015 11:50:35 +0200 Subject: [PATCH] Use ReleaseableLock and try to add a test for it.. --- .../index/mapper/DocumentMapper.java | 27 +++---- .../index/mapper/DocumentParser.java | 10 +-- .../index/mapper/MapperService.java | 12 +-- .../mapper/merge/TestMergeMapperTests.java | 74 ++++++++++++++++++- 4 files changed, 93 insertions(+), 30 deletions(-) diff --git a/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java b/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java index eb11dcc1859..f4fc20a9859 100644 --- a/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java +++ b/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java @@ -42,6 +42,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.text.StringAndBytesText; import org.elasticsearch.common.text.Text; +import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; @@ -164,6 +165,7 @@ public class DocumentMapper implements ToXContent { private final Query typeFilter; + private final ReleasableLock mappingWriteLock; private final ReentrantReadWriteLock mappingLock; public DocumentMapper(MapperService mapperService, String index, @Nullable Settings indexSettings, DocumentMapperParser docMapperParser, @@ -181,9 +183,10 @@ public class DocumentMapper implements ToXContent { rootMappers.values().toArray(new RootMapper[rootMappers.values().size()]), sourceTransforms.toArray(new SourceTransform[sourceTransforms.size()]), meta); - this.documentParser = new DocumentParser(index, indexSettings, docMapperParser, this, mappingLock.readLock()); + this.documentParser = new DocumentParser(index, indexSettings, docMapperParser, this, new ReleasableLock(mappingLock.readLock())); this.typeFilter = typeMapper().termQuery(type, null); + this.mappingWriteLock = new ReleasableLock(mappingLock.writeLock()); this.mappingLock = mappingLock; if (rootMapper(ParentFieldMapper.class).active()) { @@ -391,16 +394,17 @@ public class DocumentMapper implements ToXContent { } private void addObjectMappers(Collection objectMappers) { - assert mappingLock.isWriteLockedByCurrentThread(); - MapBuilder builder = MapBuilder.newMapBuilder(this.objectMappers); - for (ObjectMapper objectMapper : objectMappers) { - builder.put(objectMapper.fullPath(), objectMapper); - if (objectMapper.nested().isNested()) { - hasNestedObjects = true; + try (ReleasableLock lock = mappingWriteLock.acquire()) { + MapBuilder builder = MapBuilder.newMapBuilder(this.objectMappers); + for (ObjectMapper objectMapper : objectMappers) { + builder.put(objectMapper.fullPath(), objectMapper); + if (objectMapper.nested().isNested()) { + hasNestedObjects = true; + } } + this.objectMappers = builder.immutableMap(); + mapperService.addObjectMappers(objectMappers); } - this.objectMappers = builder.immutableMap(); - mapperService.addObjectMappers(objectMappers); } private MergeResult newMergeContext(boolean simulate) { @@ -451,8 +455,7 @@ public class DocumentMapper implements ToXContent { } public MergeResult merge(Mapping mapping, boolean simulate) { - mappingLock.writeLock().lock(); - try { + try (ReleasableLock lock = mappingWriteLock.acquire()) { final MergeResult mergeResult = newMergeContext(simulate); this.mapping.merge(mapping, mergeResult); if (simulate == false) { @@ -461,8 +464,6 @@ public class DocumentMapper implements ToXContent { refreshSource(); } return mergeResult; - } finally { - mappingLock.writeLock().unlock(); } } diff --git a/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java b/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java index abdb4e48805..c2a44d4b08b 100644 --- a/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java +++ b/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.joda.FormatDateTimeFormatter; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; @@ -61,9 +62,9 @@ class DocumentParser implements Closeable { private final Settings indexSettings; private final DocumentMapperParser docMapperParser; private final DocumentMapper docMapper; - private final Lock parseLock; + private final ReleasableLock parseLock; - public DocumentParser(String index, Settings indexSettings, DocumentMapperParser docMapperParser, DocumentMapper docMapper, Lock parseLock) { + public DocumentParser(String index, Settings indexSettings, DocumentMapperParser docMapperParser, DocumentMapper docMapper, ReleasableLock parseLock) { this.index = index; this.indexSettings = indexSettings; this.docMapperParser = docMapperParser; @@ -72,11 +73,8 @@ class DocumentParser implements Closeable { } public ParsedDocument parseDocument(SourceToParse source) throws MapperParsingException { - parseLock.lock(); - try { + try (ReleasableLock lock = parseLock.acquire()){ return innerParseDocument(source); - } finally { - parseLock.unlock(); } } diff --git a/src/main/java/org/elasticsearch/index/mapper/MapperService.java b/src/main/java/org/elasticsearch/index/mapper/MapperService.java index 8c36b3a7320..580f597e8c6 100755 --- a/src/main/java/org/elasticsearch/index/mapper/MapperService.java +++ b/src/main/java/org/elasticsearch/index/mapper/MapperService.java @@ -47,6 +47,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lucene.search.Queries; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.index.AbstractIndexComponent; import org.elasticsearch.index.Index; import org.elasticsearch.index.analysis.AnalysisService; @@ -100,6 +101,7 @@ public class MapperService extends AbstractIndexComponent { // under the write lock and read operations (document parsing) need to be // performed under the read lock final ReentrantReadWriteLock mappingLock = new ReentrantReadWriteLock(); + private final ReleasableLock mappingWriteLock = new ReleasableLock(mappingLock.writeLock()); private volatile FieldMappersLookup fieldMappers; private volatile ImmutableOpenMap fullPathObjectMappers = ImmutableOpenMap.of(); @@ -217,11 +219,8 @@ public class MapperService extends AbstractIndexComponent { DocumentMapper mapper = documentParser.parseCompressed(type, mappingSource); // still add it as a document mapper so we have it registered and, for example, persisted back into // the cluster meta data if needed, or checked for existence - mappingLock.writeLock().lock(); - try { + try (ReleasableLock lock = mappingWriteLock.acquire()) { mappers = newMapBuilder(mappers).put(type, mapper).map(); - } finally { - mappingLock.writeLock().unlock(); } try { defaultMappingSource = mappingSource.string(); @@ -237,8 +236,7 @@ public class MapperService extends AbstractIndexComponent { // never expose this to the outside world, we need to reparse the doc mapper so we get fresh // instances of field mappers to properly remove existing doc mapper private DocumentMapper merge(DocumentMapper mapper) { - mappingLock.writeLock().lock(); - try { + try (ReleasableLock lock = mappingWriteLock.acquire()) { if (mapper.type().length() == 0) { throw new InvalidTypeNameException("mapping type name is empty"); } @@ -287,8 +285,6 @@ public class MapperService extends AbstractIndexComponent { mappers = newMapBuilder(mappers).put(mapper.type(), mapper).map(); return mapper; } - } finally { - mappingLock.writeLock().unlock(); } } diff --git a/src/test/java/org/elasticsearch/index/mapper/merge/TestMergeMapperTests.java b/src/test/java/org/elasticsearch/index/mapper/merge/TestMergeMapperTests.java index 7a73da835fd..c815bcca197 100644 --- a/src/test/java/org/elasticsearch/index/mapper/merge/TestMergeMapperTests.java +++ b/src/test/java/org/elasticsearch/index/mapper/merge/TestMergeMapperTests.java @@ -19,16 +19,26 @@ package org.elasticsearch.index.mapper.merge; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.analysis.FieldNameAnalyzer; import org.elasticsearch.index.analysis.NamedAnalyzer; -import org.elasticsearch.index.mapper.DocumentMapper; -import org.elasticsearch.index.mapper.DocumentMapperParser; -import org.elasticsearch.index.mapper.MergeResult; +import org.elasticsearch.index.mapper.*; import org.elasticsearch.index.mapper.core.StringFieldMapper; import org.elasticsearch.index.mapper.object.ObjectMapper; import org.elasticsearch.test.ElasticsearchSingleNodeTest; import org.junit.Test; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + import static org.hamcrest.Matchers.*; /** @@ -144,4 +154,62 @@ public class TestMergeMapperTests extends ElasticsearchSingleNodeTest { assertThat(((StringFieldMapper) (existing.mappers().getMapper("field"))).getIgnoreAbove(), equalTo(14)); } + public void testConcurrentMergeTest() throws IOException, BrokenBarrierException, InterruptedException { + final MapperService mapperService = createIndex("test").mapperService(); + final AtomicInteger counter = new AtomicInteger(0); + Tuple docMapper = mapperService.documentMapperWithAutoCreate("test"); + final DocumentMapper documentMapper = docMapper.v1(); + int id = counter.incrementAndGet(); + ParsedDocument doc = documentMapper.parse("test", Integer.toString(id), new BytesArray("{ \"test_field_" + id + "\" : \"test\" }")); + if (docMapper.v2() != null) { + doc.addDynamicMappingsUpdate(docMapper.v2()); + } + Mapping mapping = doc.dynamicMappingsUpdate(); + mapperService.merge("test", new CompressedString(mapping.toString()), false); + try { + int nextID = counter.get() + 1; + DocumentFieldMappers mappers = mapperService.documentMapper("test").mappers(); + FieldMapper mapper = mappers.getMapper("test_field_" + nextID); + assertNull(mapper); + ((FieldNameAnalyzer)mappers.indexAnalyzer()).getWrappedAnalyzer("test_field_" + nextID); + fail("field not there yet"); + } catch (IllegalArgumentException ex) { + assertEquals(ex.getMessage(), "Field [test_field_2] has no associated analyzer"); + } + final AtomicBoolean stopped = new AtomicBoolean(false); + final CyclicBarrier barrier = new CyclicBarrier(2); + final Thread updater = new Thread() { + @Override + public void run() { + try { + barrier.await(); + for (int i = 0; i < 10000; i++) { + Tuple docMapper = mapperService.documentMapperWithAutoCreate("test"); + int id = counter.incrementAndGet(); + ParsedDocument doc = documentMapper.parse("test", Integer.toString(id), new BytesArray("{ \"test_field_" + id + "\" : \"test\" }")); + if (docMapper.v2() != null) { + doc.addDynamicMappingsUpdate(docMapper.v2()); + } + Mapping mapping = doc.dynamicMappingsUpdate(); + mapperService.merge("test", new CompressedString(mapping.toString()), false); + } + } catch (Exception ex) { + + } finally { + stopped.set(true); + } + } + }; + updater.start(); + barrier.await(); + while(stopped.get() == false) { + List newObjectMappers = new ArrayList<>(); + List> newFieldMappers = new ArrayList<>(); + MapperUtils.collect(mapperService.documentMapper("test").root(), newObjectMappers, newFieldMappers); + DocumentFieldMappers dfm = mapperService.documentMapper("test").mappers(); + for (FieldMapper fieldMapper : newFieldMappers) { + ((FieldNameAnalyzer) dfm.indexAnalyzer()).getWrappedAnalyzer(fieldMapper.name()); + } + } + } }