Merge pull request #170 from metamx/batch-zk

Enable batched ZK announcements
This commit is contained in:
cheddar 2013-07-09 17:23:47 -07:00
commit 093953c041
21 changed files with 1120 additions and 137 deletions

View File

@ -39,15 +39,19 @@ import com.metamx.druid.client.ServerInventoryView;
import com.metamx.druid.client.ServerInventoryViewConfig; import com.metamx.druid.client.ServerInventoryViewConfig;
import com.metamx.druid.client.ServerView; import com.metamx.druid.client.ServerView;
import com.metamx.druid.concurrent.Execs; import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.coordination.AbstractDataSegmentAnnouncer;
import com.metamx.druid.coordination.BatchingCuratorDataSegmentAnnouncer;
import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer; import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer;
import com.metamx.druid.coordination.DataSegmentAnnouncer; import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.coordination.DruidServerMetadata; import com.metamx.druid.coordination.DruidServerMetadata;
import com.metamx.druid.coordination.MultipleDataSegmentAnnouncerDataSegmentAnnouncer;
import com.metamx.druid.curator.announcement.Announcer; import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.http.NoopRequestLogger; import com.metamx.druid.http.NoopRequestLogger;
import com.metamx.druid.http.RequestLogger; import com.metamx.druid.http.RequestLogger;
import com.metamx.druid.initialization.CuratorConfig; import com.metamx.druid.initialization.CuratorConfig;
import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig; import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig;
import com.metamx.druid.initialization.ZkPathsConfig; import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.druid.utils.PropUtils; import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
@ -424,7 +428,20 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
final Announcer announcer = new Announcer(getCuratorFramework(), Execs.singleThreaded("Announcer-%s")); final Announcer announcer = new Announcer(getCuratorFramework(), Execs.singleThreaded("Announcer-%s"));
lifecycle.addManagedInstance(announcer); lifecycle.addManagedInstance(announcer);
setAnnouncer(new CuratorDataSegmentAnnouncer(getDruidServerMetadata(), getZkPaths(), announcer, getJsonMapper())); setAnnouncer(
new MultipleDataSegmentAnnouncerDataSegmentAnnouncer(
Arrays.<AbstractDataSegmentAnnouncer>asList(
new BatchingCuratorDataSegmentAnnouncer(
getDruidServerMetadata(),
getConfigFactory().build(ZkDataSegmentAnnouncerConfig.class),
announcer,
getJsonMapper()
),
new CuratorDataSegmentAnnouncer(getDruidServerMetadata(), getZkPaths(), announcer, getJsonMapper())
)
)
);
lifecycle.addManagedInstance(getAnnouncer(), Lifecycle.Stage.LAST); lifecycle.addManagedInstance(getAnnouncer(), Lifecycle.Stage.LAST);
} }
} }

View File

@ -0,0 +1,100 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.coordination;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.initialization.ZkPathsConfig;
import org.apache.curator.utils.ZKPaths;
/**
*/
public abstract class AbstractDataSegmentAnnouncer implements DataSegmentAnnouncer
{
private static final Logger log = new Logger(AbstractDataSegmentAnnouncer.class);
private final DruidServerMetadata server;
private final ZkPathsConfig config;
private final Announcer announcer;
private final ObjectMapper jsonMapper;
private final Object lock = new Object();
private volatile boolean started = false;
protected AbstractDataSegmentAnnouncer(
DruidServerMetadata server,
ZkPathsConfig config,
Announcer announcer,
ObjectMapper jsonMapper
)
{
this.server = server;
this.config = config;
this.announcer = announcer;
this.jsonMapper = jsonMapper;
}
@LifecycleStart
public void start()
{
synchronized (lock) {
if (started) {
return;
}
try {
final String path = makeAnnouncementPath();
log.info("Announcing self[%s] at [%s]", server, path);
announcer.announce(path, jsonMapper.writeValueAsBytes(server));
}
catch (JsonProcessingException e) {
throw Throwables.propagate(e);
}
started = true;
}
}
@LifecycleStop
public void stop()
{
synchronized (lock) {
if (!started) {
return;
}
log.info("Stopping CuratorDataSegmentAnnouncer with config[%s]", config);
announcer.unannounce(makeAnnouncementPath());
started = false;
}
}
private String makeAnnouncementPath()
{
return ZKPaths.makePath(config.getAnnouncementsPath(), server.getName());
}
}

View File

