Refactor structure for examples and extensions

This commit is contained in:
fjy 2014-11-21 14:45:24 -08:00
parent ef62bccdec
commit 8ee4d12562
140 changed files with 719 additions and 2061 deletions

View File

@ -3,7 +3,7 @@ layout: doc_page
--- ---
# Tutorial: A First Look at Druid # Tutorial: A First Look at Druid
Greetings! This tutorial will help clarify some core Druid concepts. We will use a realtime dataset and issue some basic Druid queries. If you are ready to explore Druid, and learn a thing or two, read on! Greetings! This tutorial will help clarify some core Druid concepts. We will use a real-time dataset and issue some basic Druid queries. If you are ready to explore Druid, and learn a thing or two, read on!
About the data About the data
-------------- --------------

View File

@ -1,60 +1,73 @@
[ [
{ {
"schema": { "dataSchema" : {
"dataSource": "wikipedia", "dataSource" : "wikipedia",
"aggregators" : [{ "parser" : {
"type" : "count", "type" : "string",
"name" : "count" "parseSpec" : {
}, { "format" : "json",
"type" : "doubleSum", "timestampSpec" : {
"name" : "added", "column" : "timestamp",
"fieldName" : "added" "format" : "auto"
}, { },
"type" : "doubleSum", "dimensionsSpec" : {
"name" : "deleted", "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
"fieldName" : "deleted" "dimensionExclusions" : [],
}, { "spatialDimensions" : []
"type" : "doubleSum", }
"name" : "delta",
"fieldName" : "delta"
}],
"indexGranularity": "none"
},
"config": {
"maxRowsInMemory": 500000,
"intermediatePersistPeriod": "PT10m"
},
"firehose": {
"type": "kafka-0.7.2",
"consumerProps": {
"zk.connect": "localhost:2181",
"zk.connectiontimeout.ms": "15000",
"zk.sessiontimeout.ms": "15000",
"zk.synctime.ms": "5000",
"groupid": "druid-example",
"fetch.size": "1048586",
"autooffset.reset": "largest",
"autocommit.enable": "false"
},
"feed": "wikipedia",
"parser": {
"timestampSpec": {
"column": "timestamp"
},
"data": {
"format": "json",
"dimensions" : ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"]
} }
},
"metricsSpec" : [{
"type" : "count",
"name" : "count"
}, {
"type" : "doubleSum",
"name" : "added",
"fieldName" : "added"
}, {
"type" : "doubleSum",
"name" : "deleted",
"fieldName" : "deleted"
}, {
"type" : "doubleSum",
"name" : "delta",
"fieldName" : "delta"
}],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "DAY",
"queryGranularity" : "NONE"
} }
}, },
"plumber": { "ioConfig" : {
"type": "realtime", "type" : "realtime",
"firehose": {
"type": "kafka-0.7.2",
"consumerProps": {
"zk.connect": "localhost:2181",
"zk.connectiontimeout.ms": "15000",
"zk.sessiontimeout.ms": "15000",
"zk.synctime.ms": "5000",
"groupid": "druid-example",
"fetch.size": "1048586",
"autooffset.reset": "largest",
"autocommit.enable": "false"
},
"feed": "wikipedia"
},
"plumber": {
"type": "realtime"
}
},
"tuningConfig": {
"type" : "realtime",
"maxRowsInMemory": 500000,
"intermediatePersistPeriod": "PT10m",
"windowPeriod": "PT10m", "windowPeriod": "PT10m",
"segmentGranularity": "hour",
"basePersistDirectory": "\/tmp\/realtime\/basePersist", "basePersistDirectory": "\/tmp\/realtime\/basePersist",
"rejectionPolicy": { "rejectionPolicy": {
"type": "test" "type": "messageTime"
} }
} }
} }
] ]

View File

@ -1,51 +1,82 @@
{ {
"dataSource": "wikipedia", "dataSchema": {
"timestampSpec" : { "dataSource": "wikipedia",
"column": "timestamp", "parser": {
"format": "iso" "type": "string",
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"page",
"language",
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city"
],
"dimensionExclusions": [],
"spatialDimensions": []
}
}
},
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE",
"intervals": ["2013-08-31/2013-09-01"]
}
}, },
"dataSpec": { "ioConfig": {
"format": "json", "type": "hadoop",
"dimensions" : ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"] "inputSpec": {
"type": "static",
"paths": "/myPath/druid-services-0.6.160/examples/indexing/wikipedia_data.json"
},
"metadataUpdateSpec": {
"type": "db",
"connectURI": "jdbc:mysql:\/\/localhost:3306\/druid",
"user": "druid",
"password": "diurd",
"segmentTable": "druid_segments"
},
"segmentOutputPath": "\/tmp\/segments"
}, },
"granularitySpec" : { "tuningConfig": {
"type" : "uniform", "type": "hadoop",
"gran" : "DAY", "workingPath": "\/tmp\/working_path",
"intervals" : [ "2013-08-31/2013-09-01" ] "partitionsSpec": {
}, "targetPartitionSize": 5000000
"pathSpec": { }
"type": "static",
"paths": "examples/indexing/wikipedia_data.json"
},
"rollupSpec": {
"aggs": [{
"type" : "count",
"name" : "count"
}, {
"type" : "doubleSum",
"name" : "added",
"fieldName" : "added"
}, {
"type" : "doubleSum",
"name" : "deleted",
"fieldName" : "deleted"
}, {
"type" : "doubleSum",
"name" : "delta",
"fieldName" : "delta"
}],
"rollupGranularity": "none"
},
"workingPath": "\/tmp\/working_path",
"segmentOutputPath": "\/tmp\/segments",
"partitionsSpec": {
"targetPartitionSize": 5000000
},
"metadataUpdateSpec": {
"type": "db",
"connectURI": "jdbc:mysql:\/\/localhost:3306\/druid",
"user": "druid",
"password": "diurd",
"segmentTable": "druid_segments"
} }
} }

View File

@ -1,43 +1,76 @@
{ {
"type" : "index_hadoop", "type": "index_hadoop",
"config": { "spec": {
"dataSource" : "wikipedia", "dataSchema": {
"timestampSpec" : { "dataSource": "wikipedia",
"column": "timestamp", "parser": {
"format": "auto" "type": "string",
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"page",
"language",
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city"
],
"dimensionExclusions": [],
"spatialDimensions": []
}
}
},
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE",
"intervals": ["2013-08-31/2013-09-01"]
}
}, },
"dataSpec" : { "ioConfig": {
"format" : "json", "type": "hadoop",
"dimensions" : ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"] "inputSpec": {
"type": "static",
"paths": "/myPath/druid-services-0.6.160/examples/indexing/wikipedia_data.json"
}
}, },
"granularitySpec" : { "tuningConfig": {
"type" : "uniform", "type": "hadoop",
"gran" : "DAY", "partitionsSpec": {
"intervals" : [ "2013-08-31/2013-09-01" ] "targetPartitionSize": 5000000
}, }
"pathSpec" : {
"type" : "static",
"paths" : "examples/indexing/wikipedia_data.json"
},
"targetPartitionSize" : 5000000,
"rollupSpec" : {
"aggs": [{
"type" : "count",
"name" : "count"
}, {
"type" : "doubleSum",
"name" : "added",
"fieldName" : "added"
}, {
"type" : "doubleSum",
"name" : "deleted",
"fieldName" : "deleted"
}, {
"type" : "doubleSum",
"name" : "delta",
"fieldName" : "delta"
}],
"rollupGranularity" : "none"
} }
} }
} }

