From c6cdf7781b27e1340770cf57201370592bd3f43e Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Tue, 21 Apr 2015 15:30:35 +0200 Subject: [PATCH] Mappings: simplify dynamic mappings updates. While dynamic mappings updates are using the same code path as updates from the API when applied on a data node since #10593, they were still using a different code path on the master node. This commit makes dynamic updates processed the same way as updates from the API, which also seems to do a better way at acknowledgements (I could not reproduce the ConcurrentDynamicTemplateTests failure anymore). It also adds more checks, like for instance that indexing on replicas should not trigger dynamic mapping updates since they should have been handled on the primary before. Close #10720 --- .../action/bulk/TransportShardBulkAction.java | 87 +++-- .../action/index/TransportIndexAction.java | 90 +++-- ...nsportShardReplicationOperationAction.java | 2 +- .../action/index/MappingUpdatedAction.java | 364 +++--------------- .../metadata/MetaDataMappingService.java | 41 -- .../index/gateway/IndexShardGateway.java | 2 +- .../termvectors/ShardTermVectorsService.java | 30 +- .../recovery/RecoverySourceHandler.java | 2 +- .../java/org/elasticsearch/node/Node.java | 3 +- .../percolator/PercolatorService.java | 2 +- .../ConcurrentDynamicTemplateTests.java | 1 - 11 files changed, 201 insertions(+), 423 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 245d7d16033..9e354e82836 100644 --- a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -52,6 +52,7 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.DocumentAlreadyExistsException; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.shard.IndexShard; @@ -352,23 +353,6 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation } - private void applyMappingUpdate(IndexService indexService, String type, Mapping update) throws Throwable { - // HACK: Rivers seem to have something specific that triggers potential - // deadlocks when doing concurrent indexing. So for now they keep the - // old behaviour of updating mappings locally first and then - // asynchronously notifying the master - // this can go away when rivers are removed - final String indexName = indexService.index().name(); - final String indexUUID = indexService.indexUUID(); - if (indexName.equals(RiverIndexName.Conf.indexName(settings))) { - indexService.mapperService().merge(type, new CompressedString(update.toBytes()), true); - mappingUpdatedAction.updateMappingOnMaster(indexName, indexUUID, type, update, null); - } else { - mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexUUID, type, update); - indexService.mapperService().merge(type, new CompressedString(update.toBytes()), true); - } - } - private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, ClusterState clusterState, IndexShard indexShard, IndexService indexService, boolean processed) throws Throwable { @@ -392,20 +376,54 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation Engine.IndexingOperation op; if (indexRequest.opType() == IndexRequest.OpType.INDEX) { Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates() || indexRequest.canHaveDuplicates()); - if (index.parsedDoc().dynamicMappingsUpdate() != null) { - applyMappingUpdate(indexService, indexRequest.type(), index.parsedDoc().dynamicMappingsUpdate()); + Mapping update = index.parsedDoc().dynamicMappingsUpdate(); + if (update != null) { + final String indexName = indexService.index().name(); + if (indexName.equals(RiverIndexName.Conf.indexName(settings))) { + // With rivers, we have a chicken and egg problem if indexing + // the _meta document triggers a mapping update. Because we would + // like to validate the mapping update first, but on the other + // hand putting the mapping would start the river, which expects + // to find a _meta document + // So we have no choice but to index first and send mappings afterwards + MapperService mapperService = indexService.mapperService(); + mapperService.merge(indexRequest.type(), new CompressedString(update.toBytes()), true); + indexShard.index(index); + mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, indexRequest.type(), update); + } else { + mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexRequest.type(), update); + indexShard.index(index); + } + } else { + indexShard.index(index); } - indexShard.index(index); version = index.version(); op = index; created = index.created(); } else { Engine.Create create = indexShard.prepareCreate(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId()); - if (create.parsedDoc().dynamicMappingsUpdate() != null) { - applyMappingUpdate(indexService, indexRequest.type(), create.parsedDoc().dynamicMappingsUpdate()); + Mapping update = create.parsedDoc().dynamicMappingsUpdate(); + if (update != null) { + final String indexName = indexService.index().name(); + if (indexName.equals(RiverIndexName.Conf.indexName(settings))) { + // With rivers, we have a chicken and egg problem if indexing + // the _meta document triggers a mapping update. Because we would + // like to validate the mapping update first, but on the other + // hand putting the mapping would start the river, which expects + // to find a _meta document + // So we have no choice but to index first and send mappings afterwards + MapperService mapperService = indexService.mapperService(); + mapperService.merge(indexRequest.type(), new CompressedString(update.toBytes()), true); + indexShard.create(create); + mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, indexRequest.type(), update); + } else { + mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexRequest.type(), update); + indexShard.create(create); + } + } else { + indexShard.create(create); } - indexShard.create(create); version = create.version(); op = create; created = true; @@ -528,8 +546,9 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation @Override - protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) { - IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id()); + protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) throws Exception { + IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()); + IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id()); final BulkShardRequest request = shardRequest.request; for (int i = 0; i < request.items().length; i++) { BulkItemRequest item = request.items()[i]; @@ -544,11 +563,29 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation if (indexRequest.opType() == IndexRequest.OpType.INDEX) { Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates()); + if (index.parsedDoc().dynamicMappingsUpdate() != null) { + if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) { + // mappings updates on the _river are not validated synchronously so we can't + // assume they are here when indexing on a replica + indexService.mapperService().merge(indexRequest.type(), new CompressedString(index.parsedDoc().dynamicMappingsUpdate().toBytes()), true); + } else { + throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + index.parsedDoc().dynamicMappingsUpdate() + "]"); + } + } indexShard.index(index); } else { Engine.Create create = indexShard.prepareCreate(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId()); + if (create.parsedDoc().dynamicMappingsUpdate() != null) { + if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) { + // mappings updates on the _river are not validated synchronously so we can't + // assume they are here when indexing on a replica + indexService.mapperService().merge(indexRequest.type(), new CompressedString(create.parsedDoc().dynamicMappingsUpdate().toBytes()), true); + } else { + throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + create.parsedDoc().dynamicMappingsUpdate() + "]"); + } + } indexShard.create(create); } } catch (Throwable e) { diff --git a/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 79ea496c317..494f70708cb 100644 --- a/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.index; +import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.RoutingMissingException; @@ -42,6 +43,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.shard.IndexShard; @@ -51,6 +53,8 @@ import org.elasticsearch.river.RiverIndexName; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.io.IOException; + /** * Performs the index operation. *

