Merge branch 'master' of github.com:metamx/druid

This commit is contained in:
Jan Rudert 2013-07-11 16:02:11 +02:00
commit 0961d8f149
47 changed files with 2545 additions and 148 deletions

1
.gitignore vendored
View File

@ -13,3 +13,4 @@ target
examples/rand/RealtimeNode.out
examples/twitter/RealtimeNode.out
*.log
*.DS_Store

View File

@ -39,15 +39,19 @@ import com.metamx.druid.client.ServerInventoryView;
import com.metamx.druid.client.ServerInventoryViewConfig;
import com.metamx.druid.client.ServerView;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.coordination.AbstractDataSegmentAnnouncer;
import com.metamx.druid.coordination.BatchingCuratorDataSegmentAnnouncer;
import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.coordination.DruidServerMetadata;
import com.metamx.druid.coordination.MultipleDataSegmentAnnouncerDataSegmentAnnouncer;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.http.NoopRequestLogger;
import com.metamx.druid.http.RequestLogger;
import com.metamx.druid.initialization.CuratorConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
@ -424,7 +428,20 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
final Announcer announcer = new Announcer(getCuratorFramework(), Execs.singleThreaded("Announcer-%s"));
lifecycle.addManagedInstance(announcer);
setAnnouncer(new CuratorDataSegmentAnnouncer(getDruidServerMetadata(), getZkPaths(), announcer, getJsonMapper()));
setAnnouncer(
new MultipleDataSegmentAnnouncerDataSegmentAnnouncer(
Arrays.<AbstractDataSegmentAnnouncer>asList(
new BatchingCuratorDataSegmentAnnouncer(
getDruidServerMetadata(),
getConfigFactory().build(ZkDataSegmentAnnouncerConfig.class),
announcer,
getJsonMapper()
),
new CuratorDataSegmentAnnouncer(getDruidServerMetadata(), getZkPaths(), announcer, getJsonMapper())
)
)
);
lifecycle.addManagedInstance(getAnnouncer(), Lifecycle.Stage.LAST);
}
}

View File

@ -0,0 +1,100 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.coordination;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.initialization.ZkPathsConfig;
import org.apache.curator.utils.ZKPaths;
/**
 * Base class for {@link DataSegmentAnnouncer} implementations that also announce the server itself.
 *
 * <p>{@link #start()} publishes the JSON-serialized {@link DruidServerMetadata} at
 * {@code <announcementsPath>/<serverName>} through the supplied {@link Announcer}; {@link #stop()} removes that
 * announcement again. Both calls are guarded by a private lock and are no-ops when the announcer is already in
 * the requested state. Announcing individual segments is left to subclasses.
 */
public abstract class AbstractDataSegmentAnnouncer implements DataSegmentAnnouncer
{
  private static final Logger log = new Logger(AbstractDataSegmentAnnouncer.class);

  private final DruidServerMetadata server;
  private final ZkPathsConfig config;
  private final Announcer announcer;
  private final ObjectMapper jsonMapper;

  private final Object lock = new Object();

  private volatile boolean started = false;

  /**
   * @param server     metadata of the server being announced; its name is the last path element
   * @param config     supplies the ZooKeeper announcements path
   * @param announcer  performs the actual ZooKeeper announce/unannounce calls
   * @param jsonMapper serializes {@code server} into the announcement payload
   */
  protected AbstractDataSegmentAnnouncer(
      DruidServerMetadata server,
      ZkPathsConfig config,
      Announcer announcer,
      ObjectMapper jsonMapper
  )
  {
    this.server = server;
    this.config = config;
    this.announcer = announcer;
    this.jsonMapper = jsonMapper;
  }

  /**
   * Announces this server. Does nothing when already started.
   */
  @LifecycleStart
  public void start()
  {
    synchronized (lock) {
      if (started) {
        return;
      }

      try {
        final String path = makeAnnouncementPath();
        log.info("Announcing self[%s] at [%s]", server, path);
        announcer.announce(path, jsonMapper.writeValueAsBytes(server));
      }
      catch (JsonProcessingException e) {
        throw Throwables.propagate(e);
      }

      started = true;
    }
  }

  /**
   * Removes the announcement made by {@link #start()}. Does nothing when not started.
   */
  @LifecycleStop
  public void stop()
  {
    synchronized (lock) {
      if (!started) {
        return;
      }

      // Log the concrete subclass: this base class is shared by several announcers, so a hard-coded
      // class name in the message would point at the wrong one.
      log.info("Stopping %s with config[%s]", getClass().getSimpleName(), config);
      announcer.unannounce(makeAnnouncementPath());

      started = false;
    }
  }

  private String makeAnnouncementPath()
  {
    return ZKPaths.makePath(config.getAnnouncementsPath(), server.getName());
  }
}

View File

@ -0,0 +1,297 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.coordination;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig;
import com.metamx.druid.initialization.ZkPathsConfig;
import org.apache.curator.utils.ZKPaths;
import org.joda.time.DateTime;
import java.io.IOException;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
/**
*/
public class BatchingCuratorDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
{
private static final Logger log = new Logger(BatchingCuratorDataSegmentAnnouncer.class);
private final ZkDataSegmentAnnouncerConfig config;
private final Announcer announcer;
private final ObjectMapper jsonMapper;
private final String liveSegmentLocation;
private final Set<SegmentZNode> availableZNodes = Sets.newHashSet();
private final Map<DataSegment, SegmentZNode> segmentLookup = Maps.newHashMap();
public BatchingCuratorDataSegmentAnnouncer(
DruidServerMetadata server,
ZkDataSegmentAnnouncerConfig config,
Announcer announcer,
ObjectMapper jsonMapper
)
{
super(server, config, announcer, jsonMapper);
this.config = config;
this.announcer = announcer;
this.jsonMapper = jsonMapper;
this.liveSegmentLocation = ZKPaths.makePath(config.getLiveSegmentsPath(), server.getName());
}
/**
 * Announces a single segment, appending it to an existing batch znode that still has room or, failing that,
 * creating a new batch znode for it.
 *
 * @param segment the segment to announce
 *
 * @throws ISE         if the serialized segment alone exceeds the configured per-znode byte limit
 * @throws IOException if the segment cannot be serialized
 */
@Override
public void announceSegment(DataSegment segment) throws IOException
{
  final int newBytesLen = jsonMapper.writeValueAsBytes(segment).length;
  if (newBytesLen > config.getMaxNumBytes()) {
    throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxNumBytes());
  }

  // First try to fit the segment into a batch that is already announced.
  final Iterator<SegmentZNode> iter = availableZNodes.iterator();
  while (iter.hasNext()) {
    final SegmentZNode availableZNode = iter.next();
    if (availableZNode.getBytes().length + newBytesLen < config.getMaxNumBytes()) {
      availableZNode.addSegment(segment);

      log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
      announcer.update(availableZNode.getPath(), availableZNode.getBytes());
      segmentLookup.put(segment, availableZNode);

      if (availableZNode.getCount() >= config.getSegmentsPerNode()) {
        // Remove through the iterator; removing from the set directly while iterating is unsafe.
        iter.remove();
      }
      return;
    }
  }

  // Either there is no open batch at all, or none of them has enough spare bytes. In both cases the
  // segment still has to be announced, so start a new batch rather than dropping it silently.
  final SegmentZNode newZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
  newZNode.addSegment(segment);

  log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), newZNode.getPath());
  announcer.announce(newZNode.getPath(), newZNode.getBytes());
  segmentLookup.put(segment, newZNode);

  // Only offer the new batch for further additions if the count limit leaves room in it.
  if (newZNode.getCount() < config.getSegmentsPerNode()) {
    availableZNodes.add(newZNode);
  }
}
/**
 * Removes a segment from the batch znode it was announced in. The znode is unannounced once it holds no
 * segments; otherwise its payload is updated and it becomes available for new segments again.
 *
 * <p>A segment that this announcer never announced is logged and ignored.
 *
 * @param segment the segment to unannounce
 */
@Override
public void unannounceSegment(DataSegment segment) throws IOException
{
  final SegmentZNode segmentZNode = segmentLookup.remove(segment);
  if (segmentZNode == null) {
    // Nothing was announced for this segment, so there is no znode to touch.
    log.info("Ignoring unannounce of segment[%s], it is not currently announced", segment.getIdentifier());
    return;
  }

  segmentZNode.removeSegment(segment);

  log.info("Unannouncing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath());
  if (segmentZNode.getCount() == 0) {
    availableZNodes.remove(segmentZNode);
    announcer.unannounce(segmentZNode.getPath());
  } else {
    announcer.update(segmentZNode.getPath(), segmentZNode.getBytes());
    availableZNodes.add(segmentZNode);
  }
}
/**
 * Announces many segments at once, packing them into as few batch znodes as the configured segment-count
 * and byte limits allow. Nothing is announced when {@code segments} is empty.
 *
 * @param segments the segments to announce
 *
 * @throws ISE         if a single serialized segment exceeds the configured per-znode byte limit
 * @throws IOException if a segment cannot be serialized
 */
@Override
public void announceSegments(Iterable<DataSegment> segments) throws IOException
{
  SegmentZNode segmentZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
  Set<DataSegment> batch = Sets.newHashSet();
  int byteSize = 0;
  int count = 0;

  for (DataSegment segment : segments) {
    final int newBytesLen = jsonMapper.writeValueAsBytes(segment).length;
    if (newBytesLen > config.getMaxNumBytes()) {
      throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxNumBytes());
    }

    final boolean batchFull =
        count >= config.getSegmentsPerNode() || byteSize + newBytesLen > config.getMaxNumBytes();

    // Flush the current batch before it would overflow; never publish a batch with nothing in it.
    if (batchFull && !batch.isEmpty()) {
      segmentZNode.addSegments(batch);
      announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes());

      segmentZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
      batch = Sets.newHashSet();
      count = 0;
      byteSize = 0;
    }

    log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath());
    segmentLookup.put(segment, segmentZNode);
    batch.add(segment);
    count++;
    byteSize += newBytesLen;
  }

  // An empty input (or an exactly flushed one) leaves nothing pending; announcing it would create an
  // empty znode that no segment maps to and that is therefore never unannounced.
  if (!batch.isEmpty()) {
    segmentZNode.addSegments(batch);
    announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes());
  }
}
/**
 * Unannounces every given segment, one at a time, via {@link #unannounceSegment(DataSegment)}.
 *
 * @param segments the segments to unannounce
 */
@Override
public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
{
  for (final DataSegment toRemove : segments) {
    unannounceSegment(toRemove);
  }
}
/**
 * Per-instance sequence appended to every batch znode name so that two znodes created within the same
 * millisecond still get distinct paths.
 */
private final AtomicLong zNodeSequence = new AtomicLong(0);

/**
 * Builds the path of a batch znode beneath this server's live-segments location.
 *
 * <p>Callers pass a millisecond-resolution timestamp as the name. On its own that is not unique: two
 * batches created in the same millisecond would share a path, and the Announcer refuses to announce a
 * different payload under a path it is already announcing. A sequence number is therefore appended.
 *
 * @param zNode base name of the znode
 *
 * @return the full, unique znode path
 */
