1) Make druid.zk.paths.base required

2) Unit tests for announcer and fix race condition with session death
This commit is contained in:
cheddar 2013-04-26 16:28:57 -05:00
parent 163a73bed1
commit 046647c203
4 changed files with 238 additions and 13 deletions

View File

@ -171,6 +171,14 @@
<groupId>com.google.code.simple-spring-memcached</groupId> <groupId>com.google.code.simple-spring-memcached</groupId>
<artifactId>spymemcached</artifactId> <artifactId>spymemcached</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</dependency>
<!-- Tests --> <!-- Tests -->
<dependency> <dependency>
@ -189,13 +197,11 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.antlr</groupId> <groupId>org.apache.curator</groupId>
<artifactId>antlr4-runtime</artifactId> <artifactId>curator-test</artifactId>
</dependency> <scope>test</scope>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</dependency> </dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -3,6 +3,7 @@ package com.metamx.druid.curator.announcement;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker; import com.google.common.collect.MapMaker;
import com.google.common.collect.Sets;
import com.google.common.io.Closeables; import com.google.common.io.Closeables;
import com.metamx.common.IAE; import com.metamx.common.IAE;
import com.metamx.common.Pair; import com.metamx.common.Pair;
@ -10,6 +11,8 @@ import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.curator.ShutdownNowIgnoringExecutorService; import com.metamx.druid.curator.ShutdownNowIgnoringExecutorService;
import com.metamx.druid.curator.cache.PathChildrenCacheFactory;
import com.metamx.druid.curator.cache.SimplePathChildrenCacheFactory;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCache;
@ -17,20 +20,24 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths; import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
/** /**
* Announces things on Zookeeper.
*/ */
public class Announcer public class Announcer
{ {
private static final Logger log = new Logger(Announcer.class); private static final Logger log = new Logger(Announcer.class);
private final CuratorFramework curator; private final CuratorFramework curator;
private final ExecutorService exec; private final PathChildrenCacheFactory factory;
private final List<Pair<String, byte[]>> toAnnounce = Lists.newArrayList(); private final List<Pair<String, byte[]>> toAnnounce = Lists.newArrayList();
private final ConcurrentMap<String, PathChildrenCache> listeners = new MapMaker().makeMap(); private final ConcurrentMap<String, PathChildrenCache> listeners = new MapMaker().makeMap();
@ -44,7 +51,7 @@ public class Announcer
) )
{ {
this.curator = curator; this.curator = curator;
this.exec = new ShutdownNowIgnoringExecutorService(exec); this.factory = new SimplePathChildrenCacheFactory(false, true, new ShutdownNowIgnoringExecutorService(exec));
} }
@LifecycleStart @LifecycleStart
@ -88,6 +95,13 @@ public class Announcer
} }
} }
/**
* Announces the provided bytes at the given path. Announcement means that it will create an ephemeral node
* and monitor it to make sure that it always exists until it is unannounced or this object is closed.
*
* @param path The path to announce at
* @param bytes The payload to announce
*/
public void announce(String path, byte[] bytes) public void announce(String path, byte[] bytes)
{ {
synchronized (toAnnounce) { synchronized (toAnnounce) {
@ -114,13 +128,16 @@ public class Announcer
// Synchronize to make sure that I only create a listener once. // Synchronize to make sure that I only create a listener once.
synchronized (finalSubPaths) { synchronized (finalSubPaths) {
if (! listeners.containsKey(parentPath)) { if (! listeners.containsKey(parentPath)) {
PathChildrenCache cache = new PathChildrenCache(curator, parentPath, true, false, exec); final PathChildrenCache cache = factory.make(curator, parentPath);
cache.getListenable().addListener( cache.getListenable().addListener(
new PathChildrenCacheListener() new PathChildrenCacheListener()
{ {
private final AtomicReference<Set<String>> pathsLost = new AtomicReference<Set<String>>(null);
@Override @Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{ {
log.info("Path[%s] got event[%s]", parentPath, event);
switch (event.getType()) { switch (event.getType()) {
case CHILD_REMOVED: case CHILD_REMOVED:
final ChildData child = event.getData(); final ChildData child = event.getData();
@ -130,6 +147,40 @@ public class Announcer
log.info("Node[%s] dropped, reinstating.", child.getPath()); log.info("Node[%s] dropped, reinstating.", child.getPath());
createAnnouncement(child.getPath(), value); createAnnouncement(child.getPath(), value);
} }
break;
case CONNECTION_LOST:
// Lost connection, which means session is broken, take inventory of what has been seen.
// This is to protect from a race condition in which the ephemeral node could have been
// created but not actually seen by the PathChildrenCache, which means that it won't know
// that it disappeared and thus will not generate a CHILD_REMOVED event for us. Under normal
// circumstances, this can only happen upon connection loss; but technically if you have
// an adversary in the system, they could also delete the ephemeral node before the cache sees
// it. This does not protect from that case, so don't have adversaries.
Set<String> pathsToReinstate = Sets.newHashSet();
for (String node : finalSubPaths.keySet()) {
pathsToReinstate.add(ZKPaths.makePath(parentPath, node));
}
for (ChildData data : cache.getCurrentData()) {
pathsToReinstate.remove(data.getPath());
}
if (!pathsToReinstate.isEmpty() && !pathsLost.compareAndSet(null, pathsToReinstate)) {
log.info("Already had a pathsLost set!?[%s]", parentPath);
}
break;
case CONNECTION_RECONNECTED:
final Set<String> thePathsLost = pathsLost.getAndSet(null);
if (thePathsLost != null) {
for (String path : thePathsLost) {
log.info("Reinstating [%s]", path);
final ZKPaths.PathAndNode split = ZKPaths.getPathAndNode(path);
createAnnouncement(path, announcements.get(split.getPath()).get(split.getNode()));
}
}
break;
} }
} }
} }
@ -207,6 +258,9 @@ public class Announcer
try { try {
curator.delete().guaranteed().forPath(path); curator.delete().guaranteed().forPath(path);
} }
catch (KeeperException.NoNodeException e) {
log.info("node[%s] didn't exist anyway...", path);
}
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);
} }

View File

@ -24,10 +24,7 @@ import org.skife.config.Config;
public abstract class ZkPathsConfig public abstract class ZkPathsConfig
{ {
@Config("druid.zk.paths.base") @Config("druid.zk.paths.base")
public String getZkBasePath() public abstract String getZkBasePath();
{
return "/druid";
}
@Config("druid.zk.paths.propertiesPath") @Config("druid.zk.paths.propertiesPath")
public String getPropertiesPath() public String getPropertiesPath()

View File

@ -0,0 +1,168 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.curator.announcement;
import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
import com.metamx.druid.concurrent.Execs;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.KillSession;
import org.apache.curator.test.TestingServer;
import org.apache.curator.test.Timing;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
/**
*/
public class AnnouncerTest
{
private TestingServer server;
@Before
public void setUp() throws Exception
{
server = new TestingServer();
}
@Test
public void testSanity() throws Exception
{
Timing timing = new Timing();
CuratorFramework curator = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
final ExecutorService exec = Execs.singleThreaded("test-announcer-sanity-%s");
curator.start();
try {
curator.create().forPath("/somewhere");
Announcer announcer = new Announcer(curator, exec);
final byte[] billy = "billy".getBytes();
final String testPath1 = "/test1";
final String testPath2 = "/somewhere/test2";
announcer.announce(testPath1, billy);
Assert.assertNull(curator.checkExists().forPath(testPath1));
Assert.assertNull(curator.checkExists().forPath(testPath2));
announcer.start();
Assert.assertArrayEquals(billy, curator.getData().forPath(testPath1));
Assert.assertNull(curator.checkExists().forPath(testPath2));
announcer.announce(testPath2, billy);
Assert.assertArrayEquals(billy, curator.getData().forPath(testPath1));
Assert.assertArrayEquals(billy, curator.getData().forPath(testPath2));
curator.delete().forPath(testPath1);
Thread.sleep(20); // Give the announcer time to notice
Assert.assertArrayEquals(billy, curator.getData().forPath(testPath1));
Assert.assertArrayEquals(billy, curator.getData().forPath(testPath2));
announcer.unannounce(testPath1);
Assert.assertNull(curator.checkExists().forPath(testPath1));
Assert.assertArrayEquals(billy, curator.getData().forPath(testPath2));
announcer.stop();
Assert.assertNull(curator.checkExists().forPath(testPath1));
Assert.assertNull(curator.checkExists().forPath(testPath2));
}
finally {
Closeables.closeQuietly(curator);
}
}
@Test
public void testSessionKilled() throws Exception
{
Timing timing = new Timing();
CuratorFramework curator = CuratorFrameworkFactory
.builder()
.connectString(server.getConnectString())
.sessionTimeoutMs(timing.session())
.connectionTimeoutMs(timing.connection())
.retryPolicy(new RetryOneTime(1))
.build();
final ExecutorService exec = Execs.singleThreaded("test-announcer-sanity-%s");
curator.start();
Announcer announcer = new Announcer(curator, exec);
try {
curator.create().forPath("/somewhere");
announcer.start();
final byte[] billy = "billy".getBytes();
final String testPath1 = "/test1";
final String testPath2 = "/somewhere/test2";
final Set<String> paths = Sets.newHashSet(testPath1, testPath2);
announcer.announce(testPath1, billy);
announcer.announce(testPath2, billy);
Assert.assertArrayEquals(billy, curator.getData().forPath(testPath1));
Assert.assertArrayEquals(billy, curator.getData().forPath(testPath2));
final CountDownLatch latch = new CountDownLatch(1);
curator.getCuratorListenable().addListener(
new CuratorListener()
{
@Override
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception
{
if (event.getType() == CuratorEventType.CREATE) {
paths.remove(event.getPath());
if (paths.isEmpty()) {
latch.countDown();
}
}
}
}
);
KillSession.kill(curator.getZookeeperClient().getZooKeeper(), server.getConnectString());
timing.awaitLatch(latch);
Assert.assertArrayEquals(billy, curator.getData().forPath(testPath1));
Assert.assertArrayEquals(billy, curator.getData().forPath(testPath2));
announcer.stop();
Assert.assertNull(curator.checkExists().forPath(testPath1));
Assert.assertNull(curator.checkExists().forPath(testPath2));
}
finally {
announcer.stop();
Closeables.closeQuietly(curator);
}
}
}