Synchronize scheduled poll() calls in SQLMetadataSegmentManager (#6041)

Similar issue to https://github.com/apache/incubator-druid/issues/6028.
Jihoon Son, 2018-07-24 20:57:30 -07:00 (committed by Roman Leventov)
parent 7d5eb0c21a
commit 5ee7b0cada
3 changed files with 81 additions and 32 deletions

SQLMetadataRuleManager.java

@@ -148,10 +148,12 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
    * This field is used to implement a simple stamp mechanism instead of just a boolean "started" flag to prevent
    * the theoretical situation of two tasks scheduled in {@link #start()} calling {@link #poll()} concurrently, if
    * the sequence of {@link #start()} - {@link #stop()} - {@link #start()} actions occurs quickly.
+   *
+   * {@link SQLMetadataSegmentManager} has a similar issue.
    */
   private long currentStartOrder = -1;
   private ScheduledExecutorService exec = null;
-  private long retryStartTime = 0;
+  private long failStartTimeMs = 0;
 
   @Inject
   public SQLMetadataRuleManager(
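
The stamp mechanism this comment refers to works as follows: each start() increments a counter and publishes it as the current start order, every task scheduled by that start() captures the value, and the task re-checks it before doing any work, so tasks surviving from an earlier start()/stop() cycle become no-ops. A minimal self-contained sketch of the idiom, with illustrative names rather than the Druid classes:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Minimal sketch of the "start order" stamp idiom; all names are illustrative.
class StampedPoller
{
  private long startCount = 0;
  // -1 means stopped; otherwise equal to the startCount of the current generation.
  private long currentStartOrder = -1;
  private ScheduledExecutorService exec = null;

  synchronized void start()
  {
    if (currentStartOrder >= 0) {
      return; // already started
    }
    startCount++;
    currentStartOrder = startCount;
    final long localStartOrder = currentStartOrder; // stamp captured by this generation's task
    exec = Executors.newSingleThreadScheduledExecutor();
    exec.scheduleWithFixedDelay(
        () -> {
          synchronized (this) {
            // A task left over from an older generation sees a mismatch and does nothing.
            if (localStartOrder == currentStartOrder) {
              poll();
            }
          }
        },
        0, 1, TimeUnit.SECONDS
    );
  }

  synchronized void stop()
  {
    if (currentStartOrder < 0) {
      return;
    }
    currentStartOrder = -1; // invalidate the stamp before tearing down the executor
    exec.shutdownNow();
    exec = null;
  }

  private void poll()
  {
    System.out.println("polling");
  }
}

Unlike a boolean "started" flag, the stamp distinguishes "stopped and started again" from "still started", which is exactly what the quick start() - stop() - start() sequence requires.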
@@ -311,17 +313,17 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
       log.info("Polled and found rules for %,d datasource(s)", newRules.size());
       rules.set(newRules);
-      retryStartTime = 0;
+      failStartTimeMs = 0;
     }
     catch (Exception e) {
-      if (retryStartTime == 0) {
-        retryStartTime = System.currentTimeMillis();
+      if (failStartTimeMs == 0) {
+        failStartTimeMs = System.currentTimeMillis();
       }
-      if (System.currentTimeMillis() - retryStartTime > config.getAlertThreshold().toStandardDuration().getMillis()) {
+      if (System.currentTimeMillis() - failStartTimeMs > config.getAlertThreshold().toStandardDuration().getMillis()) {
         log.makeAlert(e, "Exception while polling for rules")
            .emit();
-        retryStartTime = 0;
+        failStartTimeMs = 0;
       } else {
         log.error(e, "Exception while polling for rules");
       }
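
The rename from retryStartTime to failStartTimeMs makes the intent of this block clearer: the field records when the current failure streak began, errors are merely logged until the streak outlasts the configured alert threshold, and only then is an alert emitted and the window reset. A hedged sketch of the same pattern using java.util.logging in place of Druid's EmittingLogger (the threshold constant and method names are illustrative):

import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch of the "alert only after sustained failure" pattern; names and threshold are illustrative.
class FailureWindowAlerter
{
  private static final Logger LOG = Logger.getLogger(FailureWindowAlerter.class.getName());
  private static final long ALERT_THRESHOLD_MS = 10 * 60 * 1000; // assumed threshold, e.g. 10 minutes

  private long failStartTimeMs = 0; // 0 means "no ongoing failure streak"

  void onSuccess()
  {
    failStartTimeMs = 0; // a success closes the failure window
  }

  void onFailure(Exception e)
  {
    if (failStartTimeMs == 0) {
      failStartTimeMs = System.currentTimeMillis(); // first failure of the streak
    }
    if (System.currentTimeMillis() - failStartTimeMs > ALERT_THRESHOLD_MS) {
      LOG.log(Level.SEVERE, "Polling has been failing beyond the alert threshold", e); // escalate
      failStartTimeMs = 0; // reset so the next streak is measured afresh
    } else {
      LOG.log(Level.WARNING, "Exception while polling", e); // transient failure, just log
    }
  }
}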

SQLMetadataSegmentManager.java

@@ -29,9 +29,6 @@ import com.google.common.collect.Interner;
 import com.google.common.collect.Interners;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningScheduledExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Inject;
 import io.druid.client.DruidDataSource;
 import io.druid.client.ImmutableDruidDataSource;
@@ -73,8 +70,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 /**
@@ -85,10 +85,16 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
   private static final Interner<DataSegment> DATA_SEGMENT_INTERNER = Interners.newWeakInterner();
   private static final EmittingLogger log = new EmittingLogger(SQLMetadataSegmentManager.class);
 
-  // Use to synchronize start() and stop(). These methods should be synchronized to prevent from being called at the
-  // same time if two different threads are calling them. This might be possible if a druid coordinator gets and drops
-  // leadership repeatedly in quick succession.
-  private final Object lock = new Object();
+  /**
+   * Used to synchronize {@link #start()}, {@link #stop()}, {@link #poll()}, and {@link #isStarted()}. These methods
+   * should be synchronized to prevent them from being called at the same time from two different threads. This might
+   * be possible if a Druid coordinator gains and drops leadership repeatedly in quick succession.
+   */
+  private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+  /** {@link #poll()} and {@link #isStarted()} use readLock. */
+  private final Lock readLock = readWriteLock.readLock();
+  /** {@link #start()} and {@link #stop()} use writeLock. */
+  private final Lock writeLock = readWriteLock.writeLock();
 
   private final ObjectMapper jsonMapper;
   private final Supplier<MetadataSegmentManagerConfig> config;
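
The division of labor encoded in these fields — shared readLock for poll() and isStarted(), exclusive writeLock for start() and stop() — lets concurrent polls and started-checks proceed without blocking one another, while a lifecycle transition must wait for in-flight polls to drain and excludes new ones. A standalone sketch of the pattern (illustrative class, not the Druid manager):

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch of the readLock/writeLock split described above; names are illustrative.
class LifecycleGuarded
{
  private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
  private final Lock readLock = readWriteLock.readLock();   // poll(), isStarted()
  private final Lock writeLock = readWriteLock.writeLock(); // start(), stop()

  private boolean started = false;

  void start()
  {
    writeLock.lock(); // waits for in-flight readers to release the read lock
    try {
      started = true;
    }
    finally {
      writeLock.unlock();
    }
  }

  void stop()
  {
    writeLock.lock();
    try {
      started = false;
    }
    finally {
      writeLock.unlock();
    }
  }

  boolean isStarted()
  {
    readLock.lock(); // shared: does not block concurrent readers
    try {
      return started;
    }
    finally {
      readLock.unlock();
    }
  }
}

The locks also establish happens-before edges, so the non-volatile state guarded by them is always seen up to date by readers.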
@@ -96,9 +102,21 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
   private final AtomicReference<ConcurrentHashMap<String, DruidDataSource>> dataSourcesRef;
   private final SQLMetadataConnector connector;
 
-  private volatile ListeningScheduledExecutorService exec = null;
-  private volatile ListenableFuture<?> future = null;
-  private volatile boolean started;
+  /** The number of times this SQLMetadataSegmentManager was started. */
+  private long startCount = 0;
+  /**
+   * Equal to the current {@link #startCount} value if the SQLMetadataSegmentManager is currently started; -1 if
+   * currently stopped.
+   *
+   * This field is used to implement a simple stamp mechanism instead of just a boolean "started" flag to prevent
+   * the theoretical situation of two or more tasks scheduled in {@link #start()} calling {@link #isStarted()} and
+   * {@link #poll()} concurrently, if the sequence of {@link #start()} - {@link #stop()} - {@link #start()} actions
+   * occurs quickly.
+   *
+   * {@link SQLMetadataRuleManager} also has a similar issue.
+   */
+  private long currentStartOrder = -1;
+  private ScheduledExecutorService exec = null;
 
   @Inject
   public SQLMetadataSegmentManager(
@@ -119,34 +137,52 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
   @LifecycleStart
   public void start()
   {
-    synchronized (lock) {
-      if (started) {
+    writeLock.lock();
+    try {
+      if (isStarted()) {
         return;
       }
 
-      exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded("DatabaseSegmentManager-Exec--%d"));
+      startCount++;
+      currentStartOrder = startCount;
+      final long localStartOrder = currentStartOrder;
+
+      exec = Execs.scheduledSingleThreaded("DatabaseSegmentManager-Exec--%d");
 
       final Duration delay = config.get().getPollDuration().toStandardDuration();
-      future = exec.scheduleWithFixedDelay(
+      exec.scheduleWithFixedDelay(
           new Runnable()
           {
            @Override
            public void run()
            {
+              // poll() is synchronized together with start(), stop() and isStarted() to ensure that when stop() exits,
+              // poll() won't actually run anymore after that (it could only enter the synchronized section and exit
+              // immediately because the localStartOrder doesn't match the new currentStartOrder). It's needed
+              // to avoid flakiness in SQLMetadataSegmentManagerTest.
+              // See https://github.com/apache/incubator-druid/issues/6028
+              readLock.lock();
               try {
-                poll();
+                if (localStartOrder == currentStartOrder) {
+                  poll();
+                }
               }
               catch (Exception e) {
                 log.makeAlert(e, "uncaught exception in segment manager polling thread").emit();
               }
+              finally {
+                readLock.unlock();
+              }
            }
          },
          0,
          delay.getMillis(),
          TimeUnit.MILLISECONDS
      );
 
-      started = true;
     }
+    finally {
+      writeLock.unlock();
+    }
   }
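
Taken together, the scheduled task's guard gives exactly the property the comment describes: stop() must acquire (and then release) the write lock, so any poll already running has finished before stop() returns, and any task that fires afterwards sees a changed currentStartOrder and exits without polling. A condensed sketch of just that runnable, wired by hand rather than by the Druid class (all names are illustrative):

import java.util.concurrent.locks.Lock;
import java.util.function.LongSupplier;

// Condensed sketch of the scheduled task's guard; this is not Druid code.
final class PollTask implements Runnable
{
  private final Lock readLock;
  private final long localStartOrder;           // stamp captured when start() scheduled this task
  private final LongSupplier currentStartOrder; // reads the manager's current stamp
  private final Runnable poll;

  PollTask(Lock readLock, long localStartOrder, LongSupplier currentStartOrder, Runnable poll)
  {
    this.readLock = readLock;
    this.localStartOrder = localStartOrder;
    this.currentStartOrder = currentStartOrder;
    this.poll = poll;
  }

  @Override
  public void run()
  {
    readLock.lock(); // stop() needs the write lock, so it cannot complete while a poll is in flight
    try {
      if (localStartOrder == currentStartOrder.getAsLong()) {
        poll.run(); // only the current generation actually polls
      }
      // otherwise stop() (and possibly a new start()) intervened; this stale task exits quietly
    }
    finally {
      readLock.unlock();
    }
  }
}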
@@ -154,8 +190,9 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
   @LifecycleStop
   public void stop()
   {
-    synchronized (lock) {
-      if (!started) {
+    writeLock.lock();
+    try {
+      if (!isStarted()) {
         return;
       }
@@ -165,11 +202,12 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
         current = dataSourcesRef.get();
       } while (!dataSourcesRef.compareAndSet(current, emptyMap));
 
-      future.cancel(false);
-      future = null;
+      currentStartOrder = -1;
       exec.shutdownNow();
       exec = null;
-      started = false;
     }
+    finally {
+      writeLock.unlock();
+    }
   }
@@ -340,7 +378,15 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
   @Override
   public boolean isStarted()
   {
-    return started;
+    // isStarted() is synchronized together with start(), stop() and poll() to ensure that the latest currentStartOrder
+    // is always visible. readLock should be used to avoid unexpected performance degradation of DruidCoordinator.
+    readLock.lock();
+    try {
+      return currentStartOrder >= 0;
+    }
+    finally {
+      readLock.unlock();
+    }
   }
 
   @Override
@@ -394,10 +440,6 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
   public void poll()
   {
     try {
-      if (!started) {
-        return;
-      }
-
       ConcurrentHashMap<String, DruidDataSource> newDataSources = new ConcurrentHashMap<>();
 
       log.debug("Starting polling of segment table");

SQLMetadataSegmentManagerTest.java

@@ -118,6 +118,7 @@ public class SQLMetadataSegmentManagerTest
   {
     manager.start();
     manager.poll();
+    Assert.assertTrue(manager.isStarted());
 
     Assert.assertEquals(
         ImmutableList.of("wikipedia"),
         manager.getAllDatasourceNames()
@@ -149,6 +150,7 @@ public class SQLMetadataSegmentManagerTest
     EmittingLogger.registerEmitter(new NoopServiceEmitter());
     manager.start();
     manager.poll();
+    Assert.assertTrue(manager.isStarted());
 
     Assert.assertEquals(
         "wikipedia", Iterables.getOnlyElement(manager.getInventory()).getName()
@@ -160,6 +162,7 @@ public class SQLMetadataSegmentManagerTest
   {
     manager.start();
     manager.poll();
+    Assert.assertTrue(manager.isStarted());
 
     Assert.assertTrue(manager.removeDatasource("wikipedia"));
     Assert.assertEquals(
@@ -178,6 +181,7 @@ public class SQLMetadataSegmentManagerTest
   {
     manager.start();
     manager.poll();
+    Assert.assertTrue(manager.isStarted());
 
     final String newDataSource = "wikipedia2";
     final DataSegment newSegment = new DataSegment(
@@ -207,6 +211,7 @@ public class SQLMetadataSegmentManagerTest
   {
     manager.start();
     manager.poll();
+    Assert.assertTrue(manager.isStarted());
 
     final String newDataSource = "wikipedia2";
     final DataSegment newSegment = new DataSegment(
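
These added assertions pin down the lifecycle contract the new code must keep: after start() and a poll(), isStarted() reports true. A self-contained JUnit 4 sketch of that contract, using a toy stand-in for SQLMetadataSegmentManager (the stand-in is illustrative, not the real class or test setup):

import org.junit.Assert;
import org.junit.Test;

// Standalone sketch of the lifecycle contract the assertions above verify.
public class LifecycleContractTest
{
  // Toy stand-in for SQLMetadataSegmentManager, reduced to the stamp bookkeeping.
  static class Manager
  {
    private long startCount = 0;
    private long currentStartOrder = -1;

    synchronized void start()
    {
      if (currentStartOrder < 0) {
        startCount++;
        currentStartOrder = startCount;
      }
    }

    synchronized void stop()
    {
      currentStartOrder = -1;
    }

    synchronized boolean isStarted()
    {
      return currentStartOrder >= 0;
    }
  }

  @Test
  public void startThenStopTogglesIsStarted()
  {
    Manager manager = new Manager();
    Assert.assertFalse(manager.isStarted());

    manager.start();
    Assert.assertTrue(manager.isStarted());

    manager.start(); // a second start() while started is a no-op
    Assert.assertTrue(manager.isStarted());

    manager.stop();
    Assert.assertFalse(manager.isStarted());
  }
}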