private String makeServedSegmentPath(String zNode)
{
  return ZKPaths.makePath(liveSegmentLocation, zNode + "_" + zNodeSequence.getAndIncrement());
}
/**
 * In-memory image of one batch znode: its path plus the JSON-serialized set of segments it holds.
 *
 * <p>Two instances are equal when their paths are equal; the payload does not take part in
 * {@code equals}/{@code hashCode}, so instances can be mutated while stored in hash-based collections.
 */
private class SegmentZNode
{
  private final String path;

  private byte[] bytes = new byte[]{};
  private int count = 0;

  public SegmentZNode(String path)
  {
    this.path = path;
  }

  public String getPath()
  {
    return path;
  }

  /**
   * @return the number of segments currently stored in this znode
   */
  public int getCount()
  {
    return count;
  }

  /**
   * @return the serialized payload; empty until the first segment is added
   */
  public byte[] getBytes()
  {
    return bytes;
  }

  /**
   * Deserializes the current payload.
   *
   * @return a fresh set of the stored segments; empty when nothing has been stored yet
   */
  public Set<DataSegment> getSegments()
  {
    if (bytes.length == 0) {
      return Sets.newHashSet();
    }
    try {
      return jsonMapper.readValue(
          bytes, new TypeReference<Set<DataSegment>>()
      {
      }
      );
    }
    catch (IOException e) {
      throw Throwables.propagate(e);
    }
  }

  public void addSegment(DataSegment segment)
  {
    final Set<DataSegment> zkSegments = getSegments();
    zkSegments.add(segment);
    store(zkSegments);
  }

  public void addSegments(Set<DataSegment> segments)
  {
    final Set<DataSegment> zkSegments = getSegments();
    zkSegments.addAll(segments);
    store(zkSegments);
  }

  public void removeSegment(DataSegment segment)
  {
    final Set<DataSegment> zkSegments = getSegments();
    zkSegments.remove(segment);
    store(zkSegments);
  }

  /**
   * Serializes {@code zkSegments} and, only if that succeeds, replaces the payload and the count together.
   * The count is taken from the set itself so that adding a segment that is already present, or removing
   * one that is not, cannot make it drift away from the real content.
   */
  private void store(Set<DataSegment> zkSegments)
  {
    final byte[] newBytes;
    try {
      newBytes = jsonMapper.writeValueAsBytes(zkSegments);
    }
    catch (IOException e) {
      throw Throwables.propagate(e);
    }
    bytes = newBytes;
    count = zkSegments.size();
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }

    return path.equals(((SegmentZNode) o).path);
  }

  @Override
  public int hashCode()
  {
    return path.hashCode();
  }
}
}

View File

@ -19,11 +19,7 @@
package com.metamx.druid.coordination;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.curator.announcement.Announcer;
@ -32,20 +28,14 @@ import org.apache.curator.utils.ZKPaths;
import java.io.IOException;
public class CuratorDataSegmentAnnouncer implements DataSegmentAnnouncer
public class CuratorDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
{
private static final Logger log = new Logger(CuratorDataSegmentAnnouncer.class);
private final Object lock = new Object();
private final DruidServerMetadata server;
private final ZkPathsConfig config;
private final Announcer announcer;
private final ObjectMapper jsonMapper;
private final String servedSegmentsLocation;
private volatile boolean started = false;
public CuratorDataSegmentAnnouncer(
DruidServerMetadata server,
ZkPathsConfig config,
@ -53,49 +43,13 @@ public class CuratorDataSegmentAnnouncer implements DataSegmentAnnouncer
ObjectMapper jsonMapper
)
{
this.server = server;
this.config = config;
super(server, config, announcer, jsonMapper);
this.announcer = announcer;
this.jsonMapper = jsonMapper;
this.servedSegmentsLocation = ZKPaths.makePath(config.getServedSegmentsPath(), server.getName());
}
@LifecycleStart
public void start()
{
synchronized (lock) {
if (started) {
return;
}
try {
final String path = makeAnnouncementPath();
log.info("Announcing self[%s] at [%s]", server, path);
announcer.announce(path, jsonMapper.writeValueAsBytes(server));
}
catch (JsonProcessingException e) {
throw Throwables.propagate(e);
}
started = true;
}
}
@LifecycleStop
public void stop()
{
synchronized (lock) {
if (!started) {
return;
}
log.info("Stopping CuratorDataSegmentAnnouncer with config[%s]", config);
announcer.unannounce(makeAnnouncementPath());
started = false;
}
}
public void announceSegment(DataSegment segment) throws IOException
{
final String path = makeServedSegmentPath(segment);
@ -110,8 +64,20 @@ public class CuratorDataSegmentAnnouncer implements DataSegmentAnnouncer
announcer.unannounce(path);
}
private String makeAnnouncementPath() {
return ZKPaths.makePath(config.getAnnouncementsPath(), server.getName());
/**
 * Announces every given segment individually by delegating to {@link #announceSegment(DataSegment)}.
 *
 * @param segments the segments to announce
 */
@Override
public void announceSegments(Iterable<DataSegment> segments) throws IOException
{
  for (final DataSegment toAnnounce : segments) {
    announceSegment(toAnnounce);
  }
}
/**
 * Unannounces every given segment individually by delegating to {@link #unannounceSegment(DataSegment)}.
 *
 * @param segments the segments to unannounce
 */
@Override
public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
{
  for (final DataSegment toRemove : segments) {
    unannounceSegment(toRemove);
  }
}
private String makeServedSegmentPath(DataSegment segment)

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.coordination;
import com.metamx.druid.client.DataSegment;
@ -7,5 +26,10 @@ import java.io.IOException;
/**
 * Publishes and withdraws the segments a server is serving.
 */
public interface DataSegmentAnnouncer
{
  /**
   * Announces a single segment.
   */
  void announceSegment(DataSegment segment) throws IOException;

  /**
   * Withdraws the announcement of a single segment.
   */
  void unannounceSegment(DataSegment segment) throws IOException;

  /**
   * Announces several segments in one call.
   */
  void announceSegments(Iterable<DataSegment> segments) throws IOException;

  /**
   * Withdraws the announcements of several segments in one call.
   */
  void unannounceSegments(Iterable<DataSegment> segments) throws IOException;
}

View File

@ -80,7 +80,7 @@ public class DruidServerMetadata
@Override
public String toString()
{
return "DruidServer{" +
return "DruidServerMetadata{" +
"name='" + name + '\'' +
", host='" + host + '\'' +
", maxSize=" + maxSize +

View File

@ -0,0 +1,92 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.initialization.ZkPathsConfig;
import java.io.IOException;
/**
 * A {@link DataSegmentAnnouncer} that fans every call out to a group of announcers, in iteration order.
 *
 * <p>Lifecycle calls ({@link #start()}, {@link #stop()}) and segment calls are forwarded to each delegate in
 * turn; an exception thrown by one delegate propagates immediately and later delegates are not invoked.
 */
public class MultipleDataSegmentAnnouncerDataSegmentAnnouncer implements DataSegmentAnnouncer
{
  private final Iterable<AbstractDataSegmentAnnouncer> delegates;

  /**
   * @param dataSegmentAnnouncers the announcers to forward to; the iterable is kept by reference
   */
  public MultipleDataSegmentAnnouncerDataSegmentAnnouncer(
      Iterable<AbstractDataSegmentAnnouncer> dataSegmentAnnouncers
  )
  {
    this.delegates = dataSegmentAnnouncers;
  }

  /**
   * Starts every delegate.
   */
  @LifecycleStart
  public void start()
  {
    for (final AbstractDataSegmentAnnouncer delegate : delegates) {
      delegate.start();
    }
  }

  /**
   * Stops every delegate.
   */
  @LifecycleStop
  public void stop()
  {
    for (final AbstractDataSegmentAnnouncer delegate : delegates) {
      delegate.stop();
    }
  }

  @Override
  public void announceSegment(DataSegment segment) throws IOException
  {
    for (final DataSegmentAnnouncer delegate : delegates) {
      delegate.announceSegment(segment);
    }
  }

  @Override
  public void unannounceSegment(DataSegment segment) throws IOException
  {
    for (final DataSegmentAnnouncer delegate : delegates) {
      delegate.unannounceSegment(segment);
    }
  }

  @Override
  public void announceSegments(Iterable<DataSegment> segments) throws IOException
  {
    for (final DataSegmentAnnouncer delegate : delegates) {
      delegate.announceSegments(segments);
    }
  }

  @Override
  public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
  {
    for (final DataSegmentAnnouncer delegate : delegates) {
      delegate.unannounceSegments(segments);
    }
  }
}

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.curator.announcement;
import com.google.common.base.Throwables;
@ -6,6 +25,7 @@ import com.google.common.collect.MapMaker;
import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
@ -21,7 +41,9 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -208,11 +230,11 @@ public class Announcer
if (started) {
byte[] oldBytes = subPaths.putIfAbsent(pathAndNode.getNode(), bytes);
if (oldBytes != null) {
throw new IAE("Already announcing[%s], cannot announce it twice.", path);
}
if (oldBytes == null) {
created = true;
} else if (!Arrays.equals(oldBytes, bytes)) {
throw new IAE("Cannot reannounce different values under the same path");
}
}
}
@ -226,15 +248,48 @@ public class Announcer
}
}
/**
 * Replaces the payload of an announcement that was previously made at {@code path}. When the new payload
 * equals the one already recorded, nothing is sent to ZooKeeper.
 *
 * @param path  the full path of the announced node
 * @param bytes the new payload
 *
 * @throws ISE if nothing is currently announced at {@code path}
 */
public void update(final String path, final byte[] bytes)
{
  final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);

  final String parentPath = pathAndNode.getPath();
  final String nodePath = pathAndNode.getNode();

  final ConcurrentMap<String, byte[]> subPaths = announcements.get(parentPath);
  if (subPaths == null) {
    throw new ISE("Cannot update a path[%s] that hasn't been announced!", path);
  }

  synchronized (subPaths) {
    // Look the entry up while holding the lock. Checking it before acquiring the lock and reading it again
    // afterwards would let a removal in between turn this into an update of a node that is no longer
    // announced. NOTE(review): this assumes removals also synchronize on subPaths - confirm in unannounce().
    final byte[] oldBytes = subPaths.get(nodePath);
    if (oldBytes == null) {
      throw new ISE("Cannot update a path[%s] that hasn't been announced!", path);
    }

    if (Arrays.equals(oldBytes, bytes)) {
      return;
    }

    try {
      subPaths.put(nodePath, bytes);
      updateAnnouncement(path, bytes);
    }
    catch (Exception e) {
      throw Throwables.propagate(e);
    }
  }
}
/**
 * Asks Curator to create, in the background, a compressed ephemeral node at {@code path} holding
 * {@code value}.
 *
 * @return whatever Curator's {@code forPath} returns for the background create
 */
