Fix up the demos a bit

This commit is contained in:
Eric Tschetter 2012-10-24 04:51:32 -04:00
parent 9d41599967
commit f09f595c7c
10 changed files with 78 additions and 71 deletions

View File

@ -1,17 +1,19 @@
{ {
"queryType": "topN", "queryType": "groupBy",
"dataSource": "randSeq", "dataSource": "randSeq",
"granularity": "all", "granularity": "all",
"dimension": "target", "dimensions": [],
"threshold": 10,
"metric": "randomNumberSum",
"aggregations":[ "aggregations":[
{ "type": "count", "name": "rows"}, { "type": "count", "name": "rows"},
{ "type": "doubleSum", "fieldName": "events", "name": "e"}, { "type": "doubleSum", "fieldName": "events", "name": "e"},
{ "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum"} { "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum"}
], ],
"postAggregations":[ "postAggregations":[
{"type":"arithmetic","name":"avg_random","fn":"/","fields":[{"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"},{"type":"fieldAccess","name":"rows","fieldName":"rows"}]} { "type":"arithmetic",
"name":"avg_random",
"fn":"/",
"fields":[ {"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"},
{"type":"fieldAccess","name":"rows","fieldName":"rows"} ]}
], ],
"intervals":["2012-10-01T00:00/2020-01-01T00"] "intervals":["2012-10-01T00:00/2020-01-01T00"]
} }

View File

