diff --git a/client/src/main/java/com/metamx/druid/client/BrokerServerView.java b/client/src/main/java/com/metamx/druid/client/BrokerServerView.java index ccb28464c6a..c340544e908 100644 --- a/client/src/main/java/com/metamx/druid/client/BrokerServerView.java +++ b/client/src/main/java/com/metamx/druid/client/BrokerServerView.java @@ -127,7 +127,7 @@ public class BrokerServerView implements TimelineServerView } } - private void addServer(DruidServer server) + private QueryableDruidServer addServer(DruidServer server) { QueryableDruidServer exists = clients.put( server.getName(), @@ -136,6 +136,8 @@ public class BrokerServerView implements TimelineServerView if (exists != null) { log.warn("QueryRunner for server[%s] already existed!?", server); } + + return exists; } private DirectDruidClient makeDirectClient(DruidServer server) @@ -143,12 +145,13 @@ public class BrokerServerView implements TimelineServerView return new DirectDruidClient(warehose, smileMapper, httpClient, server.getHost()); } - private void removeServer(DruidServer server) + private QueryableDruidServer removeServer(DruidServer server) { - clients.remove(server.getName()); + QueryableDruidServer retVal = clients.remove(server.getName()); for (DataSegment segment : server.getSegments().values()) { serverRemovedSegment(server, segment); } + return retVal; } private void serverAddedSegment(final DruidServer server, final DataSegment segment) @@ -171,10 +174,11 @@ public class BrokerServerView implements TimelineServerView selectors.put(segmentId, selector); } - if (!clients.containsKey(server.getName())) { - addServer(server); + QueryableDruidServer queryableDruidServer = clients.get(server.getName()); + if (queryableDruidServer == null) { + queryableDruidServer = addServer(server); } - selector.addServer(clients.get(server.getName())); + selector.addServer(queryableDruidServer); } } @@ -236,6 +240,7 @@ public class BrokerServerView implements TimelineServerView QueryableDruidServer queryableDruidServer = clients.get(server.getName()); if (queryableDruidServer == null) { log.error("WTF?! No QueryableDruidServer found for %s", server.getName()); + return null; } return queryableDruidServer.getClient(); } diff --git a/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java b/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java index 4c6b302ad46..a61f46f58af 100644 --- a/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java +++ b/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java @@ -172,6 +172,16 @@ public class ServerInventoryView implements ServerView, InventoryView final DataSegment segment = container.getSegment(inventoryKey); final DruidServer retVal = container.removeDataSegment(inventoryKey); + if (segment == null) { + log.warn( + "Not running callbacks or cleanup for non-existing segment[%s] on server[%s]", + inventoryKey, + container.getName() + ); + + return retVal; + } + runSegmentCallbacks( new Function() { diff --git a/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java b/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java index c1cb16d99c0..f308554e8d4 100644 --- a/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java +++ b/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java @@ -44,10 +44,10 @@ import java.util.concurrent.atomic.AtomicReference; /** * An InventoryManager watches updates to inventory on Zookeeper (or some other discovery-like service publishing * system). It is built up on two object types: containers and inventory objects. - * + *

* The logic of the InventoryManager just maintains a local cache of the containers and inventory it sees on ZK. It * provides methods for getting at the container objects, which house the actual individual pieces of inventory. - * + *

* A Strategy is provided to the constructor of an Inventory manager, this strategy provides all of the * object-specific logic to serialize, deserialize, compose and alter the container and inventory objects. */ @@ -128,8 +128,7 @@ public class CuratorInventoryManager final ContainerHolder containerHolder = containers.remove(containerKey); if (containerHolder == null) { log.wtf("!? Got key[%s] from keySet() but it didn't have a value!?", containerKey); - } - else { + } else { // This close() call actually calls shutdownNow() on the executor registered with the Cache object... containerHolder.getCache().close(); } @@ -202,52 +201,58 @@ public class CuratorInventoryManager switch (event.getType()) { case CHILD_ADDED: - container = strategy.deserializeContainer(child.getData()); + synchronized (lock) { + container = strategy.deserializeContainer(child.getData()); - // This would normally be a race condition, but the only thing that should be mutating the containers - // map is this listener, which should never run concurrently. If the same container is going to disappear - // and come back, we expect a removed event in between. - if (containers.containsKey(containerKey)) { - log.error("New node[%s] but there was already one. That's not good, ignoring new one.", child.getPath()); - } + // This would normally be a race condition, but the only thing that should be mutating the containers + // map is this listener, which should never run concurrently. If the same container is going to disappear + // and come back, we expect a removed event in between. + if (containers.containsKey(containerKey)) { + log.error("New node[%s] but there was already one. That's not good, ignoring new one.", child.getPath()); + } else { + final String inventoryPath = String.format("%s/%s", config.getInventoryPath(), containerKey); + PathChildrenCache inventoryCache = cacheFactory.make(curatorFramework, inventoryPath); + inventoryCache.getListenable().addListener(new InventoryCacheListener(containerKey, inventoryPath)); - final String inventoryPath = String.format("%s/%s", config.getInventoryPath(), containerKey); - PathChildrenCache inventoryCache = cacheFactory.make(curatorFramework, inventoryPath); - inventoryCache.getListenable().addListener(new InventoryCacheListener(containerKey, inventoryPath)); + containers.put(containerKey, new ContainerHolder(container, inventoryCache)); - containers.put(containerKey, new ContainerHolder(container, inventoryCache)); + log.info("Starting inventory cache for %s", container); + inventoryCache.start(); + strategy.newContainer(container); + } - inventoryCache.start(); - strategy.newContainer(container); - - break; - case CHILD_REMOVED: - final ContainerHolder removed = containers.remove(containerKey); - if (removed == null) { - log.warn("Container[%s] removed that wasn't a container!?", child.getPath()); break; } - - // This close() call actually calls shutdownNow() on the executor registered with the Cache object, it - // better have its own executor or ignore shutdownNow() calls... - removed.getCache().close(); - strategy.deadContainer(removed.getContainer()); - - break; - case CHILD_UPDATED: - container = strategy.deserializeContainer(child.getData()); - - ContainerHolder oldContainer = containers.get(containerKey); - if (oldContainer == null) { - log.warn("Container update[%s], but the old container didn't exist!? Ignoring.", child.getPath()); - } - else { - synchronized (oldContainer) { - oldContainer.setContainer(strategy.updateContainer(oldContainer.getContainer(), container)); + case CHILD_REMOVED: + synchronized (lock) { + final ContainerHolder removed = containers.remove(containerKey); + if (removed == null) { + log.warn("Container[%s] removed that wasn't a container!?", child.getPath()); + break; } - } - break; + // This close() call actually calls shutdownNow() on the executor registered with the Cache object, it + // better have its own executor or ignore shutdownNow() calls... + removed.getCache().close(); + strategy.deadContainer(removed.getContainer()); + + break; + } + case CHILD_UPDATED: + synchronized (lock) { + container = strategy.deserializeContainer(child.getData()); + + ContainerHolder oldContainer = containers.get(containerKey); + if (oldContainer == null) { + log.warn("Container update[%s], but the old container didn't exist!? Ignoring.", child.getPath()); + } else { + synchronized (oldContainer) { + oldContainer.setContainer(strategy.updateContainer(oldContainer.getContainer(), container)); + } + } + + break; + } } }