From 8ccac2f13d670fd9c4069079c007d9eb1378ae72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 13 Aug 2013 18:35:07 -0700 Subject: [PATCH 01/20] basic wikipedia firehose factory --- pom.xml | 5 + realtime/pom.xml | 4 + .../realtime/firehose/FirehoseFactory.java | 3 +- .../druid/realtime/firehose/IrcDecoder.java | 15 ++ .../realtime/firehose/IrcFirehoseFactory.java | 217 ++++++++++++++++++ .../firehose/WikipediaIrcDecoder.java | 132 +++++++++++ 6 files changed, 375 insertions(+), 1 deletion(-) create mode 100644 realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcDecoder.java create mode 100644 realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java create mode 100644 realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java diff --git a/pom.xml b/pom.xml index ece774d529a..e81fb297fb8 100644 --- a/pom.xml +++ b/pom.xml @@ -359,6 +359,11 @@ ${apache.curator.version} test + + com.ircclouds.irc + irc-api + 1.0-0011 + diff --git a/realtime/pom.xml b/realtime/pom.xml index eff0de32e91..5ebde38abac 100644 --- a/realtime/pom.xml +++ b/realtime/pom.xml @@ -147,6 +147,10 @@ java-xmlbuilder true + + com.ircclouds.irc + irc-api + diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/FirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/FirehoseFactory.java index c9d9dfec3de..937947bfb34 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/FirehoseFactory.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/FirehoseFactory.java @@ -29,7 +29,8 @@ import java.io.IOException; @JsonSubTypes.Type(name = "kafka-0.7.2", value = KafkaFirehoseFactory.class), @JsonSubTypes.Type(name = "rabbitmq", value = RabbitMQFirehoseFactory.class), @JsonSubTypes.Type(name = "clipped", value = ClippedFirehoseFactory.class), - @JsonSubTypes.Type(name = "timed", value = TimedShutoffFirehoseFactory.class) + @JsonSubTypes.Type(name = "timed", value = TimedShutoffFirehoseFactory.class), + @JsonSubTypes.Type(name = "irc", value = IrcFirehoseFactory.class) }) public interface FirehoseFactory { diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcDecoder.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcDecoder.java new file mode 100644 index 00000000000..3ce230fbcfe --- /dev/null +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcDecoder.java @@ -0,0 +1,15 @@ +package com.metamx.druid.realtime.firehose; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.metamx.druid.input.InputRow; +import org.joda.time.DateTime; + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes({ + @JsonSubTypes.Type(name = "wikipedia", value = WikipediaIrcDecoder.class) + }) +public interface IrcDecoder +{ + public InputRow decodeMessage(DateTime timestamp, String channel, String msg); +} diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java new file mode 100644 index 00000000000..6c0a5ff0756 --- /dev/null +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java @@ -0,0 +1,217 @@ +package com.metamx.druid.realtime.firehose; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.ircclouds.irc.api.Callback; +import com.ircclouds.irc.api.IRCApi; +import com.ircclouds.irc.api.IRCApiImpl; +import com.ircclouds.irc.api.IServerParameters; +import com.ircclouds.irc.api.domain.IRCServer; +import com.ircclouds.irc.api.domain.messages.ChannelPrivMsg; +import com.ircclouds.irc.api.listeners.VariousMessageListenerAdapter; +import com.ircclouds.irc.api.state.IIRCState; +import com.metamx.common.Pair; +import com.metamx.common.logger.Logger; +import com.metamx.druid.input.InputRow; +import org.joda.time.DateTime; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.LinkedBlockingQueue; + + +public class IrcFirehoseFactory implements FirehoseFactory +{ + private static final Logger log = new Logger(IrcFirehoseFactory.class); + + private final String nick; + private final String host; + private final List channels; + private final IrcDecoder decoder; + + @JsonCreator + public IrcFirehoseFactory( + @JsonProperty String nick, + @JsonProperty String host, + @JsonProperty List channels, + @JsonProperty IrcDecoder decoder + ) + { + this.nick = nick; + this.host = host; + this.channels = channels; + this.decoder = decoder; + } + + @Override + public Firehose connect() throws IOException + { + final IRCApi irc = new IRCApiImpl(false); + final LinkedBlockingQueue> queue = new LinkedBlockingQueue>(); + + irc.addListener(new VariousMessageListenerAdapter() { + @Override + public void onChannelMessage(ChannelPrivMsg aMsg) + { + try { + queue.put(Pair.of(new DateTime(), aMsg)); + } catch(InterruptedException e) { + throw new RuntimeException("interrupted adding message to queue", e); + } + } + }); + + log.info("connecting to irc server [%s]", host); + irc.connect( + new IServerParameters() + { + @Override + public String getNickname() + { + return nick; + } + + @Override + public List getAlternativeNicknames() + { + return Lists.newArrayList(nick + "_", + nick + "__", + nick + "___"); + } + + @Override + public String getIdent() + { + return "druid"; + } + + @Override + public String getRealname() + { + return nick; + } + + @Override + public IRCServer getServer() + { + return new IRCServer(host, false); + } + }, + new Callback() + { + @Override + public void onSuccess(IIRCState aObject) + { + log.info("irc connection to server [%s] established", host); + for(String chan : channels) { + log.info("Joining channel %s", chan); + irc.joinChannel(chan); + } + } + + @Override + public void onFailure(Exception e) + { + log.error(e, "Unable to connect to irc server [%s]", host); + throw new RuntimeException("Unable to connect to server", e); + } + }); + + + return new Firehose() + { + InputRow nextRow = null; + + @Override + public boolean hasMore() + { + try { + while(true) { + Pair nextMsg = queue.take(); + try { + nextRow = decoder.decodeMessage(nextMsg.lhs, nextMsg.rhs.getChannelName(), nextMsg.rhs.getText()); + if(nextRow != null) return true; + } + catch (IllegalArgumentException iae) { + log.debug("ignoring invalid message in channel [%s]", nextMsg.rhs.getChannelName()); + } + } + } + catch(InterruptedException e) { + Thread.interrupted(); + throw new RuntimeException("interrupted retrieving elements from queue", e); + } + } + + @Override + public InputRow nextRow() + { + return nextRow; + } + + @Override + public Runnable commit() + { + return new Runnable() + { + @Override + public void run() + { + // nothing to see here + } + }; + } + + @Override + public void close() throws IOException + { + log.info("disconnecting from irc server [%s]", host); + irc.disconnect(""); + } + }; + } + + public static void main(String[] args) throws IOException { + + Map namespaces = Maps.newHashMap(); + namespaces.put("", "main"); + namespaces.put("Category", "category"); + namespaces.put("Media", "media"); + namespaces.put("MediaWiki", "mediawiki"); + namespaces.put("Template", "template"); + namespaces.put("$1 talk", "project talk"); + namespaces.put("Help talk", "help talk"); + namespaces.put("User", "user"); + namespaces.put("Template talk", "template talk"); + namespaces.put("MediaWiki talk", "mediawiki talk"); + namespaces.put("Talk", "talk"); + namespaces.put("Help", "help"); + namespaces.put("File talk", "file talk"); + namespaces.put("File", "file"); + namespaces.put("User talk", "user talk"); + namespaces.put("Special", "special"); + namespaces.put("Category talk", "category talk"); + + IrcFirehoseFactory factory = new IrcFirehoseFactory( + UUID.randomUUID().toString(), + "irc.wikimedia.org", + Lists.newArrayList( + "#en.wikipedia", + "#fr.wikipedia", + "#de.wikipedia", + "#ja.wikipedia" + ), + new WikipediaIrcDecoder(namespaces) + ); + + Firehose f = factory.connect(); + while(f.hasMore()) { + System.out.println(f.nextRow()); + } + } +} + diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java new file mode 100644 index 00000000000..e7faebaa912 --- /dev/null +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java @@ -0,0 +1,132 @@ +package com.metamx.druid.realtime.firehose; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.metamx.druid.input.InputRow; +import org.joda.time.DateTime; + +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +class WikipediaIrcDecoder implements IrcDecoder +{ + static final Pattern pattern = Pattern.compile( + "\\x0314\\[\\[\\x0307(.+?)\\x0314\\]\\]\\x034 (.*?)\\x0310.*\\x0302(http.+?)\\x03.+\\x0303(.+?)\\x03.+\\x03 (\\(([+-]\\d+)\\).*|.+) \\x0310(.+)\\x03" + ); + static final Pattern ipPattern = Pattern.compile("\\d+.\\d+.\\d+.\\d+"); + static final Pattern shortnamePattern = Pattern.compile("#(\\w\\w)\\..*"); + + static final List dimensionList = Lists.newArrayList( + "page", + "language", + "user", + "unpatrolled", + "newPage", + "robot", + "anonymous", + "namespace", + "added", + "delete", + "delta" + ); + + final Map namespaces; + + @JsonCreator + public WikipediaIrcDecoder(@JsonProperty Map namespaces) + { + if(namespaces == null) namespaces = Maps.newHashMap(); + this.namespaces = namespaces; + } + + @Override + public InputRow decodeMessage(final DateTime timestamp, String channel, String msg) + { + final Map dimensions = Maps.newHashMap(); + final Map metrics = Maps.newHashMap(); + + Matcher m = pattern.matcher(msg); + if(!m.matches()) { + throw new IllegalArgumentException("Invalid input format"); + } + + Matcher shortname = shortnamePattern.matcher(channel); + if(shortname.matches()) { + dimensions.put("language", shortname.group(1)); + } + + String page = m.group(1); + String pageUrl = page.replaceAll("\\s", "_"); + + dimensions.put("page", pageUrl); + + String user = m.group(4); + boolean anonymous = ipPattern.matcher(user).matches(); + dimensions.put("user", user); + + final String flags = m.group(2); + dimensions.put("unpatrolled", Boolean.toString(flags.contains("!"))); + dimensions.put("newPage", Boolean.toString(flags.contains("N"))); + dimensions.put("robot", Boolean.toString(flags.contains("B"))); + + dimensions.put("anonymous", Boolean.toString(anonymous)); + + String[] parts = page.split(":"); + if(parts.length > 1 && !parts[1].startsWith(" ")) { + if(namespaces.containsKey(parts[0])) { + dimensions.put("namespace", namespaces.get(parts[0])); + } else { + dimensions.put("namespace", "wikipedia"); + } + } + else { + dimensions.put("namespace", "article"); + } + + float delta = m.group(6) != null ? Float.parseFloat(m.group(6)) : 0; + metrics.put("delta", delta); + metrics.put("added", Math.max(delta, 0)); + metrics.put("deleted", Math.min(delta, 0)); + + return new InputRow() + { + @Override + public List getDimensions() + { + return dimensionList; + } + + @Override + public long getTimestampFromEpoch() + { + return timestamp.getMillis(); + } + + @Override + public List getDimension(String dimension) + { + return Lists.newArrayList(dimensions.get(dimension)); + } + + @Override + public float getFloatMetric(String metric) + { + return metrics.get(metric); + } + + @Override + public String toString() + { + return "WikipediaRow{" + + "timestamp=" + timestamp + + ", dimensions=" + dimensions + + ", metrics=" + metrics + + '}'; + } + }; + } +} From 1cc1d0be5f801646f951dd1b3eaf820cb4318e40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 13 Aug 2013 19:05:56 -0700 Subject: [PATCH 02/20] remove example code --- .../realtime/firehose/IrcFirehoseFactory.java | 75 +++++++++---------- 1 file changed, 34 insertions(+), 41 deletions(-) diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java index 6c0a5ff0756..b9b4bb57247 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java @@ -19,11 +19,43 @@ import org.joda.time.DateTime; import java.io.IOException; import java.util.List; -import java.util.Map; -import java.util.UUID; import java.util.concurrent.LinkedBlockingQueue; +/** + * Example usage + *
+    Map namespaces = Maps.newHashMap();
+    namespaces.put("", "main");
+    namespaces.put("Category", "category");
+    namespaces.put("Media", "media");
+    namespaces.put("MediaWiki", "mediawiki");
+    namespaces.put("Template", "template");
+    namespaces.put("$1 talk", "project talk");
+    namespaces.put("Help talk", "help talk");
+    namespaces.put("User", "user");
+    namespaces.put("Template talk", "template talk");
+    namespaces.put("MediaWiki talk", "mediawiki talk");
+    namespaces.put("Talk", "talk");
+    namespaces.put("Help", "help");
+    namespaces.put("File talk", "file talk");
+    namespaces.put("File", "file");
+    namespaces.put("User talk", "user talk");
+    namespaces.put("Special", "special");
+    namespaces.put("Category talk", "category talk");
 
+    IrcFirehoseFactory factory = new IrcFirehoseFactory(
+        "wiki-druid-123",
+        "irc.wikimedia.org",
+        Lists.newArrayList(
+            "#en.wikipedia",
+            "#fr.wikipedia",
+            "#de.wikipedia",
+            "#ja.wikipedia"
+        ),
+        new WikipediaIrcDecoder(namespaces)
+    );
+   
+ */ public class IrcFirehoseFactory implements FirehoseFactory { private static final Logger log = new Logger(IrcFirehoseFactory.class); @@ -174,44 +206,5 @@ public class IrcFirehoseFactory implements FirehoseFactory } }; } - - public static void main(String[] args) throws IOException { - - Map namespaces = Maps.newHashMap(); - namespaces.put("", "main"); - namespaces.put("Category", "category"); - namespaces.put("Media", "media"); - namespaces.put("MediaWiki", "mediawiki"); - namespaces.put("Template", "template"); - namespaces.put("$1 talk", "project talk"); - namespaces.put("Help talk", "help talk"); - namespaces.put("User", "user"); - namespaces.put("Template talk", "template talk"); - namespaces.put("MediaWiki talk", "mediawiki talk"); - namespaces.put("Talk", "talk"); - namespaces.put("Help", "help"); - namespaces.put("File talk", "file talk"); - namespaces.put("File", "file"); - namespaces.put("User talk", "user talk"); - namespaces.put("Special", "special"); - namespaces.put("Category talk", "category talk"); - - IrcFirehoseFactory factory = new IrcFirehoseFactory( - UUID.randomUUID().toString(), - "irc.wikimedia.org", - Lists.newArrayList( - "#en.wikipedia", - "#fr.wikipedia", - "#de.wikipedia", - "#ja.wikipedia" - ), - new WikipediaIrcDecoder(namespaces) - ); - - Firehose f = factory.connect(); - while(f.hasMore()) { - System.out.println(f.nextRow()); - } - } } From 6735ae4ecd8a29c0b55776270e7d301a323df6c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Wed, 14 Aug 2013 14:28:29 -0700 Subject: [PATCH 03/20] add ip geo lookup --- pom.xml | 14 ++-- realtime/pom.xml | 5 ++ .../realtime/firehose/IrcFirehoseFactory.java | 3 +- .../firehose/WikipediaIrcDecoder.java | 76 +++++++++++++++++-- 4 files changed, 84 insertions(+), 14 deletions(-) diff --git a/pom.xml b/pom.xml index e81fb297fb8..b2f2c922af7 100644 --- a/pom.xml +++ b/pom.xml @@ -182,37 +182,37 @@ com.fasterxml.jackson.core jackson-annotations - 2.1.4 + 2.2.2 com.fasterxml.jackson.core jackson-core - 2.1.4 + 2.2.2 com.fasterxml.jackson.core jackson-databind - 2.1.4 + 2.2.2 com.fasterxml.jackson.datatype jackson-datatype-guava - 2.1.2 + 2.2.2 com.fasterxml.jackson.datatype jackson-datatype-joda - 2.1.2 + 2.2.2 com.fasterxml.jackson.dataformat jackson-dataformat-smile - 2.1.4 + 2.2.2 com.fasterxml.jackson.jaxrs jackson-jaxrs-json-provider - 2.1.4 + 2.2.2 javax.inject diff --git a/realtime/pom.xml b/realtime/pom.xml index 5ebde38abac..b4f81e06595 100644 --- a/realtime/pom.xml +++ b/realtime/pom.xml @@ -151,6 +151,11 @@ com.ircclouds.irc irc-api + + com.maxmind.geoip2 + geoip2 + 0.4.0 + diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java index b9b4bb57247..d7102c46a39 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java @@ -3,7 +3,6 @@ package com.metamx.druid.realtime.firehose; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.ircclouds.irc.api.Callback; import com.ircclouds.irc.api.IRCApi; import com.ircclouds.irc.api.IRCApiImpl; @@ -90,7 +89,7 @@ public class IrcFirehoseFactory implements FirehoseFactory public void onChannelMessage(ChannelPrivMsg aMsg) { try { - queue.put(Pair.of(new DateTime(), aMsg)); + queue.put(Pair.of(DateTime.now(), aMsg)); } catch(InterruptedException e) { throw new RuntimeException("interrupted adding message to queue", e); } diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java index e7faebaa912..a11937cd06a 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java @@ -4,16 +4,31 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.maxmind.geoip2.DatabaseReader; +import com.maxmind.geoip2.exception.GeoIp2Exception; +import com.maxmind.geoip2.model.Omni; +import com.metamx.common.logger.Logger; import com.metamx.druid.input.InputRow; +import org.apache.commons.io.FileUtils; import org.joda.time.DateTime; +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.net.URL; +import java.net.UnknownHostException; import java.util.List; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.zip.GZIPInputStream; class WikipediaIrcDecoder implements IrcDecoder { + static final Logger log = new Logger(WikipediaIrcDecoder.class); + + final DatabaseReader geoLookup; + static final Pattern pattern = Pattern.compile( "\\x0314\\[\\[\\x0307(.+?)\\x0314\\]\\]\\x034 (.*?)\\x0310.*\\x0302(http.+?)\\x03.+\\x0303(.+?)\\x03.+\\x03 (\\(([+-]\\d+)\\).*|.+) \\x0310(.+)\\x03" ); @@ -29,18 +44,51 @@ class WikipediaIrcDecoder implements IrcDecoder "robot", "anonymous", "namespace", - "added", - "delete", - "delta" + "continent", + "country", + "region", + "city" ); final Map namespaces; + public WikipediaIrcDecoder(Map namespaces) { + this(namespaces, null); + } + @JsonCreator - public WikipediaIrcDecoder(@JsonProperty Map namespaces) + public WikipediaIrcDecoder(@JsonProperty Map namespaces, + @JsonProperty String geoDbFile) { if(namespaces == null) namespaces = Maps.newHashMap(); this.namespaces = namespaces; + + + File geoDb; + if(geoDbFile != null) { + geoDb = new File(geoDbFile); + } else { + try { + geoDb = File.createTempFile("geoip", null); + geoDb.deleteOnExit(); + + log.info("Downloading geo ip database to [%s]", geoDb); + + FileUtils.copyInputStreamToFile( + new GZIPInputStream( + new URL("http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.mmdb.gz").openStream() + ), + geoDb + ); + } catch(IOException e) { + throw new RuntimeException("Unable to download geo ip database [%s]", e); + } + } + try { + geoLookup = new DatabaseReader(geoDb); + } catch(IOException e) { + throw new RuntimeException("Unable to open geo ip lookup database", e); + } } @Override @@ -65,7 +113,25 @@ class WikipediaIrcDecoder implements IrcDecoder dimensions.put("page", pageUrl); String user = m.group(4); - boolean anonymous = ipPattern.matcher(user).matches(); + Matcher ipMatch = ipPattern.matcher(user); + boolean anonymous = ipMatch.matches(); + if(anonymous) { + try { + final InetAddress ip = InetAddress.getByName(ipMatch.group()); + final Omni lookup = geoLookup.omni(ip); + + dimensions.put("continent", lookup.getContinent().getName()); + dimensions.put("country", lookup.getCountry().getName()); + dimensions.put("region", lookup.getMostSpecificSubdivision().getName()); + dimensions.put("city", lookup.getCity().getName()); + } catch(UnknownHostException e) { + log.error(e, "invalid ip [%s]", ipMatch.group()); + } catch(IOException e) { + log.error(e, "error looking up geo ip"); + } catch(GeoIp2Exception e) { + log.error(e, "error looking up geo ip"); + } + } dimensions.put("user", user); final String flags = m.group(2); From 70b3640b852a1a22798a6819711fb560d329ca6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Wed, 14 Aug 2013 14:31:55 -0700 Subject: [PATCH 04/20] move dependency version into main pom --- pom.xml | 5 +++++ realtime/pom.xml | 1 - 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index b2f2c922af7..d36f7ca9605 100644 --- a/pom.xml +++ b/pom.xml @@ -364,6 +364,11 @@ irc-api 1.0-0011 + + com.maxmind.geoip2 + geoip2 + 0.4.0 + diff --git a/realtime/pom.xml b/realtime/pom.xml index b4f81e06595..722697a1f8a 100644 --- a/realtime/pom.xml +++ b/realtime/pom.xml @@ -154,7 +154,6 @@ com.maxmind.geoip2 geoip2 - 0.4.0 From 3f33ca15b057a5771b64ceda6e35fc7125dfa797 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Thu, 15 Aug 2013 10:58:03 -0700 Subject: [PATCH 05/20] refactor namespaces and add docs / license --- .../realtime/firehose/IrcFirehoseFactory.java | 99 +++++++++++++------ .../firehose/WikipediaIrcDecoder.java | 36 +++++-- 2 files changed, 95 insertions(+), 40 deletions(-) diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java index d7102c46a39..6a51be2518a 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.realtime.firehose; import com.fasterxml.jackson.annotation.JsonCreator; @@ -16,44 +35,60 @@ import com.metamx.common.logger.Logger; import com.metamx.druid.input.InputRow; import org.joda.time.DateTime; +import java.io.File; import java.io.IOException; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; /** * Example usage - *
-    Map namespaces = Maps.newHashMap();
-    namespaces.put("", "main");
-    namespaces.put("Category", "category");
-    namespaces.put("Media", "media");
-    namespaces.put("MediaWiki", "mediawiki");
-    namespaces.put("Template", "template");
-    namespaces.put("$1 talk", "project talk");
-    namespaces.put("Help talk", "help talk");
-    namespaces.put("User", "user");
-    namespaces.put("Template talk", "template talk");
-    namespaces.put("MediaWiki talk", "mediawiki talk");
-    namespaces.put("Talk", "talk");
-    namespaces.put("Help", "help");
-    namespaces.put("File talk", "file talk");
-    namespaces.put("File", "file");
-    namespaces.put("User talk", "user talk");
-    namespaces.put("Special", "special");
-    namespaces.put("Category talk", "category talk");
-
-    IrcFirehoseFactory factory = new IrcFirehoseFactory(
-        "wiki-druid-123",
-        "irc.wikimedia.org",
-        Lists.newArrayList(
-            "#en.wikipedia",
-            "#fr.wikipedia",
-            "#de.wikipedia",
-            "#ja.wikipedia"
-        ),
-        new WikipediaIrcDecoder(namespaces)
-    );
-   
+ * + * Decoder definition wikipedia-decoder.json + *
{@code
+ * {
+ *   "type": "wikipedia",
+ *   "namespaces": {
+ *     "#en.wikipedia": {
+ *       "": "main",
+ *       "Category": "category",
+ *       "Template talk": "template talk",
+ *       "Help talk": "help talk",
+ *       "Media": "media",
+ *       "MediaWiki talk": "mediawiki talk",
+ *       "File talk": "file talk",
+ *       "MediaWiki": "mediawiki",
+ *       "User": "user",
+ *       "File": "file",
+ *       "User talk": "user talk",
+ *       "Template": "template",
+ *       "Help": "help",
+ *       "Special": "special",
+ *       "Talk": "talk",
+ *       "Category talk": "category talk"
+ *     }
+ *   },
+ *   "geoIpDatabase": "path/to/GeoLite2-City.mmdb"
+ * }
+ * }
+ * + *
{@code
+ * IrcDecoder wikipediaDecoder = new ObjectMapper().readValue(
+ *   new File("wikipedia-decoder.json"),
+ *   IrcDecoder.class
+ * );
+ *
+ * IrcFirehoseFactory factory = new IrcFirehoseFactory(
+ *     "wiki123",
+ *     "irc.wikimedia.org",
+ *     Lists.newArrayList(
+ *         "#en.wikipedia",
+ *         "#fr.wikipedia",
+ *         "#de.wikipedia",
+ *         "#ja.wikipedia"
+ *     ),
+ *     wikipediaDecoder
+ * );
+ * }
*/ public class IrcFirehoseFactory implements FirehoseFactory { diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java index a11937cd06a..4d1c15ce125 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.realtime.firehose; import com.fasterxml.jackson.annotation.JsonCreator; @@ -50,23 +69,23 @@ class WikipediaIrcDecoder implements IrcDecoder "city" ); - final Map namespaces; + final Map> namespaces; - public WikipediaIrcDecoder(Map namespaces) { + public WikipediaIrcDecoder( Map> namespaces) { this(namespaces, null); } @JsonCreator - public WikipediaIrcDecoder(@JsonProperty Map namespaces, - @JsonProperty String geoDbFile) + public WikipediaIrcDecoder(@JsonProperty("namespaces") Map> namespaces, + @JsonProperty("geoIpDatabase") String geoIpDatabase) { if(namespaces == null) namespaces = Maps.newHashMap(); this.namespaces = namespaces; File geoDb; - if(geoDbFile != null) { - geoDb = new File(geoDbFile); + if(geoIpDatabase != null) { + geoDb = new File(geoIpDatabase); } else { try { geoDb = File.createTempFile("geoip", null); @@ -143,8 +162,9 @@ class WikipediaIrcDecoder implements IrcDecoder String[] parts = page.split(":"); if(parts.length > 1 && !parts[1].startsWith(" ")) { - if(namespaces.containsKey(parts[0])) { - dimensions.put("namespace", namespaces.get(parts[0])); + Map channelNamespaces = namespaces.get(channel); + if(channelNamespaces != null && channelNamespaces.containsKey(parts[0])) { + dimensions.put("namespace", channelNamespaces.get(parts[0])); } else { dimensions.put("namespace", "wikipedia"); } From 071e8cb2f33732bef17aec9ac39cadf7d4a04f9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Thu, 15 Aug 2013 11:07:55 -0700 Subject: [PATCH 06/20] updated javadocs --- .../metamx/druid/realtime/firehose/IrcFirehoseFactory.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java index 6a51be2518a..aa9f47c8c81 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java @@ -41,10 +41,11 @@ import java.util.List; import java.util.concurrent.LinkedBlockingQueue; /** - * Example usage + *

Example Usage

* - * Decoder definition wikipedia-decoder.json + *

Decoder definition: wikipedia-decoder.json

*
{@code
+ *
  * {
  *   "type": "wikipedia",
  *   "namespaces": {
@@ -71,6 +72,7 @@ import java.util.concurrent.LinkedBlockingQueue;
  * }
  * }
* + *

Example code:

*
{@code
  * IrcDecoder wikipediaDecoder = new ObjectMapper().readValue(
  *   new File("wikipedia-decoder.json"),

From 9c145f5ce3483c68e24fa7ae92bc28a219c4c1c1 Mon Sep 17 00:00:00 2001
From: fjy 
Date: Mon, 19 Aug 2013 15:00:10 -0700
Subject: [PATCH 07/20] basic unit tets for realtime functinality

---
 .../druid/curator/announcement/Announcer.java |   9 +-
 .../druid/partition/PartitionHolder.java      |  33 ++-
 .../druid/realtime/DbSegmentPublisher.java    |  19 ++
 .../realtime/DbSegmentPublisherConfig.java    |  19 ++
 .../druid/realtime/RealtimeManager.java       |  11 +-
 .../druid/realtime/SegmentPublisher.java      |  19 ++
 .../IntervalStartVersioningPolicy.java        |  19 ++
 .../MessageTimeRejectionPolicyFactory.java    |  19 ++
 .../plumber/NoopRejectionPolicyFactory.java   |  19 ++
 .../realtime/plumber/RejectionPolicy.java     |  19 ++
 .../ServerTimeRejectionPolicyFactory.java     |  19 ++
 .../metamx/druid/realtime/plumber/Sink.java   |   5 +
 .../realtime/plumber/VersioningPolicy.java    |  19 ++
 .../druid/realtime/RealtimeManagerTest.java   | 252 ++++++++++++++++++
 .../IntervalStartVersioningPolicyTest.java    |  37 +++
 ...MessageTimeRejectionPolicyFactoryTest.java |  40 +++
 .../plumber/RealtimePlumberSchoolTest.java    | 162 +++++++++++
 .../ServerTimeRejectionPolicyFactoryTest.java |  41 +++
 .../druid/realtime/plumber/SinkTest.java      | 124 +++++++++
 19 files changed, 865 insertions(+), 20 deletions(-)
 create mode 100644 realtime/src/test/java/com/metamx/druid/realtime/RealtimeManagerTest.java
 create mode 100644 realtime/src/test/java/com/metamx/druid/realtime/plumber/IntervalStartVersioningPolicyTest.java
 create mode 100644 realtime/src/test/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactoryTest.java
 create mode 100644 realtime/src/test/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchoolTest.java
 create mode 100644 realtime/src/test/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactoryTest.java
 create mode 100644 realtime/src/test/java/com/metamx/druid/realtime/plumber/SinkTest.java

diff --git a/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java b/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java
index 727f7704771..abb96b76f68 100644
--- a/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java
+++ b/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java
@@ -320,12 +320,9 @@ public class Announcer
 
     final ConcurrentMap subPaths = announcements.get(parentPath);
 
-    if (subPaths == null) {
-      throw new IAE("Path[%s] not announced, cannot unannounce.", path);
-    }
-
-    if (subPaths.remove(pathAndNode.getNode()) == null) {
-      throw new IAE("Path[%s] not announced, cannot unannounce.", path);
+    if (subPaths == null || subPaths.remove(pathAndNode.getNode()) == null) {
+      log.error("Path[%s] not announced, cannot unannounce.", path);
+      return;
     }
 
     try {
diff --git a/common/src/main/java/com/metamx/druid/partition/PartitionHolder.java b/common/src/main/java/com/metamx/druid/partition/PartitionHolder.java
index 79d1b5dd96e..be40ce9a63e 100644
--- a/common/src/main/java/com/metamx/druid/partition/PartitionHolder.java
+++ b/common/src/main/java/com/metamx/druid/partition/PartitionHolder.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Sets;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.SortedSet;
 import java.util.TreeSet;
 
 /**
@@ -68,11 +69,16 @@ public class PartitionHolder implements Iterable>
 
   public PartitionChunk remove(PartitionChunk chunk)
   {
-    // Somewhat funky implementation in order to return the removed object as it exists in the set
-    PartitionChunk element = holderSet.tailSet(chunk, true).first();
-    if (chunk.equals(element)) {
-      holderSet.remove(element);
-      return element;
+    if (!holderSet.isEmpty()) {
+      // Somewhat funky implementation in order to return the removed object as it exists in the set
+      SortedSet> tailSet = holderSet.tailSet(chunk, true);
+      if (!tailSet.isEmpty()) {
+        PartitionChunk element = tailSet.first();
+        if (chunk.equals(element)) {
+          holderSet.remove(element);
+          return element;
+        }
+      }
     }
     return null;
   }
@@ -110,16 +116,17 @@ public class PartitionHolder implements Iterable>
     return true;
   }
 
-  public PartitionChunk getChunk(final int partitionNum) {
+  public PartitionChunk getChunk(final int partitionNum)
+  {
     final Iterator> retVal = Iterators.filter(
         holderSet.iterator(), new Predicate>()
-        {
-          @Override
-          public boolean apply(PartitionChunk input)
-          {
-            return input.getChunkNumber() == partitionNum;
-          }
-        }
+    {
+      @Override
+      public boolean apply(PartitionChunk input)
+      {
+        return input.getChunkNumber() == partitionNum;
+      }
+    }
     );
 
     return retVal.hasNext() ? retVal.next() : null;
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisher.java b/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisher.java
index c60723ece50..5af94da2fb3 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisher.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisher.java
@@ -1,3 +1,22 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
 package com.metamx.druid.realtime;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisherConfig.java b/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisherConfig.java
index 5dcaccac49b..f174fd0a85f 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisherConfig.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisherConfig.java
@@ -1,3 +1,22 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
 package com.metamx.druid.realtime;
 
 import org.skife.config.Config;
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java
index 55c8af27dcd..dc7bdc7960e 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java
@@ -94,6 +94,10 @@ public class RealtimeManager implements QuerySegmentWalker
       Closeables.closeQuietly(chief);
     }
   }
+  public FireDepartmentMetrics getMetrics(String datasource)
+  {
+    return chiefs.get(datasource).getMetrics();
+  }
 
   @Override
   public  QueryRunner getQueryRunnerForIntervals(Query query, Iterable intervals)
@@ -149,6 +153,11 @@ public class RealtimeManager implements QuerySegmentWalker
       }
     }
 
+    public FireDepartmentMetrics getMetrics()
+    {
+      return metrics;
+    }
+
     @Override
     public void run()
     {
@@ -186,11 +195,11 @@ public class RealtimeManager implements QuerySegmentWalker
             }
 
             int currCount = sink.add(inputRow);
-            metrics.incrementProcessed();
             if (currCount >= config.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) {
               plumber.persist(firehose.commit());
               nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
             }
+            metrics.incrementProcessed();
           }
           catch (FormattedException e) {
             log.info(e, "unparseable line: %s", e.getDetails());
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/SegmentPublisher.java b/realtime/src/main/java/com/metamx/druid/realtime/SegmentPublisher.java
index 48315849921..a381e5ab6a3 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/SegmentPublisher.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/SegmentPublisher.java
@@ -1,3 +1,22 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
 package com.metamx.druid.realtime;
 
 import com.metamx.druid.client.DataSegment;
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/IntervalStartVersioningPolicy.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/IntervalStartVersioningPolicy.java
index 4ad3f123299..c052deeec98 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/IntervalStartVersioningPolicy.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/IntervalStartVersioningPolicy.java
@@ -1,3 +1,22 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
 package com.metamx.druid.realtime.plumber;
 
 import org.joda.time.Interval;
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactory.java
index 117fa6a40eb..57ba07a76cc 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactory.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactory.java
@@ -1,3 +1,22 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
 package com.metamx.druid.realtime.plumber;
 
 import org.joda.time.DateTime;
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/NoopRejectionPolicyFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/NoopRejectionPolicyFactory.java
index 59a3e24cb21..27bef8b8020 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/NoopRejectionPolicyFactory.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/NoopRejectionPolicyFactory.java
@@ -1,3 +1,22 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
 package com.metamx.druid.realtime.plumber;
 
 import org.joda.time.DateTime;
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RejectionPolicy.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RejectionPolicy.java
index 847c917dc35..61f4308a15a 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RejectionPolicy.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RejectionPolicy.java
@@ -1,3 +1,22 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
 package com.metamx.druid.realtime.plumber;
 
 import org.joda.time.DateTime;
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactory.java
index 3557a8ba3bc..b97700699d4 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactory.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactory.java
@@ -1,3 +1,22 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
 package com.metamx.druid.realtime.plumber;
 
 import org.joda.time.DateTime;
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/Sink.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/Sink.java
index a1823b6c09a..bc0bc194f99 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/Sink.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/Sink.java
@@ -100,6 +100,11 @@ public class Sink implements Iterable
     return interval;
   }
 
+  public FireHydrant getCurrIndex()
+  {
+    return currIndex;
+  }
+
   public int add(InputRow row)
   {
     if (currIndex == null) {
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/VersioningPolicy.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/VersioningPolicy.java
index 5fe790dd284..36ab3830f6e 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/VersioningPolicy.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/VersioningPolicy.java
@@ -1,3 +1,22 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
 package com.metamx.druid.realtime.plumber;
 
 import com.fasterxml.jackson.annotation.JsonSubTypes;
diff --git a/realtime/src/test/java/com/metamx/druid/realtime/RealtimeManagerTest.java b/realtime/src/test/java/com/metamx/druid/realtime/RealtimeManagerTest.java
new file mode 100644
index 00000000000..1203a0e6ba4
--- /dev/null
+++ b/realtime/src/test/java/com/metamx/druid/realtime/RealtimeManagerTest.java
@@ -0,0 +1,252 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package com.metamx.druid.realtime;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.metamx.common.ISE;
+import com.metamx.druid.Query;
+import com.metamx.druid.QueryGranularity;
+import com.metamx.druid.aggregation.AggregatorFactory;
+import com.metamx.druid.aggregation.CountAggregatorFactory;
+import com.metamx.druid.guava.Runnables;
+import com.metamx.druid.index.v1.SpatialDimensionSchema;
+import com.metamx.druid.input.InputRow;
+import com.metamx.druid.query.QueryRunner;
+import com.metamx.druid.realtime.firehose.Firehose;
+import com.metamx.druid.realtime.firehose.FirehoseFactory;
+import com.metamx.druid.realtime.plumber.Plumber;
+import com.metamx.druid.realtime.plumber.PlumberSchool;
+import com.metamx.druid.realtime.plumber.Sink;
+import com.metamx.druid.shard.NoneShardSpec;
+import junit.framework.Assert;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ */
+public class RealtimeManagerTest
+{
+  private RealtimeManager realtimeManager;
+  private Schema schema;
+  private TestPlumber plumber;
+
+  @Before
+  public void setUp() throws Exception
+  {
+    schema = new Schema(
+        "test",
+        Lists.newArrayList(),
+        new AggregatorFactory[]{new CountAggregatorFactory("rows")},
+        QueryGranularity.NONE,
+        new NoneShardSpec()
+    );
+
+    final List rows = Arrays.asList(
+        makeRow(new DateTime("9000-01-01").getMillis()), makeRow(new DateTime().getMillis())
+    );
+
+    plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, new DateTime().toString()));
+
+    realtimeManager = new RealtimeManager(
+        Arrays.asList(
+            new FireDepartment(
+                schema,
+                new FireDepartmentConfig(1, new Period("P1Y")),
+                new FirehoseFactory()
+                {
+                  @Override
+                  public Firehose connect() throws IOException
+                  {
+                    return new TestFirehose(rows.iterator());
+                  }
+                },
+                new PlumberSchool()
+                {
+                  @Override
+                  public Plumber findPlumber(
+                      Schema schema, FireDepartmentMetrics metrics
+                  )
+                  {
+                    return plumber;
+                  }
+                }
+            )
+        ),
+        null
+    );
+  }
+
+  @Test
+  public void testRun() throws Exception
+  {
+    realtimeManager.start();
+
+    Stopwatch stopwatch = new Stopwatch().start();
+    while (realtimeManager.getMetrics("test").processed() != 1) {
+      Thread.sleep(100);
+      if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
+        throw new ISE("Realtime manager should have completed processing 2 events!");
+      }
+    }
+
+    Assert.assertEquals(1, realtimeManager.getMetrics("test").processed());
+    Assert.assertEquals(1, realtimeManager.getMetrics("test").thrownAway());
+    Assert.assertTrue(plumber.isStartedJob());
+    Assert.assertTrue(plumber.isFinishedJob());
+    Assert.assertEquals(1, plumber.getPersistCount());
+  }
+
+  private InputRow makeRow(final long timestamp)
+  {
+    return new InputRow()
+    {
+      @Override
+      public List getDimensions()
+      {
+        return Arrays.asList("testDim");
+      }
+
+      @Override
+      public long getTimestampFromEpoch()
+      {
+        return timestamp;
+      }
+
+      @Override
+      public List getDimension(String dimension)
+      {
+        return Lists.newArrayList();
+      }
+
+      @Override
+      public float getFloatMetric(String metric)
+      {
+        return 0;
+      }
+    };
+  }
+
+
+  private static class TestFirehose implements Firehose
+  {
+    private final Iterator rows;
+
+    private TestFirehose(Iterator rows)
+    {
+      this.rows = rows;
+    }
+
+    @Override
+    public boolean hasMore()
+    {
+      return rows.hasNext();
+    }
+
+    @Override
+    public InputRow nextRow()
+    {
+      return rows.next();
+    }
+
+    @Override
+    public Runnable commit()
+    {
+      return Runnables.getNoopRunnable();
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+    }
+  }
+
+  private static class TestPlumber implements Plumber
+  {
+    private final Sink sink;
+
+
+    private volatile boolean startedJob = false;
+    private volatile boolean finishedJob = false;
+    private volatile int persistCount = 0;
+
+    private TestPlumber(Sink sink)
+    {
+      this.sink = sink;
+    }
+
+    private boolean isStartedJob()
+    {
+      return startedJob;
+    }
+
+    private boolean isFinishedJob()
+    {
+      return finishedJob;
+    }
+
+    private int getPersistCount()
+    {
+      return persistCount;
+    }
+
+    @Override
+    public void startJob()
+    {
+      startedJob = true;
+    }
+
+    @Override
+    public Sink getSink(long timestamp)
+    {
+      if (sink.getInterval().contains(timestamp)) {
+        return sink;
+      }
+      return null;
+    }
+
+    @Override
+    public  QueryRunner getQueryRunner(Query query)
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void persist(Runnable commitRunnable)
+    {
+      persistCount++;
+    }
+
+    @Override
+    public void finishJob()
+    {
+      finishedJob = true;
+    }
+  }
+}
diff --git a/realtime/src/test/java/com/metamx/druid/realtime/plumber/IntervalStartVersioningPolicyTest.java b/realtime/src/test/java/com/metamx/druid/realtime/plumber/IntervalStartVersioningPolicyTest.java
new file mode 100644
index 00000000000..dcf92019428
--- /dev/null
+++ b/realtime/src/test/java/com/metamx/druid/realtime/plumber/IntervalStartVersioningPolicyTest.java
@@ -0,0 +1,37 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package com.metamx.druid.realtime.plumber;
+
+import junit.framework.Assert;
+import org.joda.time.Interval;
+import org.junit.Test;
+
+/**
+ */
+public class IntervalStartVersioningPolicyTest
+{
+  @Test
+  public void testGetVersion() throws Exception
+  {
+    IntervalStartVersioningPolicy policy = new IntervalStartVersioningPolicy();
+    String version = policy.getVersion(new Interval("2013-01-01/2013-01-02"));
+    Assert.assertEquals("2013-01-01T00:00:00.000Z", version);
+  }
+}
diff --git a/realtime/src/test/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactoryTest.java b/realtime/src/test/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactoryTest.java
new file mode 100644
index 00000000000..c113cbf80ca
--- /dev/null
+++ b/realtime/src/test/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactoryTest.java
@@ -0,0 +1,40 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package com.metamx.druid.realtime.plumber;
+
+import junit.framework.Assert;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+import org.junit.Test;
+
+/**
+ */
+public class MessageTimeRejectionPolicyFactoryTest
+{
+  @Test
+  public void testAccept() throws Exception
+  {
+    RejectionPolicy rejectionPolicy = new MessageTimeRejectionPolicyFactory().create(
+        new Period("PT10M")
+    );
+
+    Assert.assertTrue(rejectionPolicy.accept(new DateTime().getMillis()));
+  }
+}
diff --git a/realtime/src/test/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchoolTest.java b/realtime/src/test/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchoolTest.java
new file mode 100644
index 00000000000..a93308c4fa6
--- /dev/null
+++ b/realtime/src/test/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchoolTest.java
@@ -0,0 +1,162 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package com.metamx.druid.realtime.plumber;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import com.metamx.common.ISE;
+import com.metamx.druid.Query;
+import com.metamx.druid.QueryGranularity;
+import com.metamx.druid.aggregation.AggregatorFactory;
+import com.metamx.druid.aggregation.CountAggregatorFactory;
+import com.metamx.druid.client.DataSegment;
+import com.metamx.druid.client.ServerView;
+import com.metamx.druid.coordination.DataSegmentAnnouncer;
+import com.metamx.druid.index.v1.IndexGranularity;
+import com.metamx.druid.index.v1.SpatialDimensionSchema;
+import com.metamx.druid.loading.DataSegmentPusher;
+import com.metamx.druid.query.DefaultQueryRunnerFactoryConglomerate;
+import com.metamx.druid.query.QueryRunnerFactory;
+import com.metamx.druid.realtime.FireDepartmentMetrics;
+import com.metamx.druid.realtime.Schema;
+import com.metamx.druid.realtime.SegmentPublisher;
+import com.metamx.druid.shard.NoneShardSpec;
+import com.metamx.emitter.service.ServiceEmitter;
+import junit.framework.Assert;
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.easymock.EasyMock;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ */
+public class RealtimePlumberSchoolTest
+{
+  private Plumber plumber;
+
+  private DataSegmentAnnouncer announcer;
+  private SegmentPublisher segmentPublisher;
+  private DataSegmentPusher dataSegmentPusher;
+  private ServerView serverView;
+  private ServiceEmitter emitter;
+
+  @Before
+  public void setUp() throws Exception
+  {
+
+    final File tmpDir = Files.createTempDir();
+    tmpDir.deleteOnExit();
+
+    final Schema schema = new Schema(
+        "test",
+        Lists.newArrayList(),
+        new AggregatorFactory[]{new CountAggregatorFactory("rows")},
+        QueryGranularity.NONE,
+        new NoneShardSpec()
+    );
+
+    RealtimePlumberSchool realtimePlumberSchool = new RealtimePlumberSchool(
+        new Period("PT10m"),
+        tmpDir,
+        IndexGranularity.HOUR
+    );
+
+    announcer = EasyMock.createMock(DataSegmentAnnouncer.class);
+    announcer.announceSegment(EasyMock.anyObject());
+    EasyMock.expectLastCall().anyTimes();
+
+    segmentPublisher = EasyMock.createMock(SegmentPublisher.class);
+    dataSegmentPusher = EasyMock.createMock(DataSegmentPusher.class);
+
+    serverView = EasyMock.createMock(ServerView.class);
+    serverView.registerSegmentCallback(
+        EasyMock.anyObject(),
+        EasyMock.anyObject()
+    );
+    EasyMock.expectLastCall().anyTimes();
+
+    emitter = EasyMock.createMock(ServiceEmitter.class);
+
+    EasyMock.replay(announcer, segmentPublisher, dataSegmentPusher, serverView, emitter);
+
+    realtimePlumberSchool.setConglomerate(new DefaultQueryRunnerFactoryConglomerate(Maps., QueryRunnerFactory>newHashMap()));
+    realtimePlumberSchool.setSegmentAnnouncer(announcer);
+    realtimePlumberSchool.setSegmentPublisher(segmentPublisher);
+    realtimePlumberSchool.setRejectionPolicyFactory(new NoopRejectionPolicyFactory());
+    realtimePlumberSchool.setVersioningPolicy(new IntervalStartVersioningPolicy());
+    realtimePlumberSchool.setDataSegmentPusher(dataSegmentPusher);
+    realtimePlumberSchool.setServerView(serverView);
+    realtimePlumberSchool.setServiceEmitter(emitter);
+
+    plumber = realtimePlumberSchool.findPlumber(schema, new FireDepartmentMetrics());
+  }
+
+  @After
+  public void tearDown() throws Exception
+  {
+    EasyMock.verify(announcer, segmentPublisher, dataSegmentPusher, serverView, emitter);
+  }
+
+  @Test
+  public void testGetSink() throws Exception
+  {
+    final DateTime theTime = new DateTime("2013-01-01");
+    Sink sink = plumber.getSink(theTime.getMillis());
+
+    Assert.assertEquals(new Interval(String.format("%s/PT1H", theTime.toString())), sink.getInterval());
+    Assert.assertEquals(theTime.toString(), sink.getVersion());
+  }
+
+  @Test
+  public void testPersist() throws Exception
+  {
+    final MutableBoolean committed = new MutableBoolean(false);
+    plumber.startJob();
+    plumber.persist(
+        new Runnable()
+        {
+          @Override
+          public void run()
+          {
+            committed.setValue(true);
+          }
+        }
+    );
+
+    Stopwatch stopwatch = new Stopwatch().start();
+    while (!committed.booleanValue()) {
+      Thread.sleep(100);
+      if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
+        throw new ISE("Taking too long to set perist value");
+      }
+    }
+    plumber.finishJob();
+  }
+}
diff --git a/realtime/src/test/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactoryTest.java b/realtime/src/test/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactoryTest.java
new file mode 100644
index 00000000000..c262e345895
--- /dev/null
+++ b/realtime/src/test/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactoryTest.java
@@ -0,0 +1,41 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package com.metamx.druid.realtime.plumber;
+
+import junit.framework.Assert;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+import org.junit.Test;
+
+/**
+ */
+public class ServerTimeRejectionPolicyFactoryTest
+{
+  @Test
+  public void testAccept() throws Exception
+  {
+    RejectionPolicy rejectionPolicy = new ServerTimeRejectionPolicyFactory().create(
+        new Period("PT10M")
+    );
+
+    Assert.assertTrue(rejectionPolicy.accept(new DateTime().getMillis()));
+    Assert.assertFalse(rejectionPolicy.accept(new DateTime("2000").getMillis()));
+  }
+}
diff --git a/realtime/src/test/java/com/metamx/druid/realtime/plumber/SinkTest.java b/realtime/src/test/java/com/metamx/druid/realtime/plumber/SinkTest.java
new file mode 100644
index 00000000000..27b06609a0d
--- /dev/null
+++ b/realtime/src/test/java/com/metamx/druid/realtime/plumber/SinkTest.java
@@ -0,0 +1,124 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package com.metamx.druid.realtime.plumber;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.metamx.druid.QueryGranularity;
+import com.metamx.druid.aggregation.AggregatorFactory;
+import com.metamx.druid.aggregation.CountAggregatorFactory;
+import com.metamx.druid.index.v1.SpatialDimensionSchema;
+import com.metamx.druid.input.InputRow;
+import com.metamx.druid.realtime.FireHydrant;
+import com.metamx.druid.realtime.Schema;
+import com.metamx.druid.shard.NoneShardSpec;
+import junit.framework.Assert;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ */
+public class SinkTest
+{
+  @Test
+  public void testSwap() throws Exception
+  {
+    final Schema schema = new Schema(
+        "test",
+        Lists.newArrayList(),
+        new AggregatorFactory[]{new CountAggregatorFactory("rows")},
+        QueryGranularity.MINUTE,
+        new NoneShardSpec()
+    );
+
+    final Interval interval = new Interval("2013-01-01/2013-01-02");
+    final String version = new DateTime().toString();
+    final Sink sink = new Sink(interval, schema, version);
+
+    sink.add(new InputRow()
+    {
+      @Override
+      public List getDimensions()
+      {
+        return Lists.newArrayList();
+      }
+
+      @Override
+      public long getTimestampFromEpoch()
+      {
+        return new DateTime("2013-01-01").getMillis();
+      }
+
+      @Override
+      public List getDimension(String dimension)
+      {
+        return Lists.newArrayList();
+      }
+
+      @Override
+      public float getFloatMetric(String metric)
+      {
+        return 0;
+      }
+    });
+
+    FireHydrant currHydrant = sink.getCurrIndex();
+    Assert.assertEquals(new Interval("2013-01-01/PT1M"), currHydrant.getIndex().getInterval());
+
+
+    FireHydrant swapHydrant = sink.swap();
+
+    sink.add(new InputRow()
+    {
+      @Override
+      public List getDimensions()
+      {
+        return Lists.newArrayList();
+      }
+
+      @Override
+      public long getTimestampFromEpoch()
+      {
+        return new DateTime("2013-01-01").getMillis();
+      }
+
+      @Override
+      public List getDimension(String dimension)
+      {
+        return Lists.newArrayList();
+      }
+
+      @Override
+      public float getFloatMetric(String metric)
+      {
+        return 0;
+      }
+    });
+
+    Assert.assertEquals(currHydrant, swapHydrant);
+    Assert.assertNotSame(currHydrant, sink.getCurrIndex());
+    Assert.assertEquals(new Interval("2013-01-01/PT1M"), sink.getCurrIndex().getIndex().getInterval());
+
+    Assert.assertEquals(2, Iterators.size(sink.iterator()));
+  }
+}

From b792bf2ba53dc3591430e2edfa86110f1d1440dc Mon Sep 17 00:00:00 2001
From: Gian Merlino 
Date: Tue, 20 Aug 2013 17:58:22 -0700
Subject: [PATCH 08/20] [maven-release-plugin] prepare release druid-0.5.42

---
 client/pom.xml           | 2 +-
 common/pom.xml           | 2 +-
 examples/pom.xml         | 2 +-
 indexing-common/pom.xml  | 2 +-
 indexing-hadoop/pom.xml  | 2 +-
 indexing-service/pom.xml | 2 +-
 pom.xml                  | 2 +-
 realtime/pom.xml         | 2 +-
 server/pom.xml           | 2 +-
 services/pom.xml         | 4 ++--
 10 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/client/pom.xml b/client/pom.xml
index 78928a7a553..083e68bf066 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.42-SNAPSHOT
+        0.5.42
     
 
     
diff --git a/common/pom.xml b/common/pom.xml
index a280d29c152..b2bf12a329c 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.42-SNAPSHOT
+        0.5.42
     
 
     
diff --git a/examples/pom.xml b/examples/pom.xml
index 8924d77d0a6..31371f10577 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -9,7 +9,7 @@
     
         com.metamx
         druid
-        0.5.42-SNAPSHOT
+        0.5.42
     
 
     
diff --git a/indexing-common/pom.xml b/indexing-common/pom.xml
index 114036404d1..465258db262 100644
--- a/indexing-common/pom.xml
+++ b/indexing-common/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.42-SNAPSHOT
+        0.5.42
     
 
     
diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index 071920ff4d6..47cb5aa03bf 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.42-SNAPSHOT
+        0.5.42
     
 
     
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index eb0433f7d7a..8e9ef62d7f4 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.42-SNAPSHOT
+        0.5.42
     
 
     
diff --git a/pom.xml b/pom.xml
index 28078266960..8ce0660dc87 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@
     com.metamx
     druid
     pom
-    0.5.42-SNAPSHOT
+    0.5.42
     druid
     druid
     
diff --git a/realtime/pom.xml b/realtime/pom.xml
index a64b0609ee6..e07756f821e 100644
--- a/realtime/pom.xml
+++ b/realtime/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.42-SNAPSHOT
+        0.5.42
     
 
     
diff --git a/server/pom.xml b/server/pom.xml
index 796926104ed..1374548743d 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.42-SNAPSHOT
+        0.5.42
     
 
     
diff --git a/services/pom.xml b/services/pom.xml
index 39ec814e8c6..9f606def8b0 100644
--- a/services/pom.xml
+++ b/services/pom.xml
@@ -24,11 +24,11 @@
     druid-services
     druid-services
     druid-services
-    0.5.42-SNAPSHOT
+    0.5.42
     
         com.metamx
         druid
-        0.5.42-SNAPSHOT
+        0.5.42
     
 
     

From a3793eeba4ac4e0af29d9ee9b16138627f412be9 Mon Sep 17 00:00:00 2001
From: Gian Merlino 
Date: Tue, 20 Aug 2013 17:58:28 -0700
Subject: [PATCH 09/20] [maven-release-plugin] prepare for next development
 iteration

---
 client/pom.xml           | 2 +-
 common/pom.xml           | 2 +-
 examples/pom.xml         | 2 +-
 indexing-common/pom.xml  | 2 +-
 indexing-hadoop/pom.xml  | 2 +-
 indexing-service/pom.xml | 2 +-
 pom.xml                  | 2 +-
 realtime/pom.xml         | 2 +-
 server/pom.xml           | 2 +-
 services/pom.xml         | 4 ++--
 10 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/client/pom.xml b/client/pom.xml
index 083e68bf066..210fc4f71e9 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.42
+        0.5.43-SNAPSHOT
     
 
     
diff --git a/common/pom.xml b/common/pom.xml
index b2bf12a329c..5b7844b9f18 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.42
+        0.5.43-SNAPSHOT
     
 
     
diff --git a/examples/pom.xml b/examples/pom.xml
index 31371f10577..1a85fa40058 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -9,7 +9,7 @@
     
         com.metamx
         druid
-        0.5.42
+        0.5.43-SNAPSHOT
     
 
     
diff --git a/indexing-common/pom.xml b/indexing-common/pom.xml
index 465258db262..85aa1b6fdd2 100644
--- a/indexing-common/pom.xml
+++ b/indexing-common/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.42
+        0.5.43-SNAPSHOT
     
 
     
diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index 47cb5aa03bf..a8e185754aa 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.42
+        0.5.43-SNAPSHOT
     
 
     
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index 8e9ef62d7f4..163160c88a6 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.42
+        0.5.43-SNAPSHOT
     
 
     
diff --git a/pom.xml b/pom.xml
index 8ce0660dc87..02c16b0f2ae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@
     com.metamx
     druid
     pom
-    0.5.42
+    0.5.43-SNAPSHOT
     druid
     druid
     
diff --git a/realtime/pom.xml b/realtime/pom.xml
index e07756f821e..5269c5e848e 100644
--- a/realtime/pom.xml
+++ b/realtime/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.42
+        0.5.43-SNAPSHOT
     
 
     
diff --git a/server/pom.xml b/server/pom.xml
index 1374548743d..fa484b32b12 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.42
+        0.5.43-SNAPSHOT
     
 
     
diff --git a/services/pom.xml b/services/pom.xml
index 9f606def8b0..38b6ba54d89 100644
--- a/services/pom.xml
+++ b/services/pom.xml
@@ -24,11 +24,11 @@
     druid-services
     druid-services
     druid-services
-    0.5.42
+    0.5.43-SNAPSHOT
     
         com.metamx
         druid
-        0.5.42
+        0.5.43-SNAPSHOT
     
 
     

From d02be16245f20e376e522b5ac6ee4f137ba415a6 Mon Sep 17 00:00:00 2001
From: fjy 
Date: Tue, 20 Aug 2013 19:25:16 -0700
Subject: [PATCH 10/20] fix RTR closing PCC too early

---
 .../druid/indexing/coordinator/RemoteTaskRunner.java     | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java
index f7a81a8af64..c5da481f9e4 100644
--- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java
@@ -656,14 +656,19 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
     ZkWorker zkWorker = zkWorkers.get(worker.getHost());
     if (zkWorker != null) {
       try {
-        for (String assignedTask : cf.getChildren()
-                                     .forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost()))) {
+        List tasksToFail = Lists.newArrayList(
+            cf.getChildren().forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost()))
+        );
+        tasksToFail.addAll(zkWorker.getRunningTaskIds());
+        for (String assignedTask : tasksToFail) {
           RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(assignedTask);
           if (taskRunnerWorkItem != null) {
             String taskPath = JOINER.join(config.getIndexerTaskPath(), worker.getHost(), assignedTask);
             if (cf.checkExists().forPath(taskPath) != null) {
               cf.delete().guaranteed().forPath(taskPath);
             }
+
+            log.info("Failing task %s", assignedTask);
             taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId()));
           } else {
             log.warn("RemoteTaskRunner has no knowledge of task %s", assignedTask);

From a4eb025ba7142560e8c65bf24780dc7a1356dead Mon Sep 17 00:00:00 2001
From: fjy 
Date: Tue, 20 Aug 2013 19:28:56 -0700
Subject: [PATCH 11/20] [maven-release-plugin] prepare release druid-0.5.43

---
 client/pom.xml           | 2 +-
 common/pom.xml           | 2 +-
 examples/pom.xml         | 2 +-
 indexing-common/pom.xml  | 2 +-
 indexing-hadoop/pom.xml  | 2 +-
 indexing-service/pom.xml | 2 +-
 pom.xml                  | 2 +-
 realtime/pom.xml         | 2 +-
 server/pom.xml           | 2 +-
 services/pom.xml         | 4 ++--
 10 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/client/pom.xml b/client/pom.xml
index 210fc4f71e9..aea4f84c2d6 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.43-SNAPSHOT
+        0.5.43
     
 
     
diff --git a/common/pom.xml b/common/pom.xml
index 5b7844b9f18..d1aef43036c 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.43-SNAPSHOT
+        0.5.43
     
 
     
diff --git a/examples/pom.xml b/examples/pom.xml
index 1a85fa40058..6e402d5eecf 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -9,7 +9,7 @@
     
         com.metamx
         druid
-        0.5.43-SNAPSHOT
+        0.5.43
     
 
     
diff --git a/indexing-common/pom.xml b/indexing-common/pom.xml
index 85aa1b6fdd2..67897950f62 100644
--- a/indexing-common/pom.xml
+++ b/indexing-common/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.43-SNAPSHOT
+        0.5.43
     
 
     
diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index a8e185754aa..b383805ad86 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.43-SNAPSHOT
+        0.5.43
     
 
     
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index 163160c88a6..463e40933c0 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.43-SNAPSHOT
+        0.5.43
     
 
     
diff --git a/pom.xml b/pom.xml
index 02c16b0f2ae..a842d3fb253 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@
     com.metamx
     druid
     pom
-    0.5.43-SNAPSHOT
+    0.5.43
     druid
     druid
     
diff --git a/realtime/pom.xml b/realtime/pom.xml
index 5269c5e848e..318c62a6ce8 100644
--- a/realtime/pom.xml
+++ b/realtime/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.43-SNAPSHOT
+        0.5.43
     
 
     
diff --git a/server/pom.xml b/server/pom.xml
index fa484b32b12..59b8f7209b7 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.43-SNAPSHOT
+        0.5.43
     
 
     
diff --git a/services/pom.xml b/services/pom.xml
index 38b6ba54d89..9947eef6b04 100644
--- a/services/pom.xml
+++ b/services/pom.xml
@@ -24,11 +24,11 @@
     druid-services
     druid-services
     druid-services
-    0.5.43-SNAPSHOT
+    0.5.43
     
         com.metamx
         druid
-        0.5.43-SNAPSHOT
+        0.5.43
     
 
     

From baedacf9708d9c04b373f83df3d75b6aaf72aaa9 Mon Sep 17 00:00:00 2001
From: fjy 
Date: Tue, 20 Aug 2013 19:29:05 -0700
Subject: [PATCH 12/20] [maven-release-plugin] prepare for next development
 iteration

---
 client/pom.xml           | 2 +-
 common/pom.xml           | 2 +-
 examples/pom.xml         | 2 +-
 indexing-common/pom.xml  | 2 +-
 indexing-hadoop/pom.xml  | 2 +-
 indexing-service/pom.xml | 2 +-
 pom.xml                  | 2 +-
 realtime/pom.xml         | 2 +-
 server/pom.xml           | 2 +-
 services/pom.xml         | 4 ++--
 10 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/client/pom.xml b/client/pom.xml
index aea4f84c2d6..ce316a582c5 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.43
+        0.5.44-SNAPSHOT
     
 
     
diff --git a/common/pom.xml b/common/pom.xml
index d1aef43036c..a8731fbe25f 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.43
+        0.5.44-SNAPSHOT
     
 
     
diff --git a/examples/pom.xml b/examples/pom.xml
index 6e402d5eecf..475879016ea 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -9,7 +9,7 @@
     
         com.metamx
         druid
-        0.5.43
+        0.5.44-SNAPSHOT
     
 
     
diff --git a/indexing-common/pom.xml b/indexing-common/pom.xml
index 67897950f62..128091ffd14 100644
--- a/indexing-common/pom.xml
+++ b/indexing-common/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.43
+        0.5.44-SNAPSHOT
     
 
     
diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index b383805ad86..6b880550f73 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.43
+        0.5.44-SNAPSHOT
     
 
     
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index 463e40933c0..0275e02027b 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.43
+        0.5.44-SNAPSHOT
     
 
     
diff --git a/pom.xml b/pom.xml
index a842d3fb253..6d034570986 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@
     com.metamx
     druid
     pom
-    0.5.43
+    0.5.44-SNAPSHOT
     druid
     druid
     
diff --git a/realtime/pom.xml b/realtime/pom.xml
index 318c62a6ce8..8eb6d5cc235 100644
--- a/realtime/pom.xml
+++ b/realtime/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.43
+        0.5.44-SNAPSHOT
     
 
     
diff --git a/server/pom.xml b/server/pom.xml
index 59b8f7209b7..f26d9e1bacd 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.43
+        0.5.44-SNAPSHOT
     
 
     
diff --git a/services/pom.xml b/services/pom.xml
index 9947eef6b04..1e280cb9eae 100644
--- a/services/pom.xml
+++ b/services/pom.xml
@@ -24,11 +24,11 @@
     druid-services
     druid-services
     druid-services
-    0.5.43
+    0.5.44-SNAPSHOT
     
         com.metamx
         druid
-        0.5.43
+        0.5.44-SNAPSHOT
     
 
     

From e283de68318f4e0568cc703c357e695c72d8bce1 Mon Sep 17 00:00:00 2001
From: fjy 
Date: Tue, 20 Aug 2013 19:34:30 -0700
Subject: [PATCH 13/20] fix another bug with RTR to remove things correctly
 from running tasks

---
 .../com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java
index c5da481f9e4..94e1ff80bd7 100644
--- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java
@@ -661,7 +661,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
         );
         tasksToFail.addAll(zkWorker.getRunningTaskIds());
         for (String assignedTask : tasksToFail) {
-          RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(assignedTask);
+          RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask);
           if (taskRunnerWorkItem != null) {
             String taskPath = JOINER.join(config.getIndexerTaskPath(), worker.getHost(), assignedTask);
             if (cf.checkExists().forPath(taskPath) != null) {

From 66dcfe8c3e2326754468bd48c75f050ad03135b5 Mon Sep 17 00:00:00 2001
From: fjy 
Date: Tue, 20 Aug 2013 19:37:44 -0700
Subject: [PATCH 14/20] [maven-release-plugin] prepare release druid-0.5.44

---
 client/pom.xml           | 2 +-
 common/pom.xml           | 2 +-
 examples/pom.xml         | 2 +-
 indexing-common/pom.xml  | 2 +-
 indexing-hadoop/pom.xml  | 2 +-
 indexing-service/pom.xml | 2 +-
 pom.xml                  | 2 +-
 realtime/pom.xml         | 2 +-
 server/pom.xml           | 2 +-
 services/pom.xml         | 4 ++--
 10 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/client/pom.xml b/client/pom.xml
index ce316a582c5..12e72cdfd65 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.44-SNAPSHOT
+        0.5.44
     
 
     
diff --git a/common/pom.xml b/common/pom.xml
index a8731fbe25f..4a5ea2ee3ea 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.44-SNAPSHOT
+        0.5.44
     
 
     
diff --git a/examples/pom.xml b/examples/pom.xml
index 475879016ea..0416b9de4a5 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -9,7 +9,7 @@
     
         com.metamx
         druid
-        0.5.44-SNAPSHOT
+        0.5.44
     
 
     
diff --git a/indexing-common/pom.xml b/indexing-common/pom.xml
index 128091ffd14..dee4f460974 100644
--- a/indexing-common/pom.xml
+++ b/indexing-common/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.44-SNAPSHOT
+        0.5.44
     
 
     
diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index 6b880550f73..d2fff65bae5 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.44-SNAPSHOT
+        0.5.44
     
 
     
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index 0275e02027b..16090ee7dd9 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.44-SNAPSHOT
+        0.5.44
     
 
     
diff --git a/pom.xml b/pom.xml
index 6d034570986..8da926e20bd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@
     com.metamx
     druid
     pom
-    0.5.44-SNAPSHOT
+    0.5.44
     druid
     druid
     
diff --git a/realtime/pom.xml b/realtime/pom.xml
index 8eb6d5cc235..5c658830c39 100644
--- a/realtime/pom.xml
+++ b/realtime/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.44-SNAPSHOT
+        0.5.44
     
 
     
diff --git a/server/pom.xml b/server/pom.xml
index f26d9e1bacd..0835bfd120e 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.44-SNAPSHOT
+        0.5.44
     
 
     
diff --git a/services/pom.xml b/services/pom.xml
index 1e280cb9eae..e3b2c6e0983 100644
--- a/services/pom.xml
+++ b/services/pom.xml
@@ -24,11 +24,11 @@
     druid-services
     druid-services
     druid-services
-    0.5.44-SNAPSHOT
+    0.5.44
     
         com.metamx
         druid
-        0.5.44-SNAPSHOT
+        0.5.44
     
 
     

From ed666d9d5f5a02adf677cc14eccd83662aad08e7 Mon Sep 17 00:00:00 2001
From: fjy 
Date: Tue, 20 Aug 2013 19:37:52 -0700
Subject: [PATCH 15/20] [maven-release-plugin] prepare for next development
 iteration

---
 client/pom.xml           | 2 +-
 common/pom.xml           | 2 +-
 examples/pom.xml         | 2 +-
 indexing-common/pom.xml  | 2 +-
 indexing-hadoop/pom.xml  | 2 +-
 indexing-service/pom.xml | 2 +-
 pom.xml                  | 2 +-
 realtime/pom.xml         | 2 +-
 server/pom.xml           | 2 +-
 services/pom.xml         | 4 ++--
 10 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/client/pom.xml b/client/pom.xml
index 12e72cdfd65..33bf2c37a78 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.44
+        0.5.45-SNAPSHOT
     
 
     
diff --git a/common/pom.xml b/common/pom.xml
index 4a5ea2ee3ea..d72f86c5f94 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.44
+        0.5.45-SNAPSHOT
     
 
     
diff --git a/examples/pom.xml b/examples/pom.xml
index 0416b9de4a5..3ac6b6eb888 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -9,7 +9,7 @@
     
         com.metamx
         druid
-        0.5.44
+        0.5.45-SNAPSHOT
     
 
     
diff --git a/indexing-common/pom.xml b/indexing-common/pom.xml
index dee4f460974..972209343a8 100644
--- a/indexing-common/pom.xml
+++ b/indexing-common/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.44
+        0.5.45-SNAPSHOT
     
 
     
diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index d2fff65bae5..4a3d9f8ebd8 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.44
+        0.5.45-SNAPSHOT
     
 
     
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index 16090ee7dd9..0d78c70150f 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.44
+        0.5.45-SNAPSHOT
     
 
     
diff --git a/pom.xml b/pom.xml
index 8da926e20bd..3593bf4ce18 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@
     com.metamx
     druid
     pom
-    0.5.44
+    0.5.45-SNAPSHOT
     druid
     druid
     
diff --git a/realtime/pom.xml b/realtime/pom.xml
index 5c658830c39..b7643285985 100644
--- a/realtime/pom.xml
+++ b/realtime/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.44
+        0.5.45-SNAPSHOT
     
 
     
diff --git a/server/pom.xml b/server/pom.xml
index 0835bfd120e..1f45e161f71 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -28,7 +28,7 @@
     
         com.metamx
         druid
-        0.5.44
+        0.5.45-SNAPSHOT
     
 
     
diff --git a/services/pom.xml b/services/pom.xml
index e3b2c6e0983..8f366b1d326 100644
--- a/services/pom.xml
+++ b/services/pom.xml
@@ -24,11 +24,11 @@
     druid-services
     druid-services
     druid-services
-    0.5.44
+    0.5.45-SNAPSHOT
     
         com.metamx
         druid
-        0.5.44
+        0.5.45-SNAPSHOT
     
 
     

From 88661b26a04dd89fb914dc3f780d592576d0d634 Mon Sep 17 00:00:00 2001
From: fjy 
Date: Wed, 21 Aug 2013 11:14:54 -0700
Subject: [PATCH 16/20] bug fix for RTR removing workers race condition and
 partition chunks not being sorted by chunk number

---
 .../partition/IntegerPartitionChunk.java      |  7 +----
 .../druid/partition/LinearPartitionChunk.java |  8 +++++-
 .../druid/partition/StringPartitionChunk.java |  9 ++-----
 .../partition/IntegerPartitionChunkTest.java  | 12 ++++-----
 .../partition/StringPartitionChunkTest.java   | 14 +++++-----
 .../coordinator/RemoteTaskRunner.java         | 16 +++++++++---
 .../coordinator/RemoteTaskRunnerWorkItem.java | 19 ++++++++++++--
 .../coordinator/RemoteTaskRunnerTest.java     | 26 +++++++++++++++++++
 8 files changed, 79 insertions(+), 32 deletions(-)

diff --git a/common/src/main/java/com/metamx/druid/partition/IntegerPartitionChunk.java b/common/src/main/java/com/metamx/druid/partition/IntegerPartitionChunk.java
index a49313711ec..a2c511d1f99 100644
--- a/common/src/main/java/com/metamx/druid/partition/IntegerPartitionChunk.java
+++ b/common/src/main/java/com/metamx/druid/partition/IntegerPartitionChunk.java
@@ -97,12 +97,7 @@ public class IntegerPartitionChunk implements PartitionChunk
     if (chunk instanceof IntegerPartitionChunk) {
       IntegerPartitionChunk intChunk = (IntegerPartitionChunk) chunk;
 
-      int retVal = comparator.compare(start, intChunk.start);
-      if (retVal == 0) {
-        retVal = comparator.compare(end, intChunk.end);
-      }
-
-      return retVal;
+      return comparator.compare(chunkNumber, intChunk.chunkNumber);
     }
     throw new IllegalArgumentException("Cannot compare against something that is not a StringPartitionChunk.");
   }
diff --git a/common/src/main/java/com/metamx/druid/partition/LinearPartitionChunk.java b/common/src/main/java/com/metamx/druid/partition/LinearPartitionChunk.java
index 2f9cee1fe8a..cbc299f820e 100644
--- a/common/src/main/java/com/metamx/druid/partition/LinearPartitionChunk.java
+++ b/common/src/main/java/com/metamx/druid/partition/LinearPartitionChunk.java
@@ -1,7 +1,13 @@
 package com.metamx.druid.partition;
 
+import com.google.common.collect.Ordering;
+
+import java.util.Comparator;
+
 public class LinearPartitionChunk  implements PartitionChunk
 {
+  Comparator comparator = Ordering.natural().nullsFirst();
+
   private final int chunkNumber;
   private final T object;
 
@@ -56,7 +62,7 @@ public class LinearPartitionChunk  implements PartitionChunk
     if (chunk instanceof LinearPartitionChunk) {
       LinearPartitionChunk linearChunk = (LinearPartitionChunk) chunk;
 
-      return chunkNumber - chunk.getChunkNumber();
+      return comparator.compare(chunkNumber, linearChunk.chunkNumber);
     }
     throw new IllegalArgumentException("Cannot compare against something that is not a LinearPartitionChunk.");
   }
diff --git a/common/src/main/java/com/metamx/druid/partition/StringPartitionChunk.java b/common/src/main/java/com/metamx/druid/partition/StringPartitionChunk.java
index 39d0e71ae76..54b067faf7d 100644
--- a/common/src/main/java/com/metamx/druid/partition/StringPartitionChunk.java
+++ b/common/src/main/java/com/metamx/druid/partition/StringPartitionChunk.java
@@ -27,7 +27,7 @@ import java.util.Comparator;
  */
 public class StringPartitionChunk implements PartitionChunk
 {
-  private static final Comparator comparator = Ordering.natural().nullsFirst();
+  private static final Comparator comparator = Ordering.natural().nullsFirst();
 
   private final String start;
   private final String end;
@@ -95,12 +95,7 @@ public class StringPartitionChunk implements PartitionChunk
     if (chunk instanceof StringPartitionChunk) {
       StringPartitionChunk stringChunk = (StringPartitionChunk) chunk;
 
-      int retVal = comparator.compare(start, stringChunk.start);
-      if (retVal == 0) {
-        retVal = comparator.compare(end, stringChunk.end);
-      }
-
-      return retVal;
+      return comparator.compare(chunkNumber, stringChunk.chunkNumber);
     }
     throw new IllegalArgumentException("Cannot compare against something that is not a StringPartitionChunk.");
   }
diff --git a/common/src/test/java/com/metamx/druid/partition/IntegerPartitionChunkTest.java b/common/src/test/java/com/metamx/druid/partition/IntegerPartitionChunkTest.java
index 0acb92911f5..f408f2b04cd 100644
--- a/common/src/test/java/com/metamx/druid/partition/IntegerPartitionChunkTest.java
+++ b/common/src/test/java/com/metamx/druid/partition/IntegerPartitionChunkTest.java
@@ -62,13 +62,13 @@ public class IntegerPartitionChunkTest
   public void testCompareTo() throws Exception
   {
     Assert.assertEquals(0, make(null, null, 0, 1).compareTo(make(null, null, 0, 1)));
-    Assert.assertEquals(0, make(10, null, 0, 1).compareTo(make(10, null, 1, 2)));
-    Assert.assertEquals(0, make(null, 10, 0, 1).compareTo(make(null, 10, 1, 2)));
-    Assert.assertEquals(0, make(10, 11, 0, 1).compareTo(make(10, 11, 1, 2)));
+    Assert.assertEquals(0, make(10, null, 0, 1).compareTo(make(10, null, 0, 2)));
+    Assert.assertEquals(0, make(null, 10, 0, 1).compareTo(make(null, 10, 0, 2)));
+    Assert.assertEquals(0, make(10, 11, 0, 1).compareTo(make(10, 11, 0, 2)));
     Assert.assertEquals(-1, make(null, 10, 0, 1).compareTo(make(10, null, 1, 2)));
-    Assert.assertEquals(-1, make(11, 20, 0, 1).compareTo(make(20, 33, 0, 1)));
-    Assert.assertEquals(1, make(20, 33, 0, 1).compareTo(make(11, 20, 0, 1)));
-    Assert.assertEquals(1, make(10, null, 0, 1).compareTo(make(null, 10, 0, 1)));
+    Assert.assertEquals(-1, make(11, 20, 0, 1).compareTo(make(20, 33, 1, 1)));
+    Assert.assertEquals(1, make(20, 33, 1, 1).compareTo(make(11, 20, 0, 1)));
+    Assert.assertEquals(1, make(10, null, 1, 1).compareTo(make(null, 10, 0, 1)));
   }
 
   @Test
diff --git a/common/src/test/java/com/metamx/druid/partition/StringPartitionChunkTest.java b/common/src/test/java/com/metamx/druid/partition/StringPartitionChunkTest.java
index c6a7cdfd005..8a6cadf1743 100644
--- a/common/src/test/java/com/metamx/druid/partition/StringPartitionChunkTest.java
+++ b/common/src/test/java/com/metamx/druid/partition/StringPartitionChunkTest.java
@@ -61,14 +61,14 @@ public class StringPartitionChunkTest
   @Test
   public void testCompareTo() throws Exception
   {
-    Assert.assertEquals(0, make(null, null, 0, 1).compareTo(make(null, null, 1, 2)));
-    Assert.assertEquals(0, make("10", null, 0, 1).compareTo(make("10", null, 1, 2)));
-    Assert.assertEquals(0, make(null, "10", 0, 1).compareTo(make(null, "10", 1, 2)));
-    Assert.assertEquals(0, make("10", "11", 0, 1).compareTo(make("10", "11", 1, 2)));
+    Assert.assertEquals(0, make(null, null, 0, 1).compareTo(make(null, null, 0, 2)));
+    Assert.assertEquals(0, make("10", null, 0, 1).compareTo(make("10", null, 0, 2)));
+    Assert.assertEquals(0, make(null, "10", 1, 1).compareTo(make(null, "10", 1, 2)));
+    Assert.assertEquals(0, make("10", "11", 1, 1).compareTo(make("10", "11", 1, 2)));
     Assert.assertEquals(-1, make(null, "10", 0, 1).compareTo(make("10", null, 1, 2)));
-    Assert.assertEquals(-1, make("11", "20", 0, 1).compareTo(make("20", "33", 0, 1)));
-    Assert.assertEquals(1, make("20", "33", 0, 1).compareTo(make("11", "20", 0, 1)));
-    Assert.assertEquals(1, make("10", null, 0, 1).compareTo(make(null, "10", 0, 1)));
+    Assert.assertEquals(-1, make("11", "20", 0, 1).compareTo(make("20", "33", 1, 1)));
+    Assert.assertEquals(1, make("20", "33", 1, 1).compareTo(make("11", "20", 0, 1)));
+    Assert.assertEquals(1, make("10", null, 1, 1).compareTo(make(null, "10", 0, 1)));
   }
 
   @Test
diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java
index 94e1ff80bd7..e5bfc3a01ee 100644
--- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java
@@ -521,7 +521,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
         );
     }
 
-    runningTasks.put(task.getId(), pendingTasks.remove(task.getId()));
+    runningTasks.put(task.getId(), pendingTasks.remove(task.getId()).withWorker(theWorker));
     log.info("Task %s switched from pending to running", task.getId());
 
     // Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running
@@ -615,6 +615,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
                       if (taskRunnerWorkItem != null) {
                         log.info("Task %s just disappeared!", taskId);
                         taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId()));
+                      } else {
+                        log.warn("Task %s just disappeared but I didn't know about it?!", taskId);
                       }
                       break;
                   }
@@ -653,13 +655,21 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
   {
     log.info("Kaboom! Worker[%s] removed!", worker.getHost());
 
-    ZkWorker zkWorker = zkWorkers.get(worker.getHost());
+    final ZkWorker zkWorker = zkWorkers.get(worker.getHost());
     if (zkWorker != null) {
       try {
         List tasksToFail = Lists.newArrayList(
             cf.getChildren().forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost()))
         );
-        tasksToFail.addAll(zkWorker.getRunningTaskIds());
+        log.info("%s: Found %d tasks assigned", worker.getHost(), tasksToFail.size());
+
+        for (Map.Entry entry : runningTasks.entrySet()) {
+          if (entry.getValue().getWorker().getHost().equalsIgnoreCase(worker.getHost())) {
+            log.info("%s: Found %s running", worker.getHost(), entry.getKey());
+            tasksToFail.add(entry.getKey());
+          }
+        }
+
         for (String assignedTask : tasksToFail) {
           RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask);
           if (taskRunnerWorkItem != null) {
diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerWorkItem.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerWorkItem.java
index 72cb7155af8..be60c758ab6 100644
--- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerWorkItem.java
+++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerWorkItem.java
@@ -22,6 +22,7 @@ package com.metamx.druid.indexing.coordinator;
 import com.google.common.util.concurrent.SettableFuture;
 import com.metamx.druid.indexing.common.TaskStatus;
 import com.metamx.druid.indexing.common.task.Task;
+import com.metamx.druid.indexing.worker.Worker;
 import org.joda.time.DateTime;
 
 /**
@@ -29,6 +30,7 @@ import org.joda.time.DateTime;
 public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
 {
   private final SettableFuture result;
+  private final Worker worker;
 
   public RemoteTaskRunnerWorkItem(
       Task task,
@@ -37,17 +39,25 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
   {
     super(task, result);
     this.result = result;
+    this.worker = null;
   }
 
   public RemoteTaskRunnerWorkItem(
       Task task,
       SettableFuture result,
       DateTime createdTime,
-      DateTime queueInsertionTime
+      DateTime queueInsertionTime,
+      Worker worker
   )
   {
     super(task, result, createdTime, queueInsertionTime);
     this.result = result;
+    this.worker = worker;
+  }
+
+  public Worker getWorker()
+  {
+    return worker;
   }
 
   public void setResult(TaskStatus status)
@@ -58,6 +68,11 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
   @Override
   public RemoteTaskRunnerWorkItem withQueueInsertionTime(DateTime time)
   {
-    return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), time);
+    return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), time, worker);
+  }
+
+  public RemoteTaskRunnerWorkItem withWorker(Worker worker)
+  {
+    return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), getQueueInsertionTime(), worker);
   }
 }
diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java
index 492251dfc6f..a22664ec77b 100644
--- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java
@@ -50,10 +50,13 @@ import java.io.File;
 import java.util.Arrays;
 import java.util.Set;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
+ * Several of the tests here are integration tests rather than unit tests. We will introduce real unit tests for this
+ * class as well as integration tests in the very near future.
  */
 public class RemoteTaskRunnerTest
 {
@@ -277,6 +280,29 @@ public class RemoteTaskRunnerTest
     Assert.assertEquals(TaskStatus.Status.SUCCESS, status.getStatusCode());
   }
 
+  @Test
+  public void testWorkerRemoved() throws Exception
+  {
+    doSetup();
+    remoteTaskRunner.bootstrap(Lists.newArrayList());
+    Future future = remoteTaskRunner.run(makeTask(TaskStatus.running("task")));
+
+    Stopwatch stopwatch = new Stopwatch();
+    stopwatch.start();
+    while (cf.checkExists().forPath(joiner.join(statusPath, "task")) == null) {
+      Thread.sleep(100);
+      if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
+        throw new ISE("Cannot find running task");
+      }
+    }
+
+    workerCuratorCoordinator.stop();
+
+    TaskStatus status = future.get();
+
+    Assert.assertEquals(TaskStatus.Status.FAILED, status.getStatusCode());
+  }
+
   private void doSetup() throws Exception
   {
     makeWorker();

From 54f00479cc8e950faddd295288749e762c037da7 Mon Sep 17 00:00:00 2001
From: fjy 
Date: Wed, 21 Aug 2013 13:02:35 -0700
Subject: [PATCH 17/20] add explicit null check for moving tasks from pending
 to running

---
 .../coordinator/RemoteTaskRunner.java         | 24 ++++++++++++-------
 1 file changed, 16 insertions(+), 8 deletions(-)

diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java
index e5bfc3a01ee..b7774a9b419 100644
--- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java
@@ -521,7 +521,15 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
         );
     }
 
-    runningTasks.put(task.getId(), pendingTasks.remove(task.getId()).withWorker(theWorker));
+    RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(task.getId());
+    if (workItem == null) {
+      log.makeAlert("WTF?! Got a null work item from pending tasks?! How can this be?!")
+         .addData("taskId", task.getId())
+         .emit();
+      return;
+    }
+
+    runningTasks.put(task.getId(), workItem.withWorker(theWorker));
     log.info("Task %s switched from pending to running", task.getId());
 
     // Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running
@@ -613,10 +621,10 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
                       taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
                       taskRunnerWorkItem = runningTasks.get(taskId);
                       if (taskRunnerWorkItem != null) {
-                        log.info("Task %s just disappeared!", taskId);
+                        log.info("Task[%s] just disappeared!", taskId);
                         taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId()));
                       } else {
-                        log.warn("Task %s just disappeared but I didn't know about it?!", taskId);
+                        log.warn("Task[%s] just disappeared but I didn't know about it?!", taskId);
                       }
                       break;
                   }
