fix according to code review

This commit is contained in:
fjy 2013-07-02 15:56:12 -07:00
parent 2969d9a713
commit 7219ed15d3
17 changed files with 320 additions and 219 deletions

View File

@ -39,18 +39,19 @@ import com.metamx.druid.client.ServerInventoryView;
import com.metamx.druid.client.ServerInventoryViewConfig; import com.metamx.druid.client.ServerInventoryViewConfig;
import com.metamx.druid.client.ServerView; import com.metamx.druid.client.ServerView;
import com.metamx.druid.concurrent.Execs; import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.coordination.AbstractDataSegmentAnnouncer;
import com.metamx.druid.coordination.BatchingCuratorDataSegmentAnnouncer; import com.metamx.druid.coordination.BatchingCuratorDataSegmentAnnouncer;
import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer; import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer;
import com.metamx.druid.coordination.DataSegmentAnnouncer; import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.coordination.DruidServerMetadata; import com.metamx.druid.coordination.DruidServerMetadata;
import com.metamx.druid.coordination.MultipleDataSegmentAnnouncerDataSegmentAnnouncer; import com.metamx.druid.coordination.MultipleDataSegmentAnnouncerDataSegmentAnnouncer;
import com.metamx.druid.curator.SegmentReader;
import com.metamx.druid.curator.announcement.Announcer; import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.http.NoopRequestLogger; import com.metamx.druid.http.NoopRequestLogger;
import com.metamx.druid.http.RequestLogger; import com.metamx.druid.http.RequestLogger;
import com.metamx.druid.initialization.CuratorConfig; import com.metamx.druid.initialization.CuratorConfig;
import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig; import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig;
import com.metamx.druid.initialization.ZkPathsConfig; import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.druid.utils.PropUtils; import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
@ -429,17 +430,12 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
setAnnouncer( setAnnouncer(
new MultipleDataSegmentAnnouncerDataSegmentAnnouncer( new MultipleDataSegmentAnnouncerDataSegmentAnnouncer(
getDruidServerMetadata(), Arrays.<AbstractDataSegmentAnnouncer>asList(
getZkPaths(),
announcer,
getJsonMapper(),
Arrays.<DataSegmentAnnouncer>asList(
new BatchingCuratorDataSegmentAnnouncer( new BatchingCuratorDataSegmentAnnouncer(
getDruidServerMetadata(), getDruidServerMetadata(),
getZkPaths(), getConfigFactory().build(ZkDataSegmentAnnouncerConfig.class),
announcer, announcer,
getJsonMapper(), getJsonMapper()
new SegmentReader(curator, getJsonMapper())
), ),
new CuratorDataSegmentAnnouncer(getDruidServerMetadata(), getZkPaths(), announcer, getJsonMapper()) new CuratorDataSegmentAnnouncer(getDruidServerMetadata(), getZkPaths(), announcer, getJsonMapper())
) )

View File

@ -19,19 +19,26 @@
package com.metamx.druid.coordination; package com.metamx.druid.coordination;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.curator.SegmentReader;
import com.metamx.druid.curator.announcement.Announcer; import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig;
import com.metamx.druid.initialization.ZkPathsConfig; import com.metamx.druid.initialization.ZkPathsConfig;
import org.apache.curator.utils.ZKPaths; import org.apache.curator.utils.ZKPaths;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import java.io.IOException; import java.io.IOException;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -43,21 +50,19 @@ public class BatchingCuratorDataSegmentAnnouncer extends AbstractDataSegmentAnno
{ {
private static final Logger log = new Logger(BatchingCuratorDataSegmentAnnouncer.class); private static final Logger log = new Logger(BatchingCuratorDataSegmentAnnouncer.class);
private final ZkPathsConfig config; private final ZkDataSegmentAnnouncerConfig config;
private final Announcer announcer; private final Announcer announcer;
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final SegmentReader segmentReader;
private final String liveSegmentLocation; private final String liveSegmentLocation;
private final Map<String, Integer> zNodes = Maps.newHashMap(); private final Set<SegmentZNode> availableZNodes = Sets.newHashSet();
private final Map<String, String> segmentLookup = new ConcurrentHashMap<String, String>(); private final Map<DataSegment, SegmentZNode> segmentLookup = Maps.newHashMap();
public BatchingCuratorDataSegmentAnnouncer( public BatchingCuratorDataSegmentAnnouncer(
DruidServerMetadata server, DruidServerMetadata server,
ZkPathsConfig config, ZkDataSegmentAnnouncerConfig config,
Announcer announcer, Announcer announcer,
ObjectMapper jsonMapper, ObjectMapper jsonMapper
SegmentReader segmentReader
) )
{ {
super(server, config, announcer, jsonMapper); super(server, config, announcer, jsonMapper);
@ -65,76 +70,91 @@ public class BatchingCuratorDataSegmentAnnouncer extends AbstractDataSegmentAnno
this.config = config; this.config = config;
this.announcer = announcer; this.announcer = announcer;
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
this.segmentReader = segmentReader;
this.liveSegmentLocation = ZKPaths.makePath(config.getLiveSegmentsPath(), server.getName()); this.liveSegmentLocation = ZKPaths.makePath(config.getLiveSegmentsPath(), server.getName());
} }
@Override @Override
public void announceSegment(DataSegment segment) throws IOException public void announceSegment(DataSegment segment) throws IOException
{ {
Map.Entry<String, Integer> zNode = (zNodes.entrySet().isEmpty()) ? null : zNodes.entrySet().iterator().next(); int newBytesLen = jsonMapper.writeValueAsBytes(segment).length;
if (newBytesLen > config.getMaxNumBytes()) {
final String path = (zNode == null) ? makeServedSegmentPath(new DateTime().toString()) : zNode.getKey(); throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxNumBytes());
Set<DataSegment> zkSegments = segmentReader.read(path);
zkSegments.add(segment);
if (zkSegments.size() >= config.getSegmentsPerNode()) {
zNodes.remove(path);
} else {
zNodes.put(path, zkSegments.size());
}
segmentLookup.put(segment.getIdentifier(), path);
log.info("Announcing segment[%s] to path[%s]", segment.getIdentifier(), path);
byte[] bytes = jsonMapper.writeValueAsBytes(zkSegments);
if (bytes.length > config.getMaxNumBytes()) {
throw new ISE("byte size %,d exceeds %,d", bytes.length, config.getMaxNumBytes());
} }
announcer.update(path, bytes); if (availableZNodes.isEmpty()) {
availableZNodes.add(new SegmentZNode(makeServedSegmentPath(new DateTime().toString())));
}
Iterator<SegmentZNode> iter = availableZNodes.iterator();
boolean done = false;
while (iter.hasNext() && !done) {
SegmentZNode availableZNode = iter.next();
if (availableZNode.getBytes().length + newBytesLen < config.getMaxNumBytes()) {
availableZNode.addSegment(segment);
log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
announcer.update(availableZNode.getPath(), availableZNode.getBytes());
segmentLookup.put(segment, availableZNode);
if (availableZNode.getCount() >= config.getSegmentsPerNode()) {
availableZNodes.remove(availableZNode);
}
done = true;
}
}
} }
@Override @Override
public void unannounceSegment(DataSegment segment) throws IOException public void unannounceSegment(DataSegment segment) throws IOException
{ {
final String path = segmentLookup.get(segment.getIdentifier()); final SegmentZNode segmentZNode = segmentLookup.remove(segment);
segmentZNode.removeSegment(segment);
Set<DataSegment> zkSegments = segmentReader.read(path); log.info("Unannouncing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath());
zkSegments.remove(segment); if (segmentZNode.getCount() == 0) {
availableZNodes.remove(segmentZNode);
log.info("Unannouncing segment[%s] at path[%s]", segment.getIdentifier(), path); announcer.unannounce(segmentZNode.getPath());
} else {
if (zkSegments.isEmpty()) { announcer.update(segmentZNode.getPath(), segmentZNode.getBytes());
announcer.unannounce(path); availableZNodes.add(segmentZNode);
} else if (zkSegments.size() < config.getSegmentsPerNode()) {
zNodes.put(path, zkSegments.size());
announcer.update(path, jsonMapper.writeValueAsBytes(zkSegments));
} }
} }
@Override @Override
public void announceSegments(Iterable<DataSegment> segments) throws IOException public void announceSegments(Iterable<DataSegment> segments) throws IOException
{ {
Iterable<List<DataSegment>> batched = Iterables.partition(segments, config.getSegmentsPerNode()); SegmentZNode segmentZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
Set<DataSegment> batch = Sets.newHashSet();
int byteSize = 0;
int count = 0;
for (List<DataSegment> batch : batched) { for (DataSegment segment : segments) {
final String path = makeServedSegmentPath(new DateTime().toString()); int newBytesLen = jsonMapper.writeValueAsBytes(segment).length;
for (DataSegment segment : batch) {
log.info("Announcing segment[%s] to path[%s]", segment.getIdentifier(), path); if (newBytesLen > config.getMaxNumBytes()) {
segmentLookup.put(segment.getIdentifier(), path); throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxNumBytes());
}
if (batch.size() < config.getSegmentsPerNode()) {
zNodes.put(path, batch.size());
} }
byte[] bytes = jsonMapper.writeValueAsBytes(batch); if (count >= config.getSegmentsPerNode() || byteSize + newBytesLen > config.getMaxNumBytes()) {
if (bytes.length > config.getMaxNumBytes()) { segmentZNode.addSegments(batch);
throw new ISE("byte size %,d exceeds %,d", bytes.length, config.getMaxNumBytes()); announcer.update(segmentZNode.getPath(), segmentZNode.getBytes());
segmentZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
batch = Sets.newHashSet();
count = 0;
byteSize = 0;
} }
announcer.update(path, bytes); log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath());
segmentLookup.put(segment, segmentZNode);
batch.add(segment);
count++;
byteSize += newBytesLen;
} }
segmentZNode.addSegments(batch);
announcer.update(segmentZNode.getPath(), segmentZNode.getBytes());
} }
@Override @Override
@ -149,4 +169,122 @@ public class BatchingCuratorDataSegmentAnnouncer extends AbstractDataSegmentAnno
{ {
return ZKPaths.makePath(liveSegmentLocation, zNode); return ZKPaths.makePath(liveSegmentLocation, zNode);
} }
private class SegmentZNode
{
private final String path;
private byte[] bytes = new byte[]{};
private int count = 0;
public SegmentZNode(String path)
{
this.path = path;
}
public String getPath()
{
return path;
}
public int getCount()
{
return count;
}
public byte[] getBytes()
{
return bytes;
}
public Set<DataSegment> getSegments()
{
if (bytes.length == 0) {
return Sets.newHashSet();
}
try {
return jsonMapper.readValue(
bytes, new TypeReference<Set<DataSegment>>()
{
}
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
public void addSegment(DataSegment segment)
{
Set<DataSegment> zkSegments = getSegments();
zkSegments.add(segment);
try {
bytes = jsonMapper.writeValueAsBytes(zkSegments);
}
catch (Exception e) {
zkSegments.remove(segment);
throw Throwables.propagate(e);
}
count++;
}
public void addSegments(Set<DataSegment> segments)
{
Set<DataSegment> zkSegments = getSegments();
zkSegments.addAll(segments);
try {
bytes = jsonMapper.writeValueAsBytes(zkSegments);
}
catch (Exception e) {
zkSegments.removeAll(segments);
throw Throwables.propagate(e);
}
count += segments.size();
}
public void removeSegment(DataSegment segment)
{
Set<DataSegment> zkSegments = getSegments();
zkSegments.remove(segment);
try {
bytes = jsonMapper.writeValueAsBytes(zkSegments);
}
catch (Exception e) {
zkSegments.add(segment);
throw Throwables.propagate(e);
}
count--;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SegmentZNode that = (SegmentZNode) o;
if (!path.equals(that.path)) {
return false;
}
return true;
}
@Override
public int hashCode()
{
return path.hashCode();
}
}
} }

View File

@ -20,6 +20,8 @@
package com.metamx.druid.coordination; package com.metamx.druid.coordination;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.curator.announcement.Announcer; import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.initialization.ZkPathsConfig; import com.metamx.druid.initialization.ZkPathsConfig;
@ -29,23 +31,33 @@ import java.io.IOException;
/** /**
* This class has the greatest name ever * This class has the greatest name ever
*/ */
public class MultipleDataSegmentAnnouncerDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer public class MultipleDataSegmentAnnouncerDataSegmentAnnouncer implements DataSegmentAnnouncer
{ {
private final Iterable<DataSegmentAnnouncer> dataSegmentAnnouncers; private final Iterable<AbstractDataSegmentAnnouncer> dataSegmentAnnouncers;
public MultipleDataSegmentAnnouncerDataSegmentAnnouncer( public MultipleDataSegmentAnnouncerDataSegmentAnnouncer(
DruidServerMetadata server, Iterable<AbstractDataSegmentAnnouncer> dataSegmentAnnouncers
ZkPathsConfig config,
Announcer announcer,
ObjectMapper jsonMapper,
Iterable<DataSegmentAnnouncer> dataSegmentAnnouncers
) )
{ {
super(server, config, announcer, jsonMapper);
this.dataSegmentAnnouncers = dataSegmentAnnouncers; this.dataSegmentAnnouncers = dataSegmentAnnouncers;
} }
@LifecycleStart
public void start()
{
for (AbstractDataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
dataSegmentAnnouncer.start();
}
}
@LifecycleStop
public void stop()
{
for (AbstractDataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
dataSegmentAnnouncer.stop();
}
}
@Override @Override
public void announceSegment(DataSegment segment) throws IOException public void announceSegment(DataSegment segment) throws IOException
{ {

View File

@ -1,46 +0,0 @@
package com.metamx.druid.curator;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import java.util.Set;
/**
*/
public class SegmentReader
{
private static final Logger log = new Logger(SegmentReader.class);
private final CuratorFramework cf;
private final ObjectMapper jsonMapper;
public SegmentReader(CuratorFramework cf, ObjectMapper jsonMapper)
{
this.cf = cf;
this.jsonMapper = jsonMapper;
}
public Set<DataSegment> read(String path)
{
try {
if (cf.checkExists().forPath(path) != null) {
return jsonMapper.readValue(
cf.getData().forPath(path), new TypeReference<Set<DataSegment>>()
{
}
);
}
}
catch (Exception e) {
log.error("Unable to read any segment ids from %s", path);
throw Throwables.propagate(e);
}
return Sets.newHashSet();
}
}

View File

@ -42,6 +42,7 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.data.Stat;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -226,13 +227,14 @@ public class Announcer
boolean created = false; boolean created = false;
synchronized (toAnnounce) { synchronized (toAnnounce) {
if (started) { if (started) {
byte[] oldBytes = subPaths.putIfAbsent(pathAndNode.getNode(), bytes); byte[] oldBytes = subPaths.get(pathAndNode.getNode());
if (oldBytes != null) { if (oldBytes == null) {
throw new IAE("Already announcing[%s], cannot announce it twice.", path); subPaths.put(pathAndNode.getNode(), bytes);
created = true;
} else if (!Arrays.equals(oldBytes, bytes)) {
throw new IAE("Cannot reannounce different values under the same path");
} }
created = true;
} }
} }
@ -261,7 +263,12 @@ public class Announcer
} }
try { try {
updateAnnouncement(path, bytes); byte[] oldBytes = subPaths.get(nodePath);
if (!Arrays.equals(oldBytes, bytes)) {
subPaths.put(nodePath, bytes);
updateAnnouncement(path, bytes);
}
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);

View File

@ -0,0 +1,17 @@
package com.metamx.druid.initialization;
import org.skife.config.Config;
import org.skife.config.Default;
/**
*/
public abstract class ZkDataSegmentAnnouncerConfig extends ZkPathsConfig
{
@Config("druid.zk.segmentsPerNode")
@Default("50")
public abstract int getSegmentsPerNode();
@Config("druid.zk.maxNumBytesPerNode")
@Default("512000")
public abstract long getMaxNumBytes();
}

View File

@ -28,14 +28,6 @@ public abstract class ZkPathsConfig
@Config("druid.zk.paths.base") @Config("druid.zk.paths.base")
public abstract String getZkBasePath(); public abstract String getZkBasePath();
@Config("druid.zk.segmentsPerNode")
@Default("50")
public abstract int getSegmentsPerNode();
@Config("druid.zk.maxNumBytesPerNode")
@Default("512000")
public abstract long getMaxNumBytes();
@Config("druid.zk.paths.propertiesPath") @Config("druid.zk.paths.propertiesPath")
public String getPropertiesPath() public String getPropertiesPath()
{ {

View File

@ -19,15 +19,16 @@
package com.metamx.druid.coordination; package com.metamx.druid.coordination;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider; import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider;
import com.metamx.druid.curator.SegmentReader;
import com.metamx.druid.curator.announcement.Announcer; import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.initialization.ZkPathsConfig; import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig;
import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.jackson.DefaultObjectMapper;
import junit.framework.Assert; import junit.framework.Assert;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
@ -91,7 +92,7 @@ public class BatchingCuratorDataSegmentAnnouncerTest
"type", "type",
"tier" "tier"
), ),
new ZkPathsConfig() new ZkDataSegmentAnnouncerConfig()
{ {
@Override @Override
public String getZkBasePath() public String getZkBasePath()
@ -112,8 +113,7 @@ public class BatchingCuratorDataSegmentAnnouncerTest
} }
}, },
announcer, announcer,
jsonMapper, jsonMapper
segmentReader
); );
segmentAnnouncer.start(); segmentAnnouncer.start();
@ -187,6 +187,14 @@ public class BatchingCuratorDataSegmentAnnouncerTest
Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty()); Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty());
} }
@Test
public void testMultipleBatchAnnounce() throws Exception
{
for (int i = 0; i < 10; i++) {
testBatchAnnounce();
}
}
private DataSegment makeSegment(int offset) private DataSegment makeSegment(int offset)
{ {
return DataSegment.builder() return DataSegment.builder()
@ -200,4 +208,34 @@ public class BatchingCuratorDataSegmentAnnouncerTest
.version(new DateTime().toString()) .version(new DateTime().toString())
.build(); .build();
} }
private class SegmentReader
{
private final CuratorFramework cf;
private final ObjectMapper jsonMapper;
public SegmentReader(CuratorFramework cf, ObjectMapper jsonMapper)
{
this.cf = cf;
this.jsonMapper = jsonMapper;
}
public Set<DataSegment> read(String path)
{
try {
if (cf.checkExists().forPath(path) != null) {
return jsonMapper.readValue(
cf.getData().forPath(path), new TypeReference<Set<DataSegment>>()
{
}
);
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return Sets.newHashSet();
}
}
} }

