Mappings: simplify dynamic mappings updates.

While dynamic mappings updates are using the same code path as updates from the
API when applied on a data node since #10593, they were still using a different
code path on the master node. This commit makes dynamic updates processed the
same way as updates from the API, which also seems to do a better way at
acknowledgements (I could not reproduce the ConcurrentDynamicTemplateTests
failure anymore). It also adds more checks, like for instance that indexing on
replicas should not trigger dynamic mapping updates since they should have been
handled on the primary before.

Close #10720
This commit is contained in:
Adrien Grand 2015-04-21 15:30:35 +02:00
parent dbeb4aaacf
commit c6cdf7781b
11 changed files with 201 additions and 423 deletions

View File

@ -52,6 +52,7 @@ import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
@ -352,23 +353,6 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
}
private void applyMappingUpdate(IndexService indexService, String type, Mapping update) throws Throwable {
// HACK: Rivers seem to have something specific that triggers potential
// deadlocks when doing concurrent indexing. So for now they keep the
// old behaviour of updating mappings locally first and then
// asynchronously notifying the master
// this can go away when rivers are removed
final String indexName = indexService.index().name();
final String indexUUID = indexService.indexUUID();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
indexService.mapperService().merge(type, new CompressedString(update.toBytes()), true);
mappingUpdatedAction.updateMappingOnMaster(indexName, indexUUID, type, update, null);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexUUID, type, update);
indexService.mapperService().merge(type, new CompressedString(update.toBytes()), true);
}
}
private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, ClusterState clusterState,
IndexShard indexShard, IndexService indexService, boolean processed) throws Throwable {
@ -392,20 +376,54 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
Engine.IndexingOperation op;
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates() || indexRequest.canHaveDuplicates());
if (index.parsedDoc().dynamicMappingsUpdate() != null) {
applyMappingUpdate(indexService, indexRequest.type(), index.parsedDoc().dynamicMappingsUpdate());
Mapping update = index.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
final String indexName = indexService.index().name();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
// With rivers, we have a chicken and egg problem if indexing
// the _meta document triggers a mapping update. Because we would
// like to validate the mapping update first, but on the other
// hand putting the mapping would start the river, which expects
// to find a _meta document
// So we have no choice but to index first and send mappings afterwards
MapperService mapperService = indexService.mapperService();
mapperService.merge(indexRequest.type(), new CompressedString(update.toBytes()), true);
indexShard.index(index);
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, indexRequest.type(), update);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexRequest.type(), update);
indexShard.index(index);
}
} else {
indexShard.index(index);
}
indexShard.index(index);
version = index.version();
op = index;
created = index.created();
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY,
request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId());
if (create.parsedDoc().dynamicMappingsUpdate() != null) {
applyMappingUpdate(indexService, indexRequest.type(), create.parsedDoc().dynamicMappingsUpdate());
Mapping update = create.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
final String indexName = indexService.index().name();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
// With rivers, we have a chicken and egg problem if indexing
// the _meta document triggers a mapping update. Because we would
// like to validate the mapping update first, but on the other
// hand putting the mapping would start the river, which expects
// to find a _meta document
// So we have no choice but to index first and send mappings afterwards
MapperService mapperService = indexService.mapperService();
mapperService.merge(indexRequest.type(), new CompressedString(update.toBytes()), true);
indexShard.create(create);
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, indexRequest.type(), update);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexRequest.type(), update);
indexShard.create(create);
}
} else {
indexShard.create(create);
}
indexShard.create(create);
version = create.version();
op = create;
created = true;
@ -528,8 +546,9 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
@Override
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id());
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) throws Exception {
IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
final BulkShardRequest request = shardRequest.request;
for (int i = 0; i < request.items().length; i++) {
BulkItemRequest item = request.items()[i];
@ -544,11 +563,29 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates());
if (index.parsedDoc().dynamicMappingsUpdate() != null) {
if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) {
// mappings updates on the _river are not validated synchronously so we can't
// assume they are here when indexing on a replica
indexService.mapperService().merge(indexRequest.type(), new CompressedString(index.parsedDoc().dynamicMappingsUpdate().toBytes()), true);
} else {
throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + index.parsedDoc().dynamicMappingsUpdate() + "]");
}
}
indexShard.index(index);
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse,
indexRequest.version(), indexRequest.versionType(),
Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId());
if (create.parsedDoc().dynamicMappingsUpdate() != null) {
if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) {
// mappings updates on the _river are not validated synchronously so we can't
// assume they are here when indexing on a replica
indexService.mapperService().merge(indexRequest.type(), new CompressedString(create.parsedDoc().dynamicMappingsUpdate().toBytes()), true);
} else {
throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + create.parsedDoc().dynamicMappingsUpdate() + "]");
}
}
indexShard.create(create);
}
} catch (Throwable e) {

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.index;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
@ -42,6 +43,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
@ -51,6 +53,8 @@ import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
/**
* Performs the index operation.
* <p/>
@ -167,23 +171,6 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
.indexShards(clusterService.state(), request.concreteIndex(), request.request().type(), request.request().id(), request.request().routing());
}
private void applyMappingUpdate(IndexService indexService, String type, Mapping update) throws Throwable {
// HACK: Rivers seem to have something specific that triggers potential
// deadlocks when doing concurrent indexing. So for now they keep the
// old behaviour of updating mappings locally first and then
// asynchronously notifying the master
// this can go away when rivers are removed
final String indexName = indexService.index().name();
final String indexUUID = indexService.indexUUID();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
indexService.mapperService().merge(type, new CompressedString(update.toBytes()), true);
mappingUpdatedAction.updateMappingOnMaster(indexName, indexUUID, type, update, null);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexUUID, type, update);
indexService.mapperService().merge(type, new CompressedString(update.toBytes()), true);
}
}
@Override
protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
final IndexRequest request = shardRequest.request;
@ -206,19 +193,53 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
if (index.parsedDoc().dynamicMappingsUpdate() != null) {
applyMappingUpdate(indexService, request.type(), index.parsedDoc().dynamicMappingsUpdate());
Mapping update = index.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
final String indexName = indexService.index().name();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
// With rivers, we have a chicken and egg problem if indexing
// the _meta document triggers a mapping update. Because we would
// like to validate the mapping update first, but on the other
// hand putting the mapping would start the river, which expects
// to find a _meta document
// So we have no choice but to index first and send mappings afterwards
MapperService mapperService = indexService.mapperService();
mapperService.merge(request.type(), new CompressedString(update.toBytes()), true);
indexShard.index(index);
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
indexShard.index(index);
}
} else {
indexShard.index(index);
}
indexShard.index(index);
version = index.version();
created = index.created();
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse,
request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());
if (create.parsedDoc().dynamicMappingsUpdate() != null) {
applyMappingUpdate(indexService, request.type(), create.parsedDoc().dynamicMappingsUpdate());
Mapping update = create.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
final String indexName = indexService.index().name();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
// With rivers, we have a chicken and egg problem if indexing
// the _meta document triggers a mapping update. Because we would
// like to validate the mapping update first, but on the other
// hand putting the mapping would start the river, which expects
// to find a _meta document
// So we have no choice but to index first and send mappings afterwards
MapperService mapperService = indexService.mapperService();
mapperService.merge(request.type(), new CompressedString(update.toBytes()), true);
indexShard.create(create);
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
indexShard.create(create);
}
} else {
indexShard.create(create);
}
indexShard.create(create);
version = create.version();
created = true;
}
@ -239,17 +260,36 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
}
@Override
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id());
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) throws IOException {
IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
IndexRequest request = shardRequest.request;
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates());
if (index.parsedDoc().dynamicMappingsUpdate() != null) {
if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) {
// mappings updates on the _river are not validated synchronously so we can't
// assume they are here when indexing on a replica
indexService.mapperService().merge(request.type(), new CompressedString(index.parsedDoc().dynamicMappingsUpdate().toBytes()), true);
} else {
throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + index.parsedDoc().dynamicMappingsUpdate() + "]");
}
}
indexShard.index(index);
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse,
request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates(), request.autoGeneratedId());
if (create.parsedDoc().dynamicMappingsUpdate() != null) {
if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) {
// mappings updates on the _river are not validated synchronously so we can't
// assume they are here when indexing on a replica
indexService.mapperService().merge(request.type(), new CompressedString(create.parsedDoc().dynamicMappingsUpdate().toBytes()), true);
} else {
throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + create.parsedDoc().dynamicMappingsUpdate() + "]");
}
}
indexShard.create(create);
}
if (request.refresh()) {

View File

@ -117,7 +117,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
*/
protected abstract Tuple<Response, ReplicaRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable;
protected abstract void shardOperationOnReplica(ReplicaOperationRequest shardRequest);
protected abstract void shardOperationOnReplica(ReplicaOperationRequest shardRequest) throws Exception;
protected abstract ShardIterator shards(ClusterState clusterState, InternalRequest request) throws ElasticsearchException;

View File

@ -19,61 +19,31 @@
package org.elasticsearch.cluster.action.index;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Called by shards in the cluster when their mapping was dynamically updated and it needs to be updated
* in the cluster state meta data (and broadcast to all members).
*/
public class MappingUpdatedAction extends TransportMasterNodeOperationAction<MappingUpdatedAction.MappingUpdatedRequest, MappingUpdatedAction.MappingUpdatedResponse> {
public class MappingUpdatedAction extends AbstractComponent {
public static final String INDICES_MAPPING_DYNAMIC_TIMEOUT = "indices.mapping.dynamic_timeout";
public static final String ACTION_NAME = "internal:cluster/mapping_updated";
private final MetaDataMappingService metaDataMappingService;
private volatile MasterMappingUpdater masterMappingUpdater;
private IndicesAdminClient client;
private volatile TimeValue dynamicMappingUpdateTimeout;
class ApplySettings implements NodeSettingsService.Listener {
@ -89,44 +59,58 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
}
@Inject
public MappingUpdatedAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
MetaDataMappingService metaDataMappingService, NodeSettingsService nodeSettingsService, ActionFilters actionFilters) {
super(settings, ACTION_NAME, transportService, clusterService, threadPool, actionFilters);
this.metaDataMappingService = metaDataMappingService;
public MappingUpdatedAction(Settings settings, NodeSettingsService nodeSettingsService) {
super(settings);
this.dynamicMappingUpdateTimeout = settings.getAsTime(INDICES_MAPPING_DYNAMIC_TIMEOUT, TimeValue.timeValueSeconds(30));
nodeSettingsService.addListener(new ApplySettings());
}
public void start() {
this.masterMappingUpdater = new MasterMappingUpdater(EsExecutors.threadName(settings, "master_mapping_updater"));
this.masterMappingUpdater.start();
public void setClient(Client client) {
this.client = client.admin().indices();
}
public void stop() {
this.masterMappingUpdater.close();
this.masterMappingUpdater = null;
}
public void updateMappingOnMaster(String index, String indexUUID, String type, Mapping mappingUpdate, MappingUpdateListener listener) {
private PutMappingRequestBuilder updateMappingRequest(String index, String type, Mapping mappingUpdate, final TimeValue timeout) {
if (type.equals(MapperService.DEFAULT_MAPPING)) {
throw new ElasticsearchIllegalArgumentException("_default_ mapping should not be updated");
}
try {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
mappingUpdate.toXContent(builder, new ToXContent.MapParams(ImmutableMap.<String, String>of()));
final CompressedString mappingSource = new CompressedString(builder.endObject().bytes());
masterMappingUpdater.add(new MappingChange(index, indexUUID, type, mappingSource, listener));
} catch (IOException bogus) {
throw new AssertionError("Cannot happen", bogus);
return client.preparePutMapping(index).setType(type).setSource(mappingUpdate.toString())
.setMasterNodeTimeout(timeout).setTimeout(timeout);
}
public void updateMappingOnMaster(String index, String type, Mapping mappingUpdate, final TimeValue timeout, final MappingUpdateListener listener) {
final PutMappingRequestBuilder request = updateMappingRequest(index, type, mappingUpdate, timeout);
if (listener == null) {
request.execute();
} else {
final ActionListener<PutMappingResponse> actionListener = new ActionListener<PutMappingResponse>() {
@Override
public void onResponse(PutMappingResponse response) {
if (response.isAcknowledged()) {
listener.onMappingUpdate();
} else {
listener.onFailure(new TimeoutException("Failed to acknowledge the mapping response within [" + timeout + "]"));
}
}
@Override
public void onFailure(Throwable e) {
listener.onFailure(e);
}
};
request.execute(actionListener);
}
}
public void updateMappingOnMasterAsynchronously(String index, String type, Mapping mappingUpdate) throws Throwable {
updateMappingOnMaster(index, type, mappingUpdate, dynamicMappingUpdateTimeout, null);
}
/**
* Same as {@link #updateMappingOnMasterSynchronously(String, String, String, Mapping, TimeValue)}
* using the default timeout.
*/
public void updateMappingOnMasterSynchronously(String index, String indexUUID, String type, Mapping mappingUpdate) throws Throwable {
updateMappingOnMasterSynchronously(index, indexUUID, type, mappingUpdate, dynamicMappingUpdateTimeout);
public void updateMappingOnMasterSynchronously(String index, String type, Mapping mappingUpdate) throws Throwable {
updateMappingOnMasterSynchronously(index, type, mappingUpdate, dynamicMappingUpdateTimeout);
}
/**
@ -134,179 +118,9 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
* {@code timeout}. When this method returns successfully mappings have
* been applied to the master node and propagated to data nodes.
*/
public void updateMappingOnMasterSynchronously(String index, String indexUUID, String type, Mapping mappingUpdate, TimeValue timeout) throws Throwable {
final CountDownLatch latch = new CountDownLatch(1);
final Throwable[] cause = new Throwable[1];
final MappingUpdateListener listener = new MappingUpdateListener() {
@Override
public void onMappingUpdate() {
latch.countDown();
}
@Override
public void onFailure(Throwable t) {
cause[0] = t;
latch.countDown();
}
};
updateMappingOnMaster(index, indexUUID, type, mappingUpdate, listener);
if (!latch.await(timeout.getMillis(), TimeUnit.MILLISECONDS)) {
throw new TimeoutException("Time out while waiting for the master node to validate a mapping update for type [" + type + "]");
}
if (cause[0] != null) {
throw cause[0];
}
}
@Override
protected ClusterBlockException checkBlock(MappingUpdatedRequest request, ClusterState state) {
// internal call by other nodes, no need to check for blocks
return null;
}
@Override
protected String executor() {
// we go async right away
return ThreadPool.Names.SAME;
}
@Override
protected MappingUpdatedRequest newRequest() {
return new MappingUpdatedRequest();
}
@Override
protected MappingUpdatedResponse newResponse() {
return new MappingUpdatedResponse();
}
@Override
protected void masterOperation(final MappingUpdatedRequest request, final ClusterState state, final ActionListener<MappingUpdatedResponse> listener) throws ElasticsearchException {
metaDataMappingService.updateMapping(request.index(), request.indexUUID(), request.type(), request.mappingSource(), request.nodeId, new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new MappingUpdatedResponse());
}
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] update-mapping [{}] failed to dynamically update the mapping in cluster_state from shard", t, request.index(), request.type());
listener.onFailure(t);
}
});
}
public static class MappingUpdatedResponse extends ActionResponse {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}
public static class MappingUpdatedRequest extends MasterNodeOperationRequest<MappingUpdatedRequest> implements IndicesRequest {
private String index;
private String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE;
private String type;
private CompressedString mappingSource;
private String nodeId = null; // null means not set
MappingUpdatedRequest() {
}
public MappingUpdatedRequest(String index, String indexUUID, String type, CompressedString mappingSource, String nodeId) {
this.index = index;
this.indexUUID = indexUUID;
this.type = type;
this.mappingSource = mappingSource;
this.nodeId = nodeId;
}
public String index() {
return index;
}
@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
}
@Override
public String[] indices() {
return new String[]{index};
}
public String indexUUID() {
return indexUUID;
}
public String type() {
return type;
}
public CompressedString mappingSource() {
return mappingSource;
}
/**
* Returns null for not set.
*/
public String nodeId() {
return this.nodeId;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
index = in.readString();
type = in.readString();
mappingSource = CompressedString.readCompressedString(in);
indexUUID = in.readString();
nodeId = in.readOptionalString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(index);
out.writeString(type);
mappingSource.writeTo(out);
out.writeString(indexUUID);
out.writeOptionalString(nodeId);
}
@Override
public String toString() {
return "index [" + index + "], indexUUID [" + indexUUID + "], type [" + type + "] and source [" + mappingSource + "]";
}
}
private static class MappingChange {
public final String index;
public final String indexUUID;
public final String type;
public final CompressedString mappingSource;
public final MappingUpdateListener listener;
MappingChange(String index, String indexUUID, String type, CompressedString mappingSource, MappingUpdateListener listener) {
this.index = index;
this.indexUUID = indexUUID;
this.type = type;
this.mappingSource = mappingSource;
this.listener = listener;
public void updateMappingOnMasterSynchronously(String index, String type, Mapping mappingUpdate, TimeValue timeout) throws Throwable {
if (updateMappingRequest(index, type, mappingUpdate, timeout).get().isAcknowledged() == false) {
throw new TimeoutException("Failed to acknowledge mapping update within [" + timeout + "]");
}
}
@ -319,90 +133,4 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
void onFailure(Throwable t);
}
/**
* The master mapping updater removes the overhead of refreshing the mapping (refreshSource) on the
* indexing thread.
* <p/>
* It also allows to reduce multiple mapping updates on the same index(UUID) and type into one update
* (refreshSource + sending to master), which allows to offload the number of times mappings are updated
* and sent to master for heavy single index requests that each introduce a new mapping, and when
* multiple shards exists on the same nodes, allowing to work on the index level in this case.
*/
private class MasterMappingUpdater extends Thread {
private volatile boolean running = true;
private final BlockingQueue<MappingChange> queue = ConcurrentCollections.newBlockingQueue();
public MasterMappingUpdater(String name) {
super(name);
}
public void add(MappingChange change) {
queue.add(change);
}
public void close() {
running = false;
this.interrupt();
}
@Override
public void run() {
while (running) {
MappingUpdateListener listener = null;
try {
final MappingChange change = queue.poll(10, TimeUnit.MINUTES);
if (change == null) {
continue;
}
listener = change.listener;
final MappingUpdatedAction.MappingUpdatedRequest mappingRequest;
try {
DiscoveryNode node = clusterService.localNode();
mappingRequest = new MappingUpdatedAction.MappingUpdatedRequest(
change.index, change.indexUUID, change.type, change.mappingSource, node != null ? node.id() : null
);
} catch (Throwable t) {
logger.warn("Failed to update master on updated mapping for index [" + change.index + "], type [" + change.type + "]", t);
if (change.listener != null) {
change.listener.onFailure(t);
}
continue;
}
logger.trace("sending mapping updated to master: {}", mappingRequest);
execute(mappingRequest, new ActionListener<MappingUpdatedAction.MappingUpdatedResponse>() {
@Override
public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) {
logger.debug("successfully updated master with mapping update: {}", mappingRequest);
if (change.listener != null) {
change.listener.onMappingUpdate();
}
}
@Override
public void onFailure(Throwable e) {
logger.warn("failed to update master on updated mapping for {}", e, mappingRequest);
if (change.listener != null) {
change.listener.onFailure(e);
}
}
});
} catch (Throwable t) {
if (listener != null) {
// even if the failure is expected, eg. if we got interrupted,
// we need to notify the listener as there might be a latch
// waiting for it to be called
listener.onFailure(t);
}
if (t instanceof InterruptedException && !running) {
// all is well, we are shutting down
} else {
logger.warn("failed to process mapping update", t);
}
}
}
}
}
}