@@ -661,11 +669,11 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
         List tasksToFail = Lists.newArrayList(
             cf.getChildren().forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost()))
         );
-        log.info("%s: Found %d tasks assigned", worker.getHost(), tasksToFail.size());
+        log.info("[%s]: Found %d tasks assigned", worker.getHost(), tasksToFail.size());
 
         for (Map.Entry entry : runningTasks.entrySet()) {
           if (entry.getValue().getWorker().getHost().equalsIgnoreCase(worker.getHost())) {
-            log.info("%s: Found %s running", worker.getHost(), entry.getKey());
+            log.info("[%s]: Found [%s] running", worker.getHost(), entry.getKey());
             tasksToFail.add(entry.getKey());
           }
         }
@@ -678,10 +686,10 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
               cf.delete().guaranteed().forPath(taskPath);
             }
 
-            log.info("Failing task %s", assignedTask);
+            log.info("Failing task[%s]", assignedTask);
             taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId()));
           } else {
-            log.warn("RemoteTaskRunner has no knowledge of task %s", assignedTask);
+            log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask);
           }
         }
       }
@@ -693,7 +701,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
           zkWorker.close();
         }
         catch (Exception e) {
-          log.error(e, "Exception closing worker %s!", worker.getHost());
+          log.error(e, "Exception closing worker[%s]!", worker.getHost());
         }
         zkWorkers.remove(worker.getHost());
       }