View File

@ -1,39 +1,76 @@
{ {
"type" : "index", "type": "index",
"dataSource" : "wikipedia", "spec": {
"granularitySpec" : { "dataSchema": {
"type" : "uniform", "dataSource": "wikipedia",
"gran" : "DAY", "parser": {
"intervals" : [ "2013-08-31/2013-09-01" ] "type": "string",
}, "parseSpec": {
"aggregators" : [{ "format": "json",
"type" : "count", "timestampSpec": {
"name" : "count" "column": "timestamp",
}, { "format": "auto"
"type" : "doubleSum", },
"name" : "added", "dimensionsSpec": {
"fieldName" : "added" "dimensions": [
}, { "page",
"type" : "doubleSum", "language",
"name" : "deleted", "user",
"fieldName" : "deleted" "unpatrolled",
}, { "newPage",
"type" : "doubleSum", "robot",
"name" : "delta", "anonymous",
"fieldName" : "delta" "namespace",
}], "continent",
"firehose" : { "country",
"type" : "local", "region",
"baseDir" : "examples/indexing/", "city"
"filter" : "wikipedia_data.json", ],
"parser" : { "dimensionExclusions": [],
"timestampSpec" : { "spatialDimensions": []
"column" : "timestamp" }
}
}, },
"data" : { "metricsSpec": [
"format" : "json", {
"dimensions" : ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"] "type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE",
"intervals": ["2013-08-31/2013-09-01"]
} }
},
"ioConfig": {
"type": "index",
"firehose": {
"type": "local",
"baseDir": "/MyPath/druid-services-0.6.160/examples/indexing/",
"filter": "wikipedia_data.json"
}
},
"tuningConfig": {
"type": "index",
"targetPartitionSize": 0,
"rowFlushBoundary": 0
} }
} }
} }

View File

@ -1,73 +1,92 @@
{ {
"type": "index_realtime", "type" : "index_realtime",
"schema": { "spec" : {
"dataSource": "wikipedia", "dataSchema": {
"aggregators": [ "dataSource": "wikipedia",
{ "parser": {
"type": "count", "type": "string",
"name": "count" "parseSpec": {
"format": "json",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"page",
"language",
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city"
],
"dimensionExclusions": [],
"spatialDimensions": []
}
}
}, },
{ "metricsSpec": [
"type": "doubleSum", {
"name": "added", "type": "count",
"fieldName": "added" "name": "count"
}, },
{ {
"type": "doubleSum", "type": "doubleSum",
"name": "deleted", "name": "added",
"fieldName": "deleted" "fieldName": "added"
}, },
{ {
"type": "doubleSum", "type": "doubleSum",
"name": "delta", "name": "deleted",
"fieldName": "delta" "fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE"
} }
],
"indexGranularity": "none"
},
"fireDepartmentConfig": {
"maxRowsInMemory": 500000,
"intermediatePersistPeriod": "PT10m"
},
"firehose": {
"type": "kafka-0.7.2",
"consumerProps": {
"zk.connect": "localhost:2181",
"zk.connectiontimeout.ms": "15000",
"zk.sessiontimeout.ms": "15000",
"zk.synctime.ms": "5000",
"groupid": "druid-example",
"fetch.size": "1048586",
"autooffset.reset": "largest",
"autocommit.enable": "false"
}, },
"feed": "wikipedia", "ioConfig": {
"parser": { "type": "realtime",
"timestampSpec": { "firehose": {
"column": "timestamp" "type": "kafka-0.7.2",
"consumerProps": {
"zk.connect": "localhost:2181",
"zk.connectiontimeout.ms": "15000",
"zk.sessiontimeout.ms": "15000",
"zk.synctime.ms": "5000",
"groupid": "druid-example",
"fetch.size": "1048586",
"autooffset.reset": "largest",
"autocommit.enable": "false"
},
"feed": "wikipedia"
}, },
"data": { "plumber": {
"format": "json", "type": "realtime"
"dimensions": [ }
"page", },
"language", "tuningConfig": {
"user", "type": "realtime",
"unpatrolled", "maxRowsInMemory": 500000,
"newPage", "intermediatePersistPeriod": "PT10m",
"robot", "windowPeriod": "PT10m",
"anonymous", "basePersistDirectory": "\/tmp\/realtime\/basePersist",
"namespace", "rejectionPolicy": {
"continent", "type": "serverTime"
"country",
"region",
"city"
]
} }
} }
},
"windowPeriod": "PT10m",
"segmentGranularity": "hour",
"rejectionPolicy": {
"type": "test"
} }
} }

View File

@ -1,71 +1,113 @@
{ {
"type": "index_realtime", "type": "index_realtime",
"schema": { "spec": {
"dataSource": "wikipedia", "dataSchema": {
"aggregators": [ "dataSource": "wikipedia",
{ "parser": {
"type": "count", "type": "irc",
"name": "count" "parseSpec": {
}, "format": "json",
{ "timestampSpec": {
"type": "doubleSum", "column": "timestamp",
"name": "added", "format": "iso"
"fieldName": "added" },
}, "dimensionsSpec": {
{ "dimensions": [
"type": "doubleSum", "page",
"name": "deleted", "language",
"fieldName": "deleted" "user",
}, "unpatrolled",
{ "newPage",
"type": "doubleSum", "robot",
"name": "delta", "anonymous",
"fieldName": "delta" "namespace",
} "continent",
], "country",
"indexGranularity": "none" "region",
}, "city"
"fireDepartmentConfig": { ],
"maxRowsInMemory": 500000, "dimensionExclusions": [],
"intermediatePersistPeriod": "PT10m" "spatialDimensions": []
}, }
"firehose": { },
"type": "irc", "decoder": {
"nick": "wiki1234567890", "type": "wikipedia",
"host": "irc.wikimedia.org", "namespaces": {
"channels": [ "#en.wikipedia": {
"#en.wikipedia", "_empty_": "main",
"#fr.wikipedia", "Category": "category",
"#de.wikipedia", "$1 talk": "project talk",
"#ja.wikipedia" "Template talk": "template talk",
], "Help talk": "help talk",
"decoder": { "Media": "media",
"type": "wikipedia", "MediaWiki talk": "mediawiki talk",
"namespaces": { "File talk": "file talk",
"#en.wikipedia": { "MediaWiki": "mediawiki",
"_empty_": "main", "User": "user",
"Category": "category", "File": "file",
"$1 talk": "project talk", "User talk": "user talk",
"Template talk": "template talk", "Template": "template",
"Help talk": "help talk", "Help": "help",
"Media": "media", "Special": "special",
"MediaWiki talk": "mediawiki talk", "Talk": "talk",
"File talk": "file talk", "Category talk": "category talk"
"MediaWiki": "mediawiki", }
"User": "user", }
"File": "file",
"User talk": "user talk",
"Template": "template",
"Help": "help",
"Special": "special",
"Talk": "talk",
"Category talk": "category talk"
} }
},
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE"
} }
}, },
"timeDimension": "timestamp", "ioConfig": {
"timeFormat": "iso" "type": "realtime",
}, "firehose": {
"windowPeriod": "PT10m", "type": "irc",
"segmentGranularity": "hour" "nick": "wiki1234567890",
"host": "irc.wikimedia.org",
"channels": [
"#en.wikipedia",
"#fr.wikipedia",
"#de.wikipedia",
"#ja.wikipedia"
]
},
"plumber": {
"type": "realtime"
}
},
"tuningConfig": {
"type": "realtime",
"maxRowsInMemory": 500000,
"intermediatePersistPeriod": "PT10m",
"windowPeriod": "PT10m",
"basePersistDirectory": "\/tmp\/realtime\/basePersist",
"rejectionPolicy": {
"type": "serverTime"
}
}
}
} }

