1) Adjust the Announcer to cleanup parent nodes that it creates. This stops it from leaving little turdlings lying around on zookeeper as nodes exit

2) Rename *CuratorDataSegmentAnnouncer because neither of them depend directly on Curator.
This commit is contained in:
cheddar 2013-07-22 16:31:01 -07:00
parent 5d96f6dc99
commit 7504f3ab65
8 changed files with 116 additions and 32 deletions

View File

@ -40,11 +40,11 @@ import com.metamx.druid.client.ServerInventoryViewConfig;
import com.metamx.druid.client.ServerView; import com.metamx.druid.client.ServerView;
import com.metamx.druid.concurrent.Execs; import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.coordination.AbstractDataSegmentAnnouncer; import com.metamx.druid.coordination.AbstractDataSegmentAnnouncer;
import com.metamx.druid.coordination.BatchingCuratorDataSegmentAnnouncer; import com.metamx.druid.coordination.BatchingDataSegmentAnnouncer;
import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer;
import com.metamx.druid.coordination.DataSegmentAnnouncer; import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.coordination.DruidServerMetadata; import com.metamx.druid.coordination.DruidServerMetadata;
import com.metamx.druid.coordination.MultipleDataSegmentAnnouncerDataSegmentAnnouncer; import com.metamx.druid.coordination.MultipleDataSegmentAnnouncerDataSegmentAnnouncer;
import com.metamx.druid.coordination.SingleDataSegmentAnnouncer;
import com.metamx.druid.curator.announcement.Announcer; import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.http.NoopRequestLogger; import com.metamx.druid.http.NoopRequestLogger;
import com.metamx.druid.http.RequestLogger; import com.metamx.druid.http.RequestLogger;
@ -431,13 +431,13 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
setAnnouncer( setAnnouncer(
new MultipleDataSegmentAnnouncerDataSegmentAnnouncer( new MultipleDataSegmentAnnouncerDataSegmentAnnouncer(
Arrays.<AbstractDataSegmentAnnouncer>asList( Arrays.<AbstractDataSegmentAnnouncer>asList(
new BatchingCuratorDataSegmentAnnouncer( new BatchingDataSegmentAnnouncer(
getDruidServerMetadata(), getDruidServerMetadata(),
getConfigFactory().build(ZkDataSegmentAnnouncerConfig.class), getConfigFactory().build(ZkDataSegmentAnnouncerConfig.class),
announcer, announcer,
getJsonMapper() getJsonMapper()
), ),
new CuratorDataSegmentAnnouncer(getDruidServerMetadata(), getZkPaths(), announcer, getJsonMapper()) new SingleDataSegmentAnnouncer(getDruidServerMetadata(), getZkPaths(), announcer, getJsonMapper())
) )
) )
); );

View File

@ -86,7 +86,7 @@ public abstract class AbstractDataSegmentAnnouncer implements DataSegmentAnnounc
return; return;
} }
log.info("Stopping CuratorDataSegmentAnnouncer with config[%s]", config); log.info("Stopping %s with config[%s]", getClass(), config);
announcer.unannounce(makeAnnouncementPath()); announcer.unannounce(makeAnnouncementPath());
started = false; started = false;

View File

