Merge pull request #1776 from dclim/update-twitter-example

Modified the Twitter firehose to process more properties
This commit is contained in:
Gian Merlino 2015-09-25 07:45:31 -07:00
commit 4c72dabce9
2 changed files with 67 additions and 3 deletions

View File

@ -16,13 +16,22 @@
"contributors", "contributors",
"lat", "lat",
"lon", "lon",
"source",
"retweet",
"retweet_count", "retweet_count",
"originator_screen_name",
"originator_follower_count",
"originator_friends_count",
"originator_verified",
"follower_count", "follower_count",
"friendscount", "friends_count",
"lang", "lang",
"utc_offset", "utc_offset",
"statuses_count", "statuses_count",
"user_id", "user_id",
"screen_name",
"location",
"verified",
"ts" "ts"
], ],
"dimensionExclusions": [ "dimensionExclusions": [
@ -65,6 +74,16 @@
"name": "total_statuses_count", "name": "total_statuses_count",
"type": "doubleSum" "type": "doubleSum"
}, },
{
"fieldName": "originator_follower_count",
"name": "total_originator_follower_count",
"type": "doubleSum"
},
{
"fieldName": "originator_friends_count",
"name": "total_originator_friends_count",
"type": "doubleSum"
},
{ {
"fieldName": "text", "fieldName": "text",
"name": "text_hll", "name": "text_hll",
@ -124,6 +143,26 @@
"fieldName": "retweet_count", "fieldName": "retweet_count",
"name": "max_retweet_count", "name": "max_retweet_count",
"type": "max" "type": "max"
},
{
"fieldName": "originator_follower_count",
"name": "min_originator_follower_count",
"type": "min"
},
{
"fieldName": "originator_follower_count",
"name": "max_originator_follower_count",
"type": "max"
},
{
"fieldName": "originator_friends_count",
"name": "min_originator_friends_count",
"type": "min"
},
{
"fieldName": "originator_friends_count",
"name": "max_originator_friends_count",
"type": "max"
} }
], ],
"granularitySpec": { "granularitySpec": {

View File

@ -50,6 +50,8 @@ import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static java.lang.Thread.sleep; import static java.lang.Thread.sleep;
@ -82,6 +84,8 @@ import static java.lang.Thread.sleep;
public class TwitterSpritzerFirehoseFactory implements FirehoseFactory<InputRowParser> public class TwitterSpritzerFirehoseFactory implements FirehoseFactory<InputRowParser>
{ {
private static final Logger log = new Logger(TwitterSpritzerFirehoseFactory.class); private static final Logger log = new Logger(TwitterSpritzerFirehoseFactory.class);
private static final Pattern sourcePattern = Pattern.compile("<a[^>]*>(.*?)</a>", Pattern.CASE_INSENSITIVE);
/** /**
* max events to receive, -1 is infinite, 0 means nothing is delivered; use this to prevent * max events to receive, -1 is infinite, 0 means nothing is delivered; use this to prevent
* infinite space consumption or to prevent getting throttled at an inconvenient time * infinite space consumption or to prevent getting throttled at an inconvenient time
@ -278,6 +282,8 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory<InputRowP
throw new RuntimeException("InterruptedException", e); throw new RuntimeException("InterruptedException", e);
} }
theMap.clear();
HashtagEntity[] hts = status.getHashtagEntities(); HashtagEntity[] hts = status.getHashtagEntities();
String text = status.getText(); String text = status.getText();
theMap.put("text", (null == text) ? "" : text); theMap.put("text", (null == text) ? "" : text);
@ -313,8 +319,24 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory<InputRowP
theMap.put("lon", null); theMap.put("lon", null);
} }
long retweetCount = status.getRetweetCount(); if (status.getSource() != null) {
theMap.put("retweet_count", retweetCount); Matcher m = sourcePattern.matcher(status.getSource());
theMap.put("source", m.find() ? m.group(1) : status.getSource());
}
theMap.put("retweet", status.isRetweet());
if (status.isRetweet()) {
Status original = status.getRetweetedStatus();
theMap.put("retweet_count", original.getRetweetCount());
User originator = original.getUser();
theMap.put("originator_screen_name", originator != null ? originator.getScreenName() : "");
theMap.put("originator_follower_count", originator != null ? originator.getFollowersCount() : "");
theMap.put("originator_friends_count", originator != null ? originator.getFriendsCount() : "");
theMap.put("originator_verified", originator != null ? originator.isVerified() : "");
}
User user = status.getUser(); User user = status.getUser();
final boolean hasUser = (null != user); final boolean hasUser = (null != user);
theMap.put("follower_count", hasUser ? user.getFollowersCount() : 0); theMap.put("follower_count", hasUser ? user.getFollowersCount() : 0);
@ -323,6 +345,9 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory<InputRowP
theMap.put("utc_offset", hasUser ? user.getUtcOffset() : -1); // resolution in seconds, -1 if not available? theMap.put("utc_offset", hasUser ? user.getUtcOffset() : -1); // resolution in seconds, -1 if not available?
theMap.put("statuses_count", hasUser ? user.getStatusesCount() : 0); theMap.put("statuses_count", hasUser ? user.getStatusesCount() : 0);
theMap.put("user_id", hasUser ? String.format("%d", user.getId()) : ""); theMap.put("user_id", hasUser ? String.format("%d", user.getId()) : "");
theMap.put("screen_name", hasUser ? user.getScreenName() : "");
theMap.put("location", hasUser ? user.getLocation() : "");
theMap.put("verified", hasUser ? user.isVerified() : "");
theMap.put("ts",status.getCreatedAt().getTime()); theMap.put("ts",status.getCreatedAt().getTime());