mirror of https://github.com/apache/druid.git
add ip geo lookup
This commit is contained in:
parent
1cc1d0be5f
commit
6735ae4ecd
14
pom.xml
14
pom.xml
|
@ -182,37 +182,37 @@
|
|||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-annotations</artifactId>
|
||||
<version>2.1.4</version>
|
||||
<version>2.2.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-core</artifactId>
|
||||
<version>2.1.4</version>
|
||||
<version>2.2.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
<version>2.1.4</version>
|
||||
<version>2.2.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.datatype</groupId>
|
||||
<artifactId>jackson-datatype-guava</artifactId>
|
||||
<version>2.1.2</version>
|
||||
<version>2.2.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.datatype</groupId>
|
||||
<artifactId>jackson-datatype-joda</artifactId>
|
||||
<version>2.1.2</version>
|
||||
<version>2.2.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.dataformat</groupId>
|
||||
<artifactId>jackson-dataformat-smile</artifactId>
|
||||
<version>2.1.4</version>
|
||||
<version>2.2.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.jaxrs</groupId>
|
||||
<artifactId>jackson-jaxrs-json-provider</artifactId>
|
||||
<version>2.1.4</version>
|
||||
<version>2.2.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>javax.inject</groupId>
|
||||
|
|
|
@ -151,6 +151,11 @@
|
|||
<groupId>com.ircclouds.irc</groupId>
|
||||
<artifactId>irc-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.maxmind.geoip2</groupId>
|
||||
<artifactId>geoip2</artifactId>
|
||||
<version>0.4.0</version>
|
||||
</dependency>
|
||||
<!-- Dependencies required for jets3t -->
|
||||
|
||||
<!-- Tests -->
|
||||
|
|
|
@ -3,7 +3,6 @@ package com.metamx.druid.realtime.firehose;
|
|||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.ircclouds.irc.api.Callback;
|
||||
import com.ircclouds.irc.api.IRCApi;
|
||||
import com.ircclouds.irc.api.IRCApiImpl;
|
||||
|
@ -90,7 +89,7 @@ public class IrcFirehoseFactory implements FirehoseFactory
|
|||
public void onChannelMessage(ChannelPrivMsg aMsg)
|
||||
{
|
||||
try {
|
||||
queue.put(Pair.of(new DateTime(), aMsg));
|
||||
queue.put(Pair.of(DateTime.now(), aMsg));
|
||||
} catch(InterruptedException e) {
|
||||
throw new RuntimeException("interrupted adding message to queue", e);
|
||||
}
|
||||
|
|
|
@ -4,16 +4,31 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
|||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.maxmind.geoip2.DatabaseReader;
|
||||
import com.maxmind.geoip2.exception.GeoIp2Exception;
|
||||
import com.maxmind.geoip2.model.Omni;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.druid.input.InputRow;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.URL;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.zip.GZIPInputStream;
|
||||
|
||||
class WikipediaIrcDecoder implements IrcDecoder
|
||||
{
|
||||
static final Logger log = new Logger(WikipediaIrcDecoder.class);
|
||||
|
||||
final DatabaseReader geoLookup;
|
||||
|
||||
static final Pattern pattern = Pattern.compile(
|
||||
"\\x0314\\[\\[\\x0307(.+?)\\x0314\\]\\]\\x034 (.*?)\\x0310.*\\x0302(http.+?)\\x03.+\\x0303(.+?)\\x03.+\\x03 (\\(([+-]\\d+)\\).*|.+) \\x0310(.+)\\x03"
|
||||
);
|
||||
|
@ -29,18 +44,51 @@ class WikipediaIrcDecoder implements IrcDecoder
|
|||
"robot",
|
||||
"anonymous",
|
||||
"namespace",
|
||||
"added",
|
||||
"delete",
|
||||
"delta"
|
||||
"continent",
|
||||
"country",
|
||||
"region",
|
||||
"city"
|
||||
);
|
||||
|
||||
final Map<String, String> namespaces;
|
||||
|
||||
public WikipediaIrcDecoder(Map<String, String> namespaces) {
|
||||
this(namespaces, null);
|
||||
}
|
||||
|
||||
@JsonCreator
|
||||
public WikipediaIrcDecoder(@JsonProperty Map<String, String> namespaces)
|
||||
public WikipediaIrcDecoder(@JsonProperty Map<String, String> namespaces,
|
||||
@JsonProperty String geoDbFile)
|
||||
{
|
||||
if(namespaces == null) namespaces = Maps.newHashMap();
|
||||
this.namespaces = namespaces;
|
||||
|
||||
|
||||
File geoDb;
|
||||
if(geoDbFile != null) {
|
||||
geoDb = new File(geoDbFile);
|
||||
} else {
|
||||
try {
|
||||
geoDb = File.createTempFile("geoip", null);
|
||||
geoDb.deleteOnExit();
|
||||
|
||||
log.info("Downloading geo ip database to [%s]", geoDb);
|
||||
|
||||
FileUtils.copyInputStreamToFile(
|
||||
new GZIPInputStream(
|
||||
new URL("http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.mmdb.gz").openStream()
|
||||
),
|
||||
geoDb
|
||||
);
|
||||
} catch(IOException e) {
|
||||
throw new RuntimeException("Unable to download geo ip database [%s]", e);
|
||||
}
|
||||
}
|
||||
try {
|
||||
geoLookup = new DatabaseReader(geoDb);
|
||||
} catch(IOException e) {
|
||||
throw new RuntimeException("Unable to open geo ip lookup database", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -65,7 +113,25 @@ class WikipediaIrcDecoder implements IrcDecoder
|
|||
dimensions.put("page", pageUrl);
|
||||
|
||||
String user = m.group(4);
|
||||
boolean anonymous = ipPattern.matcher(user).matches();
|
||||
Matcher ipMatch = ipPattern.matcher(user);
|
||||
boolean anonymous = ipMatch.matches();
|
||||
if(anonymous) {
|
||||
try {
|
||||
final InetAddress ip = InetAddress.getByName(ipMatch.group());
|
||||
final Omni lookup = geoLookup.omni(ip);
|
||||
|
||||
dimensions.put("continent", lookup.getContinent().getName());
|
||||
dimensions.put("country", lookup.getCountry().getName());
|
||||
dimensions.put("region", lookup.getMostSpecificSubdivision().getName());
|
||||
dimensions.put("city", lookup.getCity().getName());
|
||||
} catch(UnknownHostException e) {
|
||||
log.error(e, "invalid ip [%s]", ipMatch.group());
|
||||
} catch(IOException e) {
|
||||
log.error(e, "error looking up geo ip");
|
||||
} catch(GeoIp2Exception e) {
|
||||
log.error(e, "error looking up geo ip");
|
||||
}
|
||||
}
|
||||
dimensions.put("user", user);
|
||||
|
||||
final String flags = m.group(2);
|
||||
|
|
Loading…
Reference in New Issue