From 2381f668f8962412d223ce1ac41dcb61a60339d6 Mon Sep 17 00:00:00 2001 From: kimchy Date: Sat, 15 Jan 2011 02:13:57 +0200 Subject: [PATCH] wait for mappings to be created on other nodes before returning from the put mapping request --- .../index/NodeMappingCreatedAction.java | 13 ++++++- .../metadata/MetaDataMappingService.java | 38 ++++++++++++++++++- .../cluster/routing/IndexRoutingTable.java | 12 ++++++ 3 files changed, 60 insertions(+), 3 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingCreatedAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingCreatedAction.java index 98cb5222e02..c6be158dea5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingCreatedAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingCreatedAction.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.VoidStreamable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.BaseTransportRequestHandler; import org.elasticsearch.transport.TransportChannel; @@ -60,8 +61,16 @@ public class NodeMappingCreatedAction extends AbstractComponent { transportService.registerHandler(NodeMappingCreatedTransportHandler.ACTION, new NodeMappingCreatedTransportHandler()); } - public void add(Listener listener) { + public void add(final Listener listener, TimeValue timeout) { listeners.add(listener); + threadPool.schedule(new Runnable() { + @Override public void run() { + boolean removed = listeners.remove(listener); + if (removed) { + listener.onTimeout(); + } + } + }, timeout); } public void remove(Listener listener) { @@ -91,6 +100,8 @@ public class NodeMappingCreatedAction extends AbstractComponent { public static interface Listener { void onNodeMappingCreated(NodeMappingCreatedResponse response); + + void onTimeout(); } private class NodeMappingCreatedTransportHandler extends BaseTransportRequestHandler { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java index dfdac347cf4..3ce5b0838fe 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java @@ -23,6 +23,8 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; +import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction; +import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.common.inject.Inject; @@ -39,6 +41,7 @@ import org.elasticsearch.indices.InvalidTypeNameException; import java.io.IOException; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.cluster.ClusterState.*; import static org.elasticsearch.cluster.metadata.IndexMetaData.*; @@ -55,10 +58,13 @@ public class MetaDataMappingService extends AbstractComponent { private final IndicesService indicesService; - @Inject public MetaDataMappingService(Settings settings, ClusterService clusterService, IndicesService indicesService) { + private final NodeMappingCreatedAction mappingCreatedAction; + + @Inject public MetaDataMappingService(Settings settings, ClusterService clusterService, IndicesService indicesService, NodeMappingCreatedAction mappingCreatedAction) { super(settings); this.clusterService = clusterService; this.indicesService = indicesService; + this.mappingCreatedAction = mappingCreatedAction; } public void updateMapping(final String index, final String type, final CompressedString mappingSource) { @@ -267,7 +273,35 @@ public class MetaDataMappingService extends AbstractComponent { } @Override public void clusterStateProcessed(ClusterState clusterState) { - listener.onResponse(new Response(true)); + int counter = 0; + for (String index : request.indices) { + IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(index); + if (indexRoutingTable != null) { + counter += indexRoutingTable.numberOfNodesShardsAreAllocatedOn(); + } + } + + if (counter > 0) { + counter = counter - 1; // we already added the mapping on the master here... + } + if (counter == 0) { + listener.onResponse(new Response(true)); + return; + } + + final AtomicInteger countDown = new AtomicInteger(counter); + mappingCreatedAction.add(new NodeMappingCreatedAction.Listener() { + @Override public void onNodeMappingCreated(NodeMappingCreatedAction.NodeMappingCreatedResponse response) { + if (countDown.decrementAndGet() == 0) { + mappingCreatedAction.remove(this); + listener.onResponse(new Response(true)); + } + } + + @Override public void onTimeout() { + listener.onResponse(new Response(false)); + } + }, request.timeout); } }); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java index 7a6ef09ea6d..9ddf5f03844 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java @@ -102,6 +102,18 @@ public class IndexRoutingTable implements Iterable { return shards.values().iterator(); } + public int numberOfNodesShardsAreAllocatedOn() { + Set nodes = Sets.newHashSet(); + for (IndexShardRoutingTable shardRoutingTable : this) { + for (ShardRouting shardRouting : shardRoutingTable) { + if (shardRouting.assignedToNode()) { + nodes.add(shardRouting.currentNodeId()); + } + } + } + return nodes.size(); + } + public ImmutableMap shards() { return shards; }