Merge branch 'master' into batch-zk

Conflicts:
	client/src/main/java/com/metamx/druid/QueryableNode.java
This commit is contained in:
fjy 2013-07-23 10:14:26 -07:00
commit ea7c7d1d66
15 changed files with 183 additions and 62 deletions

View File

@ -43,11 +43,11 @@ import com.metamx.druid.client.ServerView;
import com.metamx.druid.client.SingleServerInventoryView; import com.metamx.druid.client.SingleServerInventoryView;
import com.metamx.druid.concurrent.Execs; import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.coordination.AbstractDataSegmentAnnouncer; import com.metamx.druid.coordination.AbstractDataSegmentAnnouncer;
import com.metamx.druid.coordination.BatchingCuratorDataSegmentAnnouncer; import com.metamx.druid.coordination.BatchDataSegmentAnnouncer;
import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer;
import com.metamx.druid.coordination.DataSegmentAnnouncer; import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.coordination.DruidServerMetadata; import com.metamx.druid.coordination.DruidServerMetadata;
import com.metamx.druid.coordination.MultipleDataSegmentAnnouncerDataSegmentAnnouncer; import com.metamx.druid.coordination.MultipleDataSegmentAnnouncerDataSegmentAnnouncer;
import com.metamx.druid.coordination.SingleDataSegmentAnnouncer;
import com.metamx.druid.curator.announcement.Announcer; import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.http.NoopRequestLogger; import com.metamx.druid.http.NoopRequestLogger;
import com.metamx.druid.http.RequestLogger; import com.metamx.druid.http.RequestLogger;
@ -456,7 +456,7 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
final DataSegmentAnnouncer dataSegmentAnnouncer; final DataSegmentAnnouncer dataSegmentAnnouncer;
if ("batch".equalsIgnoreCase(announcerType)) { if ("batch".equalsIgnoreCase(announcerType)) {
dataSegmentAnnouncer = new BatchingCuratorDataSegmentAnnouncer( dataSegmentAnnouncer = new BatchDataSegmentAnnouncer(
getDruidServerMetadata(), getDruidServerMetadata(),
config, config,
announcer, announcer,
@ -465,13 +465,13 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
} else if ("legacy".equalsIgnoreCase(announcerType)) { } else if ("legacy".equalsIgnoreCase(announcerType)) {
dataSegmentAnnouncer = new MultipleDataSegmentAnnouncerDataSegmentAnnouncer( dataSegmentAnnouncer = new MultipleDataSegmentAnnouncerDataSegmentAnnouncer(
Arrays.<AbstractDataSegmentAnnouncer>asList( Arrays.<AbstractDataSegmentAnnouncer>asList(
new BatchingCuratorDataSegmentAnnouncer( new BatchDataSegmentAnnouncer(
getDruidServerMetadata(), getDruidServerMetadata(),
config, config,
announcer, announcer,
getJsonMapper() getJsonMapper()
), ),
new CuratorDataSegmentAnnouncer( new SingleDataSegmentAnnouncer(
getDruidServerMetadata(), getDruidServerMetadata(),
getZkPaths(), getZkPaths(),
announcer, announcer,

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.MapMaker; import com.google.common.collect.MapMaker;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.druid.curator.inventory.InventoryManagerConfig;
import com.metamx.druid.initialization.ZkPathsConfig; import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
@ -49,9 +50,27 @@ public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegmen
) )
{ {
super( super(
config, zkPaths, curator, exec, jsonMapper, new TypeReference<Set<DataSegment>>() config,
{ new InventoryManagerConfig()
} {
@Override
public String getContainerPath()
{
return zkPaths.getAnnouncementsPath();
}
@Override
public String getInventoryPath()
{
return zkPaths.getLiveSegmentsPath();
}
},
curator,
exec,
jsonMapper,
new TypeReference<Set<DataSegment>>()
{
}
); );
} }
@ -85,6 +104,7 @@ public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegmen
for (DataSegment segment : Sets.difference(existing, inventory)) { for (DataSegment segment : Sets.difference(existing, inventory)) {
removeSingleInventory(container, segment.getIdentifier()); removeSingleInventory(container, segment.getIdentifier());
} }
zNodes.put(inventoryKey, inventory);
return container; return container;
} }

View File

@ -30,7 +30,6 @@ import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.curator.inventory.CuratorInventoryManager; import com.metamx.druid.curator.inventory.CuratorInventoryManager;
import com.metamx.druid.curator.inventory.CuratorInventoryManagerStrategy; import com.metamx.druid.curator.inventory.CuratorInventoryManagerStrategy;
import com.metamx.druid.curator.inventory.InventoryManagerConfig; import com.metamx.druid.curator.inventory.InventoryManagerConfig;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
@ -59,7 +58,7 @@ public abstract class ServerInventoryView<InventoryType> implements ServerView,
public ServerInventoryView( public ServerInventoryView(
final ServerInventoryViewConfig config, final ServerInventoryViewConfig config,
final ZkPathsConfig zkPaths, final InventoryManagerConfig inventoryManagerConfig,
final CuratorFramework curator, final CuratorFramework curator,
final ExecutorService exec, final ExecutorService exec,
final ObjectMapper jsonMapper, final ObjectMapper jsonMapper,
@ -69,20 +68,7 @@ public abstract class ServerInventoryView<InventoryType> implements ServerView,
this.config = config; this.config = config;
this.inventoryManager = new CuratorInventoryManager<DruidServer, InventoryType>( this.inventoryManager = new CuratorInventoryManager<DruidServer, InventoryType>(
curator, curator,
new InventoryManagerConfig() inventoryManagerConfig,
{
@Override
public String getContainerPath()
{
return zkPaths.getAnnouncementsPath();
}
@Override
public String getInventoryPath()
{
return zkPaths.getServedSegmentsPath();
}
},
exec, exec,
new CuratorInventoryManagerStrategy<DruidServer, InventoryType>() new CuratorInventoryManagerStrategy<DruidServer, InventoryType>()
{ {
@ -246,6 +232,11 @@ public abstract class ServerInventoryView<InventoryType> implements ServerView,
segmentCallbacks.put(callback, exec); segmentCallbacks.put(callback, exec);
} }
public InventoryManagerConfig getInventoryManagerConfig()
{
return inventoryManager.getConfig();
}
protected void runSegmentCallbacks( protected void runSegmentCallbacks(
final Function<SegmentCallback, CallbackAction> fn final Function<SegmentCallback, CallbackAction> fn
) )

View File

@ -21,6 +21,7 @@ package com.metamx.druid.client;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.druid.curator.inventory.InventoryManagerConfig;
import com.metamx.druid.initialization.ZkPathsConfig; import com.metamx.druid.initialization.ZkPathsConfig;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
@ -39,9 +40,27 @@ public class SingleServerInventoryView extends ServerInventoryView<DataSegment>
) )
{ {
super( super(
config, zkPaths, curator, exec, jsonMapper, new TypeReference<DataSegment>() config,
{ new InventoryManagerConfig()
} {
@Override
public String getContainerPath()
{
return zkPaths.getAnnouncementsPath();
}
@Override
public String getInventoryPath()
{
return zkPaths.getServedSegmentsPath();
}
},
curator,
exec,
jsonMapper,
new TypeReference<DataSegment>()
{
}
); );
} }

View File

@ -86,7 +86,7 @@ public abstract class AbstractDataSegmentAnnouncer implements DataSegmentAnnounc
return; return;
} }
log.info("Stopping CuratorDataSegmentAnnouncer with config[%s]", config); log.info("Stopping %s with config[%s]", getClass(), config);
announcer.unannounce(makeAnnouncementPath()); announcer.unannounce(makeAnnouncementPath());
started = false; started = false;

View File

@ -22,33 +22,26 @@ package com.metamx.druid.coordination;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.curator.announcement.Announcer; import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig; import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig;
import com.metamx.druid.initialization.ZkPathsConfig;
import org.apache.curator.utils.ZKPaths; import org.apache.curator.utils.ZKPaths;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import java.io.IOException; import java.io.IOException;
import java.util.Comparator;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/** /**
*/ */
public class BatchingCuratorDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
{ {
private static final Logger log = new Logger(BatchingCuratorDataSegmentAnnouncer.class); private static final Logger log = new Logger(BatchDataSegmentAnnouncer.class);
private final ZkDataSegmentAnnouncerConfig config; private final ZkDataSegmentAnnouncerConfig config;
private final Announcer announcer; private final Announcer announcer;
@ -58,7 +51,7 @@ public class BatchingCuratorDataSegmentAnnouncer extends AbstractDataSegmentAnno
private final Set<SegmentZNode> availableZNodes = Sets.newHashSet(); private final Set<SegmentZNode> availableZNodes = Sets.newHashSet();
private final Map<DataSegment, SegmentZNode> segmentLookup = Maps.newHashMap(); private final Map<DataSegment, SegmentZNode> segmentLookup = Maps.newHashMap();
public BatchingCuratorDataSegmentAnnouncer( public BatchDataSegmentAnnouncer(
DruidServerMetadata server, DruidServerMetadata server,
ZkDataSegmentAnnouncerConfig config, ZkDataSegmentAnnouncerConfig config,
Announcer announcer, Announcer announcer,

View File

@ -28,15 +28,15 @@ import org.apache.curator.utils.ZKPaths;
import java.io.IOException; import java.io.IOException;
public class CuratorDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer public class SingleDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
{ {
private static final Logger log = new Logger(CuratorDataSegmentAnnouncer.class); private static final Logger log = new Logger(SingleDataSegmentAnnouncer.class);
private final Announcer announcer; private final Announcer announcer;
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final String servedSegmentsLocation; private final String servedSegmentsLocation;
public CuratorDataSegmentAnnouncer( public SingleDataSegmentAnnouncer(
DruidServerMetadata server, DruidServerMetadata server,
ZkPathsConfig config, ZkPathsConfig config,
Announcer announcer, Announcer announcer,

View File

@ -48,6 +48,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -64,6 +65,7 @@ public class Announcer
private final List<Pair<String, byte[]>> toAnnounce = Lists.newArrayList(); private final List<Pair<String, byte[]>> toAnnounce = Lists.newArrayList();
private final ConcurrentMap<String, PathChildrenCache> listeners = new MapMaker().makeMap(); private final ConcurrentMap<String, PathChildrenCache> listeners = new MapMaker().makeMap();
private final ConcurrentMap<String, ConcurrentMap<String, byte[]>> announcements = new MapMaker().makeMap(); private final ConcurrentMap<String, ConcurrentMap<String, byte[]>> announcements = new MapMaker().makeMap();
private final List<String> parentsIBuilt = new CopyOnWriteArrayList<String>();
private boolean started = false; private boolean started = false;
@ -114,6 +116,15 @@ public class Announcer
unannounce(ZKPaths.makePath(basePath, announcementPath)); unannounce(ZKPaths.makePath(basePath, announcementPath));
} }
} }
for (String parent : parentsIBuilt) {
try {
curator.delete().forPath(parent);
}
catch (Exception e) {
log.info(e, "Unable to delete parent[%s], boooo.", parent);
}
}
} }
} }
@ -136,10 +147,19 @@ public class Announcer
final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path); final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
final String parentPath = pathAndNode.getPath(); final String parentPath = pathAndNode.getPath();
boolean buildParentPath = false;
ConcurrentMap<String, byte[]> subPaths = announcements.get(parentPath); ConcurrentMap<String, byte[]> subPaths = announcements.get(parentPath);
if (subPaths == null) { if (subPaths == null) {
try {
if (curator.checkExists().forPath(parentPath) == null) {
buildParentPath = true;
}
}
catch (Exception e) {
log.debug(e, "Problem checking if the parent existed, ignoring.");
}
// I don't have a watcher on this path yet, create a Map and start watching. // I don't have a watcher on this path yet, create a Map and start watching.
announcements.putIfAbsent(parentPath, new MapMaker().<String, byte[]>makeMap()); announcements.putIfAbsent(parentPath, new MapMaker().<String, byte[]>makeMap());
@ -208,17 +228,15 @@ public class Announcer
} }
); );
try { synchronized (toAnnounce) {
synchronized (toAnnounce) { if (started) {
if (started) { if (buildParentPath) {
cache.start(); createPath(parentPath);
listeners.put(parentPath, cache);
} }
startCache(cache);
listeners.put(parentPath, cache);
} }
} }
catch (Exception e) {
throw Throwables.propagate(e);
}
} }
} }
@ -261,7 +279,7 @@ public class Announcer
throw new ISE("Cannot update a path[%s] that hasn't been announced!", path); throw new ISE("Cannot update a path[%s] that hasn't been announced!", path);
} }
synchronized (subPaths) { synchronized (toAnnounce) {
try { try {
byte[] oldBytes = subPaths.get(nodePath); byte[] oldBytes = subPaths.get(nodePath);
@ -320,4 +338,26 @@ public class Announcer
throw Throwables.propagate(e); throw Throwables.propagate(e);
} }
} }
private void startCache(PathChildrenCache cache)
{
try {
cache.start();
}
catch (Exception e) {
Closeables.closeQuietly(cache);
throw Throwables.propagate(e);
}
}
private void createPath(String parentPath)
{
try {
curator.create().creatingParentsIfNeeded().forPath(parentPath);
parentsIBuilt.add(parentPath);
}
catch (Exception e) {
log.info(e, "Problem creating parentPath[%s], someone else created it first?", parentPath);
}
}
} }