private String createAnnouncement(final String path, byte[] value) throws Exception
{
  return curator.create()
                .compressed()
                .withMode(CreateMode.EPHEMERAL)
                .inBackground()
                .forPath(path, value);
}
/**
 * Asks Curator to overwrite, in the background, the data of the node at {@code path} with the compressed
 * {@code value}.
 *
 * @return whatever Curator's {@code forPath} returns for the background set-data
 */
private Stat updateAnnouncement(final String path, final byte[] value) throws Exception
{
  return curator.setData()
                .compressed()
                .inBackground()
                .forPath(path, value);
}
/**
* Unannounces an announcement created at path. Note that if all announcements get removed, the Announcer
* will continue to have ZK watches on paths because clearing them out is a source of ugly race conditions.
*
* <p/>
* If you need to completely clear all the state of what is being watched and announced, stop() the Announcer.
*
* @param path

View File

@ -0,0 +1,17 @@
package com.metamx.druid.initialization;
import org.skife.config.Config;
import org.skife.config.Default;
/**
 * ZooKeeper path configuration extended with the limits that the batching segment announcer applies to
 * each batch znode.
 */
public abstract class ZkDataSegmentAnnouncerConfig extends ZkPathsConfig
{
/**
 * Upper bound on the number of segments stored in one batch znode.
 * Read from {@code druid.zk.segmentsPerNode}; defaults to 50.
 */
@Config("druid.zk.segmentsPerNode")
@Default("50")
public abstract int getSegmentsPerNode();
/**
 * Upper bound on the serialized size, in bytes, compared against segment payloads before they are written
 * to a batch znode. Read from {@code druid.zk.maxNumBytesPerNode}; defaults to 512000.
 */
@Config("druid.zk.maxNumBytesPerNode")
@Default("512000")
public abstract long getMaxNumBytes();
}

View File

@ -21,6 +21,7 @@ package com.metamx.druid.initialization;
import org.apache.curator.utils.ZKPaths;
import org.skife.config.Config;
import org.skife.config.Default;
public abstract class ZkPathsConfig
{
@ -45,6 +46,12 @@ public abstract class ZkPathsConfig
return defaultPath("servedSegments");
}
/**
 * ZooKeeper path beneath which batched segment announcements are published (one child per server name).
 * Bound to {@code druid.zk.paths.liveSegmentsPath}; this implementation returns the default path
 * for {@code "segments"}.
 */
@Config("druid.zk.paths.liveSegmentsPath")
public String getLiveSegmentsPath()
{
return defaultPath("segments");
}
@Config("druid.zk.paths.loadQueuePath")
public String getLoadQueuePath()
{

View File

@ -0,0 +1,241 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.coordination;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import junit.framework.Assert;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
/**
*/
public class BatchingCuratorDataSegmentAnnouncerTest
{
private static final String testBasePath = "/test";
private static final String testSegmentsPath = "/test/segments/id";
private static final Joiner joiner = Joiner.on("/");
private TestingCluster testingCluster;
private CuratorFramework cf;
private ObjectMapper jsonMapper;
private Announcer announcer;
private SegmentReader segmentReader;
private BatchingCuratorDataSegmentAnnouncer segmentAnnouncer;
private Set<DataSegment> testSegments;
/**
 * Boots a single-node ZooKeeper test cluster, connects Curator to it, and creates a started announcer for
 * server "id" together with 100 distinct test segments.
 */
@Before
public void setUp() throws Exception
{
  testingCluster = new TestingCluster(1);
  testingCluster.start();

  cf = CuratorFrameworkFactory.builder()
                              .connectString(testingCluster.getConnectString())
                              .retryPolicy(new ExponentialBackoffRetry(1, 10))
                              .compressionProvider(new PotentiallyGzippedCompressionProvider(false))
                              .build();
  cf.start();
  cf.create().creatingParentsIfNeeded().forPath(testBasePath);

  jsonMapper = new DefaultObjectMapper();

  announcer = new Announcer(cf, MoreExecutors.sameThreadExecutor());
  announcer.start();

  segmentReader = new SegmentReader(cf, jsonMapper);

  final DruidServerMetadata serverMetadata = new DruidServerMetadata(
      "id",
      "host",
      Long.MAX_VALUE,
      "type",
      "tier"
  );

  // 50 segments and 100000 bytes per znode, rooted at the test base path.
  final ZkDataSegmentAnnouncerConfig announcerConfig = new ZkDataSegmentAnnouncerConfig()
  {
    @Override
    public String getZkBasePath()
    {
      return testBasePath;
    }

    @Override
    public int getSegmentsPerNode()
    {
      return 50;
    }

    @Override
    public long getMaxNumBytes()
    {
      return 100000;
    }
  };

  segmentAnnouncer = new BatchingCuratorDataSegmentAnnouncer(
      serverMetadata,
      announcerConfig,
      announcer,
      jsonMapper
  );
  segmentAnnouncer.start();

  testSegments = Sets.newHashSet();
  for (int offset = 0; offset < 100; offset++) {
    testSegments.add(makeSegment(offset));
  }
}
/**
 * Shuts everything down in reverse start order. Each step runs even when an earlier one throws, so a
 * failing stop cannot leak the Curator client or the ZooKeeper test cluster into the next test.
 */
@After
public void tearDown() throws Exception
{
  try {
    segmentAnnouncer.stop();
  }
  finally {
    try {
      announcer.stop();
    }
    finally {
      try {
        cf.close();
      }
      finally {
        testingCluster.stop();
      }
    }
  }
}
/**
 * Announces two segments one by one, checks that both land in the same batch znode, then unannounces them
 * one by one and checks that the znode disappears with the last segment.
 */
@Test
public void testSingleAnnounce() throws Exception
{
  Iterator<DataSegment> segIter = testSegments.iterator();
  DataSegment firstSegment = segIter.next();
  DataSegment secondSegment = segIter.next();

  segmentAnnouncer.announceSegment(firstSegment);

  List<String> zNodes = cf.getChildren().forPath(testSegmentsPath);
  // Without this check the loops below never execute when no znode is listed, and the test passes
  // without verifying anything.
  Assert.assertEquals(1, zNodes.size());

  for (String zNode : zNodes) {
    Set<DataSegment> segments = segmentReader.read(joiner.join(testSegmentsPath, zNode));
    Assert.assertEquals(Sets.newHashSet(firstSegment), segments);
  }

  segmentAnnouncer.announceSegment(secondSegment);

  for (String zNode : zNodes) {
    Set<DataSegment> segments = segmentReader.read(joiner.join(testSegmentsPath, zNode));
    Assert.assertEquals(Sets.newHashSet(firstSegment, secondSegment), segments);
  }

  segmentAnnouncer.unannounceSegment(firstSegment);

  for (String zNode : zNodes) {
    Set<DataSegment> segments = segmentReader.read(joiner.join(testSegmentsPath, zNode));
    Assert.assertEquals(Sets.newHashSet(secondSegment), segments);
  }

  segmentAnnouncer.unannounceSegment(secondSegment);

  Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty());
}
/**
 * Announces all 100 test segments in one call and expects them to be split over two znodes (50 segments
 * per znode), then unannounces them and expects every znode to be gone.
 */
@Test
public void testBatchAnnounce() throws Exception
{
  segmentAnnouncer.announceSegments(testSegments);

  List<String> zNodes = cf.getChildren().forPath(testSegmentsPath);

  // assertEquals reports the actual count on failure, unlike assertTrue(size == 2).
  Assert.assertEquals(2, zNodes.size());

  Set<DataSegment> allSegments = Sets.newHashSet();
  for (String zNode : zNodes) {
    allSegments.addAll(segmentReader.read(joiner.join(testSegmentsPath, zNode)));
  }
  Assert.assertEquals(testSegments, allSegments);

  segmentAnnouncer.unannounceSegments(testSegments);

  Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty());
}
/**
 * Runs the batch announce/unannounce cycle ten times against the same announcer.
 */
@Test
public void testMultipleBatchAnnounce() throws Exception
{
  int round = 0;
  while (round < 10) {
    testBatchAnnounce();
    round++;
  }
}
/**
 * Builds a one-day segment of data source "foo" whose interval starts {@code offset} days after
 * 2013-01-01; the version is the current time.
 */
private DataSegment makeSegment(int offset)
{
  final DateTime start = new DateTime("2013-01-01").plusDays(offset);
  final DateTime end = new DateTime("2013-01-02").plusDays(offset);
  final Interval day = new Interval(start, end);

  return DataSegment.builder()
                    .dataSource("foo")
                    .interval(day)
                    .version(new DateTime().toString())
                    .build();
}
/**
 * Reads the set of segments stored as JSON in a znode.
 */
private class SegmentReader
{
  private final CuratorFramework cf;
  private final ObjectMapper jsonMapper;

  public SegmentReader(CuratorFramework cf, ObjectMapper jsonMapper)
  {
    this.cf = cf;
    this.jsonMapper = jsonMapper;
  }

  /**
   * @param path the znode to read
   *
   * @return the segments stored at {@code path}, or an empty set when the znode does not exist
   */
  public Set<DataSegment> read(String path)
  {
    try {
      final boolean missing = cf.checkExists().forPath(path) == null;
      if (missing) {
        return Sets.newHashSet();
      }

      final byte[] payload = cf.getData().forPath(path);
      return jsonMapper.readValue(
          payload, new TypeReference<Set<DataSegment>>()
      {
      }
      );
    }
    catch (Exception e) {
      throw Throwables.propagate(e);
    }
  }
}
}

View File

@ -43,14 +43,11 @@ public class CuratorInventoryManagerTest extends CuratorTestBase
curator.start();
manager.start();
curator.create().creatingParentsIfNeeded().forPath("/container");
curator.create().creatingParentsIfNeeded().forPath("/inventory/billy");
Assert.assertTrue(Iterables.isEmpty(manager.getInventory()));
CountDownLatch containerLatch = new CountDownLatch(1);
strategy.setNewContainerLatch(containerLatch);
curator.create().withMode(CreateMode.EPHEMERAL).forPath("/container/billy", new byte[]{});
curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/container/billy", new byte[]{});
Assert.assertTrue(timing.awaitLatch(containerLatch));
strategy.setNewContainerLatch(null);
@ -60,7 +57,7 @@ public class CuratorInventoryManagerTest extends CuratorTestBase
CountDownLatch inventoryLatch = new CountDownLatch(2);
strategy.setNewInventoryLatch(inventoryLatch);
curator.create().withMode(CreateMode.EPHEMERAL).forPath("/inventory/billy/1", Ints.toByteArray(100));
curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/inventory/billy/1", Ints.toByteArray(100));
curator.create().withMode(CreateMode.EPHEMERAL).forPath("/inventory/billy/bob", Ints.toByteArray(2287));
Assert.assertTrue(timing.awaitLatch(inventoryLatch));

View File

@ -34,4 +34,10 @@ public class Runnables
}
};
}
/**
 * Returns a new {@link Runnable} whose {@code run()} does nothing.
 */
public static Runnable getNoopRunnable()
{
  final Runnable noop = new Runnable()
  {
    @Override
    public void run()
    {
      // intentionally empty
    }
  };
  return noop;
}
}

27
examples/bin/ec2/env.sh Normal file
View File

