diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java index 6519eb18f86..c549dabab97 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java @@ -100,7 +100,7 @@ public abstract class TransportMasterNodeOperationAction blocks = Sets.newHashSet(); + boolean rerouteAfterCreation = true; + public Request(Origin origin, String cause, String index) { this.origin = origin; this.cause = cause; @@ -421,6 +427,11 @@ public class MetaDataCreateIndexService extends AbstractComponent { this.timeout = timeout; return this; } + + public Request rerouteAfterCreation(boolean rerouteAfterCreation) { + this.rerouteAfterCreation = rerouteAfterCreation; + return this; + } } public static class Response { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java index eca983f299d..b5d87f88553 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -29,6 +29,10 @@ import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.ShardsAllocation; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -57,6 +61,8 @@ public class GatewayService extends AbstractLifecycleComponent i private final ThreadPool threadPool; + private final ShardsAllocation shardsAllocation; + private final ClusterService clusterService; private final DiscoveryService discoveryService; @@ -75,9 +81,10 @@ public class GatewayService extends AbstractLifecycleComponent i private final AtomicBoolean recovered = new AtomicBoolean(); private final AtomicBoolean scheduledRecovery = new AtomicBoolean(); - @Inject public GatewayService(Settings settings, Gateway gateway, ClusterService clusterService, DiscoveryService discoveryService, MetaDataCreateIndexService createIndexService, ThreadPool threadPool) { + @Inject public GatewayService(Settings settings, Gateway gateway, ShardsAllocation shardsAllocation, ClusterService clusterService, DiscoveryService discoveryService, MetaDataCreateIndexService createIndexService, ThreadPool threadPool) { super(settings); this.gateway = gateway; + this.shardsAllocation = shardsAllocation; this.clusterService = clusterService; this.discoveryService = discoveryService; this.createIndexService = createIndexService; @@ -240,12 +247,16 @@ public class GatewayService extends AbstractLifecycleComponent i } // go over the meta data and create indices, we don't really need to copy over // the meta data per index, since we create the index and it will be added automatically + + // also, don't reroute (or even initialize the routing table) for the indices created, we will do it + // in one batch once creating those indices is done for (final IndexMetaData indexMetaData : recoveredState.metaData()) { try { createIndexService.createIndex(new MetaDataCreateIndexService.Request(MetaDataCreateIndexService.Request.Origin.GATEWAY, "gateway", indexMetaData.index()) .settings(indexMetaData.settings()) .mappingsMetaData(indexMetaData.mappings()) .state(indexMetaData.state()) + .rerouteAfterCreation(false) .timeout(timeValueSeconds(30)), new MetaDataCreateIndexService.Listener() { @@ -287,7 +298,7 @@ public class GatewayService extends AbstractLifecycleComponent i } private void markMetaDataAsReadFromGateway(String reason) { - clusterService.submitStateUpdateTask("gateway (marked as read, reason=" + reason + ")", new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("gateway (marked as read, reroute, reason=" + reason + ")", new ProcessedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { if (!currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) { return currentState; @@ -295,7 +306,23 @@ public class GatewayService extends AbstractLifecycleComponent i // remove the block, since we recovered from gateway ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(STATE_NOT_RECOVERED_BLOCK); - return newClusterStateBuilder().state(currentState).blocks(blocks).build(); + // initialize all index routing tables as empty + RoutingTable.Builder routingTableBuilder = RoutingTable.builder().routingTable(currentState.routingTable()); + for (IndexMetaData indexMetaData : currentState.metaData().indices().values()) { + if (indexMetaData.state() == IndexMetaData.State.OPEN) { + IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetaData.index()) + .initializeEmpty(currentState.metaData().index(indexMetaData.index()), false); + routingTableBuilder.add(indexRoutingBuilder); + } + } + + RoutingAllocation.Result routingResult = shardsAllocation.reroute(newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).build()); + + return newClusterStateBuilder().state(currentState).blocks(blocks).routingResult(routingResult).build(); + } + + @Override public void clusterStateProcessed(ClusterState clusterState) { + logger.info("all indices created and rerouting has begun"); } }); }