diff --git a/client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java index ff16b7e9025..16e8b723d1c 100644 --- a/client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java +++ b/client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java @@ -22,18 +22,15 @@ package com.metamx.druid.coordination; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableMap; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; -import com.metamx.druid.client.DruidServer; import com.metamx.druid.curator.announcement.Announcer; import com.metamx.druid.initialization.ZkPathsConfig; import com.netflix.curator.utils.ZKPaths; import java.io.IOException; -import java.util.Map; public class CuratorDataSegmentAnnouncer implements DataSegmentAnnouncer { @@ -100,14 +97,16 @@ public class CuratorDataSegmentAnnouncer implements DataSegmentAnnouncer public void announceSegment(DataSegment segment) throws IOException { - log.info("Announcing realtime segment %s", segment.getIdentifier()); - announcer.announce(makeServedSegmentPath(segment), jsonMapper.writeValueAsBytes(segment)); + final String path = makeServedSegmentPath(segment); + log.info("Announcing segment[%s] to path[%s]", segment.getIdentifier(), path); + announcer.announce(path, jsonMapper.writeValueAsBytes(segment)); } public void unannounceSegment(DataSegment segment) throws IOException { - log.info("Unannouncing realtime segment %s", segment.getIdentifier()); - announcer.unannounce(makeServedSegmentPath(segment)); + final String path = makeServedSegmentPath(segment); + log.info("Unannouncing segment[%s] at path[%s]", segment.getIdentifier(), path); + announcer.unannounce(path); } private String makeAnnouncementPath() { diff --git a/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java b/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java index 6915aaf79d0..57e4ad6b0ee 100644 --- a/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java +++ b/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java @@ -190,6 +190,7 @@ public class Announcer */ public void unannounce(String path) { + log.info("unannouncing [%s]", path); final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path); final String parentPath = pathAndNode.getPath(); @@ -202,5 +203,12 @@ public class Announcer if (subPaths.remove(pathAndNode.getNode()) == null) { throw new IAE("Path[%s] not announced, cannot unannounce.", path); } + + try { + curator.delete().guaranteed().forPath(path); + } + catch (Exception e) { + throw Throwables.propagate(e); + } } } diff --git a/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java b/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java index 71f06e71456..2e6eba3836e 100644 --- a/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java +++ b/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java @@ -19,6 +19,7 @@ package com.metamx.druid.curator.inventory; +import com.google.common.base.Charsets; import com.google.common.base.Function; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -30,10 +31,13 @@ import com.metamx.druid.curator.ShutdownNowIgnoringExecutorService; import com.metamx.druid.curator.cache.PathChildrenCacheFactory; import com.metamx.druid.curator.cache.SimplePathChildrenCacheFactory; import com.netflix.curator.framework.CuratorFramework; +import com.netflix.curator.framework.CuratorFrameworkFactory; import com.netflix.curator.framework.recipes.cache.ChildData; import com.netflix.curator.framework.recipes.cache.PathChildrenCache; import com.netflix.curator.framework.recipes.cache.PathChildrenCacheEvent; import com.netflix.curator.framework.recipes.cache.PathChildrenCacheListener; +import com.netflix.curator.retry.RetryOneTime; +import com.netflix.curator.utils.ZKPaths; import java.io.IOException; import java.util.concurrent.ConcurrentMap; @@ -189,39 +193,36 @@ public class CuratorInventoryManager private class ContainerCacheListener implements PathChildrenCacheListener { @Override - public void childEvent( - CuratorFramework client, PathChildrenCacheEvent event - ) throws Exception + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { - final ChildData containerChild = event.getData(); - final String containerKey = containerChild.getPath().substring(config.getInventoryPath().length()); + final ChildData child = event.getData(); + final String containerKey = ZKPaths.getNodeFromPath(child.getPath()); final ContainerClass container; switch (event.getType()) { case CHILD_ADDED: - container = strategy.deserializeContainer(containerChild.getData()); + container = strategy.deserializeContainer(child.getData()); // This would normally be a race condition, but the only thing that should be mutating the containers // map is this listener, which should never run concurrently. If the same container is going to disappear // and come back, we expect a removed event in between. if (containers.containsKey(containerKey)) { - log.error("New node[%s] but there was already one. That's not good, ignoring new one.", containerChild.getPath()); + log.error("New node[%s] but there was already one. That's not good, ignoring new one.", child.getPath()); } - final String inventoryPath = String.format("%s/%s", config.getContainerPath(), containerKey); - log.info("Creating listener on inventory[%s]", inventoryPath); - PathChildrenCache containerCache = cacheFactory.make(curatorFramework, inventoryPath); - containerCache.getListenable().addListener(new InventoryCacheListener(containerKey, inventoryPath)); + final String inventoryPath = String.format("%s/%s", config.getInventoryPath(), containerKey); + PathChildrenCache inventoryCache = cacheFactory.make(curatorFramework, inventoryPath); + inventoryCache.getListenable().addListener(new InventoryCacheListener(containerKey, inventoryPath)); - containers.put(containerKey, new ContainerHolder(container, containerCache)); + containers.put(containerKey, new ContainerHolder(container, inventoryCache)); - containerCache.start(); + inventoryCache.start(); break; case CHILD_REMOVED: final ContainerHolder removed = containers.remove(containerKey); if (removed == null) { - log.warn("Container[%s] removed that wasn't a container!?", containerChild.getPath()); + log.warn("Container[%s] removed that wasn't a container!?", child.getPath()); break; } @@ -232,11 +233,11 @@ public class CuratorInventoryManager break; case CHILD_UPDATED: - container = strategy.deserializeContainer(containerChild.getData()); + container = strategy.deserializeContainer(child.getData()); ContainerHolder oldContainer = containers.get(containerKey); if (oldContainer == null) { - log.warn("Container update[%s], but the old container didn't exist!? Ignoring.", containerChild.getPath()); + log.warn("Container update[%s], but the old container didn't exist!? Ignoring.", child.getPath()); } else { synchronized (oldContainer) { @@ -245,8 +246,6 @@ public class CuratorInventoryManager } break; - default: - log.info("Got event[%s] at containerPath[%s]", config.getInventoryPath()); } } @@ -262,24 +261,22 @@ public class CuratorInventoryManager } @Override - public void childEvent( - CuratorFramework client, PathChildrenCacheEvent event - ) throws Exception + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { - final ChildData inventoryChild = event.getData(); + final ChildData child = event.getData(); final ContainerHolder holder = containers.get(containerKey); if (holder == null) { - log.error("Inventory[%s] change for non-existent container[%s]!?", inventoryChild.getPath()); + log.error("Inventory[%s] change for non-existent container[%s]!?", child.getPath()); return; } - final String inventoryKey = inventoryChild.getPath().substring(inventoryPath.length()); + final String inventoryKey = ZKPaths.getNodeFromPath(child.getPath()); switch (event.getType()) { case CHILD_ADDED: case CHILD_UPDATED: - final InventoryClass inventory = strategy.deserializeInventory(inventoryChild.getData()); + final InventoryClass inventory = strategy.deserializeInventory(child.getData()); synchronized (holder) { holder.setContainer(strategy.addInventory(holder.getContainer(), inventoryKey, inventory)); diff --git a/index-common/src/main/java/com/metamx/druid/index/serde/ComplexColumnPartSerde.java b/index-common/src/main/java/com/metamx/druid/index/serde/ComplexColumnPartSerde.java index 264294950bc..b5ba4456e03 100644 --- a/index-common/src/main/java/com/metamx/druid/index/serde/ComplexColumnPartSerde.java +++ b/index-common/src/main/java/com/metamx/druid/index/serde/ComplexColumnPartSerde.java @@ -75,6 +75,6 @@ public class ComplexColumnPartSerde implements ColumnPartSerde @Override public ColumnPartSerde read(ByteBuffer buffer, ColumnBuilder builder) { - return serde.deserializeColumn(buffer, builder); + return serde == null ? this : serde.deserializeColumn(buffer, builder); } }