@ -0,0 +1,27 @@
#!/bin/bash
# Prepares a fresh Ubuntu instance for the druid example: installs Oracle Java 7,
# starts ZooKeeper and Kafka from the Kafka 0.7.2 distribution, and installs MySQL.
# Needs bash: the debconf preseeding below uses here-strings (<<<).

# Setup Oracle Java
sudo apt-get update
sudo add-apt-repository -y ppa:webupd8team/java
sudo apt-get update

# Setup yes answer to license question
echo debconf shared/accepted-oracle-license-v1-1 select true | sudo debconf-set-selections
echo debconf shared/accepted-oracle-license-v1-1 seen true | sudo debconf-set-selections
sudo apt-get -y -q install oracle-java7-installer

# Automated Kafka setup
curl http://static.druid.io/artifacts/kafka-0.7.2-incubating-bin.tar.gz -o /tmp/kafka-0.7.2-incubating-bin.tar.gz
tar -xvzf /tmp/kafka-0.7.2-incubating-bin.tar.gz
# Everything below uses paths relative to the Kafka directory, so stop if it is missing.
cd kafka-0.7.2-incubating-bin || exit 1
cat config/zookeeper.properties

# Redirections are applied left to right: stdout must be pointed at /dev/null BEFORE
# stderr is duplicated onto it, otherwise stderr stays attached to the caller's stream.
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > /dev/null 2>&1 &

# in a new console
nohup bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &

# Install dependencies - preseed the mysql root password and use a noninteractive
# frontend so that the 12.04 apt-get install does not hang on prompts
export DEBIAN_FRONTEND=noninteractive
sudo debconf-set-selections <<< 'mysql-server-5.5 mysql-server/root_password password diurd'
sudo debconf-set-selections <<< 'mysql-server-5.5 mysql-server/root_password_again password diurd'
sudo apt-get -q -y -V --force-yes --reinstall install mysql-server-5.5

echo "ALL DONE with druid environment setup! Hit CTRL-C to proceed."
exit 0

22
examples/bin/ec2/run.sh Normal file
View File

@ -0,0 +1,22 @@
#!/bin/bash
# Unpacks the uploaded druid tarball and starts one realtime, master, compute and
# broker node on this machine, each logging to its own file under logs/.

# Is localhost expected with multi-node?
mysql -u root -pdiurd -e "GRANT ALL ON druid.* TO 'druid'@'localhost' IDENTIFIED BY 'diurd'; CREATE database druid;" 2>&1 > /dev/null

tar -xvzf druid-services-*-bin.tar.gz 2>&1 > /dev/null
# The trailing slash restricts the glob to directories; without it the pattern also
# matches the tarball lying next to the extracted directory and cd gets two arguments.
cd druid-services-*/ 2>&1 > /dev/null || exit 1
mkdir logs 2>&1 > /dev/null

# For the daemons below stdout is redirected to the log file first and stderr is then
# duplicated onto it, so that both streams actually end up in the log.

# Now start a realtime node
nohup java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.realtime.specFile=config/realtime/realtime.spec -classpath lib/druid-services-0.5.5-SNAPSHOT-selfcontained.jar:config/realtime com.metamx.druid.realtime.RealtimeMain > logs/realtime.log 2>&1 &

# And a master node
nohup java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/druid-services-0.5.5-SNAPSHOT-selfcontained.jar:config/master com.metamx.druid.http.MasterMain > logs/master.log 2>&1 &

# And a compute node
nohup java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/druid-services-0.5.5-SNAPSHOT-selfcontained.jar:config/compute com.metamx.druid.http.ComputeMain > logs/compute.log 2>&1 &

# And a broker node
nohup java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/druid-services-0.5.5-SNAPSHOT-selfcontained.jar:config/broker com.metamx.druid.http.BrokerMain > logs/broker.log 2>&1 &

echo "Hit CTRL-C to continue..."
exit 0

76
examples/bin/run_ec2.sh Executable file
View File

@ -0,0 +1,76 @@
#!/bin/bash
# Boots a single EC2 instance, prepares it with ec2/env.sh, uploads the druid services
# tarball and starts the druid nodes with ec2/run.sh.
#
# Before running, you will need to download the EC2 tools from http://aws.amazon.com/developertools/351
# and then setup your EC2_HOME and PATH variables (or similar):
#
# # Setup environment for ec2-api-tools
# export EC2_HOME=/path/to/ec2-api-tools-1.6.7.4/
# export PATH=$PATH:$EC2_HOME/bin
# export AWS_ACCESS_KEY=
# export AWS_SECRET_KEY=

# Check for ec2 commands we require and die if they're missing
type ec2-create-keypair >/dev/null 2>&1 || { echo >&2 "I require ec2-create-keypair but it's not installed. Aborting."; exit 1; }
type ec2-create-group >/dev/null 2>&1 || { echo >&2 "I require ec2-create-group but it's not installed. Aborting."; exit 1; }
type ec2-authorize >/dev/null 2>&1 || { echo >&2 "I require ec2-authorize but it's not installed. Aborting."; exit 1; }
type ec2-run-instances >/dev/null 2>&1 || { echo >&2 "I require ec2-run-instances but it's not installed. Aborting."; exit 1; }
type ec2-describe-instances >/dev/null 2>&1 || { echo >&2 "I require ec2-describe-instances but it's not installed. Aborting."; exit 1; }

# Create a keypair for our servers
echo "Removing old keypair for druid..."
ec2-delete-keypair druid-keypair
echo "Creating new keypair for druid..."
ec2-create-keypair druid-keypair > druid-keypair
chmod 0600 druid-keypair
mv druid-keypair ~/.ssh/

# Create a security group for our servers
echo "Creating a new security group for druid..."
ec2-create-group druid-group -d "Druid Cluster"

# Create rules that allow necessary services in our group
echo "Creating new firewall rules for druid..."
# SSH from outside
ec2-authorize druid-group -P tcp -p 22
# Enable all traffic within group
ec2-authorize druid-group -P tcp -p 1-65535 -o druid-group
ec2-authorize druid-group -P udp -p 1-65535 -o druid-group

echo "Booting a single small instance for druid..."
# Use ami ami-e7582d8e - Alestic Ubuntu 12.04 us-east
INSTANCE_ID=$(ec2-run-instances ami-e7582d8e -n 1 -g druid-group -k druid-keypair --instance-type m1.small| awk '/INSTANCE/{print $2}')

# Without an instance id the polling loop below could never terminate.
if [ -z "$INSTANCE_ID" ]
then
  echo >&2 "Failed to launch an instance. Aborting."
  exit 1
fi

while true; do
  sleep 1
  INSTANCE_STATUS=$(ec2-describe-instances|grep INSTANCE|grep "$INSTANCE_ID"|cut -f6)
  # Quoted so that an empty status (instance not listed yet) is a plain mismatch
  # instead of a "unary operator expected" error.
  if [ "$INSTANCE_STATUS" = "running" ]
  then
    echo "Instance $INSTANCE_ID is status $INSTANCE_STATUS..."
    break
  fi
done

# Wait for the instance to come up
echo "Waiting 60 seconds for instance $INSTANCE_ID to boot..."
sleep 60

# Get hostname and ssh with the key we created, and ssh there
INSTANCE_ADDRESS=`ec2-describe-instances|grep 'INSTANCE'|grep "$INSTANCE_ID"|cut -f4`
echo "Connecting to $INSTANCE_ADDRESS to prepare environment for druid..."
scp -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ./ec2/env.sh ubuntu@${INSTANCE_ADDRESS}:
ssh -q -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ubuntu@${INSTANCE_ADDRESS} 'chmod +x ./env.sh;./env.sh'

echo "Prepared $INSTANCE_ADDRESS for druid."

# Now to scp a tarball up that can run druid!
# ls copes with the glob matching several files, which "[ -f glob ]" does not.
if ls ../../services/target/druid-services-*-SNAPSHOT-bin.tar.gz >/dev/null 2>&1
then
  echo "Uploading druid tarball to server..."
  scp -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ../../services/target/druid-services-*-bin.tar.gz ubuntu@${INSTANCE_ADDRESS}:
else
  echo "ERROR - package not built!"
fi

# Now boot druid parts
scp -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ./ec2/run.sh ubuntu@${INSTANCE_ADDRESS}:
ssh -q -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ubuntu@${INSTANCE_ADDRESS} 'chmod +x ./run.sh;./run.sh'

echo "Druid booting complete!"
echo "ssh -i ~/.ssh/druid-keypair ubuntu@${INSTANCE_ADDRESS} #to connect"

View File

@ -0,0 +1,43 @@
druid.host=127.0.0.1
druid.port=8083
com.metamx.emitter.logging=true
druid.processing.formatString=processing_%s
druid.processing.numThreads=1
druid.processing.buffer.sizeBytes=10000000
#emitting, opaque marker
druid.service=example
druid.request.logging.dir=/tmp/example/log
druid.realtime.specFile=realtime.spec
com.metamx.emitter.logging=true
com.metamx.emitter.logging.level=info
# below are dummy values when operating a realtime only node
com.metamx.aws.accessKey=dummy_access_key
com.metamx.aws.secretKey=dummy_secret_key
druid.pusher.s3.bucket=dummy_s3_bucket
druid.zk.service.host=localhost
druid.server.maxSize=300000000000
druid.zk.paths.base=/druid
druid.database.segmentTable=prod_segments
druid.database.user=druid
druid.database.password=diurd
druid.database.connectURI=jdbc:mysql://localhost:3306/druid
druid.zk.paths.discoveryPath=/druid/discoveryPath
druid.database.ruleTable=rules
druid.database.configTable=config
# Path on local FS for storage of segments; dir will be created if needed
druid.paths.indexCache=/tmp/druid/indexCache
# Path on local FS for storage of segment metadata; dir will be created if needed
druid.paths.segmentInfoCache=/tmp/druid/segmentInfoCache
druid.pusher.local.storageDirectory=/tmp/druid/localStorage
druid.pusher.local=true
# thread pool size for servicing queries
druid.client.http.connections=30
druid.host=127.0.0.1:8083

View File

@ -0,0 +1,39 @@
druid.host=127.0.0.1
druid.port=8082
com.metamx.emitter.logging=true
druid.processing.formatString=processing_%s
druid.processing.numThreads=1
druid.processing.buffer.sizeBytes=10000000
#emitting, opaque marker
druid.service=example
druid.request.logging.dir=/tmp/example/log
druid.realtime.specFile=realtime.spec
com.metamx.emitter.logging=true
com.metamx.emitter.logging.level=info
# below are dummy values when operating a realtime only node
com.metamx.aws.accessKey=dummy_access_key
com.metamx.aws.secretKey=dummy_secret_key
druid.pusher.s3.bucket=dummy_s3_bucket
druid.zk.service.host=localhost
druid.server.maxSize=300000000000
druid.zk.paths.base=/druid
druid.database.segmentTable=prod_segments
druid.database.user=druid
druid.database.password=diurd
druid.database.connectURI=jdbc:mysql://localhost:3306/druid
druid.zk.paths.discoveryPath=/druid/discoveryPath
druid.database.ruleTable=rules
druid.database.configTable=config
# Path on local FS for storage of segments; dir will be created if needed
druid.paths.indexCache=/tmp/druid/indexCache
# Path on local FS for storage of segment metadata; dir will be created if needed
druid.paths.segmentInfoCache=/tmp/druid/segmentInfoCache
druid.pusher.local.storageDirectory=/tmp/druid/localStorage
druid.pusher.local=true

