updated with further cleanliness

This commit is contained in:
Dhruv Parthasarathy 2013-06-27 19:00:34 -07:00
parent 8bc4d7c436
commit 80f4ae25b5
10 changed files with 165 additions and 172 deletions

View File

@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId> <groupId>com.metamx.druid</groupId>
<artifactId>druid-examples</artifactId> <artifactId>druid-examples</artifactId>

View File

@ -13,10 +13,10 @@ import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.log.LogLevelAdjuster; import com.metamx.druid.log.LogLevelAdjuster;
import com.metamx.druid.realtime.RealtimeNode; import com.metamx.druid.realtime.RealtimeNode;
import com.metamx.druid.realtime.SegmentPublisher; import com.metamx.druid.realtime.SegmentPublisher;
import druid.examples.webStream.WebFirehoseFactory;
import druid.examples.flights.FlightsFirehoseFactory; import druid.examples.flights.FlightsFirehoseFactory;
import druid.examples.rand.RandomFirehoseFactory; import druid.examples.rand.RandomFirehoseFactory;
import druid.examples.twitter.TwitterSpritzerFirehoseFactory; import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
import druid.examples.webStream.WebFirehoseFactory;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;

View File

@ -114,8 +114,7 @@ public class FlightsConverter
if (value.equals("NA")) { if (value.equals("NA")) {
event.put(metricDimension, 0); event.put(metricDimension, 0);
} } else {
else {
event.put(metricDimension, Integer.parseInt(value)); event.put(metricDimension, Integer.parseInt(value));
} }
} }

View File

