Better logic on sending mapping updates when a new type is introduced

When an indexing request introduces a new mapping, today we rely on the parsing logic to mark it as modified during the "first" parsing phase. This can cause mapping updates to be sent to the master even when the mapping has already been introduced via the create index/put mapping path, resulting in mapping updates being sent when they are not needed.
 This bubbled up in the disabled field data format test, where we explicitly define mappings so that the mapping-update behavior should not happen, yet it still happens because of the current logic; and because our test randomly delays the introduction of mapping updates, a stale update can get in and override newer ones.
closes #6669
This commit is contained in:
Shay Banon 2014-07-01 21:39:57 +02:00
parent 4091162d91
commit ccd54dae2d
7 changed files with 123 additions and 54 deletions

View File

@ -281,8 +281,6 @@ public class DocumentMapper implements ToXContent {
private final Object mappersMutex = new Object(); private final Object mappersMutex = new Object();
private boolean initMappersAdded = true;
public DocumentMapper(String index, @Nullable Settings indexSettings, DocumentMapperParser docMapperParser, public DocumentMapper(String index, @Nullable Settings indexSettings, DocumentMapperParser docMapperParser,
RootObjectMapper rootObjectMapper, RootObjectMapper rootObjectMapper,
ImmutableMap<String, Object> meta, ImmutableMap<String, Object> meta,
@ -482,11 +480,6 @@ public class DocumentMapper implements ToXContent {
parser = XContentHelper.createParser(source.source()); parser = XContentHelper.createParser(source.source());
} }
context.reset(parser, new ParseContext.Document(), source, listener); context.reset(parser, new ParseContext.Document(), source, listener);
// on a newly created instance of document mapper, we always consider it as new mappers that have been added
if (initMappersAdded) {
context.setMappingsModified();
initMappersAdded = false;
}
// will result in START_OBJECT // will result in START_OBJECT
int countDownTokens = 0; int countDownTokens = 0;

View File

@ -34,6 +34,7 @@ import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchGenerationException; import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.Streams;
@ -399,10 +400,14 @@ public class MapperService extends AbstractIndexComponent implements Iterable<Do
return mappers.get(type); return mappers.get(type);
} }
public DocumentMapper documentMapperWithAutoCreate(String type) { /**
* Returns the document mapper created, including if the document mapper ended up
* being actually created or not in the second tuple value.
*/
public Tuple<DocumentMapper, Boolean> documentMapperWithAutoCreate(String type) {
DocumentMapper mapper = mappers.get(type); DocumentMapper mapper = mappers.get(type);
if (mapper != null) { if (mapper != null) {
return mapper; return Tuple.tuple(mapper, Boolean.FALSE);
} }
if (!dynamic) { if (!dynamic) {
throw new TypeMissingException(index, type, "trying to auto create mapping, but dynamic mapping is disabled"); throw new TypeMissingException(index, type, "trying to auto create mapping, but dynamic mapping is disabled");
@ -411,10 +416,10 @@ public class MapperService extends AbstractIndexComponent implements Iterable<Do
synchronized (typeMutex) { synchronized (typeMutex) {
mapper = mappers.get(type); mapper = mappers.get(type);
if (mapper != null) { if (mapper != null) {
return mapper; return Tuple.tuple(mapper, Boolean.FALSE);
} }
merge(type, null, true); merge(type, null, true);
return mappers.get(type); return Tuple.tuple(mappers.get(type), Boolean.TRUE);
} }
} }

View File

@ -22,6 +22,7 @@ package org.elasticsearch.index.mapper;
import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Field; import org.apache.lucene.document.Field;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.index.mapper.ParseContext.Document; import org.elasticsearch.index.mapper.ParseContext.Document;
import java.util.List; import java.util.List;
@ -131,6 +132,24 @@ public class ParsedDocument {
return mappingsModified; return mappingsModified;
} }
/**
* latches the mapping to be marked as modified.
*/
public void setMappingsModified() {
this.mappingsModified = true;
}
/**
* Uses the value of get document or create to automatically set if mapping is
* modified or not.
*/
public ParsedDocument setMappingsModified(Tuple<DocumentMapper, Boolean> docMapper) {
if (docMapper.v2()) {
setMappingsModified();
}
return this;
}
@Override @Override
public String toString() { public String toString() {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();

View File

@ -34,6 +34,7 @@ import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
@ -369,9 +370,9 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
@Override @Override
public Engine.Create prepareCreate(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates, boolean autoGeneratedId) throws ElasticsearchException { public Engine.Create prepareCreate(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates, boolean autoGeneratedId) throws ElasticsearchException {
long startTime = System.nanoTime(); long startTime = System.nanoTime();
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(source.type()); Tuple<DocumentMapper, Boolean> docMapper = mapperService.documentMapperWithAutoCreate(source.type());
ParsedDocument doc = docMapper.parse(source); ParsedDocument doc = docMapper.v1().parse(source).setMappingsModified(docMapper);
return new Engine.Create(docMapper, docMapper.uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, state != IndexShardState.STARTED || canHaveDuplicates, autoGeneratedId); return new Engine.Create(docMapper.v1(), docMapper.v1().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, state != IndexShardState.STARTED || canHaveDuplicates, autoGeneratedId);
} }
@Override @Override
@ -390,9 +391,9 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
@Override @Override
public Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates) throws ElasticsearchException { public Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates) throws ElasticsearchException {
long startTime = System.nanoTime(); long startTime = System.nanoTime();
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(source.type()); Tuple<DocumentMapper, Boolean> docMapper = mapperService.documentMapperWithAutoCreate(source.type());
ParsedDocument doc = docMapper.parse(source); ParsedDocument doc = docMapper.v1().parse(source).setMappingsModified(docMapper);
return new Engine.Index(docMapper, docMapper.uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, state != IndexShardState.STARTED || canHaveDuplicates); return new Engine.Index(docMapper.v1(), docMapper.v1().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, state != IndexShardState.STARTED || canHaveDuplicates);
} }
@Override @Override
@ -416,7 +417,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
@Override @Override
public Engine.Delete prepareDelete(String type, String id, long version, VersionType versionType, Engine.Operation.Origin origin) throws ElasticsearchException { public Engine.Delete prepareDelete(String type, String id, long version, VersionType versionType, Engine.Operation.Origin origin) throws ElasticsearchException {
long startTime = System.nanoTime(); long startTime = System.nanoTime();
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(type); DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(type).v1();
return new Engine.Delete(type, id, docMapper.uidMapper().term(type, id), version, versionType, origin, startTime, false); return new Engine.Delete(type, id, docMapper.uidMapper().term(type, id), version, versionType, origin, startTime, false);
} }

View File

@ -40,6 +40,7 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
@ -276,10 +277,10 @@ public class PercolatorService extends AbstractComponent {
} }
MapperService mapperService = documentIndexService.mapperService(); MapperService mapperService = documentIndexService.mapperService();
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(request.documentType()); Tuple<DocumentMapper, Boolean> docMapper = mapperService.documentMapperWithAutoCreate(request.documentType());
doc = docMapper.parse(source(parser).type(request.documentType()).flyweight(true)); doc = docMapper.v1().parse(source(parser).type(request.documentType()).flyweight(true)).setMappingsModified(docMapper);
if (doc.mappingsModified()) { if (doc.mappingsModified()) {
mappingUpdatedAction.updateMappingOnMaster(request.index(), docMapper, documentIndexService.indexUUID()); mappingUpdatedAction.updateMappingOnMaster(request.index(), docMapper.v1(), documentIndexService.indexUUID());
} }
// the document parsing exits the "doc" object, so we need to set the new current field. // the document parsing exits the "doc" object, so we need to set the new current field.
currentFieldName = parser.currentName(); currentFieldName = parser.currentName();
@ -386,8 +387,8 @@ public class PercolatorService extends AbstractComponent {
try { try {
parser = XContentFactory.xContent(fetchedDoc).createParser(fetchedDoc); parser = XContentFactory.xContent(fetchedDoc).createParser(fetchedDoc);
MapperService mapperService = documentIndexService.mapperService(); MapperService mapperService = documentIndexService.mapperService();
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(type); Tuple<DocumentMapper, Boolean> docMapper = mapperService.documentMapperWithAutoCreate(type);
doc = docMapper.parse(source(parser).type(type).flyweight(true)); doc = docMapper.v1().parse(source(parser).type(type).flyweight(true));
if (context.highlight() != null) { if (context.highlight() != null) {
doc.setSource(fetchedDoc); doc.setSource(fetchedDoc);

View File

@ -19,31 +19,37 @@
package org.elasticsearch.index.fielddata; package org.elasticsearch.index.fielddata;
import com.google.common.base.Predicate;
import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.mapper.MapperService.SmartNameFieldMappers;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode; import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import java.util.Set;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
@ClusterScope(randomDynamicTemplates = false) @ClusterScope(randomDynamicTemplates = false)
public class DisabledFieldDataFormatTests extends ElasticsearchIntegrationTest { public class DisabledFieldDataFormatTests extends ElasticsearchIntegrationTest {
@Override
protected int numberOfReplicas() {
return 0;
}
public void test() throws Exception { public void test() throws Exception {
createIndex("test"); prepareCreate("test").addMapping("type", "s", "type=string").execute().actionGet();
ensureGreen(); ensureGreen();
logger.info("indexing data start");
for (int i = 0; i < 10; ++i) { for (int i = 0; i < 10; ++i) {
client().prepareIndex("test", "type", Integer.toString(i)).setSource("s", "value" + i).execute().actionGet(); client().prepareIndex("test", "type", Integer.toString(i)).setSource("s", "value" + i).execute().actionGet();
} }
logger.info("indexing data end");
final int searchCycles = 20;
refresh(); refresh();
@ -53,55 +59,99 @@ public class DisabledFieldDataFormatTests extends ElasticsearchIntegrationTest {
SubAggCollectionMode aggCollectionMode = randomFrom(SubAggCollectionMode.values()); SubAggCollectionMode aggCollectionMode = randomFrom(SubAggCollectionMode.values());
SearchResponse resp = null; SearchResponse resp = null;
// try to run something that relies on field data and make sure that it fails // try to run something that relies on field data and make sure that it fails
try { for (int i = 0; i < searchCycles; i++) {
resp = client().prepareSearch("test").addAggregation(AggregationBuilders.terms("t").field("s") try {
.collectMode(aggCollectionMode)).execute().actionGet(); resp = client().prepareSearch("test").setPreference(Integer.toString(i)).addAggregation(AggregationBuilders.terms("t").field("s")
assertFailures(resp); .collectMode(aggCollectionMode)).execute().actionGet();
} catch (SearchPhaseExecutionException e) { assertFailures(resp);
// expected } catch (SearchPhaseExecutionException e) {
// expected
}
} }
// enable it again // enable it again
updateFormat("paged_bytes"); updateFormat("paged_bytes");
// try to run something that relies on field data and make sure that it works // try to run something that relies on field data and make sure that it works
resp = client().prepareSearch("test").addAggregation(AggregationBuilders.terms("t").field("s") for (int i = 0; i < searchCycles; i++) {
.collectMode(aggCollectionMode)).execute().actionGet(); resp = client().prepareSearch("test").setPreference(Integer.toString(i)).addAggregation(AggregationBuilders.terms("t").field("s")
assertNoFailures(resp); .collectMode(aggCollectionMode)).execute().actionGet();
assertNoFailures(resp);
}
// disable it again // disable it again
updateFormat("disabled"); updateFormat("disabled");
// this time, it should work because segments are already loaded // this time, it should work because segments are already loaded
resp = client().prepareSearch("test").addAggregation(AggregationBuilders.terms("t").field("s") for (int i = 0; i < searchCycles; i++) {
.collectMode(aggCollectionMode)).execute().actionGet(); resp = client().prepareSearch("test").setPreference(Integer.toString(i)).addAggregation(AggregationBuilders.terms("t").field("s")
assertNoFailures(resp); .collectMode(aggCollectionMode)).execute().actionGet();
assertNoFailures(resp);
}
// but add more docs and the new segment won't be loaded // but add more docs and the new segment won't be loaded
client().prepareIndex("test", "type", "-1").setSource("s", "value").execute().actionGet(); client().prepareIndex("test", "type", "-1").setSource("s", "value").execute().actionGet();
refresh(); refresh();
try { for (int i = 0; i < searchCycles; i++) {
resp = client().prepareSearch("test").addAggregation(AggregationBuilders.terms("t").field("s") try {
.collectMode(aggCollectionMode)).execute().actionGet(); resp = client().prepareSearch("test").setPreference(Integer.toString(i)).addAggregation(AggregationBuilders.terms("t").field("s")
assertFailures(resp); .collectMode(aggCollectionMode)).execute().actionGet();
} catch (SearchPhaseExecutionException e) { assertFailures(resp);
// expected } catch (SearchPhaseExecutionException e) {
// expected
}
} }
} }
private void updateFormat(String format) throws Exception { private void updateFormat(final String format) throws Exception {
logger.info(">> put mapping start {}", format);
client().admin().indices().preparePutMapping("test").setType("type").setSource( client().admin().indices().preparePutMapping("test").setType("type").setSource(
XContentFactory.jsonBuilder().startObject().startObject("type") XContentFactory.jsonBuilder().startObject().startObject("type")
.startObject("properties") .startObject("properties")
.startObject("s") .startObject("s")
.field("type", "string") .field("type", "string")
.startObject("fielddata") .startObject("fielddata")
.field("format", format) .field("format", format)
.endObject() .endObject()
.endObject() .endObject()
.endObject()
.endObject() .endObject()
.endObject()).execute().actionGet(); .endObject()
.endObject()).execute().actionGet();
logger.info(">> put mapping end {}", format);
boolean applied = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
try {
Set<String> nodes = internalCluster().nodesInclude("test");
if (nodes.isEmpty()) { // we expect at least one node to hold an index, so wait if not allocated yet
return false;
}
for (String node : nodes) {
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexService("test");
if (indexService == null) {
return false;
}
final SmartNameFieldMappers mappers = indexService.mapperService().smartName("s");
if (mappers == null || !mappers.hasMapper()) {
return false;
}
final String currentFormat = mappers.mapper().fieldDataType().getFormat(ImmutableSettings.EMPTY);
if (!format.equals(currentFormat)) {
return false;
}
}
} catch (Exception e) {
logger.info("got exception waiting for concrete mappings", e);
return false;
}
return true;
}
});
logger.info(">> put mapping verified {}, applies {}", format, applied);
if (!applied) {
fail();
}
} }
} }

View File

@ -176,7 +176,7 @@ public class DefaultSourceMappingTests extends ElasticsearchTestCase {
MapperService mapperService = MapperTestUtils.newMapperService(); MapperService mapperService = MapperTestUtils.newMapperService();
mapperService.merge(MapperService.DEFAULT_MAPPING, new CompressedString(defaultMapping), true); mapperService.merge(MapperService.DEFAULT_MAPPING, new CompressedString(defaultMapping), true);
DocumentMapper mapper = mapperService.documentMapperWithAutoCreate("my_type"); DocumentMapper mapper = mapperService.documentMapperWithAutoCreate("my_type").v1();
assertThat(mapper.type(), equalTo("my_type")); assertThat(mapper.type(), equalTo("my_type"));
assertThat(mapper.sourceMapper().enabled(), equalTo(false)); assertThat(mapper.sourceMapper().enabled(), equalTo(false));
} }