From 3ac3c7d12c0cb845a8c28cec5b890705b13dc659 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 14 Aug 2013 17:04:19 +0200 Subject: [PATCH] Put Mappings CountDownListener validates cluster state version of incoming change confirmations. Closes #3508 --- .../index/NodeMappingCreatedAction.java | 11 ++++++++- .../metadata/MetaDataMappingService.java | 24 +++++++++++-------- .../cluster/IndicesClusterStateService.java | 12 ++++++---- .../indices/mapping/UpdateMappingTests.java | 9 +++---- 4 files changed, 37 insertions(+), 19 deletions(-) diff --git a/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingCreatedAction.java b/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingCreatedAction.java index a00abc9df16..2477eade496 100644 --- a/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingCreatedAction.java +++ b/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingCreatedAction.java @@ -76,6 +76,7 @@ public class NodeMappingCreatedAction extends AbstractComponent { public void nodeMappingCreated(final NodeMappingCreatedResponse response) throws ElasticSearchException { DiscoveryNodes nodes = clusterService.state().nodes(); + logger.debug("Sending mapping created for index {}, type {} (cluster state version: {})", response.index, response.type, response.clusterStateVersion); if (nodes.localNodeMaster()) { threadPool.generic().execute(new Runnable() { @Override @@ -128,14 +129,16 @@ public class NodeMappingCreatedAction extends AbstractComponent { private String index; private String type; private String nodeId; + private long clusterStateVersion; private NodeMappingCreatedResponse() { } - public NodeMappingCreatedResponse(String index, String type, String nodeId) { + public NodeMappingCreatedResponse(String index, String type, String nodeId, long clusterStateVersion) { this.index = index; this.type = type; this.nodeId = nodeId; + this.clusterStateVersion = clusterStateVersion; } public String index() { @@ -150,12 +153,17 @@ public class NodeMappingCreatedAction extends AbstractComponent { return nodeId; } + public long clusterStateVersion() { + return clusterStateVersion; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(index); out.writeString(type); out.writeString(nodeId); + out.writeVLong(clusterStateVersion); } @Override @@ -164,6 +172,7 @@ public class NodeMappingCreatedAction extends AbstractComponent { index = in.readString(); type = in.readString(); nodeId = in.readString(); + clusterStateVersion = in.readVLong(); } } } diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java index 4e77099fa1a..21dd8b4c0bc 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java @@ -492,7 +492,11 @@ public class MetaDataMappingService extends AbstractComponent { } } - countDownListener = new CountDownListener(counter, request.indices, request.mappingType, listener); + + // TODO: adding one to the version is based on knowledge on how the parent class will increment the version + // move this to the base class or add another callback before publishing the new cluster state so we + // capture it's version. + countDownListener = new CountDownListener(counter, currentState.version() + 1, listener); mappingCreatedAction.add(countDownListener, request.timeout); return updatedState; @@ -589,22 +593,22 @@ public class MetaDataMappingService extends AbstractComponent { private final AtomicBoolean notified = new AtomicBoolean(); private final AtomicInteger countDown; private final Listener listener; - private final List indices; - private final String type; + private final long minClusterStateVersion; - public CountDownListener(int countDown, String[] indices, String type, Listener listener) { - this.indices = Arrays.asList(indices); - this.type = type; + /** + * @param countDown initial counter value + * @param minClusterStateVersion the minimum cluster state version for which accept responses + * @param listener listener to call when counter reaches 0. + */ + public CountDownListener(int countDown, long minClusterStateVersion, Listener listener) { this.countDown = new AtomicInteger(countDown); this.listener = listener; + this.minClusterStateVersion = minClusterStateVersion; } @Override public void onNodeMappingCreated(NodeMappingCreatedAction.NodeMappingCreatedResponse response) { - if (indices.indexOf(response.index()) < 0) { - return; - } - if (type != null && !type.equals(response.type())) { + if (response.clusterStateVersion() < minClusterStateVersion) { return; } decrementCounter(); diff --git a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 10a3b4d8366..5aa262561ec 100644 --- a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -133,8 +133,9 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent mappings = getMappingResponse.getMappings().get(indexName); assertThat(mappings.keySet(), Matchers.hasItem(typeName)); + assertThat(((Map) mappings.get(typeName).getSourceAsMap().get("properties")).keySet(), Matchers.hasItem(fieldName)); } } catch (Throwable t) { threadException[0] = t;