View File

@ -1,12 +0,0 @@
{
"queryType": "groupBy",
"dataSource": "rabbitmqtest",
"granularity": "all",
"dimensions": [],
"aggregations": [
{ "type": "count", "name": "rows" },
{"type": "longSum", "name": "imps", "fieldName": "impressions"},
{"type": "doubleSum", "name": "wp", "fieldName": "wp"}
],
"intervals": ["2010-01-01T00:00/2020-01-01T00:00"]
}

View File

@ -1,48 +0,0 @@
[{
"schema" : {
"dataSource":"rabbitmqtest",
"aggregators":[
{"type":"count", "name":"impressions"},
{"type":"doubleSum","name":"wp","fieldName":"wp"}
],
"indexGranularity":"minute",
"shardSpec" : { "type": "none" }
},
"config" : {
"maxRowsInMemory" : 500000,
"intermediatePersistPeriod" : "PT1m"
},
"firehose" : {
"type" : "rabbitmq",
"connection" : {
"host": "localhost",
"username": "test-dude",
"password": "word-dude",
"virtualHost": "test-vhost"
},
"config" : {
"exchange": "test-exchange",
"queue" : "druidtest",
"routingKey": "#",
"durable": "true",
"exclusive": "false",
"autoDelete": "false",
"maxRetries": "10",
"retryIntervalSeconds": "1",
"maxDurationSeconds": "300"
},
"parser" : {
"timestampSpec" : { "column" : "utcdt", "format" : "iso" },
"data" : { "format" : "json" },
"dimensionExclusions" : ["wp"]
}
},
"plumber" : {
"type" : "realtime",
"windowPeriod" : "PT5m",
"segmentGranularity":"hour",
"basePersistDirectory" : "/tmp/realtime/basePersist",
"rejectionPolicy": { "type": "test" }
}
}]

View File

@ -1,19 +0,0 @@
{
"queryType": "groupBy",
"dataSource": "randSeq",
"granularity": "all",
"dimensions": [],
"aggregations":[
{ "type": "count", "name": "rows"},
{ "type": "doubleSum", "fieldName": "events", "name": "e"},
{ "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum"}
],
"postAggregations":[
{ "type":"arithmetic",
"name":"avg_random",
"fn":"/",
"fields":[ {"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"},
{"type":"fieldAccess","name":"rows","fieldName":"rows"} ]}
],
"intervals":["2012-10-01T00:00/2020-01-01T00"]
}

View File

@ -1,32 +0,0 @@
[{
"schema": {
"dataSource": "randseq",
"aggregators": [
{"type": "count", "name": "events"},
{"type": "doubleSum", "name": "outColumn", "fieldName": "inColumn"}
],
"indexGranularity": "minute",
"shardSpec": {"type": "none"}
},
"config": {
"maxRowsInMemory": 50000,
"intermediatePersistPeriod": "PT10m"
},
"firehose": {
"type": "rand",
"sleepUsec": 100000,
"maxGeneratedRows": 5000000,
"seed": 0,
"nTokens": 255,
"nPerSleep": 3
},
"plumber": {
"type": "realtime",
"windowPeriod": "PT5m",
"segmentGranularity": "hour",
"basePersistDirectory": "/tmp/example/rand_realtime/basePersist"
}
}]

View File

@ -1,12 +1,24 @@
{ {
"queryType": "groupBy", "queryType": "timeseries",
"dataSource": "twitterstream", "dataSource": "twitterstream",
"granularity": "all", "granularity": "all",
"dimensions": ["lang", "utc_offset"], "aggregations": [
"aggregations":[ {
{ "type": "count", "name": "rows"}, "type": "count",
{ "type": "doubleSum", "fieldName": "tweets", "name": "tweets"} "name": "rows"
], },
"filter": { "type": "selector", "dimension": "lang", "value": "en" }, {
"intervals":["2012-10-01T00:00/2020-01-01T00"] "type": "doubleSum",
} "fieldName": "tweets",
"name": "tweets"
}
],
"filter": {
"type": "selector",
"dimension": "lang",
"value": "en"
},
"intervals": [
"2012-10-01T00:00\/2020-01-01T00"
]
}

View File

@ -1,13 +1,21 @@
{ {
"queryType": "search", "queryType": "search",
"dataSource": "twitterstream", "dataSource": "twitterstream",
"granularity": "all", "granularity": "all",
"searchDimensions": ["htags"], "searchDimensions": [
"limit": 1, "htags"
"query": { ],
"type": "fragment", "limit": 1,
"values": ["men"], "query": {
"sort": { "type": "strlen" } "type": "fragment",
}, "values": [
"intervals":["2012-10-01T00:00/2020-01-01T00"] "men"
} ],
"sort": {
"type": "strlen"
}
},
"intervals": [
"2012-10-01T00:00\/2020-01-01T00"
]
}

View File

