diff --git a/.gitignore b/.gitignore
index 31985c78113..845a2ddc007 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,3 +13,4 @@ target
 examples/rand/RealtimeNode.out
 examples/twitter/RealtimeNode.out
 *.log
+*.DS_Store
diff --git a/client/src/main/java/com/metamx/druid/QueryableNode.java b/client/src/main/java/com/metamx/druid/QueryableNode.java
index d01032817f7..c715de91f97 100644
--- a/client/src/main/java/com/metamx/druid/QueryableNode.java
+++ b/client/src/main/java/com/metamx/druid/QueryableNode.java
@@ -39,15 +39,19 @@ import com.metamx.druid.client.ServerInventoryView;
 import com.metamx.druid.client.ServerInventoryViewConfig;
 import com.metamx.druid.client.ServerView;
 import com.metamx.druid.concurrent.Execs;
+import com.metamx.druid.coordination.AbstractDataSegmentAnnouncer;
+import com.metamx.druid.coordination.BatchingCuratorDataSegmentAnnouncer;
 import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer;
 import com.metamx.druid.coordination.DataSegmentAnnouncer;
 import com.metamx.druid.coordination.DruidServerMetadata;
+import com.metamx.druid.coordination.MultipleDataSegmentAnnouncerDataSegmentAnnouncer;
 import com.metamx.druid.curator.announcement.Announcer;
 import com.metamx.druid.http.NoopRequestLogger;
 import com.metamx.druid.http.RequestLogger;
 import com.metamx.druid.initialization.CuratorConfig;
 import com.metamx.druid.initialization.Initialization;
 import com.metamx.druid.initialization.ServerConfig;
+import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig;
 import com.metamx.druid.initialization.ZkPathsConfig;
 import com.metamx.druid.utils.PropUtils;
 import com.metamx.emitter.EmittingLogger;
@@ -424,7 +428,20 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
       final Announcer announcer = new Announcer(getCuratorFramework(), Execs.singleThreaded("Announcer-%s"));
       lifecycle.addManagedInstance(announcer);
 
-      setAnnouncer(new CuratorDataSegmentAnnouncer(getDruidServerMetadata(), getZkPaths(), announcer, getJsonMapper()));
+      setAnnouncer(
+          new MultipleDataSegmentAnnouncerDataSegmentAnnouncer(
+              Arrays.asList(
+                  new BatchingCuratorDataSegmentAnnouncer(
+                      getDruidServerMetadata(),
+                      getConfigFactory().build(ZkDataSegmentAnnouncerConfig.class),
+                      announcer,
+                      getJsonMapper()
+                  ),
+                  new CuratorDataSegmentAnnouncer(getDruidServerMetadata(), getZkPaths(), announcer, getJsonMapper())
+              )
+          )
+      );
+
+      lifecycle.addManagedInstance(getAnnouncer(), Lifecycle.Stage.LAST);
     }
   }
diff --git a/client/src/main/java/com/metamx/druid/coordination/AbstractDataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/AbstractDataSegmentAnnouncer.java
new file mode 100644
index 00000000000..583910e3dac
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/coordination/AbstractDataSegmentAnnouncer.java
@@ -0,0 +1,100 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.coordination;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Throwables;
+import com.metamx.common.lifecycle.LifecycleStart;
+import com.metamx.common.lifecycle.LifecycleStop;
+import com.metamx.common.logger.Logger;
+import com.metamx.druid.curator.announcement.Announcer;
+import com.metamx.druid.initialization.ZkPathsConfig;
+import org.apache.curator.utils.ZKPaths;
+
+/**
+ */
+public abstract class AbstractDataSegmentAnnouncer implements DataSegmentAnnouncer
+{
+  private static final Logger log = new Logger(AbstractDataSegmentAnnouncer.class);
+
+  private final DruidServerMetadata server;
+  private final ZkPathsConfig config;
+  private final Announcer announcer;
+  private final ObjectMapper jsonMapper;
+
+  private final Object lock = new Object();
+
+  private volatile boolean started = false;
+
+  protected AbstractDataSegmentAnnouncer(
+      DruidServerMetadata server,
+      ZkPathsConfig config,
+      Announcer announcer,
+      ObjectMapper jsonMapper
+  )
+  {
+    this.server = server;
+    this.config = config;
+    this.announcer = announcer;
+    this.jsonMapper = jsonMapper;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+    synchronized (lock) {
+      if (started) {
+        return;
+      }
+
+      try {
+        final String path = makeAnnouncementPath();
+        log.info("Announcing self[%s] at [%s]", server, path);
+        announcer.announce(path, jsonMapper.writeValueAsBytes(server));
+      }
+      catch (JsonProcessingException e) {
+        throw Throwables.propagate(e);
+      }
+
+      started = true;
+    }
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+    synchronized (lock) {
+      if (!started) {
+        return;
+      }
+
+      log.info("Stopping %s with config[%s]", getClass().getSimpleName(), config);
+      announcer.unannounce(makeAnnouncementPath());
+
+      started = false;
+    }
+  }
+
+  private String makeAnnouncementPath()
+  {
+    return ZKPaths.makePath(config.getAnnouncementsPath(), server.getName());
+  }
+}
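Editor's note on the next file: the batching announcer packs the serialized metadata of many segments into a single ZooKeeper znode, closing a batch when either the segment count or the byte size limit is reached. A rough sketch of the packing arithmetic under the patch's defaults (50 segments and 512,000 bytes per znode); the class name and the 300-byte average are assumptions, not part of the patch:

import java.lang.Math;

public class ZNodePackingMath
{
  public static void main(String[] args)
  {
    final int segmentsPerNode = 50;     // druid.zk.segmentsPerNode default
    final long maxNumBytes = 512000L;   // druid.zk.maxNumBytesPerNode default

    final int segmentCount = 100;       // e.g. the unit test below announces 100 segments
    final long bytesPerSegment = 300L;  // assumed average serialized DataSegment size

    // A batch closes when either limit is hit, so each znode holds at most
    // min(segmentsPerNode, maxNumBytes / bytesPerSegment) segments.
    final long perNode = Math.min((long) segmentsPerNode, maxNumBytes / bytesPerSegment);

    final long zNodes = (segmentCount + perNode - 1) / perNode; // ceiling division
    System.out.printf("%d segments -> %d znode(s)%n", segmentCount, zNodes);
  }
}

With these numbers the count limit binds (50 vs. ~1,706 by bytes), so 100 segments land in 2 znodes, which is exactly what the BatchingCuratorDataSegmentAnnouncerTest further down asserts.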
diff --git a/client/src/main/java/com/metamx/druid/coordination/BatchingCuratorDataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/BatchingCuratorDataSegmentAnnouncer.java
new file mode 100644
index 00000000000..e8070e71ff7
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/coordination/BatchingCuratorDataSegmentAnnouncer.java
@@ -0,0 +1,297 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.coordination;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.MinMaxPriorityQueue;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
+import com.metamx.common.ISE;
+import com.metamx.common.logger.Logger;
+import com.metamx.druid.client.DataSegment;
+import com.metamx.druid.curator.announcement.Announcer;
+import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig;
+import com.metamx.druid.initialization.ZkPathsConfig;
+import org.apache.curator.utils.ZKPaths;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ */
+public class BatchingCuratorDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
+{
+  private static final Logger log = new Logger(BatchingCuratorDataSegmentAnnouncer.class);
+
+  private final ZkDataSegmentAnnouncerConfig config;
+  private final Announcer announcer;
+  private final ObjectMapper jsonMapper;
+  private final String liveSegmentLocation;
+
+  private final Set<SegmentZNode> availableZNodes = Sets.newHashSet();
+  private final Map<DataSegment, SegmentZNode> segmentLookup = Maps.newHashMap();
+
+  public BatchingCuratorDataSegmentAnnouncer(
+      DruidServerMetadata server,
+      ZkDataSegmentAnnouncerConfig config,
+      Announcer announcer,
+      ObjectMapper jsonMapper
+  )
+  {
+    super(server, config, announcer, jsonMapper);
+
+    this.config = config;
+    this.announcer = announcer;
+    this.jsonMapper = jsonMapper;
+    this.liveSegmentLocation = ZKPaths.makePath(config.getLiveSegmentsPath(), server.getName());
+  }
+
+  @Override
+  public void announceSegment(DataSegment segment) throws IOException
+  {
+    int newBytesLen = jsonMapper.writeValueAsBytes(segment).length;
+    if (newBytesLen > config.getMaxNumBytes()) {
+      throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxNumBytes());
+    }
+
+    // create new batch
+    if (availableZNodes.isEmpty()) {
+      SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
+      availableZNode.addSegment(segment);
+
+      log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
+      announcer.announce(availableZNode.getPath(), availableZNode.getBytes());
+      segmentLookup.put(segment, availableZNode);
+      availableZNodes.add(availableZNode);
+    } else { // update existing batch
+      Iterator<SegmentZNode> iter = availableZNodes.iterator();
+      boolean done = false;
+      while (iter.hasNext() && !done) {
+        SegmentZNode availableZNode = iter.next();
+        if (availableZNode.getBytes().length + newBytesLen < config.getMaxNumBytes()) {
+          availableZNode.addSegment(segment);
+
+          log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
+          announcer.update(availableZNode.getPath(), availableZNode.getBytes());
+          segmentLookup.put(segment, availableZNode);
+
+          if (availableZNode.getCount() >= config.getSegmentsPerNode()) {
+            availableZNodes.remove(availableZNode);
+          }
+
+          done = true;
+        }
+      }
+
+      // no available zNode had room for this segment: open a fresh batch so the
+      // announcement is not silently dropped
+      if (!done) {
+        SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
+        availableZNode.addSegment(segment);
+
+        log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
+        announcer.announce(availableZNode.getPath(), availableZNode.getBytes());
+        segmentLookup.put(segment, availableZNode);
+        availableZNodes.add(availableZNode);
+      }
+    }
+  }
+
+  @Override
+  public void unannounceSegment(DataSegment segment) throws IOException
+  {
+    final SegmentZNode segmentZNode = segmentLookup.remove(segment);
+    if (segmentZNode == null) {
+      log.warn("No zNode found for segment[%s]; nothing to unannounce", segment.getIdentifier());
+      return;
+    }
+
+    segmentZNode.removeSegment(segment);
+
+    log.info("Unannouncing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath());
+    if (segmentZNode.getCount() == 0) {
+      availableZNodes.remove(segmentZNode);
+      announcer.unannounce(segmentZNode.getPath());
+    } else {
+      announcer.update(segmentZNode.getPath(), segmentZNode.getBytes());
+      availableZNodes.add(segmentZNode);
+    }
+  }
+
+  @Override
+  public void announceSegments(Iterable<DataSegment> segments) throws IOException
+  {
+    SegmentZNode segmentZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
+    Set<DataSegment> batch = Sets.newHashSet();
+    int byteSize = 0;
+    int count = 0;
+
+    for (DataSegment segment : segments) {
+      int newBytesLen = jsonMapper.writeValueAsBytes(segment).length;
+
+      if (newBytesLen > config.getMaxNumBytes()) {
+        throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxNumBytes());
+      }
+
+      if (count >= config.getSegmentsPerNode() || byteSize + newBytesLen > config.getMaxNumBytes()) {
+        segmentZNode.addSegments(batch);
+        announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes());
+
+        segmentZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
+        batch = Sets.newHashSet();
+        count = 0;
+        byteSize = 0;
+      }
+
+      log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath());
+      segmentLookup.put(segment, segmentZNode);
+      batch.add(segment);
+      count++;
+      byteSize += newBytesLen;
+    }
+
+    segmentZNode.addSegments(batch);
+    announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes());
+  }
+
+  @Override
+  public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
+  {
+    for (DataSegment segment : segments) {
+      unannounceSegment(segment);
+    }
+  }
+
+  private String makeServedSegmentPath(String zNode)
+  {
+    return ZKPaths.makePath(liveSegmentLocation, zNode);
+  }
+
+  private class SegmentZNode
+  {
+    private final String path;
+
+    private byte[] bytes = new byte[]{};
+    private int count = 0;
+
+    public SegmentZNode(String path)
+    {
+      this.path = path;
+    }
+
+    public String getPath()
+    {
+      return path;
+    }
+
+    public int getCount()
+    {
+      return count;
+    }
+
+    public byte[] getBytes()
+    {
+      return bytes;
+    }
+
+    public Set<DataSegment> getSegments()
+    {
+      if (bytes.length == 0) {
+        return Sets.newHashSet();
+      }
+      try {
+        return jsonMapper.readValue(
+            bytes, new TypeReference<Set<DataSegment>>()
+            {
+            }
+        );
+      }
+      catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
+    }
+
+    public void addSegment(DataSegment segment)
+    {
+      Set<DataSegment> zkSegments = getSegments();
+      zkSegments.add(segment);
+
+      try {
+        bytes = jsonMapper.writeValueAsBytes(zkSegments);
+      }
+      catch (Exception e) {
+        zkSegments.remove(segment);
+        throw Throwables.propagate(e);
+      }
+
+      count++;
+    }
+
+    public void addSegments(Set<DataSegment> segments)
+    {
+      Set<DataSegment> zkSegments = getSegments();
+      zkSegments.addAll(segments);
+
+      try {
+        bytes = jsonMapper.writeValueAsBytes(zkSegments);
+      }
+      catch (Exception e) {
+        zkSegments.removeAll(segments);
+        throw Throwables.propagate(e);
+      }
+
+      count += segments.size();
+    }
+
+    public void removeSegment(DataSegment segment)
+    {
+      Set<DataSegment> zkSegments = getSegments();
+      zkSegments.remove(segment);
+
+      try {
+        bytes = jsonMapper.writeValueAsBytes(zkSegments);
+      }
+      catch (Exception e) {
+        zkSegments.add(segment);
+        throw Throwables.propagate(e);
+      }
+
+      count--;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      SegmentZNode that = (SegmentZNode) o;
+
+      if (!path.equals(that.path)) {
+        return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return path.hashCode();
+    }
+  }
+}
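Editor's note: the batch znodes above live under a per-server path derived from the new liveSegmentsPath config, with each batch named by its creation timestamp. A small illustrative sketch of the resulting layout (the class, server name, and timestamp are assumptions):

import org.apache.curator.utils.ZKPaths;

public class LiveSegmentPaths
{
  public static void main(String[] args)
  {
    final String liveSegmentsPath = "/druid/segments";  // ZkPathsConfig.getLiveSegmentsPath() default
    final String serverName = "example.com:8080";       // DruidServerMetadata.getName()

    // Each batch znode is named with the DateTime at which it was created.
    final String batchZNode = ZKPaths.makePath(
        ZKPaths.makePath(liveSegmentsPath, serverName),
        "2013-01-01T00:00:00.000Z"
    );
    System.out.println(batchZNode);
    // -> /druid/segments/example.com:8080/2013-01-01T00:00:00.000Z
  }
}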
diff --git a/client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java
index a7efc6cb9e8..f9286d1a239 100644
--- a/client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java
+++ b/client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java
@@ -19,11 +19,7 @@
 
 package com.metamx.druid.coordination;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Throwables;
-import com.metamx.common.lifecycle.LifecycleStart;
-import com.metamx.common.lifecycle.LifecycleStop;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.curator.announcement.Announcer;
@@ -32,20 +28,14 @@ import org.apache.curator.utils.ZKPaths;
 
 import java.io.IOException;
 
-public class CuratorDataSegmentAnnouncer implements DataSegmentAnnouncer
+public class CuratorDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
 {
   private static final Logger log = new Logger(CuratorDataSegmentAnnouncer.class);
 
-  private final Object lock = new Object();
-
-  private final DruidServerMetadata server;
-  private final ZkPathsConfig config;
   private final Announcer announcer;
   private final ObjectMapper jsonMapper;
   private final String servedSegmentsLocation;
 
-  private volatile boolean started = false;
-
   public CuratorDataSegmentAnnouncer(
       DruidServerMetadata server,
       ZkPathsConfig config,
@@ -53,49 +43,13 @@ public class CuratorDataSegmentAnnouncer implements DataSegmentAnnouncer
       ObjectMapper jsonMapper
   )
   {
-    this.server = server;
-    this.config = config;
+    super(server, config, announcer, jsonMapper);
+
     this.announcer = announcer;
     this.jsonMapper = jsonMapper;
    this.servedSegmentsLocation = ZKPaths.makePath(config.getServedSegmentsPath(), server.getName());
   }
 
-  @LifecycleStart
-  public void start()
-  {
-    synchronized (lock) {
-      if (started) {
-        return;
-      }
-
-      try {
-        final String path = makeAnnouncementPath();
-        log.info("Announcing self[%s] at [%s]", server, path);
-        announcer.announce(path, jsonMapper.writeValueAsBytes(server));
-      }
-      catch (JsonProcessingException e) {
-        throw Throwables.propagate(e);
-      }
-
-      started = true;
-    }
-  }
-
-  @LifecycleStop
-  public void stop()
-  {
-    synchronized (lock) {
-      if (!started) {
-        return;
-      }
-
-      log.info("Stopping CuratorDataSegmentAnnouncer with config[%s]", config);
-      announcer.unannounce(makeAnnouncementPath());
-
-      started = false;
-    }
-  }
-
   public void announceSegment(DataSegment segment) throws IOException
   {
     final String path = makeServedSegmentPath(segment);
@@ -110,8 +64,20 @@ public class CuratorDataSegmentAnnouncer implements DataSegmentAnnouncer
     announcer.unannounce(path);
   }
 
-  private String makeAnnouncementPath() {
-    return ZKPaths.makePath(config.getAnnouncementsPath(), server.getName());
+  @Override
+  public void announceSegments(Iterable<DataSegment> segments) throws IOException
+  {
+    for (DataSegment segment : segments) {
+      announceSegment(segment);
+    }
+  }
+
+  @Override
+  public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
+  {
+    for (DataSegment segment : segments) {
+      unannounceSegment(segment);
+    }
   }
 
   private String makeServedSegmentPath(DataSegment segment)
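Editor's note: the interface change below adds three methods, so every implementation, including test stand-ins, must now provide an unannounce and the two batch variants. A minimal no-op sketch (hypothetical, but it mirrors the dummy announcer that RealtimeStandaloneMain grows later in this patch):

import com.metamx.druid.client.DataSegment;
import com.metamx.druid.coordination.DataSegmentAnnouncer;

import java.io.IOException;

// Hypothetical stand-in showing the full post-patch interface surface.
public class NoopDataSegmentAnnouncer implements DataSegmentAnnouncer
{
  @Override
  public void announceSegment(DataSegment segment) throws IOException {}

  @Override
  public void unannounceSegment(DataSegment segment) throws IOException {}

  @Override
  public void announceSegments(Iterable<DataSegment> segments) throws IOException {}

  @Override
  public void unannounceSegments(Iterable<DataSegment> segments) throws IOException {}
}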
diff --git a/client/src/main/java/com/metamx/druid/coordination/DataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/DataSegmentAnnouncer.java
index 699c4b1e8ce..71eaaa37276 100644
--- a/client/src/main/java/com/metamx/druid/coordination/DataSegmentAnnouncer.java
+++ b/client/src/main/java/com/metamx/druid/coordination/DataSegmentAnnouncer.java
@@ -1,3 +1,22 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
 package com.metamx.druid.coordination;
 
 import com.metamx.druid.client.DataSegment;
@@ -7,5 +26,10 @@ import java.io.IOException;
 public interface DataSegmentAnnouncer
 {
   public void announceSegment(DataSegment segment) throws IOException;
+
   public void unannounceSegment(DataSegment segment) throws IOException;
+
+  public void announceSegments(Iterable<DataSegment> segments) throws IOException;
+
+  public void unannounceSegments(Iterable<DataSegment> segments) throws IOException;
 }
diff --git a/client/src/main/java/com/metamx/druid/coordination/DruidServerMetadata.java b/client/src/main/java/com/metamx/druid/coordination/DruidServerMetadata.java
index 25c9c9875e8..232df085e07 100644
--- a/client/src/main/java/com/metamx/druid/coordination/DruidServerMetadata.java
+++ b/client/src/main/java/com/metamx/druid/coordination/DruidServerMetadata.java
@@ -80,7 +80,7 @@ public class DruidServerMetadata
   @Override
   public String toString()
   {
-    return "DruidServer{" +
+    return "DruidServerMetadata{" +
            "name='" + name + '\'' +
            ", host='" + host + '\'' +
            ", maxSize=" + maxSize +
diff --git a/client/src/main/java/com/metamx/druid/coordination/MultipleDataSegmentAnnouncerDataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/MultipleDataSegmentAnnouncerDataSegmentAnnouncer.java
new file mode 100644
index 00000000000..053a3d85fac
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/coordination/MultipleDataSegmentAnnouncerDataSegmentAnnouncer.java
@@ -0,0 +1,92 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.coordination;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.metamx.common.lifecycle.LifecycleStart;
+import com.metamx.common.lifecycle.LifecycleStop;
+import com.metamx.druid.client.DataSegment;
+import com.metamx.druid.curator.announcement.Announcer;
+import com.metamx.druid.initialization.ZkPathsConfig;
+
+import java.io.IOException;
+
+/**
+ * This class has the greatest name ever
+ */
+public class MultipleDataSegmentAnnouncerDataSegmentAnnouncer implements DataSegmentAnnouncer
+{
+  private final Iterable<AbstractDataSegmentAnnouncer> dataSegmentAnnouncers;
+
+  public MultipleDataSegmentAnnouncerDataSegmentAnnouncer(
+      Iterable<AbstractDataSegmentAnnouncer> dataSegmentAnnouncers
+  )
+  {
+    this.dataSegmentAnnouncers = dataSegmentAnnouncers;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+    for (AbstractDataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
+      dataSegmentAnnouncer.start();
+    }
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+    for (AbstractDataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
+      dataSegmentAnnouncer.stop();
+    }
+  }
+
+  @Override
+  public void announceSegment(DataSegment segment) throws IOException
+  {
+    for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
+      dataSegmentAnnouncer.announceSegment(segment);
+    }
+  }
+
+  @Override
+  public void unannounceSegment(DataSegment segment) throws IOException
+  {
+    for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
+      dataSegmentAnnouncer.unannounceSegment(segment);
+    }
+  }
+
+  @Override
+  public void announceSegments(Iterable<DataSegment> segments) throws IOException
+  {
+    for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
+      dataSegmentAnnouncer.announceSegments(segments);
+    }
+  }
+
+  @Override
+  public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
+  {
+    for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
+      dataSegmentAnnouncer.unannounceSegments(segments);
+    }
+  }
+}
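Editor's note: the Announcer changes below make re-announcing identical bytes a no-op and add an update() that rewrites an already-announced node in place, which is what lets the batching announcer grow and shrink a batch znode. A rough usage sketch (assumes a started CuratorFramework; the path is illustrative):

import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.druid.curator.announcement.Announcer;
import org.apache.curator.framework.CuratorFramework;

// Illustrative only: announce a payload, then rewrite it in place.
public class AnnouncerUpdateSketch
{
  public static void demo(CuratorFramework curator)
  {
    final Announcer announcer = new Announcer(curator, MoreExecutors.sameThreadExecutor());
    announcer.start();

    final String path = "/druid/segments/example.com:8080/batch-0"; // hypothetical path
    announcer.announce(path, "v1".getBytes());

    // update() replaces the payload of an announced node; announcing different
    // bytes at the same path without update() still throws an IAE.
    announcer.update(path, "v2".getBytes());

    announcer.stop();
  }
}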
diff --git a/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java b/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java
index f7c074938c7..57981f91785 100644
--- a/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java
+++ b/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java
@@ -1,3 +1,22 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
 package com.metamx.druid.curator.announcement;
 
 import com.google.common.base.Throwables;
@@ -6,6 +25,7 @@ import com.google.common.collect.MapMaker;
 import com.google.common.collect.Sets;
 import com.google.common.io.Closeables;
 import com.metamx.common.IAE;
+import com.metamx.common.ISE;
 import com.metamx.common.Pair;
 import com.metamx.common.lifecycle.LifecycleStart;
 import com.metamx.common.lifecycle.LifecycleStop;
@@ -21,7 +41,9 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -99,7 +121,7 @@ public class Announcer
    * Announces the provided bytes at the given path. Announcement means that it will create an ephemeral node
    * and monitor it to make sure that it always exists until it is unannounced or this object is closed.
    *
-   * @param path The path to announce at
+   * @param path  The path to announce at
    * @param bytes The payload to announce
    */
   public void announce(String path, byte[] bytes)
@@ -127,7 +149,7 @@ public class Announcer
 
     // Synchronize to make sure that I only create a listener once.
     synchronized (finalSubPaths) {
-      if (! listeners.containsKey(parentPath)) {
+      if (!listeners.containsKey(parentPath)) {
         final PathChildrenCache cache = factory.make(curator, parentPath);
         cache.getListenable().addListener(
             new PathChildrenCacheListener()
@@ -208,11 +230,11 @@ public class Announcer
       if (started) {
         byte[] oldBytes = subPaths.putIfAbsent(pathAndNode.getNode(), bytes);
 
-        if (oldBytes != null) {
-          throw new IAE("Already announcing[%s], cannot announce it twice.", path);
+        if (oldBytes == null) {
+          created = true;
+        } else if (!Arrays.equals(oldBytes, bytes)) {
+          throw new IAE("Cannot reannounce different values under the same path");
         }
-
-        created = true;
       }
     }
@@ -226,15 +248,48 @@ public class Announcer
     }
   }
 
+  public void update(final String path, final byte[] bytes)
+  {
+    final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
+
+    final String parentPath = pathAndNode.getPath();
+    final String nodePath = pathAndNode.getNode();
+
+    ConcurrentMap<String, byte[]> subPaths = announcements.get(parentPath);
+
+    if (subPaths == null || subPaths.get(nodePath) == null) {
+      throw new ISE("Cannot update a path[%s] that hasn't been announced!", path);
+    }
+
+    synchronized (subPaths) {
+      try {
+        byte[] oldBytes = subPaths.get(nodePath);
+
+        if (!Arrays.equals(oldBytes, bytes)) {
+          subPaths.put(nodePath, bytes);
+          updateAnnouncement(path, bytes);
+        }
+      }
+      catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
+    }
+  }
+
   private String createAnnouncement(final String path, byte[] value) throws Exception
   {
     return curator.create().compressed().withMode(CreateMode.EPHEMERAL).inBackground().forPath(path, value);
   }
 
+  private Stat updateAnnouncement(final String path, final byte[] value) throws Exception
+  {
+    return curator.setData().compressed().inBackground().forPath(path, value);
+  }
+
   /**
    * Unannounces an announcement created at path. Note that if all announcements get removed, the Announcer
    * will continue to have ZK watches on paths because clearing them out is a source of ugly race conditions.
-   *
+   * <p/>
   * If you need to completely clear all the state of what is being watched and announced, stop() the Announcer.
   *
   * @param path
diff --git a/client/src/main/java/com/metamx/druid/initialization/ZkDataSegmentAnnouncerConfig.java b/client/src/main/java/com/metamx/druid/initialization/ZkDataSegmentAnnouncerConfig.java
new file mode 100644
index 00000000000..2ff9f9172ca
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/initialization/ZkDataSegmentAnnouncerConfig.java
@@ -0,0 +1,17 @@
+package com.metamx.druid.initialization;
+
+import org.skife.config.Config;
+import org.skife.config.Default;
+
+/**
+ */
+public abstract class ZkDataSegmentAnnouncerConfig extends ZkPathsConfig
+{
+  @Config("druid.zk.segmentsPerNode")
+  @Default("50")
+  public abstract int getSegmentsPerNode();
+
+  @Config("druid.zk.maxNumBytesPerNode")
+  @Default("512000")
+  public abstract long getMaxNumBytes();
+}
diff --git a/client/src/main/java/com/metamx/druid/initialization/ZkPathsConfig.java b/client/src/main/java/com/metamx/druid/initialization/ZkPathsConfig.java
index 2f04e61b309..065829fd9ea 100644
--- a/client/src/main/java/com/metamx/druid/initialization/ZkPathsConfig.java
+++ b/client/src/main/java/com/metamx/druid/initialization/ZkPathsConfig.java
@@ -21,6 +21,7 @@ package com.metamx.druid.initialization;
 
 import org.apache.curator.utils.ZKPaths;
 import org.skife.config.Config;
+import org.skife.config.Default;
 
 public abstract class ZkPathsConfig
 {
@@ -45,6 +46,12 @@
     return defaultPath("servedSegments");
   }
 
+  @Config("druid.zk.paths.liveSegmentsPath")
+  public String getLiveSegmentsPath()
+  {
+    return defaultPath("segments");
+  }
+
   @Config("druid.zk.paths.loadQueuePath")
   public String getLoadQueuePath()
   {
diff --git a/client/src/test/java/com/metamx/druid/coordination/BatchingCuratorDataSegmentAnnouncerTest.java b/client/src/test/java/com/metamx/druid/coordination/BatchingCuratorDataSegmentAnnouncerTest.java
new file mode 100644
index 00000000000..d4390587268
--- /dev/null
+++ b/client/src/test/java/com/metamx/druid/coordination/BatchingCuratorDataSegmentAnnouncerTest.java
@@ -0,0 +1,241 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.coordination;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.metamx.druid.client.DataSegment;
+import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider;
+import com.metamx.druid.curator.announcement.Announcer;
+import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig;
+import com.metamx.druid.jackson.DefaultObjectMapper;
+import junit.framework.Assert;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingCluster;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ */
+public class BatchingCuratorDataSegmentAnnouncerTest
+{
+  private static final String testBasePath = "/test";
+  private static final String testSegmentsPath = "/test/segments/id";
+  private static final Joiner joiner = Joiner.on("/");
+
+  private TestingCluster testingCluster;
+  private CuratorFramework cf;
+  private ObjectMapper jsonMapper;
+  private Announcer announcer;
+  private SegmentReader segmentReader;
+  private BatchingCuratorDataSegmentAnnouncer segmentAnnouncer;
+  private Set<DataSegment> testSegments;
+
+  @Before
+  public void setUp() throws Exception
+  {
+    testingCluster = new TestingCluster(1);
+    testingCluster.start();
+
+    cf = CuratorFrameworkFactory.builder()
+                                .connectString(testingCluster.getConnectString())
+                                .retryPolicy(new ExponentialBackoffRetry(1, 10))
+                                .compressionProvider(new PotentiallyGzippedCompressionProvider(false))
+                                .build();
+    cf.start();
+    cf.create().creatingParentsIfNeeded().forPath(testBasePath);
+
+    jsonMapper = new DefaultObjectMapper();
+
+    announcer = new Announcer(
+        cf,
+        MoreExecutors.sameThreadExecutor()
+    );
+    announcer.start();
+
+    segmentReader = new SegmentReader(cf, jsonMapper);
+    segmentAnnouncer = new BatchingCuratorDataSegmentAnnouncer(
+        new DruidServerMetadata(
+            "id",
+            "host",
+            Long.MAX_VALUE,
+            "type",
+            "tier"
+        ),
+        new ZkDataSegmentAnnouncerConfig()
+        {
+          @Override
+          public String getZkBasePath()
+          {
+            return testBasePath;
+          }
+
+          @Override
+          public int getSegmentsPerNode()
+          {
+            return 50;
+          }
+
+          @Override
+          public long getMaxNumBytes()
+          {
+            return 100000;
+          }
+        },
+        announcer,
+        jsonMapper
+    );
+    segmentAnnouncer.start();
+
+    testSegments = Sets.newHashSet();
+    for (int i = 0; i < 100; i++) {
+      testSegments.add(makeSegment(i));
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception
+  {
+    segmentAnnouncer.stop();
+    announcer.stop();
+    cf.close();
+    testingCluster.stop();
+  }
+
+  @Test
+  public void testSingleAnnounce() throws Exception
+  {
+    Iterator<DataSegment> segIter = testSegments.iterator();
+    DataSegment firstSegment = segIter.next();
+    DataSegment secondSegment = segIter.next();
+
+    segmentAnnouncer.announceSegment(firstSegment);
+
+    List<String> zNodes = cf.getChildren().forPath(testSegmentsPath);
+
+    for (String zNode : zNodes) {
+      Set<DataSegment> segments = segmentReader.read(joiner.join(testSegmentsPath, zNode));
+      Assert.assertEquals(segments.iterator().next(), firstSegment);
+    }
+
+    segmentAnnouncer.announceSegment(secondSegment);
+
+    for (String zNode : zNodes) {
+      Set<DataSegment> segments = segmentReader.read(joiner.join(testSegmentsPath, zNode));
+      Assert.assertEquals(Sets.newHashSet(firstSegment, secondSegment), segments);
+    }
+
+    segmentAnnouncer.unannounceSegment(firstSegment);
+
+    for (String zNode : zNodes) {
+      Set<DataSegment> segments = segmentReader.read(joiner.join(testSegmentsPath, zNode));
+      Assert.assertEquals(segments.iterator().next(), secondSegment);
+    }
+
+    segmentAnnouncer.unannounceSegment(secondSegment);
+
+    Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty());
+  }
+
+  @Test
+  public void testBatchAnnounce() throws Exception
+  {
+    segmentAnnouncer.announceSegments(testSegments);
+
+    List<String> zNodes = cf.getChildren().forPath(testSegmentsPath);
+
+    Assert.assertTrue(zNodes.size() == 2);
+
+    Set<DataSegment> allSegments = Sets.newHashSet();
+    for (String zNode : zNodes) {
+      allSegments.addAll(segmentReader.read(joiner.join(testSegmentsPath, zNode)));
+    }
+    Assert.assertEquals(allSegments, testSegments);
+
+    segmentAnnouncer.unannounceSegments(testSegments);
+
+    Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty());
+  }
+
+  @Test
+  public void testMultipleBatchAnnounce() throws Exception
+  {
+    for (int i = 0; i < 10; i++) {
+      testBatchAnnounce();
+    }
+  }
+
+  private DataSegment makeSegment(int offset)
+  {
+    return DataSegment.builder()
+                      .dataSource("foo")
+                      .interval(
+                          new Interval(
+                              new DateTime("2013-01-01").plusDays(offset),
+                              new DateTime("2013-01-02").plusDays(offset)
+                          )
+                      )
+                      .version(new DateTime().toString())
+                      .build();
+  }
+
+  private class SegmentReader
+  {
+    private final CuratorFramework cf;
+    private final ObjectMapper jsonMapper;
+
+    public SegmentReader(CuratorFramework cf, ObjectMapper jsonMapper)
+    {
+      this.cf = cf;
+      this.jsonMapper = jsonMapper;
+    }
+
+    public Set<DataSegment> read(String path)
+    {
+      try {
+        if (cf.checkExists().forPath(path) != null) {
+          return jsonMapper.readValue(
+              cf.getData().forPath(path), new TypeReference<Set<DataSegment>>()
+              {
+              }
+          );
+        }
+      }
+      catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
+
+      return Sets.newHashSet();
+    }
+  }
+}
diff --git a/client/src/test/java/com/metamx/druid/curator/inventory/CuratorInventoryManagerTest.java b/client/src/test/java/com/metamx/druid/curator/inventory/CuratorInventoryManagerTest.java
index 2331e430cc2..2d520418824 100644
--- a/client/src/test/java/com/metamx/druid/curator/inventory/CuratorInventoryManagerTest.java
+++ b/client/src/test/java/com/metamx/druid/curator/inventory/CuratorInventoryManagerTest.java
@@ -43,14 +43,11 @@ public class CuratorInventoryManagerTest extends CuratorTestBase
     curator.start();
     manager.start();
 
-    curator.create().creatingParentsIfNeeded().forPath("/container");
-    curator.create().creatingParentsIfNeeded().forPath("/inventory/billy");
-
     Assert.assertTrue(Iterables.isEmpty(manager.getInventory()));
 
     CountDownLatch containerLatch = new CountDownLatch(1);
     strategy.setNewContainerLatch(containerLatch);
-    curator.create().withMode(CreateMode.EPHEMERAL).forPath("/container/billy", new byte[]{});
+    curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/container/billy", new byte[]{});
 
     Assert.assertTrue(timing.awaitLatch(containerLatch));
     strategy.setNewContainerLatch(null);
@@ -60,7 +57,7 @@
     CountDownLatch inventoryLatch = new CountDownLatch(2);
     strategy.setNewInventoryLatch(inventoryLatch);
-    curator.create().withMode(CreateMode.EPHEMERAL).forPath("/inventory/billy/1", Ints.toByteArray(100));
+    curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/inventory/billy/1", Ints.toByteArray(100));
     curator.create().withMode(CreateMode.EPHEMERAL).forPath("/inventory/billy/bob", Ints.toByteArray(2287));
 
     Assert.assertTrue(timing.awaitLatch(inventoryLatch));
diff --git a/common/src/main/java/com/metamx/druid/guava/Runnables.java b/common/src/main/java/com/metamx/druid/guava/Runnables.java
index 8c41886d349..118d4938a9d 100644
--- a/common/src/main/java/com/metamx/druid/guava/Runnables.java
+++ b/common/src/main/java/com/metamx/druid/guava/Runnables.java
@@ -34,4 +34,10 @@ public class Runnables
     };
   }
+
+  public static Runnable getNoopRunnable(){
+    return new Runnable(){
+      public void run(){}
+    };
+  }
 }
diff --git a/examples/bin/ec2/env.sh b/examples/bin/ec2/env.sh
new file mode 100644
index 00000000000..a0a05719df1
--- /dev/null
+++ b/examples/bin/ec2/env.sh
@@ -0,0 +1,27 @@
+# Setup Oracle Java
+sudo apt-get update
+sudo add-apt-repository -y ppa:webupd8team/java
+sudo apt-get update
+
+# Setup yes answer to license question
+echo debconf shared/accepted-oracle-license-v1-1 select true | sudo debconf-set-selections
+echo debconf shared/accepted-oracle-license-v1-1 seen true | sudo debconf-set-selections
+sudo apt-get -y -q install oracle-java7-installer
+
+# Automated Kafka setup
+curl http://static.druid.io/artifacts/kafka-0.7.2-incubating-bin.tar.gz -o /tmp/kafka-0.7.2-incubating-bin.tar.gz
+tar -xvzf /tmp/kafka-0.7.2-incubating-bin.tar.gz
+cd kafka-0.7.2-incubating-bin
+cat config/zookeeper.properties
+nohup bin/zookeeper-server-start.sh config/zookeeper.properties > /dev/null 2>&1 &
+# in a new console
+nohup bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &
+
+# Install dependencies - mysql must be built from source, as the 12.04 apt-get hangs
+export DEBIAN_FRONTEND=noninteractive
+sudo debconf-set-selections <<< 'mysql-server-5.5 mysql-server/root_password password diurd'
+sudo debconf-set-selections <<< 'mysql-server-5.5 mysql-server/root_password_again password diurd'
+sudo apt-get -q -y -V --force-yes --reinstall install mysql-server-5.5
+
+echo "ALL DONE with druid environment setup! Hit CTRL-C to proceed."
+exit 0
diff --git a/examples/bin/ec2/run.sh b/examples/bin/ec2/run.sh
new file mode 100644
index 00000000000..933de0a2b42
--- /dev/null
+++ b/examples/bin/ec2/run.sh
@@ -0,0 +1,22 @@
+# Is localhost expected with multi-node?
+mysql -u root -pdiurd -e "GRANT ALL ON druid.* TO 'druid'@'localhost' IDENTIFIED BY 'diurd'; CREATE database druid;" > /dev/null 2>&1
+
+tar -xvzf druid-services-*-bin.tar.gz > /dev/null 2>&1
+cd druid-services-*
+
+mkdir logs > /dev/null 2>&1
+
+# Now start a realtime node
+nohup java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.realtime.specFile=config/realtime/realtime.spec -classpath lib/druid-services-0.5.5-SNAPSHOT-selfcontained.jar:config/realtime com.metamx.druid.realtime.RealtimeMain > logs/realtime.log 2>&1 &
+
+# And a master node
+nohup java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/druid-services-0.5.5-SNAPSHOT-selfcontained.jar:config/master com.metamx.druid.http.MasterMain > logs/master.log 2>&1 &
+
+# And a compute node
+nohup java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/druid-services-0.5.5-SNAPSHOT-selfcontained.jar:config/compute com.metamx.druid.http.ComputeMain > logs/compute.log 2>&1 &
+
+# And a broker node
+nohup java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/druid-services-0.5.5-SNAPSHOT-selfcontained.jar:config/broker com.metamx.druid.http.BrokerMain > logs/broker.log 2>&1 &
+
+echo "Hit CTRL-C to continue..."
+exit 0
diff --git a/examples/bin/run_ec2.sh b/examples/bin/run_ec2.sh
new file mode 100755
index 00000000000..4acdde9cc8d
--- /dev/null
+++ b/examples/bin/run_ec2.sh
@@ -0,0 +1,76 @@
+# Before running, you will need to download the EC2 tools from http://aws.amazon.com/developertools/351
+# and then setup your EC2_HOME and PATH variables (or similar):
+#
+# # Setup environment for ec2-api-tools
+# export EC2_HOME=/path/to/ec2-api-tools-1.6.7.4/
+# export PATH=$PATH:$EC2_HOME/bin
+# export AWS_ACCESS_KEY=<your AWS access key>
+# export AWS_SECRET_KEY=<your AWS secret key>
+
+# Check for ec2 commands we require and die if they're missing
+type ec2-create-keypair >/dev/null 2>&1 || { echo >&2 "I require ec2-create-keypair but it's not installed. Aborting."; exit 1; }
+type ec2-create-group >/dev/null 2>&1 || { echo >&2 "I require ec2-create-group but it's not installed. Aborting."; exit 1; }
+type ec2-authorize >/dev/null 2>&1 || { echo >&2 "I require ec2-authorize but it's not installed. Aborting."; exit 1; }
+type ec2-run-instances >/dev/null 2>&1 || { echo >&2 "I require ec2-run-instances but it's not installed. Aborting."; exit 1; }
+type ec2-describe-instances >/dev/null 2>&1 || { echo >&2 "I require ec2-describe-instances but it's not installed. Aborting."; exit 1; }
+
+# Create a keypair for our servers
+echo "Removing old keypair for druid..."
+ec2-delete-keypair druid-keypair
+echo "Creating new keypair for druid..."
+ec2-create-keypair druid-keypair > druid-keypair
+chmod 0600 druid-keypair
+mv druid-keypair ~/.ssh/
+
+# Create a security group for our servers
+echo "Creating a new security group for druid..."
+ec2-create-group druid-group -d "Druid Cluster"
+
+# Create rules that allow necessary services in our group
+echo "Creating new firewall rules for druid..."
+# SSH from outside
+ec2-authorize druid-group -P tcp -p 22
+# Enable all traffic within group
+ec2-authorize druid-group -P tcp -p 1-65535 -o druid-group
+ec2-authorize druid-group -P udp -p 1-65535 -o druid-group
+
+echo "Booting a single small instance for druid..."
+# Use ami ami-e7582d8e - Alestic Ubuntu 12.04 us-east
+INSTANCE_ID=$(ec2-run-instances ami-e7582d8e -n 1 -g druid-group -k druid-keypair --instance-type m1.small | awk '/INSTANCE/{print $2}')
+while true; do
+  sleep 1
+  INSTANCE_STATUS=$(ec2-describe-instances | grep INSTANCE | grep $INSTANCE_ID | cut -f6)
+  if [ "$INSTANCE_STATUS" = "running" ]
+  then
+    echo "Instance $INSTANCE_ID is status $INSTANCE_STATUS..."
+    break
+  fi
+done
+
+# Wait for the instance to come up
+echo "Waiting 60 seconds for instance $INSTANCE_ID to boot..."
+sleep 60
+
+# Get hostname and ssh with the key we created, and ssh there
+INSTANCE_ADDRESS=`ec2-describe-instances|grep 'INSTANCE'|grep $INSTANCE_ID|cut -f4`
+echo "Connecting to $INSTANCE_ADDRESS to prepare environment for druid..."
+scp -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ./ec2/env.sh ubuntu@${INSTANCE_ADDRESS}:
+ssh -q -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ubuntu@${INSTANCE_ADDRESS} 'chmod +x ./env.sh;./env.sh'
+
+echo "Prepared $INSTANCE_ADDRESS for druid."
+
+# Now to scp a tarball up that can run druid!
+if [ -f ../../services/target/druid-services-*-SNAPSHOT-bin.tar.gz ];
+then
+  echo "Uploading druid tarball to server..."
+  scp -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ../../services/target/druid-services-*-bin.tar.gz ubuntu@${INSTANCE_ADDRESS}:
+else
+  echo "ERROR - package not built!"
+fi
+
+# Now boot druid parts
+scp -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ./ec2/run.sh ubuntu@${INSTANCE_ADDRESS}:
+ssh -q -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ubuntu@${INSTANCE_ADDRESS} 'chmod +x ./run.sh;./run.sh'
+
+echo "Druid booting complete!"
+echo "ssh -i ~/.ssh/druid-keypair ubuntu@${INSTANCE_ADDRESS} #to connect"
diff --git a/examples/config/broker/runtime.properties b/examples/config/broker/runtime.properties
new file mode 100644
index 00000000000..ede350f861f
--- /dev/null
+++ b/examples/config/broker/runtime.properties
@@ -0,0 +1,43 @@
+druid.host=127.0.0.1
+druid.port=8083
+
+com.metamx.emitter.logging=true
+
+druid.processing.formatString=processing_%s
+druid.processing.numThreads=1
+druid.processing.buffer.sizeBytes=10000000
+
+#emitting, opaque marker
+druid.service=example
+
+druid.request.logging.dir=/tmp/example/log
+druid.realtime.specFile=realtime.spec
+com.metamx.emitter.logging=true
+com.metamx.emitter.logging.level=info
+
+# below are dummy values when operating a realtime only node
+com.metamx.aws.accessKey=dummy_access_key
+com.metamx.aws.secretKey=dummy_secret_key
+druid.pusher.s3.bucket=dummy_s3_bucket
+
+druid.zk.service.host=localhost
+druid.server.maxSize=300000000000
+druid.zk.paths.base=/druid
+druid.database.segmentTable=prod_segments
+druid.database.user=druid
+druid.database.password=diurd
+druid.database.connectURI=jdbc:mysql://localhost:3306/druid
+druid.zk.paths.discoveryPath=/druid/discoveryPath
+druid.database.ruleTable=rules
+druid.database.configTable=config
+
+# Path on local FS for storage of segments; dir will be created if needed
+druid.paths.indexCache=/tmp/druid/indexCache
+# Path on local FS for storage of segment metadata; dir will be created if needed
+druid.paths.segmentInfoCache=/tmp/druid/segmentInfoCache
+druid.pusher.local.storageDirectory=/tmp/druid/localStorage
+druid.pusher.local=true
+
+# thread pool size for servicing queries
+druid.client.http.connections=30
+druid.host=127.0.0.1:8083
diff --git a/examples/config/compute/runtime.properties b/examples/config/compute/runtime.properties
new file mode 100644
index 00000000000..ab4db40342e
--- /dev/null
+++ b/examples/config/compute/runtime.properties
@@ -0,0 +1,39 @@
+druid.host=127.0.0.1
+druid.port=8082
+
+com.metamx.emitter.logging=true
+
+druid.processing.formatString=processing_%s
+druid.processing.numThreads=1
+druid.processing.buffer.sizeBytes=10000000
+
+#emitting, opaque marker
+druid.service=example
+
+druid.request.logging.dir=/tmp/example/log
+druid.realtime.specFile=realtime.spec
+com.metamx.emitter.logging=true
+com.metamx.emitter.logging.level=info
+
+# below are dummy values when operating a realtime only node
+com.metamx.aws.accessKey=dummy_access_key
+com.metamx.aws.secretKey=dummy_secret_key
+druid.pusher.s3.bucket=dummy_s3_bucket
+
+druid.zk.service.host=localhost
+druid.server.maxSize=300000000000
+druid.zk.paths.base=/druid
+druid.database.segmentTable=prod_segments
+druid.database.user=druid
+druid.database.password=diurd
+druid.database.connectURI=jdbc:mysql://localhost:3306/druid
+druid.zk.paths.discoveryPath=/druid/discoveryPath
+druid.database.ruleTable=rules
+druid.database.configTable=config
+
+# Path on local FS for storage of segments; dir will be created if needed
+druid.paths.indexCache=/tmp/druid/indexCache
+# Path on local FS for storage of segment metadata; dir will be created if needed
+druid.paths.segmentInfoCache=/tmp/druid/segmentInfoCache
+druid.pusher.local.storageDirectory=/tmp/druid/localStorage
+druid.pusher.local=true
diff --git a/examples/config/master/runtime.properties b/examples/config/master/runtime.properties
new file mode 100644
index 00000000000..af93593027e
--- /dev/null
+++ b/examples/config/master/runtime.properties
@@ -0,0 +1,39 @@
+druid.host=127.0.0.1
+druid.port=8081
+
+com.metamx.emitter.logging=true
+
+druid.processing.formatString=processing_%s
+druid.processing.numThreads=1
+druid.processing.buffer.sizeBytes=10000000
+
+#emitting, opaque marker
+druid.service=example
+
+druid.request.logging.dir=/tmp/example/log
+druid.realtime.specFile=realtime.spec
+com.metamx.emitter.logging=true
+com.metamx.emitter.logging.level=info
+
+# below are dummy values when operating a realtime only node
+com.metamx.aws.accessKey=dummy_access_key
+com.metamx.aws.secretKey=dummy_secret_key
+druid.pusher.s3.bucket=dummy_s3_bucket
+
+druid.zk.service.host=localhost
+druid.server.maxSize=300000000000
+druid.zk.paths.base=/druid
+druid.database.segmentTable=prod_segments
+druid.database.user=druid
+druid.database.password=diurd
+druid.database.connectURI=jdbc:mysql://localhost:3306/druid
+druid.zk.paths.discoveryPath=/druid/discoveryPath
+druid.database.ruleTable=rules
+druid.database.configTable=config
+
+# Path on local FS for storage of segments; dir will be created if needed
+druid.paths.indexCache=/tmp/druid/indexCache
+# Path on local FS for storage of segment metadata; dir will be created if needed
+druid.paths.segmentInfoCache=/tmp/druid/segmentInfoCache
+druid.pusher.local.storageDirectory=/tmp/druid/localStorage
+druid.pusher.local=true
diff --git a/examples/config/realtime/realtime.spec b/examples/config/realtime/realtime.spec
new file mode 100644
index 00000000000..0309b3bf891
--- /dev/null
+++ b/examples/config/realtime/realtime.spec
@@ -0,0 +1,29 @@
+[{
+    "schema" : { "dataSource":"druidtest",
+                 "aggregators":[ {"type":"count", "name":"impressions"},
+                                 {"type":"doubleSum","name":"wp","fieldName":"wp"}],
+                 "indexGranularity":"minute",
+                 "shardSpec" : { "type": "none" } },
+    "config" : { "maxRowsInMemory" : 500000,
+                 "intermediatePersistPeriod" : "PT10m" },
+    "firehose" : { "type" : "kafka-0.7.2",
+                   "consumerProps" : { "zk.connect" : "localhost:2181",
+                                       "zk.connectiontimeout.ms" : "15000",
+                                       "zk.sessiontimeout.ms" : "15000",
+                                       "zk.synctime.ms" : "5000",
+                                       "groupid" : "topic-pixel-local",
+                                       "fetch.size" : "1048586",
+                                       "autooffset.reset" : "largest",
+                                       "autocommit.enable" : "false" },
+                   "feed" : "druidtest",
+                   "parser" : { "timestampSpec" : { "column" : "utcdt", "format" : "iso" },
+                                "data" : { "format" : "json" },
+                                "dimensionExclusions" : ["wp"] } },
+    "plumber" : { "type" : "realtime",
+                  "windowPeriod" : "PT10m",
+                  "segmentGranularity":"hour",
+                  "basePersistDirectory" : "/tmp/realtime/basePersist",
+                  "rejectionPolicy": {"type": "messageTime"} }
+
+}]
+
diff --git a/examples/config/realtime/runtime.properties b/examples/config/realtime/runtime.properties
new file mode 100644
index 00000000000..ce46126d0f8
--- /dev/null
+++ b/examples/config/realtime/runtime.properties
@@ -0,0 +1,41 @@
+druid.host=127.0.0.1
+druid.port=8080
+
+com.metamx.emitter.logging=true
+
+druid.processing.formatString=processing_%s
+druid.processing.numThreads=1
+druid.processing.buffer.sizeBytes=10000000
+
+#emitting, opaque marker
+druid.service=example
+
+druid.request.logging.dir=/tmp/example/log
+druid.realtime.specFile=realtime.spec
+com.metamx.emitter.logging=true
+com.metamx.emitter.logging.level=info
+
+# below are dummy values when operating a realtime only node
+com.metamx.aws.accessKey=dummy_access_key
+com.metamx.aws.secretKey=dummy_secret_key
+druid.pusher.s3.bucket=dummy_s3_bucket
+
+druid.zk.service.host=localhost
+druid.server.maxSize=300000000000
+druid.zk.paths.base=/druid
+druid.database.segmentTable=prod_segments
+druid.database.user=druid
+druid.database.password=diurd
+druid.database.connectURI=jdbc:mysql://localhost:3306/druid
+druid.zk.paths.discoveryPath=/druid/discoveryPath
+druid.database.ruleTable=rules
+druid.database.configTable=config
+
+# Path on local FS for storage of segments; dir will be created if needed
+druid.paths.indexCache=/tmp/druid/indexCache
+# Path on local FS for storage of segment metadata; dir will be created if needed
+druid.paths.segmentInfoCache=/tmp/druid/segmentInfoCache
+druid.pusher.local.storageDirectory=/tmp/druid/localStorage
+druid.pusher.local=true
+
+druid.host=127.0.0.1:8080
diff --git a/examples/pom.xml b/examples/pom.xml
index d3bd28bd236..9acca31f12d 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -1,5 +1,6 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <groupId>com.metamx.druid</groupId>
     <artifactId>druid-examples</artifactId>
diff --git a/examples/src/main/java/druid/examples/RealtimeStandaloneMain.java b/examples/src/main/java/druid/examples/RealtimeStandaloneMain.java
index 4cb267f571f..ebdd59d46c1 100644
--- a/examples/src/main/java/druid/examples/RealtimeStandaloneMain.java
+++ b/examples/src/main/java/druid/examples/RealtimeStandaloneMain.java
@@ -16,6 +16,7 @@ import com.metamx.druid.realtime.SegmentPublisher;
 import druid.examples.flights.FlightsFirehoseFactory;
 import druid.examples.rand.RandomFirehoseFactory;
 import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
+import druid.examples.web.WebFirehoseFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -41,7 +42,9 @@ public class RealtimeStandaloneMain
     rn.registerJacksonSubtype(
         new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
         new NamedType(FlightsFirehoseFactory.class, "flights"),
-        new NamedType(RandomFirehoseFactory.class, "rand")
+        new NamedType(RandomFirehoseFactory.class, "rand"),
+        new NamedType(WebFirehoseFactory.class, "webstream")
+
     );
 
     // Create dummy objects for the various interfaces that interact with the DB, ZK and deep storage
@@ -50,7 +53,7 @@ public class RealtimeStandaloneMain
     rn.setDataSegmentPusher(new NoopDataSegmentPusher());
     rn.setServerView(new NoopServerView());
     rn.setInventoryView(new NoopInventoryView());
-    
+
     Runtime.getRuntime().addShutdownHook(
         new Thread(
             new Runnable()
@@ -142,5 +145,17 @@ public class RealtimeStandaloneMain
     {
       // do nothing
     }
+
+    @Override
+    public void announceSegments(Iterable<DataSegment> segments) throws IOException
+    {
+      // do nothing
+    }
+
+    @Override
+    public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
+    {
+      // do nothing
+    }
   }
 }
\ No newline at end of file
diff --git a/examples/src/main/java/druid/examples/web/InputSupplierUpdateStream.java b/examples/src/main/java/druid/examples/web/InputSupplierUpdateStream.java
new file mode 100644
index 00000000000..40d7cf71616
--- /dev/null
+++ b/examples/src/main/java/druid/examples/web/InputSupplierUpdateStream.java
@@ -0,0 +1,133 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package druid.examples.web;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.io.InputSupplier;
+import com.metamx.druid.jackson.DefaultObjectMapper;
+import com.metamx.emitter.EmittingLogger;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class InputSupplierUpdateStream implements UpdateStream
+{
+  private static final EmittingLogger log = new EmittingLogger(InputSupplierUpdateStream.class);
+  private static final long queueWaitTime = 15L;
+  private final TypeReference<HashMap<String, Object>> typeRef;
+  private final InputSupplier<BufferedReader> supplier;
+  private final int QUEUE_SIZE = 10000;
+  private final BlockingQueue<Map<String, Object>> queue = new ArrayBlockingQueue<Map<String, Object>>(QUEUE_SIZE);
+  private final ObjectMapper mapper = new DefaultObjectMapper();
+  private final String timeDimension;
+  private final Thread addToQueueThread;
+
+  public InputSupplierUpdateStream(
+      final InputSupplier<BufferedReader> supplier,
+      final String timeDimension
+  )
+  {
+    addToQueueThread = new Thread()
+    {
+      public void run()
+      {
+        while (!isInterrupted()) {
+          try {
+            BufferedReader reader = supplier.getInput();
+            String line;
+            while ((line = reader.readLine()) != null) {
+              if (isValid(line)) {
+                HashMap<String, Object> map = mapper.readValue(line, typeRef);
+                if (map.get(timeDimension) != null) {
+                  queue.offer(map, queueWaitTime, TimeUnit.SECONDS);
+                  log.debug("Successfully added to queue");
+                } else {
+                  log.info("missing timestamp");
+                }
+              }
+            }
+          }
+          catch (InterruptedException e) {
+            log.info(e, "Thread adding events to the queue interrupted");
+            return;
+          }
+          catch (JsonMappingException e) {
+            log.info(e, "Error in converting json to map");
+          }
+          catch (JsonParseException e) {
+            log.info(e, "Error in parsing json");
+          }
+          catch (IOException e) {
+            log.info(e, "Error in connecting to InputStream");
+          }
+        }
+      }
+    };
+    addToQueueThread.setDaemon(true);
+
+    this.supplier = supplier;
+    this.typeRef = new TypeReference<HashMap<String, Object>>()
+    {
+    };
+    this.timeDimension = timeDimension;
+  }
+
+  private boolean isValid(String s)
+  {
+    return !(s.isEmpty());
+  }
+
+  public void start()
+  {
+    addToQueueThread.start();
+  }
+
+  public void stop()
+  {
+    addToQueueThread.interrupt();
+  }
+
+  public Map<String, Object> pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException
+  {
+    return queue.poll(waitTime, unit);
+  }
+
+  public int getQueueSize()
+  {
+    return queue.size();
+  }
+
+  public String getTimeDimension()
+  {
+    return timeDimension;
+  }
+}
diff --git a/examples/src/main/java/druid/examples/web/InputSupplierUpdateStreamFactory.java b/examples/src/main/java/druid/examples/web/InputSupplierUpdateStreamFactory.java
new file mode 100644
index 00000000000..52a9c47f6f4
--- /dev/null
+++ b/examples/src/main/java/druid/examples/web/InputSupplierUpdateStreamFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package druid.examples.web;
+
+import com.google.common.io.InputSupplier;
+
+import java.io.BufferedReader;
+
+public class InputSupplierUpdateStreamFactory implements UpdateStreamFactory
+{
+  private final InputSupplier<BufferedReader> inputSupplier;
+  private final String timeDimension;
+
+  public InputSupplierUpdateStreamFactory(InputSupplier<BufferedReader> inputSupplier, String timeDimension)
+  {
+    this.inputSupplier = inputSupplier;
+    this.timeDimension = timeDimension;
+  }
+
+  public InputSupplierUpdateStream build()
+  {
+    return new InputSupplierUpdateStream(inputSupplier, timeDimension);
+  }
+}
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package druid.examples.web;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class RenamingKeysUpdateStream implements UpdateStream
+{
+  private final InputSupplierUpdateStream updateStream;
+  private final Map<String, String> renamedDimensions;
+
+  public RenamingKeysUpdateStream(
+      InputSupplierUpdateStream updateStream,
+      Map<String, String> renamedDimensions
+  )
+  {
+    this.renamedDimensions = renamedDimensions;
+    this.updateStream = updateStream;
+  }
+
+  public Map<String, Object> pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException
+  {
+    // a timed-out poll returns null; propagate it rather than NPE inside renameKeys
+    Map<String, Object> update = updateStream.pollFromQueue(waitTime, unit);
+    return update == null ? null : renameKeys(update);
+  }
+
+  private Map<String, Object> renameKeys(Map<String, Object> update)
+  {
+    if (renamedDimensions != null) {
+      Map<String, Object> renamedMap = Maps.newHashMap();
+      for (String key : renamedDimensions.keySet()) {
+        if (update.get(key) != null) {
+          Object obj = update.get(key);
+          renamedMap.put(renamedDimensions.get(key), obj);
+        }
+      }
+      return renamedMap;
+    } else {
+      return update;
+    }
+  }
+
+  public String getTimeDimension()
+  {
+    if (renamedDimensions != null && renamedDimensions.get(updateStream.getTimeDimension()) != null) {
+      return renamedDimensions.get(updateStream.getTimeDimension());
+    }
+    return updateStream.getTimeDimension();
+  }
+
+  public void start()
+  {
+    updateStream.start();
+  }
+
+  public void stop()
+  {
+    updateStream.stop();
+  }
+}
diff --git a/examples/src/main/java/druid/examples/web/RenamingKeysUpdateStreamFactory.java b/examples/src/main/java/druid/examples/web/RenamingKeysUpdateStreamFactory.java
new file mode 100644
index 00000000000..021088d982e
--- /dev/null
+++ b/examples/src/main/java/druid/examples/web/RenamingKeysUpdateStreamFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package druid.examples.web;
+
+import java.util.Map;
+
+public class RenamingKeysUpdateStreamFactory implements UpdateStreamFactory
+{
+  private final InputSupplierUpdateStreamFactory updateStreamFactory;
+  private final Map<String, String> renamedDimensions;
+
+  public RenamingKeysUpdateStreamFactory(InputSupplierUpdateStreamFactory updateStreamFactory, Map<String, String> renamedDimensions)
+  {
+    this.updateStreamFactory = updateStreamFactory;
+    this.renamedDimensions = renamedDimensions;
+  }
+
+  public RenamingKeysUpdateStream build()
+  {
+    return new RenamingKeysUpdateStream(updateStreamFactory.build(), renamedDimensions);
+  }
+}
diff --git a/examples/src/main/java/druid/examples/web/UpdateStream.java b/examples/src/main/java/druid/examples/web/UpdateStream.java
new file mode 100644
index 00000000000..bb84fb0a9e1
--- /dev/null
+++ b/examples/src/main/java/druid/examples/web/UpdateStream.java
@@ -0,0 +1,31 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package druid.examples.web;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public interface UpdateStream
+{
+  public Map<String, Object> pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException;
+
+  public String getTimeDimension();
+
+  public void start();
+
+  public void stop();
+}
diff --git a/examples/src/main/java/druid/examples/web/UpdateStreamFactory.java b/examples/src/main/java/druid/examples/web/UpdateStreamFactory.java
new file mode 100644
index 00000000000..c65685ba68b
--- /dev/null
+++ b/examples/src/main/java/druid/examples/web/UpdateStreamFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package druid.examples.web;
+
+public interface UpdateStreamFactory
+{
+  public UpdateStream build();
+}
diff --git a/examples/src/main/java/druid/examples/web/WebFirehoseFactory.java b/examples/src/main/java/druid/examples/web/WebFirehoseFactory.java
new file mode 100644
index 00000000000..885089a2001
--- /dev/null
+++ b/examples/src/main/java/druid/examples/web/WebFirehoseFactory.java
@@ -0,0 +1,134 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package druid.examples.web;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Throwables;
+import com.metamx.common.parsers.TimestampParser;
+import com.metamx.druid.guava.Runnables;
+import com.metamx.druid.input.InputRow;
+import com.metamx.druid.input.MapBasedInputRow;
+import com.metamx.druid.realtime.firehose.Firehose;
+import com.metamx.druid.realtime.firehose.FirehoseFactory;
+import com.metamx.emitter.EmittingLogger;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+@JsonTypeName("webstream")
+public class WebFirehoseFactory implements FirehoseFactory
+{
+  private static final EmittingLogger log = new EmittingLogger(WebFirehoseFactory.class);
+  private final String timeFormat;
+  private final UpdateStreamFactory factory;
+  private final long queueWaitTime = 15L;
+
+  @JsonCreator
+  public WebFirehoseFactory(
+      @JsonProperty("url") String url,
+      @JsonProperty("renamedDimensions") Map<String, String> renamedDimensions,
+      @JsonProperty("timeDimension") String timeDimension,
+      @JsonProperty("timeFormat") String timeFormat
+  )
+  {
+    this(
+        new RenamingKeysUpdateStreamFactory(
+            new InputSupplierUpdateStreamFactory(new WebJsonSupplier(url), timeDimension),
+            renamedDimensions
+        ), timeFormat
+    );
+  }
+
+  public WebFirehoseFactory(UpdateStreamFactory factory, String timeFormat)
+  {
+    this.factory = factory;
+    if (timeFormat == null) {
+      this.timeFormat = "auto";
+    } else {
+      this.timeFormat = timeFormat;
+    }
+  }
+
+  @Override
+  public Firehose connect() throws IOException
+  {
+    final UpdateStream updateStream = factory.build();
+    updateStream.start();
+
+    return new Firehose()
+    {
+      Map<String, Object> map;
+      private final Runnable doNothingRunnable = Runnables.getNoopRunnable();
+
+      @Override
+      public boolean hasMore()
+      {
+        try {
+          map = updateStream.pollFromQueue(queueWaitTime, TimeUnit.SECONDS);
+          return map != null;
+        }
+        catch (InterruptedException e) {
+          throw Throwables.propagate(e);
+        }
+      }
+
+      @Override
+      public InputRow nextRow()
+      {
+        try {
+          DateTime date = TimestampParser.createTimestampParser(timeFormat)
+                                         .apply(map.get(updateStream.getTimeDimension()).toString());
+          return new MapBasedInputRow(
+              date.getMillis(),
+              new ArrayList<String>(map.keySet()),
+              map
+          );
+        }
+        catch (Exception e) {
+          throw Throwables.propagate(e);
+        }
+        finally {
+          map = null;
+        }
+      }
+
+      @Override
+      public Runnable commit()
+      {
+        // ephemera in, ephemera out.
+        return doNothingRunnable; // reuse the same object each time
+      }
+
+      @Override
+      public void close() throws IOException
+      {
+        updateStream.stop();
+      }
+    };
+  }
+}
diff --git a/examples/src/main/java/druid/examples/web/WebJsonSupplier.java b/examples/src/main/java/druid/examples/web/WebJsonSupplier.java
new file mode 100644
index 00000000000..093ca1ad3ca
--- /dev/null
+++ b/examples/src/main/java/druid/examples/web/WebJsonSupplier.java
@@ -0,0 +1,57 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package druid.examples.web;
+
+import com.google.common.base.Throwables;
+import com.google.common.io.InputSupplier;
+import com.metamx.emitter.EmittingLogger;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+
+public class WebJsonSupplier implements InputSupplier<BufferedReader>
+{
+  private static final EmittingLogger log = new EmittingLogger(WebJsonSupplier.class);
+
+  private final URL url;
+
+  public WebJsonSupplier(String urlString)
+  {
+    try {
+      this.url = new URL(urlString);
+    }
+    catch (MalformedURLException e) {
+      // fail fast on a bad URL instead of logging and continuing with a null field
+      log.error(e, "Malformed url[%s]", urlString);
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public BufferedReader getInput() throws IOException
+  {
+    URLConnection connection = url.openConnection();
+    connection.setDoInput(true);
+    // read from the connection we just opened rather than opening a second one
+    return new BufferedReader(new InputStreamReader(connection.getInputStream()));
+  }
+}
diff --git a/examples/src/test/java/druid/examples/web/InputSupplierUpdateStreamTest.java b/examples/src/test/java/druid/examples/web/InputSupplierUpdateStreamTest.java
new file mode 100644
index 00000000000..df00f598b40
--- /dev/null
+++ b/examples/src/test/java/druid/examples/web/InputSupplierUpdateStreamTest.java
@@ -0,0 +1,111 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package druid.examples.web;
+
+import com.google.common.io.InputSupplier;
+import junit.framework.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class InputSupplierUpdateStreamTest
+{
+  private final long waitTime = 1L;
+  private final TimeUnit unit = TimeUnit.SECONDS;
+  private final ArrayList<String> dimensions = new ArrayList<String>();
+  private InputSupplier<BufferedReader> testCaseSupplier;
+  Map<String, Object> expectedAnswer = new HashMap<String, Object>();
+  String timeDimension;
+
+  @Before
+  public void setUp()
+  {
+    timeDimension = "time";
+    testCaseSupplier = new TestCaseSupplier(
+        "{\"item1\": \"value1\","
+        + "\"item2\":2,"
+        + "\"time\":1372121562 }"
+    );
+
+    dimensions.add("item1");
+    dimensions.add("item2");
+    dimensions.add("time");
+
+    expectedAnswer.put("item1", "value1");
+    expectedAnswer.put("item2", 2);
+    expectedAnswer.put("time", 1372121562);
+  }
+
+  @Test
+  public void basicIngestionCheck() throws Exception
+  {
+    InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(
+        testCaseSupplier,
+        timeDimension
+    );
+    updateStream.start();
+    Map<String, Object> insertedRow = updateStream.pollFromQueue(waitTime, unit);
+    Assert.assertEquals(expectedAnswer, insertedRow);
+  }
+
+  // If a timestamp is missing, we should throw away the event
+  @Test
+  public void missingTimeStampCheck()
+  {
+    testCaseSupplier = new TestCaseSupplier(
+        "{\"item1\": \"value1\","
+        + "\"item2\":2}"
+    );
+
+    InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(
+        testCaseSupplier,
+        timeDimension
+    );
+    updateStream.start();
+    Assert.assertEquals(0, updateStream.getQueueSize());
+  }
+
+  // If any other value is missing, we should still add the event and process it properly
+  @Test
+  public void otherNullValueCheck() throws Exception
+  {
+    testCaseSupplier = new TestCaseSupplier(
+        "{\"item1\": \"value1\","
+        + "\"time\":1372121562 }"
+    );
+    InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(
+        testCaseSupplier,
+        timeDimension
+    );
+    updateStream.start();
+    Map<String, Object> insertedRow = updateStream.pollFromQueue(waitTime, unit);
+    Map<String, Object> expectedAnswer = new HashMap<String, Object>();
+    expectedAnswer.put("item1", "value1");
+    expectedAnswer.put("time", 1372121562);
+    Assert.assertEquals(expectedAnswer, insertedRow);
+  }
+}
diff --git a/examples/src/test/java/druid/examples/web/RenamingKeysUpdateStreamTest.java b/examples/src/test/java/druid/examples/web/RenamingKeysUpdateStreamTest.java
new file mode 100644
index 00000000000..e31b8a8e346
--- /dev/null
+++ b/examples/src/test/java/druid/examples/web/RenamingKeysUpdateStreamTest.java
@@ -0,0 +1,94 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package druid.examples.web;
+
+import com.google.common.io.InputSupplier;
+import junit.framework.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class RenamingKeysUpdateStreamTest
+{
+  private final long waitTime = 15L;
+  private final TimeUnit unit = TimeUnit.SECONDS;
+  private InputSupplier<BufferedReader> testCaseSupplier;
+  String timeDimension;
+
+  @Before
+  public void setUp()
+  {
+    timeDimension = "time";
+    testCaseSupplier = new TestCaseSupplier(
+        "{\"item1\": \"value1\","
+        + "\"item2\":2,"
+        + "\"time\":1372121562 }"
+    );
+  }
+
+  @Test
+  public void testPollFromQueue() throws Exception
+  {
+    InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(testCaseSupplier, timeDimension);
+    Map<String, String> renamedKeys = new HashMap<String, String>();
+    renamedKeys.put("item1", "i1");
+    renamedKeys.put("item2", "i2");
+    renamedKeys.put("time", "t");
+
+    RenamingKeysUpdateStream renamer = new RenamingKeysUpdateStream(updateStream, renamedKeys);
+    renamer.start();
+    Map<String, Object> expectedAnswer = new HashMap<String, Object>();
+    expectedAnswer.put("i1", "value1");
+    expectedAnswer.put("i2", 2);
+    expectedAnswer.put("t", 1372121562);
+
+    Assert.assertEquals(expectedAnswer, renamer.pollFromQueue(waitTime, unit));
+  }
+
+  @Test
+  public void testGetTimeDimension() throws Exception
+  {
+    InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(testCaseSupplier, timeDimension);
+    Map<String, String> renamedKeys = new HashMap<String, String>();
+    renamedKeys.put("item1", "i1");
+    renamedKeys.put("item2", "i2");
+    renamedKeys.put("time", "t");
+
+    RenamingKeysUpdateStream renamer = new RenamingKeysUpdateStream(updateStream, renamedKeys);
+    Assert.assertEquals("t", renamer.getTimeDimension());
+  }
+
+  @Test
+  public void testMissingTimeRename() throws Exception
+  {
+    InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(testCaseSupplier, timeDimension);
+    Map<String, String> renamedKeys = new HashMap<String, String>();
+    renamedKeys.put("item1", "i1");
+    renamedKeys.put("item2", "i2");
+    RenamingKeysUpdateStream renamer = new RenamingKeysUpdateStream(updateStream, renamedKeys);
+    Assert.assertEquals("time", renamer.getTimeDimension());
+  }
+}
diff --git a/examples/src/test/java/druid/examples/web/TestCaseSupplier.java b/examples/src/test/java/druid/examples/web/TestCaseSupplier.java
new file mode 100644
index 00000000000..f4ae5ba0939
--- /dev/null
+++ b/examples/src/test/java/druid/examples/web/TestCaseSupplier.java
@@ -0,0 +1,42 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package druid.examples.web;
+
+import com.google.common.io.InputSupplier;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.StringReader;
+
+public class TestCaseSupplier implements InputSupplier<BufferedReader>
+{
+  private final String testString;
+
+  public TestCaseSupplier(String testString)
+  {
+    this.testString = testString;
+  }
+
+  @Override
+  public BufferedReader getInput() throws IOException
+  {
+    return new BufferedReader(new StringReader(testString));
+  }
+}
diff --git a/examples/src/test/java/druid/examples/web/WebFirehoseFactoryTest.java b/examples/src/test/java/druid/examples/web/WebFirehoseFactoryTest.java
new file mode 100644
index 00000000000..755bcfd197a
--- /dev/null
+++ b/examples/src/test/java/druid/examples/web/WebFirehoseFactoryTest.java
@@ -0,0 +1,223 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package druid.examples.web;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.metamx.druid.input.InputRow;
+import com.metamx.druid.realtime.firehose.Firehose;
+import org.joda.time.DateTime;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class WebFirehoseFactoryTest
+{
+  private List<String> dimensions = Lists.newArrayList();
+  private WebFirehoseFactory webbie;
+  private WebFirehoseFactory webbie1;
+
+  @Before
+  public void setUp() throws Exception
+  {
+    dimensions.add("item1");
+    dimensions.add("item2");
+    dimensions.add("time");
+    webbie = new WebFirehoseFactory(
+        new UpdateStreamFactory()
+        {
+          @Override
+          public UpdateStream build()
+          {
+            return new MyUpdateStream(ImmutableMap.<String, Object>of("item1", "value1", "item2", 2, "time", "1372121562"));
+          }
+        },
+        "posix"
+    );
+
+    webbie1 = new WebFirehoseFactory(
+        new UpdateStreamFactory()
+        {
+          @Override
+          public UpdateStream build()
+          {
+            return new MyUpdateStream(ImmutableMap.<String, Object>of("item1", "value1", "item2", 2, "time", "1373241600000"));
+          }
+        },
+        "auto"
+    );
+  }
+
+  @Test
+  public void testDimensions() throws Exception
+  {
+    InputRow inputRow;
+    Firehose firehose = webbie.connect();
+    if (firehose.hasMore()) {
+      inputRow = firehose.nextRow();
+    } else {
+      throw new RuntimeException("queue is empty");
+    }
+    // copy before sorting so the row's own dimension list is not mutated
+    List<String> actualAnswer = Lists.newArrayList(inputRow.getDimensions());
+    Collections.sort(actualAnswer);
+    Assert.assertEquals(dimensions, actualAnswer);
+  }
+
+  @Test
+  public void testPosixTimeStamp() throws Exception
+  {
+    InputRow inputRow;
+    Firehose firehose = webbie.connect();
+    if (firehose.hasMore()) {
+      inputRow = firehose.nextRow();
+    } else {
+      throw new RuntimeException("queue is empty");
+    }
+    long expectedTime = 1372121562L * 1000L;
+    Assert.assertEquals(expectedTime, inputRow.getTimestampFromEpoch());
+  }
+
+  @Test
+  public void testISOTimeStamp() throws Exception
+  {
+    WebFirehoseFactory webbie3 = new WebFirehoseFactory(
+        new UpdateStreamFactory()
+        {
+          @Override
+          public UpdateStream build()
+          {
+            return new MyUpdateStream(ImmutableMap.<String, Object>of("item1", "value1", "item2", 2, "time", "2013-07-08"));
+          }
+        },
+        "auto"
+    );
+    Firehose firehose1 = webbie3.connect();
+    if (firehose1.hasMore()) {
+      long milliSeconds = firehose1.nextRow().getTimestampFromEpoch();
+      DateTime date = new DateTime("2013-07-08");
+      Assert.assertEquals(date.getMillis(), milliSeconds);
+    } else {
+      Assert.fail("hasMore returned false");
+    }
+  }
+
+  @Test
+  public void testAutoIsoTimeStamp() throws Exception
+  {
+    WebFirehoseFactory webbie2 = new WebFirehoseFactory(
+        new UpdateStreamFactory()
+        {
+          @Override
+          public UpdateStream build()
+          {
+            return new MyUpdateStream(ImmutableMap.<String, Object>of("item1", "value1", "item2", 2, "time", "2013-07-08"));
+          }
+        },
+        null
+    );
+    Firehose firehose2 = webbie2.connect();
+    if (firehose2.hasMore()) {
+      long milliSeconds = firehose2.nextRow().getTimestampFromEpoch();
+      DateTime date = new DateTime("2013-07-08");
+      Assert.assertEquals(date.getMillis(), milliSeconds);
+    } else {
+      Assert.fail("hasMore returned false");
+    }
+  }
+
+  @Test
+  public void testAutoMilliSecondsTimeStamp() throws Exception
+  {
+    Firehose firehose3 = webbie1.connect();
+    if (firehose3.hasMore()) {
+      long milliSeconds = firehose3.nextRow().getTimestampFromEpoch();
+      DateTime date = new DateTime("2013-07-08");
+      Assert.assertEquals(date.getMillis(), milliSeconds);
+    } else {
+      Assert.fail("hasMore returned false");
+    }
+  }
+
+  @Test
+  public void testGetDimension() throws Exception
+  {
+    InputRow inputRow;
+    Firehose firehose = webbie1.connect();
+    if (firehose.hasMore()) {
+      inputRow = firehose.nextRow();
+    } else {
+      throw new RuntimeException("queue is empty");
+    }
+
+    List<String> column1 = Lists.newArrayList();
+    column1.add("value1");
+    Assert.assertEquals(column1, inputRow.getDimension("item1"));
+  }
+
+  @Test
+  public void testGetFloatMetric() throws Exception
+  {
+    InputRow inputRow;
+    Firehose firehose = webbie1.connect();
+    if (firehose.hasMore()) {
+      inputRow = firehose.nextRow();
+    } else {
+      throw new RuntimeException("queue is empty");
+    }
+
+    Assert.assertEquals((float) 2.0, inputRow.getFloatMetric("item2"), 0.0f);
+  }
+
+  private static class MyUpdateStream implements UpdateStream
+  {
+    private final ImmutableMap<String, Object> map;
+
+    public MyUpdateStream(ImmutableMap<String, Object> map)
+    {
+      this.map = map;
+    }
+
+    @Override
+    public Map<String, Object> pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException
+    {
+      return map;
+    }
+
+    @Override
+    public String getTimeDimension()
+    {
+      return "time";
+    }
+
+    @Override
+    public void start()
+    {
+    }
+
+    @Override
+    public void stop()
+    {
+    }
+  }
+}
diff --git a/examples/src/test/java/druid/examples/web/WebJsonSupplierTest.java b/examples/src/test/java/druid/examples/web/WebJsonSupplierTest.java
new file mode 100644
index 00000000000..c1bc5c4dc34
--- /dev/null
+++ b/examples/src/test/java/druid/examples/web/WebJsonSupplierTest.java
@@ -0,0 +1,36 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package druid.examples.web;
+
+import org.junit.Test;
+
+import java.net.UnknownHostException;
+
+public class WebJsonSupplierTest
+{
+  @Test(expected = UnknownHostException.class)
+  public void checkInvalidUrl() throws Exception
+  {
+    String invalidURL = "http://invalid.url";
+    WebJsonSupplier supplier = new WebJsonSupplier(invalidURL);
+    supplier.getInput();
+  }
+}
diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java
index d6f775a2611..d5dd40472f4 100644
--- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java
+++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java
@@ -30,7 +30,6 @@ import com.metamx.druid.Query;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.coordination.DataSegmentAnnouncer;
 import com.metamx.druid.index.v1.IndexGranularity;
-import com.metamx.druid.input.InputRow;
 import com.metamx.druid.indexing.common.TaskLock;
 import com.metamx.druid.indexing.common.TaskStatus;
 import com.metamx.druid.indexing.common.TaskToolbox;
@@ -38,6 +37,7 @@ import com.metamx.druid.indexing.common.actions.LockAcquireAction;
 import com.metamx.druid.indexing.common.actions.LockListAction;
 import com.metamx.druid.indexing.common.actions.LockReleaseAction;
 import com.metamx.druid.indexing.common.actions.SegmentInsertAction;
+import com.metamx.druid.input.InputRow;
 import com.metamx.druid.query.FinalizeResultsQueryRunner;
 import com.metamx.druid.query.QueryRunner;
 import com.metamx.druid.query.QueryRunnerFactory;
@@ -226,6 +226,28 @@ public class RealtimeIndexTask extends AbstractTask
           toolbox.getTaskActionClient().submit(new LockReleaseAction(segment.getInterval()));
         }
       }
+
+      @Override
+      public void announceSegments(Iterable<DataSegment> segments) throws IOException
+      {
+        for (DataSegment segment : segments) {
+          toolbox.getTaskActionClient().submit(new LockAcquireAction(segment.getInterval()));
+        }
+        toolbox.getSegmentAnnouncer().announceSegments(segments);
+      }
+
+      @Override
+      public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
+      {
+        try {
+          toolbox.getSegmentAnnouncer().unannounceSegments(segments);
+        }
+        finally {
+          for (DataSegment segment : segments) {
+            toolbox.getTaskActionClient().submit(new LockReleaseAction(segment.getInterval()));
+          }
+        }
+      }
     };
 
     // NOTE: getVersion will block if there is lock contention, which will block plumber.getSink
diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskMasterLifecycle.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskMasterLifecycle.java
index d83ecf2f747..8b4e3fba6e1 100644
--- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskMasterLifecycle.java
+++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskMasterLifecycle.java
@@ -106,9 +106,7 @@ public class TaskMasterLifecycle
                     Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, leaderLifecycle);
                     leaderLifecycle.addManagedInstance(taskConsumer);
 
-                    if (indexerCoordinatorConfig.isAutoScalingEnabled()) {
-                      leaderLifecycle.addManagedInstance(resourceManagementScheduler);
-                    }
+                    leaderLifecycle.addManagedInstance(resourceManagementScheduler);
 
                     try {
                       leaderLifecycle.start();
diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java
index 69a9d4dac88..b07ae5f1792 100644
--- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java
+++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java
@@ -55,10 +55,6 @@ import com.metamx.druid.http.GuiceServletConfig;
 import com.metamx.druid.http.RedirectFilter;
 import com.metamx.druid.http.RedirectInfo;
 import com.metamx.druid.http.StatusServlet;
-import com.metamx.druid.initialization.Initialization;
-import com.metamx.druid.initialization.ServerConfig;
-import com.metamx.druid.initialization.ServiceDiscoveryConfig;
-import com.metamx.druid.jackson.DefaultObjectMapper;
 import com.metamx.druid.indexing.common.RetryPolicyFactory;
 import com.metamx.druid.indexing.common.actions.LocalTaskActionClientFactory;
 import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
@@ -93,12 +89,17 @@ import com.metamx.druid.indexing.coordinator.config.RemoteTaskRunnerConfig;
 import com.metamx.druid.indexing.coordinator.scaling.AutoScalingStrategy;
 import com.metamx.druid.indexing.coordinator.scaling.EC2AutoScalingStrategy;
 import com.metamx.druid.indexing.coordinator.scaling.NoopAutoScalingStrategy;
+import com.metamx.druid.indexing.coordinator.scaling.NoopResourceManagementScheduler;
 import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementScheduler;
 import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerConfig;
 import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactory;
 import com.metamx.druid.indexing.coordinator.scaling.SimpleResourceManagementStrategy;
 import com.metamx.druid.indexing.coordinator.scaling.SimpleResourceManagmentConfig;
 import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
+import com.metamx.druid.initialization.Initialization;
+import com.metamx.druid.initialization.ServerConfig;
+import com.metamx.druid.initialization.ServiceDiscoveryConfig;
+import com.metamx.druid.jackson.DefaultObjectMapper;
 import com.metamx.druid.utils.PropUtils;
 import com.metamx.emitter.EmittingLogger;
 import com.metamx.emitter.core.Emitters;
@@ -687,53 +688,64 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode>
-      resourceManagementSchedulerFactory = new ResourceManagementSchedulerFactory()
-      {
-        @Override
-        public ResourceManagementScheduler build(TaskRunner runner)
-        {
-          final ScheduledExecutorService scalingScheduledExec = Executors.newScheduledThreadPool(
-              1,
-              new ThreadFactoryBuilder()
-                  .setDaemon(true)
-                  .setNameFormat("ScalingExec--%d")
-                  .build()
-          );
-          final AtomicReference<WorkerSetupData> workerSetupData = configManager.watch(
-              WorkerSetupData.CONFIG_KEY, WorkerSetupData.class
-          );
-
-          AutoScalingStrategy strategy;
-          if (config.getAutoScalingImpl().equalsIgnoreCase("ec2")) {
-            strategy = new EC2AutoScalingStrategy(
-                getJsonMapper(),
-                new AmazonEC2Client(
-                    new BasicAWSCredentials(
-                        PropUtils.getProperty(getProps(), "com.metamx.aws.accessKey"),
-                        PropUtils.getProperty(getProps(), "com.metamx.aws.secretKey")
-                    )
-                ),
-                getConfigFactory().build(EC2AutoScalingStrategyConfig.class),
-                workerSetupData
-            );
-          } else if (config.getAutoScalingImpl().equalsIgnoreCase("noop")) {
-            strategy = new NoopAutoScalingStrategy();
-          } else {
-            throw new ISE("Invalid strategy implementation: %s",
-                          config.getAutoScalingImpl());
-          }
-
-          return new ResourceManagementScheduler(
-              runner,
-              new SimpleResourceManagementStrategy(
-                  strategy,
-                  getConfigFactory().build(SimpleResourceManagmentConfig.class),
-                  workerSetupData
-              ),
-              getConfigFactory().build(ResourceManagementSchedulerConfig.class),
-              scalingScheduledExec
-          );
-        }
-      };
+      if (!config.isAutoScalingEnabled()) {
+        resourceManagementSchedulerFactory = new ResourceManagementSchedulerFactory()
+        {
+          @Override
+          public ResourceManagementScheduler build(TaskRunner runner)
+          {
+            return new NoopResourceManagementScheduler();
+          }
+        };
+      } else {
+        resourceManagementSchedulerFactory = new ResourceManagementSchedulerFactory()
+        {
+          @Override
+          public ResourceManagementScheduler build(TaskRunner runner)
+          {
+            final ScheduledExecutorService scalingScheduledExec = Executors.newScheduledThreadPool(
+                1,
+                new ThreadFactoryBuilder()
+                    .setDaemon(true)
+                    .setNameFormat("ScalingExec--%d")
+                    .build()
+            );
+            final AtomicReference<WorkerSetupData> workerSetupData = configManager.watch(
+                WorkerSetupData.CONFIG_KEY, WorkerSetupData.class
+            );
+
+            AutoScalingStrategy strategy;
+            if (config.getAutoScalingImpl().equalsIgnoreCase("ec2")) {
+              strategy = new EC2AutoScalingStrategy(
+                  getJsonMapper(),
+                  new AmazonEC2Client(
+                      new BasicAWSCredentials(
+                          PropUtils.getProperty(getProps(), "com.metamx.aws.accessKey"),
+                          PropUtils.getProperty(getProps(), "com.metamx.aws.secretKey")
+                      )
+                  ),
+                  getConfigFactory().build(EC2AutoScalingStrategyConfig.class),
+                  workerSetupData
+              );
+            } else if (config.getAutoScalingImpl().equalsIgnoreCase("noop")) {
+              strategy = new NoopAutoScalingStrategy();
+            } else {
+              throw new ISE("Invalid strategy implementation: %s", config.getAutoScalingImpl());
+            }
+
+            return new ResourceManagementScheduler(
+                runner,
+                new SimpleResourceManagementStrategy(
+                    strategy,
+                    getConfigFactory().build(SimpleResourceManagmentConfig.class),
+                    workerSetupData
+                ),
+                getConfigFactory().build(ResourceManagementSchedulerConfig.class),
+                scalingScheduledExec
+            );
+          }
+        };
+      }
     }
   }
 }
diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/NoopResourceManagementScheduler.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/NoopResourceManagementScheduler.java
new file mode 100644
index 00000000000..a4f9c274eec
--- /dev/null
+++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/NoopResourceManagementScheduler.java
@@ -0,0 +1,52 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.indexing.coordinator.scaling;
+
+import com.metamx.common.logger.Logger;
+
+/**
+ */
+public class NoopResourceManagementScheduler extends ResourceManagementScheduler
+{
+  private static final Logger log = new Logger(NoopResourceManagementScheduler.class);
+
+  public NoopResourceManagementScheduler()
+  {
+    super(null, null, null, null);
+  }
+
+  @Override
+  public void start()
+  {
+    log.info("Autoscaling is disabled.");
+  }
+
+  @Override
+  public void stop()
+  {
+    // do nothing
+  }
+
+  @Override
+  public ScalingStats getStats()
+  {
+    return new ScalingStats(0);
+  }
+}
diff --git a/pom.xml b/pom.xml
index ede50cdf143..dd042d33c33 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,7 +39,7 @@
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <metamx.java-util.version>0.22.3</metamx.java-util.version>
-        <netflix.curator.version>2.0.2-21-22</netflix.curator.version>
+        <netflix.curator.version>2.1.0-incubating</netflix.curator.version>
diff --git a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java
index 0740be81b5d..0146af28dd4 100644
--- a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java
+++ b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java
@@ -21,6 +21,7 @@ package com.metamx.druid.coordination;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.metamx.common.lifecycle.LifecycleStart;
 import com.metamx.common.lifecycle.LifecycleStop;
@@ -37,6 +38,7 @@ import org.apache.curator.utils.ZKPaths;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.List;
 
 /**
  */
@@ -103,7 +105,9 @@ public class ZkCoordinator implements DataSegmentChangeHandler
       curator.newNamespaceAwareEnsurePath(loadQueueLocation).ensure(curator.getZookeeperClient());
       curator.newNamespaceAwareEnsurePath(servedSegmentsLocation).ensure(curator.getZookeeperClient());
 
-      loadCache();
+      if (config.isLoadFromSegmentCacheEnabled()) {
+        loadCache();
+      }
 
       loadQueueCache.getListenable().addListener(
           new PathChildrenCacheListener()
@@ -136,9 +140,9 @@ public class ZkCoordinator implements DataSegmentChangeHandler
                 }
 
                 log.makeAlert(e, "Segment load/unload: uncaught exception.")
-                    .addData("node", path)
-                    .addData("nodeProperties", segment)
-                    .emit();
+                   .addData("node", path)
+                   .addData("nodeProperties", segment)
+                   .emit();
               }
 
               break;
@@ -192,12 +196,14 @@ public class ZkCoordinator implements DataSegmentChangeHandler
       return;
     }
 
+    List<DataSegment> cachedSegments = Lists.newArrayList();
    for (File file : baseDir.listFiles()) {
       log.info("Loading segment cache file [%s]", file);
       try {
         DataSegment segment = jsonMapper.readValue(file, DataSegment.class);
         if (serverManager.isSegmentCached(segment)) {
-          addSegment(segment);
+          cachedSegments.add(segment);
         } else {
           log.warn("Unable to find cache file for %s. Deleting lookup entry", segment.getIdentifier());
@@ -213,6 +219,8 @@ public class ZkCoordinator implements DataSegmentChangeHandler
             .emit();
       }
     }
+
+    addSegments(cachedSegments);
   }
 
   @Override
@@ -222,14 +230,16 @@ public class ZkCoordinator implements DataSegmentChangeHandler
       serverManager.loadSegment(segment);
 
       File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier());
-      try {
-        jsonMapper.writeValue(segmentInfoCacheFile, segment);
-      }
-      catch (IOException e) {
-        removeSegment(segment);
-        throw new SegmentLoadingException(
-            e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
-        );
+      if (!segmentInfoCacheFile.exists()) {
+        try {
+          jsonMapper.writeValue(segmentInfoCacheFile, segment);
+        }
+        catch (IOException e) {
+          removeSegment(segment);
+          throw new SegmentLoadingException(
+              e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
+          );
+        }
       }
 
       try {
@@ -239,14 +249,51 @@ public class ZkCoordinator implements DataSegmentChangeHandler
         removeSegment(segment);
         throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier());
       }
+    }
     catch (SegmentLoadingException e) {
       log.makeAlert(e, "Failed to load segment for dataSource")
-          .addData("segment", segment)
-          .emit();
+         .addData("segment", segment)
+         .emit();
     }
   }
 
+  public void addSegments(Iterable<DataSegment> segments)
+  {
+    try {
+      for (DataSegment segment : segments) {
+        serverManager.loadSegment(segment);
+
+        File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier());
+        if (!segmentInfoCacheFile.exists()) {
+          try {
+            jsonMapper.writeValue(segmentInfoCacheFile, segment);
+          }
+          catch (IOException e) {
+            removeSegment(segment);
+            throw new SegmentLoadingException(
+                e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
+            );
+          }
+        }
+      }
+
+      try {
+        announcer.announceSegments(segments);
+      }
+      catch (IOException e) {
+        removeSegments(segments);
+        throw new SegmentLoadingException(e, "Failed to announce segments[%s]", segments);
+      }
+    }
+    catch (SegmentLoadingException e) {
+      log.makeAlert(e, "Failed to load segments for dataSource")
+         .addData("segments", segments)
+         .emit();
+    }
+  }
+
   @Override
   public void removeSegment(DataSegment segment)
   {
@@ -262,8 +309,29 @@ public class ZkCoordinator implements DataSegmentChangeHandler
     }
     catch (Exception e) {
       log.makeAlert(e, "Failed to remove segment")
-          .addData("segment", segment)
-          .emit();
+         .addData("segment", segment)
+         .emit();
+    }
+  }
+
+  public void removeSegments(Iterable<DataSegment> segments)
+  {
+    try {
+      for (DataSegment segment : segments) {
+        serverManager.dropSegment(segment);
+
+        File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier());
+        if (!segmentInfoCacheFile.delete()) {
+          log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile);
+        }
+      }
+
+      announcer.unannounceSegments(segments);
+    }
+    catch (Exception e) {
+      log.makeAlert(e, "Failed to remove segments")
+         .addData("segments", segments)
+         .emit();
     }
   }
 }
diff --git a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinatorConfig.java b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinatorConfig.java
index 262f92e0c87..6f17f5f9a17 100644
--- a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinatorConfig.java
+++ b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinatorConfig.java
@@ -29,4 +29,10 @@ public abstract class ZkCoordinatorConfig
@Config("druid.paths.segmentInfoCache") public abstract File getSegmentInfoCacheDirectory(); + + @Config("druid.segmentCache.enable") + public boolean isLoadFromSegmentCacheEnabled() + { + return true; + } } diff --git a/server/src/main/java/com/metamx/druid/master/DruidMaster.java b/server/src/main/java/com/metamx/druid/master/DruidMaster.java index 36b0b0b1596..c7c39d3fdd3 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMaster.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMaster.java @@ -443,17 +443,17 @@ public class DruidMaster curator, ZKPaths.makePath(zkPaths.getMasterPath(), MASTER_OWNER_NODE), config.getHost() ); - newLeaderLatch.attachListener( + newLeaderLatch.addListener( new LeaderLatchListener() { @Override - public void becomeMaster() + public void isLeader() { DruidMaster.this.becomeMaster(); } @Override - public void stopBeingMaster() + public void notLeader() { DruidMaster.this.stopBeingMaster(); } diff --git a/services/src/assembly/assembly.xml b/services/src/assembly/assembly.xml index 6e0b675801d..648416c35a2 100644 --- a/services/src/assembly/assembly.xml +++ b/services/src/assembly/assembly.xml @@ -1,10 +1,9 @@ - + + bin - - tar.gz - + + tar.gz + ../examples/config @@ -13,6 +12,34 @@ config + + ../examples/config/broker + + * + + config/broker + + + ../examples/config/master + + * + + config/master + + + ../examples/config/realtime + + * + + config/realtime + + + ../examples/config/compute + + * + + config/compute + ../examples/bin @@ -28,14 +55,21 @@ lib - + + ../services/target + + druid-services-*-selfcontained.jar + + lib + + ../examples/bin/examples ** examples - + ../examples/bin/examples/twitter *sh diff --git a/upload.sh b/upload.sh new file mode 100644 index 00000000000..5e0580d3055 --- /dev/null +++ b/upload.sh @@ -0,0 +1,6 @@ +#!/bin/bash -e + +# +# Script to upload tarball of assembly build to static.druid.io for serving +# +s3cmd put services/target/druid-services-*-bin.tar.gz s3://static.druid.io/artifacts/