View File

@ -331,47 +331,6 @@ public class MetaDataMappingService extends AbstractComponent {
});
}
public void updateMapping(final String index, final String indexUUID, final String type, final CompressedString mappingSource, final String nodeId, final ActionListener<ClusterStateUpdateResponse> listener) {
final long insertOrder;
synchronized (refreshOrUpdateMutex) {
insertOrder = ++refreshOrUpdateInsertOrder;
refreshOrUpdateQueue.add(new UpdateTask(index, indexUUID, type, mappingSource, nodeId, listener));
}
clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "] / node [" + nodeId + "]", Priority.HIGH, new ProcessedClusterStateUpdateTask() {
private volatile List<MappingTask> allTasks;
@Override
public void onFailure(String source, Throwable t) {
listener.onFailure(t);
}
@Override
public ClusterState execute(final ClusterState currentState) throws Exception {
Tuple<ClusterState, List<MappingTask>> tuple = executeRefreshOrUpdate(currentState, insertOrder);
this.allTasks = tuple.v2();
return tuple.v1();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (allTasks == null) {
return;
}
for (Object task : allTasks) {
if (task instanceof UpdateTask) {
UpdateTask uTask = (UpdateTask) task;
ClusterStateUpdateResponse response = new ClusterStateUpdateResponse(true);
try {
uTask.listener.onResponse(response);
} catch (Throwable t) {
logger.debug("failed to ping back on response of mapping processing for task [{}]", t, uTask.listener);
}
}
}
}
});
}
public void putMapping(final PutMappingClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
clusterService.submitStateUpdateTask("put-mapping [" + request.type() + "]", Priority.HIGH, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {

View File

@ -165,7 +165,7 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl
private void validateMappingUpdate(final String type, Mapping update) {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> error = new AtomicReference<>();
mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), indexService.indexUUID(), type, update, new MappingUpdatedAction.MappingUpdateListener() {
mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), type, update, waitForMappingUpdatePostRecovery, new MappingUpdatedAction.MappingUpdateListener() {
@Override
public void onMappingUpdate() {
latch.countDown();

View File

@ -20,7 +20,12 @@
package org.elasticsearch.index.termvectors;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.*;
import org.apache.lucene.index.Fields;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.MultiFields;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.memory.MemoryIndex;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.termvectors.TermVectorsFilter;
@ -40,18 +45,30 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.get.GetField;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.core.StringFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.dfs.AggregatedDfs;
import java.io.IOException;
import java.util.*;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import static org.elasticsearch.index.mapper.SourceToParse.source;
@ -285,7 +302,6 @@ public class ShardTermVectorsService extends AbstractIndexShardComponent {
private ParsedDocument parseDocument(String index, String type, BytesReference doc) throws Throwable {
MapperService mapperService = indexShard.mapperService();
IndexService indexService = indexShard.indexService();
// TODO: make parsing not dynamically create fields not in the original mapping
Tuple<DocumentMapper, Mapping> docMapper = mapperService.documentMapperWithAutoCreate(type);
@ -294,7 +310,7 @@ public class ShardTermVectorsService extends AbstractIndexShardComponent {
parsedDocument.addDynamicMappingsUpdate(docMapper.v2());
}
if (parsedDocument.dynamicMappingsUpdate() != null) {
mappingUpdatedAction.updateMappingOnMasterSynchronously(index, indexService.indexUUID(), type, parsedDocument.dynamicMappingsUpdate());
mappingUpdatedAction.updateMappingOnMasterSynchronously(index, type, parsedDocument.dynamicMappingsUpdate());
}
return parsedDocument;
}

View File

@ -567,7 +567,7 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler {
}
};
for (DocumentMapper documentMapper : documentMappersToUpdate) {
mappingUpdatedAction.updateMappingOnMaster(indexService.index().getName(), indexService.indexUUID(), documentMapper.type(), documentMapper.mapping(), listener);
mappingUpdatedAction.updateMappingOnMaster(indexService.index().getName(), documentMapper.type(), documentMapper.mapping(), recoverySettings.internalActionTimeout(), listener);
}
cancellableThreads.execute(new Interruptable() {
@Override

View File

@ -242,7 +242,7 @@ public class Node implements Releasable {
injector.getInstance(plugin).start();
}
injector.getInstance(MappingUpdatedAction.class).start();
injector.getInstance(MappingUpdatedAction.class).setClient(client);
injector.getInstance(IndicesService.class).start();
injector.getInstance(IndexingMemoryController.class).start();
injector.getInstance(IndicesClusterStateService.class).start();
@ -285,7 +285,6 @@ public class Node implements Releasable {
injector.getInstance(HttpServer.class).stop();
}
injector.getInstance(MappingUpdatedAction.class).stop();
injector.getInstance(RiversManager.class).stop();
injector.getInstance(SnapshotsService.class).stop();

View File

@ -287,7 +287,7 @@ public class PercolatorService extends AbstractComponent {
doc.addDynamicMappingsUpdate(docMapper.v2());
}
if (doc.dynamicMappingsUpdate() != null) {
mappingUpdatedAction.updateMappingOnMasterSynchronously(request.shardId().getIndex(), documentIndexService.indexUUID(), request.documentType(), doc.dynamicMappingsUpdate());
mappingUpdatedAction.updateMappingOnMasterSynchronously(request.shardId().getIndex(), request.documentType(), doc.dynamicMappingsUpdate());
}
// the document parsing exists the "doc" object, so we need to set the new current field.
currentFieldName = parser.currentName();

View File

@ -46,7 +46,6 @@ public class ConcurrentDynamicTemplateTests extends ElasticsearchIntegrationTest
private final String mappingType = "test-mapping";
@Test // see #3544
@AwaitsFix(bugUrl = "adrien is looking into this")
public void testConcurrentDynamicMapping() throws Exception {
final String fieldName = "field";
final String mapping = "{ \"" + mappingType + "\": {" +