From 65e05538f0ddae8549469ec0b59ee7611b39fdb0 Mon Sep 17 00:00:00 2001 From: kimchy Date: Fri, 22 Apr 2011 16:07:18 +0300 Subject: [PATCH] refresh changed mapping in cluster metadata, this will happen when upgrading from 0.15 to 0.16 --- .../elasticsearch/cluster/ClusterModule.java | 6 +- .../index/NodeMappingRefreshAction.java | 147 ++++++++++++++++++ .../metadata/MetaDataMappingService.java | 57 +++++++ .../cluster/IndicesClusterStateService.java | 26 +++- 4 files changed, 229 insertions(+), 7 deletions(-) create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterModule.java index c8d43ac34c9..1f05b276d58 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -19,10 +19,7 @@ package org.elasticsearch.cluster; -import org.elasticsearch.cluster.action.index.MappingUpdatedAction; -import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction; -import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction; -import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction; +import org.elasticsearch.cluster.action.index.*; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.*; import org.elasticsearch.cluster.routing.RoutingService; @@ -67,6 +64,7 @@ public class ClusterModule extends AbstractModule implements SpawnModules { bind(NodeIndexCreatedAction.class).asEagerSingleton(); bind(NodeIndexDeletedAction.class).asEagerSingleton(); bind(NodeMappingCreatedAction.class).asEagerSingleton(); + bind(NodeMappingRefreshAction.class).asEagerSingleton(); bind(MappingUpdatedAction.class).asEagerSingleton(); } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java new file mode 100644 index 00000000000..aaa591d5838 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java @@ -0,0 +1,147 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.action.index; + +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.metadata.MetaDataMappingService; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.VoidStreamable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.BaseTransportRequestHandler; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.VoidTransportResponseHandler; + +import java.io.IOException; + +/** + * @author kimchy (Shay Banon) + */ +public class NodeMappingRefreshAction extends AbstractComponent { + + private final ThreadPool threadPool; + + private final TransportService transportService; + + private final ClusterService clusterService; + + private final MetaDataMappingService metaDataMappingService; + + @Inject public NodeMappingRefreshAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, + MetaDataMappingService metaDataMappingService) { + super(settings); + this.threadPool = threadPool; + this.transportService = transportService; + this.clusterService = clusterService; + this.metaDataMappingService = metaDataMappingService; + transportService.registerHandler(NodeMappingRefreshTransportHandler.ACTION, new NodeMappingRefreshTransportHandler()); + } + + public void nodeMappingRefresh(final NodeMappingRefreshRequest request) throws ElasticSearchException { + DiscoveryNodes nodes = clusterService.state().nodes(); + if (nodes.localNodeMaster()) { + threadPool.cached().execute(new Runnable() { + @Override public void run() { + innerMappingRefresh(request); + } + }); + } else { + transportService.sendRequest(clusterService.state().nodes().masterNode(), + NodeMappingRefreshTransportHandler.ACTION, request, VoidTransportResponseHandler.INSTANCE_SAME); + } + } + + private void innerMappingRefresh(NodeMappingRefreshRequest request) { + metaDataMappingService.refreshMapping(request.index(), request.types()); + } + + private class NodeMappingRefreshTransportHandler extends BaseTransportRequestHandler { + + static final String ACTION = "cluster/nodeMappingRefresh"; + + @Override public NodeMappingRefreshRequest newInstance() { + return new NodeMappingRefreshRequest(); + } + + @Override public void messageReceived(NodeMappingRefreshRequest request, TransportChannel channel) throws Exception { + innerMappingRefresh(request); + channel.sendResponse(VoidStreamable.INSTANCE); + } + + @Override public String executor() { + return ThreadPool.Names.SAME; + } + } + + public static class NodeMappingRefreshRequest implements Streamable { + + private String index; + + private String[] types; + + private String nodeId; + + private NodeMappingRefreshRequest() { + } + + public NodeMappingRefreshRequest(String index, String[] types, String nodeId) { + this.index = index; + this.types = types; + this.nodeId = nodeId; + } + + public String index() { + return index; + } + + public String[] types() { + return types; + } + + public String nodeId() { + return nodeId; + } + + @Override public void writeTo(StreamOutput out) throws IOException { + out.writeUTF(index); + out.writeVInt(types.length); + for (String type : types) { + out.writeUTF(type); + } + out.writeUTF(nodeId); + } + + @Override public void readFrom(StreamInput in) throws IOException { + index = in.readUTF(); + types = new String[in.readVInt()]; + for (int i = 0; i < types.length; i++) { + types[i] = in.readUTF(); + } + nodeId = in.readUTF(); + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java index 8c12e0ea6ee..803902f9dcd 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java @@ -21,9 +21,12 @@ package org.elasticsearch.cluster.metadata; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction; import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.common.collect.Lists; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.common.inject.Inject; @@ -34,11 +37,14 @@ import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MergeMappingException; import org.elasticsearch.index.service.IndexService; +import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InvalidTypeNameException; import java.io.IOException; +import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -66,6 +72,57 @@ public class MetaDataMappingService extends AbstractComponent { this.mappingCreatedAction = mappingCreatedAction; } + /** + * Refreshes mappings if they are not the same between original and parsed version + */ + public void refreshMapping(final String index, final String... types) { + clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", new ClusterStateUpdateTask() { + @Override public ClusterState execute(ClusterState currentState) { + try { + // first, check if it really needs to be updated + final IndexMetaData indexMetaData = currentState.metaData().index(index); + if (indexMetaData == null) { + // index got delete on us, ignore... + return currentState; + } + + IndexService indexService = indicesService.indexService(index); + if (indexService == null) { + // we need to create the index here, and add the current mapping to it, so we can merge + indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id()); + for (String type : types) { + // only add the current relevant mapping (if exists) + if (indexMetaData.mappings().containsKey(type)) { + indexService.mapperService().add(type, indexMetaData.mappings().get(type).source().string()); + } + } + } + IndexMetaData.Builder indexMetaDataBuilder = newIndexMetaDataBuilder(indexMetaData); + List updatedTypes = Lists.newArrayList(); + for (String type : types) { + DocumentMapper mapper = indexService.mapperService().documentMapper(type); + if (!mapper.mappingSource().equals(indexMetaData.mappings().get(type).source())) { + updatedTypes.add(type); + indexMetaDataBuilder.putMapping(new MappingMetaData(mapper)); + } + } + + if (updatedTypes.isEmpty()) { + return currentState; + } + + logger.warn("[{}] re-syncing mappings with cluster state for types [{}]", index, updatedTypes); + MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData()); + builder.put(indexMetaDataBuilder); + return newClusterStateBuilder().state(currentState).metaData(builder).build(); + } catch (Exception e) { + logger.warn("failed to dynamically refresh the mapping in cluster_state from shard", e); + return currentState; + } + } + }); + } + public void updateMapping(final String index, final String type, final CompressedString mappingSource, final Listener listener) { clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "]", new ProcessedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 64222145047..58f7d0d6bf4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -27,12 +27,14 @@ import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction; import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction; import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction; +import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.compress.CompressedString; @@ -60,6 +62,7 @@ import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; +import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentMap; @@ -89,6 +92,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent, Boolean> seenMappings = ConcurrentCollections.newConcurrentMap(); @@ -100,7 +105,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent typesToRefresh = null; String index = indexMetaData.index(); IndexService indexService = indicesService.indexServiceSafe(index); MapperService mapperService = indexService.mapperService(); @@ -292,7 +299,16 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent(index, mappingType))) { seenMappings.put(new Tuple(index, mappingType), true); } + boolean requiresRefresh = false; try { if (!mapperService.hasMapping(mappingType)) { if (logger.isDebugEnabled()) { @@ -319,6 +336,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent