more logging for curator events

fjy 2013-05-23 11:00:12 -06:00
parent 27d4f46156
commit 9b6098f778
3 changed files with 68 additions and 48 deletions

View File

@@ -127,7 +127,7 @@ public class BrokerServerView implements TimelineServerView
}
}
- private void addServer(DruidServer server)
+ private QueryableDruidServer addServer(DruidServer server)
{
QueryableDruidServer exists = clients.put(
server.getName(),
@@ -136,6 +136,8 @@ public class BrokerServerView implements TimelineServerView
if (exists != null) {
log.warn("QueryRunner for server[%s] already existed!?", server);
}
+ return exists;
}
private DirectDruidClient makeDirectClient(DruidServer server)
@@ -143,12 +145,13 @@ public class BrokerServerView implements TimelineServerView
return new DirectDruidClient(warehose, smileMapper, httpClient, server.getHost());
}
- private void removeServer(DruidServer server)
+ private QueryableDruidServer removeServer(DruidServer server)
{
- clients.remove(server.getName());
+ QueryableDruidServer retVal = clients.remove(server.getName());
for (DataSegment segment : server.getSegments().values()) {
serverRemovedSegment(server, segment);
}
+ return retVal;
}
private void serverAddedSegment(final DruidServer server, final DataSegment segment)
@@ -171,10 +174,11 @@ public class BrokerServerView implements TimelineServerView
selectors.put(segmentId, selector);
}
- if (!clients.containsKey(server.getName())) {
-   addServer(server);
+ QueryableDruidServer queryableDruidServer = clients.get(server.getName());
+ if (queryableDruidServer == null) {
+   queryableDruidServer = addServer(server);
}
- selector.addServer(clients.get(server.getName()));
+ selector.addServer(queryableDruidServer);
}
}
@@ -236,6 +240,7 @@ public class BrokerServerView implements TimelineServerView
QueryableDruidServer queryableDruidServer = clients.get(server.getName());
if (queryableDruidServer == null) {
log.error("WTF?! No QueryableDruidServer found for %s", server.getName());
+ return null;
}
return queryableDruidServer.getClient();
}

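The BrokerServerView changes above all serve one pattern: look the QueryableDruidServer up once, create it if it is missing, and keep using the reference that addServer/removeServer now return instead of querying clients a second time (getClient additionally returns null rather than failing later when the server is unknown). As a loose, non-Druid illustration of why handing back the stored value matters, here is a fully race-safe get-or-create over a concurrent map; all names below are hypothetical, not Druid API:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Illustration only: look up once, create if missing, and always use the reference
    // you got back. putIfAbsent covers the case where another thread registered the same
    // key between our get() and put(), so the caller never acts on a vanished entry.
    class ClientRegistry<K, V> {
      private final ConcurrentMap<K, V> clients = new ConcurrentHashMap<>();

      V getOrAdd(K key, V fresh) {
        V existing = clients.get(key);
        if (existing != null) {
          return existing;
        }
        V raced = clients.putIfAbsent(key, fresh);
        return raced != null ? raced : fresh;
      }
    }

In BrokerServerView the writes appear to come only from the view's own callbacks, so the simpler get / if-null / addServer sequence in the diff suffices; the sketch just shows the fully general shape of the same idea.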
View File

@@ -172,6 +172,16 @@ public class ServerInventoryView implements ServerView, InventoryView
final DataSegment segment = container.getSegment(inventoryKey);
final DruidServer retVal = container.removeDataSegment(inventoryKey);
+ if (segment == null) {
+   log.warn(
+       "Not running callbacks or cleanup for non-existing segment[%s] on server[%s]",
+       inventoryKey,
+       container.getName()
+   );
+   return retVal;
+ }
runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{

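The ServerInventoryView hunk adds a guard for removal events that reference a segment the container never recorded (or has already dropped): log a warning with the segment id and server name, skip the callbacks and cleanup, and still return whatever removeDataSegment produced. A minimal, self-contained sketch of that shape, using plain JDK types and hypothetical names rather than Druid's classes:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.logging.Logger;

    // Illustration only: tolerate stale or duplicate removal events from a watcher by
    // warning and bailing out early instead of running cleanup callbacks on a missing entry.
    class InventoryHolder {
      private static final Logger LOG = Logger.getLogger(InventoryHolder.class.getName());
      private final Map<String, Object> segments = new ConcurrentHashMap<>();

      Object removeSegment(String inventoryKey) {
        final Object segment = segments.remove(inventoryKey);
        if (segment == null) {
          LOG.warning(String.format(
              "Not running callbacks or cleanup for non-existing segment[%s]", inventoryKey
          ));
          return null;
        }
        // ... run "segment removed" callbacks here ...
        return segment;
      }
    }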
View File

@@ -44,10 +44,10 @@ import java.util.concurrent.atomic.AtomicReference;
/**
* An InventoryManager watches updates to inventory on Zookeeper (or some other discovery-like service publishing
* system). It is built up on two object types: containers and inventory objects.
- *
+ * <p/>
* The logic of the InventoryManager just maintains a local cache of the containers and inventory it sees on ZK. It
* provides methods for getting at the container objects, which house the actual individual pieces of inventory.
- *
+ * <p/>
* A Strategy is provided to the constructor of an Inventory manager, this strategy provides all of the
* object-specific logic to serialize, deserialize, compose and alter the container and inventory objects.
*/
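The Javadoc above is the architectural summary: containers and inventory live in ZooKeeper, the manager mirrors them into a local cache, and a Strategy supplies the type-specific logic. The diff below calls strategy.deserializeContainer, strategy.newContainer, strategy.deadContainer and strategy.updateContainer; a rough sketch of an interface carrying just those obligations (an inference from the visible calls, not Druid's actual strategy interface) might be:

    // Sketch only, inferred from the strategy.* calls visible in the hunks below; the real
    // strategy also covers serialization and the inventory-level callbacks the Javadoc mentions.
    interface ContainerStrategy<ContainerClass> {
      ContainerClass deserializeContainer(byte[] bytes);  // ZK node payload -> container object
      ContainerClass updateContainer(ContainerClass oldContainer, ContainerClass newContainer);
      void newContainer(ContainerClass container);        // callback: a container node appeared
      void deadContainer(ContainerClass container);       // callback: a container node went away
    }

The type parameter mirrors the ContainerClass type variable visible in the hunk headers for this class.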
@@ -128,8 +128,7 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
final ContainerHolder containerHolder = containers.remove(containerKey);
if (containerHolder == null) {
log.wtf("!? Got key[%s] from keySet() but it didn't have a value!?", containerKey);
- }
- else {
+ } else {
// This close() call actually calls shutdownNow() on the executor registered with the Cache object...
containerHolder.getCache().close();
}
@@ -202,52 +201,58 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
switch (event.getType()) {
  case CHILD_ADDED:
+   synchronized (lock) {
      container = strategy.deserializeContainer(child.getData());
      // This would normally be a race condition, but the only thing that should be mutating the containers
      // map is this listener, which should never run concurrently. If the same container is going to disappear
      // and come back, we expect a removed event in between.
      if (containers.containsKey(containerKey)) {
        log.error("New node[%s] but there was already one. That's not good, ignoring new one.", child.getPath());
-     }
+     } else {
        final String inventoryPath = String.format("%s/%s", config.getInventoryPath(), containerKey);
        PathChildrenCache inventoryCache = cacheFactory.make(curatorFramework, inventoryPath);
        inventoryCache.getListenable().addListener(new InventoryCacheListener(containerKey, inventoryPath));
        containers.put(containerKey, new ContainerHolder(container, inventoryCache));
+       log.info("Starting inventory cache for %s", container);
        inventoryCache.start();
        strategy.newContainer(container);
+     }
      break;
+   }
  case CHILD_REMOVED:
+   synchronized (lock) {
      final ContainerHolder removed = containers.remove(containerKey);
      if (removed == null) {
        log.warn("Container[%s] removed that wasn't a container!?", child.getPath());
        break;
      }
      // This close() call actually calls shutdownNow() on the executor registered with the Cache object, it
      // better have its own executor or ignore shutdownNow() calls...
      removed.getCache().close();
      strategy.deadContainer(removed.getContainer());
      break;
+   }
  case CHILD_UPDATED:
+   synchronized (lock) {
      container = strategy.deserializeContainer(child.getData());
      ContainerHolder oldContainer = containers.get(containerKey);
      if (oldContainer == null) {
        log.warn("Container update[%s], but the old container didn't exist!? Ignoring.", child.getPath());
-     }
-     else {
+     } else {
        synchronized (oldContainer) {
          oldContainer.setContainer(strategy.updateContainer(oldContainer.getContainer(), container));
        }
      }
      break;
+   }
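Net effect of the last hunk: every branch of the child-event listener now mutates the containers map while holding lock, and CHILD_ADDED logs before starting the per-container inventory cache, which is the "more logging for curator events" the commit title promises. A stripped-down, self-contained listener with the same shape, written against Curator's org.apache.curator API with hypothetical fields and no Druid classes:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.recipes.cache.ChildData;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
    import org.apache.curator.utils.ZKPaths;

    // Illustration only: all three child events touch the same map, and all of them do so
    // under the same lock, which is the invariant the hunk above establishes.
    class ContainerCacheListener implements PathChildrenCacheListener {
      private final Object lock = new Object();
      private final Map<String, byte[]> containers = new HashMap<>();

      @Override
      public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
        final ChildData child = event.getData();
        if (child == null) {
          return; // connection-state events carry no child data
        }
        final String containerKey = ZKPaths.getNodeFromPath(child.getPath());

        switch (event.getType()) {
          case CHILD_ADDED:
            synchronized (lock) {
              if (containers.containsKey(containerKey)) {
                System.err.printf("New node[%s] but there was already one, ignoring new one%n", child.getPath());
              } else {
                containers.put(containerKey, child.getData());
                System.out.printf("Starting cache for %s%n", containerKey); // the newly added log line
              }
            }
            break;
          case CHILD_REMOVED:
            synchronized (lock) {
              if (containers.remove(containerKey) == null) {
                System.err.printf("Container[%s] removed that wasn't a container!?%n", child.getPath());
              }
            }
            break;
          case CHILD_UPDATED:
            synchronized (lock) {
              containers.replace(containerKey, child.getData());
            }
            break;
          default:
            break; // connection events, INITIALIZED, etc.
        }
      }
    }

In the real class the map holds ContainerHolder objects whose own PathChildrenCache has to be started and closed, which is why the synchronized blocks in the diff also wrap inventoryCache.start() and removed.getCache().close().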