initialize segment view before announcing broker

This commit is contained in:
Xavier Léauté 2014-09-16 00:07:23 -07:00
parent 4b4de83f21
commit f1e3518f37
10 changed files with 206 additions and 28 deletions

View File

@ -174,6 +174,12 @@ public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegmen
}
return action;
}
@Override
public CallbackAction segmentViewInitialized()
{
return callback.segmentViewInitialized();
}
}
);
}

View File

@ -104,6 +104,12 @@ public class BrokerServerView implements TimelineServerView
serverRemovedSegment(server, segment);
return ServerView.CallbackAction.CONTINUE;
}
@Override
public CallbackAction segmentViewInitialized()
{
return ServerView.CallbackAction.CONTINUE;
}
}
);

View File

@ -168,6 +168,22 @@ public abstract class ServerInventoryView<InventoryType> implements ServerView,
{
return removeInnerInventory(container, inventoryKey);
}
@Override
public void inventoryInitialized()
{
log.info("Inventory Initialized");
runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{
@Override
public CallbackAction apply(SegmentCallback input)
{
return input.segmentViewInitialized();
}
}
);
}
}
);
}

View File

@ -91,6 +91,8 @@ public interface ServerView
* should remain registered.
*/
public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment);
public CallbackAction segmentViewInitialized();
}
public static abstract class BaseSegmentCallback implements SegmentCallback
@ -106,5 +108,11 @@ public interface ServerView
{
return CallbackAction.CONTINUE;
}
@Override
public CallbackAction segmentViewInitialized()
{
return CallbackAction.CONTINUE;
}
}
}

View File

@ -138,6 +138,12 @@ public class SingleServerInventoryView extends ServerInventoryView<DataSegment>
return action;
}
}
@Override
public CallbackAction segmentViewInitialized()
{
return callback.segmentViewInitialized();
}
}
);
}

View File

