diff --git a/pom.xml b/pom.xml index 3593bf4ce18..dca1e808e2a 100644 --- a/pom.xml +++ b/pom.xml @@ -182,37 +182,37 @@ com.fasterxml.jackson.core jackson-annotations - 2.1.4 + 2.2.2 com.fasterxml.jackson.core jackson-core - 2.1.4 + 2.2.2 com.fasterxml.jackson.core jackson-databind - 2.1.4 + 2.2.2 com.fasterxml.jackson.datatype jackson-datatype-guava - 2.1.2 + 2.2.2 com.fasterxml.jackson.datatype jackson-datatype-joda - 2.1.2 + 2.2.2 com.fasterxml.jackson.dataformat jackson-dataformat-smile - 2.1.4 + 2.2.2 com.fasterxml.jackson.jaxrs jackson-jaxrs-json-provider - 2.1.4 + 2.2.2 javax.inject @@ -365,6 +365,16 @@ ${apache.curator.version} test + + com.ircclouds.irc + irc-api + 1.0-0011 + + + com.maxmind.geoip2 + geoip2 + 0.4.0 + diff --git a/realtime/pom.xml b/realtime/pom.xml index b7643285985..b387678b59e 100644 --- a/realtime/pom.xml +++ b/realtime/pom.xml @@ -147,6 +147,14 @@ java-xmlbuilder true + + com.ircclouds.irc + irc-api + + + com.maxmind.geoip2 + geoip2 + diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/FirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/FirehoseFactory.java index c9d9dfec3de..937947bfb34 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/FirehoseFactory.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/FirehoseFactory.java @@ -29,7 +29,8 @@ import java.io.IOException; @JsonSubTypes.Type(name = "kafka-0.7.2", value = KafkaFirehoseFactory.class), @JsonSubTypes.Type(name = "rabbitmq", value = RabbitMQFirehoseFactory.class), @JsonSubTypes.Type(name = "clipped", value = ClippedFirehoseFactory.class), - @JsonSubTypes.Type(name = "timed", value = TimedShutoffFirehoseFactory.class) + @JsonSubTypes.Type(name = "timed", value = TimedShutoffFirehoseFactory.class), + @JsonSubTypes.Type(name = "irc", value = IrcFirehoseFactory.class) }) public interface FirehoseFactory { diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcDecoder.java 
b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcDecoder.java
new file mode 100644
index 00000000000..3ce230fbcfe
--- /dev/null
+++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcDecoder.java
@@ -0,0 +1,15 @@
+package com.metamx.druid.realtime.firehose;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.metamx.druid.input.InputRow;
+import org.joda.time.DateTime;
+
+/**
+ * Converts a single IRC channel message into an {@link InputRow}.
+ *
+ * Implementations are chosen by Jackson through the "type" property of the
+ * decoder's JSON definition; "wikipedia" selects {@link WikipediaIrcDecoder}.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(@JsonSubTypes.Type(name = "wikipedia", value = WikipediaIrcDecoder.class))
+public interface IrcDecoder
+{
+  /**
+   * Decodes one message received on an IRC channel.
+   *
+   * @param timestamp time associated with the message
+   * @param channel   name of the channel the message was posted to
+   * @param msg       raw text of the message
+   * @return the row built from the message
+   */
+  InputRow decodeMessage(DateTime timestamp, String channel, String msg);
+}
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java
new file mode 100644
index 00000000000..aa9f47c8c81
--- /dev/null
+++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/IrcFirehoseFactory.java
@@ -0,0 +1,246 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */ + +package com.metamx.druid.realtime.firehose; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Lists; +import com.ircclouds.irc.api.Callback; +import com.ircclouds.irc.api.IRCApi; +import com.ircclouds.irc.api.IRCApiImpl; +import com.ircclouds.irc.api.IServerParameters; +import com.ircclouds.irc.api.domain.IRCServer; +import com.ircclouds.irc.api.domain.messages.ChannelPrivMsg; +import com.ircclouds.irc.api.listeners.VariousMessageListenerAdapter; +import com.ircclouds.irc.api.state.IIRCState; +import com.metamx.common.Pair; +import com.metamx.common.logger.Logger; +import com.metamx.druid.input.InputRow; +import org.joda.time.DateTime; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; + +/** + *

Example Usage

+ * + *

Decoder definition: wikipedia-decoder.json

+ *
{@code
+ *
+ * {
+ *   "type": "wikipedia",
+ *   "namespaces": {
+ *     "#en.wikipedia": {
+ *       "": "main",
+ *       "Category": "category",
+ *       "Template talk": "template talk",
+ *       "Help talk": "help talk",
+ *       "Media": "media",
+ *       "MediaWiki talk": "mediawiki talk",
+ *       "File talk": "file talk",
+ *       "MediaWiki": "mediawiki",
+ *       "User": "user",
+ *       "File": "file",
+ *       "User talk": "user talk",
+ *       "Template": "template",
+ *       "Help": "help",
+ *       "Special": "special",
+ *       "Talk": "talk",
+ *       "Category talk": "category talk"
+ *     }
+ *   },
+ *   "geoIpDatabase": "path/to/GeoLite2-City.mmdb"
+ * }
+ * }
+ * + *

Example code:

+ *
{@code
+ * IrcDecoder wikipediaDecoder = new ObjectMapper().readValue(
+ *   new File("wikipedia-decoder.json"),
+ *   IrcDecoder.class
+ * );
+ *
+ * IrcFirehoseFactory factory = new IrcFirehoseFactory(
+ *     "wiki123",
+ *     "irc.wikimedia.org",
+ *     Lists.newArrayList(
+ *         "#en.wikipedia",
+ *         "#fr.wikipedia",
+ *         "#de.wikipedia",
+ *         "#ja.wikipedia"
+ *     ),
+ *     wikipediaDecoder
+ * );
+ * }
+ */
+public class IrcFirehoseFactory implements FirehoseFactory
+{
+  private static final Logger log = new Logger(IrcFirehoseFactory.class);
+
+  private final String nick;
+  private final String host;
+  private final List<String> channels;
+  private final IrcDecoder decoder;
+
+  /**
+   * Creates a factory for firehoses that read messages from IRC channels.
+   *
+   * The JSON property names are spelled out explicitly: Jackson cannot infer
+   * the names of creator parameters from a bare {@code @JsonProperty}.
+   *
+   * @param nick     nickname to connect with; also used as the real name, and
+   *                 as the base for the alternative nicknames (nick + "_", ...)
+   * @param host     host name of the IRC server
+   * @param channels channels to join once the connection is established
+   * @param decoder  decoder that turns each channel message into an InputRow
+   */
+  @JsonCreator
+  public IrcFirehoseFactory(
+      @JsonProperty("nick") String nick,
+      @JsonProperty("host") String host,
+      @JsonProperty("channels") List<String> channels,
+      @JsonProperty("decoder") IrcDecoder decoder
+  )
+  {
+    this.nick = nick;
+    this.host = host;
+    this.channels = channels;
+    this.decoder = decoder;
+  }
+
+  /**
+   * Connects to the IRC server, joins the configured channels once the
+   * connection succeeds, and returns a firehose over the received channel
+   * messages.
+   *
+   * Messages are timestamped on arrival and buffered in an unbounded queue
+   * until the firehose consumes them.
+   *
+   * @return a firehose whose {@code hasMore()} blocks until a message could be
+   *         decoded into a row
+   */
+  @Override
+  public Firehose connect() throws IOException
+  {
+    final IRCApi irc = new IRCApiImpl(false);
+    final LinkedBlockingQueue<Pair<DateTime, ChannelPrivMsg>> queue =
+        new LinkedBlockingQueue<Pair<DateTime, ChannelPrivMsg>>();
+
+    irc.addListener(
+        new VariousMessageListenerAdapter()
+        {
+          @Override
+          public void onChannelMessage(ChannelPrivMsg aMsg)
+          {
+            try {
+              queue.put(Pair.of(DateTime.now(), aMsg));
+            }
+            catch (InterruptedException e) {
+              // Restore the interrupt status so code further up the stack can still observe it.
+              Thread.currentThread().interrupt();
+              throw new RuntimeException("interrupted adding message to queue", e);
+            }
+          }
+        }
+    );
+
+    log.info("connecting to irc server [%s]", host);
+    irc.connect(
+        new IServerParameters()
+        {
+          @Override
+          public String getNickname()
+          {
+            return nick;
+          }
+
+          @Override
+          public List<String> getAlternativeNicknames()
+          {
+            return Lists.newArrayList(
+                nick + "_",
+                nick + "__",
+                nick + "___"
+            );
+          }
+
+          @Override
+          public String getIdent()
+          {
+            return "druid";
+          }
+
+          @Override
+          public String getRealname()
+          {
+            return nick;
+          }
+
+          @Override
+          public IRCServer getServer()
+          {
+            return new IRCServer(host, false);
+          }
+        },
+        new Callback<IIRCState>()
+        {
+          @Override
+          public void onSuccess(IIRCState aObject)
+          {
+            log.info("irc connection to server [%s] established", host);
+            for (String chan : channels) {
+              log.info("Joining channel %s", chan);
+              irc.joinChannel(chan);
+            }
+          }
+
+          @Override
+          public void onFailure(Exception e)
+          {
+            log.error(e, "Unable to connect to irc server [%s]", host);
+            // NOTE(review): this is thrown from inside the IRC library's callback, so it
+            // presumably does not reach the caller of connect() -- confirm.
+            throw new RuntimeException("Unable to connect to server", e);
+          }
+        }
+    );
+
+    return new Firehose()
+    {
+      // Row produced by the most recent successful hasMore() call.
+      InputRow nextRow = null;
+
+      /**
+       * Blocks until a queued message decodes into a row. Messages the decoder
+       * rejects with IllegalArgumentException, or decodes to null, are skipped.
+       * Every call consumes from the queue, so call it once per nextRow().
+       */
+      @Override
+      public boolean hasMore()
+      {
+        try {
+          while (true) {
+            final Pair<DateTime, ChannelPrivMsg> nextMsg = queue.take();
+            try {
+              nextRow = decoder.decodeMessage(nextMsg.lhs, nextMsg.rhs.getChannelName(), nextMsg.rhs.getText());
+              if (nextRow != null) {
+                return true;
+              }
+            }
+            catch (IllegalArgumentException iae) {
+              log.debug("ignoring invalid message in channel [%s]", nextMsg.rhs.getChannelName());
+            }
+          }
+        }
+        catch (InterruptedException e) {
+          // Thread.interrupted() would clear the flag; re-assert it instead so callers
+          // can still tell the thread was asked to stop.
+          Thread.currentThread().interrupt();
+          throw new RuntimeException("interrupted retrieving elements from queue", e);
+        }
+      }
+
+      @Override
+      public InputRow nextRow()
+      {
+        return nextRow;
+      }
+
+      @Override
+      public Runnable commit()
+      {
+        return new Runnable()
+        {
+          @Override
+          public void run()
+          {
+            // nothing to see here
+          }
+        };
+      }
+
+      @Override
+      public void close() throws IOException
+      {
+        log.info("disconnecting from irc server [%s]", host);
+        irc.disconnect("");
+      }
+    };
+  }
+}
+
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java
new file mode 100644
index 00000000000..b1ecf91554e
--- /dev/null
+++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/WikipediaIrcDecoder.java
@@ -0,0 +1,222 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.realtime.firehose;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.maxmind.geoip2.DatabaseReader;
+import com.maxmind.geoip2.exception.GeoIp2Exception;
+import com.maxmind.geoip2.model.Omni;
+import com.metamx.common.logger.Logger;
+import com.metamx.druid.input.InputRow;
+import org.apache.commons.io.FileUtils;
+import org.joda.time.DateTime;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * Decodes colour-coded edit notifications posted on Wikipedia IRC channels
+ * into rows with page/user/flag/namespace/geo dimensions and
+ * delta/added/deleted metrics.
+ */
+class WikipediaIrcDecoder implements IrcDecoder
+{
+  static final Logger log = new Logger(WikipediaIrcDecoder.class);
+
+  final DatabaseReader geoLookup;
+
+  // Groups used below: 1 = page title, 2 = flags, 4 = user, 6 = signed size delta (optional).
+  static final Pattern pattern = Pattern.compile(
+      "\\x0314\\[\\[\\x0307(.+?)\\x0314\\]\\]\\x034 (.*?)\\x0310.*\\x0302(http.+?)\\x03.+\\x0303(.+?)\\x03.+\\x03 (\\(([+-]\\d+)\\).*|.+) \\x0310(.+)\\x03"
+  );
+  // Dotted-quad user names mark anonymous edits. The dots are escaped so that
+  // a user name such as "12a34b56c78" is not mistaken for an IP address.
+  static final Pattern ipPattern = Pattern.compile("\\d+\\.\\d+\\.\\d+\\.\\d+");
+  static final Pattern shortnamePattern = Pattern.compile("#(\\w\\w)\\..*");
+
+  // Shared by every row, hence immutable.
+  static final List<String> dimensionList = ImmutableList.of(
+      "page",
+      "language",
+      "user",
+      "unpatrolled",
+      "newPage",
+      "robot",
+      "anonymous",
+      "namespace",
+      "continent",
+      "country",
+      "region",
+      "city"
+  );
+
+  final Map<String, Map<String, String>> namespaces;
+
+  /**
+   * Creates a decoder that uses the default geo ip database location.
+   *
+   * @param namespaces per-channel mapping from page-title prefix to namespace name; may be null
+   */
+  public WikipediaIrcDecoder(Map<String, Map<String, String>> namespaces)
+  {
+    this(namespaces, null);
+  }
+
+  /**
+   * Creates a decoder.
+   *
+   * @param namespaces    per-channel mapping from page-title prefix to namespace name;
+   *                      null is treated as an empty mapping
+   * @param geoIpDatabase path of the geo ip database file; when null the database is
+   *                      looked up in java.io.tmpdir and downloaded there if missing
+   * @throws RuntimeException if the database cannot be downloaded or opened
+   */
+  @JsonCreator
+  public WikipediaIrcDecoder(
+      @JsonProperty("namespaces") Map<String, Map<String, String>> namespaces,
+      @JsonProperty("geoIpDatabase") String geoIpDatabase
+  )
+  {
+    if (namespaces == null) {
+      namespaces = Maps.newHashMap();
+    }
+    this.namespaces = namespaces;
+
+    final File geoDb;
+    if (geoIpDatabase != null) {
+      geoDb = new File(geoIpDatabase);
+    } else {
+      geoDb = new File(
+          System.getProperty("java.io.tmpdir"),
+          this.getClass().getCanonicalName() + ".GeoLite2-City.mmdb"
+      );
+      if (!geoDb.exists()) {
+        downloadGeoDb(geoDb);
+      }
+    }
+
+    try {
+      geoLookup = new DatabaseReader(geoDb);
+    }
+    catch (IOException e) {
+      throw new RuntimeException("Unable to open geo ip lookup database", e);
+    }
+  }
+
+  /**
+   * Downloads and gunzips the GeoLite2 city database to the given file.
+   *
+   * The data is written to a temporary sibling file and renamed afterwards, so
+   * an interrupted download never leaves a truncated file at the cache path
+   * that a later run would mistake for a complete database.
+   */
+  private static void downloadGeoDb(File geoDb)
+  {
+    log.info("Downloading geo ip database to [%s]", geoDb);
+
+    File partial = null;
+    try {
+      partial = File.createTempFile(geoDb.getName(), ".part", geoDb.getParentFile());
+      FileUtils.copyInputStreamToFile(
+          new GZIPInputStream(
+              new URL("http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.mmdb.gz").openStream()
+          ),
+          partial
+      );
+      // A failed rename is fine if the target exists, e.g. another process finished first.
+      if (!partial.renameTo(geoDb) && !geoDb.exists()) {
+        throw new IOException(String.format("Unable to move [%s] to [%s]", partial, geoDb));
+      }
+    }
+    catch (IOException e) {
+      throw new RuntimeException(String.format("Unable to download geo ip database [%s]", geoDb), e);
+    }
+    finally {
+      // Best-effort cleanup; after a successful rename the partial file no longer exists.
+      if (partial != null && partial.exists() && !partial.delete()) {
+        log.info("Unable to delete partial download [%s]", partial);
+      }
+    }
+  }
+
+  /**
+   * Decodes one edit notification.
+   *
+   * @param timestamp timestamp to assign to the row
+   * @param channel   channel the message was received on; a name of the form
+   *                  "#xx.something" yields the language dimension "xx"
+   * @param msg       raw message text
+   * @return a row carrying the decoded dimensions and metrics
+   * @throws IllegalArgumentException if msg does not match the expected format
+   */
+  @Override
+  public InputRow decodeMessage(final DateTime timestamp, String channel, String msg)
+  {
+    final Map<String, String> dimensions = Maps.newHashMap();
+    final Map<String, Float> metrics = Maps.newHashMap();
+
+    Matcher m = pattern.matcher(msg);
+    if (!m.matches()) {
+      throw new IllegalArgumentException("Invalid input format");
+    }
+
+    Matcher shortname = shortnamePattern.matcher(channel);
+    if (shortname.matches()) {
+      dimensions.put("language", shortname.group(1));
+    }
+
+    String page = m.group(1);
+    String pageUrl = page.replaceAll("\\s", "_");
+
+    dimensions.put("page", pageUrl);
+
+    String user = m.group(4);
+    Matcher ipMatch = ipPattern.matcher(user);
+    boolean anonymous = ipMatch.matches();
+    if (anonymous) {
+      // Geo dimensions are best-effort: a failed lookup is logged and the row is still produced.
+      try {
+        final InetAddress ip = InetAddress.getByName(ipMatch.group());
+        final Omni lookup = geoLookup.omni(ip);
+
+        dimensions.put("continent", lookup.getContinent().getName());
+        dimensions.put("country", lookup.getCountry().getName());
+        dimensions.put("region", lookup.getMostSpecificSubdivision().getName());
+        dimensions.put("city", lookup.getCity().getName());
+      }
+      catch (UnknownHostException e) {
+        log.error(e, "invalid ip [%s]", ipMatch.group());
+      }
+      catch (IOException e) {
+        log.error(e, "error looking up geo ip");
+      }
+      catch (GeoIp2Exception e) {
+        log.error(e, "error looking up geo ip");
+      }
+    }
+    dimensions.put("user", user);
+
+    final String flags = m.group(2);
+    dimensions.put("unpatrolled", Boolean.toString(flags.contains("!")));
+    dimensions.put("newPage", Boolean.toString(flags.contains("N")));
+    dimensions.put("robot", Boolean.toString(flags.contains("B")));
+
+    dimensions.put("anonymous", Boolean.toString(anonymous));
+
+    // "Prefix:Title" is a namespaced page; "Title: subtitle" (space after the colon) is not.
+    String[] parts = page.split(":");
+    if (parts.length > 1 && !parts[1].startsWith(" ")) {
+      Map<String, String> channelNamespaces = namespaces.get(channel);
+      if (channelNamespaces != null && channelNamespaces.containsKey(parts[0])) {
+        dimensions.put("namespace", channelNamespaces.get(parts[0]));
+      } else {
+        dimensions.put("namespace", "wikipedia");
+      }
+    } else {
+      dimensions.put("namespace", "article");
+    }
+
+    // "added" is never negative and "deleted" is never positive.
+    float delta = m.group(6) != null ? Float.parseFloat(m.group(6)) : 0;
+    metrics.put("delta", delta);
+    metrics.put("added", Math.max(delta, 0));
+    metrics.put("deleted", Math.min(delta, 0));
+
+    return new InputRow()
+    {
+      @Override
+      public List<String> getDimensions()
+      {
+        return dimensionList;
+      }
+
+      @Override
+      public long getTimestampFromEpoch()
+      {
+        return timestamp.getMillis();
+      }
+
+      @Override
+      public List<String> getDimension(String dimension)
+      {
+        // Not every row has every dimension (the geo dimensions are only set for
+        // anonymous edits), and ImmutableList.of(null) would throw.
+        final String value = dimensions.get(dimension);
+        if (value != null) {
+          return ImmutableList.of(value);
+        }
+        return ImmutableList.of();
+      }
+
+      @Override
+      public float getFloatMetric(String metric)
+      {
+        return metrics.get(metric);
+      }
+
+      @Override
+      public String toString()
+      {
+        return "WikipediaRow{" +
+               "timestamp=" + timestamp +
+               ", dimensions=" + dimensions +
+               ", metrics=" + metrics +
+               '}';
+      }
+    };
+  }
+}