diff --git a/examples/pom.xml b/examples/pom.xml index 4a1cbc929ac..375d7963e7d 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -1,5 +1,6 @@ - + 4.0.0 com.metamx.druid druid-examples diff --git a/examples/src/main/java/druid/examples/RealtimeStandaloneMain.java b/examples/src/main/java/druid/examples/RealtimeStandaloneMain.java index 54d4a492584..46366eb32df 100644 --- a/examples/src/main/java/druid/examples/RealtimeStandaloneMain.java +++ b/examples/src/main/java/druid/examples/RealtimeStandaloneMain.java @@ -13,10 +13,10 @@ import com.metamx.druid.loading.DataSegmentPusher; import com.metamx.druid.log.LogLevelAdjuster; import com.metamx.druid.realtime.RealtimeNode; import com.metamx.druid.realtime.SegmentPublisher; -import druid.examples.webStream.WebFirehoseFactory; import druid.examples.flights.FlightsFirehoseFactory; import druid.examples.rand.RandomFirehoseFactory; import druid.examples.twitter.TwitterSpritzerFirehoseFactory; +import druid.examples.webStream.WebFirehoseFactory; import java.io.File; import java.io.IOException; @@ -53,7 +53,7 @@ public class RealtimeStandaloneMain rn.setDataSegmentPusher(new NoopDataSegmentPusher()); rn.setServerView(new NoopServerView()); rn.setInventoryView(new NoopInventoryView()); - + Runtime.getRuntime().addShutdownHook( new Thread( new Runnable() diff --git a/examples/src/main/java/druid/examples/flights/FlightsConverter.java b/examples/src/main/java/druid/examples/flights/FlightsConverter.java index 53ea6638e60..730eea0bd92 100644 --- a/examples/src/main/java/druid/examples/flights/FlightsConverter.java +++ b/examples/src/main/java/druid/examples/flights/FlightsConverter.java @@ -114,8 +114,7 @@ public class FlightsConverter if (value.equals("NA")) { event.put(metricDimension, 0); - } - else { + } else { event.put(metricDimension, Integer.parseInt(value)); } } diff --git a/examples/src/main/java/druid/examples/flights/FlightsFirehoseFactory.java 
b/examples/src/main/java/druid/examples/flights/FlightsFirehoseFactory.java index 66b8de4450c..07eb856425e 100644 --- a/examples/src/main/java/druid/examples/flights/FlightsFirehoseFactory.java +++ b/examples/src/main/java/druid/examples/flights/FlightsFirehoseFactory.java @@ -80,8 +80,7 @@ public class FlightsFirehoseFactory implements FirehoseFactory try { if (line != null) { return true; - } - else if (in != null) { + } else if (in != null) { line = in.readLine(); if (line == null) { @@ -90,16 +89,14 @@ public class FlightsFirehoseFactory implements FirehoseFactory } return true; - } - else if (files.hasNext()) { + } else if (files.hasNext()) { final File nextFile = files.next(); if (nextFile.getName().endsWith(".gz")) { in = new BufferedReader( new InputStreamReader(new GZIPInputStream(new FileInputStream(nextFile)), Charsets.UTF_8) ); - } - else { + } else { in = new BufferedReader(new FileReader(nextFile)); } return hasMore(); diff --git a/examples/src/main/java/druid/examples/rand/RandomFirehoseFactory.java b/examples/src/main/java/druid/examples/rand/RandomFirehoseFactory.java index 71a0c1fbb92..54010e822ed 100644 --- a/examples/src/main/java/druid/examples/rand/RandomFirehoseFactory.java +++ b/examples/src/main/java/druid/examples/rand/RandomFirehoseFactory.java @@ -35,81 +35,89 @@ import static java.lang.Thread.sleep; * the moment an event is delivered.) * Values are offset by adding the modulus of the token number to the random number * so that token values have distinct, non-overlapping ranges. - * + *

*

* Example spec file: *
- [{
-   "schema" : { "dataSource":"randseq",
-                "aggregators":[ {"type":"count", "name":"events"},
- 	       		                    {"type":"doubleSum","name":"outColumn","fieldName":"inColumn"} ],
-                "indexGranularity":"minute",
- 	              "shardSpec" : { "type": "none" } },
-   "config" : { "maxRowsInMemory" : 50000,
-                "intermediatePersistPeriod" : "PT2m" },
-
-   "firehose" : { "type" : "rand",
-                  "sleepUsec": 100000,
-                  "maxGeneratedRows" : 5000000,
-                  "seed" : 0,
-                  "nTokens" : 19,
-                  "nPerSleep" : 3
-                 },
-
-   "plumber" : { "type" : "realtime",
-                 "windowPeriod" : "PT5m",
-                 "segmentGranularity":"hour",
-                 "basePersistDirectory" : "/tmp/realtime/basePersist" }
- }]
- * 
+ * [{ + * "schema" : { "dataSource":"randseq", + * "aggregators":[ {"type":"count", "name":"events"}, + * {"type":"doubleSum","name":"outColumn","fieldName":"inColumn"} ], + * "indexGranularity":"minute", + * "shardSpec" : { "type": "none" } }, + * "config" : { "maxRowsInMemory" : 50000, + * "intermediatePersistPeriod" : "PT2m" }, * + * "firehose" : { "type" : "rand", + * "sleepUsec": 100000, + * "maxGeneratedRows" : 5000000, + * "seed" : 0, + * "nTokens" : 19, + * "nPerSleep" : 3 + * }, + * + * "plumber" : { "type" : "realtime", + * "windowPeriod" : "PT5m", + * "segmentGranularity":"hour", + * "basePersistDirectory" : "/tmp/realtime/basePersist" } + * }] + * + *

* Example query using POST to /druid/v2/ (where UTC date and time MUST include the current hour): *

- {
-     "queryType": "groupBy",
-     "dataSource": "randSeq",
-     "granularity": "all",
-     "dimensions": [],
-     "aggregations":[
-     { "type": "count", "name": "rows"},
-     { "type": "doubleSum", "fieldName": "events", "name": "e"},
-     { "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum"}
-     ],
-     "postAggregations":[
-     {  "type":"arithmetic",
-        "name":"avg_random",
-        "fn":"/",
-        "fields":[ {"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"},
-                   {"type":"fieldAccess","name":"rows","fieldName":"rows"} ]}
-     ],
-     "intervals":["2012-10-01T00:00/2020-01-01T00"]
- }
+ * {
+ * "queryType": "groupBy",
+ * "dataSource": "randseq",
+ * "granularity": "all",
+ * "dimensions": [],
+ * "aggregations":[
+ * { "type": "count", "name": "rows"},
+ * { "type": "doubleSum", "fieldName": "events", "name": "e"},
+ * { "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum"}
+ * ],
+ * "postAggregations":[
+ * {  "type":"arithmetic",
+ * "name":"avg_random",
+ * "fn":"/",
+ * "fields":[ {"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"},
+ * {"type":"fieldAccess","name":"rows","fieldName":"rows"} ]}
+ * ],
+ * "intervals":["2012-10-01T00:00/2020-01-01T00"]
+ * }
  * 
*/ @JsonTypeName("rand") public class RandomFirehoseFactory implements FirehoseFactory { private static final Logger log = new Logger(RandomFirehoseFactory.class); - /** msec to sleep before generating a new row; if this and delayNsec are 0, then go as fast as possible. + /** + * msec to sleep before generating a new row; if this and delayNsec are 0, then go as fast as possible. * json param sleepUsec (microseconds) is used to initialize this. */ private final long delayMsec; - /** nsec to sleep before generating a new row; if this and delayMsec are 0, then go as fast as possible. + /** + * nsec to sleep before generating a new row; if this and delayMsec are 0, then go as fast as possible. * json param sleepUsec (microseconds) is used to initialize this. - */ + */ private final int delayNsec; - /** max rows to generate, -1 is infinite, 0 means nothing is generated; use this to prevent - * infinite space consumption or to see what happens when a Firehose stops delivering - * values, or to have hasMore() return false. + /** + * max rows to generate, -1 is infinite, 0 means nothing is generated; use this to prevent + * infinite space consumption or to see what happens when a Firehose stops delivering + * values, or to have hasMore() return false. */ private final long maxGeneratedRows; - /** seed for random number generator; if 0, then no seed is used. */ + /** + * seed for random number generator; if 0, then no seed is used. + */ private final long seed; - /** number of tokens to randomly associate with values (no heap limits). This can be used to + /** + * number of tokens to randomly associate with values (no heap limits). This can be used to * stress test the number of tokens. */ private final int nTokens; - /** Number of token events per sleep interval. */ + /** + * Number of token events per sleep interval. 
+ */ private final int nPerSleep; @JsonCreator @@ -124,29 +132,30 @@ public class RandomFirehoseFactory implements FirehoseFactory long nsec = (sleepUsec > 0) ? sleepUsec * 1000L : 0; long msec = nsec / 1000000L; this.delayMsec = msec; - this.delayNsec = (int)(nsec - (msec * 1000000L)); + this.delayNsec = (int) (nsec - (msec * 1000000L)); this.maxGeneratedRows = maxGeneratedRows; this.seed = seed; this.nTokens = nTokens; this.nPerSleep = nPerSleep; if (nTokens <= 0) { - log.warn("nTokens parameter " + nTokens +" ignored; must be greater than or equal to 1"); + log.warn("nTokens parameter " + nTokens + " ignored; must be greater than or equal to 1"); nTokens = 1; } if (nPerSleep <= 0) { - log.warn("nPerSleep parameter " + nPerSleep +" ignored; must be greater than or equal to 1"); + log.warn("nPerSleep parameter " + nPerSleep + " ignored; must be greater than or equal to 1"); nPerSleep = 1; } log.info("maxGeneratedRows=" + maxGeneratedRows); - log.info("seed=" + ( (seed == 0L) ? "random value" : seed )); + log.info("seed=" + ((seed == 0L) ? "random value" : seed)); log.info("nTokens=" + nTokens); log.info("nPerSleep=" + nPerSleep); - double dmsec = (double)delayMsec + ((double)this.delayNsec)/1000000.; + double dmsec = (double) delayMsec + ((double) this.delayNsec) / 1000000.; if (dmsec > 0.0) { log.info("sleep period=" + dmsec + "msec"); - log.info("approximate max rate of record generation=" + (nPerSleep * 1000./dmsec) + "/sec" + - " or " + (60. * nPerSleep * 1000./dmsec) + "/minute" - ); + log.info( + "approximate max rate of record generation=" + (nPerSleep * 1000. / dmsec) + "/sec" + + " or " + (60. * nPerSleep * 1000. 
/ dmsec) + "/minute" + ); } else { log.info("sleep period= NONE"); log.info("approximate max rate of record generation= as fast as possible"); @@ -170,7 +179,7 @@ public class RandomFirehoseFactory implements FirehoseFactory @Override public boolean hasMore() { - if (maxGeneratedRows >= 0 && rowCount >= maxGeneratedRows) { + if (maxGeneratedRows >= 0 && rowCount >= maxGeneratedRows) { return waitIfmaxGeneratedRows; } else { return true; // there are always more random numbers @@ -184,7 +193,7 @@ public class RandomFirehoseFactory implements FirehoseFactory final long nth = (rowCount % nTokens) + 1; long sleepMsec = delayMsec; // all done? - if (maxGeneratedRows >= 0 && rowCount >= maxGeneratedRows && waitIfmaxGeneratedRows) { + if (maxGeneratedRows >= 0 && rowCount >= maxGeneratedRows && waitIfmaxGeneratedRows) { // sleep a long time instead of terminating sleepMsec = 2000000000L; } @@ -193,7 +202,8 @@ public class RandomFirehoseFactory implements FirehoseFactory if (modulus == 0) { sleep(sleepMsec, delayNsec); } - } catch (InterruptedException e) { + } + catch (InterruptedException e) { throw new RuntimeException("InterruptedException"); } } @@ -202,7 +212,7 @@ public class RandomFirehoseFactory implements FirehoseFactory } final Map theMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); - theMap.put("inColumn", anotherRand((int)nth)); + theMap.put("inColumn", anotherRand((int) nth)); theMap.put("target", ("a" + nth)); return new MapBasedInputRow(System.currentTimeMillis(), dimensions, theMap); } @@ -210,7 +220,7 @@ public class RandomFirehoseFactory implements FirehoseFactory private Float anotherRand(int scale) { double f = rand.nextDouble(); // [0.0,1.0] - return new Float(f + (double)scale); + return new Float(f + (double) scale); } @Override diff --git a/examples/src/main/java/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java b/examples/src/main/java/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java index e87b5e44de2..4b588ec826c 100644 
--- a/examples/src/main/java/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java +++ b/examples/src/main/java/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java @@ -11,13 +11,13 @@ import com.metamx.druid.realtime.firehose.Firehose; import com.metamx.druid.realtime.firehose.FirehoseFactory; import twitter4j.ConnectionLifeCycleListener; import twitter4j.HashtagEntity; +import twitter4j.StallWarning; import twitter4j.Status; import twitter4j.StatusDeletionNotice; import twitter4j.StatusListener; import twitter4j.TwitterStream; import twitter4j.TwitterStreamFactory; import twitter4j.User; -import twitter4j.StallWarning; import java.io.IOException; import java.util.Arrays; @@ -49,21 +49,23 @@ import static java.lang.Thread.sleep; * is UTC): *
  * 
- * - * + *

+ *

* Notes on twitter.com HTTP (REST) API: v1.0 will be disabled around 2013-03 so v1.1 should be used; * twitter4j 3.0 (not yet released) will support the v1.1 api. * Specifically, we should be using https://stream.twitter.com/1.1/statuses/sample.json * See: http://jira.twitter4j.org/browse/TFJ-186 + *

+ * Notes on JSON parsing: as of twitter4j 2.2.x, the json parser has some bugs (ex: Status.toString() + * can have number format exceptions), so it might be necessary to extract raw json and process it + * separately. If so, set twitter4.jsonStoreEnabled=true and look at DataObjectFactory#getRawJSON(); + * com.fasterxml.jackson.databind.ObjectMapper should be used to parse. * - * Notes on JSON parsing: as of twitter4j 2.2.x, the json parser has some bugs (ex: Status.toString() - * can have number format exceptions), so it might be necessary to extract raw json and process it - * separately. If so, set twitter4.jsonStoreEnabled=true and look at DataObjectFactory#getRawJSON(); - * com.fasterxml.jackson.databind.ObjectMapper should be used to parse. * @author pbaclace */ @JsonTypeName("twitzer") -public class TwitterSpritzerFirehoseFactory implements FirehoseFactory { +public class TwitterSpritzerFirehoseFactory implements FirehoseFactory +{ private static final Logger log = new Logger(TwitterSpritzerFirehoseFactory.class); /** * max events to receive, -1 is infinite, 0 means nothing is delivered; use this to prevent @@ -94,7 +96,8 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory { @Override public Firehose connect() throws IOException { - final ConnectionLifeCycleListener connectionLifeCycleListener = new ConnectionLifeCycleListener() { + final ConnectionLifeCycleListener connectionLifeCycleListener = new ConnectionLifeCycleListener() + { @Override public void onConnect() { @@ -134,7 +137,8 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory { // twitterStream = new TwitterStreamFactory().getInstance(); twitterStream.addConnectionLifeCycleListener(connectionLifeCycleListener); - statusListener = new StatusListener() { // This is what really gets called to deliver stuff from twitter4j + statusListener = new StatusListener() + { // This is what really gets called to deliver stuff from twitter4j @Override public void 
onStatus(Status status) { @@ -147,7 +151,8 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory { if (!success) { log.warn("queue too slow!"); } - } catch (InterruptedException e) { + } + catch (InterruptedException e) { throw new RuntimeException("InterruptedException", e); } } @@ -179,7 +184,8 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory { } @Override - public void onStallWarning(StallWarning warning) { + public void onStallWarning(StallWarning warning) + { System.out.println("Got stall warning:" + warning); } }; @@ -188,9 +194,11 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory { twitterStream.sample(); // creates a generic StatusStream log.info("returned from sample()"); - return new Firehose() { + return new Firehose() + { - private final Runnable doNothingRunnable = new Runnable() { + private final Runnable doNothingRunnable = new Runnable() + { public void run() { } @@ -240,7 +248,8 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory { try { log.info("reached limit, sleeping a long time..."); sleep(2000000000L); - } catch (InterruptedException e) { + } + catch (InterruptedException e) { throw new RuntimeException("InterruptedException", e); } } else { @@ -254,7 +263,8 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory { Status status; try { status = queue.take(); - } catch (InterruptedException e) { + } + catch (InterruptedException e) { throw new RuntimeException("InterruptedException", e); } diff --git a/examples/src/main/java/druid/examples/webStream/TestCaseSupplier.java b/examples/src/main/java/druid/examples/webStream/TestCaseSupplier.java index 9ded259825d..abce656c97f 100644 --- a/examples/src/main/java/druid/examples/webStream/TestCaseSupplier.java +++ b/examples/src/main/java/druid/examples/webStream/TestCaseSupplier.java @@ -27,19 +27,16 @@ import java.io.StringReader; public class TestCaseSupplier implements InputSupplier { - 
private final String s; + private final String testString; - public TestCaseSupplier(String s) + public TestCaseSupplier(String testString) { - this.s = s; + this.testString = testString; } @Override public BufferedReader getInput() throws IOException { - StringBuilder buffer = new StringBuilder(); - buffer.append(s); - BufferedReader br = new BufferedReader(new StringReader(buffer.toString())); - return br; + return new BufferedReader(new StringReader(testString)); } } diff --git a/examples/src/main/java/druid/examples/webStream/UpdateStream.java b/examples/src/main/java/druid/examples/webStream/UpdateStream.java index 440bbc44549..626bd2f3b19 100644 --- a/examples/src/main/java/druid/examples/webStream/UpdateStream.java +++ b/examples/src/main/java/druid/examples/webStream/UpdateStream.java @@ -19,12 +19,11 @@ package druid.examples.webStream; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; import com.google.common.io.InputSupplier; import com.metamx.emitter.EmittingLogger; -import org.codehaus.jackson.JsonParseException; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.type.TypeReference; import java.io.BufferedReader; import java.util.HashMap; @@ -35,17 +34,21 @@ import java.util.concurrent.TimeUnit; public class UpdateStream implements Runnable { private static final EmittingLogger log = new EmittingLogger(UpdateStream.class); - private InputSupplier supplier; - private BlockingQueue> queue; - private ObjectMapper mapper; - private final TypeReference> typeRef; private static final long queueWaitTime = 15L; + private final TypeReference> typeRef; + private final InputSupplier supplier; + private final BlockingQueue> queue; + private final ObjectMapper mapper; - public UpdateStream(InputSupplier supplier, BlockingQueue> queue) + public UpdateStream( + InputSupplier supplier, + BlockingQueue> queue, + ObjectMapper mapper + ) { 
this.supplier = supplier; this.queue = queue; - this.mapper = new ObjectMapper(); + this.mapper = mapper; this.typeRef = new TypeReference>() { }; @@ -60,28 +63,18 @@ public class UpdateStream implements Runnable public void run() throws RuntimeException { try { - BufferedReader reader = (BufferedReader) supplier.getInput(); + BufferedReader reader = supplier.getInput(); String line; while ((line = reader.readLine()) != null) { if (isValid(line)) { - try { - HashMap map = mapper.readValue(line, typeRef); - queue.offer(map, queueWaitTime, TimeUnit.SECONDS); - log.info("Successfully added to queue"); - } - catch (JsonParseException e) { - log.info("Invalid JSON Stream. Please check if the url returns a proper JSON stream."); - Throwables.propagate(e); - } - catch (Exception e) { - Throwables.propagate(e); - } + HashMap map = mapper.readValue(line, typeRef); + queue.offer(map, queueWaitTime, TimeUnit.SECONDS); + log.info("Successfully added to queue"); } } } catch (Exception e) { - e.printStackTrace(); - Throwables.propagate(e); + throw Throwables.propagate(e); } } diff --git a/examples/src/main/java/druid/examples/webStream/WebFirehoseFactory.java b/examples/src/main/java/druid/examples/webStream/WebFirehoseFactory.java index 0718406c909..949e948176d 100644 --- a/examples/src/main/java/druid/examples/webStream/WebFirehoseFactory.java +++ b/examples/src/main/java/druid/examples/webStream/WebFirehoseFactory.java @@ -22,14 +22,18 @@ package druid.examples.webStream; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Throwables; +import com.google.common.collect.Maps; +import com.metamx.common.parsers.TimestampParser; import com.metamx.druid.input.InputRow; import com.metamx.druid.input.MapBasedInputRow; +import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.realtime.firehose.Firehose; import 
com.metamx.druid.realtime.firehose.FirehoseFactory; import com.metamx.emitter.EmittingLogger; +import org.joda.time.DateTime; import java.io.IOException; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; @@ -41,11 +45,11 @@ import java.util.concurrent.Executors; public class WebFirehoseFactory implements FirehoseFactory { private static final EmittingLogger log = new EmittingLogger(WebFirehoseFactory.class); + private static final int QUEUE_SIZE = 2000; private final String url; private final List dimensions; private final String timeDimension; private final List renamedDimensions; - private static final int QUEUE_SIZE = 2000; @JsonCreator @@ -53,13 +57,13 @@ public class WebFirehoseFactory implements FirehoseFactory @JsonProperty("url") String url, @JsonProperty("dimensions") List dimensions, @JsonProperty("renamedDimensions") List renamedDimensions, - @JsonProperty("timeDimension") String s + @JsonProperty("timeDimension") String timeDimension ) { this.url = url; this.dimensions = dimensions; this.renamedDimensions = renamedDimensions; - this.timeDimension = s; + this.timeDimension = timeDimension; } @Override @@ -67,17 +71,7 @@ public class WebFirehoseFactory implements FirehoseFactory { final BlockingQueue> queue = new ArrayBlockingQueue>(QUEUE_SIZE); - Runnable updateStream = new UpdateStream(new WebJsonSupplier(dimensions, url), queue); -// Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() -// { -// public void uncaughtException(Thread th, Throwable ex) -// { -// log.info("Uncaught exception: " + ex); -// } -// }; -// final Thread streamReader = new Thread(updateStream); -// streamReader.setUncaughtExceptionHandler(h); -// streamReader.start(); + Runnable updateStream = new UpdateStream(new WebJsonSupplier(dimensions, url), queue, new DefaultObjectMapper()); final ExecutorService service = Executors.newSingleThreadExecutor(); service.submit(updateStream); @@ -95,53 
+89,45 @@ public class WebFirehoseFactory implements FirehoseFactory @Override public InputRow nextRow() { - if (Thread.currentThread().isInterrupted()) { - throw new RuntimeException("Interrupted, time to stop"); - } - Map update; try { - update = queue.take(); + Map processedMap = processMap(queue.take()); + DateTime date = TimestampParser.createTimestampParser("auto") + .apply(processedMap.get(timeDimension).toString()); + long seconds = (long) date.getMillis(); + //the parser doesn't check for posix. Only expects iso or millis. This checks for posix + if (new DateTime(seconds * 1000).getYear() == new DateTime().getYear()) { + seconds = (long) date.getMillis() * 1000; + } + return new MapBasedInputRow( + seconds, + renamedDimensions, + processedMap + ); } - catch (InterruptedException e) { - throw new RuntimeException("InterrutpedException", e); + catch (Exception e) { + throw Throwables.propagate(e); } - Map processedMap = processMap(update); - return new MapBasedInputRow( - ((Integer) processedMap.get(timeDimension)).longValue() * 1000, - renamedDimensions, - processedMap - ); } private Map renameKeys(Map update) { - Map renamedMap = new HashMap(); - int iter = 0; - while (iter < dimensions.size()) { + Map renamedMap = Maps.newHashMap(); + for (int iter = 0; iter < dimensions.size(); iter++) { + if (update.get(dimensions.get(iter)) == null) { + update.put(dimensions.get(iter), null); + } Object obj = update.get(dimensions.get(iter)); renamedMap.put(renamedDimensions.get(iter), obj); - iter++; + } + if (renamedMap.get(timeDimension) == null) { + renamedMap.put(timeDimension, System.currentTimeMillis()); } return renamedMap; } - private void processNullDimensions(Map map) - { - for (String key : renamedDimensions) { - if (map.get(key) == null) { - if (key.equals(timeDimension)) { - map.put(key, new Integer((int) System.currentTimeMillis() / 1000)); - } else { - map.put(key, null); - } - } - } - } - private Map processMap(Map map) { Map renamedMap = 
renameKeys(map); - processNullDimensions(renamedMap); return renamedMap; } @@ -156,7 +142,6 @@ public class WebFirehoseFactory implements FirehoseFactory @Override public void close() throws IOException { - log.info("CLOSING!!!"); service.shutdown(); } diff --git a/examples/src/main/java/druid/examples/webStream/WebFirehoseFactoryTest.java b/examples/src/main/java/druid/examples/webStream/WebFirehoseFactoryTest.java index d960cfe92ee..d14b7dd94e8 100644 --- a/examples/src/main/java/druid/examples/webStream/WebFirehoseFactoryTest.java +++ b/examples/src/main/java/druid/examples/webStream/WebFirehoseFactoryTest.java @@ -21,6 +21,7 @@ package druid.examples.webStream; import com.google.common.io.InputSupplier; import com.metamx.druid.input.InputRow; +import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.realtime.firehose.Firehose; import com.metamx.druid.realtime.firehose.FirehoseFactory; import org.testng.annotations.Test; @@ -128,7 +129,7 @@ public class WebFirehoseFactoryTest InputSupplier testCaseSupplier = new TestCaseSupplier( "{ \"a\": \"Mozilla\\/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko\\/20100101 Firefox\\/21.0\", \"c\": \"US\", \"nk\": 1, \"tz\": \"America\\/New_York\", \"gr\": \"NY\", \"g\": \"1Chgyj\", \"h\": \"15vMQjX\", \"l\": \"o_d63rn9enb\", \"al\": \"en-US,en;q=0.5\", \"hh\": \"1.usa.gov\", \"r\": \"http:\\/\\/forecast.weather.gov\\/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX\", \"u\": \"http:\\/\\/www.spc.ncep.noaa.gov\\/\", \"t\": 1372121562, \"hc\": 1368193091, \"cy\": \"New York\", \"ll\": [ 40.862598, -73.921799 ] }" ); - UpdateStream updateStream = new UpdateStream(testCaseSupplier, queue); + UpdateStream updateStream = new UpdateStream(testCaseSupplier, queue, new DefaultObjectMapper()); Thread t = new Thread(updateStream); t.start(); Map expectedAnswer = new HashMap(); @@ -188,7 +189,7 @@ public class WebFirehoseFactoryTest InputSupplier testCaseSupplier = new TestCaseSupplier( "{ \"a\": 
\"Mozilla\\/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko\\/20100101 Firefox\\/21.0\", \"nk\": 1, \"tz\": \"America\\/New_York\", \"g\": \"1Chgyj\", \"h\": \"15vMQjX\", \"l\": \"o_d63rn9enb\", \"al\": \"en-US,en;q=0.5\", \"hh\": \"1.usa.gov\", \"r\": \"http:\\/\\/forecast.weather.gov\\/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX\", \"u\": \"http:\\/\\/www.spc.ncep.noaa.gov\\/\", \"t\": 1372121562, \"hc\": 1368193091, \"kw\": \"spcnws\", \"cy\": \"New York\", \"ll\": [ 40.862598, -73.921799 ] }" ); - UpdateStream updateStream = new UpdateStream(testCaseSupplier, queue); + UpdateStream updateStream = new UpdateStream(testCaseSupplier, queue, new DefaultObjectMapper()); Thread t = new Thread(updateStream); t.start(); InputRow row = webbieHose.nextRow();