diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java index 9267540857a..2062436ef2c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java @@ -20,6 +20,10 @@ package org.elasticsearch.cluster; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.collect.ImmutableList; +import org.elasticsearch.common.collect.Lists; + +import java.util.List; /** * @author kimchy (shay.banon) @@ -70,6 +74,50 @@ public class ClusterChangedEvent { return true; } + /** + * Returns the indices created in this event + */ + public List indicesCreated() { + if (previousState == null) { + return Lists.newArrayList(state.metaData().indices().keySet()); + } + if (!metaDataChanged()) { + return ImmutableList.of(); + } + List created = null; + for (String index : state.metaData().indices().keySet()) { + if (!previousState.metaData().hasIndex(index)) { + if (created == null) { + created = Lists.newArrayList(); + } + created.add(index); + } + } + return created == null ? ImmutableList.of() : created; + } + + /** + * Returns the indices deleted in this event + */ + public List indicesDeleted() { + if (previousState == null) { + return ImmutableList.of(); + } + if (!metaDataChanged()) { + return ImmutableList.of(); + } + List deleted = null; + for (String index : previousState.metaData().indices().keySet()) { + if (!state.metaData().hasIndex(index)) { + if (deleted == null) { + deleted = Lists.newArrayList(); + } + deleted.add(index); + } + } + return deleted == null ? ImmutableList.of() : deleted; + } + public boolean metaDataChanged() { return state.metaData() != previousState.metaData(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index 1db79e6d1b2..d6d99b80aea 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -27,9 +27,7 @@ import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.ShardsAllocation; import org.elasticsearch.common.Strings; @@ -63,7 +61,11 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.File; import java.io.FileReader; import java.io.IOException; -import java.util.*; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -118,8 +120,6 @@ public class MetaDataCreateIndexService extends AbstractComponent { clusterService.submitStateUpdateTask("create-index [" + request.index + "], cause [" + request.cause + "]", new ProcessedClusterStateUpdateTask() { - final Set allocatedNodes = Sets.newHashSet(); - @Override public ClusterState execute(ClusterState currentState) { try { try { @@ -271,43 +271,28 @@ public class MetaDataCreateIndexService extends AbstractComponent { updatedState = newClusterStateBuilder().state(updatedState).routingResult(routingResult).build(); } - // initialize the counter only for nodes the shards are allocated to - if (updatedState.routingTable().hasIndex(request.index)) { - for (IndexShardRoutingTable indexShardRoutingTable : updatedState.routingTable().index(request.index)) { - for (ShardRouting shardRouting : indexShardRoutingTable) { - // if we have a routing for this shard on a node, and its not the master node (since we already created - // an index on it), then add it - if (shardRouting.currentNodeId() != null && !updatedState.nodes().localNodeId().equals(shardRouting.currentNodeId())) { - allocatedNodes.add(shardRouting.currentNodeId()); + // we wait for events from all nodes that the index has been added to the metadata + final AtomicInteger counter = new AtomicInteger(currentState.nodes().size()); + + final NodeIndexCreatedAction.Listener nodeIndexCreatedListener = new NodeIndexCreatedAction.Listener() { + @Override public void onNodeIndexCreated(String index, String nodeId) { + if (index.equals(request.index)) { + if (counter.decrementAndGet() == 0) { + listener.onResponse(new Response(true, indexMetaData)); + nodeIndexCreatedAction.remove(this); } } } - } + }; - if (!allocatedNodes.isEmpty()) { - final AtomicInteger counter = new AtomicInteger(allocatedNodes.size()); - - final NodeIndexCreatedAction.Listener nodeIndexCreatedListener = new NodeIndexCreatedAction.Listener() { - @Override public void onNodeIndexCreated(String index, String nodeId) { - if (index.equals(request.index)) { - if (counter.decrementAndGet() == 0) { - listener.onResponse(new Response(true, indexMetaData)); - nodeIndexCreatedAction.remove(this); - } - } - } - }; - - nodeIndexCreatedAction.add(nodeIndexCreatedListener); - - listener.future = threadPool.schedule(request.timeout, ThreadPool.Names.SAME, new Runnable() { - @Override public void run() { - listener.onResponse(new Response(false, indexMetaData)); - nodeIndexCreatedAction.remove(nodeIndexCreatedListener); - } - }); - } + nodeIndexCreatedAction.add(nodeIndexCreatedListener); + listener.future = threadPool.schedule(request.timeout, ThreadPool.Names.SAME, new Runnable() { + @Override public void run() { + listener.onResponse(new Response(false, indexMetaData)); + nodeIndexCreatedAction.remove(nodeIndexCreatedListener); + } + }); return updatedState; } catch (Exception e) { @@ -318,9 +303,6 @@ public class MetaDataCreateIndexService extends AbstractComponent { } @Override public void clusterStateProcessed(ClusterState clusterState) { - if (allocatedNodes.isEmpty()) { - listener.onResponse(new Response(true, clusterState.metaData().index(request.index))); - } } }); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java index dc1d7113ef7..698bdfd559f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java @@ -24,12 +24,9 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction; import org.elasticsearch.cluster.block.ClusterBlocks; -import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.ShardsAllocation; -import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -38,7 +35,6 @@ import org.elasticsearch.index.Index; import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.threadpool.ThreadPool; -import java.util.Set; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -93,46 +89,26 @@ public class MetaDataDeleteIndexService extends AbstractComponent { ClusterBlocks blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(request.index).build(); - // initialize the counter only for nodes the shards are allocated to - Set allocatedNodes = Sets.newHashSet(); - if (currentState.routingTable().hasIndex(request.index)) { - for (IndexShardRoutingTable indexShardRoutingTable : currentState.routingTable().index(request.index)) { - for (ShardRouting shardRouting : indexShardRoutingTable) { - if (shardRouting.currentNodeId() != null) { - allocatedNodes.add(shardRouting.currentNodeId()); - } - if (shardRouting.relocatingNodeId() != null) { - allocatedNodes.add(shardRouting.relocatingNodeId()); + final AtomicInteger counter = new AtomicInteger(currentState.nodes().size()); + + final NodeIndexDeletedAction.Listener nodeIndexDeleteListener = new NodeIndexDeletedAction.Listener() { + @Override public void onNodeIndexDeleted(String index, String nodeId) { + if (index.equals(request.index)) { + if (counter.decrementAndGet() == 0) { + listener.onResponse(new Response(true)); + nodeIndexDeletedAction.remove(this); } } } - } + }; + nodeIndexDeletedAction.add(nodeIndexDeleteListener); - if (allocatedNodes.isEmpty()) { - // no nodes allocated, don't wait for a response - listener.onResponse(new Response(true)); - } else { - final AtomicInteger counter = new AtomicInteger(allocatedNodes.size()); - - final NodeIndexDeletedAction.Listener nodeIndexDeleteListener = new NodeIndexDeletedAction.Listener() { - @Override public void onNodeIndexDeleted(String index, String nodeId) { - if (index.equals(request.index)) { - if (counter.decrementAndGet() == 0) { - listener.onResponse(new Response(true)); - nodeIndexDeletedAction.remove(this); - } - } - } - }; - nodeIndexDeletedAction.add(nodeIndexDeleteListener); - - listener.future = threadPool.schedule(request.timeout, ThreadPool.Names.SAME, new Runnable() { - @Override public void run() { - listener.onResponse(new Response(false)); - nodeIndexDeletedAction.remove(nodeIndexDeleteListener); - } - }); - } + listener.future = threadPool.schedule(request.timeout, ThreadPool.Names.SAME, new Runnable() { + @Override public void run() { + listener.onResponse(new Response(false)); + nodeIndexDeletedAction.remove(nodeIndexDeleteListener); + } + }); return newClusterStateBuilder().state(currentState).routingResult(routingResult).metaData(newMetaData).blocks(blocks).build(); } catch (Exception e) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 277c87b91b0..a6e04963924 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -180,6 +180,24 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent