Ignore timeouts with single-node discovery (#52159)
Today we use `cluster.join.timeout` to prevent nodes from waiting indefinitely if joining a faulty master that is too slow to respond, and `cluster.publish.timeout` to allow a faulty master to detect that it is unable to publish its cluster state updates in a timely fashion. If these timeouts occur then the node restarts the discovery process in an attempt to find a healthier master. In the special case of `discovery.type: single-node` there is no point in looking for another healthier master since the single node in the cluster is all we've got. This commit suppresses these timeouts and instead lets the node wait for joins and publications to succeed no matter how long this might take.
This commit is contained in:
parent
4c88996cd7
commit
00b9098250
|
@ -30,8 +30,9 @@ Discovery and cluster formation are affected by the following settings:
|
|||
Specifies whether {es} should form a multiple-node cluster. By default, {es}
|
||||
discovers other nodes when forming a cluster and allows other nodes to join
|
||||
the cluster later. If `discovery.type` is set to `single-node`, {es} forms a
|
||||
single-node cluster. For more information about when you might use this
|
||||
setting, see <<single-node-discovery>>.
|
||||
single-node cluster and suppresses the timeouts set by
|
||||
`cluster.publish.timeout` and `cluster.join.timeout`. For more information
|
||||
about when you might use this setting, see <<single-node-discovery>>.
|
||||
|
||||
`cluster.initial_master_nodes`::
|
||||
|
||||
|
@ -183,8 +184,8 @@ or may become unstable or intolerant of certain failures.
|
|||
`cluster.join.timeout`::
|
||||
|
||||
Sets how long a node will wait after sending a request to join a cluster
|
||||
before it considers the request to have failed and retries. Defaults to
|
||||
`60s`.
|
||||
before it considers the request to have failed and retries, unless
|
||||
`discovery.type` is set to `single-node`. Defaults to `60s`.
|
||||
|
||||
`cluster.max_voting_config_exclusions`::
|
||||
|
||||
|
@ -201,8 +202,8 @@ or may become unstable or intolerant of certain failures.
|
|||
`cluster.publish.timeout`::
|
||||
|
||||
Sets how long the master node waits for each cluster state update to be
|
||||
completely published to all nodes. The default value is `30s`. See
|
||||
<<cluster-state-publishing>>.
|
||||
completely published to all nodes, unless `discovery.type` is set to
|
||||
`single-node`. The default value is `30s`. See <<cluster-state-publishing>>.
|
||||
|
||||
[[no-master-block]]`cluster.no_master_block`::
|
||||
Specifies which operations are rejected when there is no active master in a
|
||||
|
|
|
@ -80,7 +80,7 @@ public class ClusterBootstrapService {
|
|||
public ClusterBootstrapService(Settings settings, TransportService transportService,
|
||||
Supplier<Iterable<DiscoveryNode>> discoveredNodesSupplier, BooleanSupplier isBootstrappedSupplier,
|
||||
Consumer<VotingConfiguration> votingConfigurationConsumer) {
|
||||
if (DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE.equals(DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings))) {
|
||||
if (DiscoveryModule.isSingleNodeDiscovery(settings)) {
|
||||
if (INITIAL_MASTER_NODES_SETTING.exists(settings)) {
|
||||
throw new IllegalArgumentException("setting [" + INITIAL_MASTER_NODES_SETTING.getKey() +
|
||||
"] is not allowed when [" + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey() + "] is set to [" +
|
||||
|
|
|
@ -170,7 +170,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|||
this.masterService = masterService;
|
||||
this.allocationService = allocationService;
|
||||
this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
|
||||
this.singleNodeDiscovery = DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE.equals(DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings));
|
||||
this.singleNodeDiscovery = DiscoveryModule.isSingleNodeDiscovery(settings);
|
||||
this.electionStrategy = electionStrategy;
|
||||
this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
|
||||
this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators,
|
||||
|
@ -1253,6 +1253,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|||
private final AckListener ackListener;
|
||||
private final ActionListener<Void> publishListener;
|
||||
private final PublicationTransportHandler.PublicationContext publicationContext;
|
||||
|
||||
@Nullable // if using single-node discovery
|
||||
private final Scheduler.ScheduledCancellable timeoutHandler;
|
||||
private final Scheduler.Cancellable infoTimeoutHandler;
|
||||
|
||||
|
@ -1296,7 +1298,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|||
this.ackListener = ackListener;
|
||||
this.publishListener = publishListener;
|
||||
|
||||
this.timeoutHandler = transportService.getThreadPool().schedule(new Runnable() {
|
||||
this.timeoutHandler = singleNodeDiscovery ? null : transportService.getThreadPool().schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (mutex) {
|
||||
|
@ -1364,8 +1366,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|||
synchronized (mutex) {
|
||||
removePublicationAndPossiblyBecomeCandidate("clusterApplier#onNewClusterState");
|
||||
}
|
||||
timeoutHandler.cancel();
|
||||
infoTimeoutHandler.cancel();
|
||||
cancelTimeoutHandlers();
|
||||
ackListener.onNodeAck(getLocalNode(), e);
|
||||
publishListener.onFailure(e);
|
||||
}
|
||||
|
@ -1412,8 +1413,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|||
lagDetector.startLagDetector(publishRequest.getAcceptedState().version());
|
||||
logIncompleteNodes(Level.WARN);
|
||||
}
|
||||
timeoutHandler.cancel();
|
||||
infoTimeoutHandler.cancel();
|
||||
cancelTimeoutHandlers();
|
||||
ackListener.onNodeAck(getLocalNode(), null);
|
||||
publishListener.onResponse(null);
|
||||
}
|
||||
|
@ -1424,8 +1424,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|||
public void onFailure(Exception e) {
|
||||
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
|
||||
removePublicationAndPossiblyBecomeCandidate("Publication.onCompletion(false)");
|
||||
timeoutHandler.cancel();
|
||||
infoTimeoutHandler.cancel();
|
||||
cancelTimeoutHandlers();
|
||||
|
||||
final FailedToCommitClusterStateException exception = new FailedToCommitClusterStateException("publication failed", e);
|
||||
ackListener.onNodeAck(getLocalNode(), exception); // other nodes have acked, but not the master.
|
||||
|
@ -1434,6 +1433,13 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|||
}, EsExecutors.newDirectExecutorService(), transportService.getThreadPool().getThreadContext());
|
||||
}
|
||||
|
||||
private void cancelTimeoutHandlers() {
|
||||
if (timeoutHandler != null) {
|
||||
timeoutHandler.cancel();
|
||||
}
|
||||
infoTimeoutHandler.cancel();
|
||||
}
|
||||
|
||||
private void handleAssociatedJoin(Join join) {
|
||||
if (join.getTerm() == getCurrentTerm() && missingJoinVoteFrom(join.getSourceNode())) {
|
||||
logger.trace("handling {}", join);
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
|||
import org.elasticsearch.cluster.routing.RerouteService;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.service.MasterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
@ -42,6 +43,7 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.discovery.zen.MembershipAction;
|
||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||
import org.elasticsearch.discovery.DiscoveryModule;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool.Names;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
|
@ -85,6 +87,8 @@ public class JoinHelper {
|
|||
private final MasterService masterService;
|
||||
private final TransportService transportService;
|
||||
private final JoinTaskExecutor joinTaskExecutor;
|
||||
|
||||
@Nullable // if using single-node discovery
|
||||
private final TimeValue joinTimeout;
|
||||
|
||||
private final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = Collections.synchronizedSet(new HashSet<>());
|
||||
|
@ -97,7 +101,7 @@ public class JoinHelper {
|
|||
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, RerouteService rerouteService) {
|
||||
this.masterService = masterService;
|
||||
this.transportService = transportService;
|
||||
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
|
||||
this.joinTimeout = DiscoveryModule.isSingleNodeDiscovery(settings) ? null : JOIN_TIMEOUT_SETTING.get(settings);
|
||||
this.joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger, rerouteService) {
|
||||
|
||||
@Override
|
||||
|
|
|
@ -175,6 +175,10 @@ public class DiscoveryModule {
|
|||
return DISCOVERY_SEED_PROVIDERS_SETTING.get(settings);
|
||||
}
|
||||
|
||||
public static boolean isSingleNodeDiscovery(Settings settings) {
|
||||
return SINGLE_NODE_DISCOVERY_TYPE.equals(DISCOVERY_TYPE_SETTING.get(settings));
|
||||
}
|
||||
|
||||
public Discovery getDiscovery() {
|
||||
return discovery;
|
||||
}
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.elasticsearch.common.logging.Loggers;
|
|||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.Settings.Builder;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.discovery.DiscoveryModule;
|
||||
|
@ -57,6 +58,7 @@ import java.util.function.Function;
|
|||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.cluster.coordination.AbstractCoordinatorTestCase.Cluster.DEFAULT_DELAY_VARIABILITY;
|
||||
import static org.elasticsearch.cluster.coordination.AbstractCoordinatorTestCase.Cluster.EXTREME_DELAY_VARIABILITY;
|
||||
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.CANDIDATE;
|
||||
import static org.elasticsearch.cluster.coordination.Coordinator.PUBLISH_TIMEOUT_SETTING;
|
||||
import static org.elasticsearch.cluster.coordination.ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING;
|
||||
|
@ -79,6 +81,7 @@ import static org.hamcrest.Matchers.hasSize;
|
|||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
|
||||
public class CoordinatorTests extends AbstractCoordinatorTestCase {
|
||||
|
@ -1051,7 +1054,7 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
|
|||
cluster.stabilise();
|
||||
|
||||
partitionedNode.heal();
|
||||
cluster.runRandomly(false);
|
||||
cluster.runRandomly(false, true, EXTREME_DELAY_VARIABILITY);
|
||||
cluster.stabilise();
|
||||
}
|
||||
}
|
||||
|
@ -1132,6 +1135,36 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testSingleNodeDiscoveryStabilisesEvenWhenDisrupted() {
|
||||
try (Cluster cluster = new Cluster(1, randomBoolean(), Settings.builder()
|
||||
.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE).build())) {
|
||||
|
||||
// A cluster using single-node discovery should not apply any timeouts to joining or cluster state publication. There are no
|
||||
// other options, so there's no point in failing and retrying from scratch no matter how badly disrupted we are and we may as
|
||||
// well just wait.
|
||||
|
||||
// larger variability is are good for checking that we don't time out, but smaller variability also tightens up the time bound
|
||||
// within which we expect to converge, so use a mix of both
|
||||
final long delayVariabilityMillis = randomLongBetween(DEFAULT_DELAY_VARIABILITY, TimeValue.timeValueMinutes(10).millis());
|
||||
if (randomBoolean()) {
|
||||
cluster.runRandomly(true, false, delayVariabilityMillis);
|
||||
} else {
|
||||
cluster.deterministicTaskQueue.setExecutionDelayVariabilityMillis(delayVariabilityMillis);
|
||||
}
|
||||
|
||||
final ClusterNode clusterNode = cluster.getAnyNode();
|
||||
|
||||
// cf. DEFAULT_STABILISATION_TIME, but stabilisation is quicker when there's a single node - there's no meaningful fault
|
||||
// detection and ongoing publications do not time out
|
||||
cluster.runFor(ELECTION_INITIAL_TIMEOUT_SETTING.get(Settings.EMPTY).millis() + delayVariabilityMillis
|
||||
+ 4 * delayVariabilityMillis // two round trips for pre-voting and voting
|
||||
+ 7 * delayVariabilityMillis, // see definition of DEFAULT_CLUSTER_STATE_UPDATE_DELAY
|
||||
"stabilising");
|
||||
|
||||
assertThat(cluster.getAnyLeader(), sameInstance(clusterNode));
|
||||
}
|
||||
}
|
||||
|
||||
private static class BrokenCustom extends AbstractDiffable<ClusterState.Custom> implements ClusterState.Custom {
|
||||
|
||||
static final String EXCEPTION_MESSAGE = "simulated";
|
||||
|
|
|
@ -320,10 +320,16 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
|
|||
}
|
||||
|
||||
void runRandomly() {
|
||||
runRandomly(true);
|
||||
runRandomly(true, true, EXTREME_DELAY_VARIABILITY);
|
||||
}
|
||||
|
||||
void runRandomly(boolean allowReboots) {
|
||||
/**
|
||||
* @param allowReboots whether to randomly reboot the nodes during the process, losing all transient state. Usually true.
|
||||
* @param coolDown whether to set the delay variability back to {@link Cluster#DEFAULT_DELAY_VARIABILITY} and drain all
|
||||
* disrupted events from the queue before returning. Usually true.
|
||||
* @param delayVariability the delay variability to use while running randomly. Usually {@link Cluster#EXTREME_DELAY_VARIABILITY}.
|
||||
*/
|
||||
void runRandomly(boolean allowReboots, boolean coolDown, long delayVariability) {
|
||||
|
||||
// TODO supporting (preserving?) existing disruptions needs implementing if needed, for now we just forbid it
|
||||
assertThat("may reconnect disconnected nodes, probably unexpected", disconnectedNodes, empty());
|
||||
|
@ -336,9 +342,9 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
|
|||
|
||||
final int randomSteps = scaledRandomIntBetween(10, 10000);
|
||||
final int keyRange = randomSteps / 50; // for randomized writes and reads
|
||||
logger.info("--> start of safety phase of at least [{}] steps", randomSteps);
|
||||
logger.info("--> start of safety phase of at least [{}] steps with delay variability of [{}ms]", randomSteps, delayVariability);
|
||||
|
||||
deterministicTaskQueue.setExecutionDelayVariabilityMillis(EXTREME_DELAY_VARIABILITY);
|
||||
deterministicTaskQueue.setExecutionDelayVariabilityMillis(delayVariability);
|
||||
disruptStorage = true;
|
||||
int step = 0;
|
||||
long finishTime = -1;
|
||||
|
@ -349,8 +355,13 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
|
|||
|
||||
if (randomSteps <= step && finishTime == -1) {
|
||||
finishTime = deterministicTaskQueue.getLatestDeferredExecutionTime();
|
||||
if (coolDown) {
|
||||
deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY);
|
||||
logger.debug("----> [runRandomly {}] reducing delay variability and running until [{}ms]", step, finishTime);
|
||||
} else {
|
||||
logger.debug("----> [runRandomly {}] running until [{}ms] with delay variability of [{}ms]", step, finishTime,
|
||||
deterministicTaskQueue.getExecutionDelayVariabilityMillis());
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
|
|
Loading…
Reference in New Issue