From f09f595c7c0c40e6b4f40eabc5ab9f82b309308d Mon Sep 17 00:00:00 2001 From: Eric Tschetter Date: Wed, 24 Oct 2012 04:51:32 -0400 Subject: [PATCH] Fix up the demos a bit --- examples/rand/query.body | 12 +++--- examples/rand/rand_realtime.spec | 4 +- .../druid/examples/RandomFirehoseFactory.java | 24 ++++++----- .../examples/RealtimeStandaloneMain.java | 3 +- examples/twitter/query.body | 19 ++++----- examples/twitter/run_client.sh | 4 +- examples/twitter/run_server.sh | 5 +-- .../TwitterSpritzerFirehoseFactory.java | 41 ++++++++++++------- .../src/main/resources/runtime.properties | 2 - examples/twitter/twitter_realtime.spec | 35 ++++++++-------- 10 files changed, 78 insertions(+), 71 deletions(-) diff --git a/examples/rand/query.body b/examples/rand/query.body index 50bb98560d6..05007c955b9 100644 --- a/examples/rand/query.body +++ b/examples/rand/query.body @@ -1,17 +1,19 @@ { - "queryType": "topN", + "queryType": "groupBy", "dataSource": "randSeq", "granularity": "all", - "dimension": "target", - "threshold": 10, - "metric": "randomNumberSum", + "dimensions": [], "aggregations":[ { "type": "count", "name": "rows"}, { "type": "doubleSum", "fieldName": "events", "name": "e"}, { "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum"} ], "postAggregations":[ - {"type":"arithmetic","name":"avg_random","fn":"/","fields":[{"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"},{"type":"fieldAccess","name":"rows","fieldName":"rows"}]} + { "type":"arithmetic", + "name":"avg_random", + "fn":"/", + "fields":[ {"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"}, + {"type":"fieldAccess","name":"rows","fieldName":"rows"} ]} ], "intervals":["2012-10-01T00:00/2020-01-01T00"] } diff --git a/examples/rand/rand_realtime.spec b/examples/rand/rand_realtime.spec index 08eb4be931d..eefc16f307f 100644 --- a/examples/rand/rand_realtime.spec +++ b/examples/rand/rand_realtime.spec @@ -1,9 +1,9 @@ [{ "schema" : { 
"dataSource":"randseq", "aggregators":[ {"type":"count", "name":"events"}, - {"type":"doubleSum","name":"outColumn","fieldName":"inColumn"} ], + {"type":"doubleSum","name":"outColumn","fieldName":"inColumn"} ], "indexGranularity":"minute", - "shardSpec" : { "type": "none" } }, + "shardSpec" : { "type": "none" } }, "config" : { "maxRowsInMemory" : 50000, "intermediatePersistPeriod" : "PT2m" }, diff --git a/examples/rand/src/main/java/druid/examples/RandomFirehoseFactory.java b/examples/rand/src/main/java/druid/examples/RandomFirehoseFactory.java index 4d914982faf..0fdd1bbb9b2 100644 --- a/examples/rand/src/main/java/druid/examples/RandomFirehoseFactory.java +++ b/examples/rand/src/main/java/druid/examples/RandomFirehoseFactory.java @@ -18,7 +18,8 @@ import java.util.Random; import static java.lang.Thread.sleep; -/** Random value sequence Firehost Factory named "rand". +/** + * Random value sequence Firehost Factory named "rand". * Builds a Firehose that emits a stream of random numbers (outColumn, a positive double) * with timestamps along with an associated token (target). This provides a timeseries * that requires no network access for demonstration, characterization, and testing. @@ -42,9 +43,9 @@ import static java.lang.Thread.sleep; [{ "schema" : { "dataSource":"randseq", "aggregators":[ {"type":"count", "name":"events"}, - {"type":"doubleSum","name":"outColumn","fieldName":"inColumn"} ], + {"type":"doubleSum","name":"outColumn","fieldName":"inColumn"} ], "indexGranularity":"minute", - "shardSpec" : { "type": "none" } }, + "shardSpec" : { "type": "none" } }, "config" : { "maxRowsInMemory" : 50000, "intermediatePersistPeriod" : "PT2m" }, @@ -63,25 +64,26 @@ import static java.lang.Thread.sleep; }] * * - * Example query using POST to /druid/v2/?w (where w is an arbitrary parameter and the UTC date and time - * MUST be adjusted for the current hour): + * Example query using POST to /druid/v2/ (where UTC date and time MUST include the current hour): *
  {
-     "queryType": "topN",
+     "queryType": "groupBy",
      "dataSource": "randSeq",
      "granularity": "all",
-     "dimension": "target",
-     "threshold": 10,
-     "metric": "randomNumberSum",
+     "dimensions": [],
      "aggregations":[
      { "type": "count", "name": "rows"},
      { "type": "doubleSum", "fieldName": "events", "name": "e"},
      { "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum"}
      ],
      "postAggregations":[
-     {"type":"arithmetic","name":"avg_random","fn":"/","fields":[{"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"},{"type":"fieldAccess","name":"rows","fieldName":"rows"}]}
+     {  "type":"arithmetic",
+        "name":"avg_random",
+        "fn":"/",
+        "fields":[ {"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"},
+                   {"type":"fieldAccess","name":"rows","fieldName":"rows"} ]}
      ],
-     "intervals":["2012-10-16T20:03/2012-10-16T21"]
+     "intervals":["2012-10-01T00:00/2020-01-01T00"]
  }
  * 
*/ diff --git a/examples/rand/src/main/java/druid/examples/RealtimeStandaloneMain.java b/examples/rand/src/main/java/druid/examples/RealtimeStandaloneMain.java index 0b33e7d8719..29263f1d6e5 100644 --- a/examples/rand/src/main/java/druid/examples/RealtimeStandaloneMain.java +++ b/examples/rand/src/main/java/druid/examples/RealtimeStandaloneMain.java @@ -18,7 +18,8 @@ import org.codehaus.jackson.map.jsontype.NamedType; import java.io.File; import java.io.IOException; -/** Standalone Demo Realtime process. +/** + * Standalone Demo Realtime process. * Created: 20121009T2050 * * @author pbaclace diff --git a/examples/twitter/query.body b/examples/twitter/query.body index b2c1b78a8b3..a81b84aaa32 100644 --- a/examples/twitter/query.body +++ b/examples/twitter/query.body @@ -1,25 +1,22 @@ { - "queryType": "topN", + "queryType": "groupBy", "dataSource": "twitterstream", "granularity": "all", - "dimension": "lang", + "dimension": ["lang"], "threshold": 10, "metric": "totally", "aggregations":[ { "type": "count", "name": "rows"}, - { "type": "doubleSum", "fieldName": "events", "name": "e"}, + { "type": "doubleSum", "fieldName": "tweets", "name": "tweets"}, - { "type": "max", "fieldName": "maxStatusesCount", "name": "theMaxStatusesCount"}, - { "type": "max", "fieldName": "maxRetweetCount", "name": "theMaxRetweetCount"}, + { "type": "max", "fieldName": "max_statuses_count", "name": "theMaxStatusesCount"}, + { "type": "max", "fieldName": "max_retweet_count", "name": "theMaxRetweetCount"}, - { "type": "max", "fieldName": "maxFriendsCount", "name": "theMaxFriendsCount"}, - { "type": "max", "fieldName": "maxFollowerCount", "name": "theMaxFollowerCount"}, + { "type": "max", "fieldName": "max_friends_count", "name": "theMaxFriendsCount"}, + { "type": "max", "fieldName": "max_follower_count", "name": "theMaxFollowerCount"}, - { "type": "doubleSum", "fieldName": "totalStatusesCount", "name": "totally"} + { "type": "doubleSum", "fieldName": "total_statuses_count", "name": 
"total_tweets_all_time"} - ], - "postAggregations":[ - {"type":"arithmetic","name":"avg_f","fn":"/","fields":[{"type":"fieldAccess","name":"dummy","fieldName":"totally"},{"type":"fieldAccess","name":"rows","fieldName":"rows"}]} ], "intervals":["2012-10-01T00:00/2020-01-01T00"] } diff --git a/examples/twitter/run_client.sh b/examples/twitter/run_client.sh index b2fcd95119c..e8113285502 100755 --- a/examples/twitter/run_client.sh +++ b/examples/twitter/run_client.sh @@ -9,12 +9,12 @@ trap "exit 1" 1 2 3 15 [ ! -e query.body ] && echo "expecting file query.body to be in current directory" && exit 2 -for delay in 1 30 +for delay in 5 15 15 15 15 15 15 15 15 15 15 do echo "sleep for $delay seconds..." echo " " sleep $delay - curl -X POST 'http://localhost:8080/druid/v2/?w' -H 'content-type: application/json' -d @query.body + curl -X POST 'http://localhost:8080/druid/v2/' -H 'content-type: application/json' -d @query.body echo " " echo " " done diff --git a/examples/twitter/run_server.sh b/examples/twitter/run_server.sh index 8d9094000cf..86fe2c5857c 100755 --- a/examples/twitter/run_server.sh +++ b/examples/twitter/run_server.sh @@ -25,9 +25,6 @@ else fi trap "${PF_CLEANUP} ; exit 1" 1 2 3 15 -# be sure to use UTC -export TZ=UTC - # props are set in src/main/resources/runtime.properties [ -d /tmp/twitter_realtime ] && echo "cleaning up from previous run.." && /bin/rm -fr /tmp/twitter_realtime @@ -39,7 +36,7 @@ OPT_PROPS="" # start RealtimeNode process # -java -Xmx600m -classpath target/druid-examples-twitter-*-selfcontained.jar $OPT_PROPS -Dtwitter4j.http.useSSL=true -Ddruid.realtime.specFile=twitter_realtime.spec druid.examples.RealtimeStandaloneMain >RealtimeNode.out 2>&1 & +java -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Xmx600m -classpath target/druid-examples-twitter-*-selfcontained.jar $OPT_PROPS -Dtwitter4j.http.useSSL=true -Ddruid.realtime.specFile=twitter_realtime.spec druid.examples.RealtimeStandaloneMain >RealtimeNode.out 2>&1 & PID=$! 
sleep 4 grep com.metamx.druid.realtime.TwitterSpritzerFirehoseFactory RealtimeNode.out | awk '{ print $7,$8,$9,$10,$11,$12,$13,$14,$15 }' diff --git a/examples/twitter/src/main/java/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java b/examples/twitter/src/main/java/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java index 304dc46d79b..249ea12e3b1 100644 --- a/examples/twitter/src/main/java/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java +++ b/examples/twitter/src/main/java/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java @@ -40,6 +40,17 @@ import static java.lang.Thread.*; * is UTC): *
  * 
+ * + * + * Notes on twitter.com HTTP (REST) API: v1.0 will be disabled around 2013-03 so v1.1 should be used; + * twitter4j 3.0 (not yet released) will support the v1.1 api. + * Specifically, we should be using https://stream.twitter.com/1.1/statuses/sample.json + * See: http://jira.twitter4j.org/browse/TFJ-186 + * + * Notes on JSON parsing: as of twitter4j 2.2.x, the json parser has some bugs (ex: Status.toString() + * can have number format exceptions), so it might be necessary to extract raw json and process it + * separately. If so, set twitter4j.jsonStoreEnabled=true and look at DataObjectFactory#getRawJSON(); + * org.codehaus.jackson.map.ObjectMapper should be used to parse. * @author pbaclace */ @JsonTypeName("twitzer") @@ -106,12 +117,12 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory { final long startMsec = System.currentTimeMillis(); dimensions.add("htags"); - dimensions.add("retweetCount"); - dimensions.add("followerCount"); - dimensions.add("friendsCount"); + dimensions.add("retweet_count"); + dimensions.add("follower_count"); + dimensions.add("friends_count"); dimensions.add("lang"); - dimensions.add("utcOffset"); - dimensions.add("statusesCount"); + dimensions.add("utc_offset"); + dimensions.add("statuses_count"); // // set up Twitter Spritzer @@ -185,7 +196,7 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory { if (maxRunMinutes <= 0) { return false; } else { - return (System.currentTimeMillis() - startMsec) / 10000L >= maxRunMinutes; + return (System.currentTimeMillis() - startMsec) / 60000L >= maxRunMinutes; } } @@ -249,23 +260,23 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory { } long retweetCount = status.getRetweetCount(); - theMap.put("retweetCount", retweetCount); + theMap.put("retweet_count", retweetCount); User u = status.getUser(); if (u != null) { - theMap.put("followerCount", u.getFollowersCount()); - theMap.put("friendsCount", u.getFriendsCount()); + 
theMap.put("follower_count", u.getFollowersCount()); + theMap.put("friends_count", u.getFriendsCount()); theMap.put("lang", u.getLang()); - theMap.put("utcOffset", u.getUtcOffset()); // resolution in seconds, -1 if not available? - theMap.put("statusesCount", u.getStatusesCount()); + theMap.put("utc_offset", u.getUtcOffset()); // resolution in seconds, -1 if not available? + theMap.put("statuses_count", u.getStatusesCount()); } else { log.error("status.getUser() is null"); } if (rowCount % 10 == 0) { log.info("" + status.getCreatedAt() + - " followerCount=" + u.getFollowersCount() + - " friendsCount=" + u.getFriendsCount() + - " statusesCount=" + u.getStatusesCount() + - " retweetCount=" + retweetCount + " follower_count=" + u.getFollowersCount() + + " friends_count=" + u.getFriendsCount() + + " statuses_count=" + u.getStatusesCount() + + " retweet_count=" + retweetCount ); } diff --git a/examples/twitter/src/main/resources/runtime.properties b/examples/twitter/src/main/resources/runtime.properties index 16a12f26a34..3f7fdeaec50 100644 --- a/examples/twitter/src/main/resources/runtime.properties +++ b/examples/twitter/src/main/resources/runtime.properties @@ -107,8 +107,6 @@ druid.service=foo druid.pusher.s3.bucket= druid.pusher.s3.baseKey= -# TODO: should the next prop also work via runtime.properties ? 
-# next MUST be on command line, does not work here druid.realtime.specFile=twitter_realtime.spec # diff --git a/examples/twitter/twitter_realtime.spec b/examples/twitter/twitter_realtime.spec index b44333799d8..8ffe014822b 100644 --- a/examples/twitter/twitter_realtime.spec +++ b/examples/twitter/twitter_realtime.spec @@ -1,33 +1,32 @@ [{ "schema" : { "dataSource":"twitterstream", "aggregators":[ - {"type":"count", "name":"events"}, - {"type":"doubleSum","fieldName":"followerCount","name":"totalFollowerCount"}, - {"type":"doubleSum","fieldName":"retweetCount","name":"totalRetweetCount"}, - {"type":"doubleSum","fieldName":"friendsCount","name":"totalFriendsCount"}, - {"type":"doubleSum","fieldName":"statusesCount","name":"totalStatusesCount"}, + {"type":"count", "name":"tweets"}, + {"type":"doubleSum","fieldName":"follower_count","name":"total_follower_count"}, + {"type":"doubleSum","fieldName":"retweet_count","name":"tota_retweet_count"}, + {"type":"doubleSum","fieldName":"friends_count","name":"total_friends_count"}, + {"type":"doubleSum","fieldName":"statuses_count","name":"total_statuses_count"}, - {"type":"min","fieldName":"followerCount","name":"minFollowerCount"}, - {"type":"max","fieldName":"followerCount","name":"maxFollowerCount"}, + {"type":"min","fieldName":"follower_count","name":"min_follower_count"}, + {"type":"max","fieldName":"follower_count","name":"max_follower_count"}, - {"type":"min","fieldName":"friendsCount","name":"minFriendsCount"}, - {"type":"max","fieldName":"friendsCount","name":"maxFriendsCount"}, + {"type":"min","fieldName":"friends_count","name":"min_friends_count"}, + {"type":"max","fieldName":"friends_count","name":"max_friends_count"}, - {"type":"min","fieldName":"statusesCount","name":"minStatusesCount"}, - {"type":"max","fieldName":"statusesCount","name":"maxStatusesCount"}, + {"type":"min","fieldName":"statuses_count","name":"min_statuses_count"}, + {"type":"max","fieldName":"statuses_count","name":"max_statuses_count"}, - 
{"type":"min","fieldName":"retweetCount","name":"minRetweetCount"}, - {"type":"max","fieldName":"retweetCount","name":"maxRetweetCount"} - - ], + {"type":"min","fieldName":"retweet_count","name":"min_retweet_count"}, + {"type":"max","fieldName":"retweet_count","name":"max_retweet_count"} + ], "indexGranularity":"minute", - "shardSpec" : { "type": "none" } }, + "shardSpec" : { "type": "none" } }, "config" : { "maxRowsInMemory" : 50000, "intermediatePersistPeriod" : "PT2m" }, "firehose" : { "type" : "twitzer", - "maxEventCount": 10000, - "maxRunMinutes" : 5 + "maxEventCount": 50000, + "maxRunMinutes" : 10 }, "plumber" : { "type" : "realtime",