diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
index 245d7d16033..9e354e82836 100644
--- a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
+++ b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
@@ -52,6 +52,7 @@ import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
+import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
@@ -352,23 +353,6 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
}
- private void applyMappingUpdate(IndexService indexService, String type, Mapping update) throws Throwable {
- // HACK: Rivers seem to have something specific that triggers potential
- // deadlocks when doing concurrent indexing. So for now they keep the
- // old behaviour of updating mappings locally first and then
- // asynchronously notifying the master
- // this can go away when rivers are removed
- final String indexName = indexService.index().name();
- final String indexUUID = indexService.indexUUID();
- if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
- indexService.mapperService().merge(type, new CompressedString(update.toBytes()), true);
- mappingUpdatedAction.updateMappingOnMaster(indexName, indexUUID, type, update, null);
- } else {
- mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexUUID, type, update);
- indexService.mapperService().merge(type, new CompressedString(update.toBytes()), true);
- }
- }
-
private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, ClusterState clusterState,
IndexShard indexShard, IndexService indexService, boolean processed) throws Throwable {
@@ -392,20 +376,54 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
Engine.IndexingOperation op;
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates() || indexRequest.canHaveDuplicates());
- if (index.parsedDoc().dynamicMappingsUpdate() != null) {
- applyMappingUpdate(indexService, indexRequest.type(), index.parsedDoc().dynamicMappingsUpdate());
+ Mapping update = index.parsedDoc().dynamicMappingsUpdate();
+ if (update != null) {
+ final String indexName = indexService.index().name();
+ if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
+ // With rivers, we have a chicken and egg problem if indexing
+ // the _meta document triggers a mapping update. Because we would
+ // like to validate the mapping update first, but on the other
+ // hand putting the mapping would start the river, which expects
+ // to find a _meta document
+ // So we have no choice but to index first and send mappings afterwards
+ MapperService mapperService = indexService.mapperService();
+ mapperService.merge(indexRequest.type(), new CompressedString(update.toBytes()), true);
+ indexShard.index(index);
+ mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, indexRequest.type(), update);
+ } else {
+ mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexRequest.type(), update);
+ indexShard.index(index);
+ }
+ } else {
+ indexShard.index(index);
}
- indexShard.index(index);
version = index.version();
op = index;
created = index.created();
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY,
request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId());
- if (create.parsedDoc().dynamicMappingsUpdate() != null) {
- applyMappingUpdate(indexService, indexRequest.type(), create.parsedDoc().dynamicMappingsUpdate());
+ Mapping update = create.parsedDoc().dynamicMappingsUpdate();
+ if (update != null) {
+ final String indexName = indexService.index().name();
+ if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
+ // With rivers, we have a chicken and egg problem if indexing
+ // the _meta document triggers a mapping update. Because we would
+ // like to validate the mapping update first, but on the other
+ // hand putting the mapping would start the river, which expects
+ // to find a _meta document
+ // So we have no choice but to index first and send mappings afterwards
+ MapperService mapperService = indexService.mapperService();
+ mapperService.merge(indexRequest.type(), new CompressedString(update.toBytes()), true);
+ indexShard.create(create);
+ mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, indexRequest.type(), update);
+ } else {
+ mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexRequest.type(), update);
+ indexShard.create(create);
+ }
+ } else {
+ indexShard.create(create);
}
- indexShard.create(create);
version = create.version();
op = create;
created = true;
@@ -528,8 +546,9 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
@Override
- protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
- IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id());
+ protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) throws Exception {
+ IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
+ IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
final BulkShardRequest request = shardRequest.request;
for (int i = 0; i < request.items().length; i++) {
BulkItemRequest item = request.items()[i];
@@ -544,11 +563,29 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates());
+ if (index.parsedDoc().dynamicMappingsUpdate() != null) {
+ if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) {
+ // mappings updates on the _river are not validated synchronously so we can't
+ // assume they are here when indexing on a replica
+ indexService.mapperService().merge(indexRequest.type(), new CompressedString(index.parsedDoc().dynamicMappingsUpdate().toBytes()), true);
+ } else {
+ throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + index.parsedDoc().dynamicMappingsUpdate() + "]");
+ }
+ }
indexShard.index(index);
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse,
indexRequest.version(), indexRequest.versionType(),
Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId());
+ if (create.parsedDoc().dynamicMappingsUpdate() != null) {
+ if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) {
+ // mappings updates on the _river are not validated synchronously so we can't
+ // assume they are here when indexing on a replica
+ indexService.mapperService().merge(indexRequest.type(), new CompressedString(create.parsedDoc().dynamicMappingsUpdate().toBytes()), true);
+ } else {
+ throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + create.parsedDoc().dynamicMappingsUpdate() + "]");
+ }
+ }
indexShard.create(create);
}
} catch (Throwable e) {
diff --git a/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java
index 79ea496c317..494f70708cb 100644
--- a/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java
+++ b/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java
@@ -19,6 +19,7 @@
package org.elasticsearch.action.index;
+import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
@@ -42,6 +43,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
@@ -51,6 +53,8 @@ import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
+import java.io.IOException;
+
/**
* Performs the index operation.
*
@@ -167,23 +171,6 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
.indexShards(clusterService.state(), request.concreteIndex(), request.request().type(), request.request().id(), request.request().routing());
}
- private void applyMappingUpdate(IndexService indexService, String type, Mapping update) throws Throwable {
- // HACK: Rivers seem to have something specific that triggers potential
- // deadlocks when doing concurrent indexing. So for now they keep the
- // old behaviour of updating mappings locally first and then
- // asynchronously notifying the master
- // this can go away when rivers are removed
- final String indexName = indexService.index().name();
- final String indexUUID = indexService.indexUUID();
- if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
- indexService.mapperService().merge(type, new CompressedString(update.toBytes()), true);
- mappingUpdatedAction.updateMappingOnMaster(indexName, indexUUID, type, update, null);
- } else {
- mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexUUID, type, update);
- indexService.mapperService().merge(type, new CompressedString(update.toBytes()), true);
- }
- }
-
@Override
protected Tuple shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
final IndexRequest request = shardRequest.request;
@@ -206,19 +193,53 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
- if (index.parsedDoc().dynamicMappingsUpdate() != null) {
- applyMappingUpdate(indexService, request.type(), index.parsedDoc().dynamicMappingsUpdate());
+ Mapping update = index.parsedDoc().dynamicMappingsUpdate();
+ if (update != null) {
+ final String indexName = indexService.index().name();
+ if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
+ // With rivers, we have a chicken and egg problem if indexing
+ // the _meta document triggers a mapping update. Because we would
+ // like to validate the mapping update first, but on the other
+ // hand putting the mapping would start the river, which expects
+ // to find a _meta document
+ // So we have no choice but to index first and send mappings afterwards
+ MapperService mapperService = indexService.mapperService();
+ mapperService.merge(request.type(), new CompressedString(update.toBytes()), true);
+ indexShard.index(index);
+ mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update);
+ } else {
+ mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
+ indexShard.index(index);
+ }
+ } else {
+ indexShard.index(index);
}
- indexShard.index(index);
version = index.version();
created = index.created();
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse,
request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());
- if (create.parsedDoc().dynamicMappingsUpdate() != null) {
- applyMappingUpdate(indexService, request.type(), create.parsedDoc().dynamicMappingsUpdate());
+ Mapping update = create.parsedDoc().dynamicMappingsUpdate();
+ if (update != null) {
+ final String indexName = indexService.index().name();
+ if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
+ // With rivers, we have a chicken and egg problem if indexing
+ // the _meta document triggers a mapping update. Because we would
+ // like to validate the mapping update first, but on the other
+ // hand putting the mapping would start the river, which expects
+ // to find a _meta document
+ // So we have no choice but to index first and send mappings afterwards
+ MapperService mapperService = indexService.mapperService();
+ mapperService.merge(request.type(), new CompressedString(update.toBytes()), true);
+ indexShard.create(create);
+ mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update);
+ } else {
+ mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
+ indexShard.create(create);
+ }
+ } else {
+ indexShard.create(create);
}
- indexShard.create(create);
version = create.version();
created = true;
}
@@ -239,17 +260,36 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
}
@Override
- protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
- IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id());
+ protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) throws IOException {
+ IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
+ IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
IndexRequest request = shardRequest.request;
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates());
+ if (index.parsedDoc().dynamicMappingsUpdate() != null) {
+ if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) {
+ // mappings updates on the _river are not validated synchronously so we can't
+ // assume they are here when indexing on a replica
+ indexService.mapperService().merge(request.type(), new CompressedString(index.parsedDoc().dynamicMappingsUpdate().toBytes()), true);
+ } else {
+ throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + index.parsedDoc().dynamicMappingsUpdate() + "]");
+ }
+ }
indexShard.index(index);
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse,
request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates(), request.autoGeneratedId());
+ if (create.parsedDoc().dynamicMappingsUpdate() != null) {
+ if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) {
+ // mappings updates on the _river are not validated synchronously so we can't
+ // assume they are here when indexing on a replica
+ indexService.mapperService().merge(request.type(), new CompressedString(create.parsedDoc().dynamicMappingsUpdate().toBytes()), true);
+ } else {
+ throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + create.parsedDoc().dynamicMappingsUpdate() + "]");
+ }
+ }
indexShard.create(create);
}
if (request.refresh()) {
diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java
index df99d045177..c5a0fc95efe 100644
--- a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java
+++ b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java
@@ -117,7 +117,7 @@ public abstract class TransportShardReplicationOperationAction shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable;
- protected abstract void shardOperationOnReplica(ReplicaOperationRequest shardRequest);
+ protected abstract void shardOperationOnReplica(ReplicaOperationRequest shardRequest) throws Exception;
protected abstract ShardIterator shards(ClusterState clusterState, InternalRequest request) throws ElasticsearchException;
diff --git a/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java b/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java
index 6c5e92b3799..44727699354 100644
--- a/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java
+++ b/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java
@@ -19,61 +19,31 @@
package org.elasticsearch.cluster.action.index;
-import com.google.common.collect.ImmutableMap;
-
-import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.ActionRequestValidationException;
-import org.elasticsearch.action.ActionResponse;
-import org.elasticsearch.action.IndicesRequest;
-import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.IndicesOptions;
-import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
-import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
-import org.elasticsearch.cluster.ClusterService;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
-import org.elasticsearch.cluster.block.ClusterBlockException;
-import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.cluster.metadata.MetaDataMappingService;
-import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.common.compress.CompressedString;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.IndicesAdminClient;
+import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
-import org.elasticsearch.common.xcontent.ToXContent;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.node.settings.NodeSettingsService;
-import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.transport.TransportService;
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Called by shards in the cluster when their mapping was dynamically updated and it needs to be updated
* in the cluster state meta data (and broadcast to all members).
*/
-public class MappingUpdatedAction extends TransportMasterNodeOperationAction {
+public class MappingUpdatedAction extends AbstractComponent {
public static final String INDICES_MAPPING_DYNAMIC_TIMEOUT = "indices.mapping.dynamic_timeout";
- public static final String ACTION_NAME = "internal:cluster/mapping_updated";
-
- private final MetaDataMappingService metaDataMappingService;
-
- private volatile MasterMappingUpdater masterMappingUpdater;
+ private IndicesAdminClient client;
private volatile TimeValue dynamicMappingUpdateTimeout;
class ApplySettings implements NodeSettingsService.Listener {
@@ -89,44 +59,58 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction