From a0d32f59475070756de43178ccaa5558173959f2 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 23 Aug 2018 19:18:52 +0200 Subject: [PATCH] Zen2: Add leader-side join handling logic (#33013) Adds the logic for handling joins by a prospective leader. Introduces the Coordinator class with the basic lifecycle modes (candidate, leader, follower) as well as a JoinHelper class that contains most of the plumbing for handling joins. --- .../coordination/CoordinationState.java | 1 + .../cluster/coordination/Coordinator.java | 211 ++++++++ .../cluster/coordination/JoinHelper.java | 217 ++++++++ .../cluster/coordination/JoinRequest.java | 87 ++++ .../cluster/coordination/MessagesTests.java | 24 + .../cluster/coordination/NodeJoinTests.java | 492 ++++++++++++++++++ .../cluster/FakeThreadPoolMasterService.java | 2 +- .../org/elasticsearch/test/ESTestCase.java | 4 +- 8 files changed, 1035 insertions(+), 3 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java create mode 100644 server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java create mode 100644 server/src/main/java/org/elasticsearch/cluster/coordination/JoinRequest.java create mode 100644 server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java index 27544405d3c..8a6e6d18332 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java @@ -227,6 +227,7 @@ public class CoordinationState extends AbstractComponent { boolean added = joinVotes.addVote(join.getSourceNode()); boolean prevElectionWon = electionWon; electionWon = isElectionQuorum(joinVotes); + assert !prevElectionWon || electionWon; // we cannot go from won to not won logger.debug("handleJoin: added join {} from [{}] for election, electionWon={} lastAcceptedTerm={} lastAcceptedVersion={}", join, join.getSourceNode(), electionWon, lastAcceptedTerm, getLastAcceptedVersion()); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java new file mode 100644 index 00000000000..7528050e4df --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -0,0 +1,211 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.transport.TransportService; + +import java.util.Optional; +import java.util.function.Supplier; + +public class Coordinator extends AbstractLifecycleComponent { + + private final TransportService transportService; + private final JoinHelper joinHelper; + private final Supplier persistedStateSupplier; + // TODO: the following two fields are package-private as some tests require access to them + // These tests can be rewritten to use public methods once Coordinator is more feature-complete + final Object mutex = new Object(); + final SetOnce coordinationState = new SetOnce<>(); // initialized on start-up (see doStart) + + private Mode mode; + private Optional lastKnownLeader; + private Optional lastJoin; + private JoinHelper.JoinAccumulator joinAccumulator; + + public Coordinator(Settings settings, TransportService transportService, AllocationService allocationService, + MasterService masterService, Supplier persistedStateSupplier) { + super(settings); + this.transportService = transportService; + this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService, + this::getCurrentTerm, this::handleJoinRequest); + this.persistedStateSupplier = persistedStateSupplier; + this.lastKnownLeader = Optional.empty(); + this.lastJoin = Optional.empty(); + this.joinAccumulator = joinHelper.new CandidateJoinAccumulator(); + } + + private Optional ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) { + assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; + if (getCurrentTerm() < targetTerm) { + return Optional.of(joinLeaderInTerm(new StartJoinRequest(sourceNode, targetTerm))); + } + return Optional.empty(); + } + + private Join joinLeaderInTerm(StartJoinRequest startJoinRequest) { + assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; + logger.debug("joinLeaderInTerm: from [{}] with term {}", startJoinRequest.getSourceNode(), startJoinRequest.getTerm()); + Join join = coordinationState.get().handleStartJoin(startJoinRequest); + lastJoin = Optional.of(join); + if (mode != Mode.CANDIDATE) { + becomeCandidate("joinLeaderInTerm"); + } + return join; + } + + private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) { + assert Thread.holdsLock(mutex) == false; + logger.trace("handleJoin: as {}, handling {}", mode, joinRequest); + transportService.connectToNode(joinRequest.getSourceNode()); + + final Optional optionalJoin = joinRequest.getOptionalJoin(); + synchronized (mutex) { + final CoordinationState coordState = coordinationState.get(); + final boolean prevElectionWon = coordState.electionWon(); + + if (optionalJoin.isPresent()) { + Join join = optionalJoin.get(); + // if someone thinks we should be master, let's add our vote and try to become one + // note that the following line should never throw an exception + ensureTermAtLeast(getLocalNode(), join.getTerm()).ifPresent(coordState::handleJoin); + + if (coordState.electionWon()) { + // if we have already won the election then the actual join does not matter for election purposes, + // so swallow any exception + try { + coordState.handleJoin(join); + } catch (CoordinationStateRejectedException e) { + logger.trace("failed to add join, ignoring", e); + } + } else { + coordState.handleJoin(join); // this might fail and bubble up the exception + } + } + + joinAccumulator.handleJoinRequest(joinRequest.getSourceNode(), joinCallback); + + if (prevElectionWon == false && coordState.electionWon()) { + becomeLeader("handleJoin"); + } + } + } + + void becomeCandidate(String method) { + assert Thread.holdsLock(mutex) : "Legislator mutex not held"; + logger.debug("{}: becoming CANDIDATE (was {}, lastKnownLeader was [{}])", method, mode, lastKnownLeader); + + if (mode != Mode.CANDIDATE) { + mode = Mode.CANDIDATE; + joinAccumulator.close(mode); + joinAccumulator = joinHelper.new CandidateJoinAccumulator(); + } + } + + void becomeLeader(String method) { + assert Thread.holdsLock(mutex) : "Legislator mutex not held"; + assert mode == Mode.CANDIDATE : "expected candidate but was " + mode; + logger.debug("{}: becoming LEADER (was {}, lastKnownLeader was [{}])", method, mode, lastKnownLeader); + + mode = Mode.LEADER; + lastKnownLeader = Optional.of(getLocalNode()); + joinAccumulator.close(mode); + joinAccumulator = joinHelper.new LeaderJoinAccumulator(); + } + + void becomeFollower(String method, DiscoveryNode leaderNode) { + assert Thread.holdsLock(mutex) : "Legislator mutex not held"; + logger.debug("{}: becoming FOLLOWER of [{}] (was {}, lastKnownLeader was [{}])", method, leaderNode, mode, lastKnownLeader); + + if (mode != Mode.FOLLOWER) { + mode = Mode.FOLLOWER; + joinAccumulator.close(mode); + joinAccumulator = new JoinHelper.FollowerJoinAccumulator(); + } + + lastKnownLeader = Optional.of(leaderNode); + } + + // package-visible for testing + long getCurrentTerm() { + synchronized (mutex) { + return coordinationState.get().getCurrentTerm(); + } + } + + // package-visible for testing + Mode getMode() { + synchronized (mutex) { + return mode; + } + } + + // package-visible for testing + DiscoveryNode getLocalNode() { + return transportService.getLocalNode(); + } + + @Override + protected void doStart() { + CoordinationState.PersistedState persistedState = persistedStateSupplier.get(); + coordinationState.set(new CoordinationState(settings, getLocalNode(), persistedState)); + } + + public void startInitialJoin() { + synchronized (mutex) { + becomeCandidate("startInitialJoin"); + } + } + + @Override + protected void doStop() { + + } + + @Override + protected void doClose() { + + } + + public void invariant() { + synchronized (mutex) { + if (mode == Mode.LEADER) { + assert coordinationState.get().electionWon(); + assert lastKnownLeader.isPresent() && lastKnownLeader.get().equals(getLocalNode()); + assert joinAccumulator instanceof JoinHelper.LeaderJoinAccumulator; + } else if (mode == Mode.FOLLOWER) { + assert coordinationState.get().electionWon() == false : getLocalNode() + " is FOLLOWER so electionWon() should be false"; + assert lastKnownLeader.isPresent() && (lastKnownLeader.get().equals(getLocalNode()) == false); + assert joinAccumulator instanceof JoinHelper.FollowerJoinAccumulator; + } else { + assert mode == Mode.CANDIDATE; + assert joinAccumulator instanceof JoinHelper.CandidateJoinAccumulator; + } + } + } + + public enum Mode { + CANDIDATE, LEADER, FOLLOWER + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java new file mode 100644 index 00000000000..766bdce26da --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -0,0 +1,217 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskConfig; +import org.elasticsearch.cluster.ClusterStateTaskListener; +import org.elasticsearch.cluster.coordination.Coordinator.Mode; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; +import java.util.function.LongSupplier; + +public class JoinHelper extends AbstractComponent { + + public static final String JOIN_ACTION_NAME = "internal:cluster/coordination/join"; + + private final MasterService masterService; + private final TransportService transportService; + private final JoinTaskExecutor joinTaskExecutor; + + public JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService, + TransportService transportService, LongSupplier currentTermSupplier, + BiConsumer joinHandler) { + super(settings); + this.masterService = masterService; + this.transportService = transportService; + this.joinTaskExecutor = new JoinTaskExecutor(allocationService, logger) { + + @Override + public ClusterTasksResult execute(ClusterState currentState, List joiningTasks) + throws Exception { + // This is called when preparing the next cluster state for publication. There is no guarantee that the term we see here is + // the term under which this state will eventually be published: the current term may be increased after this check due to + // some other activity. That the term is correct is, however, checked properly during publication, so it is sufficient to + // check it here on a best-effort basis. This is fine because a concurrent change indicates the existence of another leader + // in a higher term which will cause this node to stand down. + + final long currentTerm = currentTermSupplier.getAsLong(); + if (currentState.term() != currentTerm) { + currentState = ClusterState.builder(currentState).term(currentTerm).build(); + } + return super.execute(currentState, joiningTasks); + } + + }; + + transportService.registerRequestHandler(JOIN_ACTION_NAME, ThreadPool.Names.GENERIC, false, false, JoinRequest::new, + (request, channel, task) -> joinHandler.accept(request, new JoinCallback() { + + @Override + public void onSuccess() { + try { + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } catch (IOException e) { + onFailure(e); + } + } + + @Override + public void onFailure(Exception e) { + try { + channel.sendResponse(e); + } catch (Exception inner) { + inner.addSuppressed(e); + logger.warn("failed to send back failure on join request", inner); + } + } + + @Override + public String toString() { + return "JoinCallback{request=" + request + "}"; + } + })); + } + + public interface JoinCallback { + void onSuccess(); + + void onFailure(Exception e); + } + + static class JoinTaskListener implements ClusterStateTaskListener { + private final JoinTaskExecutor.Task task; + private final JoinCallback joinCallback; + + JoinTaskListener(JoinTaskExecutor.Task task, JoinCallback joinCallback) { + this.task = task; + this.joinCallback = joinCallback; + } + + @Override + public void onFailure(String source, Exception e) { + joinCallback.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + joinCallback.onSuccess(); + } + + @Override + public String toString() { + return "JoinTaskListener{task=" + task + "}"; + } + } + + interface JoinAccumulator { + void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback); + + default void close(Mode newMode) { + + } + } + + class LeaderJoinAccumulator implements JoinAccumulator { + @Override + public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) { + final JoinTaskExecutor.Task task = new JoinTaskExecutor.Task(sender, "join existing leader"); + masterService.submitStateUpdateTask("node-join", task, ClusterStateTaskConfig.build(Priority.URGENT), + joinTaskExecutor, new JoinTaskListener(task, joinCallback)); + } + + @Override + public String toString() { + return "LeaderJoinAccumulator"; + } + } + + static class FollowerJoinAccumulator implements JoinAccumulator { + @Override + public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) { + joinCallback.onFailure(new CoordinationStateRejectedException("join target is a follower")); + } + + @Override + public String toString() { + return "FollowerJoinAccumulator"; + } + } + + class CandidateJoinAccumulator implements JoinAccumulator { + + private final Map joinRequestAccumulator = new HashMap<>(); + boolean closed; + + @Override + public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) { + assert closed == false : "CandidateJoinAccumulator closed"; + JoinCallback prev = joinRequestAccumulator.put(sender, joinCallback); + if (prev != null) { + prev.onFailure(new CoordinationStateRejectedException("received a newer join from " + sender)); + } + } + + @Override + public void close(Mode newMode) { + assert closed == false : "CandidateJoinAccumulator closed"; + closed = true; + if (newMode == Mode.LEADER) { + final Map pendingAsTasks = new HashMap<>(); + joinRequestAccumulator.forEach((key, value) -> { + final JoinTaskExecutor.Task task = new JoinTaskExecutor.Task(key, "elect leader"); + pendingAsTasks.put(task, new JoinTaskListener(task, value)); + }); + + final String stateUpdateSource = "elected-as-master ([" + pendingAsTasks.size() + "] nodes joined)"; + + pendingAsTasks.put(JoinTaskExecutor.BECOME_MASTER_TASK, (source, e) -> { + }); + pendingAsTasks.put(JoinTaskExecutor.FINISH_ELECTION_TASK, (source, e) -> { + }); + masterService.submitStateUpdateTasks(stateUpdateSource, pendingAsTasks, ClusterStateTaskConfig.build(Priority.URGENT), + joinTaskExecutor); + } else if (newMode == Mode.FOLLOWER) { + joinRequestAccumulator.values().forEach(joinCallback -> joinCallback.onFailure( + new CoordinationStateRejectedException("became follower"))); + } else { + assert newMode == Mode.CANDIDATE; + assert joinRequestAccumulator.isEmpty() : joinRequestAccumulator.keySet(); + } + } + + @Override + public String toString() { + return "CandidateJoinAccumulator{" + joinRequestAccumulator.keySet() + '}'; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinRequest.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinRequest.java new file mode 100644 index 00000000000..091a6809c84 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinRequest.java @@ -0,0 +1,87 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.transport.TransportRequest; + +import java.io.IOException; +import java.util.Optional; + +public class JoinRequest extends TransportRequest { + + private final DiscoveryNode sourceNode; + + private final Optional optionalJoin; + + public JoinRequest(DiscoveryNode sourceNode, Optional optionalJoin) { + assert optionalJoin.isPresent() == false || optionalJoin.get().getSourceNode().equals(sourceNode); + this.sourceNode = sourceNode; + this.optionalJoin = optionalJoin; + } + + public JoinRequest(StreamInput in) throws IOException { + super(in); + sourceNode = new DiscoveryNode(in); + optionalJoin = Optional.ofNullable(in.readOptionalWriteable(Join::new)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + sourceNode.writeTo(out); + out.writeOptionalWriteable(optionalJoin.orElse(null)); + } + + public DiscoveryNode getSourceNode() { + return sourceNode; + } + + public Optional getOptionalJoin() { + return optionalJoin; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof JoinRequest)) return false; + + JoinRequest that = (JoinRequest) o; + + if (!sourceNode.equals(that.sourceNode)) return false; + return optionalJoin.equals(that.optionalJoin); + } + + @Override + public int hashCode() { + int result = sourceNode.hashCode(); + result = 31 * result + optionalJoin.hashCode(); + return result; + } + + @Override + public String toString() { + return "JoinRequest{" + + "sourceNode=" + sourceNode + + ", optionalJoin=" + optionalJoin + + '}'; + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java index 926287503d8..f47fec2913e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java @@ -168,6 +168,30 @@ public class MessagesTests extends ESTestCase { }); } + public void testJoinRequestEqualsHashCodeSerialization() { + Join initialJoin = new Join(createNode(randomAlphaOfLength(10)), createNode(randomAlphaOfLength(10)), randomNonNegativeLong(), + randomNonNegativeLong(), randomNonNegativeLong()); + JoinRequest initialJoinRequest = new JoinRequest(initialJoin.getSourceNode(), + randomBoolean() ? Optional.empty() : Optional.of(initialJoin)); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialJoinRequest, + joinRequest -> copyWriteable(joinRequest, writableRegistry(), JoinRequest::new), + joinRequest -> { + if (randomBoolean() && joinRequest.getOptionalJoin().isPresent() == false) { + return new JoinRequest(createNode(randomAlphaOfLength(20)), joinRequest.getOptionalJoin()); + } else { + // change OptionalJoin + final Optional newOptionalJoin; + if (joinRequest.getOptionalJoin().isPresent() && randomBoolean()) { + newOptionalJoin = Optional.empty(); + } else { + newOptionalJoin = Optional.of(new Join(joinRequest.getSourceNode(), createNode(randomAlphaOfLength(10)), + randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())); + } + return new JoinRequest(joinRequest.getSourceNode(), newOptionalJoin); + } + }); + } + public ClusterState randomClusterState() { return CoordinationStateTests.clusterState(randomNonNegativeLong(), randomNonNegativeLong(), createNode(randomAlphaOfLength(10)), new ClusterState.VotingConfiguration(Sets.newHashSet(generateRandomStringArray(10, 10, false))), diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java new file mode 100644 index 00000000000..67570bcb186 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java @@ -0,0 +1,492 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterState.VotingConfiguration; +import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.cluster.service.MasterServiceTests; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.BaseFuture; +import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService; +import org.elasticsearch.test.ClusterServiceUtils; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportRequestHandler; +import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportResponseOptions; +import org.elasticsearch.transport.TransportService; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.mockito.ArgumentCaptor; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static java.util.Collections.emptyMap; +import static org.hamcrest.Matchers.containsString; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@TestLogging("org.elasticsearch.cluster.service:TRACE,org.elasticsearch.cluster.coordination:TRACE") +public class NodeJoinTests extends ESTestCase { + + private static ThreadPool threadPool; + + private MasterService masterService; + private Coordinator coordinator; + private DeterministicTaskQueue deterministicTaskQueue; + private TransportRequestHandler transportRequestHandler; + + @BeforeClass + public static void beforeClass() { + threadPool = new TestThreadPool(NodeJoinTests.getTestClass().getName()); + } + + @AfterClass + public static void afterClass() { + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + threadPool = null; + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + masterService.close(); + } + + private static ClusterState initialState(boolean withMaster, DiscoveryNode localNode, long term, long version, + VotingConfiguration config) { + ClusterState initialClusterState = ClusterState.builder(new ClusterName(ClusterServiceUtils.class.getSimpleName())) + .nodes(DiscoveryNodes.builder() + .add(localNode) + .localNodeId(localNode.getId()) + .masterNodeId(withMaster ? localNode.getId() : null)) + .term(term) + .version(version) + .lastAcceptedConfiguration(config) + .lastCommittedConfiguration(config) + .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build(); + return initialClusterState; + } + + private void setupFakeMasterServiceAndCoordinator(long term, ClusterState initialState) { + deterministicTaskQueue = new DeterministicTaskQueue(Settings.EMPTY); + FakeThreadPoolMasterService fakeMasterService = new FakeThreadPoolMasterService("test", deterministicTaskQueue::scheduleNow); + AtomicReference currentState = new AtomicReference<>(initialState); + fakeMasterService.setClusterStateSupplier(currentState::get); + fakeMasterService.setClusterStatePublisher((event, publishListener, ackListener) -> { + currentState.set(event.state()); + publishListener.onResponse(null); + }); + fakeMasterService.start(); + setupMasterServiceAndCoordinator(term, initialState, fakeMasterService); + } + + private void setupRealMasterServiceAndCoordinator(long term, ClusterState initialState) { + setupMasterServiceAndCoordinator(term, initialState, ClusterServiceUtils.createMasterService(threadPool, initialState)); + } + + private void setupMasterServiceAndCoordinator(long term, ClusterState initialState, MasterService masterService) { + if (this.masterService != null || coordinator != null) { + throw new IllegalStateException("method setupMasterServiceAndCoordinator can only be called once"); + } + this.masterService = masterService; + TransportService transportService = mock(TransportService.class); + when(transportService.getLocalNode()).thenReturn(initialState.nodes().getLocalNode()); + @SuppressWarnings("unchecked") + ArgumentCaptor> joinRequestHandler = ArgumentCaptor.forClass( + (Class) TransportRequestHandler.class); + coordinator = new Coordinator(Settings.EMPTY, + transportService, + ESAllocationTestCase.createAllocationService(Settings.EMPTY), + masterService, + () -> new CoordinationStateTests.InMemoryPersistedState(term, initialState)); + verify(transportService).registerRequestHandler(eq(JoinHelper.JOIN_ACTION_NAME), eq(ThreadPool.Names.GENERIC), eq(false), eq(false), + anyObject(), joinRequestHandler.capture()); + transportRequestHandler = joinRequestHandler.getValue(); + coordinator.start(); + coordinator.startInitialJoin(); + } + + protected DiscoveryNode newNode(int i) { + return newNode(i, randomBoolean()); + } + + protected DiscoveryNode newNode(int i, boolean master) { + Set roles = new HashSet<>(); + if (master) { + roles.add(DiscoveryNode.Role.MASTER); + } + final String prefix = master ? "master_" : "data_"; + return new DiscoveryNode(prefix + i, i + "", buildNewFakeTransportAddress(), emptyMap(), roles, Version.CURRENT); + } + + static class SimpleFuture extends BaseFuture { + final String description; + + SimpleFuture(String description) { + this.description = description; + } + + public void markAsDone() { + set(null); + } + + public void markAsFailed(Throwable t) { + setException(t); + } + + @Override + public String toString() { + return "future [" + description + "]"; + } + } + + private SimpleFuture joinNodeAsync(final JoinRequest joinRequest) { + final SimpleFuture future = new SimpleFuture("join of " + joinRequest + "]"); + logger.debug("starting {}", future); + // clone the node before submitting to simulate an incoming join, which is guaranteed to have a new + // disco node object serialized off the network + try { + transportRequestHandler.messageReceived(joinRequest, new TransportChannel() { + @Override + public String getProfileName() { + return "dummy"; + } + + @Override + public String getChannelType() { + return "dummy"; + } + + @Override + public void sendResponse(TransportResponse response) { + logger.debug("{} completed", future); + future.markAsDone(); + } + + @Override + public void sendResponse(TransportResponse response, TransportResponseOptions options) { + sendResponse(response); + } + + @Override + public void sendResponse(Exception e) { + logger.error(() -> new ParameterizedMessage("unexpected error for {}", future), e); + future.markAsFailed(e); + } + }, null); + } catch (Exception e) { + logger.error(() -> new ParameterizedMessage("unexpected error for {}", future), e); + future.markAsFailed(e); + } + return future; + } + + private void joinNode(final JoinRequest joinRequest) { + FutureUtils.get(joinNodeAsync(joinRequest)); + } + + private void joinNodeAndRun(final JoinRequest joinRequest) { + SimpleFuture fut = joinNodeAsync(joinRequest); + deterministicTaskQueue.runAllTasks(random()); + assertTrue(fut.isDone()); + FutureUtils.get(fut); + } + + public void testJoinWithHigherTermElectsLeader() { + DiscoveryNode node0 = newNode(0, true); + DiscoveryNode node1 = newNode(1, true); + long initialTerm = randomLongBetween(1, 10); + long initialVersion = randomLongBetween(1, 10); + setupFakeMasterServiceAndCoordinator(initialTerm, initialState(false, node0, initialTerm, initialVersion, + new VotingConfiguration(Collections.singleton(randomFrom(node0, node1).getId())))); + assertFalse(isLocalNodeElectedMaster()); + long newTerm = initialTerm + randomLongBetween(1, 10); + joinNodeAndRun(new JoinRequest(node1, Optional.of(new Join(node1, node0, newTerm, initialTerm, initialVersion)))); + assertTrue(isLocalNodeElectedMaster()); + } + + public void testJoinWithHigherTermButBetterStateGetsRejected() { + DiscoveryNode node0 = newNode(0, true); + DiscoveryNode node1 = newNode(1, true); + long initialTerm = randomLongBetween(1, 10); + long initialVersion = randomLongBetween(1, 10); + setupFakeMasterServiceAndCoordinator(initialTerm, initialState(false, node0, initialTerm, initialVersion, + new VotingConfiguration(Collections.singleton(node1.getId())))); + assertFalse(isLocalNodeElectedMaster()); + long newTerm = initialTerm + randomLongBetween(1, 10); + long higherVersion = initialVersion + randomLongBetween(1, 10); + expectThrows(CoordinationStateRejectedException.class, + () -> joinNodeAndRun(new JoinRequest(node1, Optional.of(new Join(node1, node0, newTerm, initialTerm, higherVersion))))); + assertFalse(isLocalNodeElectedMaster()); + } + + public void testJoinWithHigherTermButBetterStateStillElectsMasterThroughSelfJoin() { + DiscoveryNode node0 = newNode(0, true); + DiscoveryNode node1 = newNode(1, true); + long initialTerm = randomLongBetween(1, 10); + long initialVersion = randomLongBetween(1, 10); + setupFakeMasterServiceAndCoordinator(initialTerm, initialState(false, node0, initialTerm, initialVersion, + new VotingConfiguration(Collections.singleton(node0.getId())))); + assertFalse(isLocalNodeElectedMaster()); + long newTerm = initialTerm + randomLongBetween(1, 10); + long higherVersion = initialVersion + randomLongBetween(1, 10); + joinNodeAndRun(new JoinRequest(node1, Optional.of(new Join(node1, node0, newTerm, initialTerm, higherVersion)))); + assertTrue(isLocalNodeElectedMaster()); + } + + public void testJoinElectedLeader() { + DiscoveryNode node0 = newNode(0, true); + DiscoveryNode node1 = newNode(1, true); + long initialTerm = randomLongBetween(1, 10); + long initialVersion = randomLongBetween(1, 10); + setupFakeMasterServiceAndCoordinator(initialTerm, initialState(false, node0, initialTerm, initialVersion, + new VotingConfiguration(Collections.singleton(node0.getId())))); + assertFalse(isLocalNodeElectedMaster()); + long newTerm = initialTerm + randomLongBetween(1, 10); + joinNodeAndRun(new JoinRequest(node0, Optional.of(new Join(node0, node0, newTerm, initialTerm, initialVersion)))); + assertTrue(isLocalNodeElectedMaster()); + assertFalse(clusterStateHasNode(node1)); + joinNodeAndRun(new JoinRequest(node1, Optional.of(new Join(node1, node0, newTerm, initialTerm, initialVersion)))); + assertTrue(isLocalNodeElectedMaster()); + assertTrue(clusterStateHasNode(node1)); + } + + public void testJoinAccumulation() { + DiscoveryNode node0 = newNode(0, true); + DiscoveryNode node1 = newNode(1, true); + DiscoveryNode node2 = newNode(2, true); + long initialTerm = randomLongBetween(1, 10); + long initialVersion = randomLongBetween(1, 10); + setupFakeMasterServiceAndCoordinator(initialTerm, initialState(false, node0, initialTerm, initialVersion, + new VotingConfiguration(Collections.singleton(node2.getId())))); + assertFalse(isLocalNodeElectedMaster()); + long newTerm = initialTerm + randomLongBetween(1, 10); + SimpleFuture futNode0 = joinNodeAsync(new JoinRequest(node0, Optional.of( + new Join(node0, node0, newTerm, initialTerm, initialVersion)))); + deterministicTaskQueue.runAllTasks(random()); + assertFalse(futNode0.isDone()); + assertFalse(isLocalNodeElectedMaster()); + SimpleFuture futNode1 = joinNodeAsync(new JoinRequest(node1, Optional.of( + new Join(node1, node0, newTerm, initialTerm, initialVersion)))); + deterministicTaskQueue.runAllTasks(random()); + assertFalse(futNode1.isDone()); + assertFalse(isLocalNodeElectedMaster()); + joinNodeAndRun(new JoinRequest(node2, Optional.of(new Join(node2, node0, newTerm, initialTerm, initialVersion)))); + assertTrue(isLocalNodeElectedMaster()); + assertTrue(clusterStateHasNode(node1)); + assertTrue(clusterStateHasNode(node2)); + FutureUtils.get(futNode0); + FutureUtils.get(futNode1); + } + + public void testJoinFollowerWithHigherTerm() { + DiscoveryNode node0 = newNode(0, true); + DiscoveryNode node1 = newNode(1, true); + long initialTerm = randomLongBetween(1, 10); + long initialVersion = randomLongBetween(1, 10); + setupFakeMasterServiceAndCoordinator(initialTerm, initialState(false, node0, initialTerm, initialVersion, + new VotingConfiguration(Collections.singleton(node0.getId())))); + long newTerm = initialTerm + randomLongBetween(1, 10); + coordinator.coordinationState.get().handleStartJoin(new StartJoinRequest(node1, newTerm)); + synchronized (coordinator.mutex) { + coordinator.becomeFollower("test", node1); + } + assertFalse(isLocalNodeElectedMaster()); + long newerTerm = newTerm + randomLongBetween(1, 10); + joinNodeAndRun(new JoinRequest(node1, + Optional.of(new Join(node1, node0, newerTerm, initialTerm, initialVersion)))); + assertTrue(isLocalNodeElectedMaster()); + } + + public void testJoinFollowerFails() { + DiscoveryNode node0 = newNode(0, true); + DiscoveryNode node1 = newNode(1, true); + long initialTerm = randomLongBetween(1, 10); + long initialVersion = randomLongBetween(1, 10); + setupFakeMasterServiceAndCoordinator(initialTerm, initialState(false, node0, initialTerm, initialVersion, + new VotingConfiguration(Collections.singleton(node0.getId())))); + long newTerm = initialTerm + randomLongBetween(1, 10); + coordinator.coordinationState.get().handleStartJoin(new StartJoinRequest(node1, newTerm)); + synchronized (coordinator.mutex) { + coordinator.becomeFollower("test", node1); + } + assertFalse(isLocalNodeElectedMaster()); + assertThat(expectThrows(CoordinationStateRejectedException.class, + () -> joinNodeAndRun(new JoinRequest(node1, Optional.empty()))).getMessage(), + containsString("join target is a follower")); + assertFalse(isLocalNodeElectedMaster()); + } + + public void testBecomeFollowerFailsPendingJoin() { + DiscoveryNode node0 = newNode(0, true); + DiscoveryNode node1 = newNode(1, true); + long initialTerm = randomLongBetween(1, 10); + long initialVersion = randomLongBetween(1, 10); + setupFakeMasterServiceAndCoordinator(initialTerm, initialState(false, node0, initialTerm, initialVersion, + new VotingConfiguration(Collections.singleton(node1.getId())))); + long newTerm = initialTerm + randomLongBetween(1, 10); + SimpleFuture fut = joinNodeAsync(new JoinRequest(node0, Optional.of(new Join(node0, node0, newTerm, initialTerm, initialVersion)))); + deterministicTaskQueue.runAllTasks(random()); + assertFalse(fut.isDone()); + assertFalse(isLocalNodeElectedMaster()); + synchronized (coordinator.mutex) { + coordinator.becomeFollower("test", node1); + } + assertFalse(isLocalNodeElectedMaster()); + assertThat(expectThrows(CoordinationStateRejectedException.class, + () -> FutureUtils.get(fut)).getMessage(), + containsString("became follower")); + assertFalse(isLocalNodeElectedMaster()); + } + + public void testConcurrentJoining() { + List nodes = IntStream.rangeClosed(1, randomIntBetween(2, 5)) + .mapToObj(nodeId -> newNode(nodeId, true)).collect(Collectors.toList()); + + VotingConfiguration votingConfiguration = new VotingConfiguration( + randomSubsetOf(randomIntBetween(1, nodes.size()), nodes).stream().map(DiscoveryNode::getId).collect(Collectors.toSet())); + + logger.info("Voting configuration: {}", votingConfiguration); + + DiscoveryNode localNode = nodes.get(0); + long initialTerm = randomLongBetween(1, 10); + long initialVersion = randomLongBetween(1, 10); + setupRealMasterServiceAndCoordinator(initialTerm, initialState(false, localNode, initialTerm, initialVersion, votingConfiguration)); + long newTerm = initialTerm + randomLongBetween(1, 10); + + // we need at least a quorum of voting nodes with a correct term and worse state + List successfulNodes; + do { + successfulNodes = randomSubsetOf(nodes); + } while (votingConfiguration.hasQuorum(successfulNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toList())) + == false); + + logger.info("Successful voting nodes: {}", successfulNodes); + + List correctJoinRequests = successfulNodes.stream().map( + node -> new JoinRequest(node, Optional.of(new Join(node, localNode, newTerm, initialTerm, initialVersion)))) + .collect(Collectors.toList()); + + List possiblyUnsuccessfulNodes = new ArrayList<>(nodes); + possiblyUnsuccessfulNodes.removeAll(successfulNodes); + + logger.info("Possibly unsuccessful voting nodes: {}", possiblyUnsuccessfulNodes); + + List possiblyFailingJoinRequests = possiblyUnsuccessfulNodes.stream().map(node -> { + if (randomBoolean()) { + // a correct request + return new JoinRequest(node, Optional.of(new Join(node, localNode, + newTerm, initialTerm, initialVersion))); + } else if (randomBoolean()) { + // term too low + return new JoinRequest(node, Optional.of(new Join(node, localNode, + randomLongBetween(0, initialTerm), initialTerm, initialVersion))); + } else { + // better state + return new JoinRequest(node, Optional.of(new Join(node, localNode, + newTerm, initialTerm, initialVersion + randomLongBetween(1, 10)))); + } + }).collect(Collectors.toList()); + + // duplicate some requests, which will be unsuccessful + possiblyFailingJoinRequests.addAll(randomSubsetOf(possiblyFailingJoinRequests)); + + CyclicBarrier barrier = new CyclicBarrier(correctJoinRequests.size() + possiblyFailingJoinRequests.size() + 1); + List threads = new ArrayList<>(); + threads.add(new Thread(() -> { + try { + barrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + throw new RuntimeException(e); + } + for (int i = 0; i < 30; i++) { + coordinator.invariant(); + } + })); + threads.addAll(correctJoinRequests.stream().map(joinRequest -> new Thread( + () -> { + try { + barrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + throw new RuntimeException(e); + } + joinNode(joinRequest); + })).collect(Collectors.toList())); + threads.addAll(possiblyFailingJoinRequests.stream().map(joinRequest -> new Thread(() -> { + try { + barrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + throw new RuntimeException(e); + } + try { + joinNode(joinRequest); + } catch (CoordinationStateRejectedException ignore) { + // ignore + } + })).collect(Collectors.toList())); + + threads.forEach(Thread::start); + threads.forEach(t -> { + try { + t.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + assertTrue(MasterServiceTests.discoveryState(masterService).nodes().isLocalNodeElectedMaster()); + successfulNodes.forEach(node -> assertTrue(clusterStateHasNode(node))); + } + + private boolean isLocalNodeElectedMaster() { + return MasterServiceTests.discoveryState(masterService).nodes().isLocalNodeElectedMaster(); + } + + private boolean clusterStateHasNode(DiscoveryNode node) { + return node.equals(MasterServiceTests.discoveryState(masterService).nodes().get(node.getId())); + } +} diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterService.java b/server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterService.java index 519704349e1..08a7ea38c72 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterService.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterService.java @@ -50,7 +50,7 @@ public class FakeThreadPoolMasterService extends MasterService { private boolean taskInProgress = false; private boolean waitForPublish = false; - FakeThreadPoolMasterService(String serviceName, Consumer onTaskAvailableToRun) { + public FakeThreadPoolMasterService(String serviceName, Consumer onTaskAvailableToRun) { super(Settings.EMPTY, createMockThreadPool()); this.name = serviceName; this.onTaskAvailableToRun = onTaskAvailableToRun; diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index 922a6e0d276..d8cd22d92db 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -969,10 +969,10 @@ public abstract class ESTestCase extends LuceneTestCase { } /** - * Returns a random subset of values (including a potential empty list) + * Returns a random subset of values (including a potential empty list, or the full original list) */ public static List randomSubsetOf(Collection collection) { - return randomSubsetOf(randomInt(Math.max(collection.size() - 1, 0)), collection); + return randomSubsetOf(randomInt(collection.size()), collection); } /**