Modified the Twitter firehose to process more properties

Add dimensions such as screen name, retweet and verified booleans,
source, location, and originator information to support additional
analytics.
This commit is contained in:
David Lim 2015-09-25 00:21:15 -06:00
parent d60610ced3
commit f42f6247ee
2 changed files with 67 additions and 3 deletions

View File

@ -16,13 +16,22 @@
"contributors", "contributors",
"lat", "lat",
"lon", "lon",
"source",
"retweet",
"retweet_count", "retweet_count",
"originator_screen_name",
"originator_follower_count",
"originator_friends_count",
"originator_verified",
"follower_count", "follower_count",
"friendscount", "friends_count",
"lang", "lang",
"utc_offset", "utc_offset",
"statuses_count", "statuses_count",
"user_id", "user_id",
"screen_name",
"location",
"verified",
"ts" "ts"
], ],
"dimensionExclusions": [ "dimensionExclusions": [
@ -65,6 +74,16 @@
"name": "total_statuses_count", "name": "total_statuses_count",
"type": "doubleSum" "type": "doubleSum"
}, },
{
"fieldName": "originator_follower_count",
"name": "total_originator_follower_count",
"type": "doubleSum"
},
{
"fieldName": "originator_friends_count",
"name": "total_originator_friends_count",
"type": "doubleSum"
},
{ {
"fieldName": "text", "fieldName": "text",
"name": "text_hll", "name": "text_hll",
@ -124,6 +143,26 @@
"fieldName": "retweet_count", "fieldName": "retweet_count",
"name": "max_retweet_count", "name": "max_retweet_count",
"type": "max" "type": "max"
},
{
"fieldName": "originator_follower_count",
"name": "min_originator_follower_count",
"type": "min"
},
{
"fieldName": "originator_follower_count",
"name": "max_originator_follower_count",
"type": "max"
},
{
"fieldName": "originator_friends_count",
"name": "min_originator_friends_count",
"type": "min"
},
{
"fieldName": "originator_friends_count",
"name": "max_originator_friends_count",
"type": "max"
} }
], ],
"granularitySpec": { "granularitySpec": {

View File

@ -50,6 +50,8 @@ import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static java.lang.Thread.sleep; import static java.lang.Thread.sleep;
@ -82,6 +84,8 @@ import static java.lang.Thread.sleep;
public class TwitterSpritzerFirehoseFactory implements FirehoseFactory<InputRowParser> public class TwitterSpritzerFirehoseFactory implements FirehoseFactory<InputRowParser>
{ {
private static final Logger log = new Logger(TwitterSpritzerFirehoseFactory.class); private static final Logger log = new Logger(TwitterSpritzerFirehoseFactory.class);
private static final Pattern sourcePattern = Pattern.compile("<a[^>]*>(.*?)</a>", Pattern.CASE_INSENSITIVE);
/** /**
* max events to receive, -1 is infinite, 0 means nothing is delivered; use this to prevent * max events to receive, -1 is infinite, 0 means nothing is delivered; use this to prevent
* infinite space consumption or to prevent getting throttled at an inconvenient time * infinite space consumption or to prevent getting throttled at an inconvenient time
@ -278,6 +282,8 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory<InputRowP
throw new RuntimeException("InterruptedException", e); throw new RuntimeException("InterruptedException", e);
} }
theMap.clear();
HashtagEntity[] hts = status.getHashtagEntities(); HashtagEntity[] hts = status.getHashtagEntities();
String text = status.getText(); String text = status.getText();
theMap.put("text", (null == text) ? "" : text); theMap.put("text", (null == text) ? "" : text);
@ -313,8 +319,24 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory<InputRowP
theMap.put("lon", null); theMap.put("lon", null);
} }
long retweetCount = status.getRetweetCount(); if (status.getSource() != null) {
theMap.put("retweet_count", retweetCount); Matcher m = sourcePattern.matcher(status.getSource());
theMap.put("source", m.find() ? m.group(1) : status.getSource());
}
theMap.put("retweet", status.isRetweet());
if (status.isRetweet()) {
Status original = status.getRetweetedStatus();
theMap.put("retweet_count", original.getRetweetCount());
User originator = original.getUser();
theMap.put("originator_screen_name", originator != null ? originator.getScreenName() : "");
theMap.put("originator_follower_count", originator != null ? originator.getFollowersCount() : "");
theMap.put("originator_friends_count", originator != null ? originator.getFriendsCount() : "");
theMap.put("originator_verified", originator != null ? originator.isVerified() : "");
}
User user = status.getUser(); User user = status.getUser();
final boolean hasUser = (null != user); final boolean hasUser = (null != user);
theMap.put("follower_count", hasUser ? user.getFollowersCount() : 0); theMap.put("follower_count", hasUser ? user.getFollowersCount() : 0);
@ -323,6 +345,9 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory<InputRowP
theMap.put("utc_offset", hasUser ? user.getUtcOffset() : -1); // resolution in seconds, -1 if not available? theMap.put("utc_offset", hasUser ? user.getUtcOffset() : -1); // resolution in seconds, -1 if not available?
theMap.put("statuses_count", hasUser ? user.getStatusesCount() : 0); theMap.put("statuses_count", hasUser ? user.getStatusesCount() : 0);
theMap.put("user_id", hasUser ? String.format("%d", user.getId()) : ""); theMap.put("user_id", hasUser ? String.format("%d", user.getId()) : "");
theMap.put("screen_name", hasUser ? user.getScreenName() : "");
theMap.put("location", hasUser ? user.getLocation() : "");
theMap.put("verified", hasUser ? user.isVerified() : "");
theMap.put("ts",status.getCreatedAt().getTime()); theMap.put("ts",status.getCreatedAt().getTime());