Use ReleaseableLock and try to add a test for it..
This commit is contained in:
parent
a98d78b3ae
commit
c8f39dd2b9
|
@ -42,6 +42,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.text.StringAndBytesText;
|
||||
import org.elasticsearch.common.text.Text;
|
||||
import org.elasticsearch.common.util.concurrent.ReleasableLock;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
|
@ -164,6 +165,7 @@ public class DocumentMapper implements ToXContent {
|
|||
|
||||
private final Query typeFilter;
|
||||
|
||||
private final ReleasableLock mappingWriteLock;
|
||||
private final ReentrantReadWriteLock mappingLock;
|
||||
|
||||
public DocumentMapper(MapperService mapperService, String index, @Nullable Settings indexSettings, DocumentMapperParser docMapperParser,
|
||||
|
@ -181,9 +183,10 @@ public class DocumentMapper implements ToXContent {
|
|||
rootMappers.values().toArray(new RootMapper[rootMappers.values().size()]),
|
||||
sourceTransforms.toArray(new SourceTransform[sourceTransforms.size()]),
|
||||
meta);
|
||||
this.documentParser = new DocumentParser(index, indexSettings, docMapperParser, this, mappingLock.readLock());
|
||||
this.documentParser = new DocumentParser(index, indexSettings, docMapperParser, this, new ReleasableLock(mappingLock.readLock()));
|
||||
|
||||
this.typeFilter = typeMapper().termQuery(type, null);
|
||||
this.mappingWriteLock = new ReleasableLock(mappingLock.writeLock());
|
||||
this.mappingLock = mappingLock;
|
||||
|
||||
if (rootMapper(ParentFieldMapper.class).active()) {
|
||||
|
@ -391,16 +394,17 @@ public class DocumentMapper implements ToXContent {
|
|||
}
|
||||
|
||||
private void addObjectMappers(Collection<ObjectMapper> objectMappers) {
|
||||
assert mappingLock.isWriteLockedByCurrentThread();
|
||||
MapBuilder<String, ObjectMapper> builder = MapBuilder.newMapBuilder(this.objectMappers);
|
||||
for (ObjectMapper objectMapper : objectMappers) {
|
||||
builder.put(objectMapper.fullPath(), objectMapper);
|
||||
if (objectMapper.nested().isNested()) {
|
||||
hasNestedObjects = true;
|
||||
try (ReleasableLock lock = mappingWriteLock.acquire()) {
|
||||
MapBuilder<String, ObjectMapper> builder = MapBuilder.newMapBuilder(this.objectMappers);
|
||||
for (ObjectMapper objectMapper : objectMappers) {
|
||||
builder.put(objectMapper.fullPath(), objectMapper);
|
||||
if (objectMapper.nested().isNested()) {
|
||||
hasNestedObjects = true;
|
||||
}
|
||||
}
|
||||
this.objectMappers = builder.immutableMap();
|
||||
mapperService.addObjectMappers(objectMappers);
|
||||
}
|
||||
this.objectMappers = builder.immutableMap();
|
||||
mapperService.addObjectMappers(objectMappers);
|
||||
}
|
||||
|
||||
private MergeResult newMergeContext(boolean simulate) {
|
||||
|
@ -451,8 +455,7 @@ public class DocumentMapper implements ToXContent {
|
|||
}
|
||||
|
||||
public MergeResult merge(Mapping mapping, boolean simulate) {
|
||||
mappingLock.writeLock().lock();
|
||||
try {
|
||||
try (ReleasableLock lock = mappingWriteLock.acquire()) {
|
||||
final MergeResult mergeResult = newMergeContext(simulate);
|
||||
this.mapping.merge(mapping, mergeResult);
|
||||
if (simulate == false) {
|
||||
|
@ -461,8 +464,6 @@ public class DocumentMapper implements ToXContent {
|
|||
refreshSource();
|
||||
}
|
||||
return mergeResult;
|
||||
} finally {
|
||||
mappingLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.common.Nullable;
|
|||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ReleasableLock;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
|
@ -61,9 +62,9 @@ class DocumentParser implements Closeable {
|
|||
private final Settings indexSettings;
|
||||
private final DocumentMapperParser docMapperParser;
|
||||
private final DocumentMapper docMapper;
|
||||
private final Lock parseLock;
|
||||
private final ReleasableLock parseLock;
|
||||
|
||||
public DocumentParser(String index, Settings indexSettings, DocumentMapperParser docMapperParser, DocumentMapper docMapper, Lock parseLock) {
|
||||
public DocumentParser(String index, Settings indexSettings, DocumentMapperParser docMapperParser, DocumentMapper docMapper, ReleasableLock parseLock) {
|
||||
this.index = index;
|
||||
this.indexSettings = indexSettings;
|
||||
this.docMapperParser = docMapperParser;
|
||||
|
@ -72,11 +73,8 @@ class DocumentParser implements Closeable {
|
|||
}
|
||||
|
||||
public ParsedDocument parseDocument(SourceToParse source) throws MapperParsingException {
|
||||
parseLock.lock();
|
||||
try {
|
||||
try (ReleasableLock lock = parseLock.acquire()){
|
||||
return innerParseDocument(source);
|
||||
} finally {
|
||||
parseLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -47,6 +47,7 @@ import org.elasticsearch.common.inject.Inject;
|
|||
import org.elasticsearch.common.lucene.search.Queries;
|
||||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ReleasableLock;
|
||||
import org.elasticsearch.index.AbstractIndexComponent;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.analysis.AnalysisService;
|
||||
|
@ -100,6 +101,7 @@ public class MapperService extends AbstractIndexComponent {
|
|||
// under the write lock and read operations (document parsing) need to be
|
||||
// performed under the read lock
|
||||
final ReentrantReadWriteLock mappingLock = new ReentrantReadWriteLock();
|
||||
private final ReleasableLock mappingWriteLock = new ReleasableLock(mappingLock.writeLock());
|
||||
|
||||
private volatile FieldMappersLookup fieldMappers;
|
||||
private volatile ImmutableOpenMap<String, ObjectMappers> fullPathObjectMappers = ImmutableOpenMap.of();
|
||||
|
@ -217,11 +219,8 @@ public class MapperService extends AbstractIndexComponent {
|
|||
DocumentMapper mapper = documentParser.parseCompressed(type, mappingSource);
|
||||
// still add it as a document mapper so we have it registered and, for example, persisted back into
|
||||
// the cluster meta data if needed, or checked for existence
|
||||
mappingLock.writeLock().lock();
|
||||
try {
|
||||
try (ReleasableLock lock = mappingWriteLock.acquire()) {
|
||||
mappers = newMapBuilder(mappers).put(type, mapper).map();
|
||||
} finally {
|
||||
mappingLock.writeLock().unlock();
|
||||
}
|
||||
try {
|
||||
defaultMappingSource = mappingSource.string();
|
||||
|
@ -237,8 +236,7 @@ public class MapperService extends AbstractIndexComponent {
|
|||
// never expose this to the outside world, we need to reparse the doc mapper so we get fresh
|
||||
// instances of field mappers to properly remove existing doc mapper
|
||||
private DocumentMapper merge(DocumentMapper mapper) {
|
||||
mappingLock.writeLock().lock();
|
||||
try {
|
||||
try (ReleasableLock lock = mappingWriteLock.acquire()) {
|
||||
if (mapper.type().length() == 0) {
|
||||
throw new InvalidTypeNameException("mapping type name is empty");
|
||||
}
|
||||
|
@ -287,8 +285,6 @@ public class MapperService extends AbstractIndexComponent {
|
|||
mappers = newMapBuilder(mappers).put(mapper.type(), mapper).map();
|
||||
return mapper;
|
||||
}
|
||||
} finally {
|
||||
mappingLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -19,16 +19,26 @@
|
|||
|
||||
package org.elasticsearch.index.mapper.merge;
|
||||
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.compress.CompressedString;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.index.analysis.FieldNameAnalyzer;
|
||||
import org.elasticsearch.index.analysis.NamedAnalyzer;
|
||||
import org.elasticsearch.index.mapper.DocumentMapper;
|
||||
import org.elasticsearch.index.mapper.DocumentMapperParser;
|
||||
import org.elasticsearch.index.mapper.MergeResult;
|
||||
import org.elasticsearch.index.mapper.*;
|
||||
import org.elasticsearch.index.mapper.core.StringFieldMapper;
|
||||
import org.elasticsearch.index.mapper.object.ObjectMapper;
|
||||
import org.elasticsearch.test.ElasticsearchSingleNodeTest;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
/**
|
||||
|
@ -144,4 +154,62 @@ public class TestMergeMapperTests extends ElasticsearchSingleNodeTest {
|
|||
assertThat(((StringFieldMapper) (existing.mappers().getMapper("field"))).getIgnoreAbove(), equalTo(14));
|
||||
}
|
||||
|
||||
public void testConcurrentMergeTest() throws IOException, BrokenBarrierException, InterruptedException {
|
||||
final MapperService mapperService = createIndex("test").mapperService();
|
||||
final AtomicInteger counter = new AtomicInteger(0);
|
||||
Tuple<DocumentMapper, Mapping> docMapper = mapperService.documentMapperWithAutoCreate("test");
|
||||
final DocumentMapper documentMapper = docMapper.v1();
|
||||
int id = counter.incrementAndGet();
|
||||
ParsedDocument doc = documentMapper.parse("test", Integer.toString(id), new BytesArray("{ \"test_field_" + id + "\" : \"test\" }"));
|
||||
if (docMapper.v2() != null) {
|
||||
doc.addDynamicMappingsUpdate(docMapper.v2());
|
||||
}
|
||||
Mapping mapping = doc.dynamicMappingsUpdate();
|
||||
mapperService.merge("test", new CompressedString(mapping.toString()), false);
|
||||
try {
|
||||
int nextID = counter.get() + 1;
|
||||
DocumentFieldMappers mappers = mapperService.documentMapper("test").mappers();
|
||||
FieldMapper mapper = mappers.getMapper("test_field_" + nextID);
|
||||
assertNull(mapper);
|
||||
((FieldNameAnalyzer)mappers.indexAnalyzer()).getWrappedAnalyzer("test_field_" + nextID);
|
||||
fail("field not there yet");
|
||||
} catch (IllegalArgumentException ex) {
|
||||
assertEquals(ex.getMessage(), "Field [test_field_2] has no associated analyzer");
|
||||
}
|
||||
final AtomicBoolean stopped = new AtomicBoolean(false);
|
||||
final CyclicBarrier barrier = new CyclicBarrier(2);
|
||||
final Thread updater = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
barrier.await();
|
||||
for (int i = 0; i < 10000; i++) {
|
||||
Tuple<DocumentMapper, Mapping> docMapper = mapperService.documentMapperWithAutoCreate("test");
|
||||
int id = counter.incrementAndGet();
|
||||
ParsedDocument doc = documentMapper.parse("test", Integer.toString(id), new BytesArray("{ \"test_field_" + id + "\" : \"test\" }"));
|
||||
if (docMapper.v2() != null) {
|
||||
doc.addDynamicMappingsUpdate(docMapper.v2());
|
||||
}
|
||||
Mapping mapping = doc.dynamicMappingsUpdate();
|
||||
mapperService.merge("test", new CompressedString(mapping.toString()), false);
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
|
||||
} finally {
|
||||
stopped.set(true);
|
||||
}
|
||||
}
|
||||
};
|
||||
updater.start();
|
||||
barrier.await();
|
||||
while(stopped.get() == false) {
|
||||
List<ObjectMapper> newObjectMappers = new ArrayList<>();
|
||||
List<FieldMapper<?>> newFieldMappers = new ArrayList<>();
|
||||
MapperUtils.collect(mapperService.documentMapper("test").root(), newObjectMappers, newFieldMappers);
|
||||
DocumentFieldMappers dfm = mapperService.documentMapper("test").mappers();
|
||||
for (FieldMapper<?> fieldMapper : newFieldMappers) {
|
||||
((FieldNameAnalyzer) dfm.indexAnalyzer()).getWrappedAnalyzer(fieldMapper.name());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue