diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index 7bec49abfb5..932a80e3073 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -23,10 +23,13 @@ import org.elasticsearch.ElasticSearchException; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; +import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.ShardsAllocation; import org.elasticsearch.common.Strings; @@ -55,11 +58,15 @@ import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.river.RiverIndexName; +import org.elasticsearch.threadpool.ThreadPool; import java.io.File; import java.io.FileReader; import java.io.IOException; import java.util.*; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.cluster.ClusterState.*; import static org.elasticsearch.cluster.metadata.IndexMetaData.*; @@ -73,25 +80,31 @@ public class MetaDataCreateIndexService extends AbstractComponent { private final Environment environment; + private final ThreadPool threadPool; + private final ClusterService clusterService; private final IndicesService indicesService; private final ShardsAllocation shardsAllocation; + private final NodeIndexCreatedAction nodeIndexCreatedAction; + private final String riverIndexName; - @Inject public MetaDataCreateIndexService(Settings settings, Environment environment, ClusterService clusterService, IndicesService indicesService, - ShardsAllocation shardsAllocation, @RiverIndexName String riverIndexName) { + @Inject public MetaDataCreateIndexService(Settings settings, Environment environment, ThreadPool threadPool, ClusterService clusterService, IndicesService indicesService, + ShardsAllocation shardsAllocation, NodeIndexCreatedAction nodeIndexCreatedAction, @RiverIndexName String riverIndexName) { super(settings); this.environment = environment; + this.threadPool = threadPool; this.clusterService = clusterService; this.indicesService = indicesService; this.shardsAllocation = shardsAllocation; + this.nodeIndexCreatedAction = nodeIndexCreatedAction; this.riverIndexName = riverIndexName; } - public void createIndex(final Request request, final Listener listener) { + public void createIndex(final Request request, final Listener userListener) { ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder(); for (Map.Entry entry : request.settings.getAsMap().entrySet()) { if (!entry.getKey().startsWith("index.")) { @@ -101,8 +114,12 @@ public class MetaDataCreateIndexService extends AbstractComponent { } } request.settings(updatedSettingsBuilder.build()); + final CreateIndexListener listener = new CreateIndexListener(request, userListener); + clusterService.submitStateUpdateTask("create-index [" + request.index + "], cause [" + request.cause + "]", new ProcessedClusterStateUpdateTask() { + final Set allocatedNodes = Sets.newHashSet(); + @Override public ClusterState execute(ClusterState currentState) { try { try { @@ -254,6 +271,44 @@ public class MetaDataCreateIndexService extends AbstractComponent { updatedState = newClusterStateBuilder().state(updatedState).routingResult(routingResult).build(); } + // initialize the counter only for nodes the shards are allocated to + if (updatedState.routingTable().hasIndex(request.index)) { + for (IndexShardRoutingTable indexShardRoutingTable : updatedState.routingTable().index(request.index)) { + for (ShardRouting shardRouting : indexShardRoutingTable) { + // if we have a routing for this shard on a node, and its not the master node (since we already created + // an index on it), then add it + if (shardRouting.currentNodeId() != null && !updatedState.nodes().localNodeId().equals(shardRouting.currentNodeId())) { + allocatedNodes.add(shardRouting.currentNodeId()); + } + } + } + } + + if (!allocatedNodes.isEmpty()) { + final AtomicInteger counter = new AtomicInteger(allocatedNodes.size()); + + final NodeIndexCreatedAction.Listener nodeIndexCreatedListener = new NodeIndexCreatedAction.Listener() { + @Override public void onNodeIndexCreated(String index, String nodeId) { + if (index.equals(request.index)) { + if (counter.decrementAndGet() == 0) { + listener.onResponse(new Response(true, indexMetaData)); + nodeIndexCreatedAction.remove(this); + } + } + } + }; + + nodeIndexCreatedAction.add(nodeIndexCreatedListener); + + listener.future = threadPool.schedule(request.timeout, ThreadPool.Names.SAME, new Runnable() { + @Override public void run() { + listener.onResponse(new Response(false, indexMetaData)); + nodeIndexCreatedAction.remove(nodeIndexCreatedListener); + } + }); + } + + return updatedState; } catch (Exception e) { logger.warn("[{}] failed to create", e, request.index); @@ -263,11 +318,48 @@ public class MetaDataCreateIndexService extends AbstractComponent { } @Override public void clusterStateProcessed(ClusterState clusterState) { - listener.onResponse(new Response(true, clusterState.metaData().index(request.index))); + if (allocatedNodes.isEmpty()) { + listener.onResponse(new Response(true, clusterState.metaData().index(request.index))); + } } }); } + class CreateIndexListener implements Listener { + + private AtomicBoolean notified = new AtomicBoolean(); + + private final Request request; + + private final Listener listener; + + volatile ScheduledFuture future; + + private CreateIndexListener(Request request, Listener listener) { + this.request = request; + this.listener = listener; + } + + @Override public void onResponse(final Response response) { + if (notified.compareAndSet(false, true)) { + if (future != null) { + future.cancel(false); + } + listener.onResponse(response); + } + } + + @Override public void onFailure(Throwable t) { + if (notified.compareAndSet(false, true)) { + if (future != null) { + future.cancel(false); + } + listener.onFailure(t); + } + } + } + + private Map parseMapping(String mappingSource) throws Exception { return XContentFactory.xContent(mappingSource).createParser(mappingSource).mapAndClose(); }