lifecycle stage refactor to ensure proper start and stop ordering of servers and announcements (#7234)

* lifecycle stage refactor to ensure proper ordering of servers and announcements

* move DerivativeDataSourceManager to Lifecycle.Stage.NORMAL
This commit is contained in:
Clint Wylie 2019-03-12 07:09:03 -07:00 committed by Fangjin Yang
parent e240fba247
commit 4d3987c1dd
12 changed files with 108 additions and 39 deletions

View File

@ -41,7 +41,8 @@ public class LifecycleModule implements Module
// the 'stop' method, either failing silently or failing violently and throwing an exception causing an ungraceful exit // the 'stop' method, either failing silently or failing violently and throwing an exception causing an ungraceful exit
private final LifecycleScope initScope = new LifecycleScope(Lifecycle.Stage.INIT); private final LifecycleScope initScope = new LifecycleScope(Lifecycle.Stage.INIT);
private final LifecycleScope scope = new LifecycleScope(Lifecycle.Stage.NORMAL); private final LifecycleScope scope = new LifecycleScope(Lifecycle.Stage.NORMAL);
private final LifecycleScope lastScope = new LifecycleScope(Lifecycle.Stage.LAST); private final LifecycleScope serverScope = new LifecycleScope(Lifecycle.Stage.SERVER);
private final LifecycleScope annoucementsScope = new LifecycleScope(Lifecycle.Stage.ANNOUNCEMENTS);
/** /**
* Registers a class to instantiate eagerly. Classes mentioned here will be pulled out of * Registers a class to instantiate eagerly. Classes mentioned here will be pulled out of
@ -118,7 +119,8 @@ public class LifecycleModule implements Module
binder.bindScope(ManageLifecycleInit.class, initScope); binder.bindScope(ManageLifecycleInit.class, initScope);
binder.bindScope(ManageLifecycle.class, scope); binder.bindScope(ManageLifecycle.class, scope);
binder.bindScope(ManageLifecycleLast.class, lastScope); binder.bindScope(ManageLifecycleServer.class, serverScope);
binder.bindScope(ManageLifecycleAnnouncements.class, annoucementsScope);
} }
@Provides @LazySingleton @Provides @LazySingleton
@ -140,7 +142,8 @@ public class LifecycleModule implements Module
}; };
initScope.setLifecycle(lifecycle); initScope.setLifecycle(lifecycle);
scope.setLifecycle(lifecycle); scope.setLifecycle(lifecycle);
lastScope.setLifecycle(lifecycle); serverScope.setLifecycle(lifecycle);
annoucementsScope.setLifecycle(lifecycle);
return lifecycle; return lifecycle;
} }

View File

@ -28,7 +28,7 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target; import java.lang.annotation.Target;
/** /**
* Marks the object to be managed by {@link org.apache.druid.java.util.common.lifecycle.Lifecycle} and set to be on Stage.LAST * Marks the object to be managed by {@link org.apache.druid.java.util.common.lifecycle.Lifecycle} and set to be on Stage.ANNOUNCEMENTS
* *
* This Scope gets defined by {@link LifecycleModule} * This Scope gets defined by {@link LifecycleModule}
*/ */
@ -36,6 +36,6 @@ import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME) @Retention(RetentionPolicy.RUNTIME)
@ScopeAnnotation @ScopeAnnotation
@PublicApi @PublicApi
public @interface ManageLifecycleLast public @interface ManageLifecycleAnnouncements
{ {
} }

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.guice;
import com.google.inject.ScopeAnnotation;
import org.apache.druid.guice.annotations.PublicApi;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Marks the object to be managed by {@link org.apache.druid.java.util.common.lifecycle.Lifecycle} and set to be on Stage.SERVER
*
* This Scope gets defined by {@link LifecycleModule}
*/
@Target({ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@ScopeAnnotation
@PublicApi
public @interface ManageLifecycleServer
{
}

View File

@ -40,15 +40,30 @@ import java.util.concurrent.locks.ReentrantLock;
/** /**
* A manager of object Lifecycles. * A manager of object Lifecycles.
* <p/> *
* This object has methods for registering objects that should be started and stopped. The Lifecycle allows for * This object has methods for registering objects that should be started and stopped. The Lifecycle allows for
* three stages: Stage.INIT, Stage.NORMAL, and Stage.LAST. * four stages: Stage.INIT, Stage.NORMAL, Stage.SERVER, and Stage.ANNOUNCEMENTS.
* <p/> *
* Things added at Stage.INIT will be started first (in the order that they are added to the Lifecycle instance) and * Things added at Stage.INIT will be started first (in the order that they are added to the Lifecycle instance) and
* then things added at Stage.NORMAL, and finally, Stage.LAST will be started. * then things added at Stage.NORMAL, then Stage.SERVER, and finally, Stage.ANNOUNCEMENTS will be started.
* <p/> *
* The close operation goes in reverse order, starting with the last thing added at Stage.LAST and working backwards. * The close operation goes in reverse order, starting with the last thing added at Stage.ANNOUNCEMENTS and working
* <p/> * backwards.
*
* Conceptually, the stages have the following purposes:
* - Stage.INIT: Currently, this stage is used exclusively for log4j initialization, since almost everything needs
* logging and it should be the last thing to shutdown. Any sort of bootstrapping object that provides something that
* should be initialized before nearly all other Lifecycle objects could also belong here (if it doesn't need
* logging during start or stop).
* - Stage.NORMAL: This is the default stage. Most objects will probably make the most sense to be registered at
* this level, with the exception of any form of server or service announcements
* - Stage.SERVER: This lifecycle stage is intended for all 'server' objects, and currently only contains the Jetty
* module, but any sort of 'server' that expects most Lifecycle objects to be initialized by the time it starts, and
* still available at the time it stops can logically live in this stage.
* - Stage.ANNOUNCENTS: Any object which announces to a cluster this servers location belongs in this stage. By being
* last, we can be sure that all servers are initialized before we advertise the endpoint locations, and also can be
* sure that we un-announce these advertisements prior to the Stage.SERVER objects stop.
*
* There are two sets of methods to add things to the Lifecycle. One set that will just add instances and enforce that * There are two sets of methods to add things to the Lifecycle. One set that will just add instances and enforce that
* start() has not been called yet. The other set will add instances and, if the lifecycle is already started, start * start() has not been called yet. The other set will add instances and, if the lifecycle is already started, start
* them. * them.
@ -61,7 +76,8 @@ public class Lifecycle
{ {
INIT, INIT,
NORMAL, NORMAL,
LAST SERVER,
ANNOUNCEMENTS
} }
private enum State private enum State

View File

@ -172,25 +172,26 @@ public class LifecycleTest
lifecycle.addManagedInstance(new ObjectToBeLifecycled(0, startOrder, stopOrder)); lifecycle.addManagedInstance(new ObjectToBeLifecycled(0, startOrder, stopOrder));
lifecycle.addManagedInstance(new ObjectToBeLifecycled(1, startOrder, stopOrder), Lifecycle.Stage.NORMAL); lifecycle.addManagedInstance(new ObjectToBeLifecycled(1, startOrder, stopOrder), Lifecycle.Stage.NORMAL);
lifecycle.addManagedInstance(new ObjectToBeLifecycled(2, startOrder, stopOrder), Lifecycle.Stage.NORMAL); lifecycle.addManagedInstance(new ObjectToBeLifecycled(2, startOrder, stopOrder), Lifecycle.Stage.NORMAL);
lifecycle.addManagedInstance(new ObjectToBeLifecycled(3, startOrder, stopOrder), Lifecycle.Stage.LAST); lifecycle.addManagedInstance(new ObjectToBeLifecycled(3, startOrder, stopOrder), Lifecycle.Stage.ANNOUNCEMENTS);
lifecycle.addStartCloseInstance(new ObjectToBeLifecycled(4, startOrder, stopOrder)); lifecycle.addStartCloseInstance(new ObjectToBeLifecycled(4, startOrder, stopOrder));
lifecycle.addManagedInstance(new ObjectToBeLifecycled(5, startOrder, stopOrder)); lifecycle.addManagedInstance(new ObjectToBeLifecycled(5, startOrder, stopOrder));
lifecycle.addStartCloseInstance(new ObjectToBeLifecycled(6, startOrder, stopOrder), Lifecycle.Stage.LAST); lifecycle.addStartCloseInstance(new ObjectToBeLifecycled(6, startOrder, stopOrder), Lifecycle.Stage.ANNOUNCEMENTS);
lifecycle.addManagedInstance(new ObjectToBeLifecycled(7, startOrder, stopOrder)); lifecycle.addManagedInstance(new ObjectToBeLifecycled(7, startOrder, stopOrder));
lifecycle.addStartCloseInstance(new ObjectToBeLifecycled(8, startOrder, stopOrder), Lifecycle.Stage.INIT); lifecycle.addStartCloseInstance(new ObjectToBeLifecycled(8, startOrder, stopOrder), Lifecycle.Stage.INIT);
lifecycle.addStartCloseInstance(new ObjectToBeLifecycled(9, startOrder, stopOrder), Lifecycle.Stage.SERVER);
final List<Integer> expectedOrder = Arrays.asList(8, 0, 1, 2, 4, 5, 7, 3, 6); final List<Integer> expectedOrder = Arrays.asList(8, 0, 1, 2, 4, 5, 7, 9, 3, 6);
lifecycle.start(); lifecycle.start();
Assert.assertEquals(9, startOrder.size()); Assert.assertEquals(10, startOrder.size());
Assert.assertEquals(0, stopOrder.size()); Assert.assertEquals(0, stopOrder.size());
Assert.assertEquals(expectedOrder, startOrder); Assert.assertEquals(expectedOrder, startOrder);
lifecycle.stop(); lifecycle.stop();
Assert.assertEquals(9, startOrder.size()); Assert.assertEquals(10, startOrder.size());
Assert.assertEquals(9, stopOrder.size()); Assert.assertEquals(10, stopOrder.size());
Assert.assertEquals(Lists.reverse(expectedOrder), stopOrder); Assert.assertEquals(Lists.reverse(expectedOrder), stopOrder);
} }
@ -210,20 +211,28 @@ public class LifecycleTest
public void start() throws Exception public void start() throws Exception
{ {
lifecycle.addMaybeStartManagedInstance( lifecycle.addMaybeStartManagedInstance(
new ObjectToBeLifecycled(1, startOrder, stopOrder), Lifecycle.Stage.NORMAL new ObjectToBeLifecycled(1, startOrder, stopOrder),
Lifecycle.Stage.NORMAL
); );
lifecycle.addMaybeStartManagedInstance( lifecycle.addMaybeStartManagedInstance(
new ObjectToBeLifecycled(2, startOrder, stopOrder), Lifecycle.Stage.INIT new ObjectToBeLifecycled(2, startOrder, stopOrder),
Lifecycle.Stage.INIT
); );
lifecycle.addMaybeStartManagedInstance( lifecycle.addMaybeStartManagedInstance(
new ObjectToBeLifecycled(3, startOrder, stopOrder), Lifecycle.Stage.LAST new ObjectToBeLifecycled(3, startOrder, stopOrder),
Lifecycle.Stage.ANNOUNCEMENTS
); );
lifecycle.addMaybeStartStartCloseInstance(new ObjectToBeLifecycled(4, startOrder, stopOrder)); lifecycle.addMaybeStartStartCloseInstance(new ObjectToBeLifecycled(4, startOrder, stopOrder));
lifecycle.addMaybeStartManagedInstance(new ObjectToBeLifecycled(5, startOrder, stopOrder)); lifecycle.addMaybeStartManagedInstance(new ObjectToBeLifecycled(5, startOrder, stopOrder));
lifecycle.addMaybeStartStartCloseInstance( lifecycle.addMaybeStartStartCloseInstance(
new ObjectToBeLifecycled(6, startOrder, stopOrder), Lifecycle.Stage.LAST new ObjectToBeLifecycled(6, startOrder, stopOrder),
Lifecycle.Stage.ANNOUNCEMENTS
); );
lifecycle.addMaybeStartManagedInstance(new ObjectToBeLifecycled(7, startOrder, stopOrder)); lifecycle.addMaybeStartManagedInstance(new ObjectToBeLifecycled(7, startOrder, stopOrder));
lifecycle.addMaybeStartManagedInstance(
new ObjectToBeLifecycled(8, startOrder, stopOrder),
Lifecycle.Stage.SERVER
);
} }
@Override @Override
@ -234,8 +243,8 @@ public class LifecycleTest
} }
); );
final List<Integer> expectedOrder = Arrays.asList(0, 1, 2, 4, 5, 7, 3, 6); final List<Integer> expectedOrder = Arrays.asList(0, 1, 2, 4, 5, 7, 8, 3, 6);
final List<Integer> expectedStopOrder = Arrays.asList(6, 3, 7, 5, 4, 1, 0, 2); final List<Integer> expectedStopOrder = Arrays.asList(6, 3, 8, 7, 5, 4, 1, 0, 2);
lifecycle.start(); lifecycle.start();

View File

@ -27,7 +27,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.apache.druid.guice.ManageLifecycleLast; import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.indexing.materializedview.DerivativeDataSourceMetadata; import org.apache.druid.indexing.materializedview.DerivativeDataSourceMetadata;
import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
@ -65,7 +65,7 @@ import java.util.stream.Collectors;
* Read and store derivatives information from dataSource table frequently. * Read and store derivatives information from dataSource table frequently.
* When optimize query, DerivativesManager offers the information about derivatives. * When optimize query, DerivativesManager offers the information about derivatives.
*/ */
@ManageLifecycleLast @ManageLifecycle
public class DerivativeDataSourceManager public class DerivativeDataSourceManager
{ {
private static final EmittingLogger log = new EmittingLogger(DerivativeDataSourceManager.class); private static final EmittingLogger log = new EmittingLogger(DerivativeDataSourceManager.class);

View File

@ -387,16 +387,16 @@ public class Announcer
*/ */
public void unannounce(String path) public void unannounce(String path)
{ {
log.info("unannouncing [%s]", path);
final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path); final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
final String parentPath = pathAndNode.getPath(); final String parentPath = pathAndNode.getPath();
final ConcurrentMap<String, byte[]> subPaths = announcements.get(parentPath); final ConcurrentMap<String, byte[]> subPaths = announcements.get(parentPath);
if (subPaths == null || subPaths.remove(pathAndNode.getNode()) == null) { if (subPaths == null || subPaths.remove(pathAndNode.getNode()) == null) {
log.error("Path[%s] not announced, cannot unannounce.", path); log.debug("Path[%s] not announced, cannot unannounce.", path);
return; return;
} }
log.info("unannouncing [%s]", path);
try { try {
curator.inTransaction().delete().forPath(path).and().commit(); curator.inTransaction().delete().forPath(path).and().commit();

View File

@ -92,7 +92,7 @@ public class DiscoveryModule implements Module
* *
* That is, this module will announce the DruidNode instance returned by * That is, this module will announce the DruidNode instance returned by
* injector.getInstance(Key.get(DruidNode.class)) automatically. * injector.getInstance(Key.get(DruidNode.class)) automatically.
* Announcement will happen in the LAST stage of the Lifecycle * Announcement will happen in the ANNOUNCEMENTS stage of the Lifecycle
* *
* @param binder the Binder to register with * @param binder the Binder to register with
*/ */
@ -106,7 +106,7 @@ public class DiscoveryModule implements Module
* *
* That is, this module will announce the DruidNode instance returned by * That is, this module will announce the DruidNode instance returned by
* injector.getInstance(Key.get(DruidNode.class, annotation)) automatically. * injector.getInstance(Key.get(DruidNode.class, annotation)) automatically.
* Announcement will happen in the LAST stage of the Lifecycle * Announcement will happen in the ANNOUNCEMENTS stage of the Lifecycle
* *
* @param annotation The annotation instance to use in finding the DruidNode instance, usually a Named annotation * @param annotation The annotation instance to use in finding the DruidNode instance, usually a Named annotation
*/ */
@ -120,7 +120,7 @@ public class DiscoveryModule implements Module
* *
* That is, this module will announce the DruidNode instance returned by * That is, this module will announce the DruidNode instance returned by
* injector.getInstance(Key.get(DruidNode.class, annotation)) automatically. * injector.getInstance(Key.get(DruidNode.class, annotation)) automatically.
* Announcement will happen in the LAST stage of the Lifecycle * Announcement will happen in the ANNOUNCEMENTS stage of the Lifecycle
* *
* @param binder the Binder to register with * @param binder the Binder to register with
* @param annotation The annotation class to use in finding the DruidNode instance * @param annotation The annotation class to use in finding the DruidNode instance
@ -135,7 +135,7 @@ public class DiscoveryModule implements Module
* *
* That is, this module will announce the DruidNode instance returned by * That is, this module will announce the DruidNode instance returned by
* injector.getInstance(Key.get(DruidNode.class, annotation)) automatically. * injector.getInstance(Key.get(DruidNode.class, annotation)) automatically.
* Announcement will happen in the LAST stage of the Lifecycle * Announcement will happen in the ANNOUNCEMENTS stage of the Lifecycle
* *
* @param binder the Binder to register with * @param binder the Binder to register with
* @param key The key to use in finding the DruidNode instance * @param key The key to use in finding the DruidNode instance
@ -251,7 +251,7 @@ public class DiscoveryModule implements Module
} }
} }
}, },
Lifecycle.Stage.LAST Lifecycle.Stage.ANNOUNCEMENTS
); );
return announcer; return announcer;

