refresh changed mapping in cluster metadata; this will happen when upgrading from 0.15 to 0.16

kimchy 2011-04-22 16:07:18 +03:00
parent 50a475fd02
commit 65e05538f0
4 changed files with 229 additions and 7 deletions
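This commit adds a node-to-master NodeMappingRefreshAction: when a data node applies the cluster state and finds that a freshly parsed mapping no longer matches the source stored in the cluster metadata (which can happen when 0.16 re-parses mappings created under 0.15), it asks the master to re-sync those types. On the master, MetaDataMappingService.refreshMapping() then replaces the stored mapping sources with the parsed ones in a cluster state update.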

View File: ClusterModule.java

@@ -19,10 +19,7 @@
package org.elasticsearch.cluster;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction;
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction;
import org.elasticsearch.cluster.action.index.*;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.*;
import org.elasticsearch.cluster.routing.RoutingService;
@@ -67,6 +64,7 @@ public class ClusterModule extends AbstractModule implements SpawnModules {
bind(NodeIndexCreatedAction.class).asEagerSingleton();
bind(NodeIndexDeletedAction.class).asEagerSingleton();
bind(NodeMappingCreatedAction.class).asEagerSingleton();
bind(NodeMappingRefreshAction.class).asEagerSingleton();
bind(MappingUpdatedAction.class).asEagerSingleton();
}
}
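The module change above is a single new binding alongside the existing Node*Action bindings. A minimal Guice sketch of its effect follows; the module class below is hypothetical, while the binding itself and the handler registration are from this commit. Because the binding is eager, the injector constructs the action while the node starts up, and the action's constructor registers the "cluster/nodeMappingRefresh" transport handler at that point rather than on first injection.

import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction;
import org.elasticsearch.common.inject.AbstractModule;

// Illustrative module only; in the commit the binding lives in ClusterModule above.
public class MappingRefreshWiringSketch extends AbstractModule {
    @Override protected void configure() {
        // Eager singleton: constructed at injector creation, so the action's constructor
        // registers the "cluster/nodeMappingRefresh" handler during node startup.
        bind(NodeMappingRefreshAction.class).asEagerSingleton();
    }
}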

View File: NodeMappingRefreshAction.java (new file)

@@ -0,0 +1,147 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.action.index;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.VoidStreamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.VoidTransportResponseHandler;
import java.io.IOException;
/**
* @author kimchy (Shay Banon)
*/
public class NodeMappingRefreshAction extends AbstractComponent {
private final ThreadPool threadPool;
private final TransportService transportService;
private final ClusterService clusterService;
private final MetaDataMappingService metaDataMappingService;
@Inject public NodeMappingRefreshAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService,
MetaDataMappingService metaDataMappingService) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.clusterService = clusterService;
this.metaDataMappingService = metaDataMappingService;
transportService.registerHandler(NodeMappingRefreshTransportHandler.ACTION, new NodeMappingRefreshTransportHandler());
}
public void nodeMappingRefresh(final NodeMappingRefreshRequest request) throws ElasticSearchException {
DiscoveryNodes nodes = clusterService.state().nodes();
if (nodes.localNodeMaster()) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
innerMappingRefresh(request);
}
});
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(),
NodeMappingRefreshTransportHandler.ACTION, request, VoidTransportResponseHandler.INSTANCE_SAME);
}
}
private void innerMappingRefresh(NodeMappingRefreshRequest request) {
metaDataMappingService.refreshMapping(request.index(), request.types());
}
private class NodeMappingRefreshTransportHandler extends BaseTransportRequestHandler<NodeMappingRefreshRequest> {
static final String ACTION = "cluster/nodeMappingRefresh";
@Override public NodeMappingRefreshRequest newInstance() {
return new NodeMappingRefreshRequest();
}
@Override public void messageReceived(NodeMappingRefreshRequest request, TransportChannel channel) throws Exception {
innerMappingRefresh(request);
channel.sendResponse(VoidStreamable.INSTANCE);
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
}
public static class NodeMappingRefreshRequest implements Streamable {
private String index;
private String[] types;
private String nodeId;
private NodeMappingRefreshRequest() {
}
public NodeMappingRefreshRequest(String index, String[] types, String nodeId) {
this.index = index;
this.types = types;
this.nodeId = nodeId;
}
public String index() {
return index;
}
public String[] types() {
return types;
}
public String nodeId() {
return nodeId;
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(index);
out.writeVInt(types.length);
for (String type : types) {
out.writeUTF(type);
}
out.writeUTF(nodeId);
}
@Override public void readFrom(StreamInput in) throws IOException {
index = in.readUTF();
types = new String[in.readVInt()];
for (int i = 0; i < types.length; i++) {
types[i] = in.readUTF();
}
nodeId = in.readUTF();
}
}
}
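For reference, a minimal caller-side sketch of the new action; the index name "twitter", the type "tweet", and the nodeMappingRefreshAction and clusterState variables assumed to be in scope are illustrative, while the request shape and routing behavior come from the class above (the real caller in this commit is IndicesClusterStateService, in the last file below).

// Hypothetical call site, assuming an injected NodeMappingRefreshAction and the applied ClusterState.
String[] changedTypes = new String[]{"tweet"}; // types whose re-parsed mapping no longer matches the stored source
nodeMappingRefreshAction.nodeMappingRefresh(
        new NodeMappingRefreshAction.NodeMappingRefreshRequest("twitter", changedTypes, clusterState.nodes().localNodeId()));
// On the master this runs the refresh directly on the cached thread pool; on any other node the
// request (index, types, nodeId) is serialized and sent to the master over the
// "cluster/nodeMappingRefresh" transport action.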

View File: MetaDataMappingService.java

@@ -21,9 +21,12 @@ package org.elasticsearch.cluster.metadata;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.inject.Inject;
@@ -34,11 +37,14 @@ import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.MergeMappingException;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InvalidTypeNameException;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@@ -66,6 +72,57 @@ public class MetaDataMappingService extends AbstractComponent {
this.mappingCreatedAction = mappingCreatedAction;
}
/**
* Refreshes mappings if the original and the parsed versions are not the same
*/
public void refreshMapping(final String index, final String... types) {
clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
try {
// first, check if it really needs to be updated
final IndexMetaData indexMetaData = currentState.metaData().index(index);
if (indexMetaData == null) {
// index got deleted on us, ignore...
return currentState;
}
IndexService indexService = indicesService.indexService(index);
if (indexService == null) {
// we need to create the index here, and add the current mapping to it, so we can merge
indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id());
for (String type : types) {
// only add the current relevant mapping (if exists)
if (indexMetaData.mappings().containsKey(type)) {
indexService.mapperService().add(type, indexMetaData.mappings().get(type).source().string());
}
}
}
IndexMetaData.Builder indexMetaDataBuilder = newIndexMetaDataBuilder(indexMetaData);
List<String> updatedTypes = Lists.newArrayList();
for (String type : types) {
DocumentMapper mapper = indexService.mapperService().documentMapper(type);
if (!mapper.mappingSource().equals(indexMetaData.mappings().get(type).source())) {
updatedTypes.add(type);
indexMetaDataBuilder.putMapping(new MappingMetaData(mapper));
}
}
if (updatedTypes.isEmpty()) {
return currentState;
}
logger.warn("[{}] re-syncing mappings with cluster state for types [{}]", index, updatedTypes);
MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData());
builder.put(indexMetaDataBuilder);
return newClusterStateBuilder().state(currentState).metaData(builder).build();
} catch (Exception e) {
logger.warn("failed to dynamically refresh the mapping in cluster_state from shard", e);
return currentState;
}
}
});
}
public void updateMapping(final String index, final String type, final CompressedString mappingSource, final Listener listener) {
clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "]", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
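The decisive check in refreshMapping() above compares serialized mapping sources rather than parsed structures: a mapping stored by 0.15 can yield a different source once 0.16 re-parses it, and only then does the master rewrite the metadata. A condensed sketch of that decision, assuming indexService, indexMetaData, indexMetaDataBuilder, and type are set up as in the update task above:

// Condensed from the cluster state update task above; illustrative only.
DocumentMapper mapper = indexService.mapperService().documentMapper(type);
CompressedString stored = indexMetaData.mappings().get(type).source(); // source recorded in the cluster metadata
if (!mapper.mappingSource().equals(stored)) {
    // the freshly parsed source differs from the stored one, so the master
    // re-puts the parsed version into the index metadata for this type
    indexMetaDataBuilder.putMapping(new MappingMetaData(mapper));
}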

View File: IndicesClusterStateService.java

@@ -27,12 +27,14 @@ import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction;
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction;
import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.compress.CompressedString;
@@ -60,6 +62,7 @@ import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
@@ -89,6 +92,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
private final NodeMappingCreatedAction nodeMappingCreatedAction;
private final NodeMappingRefreshAction nodeMappingRefreshAction;
// a map of mapping types we have seen per index
private final ConcurrentMap<Tuple<String, String>, Boolean> seenMappings = ConcurrentCollections.newConcurrentMap();
@@ -100,7 +105,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
ThreadPool threadPool, RecoveryTarget recoveryTarget, RecoverySource recoverySource,
ShardStateAction shardStateAction,
NodeIndexCreatedAction nodeIndexCreatedAction, NodeIndexDeletedAction nodeIndexDeletedAction,
NodeMappingCreatedAction nodeMappingCreatedAction) {
NodeMappingCreatedAction nodeMappingCreatedAction, NodeMappingRefreshAction nodeMappingRefreshAction) {
super(settings);
this.indicesService = indicesService;
this.clusterService = clusterService;
@@ -111,6 +116,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
this.nodeIndexCreatedAction = nodeIndexCreatedAction;
this.nodeIndexDeletedAction = nodeIndexDeletedAction;
this.nodeMappingCreatedAction = nodeMappingCreatedAction;
this.nodeMappingRefreshAction = nodeMappingRefreshAction;
}
@Override protected void doStart() throws ElasticSearchException {
@@ -277,6 +283,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// we only create / update here
continue;
}
List<String> typesToRefresh = null;
String index = indexMetaData.index();
IndexService indexService = indicesService.indexServiceSafe(index);
MapperService mapperService = indexService.mapperService();
@@ -292,7 +299,16 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (mappingType.equals(MapperService.DEFAULT_MAPPING)) { // we processed _default_ first
continue;
}
processMapping(event, index, mapperService, mappingType, mappingSource);
boolean requireRefresh = processMapping(event, index, mapperService, mappingType, mappingSource);
if (requireRefresh) {
if (typesToRefresh == null) {
typesToRefresh = Lists.newArrayList();
}
typesToRefresh.add(mappingType);
}
}
if (typesToRefresh != null) {
nodeMappingRefreshAction.nodeMappingRefresh(new NodeMappingRefreshAction.NodeMappingRefreshRequest(index, typesToRefresh.toArray(new String[typesToRefresh.size()]), event.state().nodes().localNodeId()));
}
// go over and remove mappings
for (DocumentMapper documentMapper : mapperService) {
@@ -305,11 +321,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
}
private void processMapping(ClusterChangedEvent event, String index, MapperService mapperService, String mappingType, CompressedString mappingSource) {
private boolean processMapping(ClusterChangedEvent event, String index, MapperService mapperService, String mappingType, CompressedString mappingSource) {
if (!seenMappings.containsKey(new Tuple<String, String>(index, mappingType))) {
seenMappings.put(new Tuple<String, String>(index, mappingType), true);
}
boolean requiresRefresh = false;
try {
if (!mapperService.hasMapping(mappingType)) {
if (logger.isDebugEnabled()) {
@@ -319,6 +336,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (!mapperService.documentMapper(mappingType).mappingSource().equals(mappingSource)) {
// this might happen when upgrading from 0.15 to 0.16
logger.debug("[{}] parsed mapping [{}], and got different sources\noriginal:\n{}\nparsed:\n{}", index, mappingType, mappingSource, mapperService.documentMapper(mappingType).mappingSource());
requiresRefresh = true;
}
nodeMappingCreatedAction.nodeMappingCreated(new NodeMappingCreatedAction.NodeMappingCreatedResponse(index, mappingType, event.state().nodes().localNodeId()));
} else {
@@ -330,6 +348,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
mapperService.add(mappingType, mappingSource.string());
if (!mapperService.documentMapper(mappingType).mappingSource().equals(mappingSource)) {
requiresRefresh = true;
// this might happen when upgrading from 0.15 to 0.16
logger.debug("[{}] parsed mapping [{}], and got different sources\noriginal:\n{}\nparsed:\n{}", index, mappingType, mappingSource, mapperService.documentMapper(mappingType).mappingSource());
}
@@ -339,6 +358,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
} catch (Exception e) {
logger.warn("[{}] failed to add mapping [{}], source [{}]", e, index, mappingType, mappingSource);
}
return requiresRefresh;
}
private void applyNewOrUpdatedShards(final ClusterChangedEvent event) throws ElasticSearchException {