From f1e3518f37bd0399246a638adc3790ab07d3a2ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 16 Sep 2014 00:07:23 -0700 Subject: [PATCH] initialize segment view before announcing broker --- .../client/BatchServerInventoryView.java | 6 + .../io/druid/client/BrokerServerView.java | 6 + .../io/druid/client/ServerInventoryView.java | 16 +++ .../main/java/io/druid/client/ServerView.java | 8 ++ .../client/SingleServerInventoryView.java | 6 + .../inventory/CuratorInventoryManager.java | 112 ++++++++++++++---- .../CuratorInventoryManagerStrategy.java | 1 + .../coordination/broker/DruidBroker.java | 67 +++++++++++ .../CuratorInventoryManagerTest.java | 7 ++ .../src/main/java/io/druid/cli/CliBroker.java | 5 +- 10 files changed, 206 insertions(+), 28 deletions(-) create mode 100644 server/src/main/java/io/druid/server/coordination/broker/DruidBroker.java diff --git a/server/src/main/java/io/druid/client/BatchServerInventoryView.java b/server/src/main/java/io/druid/client/BatchServerInventoryView.java index a2a3c422aa1..e15e85e399a 100644 --- a/server/src/main/java/io/druid/client/BatchServerInventoryView.java +++ b/server/src/main/java/io/druid/client/BatchServerInventoryView.java @@ -174,6 +174,12 @@ public class BatchServerInventoryView extends ServerInventoryView implements ServerView, { return removeInnerInventory(container, inventoryKey); } + + @Override + public void inventoryInitialized() + { + log.info("Inventory Initialized"); + runSegmentCallbacks( + new Function() + { + @Override + public CallbackAction apply(SegmentCallback input) + { + return input.segmentViewInitialized(); + } + } + ); + } } ); } diff --git a/server/src/main/java/io/druid/client/ServerView.java b/server/src/main/java/io/druid/client/ServerView.java index 67ad13efab4..0eb6e392d46 100644 --- a/server/src/main/java/io/druid/client/ServerView.java +++ b/server/src/main/java/io/druid/client/ServerView.java @@ -91,6 +91,8 @@ public interface ServerView * should remain registered. */ public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment); + + public CallbackAction segmentViewInitialized(); } public static abstract class BaseSegmentCallback implements SegmentCallback @@ -106,5 +108,11 @@ public interface ServerView { return CallbackAction.CONTINUE; } + + @Override + public CallbackAction segmentViewInitialized() + { + return CallbackAction.CONTINUE; + } } } diff --git a/server/src/main/java/io/druid/client/SingleServerInventoryView.java b/server/src/main/java/io/druid/client/SingleServerInventoryView.java index 4acdbabf786..4f1d91bbf61 100644 --- a/server/src/main/java/io/druid/client/SingleServerInventoryView.java +++ b/server/src/main/java/io/druid/client/SingleServerInventoryView.java @@ -138,6 +138,12 @@ public class SingleServerInventoryView extends ServerInventoryView return action; } } + + @Override + public CallbackAction segmentViewInitialized() + { + return callback.segmentViewInitialized(); + } } ); } diff --git a/server/src/main/java/io/druid/curator/inventory/CuratorInventoryManager.java b/server/src/main/java/io/druid/curator/inventory/CuratorInventoryManager.java index d32bdd147f3..d61265b501c 100644 --- a/server/src/main/java/io/druid/curator/inventory/CuratorInventoryManager.java +++ b/server/src/main/java/io/druid/curator/inventory/CuratorInventoryManager.java @@ -23,6 +23,7 @@ import com.google.common.base.Function; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.MapMaker; +import com.google.common.collect.Sets; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; @@ -37,6 +38,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.utils.ZKPaths; import java.io.IOException; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicReference; @@ -62,6 +64,7 @@ public class CuratorInventoryManager private final CuratorInventoryManagerStrategy strategy; private final ConcurrentMap containers; + private final Set uninitializedInventory; private final PathChildrenCacheFactory cacheFactory; private volatile PathChildrenCache childrenCache; @@ -78,6 +81,7 @@ public class CuratorInventoryManager this.strategy = strategy; this.containers = new MapMaker().makeMap(); + this.uninitializedInventory = Sets.newConcurrentHashSet(); this.cacheFactory = new SimplePathChildrenCacheFactory(true, true, new ShutdownNowIgnoringExecutorService(exec)); } @@ -96,7 +100,7 @@ public class CuratorInventoryManager childrenCache.getListenable().addListener(new ContainerCacheListener()); try { - childrenCache.start(); + childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); } catch (Exception e) { synchronized (lock) { @@ -165,6 +169,7 @@ public class CuratorInventoryManager { private final AtomicReference container; private final PathChildrenCache cache; + private boolean initialized = false; ContainerHolder( ContainerClass container, @@ -193,21 +198,19 @@ public class CuratorInventoryManager private class ContainerCacheListener implements PathChildrenCacheListener { + private volatile boolean containersInitialized = false; + private volatile boolean doneInitializing = false; + @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { - final ChildData child = event.getData(); - if (child == null) { - return; - } - - final String containerKey = ZKPaths.getNodeFromPath(child.getPath()); - final ContainerClass container; - switch (event.getType()) { case CHILD_ADDED: synchronized (lock) { - container = strategy.deserializeContainer(child.getData()); + final ChildData child = event.getData(); + final String containerKey = ZKPaths.getNodeFromPath(child.getPath()); + + final ContainerClass container = strategy.deserializeContainer(child.getData()); // This would normally be a race condition, but the only thing that should be mutating the containers // map is this listener, which should never run concurrently. If the same container is going to disappear @@ -222,7 +225,7 @@ public class CuratorInventoryManager containers.put(containerKey, new ContainerHolder(container, inventoryCache)); log.info("Starting inventory cache for %s, inventoryPath %s", containerKey, inventoryPath); - inventoryCache.start(); + inventoryCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); strategy.newContainer(container); } @@ -230,6 +233,9 @@ public class CuratorInventoryManager } case CHILD_REMOVED: synchronized (lock) { + final ChildData child = event.getData(); + final String containerKey = ZKPaths.getNodeFromPath(child.getPath()); + final ContainerHolder removed = containers.remove(containerKey); if (removed == null) { log.warn("Container[%s] removed that wasn't a container!?", child.getPath()); @@ -243,11 +249,19 @@ public class CuratorInventoryManager removed.getCache().close(); strategy.deadContainer(removed.getContainer()); + // also remove node from uninitilized, in case a nodes gets removed while we are starting up + synchronized (removed) { + markInventoryInitialized(removed); + } + break; } case CHILD_UPDATED: synchronized (lock) { - container = strategy.deserializeContainer(child.getData()); + final ChildData child = event.getData(); + final String containerKey = ZKPaths.getNodeFromPath(child.getPath()); + + final ContainerClass container = strategy.deserializeContainer(child.getData()); ContainerHolder oldContainer = containers.get(containerKey); if (oldContainer == null) { @@ -260,6 +274,41 @@ public class CuratorInventoryManager break; } + case INITIALIZED: + synchronized (lock) { + // must await initialized of all containerholders + for(ContainerHolder holder : containers.values()) { + synchronized (holder) { + if(!holder.initialized) { + uninitializedInventory.add(holder); + } + } + } + containersInitialized = true; + maybeDoneInitializing(); + break; + } + } + } + + // must be run in synchronized(lock) { synchronized(holder) { ... } } block + private void markInventoryInitialized(final ContainerHolder holder) + { + holder.initialized = true; + uninitializedInventory.remove(holder); + maybeDoneInitializing(); + } + + private void maybeDoneInitializing() + { + if(doneInitializing) { + return; + } + + // only fire if we are done initializing the parent PathChildrenCache + if(containersInitialized && uninitializedInventory.isEmpty()) { + doneInitializing = true; + strategy.inventoryInitialized(); } } @@ -279,30 +328,28 @@ public class CuratorInventoryManager @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { - final ChildData child = event.getData(); - - if (child == null) { - return; - } - final ContainerHolder holder = containers.get(containerKey); - if (holder == null) { return; } - final String inventoryKey = ZKPaths.getNodeFromPath(child.getPath()); - switch (event.getType()) { - case CHILD_ADDED: + case CHILD_ADDED: { + final ChildData child = event.getData(); + final String inventoryKey = ZKPaths.getNodeFromPath(child.getPath()); + final InventoryClass addedInventory = strategy.deserializeInventory(child.getData()); synchronized (holder) { holder.setContainer(strategy.addInventory(holder.getContainer(), inventoryKey, addedInventory)); } - break; - case CHILD_UPDATED: + } + + case CHILD_UPDATED: { + final ChildData child = event.getData(); + final String inventoryKey = ZKPaths.getNodeFromPath(child.getPath()); + final InventoryClass updatedInventory = strategy.deserializeInventory(child.getData()); synchronized (holder) { @@ -310,11 +357,26 @@ public class CuratorInventoryManager } break; - case CHILD_REMOVED: + } + + case CHILD_REMOVED: { + final ChildData child = event.getData(); + final String inventoryKey = ZKPaths.getNodeFromPath(child.getPath()); + synchronized (holder) { holder.setContainer(strategy.removeInventory(holder.getContainer(), inventoryKey)); } + break; + } + case INITIALIZED: + // make sure to acquire locks in (lock -> holder) order + synchronized (lock) { + synchronized (holder) { + markInventoryInitialized(holder); + } + } + break; } } diff --git a/server/src/main/java/io/druid/curator/inventory/CuratorInventoryManagerStrategy.java b/server/src/main/java/io/druid/curator/inventory/CuratorInventoryManagerStrategy.java index 785e35534df..1e53ff6e8c3 100644 --- a/server/src/main/java/io/druid/curator/inventory/CuratorInventoryManagerStrategy.java +++ b/server/src/main/java/io/druid/curator/inventory/CuratorInventoryManagerStrategy.java @@ -35,4 +35,5 @@ public interface CuratorInventoryManagerStrategy public ContainerClass addInventory(ContainerClass container, String inventoryKey, InventoryClass inventory); public ContainerClass updateInventory(ContainerClass container, String inventoryKey, InventoryClass inventory); public ContainerClass removeInventory(ContainerClass container, String inventoryKey); + public void inventoryInitialized(); } diff --git a/server/src/main/java/io/druid/server/coordination/broker/DruidBroker.java b/server/src/main/java/io/druid/server/coordination/broker/DruidBroker.java new file mode 100644 index 00000000000..ab44dbe8f72 --- /dev/null +++ b/server/src/main/java/io/druid/server/coordination/broker/DruidBroker.java @@ -0,0 +1,67 @@ +package io.druid.server.coordination.broker; + +import com.google.common.util.concurrent.MoreExecutors; +import com.google.inject.Inject; +import com.metamx.common.lifecycle.LifecycleStart; +import com.metamx.common.lifecycle.LifecycleStop; +import io.druid.client.ServerInventoryView; +import io.druid.client.ServerView; +import io.druid.curator.discovery.ServiceAnnouncer; +import io.druid.guice.ManageLifecycle; +import io.druid.guice.annotations.Self; +import io.druid.server.DruidNode; + +@ManageLifecycle +public class DruidBroker +{ + private final DruidNode self; + private final ServiceAnnouncer serviceAnnouncer; + private volatile boolean started = false; + + @Inject + public DruidBroker( + final ServerInventoryView serverInventoryView, + final @Self DruidNode self, + final ServiceAnnouncer serviceAnnouncer + ) + { + this.self = self; + this.serviceAnnouncer = serviceAnnouncer; + + serverInventoryView.registerSegmentCallback( + MoreExecutors.sameThreadExecutor(), + new ServerView.BaseSegmentCallback() + { + @Override + public ServerView.CallbackAction segmentViewInitialized() + { + serviceAnnouncer.announce(self); + return ServerView.CallbackAction.UNREGISTER; + } + } + ); + } + + @LifecycleStart + public void start() + { + synchronized (self) { + if(started) { + return; + } + started = true; + } + } + + @LifecycleStop + public void stop() + { + synchronized (self) { + if (!started) { + return; + } + serviceAnnouncer.unannounce(self); + started = false; + } + } +} diff --git a/server/src/test/java/io/druid/curator/inventory/CuratorInventoryManagerTest.java b/server/src/test/java/io/druid/curator/inventory/CuratorInventoryManagerTest.java index e325b2c59df..18b921fc827 100644 --- a/server/src/test/java/io/druid/curator/inventory/CuratorInventoryManagerTest.java +++ b/server/src/test/java/io/druid/curator/inventory/CuratorInventoryManagerTest.java @@ -176,6 +176,7 @@ public class CuratorInventoryManagerTest extends io.druid.curator.CuratorTestBas private volatile CountDownLatch deadContainerLatch = null; private volatile CountDownLatch newInventoryLatch = null; private volatile CountDownLatch deadInventoryLatch = null; + private volatile boolean initialized = false; @Override public Map deserializeContainer(byte[] bytes) @@ -271,5 +272,11 @@ public class CuratorInventoryManagerTest extends io.druid.curator.CuratorTestBas { this.deadInventoryLatch = deadInventoryLatch; } + + @Override + public void inventoryInitialized() + { + initialized = true; + } } } diff --git a/services/src/main/java/io/druid/cli/CliBroker.java b/services/src/main/java/io/druid/cli/CliBroker.java index f23102bccdd..f70ea8837ad 100644 --- a/services/src/main/java/io/druid/cli/CliBroker.java +++ b/services/src/main/java/io/druid/cli/CliBroker.java @@ -34,19 +34,18 @@ import io.druid.client.cache.CacheProvider; import io.druid.client.selector.CustomTierSelectorStrategyConfig; import io.druid.client.selector.ServerSelectorStrategy; import io.druid.client.selector.TierSelectorStrategy; -import io.druid.curator.discovery.DiscoveryModule; import io.druid.guice.Jerseys; import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; import io.druid.guice.ManageLifecycle; -import io.druid.guice.annotations.Self; import io.druid.query.MapQueryToolChestWarehouse; import io.druid.query.QuerySegmentWalker; import io.druid.query.QueryToolChestWarehouse; import io.druid.server.ClientInfoResource; import io.druid.server.ClientQuerySegmentWalker; import io.druid.server.QueryResource; +import io.druid.server.coordination.broker.DruidBroker; import io.druid.server.initialization.JettyServerInitializer; import io.druid.server.metrics.MetricsModule; import org.eclipse.jetty.server.Server; @@ -95,8 +94,8 @@ public class CliBroker extends ServerRunnable Jerseys.addResource(binder, QueryResource.class); Jerseys.addResource(binder, ClientInfoResource.class); LifecycleModule.register(binder, QueryResource.class); + LifecycleModule.register(binder, DruidBroker.class); - DiscoveryModule.register(binder, Self.class); MetricsModule.register(binder, CacheMonitor.class); LifecycleModule.register(binder, Server.class);