@ -1,9 +1,9 @@
[{ [{
"schema" : { "dataSource":"randseq", "schema" : { "dataSource":"randseq",
"aggregators":[ {"type":"count", "name":"events"}, "aggregators":[ {"type":"count", "name":"events"},
{"type":"doubleSum","name":"outColumn","fieldName":"inColumn"} ], {"type":"doubleSum","name":"outColumn","fieldName":"inColumn"} ],
"indexGranularity":"minute", "indexGranularity":"minute",
"shardSpec" : { "type": "none" } }, "shardSpec" : { "type": "none" } },
"config" : { "maxRowsInMemory" : 50000, "config" : { "maxRowsInMemory" : 50000,
"intermediatePersistPeriod" : "PT2m" }, "intermediatePersistPeriod" : "PT2m" },

View File

@ -18,7 +18,8 @@ import java.util.Random;
import static java.lang.Thread.sleep; import static java.lang.Thread.sleep;
/** Random value sequence Firehost Factory named "rand". /**
* Random value sequence Firehose Factory named "rand".
* Builds a Firehose that emits a stream of random numbers (outColumn, a positive double) * Builds a Firehose that emits a stream of random numbers (outColumn, a positive double)
* with timestamps along with an associated token (target). This provides a timeseries * with timestamps along with an associated token (target). This provides a timeseries
* that requires no network access for demonstration, characterization, and testing. * that requires no network access for demonstration, characterization, and testing.
@ -42,9 +43,9 @@ import static java.lang.Thread.sleep;
[{ [{
"schema" : { "dataSource":"randseq", "schema" : { "dataSource":"randseq",
"aggregators":[ {"type":"count", "name":"events"}, "aggregators":[ {"type":"count", "name":"events"},
{"type":"doubleSum","name":"outColumn","fieldName":"inColumn"} ], {"type":"doubleSum","name":"outColumn","fieldName":"inColumn"} ],
"indexGranularity":"minute", "indexGranularity":"minute",
"shardSpec" : { "type": "none" } }, "shardSpec" : { "type": "none" } },
"config" : { "maxRowsInMemory" : 50000, "config" : { "maxRowsInMemory" : 50000,
"intermediatePersistPeriod" : "PT2m" }, "intermediatePersistPeriod" : "PT2m" },
@ -63,25 +64,26 @@ import static java.lang.Thread.sleep;
}] }]
* </pre> * </pre>
* *
* Example query using POST to /druid/v2/?w (where w is an arbitrary parameter and the UTC date and time * Example query using POST to /druid/v2/ (where UTC date and time MUST include the current hour):
* MUST be adjusted for the current hour):
* <pre> * <pre>
{ {
"queryType": "topN", "queryType": "groupBy",
"dataSource": "randSeq", "dataSource": "randSeq",
"granularity": "all", "granularity": "all",
"dimension": "target", "dimensions": [],
"threshold": 10,
"metric": "randomNumberSum",
"aggregations":[ "aggregations":[
{ "type": "count", "name": "rows"}, { "type": "count", "name": "rows"},
{ "type": "doubleSum", "fieldName": "events", "name": "e"}, { "type": "doubleSum", "fieldName": "events", "name": "e"},
{ "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum"} { "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum"}
], ],
"postAggregations":[ "postAggregations":[
{"type":"arithmetic","name":"avg_random","fn":"/","fields":[{"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"},{"type":"fieldAccess","name":"rows","fieldName":"rows"}]} { "type":"arithmetic",
"name":"avg_random",
"fn":"/",
"fields":[ {"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"},
{"type":"fieldAccess","name":"rows","fieldName":"rows"} ]}
], ],
"intervals":["2012-10-16T20:03/2012-10-16T21"] "intervals":["2012-10-01T00:00/2020-01-01T00"]
} }
* </pre> * </pre>
*/ */

View File

@ -18,7 +18,8 @@ import org.codehaus.jackson.map.jsontype.NamedType;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
/** Standalone Demo Realtime process. /**
* Standalone Demo Realtime process.
* Created: 20121009T2050 * Created: 20121009T2050
* *
* @author pbaclace * @author pbaclace

View File

@ -1,25 +1,22 @@
{ {
"queryType": "topN", "queryType": "groupBy",
"dataSource": "twitterstream", "dataSource": "twitterstream",
"granularity": "all", "granularity": "all",
"dimension": "lang", "dimension": ["lang"],
"threshold": 10, "threshold": 10,
"metric": "totally", "metric": "totally",
"aggregations":[ "aggregations":[
{ "type": "count", "name": "rows"}, { "type": "count", "name": "rows"},
{ "type": "doubleSum", "fieldName": "events", "name": "e"}, { "type": "doubleSum", "fieldName": "tweets", "name": "tweets"},
{ "type": "max", "fieldName": "maxStatusesCount", "name": "theMaxStatusesCount"}, { "type": "max", "fieldName": "max_statuses_count", "name": "theMaxStatusesCount"},
{ "type": "max", "fieldName": "maxRetweetCount", "name": "theMaxRetweetCount"}, { "type": "max", "fieldName": "max_retweet_count", "name": "theMaxRetweetCount"},
{ "type": "max", "fieldName": "maxFriendsCount", "name": "theMaxFriendsCount"}, { "type": "max", "fieldName": "max_friends_count", "name": "theMaxFriendsCount"},
{ "type": "max", "fieldName": "maxFollowerCount", "name": "theMaxFollowerCount"}, { "type": "max", "fieldName": "max_follower_count", "name": "theMaxFollowerCount"},
{ "type": "doubleSum", "fieldName": "totalStatusesCount", "name": "totally"} { "type": "doubleSum", "fieldName": "total_statuses_count", "name": "total_tweets_all_time"}
],
"postAggregations":[
{"type":"arithmetic","name":"avg_f","fn":"/","fields":[{"type":"fieldAccess","name":"dummy","fieldName":"totally"},{"type":"fieldAccess","name":"rows","fieldName":"rows"}]}
], ],
"intervals":["2012-10-01T00:00/2020-01-01T00"] "intervals":["2012-10-01T00:00/2020-01-01T00"]
} }

View File

@ -9,12 +9,12 @@ trap "exit 1" 1 2 3 15
[ ! -e query.body ] && echo "expecting file query.body to be in current directory" && exit 2 [ ! -e query.body ] && echo "expecting file query.body to be in current directory" && exit 2
for delay in 1 30 for delay in 5 15 15 15 15 15 15 15 15 15 15
do do
echo "sleep for $delay seconds..." echo "sleep for $delay seconds..."
echo " " echo " "
sleep $delay sleep $delay
curl -X POST 'http://localhost:8080/druid/v2/?w' -H 'content-type: application/json' -d @query.body curl -X POST 'http://localhost:8080/druid/v2/' -H 'content-type: application/json' -d @query.body
echo " " echo " "
echo " " echo " "
done done

View File

@ -25,9 +25,6 @@ else
fi fi
trap "${PF_CLEANUP} ; exit 1" 1 2 3 15 trap "${PF_CLEANUP} ; exit 1" 1 2 3 15
# be sure to use UTC
export TZ=UTC
# props are set in src/main/resources/runtime.properties # props are set in src/main/resources/runtime.properties
[ -d /tmp/twitter_realtime ] && echo "cleaning up from previous run.." && /bin/rm -fr /tmp/twitter_realtime [ -d /tmp/twitter_realtime ] && echo "cleaning up from previous run.." && /bin/rm -fr /tmp/twitter_realtime
@ -39,7 +36,7 @@ OPT_PROPS=""
# start RealtimeNode process # start RealtimeNode process
# #
java -Xmx600m -classpath target/druid-examples-twitter-*-selfcontained.jar $OPT_PROPS -Dtwitter4j.http.useSSL=true -Ddruid.realtime.specFile=twitter_realtime.spec druid.examples.RealtimeStandaloneMain >RealtimeNode.out 2>&1 & java -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Xmx600m -classpath target/druid-examples-twitter-*-selfcontained.jar $OPT_PROPS -Dtwitter4j.http.useSSL=true -Ddruid.realtime.specFile=twitter_realtime.spec druid.examples.RealtimeStandaloneMain >RealtimeNode.out 2>&1 &
PID=$! PID=$!
sleep 4 sleep 4
grep com.metamx.druid.realtime.TwitterSpritzerFirehoseFactory RealtimeNode.out | awk '{ print $7,$8,$9,$10,$11,$12,$13,$14,$15 }' grep com.metamx.druid.realtime.TwitterSpritzerFirehoseFactory RealtimeNode.out | awk '{ print $7,$8,$9,$10,$11,$12,$13,$14,$15 }'

View File

@ -40,6 +40,17 @@ import static java.lang.Thread.*;
* is UTC): * is UTC):
* <pre> * <pre>
* </pre> * </pre>
*
*
* Notes on twitter.com HTTP (REST) API: v1.0 will be disabled around 2013-03 so v1.1 should be used;
* twitter4j 3.0 (not yet released) will support the v1.1 api.
* Specifically, we should be using https://stream.twitter.com/1.1/statuses/sample.json
* See: http://jira.twitter4j.org/browse/TFJ-186
*
* Notes on JSON parsing: as of twitter4j 2.2.x, the json parser has some bugs (ex: Status.toString()
* can have number format exceptions), so it might be necessary to extract raw json and process it
* separately. If so, set twitter4j.jsonStoreEnabled=true and look at DataObjectFactory#getRawJSON();
* org.codehaus.jackson.map.ObjectMapper should be used to parse.
* @author pbaclace * @author pbaclace
*/ */
@JsonTypeName("twitzer") @JsonTypeName("twitzer")
@ -106,12 +117,12 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory {
final long startMsec = System.currentTimeMillis(); final long startMsec = System.currentTimeMillis();
dimensions.add("htags"); dimensions.add("htags");
dimensions.add("retweetCount"); dimensions.add("retweet_count");
dimensions.add("followerCount"); dimensions.add("follower_count");
dimensions.add("friendsCount"); dimensions.add("friends_count");
dimensions.add("lang"); dimensions.add("lang");
dimensions.add("utcOffset"); dimensions.add("utc_offset");
dimensions.add("statusesCount"); dimensions.add("statuses_count");
// //
// set up Twitter Spritzer // set up Twitter Spritzer
@ -185,7 +196,7 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory {
if (maxRunMinutes <= 0) { if (maxRunMinutes <= 0) {
return false; return false;
} else { } else {
return (System.currentTimeMillis() - startMsec) / 10000L >= maxRunMinutes; return (System.currentTimeMillis() - startMsec) / 60000L >= maxRunMinutes;
} }
} }
@ -249,23 +260,23 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory {
} }
long retweetCount = status.getRetweetCount(); long retweetCount = status.getRetweetCount();
theMap.put("retweetCount", retweetCount); theMap.put("retweet_count", retweetCount);
User u = status.getUser(); User u = status.getUser();
if (u != null) { if (u != null) {
theMap.put("followerCount", u.getFollowersCount()); theMap.put("follower_count", u.getFollowersCount());
theMap.put("friendsCount", u.getFriendsCount()); theMap.put("friends_count", u.getFriendsCount());
theMap.put("lang", u.getLang()); theMap.put("lang", u.getLang());
theMap.put("utcOffset", u.getUtcOffset()); // resolution in seconds, -1 if not available? theMap.put("utc_offset", u.getUtcOffset()); // resolution in seconds, -1 if not available?
theMap.put("statusesCount", u.getStatusesCount()); theMap.put("statuses_count", u.getStatusesCount());
} else { } else {
log.error("status.getUser() is null"); log.error("status.getUser() is null");
} }
if (rowCount % 10 == 0) { if (rowCount % 10 == 0) {
log.info("" + status.getCreatedAt() + log.info("" + status.getCreatedAt() +
" followerCount=" + u.getFollowersCount() + " follower_count=" + u.getFollowersCount() +
" friendsCount=" + u.getFriendsCount() + " friends_count=" + u.getFriendsCount() +
" statusesCount=" + u.getStatusesCount() + " statuses_count=" + u.getStatusesCount() +
" retweetCount=" + retweetCount " retweet_count=" + retweetCount
); );
} }

View File

@ -107,8 +107,6 @@ druid.service=foo
druid.pusher.s3.bucket= druid.pusher.s3.bucket=
druid.pusher.s3.baseKey= druid.pusher.s3.baseKey=
# TODO: should the next prop also work via runtime.properties ?
# next MUST be on command line, does not work here
druid.realtime.specFile=twitter_realtime.spec druid.realtime.specFile=twitter_realtime.spec
# #

View File

@ -1,33 +1,32 @@
[{ [{
"schema" : { "dataSource":"twitterstream", "schema" : { "dataSource":"twitterstream",
"aggregators":[ "aggregators":[
{"type":"count", "name":"events"}, {"type":"count", "name":"tweets"},
{"type":"doubleSum","fieldName":"followerCount","name":"totalFollowerCount"}, {"type":"doubleSum","fieldName":"follower_count","name":"total_follower_count"},
{"type":"doubleSum","fieldName":"retweetCount","name":"totalRetweetCount"}, {"type":"doubleSum","fieldName":"retweet_count","name":"total_retweet_count"},
{"type":"doubleSum","fieldName":"friendsCount","name":"totalFriendsCount"}, {"type":"doubleSum","fieldName":"friends_count","name":"total_friends_count"},
{"type":"doubleSum","fieldName":"statusesCount","name":"totalStatusesCount"}, {"type":"doubleSum","fieldName":"statuses_count","name":"total_statuses_count"},
{"type":"min","fieldName":"followerCount","name":"minFollowerCount"}, {"type":"min","fieldName":"follower_count","name":"min_follower_count"},
{"type":"max","fieldName":"followerCount","name":"maxFollowerCount"}, {"type":"max","fieldName":"follower_count","name":"max_follower_count"},
{"type":"min","fieldName":"friendsCount","name":"minFriendsCount"}, {"type":"min","fieldName":"friends_count","name":"min_friends_count"},
{"type":"max","fieldName":"friendsCount","name":"maxFriendsCount"}, {"type":"max","fieldName":"friends_count","name":"max_friends_count"},
{"type":"min","fieldName":"statusesCount","name":"minStatusesCount"}, {"type":"min","fieldName":"statuses_count","name":"min_statuses_count"},
{"type":"max","fieldName":"statusesCount","name":"maxStatusesCount"}, {"type":"max","fieldName":"statuses_count","name":"max_statuses_count"},
{"type":"min","fieldName":"retweetCount","name":"minRetweetCount"}, {"type":"min","fieldName":"retweet_count","name":"min_retweet_count"},
{"type":"max","fieldName":"retweetCount","name":"maxRetweetCount"} {"type":"max","fieldName":"retweet_count","name":"max_retweet_count"}
],
],
"indexGranularity":"minute", "indexGranularity":"minute",
"shardSpec" : { "type": "none" } }, "shardSpec" : { "type": "none" } },
"config" : { "maxRowsInMemory" : 50000, "config" : { "maxRowsInMemory" : 50000,
"intermediatePersistPeriod" : "PT2m" }, "intermediatePersistPeriod" : "PT2m" },
"firehose" : { "type" : "twitzer", "firehose" : { "type" : "twitzer",
"maxEventCount": 10000, "maxEventCount": 50000,
"maxRunMinutes" : 5 "maxRunMinutes" : 10
}, },
"plumber" : { "type" : "realtime", "plumber" : { "type" : "realtime",