From 687c82daa882e7d5b494aff766a4913b4b10f632 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Thu, 11 Dec 2014 16:33:28 -0800 Subject: [PATCH] Added more Twitter fields to TwitterSpritzerFirehoseFactory * Now with GEOGRAPHY support! --- examples/bin/examples/twitter/query.body | 33 +++- examples/bin/examples/twitter/topN_query.body | 94 ++++++++++ .../examples/twitter/twitter_realtime.spec | 164 +++++++++++------- .../TwitterSpritzerFirehoseFactory.java | 114 ++++++++---- pom.xml | 2 +- 5 files changed, 299 insertions(+), 108 deletions(-) create mode 100644 examples/bin/examples/twitter/topN_query.body diff --git a/examples/bin/examples/twitter/query.body b/examples/bin/examples/twitter/query.body index b41ea3b46d5..76d3b9baae6 100644 --- a/examples/bin/examples/twitter/query.body +++ b/examples/bin/examples/twitter/query.body @@ -1,4 +1,31 @@ { - "queryType": "timeBoundary", - "dataSource": "twitterstream" -} + "description": "Simple data split up by hour", + "aggregations": [ + { + "name": "tweets", + "type": "longSum", + "fieldName": "tweets" + }, + { + "fieldName": "text_hll", + "name": "text_hll", + "type": "hyperUnique" + }, + { + "fieldName": "htags_hll", + "name": "htag_hll", + "type": "hyperUnique" + }, + { + "fieldName": "user_id_hll", + "name": "user_id_hll", + "type": "hyperUnique" + } + ], + "dataSource": "twitterstream", + "granularity": "hour", + "intervals": [ + "1970-01-01T00:00:00.000/2019-01-03T00:00:00.000" + ], + "queryType": "timeseries" +} \ No newline at end of file diff --git a/examples/bin/examples/twitter/topN_query.body b/examples/bin/examples/twitter/topN_query.body new file mode 100644 index 00000000000..fcd644a4f1c --- /dev/null +++ b/examples/bin/examples/twitter/topN_query.body @@ -0,0 +1,94 @@ +{ + "description": "Top 10 languages by count of tweets in the contiguous US", + "aggregations": [ + { + "fieldName": "tweets", + "name": "tweets", + "type": "longSum" + }, + { + "fieldName": "user_id_hll", + "name": "user_id_hll", + "type": "hyperUnique" + }, + { + "fieldName": "contributors_hll", + "name": "contributors_hll", + "type": "hyperUnique" + }, + { + "fieldName": "htags_hll", + "name": "htags_hll", + "type": "hyperUnique" + }, + { + "fieldName": "text_hll", + "name": "text_hll", + "type": "hyperUnique" + }, + { + "fieldName": "min_follower_count", + "name": "min_follower_count", + "type": "min" + }, + { + "fieldName": "max_follower_count", + "name": "max_follower_count", + "type": "max" + }, + { + "fieldName": "min_friends_count", + "name": "min_friends_count", + "type": "min" + }, + { + "fieldName": "max_friends_count", + "name": "max_friends_count", + "type": "max" + }, + { + "fieldName": "min_statuses_count", + "name": "min_statuses_count", + "type": "min" + }, + { + "fieldName": "max_statuses_count", + "name": "max_statuses_count", + "type": "max" + }, + { + "fieldName": "min_retweet_count", + "name": "min_retweet_count", + "type": "min" + }, + { + "fieldName": "max_retweet_count", + "name": "max_retweet_count", + "type": "max" + } + ], + "dataSource": "twitterstream", + "dimension": "lang", + "filter": { + "bound": { + "maxCoords": [ + 50, + -65 + ], + "minCoords": [ + 25, + -127 + ], + "type": "rectangular" + }, + "dimension": "geo", + "type": "spatial" + }, + "granularity": "all", + "intervals": [ + "2013-06-01T00:00/2020-01-01T00" + ], + "metric": "tweets", + "queryType": "topN", + "threshold": "10" +} \ No newline at end of file diff --git a/examples/bin/examples/twitter/twitter_realtime.spec b/examples/bin/examples/twitter/twitter_realtime.spec index a452c764d94..8a502ed5516 100644 --- a/examples/bin/examples/twitter/twitter_realtime.spec +++ b/examples/bin/examples/twitter/twitter_realtime.spec @@ -1,119 +1,151 @@ -[ - { +{ + "description": "Ingestion spec for Twitter spritzer. Dimension values taken from io.druid.examples.twitter.TwitterSpritzerFirehoseFactory", + "spec": { "dataSchema": { "dataSource": "twitterstream", - "parser": { - "parseSpec": { - "format": "json", - "timestampSpec": { - "column": "utcdt", - "format": "iso" - }, - "dimensionsSpec": { - "dimensions": [ - - ], - "dimensionExclusions": [ - - ], - "spatialDimensions": [ - - ] - } - } + "granularitySpec": { + "queryGranularity": "all", + "segmentGranularity": "hour", + "type": "uniform" }, "metricsSpec": [ { - "type": "count", - "name": "tweets" + "name": "tweets", + "type": "count" }, { - "type": "doubleSum", "fieldName": "follower_count", - "name": "total_follower_count" + "name": "total_follower_count", + "type": "doubleSum" }, { - "type": "doubleSum", "fieldName": "retweet_count", - "name": "total_retweet_count" + "name": "total_retweet_count", + "type": "doubleSum" }, { - "type": "doubleSum", "fieldName": "friends_count", - "name": "total_friends_count" + "name": "total_friends_count", + "type": "doubleSum" }, { - "type": "doubleSum", "fieldName": "statuses_count", - "name": "total_statuses_count" + "name": "total_statuses_count", + "type": "doubleSum" + }, + { + "fieldName": "text", + "name": "text_hll", + "type": "hyperUnique" + }, + { + "fieldName": "user_id", + "name": "user_id_hll", + "type": "hyperUnique" + }, + { + "fieldName": "contributors", + "name": "contributors_hll", + "type": "hyperUnique" + }, + { + "fieldName": "htags", + "name": "htags_hll", + "type": "hyperUnique" }, { - "type": "min", "fieldName": "follower_count", - "name": "min_follower_count" + "name": "min_follower_count", + "type": "min" }, { - "type": "max", "fieldName": "follower_count", - "name": "max_follower_count" + "name": "max_follower_count", + "type": "max" }, { - "type": "min", "fieldName": "friends_count", - "name": "min_friends_count" + "name": "min_friends_count", + "type": "min" }, { - "type": "max", "fieldName": "friends_count", - "name": "max_friends_count" + "name": "max_friends_count", + "type": "max" }, { - "type": "min", "fieldName": "statuses_count", - "name": "min_statuses_count" + "name": "min_statuses_count", + "type": "min" }, { - "type": "max", "fieldName": "statuses_count", - "name": "max_statuses_count" + "name": "max_statuses_count", + "type": "max" }, { - "type": "min", "fieldName": "retweet_count", - "name": "min_retweet_count" + "name": "min_retweet_count", + "type": "min" }, { - "type": "max", "fieldName": "retweet_count", - "name": "max_retweet_count" + "name": "max_retweet_count", + "type": "max" } ], - "granularitySpec": { - "type": "uniform", - "segmentGranularity": "DAY", - "queryGranularity": "NONE" + "parser": { + "parseSpec": { + "dimensionsSpec": { + "dimensions": [ + "text", + "htags", + "contributors", + "lat", + "lon", + "retweet_count", + "follower_count", + "friendscount", + "lang", + "utc_offset", + "statuses_count", + "user_id", + "ts" + ], + "dimensionExclusions": [ + ], + "spatialDimensions": [ + { + "dimName": "geo", + "dims": [ + "lat", + "lon" + ] + } + ] + }, + "format": "json", + "timestampSpec": { + "column": "ts", + "format": "millis" + } + } } }, "ioConfig": { - "type": "realtime", "firehose": { - "type": "twitzer", "maxEventCount": 500000, - "maxRunMinutes": 120 + "maxRunMinutes": 120, + "type": "twitzer" }, - "plumber": { - "type": "realtime" - } + "type": "realtime" }, "tuningConfig": { - "type": "realtime", + "intermediatePersistPeriod": "PT10m", "maxRowsInMemory": 500000, - "intermediatePersistPeriod": "PT2m", - "windowPeriod": "PT3m", - "basePersistDirectory": "\/tmp\/realtime\/basePersist", - "rejectionPolicy": { - "type": "messageTime" - } + "type": "realtime", + "windowPeriod": "PT10m" } - } -] + }, + "type": "index_realtime" +} \ No newline at end of file diff --git a/examples/src/main/java/io/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java b/examples/src/main/java/io/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java index 6e338a27939..6744616ae2f 100644 --- a/examples/src/main/java/io/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java +++ b/examples/src/main/java/io/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java @@ -1,6 +1,6 @@ /* * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License @@ -22,6 +22,8 @@ package io.druid.examples.twitter; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.metamx.common.logger.Logger; import io.druid.data.input.Firehose; @@ -30,6 +32,7 @@ import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.impl.InputRowParser; import twitter4j.ConnectionLifeCycleListener; +import twitter4j.GeoLocation; import twitter4j.HashtagEntity; import twitter4j.StallWarning; import twitter4j.Status; @@ -39,12 +42,13 @@ import twitter4j.TwitterStream; import twitter4j.TwitterStreamFactory; import twitter4j.User; +import javax.annotation.Nullable; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.TreeMap; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; @@ -58,25 +62,27 @@ import static java.lang.Thread.sleep; * with timestamps along with ??. * The generated tuples have the form (timestamp, ????) * where the timestamp is from the twitter event. - * + *

* Example spec file: - * + *

* Example query using POST to /druid/v2/?w (where w is an arbitrary parameter and the date and time * is UTC): - * + *

* Notes on twitter.com HTTP (REST) API: v1.0 will be disabled around 2013-03 so v1.1 should be used; * twitter4j 3.0 (not yet released) will support the v1.1 api. * Specifically, we should be using https://stream.twitter.com/1.1/statuses/sample.json * See: http://jira.twitter4j.org/browse/TFJ-186 + *

+ * Notes on JSON parsing: as of twitter4j 2.2.x, the json parser has some bugs (ex: Status.toString() + * can have number format exceptions), so it might be necessary to extract raw json and process it + * separately. If so, set twitter4.jsonStoreEnabled=true and look at DataObjectFactory#getRawJSON(); + * com.fasterxml.jackson.databind.ObjectMapper should be used to parse. * - * Notes on JSON parsing: as of twitter4j 2.2.x, the json parser has some bugs (ex: Status.toString() - * can have number format exceptions), so it might be necessary to extract raw json and process it - * separately. If so, set twitter4.jsonStoreEnabled=true and look at DataObjectFactory#getRawJSON(); - * com.fasterxml.jackson.databind.ObjectMapper should be used to parse. * @author pbaclace */ @JsonTypeName("twitzer") -public class TwitterSpritzerFirehoseFactory implements FirehoseFactory { +public class TwitterSpritzerFirehoseFactory implements FirehoseFactory +{ private static final Logger log = new Logger(TwitterSpritzerFirehoseFactory.class); /** * max events to receive, -1 is infinite, 0 means nothing is delivered; use this to prevent @@ -107,7 +113,8 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory queue = new ArrayBlockingQueue(QUEUE_SIZE); - final LinkedList dimensions = new LinkedList(); final long startMsec = System.currentTimeMillis(); - dimensions.add("htags"); - dimensions.add("lang"); - dimensions.add("utc_offset"); - // // set up Twitter Spritzer // twitterStream = new TwitterStreamFactory().getInstance(); twitterStream.addConnectionLifeCycleListener(connectionLifeCycleListener); - statusListener = new StatusListener() { // This is what really gets called to deliver stuff from twitter4j + statusListener = new StatusListener() + { // This is what really gets called to deliver stuff from twitter4j @Override public void onStatus(Status status) { @@ -160,7 +163,8 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory theMap = new HashMap(2); + private final Map theMap = new TreeMap<>(); // DIY json parsing // private final ObjectMapper omapper = new ObjectMapper(); private boolean maxTimeReached() @@ -253,7 +260,8 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory 0) { - List hashTags = Lists.newArrayListWithExpectedSize(hts.length); - for (HashtagEntity ht : hts) { - hashTags.add(ht.getText()); - } + String text = status.getText(); + theMap.put("text", (null == text) ? "" : text); + theMap.put( + "htags", (hts.length > 0) ? Lists.transform( + Arrays.asList(hts), new Function() + { + @Nullable + @Override + public String apply(HashtagEntity input) + { + return input.getText(); + } + } + ) : ImmutableList.of() + ); - theMap.put("htags", Arrays.asList(hashTags.get(0))); + long[] lcontrobutors = status.getContributors(); + List contributors = new ArrayList<>(); + for (long contrib : lcontrobutors) { + contributors.add(String.format("%d", contrib)); + } + theMap.put("contributors", contributors); + + GeoLocation geoLocation = status.getGeoLocation(); + if (null != geoLocation) { + double lat = status.getGeoLocation().getLatitude(); + double lon = status.getGeoLocation().getLongitude(); + theMap.put("lat", lat); + theMap.put("lon", lon); + } else { + theMap.put("lat", null); + theMap.put("lon", null); } long retweetCount = status.getRetweetCount(); theMap.put("retweet_count", retweetCount); User user = status.getUser(); - if (user != null) { - theMap.put("follower_count", user.getFollowersCount()); - theMap.put("friends_count", user.getFriendsCount()); - theMap.put("lang", user.getLang()); - theMap.put("utc_offset", user.getUtcOffset()); // resolution in seconds, -1 if not available? - theMap.put("statuses_count", user.getStatusesCount()); - } + final boolean hasUser = (null != user); + theMap.put("follower_count", hasUser ? user.getFollowersCount() : 0); + theMap.put("friends_count", hasUser ? user.getFriendsCount() : 0); + theMap.put("lang", hasUser ? user.getLang() : ""); + theMap.put("utc_offset", hasUser ? user.getUtcOffset() : -1); // resolution in seconds, -1 if not available? + theMap.put("statuses_count", hasUser ? user.getStatusesCount() : 0); + theMap.put("user_id", hasUser ? String.format("%d", user.getId()) : ""); + + theMap.put("ts",status.getCreatedAt().getTime()); + + List dimensions = Lists.newArrayList(theMap.keySet()); return new MapBasedInputRow(status.getCreatedAt().getTime(), dimensions, theMap); } diff --git a/pom.xml b/pom.xml index 577ddc38ff6..26b26ad37be 100644 --- a/pom.xml +++ b/pom.xml @@ -94,7 +94,7 @@ com.metamx bytebuffer-collections - 0.1.1 + 0.1.2 com.metamx