View File

@ -43,14 +43,11 @@ public class CuratorInventoryManagerTest extends CuratorTestBase
curator.start(); curator.start();
manager.start(); manager.start();
curator.create().creatingParentsIfNeeded().forPath("/container");
curator.create().creatingParentsIfNeeded().forPath("/inventory/billy");
Assert.assertTrue(Iterables.isEmpty(manager.getInventory())); Assert.assertTrue(Iterables.isEmpty(manager.getInventory()));
CountDownLatch containerLatch = new CountDownLatch(1); CountDownLatch containerLatch = new CountDownLatch(1);
strategy.setNewContainerLatch(containerLatch); strategy.setNewContainerLatch(containerLatch);
curator.create().withMode(CreateMode.EPHEMERAL).forPath("/container/billy", new byte[]{}); curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/container/billy", new byte[]{});
Assert.assertTrue(timing.awaitLatch(containerLatch)); Assert.assertTrue(timing.awaitLatch(containerLatch));
strategy.setNewContainerLatch(null); strategy.setNewContainerLatch(null);
@ -60,7 +57,7 @@ public class CuratorInventoryManagerTest extends CuratorTestBase
CountDownLatch inventoryLatch = new CountDownLatch(2); CountDownLatch inventoryLatch = new CountDownLatch(2);
strategy.setNewInventoryLatch(inventoryLatch); strategy.setNewInventoryLatch(inventoryLatch);
curator.create().withMode(CreateMode.EPHEMERAL).forPath("/inventory/billy/1", Ints.toByteArray(100)); curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/inventory/billy/1", Ints.toByteArray(100));
curator.create().withMode(CreateMode.EPHEMERAL).forPath("/inventory/billy/bob", Ints.toByteArray(2287)); curator.create().withMode(CreateMode.EPHEMERAL).forPath("/inventory/billy/bob", Ints.toByteArray(2287));
Assert.assertTrue(timing.awaitLatch(inventoryLatch)); Assert.assertTrue(timing.awaitLatch(inventoryLatch));

View File

@ -283,12 +283,6 @@ public class RemoteTaskRunnerTest
jsonMapper, jsonMapper,
new IndexerZkConfig() new IndexerZkConfig()
{ {
@Override
public int getSegmentsPerNode()
{
return 1;
}
@Override @Override
public String getIndexerAnnouncementPath() public String getIndexerAnnouncementPath()
{ {
@ -454,11 +448,5 @@ public class RemoteTaskRunnerTest
{ {
return 1000; return 1000;
} }
@Override
public int getSegmentsPerNode()
{
return 1;
}
} }
} }

View File

@ -39,7 +39,7 @@
<properties> <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.22.3</metamx.java-util.version> <metamx.java-util.version>0.22.3</metamx.java-util.version>
<apache.curator.version>2.0.2-21-22</apache.curator.version> <apache.curator.version>2.1.0-incubating</apache.curator.version>
</properties> </properties>
<modules> <modules>

View File

@ -230,14 +230,16 @@ public class ZkCoordinator implements DataSegmentChangeHandler
serverManager.loadSegment(segment); serverManager.loadSegment(segment);
File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier()); File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier());
try { if (!segmentInfoCacheFile.exists()) {
jsonMapper.writeValue(segmentInfoCacheFile, segment); try {
} jsonMapper.writeValue(segmentInfoCacheFile, segment);
catch (IOException e) { }
removeSegment(segment); catch (IOException e) {
throw new SegmentLoadingException( removeSegment(segment);
e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile throw new SegmentLoadingException(
); e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
);
}
} }
try { try {
@ -260,18 +262,19 @@ public class ZkCoordinator implements DataSegmentChangeHandler
{ {
try { try {
for (DataSegment segment : segments) { for (DataSegment segment : segments) {
serverManager.loadSegment(segment); serverManager.loadSegment(segment);
File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier()); File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier());
try { if (!segmentInfoCacheFile.exists()) {
jsonMapper.writeValue(segmentInfoCacheFile, segment); try {
} jsonMapper.writeValue(segmentInfoCacheFile, segment);
catch (IOException e) { }
removeSegment(segment); catch (IOException e) {
throw new SegmentLoadingException( removeSegment(segment);
e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile throw new SegmentLoadingException(
); e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
);
}
} }
} }
@ -305,7 +308,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
announcer.unannounceSegment(segment); announcer.unannounceSegment(segment);
} }
catch (Exception e) { catch (Exception e) {
log.makeAlert("Failed to remove segment") log.makeAlert(e, "Failed to remove segment")
.addData("segment", segment) .addData("segment", segment)
.emit(); .emit();
} }
@ -321,13 +324,12 @@ public class ZkCoordinator implements DataSegmentChangeHandler
if (!segmentInfoCacheFile.delete()) { if (!segmentInfoCacheFile.delete()) {
log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile); log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile);
} }
} }
announcer.unannounceSegments(segments); announcer.unannounceSegments(segments);
} }
catch (Exception e) { catch (Exception e) {
log.makeAlert("Failed to remove segments") log.makeAlert(e, "Failed to remove segments")
.addData("segments", segments) .addData("segments", segments)
.emit(); .emit();
} }