@ -22,33 +22,26 @@ package com.metamx.druid.coordination;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.curator.announcement.Announcer; import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig; import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig;
import com.metamx.druid.initialization.ZkPathsConfig;
import org.apache.curator.utils.ZKPaths; import org.apache.curator.utils.ZKPaths;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import java.io.IOException; import java.io.IOException;
import java.util.Comparator;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/** /**
*/ */
public class BatchingCuratorDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer public class BatchingDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
{ {
private static final Logger log = new Logger(BatchingCuratorDataSegmentAnnouncer.class); private static final Logger log = new Logger(BatchingDataSegmentAnnouncer.class);
private final ZkDataSegmentAnnouncerConfig config; private final ZkDataSegmentAnnouncerConfig config;
private final Announcer announcer; private final Announcer announcer;
@ -58,7 +51,7 @@ public class BatchingCuratorDataSegmentAnnouncer extends AbstractDataSegmentAnno
private final Set<SegmentZNode> availableZNodes = Sets.newHashSet(); private final Set<SegmentZNode> availableZNodes = Sets.newHashSet();
private final Map<DataSegment, SegmentZNode> segmentLookup = Maps.newHashMap(); private final Map<DataSegment, SegmentZNode> segmentLookup = Maps.newHashMap();
public BatchingCuratorDataSegmentAnnouncer( public BatchingDataSegmentAnnouncer(
DruidServerMetadata server, DruidServerMetadata server,
ZkDataSegmentAnnouncerConfig config, ZkDataSegmentAnnouncerConfig config,
Announcer announcer, Announcer announcer,

View File

@ -28,15 +28,15 @@ import org.apache.curator.utils.ZKPaths;
import java.io.IOException; import java.io.IOException;
public class CuratorDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer public class SingleDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
{ {
private static final Logger log = new Logger(CuratorDataSegmentAnnouncer.class); private static final Logger log = new Logger(SingleDataSegmentAnnouncer.class);
private final Announcer announcer; private final Announcer announcer;
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final String servedSegmentsLocation; private final String servedSegmentsLocation;
public CuratorDataSegmentAnnouncer( public SingleDataSegmentAnnouncer(
DruidServerMetadata server, DruidServerMetadata server,
ZkPathsConfig config, ZkPathsConfig config,
Announcer announcer, Announcer announcer,

View File

@ -48,6 +48,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -64,6 +65,7 @@ public class Announcer
private final List<Pair<String, byte[]>> toAnnounce = Lists.newArrayList(); private final List<Pair<String, byte[]>> toAnnounce = Lists.newArrayList();
private final ConcurrentMap<String, PathChildrenCache> listeners = new MapMaker().makeMap(); private final ConcurrentMap<String, PathChildrenCache> listeners = new MapMaker().makeMap();
private final ConcurrentMap<String, ConcurrentMap<String, byte[]>> announcements = new MapMaker().makeMap(); private final ConcurrentMap<String, ConcurrentMap<String, byte[]>> announcements = new MapMaker().makeMap();
private final List<String> parentsIBuilt = new CopyOnWriteArrayList<String>();
private boolean started = false; private boolean started = false;
@ -114,6 +116,15 @@ public class Announcer
unannounce(ZKPaths.makePath(basePath, announcementPath)); unannounce(ZKPaths.makePath(basePath, announcementPath));
} }
} }
for (String parent : parentsIBuilt) {
try {
curator.delete().forPath(parent);
}
catch (Exception e) {
log.info(e, "Unable to delete parent[%s], boooo.", parent);
}
}
} }
} }
@ -136,10 +147,19 @@ public class Announcer
final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path); final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
final String parentPath = pathAndNode.getPath(); final String parentPath = pathAndNode.getPath();
boolean buildParentPath = false;
ConcurrentMap<String, byte[]> subPaths = announcements.get(parentPath); ConcurrentMap<String, byte[]> subPaths = announcements.get(parentPath);
if (subPaths == null) { if (subPaths == null) {
try {
if (curator.checkExists().forPath(parentPath) == null) {
buildParentPath = true;
}
}
catch (Exception e) {
log.debug(e, "Problem checking if the parent existed, ignoring.");
}
// I don't have a watcher on this path yet, create a Map and start watching. // I don't have a watcher on this path yet, create a Map and start watching.
announcements.putIfAbsent(parentPath, new MapMaker().<String, byte[]>makeMap()); announcements.putIfAbsent(parentPath, new MapMaker().<String, byte[]>makeMap());
@ -208,18 +228,16 @@ public class Announcer
} }
); );
try {
synchronized (toAnnounce) { synchronized (toAnnounce) {
if (started) { if (started) {
cache.start(); if (buildParentPath) {
createPath(parentPath);
}
startCache(cache);
listeners.put(parentPath, cache); listeners.put(parentPath, cache);
} }
} }
} }
catch (Exception e) {
throw Throwables.propagate(e);
}
}
} }
subPaths = finalSubPaths; subPaths = finalSubPaths;
@ -261,7 +279,7 @@ public class Announcer
throw new ISE("Cannot update a path[%s] that hasn't been announced!", path); throw new ISE("Cannot update a path[%s] that hasn't been announced!", path);
} }
synchronized (subPaths) { synchronized (toAnnounce) {
try { try {
byte[] oldBytes = subPaths.get(nodePath); byte[] oldBytes = subPaths.get(nodePath);
@ -320,4 +338,26 @@ public class Announcer
throw Throwables.propagate(e); throw Throwables.propagate(e);
} }
} }
private void startCache(PathChildrenCache cache)
{
try {
cache.start();
}
catch (Exception e) {
Closeables.closeQuietly(cache);
throw Throwables.propagate(e);
}
}
private void createPath(String parentPath)
{
try {
curator.create().creatingParentsIfNeeded().forPath(parentPath);
parentsIBuilt.add(parentPath);
}
catch (Exception e) {
log.info(e, "Problem creating parentPath[%s], someone else created it first?", parentPath);
}
}
} }