View File

@ -0,0 +1,39 @@
druid.host=127.0.0.1
druid.port=8081
com.metamx.emitter.logging=true
druid.processing.formatString=processing_%s
druid.processing.numThreads=1
druid.processing.buffer.sizeBytes=10000000
#emitting, opaque marker
druid.service=example
druid.request.logging.dir=/tmp/example/log
druid.realtime.specFile=realtime.spec
com.metamx.emitter.logging=true
com.metamx.emitter.logging.level=info
# below are dummy values when operating a realtime only node
com.metamx.aws.accessKey=dummy_access_key
com.metamx.aws.secretKey=dummy_secret_key
druid.pusher.s3.bucket=dummy_s3_bucket
druid.zk.service.host=localhost
druid.server.maxSize=300000000000
druid.zk.paths.base=/druid
druid.database.segmentTable=prod_segments
druid.database.user=druid
druid.database.password=diurd
druid.database.connectURI=jdbc:mysql://localhost:3306/druid
druid.zk.paths.discoveryPath=/druid/discoveryPath
druid.database.ruleTable=rules
druid.database.configTable=config
# Path on local FS for storage of segments; dir will be created if needed
druid.paths.indexCache=/tmp/druid/indexCache
# Path on local FS for storage of segment metadata; dir will be created if needed
druid.paths.segmentInfoCache=/tmp/druid/segmentInfoCache
druid.pusher.local.storageDirectory=/tmp/druid/localStorage
druid.pusher.local=true

View File

@ -0,0 +1,29 @@
[{
"schema" : { "dataSource":"druidtest",
"aggregators":[ {"type":"count", "name":"impressions"},
{"type":"doubleSum","name":"wp","fieldName":"wp"}],
"indexGranularity":"minute",
"shardSpec" : { "type": "none" } },
"config" : { "maxRowsInMemory" : 500000,
"intermediatePersistPeriod" : "PT10m" },
"firehose" : { "type" : "kafka-0.7.2",
"consumerProps" : { "zk.connect" : "localhost:2181",
"zk.connectiontimeout.ms" : "15000",
"zk.sessiontimeout.ms" : "15000",
"zk.synctime.ms" : "5000",
"groupid" : "topic-pixel-local",
"fetch.size" : "1048586",
"autooffset.reset" : "largest",
"autocommit.enable" : "false" },
"feed" : "druidtest",
"parser" : { "timestampSpec" : { "column" : "utcdt", "format" : "iso" },
"data" : { "format" : "json" },
"dimensionExclusions" : ["wp"] } },
"plumber" : { "type" : "realtime",
"windowPeriod" : "PT10m",
"segmentGranularity":"hour",
"basePersistDirectory" : "/tmp/realtime/basePersist",
"rejectionPolicy": {"type": "messageTime"} }
}]

View File

@ -0,0 +1,41 @@
druid.host=127.0.0.1
druid.port=8080
com.metamx.emitter.logging=true
druid.processing.formatString=processing_%s
druid.processing.numThreads=1
druid.processing.buffer.sizeBytes=10000000
#emitting, opaque marker
druid.service=example
druid.request.logging.dir=/tmp/example/log
druid.realtime.specFile=realtime.spec
com.metamx.emitter.logging=true
com.metamx.emitter.logging.level=info
# below are dummy values when operating a realtime only node
com.metamx.aws.accessKey=dummy_access_key
com.metamx.aws.secretKey=dummy_secret_key
druid.pusher.s3.bucket=dummy_s3_bucket
druid.zk.service.host=localhost
druid.server.maxSize=300000000000
druid.zk.paths.base=/druid
druid.database.segmentTable=prod_segments
druid.database.user=druid
druid.database.password=diurd
druid.database.connectURI=jdbc:mysql://localhost:3306/druid
druid.zk.paths.discoveryPath=/druid/discoveryPath
druid.database.ruleTable=rules
druid.database.configTable=config
# Path on local FS for storage of segments; dir will be created if needed
druid.paths.indexCache=/tmp/druid/indexCache
# Path on local FS for storage of segment metadata; dir will be created if needed
druid.paths.segmentInfoCache=/tmp/druid/segmentInfoCache
druid.pusher.local.storageDirectory=/tmp/druid/localStorage
druid.pusher.local=true
druid.host=127.0.0.1:8080

View File

@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-examples</artifactId>

View File

@ -16,6 +16,7 @@ import com.metamx.druid.realtime.SegmentPublisher;
import druid.examples.flights.FlightsFirehoseFactory;
import druid.examples.rand.RandomFirehoseFactory;
import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
import druid.examples.web.WebFirehoseFactory;
import java.io.File;
import java.io.IOException;
@ -41,7 +42,9 @@ public class RealtimeStandaloneMain
rn.registerJacksonSubtype(
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
new NamedType(FlightsFirehoseFactory.class, "flights"),
new NamedType(RandomFirehoseFactory.class, "rand")
new NamedType(RandomFirehoseFactory.class, "rand"),
new NamedType(WebFirehoseFactory.class, "webstream")
);
// Create dummy objects for the various interfaces that interact with the DB, ZK and deep storage
@ -142,5 +145,17 @@ public class RealtimeStandaloneMain
{
// do nothing
}
/**
 * No-op implementation: the given segments are ignored and nothing is announced.
 *
 * @param segments segments that would be announced; unused here
 */
@Override
public void announceSegments(Iterable<DataSegment> segments) throws IOException
{
// do nothing
}
/**
 * No-op implementation: the given segments are ignored and nothing is unannounced.
 *
 * @param segments segments that would be unannounced; unused here
 */
@Override
public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
{
// do nothing
}
}
}

View File

@ -0,0 +1,133 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package druid.examples.web;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.InputSupplier;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.emitter.EmittingLogger;
import java.io.BufferedReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 * An {@link UpdateStream} that reads line-delimited JSON objects from an {@link InputSupplier} on a background
 * daemon thread and buffers them in a bounded queue.
 *
 * Each non-empty line is parsed into a map. Events that carry no value for the time dimension are discarded.
 * Whenever the input ends or a read/parse error occurs, the current reader is closed and a fresh one is
 * requested from the supplier, until the stream is stopped.
 */
public class InputSupplierUpdateStream implements UpdateStream
{
  private static final EmittingLogger log = new EmittingLogger(InputSupplierUpdateStream.class);
  // Seconds to wait for free space in the queue before an event is dropped.
  private static final long queueWaitTime = 15L;
  private static final int QUEUE_SIZE = 10000;

  private final TypeReference<HashMap<String, Object>> typeRef;
  private final InputSupplier<BufferedReader> supplier;
  private final BlockingQueue<Map<String, Object>> queue = new ArrayBlockingQueue<Map<String, Object>>(QUEUE_SIZE);
  private final ObjectMapper mapper = new DefaultObjectMapper();
  private final String timeDimension;
  private final Thread addToQueueThread;

  /**
   * @param supplier      source of readers delivering one JSON object per line
   * @param timeDimension key every event must carry a non-null value for; other events are dropped
   */
  public InputSupplierUpdateStream(
      final InputSupplier<BufferedReader> supplier,
      final String timeDimension
  )
  {
    this.supplier = supplier;
    this.timeDimension = timeDimension;
    this.typeRef = new TypeReference<HashMap<String, Object>>()
    {
    };

    addToQueueThread = new Thread()
    {
      public void run()
      {
        while (!isInterrupted()) {
          BufferedReader reader = null;
          try {
            reader = supplier.getInput();
            String line;
            while ((line = reader.readLine()) != null) {
              if (isValid(line)) {
                HashMap<String, Object> map = mapper.readValue(line, typeRef);
                if (map.get(timeDimension) != null) {
                  // offer() reports false when the queue stayed full for the whole wait.
                  if (queue.offer(map, queueWaitTime, TimeUnit.SECONDS)) {
                    log.debug("Successfully added to queue");
                  } else {
                    log.info("Queue is full, dropping event");
                  }
                } else {
                  log.info("missing timestamp");
                }
              }
            }
          }
          catch (InterruptedException e) {
            log.info(e, "Thread adding events to the queue interrupted");
            return;
          }
          catch (JsonMappingException e) {
            log.info(e, "Error in converting json to map");
          }
          catch (JsonParseException e) {
            log.info(e, "Error in parsing json");
          }
          catch (IOException e) {
            log.info(e, "Error in connecting to InputStream");
          }
          finally {
            // A new reader is requested on every pass, so the old one must be released here.
            closeQuietly(reader);
          }
        }
      }
    };
    addToQueueThread.setDaemon(true);
  }

  /**
   * Closes the reader if there is one; a failure to close is only logged.
   */
  private static void closeQuietly(BufferedReader reader)
  {
    if (reader == null) {
      return;
    }
    try {
      reader.close();
    }
    catch (IOException e) {
      log.debug(e, "Error in closing reader");
    }
  }

  private boolean isValid(String s)
  {
    return !(s.isEmpty());
  }

  /**
   * Starts the background thread that fills the queue.
   */
  public void start()
  {
    addToQueueThread.start();
  }

  /**
   * Interrupts the background thread.
   */
  public void stop()
  {
    addToQueueThread.interrupt();
  }

  /**
   * Removes the next buffered event, waiting up to the given time for one to arrive.
   *
   * @return the next event, or null if none became available in time
   */
  public Map<String, Object> pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException
  {
    return queue.poll(waitTime, unit);
  }

  /**
   * @return number of events currently buffered
   */
  public int getQueueSize()
  {
    return queue.size();
  }

  public String getTimeDimension()
  {
    return timeDimension;
  }
}

View File

@ -0,0 +1,42 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package druid.examples.web;
import com.google.common.io.InputSupplier;
import java.io.BufferedReader;
/**
 * Builds {@link InputSupplierUpdateStream}s that all read from the same input supplier and use the same
 * time dimension.
 */
public class InputSupplierUpdateStreamFactory implements UpdateStreamFactory
{
  private final String timeDimension;
  private final InputSupplier<BufferedReader> inputSupplier;

  /**
   * @param inputSupplier supplier handed to every stream this factory builds
   * @param timeDimension time dimension handed to every stream this factory builds
   */
  public InputSupplierUpdateStreamFactory(InputSupplier<BufferedReader> inputSupplier, String timeDimension)
  {
    this.timeDimension = timeDimension;
    this.inputSupplier = inputSupplier;
  }

  /**
   * @return a new stream; it is not started
   */
  public InputSupplierUpdateStream build()
  {
    final InputSupplierUpdateStream stream = new InputSupplierUpdateStream(inputSupplier, timeDimension);
    return stream;
  }
}

View File