View File

@ -30,7 +30,7 @@ public abstract class ZkCoordinatorConfig
@Config("druid.paths.segmentInfoCache") @Config("druid.paths.segmentInfoCache")
public abstract File getSegmentInfoCacheDirectory(); public abstract File getSegmentInfoCacheDirectory();
@Config("druid.start.segmentCache.enable") @Config("druid.segmentCache.enable")
public boolean isLoadFromSegmentCacheEnabled() public boolean isLoadFromSegmentCacheEnabled()
{ {
return true; return true;

View File

@ -443,17 +443,17 @@ public class DruidMaster
curator, ZKPaths.makePath(zkPaths.getMasterPath(), MASTER_OWNER_NODE), config.getHost() curator, ZKPaths.makePath(zkPaths.getMasterPath(), MASTER_OWNER_NODE), config.getHost()
); );
newLeaderLatch.attachListener( newLeaderLatch.addListener(
new LeaderLatchListener() new LeaderLatchListener()
{ {
@Override @Override
public void becomeMaster() public void isLeader()
{ {
DruidMaster.this.becomeMaster(); DruidMaster.this.becomeMaster();
} }
@Override @Override
public void stopBeingMaster() public void notLeader()
{ {
DruidMaster.this.stopBeingMaster(); DruidMaster.this.stopBeingMaster();
} }

View File

@ -169,18 +169,6 @@ public class DruidSetup
{ {
return zPathBase; return zPathBase;
} }
@Override
public int getSegmentsPerNode()
{
return 50;
}
@Override
public long getMaxNumBytes()
{
return 1000;
}
}; };
try { try {

View File

@ -27,7 +27,6 @@ import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.concurrent.Execs; import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.curator.CuratorTestBase; import com.metamx.druid.curator.CuratorTestBase;
import com.metamx.druid.curator.SegmentReader;
import com.metamx.druid.curator.announcement.Announcer; import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.index.v1.IndexIO; import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.initialization.ZkPathsConfig; import com.metamx.druid.initialization.ZkPathsConfig;
@ -92,26 +91,11 @@ public class ZkCoordinatorTest extends CuratorTestBase
{ {
return "/druid"; return "/druid";
} }
@Override
public int getSegmentsPerNode()
{
return 1;
}
@Override
public long getMaxNumBytes()
{
return 1000;
}
}; };
announcer = new BatchingCuratorDataSegmentAnnouncer( announcer = new CuratorDataSegmentAnnouncer(
me, zkPaths, new Announcer(curator, Execs.singleThreaded("blah")), jsonMapper, new SegmentReader(curator, jsonMapper) me, zkPaths, new Announcer(curator, Execs.singleThreaded("blah")), jsonMapper
); );
//announcer = new CuratorDataSegmentAnnouncer(
// me, zkPaths, new Announcer(curator, Execs.singleThreaded("blah")), jsonMapper
//);
zkCoordinator = new ZkCoordinator( zkCoordinator = new ZkCoordinator(
jsonMapper, jsonMapper,

View File

@ -130,18 +130,6 @@ public class DruidMasterTest
{ {
return ""; return "";
} }
@Override
public int getSegmentsPerNode()
{
return 1;
}
@Override
public long getMaxNumBytes()
{
return 1000;
}
}, },
null, null,
databaseSegmentManager, databaseSegmentManager,