intervals)
@@ -149,6 +157,11 @@ public class RealtimeManager implements QuerySegmentWalker
}
}
+ public FireDepartmentMetrics getMetrics()
+ {
+ return metrics;
+ }
+
@Override
public void run()
{
@@ -186,11 +199,11 @@ public class RealtimeManager implements QuerySegmentWalker
}
int currCount = sink.add(inputRow);
- metrics.incrementProcessed();
if (currCount >= config.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}
+ metrics.incrementProcessed();
}
catch (FormattedException e) {
log.info(e, "unparseable line: %s", e.getDetails());
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/SegmentPublisher.java b/realtime/src/main/java/com/metamx/druid/realtime/SegmentPublisher.java
index 48315849921..a381e5ab6a3 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/SegmentPublisher.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/SegmentPublisher.java
@@ -1,3 +1,22 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
package com.metamx.druid.realtime;
import com.metamx.druid.client.DataSegment;
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/FirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/FirehoseFactory.java
index c9d9dfec3de..937947bfb34 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/FirehoseFactory.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/FirehoseFactory.java
@@ -29,7 +29,8 @@ import java.io.IOException;
@JsonSubTypes.Type(name = "kafka-0.7.2", value = KafkaFirehoseFactory.class),
@JsonSubTypes.Type(name = "rabbitmq", value = RabbitMQFirehoseFactory.class),
@JsonSubTypes.Type(name = "clipped", value = ClippedFirehoseFactory.class),
- @JsonSubTypes.Type(name = "timed", value = TimedShutoffFirehoseFactory.class)
+ @JsonSubTypes.Type(name = "timed", value = TimedShutoffFirehoseFactory.class),
+ @JsonSubTypes.Type(name = "irc", value = IrcFirehoseFactory.class)
})
public interface FirehoseFactory
{
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcDecoder.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcDecoder.java
new file mode 100644
index 00000000000..3ce230fbcfe
--- /dev/null
+++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcDecoder.java
@@ -0,0 +1,15 @@
+package com.metamx.druid.realtime.firehose;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.metamx.druid.input.InputRow;
+import org.joda.time.DateTime;
+
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes({
+ @JsonSubTypes.Type(name = "wikipedia", value = WikipediaIrcDecoder.class)
+ })
+public interface IrcDecoder
+{
+ public InputRow decodeMessage(DateTime timestamp, String channel, String msg);
+}
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java
new file mode 100644
index 00000000000..aa9f47c8c81
--- /dev/null
+++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java
@@ -0,0 +1,246 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.realtime.firehose;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Lists;
+import com.ircclouds.irc.api.Callback;
+import com.ircclouds.irc.api.IRCApi;
+import com.ircclouds.irc.api.IRCApiImpl;
+import com.ircclouds.irc.api.IServerParameters;
+import com.ircclouds.irc.api.domain.IRCServer;
+import com.ircclouds.irc.api.domain.messages.ChannelPrivMsg;
+import com.ircclouds.irc.api.listeners.VariousMessageListenerAdapter;
+import com.ircclouds.irc.api.state.IIRCState;
+import com.metamx.common.Pair;
+import com.metamx.common.logger.Logger;
+import com.metamx.druid.input.InputRow;
+import org.joda.time.DateTime;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Example Usage
+ *
+ * Decoder definition: wikipedia-decoder.json
+ * {@code
+ *
+ * {
+ * "type": "wikipedia",
+ * "namespaces": {
+ * "#en.wikipedia": {
+ * "": "main",
+ * "Category": "category",
+ * "Template talk": "template talk",
+ * "Help talk": "help talk",
+ * "Media": "media",
+ * "MediaWiki talk": "mediawiki talk",
+ * "File talk": "file talk",
+ * "MediaWiki": "mediawiki",
+ * "User": "user",
+ * "File": "file",
+ * "User talk": "user talk",
+ * "Template": "template",
+ * "Help": "help",
+ * "Special": "special",
+ * "Talk": "talk",
+ * "Category talk": "category talk"
+ * }
+ * },
+ * "geoIpDatabase": "path/to/GeoLite2-City.mmdb"
+ * }
+ * }
+ *
+ * Example code:
+ * {@code
+ * IrcDecoder wikipediaDecoder = new ObjectMapper().readValue(
+ * new File("wikipedia-decoder.json"),
+ * IrcDecoder.class
+ * );
+ *
+ * IrcFirehoseFactory factory = new IrcFirehoseFactory(
+ * "wiki123",
+ * "irc.wikimedia.org",
+ * Lists.newArrayList(
+ * "#en.wikipedia",
+ * "#fr.wikipedia",
+ * "#de.wikipedia",
+ * "#ja.wikipedia"
+ * ),
+ * wikipediaDecoder
+ * );
+ * }
+ */
+public class IrcFirehoseFactory implements FirehoseFactory
+{
+ private static final Logger log = new Logger(IrcFirehoseFactory.class);
+
+ private final String nick;
+ private final String host;
+ private final List channels;
+ private final IrcDecoder decoder;
+
+ @JsonCreator
+ public IrcFirehoseFactory(
+ @JsonProperty String nick,
+ @JsonProperty String host,
+ @JsonProperty List channels,
+ @JsonProperty IrcDecoder decoder
+ )
+ {
+ this.nick = nick;
+ this.host = host;
+ this.channels = channels;
+ this.decoder = decoder;
+ }
+
+ @Override
+ public Firehose connect() throws IOException
+ {
+ final IRCApi irc = new IRCApiImpl(false);
+ final LinkedBlockingQueue> queue = new LinkedBlockingQueue>();
+
+ irc.addListener(new VariousMessageListenerAdapter() {
+ @Override
+ public void onChannelMessage(ChannelPrivMsg aMsg)
+ {
+ try {
+ queue.put(Pair.of(DateTime.now(), aMsg));
+ } catch(InterruptedException e) {
+ throw new RuntimeException("interrupted adding message to queue", e);
+ }
+ }
+ });
+
+ log.info("connecting to irc server [%s]", host);
+ irc.connect(
+ new IServerParameters()
+ {
+ @Override
+ public String getNickname()
+ {
+ return nick;
+ }
+
+ @Override
+ public List getAlternativeNicknames()
+ {
+ return Lists.newArrayList(nick + "_",
+ nick + "__",
+ nick + "___");
+ }
+
+ @Override
+ public String getIdent()
+ {
+ return "druid";
+ }
+
+ @Override
+ public String getRealname()
+ {
+ return nick;
+ }
+
+ @Override
+ public IRCServer getServer()
+ {
+ return new IRCServer(host, false);
+ }
+ },
+ new Callback()
+ {
+ @Override
+ public void onSuccess(IIRCState aObject)
+ {
+ log.info("irc connection to server [%s] established", host);
+ for(String chan : channels) {
+ log.info("Joining channel %s", chan);
+ irc.joinChannel(chan);
+ }
+ }
+
+ @Override
+ public void onFailure(Exception e)
+ {
+ log.error(e, "Unable to connect to irc server [%s]", host);
+ throw new RuntimeException("Unable to connect to server", e);
+ }
+ });
+
+
+ return new Firehose()
+ {
+ InputRow nextRow = null;
+
+ @Override
+ public boolean hasMore()
+ {
+ try {
+ while(true) {
+ Pair nextMsg = queue.take();
+ try {
+ nextRow = decoder.decodeMessage(nextMsg.lhs, nextMsg.rhs.getChannelName(), nextMsg.rhs.getText());
+ if(nextRow != null) return true;
+ }
+ catch (IllegalArgumentException iae) {
+ log.debug("ignoring invalid message in channel [%s]", nextMsg.rhs.getChannelName());
+ }
+ }
+ }
+ catch(InterruptedException e) {
+ Thread.interrupted();
+ throw new RuntimeException("interrupted retrieving elements from queue", e);
+ }
+ }
+
+ @Override
+ public InputRow nextRow()
+ {
+ return nextRow;
+ }
+
+ @Override
+ public Runnable commit()
+ {
+ return new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ // nothing to see here
+ }
+ };
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ log.info("disconnecting from irc server [%s]", host);
+ irc.disconnect("");
+ }
+ };
+ }
+}
+
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java
new file mode 100644
index 00000000000..b1ecf91554e
--- /dev/null
+++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java
@@ -0,0 +1,222 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.realtime.firehose;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.maxmind.geoip2.DatabaseReader;
+import com.maxmind.geoip2.exception.GeoIp2Exception;
+import com.maxmind.geoip2.model.Omni;
+import com.metamx.common.logger.Logger;
+import com.metamx.druid.input.InputRow;
+import org.apache.commons.io.FileUtils;
+import org.joda.time.DateTime;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.zip.GZIPInputStream;
+
+class WikipediaIrcDecoder implements IrcDecoder
+{
+ static final Logger log = new Logger(WikipediaIrcDecoder.class);
+
+ final DatabaseReader geoLookup;
+
+ static final Pattern pattern = Pattern.compile(
+ "\\x0314\\[\\[\\x0307(.+?)\\x0314\\]\\]\\x034 (.*?)\\x0310.*\\x0302(http.+?)\\x03.+\\x0303(.+?)\\x03.+\\x03 (\\(([+-]\\d+)\\).*|.+) \\x0310(.+)\\x03"
+ );
+ static final Pattern ipPattern = Pattern.compile("\\d+.\\d+.\\d+.\\d+");
+ static final Pattern shortnamePattern = Pattern.compile("#(\\w\\w)\\..*");
+
+ static final List dimensionList = Lists.newArrayList(
+ "page",
+ "language",
+ "user",
+ "unpatrolled",
+ "newPage",
+ "robot",
+ "anonymous",
+ "namespace",
+ "continent",
+ "country",
+ "region",
+ "city"
+ );
+
+ final Map> namespaces;
+
+ public WikipediaIrcDecoder( Map> namespaces) {
+ this(namespaces, null);
+ }
+
+ @JsonCreator
+ public WikipediaIrcDecoder(@JsonProperty("namespaces") Map> namespaces,
+ @JsonProperty("geoIpDatabase") String geoIpDatabase)
+ {
+ if(namespaces == null) {
+ namespaces = Maps.newHashMap();
+ }
+ this.namespaces = namespaces;
+
+
+ File geoDb;
+ if(geoIpDatabase != null) {
+ geoDb = new File(geoIpDatabase);
+ } else {
+ try {
+ String tmpDir = System.getProperty("java.io.tmpdir");
+ geoDb = new File(tmpDir, this.getClass().getCanonicalName() + ".GeoLite2-City.mmdb");
+ if(!geoDb.exists()) {
+ log.info("Downloading geo ip database to [%s]", geoDb);
+
+ FileUtils.copyInputStreamToFile(
+ new GZIPInputStream(
+ new URL("http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.mmdb.gz").openStream()
+ ),
+ geoDb
+ );
+ }
+ } catch(IOException e) {
+ throw new RuntimeException("Unable to download geo ip database [%s]", e);
+ }
+ }
+ try {
+ geoLookup = new DatabaseReader(geoDb);
+ } catch(IOException e) {
+ throw new RuntimeException("Unable to open geo ip lookup database", e);
+ }
+ }
+
+ @Override
+ public InputRow decodeMessage(final DateTime timestamp, String channel, String msg)
+ {
+ final Map dimensions = Maps.newHashMap();
+ final Map metrics = Maps.newHashMap();
+
+ Matcher m = pattern.matcher(msg);
+ if(!m.matches()) {
+ throw new IllegalArgumentException("Invalid input format");
+ }
+
+ Matcher shortname = shortnamePattern.matcher(channel);
+ if(shortname.matches()) {
+ dimensions.put("language", shortname.group(1));
+ }
+
+ String page = m.group(1);
+ String pageUrl = page.replaceAll("\\s", "_");
+
+ dimensions.put("page", pageUrl);
+
+ String user = m.group(4);
+ Matcher ipMatch = ipPattern.matcher(user);
+ boolean anonymous = ipMatch.matches();
+ if(anonymous) {
+ try {
+ final InetAddress ip = InetAddress.getByName(ipMatch.group());
+ final Omni lookup = geoLookup.omni(ip);
+
+ dimensions.put("continent", lookup.getContinent().getName());
+ dimensions.put("country", lookup.getCountry().getName());
+ dimensions.put("region", lookup.getMostSpecificSubdivision().getName());
+ dimensions.put("city", lookup.getCity().getName());
+ } catch(UnknownHostException e) {
+ log.error(e, "invalid ip [%s]", ipMatch.group());
+ } catch(IOException e) {
+ log.error(e, "error looking up geo ip");
+ } catch(GeoIp2Exception e) {
+ log.error(e, "error looking up geo ip");
+ }
+ }
+ dimensions.put("user", user);
+
+ final String flags = m.group(2);
+ dimensions.put("unpatrolled", Boolean.toString(flags.contains("!")));
+ dimensions.put("newPage", Boolean.toString(flags.contains("N")));
+ dimensions.put("robot", Boolean.toString(flags.contains("B")));
+
+ dimensions.put("anonymous", Boolean.toString(anonymous));
+
+ String[] parts = page.split(":");
+ if(parts.length > 1 && !parts[1].startsWith(" ")) {
+ Map channelNamespaces = namespaces.get(channel);
+ if(channelNamespaces != null && channelNamespaces.containsKey(parts[0])) {
+ dimensions.put("namespace", channelNamespaces.get(parts[0]));
+ } else {
+ dimensions.put("namespace", "wikipedia");
+ }
+ }
+ else {
+ dimensions.put("namespace", "article");
+ }
+
+ float delta = m.group(6) != null ? Float.parseFloat(m.group(6)) : 0;
+ metrics.put("delta", delta);
+ metrics.put("added", Math.max(delta, 0));
+ metrics.put("deleted", Math.min(delta, 0));
+
+ return new InputRow()
+ {
+ @Override
+ public List getDimensions()
+ {
+ return dimensionList;
+ }
+
+ @Override
+ public long getTimestampFromEpoch()
+ {
+ return timestamp.getMillis();
+ }
+
+ @Override
+ public List getDimension(String dimension)
+ {
+ return ImmutableList.of(dimensions.get(dimension));
+ }
+
+ @Override
+ public float getFloatMetric(String metric)
+ {
+ return metrics.get(metric);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "WikipediaRow{" +
+ "timestamp=" + timestamp +
+ ", dimensions=" + dimensions +
+ ", metrics=" + metrics +
+ '}';
+ }
+ };
+ }
+}
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/IntervalStartVersioningPolicy.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/IntervalStartVersioningPolicy.java
index 4ad3f123299..c052deeec98 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/IntervalStartVersioningPolicy.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/IntervalStartVersioningPolicy.java
@@ -1,3 +1,22 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
package com.metamx.druid.realtime.plumber;
import org.joda.time.Interval;
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactory.java
index 117fa6a40eb..57ba07a76cc 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactory.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactory.java
@@ -1,3 +1,22 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
package com.metamx.druid.realtime.plumber;
import org.joda.time.DateTime;
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/NoopRejectionPolicyFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/NoopRejectionPolicyFactory.java
index 59a3e24cb21..27bef8b8020 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/NoopRejectionPolicyFactory.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/NoopRejectionPolicyFactory.java
@@ -1,3 +1,22 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
package com.metamx.druid.realtime.plumber;
import org.joda.time.DateTime;
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RejectionPolicy.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RejectionPolicy.java
index 847c917dc35..61f4308a15a 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RejectionPolicy.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RejectionPolicy.java
@@ -1,3 +1,22 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
package com.metamx.druid.realtime.plumber;
import org.joda.time.DateTime;
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactory.java
index 3557a8ba3bc..b97700699d4 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactory.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactory.java
@@ -1,3 +1,22 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
package com.metamx.druid.realtime.plumber;
import org.joda.time.DateTime;
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/Sink.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/Sink.java
index a1823b6c09a..bc0bc194f99 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/Sink.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/Sink.java
@@ -100,6 +100,11 @@ public class Sink implements Iterable
return interval;
}
+ public FireHydrant getCurrIndex()
+ {
+ return currIndex;
+ }
+
public int add(InputRow row)
{
if (currIndex == null) {
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/VersioningPolicy.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/VersioningPolicy.java
index 5fe790dd284..36ab3830f6e 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/VersioningPolicy.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/VersioningPolicy.java
@@ -1,3 +1,22 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
package com.metamx.druid.realtime.plumber;
import com.fasterxml.jackson.annotation.JsonSubTypes;
diff --git a/realtime/src/test/java/com/metamx/druid/realtime/RealtimeManagerTest.java b/realtime/src/test/java/com/metamx/druid/realtime/RealtimeManagerTest.java
new file mode 100644
index 00000000000..1203a0e6ba4
--- /dev/null
+++ b/realtime/src/test/java/com/metamx/druid/realtime/RealtimeManagerTest.java
@@ -0,0 +1,252 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.realtime;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.metamx.common.ISE;
+import com.metamx.druid.Query;
+import com.metamx.druid.QueryGranularity;
+import com.metamx.druid.aggregation.AggregatorFactory;
+import com.metamx.druid.aggregation.CountAggregatorFactory;
+import com.metamx.druid.guava.Runnables;
+import com.metamx.druid.index.v1.SpatialDimensionSchema;
+import com.metamx.druid.input.InputRow;
+import com.metamx.druid.query.QueryRunner;
+import com.metamx.druid.realtime.firehose.Firehose;
+import com.metamx.druid.realtime.firehose.FirehoseFactory;
+import com.metamx.druid.realtime.plumber.Plumber;
+import com.metamx.druid.realtime.plumber.PlumberSchool;
+import com.metamx.druid.realtime.plumber.Sink;
+import com.metamx.druid.shard.NoneShardSpec;
+import junit.framework.Assert;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ */
+public class RealtimeManagerTest
+{
+ private RealtimeManager realtimeManager;
+ private Schema schema;
+ private TestPlumber plumber;
+
+ @Before
+ public void setUp() throws Exception
+ {
+ schema = new Schema(
+ "test",
+ Lists.newArrayList(),
+ new AggregatorFactory[]{new CountAggregatorFactory("rows")},
+ QueryGranularity.NONE,
+ new NoneShardSpec()
+ );
+
+ final List rows = Arrays.asList(
+ makeRow(new DateTime("9000-01-01").getMillis()), makeRow(new DateTime().getMillis())
+ );
+
+ plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, new DateTime().toString()));
+
+ realtimeManager = new RealtimeManager(
+ Arrays.asList(
+ new FireDepartment(
+ schema,
+ new FireDepartmentConfig(1, new Period("P1Y")),
+ new FirehoseFactory()
+ {
+ @Override
+ public Firehose connect() throws IOException
+ {
+ return new TestFirehose(rows.iterator());
+ }
+ },
+ new PlumberSchool()
+ {
+ @Override
+ public Plumber findPlumber(
+ Schema schema, FireDepartmentMetrics metrics
+ )
+ {
+ return plumber;
+ }
+ }
+ )
+ ),
+ null
+ );
+ }
+
+ @Test
+ public void testRun() throws Exception
+ {
+ realtimeManager.start();
+
+ Stopwatch stopwatch = new Stopwatch().start();
+ while (realtimeManager.getMetrics("test").processed() != 1) {
+ Thread.sleep(100);
+ if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
+ throw new ISE("Realtime manager should have completed processing 2 events!");
+ }
+ }
+
+ Assert.assertEquals(1, realtimeManager.getMetrics("test").processed());
+ Assert.assertEquals(1, realtimeManager.getMetrics("test").thrownAway());
+ Assert.assertTrue(plumber.isStartedJob());
+ Assert.assertTrue(plumber.isFinishedJob());
+ Assert.assertEquals(1, plumber.getPersistCount());
+ }
+
+ private InputRow makeRow(final long timestamp)
+ {
+ return new InputRow()
+ {
+ @Override
+ public List getDimensions()
+ {
+ return Arrays.asList("testDim");
+ }
+
+ @Override
+ public long getTimestampFromEpoch()
+ {
+ return timestamp;
+ }
+
+ @Override
+ public List getDimension(String dimension)
+ {
+ return Lists.newArrayList();
+ }
+
+ @Override
+ public float getFloatMetric(String metric)
+ {
+ return 0;
+ }
+ };
+ }
+
+
+ private static class TestFirehose implements Firehose
+ {
+ private final Iterator rows;
+
+ private TestFirehose(Iterator rows)
+ {
+ this.rows = rows;
+ }
+
+ @Override
+ public boolean hasMore()
+ {
+ return rows.hasNext();
+ }
+
+ @Override
+ public InputRow nextRow()
+ {
+ return rows.next();
+ }
+
+ @Override
+ public Runnable commit()
+ {
+ return Runnables.getNoopRunnable();
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ }
+ }
+
+ private static class TestPlumber implements Plumber
+ {
+ private final Sink sink;
+
+
+ private volatile boolean startedJob = false;
+ private volatile boolean finishedJob = false;
+ private volatile int persistCount = 0;
+
+ private TestPlumber(Sink sink)
+ {
+ this.sink = sink;
+ }
+
+ private boolean isStartedJob()
+ {
+ return startedJob;
+ }
+
+ private boolean isFinishedJob()
+ {
+ return finishedJob;
+ }
+
+ private int getPersistCount()
+ {
+ return persistCount;
+ }
+
+ @Override
+ public void startJob()
+ {
+ startedJob = true;
+ }
+
+ @Override
+ public Sink getSink(long timestamp)
+ {
+ if (sink.getInterval().contains(timestamp)) {
+ return sink;
+ }
+ return null;
+ }
+
+ @Override
+ public QueryRunner getQueryRunner(Query query)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void persist(Runnable commitRunnable)
+ {
+ persistCount++;
+ }
+
+ @Override
+ public void finishJob()
+ {
+ finishedJob = true;
+ }
+ }
+}
diff --git a/realtime/src/test/java/com/metamx/druid/realtime/plumber/IntervalStartVersioningPolicyTest.java b/realtime/src/test/java/com/metamx/druid/realtime/plumber/IntervalStartVersioningPolicyTest.java
new file mode 100644
index 00000000000..dcf92019428
--- /dev/null
+++ b/realtime/src/test/java/com/metamx/druid/realtime/plumber/IntervalStartVersioningPolicyTest.java
@@ -0,0 +1,37 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.realtime.plumber;
+
+import junit.framework.Assert;
+import org.joda.time.Interval;
+import org.junit.Test;
+
+/**
+ */
+public class IntervalStartVersioningPolicyTest
+{
+ @Test
+ public void testGetVersion() throws Exception
+ {
+ IntervalStartVersioningPolicy policy = new IntervalStartVersioningPolicy();
+ String version = policy.getVersion(new Interval("2013-01-01/2013-01-02"));
+ Assert.assertEquals("2013-01-01T00:00:00.000Z", version);
+ }
+}
diff --git a/realtime/src/test/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactoryTest.java b/realtime/src/test/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactoryTest.java
new file mode 100644
index 00000000000..87a6b9e9536
--- /dev/null
+++ b/realtime/src/test/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactoryTest.java
@@ -0,0 +1,46 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.realtime.plumber;
+
+import junit.framework.Assert;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+import org.junit.Test;
+
+/**
+ */
+public class MessageTimeRejectionPolicyFactoryTest
+{
+ @Test
+ public void testAccept() throws Exception
+ {
+ Period period = new Period("PT10M");
+ RejectionPolicy rejectionPolicy = new MessageTimeRejectionPolicyFactory().create(period);
+
+ DateTime now = new DateTime();
+ DateTime past = now.minus(period).minus(1);
+ DateTime future = now.plus(period).plus(1);
+
+ Assert.assertTrue(rejectionPolicy.accept(now.getMillis()));
+ Assert.assertFalse(rejectionPolicy.accept(past.getMillis()));
+ Assert.assertTrue(rejectionPolicy.accept(future.getMillis()));
+ Assert.assertFalse(rejectionPolicy.accept(now.getMillis()));
+ }
+}
diff --git a/realtime/src/test/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchoolTest.java b/realtime/src/test/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchoolTest.java
new file mode 100644
index 00000000000..a93308c4fa6
--- /dev/null
+++ b/realtime/src/test/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchoolTest.java
@@ -0,0 +1,162 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.realtime.plumber;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import com.metamx.common.ISE;
+import com.metamx.druid.Query;
+import com.metamx.druid.QueryGranularity;
+import com.metamx.druid.aggregation.AggregatorFactory;
+import com.metamx.druid.aggregation.CountAggregatorFactory;
+import com.metamx.druid.client.DataSegment;
+import com.metamx.druid.client.ServerView;
+import com.metamx.druid.coordination.DataSegmentAnnouncer;
+import com.metamx.druid.index.v1.IndexGranularity;
+import com.metamx.druid.index.v1.SpatialDimensionSchema;
+import com.metamx.druid.loading.DataSegmentPusher;
+import com.metamx.druid.query.DefaultQueryRunnerFactoryConglomerate;
+import com.metamx.druid.query.QueryRunnerFactory;
+import com.metamx.druid.realtime.FireDepartmentMetrics;
+import com.metamx.druid.realtime.Schema;
+import com.metamx.druid.realtime.SegmentPublisher;
+import com.metamx.druid.shard.NoneShardSpec;
+import com.metamx.emitter.service.ServiceEmitter;
+import junit.framework.Assert;
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.easymock.EasyMock;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ */
+public class RealtimePlumberSchoolTest
+{
+ private Plumber plumber;
+
+ private DataSegmentAnnouncer announcer;
+ private SegmentPublisher segmentPublisher;
+ private DataSegmentPusher dataSegmentPusher;
+ private ServerView serverView;
+ private ServiceEmitter emitter;
+
+ @Before
+ public void setUp() throws Exception
+ {
+
+ final File tmpDir = Files.createTempDir();
+ tmpDir.deleteOnExit();
+
+ final Schema schema = new Schema(
+ "test",
+ Lists.newArrayList(),
+ new AggregatorFactory[]{new CountAggregatorFactory("rows")},
+ QueryGranularity.NONE,
+ new NoneShardSpec()
+ );
+
+ RealtimePlumberSchool realtimePlumberSchool = new RealtimePlumberSchool(
+ new Period("PT10m"),
+ tmpDir,
+ IndexGranularity.HOUR
+ );
+
+ announcer = EasyMock.createMock(DataSegmentAnnouncer.class);
+ announcer.announceSegment(EasyMock.anyObject());
+ EasyMock.expectLastCall().anyTimes();
+
+ segmentPublisher = EasyMock.createMock(SegmentPublisher.class);
+ dataSegmentPusher = EasyMock.createMock(DataSegmentPusher.class);
+
+ serverView = EasyMock.createMock(ServerView.class);
+ serverView.registerSegmentCallback(
+ EasyMock.anyObject(),
+ EasyMock.anyObject()
+ );
+ EasyMock.expectLastCall().anyTimes();
+
+ emitter = EasyMock.createMock(ServiceEmitter.class);
+
+ EasyMock.replay(announcer, segmentPublisher, dataSegmentPusher, serverView, emitter);
+
+ realtimePlumberSchool.setConglomerate(new DefaultQueryRunnerFactoryConglomerate(Maps., QueryRunnerFactory>newHashMap()));
+ realtimePlumberSchool.setSegmentAnnouncer(announcer);
+ realtimePlumberSchool.setSegmentPublisher(segmentPublisher);
+ realtimePlumberSchool.setRejectionPolicyFactory(new NoopRejectionPolicyFactory());
+ realtimePlumberSchool.setVersioningPolicy(new IntervalStartVersioningPolicy());
+ realtimePlumberSchool.setDataSegmentPusher(dataSegmentPusher);
+ realtimePlumberSchool.setServerView(serverView);
+ realtimePlumberSchool.setServiceEmitter(emitter);
+
+ plumber = realtimePlumberSchool.findPlumber(schema, new FireDepartmentMetrics());
+ }
+
+ @After
+ public void tearDown() throws Exception
+ {
+ EasyMock.verify(announcer, segmentPublisher, dataSegmentPusher, serverView, emitter);
+ }
+
+ @Test
+ public void testGetSink() throws Exception
+ {
+ final DateTime theTime = new DateTime("2013-01-01");
+ Sink sink = plumber.getSink(theTime.getMillis());
+
+ Assert.assertEquals(new Interval(String.format("%s/PT1H", theTime.toString())), sink.getInterval());
+ Assert.assertEquals(theTime.toString(), sink.getVersion());
+ }
+
+ @Test
+ public void testPersist() throws Exception
+ {
+ final MutableBoolean committed = new MutableBoolean(false);
+ plumber.startJob();
+ plumber.persist(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ committed.setValue(true);
+ }
+ }
+ );
+
+ Stopwatch stopwatch = new Stopwatch().start();
+ while (!committed.booleanValue()) {
+ Thread.sleep(100);
+ if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
+ throw new ISE("Taking too long to set perist value");
+ }
+ }
+ plumber.finishJob();
+ }
+}
diff --git a/realtime/src/test/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactoryTest.java b/realtime/src/test/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactoryTest.java
new file mode 100644
index 00000000000..97f99fd396a
--- /dev/null
+++ b/realtime/src/test/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactoryTest.java
@@ -0,0 +1,44 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.realtime.plumber;
+
+import junit.framework.Assert;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+import org.junit.Test;
+
+/**
+ */
+public class ServerTimeRejectionPolicyFactoryTest
+{
+ @Test
+ public void testAccept() throws Exception
+ {
+ Period period = new Period("PT10M");
+
+ RejectionPolicy rejectionPolicy = new ServerTimeRejectionPolicyFactory().create(period);
+
+ DateTime now = new DateTime();
+ DateTime past = now.minus(period).minus(1);
+
+ Assert.assertTrue(rejectionPolicy.accept(now.getMillis()));
+ Assert.assertFalse(rejectionPolicy.accept(past.getMillis()));
+ }
+}
diff --git a/realtime/src/test/java/com/metamx/druid/realtime/plumber/SinkTest.java b/realtime/src/test/java/com/metamx/druid/realtime/plumber/SinkTest.java
new file mode 100644
index 00000000000..27b06609a0d
--- /dev/null
+++ b/realtime/src/test/java/com/metamx/druid/realtime/plumber/SinkTest.java
@@ -0,0 +1,124 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.realtime.plumber;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.metamx.druid.QueryGranularity;
+import com.metamx.druid.aggregation.AggregatorFactory;
+import com.metamx.druid.aggregation.CountAggregatorFactory;
+import com.metamx.druid.index.v1.SpatialDimensionSchema;
+import com.metamx.druid.input.InputRow;
+import com.metamx.druid.realtime.FireHydrant;
+import com.metamx.druid.realtime.Schema;
+import com.metamx.druid.shard.NoneShardSpec;
+import junit.framework.Assert;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ */
+public class SinkTest
+{
+ @Test
+ public void testSwap() throws Exception
+ {
+ final Schema schema = new Schema(
+ "test",
+ Lists.newArrayList(),
+ new AggregatorFactory[]{new CountAggregatorFactory("rows")},
+ QueryGranularity.MINUTE,
+ new NoneShardSpec()
+ );
+
+ final Interval interval = new Interval("2013-01-01/2013-01-02");
+ final String version = new DateTime().toString();
+ final Sink sink = new Sink(interval, schema, version);
+
+ sink.add(new InputRow()
+ {
+ @Override
+ public List getDimensions()
+ {
+ return Lists.newArrayList();
+ }
+
+ @Override
+ public long getTimestampFromEpoch()
+ {
+ return new DateTime("2013-01-01").getMillis();
+ }
+
+ @Override
+ public List getDimension(String dimension)
+ {
+ return Lists.newArrayList();
+ }
+
+ @Override
+ public float getFloatMetric(String metric)
+ {
+ return 0;
+ }
+ });
+
+ FireHydrant currHydrant = sink.getCurrIndex();
+ Assert.assertEquals(new Interval("2013-01-01/PT1M"), currHydrant.getIndex().getInterval());
+
+
+ FireHydrant swapHydrant = sink.swap();
+
+ sink.add(new InputRow()
+ {
+ @Override
+ public List getDimensions()
+ {
+ return Lists.newArrayList();
+ }
+
+ @Override
+ public long getTimestampFromEpoch()
+ {
+ return new DateTime("2013-01-01").getMillis();
+ }
+
+ @Override
+ public List getDimension(String dimension)
+ {
+ return Lists.newArrayList();
+ }
+
+ @Override
+ public float getFloatMetric(String metric)
+ {
+ return 0;
+ }
+ });
+
+ Assert.assertEquals(currHydrant, swapHydrant);
+ Assert.assertNotSame(currHydrant, sink.getCurrIndex());
+ Assert.assertEquals(new Interval("2013-01-01/PT1M"), sink.getCurrIndex().getIndex().getInterval());
+
+ Assert.assertEquals(2, Iterators.size(sink.iterator()));
+ }
+}
diff --git a/server/pom.xml b/server/pom.xml
index 796926104ed..1f45e161f71 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -28,7 +28,7 @@
com.metamx
druid
- 0.5.42-SNAPSHOT
+ 0.5.45-SNAPSHOT
diff --git a/services/pom.xml b/services/pom.xml
index 39ec814e8c6..8f366b1d326 100644
--- a/services/pom.xml
+++ b/services/pom.xml
@@ -24,11 +24,11 @@
druid-services
druid-services
druid-services
- 0.5.42-SNAPSHOT
+ 0.5.45-SNAPSHOT
com.metamx
druid
- 0.5.42-SNAPSHOT
+ 0.5.45-SNAPSHOT