Rename and refactor RoutingService (#43827)

The `RoutingService` has a confusing name, since it doesn't really have
anything to do with routing. Its responsibility is submitting reroute commands
to the master.

This commit renames this class to `BatchedRerouteService`, and extracts the
`RerouteService` interface to avoid passing `BiConsumer`s everywhere. It also
makes `BatchedRerouteService` no longer extend `AbstractLifecycleComponent`, since
this service has no meaningful lifecycle. Finally, it introduces a small
wrapper class to allow for lazy initialization to deal with the dependency loop
when constructing a `Node`.
This commit is contained in:
David Turner 2019-07-02 06:37:33 +01:00
parent 81f60652d8
commit 1e8e85797d
18 changed files with 164 additions and 154 deletions

View File

@ -37,7 +37,7 @@ import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.FailedShard;
@ -92,7 +92,7 @@ public class ShardStateAction {
@Inject
public ShardStateAction(ClusterService clusterService, TransportService transportService,
AllocationService allocationService, RoutingService routingService, ThreadPool threadPool) {
AllocationService allocationService, RerouteService rerouteService, ThreadPool threadPool) {
this.transportService = transportService;
this.clusterService = clusterService;
this.threadPool = threadPool;
@ -101,7 +101,7 @@ public class ShardStateAction {
new ShardStartedTransportHandler(clusterService, new ShardStartedClusterStateTaskExecutor(allocationService, logger), logger));
transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ThreadPool.Names.SAME, FailedShardEntry::new,
new ShardFailedTransportHandler(clusterService,
new ShardFailedClusterStateTaskExecutor(allocationService, routingService, logger), logger));
new ShardFailedClusterStateTaskExecutor(allocationService, rerouteService, logger), logger));
}
private void sendShardAction(final String actionName, final ClusterState currentState,
@ -283,12 +283,12 @@ public class ShardStateAction {
public static class ShardFailedClusterStateTaskExecutor implements ClusterStateTaskExecutor<FailedShardEntry> {
private final AllocationService allocationService;
private final RoutingService routingService;
private final RerouteService rerouteService;
private final Logger logger;
public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RoutingService routingService, Logger logger) {
public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RerouteService rerouteService, Logger logger) {
this.allocationService = allocationService;
this.routingService = routingService;
this.rerouteService = rerouteService;
this.logger = logger;
}
@ -382,7 +382,7 @@ public class ShardStateAction {
if (logger.isTraceEnabled()) {
logger.trace("{}, scheduling a reroute", reason);
}
routingService.reroute(reason, ActionListener.wrap(
rerouteService.reroute(reason, ActionListener.wrap(
r -> logger.trace("{}, reroute completed", reason),
e -> logger.debug(new ParameterizedMessage("{}, reroute failed", reason), e)));
}

View File

@ -40,6 +40,7 @@ import org.elasticsearch.cluster.coordination.JoinHelper.InitialJoinAccumulator;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
@ -150,15 +151,12 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
/**
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
* @param onJoinValidators A collection of join validators to restrict which nodes may join the cluster.
* @param reroute A callback to call when the membership of the cluster has changed, to recalculate the assignment of shards. In
* production code this calls
* {@link org.elasticsearch.cluster.routing.RoutingService#reroute(String, ActionListener)}.
*/
public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSettings, TransportService transportService,
NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService,
Supplier<CoordinationState.PersistedState> persistedStateSupplier, SeedHostsProvider seedHostsProvider,
ClusterApplier clusterApplier, Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, Random random,
BiConsumer<String, ActionListener<Void>> reroute, ElectionStrategy electionStrategy) {
RerouteService rerouteService, ElectionStrategy electionStrategy) {
this.settings = settings;
this.transportService = transportService;
this.masterService = masterService;
@ -168,7 +166,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
this.electionStrategy = electionStrategy;
this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators,
reroute);
rerouteService);
this.persistedStateSupplier = persistedStateSupplier;
this.noMasterBlockService = new NoMasterBlockService(settings, clusterSettings);
this.lastKnownLeader = Optional.empty();

View File

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.coordination.Coordinator.Mode;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Priority;
@ -93,11 +94,11 @@ public class JoinHelper {
JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, BiConsumer<String, ActionListener<Void>> reroute) {
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, RerouteService rerouteService) {
this.masterService = masterService;
this.transportService = transportService;
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
this.joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger, reroute) {
this.joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger, rerouteService) {
@Override
public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentState, List<JoinTaskExecutor.Task> joiningTasks)

View File

@ -29,6 +29,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.zen.ElectMasterService;
@ -47,7 +48,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
private final AllocationService allocationService;
private final Logger logger;
private final BiConsumer<String, ActionListener<Void>> reroute;
private final RerouteService rerouteService;
private final int minimumMasterNodesOnLocalNode;
@ -86,12 +87,11 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
private static final String FINISH_ELECTION_TASK_REASON = "_FINISH_ELECTION_";
}
public JoinTaskExecutor(Settings settings, AllocationService allocationService, Logger logger,
BiConsumer<String, ActionListener<Void>> reroute) {
public JoinTaskExecutor(Settings settings, AllocationService allocationService, Logger logger, RerouteService rerouteService) {
this.allocationService = allocationService;
this.logger = logger;
minimumMasterNodesOnLocalNode = ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);
this.reroute = reroute;
this.rerouteService = rerouteService;
}
@Override
@ -155,7 +155,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
results.success(joinTask);
}
if (nodesChanged) {
reroute.accept("post-join reroute", ActionListener.wrap(
rerouteService.reroute("post-join reroute", ActionListener.wrap(
r -> logger.trace("post-join reroute completed"),
e -> logger.debug("post-join reroute failed", e)));

View File

@ -25,32 +25,22 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainListenableActionFuture;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import java.util.function.BiFunction;
/**
* A {@link RoutingService} listens to clusters state. When this service
* receives a {@link ClusterChangedEvent} the cluster state will be verified and
* the routing tables might be updated.
* <p>
* Note: The {@link RoutingService} is responsible for cluster wide operations
* that include modifications to the cluster state. Such an operation can only
* be performed on the clusters master node. Unless the local node this service
* is running on is the clusters master node this service will not perform any
* actions.
* </p>
* A {@link BatchedRerouteService} is a {@link RerouteService} that batches together reroute requests to avoid unnecessary extra reroutes.
* This component only does meaningful work on the elected master node. Reroute requests will fail with a {@link NotMasterException} on
* other nodes.
*/
public class RoutingService extends AbstractLifecycleComponent {
private static final Logger logger = LogManager.getLogger(RoutingService.class);
public class BatchedRerouteService implements RerouteService {
private static final Logger logger = LogManager.getLogger(BatchedRerouteService.class);
private static final String CLUSTER_UPDATE_TASK_SOURCE = "cluster_reroute";
@ -61,33 +51,19 @@ public class RoutingService extends AbstractLifecycleComponent {
@Nullable // null if no reroute is currently pending
private PlainListenableActionFuture<Void> pendingRerouteListeners;
@Inject
public RoutingService(ClusterService clusterService, BiFunction<ClusterState, String, ClusterState> reroute) {
/**
* @param reroute Function that computes the updated cluster state after it has been rerouted.
*/
public BatchedRerouteService(ClusterService clusterService, BiFunction<ClusterState, String, ClusterState> reroute) {
this.clusterService = clusterService;
this.reroute = reroute;
}
@Override
protected void doStart() {
}
@Override
protected void doStop() {
}
@Override
protected void doClose() {
}
/**
* Initiates a reroute.
*/
@Override
public final void reroute(String reason, ActionListener<Void> listener) {
if (lifecycle.started() == false) {
listener.onFailure(new IllegalStateException(
"rejecting delayed reroute [" + reason + "] in state [" + lifecycleState() + "]"));
return;
}
final PlainListenableActionFuture<Void> currentListeners;
synchronized (mutex) {
if (pendingRerouteListeners != null) {

View File

@ -0,0 +1,42 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
/**
 * A {@link RerouteService} whose real implementation is supplied after construction. Every call to
 * {@link #reroute(String, ActionListener)} is forwarded to the service passed to {@link #setRerouteService(RerouteService)}.
 * <p>
 * This exists because the real reroute service, {@link BatchedRerouteService}, depends on components that are constructed quite late
 * while building the node, whereas other components that are constructed earlier eventually need a reroute service as well.
 */
public class LazilyInitializedRerouteService implements RerouteService {

    // Holds the real service once it has been provided; empty until then.
    private final SetOnce<RerouteService> delegate = new SetOnce<>();

    /**
     * Forwards the reroute request to the underlying service.
     *
     * @param reason   description of why the reroute is requested, passed through unchanged
     * @param listener listener passed through unchanged to the underlying service
     */
    @Override
    public void reroute(String reason, ActionListener<Void> listener) {
        final RerouteService rerouteService = delegate.get();
        // The underlying service must have been provided before the first reroute is requested.
        assert rerouteService != null;
        rerouteService.reroute(reason, listener);
    }

    /**
     * Provides the service to which reroute requests are forwarded.
     *
     * @param rerouteService the real reroute service
     */
    public void setRerouteService(RerouteService rerouteService) {
        delegate.set(rerouteService);
    }
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing;
import org.elasticsearch.action.ActionListener;
/**
* Asynchronously performs a cluster reroute, updating any shard states and rebalancing the cluster if appropriate.
*/
@FunctionalInterface
public interface RerouteService {
/**
* Requests a reroute.
*
* @param reason   a short description of why the reroute is requested; callers in this codebase pass values such as
*                 {@code "post-join reroute"} or {@code "async_shard_fetch"}
* @param listener receives the outcome of the request; callers typically log completion from the response handler and log the
*                 exception from the failure handler
*/
void reroute(String reason, ActionListener<Void> listener);
}

View File

@ -19,20 +19,10 @@
package org.elasticsearch.cluster.routing.allocation;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import com.carrotsearch.hppc.ObjectLookupContainer;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.client.Client;
@ -41,6 +31,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Strings;
@ -49,6 +40,13 @@ import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
/**
* Listens for a node to go over the high watermark and kicks off an empty
* reroute if it does. Also responsible for logging about nodes that have
@ -63,14 +61,15 @@ public class DiskThresholdMonitor {
private final Set<String> nodeHasPassedWatermark = Sets.newConcurrentHashSet();
private final Supplier<ClusterState> clusterStateSupplier;
private final LongSupplier currentTimeMillisSupplier;
private final RerouteService rerouteService;
private final AtomicLong lastRunTimeMillis = new AtomicLong(Long.MIN_VALUE);
private final AtomicBoolean checkInProgress = new AtomicBoolean();
private final SetOnce<Consumer<ActionListener<Void>>> rerouteAction = new SetOnce<>();
public DiskThresholdMonitor(Settings settings, Supplier<ClusterState> clusterStateSupplier, ClusterSettings clusterSettings,
Client client, LongSupplier currentTimeMillisSupplier) {
Client client, LongSupplier currentTimeMillisSupplier, RerouteService rerouteService) {
this.clusterStateSupplier = clusterStateSupplier;
this.currentTimeMillisSupplier = currentTimeMillisSupplier;
this.rerouteService = rerouteService;
this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings);
this.client = client;
}
@ -111,8 +110,6 @@ public class DiskThresholdMonitor {
public void onNewInfo(ClusterInfo info) {
assert rerouteAction.get() != null;
if (checkInProgress.compareAndSet(false, true) == false) {
logger.info("skipping monitor as a check is already in progress");
return;
@ -188,7 +185,7 @@ public class DiskThresholdMonitor {
if (reroute) {
logger.info("rerouting shards: [{}]", explanation);
rerouteAction.get().accept(ActionListener.wrap(r -> {
rerouteService.reroute("disk threshold monitor", ActionListener.wrap(r -> {
setLastRunTimeMillis();
listener.onResponse(r);
}, e -> {
@ -225,8 +222,4 @@ public class DiskThresholdMonitor {
.setSettings(Settings.builder().put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, true).build())
.execute(ActionListener.map(listener, r -> null));
}
public void setRerouteAction(BiConsumer<String, ActionListener<Void>> rerouteAction) {
this.rerouteAction.set(listener -> rerouteAction.accept("disk threshold monitor", listener));
}
}

View File

@ -25,7 +25,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.coordination.ElectionStrategy;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.ClusterApplierService;
@ -92,7 +92,7 @@ public class DiscoveryModule {
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService, MasterService masterService,
ClusterApplier clusterApplier, ClusterSettings clusterSettings, List<DiscoveryPlugin> plugins,
AllocationService allocationService, Path configFile, GatewayMetaState gatewayMetaState,
RoutingService routingService) {
RerouteService rerouteService) {
final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators = new ArrayList<>();
final Map<String, Supplier<SeedHostsProvider>> hostProviders = new HashMap<>();
hostProviders.put("settings", () -> new SettingsBasedSeedHostsProvider(settings, transportService));
@ -154,10 +154,10 @@ public class DiscoveryModule {
settings, clusterSettings,
transportService, namedWriteableRegistry, allocationService, masterService,
() -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), seedHostsProvider,
clusterApplier, joinValidators, new Random(Randomness.get().nextLong()), routingService::reroute, electionStrategy);
clusterApplier, joinValidators, new Random(Randomness.get().nextLong()), rerouteService, electionStrategy);
} else if (ZEN_DISCOVERY_TYPE.equals(discoveryType)) {
discovery = new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
clusterSettings, seedHostsProvider, allocationService, joinValidators, gatewayMetaState, routingService::reroute);
clusterSettings, seedHostsProvider, allocationService, joinValidators, gatewayMetaState, rerouteService);
} else {
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
}

View File

@ -22,7 +22,6 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
@ -30,6 +29,7 @@ import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.coordination.JoinTaskExecutor;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Priority;
@ -44,7 +44,6 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
/**
* This class processes incoming join requests (passed via {@link ZenDiscovery}). Incoming nodes
@ -63,9 +62,9 @@ public class NodeJoinController {
public NodeJoinController(Settings settings, MasterService masterService, AllocationService allocationService,
ElectMasterService electMaster, BiConsumer<String, ActionListener<Void>> reroute) {
ElectMasterService electMaster, RerouteService rerouteService) {
this.masterService = masterService;
joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger, reroute) {
joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger, rerouteService) {
@Override
public void clusterStatePublished(ClusterChangedEvent event) {
electMaster.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state());

View File

@ -36,6 +36,7 @@ import org.elasticsearch.cluster.coordination.NoMasterBlockService;
import org.elasticsearch.cluster.coordination.NodeRemovalClusterStateTaskExecutor;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
@ -165,7 +166,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
NamedWriteableRegistry namedWriteableRegistry, MasterService masterService, ClusterApplier clusterApplier,
ClusterSettings clusterSettings, SeedHostsProvider hostsProvider, AllocationService allocationService,
Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, GatewayMetaState gatewayMetaState,
BiConsumer<String, ActionListener<Void>> reroute) {
RerouteService rerouteService) {
this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
this.masterService = masterService;
this.clusterApplier = clusterApplier;
@ -226,7 +227,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
this.membership = new MembershipAction(transportService, new MembershipListener(), onJoinValidators);
this.joinThreadControl = new JoinThreadControl();
this.nodeJoinController = new NodeJoinController(settings, masterService, allocationService, electMaster, reroute);
this.nodeJoinController = new NodeJoinController(settings, masterService, allocationService, electMaster, rerouteService);
this.nodeRemovalExecutor = new ZenNodeRemovalClusterStateTaskExecutor(allocationService, electMaster, this::submitRejoin, logger);
masterService.setClusterStateSupplier(this::clusterState);

View File

@ -25,8 +25,8 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.elasticsearch.cluster.routing.allocation.FailedShard;
@ -44,7 +44,7 @@ public class GatewayAllocator {
private static final Logger logger = LogManager.getLogger(GatewayAllocator.class);
private final RoutingService routingService;
private final RerouteService rerouteService;
private final PrimaryShardAllocator primaryShardAllocator;
private final ReplicaShardAllocator replicaShardAllocator;
@ -55,10 +55,10 @@ public class GatewayAllocator {
asyncFetchStore = ConcurrentCollections.newConcurrentMap();
@Inject
public GatewayAllocator(RoutingService routingService,
public GatewayAllocator(RerouteService rerouteService,
TransportNodesListGatewayStartedShards startedAction,
TransportNodesListShardStoreMetaData storeAction) {
this.routingService = routingService;
this.rerouteService = rerouteService;
this.primaryShardAllocator = new InternalPrimaryShardAllocator(startedAction);
this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction);
}
@ -72,7 +72,7 @@ public class GatewayAllocator {
// for tests
protected GatewayAllocator() {
this.routingService = null;
this.rerouteService = null;
this.primaryShardAllocator = null;
this.replicaShardAllocator = null;
}
@ -139,7 +139,7 @@ public class GatewayAllocator {
@Override
protected void reroute(ShardId shardId, String reason) {
logger.trace("{} scheduling reroute for {}", shardId, reason);
routingService.reroute("async_shard_fetch", ActionListener.wrap(
rerouteService.reroute("async_shard_fetch", ActionListener.wrap(
r -> logger.trace("{} scheduled reroute completed for {}", shardId, reason),
e -> logger.debug(new ParameterizedMessage("{} scheduled reroute failed for {}", shardId, reason), e)));
}

View File

@ -56,7 +56,9 @@ import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.cluster.metadata.TemplateUpgradeService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.BatchedRerouteService;
import org.elasticsearch.cluster.routing.LazilyInitializedRerouteService;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.StopWatch;
@ -368,8 +370,9 @@ public class Node implements Closeable {
.newHashPublisher());
final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment,
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
final LazilyInitializedRerouteService lazilyInitializedRerouteService = new LazilyInitializedRerouteService();
final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(settings, clusterService::state,
clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis);
clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis, lazilyInitializedRerouteService);
final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client,
diskThresholdMonitor::onNewInfo);
final UsageService usageService = new UsageService();
@ -506,16 +509,17 @@ public class Node implements Closeable {
RestoreService restoreService = new RestoreService(clusterService, repositoryService, clusterModule.getAllocationService(),
metaDataCreateIndexService, metaDataIndexUpgradeService, clusterService.getClusterSettings());
final RoutingService routingService = new RoutingService(clusterService, clusterModule.getAllocationService()::reroute);
final RerouteService rerouteService
= new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);
lazilyInitializedRerouteService.setRerouteService(rerouteService);
final DiscoveryModule discoveryModule = new DiscoveryModule(settings, threadPool, transportService, namedWriteableRegistry,
networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),
clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),
clusterModule.getAllocationService(), environment.configFile(), gatewayMetaState, routingService);
clusterModule.getAllocationService(), environment.configFile(), gatewayMetaState, rerouteService);
this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),
httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,
searchTransportService);
diskThresholdMonitor.setRerouteAction(routingService::reroute);
final SearchService searchService = newSearchService(clusterService, indicesService,
threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),
@ -586,7 +590,7 @@ public class Node implements Closeable {
b.bind(SnapshotShardsService.class).toInstance(snapshotShardsService);
b.bind(TransportNodesSnapshotsStatus.class).toInstance(nodesSnapshotsStatus);
b.bind(RestoreService.class).toInstance(restoreService);
b.bind(RoutingService.class).toInstance(routingService);
b.bind(RerouteService.class).toInstance(rerouteService);
}
);
injector = modules.createInjector();
@ -674,7 +678,6 @@ public class Node implements Closeable {
injector.getInstance(IndicesClusterStateService.class).start();
injector.getInstance(SnapshotsService.class).start();
injector.getInstance(SnapshotShardsService.class).start();
injector.getInstance(RoutingService.class).start();
injector.getInstance(SearchService.class).start();
nodeService.getMonitorService().start();
@ -792,7 +795,6 @@ public class Node implements Closeable {
// This can confuse other nodes and delay things - mostly if we're the master and we're running tests.
injector.getInstance(Discovery.class).stop();
// we close indices first, so operations won't be allowed on it
injector.getInstance(RoutingService.class).stop();
injector.getInstance(ClusterService.class).stop();
injector.getInstance(NodeConnectionsService.class).stop();
nodeService.getMonitorService().stop();
@ -842,8 +844,6 @@ public class Node implements Closeable {
toClose.add(injector.getInstance(IndicesService.class));
// close filter/fielddata caches after indices
toClose.add(injector.getInstance(IndicesStore.class));
toClose.add(() -> stopWatch.stop().start("routing"));
toClose.add(injector.getInstance(RoutingService.class));
toClose.add(() -> stopWatch.stop().start("cluster"));
toClose.add(injector.getInstance(ClusterService.class));
toClose.add(() -> stopWatch.stop().start("node_connections_service"));

View File

@ -32,7 +32,7 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardsIterator;
@ -92,8 +92,8 @@ public class ShardStateActionTests extends ESTestCase {
private static class TestShardStateAction extends ShardStateAction {
// Test-only constructor: forwards the cluster service, transport service, allocation service and
// reroute collaborator unchanged to the ShardStateAction superclass, always supplying the shared
// THREAD_POOL as the thread pool argument.
TestShardStateAction(ClusterService clusterService, TransportService transportService,
AllocationService allocationService, RoutingService routingService) {
super(clusterService, transportService, allocationService, routingService, THREAD_POOL);
AllocationService allocationService, RerouteService rerouteService) {
super(clusterService, transportService, allocationService, rerouteService, THREAD_POOL);
}
private Runnable onBeforeWaitForNewMasterAndRetry;

View File

@ -20,7 +20,6 @@ package org.elasticsearch.cluster.routing;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
@ -41,9 +40,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.startsWith;
public class RoutingServiceTests extends ESTestCase {
public class BatchedRerouteServiceTests extends ESTestCase {
private ThreadPool threadPool;
private ClusterService clusterService;
@ -60,38 +58,19 @@ public class RoutingServiceTests extends ESTestCase {
threadPool.shutdown();
}
/**
 * Checks that a reroute submitted to a {@code RoutingService} that is not currently started is
 * rejected straight away rather than queued.
 *
 * The service is randomly left in one of three states before the request: started and then stopped,
 * closed without ever being started, or freshly constructed and untouched. In every case the
 * listener must already be complete when {@code reroute} returns, and it must have failed with an
 * {@link IllegalStateException} whose message begins with
 * {@code "rejecting delayed reroute [test] in state ["}.
 */
public void testRejectionUnlessStarted() {
// The reroute function is the identity on the cluster state; it is not expected to matter here.
final RoutingService routingService = new RoutingService(clusterService, (s, r) -> s);
final PlainActionFuture<Void> future = new PlainActionFuture<>();
if (randomBoolean()) {
routingService.start();
routingService.stop();
} else if (randomBoolean()) {
routingService.close();
}
// Remaining case: neither branch ran, so the service was never started at all.
routingService.reroute("test", future);
// Completion is asserted without waiting, so the rejection must happen synchronously.
assertTrue(future.isDone());
assertThat(expectThrows(IllegalStateException.class, future::actionGet).getMessage(),
startsWith("rejecting delayed reroute [test] in state ["));
}
public void testReroutesWhenRequested() throws InterruptedException {
final AtomicLong rerouteCount = new AtomicLong();
final RoutingService routingService = new RoutingService(clusterService, (s, r) -> {
final BatchedRerouteService batchedRerouteService = new BatchedRerouteService(clusterService, (s, r) -> {
rerouteCount.incrementAndGet();
return s;
});
routingService.start();
long rerouteCountBeforeReroute = 0L;
final int iterations = between(1, 100);
final CountDownLatch countDownLatch = new CountDownLatch(iterations);
for (int i = 0; i < iterations; i++) {
rerouteCountBeforeReroute = Math.max(rerouteCountBeforeReroute, rerouteCount.get());
routingService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown));
batchedRerouteService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown));
}
countDownLatch.await(10, TimeUnit.SECONDS);
assertThat(rerouteCountBeforeReroute, lessThan(rerouteCount.get()));
@ -116,17 +95,15 @@ public class RoutingServiceTests extends ESTestCase {
cyclicBarrier.await(); // wait for master thread to be blocked
final AtomicBoolean rerouteExecuted = new AtomicBoolean();
final RoutingService routingService = new RoutingService(clusterService, (s, r) -> {
final BatchedRerouteService batchedRerouteService = new BatchedRerouteService(clusterService, (s, r) -> {
assertTrue(rerouteExecuted.compareAndSet(false, true)); // only called once
return s;
});
routingService.start();
final int iterations = between(1, 100);
final CountDownLatch countDownLatch = new CountDownLatch(iterations);
for (int i = 0; i < iterations; i++) {
routingService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown));
batchedRerouteService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown));
}
cyclicBarrier.await(); // allow master thread to continue;
@ -136,18 +113,17 @@ public class RoutingServiceTests extends ESTestCase {
public void testNotifiesOnFailure() throws InterruptedException {
final RoutingService routingService = new RoutingService(clusterService, (s, r) -> {
final BatchedRerouteService batchedRerouteService = new BatchedRerouteService(clusterService, (s, r) -> {
if (rarely()) {
throw new ElasticsearchException("simulated");
}
return randomBoolean() ? s : ClusterState.builder(s).build();
});
routingService.start();
final int iterations = between(1, 100);
final CountDownLatch countDownLatch = new CountDownLatch(iterations);
for (int i = 0; i < iterations; i++) {
routingService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown));
batchedRerouteService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown));
if (rarely()) {
clusterService.getMasterService().setClusterStatePublisher(
randomBoolean()

View File

@ -77,7 +77,10 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
AtomicReference<Set<String>> indices = new AtomicReference<>();
AtomicLong currentTime = new AtomicLong();
DiskThresholdMonitor monitor = new DiskThresholdMonitor(settings, () -> finalState,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get) {
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get, (reason, listener) -> {
assertTrue(reroute.compareAndSet(false, true));
listener.onResponse(null);
}) {
@Override
protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) {
assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly));
@ -85,11 +88,6 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
}
};
monitor.setRerouteAction((reason, listener) -> {
assertTrue(reroute.compareAndSet(false, true));
listener.onResponse(null);
});
ImmutableOpenMap.Builder<String, DiskUsage> builder = ImmutableOpenMap.builder();
builder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 4));
builder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 30));
@ -119,17 +117,16 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
assertTrue(anotherFinalClusterState.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2"));
monitor = new DiskThresholdMonitor(settings, () -> anotherFinalClusterState,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get) {
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get, (reason, listener) -> {
assertTrue(reroute.compareAndSet(false, true));
listener.onResponse(null);
}) {
// Test override: instead of really marking indices read-only, record the requested index set in
// `indices` (asserting nothing had been recorded yet, i.e. this is called at most once per check)
// and complete the listener immediately with a null response.
@Override
protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) {
assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly));
listener.onResponse(null);
}
};
monitor.setRerouteAction((reason, listener) -> {
assertTrue(reroute.compareAndSet(false, true));
listener.onResponse(null);
});
indices.set(null);
reroute.set(false);
@ -147,18 +144,16 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
AtomicLong currentTime = new AtomicLong();
AtomicReference<ActionListener<Void>> listenerReference = new AtomicReference<>();
DiskThresholdMonitor monitor = new DiskThresholdMonitor(Settings.EMPTY, () -> clusterState,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get) {
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get, (reason, listener) -> {
assertNotNull(listener);
assertTrue(listenerReference.compareAndSet(null, listener));
}) {
// Test override: this monitor instance must never try to mark indices read-only, so any call
// fails the test with an AssertionError.
@Override
protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) {
throw new AssertionError("unexpected");
}
};
monitor.setRerouteAction((reason, listener) -> {
assertNotNull(listener);
assertTrue(listenerReference.compareAndSet(null, listener));
});
final ImmutableOpenMap.Builder<String, DiskUsage> allDisksOkBuilder;
allDisksOkBuilder = ImmutableOpenMap.builder();
allDisksOkBuilder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 50));