From ac32c6f27cad204eb4a6e6871c0d6e301e1e3a39 Mon Sep 17 00:00:00 2001
From: fjy 
Date: Wed, 21 Aug 2013 13:23:38 -0700
Subject: [PATCH 18/20] fix tests according to code review

---
 .../com/metamx/druid/realtime/RealtimeManager.java |  6 +++++-
 .../MessageTimeRejectionPolicyFactoryTest.java     | 14 ++++++++++----
 .../ServerTimeRejectionPolicyFactoryTest.java      | 13 ++++++++-----
 3 files changed, 23 insertions(+), 10 deletions(-)

diff --git a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java
index dc7bdc7960e..9f6d64f0898 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java
@@ -96,7 +96,11 @@ public class RealtimeManager implements QuerySegmentWalker
   }
   public FireDepartmentMetrics getMetrics(String datasource)
   {
-    return chiefs.get(datasource).getMetrics();
+    FireChief chief = chiefs.get(datasource);
+    if (chief == null) {
+      return null;
+    }
+    return chief.getMetrics();
   }
 
   @Override
diff --git a/realtime/src/test/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactoryTest.java b/realtime/src/test/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactoryTest.java
index c113cbf80ca..87a6b9e9536 100644
--- a/realtime/src/test/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactoryTest.java
+++ b/realtime/src/test/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactoryTest.java
@@ -31,10 +31,16 @@ public class MessageTimeRejectionPolicyFactoryTest
   @Test
   public void testAccept() throws Exception
   {
-    RejectionPolicy rejectionPolicy = new MessageTimeRejectionPolicyFactory().create(
-        new Period("PT10M")
-    );
+    Period period = new Period("PT10M");
+    RejectionPolicy rejectionPolicy = new MessageTimeRejectionPolicyFactory().create(period);
 
-    Assert.assertTrue(rejectionPolicy.accept(new DateTime().getMillis()));
+    DateTime now = new DateTime();
+    DateTime past = now.minus(period).minus(1);
+    DateTime future = now.plus(period).plus(1);
+
+    Assert.assertTrue(rejectionPolicy.accept(now.getMillis()));
+    Assert.assertFalse(rejectionPolicy.accept(past.getMillis()));
+    Assert.assertTrue(rejectionPolicy.accept(future.getMillis()));
+    Assert.assertFalse(rejectionPolicy.accept(now.getMillis()));
   }
 }
