diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java b/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java index 4883a8e1100..6900dff018c 100644 --- a/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java +++ b/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java @@ -27,9 +27,6 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningScheduledExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import io.druid.audit.AuditEntry; import io.druid.audit.AuditInfo; @@ -63,6 +60,7 @@ import java.sql.SQLException; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -72,7 +70,6 @@ import java.util.concurrent.atomic.AtomicReference; public class SQLMetadataRuleManager implements MetadataRuleManager { - public static void createDefaultRule( final IDBI dbi, final String ruleTable, @@ -142,13 +139,19 @@ public class SQLMetadataRuleManager implements MetadataRuleManager private final AuditManager auditManager; private final Object lock = new Object(); - - private volatile boolean started = false; - - private volatile ListeningScheduledExecutorService exec = null; - private volatile ListenableFuture future = null; - - private volatile long retryStartTime = 0; + /** The number of times this SQLMetadataRuleManager was started. */ + private long startCount = 0; + /** + * Equal to the current {@link #startCount} value, if the SQLMetadataRuleManager is currently started; -1 if + * currently stopped. + * + * This field is used to implement a simple stamp mechanism instead of just a boolean "started" flag to prevent + * the theoretical situation of two tasks scheduled in {@link #start()} calling {@link #poll()} concurrently, if + * the sequence of {@link #start()} - {@link #stop()} - {@link #start()} actions occurs quickly. + */ + private long currentStartOrder = -1; + private ScheduledExecutorService exec = null; + private long retryStartTime = 0; @Inject public SQLMetadataRuleManager( @@ -169,9 +172,7 @@ public class SQLMetadataRuleManager implements MetadataRuleManager Preconditions.checkNotNull(config.getAlertThreshold().toStandardDuration()); Preconditions.checkNotNull(config.getPollDuration().toStandardDuration()); - this.rules = new AtomicReference<>( - ImmutableMap.>of() - ); + this.rules = new AtomicReference<>(ImmutableMap.of()); } @Override @@ -179,21 +180,34 @@ public class SQLMetadataRuleManager implements MetadataRuleManager public void start() { synchronized (lock) { - if (started) { + if (currentStartOrder >= 0) { return; } - exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded("DatabaseRuleManager-Exec--%d")); + startCount++; + currentStartOrder = startCount; + long localStartedOrder = currentStartOrder; + + exec = Execs.scheduledSingleThreaded("DatabaseRuleManager-Exec--%d"); createDefaultRule(dbi, getRulesTable(), config.getDefaultRule(), jsonMapper); - future = exec.scheduleWithFixedDelay( + exec.scheduleWithFixedDelay( new Runnable() { @Override public void run() { try { - poll(); + // poll() is synchronized together with start() and stop() to ensure that when stop() exists, poll() + // won't actually run anymore after that (it could only enter the syncrhonized section and exit + // immediately because the localStartedOrder doesn't match the new currentStartOrder). It's needed + // to avoid flakiness in SQLMetadataRuleManagerTest. + // See https://github.com/apache/incubator-druid/issues/6028 + synchronized (lock) { + if (localStartedOrder == currentStartOrder) { + poll(); + } + } } catch (Exception e) { log.error(e, "uncaught exception in rule manager polling thread"); @@ -204,8 +218,6 @@ public class SQLMetadataRuleManager implements MetadataRuleManager config.getPollDuration().toStandardDuration().getMillis(), TimeUnit.MILLISECONDS ); - - started = true; } } @@ -214,15 +226,12 @@ public class SQLMetadataRuleManager implements MetadataRuleManager public void stop() { synchronized (lock) { - if (!started) { + if (currentStartOrder == -1) { return; } - - rules.set(ImmutableMap.>of()); - - future.cancel(false); - future = null; - started = false; + rules.set(ImmutableMap.of()); + currentStartOrder = -1; + // This call cancels the periodic poll() task, scheduled in start(). exec.shutdownNow(); exec = null; }