@ -23,6 +23,7 @@ import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Sets;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
@ -37,6 +38,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
@ -62,6 +64,7 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
private final CuratorInventoryManagerStrategy<ContainerClass, InventoryClass> strategy;
private final ConcurrentMap<String, ContainerHolder> containers;
private final Set<ContainerHolder> uninitializedInventory;
private final PathChildrenCacheFactory cacheFactory;
private volatile PathChildrenCache childrenCache;
@ -78,6 +81,7 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
this.strategy = strategy;
this.containers = new MapMaker().makeMap();
this.uninitializedInventory = Sets.newConcurrentHashSet();
this.cacheFactory = new SimplePathChildrenCacheFactory(true, true, new ShutdownNowIgnoringExecutorService(exec));
}
@ -96,7 +100,7 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
childrenCache.getListenable().addListener(new ContainerCacheListener());
try {
childrenCache.start();
childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
}
catch (Exception e) {
synchronized (lock) {
@ -165,6 +169,7 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
{
private final AtomicReference<ContainerClass> container;
private final PathChildrenCache cache;
private boolean initialized = false;
ContainerHolder(
ContainerClass container,
@ -193,21 +198,19 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
private class ContainerCacheListener implements PathChildrenCacheListener
{
private volatile boolean containersInitialized = false;
private volatile boolean doneInitializing = false;
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{
final ChildData child = event.getData();
if (child == null) {
return;
}
final String containerKey = ZKPaths.getNodeFromPath(child.getPath());
final ContainerClass container;
switch (event.getType()) {
case CHILD_ADDED:
synchronized (lock) {
container = strategy.deserializeContainer(child.getData());
final ChildData child = event.getData();
final String containerKey = ZKPaths.getNodeFromPath(child.getPath());
final ContainerClass container = strategy.deserializeContainer(child.getData());
// This would normally be a race condition, but the only thing that should be mutating the containers
// map is this listener, which should never run concurrently. If the same container is going to disappear
@ -222,7 +225,7 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
containers.put(containerKey, new ContainerHolder(container, inventoryCache));
log.info("Starting inventory cache for %s, inventoryPath %s", containerKey, inventoryPath);
inventoryCache.start();
inventoryCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
strategy.newContainer(container);
}
@ -230,6 +233,9 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
}
case CHILD_REMOVED:
synchronized (lock) {
final ChildData child = event.getData();
final String containerKey = ZKPaths.getNodeFromPath(child.getPath());
final ContainerHolder removed = containers.remove(containerKey);
if (removed == null) {
log.warn("Container[%s] removed that wasn't a container!?", child.getPath());
@ -243,11 +249,19 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
removed.getCache().close();
strategy.deadContainer(removed.getContainer());
// also remove node from uninitilized, in case a nodes gets removed while we are starting up
synchronized (removed) {
markInventoryInitialized(removed);
}
break;
}
case CHILD_UPDATED:
synchronized (lock) {
container = strategy.deserializeContainer(child.getData());
final ChildData child = event.getData();
final String containerKey = ZKPaths.getNodeFromPath(child.getPath());
final ContainerClass container = strategy.deserializeContainer(child.getData());
ContainerHolder oldContainer = containers.get(containerKey);
if (oldContainer == null) {
@ -260,6 +274,41 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
break;
}
case INITIALIZED:
synchronized (lock) {
// must await initialized of all containerholders
for(ContainerHolder holder : containers.values()) {
synchronized (holder) {
if(!holder.initialized) {
uninitializedInventory.add(holder);
}
}
}
containersInitialized = true;
maybeDoneInitializing();
break;
}
}
}
// must be run in synchronized(lock) { synchronized(holder) { ... } } block
private void markInventoryInitialized(final ContainerHolder holder)
{
holder.initialized = true;
uninitializedInventory.remove(holder);
maybeDoneInitializing();
}
private void maybeDoneInitializing()
{
if(doneInitializing) {
return;
}
// only fire if we are done initializing the parent PathChildrenCache
if(containersInitialized && uninitializedInventory.isEmpty()) {
doneInitializing = true;
strategy.inventoryInitialized();
}
}
@ -279,30 +328,28 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{
final ChildData child = event.getData();
if (child == null) {
return;
}
final ContainerHolder holder = containers.get(containerKey);
if (holder == null) {
return;
}
final String inventoryKey = ZKPaths.getNodeFromPath(child.getPath());
switch (event.getType()) {
case CHILD_ADDED:
case CHILD_ADDED: {
final ChildData child = event.getData();
final String inventoryKey = ZKPaths.getNodeFromPath(child.getPath());
final InventoryClass addedInventory = strategy.deserializeInventory(child.getData());
synchronized (holder) {
holder.setContainer(strategy.addInventory(holder.getContainer(), inventoryKey, addedInventory));
}
break;
case CHILD_UPDATED:
}
case CHILD_UPDATED: {
final ChildData child = event.getData();
final String inventoryKey = ZKPaths.getNodeFromPath(child.getPath());
final InventoryClass updatedInventory = strategy.deserializeInventory(child.getData());
synchronized (holder) {
@ -310,11 +357,26 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
}
break;
case CHILD_REMOVED:
}
case CHILD_REMOVED: {
final ChildData child = event.getData();
final String inventoryKey = ZKPaths.getNodeFromPath(child.getPath());
synchronized (holder) {
holder.setContainer(strategy.removeInventory(holder.getContainer(), inventoryKey));
}
break;
}
case INITIALIZED:
// make sure to acquire locks in (lock -> holder) order
synchronized (lock) {
synchronized (holder) {
markInventoryInitialized(holder);
}
}
break;
}
}

View File

@ -35,4 +35,5 @@ public interface CuratorInventoryManagerStrategy<ContainerClass, InventoryClass>
public ContainerClass addInventory(ContainerClass container, String inventoryKey, InventoryClass inventory);
public ContainerClass updateInventory(ContainerClass container, String inventoryKey, InventoryClass inventory);
public ContainerClass removeInventory(ContainerClass container, String inventoryKey);
public void inventoryInitialized();
}

View File

@ -0,0 +1,67 @@
package io.druid.server.coordination.broker;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import io.druid.client.ServerInventoryView;
import io.druid.client.ServerView;
import io.druid.curator.discovery.ServiceAnnouncer;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Self;
import io.druid.server.DruidNode;
@ManageLifecycle
public class DruidBroker
{
private final DruidNode self;
private final ServiceAnnouncer serviceAnnouncer;
private volatile boolean started = false;
@Inject
public DruidBroker(
final ServerInventoryView serverInventoryView,
final @Self DruidNode self,
final ServiceAnnouncer serviceAnnouncer
)
{
this.self = self;
this.serviceAnnouncer = serviceAnnouncer;
serverInventoryView.registerSegmentCallback(
MoreExecutors.sameThreadExecutor(),
new ServerView.BaseSegmentCallback()
{
@Override
public ServerView.CallbackAction segmentViewInitialized()
{
serviceAnnouncer.announce(self);
return ServerView.CallbackAction.UNREGISTER;
}
}
);
}
@LifecycleStart
public void start()
{
synchronized (self) {
if(started) {
return;
}
started = true;
}
}
@LifecycleStop
public void stop()
{
synchronized (self) {
if (!started) {
return;
}
serviceAnnouncer.unannounce(self);
started = false;
}
}
}

View File

@ -176,6 +176,7 @@ public class CuratorInventoryManagerTest extends io.druid.curator.CuratorTestBas
private volatile CountDownLatch deadContainerLatch = null;
private volatile CountDownLatch newInventoryLatch = null;
private volatile CountDownLatch deadInventoryLatch = null;
private volatile boolean initialized = false;
@Override
public Map<String, Integer> deserializeContainer(byte[] bytes)
@ -271,5 +272,11 @@ public class CuratorInventoryManagerTest extends io.druid.curator.CuratorTestBas
{
this.deadInventoryLatch = deadInventoryLatch;
}
@Override
public void inventoryInitialized()
{
initialized = true;
}
}
}

View File

@ -34,19 +34,18 @@ import io.druid.client.cache.CacheProvider;
import io.druid.client.selector.CustomTierSelectorStrategyConfig;
import io.druid.client.selector.ServerSelectorStrategy;
import io.druid.client.selector.TierSelectorStrategy;
import io.druid.curator.discovery.DiscoveryModule;
import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Self;
import io.druid.query.MapQueryToolChestWarehouse;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.server.ClientInfoResource;
import io.druid.server.ClientQuerySegmentWalker;
import io.druid.server.QueryResource;
import io.druid.server.coordination.broker.DruidBroker;
import io.druid.server.initialization.JettyServerInitializer;
import io.druid.server.metrics.MetricsModule;
import org.eclipse.jetty.server.Server;
@ -95,8 +94,8 @@ public class CliBroker extends ServerRunnable
Jerseys.addResource(binder, QueryResource.class);
Jerseys.addResource(binder, ClientInfoResource.class);
LifecycleModule.register(binder, QueryResource.class);
LifecycleModule.register(binder, DruidBroker.class);
DiscoveryModule.register(binder, Self.class);
MetricsModule.register(binder, CacheMonitor.class);
LifecycleModule.register(binder, Server.class);