mirror of https://github.com/apache/druid.git
Merge pull request #898 from metamx/refactor-examples
Refactor structure for examples and extensions
This commit is contained in:
commit
829c254bac
|
@ -3,7 +3,7 @@ layout: doc_page
|
|||
---
|
||||
|
||||
# Tutorial: A First Look at Druid
|
||||
Greetings! This tutorial will help clarify some core Druid concepts. We will use a realtime dataset and issue some basic Druid queries. If you are ready to explore Druid, and learn a thing or two, read on!
|
||||
Greetings! This tutorial will help clarify some core Druid concepts. We will use a real-time dataset and issue some basic Druid queries. If you are ready to explore Druid, and learn a thing or two, read on!
|
||||
|
||||
About the data
|
||||
--------------
|
||||
|
|
|
@ -1,8 +1,23 @@
|
|||
[
|
||||
{
|
||||
"schema": {
|
||||
"dataSchema" : {
|
||||
"dataSource" : "wikipedia",
|
||||
"aggregators" : [{
|
||||
"parser" : {
|
||||
"type" : "string",
|
||||
"parseSpec" : {
|
||||
"format" : "json",
|
||||
"timestampSpec" : {
|
||||
"column" : "timestamp",
|
||||
"format" : "auto"
|
||||
},
|
||||
"dimensionsSpec" : {
|
||||
"dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
|
||||
"dimensionExclusions" : [],
|
||||
"spatialDimensions" : []
|
||||
}
|
||||
}
|
||||
},
|
||||
"metricsSpec" : [{
|
||||
"type" : "count",
|
||||
"name" : "count"
|
||||
}, {
|
||||
|
@ -18,12 +33,14 @@
|
|||
"name" : "delta",
|
||||
"fieldName" : "delta"
|
||||
}],
|
||||
"indexGranularity": "none"
|
||||
},
|
||||
"config": {
|
||||
"maxRowsInMemory": 500000,
|
||||
"intermediatePersistPeriod": "PT10m"
|
||||
"granularitySpec" : {
|
||||
"type" : "uniform",
|
||||
"segmentGranularity" : "DAY",
|
||||
"queryGranularity" : "NONE"
|
||||
}
|
||||
},
|
||||
"ioConfig" : {
|
||||
"type" : "realtime",
|
||||
"firehose": {
|
||||
"type": "kafka-0.7.2",
|
||||
"consumerProps": {
|
||||
|
@ -36,24 +53,20 @@
|
|||
"autooffset.reset": "largest",
|
||||
"autocommit.enable": "false"
|
||||
},
|
||||
"feed": "wikipedia",
|
||||
"parser": {
|
||||
"timestampSpec": {
|
||||
"column": "timestamp"
|
||||
},
|
||||
"data": {
|
||||
"format": "json",
|
||||
"dimensions" : ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"]
|
||||
}
|
||||
}
|
||||
"feed": "wikipedia"
|
||||
},
|
||||
"plumber": {
|
||||
"type": "realtime"
|
||||
}
|
||||
},
|
||||
"tuningConfig": {
|
||||
"type" : "realtime",
|
||||
"maxRowsInMemory": 500000,
|
||||
"intermediatePersistPeriod": "PT10m",
|
||||
"windowPeriod": "PT10m",
|
||||
"segmentGranularity": "hour",
|
||||
"basePersistDirectory": "\/tmp\/realtime\/basePersist",
|
||||
"rejectionPolicy": {
|
||||
"type": "test"
|
||||
"type": "messageTime"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,45 +1,67 @@
|
|||
{
|
||||
"dataSchema": {
|
||||
"dataSource": "wikipedia",
|
||||
"parser": {
|
||||
"type": "string",
|
||||
"parseSpec": {
|
||||
"format": "json",
|
||||
"timestampSpec": {
|
||||
"column": "timestamp",
|
||||
"format": "iso"
|
||||
"format": "auto"
|
||||
},
|
||||
"dataSpec": {
|
||||
"format": "json",
|
||||
"dimensions" : ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"]
|
||||
"dimensionsSpec": {
|
||||
"dimensions": [
|
||||
"page",
|
||||
"language",
|
||||
"user",
|
||||
"unpatrolled",
|
||||
"newPage",
|
||||
"robot",
|
||||
"anonymous",
|
||||
"namespace",
|
||||
"continent",
|
||||
"country",
|
||||
"region",
|
||||
"city"
|
||||
],
|
||||
"dimensionExclusions": [],
|
||||
"spatialDimensions": []
|
||||
}
|
||||
}
|
||||
},
|
||||
"granularitySpec" : {
|
||||
"type" : "uniform",
|
||||
"gran" : "DAY",
|
||||
"intervals" : [ "2013-08-31/2013-09-01" ]
|
||||
},
|
||||
"pathSpec": {
|
||||
"type": "static",
|
||||
"paths": "examples/indexing/wikipedia_data.json"
|
||||
},
|
||||
"rollupSpec": {
|
||||
"aggs": [{
|
||||
"metricsSpec": [
|
||||
{
|
||||
"type": "count",
|
||||
"name": "count"
|
||||
}, {
|
||||
},
|
||||
{
|
||||
"type": "doubleSum",
|
||||
"name": "added",
|
||||
"fieldName": "added"
|
||||
}, {
|
||||
},
|
||||
{
|
||||
"type": "doubleSum",
|
||||
"name": "deleted",
|
||||
"fieldName": "deleted"
|
||||
}, {
|
||||
},
|
||||
{
|
||||
"type": "doubleSum",
|
||||
"name": "delta",
|
||||
"fieldName": "delta"
|
||||
}],
|
||||
"rollupGranularity": "none"
|
||||
}
|
||||
],
|
||||
"granularitySpec": {
|
||||
"type": "uniform",
|
||||
"segmentGranularity": "DAY",
|
||||
"queryGranularity": "NONE",
|
||||
"intervals": ["2013-08-31/2013-09-01"]
|
||||
}
|
||||
},
|
||||
"workingPath": "\/tmp\/working_path",
|
||||
"segmentOutputPath": "\/tmp\/segments",
|
||||
"partitionsSpec": {
|
||||
"targetPartitionSize": 5000000
|
||||
"ioConfig": {
|
||||
"type": "hadoop",
|
||||
"inputSpec": {
|
||||
"type": "static",
|
||||
"paths": "/myPath/druid-services-0.6.160/examples/indexing/wikipedia_data.json"
|
||||
},
|
||||
"metadataUpdateSpec": {
|
||||
"type": "db",
|
||||
|
@ -47,5 +69,14 @@
|
|||
"user": "druid",
|
||||
"password": "diurd",
|
||||
"segmentTable": "druid_segments"
|
||||
},
|
||||
"segmentOutputPath": "\/tmp\/segments"
|
||||
},
|
||||
"tuningConfig": {
|
||||
"type": "hadoop",
|
||||
"workingPath": "\/tmp\/working_path",
|
||||
"partitionsSpec": {
|
||||
"targetPartitionSize": 5000000
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,43 +1,76 @@
|
|||
{
|
||||
"type": "index_hadoop",
|
||||
"config": {
|
||||
"spec": {
|
||||
"dataSchema": {
|
||||
"dataSource": "wikipedia",
|
||||
"parser": {
|
||||
"type": "string",
|
||||
"parseSpec": {
|
||||
"format": "json",
|
||||
"timestampSpec": {
|
||||
"column": "timestamp",
|
||||
"format": "auto"
|
||||
},
|
||||
"dataSpec" : {
|
||||
"format" : "json",
|
||||
"dimensions" : ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"]
|
||||
"dimensionsSpec": {
|
||||
"dimensions": [
|
||||
"page",
|
||||
"language",
|
||||
"user",
|
||||
"unpatrolled",
|
||||
"newPage",
|
||||
"robot",
|
||||
"anonymous",
|
||||
"namespace",
|
||||
"continent",
|
||||
"country",
|
||||
"region",
|
||||
"city"
|
||||
],
|
||||
"dimensionExclusions": [],
|
||||
"spatialDimensions": []
|
||||
}
|
||||
}
|
||||
},
|
||||
"granularitySpec" : {
|
||||
"type" : "uniform",
|
||||
"gran" : "DAY",
|
||||
"intervals" : [ "2013-08-31/2013-09-01" ]
|
||||
},
|
||||
"pathSpec" : {
|
||||
"type" : "static",
|
||||
"paths" : "examples/indexing/wikipedia_data.json"
|
||||
},
|
||||
"targetPartitionSize" : 5000000,
|
||||
"rollupSpec" : {
|
||||
"aggs": [{
|
||||
"metricsSpec": [
|
||||
{
|
||||
"type": "count",
|
||||
"name": "count"
|
||||
}, {
|
||||
},
|
||||
{
|
||||
"type": "doubleSum",
|
||||
"name": "added",
|
||||
"fieldName": "added"
|
||||
}, {
|
||||
},
|
||||
{
|
||||
"type": "doubleSum",
|
||||
"name": "deleted",
|
||||
"fieldName": "deleted"
|
||||
}, {
|
||||
},
|
||||
{
|
||||
"type": "doubleSum",
|
||||
"name": "delta",
|
||||
"fieldName": "delta"
|
||||
}],
|
||||
"rollupGranularity" : "none"
|
||||
}
|
||||
],
|
||||
"granularitySpec": {
|
||||
"type": "uniform",
|
||||
"segmentGranularity": "DAY",
|
||||
"queryGranularity": "NONE",
|
||||
"intervals": ["2013-08-31/2013-09-01"]
|
||||
}
|
||||
},
|
||||
"ioConfig": {
|
||||
"type": "hadoop",
|
||||
"inputSpec": {
|
||||
"type": "static",
|
||||
"paths": "/myPath/druid-services-0.6.160/examples/indexing/wikipedia_data.json"
|
||||
}
|
||||
},
|
||||
"tuningConfig": {
|
||||
"type": "hadoop",
|
||||
"partitionsSpec": {
|
||||
"targetPartitionSize": 5000000
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,39 +1,76 @@
|
|||
{
|
||||
"type": "index",
|
||||
"spec": {
|
||||
"dataSchema": {
|
||||
"dataSource": "wikipedia",
|
||||
"granularitySpec" : {
|
||||
"type" : "uniform",
|
||||
"gran" : "DAY",
|
||||
"intervals" : [ "2013-08-31/2013-09-01" ]
|
||||
"parser": {
|
||||
"type": "string",
|
||||
"parseSpec": {
|
||||
"format": "json",
|
||||
"timestampSpec": {
|
||||
"column": "timestamp",
|
||||
"format": "auto"
|
||||
},
|
||||
"aggregators" : [{
|
||||
"dimensionsSpec": {
|
||||
"dimensions": [
|
||||
"page",
|
||||
"language",
|
||||
"user",
|
||||
"unpatrolled",
|
||||
"newPage",
|
||||
"robot",
|
||||
"anonymous",
|
||||
"namespace",
|
||||
"continent",
|
||||
"country",
|
||||
"region",
|
||||
"city"
|
||||
],
|
||||
"dimensionExclusions": [],
|
||||
"spatialDimensions": []
|
||||
}
|
||||
}
|
||||
},
|
||||
"metricsSpec": [
|
||||
{
|
||||
"type": "count",
|
||||
"name": "count"
|
||||
}, {
|
||||
},
|
||||
{
|
||||
"type": "doubleSum",
|
||||
"name": "added",
|
||||
"fieldName": "added"
|
||||
}, {
|
||||
},
|
||||
{
|
||||
"type": "doubleSum",
|
||||
"name": "deleted",
|
||||
"fieldName": "deleted"
|
||||
}, {
|
||||
},
|
||||
{
|
||||
"type": "doubleSum",
|
||||
"name": "delta",
|
||||
"fieldName": "delta"
|
||||
}],
|
||||
}
|
||||
],
|
||||
"granularitySpec": {
|
||||
"type": "uniform",
|
||||
"segmentGranularity": "DAY",
|
||||
"queryGranularity": "NONE",
|
||||
"intervals": ["2013-08-31/2013-09-01"]
|
||||
}
|
||||
},
|
||||
"ioConfig": {
|
||||
"type": "index",
|
||||
"firehose": {
|
||||
"type": "local",
|
||||
"baseDir" : "examples/indexing/",
|
||||
"filter" : "wikipedia_data.json",
|
||||
"parser" : {
|
||||
"timestampSpec" : {
|
||||
"column" : "timestamp"
|
||||
"baseDir": "/MyPath/druid-services-0.6.160/examples/indexing/",
|
||||
"filter": "wikipedia_data.json"
|
||||
}
|
||||
},
|
||||
"data" : {
|
||||
"format" : "json",
|
||||
"dimensions" : ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"]
|
||||
}
|
||||
"tuningConfig": {
|
||||
"type": "index",
|
||||
"targetPartitionSize": 0,
|
||||
"rowFlushBoundary": 0
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,8 +1,37 @@
|
|||
{
|
||||
"type" : "index_realtime",
|
||||
"schema": {
|
||||
"spec" : {
|
||||
"dataSchema": {
|
||||
"dataSource": "wikipedia",
|
||||
"aggregators": [
|
||||
"parser": {
|
||||
"type": "string",
|
||||
"parseSpec": {
|
||||
"format": "json",
|
||||
"timestampSpec": {
|
||||
"column": "timestamp",
|
||||
"format": "auto"
|
||||
},
|
||||
"dimensionsSpec": {
|
||||
"dimensions": [
|
||||
"page",
|
||||
"language",
|
||||
"user",
|
||||
"unpatrolled",
|
||||
"newPage",
|
||||
"robot",
|
||||
"anonymous",
|
||||
"namespace",
|
||||
"continent",
|
||||
"country",
|
||||
"region",
|
||||
"city"
|
||||
],
|
||||
"dimensionExclusions": [],
|
||||
"spatialDimensions": []
|
||||
}
|
||||
}
|
||||
},
|
||||
"metricsSpec": [
|
||||
{
|
||||
"type": "count",
|
||||
"name": "count"
|
||||
|
@ -23,12 +52,14 @@
|
|||
"fieldName": "delta"
|
||||
}
|
||||
],
|
||||
"indexGranularity": "none"
|
||||
},
|
||||
"fireDepartmentConfig": {
|
||||
"maxRowsInMemory": 500000,
|
||||
"intermediatePersistPeriod": "PT10m"
|
||||
"granularitySpec": {
|
||||
"type": "uniform",
|
||||
"segmentGranularity": "DAY",
|
||||
"queryGranularity": "NONE"
|
||||
}
|
||||
},
|
||||
"ioConfig": {
|
||||
"type": "realtime",
|
||||
"firehose": {
|
||||
"type": "kafka-0.7.2",
|
||||
"consumerProps": {
|
||||
|
@ -41,33 +72,21 @@
|
|||
"autooffset.reset": "largest",
|
||||
"autocommit.enable": "false"
|
||||
},
|
||||
"feed": "wikipedia",
|
||||
"parser": {
|
||||
"timestampSpec": {
|
||||
"column": "timestamp"
|
||||
"feed": "wikipedia"
|
||||
},
|
||||
"data": {
|
||||
"format": "json",
|
||||
"dimensions": [
|
||||
"page",
|
||||
"language",
|
||||
"user",
|
||||
"unpatrolled",
|
||||
"newPage",
|
||||
"robot",
|
||||
"anonymous",
|
||||
"namespace",
|
||||
"continent",
|
||||
"country",
|
||||
"region",
|
||||
"city"
|
||||
]
|
||||
}
|
||||
"plumber": {
|
||||
"type": "realtime"
|
||||
}
|
||||
},
|
||||
"tuningConfig": {
|
||||
"type": "realtime",
|
||||
"maxRowsInMemory": 500000,
|
||||
"intermediatePersistPeriod": "PT10m",
|
||||
"windowPeriod": "PT10m",
|
||||
"segmentGranularity": "hour",
|
||||
"basePersistDirectory": "\/tmp\/realtime\/basePersist",
|
||||
"rejectionPolicy": {
|
||||
"type": "test"
|
||||
"type": "serverTime"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,44 +1,35 @@
|
|||
{
|
||||
"type": "index_realtime",
|
||||
"schema": {
|
||||
"spec": {
|
||||
"dataSchema": {
|
||||
"dataSource": "wikipedia",
|
||||
"aggregators": [
|
||||
{
|
||||
"type": "count",
|
||||
"name": "count"
|
||||
},
|
||||
{
|
||||
"type": "doubleSum",
|
||||
"name": "added",
|
||||
"fieldName": "added"
|
||||
},
|
||||
{
|
||||
"type": "doubleSum",
|
||||
"name": "deleted",
|
||||
"fieldName": "deleted"
|
||||
},
|
||||
{
|
||||
"type": "doubleSum",
|
||||
"name": "delta",
|
||||
"fieldName": "delta"
|
||||
}
|
||||
],
|
||||
"indexGranularity": "none"
|
||||
},
|
||||
"fireDepartmentConfig": {
|
||||
"maxRowsInMemory": 500000,
|
||||
"intermediatePersistPeriod": "PT10m"
|
||||
},
|
||||
"firehose": {
|
||||
"parser": {
|
||||
"type": "irc",
|
||||
"nick": "wiki1234567890",
|
||||
"host": "irc.wikimedia.org",
|
||||
"channels": [
|
||||
"#en.wikipedia",
|
||||
"#fr.wikipedia",
|
||||
"#de.wikipedia",
|
||||
"#ja.wikipedia"
|
||||
"parseSpec": {
|
||||
"format": "json",
|
||||
"timestampSpec": {
|
||||
"column": "timestamp",
|
||||
"format": "iso"
|
||||
},
|
||||
"dimensionsSpec": {
|
||||
"dimensions": [
|
||||
"page",
|
||||
"language",
|
||||
"user",
|
||||
"unpatrolled",
|
||||
"newPage",
|
||||
"robot",
|
||||
"anonymous",
|
||||
"namespace",
|
||||
"continent",
|
||||
"country",
|
||||
"region",
|
||||
"city"
|
||||
],
|
||||
"dimensionExclusions": [],
|
||||
"spatialDimensions": []
|
||||
}
|
||||
},
|
||||
"decoder": {
|
||||
"type": "wikipedia",
|
||||
"namespaces": {
|
||||
|
@ -62,10 +53,61 @@
|
|||
"Category talk": "category talk"
|
||||
}
|
||||
}
|
||||
},
|
||||
"timeDimension": "timestamp",
|
||||
"timeFormat": "iso"
|
||||
},
|
||||
"windowPeriod": "PT10m",
|
||||
"segmentGranularity": "hour"
|
||||
}
|
||||
},
|
||||
"metricsSpec": [
|
||||
{
|
||||
"type": "count",
|
||||
"name": "count"
|
||||
},
|
||||
{
|
||||
"type": "doubleSum",
|
||||
"name": "added",
|
||||
"fieldName": "added"
|
||||
},
|
||||
{
|
||||
"type": "doubleSum",
|
||||
"name": "deleted",
|
||||
"fieldName": "deleted"
|
||||
},
|
||||
{
|
||||
"type": "doubleSum",
|
||||
"name": "delta",
|
||||
"fieldName": "delta"
|
||||
}
|
||||
],
|
||||
"granularitySpec": {
|
||||
"type": "uniform",
|
||||
"segmentGranularity": "DAY",
|
||||
"queryGranularity": "NONE"
|
||||
}
|
||||
},
|
||||
"ioConfig": {
|
||||
"type": "realtime",
|
||||
"firehose": {
|
||||
"type": "irc",
|
||||
"nick": "wiki1234567890",
|
||||
"host": "irc.wikimedia.org",
|
||||
"channels": [
|
||||
"#en.wikipedia",
|
||||
"#fr.wikipedia",
|
||||
"#de.wikipedia",
|
||||
"#ja.wikipedia"
|
||||
]
|
||||
},
|
||||
"plumber": {
|
||||
"type": "realtime"
|
||||
}
|
||||
},
|
||||
"tuningConfig": {
|
||||
"type": "realtime",
|
||||
"maxRowsInMemory": 500000,
|
||||
"intermediatePersistPeriod": "PT10m",
|
||||
"windowPeriod": "PT10m",
|
||||
"basePersistDirectory": "\/tmp\/realtime\/basePersist",
|
||||
"rejectionPolicy": {
|
||||
"type": "serverTime"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,12 +0,0 @@
|
|||
{
|
||||
"queryType": "groupBy",
|
||||
"dataSource": "rabbitmqtest",
|
||||
"granularity": "all",
|
||||
"dimensions": [],
|
||||
"aggregations": [
|
||||
{ "type": "count", "name": "rows" },
|
||||
{"type": "longSum", "name": "imps", "fieldName": "impressions"},
|
||||
{"type": "doubleSum", "name": "wp", "fieldName": "wp"}
|
||||
],
|
||||
"intervals": ["2010-01-01T00:00/2020-01-01T00:00"]
|
||||
}
|
|
@ -1,48 +0,0 @@
|
|||
[{
|
||||
"schema" : {
|
||||
"dataSource":"rabbitmqtest",
|
||||
"aggregators":[
|
||||
{"type":"count", "name":"impressions"},
|
||||
{"type":"doubleSum","name":"wp","fieldName":"wp"}
|
||||
],
|
||||
"indexGranularity":"minute",
|
||||
"shardSpec" : { "type": "none" }
|
||||
},
|
||||
"config" : {
|
||||
"maxRowsInMemory" : 500000,
|
||||
"intermediatePersistPeriod" : "PT1m"
|
||||
},
|
||||
"firehose" : {
|
||||
"type" : "rabbitmq",
|
||||
"connection" : {
|
||||
"host": "localhost",
|
||||
"username": "test-dude",
|
||||
"password": "word-dude",
|
||||
"virtualHost": "test-vhost"
|
||||
},
|
||||
"config" : {
|
||||
"exchange": "test-exchange",
|
||||
"queue" : "druidtest",
|
||||
"routingKey": "#",
|
||||
"durable": "true",
|
||||
"exclusive": "false",
|
||||
"autoDelete": "false",
|
||||
|
||||
"maxRetries": "10",
|
||||
"retryIntervalSeconds": "1",
|
||||
"maxDurationSeconds": "300"
|
||||
},
|
||||
"parser" : {
|
||||
"timestampSpec" : { "column" : "utcdt", "format" : "iso" },
|
||||
"data" : { "format" : "json" },
|
||||
"dimensionExclusions" : ["wp"]
|
||||
}
|
||||
},
|
||||
"plumber" : {
|
||||
"type" : "realtime",
|
||||
"windowPeriod" : "PT5m",
|
||||
"segmentGranularity":"hour",
|
||||
"basePersistDirectory" : "/tmp/realtime/basePersist",
|
||||
"rejectionPolicy": { "type": "test" }
|
||||
}
|
||||
}]
|
|
@ -1,19 +0,0 @@
|
|||
{
|
||||
"queryType": "groupBy",
|
||||
"dataSource": "randSeq",
|
||||
"granularity": "all",
|
||||
"dimensions": [],
|
||||
"aggregations":[
|
||||
{ "type": "count", "name": "rows"},
|
||||
{ "type": "doubleSum", "fieldName": "events", "name": "e"},
|
||||
{ "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum"}
|
||||
],
|
||||
"postAggregations":[
|
||||
{ "type":"arithmetic",
|
||||
"name":"avg_random",
|
||||
"fn":"/",
|
||||
"fields":[ {"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"},
|
||||
{"type":"fieldAccess","name":"rows","fieldName":"rows"} ]}
|
||||
],
|
||||
"intervals":["2012-10-01T00:00/2020-01-01T00"]
|
||||
}
|
|
@ -1,32 +0,0 @@
|
|||
[{
|
||||
"schema": {
|
||||
"dataSource": "randseq",
|
||||
"aggregators": [
|
||||
{"type": "count", "name": "events"},
|
||||
{"type": "doubleSum", "name": "outColumn", "fieldName": "inColumn"}
|
||||
],
|
||||
"indexGranularity": "minute",
|
||||
"shardSpec": {"type": "none"}
|
||||
},
|
||||
|
||||
"config": {
|
||||
"maxRowsInMemory": 50000,
|
||||
"intermediatePersistPeriod": "PT10m"
|
||||
},
|
||||
|
||||
"firehose": {
|
||||
"type": "rand",
|
||||
"sleepUsec": 100000,
|
||||
"maxGeneratedRows": 5000000,
|
||||
"seed": 0,
|
||||
"nTokens": 255,
|
||||
"nPerSleep": 3
|
||||
},
|
||||
|
||||
"plumber": {
|
||||
"type": "realtime",
|
||||
"windowPeriod": "PT5m",
|
||||
"segmentGranularity": "hour",
|
||||
"basePersistDirectory": "/tmp/example/rand_realtime/basePersist"
|
||||
}
|
||||
}]
|
|
@ -1,12 +1,24 @@
|
|||
{
|
||||
"queryType": "groupBy",
|
||||
"queryType": "timeseries",
|
||||
"dataSource": "twitterstream",
|
||||
"granularity": "all",
|
||||
"dimensions": ["lang", "utc_offset"],
|
||||
"aggregations": [
|
||||
{ "type": "count", "name": "rows"},
|
||||
{ "type": "doubleSum", "fieldName": "tweets", "name": "tweets"}
|
||||
],
|
||||
"filter": { "type": "selector", "dimension": "lang", "value": "en" },
|
||||
"intervals":["2012-10-01T00:00/2020-01-01T00"]
|
||||
{
|
||||
"type": "count",
|
||||
"name": "rows"
|
||||
},
|
||||
{
|
||||
"type": "doubleSum",
|
||||
"fieldName": "tweets",
|
||||
"name": "tweets"
|
||||
}
|
||||
],
|
||||
"filter": {
|
||||
"type": "selector",
|
||||
"dimension": "lang",
|
||||
"value": "en"
|
||||
},
|
||||
"intervals": [
|
||||
"2012-10-01T00:00\/2020-01-01T00"
|
||||
]
|
||||
}
|
|
@ -2,12 +2,20 @@
|
|||
"queryType": "search",
|
||||
"dataSource": "twitterstream",
|
||||
"granularity": "all",
|
||||
"searchDimensions": ["htags"],
|
||||
"searchDimensions": [
|
||||
"htags"
|
||||
],
|
||||
"limit": 1,
|
||||
"query": {
|
||||
"type": "fragment",
|
||||
"values": ["men"],
|
||||
"sort": { "type": "strlen" }
|
||||
},
|
||||
"intervals":["2012-10-01T00:00/2020-01-01T00"]
|
||||
"values": [
|
||||
"men"
|
||||
],
|
||||
"sort": {
|
||||
"type": "strlen"
|
||||
}
|
||||
},
|
||||
"intervals": [
|
||||
"2012-10-01T00:00\/2020-01-01T00"
|
||||
]
|
||||
}
|
|
@ -1,44 +1,119 @@
|
|||
[{
|
||||
"schema": {
|
||||
[
|
||||
{
|
||||
"dataSchema": {
|
||||
"dataSource": "twitterstream",
|
||||
"aggregators": [
|
||||
{"type": "count", "name": "tweets"},
|
||||
{"type": "doubleSum", "fieldName": "follower_count", "name": "total_follower_count"},
|
||||
{"type": "doubleSum", "fieldName": "retweet_count", "name": "total_retweet_count" },
|
||||
{"type": "doubleSum", "fieldName": "friends_count", "name": "total_friends_count" },
|
||||
{"type": "doubleSum", "fieldName": "statuses_count", "name": "total_statuses_count"},
|
||||
"parser": {
|
||||
"parseSpec": {
|
||||
"format": "json",
|
||||
"timestampSpec": {
|
||||
"column": "utcdt",
|
||||
"format": "iso"
|
||||
},
|
||||
"dimensionsSpec": {
|
||||
"dimensions": [
|
||||
|
||||
{"type": "min", "fieldName": "follower_count", "name": "min_follower_count"},
|
||||
{"type": "max", "fieldName": "follower_count", "name": "max_follower_count"},
|
||||
|
||||
{"type": "min", "fieldName": "friends_count", "name": "min_friends_count"},
|
||||
{"type": "max", "fieldName": "friends_count", "name": "max_friends_count"},
|
||||
|
||||
{"type": "min", "fieldName": "statuses_count", "name": "min_statuses_count"},
|
||||
{"type": "max", "fieldName": "statuses_count", "name": "max_statuses_count"},
|
||||
|
||||
{"type": "min", "fieldName": "retweet_count", "name": "min_retweet_count"},
|
||||
{"type": "max", "fieldName": "retweet_count", "name": "max_retweet_count"}
|
||||
],
|
||||
"indexGranularity": "minute",
|
||||
"shardSpec": {"type": "none"}
|
||||
},
|
||||
"dimensionExclusions": [
|
||||
|
||||
"config": {
|
||||
"maxRowsInMemory": 50000,
|
||||
"intermediatePersistPeriod": "PT2m"
|
||||
},
|
||||
],
|
||||
"spatialDimensions": [
|
||||
|
||||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
"metricsSpec": [
|
||||
{
|
||||
"type": "count",
|
||||
"name": "tweets"
|
||||
},
|
||||
{
|
||||
"type": "doubleSum",
|
||||
"fieldName": "follower_count",
|
||||
"name": "total_follower_count"
|
||||
},
|
||||
{
|
||||
"type": "doubleSum",
|
||||
"fieldName": "retweet_count",
|
||||
"name": "total_retweet_count"
|
||||
},
|
||||
{
|
||||
"type": "doubleSum",
|
||||
"fieldName": "friends_count",
|
||||
"name": "total_friends_count"
|
||||
},
|
||||
{
|
||||
"type": "doubleSum",
|
||||
"fieldName": "statuses_count",
|
||||
"name": "total_statuses_count"
|
||||
},
|
||||
{
|
||||
"type": "min",
|
||||
"fieldName": "follower_count",
|
||||
"name": "min_follower_count"
|
||||
},
|
||||
{
|
||||
"type": "max",
|
||||
"fieldName": "follower_count",
|
||||
"name": "max_follower_count"
|
||||
},
|
||||
{
|
||||
"type": "min",
|
||||
"fieldName": "friends_count",
|
||||
"name": "min_friends_count"
|
||||
},
|
||||
{
|
||||
"type": "max",
|
||||
"fieldName": "friends_count",
|
||||
"name": "max_friends_count"
|
||||
},
|
||||
{
|
||||
"type": "min",
|
||||
"fieldName": "statuses_count",
|
||||
"name": "min_statuses_count"
|
||||
},
|
||||
{
|
||||
"type": "max",
|
||||
"fieldName": "statuses_count",
|
||||
"name": "max_statuses_count"
|
||||
},
|
||||
{
|
||||
"type": "min",
|
||||
"fieldName": "retweet_count",
|
||||
"name": "min_retweet_count"
|
||||
},
|
||||
{
|
||||
"type": "max",
|
||||
"fieldName": "retweet_count",
|
||||
"name": "max_retweet_count"
|
||||
}
|
||||
],
|
||||
"granularitySpec": {
|
||||
"type": "uniform",
|
||||
"segmentGranularity": "DAY",
|
||||
"queryGranularity": "NONE"
|
||||
}
|
||||
},
|
||||
"ioConfig": {
|
||||
"type": "realtime",
|
||||
"firehose": {
|
||||
"type": "twitzer",
|
||||
"maxEventCount": 500000,
|
||||
"maxRunMinutes": 120
|
||||
},
|
||||
|
||||
"plumber": {
|
||||
"type": "realtime",
|
||||
"windowPeriod": "PT3m",
|
||||
"segmentGranularity": "hour",
|
||||
"basePersistDirectory": "/tmp/example/twitter_realtime/basePersist"
|
||||
"type": "realtime"
|
||||
}
|
||||
}]
|
||||
},
|
||||
"tuningConfig": {
|
||||
"type": "realtime",
|
||||
"maxRowsInMemory": 500000,
|
||||
"intermediatePersistPeriod": "PT2m",
|
||||
"windowPeriod": "PT3m",
|
||||
"basePersistDirectory": "\/tmp\/realtime\/basePersist",
|
||||
"rejectionPolicy": {
|
||||
"type": "messageTime"
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
|
@ -1,27 +0,0 @@
|
|||
{
|
||||
"queryType":"groupBy",
|
||||
"dataSource":"webstream",
|
||||
"granularity":"minute",
|
||||
"dimensions":[
|
||||
"timezone"
|
||||
],
|
||||
"aggregations":[
|
||||
{
|
||||
"type":"count",
|
||||
"name":"rows"
|
||||
},
|
||||
{
|
||||
"type":"doubleSum",
|
||||
"fieldName":"known_users",
|
||||
"name":"known_users"
|
||||
}
|
||||
],
|
||||
"filter":{
|
||||
"type":"selector",
|
||||
"dimension":"country",
|
||||
"value":"US"
|
||||
},
|
||||
"intervals":[
|
||||
"2013-06-01T00:00/2020-01-01T00"
|
||||
]
|
||||
}
|
|
@ -1,47 +0,0 @@
|
|||
[{
|
||||
"schema": {
|
||||
"dataSource": "webstream",
|
||||
"aggregators": [
|
||||
{"type": "count", "name": "rows"},
|
||||
{"type": "doubleSum", "fieldName": "known_users", "name": "known_users"}
|
||||
],
|
||||
"indexGranularity": "second",
|
||||
"shardSpec": {"type": "none"}
|
||||
},
|
||||
|
||||
"config": {
|
||||
"maxRowsInMemory": 50000,
|
||||
"intermediatePersistPeriod": "PT2m"
|
||||
},
|
||||
|
||||
"firehose": {
|
||||
"type": "webstream",
|
||||
"url":"http://developer.usa.gov/1usagov",
|
||||
"renamedDimensions": {
|
||||
"g":"bitly_hash",
|
||||
"c":"country",
|
||||
"a":"user",
|
||||
"cy":"city",
|
||||
"l":"encoding_user_login",
|
||||
"hh":"short_url",
|
||||
"hc":"timestamp_hash",
|
||||
"h":"user_bitly_hash",
|
||||
"u":"url",
|
||||
"tz":"timezone",
|
||||
"t":"time",
|
||||
"r":"referring_url",
|
||||
"gr":"geo_region",
|
||||
"nk":"known_users",
|
||||
"al":"accept_language"
|
||||
},
|
||||
"timeDimension":"t",
|
||||
"timeFormat":"posix"
|
||||
},
|
||||
|
||||
"plumber": {
|
||||
"type": "realtime",
|
||||
"windowPeriod": "PT3m",
|
||||
"segmentGranularity": "hour",
|
||||
"basePersistDirectory": "/tmp/example/usagov_realtime/basePersist"
|
||||
}
|
||||
}]
|
|
@ -1,10 +1,7 @@
|
|||
{
|
||||
"queryType":"groupBy",
|
||||
"queryType":"timeseries",
|
||||
"dataSource":"wikipedia",
|
||||
"granularity":"minute",
|
||||
"dimensions":[
|
||||
"page"
|
||||
],
|
||||
"aggregations":[
|
||||
{
|
||||
"type":"count",
|
||||
|
|
|
@ -1,36 +1,38 @@
|
|||
[{
|
||||
"schema": {
|
||||
"dataSchema": {
|
||||
"dataSource": "wikipedia",
|
||||
"aggregators": [
|
||||
{"type": "count", "name": "count"},
|
||||
{"type": "longSum", "fieldName": "added", "name": "added"},
|
||||
{"type": "longSum", "fieldName": "deleted", "name": "deleted"},
|
||||
{"type": "longSum", "fieldName": "delta", "name": "delta"}
|
||||
],
|
||||
"indexGranularity": "minute",
|
||||
"shardSpec": {"type": "none"}
|
||||
},
|
||||
|
||||
"config": {
|
||||
"maxRowsInMemory": 50000,
|
||||
"intermediatePersistPeriod": "PT2m"
|
||||
},
|
||||
|
||||
"firehose": {
|
||||
"parser": {
|
||||
"type": "irc",
|
||||
"nick": "wiki1234567890",
|
||||
"host": "irc.wikimedia.org",
|
||||
"channels": [
|
||||
"#en.wikipedia",
|
||||
"#fr.wikipedia",
|
||||
"#de.wikipedia",
|
||||
"#ja.wikipedia"
|
||||
"parseSpec": {
|
||||
"format": "json",
|
||||
"timestampSpec": {
|
||||
"column": "timestamp",
|
||||
"format": "iso"
|
||||
},
|
||||
"dimensionsSpec": {
|
||||
"dimensions": [
|
||||
"page",
|
||||
"language",
|
||||
"user",
|
||||
"unpatrolled",
|
||||
"newPage",
|
||||
"robot",
|
||||
"anonymous",
|
||||
"namespace",
|
||||
"continent",
|
||||
"country",
|
||||
"region",
|
||||
"city"
|
||||
],
|
||||
"dimensionExclusions": [],
|
||||
"spatialDimensions": []
|
||||
}
|
||||
},
|
||||
"decoder": {
|
||||
"type": "wikipedia",
|
||||
"namespaces": {
|
||||
"#en.wikipedia": {
|
||||
"": "main",
|
||||
"_empty_": "main",
|
||||
"Category": "category",
|
||||
"$1 talk": "project talk",
|
||||
"Template talk": "template talk",
|
||||
|
@ -49,15 +51,60 @@
|
|||
"Category talk": "category talk"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"timeDimension":"timestamp",
|
||||
"timeFormat":"iso"
|
||||
"metricsSpec": [
|
||||
{
|
||||
"type": "count",
|
||||
"name": "count"
|
||||
},
|
||||
|
||||
"plumber": {
|
||||
{
|
||||
"type": "doubleSum",
|
||||
"name": "added",
|
||||
"fieldName": "added"
|
||||
},
|
||||
{
|
||||
"type": "doubleSum",
|
||||
"name": "deleted",
|
||||
"fieldName": "deleted"
|
||||
},
|
||||
{
|
||||
"type": "doubleSum",
|
||||
"name": "delta",
|
||||
"fieldName": "delta"
|
||||
}
|
||||
],
|
||||
"granularitySpec": {
|
||||
"type": "uniform",
|
||||
"segmentGranularity": "DAY",
|
||||
"queryGranularity": "NONE"
|
||||
}
|
||||
},
|
||||
"ioConfig": {
|
||||
"type": "realtime",
|
||||
"windowPeriod": "PT3m",
|
||||
"segmentGranularity": "hour",
|
||||
"basePersistDirectory": "/tmp/example/wikipedia/basePersist"
|
||||
"firehose": {
|
||||
"type": "irc",
|
||||
"nick": "wiki1234567890",
|
||||
"host": "irc.wikimedia.org",
|
||||
"channels": [
|
||||
"#en.wikipedia",
|
||||
"#fr.wikipedia",
|
||||
"#de.wikipedia",
|
||||
"#ja.wikipedia"
|
||||
]
|
||||
},
|
||||
"plumber": {
|
||||
"type": "realtime"
|
||||
}
|
||||
},
|
||||
"tuningConfig": {
|
||||
"type": "realtime",
|
||||
"maxRowsInMemory": 500000,
|
||||
"intermediatePersistPeriod": "PT10m",
|
||||
"windowPeriod": "PT10m",
|
||||
"basePersistDirectory": "\/tmp\/realtime\/basePersist",
|
||||
"rejectionPolicy": {
|
||||
"type": "serverTime"
|
||||
}
|
||||
}
|
||||
}]
|
||||
|
|
|
@ -45,7 +45,7 @@ for delay in 5 30 30 30 30 30 30 30 30 30 30
|
|||
echo "sleep for $delay seconds..."
|
||||
echo " "
|
||||
sleep $delay
|
||||
curl -X POST 'http://localhost:8083/druid/v2/?w' -H 'content-type: application/json' -d "`cat ${QUERY_FILE}`"
|
||||
curl -X POST 'http://localhost:8083/druid/v2/?pretty' -H 'content-type: application/json' -d "`cat ${QUERY_FILE}`"
|
||||
echo " "
|
||||
echo " "
|
||||
done
|
||||
|
|
|
@ -58,7 +58,7 @@ DRUID_CP=${EXAMPLE_LOC}
|
|||
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/../config/realtime
|
||||
#For the kit
|
||||
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/lib/*
|
||||
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/config/_global
|
||||
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/config/_common
|
||||
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/config/realtime
|
||||
|
||||
echo "Running command:"
|
||||
|
|
|
@ -1,5 +0,0 @@
|
|||
## Example Prerequisite
|
||||
|
||||
The code in this example assumes Cassandra has been configured as deep storage for Druid.
|
||||
|
||||
For details on how to accomplish this, see [Cassandra Deep Storage](http://druid.io/docs/latest/Cassandra-Deep-Storage.html).
|
|
@ -1 +0,0 @@
|
|||
curl -sX POST "http://localhost:9090/druid/v2/?pretty=true" -H 'content-type: application/json' -d @query
|
|
@ -1,19 +0,0 @@
|
|||
{
|
||||
"queryType": "groupBy",
|
||||
"dataSource": "randSeq",
|
||||
"granularity": "all",
|
||||
"dimensions": [],
|
||||
"aggregations":[
|
||||
{ "type": "count", "name": "rows"},
|
||||
{ "type": "doubleSum", "fieldName": "events", "name": "e"},
|
||||
{ "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum"}
|
||||
],
|
||||
"postAggregations":[
|
||||
{ "type":"arithmetic",
|
||||
"name":"avg_random",
|
||||
"fn":"/",
|
||||
"fields":[ {"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"},
|
||||
{"type":"fieldAccess","name":"rows","fieldName":"rows"} ]}
|
||||
],
|
||||
"intervals":["2012-10-01T00:00/2020-01-01T00"]
|
||||
}
|
|
@ -1,2 +0,0 @@
|
|||
CREATE TABLE index_storage ( key text, chunk text, value blob, PRIMARY KEY (key, chunk)) WITH COMPACT STORAGE;
|
||||
CREATE TABLE descriptor_storage ( key varchar, lastModified timestamp, descriptor varchar, PRIMARY KEY (key) ) WITH COMPACT STORAGE;
|
|
@ -1,5 +1,5 @@
|
|||
# Extensions
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.7.0","io.druid.extensions:druid-kafka-seven:0.7.0","io.druid.extensions:druid-rabbitmq:0.7.0", "io.druid.extensions:druid-s3-extensions:0.7.0"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.7.0-SNAPSHOT","io.druid.extensions:druid-kafka-seven:0.7.0-SNAPSHOT","io.druid.extensions:druid-rabbitmq:0.7.0-SNAPSHOT"]
|
||||
|
||||
# Zookeeper
|
||||
druid.zk.service.host=localhost
|
|
@ -2,16 +2,9 @@ druid.host=localhost
|
|||
druid.service=historical
|
||||
druid.port=8081
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.7.0"]
|
||||
|
||||
# Dummy read only AWS account (used to download example data)
|
||||
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
|
||||
druid.s3.accessKey=AKIAIMKECRUYKDQGR6YQ
|
||||
|
||||
druid.server.maxSize=10000000000
|
||||
|
||||
# Change these to make Druid faster
|
||||
druid.processing.buffer.sizeBytes=100000000
|
||||
druid.processing.numThreads=1
|
||||
|
||||
druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 10000000000}]
|
||||
druid.server.maxSize=10000000000
|
||||
|
|
|
@ -23,9 +23,7 @@ import com.fasterxml.jackson.databind.Module;
|
|||
import com.fasterxml.jackson.databind.jsontype.NamedType;
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
import com.google.inject.Binder;
|
||||
import io.druid.examples.rand.RandomFirehoseFactory;
|
||||
import io.druid.examples.twitter.TwitterSpritzerFirehoseFactory;
|
||||
import io.druid.examples.web.WebFirehoseFactory;
|
||||
import io.druid.initialization.DruidModule;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
@ -41,9 +39,7 @@ public class ExamplesDruidModule implements DruidModule
|
|||
return Arrays.<Module>asList(
|
||||
new SimpleModule("ExamplesModule")
|
||||
.registerSubtypes(
|
||||
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
|
||||
new NamedType(RandomFirehoseFactory.class, "rand"),
|
||||
new NamedType(WebFirehoseFactory.class, "webstream")
|
||||
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer")
|
||||
)
|
||||
);
|
||||
}
|
||||
|
|
|
@ -1,269 +0,0 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.examples.rand;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import io.druid.data.input.Firehose;
|
||||
import io.druid.data.input.FirehoseFactory;
|
||||
import io.druid.data.input.InputRow;
|
||||
import io.druid.data.input.MapBasedInputRow;
|
||||
import io.druid.data.input.impl.InputRowParser;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
|
||||
import static java.lang.Thread.sleep;
|
||||
|
||||
/**
|
||||
* Random value sequence Firehost Factory named "rand".
|
||||
* Builds a Firehose that emits a stream of random numbers (outColumn, a positive double)
|
||||
* with timestamps along with an associated token (target). This provides a timeseries
|
||||
* that requires no network access for demonstration, characterization, and testing.
|
||||
* The generated tuples can be thought of as asynchronously
|
||||
* produced triples (timestamp, outColumn, target) where the timestamp varies depending on
|
||||
* speed of processing.
|
||||
*
|
||||
* <p>
|
||||
* InputRows are produced as fast as requested, so this can be used to determine the
|
||||
* upper rate of ingest if sleepUsec is set to 0; nTokens specifies how many associated
|
||||
* target labels are used. Generation is round-robin for nTokens and sleep occurs
|
||||
* every nPerSleep values generated. A random number seed can be used by setting the
|
||||
* firehose parameter "seed" to a non-zero value so that values can be reproducible
|
||||
* (but note that timestamp is not deterministic because timestamps are obtained at
|
||||
* the moment an event is delivered.)
|
||||
* Values are offset by adding the modulus of the token number to the random number
|
||||
* so that token values have distinct, non-overlapping ranges.
|
||||
* </p>
|
||||
*
|
||||
* Example spec file:
|
||||
* <pre>
|
||||
* [{
|
||||
* "schema" : { "dataSource":"randseq",
|
||||
* "aggregators":[ {"type":"count", "name":"events"},
|
||||
* {"type":"doubleSum","name":"outColumn","fieldName":"inColumn"} ],
|
||||
* "indexGranularity":"minute",
|
||||
* "shardSpec" : { "type": "none" } },
|
||||
* "config" : { "maxRowsInMemory" : 50000,
|
||||
* "intermediatePersistPeriod" : "PT2m" },
|
||||
*
|
||||
* "firehose" : { "type" : "rand",
|
||||
* "sleepUsec": 100000,
|
||||
* "maxGeneratedRows" : 5000000,
|
||||
* "seed" : 0,
|
||||
* "nTokens" : 19,
|
||||
* "nPerSleep" : 3
|
||||
* },
|
||||
*
|
||||
* "plumber" : { "type" : "realtime",
|
||||
* "windowPeriod" : "PT5m",
|
||||
* "segmentGranularity":"hour",
|
||||
* "basePersistDirectory" : "/tmp/realtime/basePersist" }
|
||||
* }]
|
||||
* </pre>
|
||||
*
|
||||
* Example query using POST to /druid/v2/ (where UTC date and time MUST include the current hour):
|
||||
* <pre>
|
||||
* {
|
||||
* "queryType": "groupBy",
|
||||
* "dataSource": "randSeq",
|
||||
* "granularity": "all",
|
||||
* "dimensions": [],
|
||||
* "aggregations":[
|
||||
* { "type": "count", "name": "rows"},
|
||||
* { "type": "doubleSum", "fieldName": "events", "name": "e"},
|
||||
* { "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum"}
|
||||
* ],
|
||||
* "postAggregations":[
|
||||
* { "type":"arithmetic",
|
||||
* "name":"avg_random",
|
||||
* "fn":"/",
|
||||
* "fields":[ {"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"},
|
||||
* {"type":"fieldAccess","name":"rows","fieldName":"rows"} ]}
|
||||
* ],
|
||||
* "intervals":["2012-10-01T00:00/2020-01-01T00"]
|
||||
* }
|
||||
* </pre>
|
||||
*/
|
||||
@JsonTypeName("rand")
|
||||
public class RandomFirehoseFactory implements FirehoseFactory<InputRowParser>
|
||||
{
|
||||
private static final Logger log = new Logger(RandomFirehoseFactory.class);
|
||||
/**
|
||||
* msec to sleep before generating a new row; if this and delayNsec are 0, then go as fast as possible.
|
||||
* json param sleepUsec (microseconds) is used to initialize this.
|
||||
*/
|
||||
private final long delayMsec;
|
||||
/**
|
||||
* nsec to sleep before generating a new row; if this and delayMsec are 0, then go as fast as possible.
|
||||
* json param sleepUsec (microseconds) is used to initialize this.
|
||||
*/
|
||||
private final int delayNsec;
|
||||
/**
|
||||
* max rows to generate, -1 is infinite, 0 means nothing is generated; use this to prevent
|
||||
* infinite space consumption or to see what happens when a Firehose stops delivering
|
||||
* values, or to have hasMore() return false.
|
||||
*/
|
||||
private final long maxGeneratedRows;
|
||||
/**
|
||||
* seed for random number generator; if 0, then no seed is used.
|
||||
*/
|
||||
private final long seed;
|
||||
/**
|
||||
* number of tokens to randomly associate with values (no heap limits). This can be used to
|
||||
* stress test the number of tokens.
|
||||
*/
|
||||
private final int nTokens;
|
||||
/**
|
||||
* Number of token events per sleep interval.
|
||||
*/
|
||||
private final int nPerSleep;
|
||||
|
||||
@JsonCreator
|
||||
public RandomFirehoseFactory(
|
||||
@JsonProperty("sleepUsec") Long sleepUsec,
|
||||
@JsonProperty("maxGeneratedRows") Long maxGeneratedRows,
|
||||
@JsonProperty("seed") Long seed,
|
||||
@JsonProperty("nTokens") Integer nTokens,
|
||||
@JsonProperty("nPerSleep") Integer nPerSleep
|
||||
)
|
||||
{
|
||||
long nsec = (sleepUsec > 0) ? sleepUsec * 1000L : 0;
|
||||
long msec = nsec / 1000000L;
|
||||
this.delayMsec = msec;
|
||||
this.delayNsec = (int) (nsec - (msec * 1000000L));
|
||||
this.maxGeneratedRows = maxGeneratedRows;
|
||||
this.seed = seed;
|
||||
this.nTokens = nTokens;
|
||||
this.nPerSleep = nPerSleep;
|
||||
if (nTokens <= 0) {
|
||||
log.warn("nTokens parameter " + nTokens + " ignored; must be greater than or equal to 1");
|
||||
nTokens = 1;
|
||||
}
|
||||
if (nPerSleep <= 0) {
|
||||
log.warn("nPerSleep parameter " + nPerSleep + " ignored; must be greater than or equal to 1");
|
||||
nPerSleep = 1;
|
||||
}
|
||||
log.info("maxGeneratedRows=" + maxGeneratedRows);
|
||||
log.info("seed=" + ((seed == 0L) ? "random value" : seed));
|
||||
log.info("nTokens=" + nTokens);
|
||||
log.info("nPerSleep=" + nPerSleep);
|
||||
double dmsec = (double) delayMsec + ((double) this.delayNsec) / 1000000.;
|
||||
if (dmsec > 0.0) {
|
||||
log.info("sleep period=" + dmsec + "msec");
|
||||
log.info(
|
||||
"approximate max rate of record generation=" + (nPerSleep * 1000. / dmsec) + "/sec" +
|
||||
" or " + (60. * nPerSleep * 1000. / dmsec) + "/minute"
|
||||
);
|
||||
} else {
|
||||
log.info("sleep period= NONE");
|
||||
log.info("approximate max rate of record generation= as fast as possible");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Firehose connect(InputRowParser parser) throws IOException
|
||||
{
|
||||
final LinkedList<String> dimensions = new LinkedList<String>();
|
||||
dimensions.add("inColumn");
|
||||
dimensions.add("target");
|
||||
|
||||
return new Firehose()
|
||||
{
|
||||
private final java.util.Random rand = (seed == 0L) ? new Random() : new Random(seed);
|
||||
|
||||
private long rowCount = 0L;
|
||||
private boolean waitIfmaxGeneratedRows = true;
|
||||
|
||||
@Override
|
||||
public boolean hasMore()
|
||||
{
|
||||
if (maxGeneratedRows >= 0 && rowCount >= maxGeneratedRows) {
|
||||
return waitIfmaxGeneratedRows;
|
||||
} else {
|
||||
return true; // there are always more random numbers
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputRow nextRow()
|
||||
{
|
||||
final long modulus = rowCount % nPerSleep;
|
||||
final long nth = (rowCount % nTokens) + 1;
|
||||
long sleepMsec = delayMsec;
|
||||
// all done?
|
||||
if (maxGeneratedRows >= 0 && rowCount >= maxGeneratedRows && waitIfmaxGeneratedRows) {
|
||||
// sleep a long time instead of terminating
|
||||
sleepMsec = 2000000000L;
|
||||
}
|
||||
if (sleepMsec > 0L || delayNsec > 0) {
|
||||
try {
|
||||
if (modulus == 0) {
|
||||
sleep(sleepMsec, delayNsec);
|
||||
}
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
throw new RuntimeException("InterruptedException");
|
||||
}
|
||||
}
|
||||
if (++rowCount % 1000 == 0) {
|
||||
log.info("%,d events created.", rowCount);
|
||||
}
|
||||
|
||||
final Map<String, Object> theMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
|
||||
theMap.put("inColumn", anotherRand((int) nth));
|
||||
theMap.put("target", ("a" + nth));
|
||||
return new MapBasedInputRow(System.currentTimeMillis(), dimensions, theMap);
|
||||
}
|
||||
|
||||
private Float anotherRand(int scale)
|
||||
{
|
||||
double f = rand.nextDouble(); // [0.0,1.0]
|
||||
return new Float(f + (double) scale);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Runnable commit()
|
||||
{
|
||||
// Do nothing.
|
||||
return new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
// do nothing
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
|
@ -1,133 +0,0 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.examples.web;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonParseException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.JsonMappingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.io.InputSupplier;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class InputSupplierUpdateStream implements UpdateStream
|
||||
{
|
||||
private static final EmittingLogger log = new EmittingLogger(InputSupplierUpdateStream.class);
|
||||
private static final long queueWaitTime = 15L;
|
||||
private final TypeReference<HashMap<String, Object>> typeRef;
|
||||
private final InputSupplier<BufferedReader> supplier;
|
||||
private final int QUEUE_SIZE = 10000;
|
||||
private final BlockingQueue<Map<String, Object>> queue = new ArrayBlockingQueue<Map<String, Object>>(QUEUE_SIZE);
|
||||
private final ObjectMapper mapper = new DefaultObjectMapper();
|
||||
private final String timeDimension;
|
||||
private final Thread addToQueueThread;
|
||||
|
||||
public InputSupplierUpdateStream(
|
||||
final InputSupplier<BufferedReader> supplier,
|
||||
final String timeDimension
|
||||
)
|
||||
{
|
||||
addToQueueThread = new Thread()
|
||||
{
|
||||
public void run()
|
||||
{
|
||||
while (!isInterrupted()) {
|
||||
try {
|
||||
BufferedReader reader = supplier.getInput();
|
||||
String line;
|
||||
while ((line = reader.readLine()) != null) {
|
||||
if (isValid(line)) {
|
||||
HashMap<String, Object> map = mapper.readValue(line, typeRef);
|
||||
if (map.get(timeDimension) != null) {
|
||||
queue.offer(map, queueWaitTime, TimeUnit.SECONDS);
|
||||
log.debug("Successfully added to queue");
|
||||
} else {
|
||||
log.info("missing timestamp");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
catch (InterruptedException e){
|
||||
log.info(e, "Thread adding events to the queue interrupted");
|
||||
return;
|
||||
}
|
||||
catch (JsonMappingException e) {
|
||||
log.info(e, "Error in converting json to map");
|
||||
}
|
||||
catch (JsonParseException e) {
|
||||
log.info(e, "Error in parsing json");
|
||||
}
|
||||
catch (IOException e) {
|
||||
log.info(e, "Error in connecting to InputStream");
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
addToQueueThread.setDaemon(true);
|
||||
|
||||
this.supplier = supplier;
|
||||
this.typeRef = new TypeReference<HashMap<String, Object>>()
|
||||
{
|
||||
};
|
||||
this.timeDimension = timeDimension;
|
||||
}
|
||||
|
||||
private boolean isValid(String s)
|
||||
{
|
||||
return !(s.isEmpty());
|
||||
}
|
||||
|
||||
public void start()
|
||||
{
|
||||
addToQueueThread.start();
|
||||
|
||||
}
|
||||
|
||||
public void stop()
|
||||
{
|
||||
addToQueueThread.interrupt();
|
||||
}
|
||||
|
||||
|
||||
public Map<String, Object> pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException
|
||||
{
|
||||
return queue.poll(waitTime, unit);
|
||||
}
|
||||
|
||||
public int getQueueSize()
|
||||
{
|
||||
return queue.size();
|
||||
}
|
||||
|
||||
public String getTimeDimension()
|
||||
{
|
||||
return timeDimension;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,42 +0,0 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.examples.web;
|
||||
|
||||
import com.google.common.io.InputSupplier;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
|
||||
public class InputSupplierUpdateStreamFactory implements UpdateStreamFactory
|
||||
{
|
||||
private final InputSupplier<BufferedReader> inputSupplier;
|
||||
private final String timeDimension;
|
||||
|
||||
public InputSupplierUpdateStreamFactory(InputSupplier<BufferedReader> inputSupplier, String timeDimension)
|
||||
{
|
||||
this.inputSupplier = inputSupplier;
|
||||
this.timeDimension = timeDimension;
|
||||
}
|
||||
|
||||
public InputSupplierUpdateStream build()
|
||||
{
|
||||
return new InputSupplierUpdateStream(inputSupplier, timeDimension);
|
||||
}
|
||||
|
||||
}
|
|
@ -1,81 +0,0 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.examples.web;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class RenamingKeysUpdateStream implements UpdateStream
|
||||
{
|
||||
|
||||
private final InputSupplierUpdateStream updateStream;
|
||||
private Map<String, String> renamedDimensions;
|
||||
|
||||
public RenamingKeysUpdateStream(
|
||||
InputSupplierUpdateStream updateStream,
|
||||
Map<String, String> renamedDimensions
|
||||
)
|
||||
{
|
||||
this.renamedDimensions = renamedDimensions;
|
||||
this.updateStream = updateStream;
|
||||
}
|
||||
|
||||
public Map<String, Object> pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException
|
||||
{
|
||||
return renameKeys(updateStream.pollFromQueue(waitTime, unit));
|
||||
}
|
||||
|
||||
|
||||
private Map<String, Object> renameKeys(Map<String, Object> update)
|
||||
{
|
||||
if (renamedDimensions != null) {
|
||||
Map<String, Object> renamedMap = Maps.newHashMap();
|
||||
for (String key : renamedDimensions.keySet()) {
|
||||
if (update.get(key) != null) {
|
||||
Object obj = update.get(key);
|
||||
renamedMap.put(renamedDimensions.get(key), obj);
|
||||
}
|
||||
}
|
||||
return renamedMap;
|
||||
} else {
|
||||
return update;
|
||||
}
|
||||
}
|
||||
|
||||
public String getTimeDimension()
|
||||
{
|
||||
if (renamedDimensions != null && renamedDimensions.get(updateStream.getTimeDimension()) != null) {
|
||||
return renamedDimensions.get(updateStream.getTimeDimension());
|
||||
}
|
||||
return updateStream.getTimeDimension();
|
||||
|
||||
}
|
||||
|
||||
public void start()
|
||||
{
|
||||
updateStream.start();
|
||||
}
|
||||
|
||||
public void stop(){
|
||||
updateStream.stop();
|
||||
}
|
||||
}
|
|
@ -1,39 +0,0 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.examples.web;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class RenamingKeysUpdateStreamFactory implements UpdateStreamFactory
|
||||
{
|
||||
private InputSupplierUpdateStreamFactory updateStreamFactory;
|
||||
private Map<String, String> renamedDimensions;
|
||||
|
||||
public RenamingKeysUpdateStreamFactory(InputSupplierUpdateStreamFactory updateStreamFactory, Map<String, String> renamedDimensions)
|
||||
{
|
||||
this.updateStreamFactory = updateStreamFactory;
|
||||
this.renamedDimensions = renamedDimensions;
|
||||
}
|
||||
|
||||
public RenamingKeysUpdateStream build()
|
||||
{
|
||||
return new RenamingKeysUpdateStream(updateStreamFactory.build(), renamedDimensions);
|
||||
}
|
||||
}
|
|
@ -1,31 +0,0 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
package io.druid.examples.web;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public interface UpdateStream
|
||||
{
|
||||
public Map<String, Object> pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException;
|
||||
public String getTimeDimension();
|
||||
public void start();
|
||||
public void stop();
|
||||
|
||||
}
|
|
@ -1,24 +0,0 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
package io.druid.examples.web;
|
||||
|
||||
public interface UpdateStreamFactory
|
||||
{
|
||||
public UpdateStream build();
|
||||
}
|
|
@ -1,136 +0,0 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.examples.web;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.metamx.common.parsers.TimestampParser;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import io.druid.data.input.Firehose;
|
||||
import io.druid.data.input.FirehoseFactory;
|
||||
import io.druid.data.input.InputRow;
|
||||
import io.druid.data.input.MapBasedInputRow;
|
||||
import io.druid.data.input.impl.InputRowParser;
|
||||
import io.druid.utils.Runnables;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@JsonTypeName("webstream")
|
||||
public class WebFirehoseFactory implements FirehoseFactory<InputRowParser>
|
||||
{
|
||||
private static final EmittingLogger log = new EmittingLogger(WebFirehoseFactory.class);
|
||||
|
||||
private final String timeFormat;
|
||||
private final UpdateStreamFactory factory;
|
||||
private final long queueWaitTime = 15L;
|
||||
|
||||
@JsonCreator
|
||||
public WebFirehoseFactory(
|
||||
@JsonProperty("url") String url,
|
||||
@JsonProperty("renamedDimensions") Map<String, String> renamedDimensions,
|
||||
@JsonProperty("timeDimension") String timeDimension,
|
||||
@JsonProperty("timeFormat") String timeFormat
|
||||
)
|
||||
{
|
||||
this(
|
||||
new RenamingKeysUpdateStreamFactory(
|
||||
new InputSupplierUpdateStreamFactory(new WebJsonSupplier(url), timeDimension),
|
||||
renamedDimensions
|
||||
), timeFormat
|
||||
);
|
||||
}
|
||||
|
||||
public WebFirehoseFactory(UpdateStreamFactory factory, String timeFormat)
|
||||
{
|
||||
this.factory = factory;
|
||||
if (timeFormat == null) {
|
||||
this.timeFormat = "auto";
|
||||
} else {
|
||||
this.timeFormat = timeFormat;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Firehose connect(InputRowParser parser) throws IOException
|
||||
{
|
||||
|
||||
final UpdateStream updateStream = factory.build();
|
||||
updateStream.start();
|
||||
|
||||
return new Firehose()
|
||||
{
|
||||
Map<String, Object> map;
|
||||
private final Runnable doNothingRunnable = Runnables.getNoopRunnable();
|
||||
|
||||
@Override
|
||||
public boolean hasMore()
|
||||
{
|
||||
try {
|
||||
map = updateStream.pollFromQueue(queueWaitTime, TimeUnit.SECONDS);
|
||||
return map != null;
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public InputRow nextRow()
|
||||
{
|
||||
try {
|
||||
DateTime date = TimestampParser.createTimestampParser(timeFormat)
|
||||
.apply(map.get(updateStream.getTimeDimension()).toString());
|
||||
return new MapBasedInputRow(
|
||||
date.getMillis(),
|
||||
new ArrayList(map.keySet()),
|
||||
map
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
finally {
|
||||
map = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Runnable commit()
|
||||
{
|
||||
// ephemera in, ephemera out.
|
||||
return doNothingRunnable; // reuse the same object each time
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
updateStream.stop();
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
}
|
|
@ -1,61 +0,0 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.examples.web;
|
||||
|
||||
import com.google.api.client.repackaged.com.google.common.base.Throwables;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.io.InputSupplier;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import org.apache.commons.validator.routines.UrlValidator;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.net.URL;
|
||||
import java.net.URLConnection;
|
||||
|
||||
public class WebJsonSupplier implements InputSupplier<BufferedReader>
|
||||
{
|
||||
private static final EmittingLogger log = new EmittingLogger(WebJsonSupplier.class);
|
||||
private static final UrlValidator urlValidator = new UrlValidator();
|
||||
|
||||
private URL url;
|
||||
|
||||
public WebJsonSupplier(String urlString)
|
||||
{
|
||||
Preconditions.checkState(urlValidator.isValid(urlString));
|
||||
|
||||
try {
|
||||
this.url = new URL(urlString);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public BufferedReader getInput() throws IOException
|
||||
{
|
||||
URLConnection connection = url.openConnection();
|
||||
connection.setDoInput(true);
|
||||
return new BufferedReader(new InputStreamReader(url.openStream(), Charsets.UTF_8));
|
||||
}
|
||||
}
|
|
@ -1,114 +0,0 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.examples.web;
|
||||
|
||||
import com.google.common.io.InputSupplier;
|
||||
import junit.framework.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class InputSupplierUpdateStreamTest
|
||||
{
|
||||
private final long waitTime = 1L;
|
||||
private final TimeUnit unit = TimeUnit.SECONDS;
|
||||
private final ArrayList<String> dimensions = new ArrayList<String>();
|
||||
private InputSupplier testCaseSupplier;
|
||||
Map<String, Object> expectedAnswer = new HashMap<String, Object>();
|
||||
String timeDimension;
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
timeDimension = "time";
|
||||
testCaseSupplier = new TestCaseSupplier(
|
||||
"{\"item1\": \"value1\","
|
||||
+ "\"item2\":2,"
|
||||
+ "\"time\":1372121562 }"
|
||||
);
|
||||
|
||||
dimensions.add("item1");
|
||||
dimensions.add("item2");
|
||||
dimensions.add("time");
|
||||
|
||||
expectedAnswer.put("item1", "value1");
|
||||
expectedAnswer.put("item2", 2);
|
||||
expectedAnswer.put("time", 1372121562);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void basicIngestionCheck() throws Exception
|
||||
{
|
||||
InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(
|
||||
testCaseSupplier,
|
||||
timeDimension
|
||||
);
|
||||
updateStream.start();
|
||||
Map<String, Object> insertedRow = updateStream.pollFromQueue(waitTime, unit);
|
||||
Assert.assertEquals(expectedAnswer, insertedRow);
|
||||
updateStream.stop();
|
||||
}
|
||||
|
||||
//If a timestamp is missing, we should throw away the event
|
||||
@Test
|
||||
public void missingTimeStampCheck()
|
||||
{
|
||||
testCaseSupplier = new TestCaseSupplier(
|
||||
"{\"item1\": \"value1\","
|
||||
+ "\"item2\":2}"
|
||||
);
|
||||
|
||||
InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(
|
||||
testCaseSupplier,
|
||||
timeDimension
|
||||
);
|
||||
updateStream.start();
|
||||
Assert.assertEquals(updateStream.getQueueSize(), 0);
|
||||
updateStream.stop();
|
||||
}
|
||||
|
||||
//If any other value is missing, we should still add the event and process it properly
|
||||
@Test
|
||||
public void otherNullValueCheck() throws Exception
|
||||
{
|
||||
testCaseSupplier = new TestCaseSupplier(
|
||||
"{\"item1\": \"value1\","
|
||||
+ "\"time\":1372121562 }"
|
||||
);
|
||||
InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(
|
||||
testCaseSupplier,
|
||||
timeDimension
|
||||
);
|
||||
updateStream.start();
|
||||
Map<String, Object> insertedRow = updateStream.pollFromQueue(waitTime, unit);
|
||||
Map<String, Object> expectedAnswer = new HashMap<String, Object>();
|
||||
expectedAnswer.put("item1", "value1");
|
||||
expectedAnswer.put("time", 1372121562);
|
||||
Assert.assertEquals(expectedAnswer, insertedRow);
|
||||
updateStream.stop();
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -1,91 +0,0 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.examples.web;
|
||||
|
||||
import com.google.common.io.InputSupplier;
|
||||
import junit.framework.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class RenamingKeysUpdateStreamTest
|
||||
{
|
||||
private final long waitTime = 15L;
|
||||
private final TimeUnit unit = TimeUnit.SECONDS;
|
||||
private InputSupplier testCaseSupplier;
|
||||
String timeDimension;
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
timeDimension = "time";
|
||||
testCaseSupplier = new TestCaseSupplier(
|
||||
"{\"item1\": \"value1\","
|
||||
+ "\"item2\":2,"
|
||||
+ "\"time\":1372121562 }"
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPolFromQueue() throws Exception
|
||||
{
|
||||
InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(testCaseSupplier, timeDimension);
|
||||
Map<String, String> renamedKeys = new HashMap<String, String>();
|
||||
renamedKeys.put("item1", "i1");
|
||||
renamedKeys.put("item2", "i2");
|
||||
renamedKeys.put("time", "t");
|
||||
|
||||
RenamingKeysUpdateStream renamer = new RenamingKeysUpdateStream(updateStream, renamedKeys);
|
||||
renamer.start();
|
||||
Map<String, Object> expectedAnswer = new HashMap<String, Object>();
|
||||
expectedAnswer.put("i1", "value1");
|
||||
expectedAnswer.put("i2", 2);
|
||||
expectedAnswer.put("t", 1372121562);
|
||||
Assert.assertEquals(expectedAnswer, renamer.pollFromQueue(waitTime, unit));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetTimeDimension() throws Exception
|
||||
{
|
||||
InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(testCaseSupplier, timeDimension);
|
||||
Map<String, String> renamedKeys = new HashMap<String, String>();
|
||||
renamedKeys.put("item1", "i1");
|
||||
renamedKeys.put("item2", "i2");
|
||||
renamedKeys.put("time", "t");
|
||||
|
||||
RenamingKeysUpdateStream renamer = new RenamingKeysUpdateStream(updateStream, renamedKeys);
|
||||
Assert.assertEquals("t", renamer.getTimeDimension());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMissingTimeRename() throws Exception
|
||||
{
|
||||
InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(testCaseSupplier, timeDimension);
|
||||
Map<String, String> renamedKeys = new HashMap<String, String>();
|
||||
renamedKeys.put("item1", "i1");
|
||||
renamedKeys.put("item2", "i2");
|
||||
RenamingKeysUpdateStream renamer = new RenamingKeysUpdateStream(updateStream, renamedKeys);
|
||||
Assert.assertEquals("time", renamer.getTimeDimension());
|
||||
}
|
||||
|
||||
}
|
|
@ -1,42 +0,0 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.examples.web;
|
||||
|
||||
import com.google.common.io.InputSupplier;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.StringReader;
|
||||
|
||||
public class TestCaseSupplier implements InputSupplier<BufferedReader>
|
||||
{
|
||||
private final String testString;
|
||||
|
||||
public TestCaseSupplier(String testString)
|
||||
{
|
||||
this.testString = testString;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BufferedReader getInput() throws IOException
|
||||
{
|
||||
return new BufferedReader(new StringReader(testString));
|
||||
}
|
||||
}
|
|
@ -1,223 +0,0 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.examples.web;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import io.druid.data.input.Firehose;
|
||||
import io.druid.data.input.InputRow;
|
||||
import org.joda.time.DateTime;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class WebFirehoseFactoryTest
|
||||
{
|
||||
private List<String> dimensions = Lists.newArrayList();
|
||||
private WebFirehoseFactory webbie;
|
||||
private WebFirehoseFactory webbie1;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception
|
||||
{
|
||||
dimensions.add("item1");
|
||||
dimensions.add("item2");
|
||||
dimensions.add("time");
|
||||
webbie = new WebFirehoseFactory(
|
||||
new UpdateStreamFactory()
|
||||
{
|
||||
@Override
|
||||
public UpdateStream build()
|
||||
{
|
||||
return new MyUpdateStream(ImmutableMap.<String,Object>of("item1", "value1", "item2", 2, "time", "1372121562"));
|
||||
}
|
||||
},
|
||||
"posix"
|
||||
);
|
||||
|
||||
webbie1 = new WebFirehoseFactory(
|
||||
new UpdateStreamFactory()
|
||||
{
|
||||
@Override
|
||||
public UpdateStream build()
|
||||
{
|
||||
return new MyUpdateStream(ImmutableMap.<String,Object>of("item1", "value1", "item2", 2, "time", "1373241600000"));
|
||||
}
|
||||
},
|
||||
"auto"
|
||||
);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDimensions() throws Exception
|
||||
{
|
||||
InputRow inputRow;
|
||||
Firehose firehose = webbie.connect(null);
|
||||
if (firehose.hasMore()) {
|
||||
inputRow = firehose.nextRow();
|
||||
} else {
|
||||
throw new RuntimeException("queue is empty");
|
||||
}
|
||||
List<String> actualAnswer = inputRow.getDimensions();
|
||||
Collections.sort(actualAnswer);
|
||||
Assert.assertEquals(actualAnswer, dimensions);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPosixTimeStamp() throws Exception
|
||||
{
|
||||
InputRow inputRow;
|
||||
Firehose firehose = webbie.connect(null);
|
||||
if (firehose.hasMore()) {
|
||||
inputRow = firehose.nextRow();
|
||||
} else {
|
||||
throw new RuntimeException("queue is empty");
|
||||
}
|
||||
long expectedTime = 1372121562L * 1000L;
|
||||
Assert.assertEquals(expectedTime, inputRow.getTimestampFromEpoch());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testISOTimeStamp() throws Exception
|
||||
{
|
||||
WebFirehoseFactory webbie3 = new WebFirehoseFactory(
|
||||
new UpdateStreamFactory()
|
||||
{
|
||||
@Override
|
||||
public UpdateStream build()
|
||||
{
|
||||
return new MyUpdateStream(ImmutableMap.<String,Object>of("item1", "value1", "item2", 2, "time", "2013-07-08"));
|
||||
}
|
||||
},
|
||||
"auto"
|
||||
);
|
||||
Firehose firehose1 = webbie3.connect(null);
|
||||
if (firehose1.hasMore()) {
|
||||
long milliSeconds = firehose1.nextRow().getTimestampFromEpoch();
|
||||
DateTime date = new DateTime("2013-07-08");
|
||||
Assert.assertEquals(date.getMillis(), milliSeconds);
|
||||
} else {
|
||||
Assert.assertFalse("hasMore returned false", true);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAutoIsoTimeStamp() throws Exception
|
||||
{
|
||||
WebFirehoseFactory webbie2 = new WebFirehoseFactory(
|
||||
new UpdateStreamFactory()
|
||||
{
|
||||
@Override
|
||||
public UpdateStream build()
|
||||
{
|
||||
return new MyUpdateStream(ImmutableMap.<String,Object>of("item1", "value1", "item2", 2, "time", "2013-07-08"));
|
||||
}
|
||||
},
|
||||
null
|
||||
);
|
||||
Firehose firehose2 = webbie2.connect(null);
|
||||
if (firehose2.hasMore()) {
|
||||
long milliSeconds = firehose2.nextRow().getTimestampFromEpoch();
|
||||
DateTime date = new DateTime("2013-07-08");
|
||||
Assert.assertEquals(date.getMillis(), milliSeconds);
|
||||
} else {
|
||||
Assert.assertFalse("hasMore returned false", true);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAutoMilliSecondsTimeStamp() throws Exception
|
||||
{
|
||||
Firehose firehose3 = webbie1.connect(null);
|
||||
if (firehose3.hasMore()) {
|
||||
long milliSeconds = firehose3.nextRow().getTimestampFromEpoch();
|
||||
DateTime date = new DateTime("2013-07-08");
|
||||
Assert.assertEquals(date.getMillis(), milliSeconds);
|
||||
} else {
|
||||
Assert.assertFalse("hasMore returned false", true);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetDimension() throws Exception
|
||||
{
|
||||
InputRow inputRow;
|
||||
Firehose firehose = webbie1.connect(null);
|
||||
if (firehose.hasMore()) {
|
||||
inputRow = firehose.nextRow();
|
||||
} else {
|
||||
throw new RuntimeException("queue is empty");
|
||||
}
|
||||
|
||||
List<String> column1 = Lists.newArrayList();
|
||||
column1.add("value1");
|
||||
Assert.assertEquals(column1, inputRow.getDimension("item1"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetFloatMetric() throws Exception
|
||||
{
|
||||
InputRow inputRow;
|
||||
Firehose firehose = webbie1.connect(null);
|
||||
if (firehose.hasMore()) {
|
||||
inputRow = firehose.nextRow();
|
||||
} else {
|
||||
throw new RuntimeException("queue is empty");
|
||||
}
|
||||
|
||||
Assert.assertEquals((float) 2.0, inputRow.getFloatMetric("item2"), 0.0f);
|
||||
}
|
||||
|
||||
private static class MyUpdateStream implements UpdateStream
|
||||
{
|
||||
private static ImmutableMap<String,Object> map;
|
||||
public MyUpdateStream(ImmutableMap<String,Object> map){
|
||||
this.map=map;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException
|
||||
{
|
||||
return map;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTimeDimension()
|
||||
{
|
||||
return "time";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start()
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop()
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,32 +0,0 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.examples.web;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
public class WebJsonSupplierTest
|
||||
{
|
||||
@Test(expected = IllegalStateException.class)
|
||||
public void checkInvalidUrl() throws Exception
|
||||
{
|
||||
String invalidURL = "http://invalid.url.";
|
||||
WebJsonSupplier supplier = new WebJsonSupplier(invalidURL);
|
||||
}
|
||||
}
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue