Add PeerFinder#onFoundPeersUpdated (#32939)

Today the PeerFinder silently updates the set of found peers as new peers are
discovered and old ones are disconnected, and elections are scheduled
independently of these changes. In fact, it would be better if the election
scheduler were only activated on discovery of a quorum of peers. This commit
introduces the `onFoundPeersUpdated` method that allows this flow.
This commit is contained in:
David Turner 2018-08-21 08:04:30 +01:00 committed by GitHub
parent cd6326b391
commit e4ef12798e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 76 additions and 18 deletions

View File

@ -89,22 +89,26 @@ public abstract class PeerFinder extends AbstractComponent {
logger.trace("activating with {}", lastAcceptedNodes);
synchronized (mutex) {
assert active == false;
assert assertInactiveWithNoKnownPeers();
active = true;
this.lastAcceptedNodes = lastAcceptedNodes;
leader = Optional.empty();
handleWakeUp();
handleWakeUp(); // return value discarded: there are no known peers, so none can be disconnected
}
}
public void deactivate(DiscoveryNode leader) {
final boolean peersRemoved;
synchronized (mutex) {
logger.trace("deactivating and setting leader to {}", leader);
active = false;
handleWakeUp();
peersRemoved = handleWakeUp();
this.leader = Optional.of(leader);
assert assertInactiveWithNoKnownPeers();
}
if (peersRemoved) {
onFoundPeersUpdated();
}
}
// exposed to subclasses for testing
@ -114,7 +118,7 @@ public abstract class PeerFinder extends AbstractComponent {
boolean assertInactiveWithNoKnownPeers() {
assert active == false;
assert peersByAddress.isEmpty();
assert peersByAddress.isEmpty() : peersByAddress.keySet();
return true;
}
@ -142,10 +146,20 @@ public abstract class PeerFinder extends AbstractComponent {
}
/**
* Called on receipt of a PeersResponse from a node that believes it's an active leader, which this node should therefore try and join.
* Invoked on receipt of a PeersResponse from a node that believes it's an active leader, which this node should therefore try and join.
* Note that invocations of this method are not synchronised. By the time it is called we may have been deactivated.
*/
protected abstract void onActiveMasterFound(DiscoveryNode masterNode, long term);
/**
* Invoked when the set of found peers changes. Note that invocations of this method are not fully synchronised, so we only guarantee
* that the change to the set of found peers happens before this method is invoked. If there are multiple concurrent changes then there
* will be multiple concurrent invocations of this method, with no guarantee as to their order. For this reason we do not pass the
* updated set of peers as an argument to this method, leaving it to the implementation to call getFoundPeers() with appropriate
* synchronisation to avoid lost updates. Also, by the time this method is invoked we may have been deactivated.
*/
protected abstract void onFoundPeersUpdated();
public interface TransportAddressConnector {
/**
* Identify the node at the given address and, if it is a master node and not the local node then establish a full connection to it.
@ -170,7 +184,6 @@ public abstract class PeerFinder extends AbstractComponent {
}
private List<DiscoveryNode> getFoundPeersUnderLock() {
assert active;
assert holdsLock() : "PeerFinder mutex not held";
return peersByAddress.values().stream().map(Peer::getDiscoveryNode).filter(Objects::nonNull).collect(Collectors.toList());
}
@ -181,16 +194,21 @@ public abstract class PeerFinder extends AbstractComponent {
return peer;
}
private void handleWakeUp() {
/**
* @return whether any peers were removed due to disconnection
*/
private boolean handleWakeUp() {
assert holdsLock() : "PeerFinder mutex not held";
boolean peersRemoved = false;
for (final Peer peer : peersByAddress.values()) {
peer.handleWakeUp();
peersRemoved = peer.handleWakeUp() || peersRemoved; // care: avoid short-circuiting, each peer needs waking up
}
if (active == false) {
logger.trace("not active");
return;
return peersRemoved;
}
logger.trace("probing master nodes from cluster state: {}", lastAcceptedNodes);
@ -220,8 +238,11 @@ public abstract class PeerFinder extends AbstractComponent {
@Override
protected void doRun() {
synchronized (mutex) {
handleWakeUp();
if (handleWakeUp() == false) {
return;
}
}
onFoundPeersUpdated();
}
@Override
@ -229,6 +250,8 @@ public abstract class PeerFinder extends AbstractComponent {
return "PeerFinder::handleWakeUp";
}
});
return peersRemoved;
}
private void startProbe(TransportAddress transportAddress) {
@ -260,12 +283,12 @@ public abstract class PeerFinder extends AbstractComponent {
return discoveryNode.get();
}
void handleWakeUp() {
boolean handleWakeUp() {
assert holdsLock() : "PeerFinder mutex not held";
if (active == false) {
removePeer();
return;
return true;
}
final DiscoveryNode discoveryNode = getDiscoveryNode();
@ -279,8 +302,11 @@ public abstract class PeerFinder extends AbstractComponent {
} else {
logger.trace("{} no longer connected", this);
removePeer();
return true;
}
}
return false;
}
void establishConnection() {
@ -295,12 +321,17 @@ public abstract class PeerFinder extends AbstractComponent {
assert remoteNode.isMasterNode() : remoteNode + " is not master-eligible";
assert remoteNode.equals(getLocalNode()) == false : remoteNode + " is the local node";
synchronized (mutex) {
if (active) {
assert discoveryNode.get() == null : "discoveryNode unexpectedly already set to " + discoveryNode.get();
discoveryNode.set(remoteNode);
requestPeers();
if (active == false) {
return;
}
assert discoveryNode.get() == null : "discoveryNode unexpectedly already set to " + discoveryNode.get();
discoveryNode.set(remoteNode);
requestPeers();
}
assert holdsLock() == false : "PeerFinder mutex is held in error";
onFoundPeersUpdated();
}
@Override

View File

@ -86,6 +86,7 @@ public class PeerFinderTests extends ESTestCase {
private Set<DiscoveryNode> connectedNodes = new HashSet<>();
private DiscoveryNodes lastAcceptedNodes;
private TransportService transportService;
private Iterable<DiscoveryNode> foundPeersFromNotification;
private static long CONNECTION_TIMEOUT_MILLIS = 30000;
@ -156,6 +157,13 @@ public class PeerFinderTests extends ESTestCase {
discoveredMasterNode = masterNode;
discoveredMasterTerm = OptionalLong.of(term);
}
@Override
protected void onFoundPeersUpdated() {
assert holdsLock() == false : "PeerFinder lock held in error";
foundPeersFromNotification = getFoundPeers();
logger.trace("onFoundPeersUpdated({})", foundPeersFromNotification);
}
}
private void resolveConfiguredHosts(Consumer<List<TransportAddress>> onResult) {
@ -214,13 +222,13 @@ public class PeerFinderTests extends ESTestCase {
lastAcceptedNodes = DiscoveryNodes.builder().localNodeId(localNode.getId()).add(localNode).build();
peerFinder = new TestPeerFinder(settings, transportService, transportAddressConnector);
foundPeersFromNotification = emptyList();
}
@After
public void deactivateAndRunRemainingTasks() {
peerFinder.deactivate(localNode);
deterministicTaskQueue.runAllTasks(); // termination ensures that everything is properly cleaned up
peerFinder.assertInactiveWithNoKnownPeers(); // should eventually have no nodes when deactivated
deterministicTaskQueue.runAllRunnableTasks(random());
}
public void testAddsReachableNodesFromUnicastHostsList() {
@ -693,6 +701,13 @@ public class PeerFinderTests extends ESTestCase {
final Stream<DiscoveryNode> expectedNodes = Arrays.stream(expectedNodesArray);
final Stream<DiscoveryNode> actualNodes = StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false);
assertThat(actualNodes.collect(Collectors.toSet()), equalTo(expectedNodes.collect(Collectors.toSet())));
assertNotifiedOfAllUpdates();
}
private void assertNotifiedOfAllUpdates() {
final Stream<DiscoveryNode> actualNodes = StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false);
final Stream<DiscoveryNode> notifiedNodes = StreamSupport.stream(foundPeersFromNotification.spliterator(), false);
assertThat(notifiedNodes.collect(Collectors.toSet()), equalTo(actualNodes.collect(Collectors.toSet())));
}
private DiscoveryNode newDiscoveryNode(String nodeId) {
@ -700,7 +715,19 @@ public class PeerFinderTests extends ESTestCase {
}
private void runAllRunnableTasks() {
deterministicTaskQueue.scheduleNow(new Runnable() {
@Override
public void run() {
PeerFinderTests.this.assertNotifiedOfAllUpdates();
}
@Override
public String toString() {
return "assertNotifiedOfAllUpdates";
}
});
deterministicTaskQueue.runAllRunnableTasks(random());
assertNotifiedOfAllUpdates();
}
}