View File

@ -47,7 +47,7 @@ import java.util.Set;
/** /**
*/ */
public class BatchingCuratorDataSegmentAnnouncerTest public class BatchingDataSegmentAnnouncerTest
{ {
private static final String testBasePath = "/test"; private static final String testBasePath = "/test";
private static final String testSegmentsPath = "/test/segments/id"; private static final String testSegmentsPath = "/test/segments/id";
@ -58,7 +58,7 @@ public class BatchingCuratorDataSegmentAnnouncerTest
private ObjectMapper jsonMapper; private ObjectMapper jsonMapper;
private Announcer announcer; private Announcer announcer;
private SegmentReader segmentReader; private SegmentReader segmentReader;
private BatchingCuratorDataSegmentAnnouncer segmentAnnouncer; private BatchingDataSegmentAnnouncer segmentAnnouncer;
private Set<DataSegment> testSegments; private Set<DataSegment> testSegments;
@Before @Before
@ -84,7 +84,7 @@ public class BatchingCuratorDataSegmentAnnouncerTest
announcer.start(); announcer.start();
segmentReader = new SegmentReader(cf, jsonMapper); segmentReader = new SegmentReader(cf, jsonMapper);
segmentAnnouncer = new BatchingCuratorDataSegmentAnnouncer( segmentAnnouncer = new BatchingDataSegmentAnnouncer(
new DruidServerMetadata( new DruidServerMetadata(
"id", "id",
"host", "host",

View File

@ -27,6 +27,8 @@ import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType; import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.api.CuratorListener; import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.test.KillSession; import org.apache.curator.test.KillSession;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.data.Stat;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -60,7 +62,6 @@ public class AnnouncerTest extends CuratorTestBase
public void testSanity() throws Exception public void testSanity() throws Exception
{ {
curator.start(); curator.start();
curator.create().forPath("/somewhere");
Announcer announcer = new Announcer(curator, exec); Announcer announcer = new Announcer(curator, exec);
final byte[] billy = "billy".getBytes(); final byte[] billy = "billy".getBytes();
@ -163,4 +164,54 @@ public class AnnouncerTest extends CuratorTestBase
announcer.stop(); announcer.stop();
} }
} }
@Test
public void testCleansUpItsLittleTurdlings() throws Exception
{
curator.start();
Announcer announcer = new Announcer(curator, exec);
final byte[] billy = "billy".getBytes();
final String testPath = "/somewhere/test2";
final String parent = ZKPaths.getPathAndNode(testPath).getPath();
announcer.start();
Assert.assertNull(curator.checkExists().forPath(parent));
announcer.announce(testPath, billy);
Assert.assertNotNull(curator.checkExists().forPath(parent));
announcer.stop();
Assert.assertNull(curator.checkExists().forPath(parent));
}
@Test
public void testLeavesBehindTurdlingsThatAlreadyExisted() throws Exception
{
curator.start();
Announcer announcer = new Announcer(curator, exec);
final byte[] billy = "billy".getBytes();
final String testPath = "/somewhere/test2";
final String parent = ZKPaths.getPathAndNode(testPath).getPath();
curator.create().forPath(parent);
final Stat initialStat = curator.checkExists().forPath(parent);
announcer.start();
Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid());
announcer.announce(testPath, billy);
Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid());
announcer.stop();
Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid());
}
} }

View File

@ -93,7 +93,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
} }
}; };
announcer = new CuratorDataSegmentAnnouncer( announcer = new SingleDataSegmentAnnouncer(
me, zkPaths, new Announcer(curator, Execs.singleThreaded("blah")), jsonMapper me, zkPaths, new Announcer(curator, Execs.singleThreaded("blah")), jsonMapper
); );