Clean up and make the test work.

This commit is contained in:
Adrien Grand 2015-05-21 17:37:02 +02:00
parent c8f39dd2b9
commit 6b3918a97c
2 changed files with 59 additions and 55 deletions

View File

@ -394,7 +394,7 @@ public class DocumentMapper implements ToXContent {
} }
private void addObjectMappers(Collection<ObjectMapper> objectMappers) { private void addObjectMappers(Collection<ObjectMapper> objectMappers) {
try (ReleasableLock lock = mappingWriteLock.acquire()) { assert mappingLock.isWriteLockedByCurrentThread();
MapBuilder<String, ObjectMapper> builder = MapBuilder.newMapBuilder(this.objectMappers); MapBuilder<String, ObjectMapper> builder = MapBuilder.newMapBuilder(this.objectMappers);
for (ObjectMapper objectMapper : objectMappers) { for (ObjectMapper objectMapper : objectMappers) {
builder.put(objectMapper.fullPath(), objectMapper); builder.put(objectMapper.fullPath(), objectMapper);
@ -405,7 +405,6 @@ public class DocumentMapper implements ToXContent {
this.objectMappers = builder.immutableMap(); this.objectMappers = builder.immutableMap();
mapperService.addObjectMappers(objectMappers); mapperService.addObjectMappers(objectMappers);
} }
}
private MergeResult newMergeContext(boolean simulate) { private MergeResult newMergeContext(boolean simulate) {
return new MergeResult(simulate) { return new MergeResult(simulate) {

View File

@ -20,26 +20,30 @@
package org.elasticsearch.index.mapper.merge; package org.elasticsearch.index.mapper.merge;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.analysis.FieldNameAnalyzer; import org.elasticsearch.index.analysis.FieldNameAnalyzer;
import org.elasticsearch.index.analysis.NamedAnalyzer; import org.elasticsearch.index.analysis.NamedAnalyzer;
import org.elasticsearch.index.mapper.*; import org.elasticsearch.index.mapper.DocumentFieldMappers;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.DocumentMapperParser;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.MergeResult;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.core.StringFieldMapper; import org.elasticsearch.index.mapper.core.StringFieldMapper;
import org.elasticsearch.index.mapper.object.ObjectMapper; import org.elasticsearch.index.mapper.object.ObjectMapper;
import org.elasticsearch.test.ElasticsearchSingleNodeTest; import org.elasticsearch.test.ElasticsearchSingleNodeTest;
import org.junit.Test; import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
/** /**
* *
@ -154,62 +158,63 @@ public class TestMergeMapperTests extends ElasticsearchSingleNodeTest {
assertThat(((StringFieldMapper) (existing.mappers().getMapper("field"))).getIgnoreAbove(), equalTo(14)); assertThat(((StringFieldMapper) (existing.mappers().getMapper("field"))).getIgnoreAbove(), equalTo(14));
} }
public void testConcurrentMergeTest() throws IOException, BrokenBarrierException, InterruptedException { public void testConcurrentMergeTest() throws Throwable {
final MapperService mapperService = createIndex("test").mapperService(); final MapperService mapperService = createIndex("test").mapperService();
final AtomicInteger counter = new AtomicInteger(0); mapperService.merge("test", new CompressedString("{\"test\":{}}"), true);
Tuple<DocumentMapper, Mapping> docMapper = mapperService.documentMapperWithAutoCreate("test"); final DocumentMapper documentMapper = mapperService.documentMapper("test");
final DocumentMapper documentMapper = docMapper.v1();
int id = counter.incrementAndGet(); DocumentFieldMappers dfm = documentMapper.mappers();
ParsedDocument doc = documentMapper.parse("test", Integer.toString(id), new BytesArray("{ \"test_field_" + id + "\" : \"test\" }"));
if (docMapper.v2() != null) {
doc.addDynamicMappingsUpdate(docMapper.v2());
}
Mapping mapping = doc.dynamicMappingsUpdate();
mapperService.merge("test", new CompressedString(mapping.toString()), false);
try { try {
int nextID = counter.get() + 1; ((FieldNameAnalyzer) dfm.indexAnalyzer()).getWrappedAnalyzer("non_existing_field");
DocumentFieldMappers mappers = mapperService.documentMapper("test").mappers(); fail();
FieldMapper mapper = mappers.getMapper("test_field_" + nextID); } catch (IllegalArgumentException e) {
assertNull(mapper); // ok that's expected
((FieldNameAnalyzer)mappers.indexAnalyzer()).getWrappedAnalyzer("test_field_" + nextID);
fail("field not there yet");
} catch (IllegalArgumentException ex) {
assertEquals(ex.getMessage(), "Field [test_field_2] has no associated analyzer");
} }
final AtomicBoolean stopped = new AtomicBoolean(false); final AtomicBoolean stopped = new AtomicBoolean(false);
final CyclicBarrier barrier = new CyclicBarrier(2); final CyclicBarrier barrier = new CyclicBarrier(2);
final AtomicReference<String> lastIntroducedFieldName = new AtomicReference<>();
final AtomicReference<Throwable> error = new AtomicReference<>();
final Thread updater = new Thread() { final Thread updater = new Thread() {
@Override @Override
public void run() { public void run() {
try { try {
barrier.await(); barrier.await();
for (int i = 0; i < 10000; i++) { for (int i = 0; i < 200 && stopped.get() == false; i++) {
Tuple<DocumentMapper, Mapping> docMapper = mapperService.documentMapperWithAutoCreate("test"); final String fieldName = Integer.toString(i);
int id = counter.incrementAndGet(); ParsedDocument doc = documentMapper.parse("test", fieldName, new BytesArray("{ \"" + fieldName + "\" : \"test\" }"));
ParsedDocument doc = documentMapper.parse("test", Integer.toString(id), new BytesArray("{ \"test_field_" + id + "\" : \"test\" }")); Mapping update = doc.dynamicMappingsUpdate();
if (docMapper.v2() != null) { assert update != null;
doc.addDynamicMappingsUpdate(docMapper.v2()); lastIntroducedFieldName.set(fieldName);
mapperService.merge("test", new CompressedString(update.toString()), false);
} }
Mapping mapping = doc.dynamicMappingsUpdate(); } catch (Throwable t) {
mapperService.merge("test", new CompressedString(mapping.toString()), false); error.set(t);
}
} catch (Exception ex) {
} finally { } finally {
stopped.set(true); stopped.set(true);
} }
} }
}; };
updater.start(); updater.start();
try {
barrier.await(); barrier.await();
while(stopped.get() == false) { while(stopped.get() == false) {
List<ObjectMapper> newObjectMappers = new ArrayList<>(); final String fieldName = lastIntroducedFieldName.get();
List<FieldMapper<?>> newFieldMappers = new ArrayList<>(); final BytesReference source = new BytesArray("{ \"" + fieldName + "\" : \"test\" }");
MapperUtils.collect(mapperService.documentMapper("test").root(), newObjectMappers, newFieldMappers); ParsedDocument parsedDoc = documentMapper.parse("test", "random", source);
DocumentFieldMappers dfm = mapperService.documentMapper("test").mappers(); if (parsedDoc.dynamicMappingsUpdate() != null) {
for (FieldMapper<?> fieldMapper : newFieldMappers) { // not in the mapping yet, try again
((FieldNameAnalyzer) dfm.indexAnalyzer()).getWrappedAnalyzer(fieldMapper.name()); continue;
} }
dfm = documentMapper.mappers();
((FieldNameAnalyzer) dfm.indexAnalyzer()).getWrappedAnalyzer(fieldName);
}
} finally {
stopped.set(true);
updater.join();
}
if (error.get() != null) {
throw error.get();
} }
} }
} }