@ -80,8 +80,7 @@ public class FlightsFirehoseFactory implements FirehoseFactory
try { try {
if (line != null) { if (line != null) {
return true; return true;
} } else if (in != null) {
else if (in != null) {
line = in.readLine(); line = in.readLine();
if (line == null) { if (line == null) {
@ -90,16 +89,14 @@ public class FlightsFirehoseFactory implements FirehoseFactory
} }
return true; return true;
} } else if (files.hasNext()) {
else if (files.hasNext()) {
final File nextFile = files.next(); final File nextFile = files.next();
if (nextFile.getName().endsWith(".gz")) { if (nextFile.getName().endsWith(".gz")) {
in = new BufferedReader( in = new BufferedReader(
new InputStreamReader(new GZIPInputStream(new FileInputStream(nextFile)), Charsets.UTF_8) new InputStreamReader(new GZIPInputStream(new FileInputStream(nextFile)), Charsets.UTF_8)
); );
} } else {
else {
in = new BufferedReader(new FileReader(nextFile)); in = new BufferedReader(new FileReader(nextFile));
} }
return hasMore(); return hasMore();

View File

@ -35,81 +35,89 @@ import static java.lang.Thread.sleep;
* the moment an event is delivered.) * the moment an event is delivered.)
* Values are offset by adding the modulus of the token number to the random number * Values are offset by adding the modulus of the token number to the random number
* so that token values have distinct, non-overlapping ranges. * so that token values have distinct, non-overlapping ranges.
* * <p/>
* </p> * </p>
* Example spec file: * Example spec file:
* <pre> * <pre>
[{ * [{
"schema" : { "dataSource":"randseq", * "schema" : { "dataSource":"randseq",
"aggregators":[ {"type":"count", "name":"events"}, * "aggregators":[ {"type":"count", "name":"events"},
{"type":"doubleSum","name":"outColumn","fieldName":"inColumn"} ], * {"type":"doubleSum","name":"outColumn","fieldName":"inColumn"} ],
"indexGranularity":"minute", * "indexGranularity":"minute",
"shardSpec" : { "type": "none" } }, * "shardSpec" : { "type": "none" } },
"config" : { "maxRowsInMemory" : 50000, * "config" : { "maxRowsInMemory" : 50000,
"intermediatePersistPeriod" : "PT2m" }, * "intermediatePersistPeriod" : "PT2m" },
"firehose" : { "type" : "rand",
"sleepUsec": 100000,
"maxGeneratedRows" : 5000000,
"seed" : 0,
"nTokens" : 19,
"nPerSleep" : 3
},
"plumber" : { "type" : "realtime",
"windowPeriod" : "PT5m",
"segmentGranularity":"hour",
"basePersistDirectory" : "/tmp/realtime/basePersist" }
}]
* </pre>
* *
* "firehose" : { "type" : "rand",
* "sleepUsec": 100000,
* "maxGeneratedRows" : 5000000,
* "seed" : 0,
* "nTokens" : 19,
* "nPerSleep" : 3
* },
*
* "plumber" : { "type" : "realtime",
* "windowPeriod" : "PT5m",
* "segmentGranularity":"hour",
* "basePersistDirectory" : "/tmp/realtime/basePersist" }
* }]
* </pre>
* <p/>
* Example query using POST to /druid/v2/ (where UTC date and time MUST include the current hour): * Example query using POST to /druid/v2/ (where UTC date and time MUST include the current hour):
* <pre> * <pre>
{ * {
"queryType": "groupBy", * "queryType": "groupBy",
"dataSource": "randSeq", * "dataSource": "randSeq",
"granularity": "all", * "granularity": "all",
"dimensions": [], * "dimensions": [],
"aggregations":[ * "aggregations":[
{ "type": "count", "name": "rows"}, * { "type": "count", "name": "rows"},
{ "type": "doubleSum", "fieldName": "events", "name": "e"}, * { "type": "doubleSum", "fieldName": "events", "name": "e"},
{ "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum"} * { "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum"}
], * ],
"postAggregations":[ * "postAggregations":[
{ "type":"arithmetic", * { "type":"arithmetic",
"name":"avg_random", * "name":"avg_random",
"fn":"/", * "fn":"/",
"fields":[ {"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"}, * "fields":[ {"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"},
{"type":"fieldAccess","name":"rows","fieldName":"rows"} ]} * {"type":"fieldAccess","name":"rows","fieldName":"rows"} ]}
], * ],
"intervals":["2012-10-01T00:00/2020-01-01T00"] * "intervals":["2012-10-01T00:00/2020-01-01T00"]
} * }
* </pre> * </pre>
*/ */
@JsonTypeName("rand") @JsonTypeName("rand")
public class RandomFirehoseFactory implements FirehoseFactory public class RandomFirehoseFactory implements FirehoseFactory
{ {
private static final Logger log = new Logger(RandomFirehoseFactory.class); private static final Logger log = new Logger(RandomFirehoseFactory.class);
/** msec to sleep before generating a new row; if this and delayNsec are 0, then go as fast as possible. /**
* msec to sleep before generating a new row; if this and delayNsec are 0, then go as fast as possible.
* json param sleepUsec (microseconds) is used to initialize this. * json param sleepUsec (microseconds) is used to initialize this.
*/ */
private final long delayMsec; private final long delayMsec;
/** nsec to sleep before generating a new row; if this and delayMsec are 0, then go as fast as possible. /**
* nsec to sleep before generating a new row; if this and delayMsec are 0, then go as fast as possible.
* json param sleepUsec (microseconds) is used to initialize this. * json param sleepUsec (microseconds) is used to initialize this.
*/ */
private final int delayNsec; private final int delayNsec;
/** max rows to generate, -1 is infinite, 0 means nothing is generated; use this to prevent /**
* infinite space consumption or to see what happens when a Firehose stops delivering * max rows to generate, -1 is infinite, 0 means nothing is generated; use this to prevent
* values, or to have hasMore() return false. * infinite space consumption or to see what happens when a Firehose stops delivering
* values, or to have hasMore() return false.
*/ */
private final long maxGeneratedRows; private final long maxGeneratedRows;
/** seed for random number generator; if 0, then no seed is used. */ /**
* seed for random number generator; if 0, then no seed is used.
*/
private final long seed; private final long seed;
/** number of tokens to randomly associate with values (no heap limits). This can be used to /**
* number of tokens to randomly associate with values (no heap limits). This can be used to
* stress test the number of tokens. * stress test the number of tokens.
*/ */
private final int nTokens; private final int nTokens;
/** Number of token events per sleep interval. */ /**
* Number of token events per sleep interval.
*/
private final int nPerSleep; private final int nPerSleep;
@JsonCreator @JsonCreator
@ -124,29 +132,30 @@ public class RandomFirehoseFactory implements FirehoseFactory
long nsec = (sleepUsec > 0) ? sleepUsec * 1000L : 0; long nsec = (sleepUsec > 0) ? sleepUsec * 1000L : 0;
long msec = nsec / 1000000L; long msec = nsec / 1000000L;
this.delayMsec = msec; this.delayMsec = msec;
this.delayNsec = (int)(nsec - (msec * 1000000L)); this.delayNsec = (int) (nsec - (msec * 1000000L));
this.maxGeneratedRows = maxGeneratedRows; this.maxGeneratedRows = maxGeneratedRows;
this.seed = seed; this.seed = seed;
this.nTokens = nTokens; this.nTokens = nTokens;
this.nPerSleep = nPerSleep; this.nPerSleep = nPerSleep;
if (nTokens <= 0) { if (nTokens <= 0) {
log.warn("nTokens parameter " + nTokens +" ignored; must be greater than or equal to 1"); log.warn("nTokens parameter " + nTokens + " ignored; must be greater than or equal to 1");
nTokens = 1; nTokens = 1;
} }
if (nPerSleep <= 0) { if (nPerSleep <= 0) {
log.warn("nPerSleep parameter " + nPerSleep +" ignored; must be greater than or equal to 1"); log.warn("nPerSleep parameter " + nPerSleep + " ignored; must be greater than or equal to 1");
nPerSleep = 1; nPerSleep = 1;
} }
log.info("maxGeneratedRows=" + maxGeneratedRows); log.info("maxGeneratedRows=" + maxGeneratedRows);
log.info("seed=" + ( (seed == 0L) ? "random value" : seed )); log.info("seed=" + ((seed == 0L) ? "random value" : seed));
log.info("nTokens=" + nTokens); log.info("nTokens=" + nTokens);
log.info("nPerSleep=" + nPerSleep); log.info("nPerSleep=" + nPerSleep);
double dmsec = (double)delayMsec + ((double)this.delayNsec)/1000000.; double dmsec = (double) delayMsec + ((double) this.delayNsec) / 1000000.;
if (dmsec > 0.0) { if (dmsec > 0.0) {
log.info("sleep period=" + dmsec + "msec"); log.info("sleep period=" + dmsec + "msec");
log.info("approximate max rate of record generation=" + (nPerSleep * 1000./dmsec) + "/sec" + log.info(
" or " + (60. * nPerSleep * 1000./dmsec) + "/minute" "approximate max rate of record generation=" + (nPerSleep * 1000. / dmsec) + "/sec" +
); " or " + (60. * nPerSleep * 1000. / dmsec) + "/minute"
);
} else { } else {
log.info("sleep period= NONE"); log.info("sleep period= NONE");
log.info("approximate max rate of record generation= as fast as possible"); log.info("approximate max rate of record generation= as fast as possible");
@ -170,7 +179,7 @@ public class RandomFirehoseFactory implements FirehoseFactory
@Override @Override
public boolean hasMore() public boolean hasMore()
{ {
if (maxGeneratedRows >= 0 && rowCount >= maxGeneratedRows) { if (maxGeneratedRows >= 0 && rowCount >= maxGeneratedRows) {
return waitIfmaxGeneratedRows; return waitIfmaxGeneratedRows;
} else { } else {
return true; // there are always more random numbers return true; // there are always more random numbers
@ -184,7 +193,7 @@ public class RandomFirehoseFactory implements FirehoseFactory
final long nth = (rowCount % nTokens) + 1; final long nth = (rowCount % nTokens) + 1;
long sleepMsec = delayMsec; long sleepMsec = delayMsec;
// all done? // all done?
if (maxGeneratedRows >= 0 && rowCount >= maxGeneratedRows && waitIfmaxGeneratedRows) { if (maxGeneratedRows >= 0 && rowCount >= maxGeneratedRows && waitIfmaxGeneratedRows) {
// sleep a long time instead of terminating // sleep a long time instead of terminating
sleepMsec = 2000000000L; sleepMsec = 2000000000L;
} }
@ -193,7 +202,8 @@ public class RandomFirehoseFactory implements FirehoseFactory
if (modulus == 0) { if (modulus == 0) {
sleep(sleepMsec, delayNsec); sleep(sleepMsec, delayNsec);
} }
} catch (InterruptedException e) { }
catch (InterruptedException e) {
throw new RuntimeException("InterruptedException"); throw new RuntimeException("InterruptedException");
} }
} }
@ -202,7 +212,7 @@ public class RandomFirehoseFactory implements FirehoseFactory
} }
final Map<String, Object> theMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); final Map<String, Object> theMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
theMap.put("inColumn", anotherRand((int)nth)); theMap.put("inColumn", anotherRand((int) nth));
theMap.put("target", ("a" + nth)); theMap.put("target", ("a" + nth));
return new MapBasedInputRow(System.currentTimeMillis(), dimensions, theMap); return new MapBasedInputRow(System.currentTimeMillis(), dimensions, theMap);
} }
@ -210,7 +220,7 @@ public class RandomFirehoseFactory implements FirehoseFactory
private Float anotherRand(int scale) private Float anotherRand(int scale)
{ {
double f = rand.nextDouble(); // [0.0,1.0] double f = rand.nextDouble(); // [0.0,1.0]
return new Float(f + (double)scale); return new Float(f + (double) scale);
} }
@Override @Override

View File

@ -11,13 +11,13 @@ import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.FirehoseFactory; import com.metamx.druid.realtime.firehose.FirehoseFactory;
import twitter4j.ConnectionLifeCycleListener; import twitter4j.ConnectionLifeCycleListener;
import twitter4j.HashtagEntity; import twitter4j.HashtagEntity;
import twitter4j.StallWarning;
import twitter4j.Status; import twitter4j.Status;
import twitter4j.StatusDeletionNotice; import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener; import twitter4j.StatusListener;
import twitter4j.TwitterStream; import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory; import twitter4j.TwitterStreamFactory;
import twitter4j.User; import twitter4j.User;
import twitter4j.StallWarning;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
@ -49,21 +49,23 @@ import static java.lang.Thread.sleep;
* is UTC): * is UTC):
* <pre> * <pre>
* </pre> * </pre>
* * <p/>
* * <p/>
* Notes on twitter.com HTTP (REST) API: v1.0 will be disabled around 2013-03 so v1.1 should be used; * Notes on twitter.com HTTP (REST) API: v1.0 will be disabled around 2013-03 so v1.1 should be used;
* twitter4j 3.0 (not yet released) will support the v1.1 api. * twitter4j 3.0 (not yet released) will support the v1.1 api.
* Specifically, we should be using https://stream.twitter.com/1.1/statuses/sample.json * Specifically, we should be using https://stream.twitter.com/1.1/statuses/sample.json
* See: http://jira.twitter4j.org/browse/TFJ-186 * See: http://jira.twitter4j.org/browse/TFJ-186
* <p/>
* Notes on JSON parsing: as of twitter4j 2.2.x, the json parser has some bugs (ex: Status.toString()
* can have number format exceptions), so it might be necessary to extract raw json and process it
* separately. If so, set twitter4.jsonStoreEnabled=true and look at DataObjectFactory#getRawJSON();
* com.fasterxml.jackson.databind.ObjectMapper should be used to parse.
* *
* Notes on JSON parsing: as of twitter4j 2.2.x, the json parser has some bugs (ex: Status.toString()
* can have number format exceptions), so it might be necessary to extract raw json and process it
* separately. If so, set twitter4.jsonStoreEnabled=true and look at DataObjectFactory#getRawJSON();
* com.fasterxml.jackson.databind.ObjectMapper should be used to parse.
* @author pbaclace * @author pbaclace
*/ */
@JsonTypeName("twitzer") @JsonTypeName("twitzer")
public class TwitterSpritzerFirehoseFactory implements FirehoseFactory { public class TwitterSpritzerFirehoseFactory implements FirehoseFactory
{
private static final Logger log = new Logger(TwitterSpritzerFirehoseFactory.class); private static final Logger log = new Logger(TwitterSpritzerFirehoseFactory.class);
/** /**
* max events to receive, -1 is infinite, 0 means nothing is delivered; use this to prevent * max events to receive, -1 is infinite, 0 means nothing is delivered; use this to prevent
@ -94,7 +96,8 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory {
@Override @Override
public Firehose connect() throws IOException public Firehose connect() throws IOException
{ {
final ConnectionLifeCycleListener connectionLifeCycleListener = new ConnectionLifeCycleListener() { final ConnectionLifeCycleListener connectionLifeCycleListener = new ConnectionLifeCycleListener()
{
@Override @Override
public void onConnect() public void onConnect()
{ {
@ -134,7 +137,8 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory {
// //
twitterStream = new TwitterStreamFactory().getInstance(); twitterStream = new TwitterStreamFactory().getInstance();
twitterStream.addConnectionLifeCycleListener(connectionLifeCycleListener); twitterStream.addConnectionLifeCycleListener(connectionLifeCycleListener);
statusListener = new StatusListener() { // This is what really gets called to deliver stuff from twitter4j statusListener = new StatusListener()
{ // This is what really gets called to deliver stuff from twitter4j
@Override @Override
public void onStatus(Status status) public void onStatus(Status status)
{ {
@ -147,7 +151,8 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory {
if (!success) { if (!success) {
log.warn("queue too slow!"); log.warn("queue too slow!");
} }
} catch (InterruptedException e) { }
catch (InterruptedException e) {
throw new RuntimeException("InterruptedException", e); throw new RuntimeException("InterruptedException", e);
} }
} }
@ -179,7 +184,8 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory {
} }
@Override @Override
public void onStallWarning(StallWarning warning) { public void onStallWarning(StallWarning warning)
{
System.out.println("Got stall warning:" + warning); System.out.println("Got stall warning:" + warning);
} }
}; };
@ -188,9 +194,11 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory {
twitterStream.sample(); // creates a generic StatusStream twitterStream.sample(); // creates a generic StatusStream
log.info("returned from sample()"); log.info("returned from sample()");
return new Firehose() { return new Firehose()
{
private final Runnable doNothingRunnable = new Runnable() { private final Runnable doNothingRunnable = new Runnable()
{
public void run() public void run()
{ {
} }
@ -240,7 +248,8 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory {
try { try {
log.info("reached limit, sleeping a long time..."); log.info("reached limit, sleeping a long time...");
sleep(2000000000L); sleep(2000000000L);
} catch (InterruptedException e) { }
catch (InterruptedException e) {
throw new RuntimeException("InterruptedException", e); throw new RuntimeException("InterruptedException", e);
} }
} else { } else {
@ -254,7 +263,8 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory {
Status status; Status status;
try { try {
status = queue.take(); status = queue.take();
} catch (InterruptedException e) { }
catch (InterruptedException e) {
throw new RuntimeException("InterruptedException", e); throw new RuntimeException("InterruptedException", e);
} }

View File

@ -27,19 +27,16 @@ import java.io.StringReader;
public class TestCaseSupplier implements InputSupplier<BufferedReader> public class TestCaseSupplier implements InputSupplier<BufferedReader>
{ {
private final String s; private final String testString;
public TestCaseSupplier(String s) public TestCaseSupplier(String testString)
{ {
this.s = s; this.testString = testString;
} }
@Override @Override
public BufferedReader getInput() throws IOException public BufferedReader getInput() throws IOException
{ {
StringBuilder buffer = new StringBuilder(); return new BufferedReader(new StringReader(testString));
buffer.append(s);
BufferedReader br = new BufferedReader(new StringReader(buffer.toString()));
return br;
} }
} }

View File

@ -19,12 +19,11 @@
package druid.examples.webStream; package druid.examples.webStream;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.io.InputSupplier; import com.google.common.io.InputSupplier;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.util.HashMap; import java.util.HashMap;
@ -35,17 +34,21 @@ import java.util.concurrent.TimeUnit;
public class UpdateStream implements Runnable public class UpdateStream implements Runnable
{ {
private static final EmittingLogger log = new EmittingLogger(UpdateStream.class); private static final EmittingLogger log = new EmittingLogger(UpdateStream.class);
private InputSupplier supplier;
private BlockingQueue<Map<String, Object>> queue;
private ObjectMapper mapper;
private final TypeReference<HashMap<String, Object>> typeRef;
private static final long queueWaitTime = 15L; private static final long queueWaitTime = 15L;
private final TypeReference<HashMap<String, Object>> typeRef;
private final InputSupplier<BufferedReader> supplier;
private final BlockingQueue<Map<String, Object>> queue;
private final ObjectMapper mapper;
public UpdateStream(InputSupplier supplier, BlockingQueue<Map<String, Object>> queue) public UpdateStream(
InputSupplier<BufferedReader> supplier,
BlockingQueue<Map<String, Object>> queue,
ObjectMapper mapper
)
{ {
this.supplier = supplier; this.supplier = supplier;
this.queue = queue; this.queue = queue;
this.mapper = new ObjectMapper(); this.mapper = mapper;
this.typeRef = new TypeReference<HashMap<String, Object>>() this.typeRef = new TypeReference<HashMap<String, Object>>()
{ {
}; };
@ -60,28 +63,18 @@ public class UpdateStream implements Runnable
public void run() throws RuntimeException public void run() throws RuntimeException
{ {
try { try {
BufferedReader reader = (BufferedReader) supplier.getInput(); BufferedReader reader = supplier.getInput();
String line; String line;
while ((line = reader.readLine()) != null) { while ((line = reader.readLine()) != null) {
if (isValid(line)) { if (isValid(line)) {
try { HashMap<String, Object> map = mapper.readValue(line, typeRef);
HashMap<String, Object> map = mapper.readValue(line, typeRef); queue.offer(map, queueWaitTime, TimeUnit.SECONDS);
queue.offer(map, queueWaitTime, TimeUnit.SECONDS); log.info("Successfully added to queue");
log.info("Successfully added to queue");
}
catch (JsonParseException e) {
log.info("Invalid JSON Stream. Please check if the url returns a proper JSON stream.");
Throwables.propagate(e);
}
catch (Exception e) {
Throwables.propagate(e);
}
} }
} }
} }
catch (Exception e) { catch (Exception e) {
e.printStackTrace(); throw Throwables.propagate(e);
Throwables.propagate(e);
} }
} }

View File

@ -22,14 +22,18 @@ package druid.examples.webStream;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.metamx.common.parsers.TimestampParser;
import com.metamx.druid.input.InputRow; import com.metamx.druid.input.InputRow;
import com.metamx.druid.input.MapBasedInputRow; import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.realtime.firehose.Firehose; import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.FirehoseFactory; import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import org.joda.time.DateTime;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
@ -41,11 +45,11 @@ import java.util.concurrent.Executors;
public class WebFirehoseFactory implements FirehoseFactory public class WebFirehoseFactory implements FirehoseFactory
{ {
private static final EmittingLogger log = new EmittingLogger(WebFirehoseFactory.class); private static final EmittingLogger log = new EmittingLogger(WebFirehoseFactory.class);
private static final int QUEUE_SIZE = 2000;
private final String url; private final String url;
private final List<String> dimensions; private final List<String> dimensions;
private final String timeDimension; private final String timeDimension;
private final List<String> renamedDimensions; private final List<String> renamedDimensions;
private static final int QUEUE_SIZE = 2000;
@JsonCreator @JsonCreator
@ -53,13 +57,13 @@ public class WebFirehoseFactory implements FirehoseFactory
@JsonProperty("url") String url, @JsonProperty("url") String url,
@JsonProperty("dimensions") List<String> dimensions, @JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("renamedDimensions") List<String> renamedDimensions, @JsonProperty("renamedDimensions") List<String> renamedDimensions,
@JsonProperty("timeDimension") String s @JsonProperty("timeDimension") String timeDimension
) )
{ {
this.url = url; this.url = url;
this.dimensions = dimensions; this.dimensions = dimensions;
this.renamedDimensions = renamedDimensions; this.renamedDimensions = renamedDimensions;
this.timeDimension = s; this.timeDimension = timeDimension;
} }
@Override @Override
@ -67,17 +71,7 @@ public class WebFirehoseFactory implements FirehoseFactory
{ {
final BlockingQueue<Map<String, Object>> queue = new ArrayBlockingQueue<Map<String, Object>>(QUEUE_SIZE); final BlockingQueue<Map<String, Object>> queue = new ArrayBlockingQueue<Map<String, Object>>(QUEUE_SIZE);
Runnable updateStream = new UpdateStream(new WebJsonSupplier(dimensions, url), queue); Runnable updateStream = new UpdateStream(new WebJsonSupplier(dimensions, url), queue, new DefaultObjectMapper());
// Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler()
// {
// public void uncaughtException(Thread th, Throwable ex)
// {
// log.info("Uncaught exception: " + ex);
// }
// };
// final Thread streamReader = new Thread(updateStream);
// streamReader.setUncaughtExceptionHandler(h);
// streamReader.start();
final ExecutorService service = Executors.newSingleThreadExecutor(); final ExecutorService service = Executors.newSingleThreadExecutor();
service.submit(updateStream); service.submit(updateStream);
@ -95,53 +89,45 @@ public class WebFirehoseFactory implements FirehoseFactory
@Override @Override
public InputRow nextRow() public InputRow nextRow()
{ {
if (Thread.currentThread().isInterrupted()) {
throw new RuntimeException("Interrupted, time to stop");
}
Map<String, Object> update;
try { try {
update = queue.take(); Map<String, Object> processedMap = processMap(queue.take());
DateTime date = TimestampParser.createTimestampParser("auto")
.apply(processedMap.get(timeDimension).toString());
long seconds = (long) date.getMillis();
//the parser doesn't check for posix. Only expects iso or millis. This checks for posix
if (new DateTime(seconds * 1000).getYear() == new DateTime().getYear()) {
seconds = (long) date.getMillis() * 1000;
}
return new MapBasedInputRow(
seconds,
renamedDimensions,
processedMap
);
} }
catch (InterruptedException e) { catch (Exception e) {
throw new RuntimeException("InterrutpedException", e); throw Throwables.propagate(e);
} }
Map<String, Object> processedMap = processMap(update);
return new MapBasedInputRow(
((Integer) processedMap.get(timeDimension)).longValue() * 1000,
renamedDimensions,
processedMap
);
} }
private Map<String, Object> renameKeys(Map<String, Object> update) private Map<String, Object> renameKeys(Map<String, Object> update)
{ {
Map<String, Object> renamedMap = new HashMap<String, Object>(); Map<String, Object> renamedMap = Maps.newHashMap();
int iter = 0; for (int iter = 0; iter < dimensions.size(); iter++) {
while (iter < dimensions.size()) { if (update.get(dimensions.get(iter)) == null) {
update.put(dimensions.get(iter), null);
}
Object obj = update.get(dimensions.get(iter)); Object obj = update.get(dimensions.get(iter));
renamedMap.put(renamedDimensions.get(iter), obj); renamedMap.put(renamedDimensions.get(iter), obj);
iter++; }
if (renamedMap.get(timeDimension) == null) {
renamedMap.put(timeDimension, System.currentTimeMillis());
} }
return renamedMap; return renamedMap;
} }
private void processNullDimensions(Map<String, Object> map)
{
for (String key : renamedDimensions) {
if (map.get(key) == null) {
if (key.equals(timeDimension)) {
map.put(key, new Integer((int) System.currentTimeMillis() / 1000));
} else {
map.put(key, null);
}
}
}
}
private Map<String, Object> processMap(Map<String, Object> map) private Map<String, Object> processMap(Map<String, Object> map)
{ {
Map<String, Object> renamedMap = renameKeys(map); Map<String, Object> renamedMap = renameKeys(map);
processNullDimensions(renamedMap);
return renamedMap; return renamedMap;
} }
@ -156,7 +142,6 @@ public class WebFirehoseFactory implements FirehoseFactory
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
log.info("CLOSING!!!");
service.shutdown(); service.shutdown();
} }

View File

@ -21,6 +21,7 @@ package druid.examples.webStream;
import com.google.common.io.InputSupplier; import com.google.common.io.InputSupplier;
import com.metamx.druid.input.InputRow; import com.metamx.druid.input.InputRow;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.realtime.firehose.Firehose; import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.FirehoseFactory; import com.metamx.druid.realtime.firehose.FirehoseFactory;
import org.testng.annotations.Test; import org.testng.annotations.Test;
@ -128,7 +129,7 @@ public class WebFirehoseFactoryTest
InputSupplier testCaseSupplier = new TestCaseSupplier( InputSupplier testCaseSupplier = new TestCaseSupplier(
"{ \"a\": \"Mozilla\\/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko\\/20100101 Firefox\\/21.0\", \"c\": \"US\", \"nk\": 1, \"tz\": \"America\\/New_York\", \"gr\": \"NY\", \"g\": \"1Chgyj\", \"h\": \"15vMQjX\", \"l\": \"o_d63rn9enb\", \"al\": \"en-US,en;q=0.5\", \"hh\": \"1.usa.gov\", \"r\": \"http:\\/\\/forecast.weather.gov\\/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX\", \"u\": \"http:\\/\\/www.spc.ncep.noaa.gov\\/\", \"t\": 1372121562, \"hc\": 1368193091, \"cy\": \"New York\", \"ll\": [ 40.862598, -73.921799 ] }" "{ \"a\": \"Mozilla\\/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko\\/20100101 Firefox\\/21.0\", \"c\": \"US\", \"nk\": 1, \"tz\": \"America\\/New_York\", \"gr\": \"NY\", \"g\": \"1Chgyj\", \"h\": \"15vMQjX\", \"l\": \"o_d63rn9enb\", \"al\": \"en-US,en;q=0.5\", \"hh\": \"1.usa.gov\", \"r\": \"http:\\/\\/forecast.weather.gov\\/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX\", \"u\": \"http:\\/\\/www.spc.ncep.noaa.gov\\/\", \"t\": 1372121562, \"hc\": 1368193091, \"cy\": \"New York\", \"ll\": [ 40.862598, -73.921799 ] }"
); );
UpdateStream updateStream = new UpdateStream(testCaseSupplier, queue); UpdateStream updateStream = new UpdateStream(testCaseSupplier, queue, new DefaultObjectMapper());
Thread t = new Thread(updateStream); Thread t = new Thread(updateStream);
t.start(); t.start();
Map<String, Object> expectedAnswer = new HashMap<String, Object>(); Map<String, Object> expectedAnswer = new HashMap<String, Object>();
@ -188,7 +189,7 @@ public class WebFirehoseFactoryTest
InputSupplier testCaseSupplier = new TestCaseSupplier( InputSupplier testCaseSupplier = new TestCaseSupplier(
"{ \"a\": \"Mozilla\\/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko\\/20100101 Firefox\\/21.0\", \"nk\": 1, \"tz\": \"America\\/New_York\", \"g\": \"1Chgyj\", \"h\": \"15vMQjX\", \"l\": \"o_d63rn9enb\", \"al\": \"en-US,en;q=0.5\", \"hh\": \"1.usa.gov\", \"r\": \"http:\\/\\/forecast.weather.gov\\/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX\", \"u\": \"http:\\/\\/www.spc.ncep.noaa.gov\\/\", \"t\": 1372121562, \"hc\": 1368193091, \"kw\": \"spcnws\", \"cy\": \"New York\", \"ll\": [ 40.862598, -73.921799 ] }" "{ \"a\": \"Mozilla\\/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko\\/20100101 Firefox\\/21.0\", \"nk\": 1, \"tz\": \"America\\/New_York\", \"g\": \"1Chgyj\", \"h\": \"15vMQjX\", \"l\": \"o_d63rn9enb\", \"al\": \"en-US,en;q=0.5\", \"hh\": \"1.usa.gov\", \"r\": \"http:\\/\\/forecast.weather.gov\\/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX\", \"u\": \"http:\\/\\/www.spc.ncep.noaa.gov\\/\", \"t\": 1372121562, \"hc\": 1368193091, \"kw\": \"spcnws\", \"cy\": \"New York\", \"ll\": [ 40.862598, -73.921799 ] }"
); );
UpdateStream updateStream = new UpdateStream(testCaseSupplier, queue); UpdateStream updateStream = new UpdateStream(testCaseSupplier, queue, new DefaultObjectMapper());
Thread t = new Thread(updateStream); Thread t = new Thread(updateStream);
t.start(); t.start();
InputRow row = webbieHose.nextRow(); InputRow row = webbieHose.nextRow();