From adc00e6576c8b7e1bcf89a3054b452ab9312c153 Mon Sep 17 00:00:00 2001
From: Dhruv Parthasarathy
Date: Mon, 8 Jul 2013 12:13:25 -0700
Subject: [PATCH] fixed with Eric's suggestions
---
.../examples/flights/FlightsConverter.java | 3 +-
.../flights/FlightsFirehoseFactory.java | 9 +-
.../examples/rand/RandomFirehoseFactory.java | 140 ++++----
.../TwitterSpritzerFirehoseFactory.java | 42 +--
.../examples/webStream/UpdateStream.java | 37 +-
.../webStream/WebFirehoseFactory.java | 61 ++--
.../webStream/WebFirehoseFactoryTest.java | 315 +++++++++---------
.../examples/webStream/WebJsonSupplier.java | 5 +-
8 files changed, 311 insertions(+), 301 deletions(-)
diff --git a/examples/src/main/java/druid/examples/flights/FlightsConverter.java b/examples/src/main/java/druid/examples/flights/FlightsConverter.java
index 730eea0bd92..53ea6638e60 100644
--- a/examples/src/main/java/druid/examples/flights/FlightsConverter.java
+++ b/examples/src/main/java/druid/examples/flights/FlightsConverter.java
@@ -114,7 +114,8 @@ public class FlightsConverter
if (value.equals("NA")) {
event.put(metricDimension, 0);
- } else {
+ }
+ else {
event.put(metricDimension, Integer.parseInt(value));
}
}
diff --git a/examples/src/main/java/druid/examples/flights/FlightsFirehoseFactory.java b/examples/src/main/java/druid/examples/flights/FlightsFirehoseFactory.java
index 07eb856425e..66b8de4450c 100644
--- a/examples/src/main/java/druid/examples/flights/FlightsFirehoseFactory.java
+++ b/examples/src/main/java/druid/examples/flights/FlightsFirehoseFactory.java
@@ -80,7 +80,8 @@ public class FlightsFirehoseFactory implements FirehoseFactory
try {
if (line != null) {
return true;
- } else if (in != null) {
+ }
+ else if (in != null) {
line = in.readLine();
if (line == null) {
@@ -89,14 +90,16 @@ public class FlightsFirehoseFactory implements FirehoseFactory
}
return true;
- } else if (files.hasNext()) {
+ }
+ else if (files.hasNext()) {
final File nextFile = files.next();
if (nextFile.getName().endsWith(".gz")) {
in = new BufferedReader(
new InputStreamReader(new GZIPInputStream(new FileInputStream(nextFile)), Charsets.UTF_8)
);
- } else {
+ }
+ else {
in = new BufferedReader(new FileReader(nextFile));
}
return hasMore();
diff --git a/examples/src/main/java/druid/examples/rand/RandomFirehoseFactory.java b/examples/src/main/java/druid/examples/rand/RandomFirehoseFactory.java
index 54010e822ed..71a0c1fbb92 100644
--- a/examples/src/main/java/druid/examples/rand/RandomFirehoseFactory.java
+++ b/examples/src/main/java/druid/examples/rand/RandomFirehoseFactory.java
@@ -35,89 +35,81 @@ import static java.lang.Thread.sleep;
* the moment an event is delivered.)
* Values are offset by adding the modulus of the token number to the random number
* so that token values have distinct, non-overlapping ranges.
- *
+ *
*
* Example spec file:
*
- * [{
- * "schema" : { "dataSource":"randseq",
- * "aggregators":[ {"type":"count", "name":"events"},
- * {"type":"doubleSum","name":"outColumn","fieldName":"inColumn"} ],
- * "indexGranularity":"minute",
- * "shardSpec" : { "type": "none" } },
- * "config" : { "maxRowsInMemory" : 50000,
- * "intermediatePersistPeriod" : "PT2m" },
- *
- * "firehose" : { "type" : "rand",
- * "sleepUsec": 100000,
- * "maxGeneratedRows" : 5000000,
- * "seed" : 0,
- * "nTokens" : 19,
- * "nPerSleep" : 3
- * },
- *
- * "plumber" : { "type" : "realtime",
- * "windowPeriod" : "PT5m",
- * "segmentGranularity":"hour",
- * "basePersistDirectory" : "/tmp/realtime/basePersist" }
- * }]
+ [{
+ "schema" : { "dataSource":"randseq",
+ "aggregators":[ {"type":"count", "name":"events"},
+ {"type":"doubleSum","name":"outColumn","fieldName":"inColumn"} ],
+ "indexGranularity":"minute",
+ "shardSpec" : { "type": "none" } },
+ "config" : { "maxRowsInMemory" : 50000,
+ "intermediatePersistPeriod" : "PT2m" },
+
+ "firehose" : { "type" : "rand",
+ "sleepUsec": 100000,
+ "maxGeneratedRows" : 5000000,
+ "seed" : 0,
+ "nTokens" : 19,
+ "nPerSleep" : 3
+ },
+
+ "plumber" : { "type" : "realtime",
+ "windowPeriod" : "PT5m",
+ "segmentGranularity":"hour",
+ "basePersistDirectory" : "/tmp/realtime/basePersist" }
+ }]
*
- *
+ *
* Example query using POST to /druid/v2/ (where UTC date and time MUST include the current hour):
*
- * {
- * "queryType": "groupBy",
- * "dataSource": "randSeq",
- * "granularity": "all",
- * "dimensions": [],
- * "aggregations":[
- * { "type": "count", "name": "rows"},
- * { "type": "doubleSum", "fieldName": "events", "name": "e"},
- * { "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum"}
- * ],
- * "postAggregations":[
- * { "type":"arithmetic",
- * "name":"avg_random",
- * "fn":"/",
- * "fields":[ {"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"},
- * {"type":"fieldAccess","name":"rows","fieldName":"rows"} ]}
- * ],
- * "intervals":["2012-10-01T00:00/2020-01-01T00"]
- * }
+ {
+ "queryType": "groupBy",
+ "dataSource": "randSeq",
+ "granularity": "all",
+ "dimensions": [],
+ "aggregations":[
+ { "type": "count", "name": "rows"},
+ { "type": "doubleSum", "fieldName": "events", "name": "e"},
+ { "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum"}
+ ],
+ "postAggregations":[
+ { "type":"arithmetic",
+ "name":"avg_random",
+ "fn":"/",
+ "fields":[ {"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"},
+ {"type":"fieldAccess","name":"rows","fieldName":"rows"} ]}
+ ],
+ "intervals":["2012-10-01T00:00/2020-01-01T00"]
+ }
*
*/
@JsonTypeName("rand")
public class RandomFirehoseFactory implements FirehoseFactory
{
private static final Logger log = new Logger(RandomFirehoseFactory.class);
- /**
- * msec to sleep before generating a new row; if this and delayNsec are 0, then go as fast as possible.
+ /** msec to sleep before generating a new row; if this and delayNsec are 0, then go as fast as possible.
* json param sleepUsec (microseconds) is used to initialize this.
*/
private final long delayMsec;
- /**
- * nsec to sleep before generating a new row; if this and delayMsec are 0, then go as fast as possible.
+ /** nsec to sleep before generating a new row; if this and delayMsec are 0, then go as fast as possible.
* json param sleepUsec (microseconds) is used to initialize this.
- */
+ */
private final int delayNsec;
- /**
- * max rows to generate, -1 is infinite, 0 means nothing is generated; use this to prevent
- * infinite space consumption or to see what happens when a Firehose stops delivering
- * values, or to have hasMore() return false.
+ /** max rows to generate, -1 is infinite, 0 means nothing is generated; use this to prevent
+ * infinite space consumption or to see what happens when a Firehose stops delivering
+ * values, or to have hasMore() return false.
*/
private final long maxGeneratedRows;
- /**
- * seed for random number generator; if 0, then no seed is used.
- */
+ /** seed for random number generator; if 0, then no seed is used. */
private final long seed;
- /**
- * number of tokens to randomly associate with values (no heap limits). This can be used to
+ /** number of tokens to randomly associate with values (no heap limits). This can be used to
* stress test the number of tokens.
*/
private final int nTokens;
- /**
- * Number of token events per sleep interval.
- */
+ /** Number of token events per sleep interval. */
private final int nPerSleep;
@JsonCreator
@@ -132,30 +124,29 @@ public class RandomFirehoseFactory implements FirehoseFactory
long nsec = (sleepUsec > 0) ? sleepUsec * 1000L : 0;
long msec = nsec / 1000000L;
this.delayMsec = msec;
- this.delayNsec = (int) (nsec - (msec * 1000000L));
+ this.delayNsec = (int)(nsec - (msec * 1000000L));
this.maxGeneratedRows = maxGeneratedRows;
this.seed = seed;
this.nTokens = nTokens;
this.nPerSleep = nPerSleep;
if (nTokens <= 0) {
- log.warn("nTokens parameter " + nTokens + " ignored; must be greater than or equal to 1");
+ log.warn("nTokens parameter " + nTokens +" ignored; must be greater than or equal to 1");
nTokens = 1;
}
if (nPerSleep <= 0) {
- log.warn("nPerSleep parameter " + nPerSleep + " ignored; must be greater than or equal to 1");
+ log.warn("nPerSleep parameter " + nPerSleep +" ignored; must be greater than or equal to 1");
nPerSleep = 1;
}
log.info("maxGeneratedRows=" + maxGeneratedRows);
- log.info("seed=" + ((seed == 0L) ? "random value" : seed));
+ log.info("seed=" + ( (seed == 0L) ? "random value" : seed ));
log.info("nTokens=" + nTokens);
log.info("nPerSleep=" + nPerSleep);
- double dmsec = (double) delayMsec + ((double) this.delayNsec) / 1000000.;
+ double dmsec = (double)delayMsec + ((double)this.delayNsec)/1000000.;
if (dmsec > 0.0) {
log.info("sleep period=" + dmsec + "msec");
- log.info(
- "approximate max rate of record generation=" + (nPerSleep * 1000. / dmsec) + "/sec" +
- " or " + (60. * nPerSleep * 1000. / dmsec) + "/minute"
- );
+ log.info("approximate max rate of record generation=" + (nPerSleep * 1000./dmsec) + "/sec" +
+ " or " + (60. * nPerSleep * 1000./dmsec) + "/minute"
+ );
} else {
log.info("sleep period= NONE");
log.info("approximate max rate of record generation= as fast as possible");
@@ -179,7 +170,7 @@ public class RandomFirehoseFactory implements FirehoseFactory
@Override
public boolean hasMore()
{
- if (maxGeneratedRows >= 0 && rowCount >= maxGeneratedRows) {
+ if (maxGeneratedRows >= 0 && rowCount >= maxGeneratedRows) {
return waitIfmaxGeneratedRows;
} else {
return true; // there are always more random numbers
@@ -193,7 +184,7 @@ public class RandomFirehoseFactory implements FirehoseFactory
final long nth = (rowCount % nTokens) + 1;
long sleepMsec = delayMsec;
// all done?
- if (maxGeneratedRows >= 0 && rowCount >= maxGeneratedRows && waitIfmaxGeneratedRows) {
+ if (maxGeneratedRows >= 0 && rowCount >= maxGeneratedRows && waitIfmaxGeneratedRows) {
// sleep a long time instead of terminating
sleepMsec = 2000000000L;
}
@@ -202,8 +193,7 @@ public class RandomFirehoseFactory implements FirehoseFactory
if (modulus == 0) {
sleep(sleepMsec, delayNsec);
}
- }
- catch (InterruptedException e) {
+ } catch (InterruptedException e) {
throw new RuntimeException("InterruptedException");
}
}
@@ -212,7 +202,7 @@ public class RandomFirehoseFactory implements FirehoseFactory
}
final Map theMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
- theMap.put("inColumn", anotherRand((int) nth));
+ theMap.put("inColumn", anotherRand((int)nth));
theMap.put("target", ("a" + nth));
return new MapBasedInputRow(System.currentTimeMillis(), dimensions, theMap);
}
@@ -220,7 +210,7 @@ public class RandomFirehoseFactory implements FirehoseFactory
private Float anotherRand(int scale)
{
double f = rand.nextDouble(); // [0.0,1.0]
- return new Float(f + (double) scale);
+ return new Float(f + (double)scale);
}
@Override
diff --git a/examples/src/main/java/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java b/examples/src/main/java/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java
index 4b588ec826c..e87b5e44de2 100644
--- a/examples/src/main/java/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java
+++ b/examples/src/main/java/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java
@@ -11,13 +11,13 @@ import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import twitter4j.ConnectionLifeCycleListener;
import twitter4j.HashtagEntity;
-import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.User;
+import twitter4j.StallWarning;
import java.io.IOException;
import java.util.Arrays;
@@ -49,23 +49,21 @@ import static java.lang.Thread.sleep;
* is UTC):
*
*
- *
- *
+ *
+ *
* Notes on twitter.com HTTP (REST) API: v1.0 will be disabled around 2013-03 so v1.1 should be used;
* twitter4j 3.0 (not yet released) will support the v1.1 api.
* Specifically, we should be using https://stream.twitter.com/1.1/statuses/sample.json
* See: http://jira.twitter4j.org/browse/TFJ-186
- *
- * Notes on JSON parsing: as of twitter4j 2.2.x, the json parser has some bugs (ex: Status.toString()
- * can have number format exceptions), so it might be necessary to extract raw json and process it
- * separately. If so, set twitter4.jsonStoreEnabled=true and look at DataObjectFactory#getRawJSON();
- * com.fasterxml.jackson.databind.ObjectMapper should be used to parse.
*
+ * Notes on JSON parsing: as of twitter4j 2.2.x, the json parser has some bugs (ex: Status.toString()
+ * can have number format exceptions), so it might be necessary to extract raw json and process it
+ * separately. If so, set twitter4j.jsonStoreEnabled=true and look at DataObjectFactory#getRawJSON();
+ * com.fasterxml.jackson.databind.ObjectMapper should be used to parse.
* @author pbaclace
*/
@JsonTypeName("twitzer")
-public class TwitterSpritzerFirehoseFactory implements FirehoseFactory
-{
+public class TwitterSpritzerFirehoseFactory implements FirehoseFactory {
private static final Logger log = new Logger(TwitterSpritzerFirehoseFactory.class);
/**
* max events to receive, -1 is infinite, 0 means nothing is delivered; use this to prevent
@@ -96,8 +94,7 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory
@Override
public Firehose connect() throws IOException
{
- final ConnectionLifeCycleListener connectionLifeCycleListener = new ConnectionLifeCycleListener()
- {
+ final ConnectionLifeCycleListener connectionLifeCycleListener = new ConnectionLifeCycleListener() {
@Override
public void onConnect()
{
@@ -137,8 +134,7 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory
//
twitterStream = new TwitterStreamFactory().getInstance();
twitterStream.addConnectionLifeCycleListener(connectionLifeCycleListener);
- statusListener = new StatusListener()
- { // This is what really gets called to deliver stuff from twitter4j
+ statusListener = new StatusListener() { // This is what really gets called to deliver stuff from twitter4j
@Override
public void onStatus(Status status)
{
@@ -151,8 +147,7 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory
if (!success) {
log.warn("queue too slow!");
}
- }
- catch (InterruptedException e) {
+ } catch (InterruptedException e) {
throw new RuntimeException("InterruptedException", e);
}
}
@@ -184,8 +179,7 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory
}
@Override
- public void onStallWarning(StallWarning warning)
- {
+ public void onStallWarning(StallWarning warning) {
System.out.println("Got stall warning:" + warning);
}
};
@@ -194,11 +188,9 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory
twitterStream.sample(); // creates a generic StatusStream
log.info("returned from sample()");
- return new Firehose()
- {
+ return new Firehose() {
- private final Runnable doNothingRunnable = new Runnable()
- {
+ private final Runnable doNothingRunnable = new Runnable() {
public void run()
{
}
@@ -248,8 +240,7 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory
try {
log.info("reached limit, sleeping a long time...");
sleep(2000000000L);
- }
- catch (InterruptedException e) {
+ } catch (InterruptedException e) {
throw new RuntimeException("InterruptedException", e);
}
} else {
@@ -263,8 +254,7 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory
Status status;
try {
status = queue.take();
- }
- catch (InterruptedException e) {
+ } catch (InterruptedException e) {
throw new RuntimeException("InterruptedException", e);
}
diff --git a/examples/src/main/java/druid/examples/webStream/UpdateStream.java b/examples/src/main/java/druid/examples/webStream/UpdateStream.java
index 3cb7e5ef27a..57d8fc67311 100644
--- a/examples/src/main/java/druid/examples/webStream/UpdateStream.java
+++ b/examples/src/main/java/druid/examples/webStream/UpdateStream.java
@@ -22,10 +22,12 @@ package druid.examples.webStream;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
import com.google.common.io.InputSupplier;
import com.metamx.emitter.EmittingLogger;
import java.io.BufferedReader;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@@ -39,11 +41,17 @@ public class UpdateStream implements Runnable
private final InputSupplier supplier;
private final BlockingQueue