@@ -167,23 +171,6 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi .indexShards(clusterService.state(), request.concreteIndex(), request.request().type(), request.request().id(), request.request().routing()); } - private void applyMappingUpdate(IndexService indexService, String type, Mapping update) throws Throwable { - // HACK: Rivers seem to have something specific that triggers potential - // deadlocks when doing concurrent indexing. So for now they keep the - // old behaviour of updating mappings locally first and then - // asynchronously notifying the master - // this can go away when rivers are removed - final String indexName = indexService.index().name(); - final String indexUUID = indexService.indexUUID(); - if (indexName.equals(RiverIndexName.Conf.indexName(settings))) { - indexService.mapperService().merge(type, new CompressedString(update.toBytes()), true); - mappingUpdatedAction.updateMappingOnMaster(indexName, indexUUID, type, update, null); - } else { - mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexUUID, type, update); - indexService.mapperService().merge(type, new CompressedString(update.toBytes()), true); - } - } - @Override protected Tuple shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable { final IndexRequest request = shardRequest.request; @@ -206,19 +193,53 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi if (request.opType() == IndexRequest.OpType.INDEX) { Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates()); - if (index.parsedDoc().dynamicMappingsUpdate() != null) { - applyMappingUpdate(indexService, request.type(), index.parsedDoc().dynamicMappingsUpdate()); + Mapping update = index.parsedDoc().dynamicMappingsUpdate(); + if (update != null) { + final String indexName = indexService.index().name(); + if (indexName.equals(RiverIndexName.Conf.indexName(settings))) { + // With rivers, we have a chicken and egg problem if indexing + // the _meta document triggers a mapping update. Because we would + // like to validate the mapping update first, but on the other + // hand putting the mapping would start the river, which expects + // to find a _meta document + // So we have no choice but to index first and send mappings afterwards + MapperService mapperService = indexService.mapperService(); + mapperService.merge(request.type(), new CompressedString(update.toBytes()), true); + indexShard.index(index); + mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update); + } else { + mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update); + indexShard.index(index); + } + } else { + indexShard.index(index); } - indexShard.index(index); version = index.version(); created = index.created(); } else { Engine.Create create = indexShard.prepareCreate(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId()); - if (create.parsedDoc().dynamicMappingsUpdate() != null) { - applyMappingUpdate(indexService, request.type(), create.parsedDoc().dynamicMappingsUpdate()); + Mapping update = create.parsedDoc().dynamicMappingsUpdate(); + if (update != null) { + final String indexName = indexService.index().name(); + if (indexName.equals(RiverIndexName.Conf.indexName(settings))) { + // With rivers, we have a chicken and egg problem if indexing + // the _meta document triggers a mapping update. Because we would + // like to validate the mapping update first, but on the other + // hand putting the mapping would start the river, which expects + // to find a _meta document + // So we have no choice but to index first and send mappings afterwards + MapperService mapperService = indexService.mapperService(); + mapperService.merge(request.type(), new CompressedString(update.toBytes()), true); + indexShard.create(create); + mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update); + } else { + mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update); + indexShard.create(create); + } + } else { + indexShard.create(create); } - indexShard.create(create); version = create.version(); created = true; } @@ -239,17 +260,36 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi } @Override - protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) { - IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id()); + protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) throws IOException { + IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()); + IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id()); IndexRequest request = shardRequest.request; SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).type(request.type()).id(request.id()) .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl()); if (request.opType() == IndexRequest.OpType.INDEX) { Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates()); + if (index.parsedDoc().dynamicMappingsUpdate() != null) { + if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) { + // mappings updates on the _river are not validated synchronously so we can't + // assume they are here when indexing on a replica + indexService.mapperService().merge(request.type(), new CompressedString(index.parsedDoc().dynamicMappingsUpdate().toBytes()), true); + } else { + throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + index.parsedDoc().dynamicMappingsUpdate() + "]"); + } + } indexShard.index(index); } else { Engine.Create create = indexShard.prepareCreate(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates(), request.autoGeneratedId()); + if (create.parsedDoc().dynamicMappingsUpdate() != null) { + if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) { + // mappings updates on the _river are not validated synchronously so we can't + // assume they are here when indexing on a replica + indexService.mapperService().merge(request.type(), new CompressedString(create.parsedDoc().dynamicMappingsUpdate().toBytes()), true); + } else { + throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + create.parsedDoc().dynamicMappingsUpdate() + "]"); + } + } indexShard.create(create); } if (request.refresh()) { diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java index df99d045177..c5a0fc95efe 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java @@ -117,7 +117,7 @@ public abstract class TransportShardReplicationOperationAction shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable; - protected abstract void shardOperationOnReplica(ReplicaOperationRequest shardRequest); + protected abstract void shardOperationOnReplica(ReplicaOperationRequest shardRequest) throws Exception; protected abstract ShardIterator shards(ClusterState clusterState, InternalRequest request) throws ElasticsearchException; diff --git a/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java b/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java index 6c5e92b3799..44727699354 100644 --- a/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java +++ b/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java @@ -19,61 +19,31 @@ package org.elasticsearch.cluster.action.index; -import com.google.common.collect.ImmutableMap; - -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.IndicesRequest; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.action.support.master.MasterNodeOperationRequest; -import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.MetaDataMappingService; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.compress.CompressedString; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.IndicesAdminClient; +import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.node.settings.NodeSettingsService; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; -import java.io.IOException; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** * Called by shards in the cluster when their mapping was dynamically updated and it needs to be updated * in the cluster state meta data (and broadcast to all members). */ -public class MappingUpdatedAction extends TransportMasterNodeOperationAction { +public class MappingUpdatedAction extends AbstractComponent { public static final String INDICES_MAPPING_DYNAMIC_TIMEOUT = "indices.mapping.dynamic_timeout"; - public static final String ACTION_NAME = "internal:cluster/mapping_updated"; - - private final MetaDataMappingService metaDataMappingService; - - private volatile MasterMappingUpdater masterMappingUpdater; + private IndicesAdminClient client; private volatile TimeValue dynamicMappingUpdateTimeout; class ApplySettings implements NodeSettingsService.Listener { @@ -89,44 +59,58 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationActionof())); - final CompressedString mappingSource = new CompressedString(builder.endObject().bytes()); - masterMappingUpdater.add(new MappingChange(index, indexUUID, type, mappingSource, listener)); - } catch (IOException bogus) { - throw new AssertionError("Cannot happen", bogus); + return client.preparePutMapping(index).setType(type).setSource(mappingUpdate.toString()) + .setMasterNodeTimeout(timeout).setTimeout(timeout); + } + + public void updateMappingOnMaster(String index, String type, Mapping mappingUpdate, final TimeValue timeout, final MappingUpdateListener listener) { + final PutMappingRequestBuilder request = updateMappingRequest(index, type, mappingUpdate, timeout); + if (listener == null) { + request.execute(); + } else { + final ActionListener actionListener = new ActionListener() { + @Override + public void onResponse(PutMappingResponse response) { + if (response.isAcknowledged()) { + listener.onMappingUpdate(); + } else { + listener.onFailure(new TimeoutException("Failed to acknowledge the mapping response within [" + timeout + "]")); + } + } + + @Override + public void onFailure(Throwable e) { + listener.onFailure(e); + } + }; + request.execute(actionListener); } } + public void updateMappingOnMasterAsynchronously(String index, String type, Mapping mappingUpdate) throws Throwable { + updateMappingOnMaster(index, type, mappingUpdate, dynamicMappingUpdateTimeout, null); + } + /** * Same as {@link #updateMappingOnMasterSynchronously(String, String, String, Mapping, TimeValue)} * using the default timeout. */ - public void updateMappingOnMasterSynchronously(String index, String indexUUID, String type, Mapping mappingUpdate) throws Throwable { - updateMappingOnMasterSynchronously(index, indexUUID, type, mappingUpdate, dynamicMappingUpdateTimeout); + public void updateMappingOnMasterSynchronously(String index, String type, Mapping mappingUpdate) throws Throwable { + updateMappingOnMasterSynchronously(index, type, mappingUpdate, dynamicMappingUpdateTimeout); } /** @@ -134,179 +118,9 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction listener) throws ElasticsearchException { - metaDataMappingService.updateMapping(request.index(), request.indexUUID(), request.type(), request.mappingSource(), request.nodeId, new ActionListener() { - @Override - public void onResponse(ClusterStateUpdateResponse response) { - listener.onResponse(new MappingUpdatedResponse()); - } - - @Override - public void onFailure(Throwable t) { - logger.warn("[{}] update-mapping [{}] failed to dynamically update the mapping in cluster_state from shard", t, request.index(), request.type()); - listener.onFailure(t); - } - }); - } - - public static class MappingUpdatedResponse extends ActionResponse { - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - } - } - - public static class MappingUpdatedRequest extends MasterNodeOperationRequest implements IndicesRequest { - - private String index; - private String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE; - private String type; - private CompressedString mappingSource; - private String nodeId = null; // null means not set - - MappingUpdatedRequest() { - } - - public MappingUpdatedRequest(String index, String indexUUID, String type, CompressedString mappingSource, String nodeId) { - this.index = index; - this.indexUUID = indexUUID; - this.type = type; - this.mappingSource = mappingSource; - this.nodeId = nodeId; - } - - public String index() { - return index; - } - - @Override - public IndicesOptions indicesOptions() { - return IndicesOptions.strictSingleIndexNoExpandForbidClosed(); - } - - @Override - public String[] indices() { - return new String[]{index}; - } - - public String indexUUID() { - return indexUUID; - } - - public String type() { - return type; - } - - public CompressedString mappingSource() { - return mappingSource; - } - - /** - * Returns null for not set. - */ - public String nodeId() { - return this.nodeId; - } - - @Override - public ActionRequestValidationException validate() { - return null; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - index = in.readString(); - type = in.readString(); - mappingSource = CompressedString.readCompressedString(in); - indexUUID = in.readString(); - nodeId = in.readOptionalString(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeString(index); - out.writeString(type); - mappingSource.writeTo(out); - out.writeString(indexUUID); - out.writeOptionalString(nodeId); - } - - @Override - public String toString() { - return "index [" + index + "], indexUUID [" + indexUUID + "], type [" + type + "] and source [" + mappingSource + "]"; - } - } - - private static class MappingChange { - public final String index; - public final String indexUUID; - public final String type; - public final CompressedString mappingSource; - public final MappingUpdateListener listener; - - MappingChange(String index, String indexUUID, String type, CompressedString mappingSource, MappingUpdateListener listener) { - this.index = index; - this.indexUUID = indexUUID; - this.type = type; - this.mappingSource = mappingSource; - this.listener = listener; + public void updateMappingOnMasterSynchronously(String index, String type, Mapping mappingUpdate, TimeValue timeout) throws Throwable { + if (updateMappingRequest(index, type, mappingUpdate, timeout).get().isAcknowledged() == false) { + throw new TimeoutException("Failed to acknowledge mapping update within [" + timeout + "]"); } } @@ -319,90 +133,4 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction - * It also allows to reduce multiple mapping updates on the same index(UUID) and type into one update - * (refreshSource + sending to master), which allows to offload the number of times mappings are updated - * and sent to master for heavy single index requests that each introduce a new mapping, and when - * multiple shards exists on the same nodes, allowing to work on the index level in this case. - */ - private class MasterMappingUpdater extends Thread { - - private volatile boolean running = true; - private final BlockingQueue queue = ConcurrentCollections.newBlockingQueue(); - - public MasterMappingUpdater(String name) { - super(name); - } - - public void add(MappingChange change) { - queue.add(change); - } - - public void close() { - running = false; - this.interrupt(); - } - - @Override - public void run() { - while (running) { - MappingUpdateListener listener = null; - try { - final MappingChange change = queue.poll(10, TimeUnit.MINUTES); - if (change == null) { - continue; - } - listener = change.listener; - - final MappingUpdatedAction.MappingUpdatedRequest mappingRequest; - try { - DiscoveryNode node = clusterService.localNode(); - mappingRequest = new MappingUpdatedAction.MappingUpdatedRequest( - change.index, change.indexUUID, change.type, change.mappingSource, node != null ? node.id() : null - ); - } catch (Throwable t) { - logger.warn("Failed to update master on updated mapping for index [" + change.index + "], type [" + change.type + "]", t); - if (change.listener != null) { - change.listener.onFailure(t); - } - continue; - } - logger.trace("sending mapping updated to master: {}", mappingRequest); - execute(mappingRequest, new ActionListener() { - @Override - public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) { - logger.debug("successfully updated master with mapping update: {}", mappingRequest); - if (change.listener != null) { - change.listener.onMappingUpdate(); - } - } - - @Override - public void onFailure(Throwable e) { - logger.warn("failed to update master on updated mapping for {}", e, mappingRequest); - if (change.listener != null) { - change.listener.onFailure(e); - } - } - }); - } catch (Throwable t) { - if (listener != null) { - // even if the failure is expected, eg. if we got interrupted, - // we need to notify the listener as there might be a latch - // waiting for it to be called - listener.onFailure(t); - } - if (t instanceof InterruptedException && !running) { - // all is well, we are shutting down - } else { - logger.warn("failed to process mapping update", t); - } - } - } - } - } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java index 690dcceb534..1749e6e271d 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java @@ -331,47 +331,6 @@ public class MetaDataMappingService extends AbstractComponent { }); } - public void updateMapping(final String index, final String indexUUID, final String type, final CompressedString mappingSource, final String nodeId, final ActionListener listener) { - final long insertOrder; - synchronized (refreshOrUpdateMutex) { - insertOrder = ++refreshOrUpdateInsertOrder; - refreshOrUpdateQueue.add(new UpdateTask(index, indexUUID, type, mappingSource, nodeId, listener)); - } - clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "] / node [" + nodeId + "]", Priority.HIGH, new ProcessedClusterStateUpdateTask() { - private volatile List allTasks; - - @Override - public void onFailure(String source, Throwable t) { - listener.onFailure(t); - } - - @Override - public ClusterState execute(final ClusterState currentState) throws Exception { - Tuple> tuple = executeRefreshOrUpdate(currentState, insertOrder); - this.allTasks = tuple.v2(); - return tuple.v1(); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - if (allTasks == null) { - return; - } - for (Object task : allTasks) { - if (task instanceof UpdateTask) { - UpdateTask uTask = (UpdateTask) task; - ClusterStateUpdateResponse response = new ClusterStateUpdateResponse(true); - try { - uTask.listener.onResponse(response); - } catch (Throwable t) { - logger.debug("failed to ping back on response of mapping processing for task [{}]", t, uTask.listener); - } - } - } - } - }); - } - public void putMapping(final PutMappingClusterStateUpdateRequest request, final ActionListener listener) { clusterService.submitStateUpdateTask("put-mapping [" + request.type() + "]", Priority.HIGH, new AckedClusterStateUpdateTask(request, listener) { diff --git a/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java b/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java index 05a38b138d4..1cbfaab0672 100644 --- a/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java +++ b/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java @@ -165,7 +165,7 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl private void validateMappingUpdate(final String type, Mapping update) { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference error = new AtomicReference<>(); - mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), indexService.indexUUID(), type, update, new MappingUpdatedAction.MappingUpdateListener() { + mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), type, update, waitForMappingUpdatePostRecovery, new MappingUpdatedAction.MappingUpdateListener() { @Override public void onMappingUpdate() { latch.countDown(); diff --git a/src/main/java/org/elasticsearch/index/termvectors/ShardTermVectorsService.java b/src/main/java/org/elasticsearch/index/termvectors/ShardTermVectorsService.java index 6d60d21b1fe..9ca66a65ec7 100644 --- a/src/main/java/org/elasticsearch/index/termvectors/ShardTermVectorsService.java +++ b/src/main/java/org/elasticsearch/index/termvectors/ShardTermVectorsService.java @@ -20,7 +20,12 @@ package org.elasticsearch.index.termvectors; import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.index.*; +import org.apache.lucene.index.Fields; +import org.apache.lucene.index.IndexOptions; +import org.apache.lucene.index.IndexableField; +import org.apache.lucene.index.MultiFields; +import org.apache.lucene.index.Term; +import org.apache.lucene.index.Terms; import org.apache.lucene.index.memory.MemoryIndex; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.termvectors.TermVectorsFilter; @@ -40,18 +45,30 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.get.GetField; import org.elasticsearch.index.get.GetResult; -import org.elasticsearch.index.mapper.*; +import org.elasticsearch.index.mapper.DocumentMapper; +import org.elasticsearch.index.mapper.FieldMapper; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.mapper.Mapping; +import org.elasticsearch.index.mapper.ParseContext; +import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.core.StringFieldMapper; import org.elasticsearch.index.mapper.internal.UidFieldMapper; -import org.elasticsearch.index.IndexService; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.dfs.AggregatedDfs; import java.io.IOException; -import java.util.*; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; import static org.elasticsearch.index.mapper.SourceToParse.source; @@ -285,7 +302,6 @@ public class ShardTermVectorsService extends AbstractIndexShardComponent { private ParsedDocument parseDocument(String index, String type, BytesReference doc) throws Throwable { MapperService mapperService = indexShard.mapperService(); - IndexService indexService = indexShard.indexService(); // TODO: make parsing not dynamically create fields not in the original mapping Tuple docMapper = mapperService.documentMapperWithAutoCreate(type); @@ -294,7 +310,7 @@ public class ShardTermVectorsService extends AbstractIndexShardComponent { parsedDocument.addDynamicMappingsUpdate(docMapper.v2()); } if (parsedDocument.dynamicMappingsUpdate() != null) { - mappingUpdatedAction.updateMappingOnMasterSynchronously(index, indexService.indexUUID(), type, parsedDocument.dynamicMappingsUpdate()); + mappingUpdatedAction.updateMappingOnMasterSynchronously(index, type, parsedDocument.dynamicMappingsUpdate()); } return parsedDocument; } diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 54e11c55556..50fd53a0f98 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -567,7 +567,7 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler { } }; for (DocumentMapper documentMapper : documentMappersToUpdate) { - mappingUpdatedAction.updateMappingOnMaster(indexService.index().getName(), indexService.indexUUID(), documentMapper.type(), documentMapper.mapping(), listener); + mappingUpdatedAction.updateMappingOnMaster(indexService.index().getName(), documentMapper.type(), documentMapper.mapping(), recoverySettings.internalActionTimeout(), listener); } cancellableThreads.execute(new Interruptable() { @Override diff --git a/src/main/java/org/elasticsearch/node/Node.java b/src/main/java/org/elasticsearch/node/Node.java index bed0eb2a8df..bf3a81487b8 100644 --- a/src/main/java/org/elasticsearch/node/Node.java +++ b/src/main/java/org/elasticsearch/node/Node.java @@ -242,7 +242,7 @@ public class Node implements Releasable { injector.getInstance(plugin).start(); } - injector.getInstance(MappingUpdatedAction.class).start(); + injector.getInstance(MappingUpdatedAction.class).setClient(client); injector.getInstance(IndicesService.class).start(); injector.getInstance(IndexingMemoryController.class).start(); injector.getInstance(IndicesClusterStateService.class).start(); @@ -285,7 +285,6 @@ public class Node implements Releasable { injector.getInstance(HttpServer.class).stop(); } - injector.getInstance(MappingUpdatedAction.class).stop(); injector.getInstance(RiversManager.class).stop(); injector.getInstance(SnapshotsService.class).stop(); diff --git a/src/main/java/org/elasticsearch/percolator/PercolatorService.java b/src/main/java/org/elasticsearch/percolator/PercolatorService.java index da26759d7c1..6723372f1c8 100644 --- a/src/main/java/org/elasticsearch/percolator/PercolatorService.java +++ b/src/main/java/org/elasticsearch/percolator/PercolatorService.java @@ -287,7 +287,7 @@ public class PercolatorService extends AbstractComponent { doc.addDynamicMappingsUpdate(docMapper.v2()); } if (doc.dynamicMappingsUpdate() != null) { - mappingUpdatedAction.updateMappingOnMasterSynchronously(request.shardId().getIndex(), documentIndexService.indexUUID(), request.documentType(), doc.dynamicMappingsUpdate()); + mappingUpdatedAction.updateMappingOnMasterSynchronously(request.shardId().getIndex(), request.documentType(), doc.dynamicMappingsUpdate()); } // the document parsing exists the "doc" object, so we need to set the new current field. currentFieldName = parser.currentName(); diff --git a/src/test/java/org/elasticsearch/indices/mapping/ConcurrentDynamicTemplateTests.java b/src/test/java/org/elasticsearch/indices/mapping/ConcurrentDynamicTemplateTests.java index 50bbd8e9e2d..28bcde323d2 100644 --- a/src/test/java/org/elasticsearch/indices/mapping/ConcurrentDynamicTemplateTests.java +++ b/src/test/java/org/elasticsearch/indices/mapping/ConcurrentDynamicTemplateTests.java @@ -46,7 +46,6 @@ public class ConcurrentDynamicTemplateTests extends ElasticsearchIntegrationTest private final String mappingType = "test-mapping"; @Test // see #3544 - @AwaitsFix(bugUrl = "adrien is looking into this") public void testConcurrentDynamicMapping() throws Exception { final String fieldName = "field"; final String mapping = "{ \"" + mappingType + "\": {" +