@ -0,0 +1,81 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package druid.examples.web;
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
 * An {@link UpdateStream} decorator that renames the keys of every event coming from an
 * {@link InputSupplierUpdateStream}.
 *
 * When a rename map is configured, only keys listed in it survive: each listed key with a non-null value is
 * emitted under its new name and every other key is dropped. Without a rename map, events pass through
 * untouched.
 */
public class RenamingKeysUpdateStream implements UpdateStream
{
  private final InputSupplierUpdateStream updateStream;
  private final Map<String, String> renamedDimensions;

  /**
   * @param updateStream      stream delivering the raw events
   * @param renamedDimensions mapping from original key to new key; may be null to disable renaming
   */
  public RenamingKeysUpdateStream(
      InputSupplierUpdateStream updateStream,
      Map<String, String> renamedDimensions
  )
  {
    this.renamedDimensions = renamedDimensions;
    this.updateStream = updateStream;
  }

  /**
   * @return the next event with its keys renamed, or null if the underlying stream delivered nothing in time
   */
  public Map<String, Object> pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException
  {
    return renameKeys(updateStream.pollFromQueue(waitTime, unit));
  }

  private Map<String, Object> renameKeys(Map<String, Object> update)
  {
    // The underlying poll returns null on timeout; hand that through instead of dereferencing it.
    if (update == null || renamedDimensions == null) {
      return update;
    }

    Map<String, Object> renamedMap = Maps.newHashMap();
    for (Map.Entry<String, String> rename : renamedDimensions.entrySet()) {
      Object value = update.get(rename.getKey());
      if (value != null) {
        renamedMap.put(rename.getValue(), value);
      }
    }
    return renamedMap;
  }

  /**
   * @return the name of the time dimension after renaming, i.e. as it appears in the events of this stream
   */
  public String getTimeDimension()
  {
    final String original = updateStream.getTimeDimension();
    if (renamedDimensions != null) {
      final String renamed = renamedDimensions.get(original);
      if (renamed != null) {
        return renamed;
      }
    }
    return original;
  }

  public void start()
  {
    updateStream.start();
  }

  public void stop()
  {
    updateStream.stop();
  }
}

View File

@ -0,0 +1,39 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package druid.examples.web;
import java.util.Map;
/**
 * Builds {@link RenamingKeysUpdateStream}s by wrapping the streams of an {@link InputSupplierUpdateStreamFactory}
 * with a fixed rename mapping.
 */
public class RenamingKeysUpdateStreamFactory implements UpdateStreamFactory
{
  private Map<String, String> renamedDimensions;
  private InputSupplierUpdateStreamFactory updateStreamFactory;

  /**
   * @param updateStreamFactory factory for the underlying streams
   * @param renamedDimensions   rename mapping handed to every stream this factory builds
   */
  public RenamingKeysUpdateStreamFactory(
      InputSupplierUpdateStreamFactory updateStreamFactory,
      Map<String, String> renamedDimensions
  )
  {
    this.renamedDimensions = renamedDimensions;
    this.updateStreamFactory = updateStreamFactory;
  }

  /**
   * @return a new renaming stream around a freshly built underlying stream
   */
  public RenamingKeysUpdateStream build()
  {
    final InputSupplierUpdateStream delegate = updateStreamFactory.build();
    return new RenamingKeysUpdateStream(delegate, renamedDimensions);
  }
}

View File

@ -0,0 +1,31 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package druid.examples.web;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
 * A startable and stoppable source of events, each represented as a map from key to value.
 */
public interface UpdateStream
{
  /**
   * Retrieves the next event, waiting up to the given time for one.
   *
   * @param waitTime maximum time to wait
   * @param unit     unit of {@code waitTime}
   * @return the next event
   * @throws InterruptedException if interrupted while waiting
   */
  Map<String, Object> pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException;

  /**
   * @return the key under which events of this stream carry their timestamp
   */
  String getTimeDimension();

  /**
   * Starts the stream.
   */
  void start();

  /**
   * Stops the stream.
   */
  void stop();
}

View File

@ -0,0 +1,24 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package druid.examples.web;
/**
 * Factory for {@link UpdateStream}s.
 */
public interface UpdateStreamFactory
{
  /**
   * @return an update stream
   */
  UpdateStream build();
}

View File

@ -0,0 +1,134 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package druid.examples.web;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Throwables;
import com.metamx.common.parsers.TimestampParser;
import com.metamx.druid.guava.Runnables;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.emitter.EmittingLogger;
import org.joda.time.DateTime;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
 * A {@link FirehoseFactory} ("webstream") that turns the events of an {@link UpdateStream} into input rows.
 *
 * The timestamp of each row is parsed from the stream's time dimension using the configured time format;
 * all keys of the event become the dimensions of the row.
 */
@JsonTypeName("webstream")
public class WebFirehoseFactory implements FirehoseFactory
{
  private static final EmittingLogger log = new EmittingLogger(WebFirehoseFactory.class);
  private final String timeFormat;
  private final UpdateStreamFactory factory;
  // Seconds hasMore() waits for an event before it reports that there is none.
  private final long queueWaitTime = 15L;

  /**
   * @param url               address the JSON events are read from
   * @param renamedDimensions mapping from key in the incoming JSON to key in the produced rows
   * @param timeDimension     key of the incoming JSON that holds the timestamp
   * @param timeFormat        timestamp format; "auto" is used when null
   */
  @JsonCreator
  public WebFirehoseFactory(
      @JsonProperty("url") String url,
      @JsonProperty("renamedDimensions") Map<String, String> renamedDimensions,
      @JsonProperty("timeDimension") String timeDimension,
      @JsonProperty("timeFormat") String timeFormat
  )
  {
    this(
        new RenamingKeysUpdateStreamFactory(
            new InputSupplierUpdateStreamFactory(new WebJsonSupplier(url), timeDimension),
            renamedDimensions
        ), timeFormat
    );
  }

  /**
   * @param factory    factory for the stream the firehose reads from
   * @param timeFormat timestamp format; "auto" is used when null
   */
  public WebFirehoseFactory(UpdateStreamFactory factory, String timeFormat)
  {
    this.factory = factory;
    if (timeFormat == null) {
      this.timeFormat = "auto";
    } else {
      this.timeFormat = timeFormat;
    }
  }

  /**
   * Builds and starts an update stream and returns a firehose reading from it.
   */
  @Override
  public Firehose connect() throws IOException
  {
    final UpdateStream updateStream = factory.build();
    updateStream.start();

    return new Firehose()
    {
      // Event fetched by hasMore() and not yet consumed by nextRow(); null when there is none.
      Map<String, Object> map;
      private final Runnable doNothingRunnable = Runnables.getNoopRunnable();

      @Override
      public boolean hasMore()
      {
        try {
          // Only fetch when nothing is pending, so that repeated calls do not discard an event.
          if (map == null) {
            map = updateStream.pollFromQueue(queueWaitTime, TimeUnit.SECONDS);
          }
          return map != null;
        }
        catch (InterruptedException e) {
          // Keep the interrupt visible to callers further up the stack.
          Thread.currentThread().interrupt();
          throw Throwables.propagate(e);
        }
      }

      @Override
      public InputRow nextRow()
      {
        try {
          DateTime date = TimestampParser.createTimestampParser(timeFormat)
                                         .apply(map.get(updateStream.getTimeDimension()).toString());
          return new MapBasedInputRow(
              date.getMillis(),
              new ArrayList<String>(map.keySet()),
              map
          );
        }
        catch (Exception e) {
          throw Throwables.propagate(e);
        }
        finally {
          // The pending event is consumed even if building the row failed.
          map = null;
        }
      }

      @Override
      public Runnable commit()
      {
        // ephemera in, ephemera out.
        return doNothingRunnable; // reuse the same object each time
      }

      @Override
      public void close() throws IOException
      {
        updateStream.stop();
      }
    };
  }
}

View File

@ -0,0 +1,57 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package druid.examples.web;
import com.google.common.io.InputSupplier;
import com.metamx.emitter.EmittingLogger;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;
/**
 * Supplies readers over the content served at a URL.
 *
 * A malformed URL is only logged at construction time; {@link #getInput()} then fails with an
 * {@link IOException} when it tries to parse the URL again.
 */
public class WebJsonSupplier implements InputSupplier<BufferedReader>
{
  private static final EmittingLogger log = new EmittingLogger(WebJsonSupplier.class);
  private String urlString;
  private URL url;

  /**
   * @param urlString address to read from
   */
  public WebJsonSupplier(String urlString)
  {
    this.urlString = urlString;
    try {
      this.url = new URL(urlString);
    }
    catch (Exception e) {
      log.error(e,"Malformed url");
    }
  }

  /**
   * Opens a new connection to the URL and returns a reader over its content, decoded as UTF-8.
   *
   * @throws IOException if the URL is malformed or the connection cannot be opened
   */
  @Override
  public BufferedReader getInput() throws IOException
  {
    // Re-parse only if the constructor could not, so that the failure surfaces here as an IOException.
    final URL target = (url != null) ? url : new URL(urlString);
    URLConnection connection = target.openConnection();
    connection.setDoInput(true);
    // Read from the connection configured above rather than opening a second one via openStream().
    return new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
  }
}

View File

@ -0,0 +1,111 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package druid.examples.web;
import com.google.common.io.InputSupplier;
import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
 * Tests which events an {@link InputSupplierUpdateStream} puts on its queue.
 */
public class InputSupplierUpdateStreamTest
{
  private final long waitTime = 1L;
  private final TimeUnit unit = TimeUnit.SECONDS;
  private final ArrayList<String> dimensions = new ArrayList<String>();
  private InputSupplier testCaseSupplier;
  Map<String, Object> expectedAnswer = new HashMap<String, Object>();
  String timeDimension;

  @Before
  public void setUp()
  {
    timeDimension = "time";
    testCaseSupplier = new TestCaseSupplier(
        "{\"item1\": \"value1\","
        + "\"item2\":2,"
        + "\"time\":1372121562 }"
    );

    dimensions.add("item1");
    dimensions.add("item2");
    dimensions.add("time");

    expectedAnswer.put("item1", "value1");
    expectedAnswer.put("item2", 2);
    expectedAnswer.put("time", 1372121562);
  }

  @Test
  public void basicIngestionCheck() throws Exception
  {
    InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(
        testCaseSupplier,
        timeDimension
    );
    updateStream.start();
    try {
      Map<String, Object> insertedRow = updateStream.pollFromQueue(waitTime, unit);
      Assert.assertEquals(expectedAnswer, insertedRow);
    }
    finally {
      // Do not leave the reader thread running into the next test.
      updateStream.stop();
    }
  }

  //If a timestamp is missing, we should throw away the event
  @Test
  public void missingTimeStampCheck() throws Exception
  {
    testCaseSupplier = new TestCaseSupplier(
        "{\"item1\": \"value1\","
        + "\"item2\":2}"
    );

    InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(
        testCaseSupplier,
        timeDimension
    );
    updateStream.start();
    try {
      // Give the reader thread time to process the event; checking the size right after start()
      // would pass even if the event were queued a moment later.
      Assert.assertNull(updateStream.pollFromQueue(waitTime, unit));
      Assert.assertEquals(0, updateStream.getQueueSize());
    }
    finally {
      updateStream.stop();
    }
  }