View File

@ -47,7 +47,7 @@ public class AnnouncerModule implements Module
} }
@Provides @Provides
@ManageLifecycle @ManageLifecycleAnnouncements
public Announcer getAnnouncer(CuratorFramework curator) public Announcer getAnnouncer(CuratorFramework curator)
{ {
return new Announcer(curator, Execs.singleThreaded("Announcer-%s")); return new Announcer(curator, Execs.singleThreaded("Announcer-%s"));

View File

@ -89,7 +89,7 @@ public class CuratorDataSegmentServerAnnouncer implements DataSegmentServerAnnou
} }
final String path = makeAnnouncementPath(); final String path = makeAnnouncementPath();
log.info("Unannouncing self[%s] at [%s]", server, path); log.debug("Unannouncing self[%s] at [%s]", server, path);
announcer.unannounce(path); announcer.unannounce(path);
announced = false; announced = false;

View File

@ -441,7 +441,7 @@ public class JettyServerModule extends JerseyServletModule
} }
} }
}, },
Lifecycle.Stage.LAST Lifecycle.Stage.SERVER
); );
return server; return server;

View File

@ -93,7 +93,7 @@ public abstract class ServerRunnable extends GuiceRunnable
/** /**
* This is a helper class used by CliXXX classes to announce {@link DiscoveryDruidNode} * This is a helper class used by CliXXX classes to announce {@link DiscoveryDruidNode}
* as part of {@link Lifecycle.Stage#LAST}. * as part of {@link Lifecycle.Stage#ANNOUNCEMENTS}.
*/ */
protected static class DiscoverySideEffectsProvider implements Provider<DiscoverySideEffectsProvider.Child> protected static class DiscoverySideEffectsProvider implements Provider<DiscoverySideEffectsProvider.Child>
{ {
@ -200,7 +200,7 @@ public abstract class ServerRunnable extends GuiceRunnable
announcer.unannounce(discoveryDruidNode); announcer.unannounce(discoveryDruidNode);
} }
}, },
Lifecycle.Stage.LAST Lifecycle.Stage.ANNOUNCEMENTS
); );
return new Child(); return new Child();