mirror of https://github.com/apache/druid.git
Fix SQLMetadataSegmentManager to allow successive start and stop (#5554)
* Fix SQLMetadataSegmentManager to allow successive start and stop * address comment * add synchronization
This commit is contained in:
parent
0c4598c1fe
commit
05547e29b2
|
@ -175,9 +175,9 @@ public final class LifecycleLock
|
|||
}
|
||||
}
|
||||
|
||||
void reset()
|
||||
void exitStopAndReset()
|
||||
{
|
||||
if (!compareAndSetState(STOPPED, NOT_STARTED)) {
|
||||
if (!compareAndSetState(STOPPING, NOT_STARTED)) {
|
||||
throw new IllegalMonitorStateException("Not called exitStop() before reset()");
|
||||
}
|
||||
}
|
||||
|
@ -187,7 +187,7 @@ public final class LifecycleLock
|
|||
|
||||
/**
|
||||
* Start latch, only one canStart() call in any thread on this LifecycleLock object could return true, if {@link
|
||||
* #reset()} is not called in between.
|
||||
* #exitStopAndReset()} is not called in between.
|
||||
*/
|
||||
public boolean canStart()
|
||||
{
|
||||
|
@ -257,8 +257,8 @@ public final class LifecycleLock
|
|||
}
|
||||
|
||||
/**
|
||||
* If this LifecycleLock is used in a restartable object, which uses {@link #reset()}, exitStop() must be called
|
||||
* before exit from stop() on this object, usually in a finally block.
|
||||
* Finalizes stopping the LifecycleLock. This method must be called before exit from stop() on this object,
|
||||
* usually in a finally block. If you're using a restartable object, use {@link #exitStopAndReset()} instead.
|
||||
*
|
||||
* @throws IllegalMonitorStateException if {@link #canStop()} is not yet called on this LifecycleLock
|
||||
*/
|
||||
|
@ -268,12 +268,14 @@ public final class LifecycleLock
|
|||
}
|
||||
|
||||
/**
|
||||
* Resets the LifecycleLock after {@link #exitStop()}, so that {@link #canStart()} could be called again.
|
||||
* Finalizes stopping the LifecycleLock and resets it, so that {@link #canStart()} could be called again. If this
|
||||
* LifecycleLock is used in a restartable object, this method must be called before exit from stop() on this object,
|
||||
* usually in a finally block.
|
||||
*
|
||||
* @throws IllegalMonitorStateException if {@link #exitStop()} is not yet called on this LifecycleLock
|
||||
* @throws IllegalMonitorStateException if {@link #canStop()} is not yet called on this LifecycleLock
|
||||
*/
|
||||
public void reset()
|
||||
public void exitStopAndReset()
|
||||
{
|
||||
sync.reset();
|
||||
sync.exitStopAndReset();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -138,8 +138,7 @@ public class LifecycleLockTest
|
|||
lifecycleLock.started();
|
||||
lifecycleLock.exitStart();
|
||||
Assert.assertTrue(lifecycleLock.canStop());
|
||||
lifecycleLock.exitStop();
|
||||
lifecycleLock.reset();
|
||||
lifecycleLock.exitStopAndReset();
|
||||
Assert.assertTrue(lifecycleLock.canStart());
|
||||
}
|
||||
|
||||
|
|
|
@ -35,7 +35,6 @@ import com.google.common.util.concurrent.MoreExecutors;
|
|||
import com.google.inject.Inject;
|
||||
import io.druid.client.DruidDataSource;
|
||||
import io.druid.client.ImmutableDruidDataSource;
|
||||
import io.druid.concurrent.LifecycleLock;
|
||||
import io.druid.guice.ManageLifecycle;
|
||||
import io.druid.java.util.common.DateTimes;
|
||||
import io.druid.java.util.common.Intervals;
|
||||
|
@ -86,7 +85,10 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
|
|||
private static final Interner<DataSegment> DATA_SEGMENT_INTERNER = Interners.newWeakInterner();
|
||||
private static final EmittingLogger log = new EmittingLogger(SQLMetadataSegmentManager.class);
|
||||
|
||||
private final LifecycleLock lifecycleLock = new LifecycleLock();
|
||||
// Used to synchronize start() and stop(). These methods should be synchronized to prevent them from being called at the
|
||||
// same time if two different threads are calling them. This might be possible if a druid coordinator gets and drops
|
||||
// leadership repeatedly in quick succession.
|
||||
private final Object lock = new Object();
|
||||
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final Supplier<MetadataSegmentManagerConfig> config;
|
||||
|
@ -96,6 +98,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
|
|||
|
||||
private volatile ListeningScheduledExecutorService exec = null;
|
||||
private volatile ListenableFuture<?> future = null;
|
||||
private volatile boolean started;
|
||||
|
||||
@Inject
|
||||
public SQLMetadataSegmentManager(
|
||||
|
@ -116,11 +119,11 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
|
|||
@LifecycleStart
|
||||
public void start()
|
||||
{
|
||||
if (!lifecycleLock.canStart()) {
|
||||
return;
|
||||
}
|
||||
synchronized (lock) {
|
||||
if (started) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded("DatabaseSegmentManager-Exec--%d"));
|
||||
|
||||
final Duration delay = config.get().getPollDuration().toStandardDuration();
|
||||
|
@ -143,10 +146,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
|
|||
delay.getMillis(),
|
||||
TimeUnit.MILLISECONDS
|
||||
);
|
||||
lifecycleLock.started();
|
||||
}
|
||||
finally {
|
||||
lifecycleLock.exitStart();
|
||||
started = true;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -154,10 +154,11 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
|
|||
@LifecycleStop
|
||||
public void stop()
|
||||
{
|
||||
if (!lifecycleLock.canStop()) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
synchronized (lock) {
|
||||
if (!started) {
|
||||
return;
|
||||
}
|
||||
|
||||
final ConcurrentHashMap<String, DruidDataSource> emptyMap = new ConcurrentHashMap<>();
|
||||
ConcurrentHashMap<String, DruidDataSource> current;
|
||||
do {
|
||||
|
@ -168,9 +169,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
|
|||
future = null;
|
||||
exec.shutdownNow();
|
||||
exec = null;
|
||||
}
|
||||
finally {
|
||||
lifecycleLock.exitStop();
|
||||
started = false;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -366,7 +365,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
|
|||
@Override
|
||||
public boolean isStarted()
|
||||
{
|
||||
return lifecycleLock.isStarted();
|
||||
return started;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -420,7 +419,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
|
|||
public void poll()
|
||||
{
|
||||
try {
|
||||
if (!lifecycleLock.isStarted()) {
|
||||
if (!started) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -36,12 +36,6 @@ import com.google.common.util.concurrent.ListenableScheduledFuture;
|
|||
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.inject.Inject;
|
||||
import io.druid.java.util.emitter.EmittingLogger;
|
||||
import io.druid.java.util.http.client.HttpClient;
|
||||
import io.druid.java.util.http.client.Request;
|
||||
import io.druid.java.util.http.client.response.ClientResponse;
|
||||
import io.druid.java.util.http.client.response.HttpResponseHandler;
|
||||
import io.druid.java.util.http.client.response.SequenceInputStreamResponseHandler;
|
||||
import io.druid.audit.AuditInfo;
|
||||
import io.druid.common.config.JacksonConfigManager;
|
||||
import io.druid.concurrent.LifecycleLock;
|
||||
|
@ -54,6 +48,12 @@ import io.druid.java.util.common.ISE;
|
|||
import io.druid.java.util.common.StreamUtils;
|
||||
import io.druid.java.util.common.StringUtils;
|
||||
import io.druid.java.util.common.concurrent.Execs;
|
||||
import io.druid.java.util.emitter.EmittingLogger;
|
||||
import io.druid.java.util.http.client.HttpClient;
|
||||
import io.druid.java.util.http.client.Request;
|
||||
import io.druid.java.util.http.client.response.ClientResponse;
|
||||
import io.druid.java.util.http.client.response.HttpResponseHandler;
|
||||
import io.druid.java.util.http.client.response.SequenceInputStreamResponseHandler;
|
||||
import io.druid.query.lookup.LookupsState;
|
||||
import io.druid.server.http.HostAndPortWithScheme;
|
||||
import io.druid.server.listener.resource.ListenerResource;
|
||||
|
@ -124,8 +124,7 @@ public class LookupCoordinatorManager
|
|||
// Updated by config watching service
|
||||
private AtomicReference<Map<String, Map<String, LookupExtractorFactoryMapContainer>>> lookupMapConfigRef;
|
||||
|
||||
@VisibleForTesting
|
||||
final LifecycleLock lifecycleLock = new LifecycleLock();
|
||||
private final LifecycleLock lifecycleLock = new LifecycleLock();
|
||||
|
||||
private ListeningScheduledExecutorService executorService;
|
||||
private ListenableScheduledFuture<?> backgroundManagerFuture;
|
||||
|
@ -333,6 +332,17 @@ public class LookupCoordinatorManager
|
|||
return tierLookups.get(lookupName);
|
||||
}
|
||||
|
||||
public boolean isStarted()
|
||||
{
|
||||
return lifecycleLock.isStarted();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
boolean awaitStarted(long waitTimeMs)
|
||||
{
|
||||
return lifecycleLock.awaitStarted(waitTimeMs, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
// start() and stop() are synchronized so that they never run in parallel in case of ZK acting funny or druid bug and
|
||||
// coordinator becomes leader and drops leadership in quick succession.
|
||||
public void start()
|
||||
|
@ -439,8 +449,7 @@ public class LookupCoordinatorManager
|
|||
}
|
||||
finally {
|
||||
//so that subsequent start() would happen, even if stop() failed with exception
|
||||
lifecycleLock.exitStop();
|
||||
lifecycleLock.reset();
|
||||
lifecycleLock.exitStopAndReset();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -82,6 +82,16 @@ public class SQLMetadataRuleManagerTest
|
|||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleStopAndStart()
|
||||
{
|
||||
// Simulate successive losing and getting the coordinator leadership
|
||||
ruleManager.start();
|
||||
ruleManager.stop();
|
||||
ruleManager.start();
|
||||
ruleManager.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRuleInsert()
|
||||
{
|
||||
|
|
|
@ -41,7 +41,7 @@ import org.junit.Test;
|
|||
import java.io.IOException;
|
||||
|
||||
|
||||
public class MetadataSegmentManagerTest
|
||||
public class SQLMetadataSegmentManagerTest
|
||||
{
|
||||
@Rule
|
||||
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
|
||||
|
@ -230,4 +230,14 @@ public class MetadataSegmentManagerTest
|
|||
Assert.assertNull(manager.getInventoryValue(newDataSource));
|
||||
Assert.assertTrue(manager.removeSegment(newDataSource, newSegment.getIdentifier()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStopAndStart()
|
||||
{
|
||||
// Simulate successive losing and getting the coordinator leadership
|
||||
manager.start();
|
||||
manager.stop();
|
||||
manager.start();
|
||||
manager.stop();
|
||||
}
|
||||
}
|
|
@ -62,7 +62,6 @@ import java.util.Collections;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
@ -1256,15 +1255,15 @@ public class LookupCoordinatorManagerTest
|
|||
lookupCoordinatorManagerConfig
|
||||
);
|
||||
|
||||
Assert.assertFalse(manager.lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
|
||||
Assert.assertFalse(manager.isStarted());
|
||||
|
||||
manager.start();
|
||||
Assert.assertTrue(manager.lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
|
||||
Assert.assertTrue(manager.awaitStarted(1));
|
||||
Assert.assertTrue(manager.backgroundManagerIsRunning());
|
||||
Assert.assertFalse(manager.waitForBackgroundTermination(10));
|
||||
|
||||
manager.stop();
|
||||
Assert.assertFalse(manager.lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
|
||||
Assert.assertFalse(manager.awaitStarted(1));
|
||||
Assert.assertTrue(manager.waitForBackgroundTermination(10));
|
||||
Assert.assertFalse(manager.backgroundManagerIsRunning());
|
||||
|
||||
|
@ -1293,35 +1292,35 @@ public class LookupCoordinatorManagerTest
|
|||
lookupCoordinatorManagerConfig
|
||||
);
|
||||
|
||||
Assert.assertFalse(manager.lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
|
||||
Assert.assertFalse(manager.awaitStarted(1));
|
||||
|
||||
manager.start();
|
||||
Assert.assertTrue(manager.lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
|
||||
Assert.assertTrue(manager.awaitStarted(1));
|
||||
Assert.assertTrue(manager.backgroundManagerIsRunning());
|
||||
Assert.assertFalse(manager.waitForBackgroundTermination(10));
|
||||
|
||||
manager.stop();
|
||||
Assert.assertFalse(manager.lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
|
||||
Assert.assertFalse(manager.awaitStarted(1));
|
||||
Assert.assertTrue(manager.waitForBackgroundTermination(10));
|
||||
Assert.assertFalse(manager.backgroundManagerIsRunning());
|
||||
|
||||
manager.start();
|
||||
Assert.assertTrue(manager.lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
|
||||
Assert.assertTrue(manager.awaitStarted(1));
|
||||
Assert.assertTrue(manager.backgroundManagerIsRunning());
|
||||
Assert.assertFalse(manager.waitForBackgroundTermination(10));
|
||||
|
||||
manager.stop();
|
||||
Assert.assertFalse(manager.lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
|
||||
Assert.assertFalse(manager.awaitStarted(1));
|
||||
Assert.assertTrue(manager.waitForBackgroundTermination(10));
|
||||
Assert.assertFalse(manager.backgroundManagerIsRunning());
|
||||
|
||||
manager.start();
|
||||
Assert.assertTrue(manager.lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
|
||||
Assert.assertTrue(manager.awaitStarted(1));
|
||||
Assert.assertTrue(manager.backgroundManagerIsRunning());
|
||||
Assert.assertFalse(manager.waitForBackgroundTermination(10));
|
||||
|
||||
manager.stop();
|
||||
Assert.assertFalse(manager.lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
|
||||
Assert.assertFalse(manager.awaitStarted(1));
|
||||
Assert.assertTrue(manager.waitForBackgroundTermination(10));
|
||||
Assert.assertFalse(manager.backgroundManagerIsRunning());
|
||||
|
||||
|
|
Loading…
Reference in New Issue