  //If any other value is missing, we should still add the event and process it properly
  @Test
  public void otherNullValueCheck() throws Exception
  {
    testCaseSupplier = new TestCaseSupplier(
        "{\"item1\": \"value1\","
        + "\"time\":1372121562 }"
    );
    InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(
        testCaseSupplier,
        timeDimension
    );
    updateStream.start();
    try {
      Map<String, Object> insertedRow = updateStream.pollFromQueue(waitTime, unit);
      Map<String, Object> expectedAnswer = new HashMap<String, Object>();
      expectedAnswer.put("item1", "value1");
      expectedAnswer.put("time", 1372121562);
      Assert.assertEquals(expectedAnswer, insertedRow);
    }
    finally {
      updateStream.stop();
    }
  }
}

View File

@ -0,0 +1,94 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package druid.examples.web;
import com.google.common.io.InputSupplier;
import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
 * Tests for {@link RenamingKeysUpdateStream} wrapped around an {@link InputSupplierUpdateStream}
 * that is fed a single JSON event.
 */
public class RenamingKeysUpdateStreamTest
{
  private final long waitTime = 15L;
  private final TimeUnit unit = TimeUnit.SECONDS;
  private InputSupplier testCaseSupplier;
  String timeDimension;

  @Before
  public void setUp()
  {
    final String json = "{\"item1\": \"value1\",\"item2\":2,\"time\":1372121562 }";
    timeDimension = "time";
    testCaseSupplier = new TestCaseSupplier(json);
  }

  /**
   * Every key listed in the rename map must appear under its new name in the polled row.
   */
  @Test
  public void testPollFromQueue() throws Exception
  {
    final RenamingKeysUpdateStream renamer = buildRenamer(true);
    renamer.start();

    final Map<String, Object> wanted = new HashMap<String, Object>();
    wanted.put("i1", "value1");
    wanted.put("i2", 2);
    wanted.put("t", 1372121562);

    final Map<String, Object> polled = renamer.pollFromQueue(waitTime, unit);
    Assert.assertEquals(wanted, polled);
  }

  /**
   * When the time key is renamed, the renamed key is reported as the time dimension.
   */
  @Test
  public void testGetTimeDimension() throws Exception
  {
    final RenamingKeysUpdateStream renamer = buildRenamer(true);
    Assert.assertEquals("t", renamer.getTimeDimension());
  }

  /**
   * When the time key has no rename entry, the original time dimension is reported.
   */
  @Test
  public void testMissingTimeRename() throws Exception
  {
    final RenamingKeysUpdateStream renamer = buildRenamer(false);
    Assert.assertEquals("time", renamer.getTimeDimension());
  }

  /**
   * Creates the stream under test: item1 -> i1, item2 -> i2 and, when requested, time -> t.
   */
  private RenamingKeysUpdateStream buildRenamer(boolean renameTime)
  {
    final InputSupplierUpdateStream source = new InputSupplierUpdateStream(testCaseSupplier, timeDimension);
    final Map<String, String> renames = new HashMap<String, String>();
    renames.put("item1", "i1");
    renames.put("item2", "i2");
    if (renameTime) {
      renames.put("time", "t");
    }
    return new RenamingKeysUpdateStream(source, renames);
  }
}

View File

@ -0,0 +1,42 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package druid.examples.web;
import com.google.common.io.InputSupplier;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
/**
 * Test helper that serves a fixed string through the {@link InputSupplier} interface.
 * Each call to {@link #getInput()} returns a new reader over the same text.
 */
public class TestCaseSupplier implements InputSupplier<BufferedReader>
{
  private final String testString;

  /**
   * @param testString the text handed out by {@link #getInput()}
   */
  public TestCaseSupplier(String testString)
  {
    this.testString = testString;
  }

  /**
   * @return a fresh buffered reader positioned at the start of the configured string
   */
  @Override
  public BufferedReader getInput() throws IOException
  {
    final StringReader content = new StringReader(testString);
    return new BufferedReader(content);
  }
}

View File

@ -0,0 +1,223 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package druid.examples.web;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.realtime.firehose.Firehose;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
 * Tests for {@link WebFirehoseFactory}. Each factory is backed by a stub {@link UpdateStream}
 * that always returns the same three-key event; only the "time" value and the timestamp format
 * passed to the factory vary between tests.
 */
public class WebFirehoseFactoryTest
{
  /** 2013-07-08T00:00:00Z in milliseconds since the epoch. */
  private static final long JULY_8_2013_UTC_MILLIS = 1373241600000L;

  private List<String> dimensions = Lists.newArrayList();
  private WebFirehoseFactory webbie;
  private WebFirehoseFactory webbie1;

  @Before
  public void setUp() throws Exception
  {
    dimensions.add("item1");
    dimensions.add("item2");
    dimensions.add("time");

    webbie = factoryFor("1372121562", "posix");
    webbie1 = factoryFor("1373241600000", "auto");
  }

  @Test
  public void testDimensions() throws Exception
  {
    // Sort a copy so the list owned by the row is not mutated.
    List<String> actualAnswer = Lists.newArrayList(firstRow(webbie).getDimensions());
    Collections.sort(actualAnswer);
    Assert.assertEquals(dimensions, actualAnswer);
  }

  @Test
  public void testPosixTimeStamp() throws Exception
  {
    long expectedTime = 1372121562L * 1000L;
    Assert.assertEquals(expectedTime, firstRow(webbie).getTimestampFromEpoch());
  }

  @Test
  public void testISOTimeStamp() throws Exception
  {
    long milliSeconds = firstRow(factoryFor("2013-07-08", "auto")).getTimestampFromEpoch();
    DateTime date = new DateTime("2013-07-08");
    Assert.assertEquals(date.getMillis(), milliSeconds);
  }

  @Test
  public void testAutoIsoTimeStamp() throws Exception
  {
    // A null format is passed straight through to the factory, as in the original test.
    long milliSeconds = firstRow(factoryFor("2013-07-08", null)).getTimestampFromEpoch();
    DateTime date = new DateTime("2013-07-08");
    Assert.assertEquals(date.getMillis(), milliSeconds);
  }

  @Test
  public void testAutoMilliSecondsTimeStamp() throws Exception
  {
    // The input is an absolute millisecond value, so compare against that value directly.
    // Going through new DateTime("2013-07-08") would only match when the JVM default time
    // zone is UTC.
    Assert.assertEquals(JULY_8_2013_UTC_MILLIS, firstRow(webbie1).getTimestampFromEpoch());
  }

  @Test
  public void testGetDimension() throws Exception
  {
    List<String> column1 = Lists.newArrayList();
    column1.add("value1");
    Assert.assertEquals(column1, firstRow(webbie1).getDimension("item1"));
  }

  @Test
  public void testGetFloatMetric() throws Exception
  {
    Assert.assertEquals(2.0f, firstRow(webbie1).getFloatMetric("item2"), 0.0f);
  }

  /**
   * Builds a factory whose stream yields {item1=value1, item2=2, time=<time>}.
   *
   * @param time       value stored under the "time" key
   * @param timeFormat timestamp format handed to the factory; may be null
   */
  private static WebFirehoseFactory factoryFor(final String time, String timeFormat) throws Exception
  {
    return new WebFirehoseFactory(
        new UpdateStreamFactory()
        {
          @Override
          public UpdateStream build()
          {
            return new MyUpdateStream(
                ImmutableMap.<String, Object>of("item1", "value1", "item2", 2, "time", time)
            );
          }
        },
        timeFormat
    );
  }

  /**
   * Connects to the factory and returns its first row, failing the test if there is none.
   */
  private static InputRow firstRow(WebFirehoseFactory factory) throws Exception
  {
    Firehose firehose = factory.connect();
    Assert.assertTrue("hasMore returned false", firehose.hasMore());
    return firehose.nextRow();
  }

  /**
   * Stub stream that returns the same event from every poll and ignores start/stop.
   */
  private static class MyUpdateStream implements UpdateStream
  {
    // Per-instance: a static field here would be overwritten by every new stream.
    private final ImmutableMap<String, Object> map;

    public MyUpdateStream(ImmutableMap<String, Object> map)
    {
      this.map = map;
    }

    @Override
    public Map<String, Object> pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException
    {
      return map;
    }

    @Override
    public String getTimeDimension()
    {
      return "time";
    }

    @Override
    public void start()
    {
    }

    @Override
    public void stop()
    {
    }
  }
}

View File

@ -0,0 +1,36 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package druid.examples.web;
import org.junit.Test;
import java.net.UnknownHostException;
/**
 * Tests for {@link WebJsonSupplier}.
 */
public class WebJsonSupplierTest
{
  /**
   * Constructing a supplier for a host name that does not resolve and asking it for input is
   * expected to raise {@link UnknownHostException}.
   */
  @Test(expected = UnknownHostException.class)
  public void checkInvalidUrl() throws Exception
  {
    final WebJsonSupplier unreachable = new WebJsonSupplier("http://invalid.url");
    unreachable.getInput();
  }
}

View File

@ -30,7 +30,6 @@ import com.metamx.druid.Query;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.index.v1.IndexGranularity;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.indexing.common.TaskLock;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.TaskToolbox;
@ -38,6 +37,7 @@ import com.metamx.druid.indexing.common.actions.LockAcquireAction;
import com.metamx.druid.indexing.common.actions.LockListAction;
import com.metamx.druid.indexing.common.actions.LockReleaseAction;
import com.metamx.druid.indexing.common.actions.SegmentInsertAction;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.query.FinalizeResultsQueryRunner;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryRunnerFactory;
@ -226,6 +226,28 @@ public class RealtimeIndexTask extends AbstractTask
toolbox.getTaskActionClient().submit(new LockReleaseAction(segment.getInterval()));
}
}
/**
 * Batch announcement: submits a lock-acquire action for the interval of every segment first,
 * then hands the whole batch to the toolbox's segment announcer in a single call.
 */
@Override
public void announceSegments(Iterable<DataSegment> segments) throws IOException
{
  final java.util.Iterator<DataSegment> pending = segments.iterator();
  while (pending.hasNext()) {
    final DataSegment next = pending.next();
    toolbox.getTaskActionClient().submit(new LockAcquireAction(next.getInterval()));
  }

  toolbox.getSegmentAnnouncer().announceSegments(segments);
}
/**
 * Batch un-announcement: asks the toolbox's segment announcer to withdraw the whole batch and
 * then, whether or not that call threw, submits a lock-release action for every segment's
 * interval.
 */
@Override
public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
{
  try {
    toolbox.getSegmentAnnouncer().unannounceSegments(segments);
  }
  finally {
    // Runs even when un-announcing fails, so the release actions are always submitted.
    for (DataSegment released : segments) {
      toolbox.getTaskActionClient().submit(new LockReleaseAction(released.getInterval()));
    }
  }
}
};
// NOTE: getVersion will block if there is lock contention, which will block plumber.getSink

View File