@ -0,0 +1,297 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.coordination;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig;
import com.metamx.druid.initialization.ZkPathsConfig;
import org.apache.curator.utils.ZKPaths;
import org.joda.time.DateTime;
import java.io.IOException;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
*/
public class BatchingCuratorDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
{
private static final Logger log = new Logger(BatchingCuratorDataSegmentAnnouncer.class);
private final ZkDataSegmentAnnouncerConfig config;
private final Announcer announcer;
private final ObjectMapper jsonMapper;
private final String liveSegmentLocation;
private final Set<SegmentZNode> availableZNodes = Sets.newHashSet();
private final Map<DataSegment, SegmentZNode> segmentLookup = Maps.newHashMap();
public BatchingCuratorDataSegmentAnnouncer(
DruidServerMetadata server,
ZkDataSegmentAnnouncerConfig config,
Announcer announcer,
ObjectMapper jsonMapper
)
{
super(server, config, announcer, jsonMapper);
this.config = config;
this.announcer = announcer;
this.jsonMapper = jsonMapper;
this.liveSegmentLocation = ZKPaths.makePath(config.getLiveSegmentsPath(), server.getName());
}
@Override
public void announceSegment(DataSegment segment) throws IOException
{
int newBytesLen = jsonMapper.writeValueAsBytes(segment).length;
if (newBytesLen > config.getMaxNumBytes()) {
throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxNumBytes());
}
// create new batch
if (availableZNodes.isEmpty()) {
SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
availableZNode.addSegment(segment);
log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
announcer.announce(availableZNode.getPath(), availableZNode.getBytes());
segmentLookup.put(segment, availableZNode);
availableZNodes.add(availableZNode);
} else { // update existing batch
Iterator<SegmentZNode> iter = availableZNodes.iterator();
boolean done = false;
while (iter.hasNext() && !done) {
SegmentZNode availableZNode = iter.next();
if (availableZNode.getBytes().length + newBytesLen < config.getMaxNumBytes()) {
availableZNode.addSegment(segment);
log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
announcer.update(availableZNode.getPath(), availableZNode.getBytes());
segmentLookup.put(segment, availableZNode);
if (availableZNode.getCount() >= config.getSegmentsPerNode()) {
availableZNodes.remove(availableZNode);
}
done = true;
}
}
}
}
@Override
public void unannounceSegment(DataSegment segment) throws IOException
{
final SegmentZNode segmentZNode = segmentLookup.remove(segment);
segmentZNode.removeSegment(segment);
log.info("Unannouncing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath());
if (segmentZNode.getCount() == 0) {
availableZNodes.remove(segmentZNode);
announcer.unannounce(segmentZNode.getPath());
} else {
announcer.update(segmentZNode.getPath(), segmentZNode.getBytes());
availableZNodes.add(segmentZNode);
}
}
@Override
public void announceSegments(Iterable<DataSegment> segments) throws IOException
{
SegmentZNode segmentZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
Set<DataSegment> batch = Sets.newHashSet();
int byteSize = 0;
int count = 0;
for (DataSegment segment : segments) {
int newBytesLen = jsonMapper.writeValueAsBytes(segment).length;
if (newBytesLen > config.getMaxNumBytes()) {
throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxNumBytes());
}
if (count >= config.getSegmentsPerNode() || byteSize + newBytesLen > config.getMaxNumBytes()) {
segmentZNode.addSegments(batch);
announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes());
segmentZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
batch = Sets.newHashSet();
count = 0;
byteSize = 0;
}
log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath());
segmentLookup.put(segment, segmentZNode);
batch.add(segment);
count++;
byteSize += newBytesLen;
}
segmentZNode.addSegments(batch);
announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes());
}
@Override
public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
{
for (DataSegment segment : segments) {
unannounceSegment(segment);
}
}
private String makeServedSegmentPath(String zNode)
{
return ZKPaths.makePath(liveSegmentLocation, zNode);
}
private class SegmentZNode
{
private final String path;
private byte[] bytes = new byte[]{};
private int count = 0;
public SegmentZNode(String path)
{
this.path = path;
}
public String getPath()
{
return path;
}
public int getCount()
{
return count;
}
public byte[] getBytes()
{
return bytes;
}
public Set<DataSegment> getSegments()
{
if (bytes.length == 0) {
return Sets.newHashSet();
}
try {
return jsonMapper.readValue(
bytes, new TypeReference<Set<DataSegment>>()
{
}
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
public void addSegment(DataSegment segment)
{
Set<DataSegment> zkSegments = getSegments();
zkSegments.add(segment);
try {
bytes = jsonMapper.writeValueAsBytes(zkSegments);
}
catch (Exception e) {
zkSegments.remove(segment);
throw Throwables.propagate(e);
}
count++;
}
public void addSegments(Set<DataSegment> segments)
{
Set<DataSegment> zkSegments = getSegments();
zkSegments.addAll(segments);
try {
bytes = jsonMapper.writeValueAsBytes(zkSegments);
}
catch (Exception e) {
zkSegments.removeAll(segments);
throw Throwables.propagate(e);
}
count += segments.size();
}
public void removeSegment(DataSegment segment)
{
Set<DataSegment> zkSegments = getSegments();
zkSegments.remove(segment);
try {
bytes = jsonMapper.writeValueAsBytes(zkSegments);
}
catch (Exception e) {
zkSegments.add(segment);
throw Throwables.propagate(e);
}
count--;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SegmentZNode that = (SegmentZNode) o;
if (!path.equals(that.path)) {
return false;
}
return true;
}
@Override
public int hashCode()
{
return path.hashCode();
}
}
}

View File

@ -19,11 +19,7 @@
package com.metamx.druid.coordination; package com.metamx.druid.coordination;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.curator.announcement.Announcer; import com.metamx.druid.curator.announcement.Announcer;
@ -32,20 +28,14 @@ import org.apache.curator.utils.ZKPaths;
import java.io.IOException; import java.io.IOException;
public class CuratorDataSegmentAnnouncer implements DataSegmentAnnouncer public class CuratorDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
{ {
private static final Logger log = new Logger(CuratorDataSegmentAnnouncer.class); private static final Logger log = new Logger(CuratorDataSegmentAnnouncer.class);
private final Object lock = new Object();
private final DruidServerMetadata server;
private final ZkPathsConfig config;
private final Announcer announcer; private final Announcer announcer;
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final String servedSegmentsLocation; private final String servedSegmentsLocation;
private volatile boolean started = false;
public CuratorDataSegmentAnnouncer( public CuratorDataSegmentAnnouncer(
DruidServerMetadata server, DruidServerMetadata server,
ZkPathsConfig config, ZkPathsConfig config,
@ -53,49 +43,13 @@ public class CuratorDataSegmentAnnouncer implements DataSegmentAnnouncer
ObjectMapper jsonMapper ObjectMapper jsonMapper
) )
{ {
this.server = server; super(server, config, announcer, jsonMapper);
this.config = config;
this.announcer = announcer; this.announcer = announcer;
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
this.servedSegmentsLocation = ZKPaths.makePath(config.getServedSegmentsPath(), server.getName()); this.servedSegmentsLocation = ZKPaths.makePath(config.getServedSegmentsPath(), server.getName());
} }
@LifecycleStart
public void start()
{
synchronized (lock) {
if (started) {
return;
}
try {
final String path = makeAnnouncementPath();
log.info("Announcing self[%s] at [%s]", server, path);
announcer.announce(path, jsonMapper.writeValueAsBytes(server));
}
catch (JsonProcessingException e) {
throw Throwables.propagate(e);
}
started = true;
}
}
@LifecycleStop
public void stop()
{
synchronized (lock) {
if (!started) {
return;
}
log.info("Stopping CuratorDataSegmentAnnouncer with config[%s]", config);
announcer.unannounce(makeAnnouncementPath());
started = false;
}
}
public void announceSegment(DataSegment segment) throws IOException public void announceSegment(DataSegment segment) throws IOException
{ {
final String path = makeServedSegmentPath(segment); final String path = makeServedSegmentPath(segment);
@ -110,8 +64,20 @@ public class CuratorDataSegmentAnnouncer implements DataSegmentAnnouncer
announcer.unannounce(path); announcer.unannounce(path);
} }
private String makeAnnouncementPath() { @Override
return ZKPaths.makePath(config.getAnnouncementsPath(), server.getName()); public void announceSegments(Iterable<DataSegment> segments) throws IOException
{
for (DataSegment segment : segments) {
announceSegment(segment);
}
}
@Override
public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
{
for (DataSegment segment : segments) {
unannounceSegment(segment);
}
} }
private String makeServedSegmentPath(DataSegment segment) private String makeServedSegmentPath(DataSegment segment)

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.coordination; package com.metamx.druid.coordination;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
@ -7,5 +26,10 @@ import java.io.IOException;
public interface DataSegmentAnnouncer public interface DataSegmentAnnouncer
{ {
public void announceSegment(DataSegment segment) throws IOException; public void announceSegment(DataSegment segment) throws IOException;
public void unannounceSegment(DataSegment segment) throws IOException; public void unannounceSegment(DataSegment segment) throws IOException;
public void announceSegments(Iterable<DataSegment> segments) throws IOException;
public void unannounceSegments(Iterable<DataSegment> segments) throws IOException;
} }

View File

@ -80,7 +80,7 @@ public class DruidServerMetadata
@Override @Override
public String toString() public String toString()
{ {
return "DruidServer{" + return "DruidServerMetadata{" +
"name='" + name + '\'' + "name='" + name + '\'' +
", host='" + host + '\'' + ", host='" + host + '\'' +
", maxSize=" + maxSize + ", maxSize=" + maxSize +

View File

@ -0,0 +1,92 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.initialization.ZkPathsConfig;
import java.io.IOException;
/**
* This class has the greatest name ever
*/
public class MultipleDataSegmentAnnouncerDataSegmentAnnouncer implements DataSegmentAnnouncer
{
private final Iterable<AbstractDataSegmentAnnouncer> dataSegmentAnnouncers;
public MultipleDataSegmentAnnouncerDataSegmentAnnouncer(
Iterable<AbstractDataSegmentAnnouncer> dataSegmentAnnouncers
)
{
this.dataSegmentAnnouncers = dataSegmentAnnouncers;
}
@LifecycleStart
public void start()
{
for (AbstractDataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
dataSegmentAnnouncer.start();
}
}
@LifecycleStop
public void stop()
{
for (AbstractDataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
dataSegmentAnnouncer.stop();
}
}
@Override
public void announceSegment(DataSegment segment) throws IOException
{
for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
dataSegmentAnnouncer.announceSegment(segment);
}
}
@Override
public void unannounceSegment(DataSegment segment) throws IOException
{
for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
dataSegmentAnnouncer.unannounceSegment(segment);
}
}
@Override
public void announceSegments(Iterable<DataSegment> segments) throws IOException
{
for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
dataSegmentAnnouncer.announceSegments(segments);
}
}
@Override
public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
{
for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
dataSegmentAnnouncer.unannounceSegments(segments);
}
}
}

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.curator.announcement; package com.metamx.druid.curator.announcement;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
@ -6,6 +25,7 @@ import com.google.common.collect.MapMaker;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.io.Closeables; import com.google.common.io.Closeables;
import com.metamx.common.IAE; import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.Pair; import com.metamx.common.Pair;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
@ -21,7 +41,9 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths; import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -99,7 +121,7 @@ public class Announcer
* Announces the provided bytes at the given path. Announcement means that it will create an ephemeral node * Announces the provided bytes at the given path. Announcement means that it will create an ephemeral node
* and monitor it to make sure that it always exists until it is unannounced or this object is closed. * and monitor it to make sure that it always exists until it is unannounced or this object is closed.
* *
* @param path The path to announce at * @param path The path to announce at
* @param bytes The payload to announce * @param bytes The payload to announce
*/ */
public void announce(String path, byte[] bytes) public void announce(String path, byte[] bytes)
@ -127,7 +149,7 @@ public class Announcer
// Synchronize to make sure that I only create a listener once. // Synchronize to make sure that I only create a listener once.
synchronized (finalSubPaths) { synchronized (finalSubPaths) {
if (! listeners.containsKey(parentPath)) { if (!listeners.containsKey(parentPath)) {
final PathChildrenCache cache = factory.make(curator, parentPath); final PathChildrenCache cache = factory.make(curator, parentPath);
cache.getListenable().addListener( cache.getListenable().addListener(
new PathChildrenCacheListener() new PathChildrenCacheListener()
@ -208,11 +230,11 @@ public class Announcer
if (started) { if (started) {
byte[] oldBytes = subPaths.putIfAbsent(pathAndNode.getNode(), bytes); byte[] oldBytes = subPaths.putIfAbsent(pathAndNode.getNode(), bytes);
if (oldBytes != null) { if (oldBytes == null) {
throw new IAE("Already announcing[%s], cannot announce it twice.", path); created = true;
} else if (!Arrays.equals(oldBytes, bytes)) {
throw new IAE("Cannot reannounce different values under the same path");
} }
created = true;
} }
} }
@ -226,15 +248,48 @@ public class Announcer
} }
} }
public void update(final String path, final byte[] bytes)
{
final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
final String parentPath = pathAndNode.getPath();
final String nodePath = pathAndNode.getNode();
ConcurrentMap<String, byte[]> subPaths = announcements.get(parentPath);
if (subPaths == null || subPaths.get(nodePath) == null) {
throw new ISE("Cannot update a path[%s] that hasn't been announced!", path);
}
synchronized (subPaths) {
try {
byte[] oldBytes = subPaths.get(nodePath);
if (!Arrays.equals(oldBytes, bytes)) {
subPaths.put(nodePath, bytes);
updateAnnouncement(path, bytes);
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
private String createAnnouncement(final String path, byte[] value) throws Exception private String createAnnouncement(final String path, byte[] value) throws Exception
{ {
return curator.create().compressed().withMode(CreateMode.EPHEMERAL).inBackground().forPath(path, value); return curator.create().compressed().withMode(CreateMode.EPHEMERAL).inBackground().forPath(path, value);
} }
private Stat updateAnnouncement(final String path, final byte[] value) throws Exception
{
return curator.setData().compressed().inBackground().forPath(path, value);
}
/** /**
* Unannounces an announcement created at path. Note that if all announcements get removed, the Announcer * Unannounces an announcement created at path. Note that if all announcements get removed, the Announcer
* will continue to have ZK watches on paths because clearing them out is a source of ugly race conditions. * will continue to have ZK watches on paths because clearing them out is a source of ugly race conditions.
* * <p/>
* If you need to completely clear all the state of what is being watched and announced, stop() the Announcer. * If you need to completely clear all the state of what is being watched and announced, stop() the Announcer.
* *
* @param path * @param path

View File

@ -0,0 +1,17 @@
package com.metamx.druid.initialization;
import org.skife.config.Config;
import org.skife.config.Default;
/**
*/
public abstract class ZkDataSegmentAnnouncerConfig extends ZkPathsConfig
{
@Config("druid.zk.segmentsPerNode")
@Default("50")
public abstract int getSegmentsPerNode();
@Config("druid.zk.maxNumBytesPerNode")
@Default("512000")
public abstract long getMaxNumBytes();
}

View File

@ -21,6 +21,7 @@ package com.metamx.druid.initialization;
import org.apache.curator.utils.ZKPaths; import org.apache.curator.utils.ZKPaths;
import org.skife.config.Config; import org.skife.config.Config;
import org.skife.config.Default;
public abstract class ZkPathsConfig public abstract class ZkPathsConfig
{ {
@ -45,6 +46,12 @@ public abstract class ZkPathsConfig
return defaultPath("servedSegments"); return defaultPath("servedSegments");
} }
@Config("druid.zk.paths.liveSegmentsPath")
public String getLiveSegmentsPath()
{
return defaultPath("segments");
}
@Config("druid.zk.paths.loadQueuePath") @Config("druid.zk.paths.loadQueuePath")
public String getLoadQueuePath() public String getLoadQueuePath()
{ {

View File

@ -0,0 +1,241 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.coordination;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import junit.framework.Assert;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
/**
*/
public class BatchingCuratorDataSegmentAnnouncerTest
{
private static final String testBasePath = "/test";
private static final String testSegmentsPath = "/test/segments/id";
private static final Joiner joiner = Joiner.on("/");
private TestingCluster testingCluster;
private CuratorFramework cf;
private ObjectMapper jsonMapper;
private Announcer announcer;
private SegmentReader segmentReader;
private BatchingCuratorDataSegmentAnnouncer segmentAnnouncer;
private Set<DataSegment> testSegments;
@Before
public void setUp() throws Exception
{
testingCluster = new TestingCluster(1);
testingCluster.start();
cf = CuratorFrameworkFactory.builder()
.connectString(testingCluster.getConnectString())
.retryPolicy(new ExponentialBackoffRetry(1, 10))
.compressionProvider(new PotentiallyGzippedCompressionProvider(false))
.build();
cf.start();
cf.create().creatingParentsIfNeeded().forPath(testBasePath);
jsonMapper = new DefaultObjectMapper();
announcer = new Announcer(
cf,
MoreExecutors.sameThreadExecutor()
);
announcer.start();
segmentReader = new SegmentReader(cf, jsonMapper);
segmentAnnouncer = new BatchingCuratorDataSegmentAnnouncer(
new DruidServerMetadata(
"id",
"host",
Long.MAX_VALUE,
"type",
"tier"
),
new ZkDataSegmentAnnouncerConfig()
{
@Override
public String getZkBasePath()
{
return testBasePath;
}
@Override
public int getSegmentsPerNode()
{
return 50;
}
@Override
public long getMaxNumBytes()
{
return 100000;
}
},
announcer,
jsonMapper
);
segmentAnnouncer.start();
testSegments = Sets.newHashSet();
for (int i = 0; i < 100; i++) {
testSegments.add(makeSegment(i));
}
}
@After
public void tearDown() throws Exception
{
segmentAnnouncer.stop();
announcer.stop();
cf.close();
testingCluster.stop();
}
@Test
public void testSingleAnnounce() throws Exception
{
Iterator<DataSegment> segIter = testSegments.iterator();
DataSegment firstSegment = segIter.next();
DataSegment secondSegment = segIter.next();
segmentAnnouncer.announceSegment(firstSegment);
List<String> zNodes = cf.getChildren().forPath(testSegmentsPath);
for (String zNode : zNodes) {
Set<DataSegment> segments = segmentReader.read(joiner.join(testSegmentsPath, zNode));
Assert.assertEquals(segments.iterator().next(), firstSegment);
}
segmentAnnouncer.announceSegment(secondSegment);
for (String zNode : zNodes) {
Set<DataSegment> segments = segmentReader.read(joiner.join(testSegmentsPath, zNode));
Assert.assertEquals(Sets.newHashSet(firstSegment, secondSegment), segments);
}
segmentAnnouncer.unannounceSegment(firstSegment);
for (String zNode : zNodes) {
Set<DataSegment> segments = segmentReader.read(joiner.join(testSegmentsPath, zNode));
Assert.assertEquals(segments.iterator().next(), secondSegment);
}
segmentAnnouncer.unannounceSegment(secondSegment);
Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty());
}
@Test
public void testBatchAnnounce() throws Exception
{
segmentAnnouncer.announceSegments(testSegments);
List<String> zNodes = cf.getChildren().forPath(testSegmentsPath);
Assert.assertTrue(zNodes.size() == 2);
Set<DataSegment> allSegments = Sets.newHashSet();
for (String zNode : zNodes) {
allSegments.addAll(segmentReader.read(joiner.join(testSegmentsPath, zNode)));
}
Assert.assertEquals(allSegments, testSegments);
segmentAnnouncer.unannounceSegments(testSegments);
Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty());
}
@Test
public void testMultipleBatchAnnounce() throws Exception
{
for (int i = 0; i < 10; i++) {
testBatchAnnounce();
}
}
private DataSegment makeSegment(int offset)
{
return DataSegment.builder()
.dataSource("foo")
.interval(
new Interval(
new DateTime("2013-01-01").plusDays(offset),
new DateTime("2013-01-02").plusDays(offset)
)
)
.version(new DateTime().toString())
.build();
}
private class SegmentReader
{
private final CuratorFramework cf;
private final ObjectMapper jsonMapper;
public SegmentReader(CuratorFramework cf, ObjectMapper jsonMapper)
{
this.cf = cf;
this.jsonMapper = jsonMapper;
}
public Set<DataSegment> read(String path)
{
try {
if (cf.checkExists().forPath(path) != null) {
return jsonMapper.readValue(
cf.getData().forPath(path), new TypeReference<Set<DataSegment>>()
{
}
);
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return Sets.newHashSet();
}
}
}

View File

@ -43,14 +43,11 @@ public class CuratorInventoryManagerTest extends CuratorTestBase
curator.start(); curator.start();
manager.start(); manager.start();
curator.create().creatingParentsIfNeeded().forPath("/container");
curator.create().creatingParentsIfNeeded().forPath("/inventory/billy");
Assert.assertTrue(Iterables.isEmpty(manager.getInventory())); Assert.assertTrue(Iterables.isEmpty(manager.getInventory()));
CountDownLatch containerLatch = new CountDownLatch(1); CountDownLatch containerLatch = new CountDownLatch(1);
strategy.setNewContainerLatch(containerLatch); strategy.setNewContainerLatch(containerLatch);
curator.create().withMode(CreateMode.EPHEMERAL).forPath("/container/billy", new byte[]{}); curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/container/billy", new byte[]{});
Assert.assertTrue(timing.awaitLatch(containerLatch)); Assert.assertTrue(timing.awaitLatch(containerLatch));
strategy.setNewContainerLatch(null); strategy.setNewContainerLatch(null);
@ -60,7 +57,7 @@ public class CuratorInventoryManagerTest extends CuratorTestBase
CountDownLatch inventoryLatch = new CountDownLatch(2); CountDownLatch inventoryLatch = new CountDownLatch(2);
strategy.setNewInventoryLatch(inventoryLatch); strategy.setNewInventoryLatch(inventoryLatch);
curator.create().withMode(CreateMode.EPHEMERAL).forPath("/inventory/billy/1", Ints.toByteArray(100)); curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/inventory/billy/1", Ints.toByteArray(100));
curator.create().withMode(CreateMode.EPHEMERAL).forPath("/inventory/billy/bob", Ints.toByteArray(2287)); curator.create().withMode(CreateMode.EPHEMERAL).forPath("/inventory/billy/bob", Ints.toByteArray(2287));
Assert.assertTrue(timing.awaitLatch(inventoryLatch)); Assert.assertTrue(timing.awaitLatch(inventoryLatch));

View File

@ -142,5 +142,17 @@ public class RealtimeStandaloneMain
{ {
// do nothing // do nothing
} }
@Override
public void announceSegments(Iterable<DataSegment> segments) throws IOException
{
// do nothing
}
@Override
public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
{
// do nothing
}
} }
} }

View File

@ -30,7 +30,6 @@ import com.metamx.druid.Query;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.coordination.DataSegmentAnnouncer; import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.index.v1.IndexGranularity; import com.metamx.druid.index.v1.IndexGranularity;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.indexing.common.TaskLock; import com.metamx.druid.indexing.common.TaskLock;
import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.TaskToolbox; import com.metamx.druid.indexing.common.TaskToolbox;
@ -38,6 +37,7 @@ import com.metamx.druid.indexing.common.actions.LockAcquireAction;
import com.metamx.druid.indexing.common.actions.LockListAction; import com.metamx.druid.indexing.common.actions.LockListAction;
import com.metamx.druid.indexing.common.actions.LockReleaseAction; import com.metamx.druid.indexing.common.actions.LockReleaseAction;
import com.metamx.druid.indexing.common.actions.SegmentInsertAction; import com.metamx.druid.indexing.common.actions.SegmentInsertAction;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.query.FinalizeResultsQueryRunner; import com.metamx.druid.query.FinalizeResultsQueryRunner;
import com.metamx.druid.query.QueryRunner; import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryRunnerFactory; import com.metamx.druid.query.QueryRunnerFactory;
@ -226,6 +226,28 @@ public class RealtimeIndexTask extends AbstractTask
toolbox.getTaskActionClient().submit(new LockReleaseAction(segment.getInterval())); toolbox.getTaskActionClient().submit(new LockReleaseAction(segment.getInterval()));
} }
} }
@Override
public void announceSegments(Iterable<DataSegment> segments) throws IOException
{
for (DataSegment segment : segments) {
toolbox.getTaskActionClient().submit(new LockAcquireAction(segment.getInterval()));
}
toolbox.getSegmentAnnouncer().announceSegments(segments);
}
@Override
public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
{
try {
toolbox.getSegmentAnnouncer().unannounceSegments(segments);
}
finally {
for (DataSegment segment : segments) {
toolbox.getTaskActionClient().submit(new LockReleaseAction(segment.getInterval()));
}
}
}
}; };
// NOTE: getVersion will block if there is lock contention, which will block plumber.getSink // NOTE: getVersion will block if there is lock contention, which will block plumber.getSink