@ -1,44 +1,119 @@
[{ [
"schema": { {
"dataSource": "twitterstream", "dataSchema": {
"aggregators": [ "dataSource": "twitterstream",
{"type": "count", "name": "tweets"}, "parser": {
{"type": "doubleSum", "fieldName": "follower_count", "name": "total_follower_count"}, "parseSpec": {
{"type": "doubleSum", "fieldName": "retweet_count", "name": "total_retweet_count" }, "format": "json",
{"type": "doubleSum", "fieldName": "friends_count", "name": "total_friends_count" }, "timestampSpec": {
{"type": "doubleSum", "fieldName": "statuses_count", "name": "total_statuses_count"}, "column": "utcdt",
"format": "iso"
{"type": "min", "fieldName": "follower_count", "name": "min_follower_count"}, },
{"type": "max", "fieldName": "follower_count", "name": "max_follower_count"}, "dimensionsSpec": {
"dimensions": [
{"type": "min", "fieldName": "friends_count", "name": "min_friends_count"},
{"type": "max", "fieldName": "friends_count", "name": "max_friends_count"}, ],
"dimensionExclusions": [
{"type": "min", "fieldName": "statuses_count", "name": "min_statuses_count"},
{"type": "max", "fieldName": "statuses_count", "name": "max_statuses_count"}, ],
"spatialDimensions": [
{"type": "min", "fieldName": "retweet_count", "name": "min_retweet_count"},
{"type": "max", "fieldName": "retweet_count", "name": "max_retweet_count"} ]
], }
"indexGranularity": "minute", }
"shardSpec": {"type": "none"} },
"metricsSpec": [
{
"type": "count",
"name": "tweets"
},
{
"type": "doubleSum",
"fieldName": "follower_count",
"name": "total_follower_count"
},
{
"type": "doubleSum",
"fieldName": "retweet_count",
"name": "total_retweet_count"
},
{
"type": "doubleSum",
"fieldName": "friends_count",
"name": "total_friends_count"
},
{
"type": "doubleSum",
"fieldName": "statuses_count",
"name": "total_statuses_count"
},
{
"type": "min",
"fieldName": "follower_count",
"name": "min_follower_count"
},
{
"type": "max",
"fieldName": "follower_count",
"name": "max_follower_count"
},
{
"type": "min",
"fieldName": "friends_count",
"name": "min_friends_count"
},
{
"type": "max",
"fieldName": "friends_count",
"name": "max_friends_count"
},
{
"type": "min",
"fieldName": "statuses_count",
"name": "min_statuses_count"
},
{
"type": "max",
"fieldName": "statuses_count",
"name": "max_statuses_count"
},
{
"type": "min",
"fieldName": "retweet_count",
"name": "min_retweet_count"
},
{
"type": "max",
"fieldName": "retweet_count",
"name": "max_retweet_count"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE"
}
}, },
"ioConfig": {
"config": { "type": "realtime",
"maxRowsInMemory": 50000, "firehose": {
"intermediatePersistPeriod": "PT2m"
},
"firehose": {
"type": "twitzer", "type": "twitzer",
"maxEventCount": 500000, "maxEventCount": 500000,
"maxRunMinutes": 120 "maxRunMinutes": 120
},
"plumber": {
"type": "realtime"
}
}, },
"tuningConfig": {
"plumber": { "type": "realtime",
"type": "realtime", "maxRowsInMemory": 500000,
"windowPeriod": "PT3m", "intermediatePersistPeriod": "PT2m",
"segmentGranularity": "hour", "windowPeriod": "PT3m",
"basePersistDirectory": "/tmp/example/twitter_realtime/basePersist" "basePersistDirectory": "\/tmp\/realtime\/basePersist",
"rejectionPolicy": {
"type": "messageTime"
}
} }
}] }
]

View File

@ -1,27 +0,0 @@
{
"queryType":"groupBy",
"dataSource":"webstream",
"granularity":"minute",
"dimensions":[
"timezone"
],
"aggregations":[
{
"type":"count",
"name":"rows"
},
{
"type":"doubleSum",
"fieldName":"known_users",
"name":"known_users"
}
],
"filter":{
"type":"selector",
"dimension":"country",
"value":"US"
},
"intervals":[
"2013-06-01T00:00/2020-01-01T00"
]
}

View File

@ -1,47 +0,0 @@
[{
"schema": {
"dataSource": "webstream",
"aggregators": [
{"type": "count", "name": "rows"},
{"type": "doubleSum", "fieldName": "known_users", "name": "known_users"}
],
"indexGranularity": "second",
"shardSpec": {"type": "none"}
},
"config": {
"maxRowsInMemory": 50000,
"intermediatePersistPeriod": "PT2m"
},
"firehose": {
"type": "webstream",
"url":"http://developer.usa.gov/1usagov",
"renamedDimensions": {
"g":"bitly_hash",
"c":"country",
"a":"user",
"cy":"city",
"l":"encoding_user_login",
"hh":"short_url",
"hc":"timestamp_hash",
"h":"user_bitly_hash",
"u":"url",
"tz":"timezone",
"t":"time",
"r":"referring_url",
"gr":"geo_region",
"nk":"known_users",
"al":"accept_language"
},
"timeDimension":"t",
"timeFormat":"posix"
},
"plumber": {
"type": "realtime",
"windowPeriod": "PT3m",
"segmentGranularity": "hour",
"basePersistDirectory": "/tmp/example/usagov_realtime/basePersist"
}
}]

View File

@ -1,10 +1,7 @@
{ {
"queryType":"groupBy", "queryType":"timeseries",
"dataSource":"wikipedia", "dataSource":"wikipedia",
"granularity":"minute", "granularity":"minute",
"dimensions":[
"page"
],
"aggregations":[ "aggregations":[
{ {
"type":"count", "type":"count",

View File

@ -45,7 +45,7 @@ for delay in 5 30 30 30 30 30 30 30 30 30 30
echo "sleep for $delay seconds..." echo "sleep for $delay seconds..."
echo " " echo " "
sleep $delay sleep $delay
curl -X POST 'http://localhost:8083/druid/v2/?w' -H 'content-type: application/json' -d "`cat ${QUERY_FILE}`" curl -X POST 'http://localhost:8083/druid/v2/?pretty' -H 'content-type: application/json' -d "`cat ${QUERY_FILE}`"
echo " " echo " "
echo " " echo " "
done done

View File

@ -1,5 +0,0 @@
## Example Prerequisite
The code in this example assumes Cassandra has been configured as deep storage for Druid.
For details on how to accomplish this, see [Cassandra Deep Storage](http://druid.io/docs/latest/Cassandra-Deep-Storage.html).

View File

@ -1 +0,0 @@
curl -sX POST "http://localhost:9090/druid/v2/?pretty=true" -H 'content-type: application/json' -d @query

View File

@ -1,19 +0,0 @@
{
"queryType": "groupBy",
"dataSource": "randSeq",
"granularity": "all",
"dimensions": [],
"aggregations":[
{ "type": "count", "name": "rows"},
{ "type": "doubleSum", "fieldName": "events", "name": "e"},
{ "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum"}
],
"postAggregations":[
{ "type":"arithmetic",
"name":"avg_random",
"fn":"/",
"fields":[ {"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"},
{"type":"fieldAccess","name":"rows","fieldName":"rows"} ]}
],
"intervals":["2012-10-01T00:00/2020-01-01T00"]
}

View File

@ -1,2 +0,0 @@
CREATE TABLE index_storage ( key text, chunk text, value blob, PRIMARY KEY (key, chunk)) WITH COMPACT STORAGE;
CREATE TABLE descriptor_storage ( key varchar, lastModified timestamp, descriptor varchar, PRIMARY KEY (key) ) WITH COMPACT STORAGE;

View File

@ -1,5 +1,5 @@
# Extensions # Extensions
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.7.0","io.druid.extensions:druid-kafka-seven:0.7.0","io.druid.extensions:druid-rabbitmq:0.7.0", "io.druid.extensions:druid-s3-extensions:0.7.0"] druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.7.0","io.druid.extensions:druid-kafka-seven:0.7.0","io.druid.extensions:druid-rabbitmq:0.7.0"]
# Zookeeper # Zookeeper
druid.zk.service.host=localhost druid.zk.service.host=localhost

View File

@ -2,16 +2,9 @@ druid.host=localhost
druid.service=historical druid.service=historical
druid.port=8081 druid.port=8081
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.7.0"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
druid.s3.accessKey=AKIAIMKECRUYKDQGR6YQ
druid.server.maxSize=10000000000
# Change these to make Druid faster # Change these to make Druid faster
druid.processing.buffer.sizeBytes=100000000 druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1 druid.processing.numThreads=1
druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 10000000000}] druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 10000000000}]
druid.server.maxSize=10000000000

View File

@ -23,9 +23,7 @@ import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder; import com.google.inject.Binder;
import io.druid.examples.rand.RandomFirehoseFactory;
import io.druid.examples.twitter.TwitterSpritzerFirehoseFactory; import io.druid.examples.twitter.TwitterSpritzerFirehoseFactory;
import io.druid.examples.web.WebFirehoseFactory;
import io.druid.initialization.DruidModule; import io.druid.initialization.DruidModule;
import java.util.Arrays; import java.util.Arrays;
@ -41,9 +39,7 @@ public class ExamplesDruidModule implements DruidModule
return Arrays.<Module>asList( return Arrays.<Module>asList(
new SimpleModule("ExamplesModule") new SimpleModule("ExamplesModule")
.registerSubtypes( .registerSubtypes(
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"), new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer")
new NamedType(RandomFirehoseFactory.class, "rand"),
new NamedType(WebFirehoseFactory.class, "webstream")
) )
); );
} }

