From aef022de98bc350231af38c7ce6959b7a61b5320 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 26 Sep 2018 21:48:02 -0700 Subject: [PATCH] Fix race in taskMaster (#6388) --- .../druid/indexing/overlord/TaskMaster.java | 27 ++++++++++++++----- .../server/coordinator/DruidCoordinator.java | 11 ++++---- 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java index 4c6d5ec8a3c..4428661513a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java @@ -25,6 +25,7 @@ import com.google.inject.Inject; import org.apache.druid.client.indexing.IndexingService; import org.apache.druid.curator.discovery.ServiceAnnouncer; import org.apache.druid.discovery.DruidLeaderSelector; +import org.apache.druid.discovery.DruidLeaderSelector.Listener; import org.apache.druid.guice.annotations.Self; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; @@ -49,6 +50,8 @@ import java.util.concurrent.locks.ReentrantLock; */ public class TaskMaster { + private static final EmittingLogger log = new EmittingLogger(TaskMaster.class); + private final DruidLeaderSelector overlordLeaderSelector; private final DruidLeaderSelector.Listener leadershipListener; @@ -61,7 +64,12 @@ public class TaskMaster private volatile TaskRunner taskRunner; private volatile TaskQueue taskQueue; - private static final EmittingLogger log = new EmittingLogger(TaskMaster.class); + /** + * This flag indicates that all services has been started and should be true before calling + * {@link ServiceAnnouncer#announce}. This is set to false immediately once {@link Listener#stopBeingLeader()} is + * called. + */ + private volatile boolean initialized; @Inject public TaskMaster( @@ -127,6 +135,7 @@ public class TaskMaster @Override public void start() { + initialized = true; serviceAnnouncer.announce(node); } @@ -153,6 +162,7 @@ public class TaskMaster { giant.lock(); try { + initialized = false; final Lifecycle leaderLifecycle = leaderLifecycleRef.getAndSet(null); if (leaderLifecycle != null) { leaderLifecycle.stop(); @@ -198,9 +208,12 @@ public class TaskMaster } } + /** + * Returns true if it's the leader and its all services have been properly initialized. + */ public boolean isLeader() { - return overlordLeaderSelector.isLeader(); + return overlordLeaderSelector.isLeader() && initialized; } public String getCurrentLeader() @@ -210,7 +223,7 @@ public class TaskMaster public Optional getTaskRunner() { - if (overlordLeaderSelector.isLeader()) { + if (isLeader()) { return Optional.of(taskRunner); } else { return Optional.absent(); @@ -219,7 +232,7 @@ public class TaskMaster public Optional getTaskQueue() { - if (overlordLeaderSelector.isLeader()) { + if (isLeader()) { return Optional.of(taskQueue); } else { return Optional.absent(); @@ -228,7 +241,7 @@ public class TaskMaster public Optional getTaskActionClient(Task task) { - if (overlordLeaderSelector.isLeader()) { + if (isLeader()) { return Optional.of(taskActionClientFactory.create(task)); } else { return Optional.absent(); @@ -237,7 +250,7 @@ public class TaskMaster public Optional getScalingStats() { - if (overlordLeaderSelector.isLeader()) { + if (isLeader()) { return taskRunner.getScalingStats(); } else { return Optional.absent(); @@ -246,7 +259,7 @@ public class TaskMaster public Optional getSupervisorManager() { - if (overlordLeaderSelector.isLeader()) { + if (isLeader()) { return Optional.of(supervisorManager); } else { return Optional.absent(); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 6e11833c950..40a973950ce 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -102,6 +102,7 @@ public class DruidCoordinator .reverse(); private static final EmittingLogger log = new EmittingLogger(DruidCoordinator.class); + private final Object lock = new Object(); private final DruidCoordinatorConfig config; private final ZkPathsConfig zkPaths; @@ -118,14 +119,15 @@ public class DruidCoordinator private final ServiceAnnouncer serviceAnnouncer; private final DruidNode self; private final Set indexingServiceHelpers; - private volatile boolean started = false; - private volatile SegmentReplicantLookup segmentReplicantLookup = null; private final BalancerStrategyFactory factory; private final LookupCoordinatorManager lookupCoordinatorManager; private final DruidLeaderSelector coordLeaderSelector; private final DruidCoordinatorSegmentCompactor segmentCompactor; + private volatile boolean started = false; + private volatile SegmentReplicantLookup segmentReplicantLookup = null; + @Inject public DruidCoordinator( DruidCoordinatorConfig config, @@ -532,6 +534,7 @@ public class DruidCoordinator metadataSegmentManager.start(); metadataRuleManager.start(); + lookupCoordinatorManager.start(); serviceAnnouncer.announce(self); final int startingLeaderCounter = coordLeaderSelector.localTerm(); @@ -579,8 +582,6 @@ public class DruidCoordinator } ); } - - lookupCoordinatorManager.start(); } } @@ -597,9 +598,9 @@ public class DruidCoordinator loadManagementPeons.clear(); serviceAnnouncer.unannounce(self); + lookupCoordinatorManager.stop(); metadataRuleManager.stop(); metadataSegmentManager.stop(); - lookupCoordinatorManager.stop(); } }