View File

@ -22,7 +22,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@ -92,7 +92,7 @@ public class DiscoveryModuleTests extends ESTestCase {
/**
 * Builds a {@code DiscoveryModule} for a test from the given settings and discovery plugins.
 *
 * All other constructor arguments come from this test class's fixture fields, a fresh temporary
 * directory (as an absolute path), two {@code null} placeholders, and a mock for the final
 * (reroute-related) argument.
 *
 * @param settings the node settings to construct the module with
 * @param plugins  the discovery plugins to pass to the module
 * @return the newly constructed module
 */
private DiscoveryModule newModule(Settings settings, List<DiscoveryPlugin> plugins) {
return new DiscoveryModule(settings, threadPool, transportService, namedWriteableRegistry, null, masterService,
clusterApplier, clusterSettings, plugins, null, createTempDir().toAbsolutePath(), gatewayMetaState,
mock(RoutingService.class));
mock(RerouteService.class));
}
public void testDefaults() {

View File

@ -100,7 +100,7 @@ import org.elasticsearch.cluster.metadata.MetaDataMappingService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.BatchedRerouteService;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
@ -1065,7 +1065,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
transportService, indicesService, actionFilters, indexNameExpressionResolver);
final ShardStateAction shardStateAction = new ShardStateAction(
clusterService, transportService, allocationService,
new RoutingService(clusterService, allocationService::reroute),
new BatchedRerouteService(clusterService, allocationService::reroute),
threadPool
);
final MetaDataMappingService metaDataMappingService = new MetaDataMappingService(clusterService, indicesService);
@ -1248,7 +1248,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
hostsResolver -> testClusterNodes.nodes.values().stream().filter(n -> n.node.isMasterNode())
.map(n -> n.node.getAddress()).collect(Collectors.toList()),
clusterService.getClusterApplierService(), Collections.emptyList(), random(),
new RoutingService(clusterService, allocationService::reroute)::reroute, ElectionStrategy.DEFAULT_INSTANCE);
new BatchedRerouteService(clusterService, allocationService::reroute), ElectionStrategy.DEFAULT_INSTANCE);
masterService.setClusterStatePublisher(coordinator);
coordinator.start();
masterService.start();