Fix race in taskMaster (#6388)

This commit is contained in:
Jihoon Son 2018-09-26 21:48:02 -07:00 committed by Fangjin Yang
parent fc1d5795c1
commit aef022de98
2 changed files with 26 additions and 12 deletions

View File

@ -25,6 +25,7 @@ import com.google.inject.Inject;
import org.apache.druid.client.indexing.IndexingService; import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.curator.discovery.ServiceAnnouncer; import org.apache.druid.curator.discovery.ServiceAnnouncer;
import org.apache.druid.discovery.DruidLeaderSelector; import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.discovery.DruidLeaderSelector.Listener;
import org.apache.druid.guice.annotations.Self; import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory; import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
@ -49,6 +50,8 @@ import java.util.concurrent.locks.ReentrantLock;
*/ */
public class TaskMaster public class TaskMaster
{ {
private static final EmittingLogger log = new EmittingLogger(TaskMaster.class);
private final DruidLeaderSelector overlordLeaderSelector; private final DruidLeaderSelector overlordLeaderSelector;
private final DruidLeaderSelector.Listener leadershipListener; private final DruidLeaderSelector.Listener leadershipListener;
@ -61,7 +64,12 @@ public class TaskMaster
private volatile TaskRunner taskRunner; private volatile TaskRunner taskRunner;
private volatile TaskQueue taskQueue; private volatile TaskQueue taskQueue;
private static final EmittingLogger log = new EmittingLogger(TaskMaster.class); /**
* This flag indicates that all services has been started and should be true before calling
* {@link ServiceAnnouncer#announce}. This is set to false immediately once {@link Listener#stopBeingLeader()} is
* called.
*/
private volatile boolean initialized;
@Inject @Inject
public TaskMaster( public TaskMaster(
@ -127,6 +135,7 @@ public class TaskMaster
@Override @Override
public void start() public void start()
{ {
initialized = true;
serviceAnnouncer.announce(node); serviceAnnouncer.announce(node);
} }
@ -153,6 +162,7 @@ public class TaskMaster
{ {
giant.lock(); giant.lock();
try { try {
initialized = false;
final Lifecycle leaderLifecycle = leaderLifecycleRef.getAndSet(null); final Lifecycle leaderLifecycle = leaderLifecycleRef.getAndSet(null);
if (leaderLifecycle != null) { if (leaderLifecycle != null) {
leaderLifecycle.stop(); leaderLifecycle.stop();
@ -198,9 +208,12 @@ public class TaskMaster
} }
} }
/**
* Returns true if it's the leader and its all services have been properly initialized.
*/
public boolean isLeader() public boolean isLeader()
{ {
return overlordLeaderSelector.isLeader(); return overlordLeaderSelector.isLeader() && initialized;
} }
public String getCurrentLeader() public String getCurrentLeader()
@ -210,7 +223,7 @@ public class TaskMaster
public Optional<TaskRunner> getTaskRunner() public Optional<TaskRunner> getTaskRunner()
{ {
if (overlordLeaderSelector.isLeader()) { if (isLeader()) {
return Optional.of(taskRunner); return Optional.of(taskRunner);
} else { } else {
return Optional.absent(); return Optional.absent();
@ -219,7 +232,7 @@ public class TaskMaster
public Optional<TaskQueue> getTaskQueue() public Optional<TaskQueue> getTaskQueue()
{ {
if (overlordLeaderSelector.isLeader()) { if (isLeader()) {
return Optional.of(taskQueue); return Optional.of(taskQueue);
} else { } else {
return Optional.absent(); return Optional.absent();
@ -228,7 +241,7 @@ public class TaskMaster
public Optional<TaskActionClient> getTaskActionClient(Task task) public Optional<TaskActionClient> getTaskActionClient(Task task)
{ {
if (overlordLeaderSelector.isLeader()) { if (isLeader()) {
return Optional.of(taskActionClientFactory.create(task)); return Optional.of(taskActionClientFactory.create(task));
} else { } else {
return Optional.absent(); return Optional.absent();
@ -237,7 +250,7 @@ public class TaskMaster
public Optional<ScalingStats> getScalingStats() public Optional<ScalingStats> getScalingStats()
{ {
if (overlordLeaderSelector.isLeader()) { if (isLeader()) {
return taskRunner.getScalingStats(); return taskRunner.getScalingStats();
} else { } else {
return Optional.absent(); return Optional.absent();
@ -246,7 +259,7 @@ public class TaskMaster
public Optional<SupervisorManager> getSupervisorManager() public Optional<SupervisorManager> getSupervisorManager()
{ {
if (overlordLeaderSelector.isLeader()) { if (isLeader()) {
return Optional.of(supervisorManager); return Optional.of(supervisorManager);
} else { } else {
return Optional.absent(); return Optional.absent();

View File

@ -102,6 +102,7 @@ public class DruidCoordinator
.reverse(); .reverse();
private static final EmittingLogger log = new EmittingLogger(DruidCoordinator.class); private static final EmittingLogger log = new EmittingLogger(DruidCoordinator.class);
private final Object lock = new Object(); private final Object lock = new Object();
private final DruidCoordinatorConfig config; private final DruidCoordinatorConfig config;
private final ZkPathsConfig zkPaths; private final ZkPathsConfig zkPaths;
@ -118,14 +119,15 @@ public class DruidCoordinator
private final ServiceAnnouncer serviceAnnouncer; private final ServiceAnnouncer serviceAnnouncer;
private final DruidNode self; private final DruidNode self;
private final Set<DruidCoordinatorHelper> indexingServiceHelpers; private final Set<DruidCoordinatorHelper> indexingServiceHelpers;
private volatile boolean started = false;
private volatile SegmentReplicantLookup segmentReplicantLookup = null;
private final BalancerStrategyFactory factory; private final BalancerStrategyFactory factory;
private final LookupCoordinatorManager lookupCoordinatorManager; private final LookupCoordinatorManager lookupCoordinatorManager;
private final DruidLeaderSelector coordLeaderSelector; private final DruidLeaderSelector coordLeaderSelector;
private final DruidCoordinatorSegmentCompactor segmentCompactor; private final DruidCoordinatorSegmentCompactor segmentCompactor;
private volatile boolean started = false;
private volatile SegmentReplicantLookup segmentReplicantLookup = null;
@Inject @Inject
public DruidCoordinator( public DruidCoordinator(
DruidCoordinatorConfig config, DruidCoordinatorConfig config,
@ -532,6 +534,7 @@ public class DruidCoordinator
metadataSegmentManager.start(); metadataSegmentManager.start();
metadataRuleManager.start(); metadataRuleManager.start();
lookupCoordinatorManager.start();
serviceAnnouncer.announce(self); serviceAnnouncer.announce(self);
final int startingLeaderCounter = coordLeaderSelector.localTerm(); final int startingLeaderCounter = coordLeaderSelector.localTerm();
@ -579,8 +582,6 @@ public class DruidCoordinator
} }
); );
} }
lookupCoordinatorManager.start();
} }
} }
@ -597,9 +598,9 @@ public class DruidCoordinator
loadManagementPeons.clear(); loadManagementPeons.clear();
serviceAnnouncer.unannounce(self); serviceAnnouncer.unannounce(self);
lookupCoordinatorManager.stop();
metadataRuleManager.stop(); metadataRuleManager.stop();
metadataSegmentManager.stop(); metadataSegmentManager.stop();
lookupCoordinatorManager.stop();
} }
} }