View File

@ -1,269 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.examples.rand;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Maps;
import com.metamx.common.logger.Logger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.InputRowParser;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
import static java.lang.Thread.sleep;
/**
* Random value sequence Firehost Factory named "rand".
* Builds a Firehose that emits a stream of random numbers (outColumn, a positive double)
* with timestamps along with an associated token (target). This provides a timeseries
* that requires no network access for demonstration, characterization, and testing.
* The generated tuples can be thought of as asynchronously
* produced triples (timestamp, outColumn, target) where the timestamp varies depending on
* speed of processing.
*
* <p>
* InputRows are produced as fast as requested, so this can be used to determine the
* upper rate of ingest if sleepUsec is set to 0; nTokens specifies how many associated
* target labels are used. Generation is round-robin for nTokens and sleep occurs
* every nPerSleep values generated. A random number seed can be used by setting the
* firehose parameter "seed" to a non-zero value so that values can be reproducible
* (but note that timestamp is not deterministic because timestamps are obtained at
* the moment an event is delivered.)
* Values are offset by adding the modulus of the token number to the random number
* so that token values have distinct, non-overlapping ranges.
* </p>
*
* Example spec file:
* <pre>
* [{
* "schema" : { "dataSource":"randseq",
* "aggregators":[ {"type":"count", "name":"events"},
* {"type":"doubleSum","name":"outColumn","fieldName":"inColumn"} ],
* "indexGranularity":"minute",
* "shardSpec" : { "type": "none" } },
* "config" : { "maxRowsInMemory" : 50000,
* "intermediatePersistPeriod" : "PT2m" },
*
* "firehose" : { "type" : "rand",
* "sleepUsec": 100000,
* "maxGeneratedRows" : 5000000,
* "seed" : 0,
* "nTokens" : 19,
* "nPerSleep" : 3
* },
*
* "plumber" : { "type" : "realtime",
* "windowPeriod" : "PT5m",
* "segmentGranularity":"hour",
* "basePersistDirectory" : "/tmp/realtime/basePersist" }
* }]
* </pre>
*
* Example query using POST to /druid/v2/ (where UTC date and time MUST include the current hour):
* <pre>
* {
* "queryType": "groupBy",
* "dataSource": "randSeq",
* "granularity": "all",
* "dimensions": [],
* "aggregations":[
* { "type": "count", "name": "rows"},
* { "type": "doubleSum", "fieldName": "events", "name": "e"},
* { "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum"}
* ],
* "postAggregations":[
* { "type":"arithmetic",
* "name":"avg_random",
* "fn":"/",
* "fields":[ {"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"},
* {"type":"fieldAccess","name":"rows","fieldName":"rows"} ]}
* ],
* "intervals":["2012-10-01T00:00/2020-01-01T00"]
* }
* </pre>
*/
@JsonTypeName("rand")
public class RandomFirehoseFactory implements FirehoseFactory<InputRowParser>
{
private static final Logger log = new Logger(RandomFirehoseFactory.class);
/**
* msec to sleep before generating a new row; if this and delayNsec are 0, then go as fast as possible.
* json param sleepUsec (microseconds) is used to initialize this.
*/
private final long delayMsec;
/**
* nsec to sleep before generating a new row; if this and delayMsec are 0, then go as fast as possible.
* json param sleepUsec (microseconds) is used to initialize this.
*/
private final int delayNsec;
/**
* max rows to generate, -1 is infinite, 0 means nothing is generated; use this to prevent
* infinite space consumption or to see what happens when a Firehose stops delivering
* values, or to have hasMore() return false.
*/
private final long maxGeneratedRows;
/**
* seed for random number generator; if 0, then no seed is used.
*/
private final long seed;
/**
* number of tokens to randomly associate with values (no heap limits). This can be used to
* stress test the number of tokens.
*/
private final int nTokens;
/**
* Number of token events per sleep interval.
*/
private final int nPerSleep;
@JsonCreator
public RandomFirehoseFactory(
@JsonProperty("sleepUsec") Long sleepUsec,
@JsonProperty("maxGeneratedRows") Long maxGeneratedRows,
@JsonProperty("seed") Long seed,
@JsonProperty("nTokens") Integer nTokens,
@JsonProperty("nPerSleep") Integer nPerSleep
)
{
long nsec = (sleepUsec > 0) ? sleepUsec * 1000L : 0;
long msec = nsec / 1000000L;
this.delayMsec = msec;
this.delayNsec = (int) (nsec - (msec * 1000000L));
this.maxGeneratedRows = maxGeneratedRows;
this.seed = seed;
this.nTokens = nTokens;
this.nPerSleep = nPerSleep;
if (nTokens <= 0) {
log.warn("nTokens parameter " + nTokens + " ignored; must be greater than or equal to 1");
nTokens = 1;
}
if (nPerSleep <= 0) {
log.warn("nPerSleep parameter " + nPerSleep + " ignored; must be greater than or equal to 1");
nPerSleep = 1;
}
log.info("maxGeneratedRows=" + maxGeneratedRows);
log.info("seed=" + ((seed == 0L) ? "random value" : seed));
log.info("nTokens=" + nTokens);
log.info("nPerSleep=" + nPerSleep);
double dmsec = (double) delayMsec + ((double) this.delayNsec) / 1000000.;
if (dmsec > 0.0) {
log.info("sleep period=" + dmsec + "msec");
log.info(
"approximate max rate of record generation=" + (nPerSleep * 1000. / dmsec) + "/sec" +
" or " + (60. * nPerSleep * 1000. / dmsec) + "/minute"
);
} else {
log.info("sleep period= NONE");
log.info("approximate max rate of record generation= as fast as possible");
}
}
@Override
public Firehose connect(InputRowParser parser) throws IOException
{
final LinkedList<String> dimensions = new LinkedList<String>();
dimensions.add("inColumn");
dimensions.add("target");
return new Firehose()
{
private final java.util.Random rand = (seed == 0L) ? new Random() : new Random(seed);
private long rowCount = 0L;
private boolean waitIfmaxGeneratedRows = true;
@Override
public boolean hasMore()
{
if (maxGeneratedRows >= 0 && rowCount >= maxGeneratedRows) {
return waitIfmaxGeneratedRows;
} else {
return true; // there are always more random numbers
}
}
@Override
public InputRow nextRow()
{
final long modulus = rowCount % nPerSleep;
final long nth = (rowCount % nTokens) + 1;
long sleepMsec = delayMsec;
// all done?
if (maxGeneratedRows >= 0 && rowCount >= maxGeneratedRows && waitIfmaxGeneratedRows) {
// sleep a long time instead of terminating
sleepMsec = 2000000000L;
}
if (sleepMsec > 0L || delayNsec > 0) {
try {
if (modulus == 0) {
sleep(sleepMsec, delayNsec);
}
}
catch (InterruptedException e) {
throw new RuntimeException("InterruptedException");
}
}
if (++rowCount % 1000 == 0) {
log.info("%,d events created.", rowCount);
}
final Map<String, Object> theMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
theMap.put("inColumn", anotherRand((int) nth));
theMap.put("target", ("a" + nth));
return new MapBasedInputRow(System.currentTimeMillis(), dimensions, theMap);
}
private Float anotherRand(int scale)
{
double f = rand.nextDouble(); // [0.0,1.0]
return new Float(f + (double) scale);
}
@Override
public Runnable commit()
{
// Do nothing.
return new Runnable()
{
@Override
public void run()
{
}
};
}
@Override
public void close() throws IOException
{
// do nothing
}
};
}
}