diff --git a/realtime/src/test/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactoryTest.java b/realtime/src/test/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactoryTest.java
index c262e345895..97f99fd396a 100644
--- a/realtime/src/test/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactoryTest.java
+++ b/realtime/src/test/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactoryTest.java
@@ -31,11 +31,14 @@ public class ServerTimeRejectionPolicyFactoryTest
   @Test
   public void testAccept() throws Exception
   {
-    RejectionPolicy rejectionPolicy = new ServerTimeRejectionPolicyFactory().create(
-        new Period("PT10M")
-    );
+    Period period = new Period("PT10M");
 
-    Assert.assertTrue(rejectionPolicy.accept(new DateTime().getMillis()));
-    Assert.assertFalse(rejectionPolicy.accept(new DateTime("2000").getMillis()));
+    RejectionPolicy rejectionPolicy = new ServerTimeRejectionPolicyFactory().create(period);
+
+    DateTime now = new DateTime();
+    DateTime past = now.minus(period).minus(1);
+
+    Assert.assertTrue(rejectionPolicy.accept(now.getMillis()));
+    Assert.assertFalse(rejectionPolicy.accept(past.getMillis()));
   }
 }

From 0f4a1328071b61c643d626191ef37b78fcb9c5f1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= 
Date: Wed, 21 Aug 2013 13:57:50 -0700
Subject: [PATCH 19/20] address code review

