improve gateway initial recovery time by creating the indices, and only after all have been created, do the rerouting
This commit is contained in:
parent
763f986a30
commit
658594fa70
|
@ -100,7 +100,7 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
|
||||||
}
|
}
|
||||||
clusterService.add(request.masterNodeTimeout(), new TimeoutClusterStateListener() {
|
clusterService.add(request.masterNodeTimeout(), new TimeoutClusterStateListener() {
|
||||||
@Override public void postAdded() {
|
@Override public void postAdded() {
|
||||||
ClusterBlockException blockException = checkBlock(request, clusterState);
|
ClusterBlockException blockException = checkBlock(request, clusterService.state());
|
||||||
if (blockException == null || !blockException.retryable()) {
|
if (blockException == null || !blockException.retryable()) {
|
||||||
clusterService.remove(this);
|
clusterService.remove(this);
|
||||||
innerExecute(request, listener, false);
|
innerExecute(request, listener, false);
|
||||||
|
|
|
@ -262,6 +262,10 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
||||||
listener.onResponse(new Response(true, clusterState.metaData().index(request.index)));
|
listener.onResponse(new Response(true, clusterState.metaData().index(request.index)));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (!request.rerouteAfterCreation) {
|
||||||
|
listener.onResponse(new Response(true, clusterState.metaData().index(request.index)));
|
||||||
|
return;
|
||||||
|
}
|
||||||
clusterService.submitStateUpdateTask("reroute after index [" + request.index + "] creation", new ProcessedClusterStateUpdateTask() {
|
clusterService.submitStateUpdateTask("reroute after index [" + request.index + "] creation", new ProcessedClusterStateUpdateTask() {
|
||||||
@Override public ClusterState execute(ClusterState currentState) {
|
@Override public ClusterState execute(ClusterState currentState) {
|
||||||
RoutingTable.Builder routingTableBuilder = RoutingTable.builder().routingTable(currentState.routingTable());
|
RoutingTable.Builder routingTableBuilder = RoutingTable.builder().routingTable(currentState.routingTable());
|
||||||
|
@ -377,6 +381,8 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
||||||
|
|
||||||
Set<ClusterBlock> blocks = Sets.newHashSet();
|
Set<ClusterBlock> blocks = Sets.newHashSet();
|
||||||
|
|
||||||
|
boolean rerouteAfterCreation = true;
|
||||||
|
|
||||||
public Request(Origin origin, String cause, String index) {
|
public Request(Origin origin, String cause, String index) {
|
||||||
this.origin = origin;
|
this.origin = origin;
|
||||||
this.cause = cause;
|
this.cause = cause;
|
||||||
|
@ -421,6 +427,11 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
||||||
this.timeout = timeout;
|
this.timeout = timeout;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Request rerouteAfterCreation(boolean rerouteAfterCreation) {
|
||||||
|
this.rerouteAfterCreation = rerouteAfterCreation;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class Response {
|
public static class Response {
|
||||||
|
|
|
@ -29,6 +29,10 @@ import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
|
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||||
|
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||||
|
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||||
|
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||||
|
import org.elasticsearch.cluster.routing.allocation.ShardsAllocation;
|
||||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
@ -57,6 +61,8 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
|
||||||
|
|
||||||
private final ThreadPool threadPool;
|
private final ThreadPool threadPool;
|
||||||
|
|
||||||
|
private final ShardsAllocation shardsAllocation;
|
||||||
|
|
||||||
private final ClusterService clusterService;
|
private final ClusterService clusterService;
|
||||||
|
|
||||||
private final DiscoveryService discoveryService;
|
private final DiscoveryService discoveryService;
|
||||||
|
@ -75,9 +81,10 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
|
||||||
private final AtomicBoolean recovered = new AtomicBoolean();
|
private final AtomicBoolean recovered = new AtomicBoolean();
|
||||||
private final AtomicBoolean scheduledRecovery = new AtomicBoolean();
|
private final AtomicBoolean scheduledRecovery = new AtomicBoolean();
|
||||||
|
|
||||||
@Inject public GatewayService(Settings settings, Gateway gateway, ClusterService clusterService, DiscoveryService discoveryService, MetaDataCreateIndexService createIndexService, ThreadPool threadPool) {
|
@Inject public GatewayService(Settings settings, Gateway gateway, ShardsAllocation shardsAllocation, ClusterService clusterService, DiscoveryService discoveryService, MetaDataCreateIndexService createIndexService, ThreadPool threadPool) {
|
||||||
super(settings);
|
super(settings);
|
||||||
this.gateway = gateway;
|
this.gateway = gateway;
|
||||||
|
this.shardsAllocation = shardsAllocation;
|
||||||
this.clusterService = clusterService;
|
this.clusterService = clusterService;
|
||||||
this.discoveryService = discoveryService;
|
this.discoveryService = discoveryService;
|
||||||
this.createIndexService = createIndexService;
|
this.createIndexService = createIndexService;
|
||||||
|
@ -240,12 +247,16 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
|
||||||
}
|
}
|
||||||
// go over the meta data and create indices, we don't really need to copy over
|
// go over the meta data and create indices, we don't really need to copy over
|
||||||
// the meta data per index, since we create the index and it will be added automatically
|
// the meta data per index, since we create the index and it will be added automatically
|
||||||
|
|
||||||
|
// also, don't reroute (or even initialize the routing table) for the indices created, we will do it
|
||||||
|
// in one batch once creating those indices is done
|
||||||
for (final IndexMetaData indexMetaData : recoveredState.metaData()) {
|
for (final IndexMetaData indexMetaData : recoveredState.metaData()) {
|
||||||
try {
|
try {
|
||||||
createIndexService.createIndex(new MetaDataCreateIndexService.Request(MetaDataCreateIndexService.Request.Origin.GATEWAY, "gateway", indexMetaData.index())
|
createIndexService.createIndex(new MetaDataCreateIndexService.Request(MetaDataCreateIndexService.Request.Origin.GATEWAY, "gateway", indexMetaData.index())
|
||||||
.settings(indexMetaData.settings())
|
.settings(indexMetaData.settings())
|
||||||
.mappingsMetaData(indexMetaData.mappings())
|
.mappingsMetaData(indexMetaData.mappings())
|
||||||
.state(indexMetaData.state())
|
.state(indexMetaData.state())
|
||||||
|
.rerouteAfterCreation(false)
|
||||||
.timeout(timeValueSeconds(30)),
|
.timeout(timeValueSeconds(30)),
|
||||||
|
|
||||||
new MetaDataCreateIndexService.Listener() {
|
new MetaDataCreateIndexService.Listener() {
|
||||||
|
@ -287,7 +298,7 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
|
||||||
}
|
}
|
||||||
|
|
||||||
private void markMetaDataAsReadFromGateway(String reason) {
|
private void markMetaDataAsReadFromGateway(String reason) {
|
||||||
clusterService.submitStateUpdateTask("gateway (marked as read, reason=" + reason + ")", new ClusterStateUpdateTask() {
|
clusterService.submitStateUpdateTask("gateway (marked as read, reroute, reason=" + reason + ")", new ProcessedClusterStateUpdateTask() {
|
||||||
@Override public ClusterState execute(ClusterState currentState) {
|
@Override public ClusterState execute(ClusterState currentState) {
|
||||||
if (!currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) {
|
if (!currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) {
|
||||||
return currentState;
|
return currentState;
|
||||||
|
@ -295,7 +306,23 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
|
||||||
// remove the block, since we recovered from gateway
|
// remove the block, since we recovered from gateway
|
||||||
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(STATE_NOT_RECOVERED_BLOCK);
|
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(STATE_NOT_RECOVERED_BLOCK);
|
||||||
|
|
||||||
return newClusterStateBuilder().state(currentState).blocks(blocks).build();
|
// initialize all index routing tables as empty
|
||||||
|
RoutingTable.Builder routingTableBuilder = RoutingTable.builder().routingTable(currentState.routingTable());
|
||||||
|
for (IndexMetaData indexMetaData : currentState.metaData().indices().values()) {
|
||||||
|
if (indexMetaData.state() == IndexMetaData.State.OPEN) {
|
||||||
|
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetaData.index())
|
||||||
|
.initializeEmpty(currentState.metaData().index(indexMetaData.index()), false);
|
||||||
|
routingTableBuilder.add(indexRoutingBuilder);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
RoutingAllocation.Result routingResult = shardsAllocation.reroute(newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).build());
|
||||||
|
|
||||||
|
return newClusterStateBuilder().state(currentState).blocks(blocks).routingResult(routingResult).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void clusterStateProcessed(ClusterState clusterState) {
|
||||||
|
logger.info("all indices created and rerouting has begun");
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue