mirror of https://github.com/apache/druid.git
DruidOverlord: Move becomeLeader/stopBeingLeader earlier. (#17415)
* DruidOverlord: Move becomeLeader/stopBeingLeader earlier. On becoming leader, it is helpful for the TaskRunner and TaskQueue to be available when the SupervisorManager starts up, to aid the supervisors in discovering their tasks. On stopping leadership, it is helpful for the TaskRunner and TaskQueue to be available until the SupervisorManager has finished shutting down. They are only available when the TaskMaster is in "leader" mode, so to achieve the above, this patch moves it earlier in the sequence. * Adjust leadership into two phases. * Update test. * Adjustments for coverage. * Stop mirrors start better.
This commit is contained in:
parent
446a8f466f
commit
6a9c050095
|
@ -130,8 +130,27 @@ public class DruidOverlord
|
|||
.emit();
|
||||
}
|
||||
|
||||
// First add "half leader" services: everything required for APIs except the supervisor manager.
|
||||
// Then, become "half leader" so those APIs light up and supervisor initialization can proceed.
|
||||
leaderLifecycle.addManagedInstance(taskRunner);
|
||||
leaderLifecycle.addManagedInstance(taskQueue);
|
||||
leaderLifecycle.addHandler(
|
||||
new Lifecycle.Handler() {
|
||||
@Override
|
||||
public void start()
|
||||
{
|
||||
segmentAllocationQueue.becomeLeader();
|
||||
taskMaster.becomeHalfLeader(taskRunner, taskQueue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop()
|
||||
{
|
||||
taskMaster.stopBeingLeader();
|
||||
segmentAllocationQueue.stopBeingLeader();
|
||||
}
|
||||
}
|
||||
);
|
||||
leaderLifecycle.addManagedInstance(supervisorManager);
|
||||
leaderLifecycle.addManagedInstance(overlordDutyExecutor);
|
||||
leaderLifecycle.addHandler(
|
||||
|
@ -140,8 +159,7 @@ public class DruidOverlord
|
|||
@Override
|
||||
public void start()
|
||||
{
|
||||
segmentAllocationQueue.becomeLeader();
|
||||
taskMaster.becomeLeader(taskRunner, taskQueue);
|
||||
taskMaster.becomeFullLeader();
|
||||
compactionScheduler.start();
|
||||
|
||||
// Announce the node only after all the services have been initialized
|
||||
|
@ -154,8 +172,7 @@ public class DruidOverlord
|
|||
{
|
||||
serviceAnnouncer.unannounce(node);
|
||||
compactionScheduler.stop();
|
||||
taskMaster.stopBeingLeader();
|
||||
segmentAllocationQueue.stopBeingLeader();
|
||||
taskMaster.downgradeToHalfLeader();
|
||||
}
|
||||
}
|
||||
);
|
||||
|
|
|
@ -32,7 +32,7 @@ import org.apache.druid.server.metrics.TaskSlotCountStatsProvider;
|
|||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
* Encapsulates various Overlord classes that allow querying and updating the
|
||||
|
@ -40,12 +40,28 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
*/
|
||||
public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsProvider
|
||||
{
|
||||
enum LeadershipState
|
||||
{
|
||||
NOT_LEADER,
|
||||
|
||||
/**
|
||||
* Leader of essential services only: task queue, task action client, and task runner. We enter this state after
|
||||
* the queue and runner are initialized, but before the supervisor manager is not yet initialized.
|
||||
*/
|
||||
HALF_LEADER,
|
||||
|
||||
/**
|
||||
* Leader of all services. We enter this state after the supervisor manager is initialized.
|
||||
*/
|
||||
FULL_LEADER
|
||||
}
|
||||
|
||||
private final TaskActionClientFactory taskActionClientFactory;
|
||||
private final SupervisorManager supervisorManager;
|
||||
private volatile TaskRunner taskRunner;
|
||||
private volatile TaskQueue taskQueue;
|
||||
|
||||
private final AtomicBoolean isLeader = new AtomicBoolean(false);
|
||||
private final AtomicReference<LeadershipState> leadershipState = new AtomicReference<>(LeadershipState.NOT_LEADER);
|
||||
|
||||
@Inject
|
||||
public TaskMaster(
|
||||
|
@ -57,28 +73,45 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro
|
|||
this.supervisorManager = supervisorManager;
|
||||
}
|
||||
|
||||
public void becomeLeader(TaskRunner taskRunner, TaskQueue taskQueue)
|
||||
/**
|
||||
* Enter state {@link LeadershipState#HALF_LEADER}, from any state.
|
||||
*/
|
||||
public void becomeHalfLeader(TaskRunner taskRunner, TaskQueue taskQueue)
|
||||
{
|
||||
this.taskRunner = taskRunner;
|
||||
this.taskQueue = taskQueue;
|
||||
isLeader.set(true);
|
||||
leadershipState.set(LeadershipState.HALF_LEADER);
|
||||
}
|
||||
|
||||
/**
|
||||
* Enter state {@link LeadershipState#HALF_LEADER}, from {@link LeadershipState#FULL_LEADER}.
|
||||
*/
|
||||
public void downgradeToHalfLeader()
|
||||
{
|
||||
leadershipState.compareAndSet(LeadershipState.FULL_LEADER, LeadershipState.HALF_LEADER);
|
||||
}
|
||||
|
||||
/**
|
||||
* Enter state {@link LeadershipState#FULL_LEADER}.
|
||||
*/
|
||||
public void becomeFullLeader()
|
||||
{
|
||||
leadershipState.set(LeadershipState.FULL_LEADER);
|
||||
}
|
||||
|
||||
/**
|
||||
* Enter state {@link LeadershipState#NOT_LEADER}.
|
||||
*/
|
||||
public void stopBeingLeader()
|
||||
{
|
||||
isLeader.set(false);
|
||||
leadershipState.set(LeadershipState.NOT_LEADER);
|
||||
this.taskQueue = null;
|
||||
this.taskRunner = null;
|
||||
}
|
||||
|
||||
private boolean isLeader()
|
||||
{
|
||||
return isLeader.get();
|
||||
}
|
||||
|
||||
public Optional<TaskRunner> getTaskRunner()
|
||||
{
|
||||
if (isLeader()) {
|
||||
if (isHalfOrFullLeader()) {
|
||||
return Optional.of(taskRunner);
|
||||
} else {
|
||||
return Optional.absent();
|
||||
|
@ -87,7 +120,7 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro
|
|||
|
||||
public Optional<TaskQueue> getTaskQueue()
|
||||
{
|
||||
if (isLeader()) {
|
||||
if (isHalfOrFullLeader()) {
|
||||
return Optional.of(taskQueue);
|
||||
} else {
|
||||
return Optional.absent();
|
||||
|
@ -96,7 +129,7 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro
|
|||
|
||||
public Optional<TaskActionClient> getTaskActionClient(Task task)
|
||||
{
|
||||
if (isLeader()) {
|
||||
if (isHalfOrFullLeader()) {
|
||||
return Optional.of(taskActionClientFactory.create(task));
|
||||
} else {
|
||||
return Optional.absent();
|
||||
|
@ -105,7 +138,7 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro
|
|||
|
||||
public Optional<ScalingStats> getScalingStats()
|
||||
{
|
||||
if (isLeader()) {
|
||||
if (isHalfOrFullLeader()) {
|
||||
return taskRunner.getScalingStats();
|
||||
} else {
|
||||
return Optional.absent();
|
||||
|
@ -114,7 +147,7 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro
|
|||
|
||||
public Optional<SupervisorManager> getSupervisorManager()
|
||||
{
|
||||
if (isLeader()) {
|
||||
if (isFullLeader()) {
|
||||
return Optional.of(supervisorManager);
|
||||
} else {
|
||||
return Optional.absent();
|
||||
|
@ -246,4 +279,15 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro
|
|||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isHalfOrFullLeader()
|
||||
{
|
||||
final LeadershipState state = leadershipState.get();
|
||||
return state == LeadershipState.HALF_LEADER || state == LeadershipState.FULL_LEADER;
|
||||
}
|
||||
|
||||
public boolean isFullLeader()
|
||||
{
|
||||
return leadershipState.get() == LeadershipState.FULL_LEADER;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -95,7 +95,6 @@ public class OverlordCompactionSchedulerTest
|
|||
private CoordinatorOverlordServiceConfig coordinatorOverlordServiceConfig;
|
||||
|
||||
private TaskMaster taskMaster;
|
||||
private TaskRunner taskRunner;
|
||||
private TaskQueue taskQueue;
|
||||
private BlockingExecutorService executor;
|
||||
|
||||
|
@ -108,11 +107,20 @@ public class OverlordCompactionSchedulerTest
|
|||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
taskRunner = Mockito.mock(TaskRunner.class);
|
||||
final TaskRunner taskRunner = Mockito.mock(TaskRunner.class);
|
||||
taskQueue = Mockito.mock(TaskQueue.class);
|
||||
|
||||
taskMaster = new TaskMaster(null, null);
|
||||
taskMaster.becomeLeader(taskRunner, taskQueue);
|
||||
Assert.assertFalse(taskMaster.isHalfOrFullLeader());
|
||||
Assert.assertFalse(taskMaster.isFullLeader());
|
||||
|
||||
taskMaster.becomeHalfLeader(taskRunner, taskQueue);
|
||||
Assert.assertTrue(taskMaster.isHalfOrFullLeader());
|
||||
Assert.assertFalse(taskMaster.isFullLeader());
|
||||
|
||||
taskMaster.becomeFullLeader();
|
||||
Assert.assertTrue(taskMaster.isHalfOrFullLeader());
|
||||
Assert.assertTrue(taskMaster.isFullLeader());
|
||||
|
||||
taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
|
||||
|
||||
|
|
Loading…
Reference in New Issue