From c79fbea9235d09477fd5c95dffdbfcc28b5ebb60 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 17 Sep 2018 15:00:30 +0200 Subject: [PATCH] [Zen2] Implement basic cluster formation (#33668) This PR integrates the following pieces of machinery in the Coordinator: - discovery - pre-voting - randomised election scheduling - joining (of a new master) - publication of cluster state updates Together, these things are everything needed to form a cluster. We therefore also add the start of a test suite that allows us to assert higher-level properties of the interactions between all these pieces of machinery, with as little fake behaviour as possible. We assert one such property: "a cluster successfully forms". --- .../coordination/ApplyCommitRequest.java | 1 + .../cluster/coordination/Coordinator.java | 269 ++++++++-- .../cluster/coordination/JoinHelper.java | 70 ++- .../coordination/PreVoteCollector.java | 17 +- .../cluster/coordination/Publication.java | 8 +- .../elasticsearch/discovery/PeerFinder.java | 25 +- .../transport/TransportService.java | 22 +- .../admin/indices/create/ShrinkIndexIT.java | 3 +- .../coordination/CoordinatorTests.java | 466 ++++++++++++++++++ .../coordination/DeterministicTaskQueue.java | 2 +- .../ElectionSchedulerFactoryTests.java | 14 +- .../cluster/coordination/NodeJoinTests.java | 4 +- .../coordination/PreVoteCollectorTests.java | 6 +- .../coordination/PublicationTests.java | 16 +- .../common/settings/SettingTests.java | 6 +- .../cluster/FakeThreadPoolMasterService.java | 34 +- .../monitoring/integration/MonitoringIT.java | 11 +- 17 files changed, 886 insertions(+), 88 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/ApplyCommitRequest.java b/server/src/main/java/org/elasticsearch/cluster/coordination/ApplyCommitRequest.java index 02cffbb4c4d..87f13a1f632 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/ApplyCommitRequest.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/ApplyCommitRequest.java @@ -49,6 +49,7 @@ public class ApplyCommitRequest extends TermVersionRequest { return "ApplyCommitRequest{" + "term=" + term + ", version=" + version + + ", sourceNode=" + sourceNode + '}'; } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 7528050e4df..011f8714091 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -19,18 +19,35 @@ package org.elasticsearch.cluster.coordination; import org.apache.lucene.util.SetOnce; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Randomness; import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.discovery.HandshakingTransportAddressConnector; +import org.elasticsearch.discovery.PeerFinder; +import org.elasticsearch.discovery.UnicastConfiguredHostsResolver; +import org.elasticsearch.discovery.zen.UnicastHostsProvider; +import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportService; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; public class Coordinator extends AbstractLifecycleComponent { + public static final String PUBLISH_STATE_ACTION_NAME = "internal:cluster/coordination/publish_state"; + public static final String COMMIT_STATE_ACTION_NAME = "internal:cluster/coordination/commit_state"; + private final TransportService transportService; private final JoinHelper joinHelper; private final Supplier persistedStateSupplier; @@ -38,6 +55,17 @@ public class Coordinator extends AbstractLifecycleComponent { // These tests can be rewritten to use public methods once Coordinator is more feature-complete final Object mutex = new Object(); final SetOnce coordinationState = new SetOnce<>(); // initialized on start-up (see doStart) + private volatile Optional lastCommittedState = Optional.empty(); + + private final PeerFinder peerFinder; + private final PreVoteCollector preVoteCollector; + private final ElectionSchedulerFactory electionSchedulerFactory; + private final UnicastConfiguredHostsResolver configuredHostsResolver; + @Nullable + private Releasable electionScheduler; + @Nullable + private Releasable prevotingRound; + private AtomicLong maxTermSeen = new AtomicLong(); private Mode mode; private Optional lastKnownLeader; @@ -45,15 +73,99 @@ public class Coordinator extends AbstractLifecycleComponent { private JoinHelper.JoinAccumulator joinAccumulator; public Coordinator(Settings settings, TransportService transportService, AllocationService allocationService, - MasterService masterService, Supplier persistedStateSupplier) { + MasterService masterService, Supplier persistedStateSupplier, + UnicastHostsProvider unicastHostsProvider) { super(settings); this.transportService = transportService; this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService, - this::getCurrentTerm, this::handleJoinRequest); + this::getCurrentTerm, this::handleJoinRequest, this::joinLeaderInTerm); this.persistedStateSupplier = persistedStateSupplier; this.lastKnownLeader = Optional.empty(); this.lastJoin = Optional.empty(); this.joinAccumulator = joinHelper.new CandidateJoinAccumulator(); + + this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, Randomness.get(), transportService.getThreadPool()); + this.preVoteCollector = new PreVoteCollector(settings, transportService, this::startElection, this::updateMaxTermSeen); + configuredHostsResolver = new UnicastConfiguredHostsResolver(settings, transportService, unicastHostsProvider); + this.peerFinder = new CoordinatorPeerFinder(settings, transportService, + new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver); + + transportService.registerRequestHandler(PUBLISH_STATE_ACTION_NAME, Names.GENERIC, false, false, + in -> new PublishRequest(in, transportService.getLocalNode()), + (request, channel, task) -> channel.sendResponse(handlePublishRequest(request))); + + transportService.registerRequestHandler(COMMIT_STATE_ACTION_NAME, Names.GENERIC, false, false, + ApplyCommitRequest::new, + (request, channel, task) -> { + handleApplyCommit(request); + channel.sendResponse(Empty.INSTANCE); + }); + } + + private void handleApplyCommit(ApplyCommitRequest applyCommitRequest) { + synchronized (mutex) { + logger.trace("handleApplyCommit: applying commit {}", applyCommitRequest); + + coordinationState.get().handleCommit(applyCommitRequest); + lastCommittedState = Optional.of(coordinationState.get().getLastAcceptedState()); + } + } + + private PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) { + synchronized (mutex) { + final DiscoveryNode sourceNode = publishRequest.getAcceptedState().nodes().getMasterNode(); + logger.trace("handlePublishRequest: handling [{}] from [{}]", publishRequest, sourceNode); + ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term()); + final PublishResponse publishResponse = coordinationState.get().handlePublishRequest(publishRequest); + + if (sourceNode.equals(getLocalNode()) == false) { + becomeFollower("handlePublishRequest", sourceNode); + } + + return new PublishWithJoinResponse(publishResponse, + joinWithDestination(lastJoin, sourceNode, publishRequest.getAcceptedState().term())); + } + } + + private static Optional joinWithDestination(Optional lastJoin, DiscoveryNode leader, long term) { + if (lastJoin.isPresent() + && lastJoin.get().getTargetNode().getId().equals(leader.getId()) + && lastJoin.get().getTerm() == term) { + return lastJoin; + } + + return Optional.empty(); + } + + private void closePrevotingAndElectionScheduler() { + if (prevotingRound != null) { + prevotingRound.close(); + prevotingRound = null; + } + + if (electionScheduler != null) { + electionScheduler.close(); + electionScheduler = null; + } + } + + private void updateMaxTermSeen(final long term) { + maxTermSeen.updateAndGet(oldMaxTerm -> Math.max(oldMaxTerm, term)); + // TODO if we are leader here, and there is no publication in flight, then we should bump our term + // (if we are leader and there _is_ a publication in flight then doing so would cancel the publication, so don't do that, but + // do check for this after the publication completes) + } + + private void startElection() { + synchronized (mutex) { + // The preVoteCollector is only active while we are candidate, but it does not call this method with synchronisation, so we have + // to check our mode again here. + if (mode == Mode.CANDIDATE) { + final StartJoinRequest startJoinRequest + = new StartJoinRequest(getLocalNode(), Math.max(getCurrentTerm(), maxTermSeen.get()) + 1); + getDiscoveredNodes().forEach(node -> joinHelper.sendStartJoinRequest(startJoinRequest, node)); + } + } } private Optional ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) { @@ -65,19 +177,20 @@ public class Coordinator extends AbstractLifecycleComponent { } private Join joinLeaderInTerm(StartJoinRequest startJoinRequest) { - assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; - logger.debug("joinLeaderInTerm: from [{}] with term {}", startJoinRequest.getSourceNode(), startJoinRequest.getTerm()); - Join join = coordinationState.get().handleStartJoin(startJoinRequest); - lastJoin = Optional.of(join); - if (mode != Mode.CANDIDATE) { - becomeCandidate("joinLeaderInTerm"); + synchronized (mutex) { + logger.debug("joinLeaderInTerm: for [{}] with term {}", startJoinRequest.getSourceNode(), startJoinRequest.getTerm()); + final Join join = coordinationState.get().handleStartJoin(startJoinRequest); + lastJoin = Optional.of(join); + if (mode != Mode.CANDIDATE) { + becomeCandidate("joinLeaderInTerm"); + } + return join; } - return join; } private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) { assert Thread.holdsLock(mutex) == false; - logger.trace("handleJoin: as {}, handling {}", mode, joinRequest); + logger.trace("handleJoinRequest: as {}, handling {}", mode, joinRequest); transportService.connectToNode(joinRequest.getSourceNode()); final Optional optionalJoin = joinRequest.getOptionalJoin(); @@ -85,29 +198,11 @@ public class Coordinator extends AbstractLifecycleComponent { final CoordinationState coordState = coordinationState.get(); final boolean prevElectionWon = coordState.electionWon(); - if (optionalJoin.isPresent()) { - Join join = optionalJoin.get(); - // if someone thinks we should be master, let's add our vote and try to become one - // note that the following line should never throw an exception - ensureTermAtLeast(getLocalNode(), join.getTerm()).ifPresent(coordState::handleJoin); - - if (coordState.electionWon()) { - // if we have already won the election then the actual join does not matter for election purposes, - // so swallow any exception - try { - coordState.handleJoin(join); - } catch (CoordinationStateRejectedException e) { - logger.trace("failed to add join, ignoring", e); - } - } else { - coordState.handleJoin(join); // this might fail and bubble up the exception - } - } - + optionalJoin.ifPresent(this::handleJoin); joinAccumulator.handleJoinRequest(joinRequest.getSourceNode(), joinCallback); if (prevElectionWon == false && coordState.electionWon()) { - becomeLeader("handleJoin"); + becomeLeader("handleJoinRequest"); } } } @@ -120,7 +215,11 @@ public class Coordinator extends AbstractLifecycleComponent { mode = Mode.CANDIDATE; joinAccumulator.close(mode); joinAccumulator = joinHelper.new CandidateJoinAccumulator(); + + peerFinder.activate(coordinationState.get().getLastAcceptedState().nodes()); } + + preVoteCollector.update(getPreVoteResponse(), null); } void becomeLeader(String method) { @@ -129,9 +228,13 @@ public class Coordinator extends AbstractLifecycleComponent { logger.debug("{}: becoming LEADER (was {}, lastKnownLeader was [{}])", method, mode, lastKnownLeader); mode = Mode.LEADER; - lastKnownLeader = Optional.of(getLocalNode()); joinAccumulator.close(mode); joinAccumulator = joinHelper.new LeaderJoinAccumulator(); + + lastKnownLeader = Optional.of(getLocalNode()); + peerFinder.deactivate(getLocalNode()); + closePrevotingAndElectionScheduler(); + preVoteCollector.update(getPreVoteResponse(), getLocalNode()); } void becomeFollower(String method, DiscoveryNode leaderNode) { @@ -145,6 +248,14 @@ public class Coordinator extends AbstractLifecycleComponent { } lastKnownLeader = Optional.of(leaderNode); + peerFinder.deactivate(leaderNode); + closePrevotingAndElectionScheduler(); + preVoteCollector.update(getPreVoteResponse(), leaderNode); + } + + private PreVoteResponse getPreVoteResponse() { + return new PreVoteResponse(getCurrentTerm(), coordinationState.get().getLastAcceptedTerm(), + coordinationState.get().getLastAcceptedVersion()); } // package-visible for testing @@ -170,6 +281,7 @@ public class Coordinator extends AbstractLifecycleComponent { protected void doStart() { CoordinationState.PersistedState persistedState = persistedStateSupplier.get(); coordinationState.set(new CoordinationState(settings, getLocalNode(), persistedState)); + configuredHostsResolver.start(); } public void startInitialJoin() { @@ -180,7 +292,7 @@ public class Coordinator extends AbstractLifecycleComponent { @Override protected void doStop() { - + configuredHostsResolver.stop(); } @Override @@ -190,22 +302,115 @@ public class Coordinator extends AbstractLifecycleComponent { public void invariant() { synchronized (mutex) { + final Optional peerFinderLeader = peerFinder.getLeader(); if (mode == Mode.LEADER) { assert coordinationState.get().electionWon(); assert lastKnownLeader.isPresent() && lastKnownLeader.get().equals(getLocalNode()); assert joinAccumulator instanceof JoinHelper.LeaderJoinAccumulator; + assert peerFinderLeader.equals(lastKnownLeader) : peerFinderLeader; + assert electionScheduler == null : electionScheduler; + assert prevotingRound == null : prevotingRound; } else if (mode == Mode.FOLLOWER) { assert coordinationState.get().electionWon() == false : getLocalNode() + " is FOLLOWER so electionWon() should be false"; assert lastKnownLeader.isPresent() && (lastKnownLeader.get().equals(getLocalNode()) == false); assert joinAccumulator instanceof JoinHelper.FollowerJoinAccumulator; + assert peerFinderLeader.equals(lastKnownLeader) : peerFinderLeader; + assert electionScheduler == null : electionScheduler; + assert prevotingRound == null : prevotingRound; } else { assert mode == Mode.CANDIDATE; assert joinAccumulator instanceof JoinHelper.CandidateJoinAccumulator; + assert peerFinderLeader.isPresent() == false : peerFinderLeader; + assert prevotingRound == null || electionScheduler != null; } } } + // for tests + boolean hasJoinVoteFrom(DiscoveryNode localNode) { + return coordinationState.get().containsJoinVoteFor(localNode); + } + + void handleJoin(Join join) { + synchronized (mutex) { + ensureTermAtLeast(getLocalNode(), join.getTerm()).ifPresent(this::handleJoin); + + if (coordinationState.get().electionWon()) { + // if we have already won the election then the actual join does not matter for election purposes, + // so swallow any exception + try { + coordinationState.get().handleJoin(join); + } catch (CoordinationStateRejectedException e) { + logger.debug("failed to add join, ignoring", e); + } + } else { + coordinationState.get().handleJoin(join); // this might fail and bubble up the exception + } + } + } + + public ClusterState getLastAcceptedState() { + synchronized (mutex) { + return coordinationState.get().getLastAcceptedState(); + } + } + + public Optional getLastCommittedState() { + return lastCommittedState; + } + + private List getDiscoveredNodes() { + final List nodes = new ArrayList<>(); + nodes.add(getLocalNode()); + peerFinder.getFoundPeers().forEach(nodes::add); + return nodes; + } + public enum Mode { CANDIDATE, LEADER, FOLLOWER } + + private class CoordinatorPeerFinder extends PeerFinder { + + CoordinatorPeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector, + ConfiguredHostsResolver configuredHostsResolver) { + super(settings, transportService, transportAddressConnector, configuredHostsResolver); + } + + @Override + protected void onActiveMasterFound(DiscoveryNode masterNode, long term) { + // TODO + } + + @Override + protected void onFoundPeersUpdated() { + synchronized (mutex) { + if (mode == Mode.CANDIDATE) { + final CoordinationState.VoteCollection expectedVotes = new CoordinationState.VoteCollection(); + getFoundPeers().forEach(expectedVotes::addVote); + expectedVotes.addVote(Coordinator.this.getLocalNode()); + final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState(); + final boolean foundQuorum = CoordinationState.isElectionQuorum(expectedVotes, lastAcceptedState); + + if (foundQuorum) { + if (electionScheduler == null) { + final TimeValue gracePeriod = TimeValue.ZERO; // TODO variable grace period + electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, () -> { + synchronized (mutex) { + if (mode == Mode.CANDIDATE) { + if (prevotingRound != null) { + prevotingRound.close(); + } + prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes()); + } + } + }); + } + } else { + closePrevotingAndElectionScheduler(); + } + } + } + } + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index 766bdce26da..bc01612b899 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.cluster.coordination; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.ClusterStateTaskListener; @@ -27,21 +28,29 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportResponse.Empty; +import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.function.BiConsumer; +import java.util.function.Function; import java.util.function.LongSupplier; public class JoinHelper extends AbstractComponent { public static final String JOIN_ACTION_NAME = "internal:cluster/coordination/join"; + public static final String START_JOIN_ACTION_NAME = "internal:cluster/coordination/start_join"; private final MasterService masterService; private final TransportService transportService; @@ -49,7 +58,7 @@ public class JoinHelper extends AbstractComponent { public JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService, TransportService transportService, LongSupplier currentTermSupplier, - BiConsumer joinHandler) { + BiConsumer joinHandler, Function joinLeaderInTerm) { super(settings); this.masterService = masterService; this.transportService = transportService; @@ -100,6 +109,62 @@ public class JoinHelper extends AbstractComponent { return "JoinCallback{request=" + request + "}"; } })); + + transportService.registerRequestHandler(START_JOIN_ACTION_NAME, Names.GENERIC, false, false, + StartJoinRequest::new, + (request, channel, task) -> { + final DiscoveryNode destination = request.getSourceNode(); + final JoinRequest joinRequest + = new JoinRequest(transportService.getLocalNode(), Optional.of(joinLeaderInTerm.apply(request))); + logger.debug("attempting to join {} with {}", destination, joinRequest); + this.transportService.sendRequest(destination, JOIN_ACTION_NAME, joinRequest, new TransportResponseHandler() { + @Override + public Empty read(StreamInput in) { + return Empty.INSTANCE; + } + + @Override + public void handleResponse(Empty response) { + logger.debug("successfully joined {} with {}", destination, joinRequest); + } + + @Override + public void handleException(TransportException exp) { + logger.debug(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp); + } + + @Override + public String executor() { + return Names.SAME; + } + }); + channel.sendResponse(Empty.INSTANCE); + }); + } + + public void sendStartJoinRequest(final StartJoinRequest startJoinRequest, final DiscoveryNode destination) { + transportService.sendRequest(destination, START_JOIN_ACTION_NAME, + startJoinRequest, new TransportResponseHandler() { + @Override + public Empty read(StreamInput in) { + return Empty.INSTANCE; + } + + @Override + public void handleResponse(Empty response) { + logger.debug("successful response to {} from {}", startJoinRequest, destination); + } + + @Override + public void handleException(TransportException exp) { + logger.debug(new ParameterizedMessage("failure in response to {} from {}", startJoinRequest, destination), exp); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }); } public interface JoinCallback { @@ -211,7 +276,8 @@ public class JoinHelper extends AbstractComponent { @Override public String toString() { - return "CandidateJoinAccumulator{" + joinRequestAccumulator.keySet() + '}'; + return "CandidateJoinAccumulator{" + joinRequestAccumulator.keySet() + + ", closed=" + closed + '}'; } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java index 84dfa70752f..b9aed719af4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java @@ -23,6 +23,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.coordination.CoordinationState.VoteCollection; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.lease.Releasable; @@ -34,6 +35,7 @@ import org.elasticsearch.transport.TransportService; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.LongConsumer; import static org.elasticsearch.cluster.coordination.CoordinationState.isElectionQuorum; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet; @@ -44,16 +46,17 @@ public class PreVoteCollector extends AbstractComponent { private final TransportService transportService; private final Runnable startElection; + private final LongConsumer updateMaxTermSeen; - // Tuple for simple atomic updates - private volatile Tuple state; // DiscoveryNode component is null if there is currently no known leader + // Tuple for simple atomic updates. null until the first call to `update()`. + private volatile Tuple state; // DiscoveryNode component is null if there is currently no known leader. - PreVoteCollector(final Settings settings, final PreVoteResponse preVoteResponse, - final TransportService transportService, final Runnable startElection) { + PreVoteCollector(final Settings settings, final TransportService transportService, final Runnable startElection, + final LongConsumer updateMaxTermSeen) { super(settings); - state = new Tuple<>(null, preVoteResponse); this.transportService = transportService; this.startElection = startElection; + this.updateMaxTermSeen = updateMaxTermSeen; // TODO does this need to be on the generic threadpool or can it use SAME? transportService.registerRequestHandler(REQUEST_PRE_VOTE_ACTION_NAME, Names.GENERIC, false, false, @@ -74,7 +77,7 @@ public class PreVoteCollector extends AbstractComponent { return preVotingRound; } - public void update(final PreVoteResponse preVoteResponse, final DiscoveryNode leader) { + public void update(final PreVoteResponse preVoteResponse, @Nullable final DiscoveryNode leader) { logger.trace("updating with preVoteResponse={}, leader={}", preVoteResponse, leader); state = new Tuple<>(leader, preVoteResponse); } @@ -156,7 +159,7 @@ public class PreVoteCollector extends AbstractComponent { return; } - // TODO the response carries the sender's current term. If an election starts then it should be in a higher term. + updateMaxTermSeen.accept(response.getCurrentTerm()); if (response.getLastAcceptedTerm() > clusterState.term() || (response.getLastAcceptedTerm() == clusterState.term() diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java index d2b29aa999b..f95bfd223d5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java @@ -156,7 +156,7 @@ public abstract class Publication extends AbstractComponent { protected abstract Optional handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse); - protected abstract void onPossibleJoin(DiscoveryNode sourceNode, PublishWithJoinResponse response); + protected abstract void onJoin(Join join); protected abstract void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest, ActionListener responseActionListener); @@ -287,8 +287,10 @@ public abstract class Publication extends AbstractComponent { return; } - // TODO: check if we need to pass the full response here or if it's sufficient to just pass the optional join. - onPossibleJoin(discoveryNode, response); + response.getJoin().ifPresent(join -> { + assert discoveryNode.equals(join.getSourceNode()); + onJoin(join); + }); assert state == PublicationTargetState.SENT_PUBLISH_REQUEST : state + " -> " + PublicationTargetState.WAITING_FOR_QUORUM; state = PublicationTargetState.WAITING_FOR_QUORUM; diff --git a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java index 05b7b438b7f..3a854a53ad4 100644 --- a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java +++ b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java @@ -72,8 +72,8 @@ public abstract class PeerFinder extends AbstractComponent { private final Map peersByAddress = newConcurrentMap(); private Optional leader = Optional.empty(); - PeerFinder(Settings settings, TransportService transportService, - TransportAddressConnector transportAddressConnector, ConfiguredHostsResolver configuredHostsResolver) { + public PeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector, + ConfiguredHostsResolver configuredHostsResolver) { super(settings); findPeersDelay = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings); this.transportService = transportService; @@ -95,6 +95,8 @@ public abstract class PeerFinder extends AbstractComponent { leader = Optional.empty(); handleWakeUp(); // return value discarded: there are no known peers, so none can be disconnected } + + onFoundPeersUpdated(); // trigger a check for a quorum already } public void deactivate(DiscoveryNode leader) { @@ -116,7 +118,7 @@ public abstract class PeerFinder extends AbstractComponent { return Thread.holdsLock(mutex); } - boolean assertInactiveWithNoKnownPeers() { + private boolean assertInactiveWithNoKnownPeers() { assert active == false; assert peersByAddress.isEmpty() : peersByAddress.keySet(); return true; @@ -125,13 +127,24 @@ public abstract class PeerFinder extends AbstractComponent { PeersResponse handlePeersRequest(PeersRequest peersRequest) { synchronized (mutex) { assert peersRequest.getSourceNode().equals(getLocalNode()) == false; + final List knownPeers; if (active) { + assert leader.isPresent() == false : leader; startProbe(peersRequest.getSourceNode().getAddress()); peersRequest.getKnownPeers().stream().map(DiscoveryNode::getAddress).forEach(this::startProbe); - return new PeersResponse(Optional.empty(), getFoundPeersUnderLock(), currentTerm); + knownPeers = getFoundPeersUnderLock(); } else { - return new PeersResponse(leader, Collections.emptyList(), currentTerm); + assert leader.isPresent(); + knownPeers = Collections.emptyList(); } + return new PeersResponse(leader, knownPeers, currentTerm); + } + } + + // exposed for checking invariant in o.e.c.c.Coordinator (public since this is a different package) + public Optional getLeader() { + synchronized (mutex) { + return leader; } } @@ -247,7 +260,7 @@ public abstract class PeerFinder extends AbstractComponent { @Override public String toString() { - return "PeerFinder::handleWakeUp"; + return "PeerFinder handling wakeup"; } }); diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index e37ea81211a..39ae48e0f39 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -733,6 +733,11 @@ public class TransportService extends AbstractLifecycleComponent implements Tran "failed to notify channel of error message for action [{}]", action), inner); } } + + @Override + public String toString() { + return "processing of [" + action + "][" + requestId + "]: " + request; + } }); } @@ -1049,6 +1054,11 @@ public class TransportService extends AbstractLifecycleComponent implements Tran "cancel must be called after the requestId [" + requestId + "] has been removed from clientHandlers"; FutureUtils.cancel(future); } + + @Override + public String toString() { + return "TimeoutHandler for [" + action + "][" + requestId + "]"; + } } static class TimeoutInfoHolder { @@ -1176,7 +1186,17 @@ public class TransportService extends AbstractLifecycleComponent implements Tran if (ThreadPool.Names.SAME.equals(executor)) { processResponse(handler, response); } else { - threadPool.executor(executor).execute(() -> processResponse(handler, response)); + threadPool.executor(executor).execute(new Runnable() { + @Override + public String toString() { + return "delivery of response to [" + action + "][" + requestId + "]: " + response; + } + + @Override + public void run() { + DirectResponseChannel.this.processResponse(handler, response); + } + }); } } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java b/server/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java index b9624e3073f..a87e01477b8 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java @@ -162,7 +162,8 @@ public class ShrinkIndexIT extends ESIntegTestCase { final List factors = Arrays.asList(2, 3, 5, 7); final List numberOfShardsFactors = randomSubsetOf(scaledRandomIntBetween(1, factors.size() - 1), factors); final int numberOfShards = numberOfShardsFactors.stream().reduce(1, (x, y) -> x * y); - final int numberOfTargetShards = randomSubsetOf(numberOfShardsFactors).stream().reduce(1, (x, y) -> x * y); + final int numberOfTargetShards = randomSubsetOf(randomInt(numberOfShardsFactors.size() - 1), numberOfShardsFactors) + .stream().reduce(1, (x, y) -> x * y); internalCluster().ensureAtLeastNumDataNodes(2); prepareCreate("source").setSettings(Settings.builder().put(indexSettings()).put("number_of_shards", numberOfShards)).get(); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java new file mode 100644 index 00000000000..9462724cb76 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -0,0 +1,466 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterState.VotingConfiguration; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState; +import org.elasticsearch.cluster.coordination.CoordinationState.VoteCollection; +import org.elasticsearch.cluster.coordination.CoordinationStateTests.InMemoryPersistedState; +import org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.ClusterNode; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNode.Role; +import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.discovery.zen.UnicastHostsProvider.HostsResolver; +import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.test.transport.CapturingTransport; +import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.RequestHandlerRegistry; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportResponse.Empty; +import org.elasticsearch.transport.TransportResponseHandler; +import org.elasticsearch.transport.TransportResponseOptions; +import org.elasticsearch.transport.TransportService; +import org.hamcrest.Matcher; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import static java.util.Collections.emptySet; +import static org.elasticsearch.cluster.coordination.CoordinationStateTests.clusterState; +import static org.elasticsearch.cluster.coordination.CoordinationStateTests.setValue; +import static org.elasticsearch.cluster.coordination.CoordinationStateTests.value; +import static org.elasticsearch.cluster.coordination.Coordinator.COMMIT_STATE_ACTION_NAME; +import static org.elasticsearch.cluster.coordination.Coordinator.Mode.FOLLOWER; +import static org.elasticsearch.cluster.coordination.Coordinator.PUBLISH_STATE_ACTION_NAME; +import static org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME; +import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; + +@TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.cluster.discovery:TRACE") +public class CoordinatorTests extends ESTestCase { + + public void testCanUpdateClusterStateAfterStabilisation() { + final Cluster cluster = new Cluster(randomIntBetween(1, 5)); + cluster.stabilise(); + + final ClusterNode leader = cluster.getAnyLeader(); + long finalValue = randomLong(); + leader.submitValue(finalValue); + cluster.stabilise(); // TODO this should only need a short stabilisation + + for (final ClusterNode clusterNode : cluster.clusterNodes) { + final String nodeId = clusterNode.getId(); + final ClusterState committedState = clusterNode.coordinator.getLastCommittedState().get(); + assertThat(nodeId + " has the committed value", value(committedState), is(finalValue)); + } + } + + private static String nodeIdFromIndex(int nodeIndex) { + return "node" + nodeIndex; + } + + class Cluster { + + static final long DEFAULT_STABILISATION_TIME = 3000L; // TODO use a real stabilisation time - needs fault detection and disruption + + final List clusterNodes; + final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue( + Settings.builder().put(NODE_NAME_SETTING.getKey(), "deterministic-task-queue").build()); + private final VotingConfiguration initialConfiguration; + + Cluster(int initialNodeCount) { + logger.info("--> creating cluster of {} nodes", initialNodeCount); + + Set initialNodeIds = new HashSet<>(initialNodeCount); + for (int i = 0; i < initialNodeCount; i++) { + initialNodeIds.add(nodeIdFromIndex(i)); + } + initialConfiguration = new VotingConfiguration(initialNodeIds); + + clusterNodes = new ArrayList<>(initialNodeCount); + for (int i = 0; i < initialNodeCount; i++) { + final ClusterNode clusterNode = new ClusterNode(i); + clusterNodes.add(clusterNode); + } + } + + void stabilise() { + final long stabilisationStartTime = deterministicTaskQueue.getCurrentTimeMillis(); + while (deterministicTaskQueue.getCurrentTimeMillis() < stabilisationStartTime + DEFAULT_STABILISATION_TIME) { + + while (deterministicTaskQueue.hasRunnableTasks()) { + try { + deterministicTaskQueue.runRandomTask(random()); + } catch (CoordinationStateRejectedException e) { + logger.debug("ignoring benign exception thrown when stabilising", e); + } + for (final ClusterNode clusterNode : clusterNodes) { + clusterNode.coordinator.invariant(); + } + } + + if (deterministicTaskQueue.hasDeferredTasks() == false) { + break; // TODO when fault detection is enabled this should be removed, as there should _always_ be deferred tasks + } + + deterministicTaskQueue.advanceTime(); + } + + assertUniqueLeaderAndExpectedModes(); + } + + private void assertUniqueLeaderAndExpectedModes() { + final ClusterNode leader = getAnyLeader(); + final long leaderTerm = leader.coordinator.getCurrentTerm(); + Matcher> isPresentAndEqualToLeaderVersion + = equalTo(Optional.of(leader.coordinator.getLastAcceptedState().getVersion())); + + assertThat(leader.coordinator.getLastCommittedState().map(ClusterState::getVersion), isPresentAndEqualToLeaderVersion); + + for (final ClusterNode clusterNode : clusterNodes) { + if (clusterNode == leader) { + continue; + } + + final String nodeId = clusterNode.getId(); + assertThat(nodeId + " has the same term as the leader", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm)); + assertTrue("leader should have received a vote from " + nodeId, + leader.coordinator.hasJoinVoteFrom(clusterNode.getLocalNode())); + + assertThat(nodeId + " is a follower", clusterNode.coordinator.getMode(), is(FOLLOWER)); + assertThat(nodeId + " is at the same accepted version as the leader", + Optional.of(clusterNode.coordinator.getLastAcceptedState().getVersion()), isPresentAndEqualToLeaderVersion); + assertThat(nodeId + " is at the same committed version as the leader", + clusterNode.coordinator.getLastCommittedState().map(ClusterState::getVersion), isPresentAndEqualToLeaderVersion); + } + } + + ClusterNode getAnyLeader() { + List allLeaders = clusterNodes.stream().filter(ClusterNode::isLeader).collect(Collectors.toList()); + assertThat(allLeaders, not(empty())); + return randomFrom(allLeaders); + } + + class ClusterNode extends AbstractComponent { + private final int nodeIndex; + private Coordinator coordinator; + private DiscoveryNode localNode; + private final PersistedState persistedState; + private MasterService masterService; + private TransportService transportService; + private CapturingTransport capturingTransport; + + ClusterNode(int nodeIndex) { + super(Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeIdFromIndex(nodeIndex)).build()); + this.nodeIndex = nodeIndex; + localNode = createDiscoveryNode(); + persistedState = new InMemoryPersistedState(1L, + clusterState(1L, 1L, localNode, initialConfiguration, initialConfiguration, 0L)); + setUp(); + } + + private DiscoveryNode createDiscoveryNode() { + final TransportAddress transportAddress = buildNewFakeTransportAddress(); + // Generate the ephemeral ID deterministically, for repeatable tests. This means we have to pass everything else into the + // constructor explicitly too. + return new DiscoveryNode("", nodeIdFromIndex(nodeIndex), UUIDs.randomBase64UUID(random()), + transportAddress.address().getHostString(), + transportAddress.getAddress(), transportAddress, Collections.emptyMap(), + EnumSet.allOf(Role.class), Version.CURRENT); + } + + private void setUp() { + capturingTransport = new CapturingTransport() { + @Override + protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode destination) { + assert destination.equals(localNode) == false : "non-local message from " + localNode + " to itself"; + super.onSendRequest(requestId, action, request, destination); + + // connecting and handshaking with a new node happens synchronously, so we cannot enqueue these tasks for later + final Consumer scheduler; + final Predicate matchesDestination; + if (action.equals(HANDSHAKE_ACTION_NAME)) { + scheduler = Runnable::run; + matchesDestination = n -> n.getLocalNode().getAddress().equals(destination.getAddress()); + } else { + scheduler = deterministicTaskQueue::scheduleNow; + matchesDestination = n -> n.getLocalNode().equals(destination); + } + + scheduler.accept(new Runnable() { + @Override + public String toString() { + return "delivery of [" + action + "][" + requestId + "]: " + request; + } + + @Override + public void run() { + clusterNodes.stream().filter(matchesDestination).findAny().ifPresent( + destinationNode -> { + + final RequestHandlerRegistry requestHandler + = destinationNode.capturingTransport.getRequestHandler(action); + + final TransportChannel transportChannel = new TransportChannel() { + @Override + public String getProfileName() { + return "default"; + } + + @Override + public String getChannelType() { + return "coordinator-test-channel"; + } + + @Override + public void sendResponse(final TransportResponse response) { + scheduler.accept(new Runnable() { + @Override + public String toString() { + return "delivery of response " + response + + " to [" + action + "][" + requestId + "]: " + request; + } + + @Override + public void run() { + handleResponse(requestId, response); + } + }); + } + + @Override + public void sendResponse(TransportResponse response, TransportResponseOptions options) { + sendResponse(response); + } + + @Override + public void sendResponse(Exception exception) { + scheduler.accept(new Runnable() { + @Override + public String toString() { + return "delivery of error response " + exception.getMessage() + + " to [" + action + "][" + requestId + "]: " + request; + } + + @Override + public void run() { + handleRemoteError(requestId, exception); + } + }); + } + }; + + try { + processMessageReceived(request, requestHandler, transportChannel); + } catch (Exception e) { + scheduler.accept(new Runnable() { + @Override + public String toString() { + return "delivery of processing error response " + e.getMessage() + + " to [" + action + "][" + requestId + "]: " + request; + } + + @Override + public void run() { + handleRemoteError(requestId, e); + } + }); + } + } + ); + } + }); + } + }; + + masterService = new FakeThreadPoolMasterService("test", deterministicTaskQueue::scheduleNow); + AtomicReference currentState = new AtomicReference<>(getPersistedState().getLastAcceptedState()); + masterService.setClusterStateSupplier(currentState::get); + masterService.setClusterStatePublisher((event, publishListener, ackListener) -> { + final PublishRequest publishRequest; + try { + publishRequest = coordinator.coordinationState.get().handleClientValue(event.state()); + } catch (CoordinationStateRejectedException e) { + publishListener.onFailure(new FailedToCommitClusterStateException("rejected client value", e)); + return; + } + + final Publication publication = new Publication(settings, publishRequest, ackListener, + deterministicTaskQueue::getCurrentTimeMillis) { + + @Override + protected void onCompletion(boolean committed) { + if (committed) { + currentState.set(event.state()); + publishListener.onResponse(null); + } else { + publishListener.onFailure(new FailedToCommitClusterStateException("not committed")); + } + } + + @Override + protected boolean isPublishQuorum(VoteCollection votes) { + return coordinator.coordinationState.get().isPublishQuorum(votes); + } + + @Override + protected Optional handlePublishResponse(DiscoveryNode sourceNode, + PublishResponse publishResponse) { + return coordinator.coordinationState.get().handlePublishResponse(sourceNode, publishResponse); + } + + @Override + protected void onJoin(Join join) { + coordinator.handleJoin(join); + } + + @Override + protected void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest, + ActionListener responseActionListener) { + transportService.sendRequest(destination, PUBLISH_STATE_ACTION_NAME, publishRequest, + + new TransportResponseHandler() { + + @Override + public void handleResponse(PublishWithJoinResponse response) { + responseActionListener.onResponse(response); + } + + @Override + public void handleException(TransportException exp) { + responseActionListener.onFailure(exp); + } + + @Override + public String executor() { + return Names.GENERIC; + } + }); + } + + @Override + protected void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommit, + ActionListener responseActionListener) { + transportService.sendRequest(destination, COMMIT_STATE_ACTION_NAME, applyCommit, + + new TransportResponseHandler() { + + @Override + public void handleResponse(Empty response) { + responseActionListener.onResponse(response); + } + + @Override + public void handleException(TransportException exp) { + responseActionListener.onFailure(exp); + } + + @Override + public String executor() { + return Names.GENERIC; + } + }); + } + }; + publication.start(emptySet()); + }); + masterService.start(); + + transportService = capturingTransport.createCapturingTransportService( + settings, deterministicTaskQueue.getThreadPool(), NOOP_TRANSPORT_INTERCEPTOR, a -> localNode, null, emptySet()); + transportService.start(); + transportService.acceptIncomingRequests(); + + coordinator = new Coordinator(settings, transportService, ESAllocationTestCase.createAllocationService(Settings.EMPTY), + masterService, this::getPersistedState, Cluster.this::provideUnicastHosts); + + coordinator.start(); + coordinator.startInitialJoin(); + } + + private PersistedState getPersistedState() { + return persistedState; + } + + String getId() { + return localNode.getId(); + } + + public DiscoveryNode getLocalNode() { + return localNode; + } + + boolean isLeader() { + return coordinator.getMode() == Coordinator.Mode.LEADER; + } + + void submitValue(final long value) { + masterService.submitStateUpdateTask("new value [" + value + "]", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + return setValue(currentState, value); + } + + @Override + public void onFailure(String source, Exception e) { + logger.debug(() -> new ParameterizedMessage("failed to publish: [{}]", source), e); + } + }); + } + } + + private List provideUnicastHosts(HostsResolver ignored) { + return clusterNodes.stream().map(ClusterNode::getLocalNode).map(DiscoveryNode::getAddress).collect(Collectors.toList()); + } + } + + @SuppressWarnings("unchecked") + private static void processMessageReceived(TransportRequest request, RequestHandlerRegistry requestHandler, + TransportChannel transportChannel) throws Exception { + requestHandler.processMessageReceived(request, transportChannel); + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java index b72c33a5ceb..41e0414570c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java @@ -326,7 +326,7 @@ public class DeterministicTaskQueue extends AbstractComponent { @Override public boolean cancel(boolean mayInterruptIfRunning) { - throw new UnsupportedOperationException(); + return false; } @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/ElectionSchedulerFactoryTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/ElectionSchedulerFactoryTests.java index 200a889783b..cc14d232b10 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/ElectionSchedulerFactoryTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/ElectionSchedulerFactoryTests.java @@ -144,37 +144,39 @@ public class ElectionSchedulerFactoryTests extends ESTestCase { { final Settings settings = Settings.builder().put(ELECTION_INITIAL_TIMEOUT_SETTING.getKey(), "0s").build(); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> ELECTION_INITIAL_TIMEOUT_SETTING.get(settings)); - assertThat(e.getMessage(), is("Failed to parse value [0s] for setting [cluster.election.initial_timeout] must be >= 1ms")); + assertThat(e.getMessage(), is("failed to parse value [0s] for setting [cluster.election.initial_timeout], must be >= [1ms]")); } { final Settings settings = Settings.builder().put(ELECTION_INITIAL_TIMEOUT_SETTING.getKey(), "10001ms").build(); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> ELECTION_INITIAL_TIMEOUT_SETTING.get(settings)); - assertThat(e.getMessage(), is("Failed to parse value [10001ms] for setting [cluster.election.initial_timeout] must be <= 10s")); + assertThat(e.getMessage(), + is("failed to parse value [10001ms] for setting [cluster.election.initial_timeout], must be <= [10s]")); } { final Settings settings = Settings.builder().put(ELECTION_BACK_OFF_TIME_SETTING.getKey(), "0s").build(); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> ELECTION_BACK_OFF_TIME_SETTING.get(settings)); - assertThat(e.getMessage(), is("Failed to parse value [0s] for setting [cluster.election.back_off_time] must be >= 1ms")); + assertThat(e.getMessage(), is("failed to parse value [0s] for setting [cluster.election.back_off_time], must be >= [1ms]")); } { final Settings settings = Settings.builder().put(ELECTION_BACK_OFF_TIME_SETTING.getKey(), "60001ms").build(); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> ELECTION_BACK_OFF_TIME_SETTING.get(settings)); - assertThat(e.getMessage(), is("Failed to parse value [60001ms] for setting [cluster.election.back_off_time] must be <= 1m")); + assertThat(e.getMessage(), + is("failed to parse value [60001ms] for setting [cluster.election.back_off_time], must be <= [60s]")); } { final Settings settings = Settings.builder().put(ELECTION_MAX_TIMEOUT_SETTING.getKey(), "199ms").build(); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> ELECTION_MAX_TIMEOUT_SETTING.get(settings)); - assertThat(e.getMessage(), is("Failed to parse value [199ms] for setting [cluster.election.max_timeout] must be >= 200ms")); + assertThat(e.getMessage(), is("failed to parse value [199ms] for setting [cluster.election.max_timeout], must be >= [200ms]")); } { final Settings settings = Settings.builder().put(ELECTION_MAX_TIMEOUT_SETTING.getKey(), "301s").build(); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> ELECTION_MAX_TIMEOUT_SETTING.get(settings)); - assertThat(e.getMessage(), is("Failed to parse value [301s] for setting [cluster.election.max_timeout] must be <= 5m")); + assertThat(e.getMessage(), is("failed to parse value [301s] for setting [cluster.election.max_timeout], must be <= [300s]")); } { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java index 67570bcb186..9891f8d318f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java @@ -61,6 +61,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static org.hamcrest.Matchers.containsString; import static org.mockito.Matchers.anyObject; @@ -135,6 +136,7 @@ public class NodeJoinTests extends ESTestCase { this.masterService = masterService; TransportService transportService = mock(TransportService.class); when(transportService.getLocalNode()).thenReturn(initialState.nodes().getLocalNode()); + when(transportService.getThreadPool()).thenReturn(threadPool); @SuppressWarnings("unchecked") ArgumentCaptor> joinRequestHandler = ArgumentCaptor.forClass( (Class) TransportRequestHandler.class); @@ -142,7 +144,7 @@ public class NodeJoinTests extends ESTestCase { transportService, ESAllocationTestCase.createAllocationService(Settings.EMPTY), masterService, - () -> new CoordinationStateTests.InMemoryPersistedState(term, initialState)); + () -> new CoordinationStateTests.InMemoryPersistedState(term, initialState), r -> emptyList()); verify(transportService).registerRequestHandler(eq(JoinHelper.JOIN_ACTION_NAME), eq(ThreadPool.Names.GENERIC), eq(false), eq(false), anyObject(), joinRequestHandler.capture()); transportRequestHandler = joinRequestHandler.getValue(); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/PreVoteCollectorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/PreVoteCollectorTests.java index 2f1b73a90c2..ea986e00739 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/PreVoteCollectorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/PreVoteCollectorTests.java @@ -109,10 +109,12 @@ public class PreVoteCollectorTests extends ESTestCase { transportService.start(); transportService.acceptIncomingRequests(); - preVoteCollector = new PreVoteCollector(settings, getLocalPreVoteResponse(), transportService, () -> { + preVoteCollector = new PreVoteCollector(settings, transportService, () -> { assert electionOccurred == false; electionOccurred = true; - }); + }, l -> { + }); // TODO need tests that check that the max term seen is updated + preVoteCollector.update(getLocalPreVoteResponse(), null); } private PreVoteResponse getLocalPreVoteResponse() { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java index 19c7f436c4f..e08b59bc4b8 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java @@ -100,7 +100,7 @@ public class PublicationTests extends ESTestCase { Map> pendingPublications = new HashMap<>(); Map> pendingCommits = new HashMap<>(); - Map possibleJoins = new HashMap<>(); + Map joins = new HashMap<>(); MockPublication(Settings settings, PublishRequest publishRequest, Discovery.AckListener ackListener, LongSupplier currentTimeSupplier) { @@ -116,8 +116,8 @@ public class PublicationTests extends ESTestCase { } @Override - protected void onPossibleJoin(DiscoveryNode sourceNode, PublishWithJoinResponse response) { - assertNull(possibleJoins.put(sourceNode, response)); + protected void onJoin(Join join) { + assertNull(joins.put(join.getSourceNode(), join)); } @Override @@ -180,13 +180,17 @@ public class PublicationTests extends ESTestCase { PublishResponse publishResponse = nodeResolver.apply(e.getKey()).coordinationState.handlePublishRequest( publication.publishRequest); assertNotEquals(processedNode1PublishResponse.get(), publication.pendingCommits.isEmpty()); - assertFalse(publication.possibleJoins.containsKey(e.getKey())); + assertFalse(publication.joins.containsKey(e.getKey())); PublishWithJoinResponse publishWithJoinResponse = new PublishWithJoinResponse(publishResponse, randomBoolean() ? Optional.empty() : Optional.of(new Join(e.getKey(), randomFrom(n1, n2, n3), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()))); e.getValue().onResponse(publishWithJoinResponse); - assertTrue(publication.possibleJoins.containsKey(e.getKey())); - assertEquals(publishWithJoinResponse, publication.possibleJoins.get(e.getKey())); + if (publishWithJoinResponse.getJoin().isPresent()) { + assertTrue(publication.joins.containsKey(e.getKey())); + assertEquals(publishWithJoinResponse.getJoin().get(), publication.joins.get(e.getKey())); + } else { + assertFalse(publication.joins.containsKey(e.getKey())); + } if (e.getKey().equals(n1)) { processedNode1PublishResponse.set(true); } diff --git a/server/src/test/java/org/elasticsearch/common/settings/SettingTests.java b/server/src/test/java/org/elasticsearch/common/settings/SettingTests.java index 49cf102946d..09118309875 100644 --- a/server/src/test/java/org/elasticsearch/common/settings/SettingTests.java +++ b/server/src/test/java/org/elasticsearch/common/settings/SettingTests.java @@ -799,7 +799,7 @@ public class SettingTests extends ESTestCase { = expectThrows(IllegalArgumentException.class, () -> settingWithLowerBound.get(Settings.builder().put("foo", "4999ms").build())); - assertThat(illegalArgumentException.getMessage(), equalTo("Failed to parse value [4999ms] for setting [foo] must be >= 5s")); + assertThat(illegalArgumentException.getMessage(), equalTo("failed to parse value [4999ms] for setting [foo], must be >= [5s]")); Setting settingWithBothBounds = Setting.timeSetting("bar", TimeValue.timeValueSeconds(10), TimeValue.timeValueSeconds(5), TimeValue.timeValueSeconds(20)); @@ -810,12 +810,12 @@ public class SettingTests extends ESTestCase { illegalArgumentException = expectThrows(IllegalArgumentException.class, () -> settingWithBothBounds.get(Settings.builder().put("bar", "4999ms").build())); - assertThat(illegalArgumentException.getMessage(), equalTo("Failed to parse value [4999ms] for setting [bar] must be >= 5s")); + assertThat(illegalArgumentException.getMessage(), equalTo("failed to parse value [4999ms] for setting [bar], must be >= [5s]")); illegalArgumentException = expectThrows(IllegalArgumentException.class, () -> settingWithBothBounds.get(Settings.builder().put("bar", "20001ms").build())); - assertThat(illegalArgumentException.getMessage(), equalTo("Failed to parse value [20001ms] for setting [bar] must be <= 20s")); + assertThat(illegalArgumentException.getMessage(), equalTo("failed to parse value [20001ms] for setting [bar], must be <= [20s]")); } public void testSettingsGroupUpdater() { diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterService.java b/server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterService.java index 08a7ea38c72..628b0917d90 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterService.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterService.java @@ -84,20 +84,28 @@ public class FakeThreadPoolMasterService extends MasterService { private void scheduleNextTaskIfNecessary() { if (taskInProgress == false && pendingTasks.isEmpty() == false && scheduledNextTask == false) { scheduledNextTask = true; - onTaskAvailableToRun.accept(() -> { - assert taskInProgress == false; - assert waitForPublish == false; - assert scheduledNextTask; - final int taskIndex = randomInt(pendingTasks.size() - 1); - logger.debug("next master service task: choosing task {} of {}", taskIndex, pendingTasks.size()); - final Runnable task = pendingTasks.remove(taskIndex); - taskInProgress = true; - scheduledNextTask = false; - task.run(); - if (waitForPublish == false) { - taskInProgress = false; + onTaskAvailableToRun.accept(new Runnable() { + @Override + public String toString() { + return "master service scheduling next task"; + } + + @Override + public void run() { + assert taskInProgress == false; + assert waitForPublish == false; + assert scheduledNextTask; + final int taskIndex = randomInt(pendingTasks.size() - 1); + logger.debug("next master service task: choosing task {} of {}", taskIndex, pendingTasks.size()); + final Runnable task = pendingTasks.remove(taskIndex); + taskInProgress = true; + scheduledNextTask = false; + task.run(); + if (waitForPublish == false) { + taskInProgress = false; + } + FakeThreadPoolMasterService.this.scheduleNextTaskIfNecessary(); } - scheduleNextTaskIfNecessary(); }); } } diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/integration/MonitoringIT.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/integration/MonitoringIT.java index e062ea96de3..89792eec3c8 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/integration/MonitoringIT.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/integration/MonitoringIT.java @@ -373,6 +373,9 @@ public class MonitoringIT extends ESSingleNodeTestCase { assertThat(clusterState.remove("cluster_uuid"), notNullValue()); assertThat(clusterState.remove("master_node"), notNullValue()); assertThat(clusterState.remove("nodes"), notNullValue()); + assertThat(clusterState.remove("term"), notNullValue()); + assertThat(clusterState.remove("last_committed_config"), notNullValue()); + assertThat(clusterState.remove("last_accepted_config"), notNullValue()); assertThat(clusterState.keySet(), empty()); } @@ -538,16 +541,16 @@ public class MonitoringIT extends ESSingleNodeTestCase { if (ti.getLockName() != null) { b.append(" on ").append(ti.getLockName()); } - + if (ti.getLockOwnerName() != null) { b.append(" owned by \"").append(ti.getLockOwnerName()) .append("\" ID=").append(ti.getLockOwnerId()); } - + b.append(ti.isSuspended() ? " (suspended)" : ""); b.append(ti.isInNative() ? " (in native code)" : ""); b.append("\n"); - + final StackTraceElement[] stack = ti.getStackTrace(); final LockInfo lockInfo = ti.getLockInfo(); final MonitorInfo [] monitorInfos = ti.getLockedMonitors(); @@ -559,7 +562,7 @@ public class MonitoringIT extends ESSingleNodeTestCase { .append(lockInfo) .append("\n"); } - + for (MonitorInfo mi : monitorInfos) { if (mi.getLockedStackDepth() == i) { b.append("\t- locked ").append(mi).append("\n");