---
 .../druid/realtime/firehose/WikipediaIrcDecoder.java       | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java
index 4d1c15ce125..72e4022294e 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java
@@ -21,6 +21,7 @@ package com.metamx.druid.realtime.firehose;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.maxmind.geoip2.DatabaseReader;
@@ -79,7 +80,9 @@ class WikipediaIrcDecoder implements IrcDecoder
   public WikipediaIrcDecoder(@JsonProperty("namespaces") Map> namespaces,
                              @JsonProperty("geoIpDatabase") String geoIpDatabase)
   {
-    if(namespaces == null) namespaces = Maps.newHashMap();
+    if(namespaces == null) {
+      namespaces = Maps.newHashMap();
+    }
     this.namespaces = namespaces;
 
 
@@ -195,7 +198,7 @@ class WikipediaIrcDecoder implements IrcDecoder
       @Override
       public List getDimension(String dimension)
       {
-        return Lists.newArrayList(dimensions.get(dimension));
+        return ImmutableList.of(dimensions.get(dimension));
       }
 
       @Override

From 7913b27066051edaa6e3b0139b26cb1ff0cada42 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= 
Date: Wed, 21 Aug 2013 14:12:39 -0700
Subject: [PATCH 20/20] reuse tmp file for geoip db

---
 .../firehose/WikipediaIrcDecoder.java         | 21 ++++++++++---------
 1 file changed, 11 insertions(+), 10 deletions(-)

diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java
index 72e4022294e..b1ecf91554e 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java
@@ -91,17 +91,18 @@ class WikipediaIrcDecoder implements IrcDecoder
       geoDb = new File(geoIpDatabase);
     } else {
       try {
-        geoDb = File.createTempFile("geoip", null);
-        geoDb.deleteOnExit();
+        String tmpDir = System.getProperty("java.io.tmpdir");
+        geoDb = new File(tmpDir, this.getClass().getCanonicalName() + ".GeoLite2-City.mmdb");
+        if(!geoDb.exists()) {
+          log.info("Downloading geo ip database to [%s]", geoDb);
 
-        log.info("Downloading geo ip database to [%s]", geoDb);
-
-        FileUtils.copyInputStreamToFile(
-            new GZIPInputStream(
-                new URL("http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.mmdb.gz").openStream()
-            ),
-            geoDb
-        );
+          FileUtils.copyInputStreamToFile(
+              new GZIPInputStream(
+                  new URL("http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.mmdb.gz").openStream()
+              ),
+              geoDb
+          );
+        }
       } catch(IOException e) {
         throw new RuntimeException("Unable to download geo ip database [%s]", e);
       }