diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 1fa0161655a..500a9a6ec56 100644 --- a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -40,6 +40,7 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.common.bytes.BytesReference; @@ -62,7 +63,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; -import java.io.IOException; import java.util.Map; import java.util.Set; @@ -569,7 +569,10 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation } documentMapper.refreshSource(); - mappingUpdatedAction.execute(new MappingUpdatedAction.MappingUpdatedRequest(index, type, documentMapper.mappingSource()), new ActionListener() { + IndexMetaData metaData = clusterService.state().metaData().index(index); + + final MappingUpdatedAction.MappingUpdatedRequest request = new MappingUpdatedAction.MappingUpdatedRequest(index, metaData.uuid(), type, documentMapper.mappingSource()); + mappingUpdatedAction.execute(request, new ActionListener() { @Override public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) { // all is well @@ -577,11 +580,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation @Override public void onFailure(Throwable e) { - try { - logger.warn("failed to update master on updated mapping for index [{}], type [{}] and source [{}]", e, index, type, documentMapper.mappingSource().string()); - } catch (IOException e1) { - // ignore - } + logger.warn("failed to update master on updated mapping for {}", e, request); } }); } catch (Throwable e) { diff --git a/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 9a7e3a861a0..1c3acf40375 100644 --- a/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.ShardIterator; @@ -48,7 +49,6 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -184,7 +184,8 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi final IndexRequest request = shardRequest.request; // validate, if routing is required, that we got routing - MappingMetaData mappingMd = clusterState.metaData().index(request.index()).mappingOrDefault(request.type()); + IndexMetaData indexMetaData = clusterState.metaData().index(request.index()); + MappingMetaData mappingMd = indexMetaData.mappingOrDefault(request.type()); if (mappingMd != null && mappingMd.routing().required()) { if (request.routing() == null) { throw new RoutingMissingException(request.index(), request.type(), request.id()); @@ -203,7 +204,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi .versionType(request.versionType()) .origin(Engine.Operation.Origin.PRIMARY); if (index.parsedDoc().mappingsModified()) { - updateMappingOnMaster(request); + updateMappingOnMaster(request, indexMetaData); } indexShard.index(index); version = index.version(); @@ -215,7 +216,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi .versionType(request.versionType()) .origin(Engine.Operation.Origin.PRIMARY); if (create.parsedDoc().mappingsModified()) { - updateMappingOnMaster(request); + updateMappingOnMaster(request, indexMetaData); } indexShard.create(create); version = create.version(); @@ -263,17 +264,19 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi } } - private void updateMappingOnMaster(final IndexRequest request) { + private void updateMappingOnMaster(final IndexRequest request, IndexMetaData indexMetaData) { final CountDownLatch latch = new CountDownLatch(1); try { - MapperService mapperService = indicesService.indexServiceSafe(request.index()).mapperService(); + final MapperService mapperService = indicesService.indexServiceSafe(request.index()).mapperService(); final DocumentMapper documentMapper = mapperService.documentMapper(request.type()); if (documentMapper == null) { // should not happen return; } documentMapper.refreshSource(); - logger.trace("Sending mapping updated to master: index [{}] type [{}]", request.index(), request.type()); - mappingUpdatedAction.execute(new MappingUpdatedAction.MappingUpdatedRequest(request.index(), request.type(), documentMapper.mappingSource()), new ActionListener() { + final MappingUpdatedAction.MappingUpdatedRequest mappingRequest = + new MappingUpdatedAction.MappingUpdatedRequest(request.index(), indexMetaData.uuid(), request.type(), documentMapper.mappingSource()); + logger.trace("Sending mapping updated to master: {}", mappingRequest); + mappingUpdatedAction.execute(mappingRequest, new ActionListener() { @Override public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) { // all is well @@ -283,11 +286,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi @Override public void onFailure(Throwable e) { latch.countDown(); - try { - logger.warn("Failed to update master on updated mapping for index [" + request.index() + "], type [" + request.type() + "] and source [" + documentMapper.mappingSource().string() + "]", e); - } catch (IOException e1) { - // ignore - } + logger.warn("Failed to update master on updated mapping for {}", e, mappingRequest); } }); } catch (Exception e) { diff --git a/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java b/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java index 42d877d7fc3..f8062700e57 100644 --- a/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java +++ b/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.cluster.action.index; import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; @@ -27,6 +28,7 @@ import org.elasticsearch.action.support.master.MasterNodeOperationRequest; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaDataMappingService; import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.common.inject.Inject; @@ -76,7 +78,7 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction listener) throws ElasticSearchException { - metaDataMappingService.updateMapping(request.index(), request.type(), request.mappingSource(), new MetaDataMappingService.Listener() { + metaDataMappingService.updateMapping(request.index(), request.indexUUID(), request.type(), request.mappingSource(), new MetaDataMappingService.Listener() { @Override public void onResponse(MetaDataMappingService.Response response) { listener.onResponse(new MappingUpdatedResponse()); @@ -84,7 +86,7 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction { private String index; - + private String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE; private String type; - private CompressedString mappingSource; MappingUpdatedRequest() { } - public MappingUpdatedRequest(String index, String type, CompressedString mappingSource) { + public MappingUpdatedRequest(String index, String indexUUID, String type, CompressedString mappingSource) { this.index = index; + this.indexUUID = indexUUID; this.type = type; this.mappingSource = mappingSource; } @@ -123,6 +125,10 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction { @@ -87,14 +89,16 @@ public class NodeMappingRefreshAction extends AbstractComponent { public static class NodeMappingRefreshRequest extends TransportRequest { private String index; + private String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE; private String[] types; private String nodeId; NodeMappingRefreshRequest() { } - public NodeMappingRefreshRequest(String index, String[] types, String nodeId) { + public NodeMappingRefreshRequest(String index, String indexUUID, String[] types, String nodeId) { this.index = index; + this.indexUUID = indexUUID; this.types = types; this.nodeId = nodeId; } @@ -103,6 +107,11 @@ public class NodeMappingRefreshAction extends AbstractComponent { return index; } + public String indexUUID() { + return indexUUID; + } + + public String[] types() { return types; } @@ -117,6 +126,9 @@ public class NodeMappingRefreshAction extends AbstractComponent { out.writeString(index); out.writeStringArray(types); out.writeString(nodeId); + if (out.getVersion().onOrAfter(Version.V_0_90_6)) { + out.writeString(indexUUID); + } } @Override @@ -125,6 +137,9 @@ public class NodeMappingRefreshAction extends AbstractComponent { index = in.readString(); types = in.readStringArray(); nodeId = in.readString(); + if (in.getVersion().onOrAfter(Version.V_0_90_6)) { + indexUUID = in.readString(); + } } } } diff --git a/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 3cbba6c2332..6732a4f6b45 100644 --- a/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -77,7 +77,7 @@ public class ShardStateAction extends AbstractComponent { transportService.registerHandler(ShardFailedTransportHandler.ACTION, new ShardFailedTransportHandler()); } - public void shardFailed(final ShardRouting shardRouting, String indexUUID, final String reason) throws ElasticSearchException { + public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String reason) throws ElasticSearchException { ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason); logger.warn("{} sending failed shard for {}", shardRouting.shardId(), shardRoutingEntry); DiscoveryNodes nodes = clusterService.state().nodes(); @@ -215,21 +215,25 @@ public class ShardStateAction extends AbstractComponent { // with the shard still initializing, and it will try and start it again (until the verification comes) IndexShardRoutingTable indexShardRoutingTable = indexRoutingTable.shard(shardRouting.id()); + + boolean applyShardEvent = true; + for (ShardRouting entry : indexShardRoutingTable) { if (shardRouting.currentNodeId().equals(entry.currentNodeId())) { // we found the same shard that exists on the same node id - if (entry.initializing()) { - // shard not started, add it to the shards to be processed. - shardRoutingToBeApplied.add(shardRouting); - logger.debug("{} will apply shard started {}", shardRouting.shardId(), shardRoutingEntry); - } else { + if (!entry.initializing()) { + // shard is in initialized state, skipping event (probable already started) logger.debug("{} ignoring shard started event for {}, current state: {}", shardRouting.shardId(), shardRoutingEntry, entry.state()); + applyShardEvent = false; } - } else { - shardRoutingToBeApplied.add(shardRouting); - logger.debug("{} will apply shard started {}", shardRouting.shardId(), shardRoutingEntry); } } + + if (applyShardEvent) { + shardRoutingToBeApplied.add(shardRouting); + logger.debug("{} will apply shard started {}", shardRouting.shardId(), shardRoutingEntry); + } + } catch (Throwable t) { logger.error("{} unexpected failure while processing shard started [{}]", t, shardRouting.shardId(), shardRouting); } @@ -299,7 +303,7 @@ public class ShardStateAction extends AbstractComponent { private ShardRouting shardRouting; - private String indexUUID; + private String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE; private String reason; @@ -318,7 +322,7 @@ public class ShardStateAction extends AbstractComponent { shardRouting = readShardRoutingEntry(in); reason = in.readString(); if (in.getVersion().onOrAfter(Version.V_0_90_6)) { - indexUUID = in.readOptionalString(); + indexUUID = in.readString(); } } @@ -328,18 +332,13 @@ public class ShardStateAction extends AbstractComponent { shardRouting.writeTo(out); out.writeString(reason); if (out.getVersion().onOrAfter(Version.V_0_90_6)) { - out.writeOptionalString(indexUUID); + out.writeString(indexUUID); } } @Override public String toString() { - StringBuilder sb = new StringBuilder(shardRouting.toString()); - if (indexUUID != null) { - sb.append(", indexUUID [").append(indexUUID).append("]"); - } - sb.append(", reason [").append(reason).append("]"); - return sb.toString(); + return "" + shardRouting + ", indexUUID [" + indexUUID + "], reason [" + reason + "]"; } } } diff --git a/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java b/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java index f47c58fc6f4..ef4f87fc061 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java @@ -157,6 +157,7 @@ public class IndexMetaData { public static final String SETTING_BLOCKS_METADATA = "index.blocks.metadata"; public static final String SETTING_VERSION_CREATED = "index.version.created"; public static final String SETTING_UUID = "index.uuid"; + public static final String INDEX_UUID_NA_VALUE = "_na_"; private final String index; private final long version; @@ -214,25 +215,30 @@ public class IndexMetaData { return index; } + public String getIndex() { + return index(); + } + + public String uuid() { + return settings.get(SETTING_UUID, INDEX_UUID_NA_VALUE); + } + public String getUUID() { - return settings.get(SETTING_UUID); + return uuid(); } /** - * Test whether the current index UUID is the same as the given one. Incoming nulls always return true. + * Test whether the current index UUID is the same as the given one. Returns true if either are _na_ */ - public boolean isSameUUID(@Nullable String otherUUID) { - if (otherUUID == null || getUUID() == null) { + public boolean isSameUUID(String otherUUID) { + assert otherUUID != null; + assert uuid() != null; + if (INDEX_UUID_NA_VALUE.equals(otherUUID) || INDEX_UUID_NA_VALUE.equals(uuid())) { return true; } return otherUUID.equals(getUUID()); } - - public String getIndex() { - return index(); - } - public long version() { return this.version; } diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java index 1b8c6d9892a..1ed50e09df4 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java @@ -69,7 +69,7 @@ public class MetaDataMappingService extends AbstractComponent { private final NodeMappingCreatedAction mappingCreatedAction; - private final BlockingQueue refreshOrUpdateQueue = ConcurrentCollections.newBlockingQueue(); + private final BlockingQueue refreshOrUpdateQueue = ConcurrentCollections.newBlockingQueue(); @Inject public MetaDataMappingService(Settings settings, ClusterService clusterService, IndicesService indicesService, NodeMappingCreatedAction mappingCreatedAction) { @@ -79,24 +79,32 @@ public class MetaDataMappingService extends AbstractComponent { this.mappingCreatedAction = mappingCreatedAction; } - static class RefreshTask { + static class MappingTask { final String index; + final String indexUUID; + + MappingTask(String index, final String indexUUID) { + this.index = index; + this.indexUUID = indexUUID; + } + } + + static class RefreshTask extends MappingTask { final String[] types; - RefreshTask(String index, String[] types) { - this.index = index; + RefreshTask(String index, final String indexUUID, String[] types) { + super(index, indexUUID); this.types = types; } } - static class UpdateTask { - final String index; + static class UpdateTask extends MappingTask { final String type; final CompressedString mappingSource; final Listener listener; - UpdateTask(String index, String type, CompressedString mappingSource, Listener listener) { - this.index = index; + UpdateTask(String index, String indexUUID, String type, CompressedString mappingSource, Listener listener) { + super(index, indexUUID); this.type = type; this.mappingSource = mappingSource; this.listener = listener; @@ -109,7 +117,7 @@ public class MetaDataMappingService extends AbstractComponent { * and generate a single cluster change event out of all of those. */ ClusterState executeRefreshOrUpdate(final ClusterState currentState) throws Exception { - List allTasks = new ArrayList(); + List allTasks = new ArrayList(); refreshOrUpdateQueue.drainTo(allTasks); if (allTasks.isEmpty()) { @@ -118,43 +126,45 @@ public class MetaDataMappingService extends AbstractComponent { // break down to tasks per index, so we can optimize the on demand index service creation // to only happen for the duration of a single index processing of its respective events - Map> tasksPerIndex = Maps.newHashMap(); - for (Object task : allTasks) { - String index = null; - if (task instanceof UpdateTask) { - index = ((UpdateTask) task).index; - } else if (task instanceof RefreshTask) { - index = ((RefreshTask) task).index; - } else { - logger.warn("illegal state, got wrong mapping task type [{}]", task); + Map> tasksPerIndex = Maps.newHashMap(); + for (MappingTask task : allTasks) { + if (task.index == null) { + logger.debug("ignoring a mapping task of type [{}] with a null index.", task); } - if (index != null) { - List indexTasks = tasksPerIndex.get(index); - if (indexTasks == null) { - indexTasks = new ArrayList(); - tasksPerIndex.put(index, indexTasks); - } - indexTasks.add(task); + List indexTasks = tasksPerIndex.get(task.index); + if (indexTasks == null) { + indexTasks = new ArrayList(); + tasksPerIndex.put(task.index, indexTasks); } + indexTasks.add(task); + } boolean dirty = false; MetaData.Builder mdBuilder = newMetaDataBuilder().metaData(currentState.metaData()); - for (Map.Entry> entry : tasksPerIndex.entrySet()) { + for (Map.Entry> entry : tasksPerIndex.entrySet()) { String index = entry.getKey(); - List tasks = entry.getValue(); + List tasks = entry.getValue(); boolean removeIndex = false; // keep track of what we already refreshed, no need to refresh it again... Set processedRefreshes = Sets.newHashSet(); try { - for (Object task : tasks) { + for (MappingTask task : tasks) { + final IndexMetaData indexMetaData = mdBuilder.get(index); + if (indexMetaData == null) { + // index got deleted on us, ignore... + logger.debug("[{}] ignoring task [{}] - index meta data doesn't exist", index, task); + continue; + } + + if (!indexMetaData.isSameUUID(task.indexUUID)) { + // index got deleted on us, ignore... + logger.debug("[{}] ignoring task [{}] - index meta data doesn't match task uuid", index, task); + continue; + } + if (task instanceof RefreshTask) { RefreshTask refreshTask = (RefreshTask) task; - final IndexMetaData indexMetaData = mdBuilder.get(index); - if (indexMetaData == null) { - // index got delete on us, ignore... - continue; - } IndexService indexService = indicesService.indexService(index); if (indexService == null) { // we need to create the index here, and add the current mapping to it, so we can merge @@ -195,13 +205,8 @@ public class MetaDataMappingService extends AbstractComponent { String type = updateTask.type; CompressedString mappingSource = updateTask.mappingSource; - // first, check if it really needs to be updated - final IndexMetaData indexMetaData = mdBuilder.get(index); - if (indexMetaData == null) { - // index got delete on us, ignore... - continue; - } if (indexMetaData.mappings().containsKey(type) && indexMetaData.mapping(type).source().equals(mappingSource)) { + logger.debug("[{}] update_mapping [{}] ignoring mapping update task as it's source is equal to ours", index, updateTask.type); continue; } @@ -221,16 +226,13 @@ public class MetaDataMappingService extends AbstractComponent { // if we end up with the same mapping as the original once, ignore if (indexMetaData.mappings().containsKey(type) && indexMetaData.mapping(type).source().equals(updatedMapper.mappingSource())) { + logger.debug("[{}] update_mapping [{}] ignoring mapping update task as it results in the same source as what we have", index, updateTask.type); continue; } // build the updated mapping source if (logger.isDebugEnabled()) { - try { - logger.debug("[{}] update_mapping [{}] (dynamic) with source [{}]", index, type, updatedMapper.mappingSource().string()); - } catch (Exception e) { - // ignore - } + logger.debug("[{}] update_mapping [{}] (dynamic) with source [{}]", index, type, updatedMapper.mappingSource()); } else if (logger.isInfoEnabled()) { logger.info("[{}] update_mapping [{}] (dynamic)", index, type); } @@ -262,8 +264,8 @@ public class MetaDataMappingService extends AbstractComponent { /** * Refreshes mappings if they are not the same between original and parsed version */ - public void refreshMapping(final String index, final String... types) { - refreshOrUpdateQueue.add(new RefreshTask(index, types)); + public void refreshMapping(final String index, final String indexUUID, final String... types) { + refreshOrUpdateQueue.add(new RefreshTask(index, indexUUID, types)); clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", Priority.HIGH, new ClusterStateUpdateTask() { @Override public void onFailure(String source, Throwable t) { @@ -277,8 +279,8 @@ public class MetaDataMappingService extends AbstractComponent { }); } - public void updateMapping(final String index, final String type, final CompressedString mappingSource, final Listener listener) { - refreshOrUpdateQueue.add(new UpdateTask(index, type, mappingSource, listener)); + public void updateMapping(final String index, final String indexUUID, final String type, final CompressedString mappingSource, final Listener listener) { + refreshOrUpdateQueue.add(new UpdateTask(index, indexUUID, type, mappingSource, listener)); clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "]", Priority.HIGH, new ClusterStateUpdateTask() { @Override public void onFailure(String source, Throwable t) { diff --git a/src/main/java/org/elasticsearch/index/service/InternalIndexService.java b/src/main/java/org/elasticsearch/index/service/InternalIndexService.java index 56fb05d7e22..18b8991dcdb 100644 --- a/src/main/java/org/elasticsearch/index/service/InternalIndexService.java +++ b/src/main/java/org/elasticsearch/index/service/InternalIndexService.java @@ -295,7 +295,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde @Override public String indexUUID() { - return indexSettings.get(IndexMetaData.SETTING_UUID); + return indexSettings.get(IndexMetaData.SETTING_UUID, IndexMetaData.INDEX_UUID_NA_VALUE); } @Override diff --git a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index ce0bd251fd8..3131ebbca01 100644 --- a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -388,7 +388,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent