Merge branch 'master' of github.com:metamx/druid

This commit is contained in:
fjy 2014-09-16 15:18:34 -07:00
commit 4b5632008e
10 changed files with 206 additions and 28 deletions

View File

@ -174,6 +174,12 @@ public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegmen
} }
return action; return action;
} }
@Override
public CallbackAction segmentViewInitialized()
{
return callback.segmentViewInitialized();
}
} }
); );
} }

View File

@ -104,6 +104,12 @@ public class BrokerServerView implements TimelineServerView
serverRemovedSegment(server, segment); serverRemovedSegment(server, segment);
return ServerView.CallbackAction.CONTINUE; return ServerView.CallbackAction.CONTINUE;
} }
@Override
public CallbackAction segmentViewInitialized()
{
return ServerView.CallbackAction.CONTINUE;
}
} }
); );

View File

@ -168,6 +168,22 @@ public abstract class ServerInventoryView<InventoryType> implements ServerView,
{ {
return removeInnerInventory(container, inventoryKey); return removeInnerInventory(container, inventoryKey);
} }
@Override
public void inventoryInitialized()
{
log.info("Inventory Initialized");
runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{
@Override
public CallbackAction apply(SegmentCallback input)
{
return input.segmentViewInitialized();
}
}
);
}
} }
); );
} }

View File

@ -91,6 +91,8 @@ public interface ServerView
* should remain registered. * should remain registered.
*/ */
public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment); public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment);
public CallbackAction segmentViewInitialized();
} }
public static abstract class BaseSegmentCallback implements SegmentCallback public static abstract class BaseSegmentCallback implements SegmentCallback
@ -106,5 +108,11 @@ public interface ServerView
{ {
return CallbackAction.CONTINUE; return CallbackAction.CONTINUE;
} }
@Override
public CallbackAction segmentViewInitialized()
{
return CallbackAction.CONTINUE;
}
} }
} }

View File

@ -138,6 +138,12 @@ public class SingleServerInventoryView extends ServerInventoryView<DataSegment>
return action; return action;
} }
} }
@Override
public CallbackAction segmentViewInitialized()
{
return callback.segmentViewInitialized();
}
} }
); );
} }

View File

@ -23,6 +23,7 @@ import com.google.common.base.Function;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker; import com.google.common.collect.MapMaker;
import com.google.common.collect.Sets;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
@ -37,6 +38,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths; import org.apache.curator.utils.ZKPaths;
import java.io.IOException; import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -62,6 +64,7 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
private final CuratorInventoryManagerStrategy<ContainerClass, InventoryClass> strategy; private final CuratorInventoryManagerStrategy<ContainerClass, InventoryClass> strategy;
private final ConcurrentMap<String, ContainerHolder> containers; private final ConcurrentMap<String, ContainerHolder> containers;
private final Set<ContainerHolder> uninitializedInventory;
private final PathChildrenCacheFactory cacheFactory; private final PathChildrenCacheFactory cacheFactory;
private volatile PathChildrenCache childrenCache; private volatile PathChildrenCache childrenCache;
@ -78,6 +81,7 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
this.strategy = strategy; this.strategy = strategy;
this.containers = new MapMaker().makeMap(); this.containers = new MapMaker().makeMap();
this.uninitializedInventory = Sets.newConcurrentHashSet();
this.cacheFactory = new SimplePathChildrenCacheFactory(true, true, new ShutdownNowIgnoringExecutorService(exec)); this.cacheFactory = new SimplePathChildrenCacheFactory(true, true, new ShutdownNowIgnoringExecutorService(exec));
} }
@ -96,7 +100,7 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
childrenCache.getListenable().addListener(new ContainerCacheListener()); childrenCache.getListenable().addListener(new ContainerCacheListener());
try { try {
childrenCache.start(); childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
} }
catch (Exception e) { catch (Exception e) {
synchronized (lock) { synchronized (lock) {
@ -165,6 +169,7 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
{ {
private final AtomicReference<ContainerClass> container; private final AtomicReference<ContainerClass> container;
private final PathChildrenCache cache; private final PathChildrenCache cache;
private boolean initialized = false;
ContainerHolder( ContainerHolder(
ContainerClass container, ContainerClass container,
@ -193,21 +198,19 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
private class ContainerCacheListener implements PathChildrenCacheListener private class ContainerCacheListener implements PathChildrenCacheListener
{ {
private volatile boolean containersInitialized = false;
private volatile boolean doneInitializing = false;
@Override @Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{ {
final ChildData child = event.getData();
if (child == null) {
return;
}
final String containerKey = ZKPaths.getNodeFromPath(child.getPath());
final ContainerClass container;
switch (event.getType()) { switch (event.getType()) {
case CHILD_ADDED: case CHILD_ADDED:
synchronized (lock) { synchronized (lock) {
container = strategy.deserializeContainer(child.getData()); final ChildData child = event.getData();
final String containerKey = ZKPaths.getNodeFromPath(child.getPath());
final ContainerClass container = strategy.deserializeContainer(child.getData());
// This would normally be a race condition, but the only thing that should be mutating the containers // This would normally be a race condition, but the only thing that should be mutating the containers
// map is this listener, which should never run concurrently. If the same container is going to disappear // map is this listener, which should never run concurrently. If the same container is going to disappear
@ -222,7 +225,7 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
containers.put(containerKey, new ContainerHolder(container, inventoryCache)); containers.put(containerKey, new ContainerHolder(container, inventoryCache));
log.info("Starting inventory cache for %s, inventoryPath %s", containerKey, inventoryPath); log.info("Starting inventory cache for %s, inventoryPath %s", containerKey, inventoryPath);
inventoryCache.start(); inventoryCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
strategy.newContainer(container); strategy.newContainer(container);
} }
@ -230,6 +233,9 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
} }
case CHILD_REMOVED: case CHILD_REMOVED:
synchronized (lock) { synchronized (lock) {
final ChildData child = event.getData();
final String containerKey = ZKPaths.getNodeFromPath(child.getPath());
final ContainerHolder removed = containers.remove(containerKey); final ContainerHolder removed = containers.remove(containerKey);
if (removed == null) { if (removed == null) {
log.warn("Container[%s] removed that wasn't a container!?", child.getPath()); log.warn("Container[%s] removed that wasn't a container!?", child.getPath());
@ -243,11 +249,19 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
removed.getCache().close(); removed.getCache().close();
strategy.deadContainer(removed.getContainer()); strategy.deadContainer(removed.getContainer());
// also remove node from uninitilized, in case a nodes gets removed while we are starting up
synchronized (removed) {
markInventoryInitialized(removed);
}
break; break;
} }
case CHILD_UPDATED: case CHILD_UPDATED:
synchronized (lock) { synchronized (lock) {
container = strategy.deserializeContainer(child.getData()); final ChildData child = event.getData();
final String containerKey = ZKPaths.getNodeFromPath(child.getPath());
final ContainerClass container = strategy.deserializeContainer(child.getData());
ContainerHolder oldContainer = containers.get(containerKey); ContainerHolder oldContainer = containers.get(containerKey);
if (oldContainer == null) { if (oldContainer == null) {
@ -260,6 +274,41 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
break; break;
} }
case INITIALIZED:
synchronized (lock) {
// must await initialized of all containerholders
for(ContainerHolder holder : containers.values()) {
synchronized (holder) {
if(!holder.initialized) {
uninitializedInventory.add(holder);
}
}
}
containersInitialized = true;
maybeDoneInitializing();
break;
}
}
}
// must be run in synchronized(lock) { synchronized(holder) { ... } } block
private void markInventoryInitialized(final ContainerHolder holder)
{
holder.initialized = true;
uninitializedInventory.remove(holder);
maybeDoneInitializing();
}
private void maybeDoneInitializing()
{
if(doneInitializing) {
return;
}
// only fire if we are done initializing the parent PathChildrenCache
if(containersInitialized && uninitializedInventory.isEmpty()) {
doneInitializing = true;
strategy.inventoryInitialized();
} }
} }
@ -279,30 +328,28 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
@Override @Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{ {
final ChildData child = event.getData();
if (child == null) {
return;
}
final ContainerHolder holder = containers.get(containerKey); final ContainerHolder holder = containers.get(containerKey);
if (holder == null) { if (holder == null) {
return; return;
} }
final String inventoryKey = ZKPaths.getNodeFromPath(child.getPath());
switch (event.getType()) { switch (event.getType()) {
case CHILD_ADDED: case CHILD_ADDED: {
final ChildData child = event.getData();
final String inventoryKey = ZKPaths.getNodeFromPath(child.getPath());
final InventoryClass addedInventory = strategy.deserializeInventory(child.getData()); final InventoryClass addedInventory = strategy.deserializeInventory(child.getData());
synchronized (holder) { synchronized (holder) {
holder.setContainer(strategy.addInventory(holder.getContainer(), inventoryKey, addedInventory)); holder.setContainer(strategy.addInventory(holder.getContainer(), inventoryKey, addedInventory));
} }
break; break;
case CHILD_UPDATED: }
case CHILD_UPDATED: {
final ChildData child = event.getData();
final String inventoryKey = ZKPaths.getNodeFromPath(child.getPath());
final InventoryClass updatedInventory = strategy.deserializeInventory(child.getData()); final InventoryClass updatedInventory = strategy.deserializeInventory(child.getData());
synchronized (holder) { synchronized (holder) {
@ -310,11 +357,26 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
} }
break; break;
case CHILD_REMOVED: }
case CHILD_REMOVED: {
final ChildData child = event.getData();
final String inventoryKey = ZKPaths.getNodeFromPath(child.getPath());
synchronized (holder) { synchronized (holder) {
holder.setContainer(strategy.removeInventory(holder.getContainer(), inventoryKey)); holder.setContainer(strategy.removeInventory(holder.getContainer(), inventoryKey));
} }
break;
}
case INITIALIZED:
// make sure to acquire locks in (lock -> holder) order
synchronized (lock) {
synchronized (holder) {
markInventoryInitialized(holder);
}
}
break; break;
} }
} }

View File

@ -35,4 +35,5 @@ public interface CuratorInventoryManagerStrategy<ContainerClass, InventoryClass>
public ContainerClass addInventory(ContainerClass container, String inventoryKey, InventoryClass inventory); public ContainerClass addInventory(ContainerClass container, String inventoryKey, InventoryClass inventory);
public ContainerClass updateInventory(ContainerClass container, String inventoryKey, InventoryClass inventory); public ContainerClass updateInventory(ContainerClass container, String inventoryKey, InventoryClass inventory);
public ContainerClass removeInventory(ContainerClass container, String inventoryKey); public ContainerClass removeInventory(ContainerClass container, String inventoryKey);
public void inventoryInitialized();
} }

View File

@ -0,0 +1,67 @@
package io.druid.server.coordination.broker;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import io.druid.client.ServerInventoryView;
import io.druid.client.ServerView;
import io.druid.curator.discovery.ServiceAnnouncer;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Self;
import io.druid.server.DruidNode;
@ManageLifecycle
public class DruidBroker
{
private final DruidNode self;
private final ServiceAnnouncer serviceAnnouncer;
private volatile boolean started = false;
@Inject
public DruidBroker(
final ServerInventoryView serverInventoryView,
final @Self DruidNode self,
final ServiceAnnouncer serviceAnnouncer
)
{
this.self = self;
this.serviceAnnouncer = serviceAnnouncer;
serverInventoryView.registerSegmentCallback(
MoreExecutors.sameThreadExecutor(),
new ServerView.BaseSegmentCallback()
{
@Override
public ServerView.CallbackAction segmentViewInitialized()
{
serviceAnnouncer.announce(self);
return ServerView.CallbackAction.UNREGISTER;
}
}
);
}
@LifecycleStart
public void start()
{
synchronized (self) {
if(started) {
return;
}
started = true;
}
}
@LifecycleStop
public void stop()
{
synchronized (self) {
if (!started) {
return;
}
serviceAnnouncer.unannounce(self);
started = false;
}
}
}

View File

@ -176,6 +176,7 @@ public class CuratorInventoryManagerTest extends io.druid.curator.CuratorTestBas
private volatile CountDownLatch deadContainerLatch = null; private volatile CountDownLatch deadContainerLatch = null;
private volatile CountDownLatch newInventoryLatch = null; private volatile CountDownLatch newInventoryLatch = null;
private volatile CountDownLatch deadInventoryLatch = null; private volatile CountDownLatch deadInventoryLatch = null;
private volatile boolean initialized = false;
@Override @Override
public Map<String, Integer> deserializeContainer(byte[] bytes) public Map<String, Integer> deserializeContainer(byte[] bytes)
@ -271,5 +272,11 @@ public class CuratorInventoryManagerTest extends io.druid.curator.CuratorTestBas
{ {
this.deadInventoryLatch = deadInventoryLatch; this.deadInventoryLatch = deadInventoryLatch;
} }
@Override
public void inventoryInitialized()
{
initialized = true;
}
} }
} }

View File

@ -34,19 +34,18 @@ import io.druid.client.cache.CacheProvider;
import io.druid.client.selector.CustomTierSelectorStrategyConfig; import io.druid.client.selector.CustomTierSelectorStrategyConfig;
import io.druid.client.selector.ServerSelectorStrategy; import io.druid.client.selector.ServerSelectorStrategy;
import io.druid.client.selector.TierSelectorStrategy; import io.druid.client.selector.TierSelectorStrategy;
import io.druid.curator.discovery.DiscoveryModule;
import io.druid.guice.Jerseys; import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider; import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton; import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule; import io.druid.guice.LifecycleModule;
import io.druid.guice.ManageLifecycle; import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Self;
import io.druid.query.MapQueryToolChestWarehouse; import io.druid.query.MapQueryToolChestWarehouse;
import io.druid.query.QuerySegmentWalker; import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChestWarehouse; import io.druid.query.QueryToolChestWarehouse;
import io.druid.server.ClientInfoResource; import io.druid.server.ClientInfoResource;
import io.druid.server.ClientQuerySegmentWalker; import io.druid.server.ClientQuerySegmentWalker;
import io.druid.server.QueryResource; import io.druid.server.QueryResource;
import io.druid.server.coordination.broker.DruidBroker;
import io.druid.server.initialization.JettyServerInitializer; import io.druid.server.initialization.JettyServerInitializer;
import io.druid.server.metrics.MetricsModule; import io.druid.server.metrics.MetricsModule;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
@ -95,8 +94,8 @@ public class CliBroker extends ServerRunnable
Jerseys.addResource(binder, QueryResource.class); Jerseys.addResource(binder, QueryResource.class);
Jerseys.addResource(binder, ClientInfoResource.class); Jerseys.addResource(binder, ClientInfoResource.class);
LifecycleModule.register(binder, QueryResource.class); LifecycleModule.register(binder, QueryResource.class);
LifecycleModule.register(binder, DruidBroker.class);
DiscoveryModule.register(binder, Self.class);
MetricsModule.register(binder, CacheMonitor.class); MetricsModule.register(binder, CacheMonitor.class);
LifecycleModule.register(binder, Server.class); LifecycleModule.register(binder, Server.class);