Merge pull request #963 from metamx/twitterFirehoseUpdate

Added more Twitter fields to TwitterSpritzerFirehoseFactory
Fangjin Yang 2014-12-15 13:35:15 -07:00
commit 8a3ce8acd9
5 changed files with 299 additions and 108 deletions

File 1 of 5: Twitter example query body (timeBoundary query rewritten as an hourly timeseries)

@@ -1,4 +1,31 @@
 {
-  "queryType": "timeBoundary",
-  "dataSource": "twitterstream"
+  "description": "Simple data split up by hour",
+  "aggregations": [
+    {
+      "name": "tweets",
+      "type": "longSum",
+      "fieldName": "tweets"
+    },
+    {
+      "fieldName": "text_hll",
+      "name": "text_hll",
+      "type": "hyperUnique"
+    },
+    {
+      "fieldName": "htags_hll",
+      "name": "htag_hll",
+      "type": "hyperUnique"
+    },
+    {
+      "fieldName": "user_id_hll",
+      "name": "user_id_hll",
+      "type": "hyperUnique"
+    }
+  ],
+  "dataSource": "twitterstream",
+  "granularity": "hour",
+  "intervals": [
+    "1970-01-01T00:00:00.000/2019-01-03T00:00:00.000"
+  ],
+  "queryType": "timeseries"
 }
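A note on usage: the query body above is meant to be POSTed to a Druid node's query endpoint (the class Javadoc later in this diff mentions POST to /druid/v2/?w). Below is a minimal sketch of issuing it from Java with plain JDK classes; the host/port (localhost:8084) and the local file name tweets_by_hour.json are illustrative assumptions, not part of this commit.

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Scanner;

public class PostTwitterTimeseriesQuery
{
  public static void main(String[] args) throws Exception
  {
    // Assumed: a Druid node serving queries on localhost:8084; adjust to your deployment.
    URL url = new URL("http://localhost:8084/druid/v2/?pretty");

    // Assumed: the timeseries query body above saved locally as tweets_by_hour.json.
    byte[] body = Files.readAllBytes(Paths.get("tweets_by_hour.json"));

    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);
    try (OutputStream out = conn.getOutputStream()) {
      out.write(body);
    }

    // Print the JSON response: one result row per hour for a timeseries query.
    try (Scanner scanner = new Scanner(conn.getInputStream(), "UTF-8").useDelimiter("\\A")) {
      System.out.println(scanner.hasNext() ? scanner.next() : "");
    }
  }
}

Any HTTP client works equally well; the only commit-relevant part is the JSON body itself.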

File 2 of 5: new Twitter example query body (topN of tweet languages with a spatial filter over the contiguous US)

@@ -0,0 +1,94 @@
+{
+  "description": "Top 10 languages by count of tweets in the contiguous US",
+  "aggregations": [
+    {
+      "fieldName": "tweets",
+      "name": "tweets",
+      "type": "longSum"
+    },
+    {
+      "fieldName": "user_id_hll",
+      "name": "user_id_hll",
+      "type": "hyperUnique"
+    },
+    {
+      "fieldName": "contributors_hll",
+      "name": "contributors_hll",
+      "type": "hyperUnique"
+    },
+    {
+      "fieldName": "htags_hll",
+      "name": "htags_hll",
+      "type": "hyperUnique"
+    },
+    {
+      "fieldName": "text_hll",
+      "name": "text_hll",
+      "type": "hyperUnique"
+    },
+    {
+      "fieldName": "min_follower_count",
+      "name": "min_follower_count",
+      "type": "min"
+    },
+    {
+      "fieldName": "max_follower_count",
+      "name": "max_follower_count",
+      "type": "max"
+    },
+    {
+      "fieldName": "min_friends_count",
+      "name": "min_friends_count",
+      "type": "min"
+    },
+    {
+      "fieldName": "max_friends_count",
+      "name": "max_friends_count",
+      "type": "max"
+    },
+    {
+      "fieldName": "min_statuses_count",
+      "name": "min_statuses_count",
+      "type": "min"
+    },
+    {
+      "fieldName": "max_statuses_count",
+      "name": "max_statuses_count",
+      "type": "max"
+    },
+    {
+      "fieldName": "min_retweet_count",
+      "name": "min_retweet_count",
+      "type": "min"
+    },
+    {
+      "fieldName": "max_retweet_count",
+      "name": "max_retweet_count",
+      "type": "max"
+    }
+  ],
+  "dataSource": "twitterstream",
+  "dimension": "lang",
+  "filter": {
+    "bound": {
+      "maxCoords": [
+        50,
+        -65
+      ],
+      "minCoords": [
+        25,
+        -127
+      ],
+      "type": "rectangular"
+    },
+    "dimension": "geo",
+    "type": "spatial"
+  },
+  "granularity": "all",
+  "intervals": [
+    "2013-06-01T00:00/2020-01-01T00"
+  ],
+  "metric": "tweets",
+  "queryType": "topN",
+  "threshold": "10"
+}
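For readers unfamiliar with Druid's spatial filters: the "bound" above is a rectangle expressed in the same coordinate order as the "geo" spatial dimension defined in the ingestion spec below ([lat, lon]), so this query keeps tweets between 25–50°N and 127–65°W, roughly the contiguous US. A hedged sketch of the membership check the filter expresses, purely to illustrate the coordinate order (not Druid's actual implementation):

public class RectangularBoundSketch
{
  // Mirrors the "bound" in the query above: minCoords = [25, -127], maxCoords = [50, -65].
  static final double MIN_LAT = 25, MIN_LON = -127;
  static final double MAX_LAT = 50, MAX_LON = -65;

  // True when a point, given as (latitude, longitude), lies inside the rectangle.
  static boolean inContiguousUsBox(double lat, double lon)
  {
    return lat >= MIN_LAT && lat <= MAX_LAT && lon >= MIN_LON && lon <= MAX_LON;
  }

  public static void main(String[] args)
  {
    System.out.println(inContiguousUsBox(37.77, -122.42)); // San Francisco -> true
    System.out.println(inContiguousUsBox(21.31, -157.86)); // Honolulu -> false, outside the box
  }
}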

File 3 of 5: Twitter realtime ingestion spec (rewritten as an index_realtime task spec with explicit dimensions, a spatial "geo" dimension, and hyperUnique metrics)

@@ -1,119 +1,151 @@
-[
 {
+  "description": "Ingestion spec for Twitter spritzer. Dimension values taken from io.druid.examples.twitter.TwitterSpritzerFirehoseFactory",
+  "spec": {
     "dataSchema": {
       "dataSource": "twitterstream",
-      "parser": {
-        "parseSpec": {
-          "format": "json",
-          "timestampSpec": {
-            "column": "utcdt",
-            "format": "iso"
-          },
-          "dimensionsSpec": {
-            "dimensions": [
-            ],
-            "dimensionExclusions": [
-            ],
-            "spatialDimensions": [
-            ]
-          }
-        }
+      "granularitySpec": {
+        "queryGranularity": "all",
+        "segmentGranularity": "hour",
+        "type": "uniform"
       },
       "metricsSpec": [
         {
-          "type": "count",
-          "name": "tweets"
+          "name": "tweets",
+          "type": "count"
         },
         {
-          "type": "doubleSum",
           "fieldName": "follower_count",
-          "name": "total_follower_count"
+          "name": "total_follower_count",
+          "type": "doubleSum"
         },
         {
-          "type": "doubleSum",
           "fieldName": "retweet_count",
-          "name": "total_retweet_count"
+          "name": "total_retweet_count",
+          "type": "doubleSum"
        },
         {
-          "type": "doubleSum",
           "fieldName": "friends_count",
-          "name": "total_friends_count"
+          "name": "total_friends_count",
+          "type": "doubleSum"
         },
         {
-          "type": "doubleSum",
           "fieldName": "statuses_count",
-          "name": "total_statuses_count"
+          "name": "total_statuses_count",
+          "type": "doubleSum"
+        },
+        {
+          "fieldName": "text",
+          "name": "text_hll",
+          "type": "hyperUnique"
+        },
+        {
+          "fieldName": "user_id",
+          "name": "user_id_hll",
+          "type": "hyperUnique"
+        },
+        {
+          "fieldName": "contributors",
+          "name": "contributors_hll",
+          "type": "hyperUnique"
+        },
+        {
+          "fieldName": "htags",
+          "name": "htags_hll",
+          "type": "hyperUnique"
         },
         {
-          "type": "min",
           "fieldName": "follower_count",
-          "name": "min_follower_count"
+          "name": "min_follower_count",
+          "type": "min"
         },
         {
-          "type": "max",
           "fieldName": "follower_count",
-          "name": "max_follower_count"
+          "name": "max_follower_count",
+          "type": "max"
         },
         {
-          "type": "min",
           "fieldName": "friends_count",
-          "name": "min_friends_count"
+          "name": "min_friends_count",
+          "type": "min"
         },
         {
-          "type": "max",
           "fieldName": "friends_count",
-          "name": "max_friends_count"
+          "name": "max_friends_count",
+          "type": "max"
         },
         {
-          "type": "min",
           "fieldName": "statuses_count",
-          "name": "min_statuses_count"
+          "name": "min_statuses_count",
+          "type": "min"
         },
         {
-          "type": "max",
           "fieldName": "statuses_count",
-          "name": "max_statuses_count"
+          "name": "max_statuses_count",
+          "type": "max"
         },
         {
-          "type": "min",
           "fieldName": "retweet_count",
-          "name": "min_retweet_count"
+          "name": "min_retweet_count",
+          "type": "min"
         },
         {
-          "type": "max",
           "fieldName": "retweet_count",
-          "name": "max_retweet_count"
+          "name": "max_retweet_count",
+          "type": "max"
         }
       ],
-      "granularitySpec": {
-        "type": "uniform",
-        "segmentGranularity": "DAY",
-        "queryGranularity": "NONE"
+      "parser": {
+        "parseSpec": {
+          "dimensionsSpec": {
+            "dimensions": [
+              "text",
+              "htags",
+              "contributors",
+              "lat",
+              "lon",
+              "retweet_count",
+              "follower_count",
+              "friendscount",
+              "lang",
+              "utc_offset",
+              "statuses_count",
+              "user_id",
+              "ts"
+            ],
+            "dimensionExclusions": [
+            ],
+            "spatialDimensions": [
+              {
+                "dimName": "geo",
+                "dims": [
+                  "lat",
+                  "lon"
+                ]
+              }
+            ]
+          },
+          "format": "json",
+          "timestampSpec": {
+            "column": "ts",
+            "format": "millis"
+          }
+        }
       }
     },
     "ioConfig": {
-      "type": "realtime",
       "firehose": {
-        "type": "twitzer",
         "maxEventCount": 500000,
-        "maxRunMinutes": 120
+        "maxRunMinutes": 120,
+        "type": "twitzer"
       },
-      "plumber": {
       "type": "realtime"
-      }
     },
     "tuningConfig": {
-      "type": "realtime",
+      "intermediatePersistPeriod": "PT10m",
       "maxRowsInMemory": 500000,
-      "intermediatePersistPeriod": "PT2m",
-      "windowPeriod": "PT3m",
-      "basePersistDirectory": "\/tmp\/realtime\/basePersist",
-      "rejectionPolicy": {
-        "type": "messageTime"
+      "type": "realtime",
+      "windowPeriod": "PT10m"
     }
+  },
+  "type": "index_realtime"
 }
-  }
-]

File 4 of 5: TwitterSpritzerFirehoseFactory.java

@@ -1,6 +1,6 @@
 /*
  * Druid - a distributed column store.
- * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
  *
  * This program is free software; you can redistribute it and/or
  * modify it under the terms of the GNU General Public License
@@ -22,6 +22,8 @@ package io.druid.examples.twitter;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.metamx.common.logger.Logger;
 import io.druid.data.input.Firehose;
@@ -30,6 +32,7 @@ import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
 import io.druid.data.input.impl.InputRowParser;
 import twitter4j.ConnectionLifeCycleListener;
+import twitter4j.GeoLocation;
 import twitter4j.HashtagEntity;
 import twitter4j.StallWarning;
 import twitter4j.Status;
@@ -39,12 +42,13 @@ import twitter4j.TwitterStream;
 import twitter4j.TwitterStreamFactory;
 import twitter4j.User;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -58,25 +62,27 @@ import static java.lang.Thread.sleep;
  * with timestamps along with ??.
  * The generated tuples have the form (timestamp, ????)
  * where the timestamp is from the twitter event.
- *
+ * <p/>
  * Example spec file:
- *
+ * <p/>
  * Example query using POST to /druid/v2/?w (where w is an arbitrary parameter and the date and time
  * is UTC):
- *
+ * <p/>
  * Notes on twitter.com HTTP (REST) API: v1.0 will be disabled around 2013-03 so v1.1 should be used;
  * twitter4j 3.0 (not yet released) will support the v1.1 api.
 * Specifically, we should be using https://stream.twitter.com/1.1/statuses/sample.json
 * See: http://jira.twitter4j.org/browse/TFJ-186
- *
+ * <p/>
  * Notes on JSON parsing: as of twitter4j 2.2.x, the json parser has some bugs (ex: Status.toString()
  * can have number format exceptions), so it might be necessary to extract raw json and process it
  * separately. If so, set twitter4.jsonStoreEnabled=true and look at DataObjectFactory#getRawJSON();
  * com.fasterxml.jackson.databind.ObjectMapper should be used to parse.
+ *
  * @author pbaclace
  */
 @JsonTypeName("twitzer")
-public class TwitterSpritzerFirehoseFactory implements FirehoseFactory<InputRowParser> {
+public class TwitterSpritzerFirehoseFactory implements FirehoseFactory<InputRowParser>
+{
   private static final Logger log = new Logger(TwitterSpritzerFirehoseFactory.class);
   /**
    * max events to receive, -1 is infinite, 0 means nothing is delivered; use this to prevent
@@ -107,7 +113,8 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory<InputRowP
   @Override
   public Firehose connect(InputRowParser parser) throws IOException
   {
-    final ConnectionLifeCycleListener connectionLifeCycleListener = new ConnectionLifeCycleListener() {
+    final ConnectionLifeCycleListener connectionLifeCycleListener = new ConnectionLifeCycleListener()
+    {
       @Override
       public void onConnect()
       {
@@ -135,19 +142,15 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory<InputRowP
     final int QUEUE_SIZE = 2000;
     /** This queue is used to move twitter events from the twitter4j thread to the druid ingest thread. */
     final BlockingQueue<Status> queue = new ArrayBlockingQueue<Status>(QUEUE_SIZE);
-    final LinkedList<String> dimensions = new LinkedList<String>();
     final long startMsec = System.currentTimeMillis();
 
-    dimensions.add("htags");
-    dimensions.add("lang");
-    dimensions.add("utc_offset");
-
     //
     // set up Twitter Spritzer
     //
     twitterStream = new TwitterStreamFactory().getInstance();
     twitterStream.addConnectionLifeCycleListener(connectionLifeCycleListener);
-    statusListener = new StatusListener() { // This is what really gets called to deliver stuff from twitter4j
+    statusListener = new StatusListener()
+    { // This is what really gets called to deliver stuff from twitter4j
       @Override
       public void onStatus(Status status)
       {
@@ -160,7 +163,8 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory<InputRowP
           if (!success) {
             log.warn("queue too slow!");
           }
-        } catch (InterruptedException e) {
+        }
+        catch (InterruptedException e) {
          throw new RuntimeException("InterruptedException", e);
         }
       }
@@ -192,7 +196,8 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory<InputRowP
       }
 
       @Override
-      public void onStallWarning(StallWarning warning) {
+      public void onStallWarning(StallWarning warning)
+      {
         System.out.println("Got stall warning:" + warning);
       }
     };
@@ -201,9 +206,11 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory<InputRowP
     twitterStream.sample(); // creates a generic StatusStream
     log.info("returned from sample()");
 
-    return new Firehose() {
-      private final Runnable doNothingRunnable = new Runnable() {
+    return new Firehose()
+    {
+      private final Runnable doNothingRunnable = new Runnable()
+      {
         public void run()
         {
         }
@@ -211,7 +218,7 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory<InputRowP
       private long rowCount = 0L;
      private boolean waitIfmax = (getMaxEventCount() < 0L);
-      private final Map<String, Object> theMap = new HashMap<String, Object>(2);
+      private final Map<String, Object> theMap = new TreeMap<>();
       // DIY json parsing // private final ObjectMapper omapper = new ObjectMapper();
 
       private boolean maxTimeReached()
@@ -253,7 +260,8 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory<InputRowP
           try {
            log.info("reached limit, sleeping a long time...");
            sleep(2000000000L);
-          } catch (InterruptedException e) {
+          }
+          catch (InterruptedException e) {
            throw new RuntimeException("InterruptedException", e);
          }
         } else {
@@ -267,30 +275,60 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory<InputRowP
         Status status;
         try {
           status = queue.take();
-        } catch (InterruptedException e) {
+        }
+        catch (InterruptedException e) {
           throw new RuntimeException("InterruptedException", e);
         }
         HashtagEntity[] hts = status.getHashtagEntities();
-        if (hts != null && hts.length > 0) {
-          List<String> hashTags = Lists.newArrayListWithExpectedSize(hts.length);
-          for (HashtagEntity ht : hts) {
-            hashTags.add(ht.getText());
-          }
-          theMap.put("htags", Arrays.asList(hashTags.get(0)));
-        }
+        String text = status.getText();
+        theMap.put("text", (null == text) ? "" : text);
+        theMap.put(
+            "htags", (hts.length > 0) ? Lists.transform(
+                Arrays.asList(hts), new Function<HashtagEntity, String>()
+                {
+                  @Nullable
+                  @Override
+                  public String apply(HashtagEntity input)
+                  {
+                    return input.getText();
+                  }
+                }
+            ) : ImmutableList.<String>of()
+        );
+        long[] lcontrobutors = status.getContributors();
+        List<String> contributors = new ArrayList<>();
+        for (long contrib : lcontrobutors) {
+          contributors.add(String.format("%d", contrib));
+        }
+        theMap.put("contributors", contributors);
+        GeoLocation geoLocation = status.getGeoLocation();
+        if (null != geoLocation) {
+          double lat = status.getGeoLocation().getLatitude();
+          double lon = status.getGeoLocation().getLongitude();
+          theMap.put("lat", lat);
+          theMap.put("lon", lon);
+        } else {
+          theMap.put("lat", null);
+          theMap.put("lon", null);
+        }
         long retweetCount = status.getRetweetCount();
         theMap.put("retweet_count", retweetCount);
         User user = status.getUser();
-        if (user != null) {
-          theMap.put("follower_count", user.getFollowersCount());
-          theMap.put("friends_count", user.getFriendsCount());
-          theMap.put("lang", user.getLang());
-          theMap.put("utc_offset", user.getUtcOffset()); // resolution in seconds, -1 if not available?
-          theMap.put("statuses_count", user.getStatusesCount());
-        }
+        final boolean hasUser = (null != user);
+        theMap.put("follower_count", hasUser ? user.getFollowersCount() : 0);
+        theMap.put("friends_count", hasUser ? user.getFriendsCount() : 0);
+        theMap.put("lang", hasUser ? user.getLang() : "");
+        theMap.put("utc_offset", hasUser ? user.getUtcOffset() : -1); // resolution in seconds, -1 if not available?
+        theMap.put("statuses_count", hasUser ? user.getStatusesCount() : 0);
+        theMap.put("user_id", hasUser ? String.format("%d", user.getId()) : "");
+        theMap.put("ts",status.getCreatedAt().getTime());
+        List<String> dimensions = Lists.newArrayList(theMap.keySet());
         return new MapBasedInputRow(status.getCreatedAt().getTime(), dimensions, theMap);
       }
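As a side note on the new row-building code above: it leans on two idioms, Guava's Lists.transform with a Function to turn the HashtagEntity[] into a list of hashtag strings, and a TreeMap whose key set doubles as the dimension list handed to MapBasedInputRow. A minimal standalone sketch of the same pattern follows; plain Strings stand in for twitter4j entities, and the class name and values are illustrative only.

import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RowBuildingSketch
{
  public static void main(String[] args)
  {
    // Stand-in for status.getHashtagEntities(); in the firehose these are HashtagEntity objects.
    String[] hts = new String[]{"#druid", "#realtime"};

    // Same shape as the firehose change: transform the array when non-empty, otherwise use an empty list.
    List<String> htags = (hts.length > 0) ? Lists.transform(
        Arrays.asList(hts), new Function<String, String>()
        {
          @Override
          public String apply(String input)
          {
            // The real code calls HashtagEntity.getText(); here we simply strip the leading '#'.
            return input.substring(1);
          }
        }
    ) : ImmutableList.<String>of();

    // A TreeMap keeps keys sorted, so the dimension list derived from keySet() is stable and alphabetical.
    Map<String, Object> theMap = new TreeMap<>();
    theMap.put("htags", htags);
    theMap.put("lang", "en");
    theMap.put("retweet_count", 0L);

    List<String> dimensions = Lists.newArrayList(theMap.keySet());
    System.out.println(dimensions); // prints [htags, lang, retweet_count]
  }
}

Because the dimensions come from the map's keys, every field the firehose puts into the map becomes a reported dimension of the row, which is presumably why the updated ingestion spec lists those same field names.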

File 5 of 5: pom.xml (bytebuffer-collections bumped from 0.1.1 to 0.1.2)

@@ -94,7 +94,7 @@
     <dependency>
       <groupId>com.metamx</groupId>
       <artifactId>bytebuffer-collections</artifactId>
-      <version>0.1.1</version>
+      <version>0.1.2</version>
     </dependency>
     <dependency>
       <groupId>com.metamx</groupId>