From 202b1e2306e7d18b93309cd3980e48d8ad0b75b9 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 11 Apr 2014 19:14:22 +0700 Subject: [PATCH] Update clusterstate if mapping service has local changes If the during percolating a new field was introduced in the local mapping service, then those changes should be updated in cluster state of the master as well. Closes #5776 --- .../percolator/PercolatorService.java | 36 ++++- .../percolator/PercolatorTests.java | 135 +++++++++++++++--- 2 files changed, 148 insertions(+), 23 deletions(-) diff --git a/src/main/java/org/elasticsearch/percolator/PercolatorService.java b/src/main/java/org/elasticsearch/percolator/PercolatorService.java index 01c6d6523d8..d01163be945 100644 --- a/src/main/java/org/elasticsearch/percolator/PercolatorService.java +++ b/src/main/java/org/elasticsearch/percolator/PercolatorService.java @@ -31,12 +31,15 @@ import org.apache.lucene.util.CloseableThreadLocal; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.percolate.PercolateResponse; import org.elasticsearch.action.percolate.PercolateShardRequest; import org.elasticsearch.action.percolate.PercolateShardResponse; import org.elasticsearch.cache.recycler.CacheRecycler; import org.elasticsearch.cache.recycler.PageCacheRecycler; import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.action.index.MappingUpdatedAction; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractComponent; @@ -115,6 +118,7 @@ public class PercolatorService extends AbstractComponent { private final AggregationPhase aggregationPhase; private final SortParseElement sortParseElement; private final ScriptService scriptService; + private final MappingUpdatedAction mappingUpdatedAction; private final CloseableThreadLocal cache; @@ -122,7 +126,8 @@ public class PercolatorService extends AbstractComponent { public PercolatorService(Settings settings, IndicesService indicesService, CacheRecycler cacheRecycler, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, HighlightPhase highlightPhase, ClusterService clusterService, FacetPhase facetPhase, - AggregationPhase aggregationPhase, ScriptService scriptService) { + AggregationPhase aggregationPhase, ScriptService scriptService, + MappingUpdatedAction mappingUpdatedAction) { super(settings); this.indicesService = indicesService; this.cacheRecycler = cacheRecycler; @@ -133,6 +138,7 @@ public class PercolatorService extends AbstractComponent { this.facetPhase = facetPhase; this.aggregationPhase = aggregationPhase; this.scriptService = scriptService; + this.mappingUpdatedAction = mappingUpdatedAction; this.sortParseElement = new SortParseElement(); final long maxReuseBytes = settings.getAsBytesSize("indices.memory.memory_index.size_per_thread", new ByteSizeValue(1, ByteSizeUnit.MB)).bytes(); @@ -269,6 +275,9 @@ public class PercolatorService extends AbstractComponent { MapperService mapperService = documentIndexService.mapperService(); DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(request.documentType()); doc = docMapper.parse(source(parser).type(request.documentType()).flyweight(true)); + if (doc.mappingsModified()) { + updateMappingOnMaster(docMapper, request, documentIndexService.indexUUID()); + } // the document parsing exists the "doc" object, so we need to set the new current field. currentFieldName = parser.currentName(); } @@ -827,6 +836,31 @@ public class PercolatorService extends AbstractComponent { } } + // TODO: maybe move this logic into MappingUpdatedAction? There is similar logic for the index and bulk api now. + private void updateMappingOnMaster(DocumentMapper documentMapper, final PercolateShardRequest request, String indexUUID) { + // we generate the order id before we get the mapping to send and refresh the source, so + // if 2 happen concurrently, we know that the later order will include the previous one + long orderId = mappingUpdatedAction.generateNextMappingUpdateOrder(); + documentMapper.refreshSource(); + DiscoveryNode node = clusterService.localNode(); + final MappingUpdatedAction.MappingUpdatedRequest mappingRequest = new MappingUpdatedAction.MappingUpdatedRequest( + request.index(), indexUUID, request.documentType(), documentMapper.mappingSource(), orderId, node != null ? node.id() : null + ); + logger.trace("Sending mapping updated to master: {}", mappingRequest); + mappingUpdatedAction.execute(mappingRequest, new ActionListener() { + @Override + public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) { + // all is well + logger.debug("Successfully updated master with mapping update: {}", mappingRequest); + } + + @Override + public void onFailure(Throwable e) { + logger.warn("Failed to update master on updated mapping for {}", e, mappingRequest); + } + }); + } + private InternalFacets reduceFacets(List shardResults) { if (shardResults.get(0).facets() == null) { return null; diff --git a/src/test/java/org/elasticsearch/percolator/PercolatorTests.java b/src/test/java/org/elasticsearch/percolator/PercolatorTests.java index bd8c1d45d83..cbfea32b3e0 100644 --- a/src/test/java/org/elasticsearch/percolator/PercolatorTests.java +++ b/src/test/java/org/elasticsearch/percolator/PercolatorTests.java @@ -22,6 +22,7 @@ import com.google.common.base.Predicate; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; @@ -32,8 +33,8 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings.Builder; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -150,21 +151,7 @@ public class PercolatorTests extends ElasticsearchIntegrationTest { @Test public void testSimple2() throws Exception { - - //TODO this test seems to have problems with more shards and/or 1 replica instead of 0 - client().admin().indices().prepareCreate("index").setSettings( - ImmutableSettings.settingsBuilder() - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", 0) - .build() - ).execute().actionGet(); - client().admin().indices().prepareCreate("test").setSettings( - ImmutableSettings.settingsBuilder() - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", 0) - .build() - ).execute().actionGet(); - ensureGreen(); + createIndex("index", "test"); // introduce the doc XContentBuilder doc = XContentFactory.jsonBuilder().startObject().startObject("doc") @@ -1595,13 +1582,18 @@ public class PercolatorTests extends ElasticsearchIntegrationTest { awaitBusy(new Predicate() { @Override public boolean apply(Object o) { - GetMappingsResponse getMappingsResponse = client().admin().indices().prepareGetMappings("test1", "test2").get(); - boolean hasPercolatorType = getMappingsResponse.getMappings().get("test1").containsKey(PercolatorService.TYPE_NAME); - if (hasPercolatorType) { - return getMappingsResponse.getMappings().get("test2").containsKey(PercolatorService.TYPE_NAME); - } else { - return false; + for (Client client : cluster()) { + GetMappingsResponse getMappingsResponse = client.admin().indices().prepareGetMappings("test1", "test2").get(); + boolean hasPercolatorType = getMappingsResponse.getMappings().get("test1").containsKey(PercolatorService.TYPE_NAME); + if (!hasPercolatorType) { + return false; + } + + if (!getMappingsResponse.getMappings().get("test2").containsKey(PercolatorService.TYPE_NAME)) { + return false; + } } + return true; } }); @@ -1681,6 +1673,105 @@ public class PercolatorTests extends ElasticsearchIntegrationTest { assertEquals(response.getMatches()[0].getId().string(), "Q"); } + @Test + public void testPercolationWithDynamicTemplates() throws Exception { + assertAcked(prepareCreate("idx").addMapping("type", jsonBuilder().startObject().startObject("type") + .field("dynamic", false) + .startObject("properties") + .startObject("custom") + .field("dynamic", true) + .field("type", "object") + .field("incude_in_all", false) + .endObject() + .endObject() + .startArray("dynamic_template") + .startObject() + .startObject("custom_fields") + .field("path_match", "custom.*") + .startObject("mapping") + .field("index", "not_analyzed") + .endObject() + .endObject() + .endObject() + .endArray() + .endObject().endObject())); + + client().prepareIndex("idx", PercolatorService.TYPE_NAME, "1") + .setSource(jsonBuilder().startObject().field("query", QueryBuilders.queryString("color:red")).endObject()) + .get(); + client().prepareIndex("idx", PercolatorService.TYPE_NAME, "2") + .setSource(jsonBuilder().startObject().field("query", QueryBuilders.queryString("color:blue")).endObject()) + .get(); + + PercolateResponse percolateResponse = client().preparePercolate().setDocumentType("type") + .setPercolateDoc(new PercolateSourceBuilder.DocBuilder().setDoc(jsonBuilder().startObject().startObject("custom").field("color", "blue").endObject().endObject())) + .get(); + + assertMatchCount(percolateResponse, 0l); + assertThat(percolateResponse.getMatches(), arrayWithSize(0)); + + // wait until the mapping change has propagated from the percolate request + client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).execute().actionGet(); + awaitBusy(new Predicate() { + @Override + public boolean apply(Object input) { + PendingClusterTasksResponse pendingTasks = client().admin().cluster().preparePendingClusterTasks().get(); + return pendingTasks.pendingTasks().isEmpty(); + } + }); + client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).execute().actionGet(); + + // The previous percolate request introduced the custom.color field, so now we register the query again + // and the field name `color` will be resolved to `custom.color` field in mapping via smart field mapping resolving. + client().prepareIndex("idx", PercolatorService.TYPE_NAME, "2") + .setSource(jsonBuilder().startObject().field("query", QueryBuilders.queryString("color:blue")).endObject()) + .get(); + + // The second request will yield a match, since the query during the proper field during parsing. + percolateResponse = client().preparePercolate().setDocumentType("type") + .setPercolateDoc(new PercolateSourceBuilder.DocBuilder().setDoc(jsonBuilder().startObject().startObject("custom").field("color", "blue").endObject().endObject())) + .get(); + + assertMatchCount(percolateResponse, 1l); + assertThat(percolateResponse.getMatches()[0].getId().string(), equalTo("2")); + } + + @Test + public void testUpdateMappingDynamicallyWhilePercolating() throws Exception { + createIndex("test"); + + // percolation source + XContentBuilder percolateDocumentSource = XContentFactory.jsonBuilder().startObject().startObject("doc") + .field("field1", 1) + .field("field2", "value") + .endObject().endObject(); + + PercolateResponse response = client().preparePercolate() + .setIndices("test").setDocumentType("type1") + .setSource(percolateDocumentSource).execute().actionGet(); + assertMatchCount(response, 0l); + assertThat(response.getMatches(), arrayWithSize(0)); + + // wait until the mapping change has propagated + client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).execute().actionGet(); + awaitBusy(new Predicate() { + @Override + public boolean apply(Object input) { + PendingClusterTasksResponse pendingTasks = client().admin().cluster().preparePendingClusterTasks().get(); + return pendingTasks.pendingTasks().isEmpty(); + } + }); + client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).execute().actionGet(); + + GetMappingsResponse mappingsResponse = client().admin().indices().prepareGetMappings("test").get(); + assertThat(mappingsResponse.getMappings().get("test"), notNullValue()); + assertThat(mappingsResponse.getMappings().get("test").get("type1"), notNullValue()); + assertThat(mappingsResponse.getMappings().get("test").get("type1").getSourceAsMap().isEmpty(), is(false)); + Map properties = (Map) mappingsResponse.getMappings().get("test").get("type1").getSourceAsMap().get("properties"); + assertThat(((Map) properties.get("field1")).get("type"), equalTo("long")); + assertThat(((Map) properties.get("field2")).get("type"), equalTo("string")); + } + void initNestedIndexAndPercolation() throws IOException { XContentBuilder mapping = XContentFactory.jsonBuilder(); mapping.startObject().startObject("properties").startObject("companyname").field("type", "string").endObject()