View File

@ -106,9 +106,7 @@ public class TaskMasterLifecycle
Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, leaderLifecycle); Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, leaderLifecycle);
leaderLifecycle.addManagedInstance(taskConsumer); leaderLifecycle.addManagedInstance(taskConsumer);
if (indexerCoordinatorConfig.isAutoScalingEnabled()) { leaderLifecycle.addManagedInstance(resourceManagementScheduler);
leaderLifecycle.addManagedInstance(resourceManagementScheduler);
}
try { try {
leaderLifecycle.start(); leaderLifecycle.start();

View File

@ -55,10 +55,6 @@ import com.metamx.druid.http.GuiceServletConfig;
import com.metamx.druid.http.RedirectFilter; import com.metamx.druid.http.RedirectFilter;
import com.metamx.druid.http.RedirectInfo; import com.metamx.druid.http.RedirectInfo;
import com.metamx.druid.http.StatusServlet; import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.indexing.common.RetryPolicyFactory; import com.metamx.druid.indexing.common.RetryPolicyFactory;
import com.metamx.druid.indexing.common.actions.LocalTaskActionClientFactory; import com.metamx.druid.indexing.common.actions.LocalTaskActionClientFactory;
import com.metamx.druid.indexing.common.actions.TaskActionClientFactory; import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
@ -93,12 +89,17 @@ import com.metamx.druid.indexing.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.indexing.coordinator.scaling.AutoScalingStrategy; import com.metamx.druid.indexing.coordinator.scaling.AutoScalingStrategy;
import com.metamx.druid.indexing.coordinator.scaling.EC2AutoScalingStrategy; import com.metamx.druid.indexing.coordinator.scaling.EC2AutoScalingStrategy;
import com.metamx.druid.indexing.coordinator.scaling.NoopAutoScalingStrategy; import com.metamx.druid.indexing.coordinator.scaling.NoopAutoScalingStrategy;
import com.metamx.druid.indexing.coordinator.scaling.NoopResourceManagementScheduler;
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementScheduler; import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementScheduler;
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerConfig; import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerConfig;
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactory; import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactory;
import com.metamx.druid.indexing.coordinator.scaling.SimpleResourceManagementStrategy; import com.metamx.druid.indexing.coordinator.scaling.SimpleResourceManagementStrategy;
import com.metamx.druid.indexing.coordinator.scaling.SimpleResourceManagmentConfig; import com.metamx.druid.indexing.coordinator.scaling.SimpleResourceManagmentConfig;
import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData; import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.utils.PropUtils; import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitters; import com.metamx.emitter.core.Emitters;
@ -687,53 +688,64 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
private void initializeResourceManagement(final JacksonConfigManager configManager) private void initializeResourceManagement(final JacksonConfigManager configManager)
{ {
if (resourceManagementSchedulerFactory == null) { if (resourceManagementSchedulerFactory == null) {
resourceManagementSchedulerFactory = new ResourceManagementSchedulerFactory() if (!config.isAutoScalingEnabled()) {
{ resourceManagementSchedulerFactory = new ResourceManagementSchedulerFactory()
@Override
public ResourceManagementScheduler build(TaskRunner runner)
{ {
final ScheduledExecutorService scalingScheduledExec = Executors.newScheduledThreadPool( @Override
1, public ResourceManagementScheduler build(TaskRunner runner)
new ThreadFactoryBuilder() {
.setDaemon(true) return new NoopResourceManagementScheduler();
.setNameFormat("ScalingExec--%d")
.build()
);
final AtomicReference<WorkerSetupData> workerSetupData = configManager.watch(
WorkerSetupData.CONFIG_KEY, WorkerSetupData.class
);
AutoScalingStrategy strategy;
if (config.getAutoScalingImpl().equalsIgnoreCase("ec2")) {
strategy = new EC2AutoScalingStrategy(
getJsonMapper(),
new AmazonEC2Client(
new BasicAWSCredentials(
PropUtils.getProperty(getProps(), "com.metamx.aws.accessKey"),
PropUtils.getProperty(getProps(), "com.metamx.aws.secretKey")
)
),
getConfigFactory().build(EC2AutoScalingStrategyConfig.class),
workerSetupData
);
} else if (config.getAutoScalingImpl().equalsIgnoreCase("noop")) {
strategy = new NoopAutoScalingStrategy();
} else {
throw new ISE("Invalid strategy implementation: %s", config.getAutoScalingImpl());
} }
};
} else {
resourceManagementSchedulerFactory = new ResourceManagementSchedulerFactory()
{
@Override
public ResourceManagementScheduler build(TaskRunner runner)
{
final ScheduledExecutorService scalingScheduledExec = Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("ScalingExec--%d")
.build()
);
final AtomicReference<WorkerSetupData> workerSetupData = configManager.watch(
WorkerSetupData.CONFIG_KEY, WorkerSetupData.class
);
return new ResourceManagementScheduler( AutoScalingStrategy strategy;
runner, if (config.getAutoScalingImpl().equalsIgnoreCase("ec2")) {
new SimpleResourceManagementStrategy( strategy = new EC2AutoScalingStrategy(
strategy, getJsonMapper(),
getConfigFactory().build(SimpleResourceManagmentConfig.class), new AmazonEC2Client(
new BasicAWSCredentials(
PropUtils.getProperty(getProps(), "com.metamx.aws.accessKey"),
PropUtils.getProperty(getProps(), "com.metamx.aws.secretKey")
)
),
getConfigFactory().build(EC2AutoScalingStrategyConfig.class),
workerSetupData workerSetupData
), );
getConfigFactory().build(ResourceManagementSchedulerConfig.class), } else if (config.getAutoScalingImpl().equalsIgnoreCase("noop")) {
scalingScheduledExec strategy = new NoopAutoScalingStrategy();
); } else {
} throw new ISE("Invalid strategy implementation: %s", config.getAutoScalingImpl());
}; }
return new ResourceManagementScheduler(
runner,
new SimpleResourceManagementStrategy(
strategy,
getConfigFactory().build(SimpleResourceManagmentConfig.class),
workerSetupData
),
getConfigFactory().build(ResourceManagementSchedulerConfig.class),
scalingScheduledExec
);
}
};
}
} }
} }

View File

@ -0,0 +1,52 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.indexing.coordinator.scaling;
import com.metamx.common.logger.Logger;
/**
*/
public class NoopResourceManagementScheduler extends ResourceManagementScheduler
{
private static final Logger log = new Logger(NoopResourceManagementScheduler.class);
public NoopResourceManagementScheduler()
{
super(null, null, null, null);
}
@Override
public void start()
{
log.info("Autoscaling is disabled.");
}
@Override
public void stop()
{
// do nothing
}
@Override
public ScalingStats getStats()
{
return new ScalingStats(0);
}
}

View File

@ -39,7 +39,7 @@
<properties> <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.22.3</metamx.java-util.version> <metamx.java-util.version>0.22.3</metamx.java-util.version>
<apache.curator.version>2.0.2-21-22</apache.curator.version> <apache.curator.version>2.1.0-incubating</apache.curator.version>
</properties> </properties>
<modules> <modules>

View File

@ -21,6 +21,7 @@ package com.metamx.druid.coordination;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
@ -37,6 +38,7 @@ import org.apache.curator.utils.ZKPaths;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.List;
/** /**
*/ */
@ -103,7 +105,9 @@ public class ZkCoordinator implements DataSegmentChangeHandler
curator.newNamespaceAwareEnsurePath(loadQueueLocation).ensure(curator.getZookeeperClient()); curator.newNamespaceAwareEnsurePath(loadQueueLocation).ensure(curator.getZookeeperClient());
curator.newNamespaceAwareEnsurePath(servedSegmentsLocation).ensure(curator.getZookeeperClient()); curator.newNamespaceAwareEnsurePath(servedSegmentsLocation).ensure(curator.getZookeeperClient());
loadCache(); if (config.isLoadFromSegmentCacheEnabled()) {
loadCache();
}
loadQueueCache.getListenable().addListener( loadQueueCache.getListenable().addListener(
new PathChildrenCacheListener() new PathChildrenCacheListener()
@ -136,9 +140,9 @@ public class ZkCoordinator implements DataSegmentChangeHandler
} }
log.makeAlert(e, "Segment load/unload: uncaught exception.") log.makeAlert(e, "Segment load/unload: uncaught exception.")
.addData("node", path) .addData("node", path)
.addData("nodeProperties", segment) .addData("nodeProperties", segment)
.emit(); .emit();
} }
break; break;
@ -192,12 +196,14 @@ public class ZkCoordinator implements DataSegmentChangeHandler
return; return;
} }
List<DataSegment> cachedSegments = Lists.newArrayList();
for (File file : baseDir.listFiles()) { for (File file : baseDir.listFiles()) {
log.info("Loading segment cache file [%s]", file); log.info("Loading segment cache file [%s]", file);
try { try {
DataSegment segment = jsonMapper.readValue(file, DataSegment.class); DataSegment segment = jsonMapper.readValue(file, DataSegment.class);
if (serverManager.isSegmentCached(segment)) { if (serverManager.isSegmentCached(segment)) {
addSegment(segment); cachedSegments.add(segment);
//addSegment(segment);
} else { } else {
log.warn("Unable to find cache file for %s. Deleting lookup entry", segment.getIdentifier()); log.warn("Unable to find cache file for %s. Deleting lookup entry", segment.getIdentifier());
@ -213,6 +219,8 @@ public class ZkCoordinator implements DataSegmentChangeHandler
.emit(); .emit();
} }
} }
addSegments(cachedSegments);
} }
@Override @Override
@ -222,14 +230,16 @@ public class ZkCoordinator implements DataSegmentChangeHandler
serverManager.loadSegment(segment); serverManager.loadSegment(segment);
File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier()); File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier());
try { if (!segmentInfoCacheFile.exists()) {
jsonMapper.writeValue(segmentInfoCacheFile, segment); try {
} jsonMapper.writeValue(segmentInfoCacheFile, segment);
catch (IOException e) { }
removeSegment(segment); catch (IOException e) {
throw new SegmentLoadingException( removeSegment(segment);
e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile throw new SegmentLoadingException(
); e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
);
}
} }
try { try {
@ -239,14 +249,51 @@ public class ZkCoordinator implements DataSegmentChangeHandler
removeSegment(segment); removeSegment(segment);
throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier()); throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier());
} }
} }
catch (SegmentLoadingException e) { catch (SegmentLoadingException e) {
log.makeAlert(e, "Failed to load segment for dataSource") log.makeAlert(e, "Failed to load segment for dataSource")
.addData("segment", segment) .addData("segment", segment)
.emit(); .emit();
} }
} }
public void addSegments(Iterable<DataSegment> segments)
{
try {
for (DataSegment segment : segments) {
serverManager.loadSegment(segment);
File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier());
if (!segmentInfoCacheFile.exists()) {
try {
jsonMapper.writeValue(segmentInfoCacheFile, segment);
}
catch (IOException e) {
removeSegment(segment);
throw new SegmentLoadingException(
e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
);
}
}
}
try {
announcer.announceSegments(segments);
}
catch (IOException e) {
removeSegments(segments);
throw new SegmentLoadingException(e, "Failed to announce segments[%s]", segments);
}
}
catch (SegmentLoadingException e) {
log.makeAlert(e, "Failed to load segments for dataSource")
.addData("segments", segments)
.emit();
}
}
@Override @Override
public void removeSegment(DataSegment segment) public void removeSegment(DataSegment segment)
{ {
@ -262,8 +309,29 @@ public class ZkCoordinator implements DataSegmentChangeHandler
} }
catch (Exception e) { catch (Exception e) {
log.makeAlert(e, "Failed to remove segment") log.makeAlert(e, "Failed to remove segment")
.addData("segment", segment) .addData("segment", segment)
.emit(); .emit();
}
}
public void removeSegments(Iterable<DataSegment> segments)
{
try {
for (DataSegment segment : segments) {
serverManager.dropSegment(segment);
File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier());
if (!segmentInfoCacheFile.delete()) {
log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile);
}
}
announcer.unannounceSegments(segments);
}
catch (Exception e) {
log.makeAlert(e, "Failed to remove segments")
.addData("segments", segments)
.emit();
} }
} }
} }

View File

@ -29,4 +29,10 @@ public abstract class ZkCoordinatorConfig
{ {
@Config("druid.paths.segmentInfoCache") @Config("druid.paths.segmentInfoCache")
public abstract File getSegmentInfoCacheDirectory(); public abstract File getSegmentInfoCacheDirectory();
@Config("druid.segmentCache.enable")
public boolean isLoadFromSegmentCacheEnabled()
{
return true;
}
} }

View File

@ -443,17 +443,17 @@ public class DruidMaster
curator, ZKPaths.makePath(zkPaths.getMasterPath(), MASTER_OWNER_NODE), config.getHost() curator, ZKPaths.makePath(zkPaths.getMasterPath(), MASTER_OWNER_NODE), config.getHost()
); );
newLeaderLatch.attachListener( newLeaderLatch.addListener(
new LeaderLatchListener() new LeaderLatchListener()
{ {
@Override @Override
public void becomeMaster() public void isLeader()
{ {
DruidMaster.this.becomeMaster(); DruidMaster.this.becomeMaster();
} }
@Override @Override
public void stopBeingMaster() public void notLeader()
{ {
DruidMaster.this.stopBeingMaster(); DruidMaster.this.stopBeingMaster();
} }