@ -106,9 +106,7 @@ public class TaskMasterLifecycle
Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, leaderLifecycle);
leaderLifecycle.addManagedInstance(taskConsumer);
if (indexerCoordinatorConfig.isAutoScalingEnabled()) {
leaderLifecycle.addManagedInstance(resourceManagementScheduler);
}
try {
leaderLifecycle.start();

View File

@ -55,10 +55,6 @@ import com.metamx.druid.http.GuiceServletConfig;
import com.metamx.druid.http.RedirectFilter;
import com.metamx.druid.http.RedirectInfo;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.indexing.common.RetryPolicyFactory;
import com.metamx.druid.indexing.common.actions.LocalTaskActionClientFactory;
import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
@ -93,12 +89,17 @@ import com.metamx.druid.indexing.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.indexing.coordinator.scaling.AutoScalingStrategy;
import com.metamx.druid.indexing.coordinator.scaling.EC2AutoScalingStrategy;
import com.metamx.druid.indexing.coordinator.scaling.NoopAutoScalingStrategy;
import com.metamx.druid.indexing.coordinator.scaling.NoopResourceManagementScheduler;
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementScheduler;
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerConfig;
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactory;
import com.metamx.druid.indexing.coordinator.scaling.SimpleResourceManagementStrategy;
import com.metamx.druid.indexing.coordinator.scaling.SimpleResourceManagmentConfig;
import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitters;
@ -687,6 +688,16 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
private void initializeResourceManagement(final JacksonConfigManager configManager)
{
if (resourceManagementSchedulerFactory == null) {
if (!config.isAutoScalingEnabled()) {
resourceManagementSchedulerFactory = new ResourceManagementSchedulerFactory()
{
@Override
public ResourceManagementScheduler build(TaskRunner runner)
{
return new NoopResourceManagementScheduler();
}
};
} else {
resourceManagementSchedulerFactory = new ResourceManagementSchedulerFactory()
{
@Override
@ -736,6 +747,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
};
}
}
}
public static class Builder
{

View File

@ -0,0 +1,52 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.indexing.coordinator.scaling;
import com.metamx.common.logger.Logger;
/**
 * A {@link ResourceManagementScheduler} that does nothing. It passes {@code null} for all four
 * superclass constructor arguments, only logs a message on start, and always reports a fresh
 * {@code ScalingStats(0)}.
 */
public class NoopResourceManagementScheduler extends ResourceManagementScheduler
{
  private static final Logger log = new Logger(NoopResourceManagementScheduler.class);
  private static final String DISABLED_MESSAGE = "Autoscaling is disabled.";

  public NoopResourceManagementScheduler()
  {
    super(null, null, null, null);
  }

  /**
   * Logs that autoscaling is disabled; nothing is scheduled.
   */
  @Override
  public void start()
  {
    log.info(DISABLED_MESSAGE);
  }

  /**
   * Nothing was started, so there is nothing to stop.
   */
  @Override
  public void stop()
  {
    // intentionally empty
  }

  /**
   * @return a new {@code ScalingStats} constructed with 0 on every call
   */
  @Override
  public ScalingStats getStats()
  {
    return new ScalingStats(0);
  }
}

View File

@ -39,7 +39,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.22.3</metamx.java-util.version>
<apache.curator.version>2.0.2-21-22</apache.curator.version>
<apache.curator.version>2.1.0-incubating</apache.curator.version>
</properties>
<modules>

View File

@ -21,6 +21,7 @@ package com.metamx.druid.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
@ -37,6 +38,7 @@ import org.apache.curator.utils.ZKPaths;
import java.io.File;
import java.io.IOException;
import java.util.List;
/**
*/
@ -103,7 +105,9 @@ public class ZkCoordinator implements DataSegmentChangeHandler
curator.newNamespaceAwareEnsurePath(loadQueueLocation).ensure(curator.getZookeeperClient());
curator.newNamespaceAwareEnsurePath(servedSegmentsLocation).ensure(curator.getZookeeperClient());
if (config.isLoadFromSegmentCacheEnabled()) {
loadCache();
}
loadQueueCache.getListenable().addListener(
new PathChildrenCacheListener()
@ -192,12 +196,14 @@ public class ZkCoordinator implements DataSegmentChangeHandler
return;
}
List<DataSegment> cachedSegments = Lists.newArrayList();
for (File file : baseDir.listFiles()) {
log.info("Loading segment cache file [%s]", file);
try {
DataSegment segment = jsonMapper.readValue(file, DataSegment.class);
if (serverManager.isSegmentCached(segment)) {
addSegment(segment);
cachedSegments.add(segment);
//addSegment(segment);
} else {
log.warn("Unable to find cache file for %s. Deleting lookup entry", segment.getIdentifier());
@ -213,6 +219,8 @@ public class ZkCoordinator implements DataSegmentChangeHandler
.emit();
}
}
addSegments(cachedSegments);
}
@Override
@ -222,6 +230,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
serverManager.loadSegment(segment);
File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier());
if (!segmentInfoCacheFile.exists()) {
try {
jsonMapper.writeValue(segmentInfoCacheFile, segment);
}
@ -231,6 +240,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
);
}
}
try {
announcer.announceSegment(segment);
@ -239,6 +249,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
removeSegment(segment);
throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier());
}
}
catch (SegmentLoadingException e) {
log.makeAlert(e, "Failed to load segment for dataSource")
@ -247,6 +258,42 @@ public class ZkCoordinator implements DataSegmentChangeHandler
}
}
/**
 * Loads a batch of segments and announces them with a single announcer call.
 *
 * Each segment is loaded through the server manager and, if its info cache file does not yet
 * exist, written to the segment info cache directory. Only after the whole batch is processed
 * is it announced. Any SegmentLoadingException is turned into an alert rather than propagated.
 *
 * NOTE(review): when writing a cache file fails, only that one segment is removed; segments
 * loaded earlier in the same batch stay loaded but are never announced. Confirm this is intended.
 *
 * @param segments the segments to load and announce
 */
public void addSegments(Iterable<DataSegment> segments)
{
  try {
    for (DataSegment toLoad : segments) {
      serverManager.loadSegment(toLoad);

      final File infoFile = new File(config.getSegmentInfoCacheDirectory(), toLoad.getIdentifier());
      if (infoFile.exists()) {
        // Cache entry already on disk; nothing to write for this segment.
        continue;
      }

      try {
        jsonMapper.writeValue(infoFile, toLoad);
      }
      catch (IOException e) {
        removeSegment(toLoad);
        throw new SegmentLoadingException(
            e, "Failed to write to disk segment info cache file[%s]", infoFile
        );
      }
    }

    try {
      announcer.announceSegments(segments);
    }
    catch (IOException e) {
      // Announcement failed: drop the whole batch again before reporting.
      removeSegments(segments);
      throw new SegmentLoadingException(e, "Failed to announce segments[%s]", segments);
    }
  }
  catch (SegmentLoadingException e) {
    log.makeAlert(e, "Failed to load segments for dataSource")
       .addData("segments", segments)
       .emit();
  }
}
@Override
public void removeSegment(DataSegment segment)
{
@ -266,4 +313,25 @@ public class ZkCoordinator implements DataSegmentChangeHandler
.emit();
}
}
/**
 * Drops a batch of segments and then un-announces them with a single announcer call.
 *
 * For each segment the server manager is asked to drop it and its info cache file is deleted
 * (a failed delete is only logged). Any exception stops processing and is reported as an alert
 * instead of being propagated.
 *
 * @param segments the segments to drop and un-announce
 */
public void removeSegments(Iterable<DataSegment> segments)
{
  try {
    for (DataSegment toDrop : segments) {
      serverManager.dropSegment(toDrop);

      final File infoFile = new File(config.getSegmentInfoCacheDirectory(), toDrop.getIdentifier());
      final boolean deleted = infoFile.delete();
      if (!deleted) {
        log.warn("Unable to delete segmentInfoCacheFile[%s]", infoFile);
      }
    }

    announcer.unannounceSegments(segments);
  }
  catch (Exception e) {
    log.makeAlert(e, "Failed to remove segments")
       .addData("segments", segments)
       .emit();
  }
}
}

View File

@ -29,4 +29,10 @@ public abstract class ZkCoordinatorConfig
{
/**
 * Directory bound to the {@code druid.paths.segmentInfoCache} property.
 * ZkCoordinator creates one file per segment in it, named by the segment identifier.
 */
@Config("druid.paths.segmentInfoCache")
public abstract File getSegmentInfoCacheDirectory();
/**
 * Flag bound to the {@code druid.segmentCache.enable} property; this implementation returns
 * {@code true}. ZkCoordinator calls loadCache() during start only when this returns true.
 * NOTE(review): presumably the config framework overrides this when the property is set -- confirm.
 */
@Config("druid.segmentCache.enable")
public boolean isLoadFromSegmentCacheEnabled()
{
return true;
}
}

View File

@ -443,17 +443,17 @@ public class DruidMaster
curator, ZKPaths.makePath(zkPaths.getMasterPath(), MASTER_OWNER_NODE), config.getHost()
);
newLeaderLatch.attachListener(
newLeaderLatch.addListener(
new LeaderLatchListener()
{
@Override
public void becomeMaster()
public void isLeader()
{
DruidMaster.this.becomeMaster();
}
@Override
public void stopBeingMaster()
public void notLeader()
{
DruidMaster.this.stopBeingMaster();
}

View File

@ -1,6 +1,5 @@
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
<?xml version="1.0"?>
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
<id>bin</id>
<formats>
<format>tar.gz</format>
@ -13,6 +12,34 @@
</includes>
<outputDirectory>config</outputDirectory>
</fileSet>
<fileSet>
<directory>../examples/config/broker</directory>
<includes>
<include>*</include>
</includes>
<outputDirectory>config/broker</outputDirectory>
</fileSet>
<fileSet>
<directory>../examples/config/master</directory>
<includes>
<include>*</include>
</includes>
<outputDirectory>config/master</outputDirectory>
</fileSet>
<fileSet>
<directory>../examples/config/realtime</directory>
<includes>
<include>*</include>
</includes>
<outputDirectory>config/realtime</outputDirectory>
</fileSet>
<fileSet>
<directory>../examples/config/compute</directory>
<includes>
<include>*</include>
</includes>
<outputDirectory>config/compute</outputDirectory>
</fileSet>
<fileSet>
<directory>../examples/bin</directory>
<includes>
@ -28,6 +55,13 @@
</includes>
<outputDirectory>lib</outputDirectory>
</fileSet>
<fileSet>
<directory>../services/target</directory>
<includes>
<include>druid-services-*-selfcontained.jar</include>
</includes>
<outputDirectory>lib</outputDirectory>
</fileSet>
<fileSet>
<directory>../examples/bin/examples</directory>
<includes>

6
upload.sh Normal file
View File

@ -0,0 +1,6 @@
#!/bin/bash -e
#
# Script to upload tarball of assembly build to static.druid.io for serving.
#
# The source path is relative, so run this from a directory that contains
# services/target/ (the glob must match the built druid-services-*-bin.tar.gz).
# Requires the s3cmd command on the PATH.
#
s3cmd put services/target/druid-services-*-bin.tar.gz s3://static.druid.io/artifacts/