View File

@ -135,6 +135,11 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
} }
} }
public InventoryManagerConfig getConfig()
{
return config;
}
public ContainerClass getInventoryValue(String containerKey) public ContainerClass getInventoryValue(String containerKey)
{ {
final ContainerHolder containerHolder = containers.get(containerKey); final ContainerHolder containerHolder = containers.get(containerKey);

View File

@ -47,7 +47,7 @@ import java.util.Set;
/** /**
*/ */
public class BatchingCuratorDataSegmentAnnouncerTest public class BatchDataSegmentAnnouncerTest
{ {
private static final String testBasePath = "/test"; private static final String testBasePath = "/test";
private static final String testSegmentsPath = "/test/segments/id"; private static final String testSegmentsPath = "/test/segments/id";
@ -58,7 +58,7 @@ public class BatchingCuratorDataSegmentAnnouncerTest
private ObjectMapper jsonMapper; private ObjectMapper jsonMapper;
private Announcer announcer; private Announcer announcer;
private SegmentReader segmentReader; private SegmentReader segmentReader;
private BatchingCuratorDataSegmentAnnouncer segmentAnnouncer; private BatchDataSegmentAnnouncer segmentAnnouncer;
private Set<DataSegment> testSegments; private Set<DataSegment> testSegments;
@Before @Before
@ -84,7 +84,7 @@ public class BatchingCuratorDataSegmentAnnouncerTest
announcer.start(); announcer.start();
segmentReader = new SegmentReader(cf, jsonMapper); segmentReader = new SegmentReader(cf, jsonMapper);
segmentAnnouncer = new BatchingCuratorDataSegmentAnnouncer( segmentAnnouncer = new BatchDataSegmentAnnouncer(
new DruidServerMetadata( new DruidServerMetadata(
"id", "id",
"host", "host",

View File

@ -27,6 +27,8 @@ import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType; import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.api.CuratorListener; import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.test.KillSession; import org.apache.curator.test.KillSession;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.data.Stat;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -60,7 +62,6 @@ public class AnnouncerTest extends CuratorTestBase
public void testSanity() throws Exception public void testSanity() throws Exception
{ {
curator.start(); curator.start();
curator.create().forPath("/somewhere");
Announcer announcer = new Announcer(curator, exec); Announcer announcer = new Announcer(curator, exec);
final byte[] billy = "billy".getBytes(); final byte[] billy = "billy".getBytes();
@ -163,4 +164,54 @@ public class AnnouncerTest extends CuratorTestBase
announcer.stop(); announcer.stop();
} }
} }
@Test
public void testCleansUpItsLittleTurdlings() throws Exception
{
curator.start();
Announcer announcer = new Announcer(curator, exec);
final byte[] billy = "billy".getBytes();
final String testPath = "/somewhere/test2";
final String parent = ZKPaths.getPathAndNode(testPath).getPath();
announcer.start();
Assert.assertNull(curator.checkExists().forPath(parent));
announcer.announce(testPath, billy);
Assert.assertNotNull(curator.checkExists().forPath(parent));
announcer.stop();
Assert.assertNull(curator.checkExists().forPath(parent));
}
@Test
public void testLeavesBehindTurdlingsThatAlreadyExisted() throws Exception
{
curator.start();
Announcer announcer = new Announcer(curator, exec);
final byte[] billy = "billy".getBytes();
final String testPath = "/somewhere/test2";
final String parent = ZKPaths.getPathAndNode(testPath).getPath();
curator.create().forPath(parent);
final Stat initialStat = curator.checkExists().forPath(parent);
announcer.start();
Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid());
announcer.announce(testPath, billy);
Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid());
announcer.stop();
Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid());
}
} }

View File

@ -50,14 +50,12 @@ public class ZkCoordinator implements DataSegmentChangeHandler
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final ZkCoordinatorConfig config; private final ZkCoordinatorConfig config;
private final ZkPathsConfig zkPaths;
private final DruidServerMetadata me; private final DruidServerMetadata me;
private final DataSegmentAnnouncer announcer; private final DataSegmentAnnouncer announcer;
private final CuratorFramework curator; private final CuratorFramework curator;
private final ServerManager serverManager; private final ServerManager serverManager;
private final String loadQueueLocation;
private final String servedSegmentsLocation;
private volatile PathChildrenCache loadQueueCache; private volatile PathChildrenCache loadQueueCache;
private volatile boolean started; private volatile boolean started;
@ -73,13 +71,11 @@ public class ZkCoordinator implements DataSegmentChangeHandler
{ {
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
this.config = config; this.config = config;
this.zkPaths = zkPaths;
this.me = me; this.me = me;
this.announcer = announcer; this.announcer = announcer;
this.curator = curator; this.curator = curator;
this.serverManager = serverManager; this.serverManager = serverManager;
this.loadQueueLocation = ZKPaths.makePath(zkPaths.getLoadQueuePath(), me.getName());
this.servedSegmentsLocation = ZKPaths.makePath(zkPaths.getServedSegmentsPath(), me.getName());
} }
@LifecycleStart @LifecycleStart
@ -91,6 +87,10 @@ public class ZkCoordinator implements DataSegmentChangeHandler
return; return;
} }
final String loadQueueLocation = ZKPaths.makePath(zkPaths.getLoadQueuePath(), me.getName());
final String servedSegmentsLocation = ZKPaths.makePath(zkPaths.getServedSegmentsPath(), me.getName());
final String liveSegmentsLocation = ZKPaths.makePath(zkPaths.getLiveSegmentsPath(), me.getName());
loadQueueCache = new PathChildrenCache( loadQueueCache = new PathChildrenCache(
curator, curator,
loadQueueLocation, loadQueueLocation,
@ -104,6 +104,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
curator.newNamespaceAwareEnsurePath(loadQueueLocation).ensure(curator.getZookeeperClient()); curator.newNamespaceAwareEnsurePath(loadQueueLocation).ensure(curator.getZookeeperClient());
curator.newNamespaceAwareEnsurePath(servedSegmentsLocation).ensure(curator.getZookeeperClient()); curator.newNamespaceAwareEnsurePath(servedSegmentsLocation).ensure(curator.getZookeeperClient());
curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient());
if (config.isLoadFromSegmentCacheEnabled()) { if (config.isLoadFromSegmentCacheEnabled()) {
loadCache(); loadCache();

View File

@ -291,7 +291,7 @@ public class DruidMaster
final String toLoadQueueSegPath = ZKPaths.makePath(ZKPaths.makePath(zkPaths.getLoadQueuePath(), to), segmentName); final String toLoadQueueSegPath = ZKPaths.makePath(ZKPaths.makePath(zkPaths.getLoadQueuePath(), to), segmentName);
final String toServedSegPath = ZKPaths.makePath( final String toServedSegPath = ZKPaths.makePath(
ZKPaths.makePath(zkPaths.getServedSegmentsPath(), to), segmentName ZKPaths.makePath(serverInventoryView.getInventoryManagerConfig().getInventoryPath(), to), segmentName
); );
loadPeon.loadSegment( loadPeon.loadSegment(

View File

@ -296,6 +296,7 @@ public class DruidSetup
createPath(curator, zkPaths.getMasterPath(), out); createPath(curator, zkPaths.getMasterPath(), out);
createPath(curator, zkPaths.getLoadQueuePath(), out); createPath(curator, zkPaths.getLoadQueuePath(), out);
createPath(curator, zkPaths.getServedSegmentsPath(), out); createPath(curator, zkPaths.getServedSegmentsPath(), out);
createPath(curator, zkPaths.getLiveSegmentsPath(), out);
createPath(curator, zkPaths.getPropertiesPath(), out); createPath(curator, zkPaths.getPropertiesPath(), out);
} }

View File

@ -93,7 +93,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
} }
}; };
announcer = new CuratorDataSegmentAnnouncer( announcer = new SingleDataSegmentAnnouncer(
me, zkPaths, new Announcer(curator, Execs.singleThreaded("blah")), jsonMapper me, zkPaths, new Announcer(curator, Execs.singleThreaded("blah")), jsonMapper
); );