View File

@ -1,133 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.examples.web;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.InputSupplier;
import com.metamx.emitter.EmittingLogger;
import io.druid.jackson.DefaultObjectMapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class InputSupplierUpdateStream implements UpdateStream
{
private static final EmittingLogger log = new EmittingLogger(InputSupplierUpdateStream.class);
private static final long queueWaitTime = 15L;
private final TypeReference<HashMap<String, Object>> typeRef;
private final InputSupplier<BufferedReader> supplier;
private final int QUEUE_SIZE = 10000;
private final BlockingQueue<Map<String, Object>> queue = new ArrayBlockingQueue<Map<String, Object>>(QUEUE_SIZE);
private final ObjectMapper mapper = new DefaultObjectMapper();
private final String timeDimension;
private final Thread addToQueueThread;
public InputSupplierUpdateStream(
final InputSupplier<BufferedReader> supplier,
final String timeDimension
)
{
addToQueueThread = new Thread()
{
public void run()
{
while (!isInterrupted()) {
try {
BufferedReader reader = supplier.getInput();
String line;
while ((line = reader.readLine()) != null) {
if (isValid(line)) {
HashMap<String, Object> map = mapper.readValue(line, typeRef);
if (map.get(timeDimension) != null) {
queue.offer(map, queueWaitTime, TimeUnit.SECONDS);
log.debug("Successfully added to queue");
} else {
log.info("missing timestamp");
}
}
}
}
catch (InterruptedException e){
log.info(e, "Thread adding events to the queue interrupted");
return;
}
catch (JsonMappingException e) {
log.info(e, "Error in converting json to map");
}
catch (JsonParseException e) {
log.info(e, "Error in parsing json");
}
catch (IOException e) {
log.info(e, "Error in connecting to InputStream");
}
}
}
};
addToQueueThread.setDaemon(true);
this.supplier = supplier;
this.typeRef = new TypeReference<HashMap<String, Object>>()
{
};
this.timeDimension = timeDimension;
}
private boolean isValid(String s)
{
return !(s.isEmpty());
}
public void start()
{
addToQueueThread.start();
}
public void stop()
{
addToQueueThread.interrupt();
}
public Map<String, Object> pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException
{
return queue.poll(waitTime, unit);
}
public int getQueueSize()
{
return queue.size();
}
public String getTimeDimension()
{
return timeDimension;
}
}

View File

@ -1,42 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.examples.web;
import com.google.common.io.InputSupplier;
import java.io.BufferedReader;
public class InputSupplierUpdateStreamFactory implements UpdateStreamFactory
{
private final InputSupplier<BufferedReader> inputSupplier;
private final String timeDimension;
public InputSupplierUpdateStreamFactory(InputSupplier<BufferedReader> inputSupplier, String timeDimension)
{
this.inputSupplier = inputSupplier;
this.timeDimension = timeDimension;
}
public InputSupplierUpdateStream build()
{
return new InputSupplierUpdateStream(inputSupplier, timeDimension);
}
}

View File

@ -1,81 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.examples.web;
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class RenamingKeysUpdateStream implements UpdateStream
{
private final InputSupplierUpdateStream updateStream;
private Map<String, String> renamedDimensions;
public RenamingKeysUpdateStream(
InputSupplierUpdateStream updateStream,
Map<String, String> renamedDimensions
)
{
this.renamedDimensions = renamedDimensions;
this.updateStream = updateStream;
}
public Map<String, Object> pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException
{
return renameKeys(updateStream.pollFromQueue(waitTime, unit));
}
private Map<String, Object> renameKeys(Map<String, Object> update)
{
if (renamedDimensions != null) {
Map<String, Object> renamedMap = Maps.newHashMap();
for (String key : renamedDimensions.keySet()) {
if (update.get(key) != null) {
Object obj = update.get(key);
renamedMap.put(renamedDimensions.get(key), obj);
}
}
return renamedMap;
} else {
return update;
}
}
public String getTimeDimension()
{
if (renamedDimensions != null && renamedDimensions.get(updateStream.getTimeDimension()) != null) {
return renamedDimensions.get(updateStream.getTimeDimension());
}
return updateStream.getTimeDimension();
}
public void start()
{
updateStream.start();
}
public void stop(){
updateStream.stop();
}
}

