diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 32cc265a6c7..d73527a27c4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -37,7 +37,7 @@ import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.RoutingService; +import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.FailedShard; @@ -92,7 +92,7 @@ public class ShardStateAction { @Inject public ShardStateAction(ClusterService clusterService, TransportService transportService, - AllocationService allocationService, RoutingService routingService, ThreadPool threadPool) { + AllocationService allocationService, RerouteService rerouteService, ThreadPool threadPool) { this.transportService = transportService; this.clusterService = clusterService; this.threadPool = threadPool; @@ -101,7 +101,7 @@ public class ShardStateAction { new ShardStartedTransportHandler(clusterService, new ShardStartedClusterStateTaskExecutor(allocationService, logger), logger)); transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ThreadPool.Names.SAME, FailedShardEntry::new, new ShardFailedTransportHandler(clusterService, - new ShardFailedClusterStateTaskExecutor(allocationService, routingService, logger), logger)); + new ShardFailedClusterStateTaskExecutor(allocationService, rerouteService, logger), logger)); } private void sendShardAction(final String actionName, 
final ClusterState currentState, @@ -283,12 +283,12 @@ public class ShardStateAction { public static class ShardFailedClusterStateTaskExecutor implements ClusterStateTaskExecutor { private final AllocationService allocationService; - private final RoutingService routingService; + private final RerouteService rerouteService; private final Logger logger; - public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RoutingService routingService, Logger logger) { + public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RerouteService rerouteService, Logger logger) { this.allocationService = allocationService; - this.routingService = routingService; + this.rerouteService = rerouteService; this.logger = logger; } @@ -382,7 +382,7 @@ public class ShardStateAction { if (logger.isTraceEnabled()) { logger.trace("{}, scheduling a reroute", reason); } - routingService.reroute(reason, ActionListener.wrap( + rerouteService.reroute(reason, ActionListener.wrap( r -> logger.trace("{}, reroute completed", reason), e -> logger.debug(new ParameterizedMessage("{}, reroute failed", reason), e))); } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index b9f3fdbd8c7..d2e71e3905e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -40,6 +40,7 @@ import org.elasticsearch.cluster.coordination.JoinHelper.InitialJoinAccumulator; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterApplier; import 
org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener; @@ -150,15 +151,12 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery /** * @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}. * @param onJoinValidators A collection of join validators to restrict which nodes may join the cluster. - * @param reroute A callback to call when the membership of the cluster has changed, to recalculate the assignment of shards. In - * production code this calls - * {@link org.elasticsearch.cluster.routing.RoutingService#reroute(String, ActionListener)}. */ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSettings, TransportService transportService, NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService, Supplier persistedStateSupplier, SeedHostsProvider seedHostsProvider, ClusterApplier clusterApplier, Collection> onJoinValidators, Random random, - BiConsumer> reroute, ElectionStrategy electionStrategy) { + RerouteService rerouteService, ElectionStrategy electionStrategy) { this.settings = settings; this.transportService = transportService; this.masterService = masterService; @@ -168,7 +166,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery this.electionStrategy = electionStrategy; this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService, this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators, - reroute); + rerouteService); this.persistedStateSupplier = persistedStateSupplier; this.noMasterBlockService = new NoMasterBlockService(settings, clusterSettings); this.lastKnownLeader = Optional.empty(); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java 
b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index 7d4a1f41cd2..b8e58d6bbed 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -30,6 +30,7 @@ import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.coordination.Coordinator.Mode; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.Priority; @@ -93,11 +94,11 @@ public class JoinHelper { JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService, TransportService transportService, LongSupplier currentTermSupplier, Supplier currentStateSupplier, BiConsumer joinHandler, Function joinLeaderInTerm, - Collection> joinValidators, BiConsumer> reroute) { + Collection> joinValidators, RerouteService rerouteService) { this.masterService = masterService; this.transportService = transportService; this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings); - this.joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger, reroute) { + this.joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger, rerouteService) { @Override public ClusterTasksResult execute(ClusterState currentState, List joiningTasks) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java index 2f129fb1936..9835e702950 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java @@ -29,6 +29,7 @@ import 
org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.zen.ElectMasterService; @@ -47,7 +48,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor> reroute; + private final RerouteService rerouteService; private final int minimumMasterNodesOnLocalNode; @@ -86,12 +87,11 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor> reroute) { + public JoinTaskExecutor(Settings settings, AllocationService allocationService, Logger logger, RerouteService rerouteService) { this.allocationService = allocationService; this.logger = logger; minimumMasterNodesOnLocalNode = ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings); - this.reroute = reroute; + this.rerouteService = rerouteService; } @Override @@ -155,7 +155,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor logger.trace("post-join reroute completed"), e -> logger.debug("post-join reroute failed", e))); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java b/server/src/main/java/org/elasticsearch/cluster/routing/BatchedRerouteService.java similarity index 80% rename from server/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java rename to server/src/main/java/org/elasticsearch/cluster/routing/BatchedRerouteService.java index 7068f907905..4ed4caadabe 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/BatchedRerouteService.java @@ -25,32 +25,22 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import 
org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainListenableActionFuture; -import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.inject.Inject; import java.util.function.BiFunction; /** - * A {@link RoutingService} listens to clusters state. When this service - * receives a {@link ClusterChangedEvent} the cluster state will be verified and - * the routing tables might be updated. - *

- * Note: The {@link RoutingService} is responsible for cluster wide operations - * that include modifications to the cluster state. Such an operation can only - * be performed on the clusters master node. Unless the local node this service - * is running on is the clusters master node this service will not perform any - * actions. - *

+ * A {@link BatchedRerouteService} is a {@link RerouteService} that batches together reroute requests to avoid unnecessary extra reroutes. + * This component only does meaningful work on the elected master node. Reroute requests will fail with a {@link NotMasterException} on + * other nodes. */ -public class RoutingService extends AbstractLifecycleComponent { - private static final Logger logger = LogManager.getLogger(RoutingService.class); +public class BatchedRerouteService implements RerouteService { + private static final Logger logger = LogManager.getLogger(BatchedRerouteService.class); private static final String CLUSTER_UPDATE_TASK_SOURCE = "cluster_reroute"; @@ -61,33 +51,19 @@ public class RoutingService extends AbstractLifecycleComponent { @Nullable // null if no reroute is currently pending private PlainListenableActionFuture pendingRerouteListeners; - @Inject - public RoutingService(ClusterService clusterService, BiFunction reroute) { + /** + * @param reroute Function that computes the updated cluster state after it has been rerouted. + */ + public BatchedRerouteService(ClusterService clusterService, BiFunction reroute) { this.clusterService = clusterService; this.reroute = reroute; } - @Override - protected void doStart() { - } - - @Override - protected void doStop() { - } - - @Override - protected void doClose() { - } - /** * Initiates a reroute. 
*/ + @Override public final void reroute(String reason, ActionListener listener) { - if (lifecycle.started() == false) { - listener.onFailure(new IllegalStateException( - "rejecting delayed reroute [" + reason + "] in state [" + lifecycleState() + "]")); - return; - } final PlainListenableActionFuture currentListeners; synchronized (mutex) { if (pendingRerouteListeners != null) { diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/LazilyInitializedRerouteService.java b/server/src/main/java/org/elasticsearch/cluster/routing/LazilyInitializedRerouteService.java new file mode 100644 index 00000000000..672d6dbc3a0 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/LazilyInitializedRerouteService.java @@ -0,0 +1,42 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.routing; + +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.action.ActionListener; + +/** + * A {@link RerouteService} that can be initialized lazily. 
The real reroute service, {@link BatchedRerouteService}, depends on components + * constructed quite late in the construction of the node, but other components constructed earlier eventually need access to the reroute + * service too. The delegate must be supplied via {@link #setRerouteService} before {@link #reroute} is first called. + */ +public class LazilyInitializedRerouteService implements RerouteService { + + private final SetOnce<RerouteService> delegate = new SetOnce<>(); + + @Override + public void reroute(String reason, ActionListener<Void> listener) { + assert delegate.get() != null; + delegate.get().reroute(reason, listener); + } + + public void setRerouteService(RerouteService rerouteService) { + delegate.set(rerouteService); + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RerouteService.java b/server/src/main/java/org/elasticsearch/cluster/routing/RerouteService.java new file mode 100644 index 00000000000..11a49322e10 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RerouteService.java @@ -0,0 +1,33 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.routing; + +import org.elasticsearch.action.ActionListener; + +/** + * Asynchronously performs a cluster reroute, updating any shard states and rebalancing the cluster if appropriate.
+ */ +@FunctionalInterface +public interface RerouteService { + /** + * @param reason short description of what triggered the reroute, e.g. {@code "disk threshold monitor"} + * @param listener notified when the reroute has completed or failed + */ + void reroute(String reason, ActionListener<Void> listener); +} diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java index 96e4974b9b4..7177cf8bef4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java @@ -19,20 +19,10 @@ package org.elasticsearch.cluster.routing.allocation; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; -import java.util.function.Consumer; -import java.util.function.LongSupplier; -import java.util.function.Supplier; - import com.carrotsearch.hppc.ObjectLookupContainer; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.client.Client; @@ -41,6 +31,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.DiskUsage; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Strings; @@ -49,6 +40,13 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; +import java.util.HashSet; +import java.util.Set; +import
java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.LongSupplier; +import java.util.function.Supplier; + /** * Listens for a node to go over the high watermark and kicks off an empty * reroute if it does. Also responsible for logging about nodes that have @@ -63,14 +61,15 @@ public class DiskThresholdMonitor { private final Set nodeHasPassedWatermark = Sets.newConcurrentHashSet(); private final Supplier clusterStateSupplier; private final LongSupplier currentTimeMillisSupplier; + private final RerouteService rerouteService; private final AtomicLong lastRunTimeMillis = new AtomicLong(Long.MIN_VALUE); private final AtomicBoolean checkInProgress = new AtomicBoolean(); - private final SetOnce>> rerouteAction = new SetOnce<>(); public DiskThresholdMonitor(Settings settings, Supplier clusterStateSupplier, ClusterSettings clusterSettings, - Client client, LongSupplier currentTimeMillisSupplier) { + Client client, LongSupplier currentTimeMillisSupplier, RerouteService rerouteService) { this.clusterStateSupplier = clusterStateSupplier; this.currentTimeMillisSupplier = currentTimeMillisSupplier; + this.rerouteService = rerouteService; this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings); this.client = client; } @@ -111,8 +110,6 @@ public class DiskThresholdMonitor { public void onNewInfo(ClusterInfo info) { - assert rerouteAction.get() != null; - if (checkInProgress.compareAndSet(false, true) == false) { logger.info("skipping monitor as a check is already in progress"); return; @@ -188,7 +185,7 @@ public class DiskThresholdMonitor { if (reroute) { logger.info("rerouting shards: [{}]", explanation); - rerouteAction.get().accept(ActionListener.wrap(r -> { + rerouteService.reroute("disk threshold monitor", ActionListener.wrap(r -> { setLastRunTimeMillis(); listener.onResponse(r); }, e -> { @@ -225,8 +222,4 @@ public class DiskThresholdMonitor { 
.setSettings(Settings.builder().put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, true).build()) .execute(ActionListener.map(listener, r -> null)); } - - public void setRerouteAction(BiConsumer> rerouteAction) { - this.rerouteAction.set(listener -> rerouteAction.accept("disk threshold monitor", listener)); - } } diff --git a/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java b/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java index a74047c35b2..178d3c8e354 100644 --- a/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java @@ -25,7 +25,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.coordination.Coordinator; import org.elasticsearch.cluster.coordination.ElectionStrategy; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.RoutingService; +import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterApplier; import org.elasticsearch.cluster.service.ClusterApplierService; @@ -92,7 +92,7 @@ public class DiscoveryModule { NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService, MasterService masterService, ClusterApplier clusterApplier, ClusterSettings clusterSettings, List plugins, AllocationService allocationService, Path configFile, GatewayMetaState gatewayMetaState, - RoutingService routingService) { + RerouteService rerouteService) { final Collection> joinValidators = new ArrayList<>(); final Map> hostProviders = new HashMap<>(); hostProviders.put("settings", () -> new SettingsBasedSeedHostsProvider(settings, transportService)); @@ -154,10 +154,10 @@ public class DiscoveryModule { settings, clusterSettings, transportService, namedWriteableRegistry, allocationService, masterService, () -> 
gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), seedHostsProvider, - clusterApplier, joinValidators, new Random(Randomness.get().nextLong()), routingService::reroute, electionStrategy); + clusterApplier, joinValidators, new Random(Randomness.get().nextLong()), rerouteService, electionStrategy); } else if (ZEN_DISCOVERY_TYPE.equals(discoveryType)) { discovery = new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier, - clusterSettings, seedHostsProvider, allocationService, joinValidators, gatewayMetaState, routingService::reroute); + clusterSettings, seedHostsProvider, allocationService, joinValidators, gatewayMetaState, rerouteService); } else { throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]"); } diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java b/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java index f382a6b7a84..cfc5e521967 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java @@ -22,7 +22,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskConfig; @@ -30,6 +29,7 @@ import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.coordination.JoinTaskExecutor; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.RerouteService; import 
org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.Priority; @@ -44,7 +44,6 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.BiConsumer; /** * This class processes incoming join request (passed zia {@link ZenDiscovery}). Incoming nodes @@ -63,9 +62,9 @@ public class NodeJoinController { public NodeJoinController(Settings settings, MasterService masterService, AllocationService allocationService, - ElectMasterService electMaster, BiConsumer> reroute) { + ElectMasterService electMaster, RerouteService rerouteService) { this.masterService = masterService; - joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger, reroute) { + joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger, rerouteService) { @Override public void clusterStatePublished(ClusterChangedEvent event) { electMaster.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state()); diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 09dcd74c6e3..918f9b68466 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -36,6 +36,7 @@ import org.elasticsearch.cluster.coordination.NoMasterBlockService; import org.elasticsearch.cluster.coordination.NodeRemovalClusterStateTaskExecutor; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterApplier; import 
org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener; @@ -165,7 +166,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover NamedWriteableRegistry namedWriteableRegistry, MasterService masterService, ClusterApplier clusterApplier, ClusterSettings clusterSettings, SeedHostsProvider hostsProvider, AllocationService allocationService, Collection> onJoinValidators, GatewayMetaState gatewayMetaState, - BiConsumer> reroute) { + RerouteService rerouteService) { this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators); this.masterService = masterService; this.clusterApplier = clusterApplier; @@ -226,7 +227,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover this.membership = new MembershipAction(transportService, new MembershipListener(), onJoinValidators); this.joinThreadControl = new JoinThreadControl(); - this.nodeJoinController = new NodeJoinController(settings, masterService, allocationService, electMaster, reroute); + this.nodeJoinController = new NodeJoinController(settings, masterService, allocationService, electMaster, rerouteService); this.nodeRemovalExecutor = new ZenNodeRemovalClusterStateTaskExecutor(allocationService, electMaster, this::submitRejoin, logger); masterService.setClusterStateSupplier(this::clusterState); diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index 6543d5d1174..7451f25460e 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -25,8 +25,8 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.action.support.nodes.BaseNodesResponse; +import 
org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.RoutingNodes; -import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.elasticsearch.cluster.routing.allocation.FailedShard; @@ -44,7 +44,7 @@ public class GatewayAllocator { private static final Logger logger = LogManager.getLogger(GatewayAllocator.class); - private final RoutingService routingService; + private final RerouteService rerouteService; private final PrimaryShardAllocator primaryShardAllocator; private final ReplicaShardAllocator replicaShardAllocator; @@ -55,10 +55,10 @@ public class GatewayAllocator { asyncFetchStore = ConcurrentCollections.newConcurrentMap(); @Inject - public GatewayAllocator(RoutingService routingService, + public GatewayAllocator(RerouteService rerouteService, TransportNodesListGatewayStartedShards startedAction, TransportNodesListShardStoreMetaData storeAction) { - this.routingService = routingService; + this.rerouteService = rerouteService; this.primaryShardAllocator = new InternalPrimaryShardAllocator(startedAction); this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction); } @@ -72,7 +72,7 @@ public class GatewayAllocator { // for tests protected GatewayAllocator() { - this.routingService = null; + this.rerouteService = null; this.primaryShardAllocator = null; this.replicaShardAllocator = null; } @@ -139,7 +139,7 @@ public class GatewayAllocator { @Override protected void reroute(ShardId shardId, String reason) { logger.trace("{} scheduling reroute for {}", shardId, reason); - routingService.reroute("async_shard_fetch", ActionListener.wrap( + rerouteService.reroute("async_shard_fetch", ActionListener.wrap( r -> logger.trace("{} scheduled reroute completed for {}", shardId, reason), e -> logger.debug(new ParameterizedMessage("{} scheduled reroute failed for {}", shardId, 
reason), e))); } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index ff129adf5ad..35ddb3758cf 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -56,7 +56,9 @@ import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService; import org.elasticsearch.cluster.metadata.TemplateUpgradeService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; -import org.elasticsearch.cluster.routing.RoutingService; +import org.elasticsearch.cluster.routing.BatchedRerouteService; +import org.elasticsearch.cluster.routing.LazilyInitializedRerouteService; +import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.StopWatch; @@ -368,8 +370,9 @@ public class Node implements Closeable { .newHashPublisher()); final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment, scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class)); + final LazilyInitializedRerouteService lazilyInitializedRerouteService = new LazilyInitializedRerouteService(); final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(settings, clusterService::state, - clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis); + clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis, lazilyInitializedRerouteService); final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client, diskThresholdMonitor::onNewInfo); final UsageService usageService = new UsageService(); @@ -506,16 +509,17 @@ public class Node implements Closeable { RestoreService 
restoreService = new RestoreService(clusterService, repositoryService, clusterModule.getAllocationService(), metaDataCreateIndexService, metaDataIndexUpgradeService, clusterService.getClusterSettings()); - final RoutingService routingService = new RoutingService(clusterService, clusterModule.getAllocationService()::reroute); + final RerouteService rerouteService + = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute); + lazilyInitializedRerouteService.setRerouteService(rerouteService); final DiscoveryModule discoveryModule = new DiscoveryModule(settings, threadPool, transportService, namedWriteableRegistry, networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(), clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class), - clusterModule.getAllocationService(), environment.configFile(), gatewayMetaState, routingService); + clusterModule.getAllocationService(), environment.configFile(), gatewayMetaState, rerouteService); this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(), transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(), httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService, searchTransportService); - diskThresholdMonitor.setRerouteAction(routingService::reroute); final SearchService searchService = newSearchService(clusterService, indicesService, threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(), @@ -586,7 +590,7 @@ public class Node implements Closeable { b.bind(SnapshotShardsService.class).toInstance(snapshotShardsService); b.bind(TransportNodesSnapshotsStatus.class).toInstance(nodesSnapshotsStatus); b.bind(RestoreService.class).toInstance(restoreService); - b.bind(RoutingService.class).toInstance(routingService); + 
b.bind(RerouteService.class).toInstance(rerouteService); } ); injector = modules.createInjector(); @@ -674,7 +678,6 @@ public class Node implements Closeable { injector.getInstance(IndicesClusterStateService.class).start(); injector.getInstance(SnapshotsService.class).start(); injector.getInstance(SnapshotShardsService.class).start(); - injector.getInstance(RoutingService.class).start(); injector.getInstance(SearchService.class).start(); nodeService.getMonitorService().start(); @@ -792,7 +795,6 @@ public class Node implements Closeable { // This can confuse other nodes and delay things - mostly if we're the master and we're running tests. injector.getInstance(Discovery.class).stop(); // we close indices first, so operations won't be allowed on it - injector.getInstance(RoutingService.class).stop(); injector.getInstance(ClusterService.class).stop(); injector.getInstance(NodeConnectionsService.class).stop(); nodeService.getMonitorService().stop(); @@ -842,8 +844,6 @@ public class Node implements Closeable { toClose.add(injector.getInstance(IndicesService.class)); // close filter/fielddata caches after indices toClose.add(injector.getInstance(IndicesStore.class)); - toClose.add(() -> stopWatch.stop().start("routing")); - toClose.add(injector.getInstance(RoutingService.class)); toClose.add(() -> stopWatch.stop().start("cluster")); toClose.add(injector.getInstance(ClusterService.class)); toClose.add(() -> stopWatch.stop().start("node_connections_service")); diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index 27342d4c64f..072ce043703 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -32,7 +32,7 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry import 
org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.cluster.routing.RoutingService; +import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardsIterator; @@ -92,8 +92,8 @@ public class ShardStateActionTests extends ESTestCase { private static class TestShardStateAction extends ShardStateAction { TestShardStateAction(ClusterService clusterService, TransportService transportService, - AllocationService allocationService, RoutingService routingService) { - super(clusterService, transportService, allocationService, routingService, THREAD_POOL); + AllocationService allocationService, RerouteService rerouteService) { + super(clusterService, transportService, allocationService, rerouteService, THREAD_POOL); } private Runnable onBeforeWaitForNewMasterAndRetry; diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/BatchedRerouteServiceTests.java similarity index 79% rename from server/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java rename to server/src/test/java/org/elasticsearch/cluster/routing/BatchedRerouteServiceTests.java index 5368c1c5544..966ac1e6065 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/BatchedRerouteServiceTests.java @@ -20,7 +20,6 @@ package org.elasticsearch.cluster.routing; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterState; import 
org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; @@ -41,9 +40,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import static org.hamcrest.Matchers.lessThan; -import static org.hamcrest.Matchers.startsWith; -public class RoutingServiceTests extends ESTestCase { +public class BatchedRerouteServiceTests extends ESTestCase { private ThreadPool threadPool; private ClusterService clusterService; @@ -60,38 +58,19 @@ public class RoutingServiceTests extends ESTestCase { threadPool.shutdown(); } - public void testRejectionUnlessStarted() { - final RoutingService routingService = new RoutingService(clusterService, (s, r) -> s); - final PlainActionFuture future = new PlainActionFuture<>(); - - if (randomBoolean()) { - routingService.start(); - routingService.stop(); - } else if (randomBoolean()) { - routingService.close(); - } - - routingService.reroute("test", future); - assertTrue(future.isDone()); - assertThat(expectThrows(IllegalStateException.class, future::actionGet).getMessage(), - startsWith("rejecting delayed reroute [test] in state [")); - } - public void testReroutesWhenRequested() throws InterruptedException { final AtomicLong rerouteCount = new AtomicLong(); - final RoutingService routingService = new RoutingService(clusterService, (s, r) -> { + final BatchedRerouteService batchedRerouteService = new BatchedRerouteService(clusterService, (s, r) -> { rerouteCount.incrementAndGet(); return s; }); - routingService.start(); - long rerouteCountBeforeReroute = 0L; final int iterations = between(1, 100); final CountDownLatch countDownLatch = new CountDownLatch(iterations); for (int i = 0; i < iterations; i++) { rerouteCountBeforeReroute = Math.max(rerouteCountBeforeReroute, rerouteCount.get()); - routingService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown)); + batchedRerouteService.reroute("iteration " + i, 
ActionListener.wrap(countDownLatch::countDown)); } countDownLatch.await(10, TimeUnit.SECONDS); assertThat(rerouteCountBeforeReroute, lessThan(rerouteCount.get())); @@ -116,17 +95,15 @@ public class RoutingServiceTests extends ESTestCase { cyclicBarrier.await(); // wait for master thread to be blocked final AtomicBoolean rerouteExecuted = new AtomicBoolean(); - final RoutingService routingService = new RoutingService(clusterService, (s, r) -> { + final BatchedRerouteService batchedRerouteService = new BatchedRerouteService(clusterService, (s, r) -> { assertTrue(rerouteExecuted.compareAndSet(false, true)); // only called once return s; }); - routingService.start(); - final int iterations = between(1, 100); final CountDownLatch countDownLatch = new CountDownLatch(iterations); for (int i = 0; i < iterations; i++) { - routingService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown)); + batchedRerouteService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown)); } cyclicBarrier.await(); // allow master thread to continue; @@ -136,18 +113,17 @@ public class RoutingServiceTests extends ESTestCase { public void testNotifiesOnFailure() throws InterruptedException { - final RoutingService routingService = new RoutingService(clusterService, (s, r) -> { + final BatchedRerouteService batchedRerouteService = new BatchedRerouteService(clusterService, (s, r) -> { if (rarely()) { throw new ElasticsearchException("simulated"); } return randomBoolean() ? 
s : ClusterState.builder(s).build(); }); - routingService.start(); final int iterations = between(1, 100); final CountDownLatch countDownLatch = new CountDownLatch(iterations); for (int i = 0; i < iterations; i++) { - routingService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown)); + batchedRerouteService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown)); if (rarely()) { clusterService.getMasterService().setClusterStatePublisher( randomBoolean() diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java index 5ba5b7a0a70..e46c899dbec 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java @@ -77,7 +77,10 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase { AtomicReference> indices = new AtomicReference<>(); AtomicLong currentTime = new AtomicLong(); DiskThresholdMonitor monitor = new DiskThresholdMonitor(settings, () -> finalState, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get) { + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get, (reason, listener) -> { + assertTrue(reroute.compareAndSet(false, true)); + listener.onResponse(null); + }) { @Override protected void markIndicesReadOnly(Set indicesToMarkReadOnly, ActionListener listener) { assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly)); @@ -85,11 +88,6 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase { } }; - monitor.setRerouteAction((reason, listener) -> { - assertTrue(reroute.compareAndSet(false, true)); - listener.onResponse(null); - }); - ImmutableOpenMap.Builder builder = 
ImmutableOpenMap.builder(); builder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 4)); builder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 30)); @@ -119,17 +117,16 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase { assertTrue(anotherFinalClusterState.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2")); monitor = new DiskThresholdMonitor(settings, () -> anotherFinalClusterState, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get) { + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get, (reason, listener) -> { + assertTrue(reroute.compareAndSet(false, true)); + listener.onResponse(null); + }) { @Override protected void markIndicesReadOnly(Set indicesToMarkReadOnly, ActionListener listener) { assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly)); listener.onResponse(null); } }; - monitor.setRerouteAction((reason, listener) -> { - assertTrue(reroute.compareAndSet(false, true)); - listener.onResponse(null); - }); indices.set(null); reroute.set(false); @@ -147,18 +144,16 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase { AtomicLong currentTime = new AtomicLong(); AtomicReference> listenerReference = new AtomicReference<>(); DiskThresholdMonitor monitor = new DiskThresholdMonitor(Settings.EMPTY, () -> clusterState, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get) { + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get, (reason, listener) -> { + assertNotNull(listener); + assertTrue(listenerReference.compareAndSet(null, listener)); + }) { @Override protected void markIndicesReadOnly(Set indicesToMarkReadOnly, ActionListener listener) { throw new AssertionError("unexpected"); } }; - monitor.setRerouteAction((reason, listener) -> { - assertNotNull(listener); - 
assertTrue(listenerReference.compareAndSet(null, listener)); - }); - final ImmutableOpenMap.Builder allDisksOkBuilder; allDisksOkBuilder = ImmutableOpenMap.builder(); allDisksOkBuilder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 50)); diff --git a/server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java b/server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java index d4255f23298..b705c53d5ab 100644 --- a/server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java @@ -22,7 +22,7 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.coordination.Coordinator; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.RoutingService; +import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.service.ClusterApplier; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -92,7 +92,7 @@ public class DiscoveryModuleTests extends ESTestCase { private DiscoveryModule newModule(Settings settings, List plugins) { return new DiscoveryModule(settings, threadPool, transportService, namedWriteableRegistry, null, masterService, clusterApplier, clusterSettings, plugins, null, createTempDir().toAbsolutePath(), gatewayMetaState, - mock(RoutingService.class)); + mock(RerouteService.class)); } public void testDefaults() { diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 51cdc6bd780..bbbf24a993f 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -100,7 +100,7 @@ import 
org.elasticsearch.cluster.metadata.MetaDataMappingService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.RoutingService; +import org.elasticsearch.cluster.routing.BatchedRerouteService; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.AllocationService; @@ -1065,7 +1065,7 @@ public class SnapshotResiliencyTests extends ESTestCase { transportService, indicesService, actionFilters, indexNameExpressionResolver); final ShardStateAction shardStateAction = new ShardStateAction( clusterService, transportService, allocationService, - new RoutingService(clusterService, allocationService::reroute), + new BatchedRerouteService(clusterService, allocationService::reroute), threadPool ); final MetaDataMappingService metaDataMappingService = new MetaDataMappingService(clusterService, indicesService); @@ -1248,7 +1248,7 @@ public class SnapshotResiliencyTests extends ESTestCase { hostsResolver -> testClusterNodes.nodes.values().stream().filter(n -> n.node.isMasterNode()) .map(n -> n.node.getAddress()).collect(Collectors.toList()), clusterService.getClusterApplierService(), Collections.emptyList(), random(), - new RoutingService(clusterService, allocationService::reroute)::reroute, ElectionStrategy.DEFAULT_INSTANCE); + new BatchedRerouteService(clusterService, allocationService::reroute), ElectionStrategy.DEFAULT_INSTANCE); masterService.setClusterStatePublisher(coordinator); coordinator.start(); masterService.start();