Update clusterstate if mapping service has local changes

If the during percolating a new field was introduced
in the local mapping service, then those changes should
be updated in cluster state of the master as well.

Closes #5776
This commit is contained in:
Martijn van Groningen 2014-04-11 19:14:22 +07:00 committed by Simon Willnauer
parent 7c6d745523
commit 202b1e2306
2 changed files with 148 additions and 23 deletions

View File

@ -31,12 +31,15 @@ import org.apache.lucene.util.CloseableThreadLocal;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.action.percolate.PercolateShardRequest;
import org.elasticsearch.action.percolate.PercolateShardResponse;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
@ -115,6 +118,7 @@ public class PercolatorService extends AbstractComponent {
private final AggregationPhase aggregationPhase;
private final SortParseElement sortParseElement;
private final ScriptService scriptService;
private final MappingUpdatedAction mappingUpdatedAction;
private final CloseableThreadLocal<MemoryIndex> cache;
@ -122,7 +126,8 @@ public class PercolatorService extends AbstractComponent {
public PercolatorService(Settings settings, IndicesService indicesService, CacheRecycler cacheRecycler,
PageCacheRecycler pageCacheRecycler, BigArrays bigArrays,
HighlightPhase highlightPhase, ClusterService clusterService, FacetPhase facetPhase,
AggregationPhase aggregationPhase, ScriptService scriptService) {
AggregationPhase aggregationPhase, ScriptService scriptService,
MappingUpdatedAction mappingUpdatedAction) {
super(settings);
this.indicesService = indicesService;
this.cacheRecycler = cacheRecycler;
@ -133,6 +138,7 @@ public class PercolatorService extends AbstractComponent {
this.facetPhase = facetPhase;
this.aggregationPhase = aggregationPhase;
this.scriptService = scriptService;
this.mappingUpdatedAction = mappingUpdatedAction;
this.sortParseElement = new SortParseElement();
final long maxReuseBytes = settings.getAsBytesSize("indices.memory.memory_index.size_per_thread", new ByteSizeValue(1, ByteSizeUnit.MB)).bytes();
@ -269,6 +275,9 @@ public class PercolatorService extends AbstractComponent {
MapperService mapperService = documentIndexService.mapperService();
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(request.documentType());
doc = docMapper.parse(source(parser).type(request.documentType()).flyweight(true));
if (doc.mappingsModified()) {
updateMappingOnMaster(docMapper, request, documentIndexService.indexUUID());
}
// the document parsing exists the "doc" object, so we need to set the new current field.
currentFieldName = parser.currentName();
}
@ -827,6 +836,31 @@ public class PercolatorService extends AbstractComponent {
}
}
// TODO: maybe move this logic into MappingUpdatedAction? There is similar logic for the index and bulk api now.
private void updateMappingOnMaster(DocumentMapper documentMapper, final PercolateShardRequest request, String indexUUID) {
// we generate the order id before we get the mapping to send and refresh the source, so
// if 2 happen concurrently, we know that the later order will include the previous one
long orderId = mappingUpdatedAction.generateNextMappingUpdateOrder();
documentMapper.refreshSource();
DiscoveryNode node = clusterService.localNode();
final MappingUpdatedAction.MappingUpdatedRequest mappingRequest = new MappingUpdatedAction.MappingUpdatedRequest(
request.index(), indexUUID, request.documentType(), documentMapper.mappingSource(), orderId, node != null ? node.id() : null
);
logger.trace("Sending mapping updated to master: {}", mappingRequest);
mappingUpdatedAction.execute(mappingRequest, new ActionListener<MappingUpdatedAction.MappingUpdatedResponse>() {
@Override
public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) {
// all is well
logger.debug("Successfully updated master with mapping update: {}", mappingRequest);
}
@Override
public void onFailure(Throwable e) {
logger.warn("Failed to update master on updated mapping for {}", e, mappingRequest);
}
});
}
private InternalFacets reduceFacets(List<PercolateShardResponse> shardResults) {
if (shardResults.get(0).facets() == null) {
return null;

View File

@ -22,6 +22,7 @@ import com.google.common.base.Predicate;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
@ -32,8 +33,8 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.ImmutableSettings.Builder;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -150,21 +151,7 @@ public class PercolatorTests extends ElasticsearchIntegrationTest {
@Test
public void testSimple2() throws Exception {
//TODO this test seems to have problems with more shards and/or 1 replica instead of 0
client().admin().indices().prepareCreate("index").setSettings(
ImmutableSettings.settingsBuilder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.build()
).execute().actionGet();
client().admin().indices().prepareCreate("test").setSettings(
ImmutableSettings.settingsBuilder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.build()
).execute().actionGet();
ensureGreen();
createIndex("index", "test");
// introduce the doc
XContentBuilder doc = XContentFactory.jsonBuilder().startObject().startObject("doc")
@ -1595,13 +1582,18 @@ public class PercolatorTests extends ElasticsearchIntegrationTest {
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object o) {
GetMappingsResponse getMappingsResponse = client().admin().indices().prepareGetMappings("test1", "test2").get();
boolean hasPercolatorType = getMappingsResponse.getMappings().get("test1").containsKey(PercolatorService.TYPE_NAME);
if (hasPercolatorType) {
return getMappingsResponse.getMappings().get("test2").containsKey(PercolatorService.TYPE_NAME);
} else {
return false;
for (Client client : cluster()) {
GetMappingsResponse getMappingsResponse = client.admin().indices().prepareGetMappings("test1", "test2").get();
boolean hasPercolatorType = getMappingsResponse.getMappings().get("test1").containsKey(PercolatorService.TYPE_NAME);
if (!hasPercolatorType) {
return false;
}
if (!getMappingsResponse.getMappings().get("test2").containsKey(PercolatorService.TYPE_NAME)) {
return false;
}
}
return true;
}
});
@ -1681,6 +1673,105 @@ public class PercolatorTests extends ElasticsearchIntegrationTest {
assertEquals(response.getMatches()[0].getId().string(), "Q");
}
@Test
public void testPercolationWithDynamicTemplates() throws Exception {
assertAcked(prepareCreate("idx").addMapping("type", jsonBuilder().startObject().startObject("type")
.field("dynamic", false)
.startObject("properties")
.startObject("custom")
.field("dynamic", true)
.field("type", "object")
.field("incude_in_all", false)
.endObject()
.endObject()
.startArray("dynamic_template")
.startObject()
.startObject("custom_fields")
.field("path_match", "custom.*")
.startObject("mapping")
.field("index", "not_analyzed")
.endObject()
.endObject()
.endObject()
.endArray()
.endObject().endObject()));
client().prepareIndex("idx", PercolatorService.TYPE_NAME, "1")
.setSource(jsonBuilder().startObject().field("query", QueryBuilders.queryString("color:red")).endObject())
.get();
client().prepareIndex("idx", PercolatorService.TYPE_NAME, "2")
.setSource(jsonBuilder().startObject().field("query", QueryBuilders.queryString("color:blue")).endObject())
.get();
PercolateResponse percolateResponse = client().preparePercolate().setDocumentType("type")
.setPercolateDoc(new PercolateSourceBuilder.DocBuilder().setDoc(jsonBuilder().startObject().startObject("custom").field("color", "blue").endObject().endObject()))
.get();
assertMatchCount(percolateResponse, 0l);
assertThat(percolateResponse.getMatches(), arrayWithSize(0));
// wait until the mapping change has propagated from the percolate request
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).execute().actionGet();
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
PendingClusterTasksResponse pendingTasks = client().admin().cluster().preparePendingClusterTasks().get();
return pendingTasks.pendingTasks().isEmpty();
}
});
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).execute().actionGet();
// The previous percolate request introduced the custom.color field, so now we register the query again
// and the field name `color` will be resolved to `custom.color` field in mapping via smart field mapping resolving.
client().prepareIndex("idx", PercolatorService.TYPE_NAME, "2")
.setSource(jsonBuilder().startObject().field("query", QueryBuilders.queryString("color:blue")).endObject())
.get();
// The second request will yield a match, since the query during the proper field during parsing.
percolateResponse = client().preparePercolate().setDocumentType("type")
.setPercolateDoc(new PercolateSourceBuilder.DocBuilder().setDoc(jsonBuilder().startObject().startObject("custom").field("color", "blue").endObject().endObject()))
.get();
assertMatchCount(percolateResponse, 1l);
assertThat(percolateResponse.getMatches()[0].getId().string(), equalTo("2"));
}
@Test
public void testUpdateMappingDynamicallyWhilePercolating() throws Exception {
createIndex("test");
// percolation source
XContentBuilder percolateDocumentSource = XContentFactory.jsonBuilder().startObject().startObject("doc")
.field("field1", 1)
.field("field2", "value")
.endObject().endObject();
PercolateResponse response = client().preparePercolate()
.setIndices("test").setDocumentType("type1")
.setSource(percolateDocumentSource).execute().actionGet();
assertMatchCount(response, 0l);
assertThat(response.getMatches(), arrayWithSize(0));
// wait until the mapping change has propagated
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).execute().actionGet();
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
PendingClusterTasksResponse pendingTasks = client().admin().cluster().preparePendingClusterTasks().get();
return pendingTasks.pendingTasks().isEmpty();
}
});
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).execute().actionGet();
GetMappingsResponse mappingsResponse = client().admin().indices().prepareGetMappings("test").get();
assertThat(mappingsResponse.getMappings().get("test"), notNullValue());
assertThat(mappingsResponse.getMappings().get("test").get("type1"), notNullValue());
assertThat(mappingsResponse.getMappings().get("test").get("type1").getSourceAsMap().isEmpty(), is(false));
Map<String, Object> properties = (Map<String, Object>) mappingsResponse.getMappings().get("test").get("type1").getSourceAsMap().get("properties");
assertThat(((Map<String, String>) properties.get("field1")).get("type"), equalTo("long"));
assertThat(((Map<String, String>) properties.get("field2")).get("type"), equalTo("string"));
}
void initNestedIndexAndPercolation() throws IOException {
XContentBuilder mapping = XContentFactory.jsonBuilder();
mapping.startObject().startObject("properties").startObject("companyname").field("type", "string").endObject()