View File

@ -1,39 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.examples.web;
import java.util.Map;
public class RenamingKeysUpdateStreamFactory implements UpdateStreamFactory
{
private InputSupplierUpdateStreamFactory updateStreamFactory;
private Map<String, String> renamedDimensions;
public RenamingKeysUpdateStreamFactory(InputSupplierUpdateStreamFactory updateStreamFactory, Map<String, String> renamedDimensions)
{
this.updateStreamFactory = updateStreamFactory;
this.renamedDimensions = renamedDimensions;
}
public RenamingKeysUpdateStream build()
{
return new RenamingKeysUpdateStream(updateStreamFactory.build(), renamedDimensions);
}
}

View File

@ -1,31 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.examples.web;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public interface UpdateStream
{
public Map<String, Object> pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException;
public String getTimeDimension();
public void start();
public void stop();
}

View File

@ -1,24 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.examples.web;
public interface UpdateStreamFactory
{
public UpdateStream build();
}

View File

@ -1,136 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.examples.web;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Throwables;
import com.metamx.common.parsers.TimestampParser;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.utils.Runnables;
import org.joda.time.DateTime;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@JsonTypeName("webstream")
public class WebFirehoseFactory implements FirehoseFactory<InputRowParser>
{
private static final EmittingLogger log = new EmittingLogger(WebFirehoseFactory.class);
private final String timeFormat;
private final UpdateStreamFactory factory;
private final long queueWaitTime = 15L;
@JsonCreator
public WebFirehoseFactory(
@JsonProperty("url") String url,
@JsonProperty("renamedDimensions") Map<String, String> renamedDimensions,
@JsonProperty("timeDimension") String timeDimension,
@JsonProperty("timeFormat") String timeFormat
)
{
this(
new RenamingKeysUpdateStreamFactory(
new InputSupplierUpdateStreamFactory(new WebJsonSupplier(url), timeDimension),
renamedDimensions
), timeFormat
);
}
public WebFirehoseFactory(UpdateStreamFactory factory, String timeFormat)
{
this.factory = factory;
if (timeFormat == null) {
this.timeFormat = "auto";
} else {
this.timeFormat = timeFormat;
}
}
@Override
public Firehose connect(InputRowParser parser) throws IOException
{
final UpdateStream updateStream = factory.build();
updateStream.start();
return new Firehose()
{
Map<String, Object> map;
private final Runnable doNothingRunnable = Runnables.getNoopRunnable();
@Override
public boolean hasMore()
{
try {
map = updateStream.pollFromQueue(queueWaitTime, TimeUnit.SECONDS);
return map != null;
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
@Override
public InputRow nextRow()
{
try {
DateTime date = TimestampParser.createTimestampParser(timeFormat)
.apply(map.get(updateStream.getTimeDimension()).toString());
return new MapBasedInputRow(
date.getMillis(),
new ArrayList(map.keySet()),
map
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
map = null;
}
}
@Override
public Runnable commit()
{
// ephemera in, ephemera out.
return doNothingRunnable; // reuse the same object each time
}
@Override
public void close() throws IOException
{
updateStream.stop();
}
};
}
}

View File

@ -1,61 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.examples.web;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.io.InputSupplier;
import com.metamx.emitter.EmittingLogger;
import org.apache.commons.validator.routines.UrlValidator;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;
public class WebJsonSupplier implements InputSupplier<BufferedReader>
{
private static final EmittingLogger log = new EmittingLogger(WebJsonSupplier.class);
private static final UrlValidator urlValidator = new UrlValidator();
private URL url;
public WebJsonSupplier(String urlString)
{
Preconditions.checkState(urlValidator.isValid(urlString));
try {
this.url = new URL(urlString);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@Override
public BufferedReader getInput() throws IOException
{
URLConnection connection = url.openConnection();
connection.setDoInput(true);
return new BufferedReader(new InputStreamReader(url.openStream(), Charsets.UTF_8));
}
}

View File

@ -1,114 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.examples.web;
import com.google.common.io.InputSupplier;
import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class InputSupplierUpdateStreamTest
{
private final long waitTime = 1L;
private final TimeUnit unit = TimeUnit.SECONDS;
private final ArrayList<String> dimensions = new ArrayList<String>();
private InputSupplier testCaseSupplier;
Map<String, Object> expectedAnswer = new HashMap<String, Object>();
String timeDimension;
@Before
public void setUp()
{
timeDimension = "time";
testCaseSupplier = new TestCaseSupplier(
"{\"item1\": \"value1\","
+ "\"item2\":2,"
+ "\"time\":1372121562 }"
);
dimensions.add("item1");
dimensions.add("item2");
dimensions.add("time");
expectedAnswer.put("item1", "value1");
expectedAnswer.put("item2", 2);
expectedAnswer.put("time", 1372121562);
}
@Test
public void basicIngestionCheck() throws Exception
{
InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(
testCaseSupplier,
timeDimension
);
updateStream.start();
Map<String, Object> insertedRow = updateStream.pollFromQueue(waitTime, unit);
Assert.assertEquals(expectedAnswer, insertedRow);
updateStream.stop();
}
//If a timestamp is missing, we should throw away the event
@Test
public void missingTimeStampCheck()
{
testCaseSupplier = new TestCaseSupplier(
"{\"item1\": \"value1\","
+ "\"item2\":2}"
);
InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(
testCaseSupplier,
timeDimension
);
updateStream.start();
Assert.assertEquals(updateStream.getQueueSize(), 0);
updateStream.stop();
}
//If any other value is missing, we should still add the event and process it properly
@Test
public void otherNullValueCheck() throws Exception
{
testCaseSupplier = new TestCaseSupplier(
"{\"item1\": \"value1\","
+ "\"time\":1372121562 }"
);
InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(
testCaseSupplier,
timeDimension
);
updateStream.start();
Map<String, Object> insertedRow = updateStream.pollFromQueue(waitTime, unit);
Map<String, Object> expectedAnswer = new HashMap<String, Object>();
expectedAnswer.put("item1", "value1");
expectedAnswer.put("time", 1372121562);
Assert.assertEquals(expectedAnswer, insertedRow);
updateStream.stop();
}
}

View File

@ -1,91 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.examples.web;
import com.google.common.io.InputSupplier;
import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class RenamingKeysUpdateStreamTest
{
private final long waitTime = 15L;
private final TimeUnit unit = TimeUnit.SECONDS;
private InputSupplier testCaseSupplier;
String timeDimension;
@Before
public void setUp()
{
timeDimension = "time";
testCaseSupplier = new TestCaseSupplier(
"{\"item1\": \"value1\","
+ "\"item2\":2,"
+ "\"time\":1372121562 }"
);
}
@Test
public void testPolFromQueue() throws Exception
{
InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(testCaseSupplier, timeDimension);
Map<String, String> renamedKeys = new HashMap<String, String>();
renamedKeys.put("item1", "i1");
renamedKeys.put("item2", "i2");
renamedKeys.put("time", "t");
RenamingKeysUpdateStream renamer = new RenamingKeysUpdateStream(updateStream, renamedKeys);
renamer.start();
Map<String, Object> expectedAnswer = new HashMap<String, Object>();
expectedAnswer.put("i1", "value1");
expectedAnswer.put("i2", 2);
expectedAnswer.put("t", 1372121562);
Assert.assertEquals(expectedAnswer, renamer.pollFromQueue(waitTime, unit));
}
@Test
public void testGetTimeDimension() throws Exception
{
InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(testCaseSupplier, timeDimension);
Map<String, String> renamedKeys = new HashMap<String, String>();
renamedKeys.put("item1", "i1");
renamedKeys.put("item2", "i2");
renamedKeys.put("time", "t");
RenamingKeysUpdateStream renamer = new RenamingKeysUpdateStream(updateStream, renamedKeys);
Assert.assertEquals("t", renamer.getTimeDimension());
}
@Test
public void testMissingTimeRename() throws Exception
{
InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(testCaseSupplier, timeDimension);
Map<String, String> renamedKeys = new HashMap<String, String>();
renamedKeys.put("item1", "i1");
renamedKeys.put("item2", "i2");
RenamingKeysUpdateStream renamer = new RenamingKeysUpdateStream(updateStream, renamedKeys);
Assert.assertEquals("time", renamer.getTimeDimension());
}
}

View File

@ -1,42 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.examples.web;
import com.google.common.io.InputSupplier;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
public class TestCaseSupplier implements InputSupplier<BufferedReader>
{
private final String testString;
public TestCaseSupplier(String testString)
{
this.testString = testString;
}
@Override
public BufferedReader getInput() throws IOException
{
return new BufferedReader(new StringReader(testString));
}
}

View File

@ -1,223 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.examples.web;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class WebFirehoseFactoryTest
{
private List<String> dimensions = Lists.newArrayList();
private WebFirehoseFactory webbie;
private WebFirehoseFactory webbie1;
@Before
public void setUp() throws Exception
{
dimensions.add("item1");
dimensions.add("item2");
dimensions.add("time");
webbie = new WebFirehoseFactory(
new UpdateStreamFactory()
{
@Override
public UpdateStream build()
{
return new MyUpdateStream(ImmutableMap.<String,Object>of("item1", "value1", "item2", 2, "time", "1372121562"));
}
},
"posix"
);
webbie1 = new WebFirehoseFactory(
new UpdateStreamFactory()
{
@Override
public UpdateStream build()
{
return new MyUpdateStream(ImmutableMap.<String,Object>of("item1", "value1", "item2", 2, "time", "1373241600000"));
}
},
"auto"
);
}
@Test
public void testDimensions() throws Exception
{
InputRow inputRow;
Firehose firehose = webbie.connect(null);
if (firehose.hasMore()) {
inputRow = firehose.nextRow();
} else {
throw new RuntimeException("queue is empty");
}
List<String> actualAnswer = inputRow.getDimensions();
Collections.sort(actualAnswer);
Assert.assertEquals(actualAnswer, dimensions);
}
@Test
public void testPosixTimeStamp() throws Exception
{
InputRow inputRow;
Firehose firehose = webbie.connect(null);
if (firehose.hasMore()) {
inputRow = firehose.nextRow();
} else {
throw new RuntimeException("queue is empty");
}
long expectedTime = 1372121562L * 1000L;
Assert.assertEquals(expectedTime, inputRow.getTimestampFromEpoch());
}
@Test
public void testISOTimeStamp() throws Exception
{
WebFirehoseFactory webbie3 = new WebFirehoseFactory(
new UpdateStreamFactory()
{
@Override
public UpdateStream build()
{
return new MyUpdateStream(ImmutableMap.<String,Object>of("item1", "value1", "item2", 2, "time", "2013-07-08"));
}
},
"auto"
);
Firehose firehose1 = webbie3.connect(null);
if (firehose1.hasMore()) {
long milliSeconds = firehose1.nextRow().getTimestampFromEpoch();
DateTime date = new DateTime("2013-07-08");
Assert.assertEquals(date.getMillis(), milliSeconds);
} else {
Assert.assertFalse("hasMore returned false", true);
}
}
@Test
public void testAutoIsoTimeStamp() throws Exception
{
WebFirehoseFactory webbie2 = new WebFirehoseFactory(
new UpdateStreamFactory()
{
@Override
public UpdateStream build()
{
return new MyUpdateStream(ImmutableMap.<String,Object>of("item1", "value1", "item2", 2, "time", "2013-07-08"));
}
},
null
);
Firehose firehose2 = webbie2.connect(null);
if (firehose2.hasMore()) {
long milliSeconds = firehose2.nextRow().getTimestampFromEpoch();
DateTime date = new DateTime("2013-07-08");
Assert.assertEquals(date.getMillis(), milliSeconds);
} else {
Assert.assertFalse("hasMore returned false", true);
}
}
@Test
public void testAutoMilliSecondsTimeStamp() throws Exception
{
Firehose firehose3 = webbie1.connect(null);
if (firehose3.hasMore()) {
long milliSeconds = firehose3.nextRow().getTimestampFromEpoch();
DateTime date = new DateTime("2013-07-08");
Assert.assertEquals(date.getMillis(), milliSeconds);
} else {
Assert.assertFalse("hasMore returned false", true);
}
}
@Test
public void testGetDimension() throws Exception
{
InputRow inputRow;
Firehose firehose = webbie1.connect(null);
if (firehose.hasMore()) {
inputRow = firehose.nextRow();
} else {
throw new RuntimeException("queue is empty");
}
List<String> column1 = Lists.newArrayList();
column1.add("value1");
Assert.assertEquals(column1, inputRow.getDimension("item1"));
}
@Test
public void testGetFloatMetric() throws Exception
{
InputRow inputRow;
Firehose firehose = webbie1.connect(null);
if (firehose.hasMore()) {
inputRow = firehose.nextRow();
} else {
throw new RuntimeException("queue is empty");
}
Assert.assertEquals((float) 2.0, inputRow.getFloatMetric("item2"), 0.0f);
}
private static class MyUpdateStream implements UpdateStream
{
private static ImmutableMap<String,Object> map;
public MyUpdateStream(ImmutableMap<String,Object> map){
this.map=map;
}
@Override
public Map<String, Object> pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException
{
return map;
}
@Override
public String getTimeDimension()
{
return "time";
}
@Override
public void start()
{
}
@Override
public void stop()
{
}
}
}

View File

@ -1,32 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.examples.web;
import org.junit.Test;
public class WebJsonSupplierTest
{
@Test(expected = IllegalStateException.class)
public void checkInvalidUrl() throws Exception
{
String invalidURL = "http://invalid.url.";
WebJsonSupplier supplier = new WebJsonSupplier(invalidURL);
}
}

Some files were not shown because too many files have changed in this diff Show More