From 05547e29b2099646241fe2fd728286ad0b81296c Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 30 Mar 2018 12:43:19 -0700 Subject: [PATCH] Fix SQLMetadataSegmentManager to allow succesive start and stop (#5554) * Fix SQLMetadataSegmentManager to allow succesive start and stop * address comment * add synchronization --- .../io/druid/concurrent/LifecycleLock.java | 20 +++++----- .../druid/concurrent/LifecycleLockTest.java | 3 +- .../metadata/SQLMetadataSegmentManager.java | 37 +++++++++---------- .../cache/LookupCoordinatorManager.java | 29 ++++++++++----- .../metadata/SQLMetadataRuleManagerTest.java | 10 +++++ ...ava => SQLMetadataSegmentManagerTest.java} | 12 +++++- .../cache/LookupCoordinatorManagerTest.java | 21 +++++------ 7 files changed, 80 insertions(+), 52 deletions(-) rename server/src/test/java/io/druid/metadata/{MetadataSegmentManagerTest.java => SQLMetadataSegmentManagerTest.java} (96%) diff --git a/common/src/main/java/io/druid/concurrent/LifecycleLock.java b/common/src/main/java/io/druid/concurrent/LifecycleLock.java index a94e3cd31a3..7cc8e64a113 100644 --- a/common/src/main/java/io/druid/concurrent/LifecycleLock.java +++ b/common/src/main/java/io/druid/concurrent/LifecycleLock.java @@ -175,9 +175,9 @@ public final class LifecycleLock } } - void reset() + void exitStopAndReset() { - if (!compareAndSetState(STOPPED, NOT_STARTED)) { + if (!compareAndSetState(STOPPING, NOT_STARTED)) { throw new IllegalMonitorStateException("Not called exitStop() before reset()"); } } @@ -187,7 +187,7 @@ public final class LifecycleLock /** * Start latch, only one canStart() call in any thread on this LifecycleLock object could return true, if {@link - * #reset()} is not called in between. + * #exitStopAndReset()} is not called in between. */ public boolean canStart() { @@ -257,8 +257,8 @@ public final class LifecycleLock } /** - * If this LifecycleLock is used in a restartable object, which uses {@link #reset()}, exitStop() must be called - * before exit from stop() on this object, usually in a finally block. + * Finalizes stopping the the LifecycleLock. This method must be called before exit from stop() on this object, + * usually in a finally block. If you're using a restartable object, use {@link #exitStopAndReset()} instead. * * @throws IllegalMonitorStateException if {@link #canStop()} is not yet called on this LifecycleLock */ @@ -268,12 +268,14 @@ public final class LifecycleLock } /** - * Resets the LifecycleLock after {@link #exitStop()}, so that {@link #canStart()} could be called again. + * Finalizes stopping the LifecycleLock and resets it, so that {@link #canStart()} could be called again. If this + * LifecycleLock is used in a restartable object, this method must be called before exit from stop() on this object, + * usually in a finally block. * - * @throws IllegalMonitorStateException if {@link #exitStop()} is not yet called on this LifecycleLock + * @throws IllegalMonitorStateException if {@link #canStop()} is not yet called on this LifecycleLock */ - public void reset() + public void exitStopAndReset() { - sync.reset(); + sync.exitStopAndReset(); } } diff --git a/common/src/test/java/io/druid/concurrent/LifecycleLockTest.java b/common/src/test/java/io/druid/concurrent/LifecycleLockTest.java index e1d9f8adbbf..afb55bf2c2c 100644 --- a/common/src/test/java/io/druid/concurrent/LifecycleLockTest.java +++ b/common/src/test/java/io/druid/concurrent/LifecycleLockTest.java @@ -138,8 +138,7 @@ public class LifecycleLockTest lifecycleLock.started(); lifecycleLock.exitStart(); Assert.assertTrue(lifecycleLock.canStop()); - lifecycleLock.exitStop(); - lifecycleLock.reset(); + lifecycleLock.exitStopAndReset(); Assert.assertTrue(lifecycleLock.canStart()); } diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java index 2a4815b4722..8d412368b59 100644 --- a/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java +++ b/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java @@ -35,7 +35,6 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import io.druid.client.DruidDataSource; import io.druid.client.ImmutableDruidDataSource; -import io.druid.concurrent.LifecycleLock; import io.druid.guice.ManageLifecycle; import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.Intervals; @@ -86,7 +85,10 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager private static final Interner DATA_SEGMENT_INTERNER = Interners.newWeakInterner(); private static final EmittingLogger log = new EmittingLogger(SQLMetadataSegmentManager.class); - private final LifecycleLock lifecycleLock = new LifecycleLock(); + // Use to synchronize start() and stop(). These methods should be synchronized to prevent from being called at the + // same time if two different threads are calling them. This might be possible if a druid coordinator gets and drops + // leadership repeatedly in quick succession. + private final Object lock = new Object(); private final ObjectMapper jsonMapper; private final Supplier config; @@ -96,6 +98,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager private volatile ListeningScheduledExecutorService exec = null; private volatile ListenableFuture future = null; + private volatile boolean started; @Inject public SQLMetadataSegmentManager( @@ -116,11 +119,11 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager @LifecycleStart public void start() { - if (!lifecycleLock.canStart()) { - return; - } + synchronized (lock) { + if (started) { + return; + } - try { exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded("DatabaseSegmentManager-Exec--%d")); final Duration delay = config.get().getPollDuration().toStandardDuration(); @@ -143,10 +146,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager delay.getMillis(), TimeUnit.MILLISECONDS ); - lifecycleLock.started(); - } - finally { - lifecycleLock.exitStart(); + started = true; } } @@ -154,10 +154,11 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager @LifecycleStop public void stop() { - if (!lifecycleLock.canStop()) { - return; - } - try { + synchronized (lock) { + if (!started) { + return; + } + final ConcurrentHashMap emptyMap = new ConcurrentHashMap<>(); ConcurrentHashMap current; do { @@ -168,9 +169,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager future = null; exec.shutdownNow(); exec = null; - } - finally { - lifecycleLock.exitStop(); + started = false; } } @@ -366,7 +365,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager @Override public boolean isStarted() { - return lifecycleLock.isStarted(); + return started; } @Override @@ -420,7 +419,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager public void poll() { try { - if (!lifecycleLock.isStarted()) { + if (!started) { return; } diff --git a/server/src/main/java/io/druid/server/lookup/cache/LookupCoordinatorManager.java b/server/src/main/java/io/druid/server/lookup/cache/LookupCoordinatorManager.java index ef9d3ff9459..3c4c8f75440 100644 --- a/server/src/main/java/io/druid/server/lookup/cache/LookupCoordinatorManager.java +++ b/server/src/main/java/io/druid/server/lookup/cache/LookupCoordinatorManager.java @@ -36,12 +36,6 @@ import com.google.common.util.concurrent.ListenableScheduledFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; -import io.druid.java.util.emitter.EmittingLogger; -import io.druid.java.util.http.client.HttpClient; -import io.druid.java.util.http.client.Request; -import io.druid.java.util.http.client.response.ClientResponse; -import io.druid.java.util.http.client.response.HttpResponseHandler; -import io.druid.java.util.http.client.response.SequenceInputStreamResponseHandler; import io.druid.audit.AuditInfo; import io.druid.common.config.JacksonConfigManager; import io.druid.concurrent.LifecycleLock; @@ -54,6 +48,12 @@ import io.druid.java.util.common.ISE; import io.druid.java.util.common.StreamUtils; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.concurrent.Execs; +import io.druid.java.util.emitter.EmittingLogger; +import io.druid.java.util.http.client.HttpClient; +import io.druid.java.util.http.client.Request; +import io.druid.java.util.http.client.response.ClientResponse; +import io.druid.java.util.http.client.response.HttpResponseHandler; +import io.druid.java.util.http.client.response.SequenceInputStreamResponseHandler; import io.druid.query.lookup.LookupsState; import io.druid.server.http.HostAndPortWithScheme; import io.druid.server.listener.resource.ListenerResource; @@ -124,8 +124,7 @@ public class LookupCoordinatorManager // Updated by config watching service private AtomicReference>> lookupMapConfigRef; - @VisibleForTesting - final LifecycleLock lifecycleLock = new LifecycleLock(); + private final LifecycleLock lifecycleLock = new LifecycleLock(); private ListeningScheduledExecutorService executorService; private ListenableScheduledFuture backgroundManagerFuture; @@ -333,6 +332,17 @@ public class LookupCoordinatorManager return tierLookups.get(lookupName); } + public boolean isStarted() + { + return lifecycleLock.isStarted(); + } + + @VisibleForTesting + boolean awaitStarted(long waitTimeMs) + { + return lifecycleLock.awaitStarted(waitTimeMs, TimeUnit.MILLISECONDS); + } + // start() and stop() are synchronized so that they never run in parallel in case of ZK acting funny or druid bug and // coordinator becomes leader and drops leadership in quick succession. public void start() @@ -439,8 +449,7 @@ public class LookupCoordinatorManager } finally { //so that subsequent start() would happen, even if stop() failed with exception - lifecycleLock.exitStop(); - lifecycleLock.reset(); + lifecycleLock.exitStopAndReset(); } } } diff --git a/server/src/test/java/io/druid/metadata/SQLMetadataRuleManagerTest.java b/server/src/test/java/io/druid/metadata/SQLMetadataRuleManagerTest.java index fb485492f9b..4bee622d0e5 100644 --- a/server/src/test/java/io/druid/metadata/SQLMetadataRuleManagerTest.java +++ b/server/src/test/java/io/druid/metadata/SQLMetadataRuleManagerTest.java @@ -82,6 +82,16 @@ public class SQLMetadataRuleManagerTest ); } + @Test + public void testMultipleStopAndStart() + { + // Simulate successive losing and getting the coordinator leadership + ruleManager.start(); + ruleManager.stop(); + ruleManager.start(); + ruleManager.stop(); + } + @Test public void testRuleInsert() { diff --git a/server/src/test/java/io/druid/metadata/MetadataSegmentManagerTest.java b/server/src/test/java/io/druid/metadata/SQLMetadataSegmentManagerTest.java similarity index 96% rename from server/src/test/java/io/druid/metadata/MetadataSegmentManagerTest.java rename to server/src/test/java/io/druid/metadata/SQLMetadataSegmentManagerTest.java index 2a67755d13d..df904a02588 100644 --- a/server/src/test/java/io/druid/metadata/MetadataSegmentManagerTest.java +++ b/server/src/test/java/io/druid/metadata/SQLMetadataSegmentManagerTest.java @@ -41,7 +41,7 @@ import org.junit.Test; import java.io.IOException; -public class MetadataSegmentManagerTest +public class SQLMetadataSegmentManagerTest { @Rule public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(); @@ -230,4 +230,14 @@ public class MetadataSegmentManagerTest Assert.assertNull(manager.getInventoryValue(newDataSource)); Assert.assertTrue(manager.removeSegment(newDataSource, newSegment.getIdentifier())); } + + @Test + public void testStopAndStart() + { + // Simulate successive losing and getting the coordinator leadership + manager.start(); + manager.stop(); + manager.start(); + manager.stop(); + } } diff --git a/server/src/test/java/io/druid/server/lookup/cache/LookupCoordinatorManagerTest.java b/server/src/test/java/io/druid/server/lookup/cache/LookupCoordinatorManagerTest.java index 3c63189a860..3eb29e25494 100644 --- a/server/src/test/java/io/druid/server/lookup/cache/LookupCoordinatorManagerTest.java +++ b/server/src/test/java/io/druid/server/lookup/cache/LookupCoordinatorManagerTest.java @@ -62,7 +62,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -1256,15 +1255,15 @@ public class LookupCoordinatorManagerTest lookupCoordinatorManagerConfig ); - Assert.assertFalse(manager.lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)); + Assert.assertFalse(manager.isStarted()); manager.start(); - Assert.assertTrue(manager.lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)); + Assert.assertTrue(manager.awaitStarted(1)); Assert.assertTrue(manager.backgroundManagerIsRunning()); Assert.assertFalse(manager.waitForBackgroundTermination(10)); manager.stop(); - Assert.assertFalse(manager.lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)); + Assert.assertFalse(manager.awaitStarted(1)); Assert.assertTrue(manager.waitForBackgroundTermination(10)); Assert.assertFalse(manager.backgroundManagerIsRunning()); @@ -1293,35 +1292,35 @@ public class LookupCoordinatorManagerTest lookupCoordinatorManagerConfig ); - Assert.assertFalse(manager.lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)); + Assert.assertFalse(manager.awaitStarted(1)); manager.start(); - Assert.assertTrue(manager.lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)); + Assert.assertTrue(manager.awaitStarted(1)); Assert.assertTrue(manager.backgroundManagerIsRunning()); Assert.assertFalse(manager.waitForBackgroundTermination(10)); manager.stop(); - Assert.assertFalse(manager.lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)); + Assert.assertFalse(manager.awaitStarted(1)); Assert.assertTrue(manager.waitForBackgroundTermination(10)); Assert.assertFalse(manager.backgroundManagerIsRunning()); manager.start(); - Assert.assertTrue(manager.lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)); + Assert.assertTrue(manager.awaitStarted(1)); Assert.assertTrue(manager.backgroundManagerIsRunning()); Assert.assertFalse(manager.waitForBackgroundTermination(10)); manager.stop(); - Assert.assertFalse(manager.lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)); + Assert.assertFalse(manager.awaitStarted(1)); Assert.assertTrue(manager.waitForBackgroundTermination(10)); Assert.assertFalse(manager.backgroundManagerIsRunning()); manager.start(); - Assert.assertTrue(manager.lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)); + Assert.assertTrue(manager.awaitStarted(1)); Assert.assertTrue(manager.backgroundManagerIsRunning()); Assert.assertFalse(manager.waitForBackgroundTermination(10)); manager.stop(); - Assert.assertFalse(manager.lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)); + Assert.assertFalse(manager.awaitStarted(1)); Assert.assertTrue(manager.waitForBackgroundTermination(10)); Assert.assertFalse(manager.backgroundManagerIsRunning());