1) Nicer logging

2) Announcer should actually delete its zk entries when it is told to unannounce something
3) Skip over complex columns that we don't have a proper deserializer for
This commit is contained in:
cheddar 2013-04-24 17:26:27 -05:00
parent b8ba9138ff
commit 9153cc94ac
4 changed files with 37 additions and 33 deletions

View File

@ -22,18 +22,15 @@ package com.metamx.druid.coordination;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.netflix.curator.utils.ZKPaths;
import java.io.IOException;
import java.util.Map;
public class CuratorDataSegmentAnnouncer implements DataSegmentAnnouncer
{
@ -100,14 +97,16 @@ public class CuratorDataSegmentAnnouncer implements DataSegmentAnnouncer
public void announceSegment(DataSegment segment) throws IOException
{
log.info("Announcing realtime segment %s", segment.getIdentifier());
announcer.announce(makeServedSegmentPath(segment), jsonMapper.writeValueAsBytes(segment));
final String path = makeServedSegmentPath(segment);
log.info("Announcing segment[%s] to path[%s]", segment.getIdentifier(), path);
announcer.announce(path, jsonMapper.writeValueAsBytes(segment));
}
public void unannounceSegment(DataSegment segment) throws IOException
{
log.info("Unannouncing realtime segment %s", segment.getIdentifier());
announcer.unannounce(makeServedSegmentPath(segment));
final String path = makeServedSegmentPath(segment);
log.info("Unannouncing segment[%s] at path[%s]", segment.getIdentifier(), path);
announcer.unannounce(path);
}
private String makeAnnouncementPath() {

View File

@ -190,6 +190,7 @@ public class Announcer
*/
public void unannounce(String path)
{
log.info("unannouncing [%s]", path);
final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
final String parentPath = pathAndNode.getPath();
@ -202,5 +203,12 @@ public class Announcer
if (subPaths.remove(pathAndNode.getNode()) == null) {
throw new IAE("Path[%s] not announced, cannot unannounce.", path);
}
try {
curator.delete().guaranteed().forPath(path);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}

View File

@ -19,6 +19,7 @@
package com.metamx.druid.curator.inventory;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@ -30,10 +31,13 @@ import com.metamx.druid.curator.ShutdownNowIgnoringExecutorService;
import com.metamx.druid.curator.cache.PathChildrenCacheFactory;
import com.metamx.druid.curator.cache.SimplePathChildrenCacheFactory;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.framework.recipes.cache.ChildData;
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
import com.netflix.curator.framework.recipes.cache.PathChildrenCacheEvent;
import com.netflix.curator.framework.recipes.cache.PathChildrenCacheListener;
import com.netflix.curator.retry.RetryOneTime;
import com.netflix.curator.utils.ZKPaths;
import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
@ -189,39 +193,36 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
private class ContainerCacheListener implements PathChildrenCacheListener
{
@Override
public void childEvent(
CuratorFramework client, PathChildrenCacheEvent event
) throws Exception
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{
final ChildData containerChild = event.getData();
final String containerKey = containerChild.getPath().substring(config.getInventoryPath().length());
final ChildData child = event.getData();
final String containerKey = ZKPaths.getNodeFromPath(child.getPath());
final ContainerClass container;
switch (event.getType()) {
case CHILD_ADDED:
container = strategy.deserializeContainer(containerChild.getData());
container = strategy.deserializeContainer(child.getData());
// This would normally be a race condition, but the only thing that should be mutating the containers
// map is this listener, which should never run concurrently. If the same container is going to disappear
// and come back, we expect a removed event in between.
if (containers.containsKey(containerKey)) {
log.error("New node[%s] but there was already one. That's not good, ignoring new one.", containerChild.getPath());
log.error("New node[%s] but there was already one. That's not good, ignoring new one.", child.getPath());
}
final String inventoryPath = String.format("%s/%s", config.getContainerPath(), containerKey);
log.info("Creating listener on inventory[%s]", inventoryPath);
PathChildrenCache containerCache = cacheFactory.make(curatorFramework, inventoryPath);
containerCache.getListenable().addListener(new InventoryCacheListener(containerKey, inventoryPath));
final String inventoryPath = String.format("%s/%s", config.getInventoryPath(), containerKey);
PathChildrenCache inventoryCache = cacheFactory.make(curatorFramework, inventoryPath);
inventoryCache.getListenable().addListener(new InventoryCacheListener(containerKey, inventoryPath));
containers.put(containerKey, new ContainerHolder(container, containerCache));
containers.put(containerKey, new ContainerHolder(container, inventoryCache));
containerCache.start();
inventoryCache.start();
break;
case CHILD_REMOVED:
final ContainerHolder removed = containers.remove(containerKey);
if (removed == null) {
log.warn("Container[%s] removed that wasn't a container!?", containerChild.getPath());
log.warn("Container[%s] removed that wasn't a container!?", child.getPath());
break;
}
@ -232,11 +233,11 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
break;
case CHILD_UPDATED:
container = strategy.deserializeContainer(containerChild.getData());
container = strategy.deserializeContainer(child.getData());
ContainerHolder oldContainer = containers.get(containerKey);
if (oldContainer == null) {
log.warn("Container update[%s], but the old container didn't exist!? Ignoring.", containerChild.getPath());
log.warn("Container update[%s], but the old container didn't exist!? Ignoring.", child.getPath());
}
else {
synchronized (oldContainer) {
@ -245,8 +246,6 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
}
break;
default:
log.info("Got event[%s] at containerPath[%s]", config.getInventoryPath());
}
}
@ -262,24 +261,22 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
}
@Override
public void childEvent(
CuratorFramework client, PathChildrenCacheEvent event
) throws Exception
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{
final ChildData inventoryChild = event.getData();
final ChildData child = event.getData();
final ContainerHolder holder = containers.get(containerKey);
if (holder == null) {
log.error("Inventory[%s] change for non-existent container[%s]!?", inventoryChild.getPath());
log.error("Inventory[%s] change for non-existent container[%s]!?", child.getPath());
return;
}
final String inventoryKey = inventoryChild.getPath().substring(inventoryPath.length());
final String inventoryKey = ZKPaths.getNodeFromPath(child.getPath());
switch (event.getType()) {
case CHILD_ADDED:
case CHILD_UPDATED:
final InventoryClass inventory = strategy.deserializeInventory(inventoryChild.getData());
final InventoryClass inventory = strategy.deserializeInventory(child.getData());
synchronized (holder) {
holder.setContainer(strategy.addInventory(holder.getContainer(), inventoryKey, inventory));

View File

@ -75,6 +75,6 @@ public class ComplexColumnPartSerde implements ColumnPartSerde
@Override
public ColumnPartSerde read(ByteBuffer buffer, ColumnBuilder builder)
{
return serde.deserializeColumn(buffer, builder);
return serde == null ? this : serde.deserializeColumn(buffer, builder);
}
}