diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java index c238e14fcdb..af79baf7514 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java @@ -26,7 +26,6 @@ import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.common.collect.Lists; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.common.inject.Inject; @@ -37,7 +36,6 @@ import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MergeMappingException; import org.elasticsearch.index.service.IndexService; -import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InvalidTypeNameException; @@ -46,6 +44,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.cluster.ClusterState.*; @@ -340,7 +339,23 @@ public class MetaDataMappingService extends AbstractComponent { } } - return newClusterStateBuilder().state(currentState).metaData(builder).build(); + ClusterState updatedState = newClusterStateBuilder().state(currentState).metaData(builder).build(); + + // wait for responses from other nodes if needed + int counter = 0; + for (String index : request.indices) { + IndexRoutingTable indexRoutingTable = updatedState.routingTable().index(index); + if (indexRoutingTable != null) { + counter += indexRoutingTable.numberOfNodesShardsAreAllocatedOn(updatedState.nodes().masterNodeId()); + } + } + + if (counter == 0) { + listener.onResponse(new Response(true)); + return updatedState; + } + mappingCreatedAction.add(new CountDownListener(counter, listener), request.timeout); + return updatedState; } catch (Exception e) { listener.onFailure(e); return currentState; @@ -352,32 +367,6 @@ public class MetaDataMappingService extends AbstractComponent { } @Override public void clusterStateProcessed(ClusterState clusterState) { - int counter = 0; - for (String index : request.indices) { - IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(index); - if (indexRoutingTable != null) { - counter += indexRoutingTable.numberOfNodesShardsAreAllocatedOn(clusterState.nodes().masterNodeId()); - } - } - - if (counter == 0) { - listener.onResponse(new Response(true)); - return; - } - - final AtomicInteger countDown = new AtomicInteger(counter); - mappingCreatedAction.add(new NodeMappingCreatedAction.Listener() { - @Override public void onNodeMappingCreated(NodeMappingCreatedAction.NodeMappingCreatedResponse response) { - if (countDown.decrementAndGet() == 0) { - mappingCreatedAction.remove(this); - listener.onResponse(new Response(true)); - } - } - - @Override public void onTimeout() { - listener.onResponse(new Response(false)); - } - }, request.timeout); } }); } @@ -441,4 +430,32 @@ public class MetaDataMappingService extends AbstractComponent { return acknowledged; } } + + private class CountDownListener implements NodeMappingCreatedAction.Listener { + + private final AtomicBoolean notified = new AtomicBoolean(); + private final AtomicInteger countDown; + private final Listener listener; + + public CountDownListener(int countDown, Listener listener) { + this.countDown = new AtomicInteger(countDown); + this.listener = listener; + } + + @Override public void onNodeMappingCreated(NodeMappingCreatedAction.NodeMappingCreatedResponse response) { + if (countDown.decrementAndGet() == 0) { + mappingCreatedAction.remove(this); + if (notified.compareAndSet(false, true)) { + listener.onResponse(new Response(true)); + } + } + } + + @Override public void onTimeout() { + mappingCreatedAction.remove(this); + if (notified.compareAndSet(false, true)) { + listener.onResponse(new Response(false)); + } + } + } }