diff --git a/examples/src/main/java/druid/examples/flights/FlightsConverter.java b/examples/src/main/java/druid/examples/flights/FlightsConverter.java index 730eea0bd92..53ea6638e60 100644 --- a/examples/src/main/java/druid/examples/flights/FlightsConverter.java +++ b/examples/src/main/java/druid/examples/flights/FlightsConverter.java @@ -114,7 +114,8 @@ public class FlightsConverter if (value.equals("NA")) { event.put(metricDimension, 0); - } else { + } + else { event.put(metricDimension, Integer.parseInt(value)); } } diff --git a/examples/src/main/java/druid/examples/flights/FlightsFirehoseFactory.java b/examples/src/main/java/druid/examples/flights/FlightsFirehoseFactory.java index 07eb856425e..66b8de4450c 100644 --- a/examples/src/main/java/druid/examples/flights/FlightsFirehoseFactory.java +++ b/examples/src/main/java/druid/examples/flights/FlightsFirehoseFactory.java @@ -80,7 +80,8 @@ public class FlightsFirehoseFactory implements FirehoseFactory try { if (line != null) { return true; - } else if (in != null) { + } + else if (in != null) { line = in.readLine(); if (line == null) { @@ -89,14 +90,16 @@ public class FlightsFirehoseFactory implements FirehoseFactory } return true; - } else if (files.hasNext()) { + } + else if (files.hasNext()) { final File nextFile = files.next(); if (nextFile.getName().endsWith(".gz")) { in = new BufferedReader( new InputStreamReader(new GZIPInputStream(new FileInputStream(nextFile)), Charsets.UTF_8) ); - } else { + } + else { in = new BufferedReader(new FileReader(nextFile)); } return hasMore(); diff --git a/examples/src/main/java/druid/examples/rand/RandomFirehoseFactory.java b/examples/src/main/java/druid/examples/rand/RandomFirehoseFactory.java index 54010e822ed..71a0c1fbb92 100644 --- a/examples/src/main/java/druid/examples/rand/RandomFirehoseFactory.java +++ b/examples/src/main/java/druid/examples/rand/RandomFirehoseFactory.java @@ -35,89 +35,81 @@ import static java.lang.Thread.sleep; * the moment an event is delivered.) * Values are offset by adding the modulus of the token number to the random number * so that token values have distinct, non-overlapping ranges. - *

+ * *

* Example spec file: *
- * [{
- * "schema" : { "dataSource":"randseq",
- * "aggregators":[ {"type":"count", "name":"events"},
- * {"type":"doubleSum","name":"outColumn","fieldName":"inColumn"} ],
- * "indexGranularity":"minute",
- * "shardSpec" : { "type": "none" } },
- * "config" : { "maxRowsInMemory" : 50000,
- * "intermediatePersistPeriod" : "PT2m" },
- *
- * "firehose" : { "type" : "rand",
- * "sleepUsec": 100000,
- * "maxGeneratedRows" : 5000000,
- * "seed" : 0,
- * "nTokens" : 19,
- * "nPerSleep" : 3
- * },
- *
- * "plumber" : { "type" : "realtime",
- * "windowPeriod" : "PT5m",
- * "segmentGranularity":"hour",
- * "basePersistDirectory" : "/tmp/realtime/basePersist" }
- * }]
+ [{
+   "schema" : { "dataSource":"randseq",
+                "aggregators":[ {"type":"count", "name":"events"},
+ 	       		                    {"type":"doubleSum","name":"outColumn","fieldName":"inColumn"} ],
+                "indexGranularity":"minute",
+ 	              "shardSpec" : { "type": "none" } },
+   "config" : { "maxRowsInMemory" : 50000,
+                "intermediatePersistPeriod" : "PT2m" },
+
+   "firehose" : { "type" : "rand",
+                  "sleepUsec": 100000,
+                  "maxGeneratedRows" : 5000000,
+                  "seed" : 0,
+                  "nTokens" : 19,
+                  "nPerSleep" : 3
+                 },
+
+   "plumber" : { "type" : "realtime",
+                 "windowPeriod" : "PT5m",
+                 "segmentGranularity":"hour",
+                 "basePersistDirectory" : "/tmp/realtime/basePersist" }
+ }]
  * 
- *

+ * * Example query using POST to /druid/v2/ (where UTC date and time MUST include the current hour): *

- * {
- * "queryType": "groupBy",
- * "dataSource": "randSeq",
- * "granularity": "all",
- * "dimensions": [],
- * "aggregations":[
- * { "type": "count", "name": "rows"},
- * { "type": "doubleSum", "fieldName": "events", "name": "e"},
- * { "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum"}
- * ],
- * "postAggregations":[
- * {  "type":"arithmetic",
- * "name":"avg_random",
- * "fn":"/",
- * "fields":[ {"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"},
- * {"type":"fieldAccess","name":"rows","fieldName":"rows"} ]}
- * ],
- * "intervals":["2012-10-01T00:00/2020-01-01T00"]
- * }
+ {
+     "queryType": "groupBy",
+     "dataSource": "randSeq",
+     "granularity": "all",
+     "dimensions": [],
+     "aggregations":[
+     { "type": "count", "name": "rows"},
+     { "type": "doubleSum", "fieldName": "events", "name": "e"},
+     { "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum"}
+     ],
+     "postAggregations":[
+     {  "type":"arithmetic",
+        "name":"avg_random",
+        "fn":"/",
+        "fields":[ {"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"},
+                   {"type":"fieldAccess","name":"rows","fieldName":"rows"} ]}
+     ],
+     "intervals":["2012-10-01T00:00/2020-01-01T00"]
+ }
  * 
*/ @JsonTypeName("rand") public class RandomFirehoseFactory implements FirehoseFactory { private static final Logger log = new Logger(RandomFirehoseFactory.class); - /** - * msec to sleep before generating a new row; if this and delayNsec are 0, then go as fast as possible. + /** msec to sleep before generating a new row; if this and delayNsec are 0, then go as fast as possible. * json param sleepUsec (microseconds) is used to initialize this. */ private final long delayMsec; - /** - * nsec to sleep before generating a new row; if this and delayMsec are 0, then go as fast as possible. + /** nsec to sleep before generating a new row; if this and delayMsec are 0, then go as fast as possible. * json param sleepUsec (microseconds) is used to initialize this. - */ + */ private final int delayNsec; - /** - * max rows to generate, -1 is infinite, 0 means nothing is generated; use this to prevent - * infinite space consumption or to see what happens when a Firehose stops delivering - * values, or to have hasMore() return false. + /** max rows to generate, -1 is infinite, 0 means nothing is generated; use this to prevent + * infinite space consumption or to see what happens when a Firehose stops delivering + * values, or to have hasMore() return false. */ private final long maxGeneratedRows; - /** - * seed for random number generator; if 0, then no seed is used. - */ + /** seed for random number generator; if 0, then no seed is used. */ private final long seed; - /** - * number of tokens to randomly associate with values (no heap limits). This can be used to + /** number of tokens to randomly associate with values (no heap limits). This can be used to * stress test the number of tokens. */ private final int nTokens; - /** - * Number of token events per sleep interval. - */ + /** Number of token events per sleep interval. */ private final int nPerSleep; @JsonCreator @@ -132,30 +124,29 @@ public class RandomFirehoseFactory implements FirehoseFactory long nsec = (sleepUsec > 0) ? sleepUsec * 1000L : 0; long msec = nsec / 1000000L; this.delayMsec = msec; - this.delayNsec = (int) (nsec - (msec * 1000000L)); + this.delayNsec = (int)(nsec - (msec * 1000000L)); this.maxGeneratedRows = maxGeneratedRows; this.seed = seed; this.nTokens = nTokens; this.nPerSleep = nPerSleep; if (nTokens <= 0) { - log.warn("nTokens parameter " + nTokens + " ignored; must be greater than or equal to 1"); + log.warn("nTokens parameter " + nTokens +" ignored; must be greater than or equal to 1"); nTokens = 1; } if (nPerSleep <= 0) { - log.warn("nPerSleep parameter " + nPerSleep + " ignored; must be greater than or equal to 1"); + log.warn("nPerSleep parameter " + nPerSleep +" ignored; must be greater than or equal to 1"); nPerSleep = 1; } log.info("maxGeneratedRows=" + maxGeneratedRows); - log.info("seed=" + ((seed == 0L) ? "random value" : seed)); + log.info("seed=" + ( (seed == 0L) ? "random value" : seed )); log.info("nTokens=" + nTokens); log.info("nPerSleep=" + nPerSleep); - double dmsec = (double) delayMsec + ((double) this.delayNsec) / 1000000.; + double dmsec = (double)delayMsec + ((double)this.delayNsec)/1000000.; if (dmsec > 0.0) { log.info("sleep period=" + dmsec + "msec"); - log.info( - "approximate max rate of record generation=" + (nPerSleep * 1000. / dmsec) + "/sec" + - " or " + (60. * nPerSleep * 1000. / dmsec) + "/minute" - ); + log.info("approximate max rate of record generation=" + (nPerSleep * 1000./dmsec) + "/sec" + + " or " + (60. * nPerSleep * 1000./dmsec) + "/minute" + ); } else { log.info("sleep period= NONE"); log.info("approximate max rate of record generation= as fast as possible"); @@ -179,7 +170,7 @@ public class RandomFirehoseFactory implements FirehoseFactory @Override public boolean hasMore() { - if (maxGeneratedRows >= 0 && rowCount >= maxGeneratedRows) { + if (maxGeneratedRows >= 0 && rowCount >= maxGeneratedRows) { return waitIfmaxGeneratedRows; } else { return true; // there are always more random numbers @@ -193,7 +184,7 @@ public class RandomFirehoseFactory implements FirehoseFactory final long nth = (rowCount % nTokens) + 1; long sleepMsec = delayMsec; // all done? - if (maxGeneratedRows >= 0 && rowCount >= maxGeneratedRows && waitIfmaxGeneratedRows) { + if (maxGeneratedRows >= 0 && rowCount >= maxGeneratedRows && waitIfmaxGeneratedRows) { // sleep a long time instead of terminating sleepMsec = 2000000000L; } @@ -202,8 +193,7 @@ public class RandomFirehoseFactory implements FirehoseFactory if (modulus == 0) { sleep(sleepMsec, delayNsec); } - } - catch (InterruptedException e) { + } catch (InterruptedException e) { throw new RuntimeException("InterruptedException"); } } @@ -212,7 +202,7 @@ public class RandomFirehoseFactory implements FirehoseFactory } final Map theMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); - theMap.put("inColumn", anotherRand((int) nth)); + theMap.put("inColumn", anotherRand((int)nth)); theMap.put("target", ("a" + nth)); return new MapBasedInputRow(System.currentTimeMillis(), dimensions, theMap); } @@ -220,7 +210,7 @@ public class RandomFirehoseFactory implements FirehoseFactory private Float anotherRand(int scale) { double f = rand.nextDouble(); // [0.0,1.0] - return new Float(f + (double) scale); + return new Float(f + (double)scale); } @Override diff --git a/examples/src/main/java/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java b/examples/src/main/java/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java index 4b588ec826c..e87b5e44de2 100644 --- a/examples/src/main/java/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java +++ b/examples/src/main/java/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java @@ -11,13 +11,13 @@ import com.metamx.druid.realtime.firehose.Firehose; import com.metamx.druid.realtime.firehose.FirehoseFactory; import twitter4j.ConnectionLifeCycleListener; import twitter4j.HashtagEntity; -import twitter4j.StallWarning; import twitter4j.Status; import twitter4j.StatusDeletionNotice; import twitter4j.StatusListener; import twitter4j.TwitterStream; import twitter4j.TwitterStreamFactory; import twitter4j.User; +import twitter4j.StallWarning; import java.io.IOException; import java.util.Arrays; @@ -49,23 +49,21 @@ import static java.lang.Thread.sleep; * is UTC): *
  * 
- *

- *

+ * + * * Notes on twitter.com HTTP (REST) API: v1.0 will be disabled around 2013-03 so v1.1 should be used; * twitter4j 3.0 (not yet released) will support the v1.1 api. * Specifically, we should be using https://stream.twitter.com/1.1/statuses/sample.json * See: http://jira.twitter4j.org/browse/TFJ-186 - *

- * Notes on JSON parsing: as of twitter4j 2.2.x, the json parser has some bugs (ex: Status.toString() - * can have number format exceptions), so it might be necessary to extract raw json and process it - * separately. If so, set twitter4.jsonStoreEnabled=true and look at DataObjectFactory#getRawJSON(); - * com.fasterxml.jackson.databind.ObjectMapper should be used to parse. * + * Notes on JSON parsing: as of twitter4j 2.2.x, the json parser has some bugs (ex: Status.toString() + * can have number format exceptions), so it might be necessary to extract raw json and process it + * separately. If so, set twitter4.jsonStoreEnabled=true and look at DataObjectFactory#getRawJSON(); + * com.fasterxml.jackson.databind.ObjectMapper should be used to parse. * @author pbaclace */ @JsonTypeName("twitzer") -public class TwitterSpritzerFirehoseFactory implements FirehoseFactory -{ +public class TwitterSpritzerFirehoseFactory implements FirehoseFactory { private static final Logger log = new Logger(TwitterSpritzerFirehoseFactory.class); /** * max events to receive, -1 is infinite, 0 means nothing is delivered; use this to prevent @@ -96,8 +94,7 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory @Override public Firehose connect() throws IOException { - final ConnectionLifeCycleListener connectionLifeCycleListener = new ConnectionLifeCycleListener() - { + final ConnectionLifeCycleListener connectionLifeCycleListener = new ConnectionLifeCycleListener() { @Override public void onConnect() { @@ -137,8 +134,7 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory // twitterStream = new TwitterStreamFactory().getInstance(); twitterStream.addConnectionLifeCycleListener(connectionLifeCycleListener); - statusListener = new StatusListener() - { // This is what really gets called to deliver stuff from twitter4j + statusListener = new StatusListener() { // This is what really gets called to deliver stuff from twitter4j @Override public void onStatus(Status status) { @@ -151,8 +147,7 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory if (!success) { log.warn("queue too slow!"); } - } - catch (InterruptedException e) { + } catch (InterruptedException e) { throw new RuntimeException("InterruptedException", e); } } @@ -184,8 +179,7 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory } @Override - public void onStallWarning(StallWarning warning) - { + public void onStallWarning(StallWarning warning) { System.out.println("Got stall warning:" + warning); } }; @@ -194,11 +188,9 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory twitterStream.sample(); // creates a generic StatusStream log.info("returned from sample()"); - return new Firehose() - { + return new Firehose() { - private final Runnable doNothingRunnable = new Runnable() - { + private final Runnable doNothingRunnable = new Runnable() { public void run() { } @@ -248,8 +240,7 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory try { log.info("reached limit, sleeping a long time..."); sleep(2000000000L); - } - catch (InterruptedException e) { + } catch (InterruptedException e) { throw new RuntimeException("InterruptedException", e); } } else { @@ -263,8 +254,7 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory Status status; try { status = queue.take(); - } - catch (InterruptedException e) { + } catch (InterruptedException e) { throw new RuntimeException("InterruptedException", e); } diff --git a/examples/src/main/java/druid/examples/webStream/UpdateStream.java b/examples/src/main/java/druid/examples/webStream/UpdateStream.java index 3cb7e5ef27a..57d8fc67311 100644 --- a/examples/src/main/java/druid/examples/webStream/UpdateStream.java +++ b/examples/src/main/java/druid/examples/webStream/UpdateStream.java @@ -22,10 +22,12 @@ package druid.examples.webStream; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; +import com.google.common.collect.Maps; import com.google.common.io.InputSupplier; import com.metamx.emitter.EmittingLogger; import java.io.BufferedReader; +import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -39,11 +41,17 @@ public class UpdateStream implements Runnable private final InputSupplier supplier; private final BlockingQueue> queue; private final ObjectMapper mapper; + private final ArrayList dimensions; + private final ArrayList renamedDimensions; + private final String timeDimension; public UpdateStream( InputSupplier supplier, BlockingQueue> queue, - ObjectMapper mapper + ObjectMapper mapper, + ArrayList dimensions, + ArrayList renamedDimensions, + String timeDimension ) { this.supplier = supplier; @@ -52,6 +60,9 @@ public class UpdateStream implements Runnable this.typeRef = new TypeReference>() { }; + this.timeDimension = timeDimension; + this.dimensions = dimensions; + this.renamedDimensions = renamedDimensions; } private boolean isValid(String s) @@ -60,7 +71,7 @@ public class UpdateStream implements Runnable } @Override - public void run() throws RuntimeException + public void run() { try { BufferedReader reader = supplier.getInput(); @@ -68,8 +79,13 @@ public class UpdateStream implements Runnable while ((line = reader.readLine()) != null) { if (isValid(line)) { HashMap map = mapper.readValue(line, typeRef); - queue.offer(map, queueWaitTime, TimeUnit.SECONDS); - log.debug("Successfully added to queue"); + if (map.get(timeDimension) != null) { + Map renamedMap = renameKeys(map); + queue.offer(renamedMap, queueWaitTime, TimeUnit.SECONDS); + log.debug("Successfully added to queue"); + } else { + log.debug("missing timestamp"); + } } } } @@ -78,4 +94,17 @@ public class UpdateStream implements Runnable } } + + private Map renameKeys(Map update) + { + Map renamedMap = Maps.newHashMap(); + for (int iter = 0; iter < dimensions.size(); iter++) { + if (update.get(dimensions.get(iter)) != null) { + Object obj = update.get(dimensions.get(iter)); + renamedMap.put(renamedDimensions.get(iter), obj); + } + } + return renamedMap; + } + } diff --git a/examples/src/main/java/druid/examples/webStream/WebFirehoseFactory.java b/examples/src/main/java/druid/examples/webStream/WebFirehoseFactory.java index cebf2b7913c..3896fe30b99 100644 --- a/examples/src/main/java/druid/examples/webStream/WebFirehoseFactory.java +++ b/examples/src/main/java/druid/examples/webStream/WebFirehoseFactory.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Throwables; -import com.google.common.collect.Maps; import com.metamx.common.parsers.TimestampParser; import com.metamx.druid.guava.Runnables; import com.metamx.druid.input.InputRow; @@ -35,7 +34,7 @@ import com.metamx.emitter.EmittingLogger; import org.joda.time.DateTime; import java.io.IOException; -import java.util.List; +import java.util.ArrayList; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -48,23 +47,26 @@ public class WebFirehoseFactory implements FirehoseFactory private static final EmittingLogger log = new EmittingLogger(WebFirehoseFactory.class); private static final int QUEUE_SIZE = 2000; private final String url; - private final List dimensions; + private final ArrayList dimensions; private final String timeDimension; - private final List renamedDimensions; + private final ArrayList renamedDimensions; + private final String timeFormat; @JsonCreator public WebFirehoseFactory( @JsonProperty("url") String url, - @JsonProperty("dimensions") List dimensions, - @JsonProperty("renamedDimensions") List renamedDimensions, - @JsonProperty("timeDimension") String timeDimension + @JsonProperty("dimensions") ArrayList dimensions, + @JsonProperty("renamedDimensions") ArrayList renamedDimensions, + @JsonProperty("timeDimension") String timeDimension, + @JsonProperty("timeFormat") String timeFormat ) { this.url = url; this.dimensions = dimensions; this.renamedDimensions = renamedDimensions; this.timeDimension = timeDimension; + this.timeFormat = timeFormat; } @Override @@ -72,7 +74,14 @@ public class WebFirehoseFactory implements FirehoseFactory { final BlockingQueue> queue = new ArrayBlockingQueue>(QUEUE_SIZE); - Runnable updateStream = new UpdateStream(new WebJsonSupplier(dimensions, url), queue, new DefaultObjectMapper()); + Runnable updateStream = new UpdateStream( + new WebJsonSupplier(url), + queue, + new DefaultObjectMapper(), + dimensions, + renamedDimensions, + timeDimension + ); final ExecutorService service = Executors.newSingleThreadExecutor(); service.submit(updateStream); @@ -83,7 +92,7 @@ public class WebFirehoseFactory implements FirehoseFactory @Override public boolean hasMore() { - return !(service.isTerminated()); + return !(service.isTerminated()) && queue.size() > 0; } @@ -91,18 +100,14 @@ public class WebFirehoseFactory implements FirehoseFactory public InputRow nextRow() { try { - Map processedMap = processMap(queue.take()); - DateTime date = TimestampParser.createTimestampParser("auto") - .apply(processedMap.get(timeDimension).toString()); + Map map = queue.take(); + DateTime date = TimestampParser.createTimestampParser(timeFormat) + .apply(map.get(timeDimension).toString()); long seconds = (long) date.getMillis(); - //the parser doesn't check for posix. Only expects iso or millis. This checks for posix - if (new DateTime(seconds * 1000).getYear() == new DateTime().getYear()) { - seconds = (long) date.getMillis() * 1000; - } return new MapBasedInputRow( seconds, renamedDimensions, - processedMap + map ); } catch (Exception e) { @@ -110,28 +115,6 @@ public class WebFirehoseFactory implements FirehoseFactory } } - private Map renameKeys(Map update) - { - Map renamedMap = Maps.newHashMap(); - for (int iter = 0; iter < dimensions.size(); iter++) { - if (update.get(dimensions.get(iter)) != null) { - Object obj = update.get(dimensions.get(iter)); - renamedMap.put(renamedDimensions.get(iter), obj); - } - } - if (renamedMap.get(timeDimension) == null) { - renamedMap.put(timeDimension, System.currentTimeMillis()); - } - return renamedMap; - } - - private Map processMap(Map map) - { - Map renamedMap = renameKeys(map); - return renamedMap; - } - - @Override public Runnable commit() { diff --git a/examples/src/main/java/druid/examples/webStream/WebFirehoseFactoryTest.java b/examples/src/main/java/druid/examples/webStream/WebFirehoseFactoryTest.java index d14b7dd94e8..8011d52f5fd 100644 --- a/examples/src/main/java/druid/examples/webStream/WebFirehoseFactoryTest.java +++ b/examples/src/main/java/druid/examples/webStream/WebFirehoseFactoryTest.java @@ -1,36 +1,15 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - package druid.examples.webStream; import com.google.common.io.InputSupplier; -import com.metamx.druid.input.InputRow; import com.metamx.druid.jackson.DefaultObjectMapper; -import com.metamx.druid.realtime.firehose.Firehose; -import com.metamx.druid.realtime.firehose.FirehoseFactory; +import junit.framework.Assert; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -38,101 +17,38 @@ import java.util.concurrent.TimeUnit; public class WebFirehoseFactoryTest { - @Test - void testMalformedUrlConnect() throws Exception + private final ArrayList dimensions = new ArrayList(); + private InputSupplier testCaseSupplier; + final int QUEUE_SIZE = 2000; + BlockingQueue> queue; + String timeDimension = "t"; + DefaultObjectMapper mapper = new DefaultObjectMapper(); + Map expectedAnswer = new HashMap(); + + @BeforeClass + public void setUp() { - List dimensions = new LinkedList(); - dimensions.add("g"); - dimensions.add("c"); - dimensions.add("a"); - dimensions.add("cy"); - dimensions.add("l"); - dimensions.add("hh"); - dimensions.add("hc"); - dimensions.add("h"); - dimensions.add("u"); - dimensions.add("tz"); - dimensions.add("t"); - dimensions.add("r"); - dimensions.add("gr"); - dimensions.add("nk"); - dimensions.add("al"); - - String invalidURL = "http://invalid.url"; - FirehoseFactory test = new WebFirehoseFactory(invalidURL, dimensions, dimensions, "t"); - Firehose returnedFirehose = test.connect(); - Thread.sleep(3000); - assert returnedFirehose.hasMore() == false; - } - - - @Test - public void testUrlWithNoJsonStreamConnect() throws Exception - { - List dimensions = new LinkedList(); - dimensions.add("g"); - dimensions.add("c"); - dimensions.add("a"); - dimensions.add("cy"); - dimensions.add("l"); - dimensions.add("hh"); - dimensions.add("hc"); - dimensions.add("h"); - dimensions.add("u"); - dimensions.add("tz"); - dimensions.add("t"); - dimensions.add("r"); - dimensions.add("gr"); - dimensions.add("nk"); - dimensions.add("al"); - - String nonJsonUrl = "http://google.com"; - FirehoseFactory test = new WebFirehoseFactory(nonJsonUrl, dimensions, dimensions, "t"); - Firehose returnedFirehose = test.connect(); - Thread.sleep(3000); - assert returnedFirehose.hasMore() == false; - } - - @Test - public void correctUrlCheck() throws Exception - { - List dimensions = new LinkedList(); - dimensions.add("g"); - dimensions.add("c"); - dimensions.add("a"); - dimensions.add("cy"); - dimensions.add("l"); - dimensions.add("hh"); - dimensions.add("hc"); - dimensions.add("h"); - dimensions.add("u"); - dimensions.add("tz"); - dimensions.add("t"); - dimensions.add("r"); - dimensions.add("gr"); - dimensions.add("nk"); - dimensions.add("al"); - - String url = "http://developer.usa.gov/1usagov"; - FirehoseFactory test = new WebFirehoseFactory(url, dimensions, dimensions, "t"); - Firehose returnedFirehose = test.connect(); - Thread.sleep(3000); - assert returnedFirehose.hasMore() == true; - } - - - @Test - public void basicIngestionCheck() throws Exception - { - final int QUEUE_SIZE = 2000; - BlockingQueue> queue = new ArrayBlockingQueue>(QUEUE_SIZE); - InputSupplier testCaseSupplier = new TestCaseSupplier( + testCaseSupplier = new TestCaseSupplier( "{ \"a\": \"Mozilla\\/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko\\/20100101 Firefox\\/21.0\", \"c\": \"US\", \"nk\": 1, \"tz\": \"America\\/New_York\", \"gr\": \"NY\", \"g\": \"1Chgyj\", \"h\": \"15vMQjX\", \"l\": \"o_d63rn9enb\", \"al\": \"en-US,en;q=0.5\", \"hh\": \"1.usa.gov\", \"r\": \"http:\\/\\/forecast.weather.gov\\/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX\", \"u\": \"http:\\/\\/www.spc.ncep.noaa.gov\\/\", \"t\": 1372121562, \"hc\": 1368193091, \"cy\": \"New York\", \"ll\": [ 40.862598, -73.921799 ] }" ); - UpdateStream updateStream = new UpdateStream(testCaseSupplier, queue, new DefaultObjectMapper()); - Thread t = new Thread(updateStream); - t.start(); - Map expectedAnswer = new HashMap(); + + dimensions.add("g"); + dimensions.add("c"); + dimensions.add("a"); + dimensions.add("cy"); + dimensions.add("l"); + dimensions.add("hh"); + dimensions.add("hc"); + dimensions.add("h"); + dimensions.add("u"); + dimensions.add("tz"); + dimensions.add("t"); + dimensions.add("r"); + dimensions.add("gr"); + dimensions.add("nk"); + dimensions.add("al"); + dimensions.add("ll"); + expectedAnswer.put("a", "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko/20100101 Firefox/21.0"); expectedAnswer.put("c", "US"); expectedAnswer.put("nk", 1); @@ -152,50 +68,151 @@ public class WebFirehoseFactoryTest expectedAnswer.put("hc", 1368193091); expectedAnswer.put("cy", "New York"); expectedAnswer.put("ll", Arrays.asList(40.862598, -73.921799)); - Map insertedRow = queue.poll(10, TimeUnit.SECONDS); - assert expectedAnswer.equals(insertedRow); + } + @Test(expectedExceptions = UnknownHostException.class) + public void checkInvalidUrl() throws Exception + { + + String invalidURL = "http://invalid.url"; + WebJsonSupplier supplier = new WebJsonSupplier(invalidURL); + supplier.getInput(); + } @Test - public void renameDimensionsCheck() throws Exception + public void basicIngestionCheck() throws Exception { - List dimensions = new ArrayList(); - dimensions.add("bitly_hash"); - dimensions.add("country"); - dimensions.add("user"); - dimensions.add("city"); - dimensions.add("encoding_user_login"); - dimensions.add("short_url"); - dimensions.add("timestamp_hash"); - dimensions.add("user_bitly_hash"); - dimensions.add("url"); - dimensions.add("timezone"); - dimensions.add("time"); - dimensions.add("referring_url"); - dimensions.add("geo_region"); - dimensions.add("known_users"); - dimensions.add("accept_language"); - - WebFirehoseFactory webbie = new WebFirehoseFactory( - "http://developer.usa.gov/1usagov", + queue = new ArrayBlockingQueue>(QUEUE_SIZE); + UpdateStream updateStream = new UpdateStream( + testCaseSupplier, + queue, + mapper, dimensions, dimensions, - "time" + timeDimension ); - Firehose webbieHose = webbie.connect(); - final int QUEUE_SIZE = 2000; - BlockingQueue> queue = new ArrayBlockingQueue>(QUEUE_SIZE); - InputSupplier testCaseSupplier = new TestCaseSupplier( - "{ \"a\": \"Mozilla\\/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko\\/20100101 Firefox\\/21.0\", \"nk\": 1, \"tz\": \"America\\/New_York\", \"g\": \"1Chgyj\", \"h\": \"15vMQjX\", \"l\": \"o_d63rn9enb\", \"al\": \"en-US,en;q=0.5\", \"hh\": \"1.usa.gov\", \"r\": \"http:\\/\\/forecast.weather.gov\\/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX\", \"u\": \"http:\\/\\/www.spc.ncep.noaa.gov\\/\", \"t\": 1372121562, \"hc\": 1368193091, \"kw\": \"spcnws\", \"cy\": \"New York\", \"ll\": [ 40.862598, -73.921799 ] }" - ); - UpdateStream updateStream = new UpdateStream(testCaseSupplier, queue, new DefaultObjectMapper()); Thread t = new Thread(updateStream); t.start(); - InputRow row = webbieHose.nextRow(); - assert row.getDimensions().equals(dimensions); - + Map insertedRow = queue.poll(10, TimeUnit.SECONDS); + Assert.assertEquals(expectedAnswer, insertedRow); } + //If a timestamp is missing, we should throw away the event + @Test + public void missingTimeStampCheck() + { + queue = new ArrayBlockingQueue>(QUEUE_SIZE); + InputSupplier testCaseSupplier = new TestCaseSupplier( + "{ \"a\": \"Mozilla\\/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko\\/20100101 Firefox\\/21.0\", \"c\": \"US\", \"nk\": 1, \"tz\": \"America\\/New_York\", \"gr\": \"NY\", \"g\": \"1Chgyj\", \"h\": \"15vMQjX\", \"l\": \"o_d63rn9enb\", \"al\": \"en-US,en;q=0.5\", \"hh\": \"1.usa.gov\", \"r\": \"http:\\/\\/forecast.weather.gov\\/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX\", \"u\": \"http:\\/\\/www.spc.ncep.noaa.gov\\/\", \"hc\": 1368193091, \"cy\": \"New York\", \"ll\": [ 40.862598, -73.921799 ] }" + ); + UpdateStream updateStream = new UpdateStream( + testCaseSupplier, + queue, + mapper, + dimensions, + dimensions, + timeDimension + ); + updateStream.run(); + Assert.assertEquals(queue.size(), 0); + } + + //If any other value is missing, we should still add the event and process it properly + @Test + public void otherNullValueCheck() throws Exception + { + Map expectedAnswer = new HashMap(); + expectedAnswer.put("a", "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko/20100101 Firefox/21.0"); + expectedAnswer.put("nk", 1); + expectedAnswer.put("tz", "America/New_York"); + expectedAnswer.put("gr", "NY"); + expectedAnswer.put("g", "1Chgyj"); + expectedAnswer.put("h", "15vMQjX"); + expectedAnswer.put("l", "o_d63rn9enb"); + expectedAnswer.put("al", "en-US,en;q=0.5"); + expectedAnswer.put("hh", "1.usa.gov"); + expectedAnswer.put( + "r", + "http://forecast.weather.gov/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX" + ); + expectedAnswer.put("u", "http://www.spc.ncep.noaa.gov/"); + expectedAnswer.put("t", 1372121562); + expectedAnswer.put("hc", 1368193091); + expectedAnswer.put("cy", "New York"); + expectedAnswer.put("ll", Arrays.asList(40.862598, -73.921799)); + queue = new ArrayBlockingQueue>(QUEUE_SIZE); + testCaseSupplier = new TestCaseSupplier( + "{ \"a\": \"Mozilla\\/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko\\/20100101 Firefox\\/21.0\", \"nk\": 1, \"tz\": \"America\\/New_York\", \"gr\": \"NY\", \"g\": \"1Chgyj\", \"h\": \"15vMQjX\", \"l\": \"o_d63rn9enb\", \"al\": \"en-US,en;q=0.5\", \"hh\": \"1.usa.gov\", \"r\": \"http:\\/\\/forecast.weather.gov\\/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX\", \"u\": \"http:\\/\\/www.spc.ncep.noaa.gov\\/\", \"t\": 1372121562, \"hc\": 1368193091, \"cy\": \"New York\", \"ll\": [ 40.862598, -73.921799 ] }" + ); + UpdateStream updateStream = new UpdateStream( + testCaseSupplier, + queue, + mapper, + dimensions, + dimensions, + timeDimension + ); + Thread t = new Thread(updateStream); + t.start(); + Map insertedRow = queue.poll(10, TimeUnit.SECONDS); + Assert.assertEquals(expectedAnswer, insertedRow); + } + + @Test + public void checkRenameKeys() throws Exception + { + Map expectedAnswer = new HashMap(); + queue = new ArrayBlockingQueue>(QUEUE_SIZE); + ArrayList renamedDimensions = new ArrayList(); + renamedDimensions.add("bitly_hash"); + renamedDimensions.add("country"); + renamedDimensions.add("user"); + renamedDimensions.add("city"); + renamedDimensions.add("encoding_user_login"); + renamedDimensions.add("short_url"); + renamedDimensions.add("timestamp_hash"); + renamedDimensions.add("user_bitly_hash"); + renamedDimensions.add("url"); + renamedDimensions.add("timezone"); + renamedDimensions.add("time"); + renamedDimensions.add("referring_url"); + renamedDimensions.add("geo_region"); + renamedDimensions.add("known_users"); + renamedDimensions.add("accept_language"); + renamedDimensions.add("latitude_longitude"); + + expectedAnswer.put("user", "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko/20100101 Firefox/21.0"); + expectedAnswer.put("country", "US"); + expectedAnswer.put("known_users", 1); + expectedAnswer.put("timezone", "America/New_York"); + expectedAnswer.put("geo_region", "NY"); + expectedAnswer.put("bitly_hash", "1Chgyj"); + expectedAnswer.put("user_bitly_hash", "15vMQjX"); + expectedAnswer.put("encoding_user_login", "o_d63rn9enb"); + expectedAnswer.put("accept_language", "en-US,en;q=0.5"); + expectedAnswer.put("short_url", "1.usa.gov"); + expectedAnswer.put( + "referring_url", + "http://forecast.weather.gov/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX" + ); + expectedAnswer.put("url", "http://www.spc.ncep.noaa.gov/"); + expectedAnswer.put("time", 1372121562); + expectedAnswer.put("timestamp_hash", 1368193091); + expectedAnswer.put("city", "New York"); + expectedAnswer.put("latitude_longitude", Arrays.asList(40.862598, -73.921799)); + + UpdateStream updateStream = new UpdateStream( + testCaseSupplier, + queue, + mapper, + dimensions, + renamedDimensions, + timeDimension + ); + updateStream.run(); + Map inputRow = queue.poll(10, TimeUnit.SECONDS); + Assert.assertEquals(expectedAnswer, inputRow); + } } diff --git a/examples/src/main/java/druid/examples/webStream/WebJsonSupplier.java b/examples/src/main/java/druid/examples/webStream/WebJsonSupplier.java index e103fd879b8..9e69a1649f2 100644 --- a/examples/src/main/java/druid/examples/webStream/WebJsonSupplier.java +++ b/examples/src/main/java/druid/examples/webStream/WebJsonSupplier.java @@ -37,9 +37,8 @@ public class WebJsonSupplier implements InputSupplier private String urlString; private URL url; - public WebJsonSupplier(List dimensions, String urlString) + public WebJsonSupplier(String urlString) { - this.dimensions = dimensions; this.urlString = urlString; try { this.url = new URL(urlString); @@ -56,8 +55,6 @@ public class WebJsonSupplier implements InputSupplier URL url = new URL(urlString); URLConnection connection = url.openConnection(); connection.setDoInput(true); - //connection.setDoOutput(true); - BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream())); return reader; }