From 8ccac2f13d670fd9c4069079c007d9eb1378ae72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 13 Aug 2013 18:35:07 -0700 Subject: [PATCH 1/8] basic wikipedia firehose factory --- pom.xml | 5 + realtime/pom.xml | 4 + .../realtime/firehose/FirehoseFactory.java | 3 +- .../druid/realtime/firehose/IrcDecoder.java | 15 ++ .../realtime/firehose/IrcFirehoseFactory.java | 217 ++++++++++++++++++ .../firehose/WikipediaIrcDecoder.java | 132 +++++++++++ 6 files changed, 375 insertions(+), 1 deletion(-) create mode 100644 realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcDecoder.java create mode 100644 realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java create mode 100644 realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java diff --git a/pom.xml b/pom.xml index ece774d529a..e81fb297fb8 100644 --- a/pom.xml +++ b/pom.xml @@ -359,6 +359,11 @@ ${apache.curator.version} test + + com.ircclouds.irc + irc-api + 1.0-0011 + diff --git a/realtime/pom.xml b/realtime/pom.xml index eff0de32e91..5ebde38abac 100644 --- a/realtime/pom.xml +++ b/realtime/pom.xml @@ -147,6 +147,10 @@ java-xmlbuilder true + + com.ircclouds.irc + irc-api + diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/FirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/FirehoseFactory.java index c9d9dfec3de..937947bfb34 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/FirehoseFactory.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/FirehoseFactory.java @@ -29,7 +29,8 @@ import java.io.IOException; @JsonSubTypes.Type(name = "kafka-0.7.2", value = KafkaFirehoseFactory.class), @JsonSubTypes.Type(name = "rabbitmq", value = RabbitMQFirehoseFactory.class), @JsonSubTypes.Type(name = "clipped", value = ClippedFirehoseFactory.class), - @JsonSubTypes.Type(name = "timed", value = TimedShutoffFirehoseFactory.class) + @JsonSubTypes.Type(name = 
"timed", value = TimedShutoffFirehoseFactory.class), + @JsonSubTypes.Type(name = "irc", value = IrcFirehoseFactory.class) }) public interface FirehoseFactory { diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcDecoder.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcDecoder.java new file mode 100644 index 00000000000..3ce230fbcfe --- /dev/null +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcDecoder.java @@ -0,0 +1,15 @@ +package com.metamx.druid.realtime.firehose; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.metamx.druid.input.InputRow; +import org.joda.time.DateTime; + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes({ + @JsonSubTypes.Type(name = "wikipedia", value = WikipediaIrcDecoder.class) + }) +public interface IrcDecoder +{ + public InputRow decodeMessage(DateTime timestamp, String channel, String msg); +} diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java new file mode 100644 index 00000000000..6c0a5ff0756 --- /dev/null +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java @@ -0,0 +1,217 @@ +package com.metamx.druid.realtime.firehose; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.ircclouds.irc.api.Callback; +import com.ircclouds.irc.api.IRCApi; +import com.ircclouds.irc.api.IRCApiImpl; +import com.ircclouds.irc.api.IServerParameters; +import com.ircclouds.irc.api.domain.IRCServer; +import com.ircclouds.irc.api.domain.messages.ChannelPrivMsg; +import com.ircclouds.irc.api.listeners.VariousMessageListenerAdapter; +import com.ircclouds.irc.api.state.IIRCState; +import 
com.metamx.common.Pair; +import com.metamx.common.logger.Logger; +import com.metamx.druid.input.InputRow; +import org.joda.time.DateTime; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.LinkedBlockingQueue; + + +public class IrcFirehoseFactory implements FirehoseFactory +{ + private static final Logger log = new Logger(IrcFirehoseFactory.class); + + private final String nick; + private final String host; + private final List channels; + private final IrcDecoder decoder; + + @JsonCreator + public IrcFirehoseFactory( + @JsonProperty String nick, + @JsonProperty String host, + @JsonProperty List channels, + @JsonProperty IrcDecoder decoder + ) + { + this.nick = nick; + this.host = host; + this.channels = channels; + this.decoder = decoder; + } + + @Override + public Firehose connect() throws IOException + { + final IRCApi irc = new IRCApiImpl(false); + final LinkedBlockingQueue> queue = new LinkedBlockingQueue>(); + + irc.addListener(new VariousMessageListenerAdapter() { + @Override + public void onChannelMessage(ChannelPrivMsg aMsg) + { + try { + queue.put(Pair.of(new DateTime(), aMsg)); + } catch(InterruptedException e) { + throw new RuntimeException("interrupted adding message to queue", e); + } + } + }); + + log.info("connecting to irc server [%s]", host); + irc.connect( + new IServerParameters() + { + @Override + public String getNickname() + { + return nick; + } + + @Override + public List getAlternativeNicknames() + { + return Lists.newArrayList(nick + "_", + nick + "__", + nick + "___"); + } + + @Override + public String getIdent() + { + return "druid"; + } + + @Override + public String getRealname() + { + return nick; + } + + @Override + public IRCServer getServer() + { + return new IRCServer(host, false); + } + }, + new Callback() + { + @Override + public void onSuccess(IIRCState aObject) + { + log.info("irc connection to server [%s] established", host); + for(String chan : 
channels) { + log.info("Joining channel %s", chan); + irc.joinChannel(chan); + } + } + + @Override + public void onFailure(Exception e) + { + log.error(e, "Unable to connect to irc server [%s]", host); + throw new RuntimeException("Unable to connect to server", e); + } + }); + + + return new Firehose() + { + InputRow nextRow = null; + + @Override + public boolean hasMore() + { + try { + while(true) { + Pair nextMsg = queue.take(); + try { + nextRow = decoder.decodeMessage(nextMsg.lhs, nextMsg.rhs.getChannelName(), nextMsg.rhs.getText()); + if(nextRow != null) return true; + } + catch (IllegalArgumentException iae) { + log.debug("ignoring invalid message in channel [%s]", nextMsg.rhs.getChannelName()); + } + } + } + catch(InterruptedException e) { + Thread.interrupted(); + throw new RuntimeException("interrupted retrieving elements from queue", e); + } + } + + @Override + public InputRow nextRow() + { + return nextRow; + } + + @Override + public Runnable commit() + { + return new Runnable() + { + @Override + public void run() + { + // nothing to see here + } + }; + } + + @Override + public void close() throws IOException + { + log.info("disconnecting from irc server [%s]", host); + irc.disconnect(""); + } + }; + } + + public static void main(String[] args) throws IOException { + + Map namespaces = Maps.newHashMap(); + namespaces.put("", "main"); + namespaces.put("Category", "category"); + namespaces.put("Media", "media"); + namespaces.put("MediaWiki", "mediawiki"); + namespaces.put("Template", "template"); + namespaces.put("$1 talk", "project talk"); + namespaces.put("Help talk", "help talk"); + namespaces.put("User", "user"); + namespaces.put("Template talk", "template talk"); + namespaces.put("MediaWiki talk", "mediawiki talk"); + namespaces.put("Talk", "talk"); + namespaces.put("Help", "help"); + namespaces.put("File talk", "file talk"); + namespaces.put("File", "file"); + namespaces.put("User talk", "user talk"); + namespaces.put("Special", "special"); + 
namespaces.put("Category talk", "category talk"); + + IrcFirehoseFactory factory = new IrcFirehoseFactory( + UUID.randomUUID().toString(), + "irc.wikimedia.org", + Lists.newArrayList( + "#en.wikipedia", + "#fr.wikipedia", + "#de.wikipedia", + "#ja.wikipedia" + ), + new WikipediaIrcDecoder(namespaces) + ); + + Firehose f = factory.connect(); + while(f.hasMore()) { + System.out.println(f.nextRow()); + } + } +} + diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java new file mode 100644 index 00000000000..e7faebaa912 --- /dev/null +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java @@ -0,0 +1,132 @@ +package com.metamx.druid.realtime.firehose; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.metamx.druid.input.InputRow; +import org.joda.time.DateTime; + +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +class WikipediaIrcDecoder implements IrcDecoder +{ + static final Pattern pattern = Pattern.compile( + "\\x0314\\[\\[\\x0307(.+?)\\x0314\\]\\]\\x034 (.*?)\\x0310.*\\x0302(http.+?)\\x03.+\\x0303(.+?)\\x03.+\\x03 (\\(([+-]\\d+)\\).*|.+) \\x0310(.+)\\x03" + ); + static final Pattern ipPattern = Pattern.compile("\\d+.\\d+.\\d+.\\d+"); + static final Pattern shortnamePattern = Pattern.compile("#(\\w\\w)\\..*"); + + static final List dimensionList = Lists.newArrayList( + "page", + "language", + "user", + "unpatrolled", + "newPage", + "robot", + "anonymous", + "namespace", + "added", + "delete", + "delta" + ); + + final Map namespaces; + + @JsonCreator + public WikipediaIrcDecoder(@JsonProperty Map namespaces) + { + if(namespaces == null) namespaces = Maps.newHashMap(); + this.namespaces = 
namespaces; + } + + @Override + public InputRow decodeMessage(final DateTime timestamp, String channel, String msg) + { + final Map dimensions = Maps.newHashMap(); + final Map metrics = Maps.newHashMap(); + + Matcher m = pattern.matcher(msg); + if(!m.matches()) { + throw new IllegalArgumentException("Invalid input format"); + } + + Matcher shortname = shortnamePattern.matcher(channel); + if(shortname.matches()) { + dimensions.put("language", shortname.group(1)); + } + + String page = m.group(1); + String pageUrl = page.replaceAll("\\s", "_"); + + dimensions.put("page", pageUrl); + + String user = m.group(4); + boolean anonymous = ipPattern.matcher(user).matches(); + dimensions.put("user", user); + + final String flags = m.group(2); + dimensions.put("unpatrolled", Boolean.toString(flags.contains("!"))); + dimensions.put("newPage", Boolean.toString(flags.contains("N"))); + dimensions.put("robot", Boolean.toString(flags.contains("B"))); + + dimensions.put("anonymous", Boolean.toString(anonymous)); + + String[] parts = page.split(":"); + if(parts.length > 1 && !parts[1].startsWith(" ")) { + if(namespaces.containsKey(parts[0])) { + dimensions.put("namespace", namespaces.get(parts[0])); + } else { + dimensions.put("namespace", "wikipedia"); + } + } + else { + dimensions.put("namespace", "article"); + } + + float delta = m.group(6) != null ? 
Float.parseFloat(m.group(6)) : 0; + metrics.put("delta", delta); + metrics.put("added", Math.max(delta, 0)); + metrics.put("deleted", Math.min(delta, 0)); + + return new InputRow() + { + @Override + public List getDimensions() + { + return dimensionList; + } + + @Override + public long getTimestampFromEpoch() + { + return timestamp.getMillis(); + } + + @Override + public List getDimension(String dimension) + { + return Lists.newArrayList(dimensions.get(dimension)); + } + + @Override + public float getFloatMetric(String metric) + { + return metrics.get(metric); + } + + @Override + public String toString() + { + return "WikipediaRow{" + + "timestamp=" + timestamp + + ", dimensions=" + dimensions + + ", metrics=" + metrics + + '}'; + } + }; + } +} From 1cc1d0be5f801646f951dd1b3eaf820cb4318e40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 13 Aug 2013 19:05:56 -0700 Subject: [PATCH 2/8] remove example code --- .../realtime/firehose/IrcFirehoseFactory.java | 75 +++++++++---------- 1 file changed, 34 insertions(+), 41 deletions(-) diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java index 6c0a5ff0756..b9b4bb57247 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java @@ -19,11 +19,43 @@ import org.joda.time.DateTime; import java.io.IOException; import java.util.List; -import java.util.Map; -import java.util.UUID; import java.util.concurrent.LinkedBlockingQueue; +/** + * Example usage + *
+    Map namespaces = Maps.newHashMap();
+    namespaces.put("", "main");
+    namespaces.put("Category", "category");
+    namespaces.put("Media", "media");
+    namespaces.put("MediaWiki", "mediawiki");
+    namespaces.put("Template", "template");
+    namespaces.put("$1 talk", "project talk");
+    namespaces.put("Help talk", "help talk");
+    namespaces.put("User", "user");
+    namespaces.put("Template talk", "template talk");
+    namespaces.put("MediaWiki talk", "mediawiki talk");
+    namespaces.put("Talk", "talk");
+    namespaces.put("Help", "help");
+    namespaces.put("File talk", "file talk");
+    namespaces.put("File", "file");
+    namespaces.put("User talk", "user talk");
+    namespaces.put("Special", "special");
+    namespaces.put("Category talk", "category talk");
 
+    IrcFirehoseFactory factory = new IrcFirehoseFactory(
+        "wiki-druid-123",
+        "irc.wikimedia.org",
+        Lists.newArrayList(
+            "#en.wikipedia",
+            "#fr.wikipedia",
+            "#de.wikipedia",
+            "#ja.wikipedia"
+        ),
+        new WikipediaIrcDecoder(namespaces)
+    );
+   
+ */ public class IrcFirehoseFactory implements FirehoseFactory { private static final Logger log = new Logger(IrcFirehoseFactory.class); @@ -174,44 +206,5 @@ public class IrcFirehoseFactory implements FirehoseFactory } }; } - - public static void main(String[] args) throws IOException { - - Map namespaces = Maps.newHashMap(); - namespaces.put("", "main"); - namespaces.put("Category", "category"); - namespaces.put("Media", "media"); - namespaces.put("MediaWiki", "mediawiki"); - namespaces.put("Template", "template"); - namespaces.put("$1 talk", "project talk"); - namespaces.put("Help talk", "help talk"); - namespaces.put("User", "user"); - namespaces.put("Template talk", "template talk"); - namespaces.put("MediaWiki talk", "mediawiki talk"); - namespaces.put("Talk", "talk"); - namespaces.put("Help", "help"); - namespaces.put("File talk", "file talk"); - namespaces.put("File", "file"); - namespaces.put("User talk", "user talk"); - namespaces.put("Special", "special"); - namespaces.put("Category talk", "category talk"); - - IrcFirehoseFactory factory = new IrcFirehoseFactory( - UUID.randomUUID().toString(), - "irc.wikimedia.org", - Lists.newArrayList( - "#en.wikipedia", - "#fr.wikipedia", - "#de.wikipedia", - "#ja.wikipedia" - ), - new WikipediaIrcDecoder(namespaces) - ); - - Firehose f = factory.connect(); - while(f.hasMore()) { - System.out.println(f.nextRow()); - } - } } From 6735ae4ecd8a29c0b55776270e7d301a323df6c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Wed, 14 Aug 2013 14:28:29 -0700 Subject: [PATCH 3/8] add ip geo lookup --- pom.xml | 14 ++-- realtime/pom.xml | 5 ++ .../realtime/firehose/IrcFirehoseFactory.java | 3 +- .../firehose/WikipediaIrcDecoder.java | 76 +++++++++++++++++-- 4 files changed, 84 insertions(+), 14 deletions(-) diff --git a/pom.xml b/pom.xml index e81fb297fb8..b2f2c922af7 100644 --- a/pom.xml +++ b/pom.xml @@ -182,37 +182,37 @@ com.fasterxml.jackson.core jackson-annotations - 2.1.4 + 2.2.2 
com.fasterxml.jackson.core jackson-core - 2.1.4 + 2.2.2 com.fasterxml.jackson.core jackson-databind - 2.1.4 + 2.2.2 com.fasterxml.jackson.datatype jackson-datatype-guava - 2.1.2 + 2.2.2 com.fasterxml.jackson.datatype jackson-datatype-joda - 2.1.2 + 2.2.2 com.fasterxml.jackson.dataformat jackson-dataformat-smile - 2.1.4 + 2.2.2 com.fasterxml.jackson.jaxrs jackson-jaxrs-json-provider - 2.1.4 + 2.2.2 javax.inject diff --git a/realtime/pom.xml b/realtime/pom.xml index 5ebde38abac..b4f81e06595 100644 --- a/realtime/pom.xml +++ b/realtime/pom.xml @@ -151,6 +151,11 @@ com.ircclouds.irc irc-api + + com.maxmind.geoip2 + geoip2 + 0.4.0 + diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java index b9b4bb57247..d7102c46a39 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java @@ -3,7 +3,6 @@ package com.metamx.druid.realtime.firehose; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.ircclouds.irc.api.Callback; import com.ircclouds.irc.api.IRCApi; import com.ircclouds.irc.api.IRCApiImpl; @@ -90,7 +89,7 @@ public class IrcFirehoseFactory implements FirehoseFactory public void onChannelMessage(ChannelPrivMsg aMsg) { try { - queue.put(Pair.of(new DateTime(), aMsg)); + queue.put(Pair.of(DateTime.now(), aMsg)); } catch(InterruptedException e) { throw new RuntimeException("interrupted adding message to queue", e); } diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java index e7faebaa912..a11937cd06a 100644 --- 
a/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java @@ -4,16 +4,31 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.maxmind.geoip2.DatabaseReader; +import com.maxmind.geoip2.exception.GeoIp2Exception; +import com.maxmind.geoip2.model.Omni; +import com.metamx.common.logger.Logger; import com.metamx.druid.input.InputRow; +import org.apache.commons.io.FileUtils; import org.joda.time.DateTime; +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.net.URL; +import java.net.UnknownHostException; import java.util.List; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.zip.GZIPInputStream; class WikipediaIrcDecoder implements IrcDecoder { + static final Logger log = new Logger(WikipediaIrcDecoder.class); + + final DatabaseReader geoLookup; + static final Pattern pattern = Pattern.compile( "\\x0314\\[\\[\\x0307(.+?)\\x0314\\]\\]\\x034 (.*?)\\x0310.*\\x0302(http.+?)\\x03.+\\x0303(.+?)\\x03.+\\x03 (\\(([+-]\\d+)\\).*|.+) \\x0310(.+)\\x03" ); @@ -29,18 +44,51 @@ class WikipediaIrcDecoder implements IrcDecoder "robot", "anonymous", "namespace", - "added", - "delete", - "delta" + "continent", + "country", + "region", + "city" ); final Map namespaces; + public WikipediaIrcDecoder(Map namespaces) { + this(namespaces, null); + } + @JsonCreator - public WikipediaIrcDecoder(@JsonProperty Map namespaces) + public WikipediaIrcDecoder(@JsonProperty Map namespaces, + @JsonProperty String geoDbFile) { if(namespaces == null) namespaces = Maps.newHashMap(); this.namespaces = namespaces; + + + File geoDb; + if(geoDbFile != null) { + geoDb = new File(geoDbFile); + } else { + try { + geoDb = File.createTempFile("geoip", null); + 
geoDb.deleteOnExit(); + + log.info("Downloading geo ip database to [%s]", geoDb); + + FileUtils.copyInputStreamToFile( + new GZIPInputStream( + new URL("http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.mmdb.gz").openStream() + ), + geoDb + ); + } catch(IOException e) { + throw new RuntimeException("Unable to download geo ip database [%s]", e); + } + } + try { + geoLookup = new DatabaseReader(geoDb); + } catch(IOException e) { + throw new RuntimeException("Unable to open geo ip lookup database", e); + } } @Override @@ -65,7 +113,25 @@ class WikipediaIrcDecoder implements IrcDecoder dimensions.put("page", pageUrl); String user = m.group(4); - boolean anonymous = ipPattern.matcher(user).matches(); + Matcher ipMatch = ipPattern.matcher(user); + boolean anonymous = ipMatch.matches(); + if(anonymous) { + try { + final InetAddress ip = InetAddress.getByName(ipMatch.group()); + final Omni lookup = geoLookup.omni(ip); + + dimensions.put("continent", lookup.getContinent().getName()); + dimensions.put("country", lookup.getCountry().getName()); + dimensions.put("region", lookup.getMostSpecificSubdivision().getName()); + dimensions.put("city", lookup.getCity().getName()); + } catch(UnknownHostException e) { + log.error(e, "invalid ip [%s]", ipMatch.group()); + } catch(IOException e) { + log.error(e, "error looking up geo ip"); + } catch(GeoIp2Exception e) { + log.error(e, "error looking up geo ip"); + } + } dimensions.put("user", user); final String flags = m.group(2); From 70b3640b852a1a22798a6819711fb560d329ca6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Wed, 14 Aug 2013 14:31:55 -0700 Subject: [PATCH 4/8] move dependency version into main pom --- pom.xml | 5 +++++ realtime/pom.xml | 1 - 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index b2f2c922af7..d36f7ca9605 100644 --- a/pom.xml +++ b/pom.xml @@ -364,6 +364,11 @@ irc-api 1.0-0011 + + com.maxmind.geoip2 + geoip2 + 0.4.0 + diff --git 
a/realtime/pom.xml b/realtime/pom.xml index b4f81e06595..722697a1f8a 100644 --- a/realtime/pom.xml +++ b/realtime/pom.xml @@ -154,7 +154,6 @@ com.maxmind.geoip2 geoip2 - 0.4.0 From 3f33ca15b057a5771b64ceda6e35fc7125dfa797 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Thu, 15 Aug 2013 10:58:03 -0700 Subject: [PATCH 5/8] refactor namespaces and add docs / license --- .../realtime/firehose/IrcFirehoseFactory.java | 99 +++++++++++++------ .../firehose/WikipediaIrcDecoder.java | 36 +++++-- 2 files changed, 95 insertions(+), 40 deletions(-) diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java index d7102c46a39..6a51be2518a 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+ */ + package com.metamx.druid.realtime.firehose; import com.fasterxml.jackson.annotation.JsonCreator; @@ -16,44 +35,60 @@ import com.metamx.common.logger.Logger; import com.metamx.druid.input.InputRow; import org.joda.time.DateTime; +import java.io.File; import java.io.IOException; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; /** * Example usage - *
-    Map namespaces = Maps.newHashMap();
-    namespaces.put("", "main");
-    namespaces.put("Category", "category");
-    namespaces.put("Media", "media");
-    namespaces.put("MediaWiki", "mediawiki");
-    namespaces.put("Template", "template");
-    namespaces.put("$1 talk", "project talk");
-    namespaces.put("Help talk", "help talk");
-    namespaces.put("User", "user");
-    namespaces.put("Template talk", "template talk");
-    namespaces.put("MediaWiki talk", "mediawiki talk");
-    namespaces.put("Talk", "talk");
-    namespaces.put("Help", "help");
-    namespaces.put("File talk", "file talk");
-    namespaces.put("File", "file");
-    namespaces.put("User talk", "user talk");
-    namespaces.put("Special", "special");
-    namespaces.put("Category talk", "category talk");
-
-    IrcFirehoseFactory factory = new IrcFirehoseFactory(
-        "wiki-druid-123",
-        "irc.wikimedia.org",
-        Lists.newArrayList(
-            "#en.wikipedia",
-            "#fr.wikipedia",
-            "#de.wikipedia",
-            "#ja.wikipedia"
-        ),
-        new WikipediaIrcDecoder(namespaces)
-    );
-   
+ * + * Decoder definition wikipedia-decoder.json + *
{@code
+ * {
+ *   "type": "wikipedia",
+ *   "namespaces": {
+ *     "#en.wikipedia": {
+ *       "": "main",
+ *       "Category": "category",
+ *       "Template talk": "template talk",
+ *       "Help talk": "help talk",
+ *       "Media": "media",
+ *       "MediaWiki talk": "mediawiki talk",
+ *       "File talk": "file talk",
+ *       "MediaWiki": "mediawiki",
+ *       "User": "user",
+ *       "File": "file",
+ *       "User talk": "user talk",
+ *       "Template": "template",
+ *       "Help": "help",
+ *       "Special": "special",
+ *       "Talk": "talk",
+ *       "Category talk": "category talk"
+ *     }
+ *   },
+ *   "geoIpDatabase": "path/to/GeoLite2-City.mmdb"
+ * }
+ * }
+ * + *
{@code
+ * IrcDecoder wikipediaDecoder = new ObjectMapper().readValue(
+ *   new File("wikipedia-decoder.json"),
+ *   IrcDecoder.class
+ * );
+ *
+ * IrcFirehoseFactory factory = new IrcFirehoseFactory(
+ *     "wiki123",
+ *     "irc.wikimedia.org",
+ *     Lists.newArrayList(
+ *         "#en.wikipedia",
+ *         "#fr.wikipedia",
+ *         "#de.wikipedia",
+ *         "#ja.wikipedia"
+ *     ),
+ *     wikipediaDecoder
+ * );
+ * }
*/ public class IrcFirehoseFactory implements FirehoseFactory { diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java index a11937cd06a..4d1c15ce125 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+ */ + package com.metamx.druid.realtime.firehose; import com.fasterxml.jackson.annotation.JsonCreator; @@ -50,23 +69,23 @@ class WikipediaIrcDecoder implements IrcDecoder "city" ); - final Map namespaces; + final Map> namespaces; - public WikipediaIrcDecoder(Map namespaces) { + public WikipediaIrcDecoder( Map> namespaces) { this(namespaces, null); } @JsonCreator - public WikipediaIrcDecoder(@JsonProperty Map namespaces, - @JsonProperty String geoDbFile) + public WikipediaIrcDecoder(@JsonProperty("namespaces") Map> namespaces, + @JsonProperty("geoIpDatabase") String geoIpDatabase) { if(namespaces == null) namespaces = Maps.newHashMap(); this.namespaces = namespaces; File geoDb; - if(geoDbFile != null) { - geoDb = new File(geoDbFile); + if(geoIpDatabase != null) { + geoDb = new File(geoIpDatabase); } else { try { geoDb = File.createTempFile("geoip", null); @@ -143,8 +162,9 @@ class WikipediaIrcDecoder implements IrcDecoder String[] parts = page.split(":"); if(parts.length > 1 && !parts[1].startsWith(" ")) { - if(namespaces.containsKey(parts[0])) { - dimensions.put("namespace", namespaces.get(parts[0])); + Map channelNamespaces = namespaces.get(channel); + if(channelNamespaces != null && channelNamespaces.containsKey(parts[0])) { + dimensions.put("namespace", channelNamespaces.get(parts[0])); } else { dimensions.put("namespace", "wikipedia"); } From 071e8cb2f33732bef17aec9ac39cadf7d4a04f9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Thu, 15 Aug 2013 11:07:55 -0700 Subject: [PATCH 6/8] updated javadocs --- .../metamx/druid/realtime/firehose/IrcFirehoseFactory.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java index 6a51be2518a..aa9f47c8c81 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java +++ 
b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java @@ -41,10 +41,11 @@ import java.util.List; import java.util.concurrent.LinkedBlockingQueue; /** - * Example usage + *

Example Usage

* - * Decoder definition wikipedia-decoder.json + *

Decoder definition: wikipedia-decoder.json

*
{@code
+ *
  * {
  *   "type": "wikipedia",
  *   "namespaces": {
@@ -71,6 +72,7 @@ import java.util.concurrent.LinkedBlockingQueue;
  * }
  * }
* + *

Example code:

*
{@code
  * IrcDecoder wikipediaDecoder = new ObjectMapper().readValue(
  *   new File("wikipedia-decoder.json"),

From 0f4a1328071b61c643d626191ef37b78fcb9c5f1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= 
Date: Wed, 21 Aug 2013 13:57:50 -0700
Subject: [PATCH 7/8] address code review

---
 .../druid/realtime/firehose/WikipediaIrcDecoder.java       | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java
index 4d1c15ce125..72e4022294e 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java
@@ -21,6 +21,7 @@ package com.metamx.druid.realtime.firehose;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.maxmind.geoip2.DatabaseReader;
@@ -79,7 +80,9 @@ class WikipediaIrcDecoder implements IrcDecoder
   public WikipediaIrcDecoder(@JsonProperty("namespaces") Map> namespaces,
                              @JsonProperty("geoIpDatabase") String geoIpDatabase)
   {
-    if(namespaces == null) namespaces = Maps.newHashMap();
+    if(namespaces == null) {
+      namespaces = Maps.newHashMap();
+    }
     this.namespaces = namespaces;
 
 
@@ -195,7 +198,14 @@ class WikipediaIrcDecoder implements IrcDecoder
       @Override
       public List getDimension(String dimension)
       {
-        return Lists.newArrayList(dimensions.get(dimension));
+        // Not every row carries every dimension (the geo dimensions are only set
+        // for anonymous users), and ImmutableList.of(null) throws a
+        // NullPointerException, so a missing value is reported as an empty list.
+        final String value = dimensions.get(dimension);
+        if(value == null) {
+          return ImmutableList.of();
+        }
+        return ImmutableList.of(value);
       }
 
       @Override

From 7913b27066051edaa6e3b0139b26cb1ff0cada42 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= 
Date: Wed, 21 Aug 2013 14:12:39 -0700
Subject: [PATCH 8/8] reuse tmp file for geoip db

---
 .../firehose/WikipediaIrcDecoder.java         | 21 ++++++++++---------
 1 file changed, 11 insertions(+), 10 deletions(-)

diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java
index 72e4022294e..b1ecf91554e 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java
@@ -91,17 +91,26 @@ class WikipediaIrcDecoder implements IrcDecoder
       geoDb = new File(geoIpDatabase);
     } else {
       try {
-        geoDb = File.createTempFile("geoip", null);
-        geoDb.deleteOnExit();
+        String tmpDir = System.getProperty("java.io.tmpdir");
+        geoDb = new File(tmpDir, this.getClass().getCanonicalName() + ".GeoLite2-City.mmdb");
+        if(!geoDb.exists()) {
+          log.info("Downloading geo ip database to [%s]", geoDb);
 
-        log.info("Downloading geo ip database to [%s]", geoDb);
-
-        FileUtils.copyInputStreamToFile(
-            new GZIPInputStream(
-                new URL("http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.mmdb.gz").openStream()
-            ),
-            geoDb
-        );
+          // Download to a scratch file and rename it into place only once the
+          // copy has completed, so that an interrupted download never leaves a
+          // truncated database behind for later runs to pick up.
+          File partial = File.createTempFile("geoip", ".part", geoDb.getParentFile());
+          partial.deleteOnExit();
+          FileUtils.copyInputStreamToFile(
+              new GZIPInputStream(
+                  new URL("http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.mmdb.gz").openStream()
+              ),
+              partial
+          );
+          if(!partial.renameTo(geoDb) && !geoDb.exists()) {
+            throw new IOException("Unable to move geo ip database to [" + geoDb + "]");
+          }
+        }
       } catch(IOException e) {
-        throw new RuntimeException("Unable to download geo ip database [%s]", e);
+        throw new RuntimeException("Unable to download geo ip database", e);
       }