mirror of https://github.com/apache/druid.git
Merge pull request #219 from metamx/wikipedia-firehose
Wikipedia firehose
This commit is contained in:
commit
b4310d6cbf
24
pom.xml
24
pom.xml
|
@ -182,37 +182,37 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.fasterxml.jackson.core</groupId>
|
<groupId>com.fasterxml.jackson.core</groupId>
|
||||||
<artifactId>jackson-annotations</artifactId>
|
<artifactId>jackson-annotations</artifactId>
|
||||||
<version>2.1.4</version>
|
<version>2.2.2</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.fasterxml.jackson.core</groupId>
|
<groupId>com.fasterxml.jackson.core</groupId>
|
||||||
<artifactId>jackson-core</artifactId>
|
<artifactId>jackson-core</artifactId>
|
||||||
<version>2.1.4</version>
|
<version>2.2.2</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.fasterxml.jackson.core</groupId>
|
<groupId>com.fasterxml.jackson.core</groupId>
|
||||||
<artifactId>jackson-databind</artifactId>
|
<artifactId>jackson-databind</artifactId>
|
||||||
<version>2.1.4</version>
|
<version>2.2.2</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.fasterxml.jackson.datatype</groupId>
|
<groupId>com.fasterxml.jackson.datatype</groupId>
|
||||||
<artifactId>jackson-datatype-guava</artifactId>
|
<artifactId>jackson-datatype-guava</artifactId>
|
||||||
<version>2.1.2</version>
|
<version>2.2.2</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.fasterxml.jackson.datatype</groupId>
|
<groupId>com.fasterxml.jackson.datatype</groupId>
|
||||||
<artifactId>jackson-datatype-joda</artifactId>
|
<artifactId>jackson-datatype-joda</artifactId>
|
||||||
<version>2.1.2</version>
|
<version>2.2.2</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.fasterxml.jackson.dataformat</groupId>
|
<groupId>com.fasterxml.jackson.dataformat</groupId>
|
||||||
<artifactId>jackson-dataformat-smile</artifactId>
|
<artifactId>jackson-dataformat-smile</artifactId>
|
||||||
<version>2.1.4</version>
|
<version>2.2.2</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.fasterxml.jackson.jaxrs</groupId>
|
<groupId>com.fasterxml.jackson.jaxrs</groupId>
|
||||||
<artifactId>jackson-jaxrs-json-provider</artifactId>
|
<artifactId>jackson-jaxrs-json-provider</artifactId>
|
||||||
<version>2.1.4</version>
|
<version>2.2.2</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>javax.inject</groupId>
|
<groupId>javax.inject</groupId>
|
||||||
|
@ -365,6 +365,16 @@
|
||||||
<version>${apache.curator.version}</version>
|
<version>${apache.curator.version}</version>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.ircclouds.irc</groupId>
|
||||||
|
<artifactId>irc-api</artifactId>
|
||||||
|
<version>1.0-0011</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.maxmind.geoip2</groupId>
|
||||||
|
<artifactId>geoip2</artifactId>
|
||||||
|
<version>0.4.0</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</dependencyManagement>
|
</dependencyManagement>
|
||||||
|
|
|
@ -147,6 +147,14 @@
|
||||||
<artifactId>java-xmlbuilder</artifactId>
|
<artifactId>java-xmlbuilder</artifactId>
|
||||||
<optional>true</optional>
|
<optional>true</optional>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.ircclouds.irc</groupId>
|
||||||
|
<artifactId>irc-api</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.maxmind.geoip2</groupId>
|
||||||
|
<artifactId>geoip2</artifactId>
|
||||||
|
</dependency>
|
||||||
<!-- Dependencies required for jets3t -->
|
<!-- Dependencies required for jets3t -->
|
||||||
|
|
||||||
<!-- Tests -->
|
<!-- Tests -->
|
||||||
|
|
|
@ -29,7 +29,8 @@ import java.io.IOException;
|
||||||
@JsonSubTypes.Type(name = "kafka-0.7.2", value = KafkaFirehoseFactory.class),
|
@JsonSubTypes.Type(name = "kafka-0.7.2", value = KafkaFirehoseFactory.class),
|
||||||
@JsonSubTypes.Type(name = "rabbitmq", value = RabbitMQFirehoseFactory.class),
|
@JsonSubTypes.Type(name = "rabbitmq", value = RabbitMQFirehoseFactory.class),
|
||||||
@JsonSubTypes.Type(name = "clipped", value = ClippedFirehoseFactory.class),
|
@JsonSubTypes.Type(name = "clipped", value = ClippedFirehoseFactory.class),
|
||||||
@JsonSubTypes.Type(name = "timed", value = TimedShutoffFirehoseFactory.class)
|
@JsonSubTypes.Type(name = "timed", value = TimedShutoffFirehoseFactory.class),
|
||||||
|
@JsonSubTypes.Type(name = "irc", value = IrcFirehoseFactory.class)
|
||||||
})
|
})
|
||||||
public interface FirehoseFactory
|
public interface FirehoseFactory
|
||||||
{
|
{
|
||||||
|
|
|
@ -0,0 +1,15 @@
|
||||||
|
package com.metamx.druid.realtime.firehose;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||||
|
import com.metamx.druid.input.InputRow;
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
|
||||||
|
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
|
||||||
|
@JsonSubTypes({
|
||||||
|
@JsonSubTypes.Type(name = "wikipedia", value = WikipediaIrcDecoder.class)
|
||||||
|
})
|
||||||
|
public interface IrcDecoder
|
||||||
|
{
|
||||||
|
public InputRow decodeMessage(DateTime timestamp, String channel, String msg);
|
||||||
|
}
|
|
@ -0,0 +1,246 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2013 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.metamx.druid.realtime.firehose;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import com.ircclouds.irc.api.Callback;
|
||||||
|
import com.ircclouds.irc.api.IRCApi;
|
||||||
|
import com.ircclouds.irc.api.IRCApiImpl;
|
||||||
|
import com.ircclouds.irc.api.IServerParameters;
|
||||||
|
import com.ircclouds.irc.api.domain.IRCServer;
|
||||||
|
import com.ircclouds.irc.api.domain.messages.ChannelPrivMsg;
|
||||||
|
import com.ircclouds.irc.api.listeners.VariousMessageListenerAdapter;
|
||||||
|
import com.ircclouds.irc.api.state.IIRCState;
|
||||||
|
import com.metamx.common.Pair;
|
||||||
|
import com.metamx.common.logger.Logger;
|
||||||
|
import com.metamx.druid.input.InputRow;
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p><b>Example Usage</b></p>
|
||||||
|
*
|
||||||
|
* <p>Decoder definition: <code>wikipedia-decoder.json</code></p>
|
||||||
|
* <pre>{@code
|
||||||
|
*
|
||||||
|
* {
|
||||||
|
* "type": "wikipedia",
|
||||||
|
* "namespaces": {
|
||||||
|
* "#en.wikipedia": {
|
||||||
|
* "": "main",
|
||||||
|
* "Category": "category",
|
||||||
|
* "Template talk": "template talk",
|
||||||
|
* "Help talk": "help talk",
|
||||||
|
* "Media": "media",
|
||||||
|
* "MediaWiki talk": "mediawiki talk",
|
||||||
|
* "File talk": "file talk",
|
||||||
|
* "MediaWiki": "mediawiki",
|
||||||
|
* "User": "user",
|
||||||
|
* "File": "file",
|
||||||
|
* "User talk": "user talk",
|
||||||
|
* "Template": "template",
|
||||||
|
* "Help": "help",
|
||||||
|
* "Special": "special",
|
||||||
|
* "Talk": "talk",
|
||||||
|
* "Category talk": "category talk"
|
||||||
|
* }
|
||||||
|
* },
|
||||||
|
* "geoIpDatabase": "path/to/GeoLite2-City.mmdb"
|
||||||
|
* }
|
||||||
|
* }</pre>
|
||||||
|
*
|
||||||
|
* <p><b>Example code:</b></p>
|
||||||
|
* <pre>{@code
|
||||||
|
* IrcDecoder wikipediaDecoder = new ObjectMapper().readValue(
|
||||||
|
* new File("wikipedia-decoder.json"),
|
||||||
|
* IrcDecoder.class
|
||||||
|
* );
|
||||||
|
*
|
||||||
|
* IrcFirehoseFactory factory = new IrcFirehoseFactory(
|
||||||
|
* "wiki123",
|
||||||
|
* "irc.wikimedia.org",
|
||||||
|
* Lists.newArrayList(
|
||||||
|
* "#en.wikipedia",
|
||||||
|
* "#fr.wikipedia",
|
||||||
|
* "#de.wikipedia",
|
||||||
|
* "#ja.wikipedia"
|
||||||
|
* ),
|
||||||
|
* wikipediaDecoder
|
||||||
|
* );
|
||||||
|
* }</pre>
|
||||||
|
*/
|
||||||
|
public class IrcFirehoseFactory implements FirehoseFactory
|
||||||
|
{
|
||||||
|
private static final Logger log = new Logger(IrcFirehoseFactory.class);
|
||||||
|
|
||||||
|
private final String nick;
|
||||||
|
private final String host;
|
||||||
|
private final List<String> channels;
|
||||||
|
private final IrcDecoder decoder;
|
||||||
|
|
||||||
|
@JsonCreator
|
||||||
|
public IrcFirehoseFactory(
|
||||||
|
@JsonProperty String nick,
|
||||||
|
@JsonProperty String host,
|
||||||
|
@JsonProperty List<String> channels,
|
||||||
|
@JsonProperty IrcDecoder decoder
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.nick = nick;
|
||||||
|
this.host = host;
|
||||||
|
this.channels = channels;
|
||||||
|
this.decoder = decoder;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Firehose connect() throws IOException
|
||||||
|
{
|
||||||
|
final IRCApi irc = new IRCApiImpl(false);
|
||||||
|
final LinkedBlockingQueue<Pair<DateTime, ChannelPrivMsg>> queue = new LinkedBlockingQueue<Pair<DateTime, ChannelPrivMsg>>();
|
||||||
|
|
||||||
|
irc.addListener(new VariousMessageListenerAdapter() {
|
||||||
|
@Override
|
||||||
|
public void onChannelMessage(ChannelPrivMsg aMsg)
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
queue.put(Pair.of(DateTime.now(), aMsg));
|
||||||
|
} catch(InterruptedException e) {
|
||||||
|
throw new RuntimeException("interrupted adding message to queue", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
log.info("connecting to irc server [%s]", host);
|
||||||
|
irc.connect(
|
||||||
|
new IServerParameters()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public String getNickname()
|
||||||
|
{
|
||||||
|
return nick;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<String> getAlternativeNicknames()
|
||||||
|
{
|
||||||
|
return Lists.newArrayList(nick + "_",
|
||||||
|
nick + "__",
|
||||||
|
nick + "___");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getIdent()
|
||||||
|
{
|
||||||
|
return "druid";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getRealname()
|
||||||
|
{
|
||||||
|
return nick;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IRCServer getServer()
|
||||||
|
{
|
||||||
|
return new IRCServer(host, false);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
new Callback<IIRCState>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onSuccess(IIRCState aObject)
|
||||||
|
{
|
||||||
|
log.info("irc connection to server [%s] established", host);
|
||||||
|
for(String chan : channels) {
|
||||||
|
log.info("Joining channel %s", chan);
|
||||||
|
irc.joinChannel(chan);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e)
|
||||||
|
{
|
||||||
|
log.error(e, "Unable to connect to irc server [%s]", host);
|
||||||
|
throw new RuntimeException("Unable to connect to server", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
return new Firehose()
|
||||||
|
{
|
||||||
|
InputRow nextRow = null;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasMore()
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
while(true) {
|
||||||
|
Pair<DateTime, ChannelPrivMsg> nextMsg = queue.take();
|
||||||
|
try {
|
||||||
|
nextRow = decoder.decodeMessage(nextMsg.lhs, nextMsg.rhs.getChannelName(), nextMsg.rhs.getText());
|
||||||
|
if(nextRow != null) return true;
|
||||||
|
}
|
||||||
|
catch (IllegalArgumentException iae) {
|
||||||
|
log.debug("ignoring invalid message in channel [%s]", nextMsg.rhs.getChannelName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch(InterruptedException e) {
|
||||||
|
Thread.interrupted();
|
||||||
|
throw new RuntimeException("interrupted retrieving elements from queue", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public InputRow nextRow()
|
||||||
|
{
|
||||||
|
return nextRow;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Runnable commit()
|
||||||
|
{
|
||||||
|
return new Runnable()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
// nothing to see here
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException
|
||||||
|
{
|
||||||
|
log.info("disconnecting from irc server [%s]", host);
|
||||||
|
irc.disconnect("");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,222 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2013 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.metamx.druid.realtime.firehose;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
|
import com.maxmind.geoip2.DatabaseReader;
|
||||||
|
import com.maxmind.geoip2.exception.GeoIp2Exception;
|
||||||
|
import com.maxmind.geoip2.model.Omni;
|
||||||
|
import com.metamx.common.logger.Logger;
|
||||||
|
import com.metamx.druid.input.InputRow;
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetAddress;
|
||||||
|
import java.net.URL;
|
||||||
|
import java.net.UnknownHostException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.regex.Matcher;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
import java.util.zip.GZIPInputStream;
|
||||||
|
|
||||||
|
class WikipediaIrcDecoder implements IrcDecoder
|
||||||
|
{
|
||||||
|
static final Logger log = new Logger(WikipediaIrcDecoder.class);
|
||||||
|
|
||||||
|
final DatabaseReader geoLookup;
|
||||||
|
|
||||||
|
static final Pattern pattern = Pattern.compile(
|
||||||
|
"\\x0314\\[\\[\\x0307(.+?)\\x0314\\]\\]\\x034 (.*?)\\x0310.*\\x0302(http.+?)\\x03.+\\x0303(.+?)\\x03.+\\x03 (\\(([+-]\\d+)\\).*|.+) \\x0310(.+)\\x03"
|
||||||
|
);
|
||||||
|
static final Pattern ipPattern = Pattern.compile("\\d+.\\d+.\\d+.\\d+");
|
||||||
|
static final Pattern shortnamePattern = Pattern.compile("#(\\w\\w)\\..*");
|
||||||
|
|
||||||
|
static final List<String> dimensionList = Lists.newArrayList(
|
||||||
|
"page",
|
||||||
|
"language",
|
||||||
|
"user",
|
||||||
|
"unpatrolled",
|
||||||
|
"newPage",
|
||||||
|
"robot",
|
||||||
|
"anonymous",
|
||||||
|
"namespace",
|
||||||
|
"continent",
|
||||||
|
"country",
|
||||||
|
"region",
|
||||||
|
"city"
|
||||||
|
);
|
||||||
|
|
||||||
|
final Map<String, Map<String, String>> namespaces;
|
||||||
|
|
||||||
|
public WikipediaIrcDecoder( Map<String, Map<String, String>> namespaces) {
|
||||||
|
this(namespaces, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonCreator
|
||||||
|
public WikipediaIrcDecoder(@JsonProperty("namespaces") Map<String, Map<String, String>> namespaces,
|
||||||
|
@JsonProperty("geoIpDatabase") String geoIpDatabase)
|
||||||
|
{
|
||||||
|
if(namespaces == null) {
|
||||||
|
namespaces = Maps.newHashMap();
|
||||||
|
}
|
||||||
|
this.namespaces = namespaces;
|
||||||
|
|
||||||
|
|
||||||
|
File geoDb;
|
||||||
|
if(geoIpDatabase != null) {
|
||||||
|
geoDb = new File(geoIpDatabase);
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
String tmpDir = System.getProperty("java.io.tmpdir");
|
||||||
|
geoDb = new File(tmpDir, this.getClass().getCanonicalName() + ".GeoLite2-City.mmdb");
|
||||||
|
if(!geoDb.exists()) {
|
||||||
|
log.info("Downloading geo ip database to [%s]", geoDb);
|
||||||
|
|
||||||
|
FileUtils.copyInputStreamToFile(
|
||||||
|
new GZIPInputStream(
|
||||||
|
new URL("http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.mmdb.gz").openStream()
|
||||||
|
),
|
||||||
|
geoDb
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} catch(IOException e) {
|
||||||
|
throw new RuntimeException("Unable to download geo ip database [%s]", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
geoLookup = new DatabaseReader(geoDb);
|
||||||
|
} catch(IOException e) {
|
||||||
|
throw new RuntimeException("Unable to open geo ip lookup database", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public InputRow decodeMessage(final DateTime timestamp, String channel, String msg)
|
||||||
|
{
|
||||||
|
final Map<String, String> dimensions = Maps.newHashMap();
|
||||||
|
final Map<String, Float> metrics = Maps.newHashMap();
|
||||||
|
|
||||||
|
Matcher m = pattern.matcher(msg);
|
||||||
|
if(!m.matches()) {
|
||||||
|
throw new IllegalArgumentException("Invalid input format");
|
||||||
|
}
|
||||||
|
|
||||||
|
Matcher shortname = shortnamePattern.matcher(channel);
|
||||||
|
if(shortname.matches()) {
|
||||||
|
dimensions.put("language", shortname.group(1));
|
||||||
|
}
|
||||||
|
|
||||||
|
String page = m.group(1);
|
||||||
|
String pageUrl = page.replaceAll("\\s", "_");
|
||||||
|
|
||||||
|
dimensions.put("page", pageUrl);
|
||||||
|
|
||||||
|
String user = m.group(4);
|
||||||
|
Matcher ipMatch = ipPattern.matcher(user);
|
||||||
|
boolean anonymous = ipMatch.matches();
|
||||||
|
if(anonymous) {
|
||||||
|
try {
|
||||||
|
final InetAddress ip = InetAddress.getByName(ipMatch.group());
|
||||||
|
final Omni lookup = geoLookup.omni(ip);
|
||||||
|
|
||||||
|
dimensions.put("continent", lookup.getContinent().getName());
|
||||||
|
dimensions.put("country", lookup.getCountry().getName());
|
||||||
|
dimensions.put("region", lookup.getMostSpecificSubdivision().getName());
|
||||||
|
dimensions.put("city", lookup.getCity().getName());
|
||||||
|
} catch(UnknownHostException e) {
|
||||||
|
log.error(e, "invalid ip [%s]", ipMatch.group());
|
||||||
|
} catch(IOException e) {
|
||||||
|
log.error(e, "error looking up geo ip");
|
||||||
|
} catch(GeoIp2Exception e) {
|
||||||
|
log.error(e, "error looking up geo ip");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
dimensions.put("user", user);
|
||||||
|
|
||||||
|
final String flags = m.group(2);
|
||||||
|
dimensions.put("unpatrolled", Boolean.toString(flags.contains("!")));
|
||||||
|
dimensions.put("newPage", Boolean.toString(flags.contains("N")));
|
||||||
|
dimensions.put("robot", Boolean.toString(flags.contains("B")));
|
||||||
|
|
||||||
|
dimensions.put("anonymous", Boolean.toString(anonymous));
|
||||||
|
|
||||||
|
String[] parts = page.split(":");
|
||||||
|
if(parts.length > 1 && !parts[1].startsWith(" ")) {
|
||||||
|
Map<String, String> channelNamespaces = namespaces.get(channel);
|
||||||
|
if(channelNamespaces != null && channelNamespaces.containsKey(parts[0])) {
|
||||||
|
dimensions.put("namespace", channelNamespaces.get(parts[0]));
|
||||||
|
} else {
|
||||||
|
dimensions.put("namespace", "wikipedia");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
dimensions.put("namespace", "article");
|
||||||
|
}
|
||||||
|
|
||||||
|
float delta = m.group(6) != null ? Float.parseFloat(m.group(6)) : 0;
|
||||||
|
metrics.put("delta", delta);
|
||||||
|
metrics.put("added", Math.max(delta, 0));
|
||||||
|
metrics.put("deleted", Math.min(delta, 0));
|
||||||
|
|
||||||
|
return new InputRow()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public List<String> getDimensions()
|
||||||
|
{
|
||||||
|
return dimensionList;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getTimestampFromEpoch()
|
||||||
|
{
|
||||||
|
return timestamp.getMillis();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<String> getDimension(String dimension)
|
||||||
|
{
|
||||||
|
return ImmutableList.of(dimensions.get(dimension));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public float getFloatMetric(String metric)
|
||||||
|
{
|
||||||
|
return metrics.get(metric);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString()
|
||||||
|
{
|
||||||
|
return "WikipediaRow{" +
|
||||||
|
"timestamp=" + timestamp +
|
||||||
|
", dimensions=" + dimensions +
|
||||||
|
", metrics=" + metrics +
|
||||||
|
'}';
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue