Zen2: Trigger join when active master detected (#34008)
Triggers a join when an active master is detected. To avoid spamming joins, deduplicates join requests based on the (target, join) pair, which ensures that a new join is sent whenever the term is incremented or a new master is found. Also changes the logging of join failures from DEBUG to INFO; these failures should happen rarely and indicate either a failed election (which should itself be rare) or a configuration issue.
parent 1d47c9582b
commit 679fb698d0
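The core of the change is the deduplication in JoinHelper.sendJoinRequest, shown in the diff below. As a rough, self-contained sketch of the same idea (plain JDK types only; Transport, DedupSketch and the String-typed requests here are illustrative stand-ins, not the Elasticsearch classes touched by this commit): an outgoing join is sent only if no identical (destination, request) pair is already in flight, and the pair is cleared once a response or failure arrives so that a later retry is possible.

import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Illustrative sketch only; the real implementation lives in JoinHelper below.
class DedupSketch {

    interface Transport {
        // handler is called with (null, null) on success or (null, exception) on failure
        void send(String destination, String request, BiConsumer<Void, Exception> handler);
    }

    private final Transport transport;

    // in-flight (destination, request) pairs; mirrors pendingOutgoingJoins in JoinHelper
    private final Set<Map.Entry<String, String>> pendingOutgoingJoins = ConcurrentHashMap.newKeySet();

    DedupSketch(Transport transport) {
        this.transport = Objects.requireNonNull(transport);
    }

    void sendJoinRequest(String destination, String joinRequest) {
        final Map.Entry<String, String> dedupKey = Map.entry(destination, joinRequest);
        if (pendingOutgoingJoins.add(dedupKey) == false) {
            return; // an identical join is already in flight, so do not send another one
        }
        transport.send(destination, joinRequest, (ignored, failure) -> {
            // success or failure, the pair is no longer in flight and may be retried later
            pendingOutgoingJoins.remove(dedupKey);
        });
    }
}

The real JoinHelper keys the set on Tuple<DiscoveryNode, JoinRequest> and logs a failed join at INFO, as the diff shows.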
@@ -624,7 +624,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
    @Override
    protected void onActiveMasterFound(DiscoveryNode masterNode, long term) {
        // TODO
        synchronized (mutex) {
            ensureTermAtLeast(masterNode, term);
            joinHelper.sendJoinRequest(masterNode, joinWithDestination(lastJoin, masterNode, term));
        }
    }

    @Override
@@ -27,9 +27,11 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportException;
@@ -44,6 +46,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
@@ -57,6 +60,8 @@ public class JoinHelper extends AbstractComponent {
    private final TransportService transportService;
    private final JoinTaskExecutor joinTaskExecutor;

    final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = ConcurrentCollections.newConcurrentSet();

    public JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
                      TransportService transportService, LongSupplier currentTermSupplier,
                      BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm) {
@@ -115,34 +120,44 @@ public class JoinHelper extends AbstractComponent {
            StartJoinRequest::new,
            (request, channel, task) -> {
                final DiscoveryNode destination = request.getSourceNode();
                final JoinRequest joinRequest
                    = new JoinRequest(transportService.getLocalNode(), Optional.of(joinLeaderInTerm.apply(request)));
                logger.debug("attempting to join {} with {}", destination, joinRequest);
                this.transportService.sendRequest(destination, JOIN_ACTION_NAME, joinRequest, new TransportResponseHandler<Empty>() {

                    @Override
                    public Empty read(StreamInput in) {
                        return Empty.INSTANCE;
                    }

                    @Override
                    public void handleResponse(Empty response) {
                        logger.debug("successfully joined {} with {}", destination, joinRequest);
                    }

                    @Override
                    public void handleException(TransportException exp) {
                        logger.debug(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp);
                    }

                    @Override
                    public String executor() {
                        return Names.SAME;
                    }
                });
                sendJoinRequest(destination, Optional.of(joinLeaderInTerm.apply(request)));
                channel.sendResponse(Empty.INSTANCE);
            });
    }

    public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin) {
        final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), optionalJoin);
        final Tuple<DiscoveryNode, JoinRequest> dedupKey = Tuple.tuple(destination, joinRequest);
        if (pendingOutgoingJoins.add(dedupKey)) {
            logger.debug("attempting to join {} with {}", destination, joinRequest);
            transportService.sendRequest(destination, JOIN_ACTION_NAME, joinRequest, new TransportResponseHandler<Empty>() {

                @Override
                public Empty read(StreamInput in) {
                    return Empty.INSTANCE;
                }

                @Override
                public void handleResponse(Empty response) {
                    pendingOutgoingJoins.remove(dedupKey);
                    logger.debug("successfully joined {} with {}", destination, joinRequest);
                }

                @Override
                public void handleException(TransportException exp) {
                    pendingOutgoingJoins.remove(dedupKey);
                    logger.info(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp);
                }

                @Override
                public String executor() {
                    return Names.SAME;
                }
            });
        } else {
            logger.debug("already attempting to join {} with request {}, not sending request", destination, joinRequest);
        }
    }

    public void sendStartJoinRequest(final StartJoinRequest startJoinRequest, final DiscoveryNode destination) {
        transportService.sendRequest(destination, START_JOIN_ACTION_NAME,
            startJoinRequest, new TransportResponseHandler<Empty>() {
@@ -30,6 +30,7 @@ import org.elasticsearch.cluster.coordination.CoordinationStateTests.InMemoryPer
import org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.ClusterNode;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.UUIDs;

@@ -88,6 +89,18 @@ public class CoordinatorTests extends ESTestCase {
        }
    }

    public void testNodesJoinAfterStableCluster() {
        final Cluster cluster = new Cluster(randomIntBetween(1, 5));
        cluster.stabilise();

        final long currentTerm = cluster.getAnyLeader().coordinator.getCurrentTerm();
        cluster.addNodes(randomIntBetween(1, 2));
        cluster.stabilise();

        final long newTerm = cluster.getAnyLeader().coordinator.getCurrentTerm();
        assertEquals(currentTerm, newTerm);
    }

    private static String nodeIdFromIndex(int nodeIndex) {
        return "node" + nodeIndex;
    }

@@ -118,6 +131,16 @@ public class CoordinatorTests extends ESTestCase {
            }
        }

        void addNodes(int newNodesCount) {
            logger.info("--> adding {} nodes", newNodesCount);

            final int nodeSizeAtStart = clusterNodes.size();
            for (int i = 0; i < newNodesCount; i++) {
                final ClusterNode clusterNode = new ClusterNode(nodeSizeAtStart + i);
                clusterNodes.add(clusterNode);
            }
        }

        void stabilise() {
            final long stabilisationStartTime = deterministicTaskQueue.getCurrentTimeMillis();
            while (deterministicTaskQueue.getCurrentTimeMillis() < stabilisationStartTime + DEFAULT_STABILISATION_TIME) {

@@ -150,6 +173,8 @@ public class CoordinatorTests extends ESTestCase {
                = equalTo(Optional.of(leader.coordinator.getLastAcceptedState().getVersion()));

            assertThat(leader.coordinator.getLastCommittedState().map(ClusterState::getVersion), isPresentAndEqualToLeaderVersion);
            assertThat(leader.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(dn -> dn.nodeExists(leader.getId())),
                equalTo(Optional.of(true)));

            for (final ClusterNode clusterNode : clusterNodes) {
                if (clusterNode == leader) {

@@ -166,7 +191,12 @@ public class CoordinatorTests extends ESTestCase {
                    Optional.of(clusterNode.coordinator.getLastAcceptedState().getVersion()), isPresentAndEqualToLeaderVersion);
                assertThat(nodeId + " is at the same committed version as the leader",
                    clusterNode.coordinator.getLastCommittedState().map(ClusterState::getVersion), isPresentAndEqualToLeaderVersion);
                assertThat(clusterNode.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(dn -> dn.nodeExists(nodeId)),
                    equalTo(Optional.of(true)));
            }

            assertThat(leader.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(DiscoveryNodes::getSize),
                equalTo(Optional.of(clusterNodes.size())));
        }

        ClusterNode getAnyLeader() {
@@ -0,0 +1,98 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.elasticsearch.cluster.coordination;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.test.transport.CapturingTransport.CapturedRequest;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;

import java.util.Collections;
import java.util.Optional;

import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.equalTo;

public class JoinHelperTests extends ESTestCase {

    public void testJoinDeduplication() {
        DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(
            Settings.builder().put(NODE_NAME_SETTING.getKey(), "node0").build());
        CapturingTransport capturingTransport = new CapturingTransport();
        DiscoveryNode localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT);
        TransportService transportService = capturingTransport.createTransportService(Settings.EMPTY,
            deterministicTaskQueue.getThreadPool(), TransportService.NOOP_TRANSPORT_INTERCEPTOR,
            x -> localNode, null, Collections.emptySet());
        JoinHelper joinHelper = new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L,
            (joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); });
        transportService.start();

        DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT);
        DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), Version.CURRENT);

        // check that sending a join to node1 works
        Optional<Join> optionalJoin1 = randomBoolean() ? Optional.empty() :
            Optional.of(new Join(localNode, node1, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()));
        joinHelper.sendJoinRequest(node1, optionalJoin1);
        CapturedRequest[] capturedRequests1 = capturingTransport.getCapturedRequestsAndClear();
        assertThat(capturedRequests1.length, equalTo(1));
        CapturedRequest capturedRequest1 = capturedRequests1[0];
        assertEquals(node1, capturedRequest1.node);

        // check that sending a join to node2 works
        Optional<Join> optionalJoin2 = randomBoolean() ? Optional.empty() :
            Optional.of(new Join(localNode, node2, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()));
        joinHelper.sendJoinRequest(node2, optionalJoin2);
        CapturedRequest[] capturedRequests2 = capturingTransport.getCapturedRequestsAndClear();
        assertThat(capturedRequests2.length, equalTo(1));
        CapturedRequest capturedRequest2 = capturedRequests2[0];
        assertEquals(node2, capturedRequest2.node);

        // check that sending another join to node1 is a noop as the previous join is still in progress
        joinHelper.sendJoinRequest(node1, optionalJoin1);
        assertThat(capturingTransport.getCapturedRequestsAndClear().length, equalTo(0));

        // complete the previous join to node1
        if (randomBoolean()) {
            capturingTransport.handleResponse(capturedRequest1.requestId, TransportResponse.Empty.INSTANCE);
        } else {
            capturingTransport.handleRemoteError(capturedRequest1.requestId, new CoordinationStateRejectedException("dummy"));
        }

        // check that sending another join to node1 now works again
        joinHelper.sendJoinRequest(node1, optionalJoin1);
        CapturedRequest[] capturedRequests1a = capturingTransport.getCapturedRequestsAndClear();
        assertThat(capturedRequests1a.length, equalTo(1));
        CapturedRequest capturedRequest1a = capturedRequests1a[0];
        assertEquals(node1, capturedRequest1a.node);

        // check that sending another join to node2 works if the optionalJoin is different
        Optional<Join> optionalJoin2a = optionalJoin2.isPresent() && randomBoolean() ? Optional.empty() :
            Optional.of(new Join(localNode, node2, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()));
        joinHelper.sendJoinRequest(node2, optionalJoin2a);
        CapturedRequest[] capturedRequests2a = capturingTransport.getCapturedRequestsAndClear();
        assertThat(capturedRequests2a.length, equalTo(1));
        CapturedRequest capturedRequest2a = capturedRequests2a[0];
        assertEquals(node2, capturedRequest2a.node);
    }
}