Separate stop/start logic for LeaderLatch (#17546)

This commit is contained in:
George Shiqi Wu 2024-12-06 16:01:28 -05:00 committed by GitHub
parent f61ec0af85
commit 7736228f37
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 10 additions and 5 deletions

View File

@ -105,8 +105,8 @@ public class CuratorDruidLeaderSelector implements DruidLeaderSelector
} }
catch (Exception ex) { catch (Exception ex) {
log.makeAlert(ex, "listener becomeLeader() failed. Unable to become leader").emit(); log.makeAlert(ex, "listener becomeLeader() failed. Unable to become leader").emit();
stopAndCreateNewLeaderLatch();
recreateLeaderLatch(); startLeaderLatch();
} }
} }
@ -120,8 +120,10 @@ public class CuratorDruidLeaderSelector implements DruidLeaderSelector
} }
leader = false; leader = false;
// give others a chance to become leader.
stopAndCreateNewLeaderLatch();
listener.stopBeingLeader(); listener.stopBeingLeader();
recreateLeaderLatch(); startLeaderLatch();
} }
catch (Exception ex) { catch (Exception ex) {
log.makeAlert(ex, "listener.stopBeingLeader() failed. Unable to stopBeingLeader").emit(); log.makeAlert(ex, "listener.stopBeingLeader() failed. Unable to stopBeingLeader").emit();
@ -206,15 +208,18 @@ public class CuratorDruidLeaderSelector implements DruidLeaderSelector
listenerExecutor.shutdownNow(); listenerExecutor.shutdownNow();
} }
private void recreateLeaderLatch() private void stopAndCreateNewLeaderLatch()
{ {
// give others a chance to become leader.
CloseableUtils.closeAndSuppressExceptions( CloseableUtils.closeAndSuppressExceptions(
createNewLeaderLatchWithListener(), createNewLeaderLatchWithListener(),
e -> log.warn("Could not close old leader latch; continuing with new one anyway.") e -> log.warn("Could not close old leader latch; continuing with new one anyway.")
); );
leader = false; leader = false;
}
private void startLeaderLatch()
{
try { try {
//Small delay before starting the latch so that others waiting are chosen to become leader. //Small delay before starting the latch so that others waiting are chosen to become leader.
Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000)); Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));