mirror of https://github.com/apache/druid.git
Synchronize scheduled poll() calls in SQLMetadataRuleManager to prevent flakiness in SqlMetadataRuleManagerTest (#6033)
This commit is contained in:
parent
49dca01b56
commit
7d5eb0c21a
|
@ -27,9 +27,6 @@ import com.google.common.base.Throwables;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
|
||||||
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
|
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import io.druid.audit.AuditEntry;
|
import io.druid.audit.AuditEntry;
|
||||||
import io.druid.audit.AuditInfo;
|
import io.druid.audit.AuditInfo;
|
||||||
|
@ -63,6 +60,7 @@ import java.sql.SQLException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
@ -72,7 +70,6 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||||
public class SQLMetadataRuleManager implements MetadataRuleManager
|
public class SQLMetadataRuleManager implements MetadataRuleManager
|
||||||
{
|
{
|
||||||
|
|
||||||
|
|
||||||
public static void createDefaultRule(
|
public static void createDefaultRule(
|
||||||
final IDBI dbi,
|
final IDBI dbi,
|
||||||
final String ruleTable,
|
final String ruleTable,
|
||||||
|
@ -142,13 +139,19 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
|
||||||
private final AuditManager auditManager;
|
private final AuditManager auditManager;
|
||||||
|
|
||||||
private final Object lock = new Object();
|
private final Object lock = new Object();
|
||||||
|
/** The number of times this SQLMetadataRuleManager was started. */
|
||||||
private volatile boolean started = false;
|
private long startCount = 0;
|
||||||
|
/**
|
||||||
private volatile ListeningScheduledExecutorService exec = null;
|
* Equal to the current {@link #startCount} value, if the SQLMetadataRuleManager is currently started; -1 if
|
||||||
private volatile ListenableFuture<?> future = null;
|
* currently stopped.
|
||||||
|
*
|
||||||
private volatile long retryStartTime = 0;
|
* This field is used to implement a simple stamp mechanism instead of just a boolean "started" flag to prevent
|
||||||
|
* the theoretical situation of two tasks scheduled in {@link #start()} calling {@link #poll()} concurrently, if
|
||||||
|
* the sequence of {@link #start()} - {@link #stop()} - {@link #start()} actions occurs quickly.
|
||||||
|
*/
|
||||||
|
private long currentStartOrder = -1;
|
||||||
|
private ScheduledExecutorService exec = null;
|
||||||
|
private long retryStartTime = 0;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public SQLMetadataRuleManager(
|
public SQLMetadataRuleManager(
|
||||||
|
@ -169,9 +172,7 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
|
||||||
Preconditions.checkNotNull(config.getAlertThreshold().toStandardDuration());
|
Preconditions.checkNotNull(config.getAlertThreshold().toStandardDuration());
|
||||||
Preconditions.checkNotNull(config.getPollDuration().toStandardDuration());
|
Preconditions.checkNotNull(config.getPollDuration().toStandardDuration());
|
||||||
|
|
||||||
this.rules = new AtomicReference<>(
|
this.rules = new AtomicReference<>(ImmutableMap.of());
|
||||||
ImmutableMap.<String, List<Rule>>of()
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -179,22 +180,35 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
|
||||||
public void start()
|
public void start()
|
||||||
{
|
{
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
if (started) {
|
if (currentStartOrder >= 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded("DatabaseRuleManager-Exec--%d"));
|
startCount++;
|
||||||
|
currentStartOrder = startCount;
|
||||||
|
long localStartedOrder = currentStartOrder;
|
||||||
|
|
||||||
|
exec = Execs.scheduledSingleThreaded("DatabaseRuleManager-Exec--%d");
|
||||||
|
|
||||||
createDefaultRule(dbi, getRulesTable(), config.getDefaultRule(), jsonMapper);
|
createDefaultRule(dbi, getRulesTable(), config.getDefaultRule(), jsonMapper);
|
||||||
future = exec.scheduleWithFixedDelay(
|
exec.scheduleWithFixedDelay(
|
||||||
new Runnable()
|
new Runnable()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void run()
|
public void run()
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
|
// poll() is synchronized together with start() and stop() to ensure that when stop() exists, poll()
|
||||||
|
// won't actually run anymore after that (it could only enter the syncrhonized section and exit
|
||||||
|
// immediately because the localStartedOrder doesn't match the new currentStartOrder). It's needed
|
||||||
|
// to avoid flakiness in SQLMetadataRuleManagerTest.
|
||||||
|
// See https://github.com/apache/incubator-druid/issues/6028
|
||||||
|
synchronized (lock) {
|
||||||
|
if (localStartedOrder == currentStartOrder) {
|
||||||
poll();
|
poll();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
log.error(e, "uncaught exception in rule manager polling thread");
|
log.error(e, "uncaught exception in rule manager polling thread");
|
||||||
}
|
}
|
||||||
|
@ -204,8 +218,6 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
|
||||||
config.getPollDuration().toStandardDuration().getMillis(),
|
config.getPollDuration().toStandardDuration().getMillis(),
|
||||||
TimeUnit.MILLISECONDS
|
TimeUnit.MILLISECONDS
|
||||||
);
|
);
|
||||||
|
|
||||||
started = true;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -214,15 +226,12 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
|
||||||
public void stop()
|
public void stop()
|
||||||
{
|
{
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
if (!started) {
|
if (currentStartOrder == -1) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
rules.set(ImmutableMap.of());
|
||||||
rules.set(ImmutableMap.<String, List<Rule>>of());
|
currentStartOrder = -1;
|
||||||
|
// This call cancels the periodic poll() task, scheduled in start().
|
||||||
future.cancel(false);
|
|
||||||
future = null;
|
|
||||||
started = false;
|
|
||||||
exec.shutdownNow();
|
exec.shutdownNow();
|
||||||
exec = null;
|
exec = null;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue