mirror of https://github.com/apache/druid.git
fixed with Erics suggestions
This commit is contained in:
parent
6f6bf1996c
commit
adc00e6576
|
@ -114,7 +114,8 @@ public class FlightsConverter
|
|||
|
||||
if (value.equals("NA")) {
|
||||
event.put(metricDimension, 0);
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
event.put(metricDimension, Integer.parseInt(value));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -80,7 +80,8 @@ public class FlightsFirehoseFactory implements FirehoseFactory
|
|||
try {
|
||||
if (line != null) {
|
||||
return true;
|
||||
} else if (in != null) {
|
||||
}
|
||||
else if (in != null) {
|
||||
line = in.readLine();
|
||||
|
||||
if (line == null) {
|
||||
|
@ -89,14 +90,16 @@ public class FlightsFirehoseFactory implements FirehoseFactory
|
|||
}
|
||||
|
||||
return true;
|
||||
} else if (files.hasNext()) {
|
||||
}
|
||||
else if (files.hasNext()) {
|
||||
final File nextFile = files.next();
|
||||
|
||||
if (nextFile.getName().endsWith(".gz")) {
|
||||
in = new BufferedReader(
|
||||
new InputStreamReader(new GZIPInputStream(new FileInputStream(nextFile)), Charsets.UTF_8)
|
||||
);
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
in = new BufferedReader(new FileReader(nextFile));
|
||||
}
|
||||
return hasMore();
|
||||
|
|
|
@ -35,89 +35,81 @@ import static java.lang.Thread.sleep;
|
|||
* the moment an event is delivered.)
|
||||
* Values are offset by adding the modulus of the token number to the random number
|
||||
* so that token values have distinct, non-overlapping ranges.
|
||||
* <p/>
|
||||
*
|
||||
* </p>
|
||||
* Example spec file:
|
||||
* <pre>
|
||||
* [{
|
||||
* "schema" : { "dataSource":"randseq",
|
||||
* "aggregators":[ {"type":"count", "name":"events"},
|
||||
* {"type":"doubleSum","name":"outColumn","fieldName":"inColumn"} ],
|
||||
* "indexGranularity":"minute",
|
||||
* "shardSpec" : { "type": "none" } },
|
||||
* "config" : { "maxRowsInMemory" : 50000,
|
||||
* "intermediatePersistPeriod" : "PT2m" },
|
||||
*
|
||||
* "firehose" : { "type" : "rand",
|
||||
* "sleepUsec": 100000,
|
||||
* "maxGeneratedRows" : 5000000,
|
||||
* "seed" : 0,
|
||||
* "nTokens" : 19,
|
||||
* "nPerSleep" : 3
|
||||
* },
|
||||
*
|
||||
* "plumber" : { "type" : "realtime",
|
||||
* "windowPeriod" : "PT5m",
|
||||
* "segmentGranularity":"hour",
|
||||
* "basePersistDirectory" : "/tmp/realtime/basePersist" }
|
||||
* }]
|
||||
[{
|
||||
"schema" : { "dataSource":"randseq",
|
||||
"aggregators":[ {"type":"count", "name":"events"},
|
||||
{"type":"doubleSum","name":"outColumn","fieldName":"inColumn"} ],
|
||||
"indexGranularity":"minute",
|
||||
"shardSpec" : { "type": "none" } },
|
||||
"config" : { "maxRowsInMemory" : 50000,
|
||||
"intermediatePersistPeriod" : "PT2m" },
|
||||
|
||||
"firehose" : { "type" : "rand",
|
||||
"sleepUsec": 100000,
|
||||
"maxGeneratedRows" : 5000000,
|
||||
"seed" : 0,
|
||||
"nTokens" : 19,
|
||||
"nPerSleep" : 3
|
||||
},
|
||||
|
||||
"plumber" : { "type" : "realtime",
|
||||
"windowPeriod" : "PT5m",
|
||||
"segmentGranularity":"hour",
|
||||
"basePersistDirectory" : "/tmp/realtime/basePersist" }
|
||||
}]
|
||||
* </pre>
|
||||
* <p/>
|
||||
*
|
||||
* Example query using POST to /druid/v2/ (where UTC date and time MUST include the current hour):
|
||||
* <pre>
|
||||
* {
|
||||
* "queryType": "groupBy",
|
||||
* "dataSource": "randSeq",
|
||||
* "granularity": "all",
|
||||
* "dimensions": [],
|
||||
* "aggregations":[
|
||||
* { "type": "count", "name": "rows"},
|
||||
* { "type": "doubleSum", "fieldName": "events", "name": "e"},
|
||||
* { "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum"}
|
||||
* ],
|
||||
* "postAggregations":[
|
||||
* { "type":"arithmetic",
|
||||
* "name":"avg_random",
|
||||
* "fn":"/",
|
||||
* "fields":[ {"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"},
|
||||
* {"type":"fieldAccess","name":"rows","fieldName":"rows"} ]}
|
||||
* ],
|
||||
* "intervals":["2012-10-01T00:00/2020-01-01T00"]
|
||||
* }
|
||||
{
|
||||
"queryType": "groupBy",
|
||||
"dataSource": "randSeq",
|
||||
"granularity": "all",
|
||||
"dimensions": [],
|
||||
"aggregations":[
|
||||
{ "type": "count", "name": "rows"},
|
||||
{ "type": "doubleSum", "fieldName": "events", "name": "e"},
|
||||
{ "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum"}
|
||||
],
|
||||
"postAggregations":[
|
||||
{ "type":"arithmetic",
|
||||
"name":"avg_random",
|
||||
"fn":"/",
|
||||
"fields":[ {"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"},
|
||||
{"type":"fieldAccess","name":"rows","fieldName":"rows"} ]}
|
||||
],
|
||||
"intervals":["2012-10-01T00:00/2020-01-01T00"]
|
||||
}
|
||||
* </pre>
|
||||
*/
|
||||
@JsonTypeName("rand")
|
||||
public class RandomFirehoseFactory implements FirehoseFactory
|
||||
{
|
||||
private static final Logger log = new Logger(RandomFirehoseFactory.class);
|
||||
/**
|
||||
* msec to sleep before generating a new row; if this and delayNsec are 0, then go as fast as possible.
|
||||
/** msec to sleep before generating a new row; if this and delayNsec are 0, then go as fast as possible.
|
||||
* json param sleepUsec (microseconds) is used to initialize this.
|
||||
*/
|
||||
private final long delayMsec;
|
||||
/**
|
||||
* nsec to sleep before generating a new row; if this and delayMsec are 0, then go as fast as possible.
|
||||
/** nsec to sleep before generating a new row; if this and delayMsec are 0, then go as fast as possible.
|
||||
* json param sleepUsec (microseconds) is used to initialize this.
|
||||
*/
|
||||
*/
|
||||
private final int delayNsec;
|
||||
/**
|
||||
* max rows to generate, -1 is infinite, 0 means nothing is generated; use this to prevent
|
||||
* infinite space consumption or to see what happens when a Firehose stops delivering
|
||||
* values, or to have hasMore() return false.
|
||||
/** max rows to generate, -1 is infinite, 0 means nothing is generated; use this to prevent
|
||||
* infinite space consumption or to see what happens when a Firehose stops delivering
|
||||
* values, or to have hasMore() return false.
|
||||
*/
|
||||
private final long maxGeneratedRows;
|
||||
/**
|
||||
* seed for random number generator; if 0, then no seed is used.
|
||||
*/
|
||||
/** seed for random number generator; if 0, then no seed is used. */
|
||||
private final long seed;
|
||||
/**
|
||||
* number of tokens to randomly associate with values (no heap limits). This can be used to
|
||||
/** number of tokens to randomly associate with values (no heap limits). This can be used to
|
||||
* stress test the number of tokens.
|
||||
*/
|
||||
private final int nTokens;
|
||||
/**
|
||||
* Number of token events per sleep interval.
|
||||
*/
|
||||
/** Number of token events per sleep interval. */
|
||||
private final int nPerSleep;
|
||||
|
||||
@JsonCreator
|
||||
|
@ -132,30 +124,29 @@ public class RandomFirehoseFactory implements FirehoseFactory
|
|||
long nsec = (sleepUsec > 0) ? sleepUsec * 1000L : 0;
|
||||
long msec = nsec / 1000000L;
|
||||
this.delayMsec = msec;
|
||||
this.delayNsec = (int) (nsec - (msec * 1000000L));
|
||||
this.delayNsec = (int)(nsec - (msec * 1000000L));
|
||||
this.maxGeneratedRows = maxGeneratedRows;
|
||||
this.seed = seed;
|
||||
this.nTokens = nTokens;
|
||||
this.nPerSleep = nPerSleep;
|
||||
if (nTokens <= 0) {
|
||||
log.warn("nTokens parameter " + nTokens + " ignored; must be greater than or equal to 1");
|
||||
log.warn("nTokens parameter " + nTokens +" ignored; must be greater than or equal to 1");
|
||||
nTokens = 1;
|
||||
}
|
||||
if (nPerSleep <= 0) {
|
||||
log.warn("nPerSleep parameter " + nPerSleep + " ignored; must be greater than or equal to 1");
|
||||
log.warn("nPerSleep parameter " + nPerSleep +" ignored; must be greater than or equal to 1");
|
||||
nPerSleep = 1;
|
||||
}
|
||||
log.info("maxGeneratedRows=" + maxGeneratedRows);
|
||||
log.info("seed=" + ((seed == 0L) ? "random value" : seed));
|
||||
log.info("seed=" + ( (seed == 0L) ? "random value" : seed ));
|
||||
log.info("nTokens=" + nTokens);
|
||||
log.info("nPerSleep=" + nPerSleep);
|
||||
double dmsec = (double) delayMsec + ((double) this.delayNsec) / 1000000.;
|
||||
double dmsec = (double)delayMsec + ((double)this.delayNsec)/1000000.;
|
||||
if (dmsec > 0.0) {
|
||||
log.info("sleep period=" + dmsec + "msec");
|
||||
log.info(
|
||||
"approximate max rate of record generation=" + (nPerSleep * 1000. / dmsec) + "/sec" +
|
||||
" or " + (60. * nPerSleep * 1000. / dmsec) + "/minute"
|
||||
);
|
||||
log.info("approximate max rate of record generation=" + (nPerSleep * 1000./dmsec) + "/sec" +
|
||||
" or " + (60. * nPerSleep * 1000./dmsec) + "/minute"
|
||||
);
|
||||
} else {
|
||||
log.info("sleep period= NONE");
|
||||
log.info("approximate max rate of record generation= as fast as possible");
|
||||
|
@ -179,7 +170,7 @@ public class RandomFirehoseFactory implements FirehoseFactory
|
|||
@Override
|
||||
public boolean hasMore()
|
||||
{
|
||||
if (maxGeneratedRows >= 0 && rowCount >= maxGeneratedRows) {
|
||||
if (maxGeneratedRows >= 0 && rowCount >= maxGeneratedRows) {
|
||||
return waitIfmaxGeneratedRows;
|
||||
} else {
|
||||
return true; // there are always more random numbers
|
||||
|
@ -193,7 +184,7 @@ public class RandomFirehoseFactory implements FirehoseFactory
|
|||
final long nth = (rowCount % nTokens) + 1;
|
||||
long sleepMsec = delayMsec;
|
||||
// all done?
|
||||
if (maxGeneratedRows >= 0 && rowCount >= maxGeneratedRows && waitIfmaxGeneratedRows) {
|
||||
if (maxGeneratedRows >= 0 && rowCount >= maxGeneratedRows && waitIfmaxGeneratedRows) {
|
||||
// sleep a long time instead of terminating
|
||||
sleepMsec = 2000000000L;
|
||||
}
|
||||
|
@ -202,8 +193,7 @@ public class RandomFirehoseFactory implements FirehoseFactory
|
|||
if (modulus == 0) {
|
||||
sleep(sleepMsec, delayNsec);
|
||||
}
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException("InterruptedException");
|
||||
}
|
||||
}
|
||||
|
@ -212,7 +202,7 @@ public class RandomFirehoseFactory implements FirehoseFactory
|
|||
}
|
||||
|
||||
final Map<String, Object> theMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
|
||||
theMap.put("inColumn", anotherRand((int) nth));
|
||||
theMap.put("inColumn", anotherRand((int)nth));
|
||||
theMap.put("target", ("a" + nth));
|
||||
return new MapBasedInputRow(System.currentTimeMillis(), dimensions, theMap);
|
||||
}
|
||||
|
@ -220,7 +210,7 @@ public class RandomFirehoseFactory implements FirehoseFactory
|
|||
private Float anotherRand(int scale)
|
||||
{
|
||||
double f = rand.nextDouble(); // [0.0,1.0]
|
||||
return new Float(f + (double) scale);
|
||||
return new Float(f + (double)scale);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -11,13 +11,13 @@ import com.metamx.druid.realtime.firehose.Firehose;
|
|||
import com.metamx.druid.realtime.firehose.FirehoseFactory;
|
||||
import twitter4j.ConnectionLifeCycleListener;
|
||||
import twitter4j.HashtagEntity;
|
||||
import twitter4j.StallWarning;
|
||||
import twitter4j.Status;
|
||||
import twitter4j.StatusDeletionNotice;
|
||||
import twitter4j.StatusListener;
|
||||
import twitter4j.TwitterStream;
|
||||
import twitter4j.TwitterStreamFactory;
|
||||
import twitter4j.User;
|
||||
import twitter4j.StallWarning;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
@ -49,23 +49,21 @@ import static java.lang.Thread.sleep;
|
|||
* is UTC):
|
||||
* <pre>
|
||||
* </pre>
|
||||
* <p/>
|
||||
* <p/>
|
||||
*
|
||||
*
|
||||
* Notes on twitter.com HTTP (REST) API: v1.0 will be disabled around 2013-03 so v1.1 should be used;
|
||||
* twitter4j 3.0 (not yet released) will support the v1.1 api.
|
||||
* Specifically, we should be using https://stream.twitter.com/1.1/statuses/sample.json
|
||||
* See: http://jira.twitter4j.org/browse/TFJ-186
|
||||
* <p/>
|
||||
* Notes on JSON parsing: as of twitter4j 2.2.x, the json parser has some bugs (ex: Status.toString()
|
||||
* can have number format exceptions), so it might be necessary to extract raw json and process it
|
||||
* separately. If so, set twitter4.jsonStoreEnabled=true and look at DataObjectFactory#getRawJSON();
|
||||
* com.fasterxml.jackson.databind.ObjectMapper should be used to parse.
|
||||
*
|
||||
* Notes on JSON parsing: as of twitter4j 2.2.x, the json parser has some bugs (ex: Status.toString()
|
||||
* can have number format exceptions), so it might be necessary to extract raw json and process it
|
||||
* separately. If so, set twitter4.jsonStoreEnabled=true and look at DataObjectFactory#getRawJSON();
|
||||
* com.fasterxml.jackson.databind.ObjectMapper should be used to parse.
|
||||
* @author pbaclace
|
||||
*/
|
||||
@JsonTypeName("twitzer")
|
||||
public class TwitterSpritzerFirehoseFactory implements FirehoseFactory
|
||||
{
|
||||
public class TwitterSpritzerFirehoseFactory implements FirehoseFactory {
|
||||
private static final Logger log = new Logger(TwitterSpritzerFirehoseFactory.class);
|
||||
/**
|
||||
* max events to receive, -1 is infinite, 0 means nothing is delivered; use this to prevent
|
||||
|
@ -96,8 +94,7 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory
|
|||
@Override
|
||||
public Firehose connect() throws IOException
|
||||
{
|
||||
final ConnectionLifeCycleListener connectionLifeCycleListener = new ConnectionLifeCycleListener()
|
||||
{
|
||||
final ConnectionLifeCycleListener connectionLifeCycleListener = new ConnectionLifeCycleListener() {
|
||||
@Override
|
||||
public void onConnect()
|
||||
{
|
||||
|
@ -137,8 +134,7 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory
|
|||
//
|
||||
twitterStream = new TwitterStreamFactory().getInstance();
|
||||
twitterStream.addConnectionLifeCycleListener(connectionLifeCycleListener);
|
||||
statusListener = new StatusListener()
|
||||
{ // This is what really gets called to deliver stuff from twitter4j
|
||||
statusListener = new StatusListener() { // This is what really gets called to deliver stuff from twitter4j
|
||||
@Override
|
||||
public void onStatus(Status status)
|
||||
{
|
||||
|
@ -151,8 +147,7 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory
|
|||
if (!success) {
|
||||
log.warn("queue too slow!");
|
||||
}
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException("InterruptedException", e);
|
||||
}
|
||||
}
|
||||
|
@ -184,8 +179,7 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onStallWarning(StallWarning warning)
|
||||
{
|
||||
public void onStallWarning(StallWarning warning) {
|
||||
System.out.println("Got stall warning:" + warning);
|
||||
}
|
||||
};
|
||||
|
@ -194,11 +188,9 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory
|
|||
twitterStream.sample(); // creates a generic StatusStream
|
||||
log.info("returned from sample()");
|
||||
|
||||
return new Firehose()
|
||||
{
|
||||
return new Firehose() {
|
||||
|
||||
private final Runnable doNothingRunnable = new Runnable()
|
||||
{
|
||||
private final Runnable doNothingRunnable = new Runnable() {
|
||||
public void run()
|
||||
{
|
||||
}
|
||||
|
@ -248,8 +240,7 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory
|
|||
try {
|
||||
log.info("reached limit, sleeping a long time...");
|
||||
sleep(2000000000L);
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException("InterruptedException", e);
|
||||
}
|
||||
} else {
|
||||
|
@ -263,8 +254,7 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory
|
|||
Status status;
|
||||
try {
|
||||
status = queue.take();
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException("InterruptedException", e);
|
||||
}
|
||||
|
||||
|
|
|
@ -22,10 +22,12 @@ package druid.examples.webStream;
|
|||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.io.InputSupplier;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
|
@ -39,11 +41,17 @@ public class UpdateStream implements Runnable
|
|||
private final InputSupplier<BufferedReader> supplier;
|
||||
private final BlockingQueue<Map<String, Object>> queue;
|
||||
private final ObjectMapper mapper;
|
||||
private final ArrayList<String> dimensions;
|
||||
private final ArrayList<String> renamedDimensions;
|
||||
private final String timeDimension;
|
||||
|
||||
public UpdateStream(
|
||||
InputSupplier<BufferedReader> supplier,
|
||||
BlockingQueue<Map<String, Object>> queue,
|
||||
ObjectMapper mapper
|
||||
ObjectMapper mapper,
|
||||
ArrayList<String> dimensions,
|
||||
ArrayList<String> renamedDimensions,
|
||||
String timeDimension
|
||||
)
|
||||
{
|
||||
this.supplier = supplier;
|
||||
|
@ -52,6 +60,9 @@ public class UpdateStream implements Runnable
|
|||
this.typeRef = new TypeReference<HashMap<String, Object>>()
|
||||
{
|
||||
};
|
||||
this.timeDimension = timeDimension;
|
||||
this.dimensions = dimensions;
|
||||
this.renamedDimensions = renamedDimensions;
|
||||
}
|
||||
|
||||
private boolean isValid(String s)
|
||||
|
@ -60,7 +71,7 @@ public class UpdateStream implements Runnable
|
|||
}
|
||||
|
||||
@Override
|
||||
public void run() throws RuntimeException
|
||||
public void run()
|
||||
{
|
||||
try {
|
||||
BufferedReader reader = supplier.getInput();
|
||||
|
@ -68,8 +79,13 @@ public class UpdateStream implements Runnable
|
|||
while ((line = reader.readLine()) != null) {
|
||||
if (isValid(line)) {
|
||||
HashMap<String, Object> map = mapper.readValue(line, typeRef);
|
||||
queue.offer(map, queueWaitTime, TimeUnit.SECONDS);
|
||||
log.debug("Successfully added to queue");
|
||||
if (map.get(timeDimension) != null) {
|
||||
Map<String, Object> renamedMap = renameKeys(map);
|
||||
queue.offer(renamedMap, queueWaitTime, TimeUnit.SECONDS);
|
||||
log.debug("Successfully added to queue");
|
||||
} else {
|
||||
log.debug("missing timestamp");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -78,4 +94,17 @@ public class UpdateStream implements Runnable
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
private Map<String, Object> renameKeys(Map<String, Object> update)
|
||||
{
|
||||
Map<String, Object> renamedMap = Maps.newHashMap();
|
||||
for (int iter = 0; iter < dimensions.size(); iter++) {
|
||||
if (update.get(dimensions.get(iter)) != null) {
|
||||
Object obj = update.get(dimensions.get(iter));
|
||||
renamedMap.put(renamedDimensions.get(iter), obj);
|
||||
}
|
||||
}
|
||||
return renamedMap;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
|||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.metamx.common.parsers.TimestampParser;
|
||||
import com.metamx.druid.guava.Runnables;
|
||||
import com.metamx.druid.input.InputRow;
|
||||
|
@ -35,7 +34,7 @@ import com.metamx.emitter.EmittingLogger;
|
|||
import org.joda.time.DateTime;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
|
@ -48,23 +47,26 @@ public class WebFirehoseFactory implements FirehoseFactory
|
|||
private static final EmittingLogger log = new EmittingLogger(WebFirehoseFactory.class);
|
||||
private static final int QUEUE_SIZE = 2000;
|
||||
private final String url;
|
||||
private final List<String> dimensions;
|
||||
private final ArrayList<String> dimensions;
|
||||
private final String timeDimension;
|
||||
private final List<String> renamedDimensions;
|
||||
private final ArrayList<String> renamedDimensions;
|
||||
private final String timeFormat;
|
||||
|
||||
|
||||
@JsonCreator
|
||||
public WebFirehoseFactory(
|
||||
@JsonProperty("url") String url,
|
||||
@JsonProperty("dimensions") List<String> dimensions,
|
||||
@JsonProperty("renamedDimensions") List<String> renamedDimensions,
|
||||
@JsonProperty("timeDimension") String timeDimension
|
||||
@JsonProperty("dimensions") ArrayList<String> dimensions,
|
||||
@JsonProperty("renamedDimensions") ArrayList<String> renamedDimensions,
|
||||
@JsonProperty("timeDimension") String timeDimension,
|
||||
@JsonProperty("timeFormat") String timeFormat
|
||||
)
|
||||
{
|
||||
this.url = url;
|
||||
this.dimensions = dimensions;
|
||||
this.renamedDimensions = renamedDimensions;
|
||||
this.timeDimension = timeDimension;
|
||||
this.timeFormat = timeFormat;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -72,7 +74,14 @@ public class WebFirehoseFactory implements FirehoseFactory
|
|||
{
|
||||
final BlockingQueue<Map<String, Object>> queue = new ArrayBlockingQueue<Map<String, Object>>(QUEUE_SIZE);
|
||||
|
||||
Runnable updateStream = new UpdateStream(new WebJsonSupplier(dimensions, url), queue, new DefaultObjectMapper());
|
||||
Runnable updateStream = new UpdateStream(
|
||||
new WebJsonSupplier(url),
|
||||
queue,
|
||||
new DefaultObjectMapper(),
|
||||
dimensions,
|
||||
renamedDimensions,
|
||||
timeDimension
|
||||
);
|
||||
final ExecutorService service = Executors.newSingleThreadExecutor();
|
||||
service.submit(updateStream);
|
||||
|
||||
|
@ -83,7 +92,7 @@ public class WebFirehoseFactory implements FirehoseFactory
|
|||
@Override
|
||||
public boolean hasMore()
|
||||
{
|
||||
return !(service.isTerminated());
|
||||
return !(service.isTerminated()) && queue.size() > 0;
|
||||
}
|
||||
|
||||
|
||||
|
@ -91,18 +100,14 @@ public class WebFirehoseFactory implements FirehoseFactory
|
|||
public InputRow nextRow()
|
||||
{
|
||||
try {
|
||||
Map<String, Object> processedMap = processMap(queue.take());
|
||||
DateTime date = TimestampParser.createTimestampParser("auto")
|
||||
.apply(processedMap.get(timeDimension).toString());
|
||||
Map<String, Object> map = queue.take();
|
||||
DateTime date = TimestampParser.createTimestampParser(timeFormat)
|
||||
.apply(map.get(timeDimension).toString());
|
||||
long seconds = (long) date.getMillis();
|
||||
//the parser doesn't check for posix. Only expects iso or millis. This checks for posix
|
||||
if (new DateTime(seconds * 1000).getYear() == new DateTime().getYear()) {
|
||||
seconds = (long) date.getMillis() * 1000;
|
||||
}
|
||||
return new MapBasedInputRow(
|
||||
seconds,
|
||||
renamedDimensions,
|
||||
processedMap
|
||||
map
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
|
@ -110,28 +115,6 @@ public class WebFirehoseFactory implements FirehoseFactory
|
|||
}
|
||||
}
|
||||
|
||||
private Map<String, Object> renameKeys(Map<String, Object> update)
|
||||
{
|
||||
Map<String, Object> renamedMap = Maps.newHashMap();
|
||||
for (int iter = 0; iter < dimensions.size(); iter++) {
|
||||
if (update.get(dimensions.get(iter)) != null) {
|
||||
Object obj = update.get(dimensions.get(iter));
|
||||
renamedMap.put(renamedDimensions.get(iter), obj);
|
||||
}
|
||||
}
|
||||
if (renamedMap.get(timeDimension) == null) {
|
||||
renamedMap.put(timeDimension, System.currentTimeMillis());
|
||||
}
|
||||
return renamedMap;
|
||||
}
|
||||
|
||||
private Map<String, Object> processMap(Map<String, Object> map)
|
||||
{
|
||||
Map<String, Object> renamedMap = renameKeys(map);
|
||||
return renamedMap;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Runnable commit()
|
||||
{
|
||||
|
|
|
@ -1,36 +1,15 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package druid.examples.webStream;
|
||||
|
||||
import com.google.common.io.InputSupplier;
|
||||
import com.metamx.druid.input.InputRow;
|
||||
import com.metamx.druid.jackson.DefaultObjectMapper;
|
||||
import com.metamx.druid.realtime.firehose.Firehose;
|
||||
import com.metamx.druid.realtime.firehose.FirehoseFactory;
|
||||
import junit.framework.Assert;
|
||||
import org.testng.annotations.BeforeClass;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
|
@ -38,101 +17,38 @@ import java.util.concurrent.TimeUnit;
|
|||
|
||||
public class WebFirehoseFactoryTest
|
||||
{
|
||||
@Test
|
||||
void testMalformedUrlConnect() throws Exception
|
||||
private final ArrayList<String> dimensions = new ArrayList<String>();
|
||||
private InputSupplier testCaseSupplier;
|
||||
final int QUEUE_SIZE = 2000;
|
||||
BlockingQueue<Map<String, Object>> queue;
|
||||
String timeDimension = "t";
|
||||
DefaultObjectMapper mapper = new DefaultObjectMapper();
|
||||
Map<String, Object> expectedAnswer = new HashMap<String, Object>();
|
||||
|
||||
@BeforeClass
|
||||
public void setUp()
|
||||
{
|
||||
List<String> dimensions = new LinkedList<String>();
|
||||
dimensions.add("g");
|
||||
dimensions.add("c");
|
||||
dimensions.add("a");
|
||||
dimensions.add("cy");
|
||||
dimensions.add("l");
|
||||
dimensions.add("hh");
|
||||
dimensions.add("hc");
|
||||
dimensions.add("h");
|
||||
dimensions.add("u");
|
||||
dimensions.add("tz");
|
||||
dimensions.add("t");
|
||||
dimensions.add("r");
|
||||
dimensions.add("gr");
|
||||
dimensions.add("nk");
|
||||
dimensions.add("al");
|
||||
|
||||
String invalidURL = "http://invalid.url";
|
||||
FirehoseFactory test = new WebFirehoseFactory(invalidURL, dimensions, dimensions, "t");
|
||||
Firehose returnedFirehose = test.connect();
|
||||
Thread.sleep(3000);
|
||||
assert returnedFirehose.hasMore() == false;
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testUrlWithNoJsonStreamConnect() throws Exception
|
||||
{
|
||||
List<String> dimensions = new LinkedList<String>();
|
||||
dimensions.add("g");
|
||||
dimensions.add("c");
|
||||
dimensions.add("a");
|
||||
dimensions.add("cy");
|
||||
dimensions.add("l");
|
||||
dimensions.add("hh");
|
||||
dimensions.add("hc");
|
||||
dimensions.add("h");
|
||||
dimensions.add("u");
|
||||
dimensions.add("tz");
|
||||
dimensions.add("t");
|
||||
dimensions.add("r");
|
||||
dimensions.add("gr");
|
||||
dimensions.add("nk");
|
||||
dimensions.add("al");
|
||||
|
||||
String nonJsonUrl = "http://google.com";
|
||||
FirehoseFactory test = new WebFirehoseFactory(nonJsonUrl, dimensions, dimensions, "t");
|
||||
Firehose returnedFirehose = test.connect();
|
||||
Thread.sleep(3000);
|
||||
assert returnedFirehose.hasMore() == false;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void correctUrlCheck() throws Exception
|
||||
{
|
||||
List<String> dimensions = new LinkedList<String>();
|
||||
dimensions.add("g");
|
||||
dimensions.add("c");
|
||||
dimensions.add("a");
|
||||
dimensions.add("cy");
|
||||
dimensions.add("l");
|
||||
dimensions.add("hh");
|
||||
dimensions.add("hc");
|
||||
dimensions.add("h");
|
||||
dimensions.add("u");
|
||||
dimensions.add("tz");
|
||||
dimensions.add("t");
|
||||
dimensions.add("r");
|
||||
dimensions.add("gr");
|
||||
dimensions.add("nk");
|
||||
dimensions.add("al");
|
||||
|
||||
String url = "http://developer.usa.gov/1usagov";
|
||||
FirehoseFactory test = new WebFirehoseFactory(url, dimensions, dimensions, "t");
|
||||
Firehose returnedFirehose = test.connect();
|
||||
Thread.sleep(3000);
|
||||
assert returnedFirehose.hasMore() == true;
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void basicIngestionCheck() throws Exception
|
||||
{
|
||||
final int QUEUE_SIZE = 2000;
|
||||
BlockingQueue<Map<String, Object>> queue = new ArrayBlockingQueue<Map<String, Object>>(QUEUE_SIZE);
|
||||
InputSupplier testCaseSupplier = new TestCaseSupplier(
|
||||
testCaseSupplier = new TestCaseSupplier(
|
||||
"{ \"a\": \"Mozilla\\/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko\\/20100101 Firefox\\/21.0\", \"c\": \"US\", \"nk\": 1, \"tz\": \"America\\/New_York\", \"gr\": \"NY\", \"g\": \"1Chgyj\", \"h\": \"15vMQjX\", \"l\": \"o_d63rn9enb\", \"al\": \"en-US,en;q=0.5\", \"hh\": \"1.usa.gov\", \"r\": \"http:\\/\\/forecast.weather.gov\\/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX\", \"u\": \"http:\\/\\/www.spc.ncep.noaa.gov\\/\", \"t\": 1372121562, \"hc\": 1368193091, \"cy\": \"New York\", \"ll\": [ 40.862598, -73.921799 ] }"
|
||||
);
|
||||
UpdateStream updateStream = new UpdateStream(testCaseSupplier, queue, new DefaultObjectMapper());
|
||||
Thread t = new Thread(updateStream);
|
||||
t.start();
|
||||
Map<String, Object> expectedAnswer = new HashMap<String, Object>();
|
||||
|
||||
dimensions.add("g");
|
||||
dimensions.add("c");
|
||||
dimensions.add("a");
|
||||
dimensions.add("cy");
|
||||
dimensions.add("l");
|
||||
dimensions.add("hh");
|
||||
dimensions.add("hc");
|
||||
dimensions.add("h");
|
||||
dimensions.add("u");
|
||||
dimensions.add("tz");
|
||||
dimensions.add("t");
|
||||
dimensions.add("r");
|
||||
dimensions.add("gr");
|
||||
dimensions.add("nk");
|
||||
dimensions.add("al");
|
||||
dimensions.add("ll");
|
||||
|
||||
expectedAnswer.put("a", "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko/20100101 Firefox/21.0");
|
||||
expectedAnswer.put("c", "US");
|
||||
expectedAnswer.put("nk", 1);
|
||||
|
@ -152,50 +68,151 @@ public class WebFirehoseFactoryTest
|
|||
expectedAnswer.put("hc", 1368193091);
|
||||
expectedAnswer.put("cy", "New York");
|
||||
expectedAnswer.put("ll", Arrays.asList(40.862598, -73.921799));
|
||||
Map<String, Object> insertedRow = queue.poll(10, TimeUnit.SECONDS);
|
||||
assert expectedAnswer.equals(insertedRow);
|
||||
|
||||
}
|
||||
|
||||
@Test(expectedExceptions = UnknownHostException.class)
|
||||
public void checkInvalidUrl() throws Exception
|
||||
{
|
||||
|
||||
String invalidURL = "http://invalid.url";
|
||||
WebJsonSupplier supplier = new WebJsonSupplier(invalidURL);
|
||||
supplier.getInput();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void renameDimensionsCheck() throws Exception
|
||||
public void basicIngestionCheck() throws Exception
|
||||
{
|
||||
List<String> dimensions = new ArrayList<String>();
|
||||
dimensions.add("bitly_hash");
|
||||
dimensions.add("country");
|
||||
dimensions.add("user");
|
||||
dimensions.add("city");
|
||||
dimensions.add("encoding_user_login");
|
||||
dimensions.add("short_url");
|
||||
dimensions.add("timestamp_hash");
|
||||
dimensions.add("user_bitly_hash");
|
||||
dimensions.add("url");
|
||||
dimensions.add("timezone");
|
||||
dimensions.add("time");
|
||||
dimensions.add("referring_url");
|
||||
dimensions.add("geo_region");
|
||||
dimensions.add("known_users");
|
||||
dimensions.add("accept_language");
|
||||
|
||||
WebFirehoseFactory webbie = new WebFirehoseFactory(
|
||||
"http://developer.usa.gov/1usagov",
|
||||
queue = new ArrayBlockingQueue<Map<String, Object>>(QUEUE_SIZE);
|
||||
UpdateStream updateStream = new UpdateStream(
|
||||
testCaseSupplier,
|
||||
queue,
|
||||
mapper,
|
||||
dimensions,
|
||||
dimensions,
|
||||
"time"
|
||||
timeDimension
|
||||
);
|
||||
Firehose webbieHose = webbie.connect();
|
||||
final int QUEUE_SIZE = 2000;
|
||||
BlockingQueue<Map<String, Object>> queue = new ArrayBlockingQueue<Map<String, Object>>(QUEUE_SIZE);
|
||||
InputSupplier testCaseSupplier = new TestCaseSupplier(
|
||||
"{ \"a\": \"Mozilla\\/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko\\/20100101 Firefox\\/21.0\", \"nk\": 1, \"tz\": \"America\\/New_York\", \"g\": \"1Chgyj\", \"h\": \"15vMQjX\", \"l\": \"o_d63rn9enb\", \"al\": \"en-US,en;q=0.5\", \"hh\": \"1.usa.gov\", \"r\": \"http:\\/\\/forecast.weather.gov\\/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX\", \"u\": \"http:\\/\\/www.spc.ncep.noaa.gov\\/\", \"t\": 1372121562, \"hc\": 1368193091, \"kw\": \"spcnws\", \"cy\": \"New York\", \"ll\": [ 40.862598, -73.921799 ] }"
|
||||
);
|
||||
UpdateStream updateStream = new UpdateStream(testCaseSupplier, queue, new DefaultObjectMapper());
|
||||
Thread t = new Thread(updateStream);
|
||||
t.start();
|
||||
InputRow row = webbieHose.nextRow();
|
||||
assert row.getDimensions().equals(dimensions);
|
||||
|
||||
Map<String, Object> insertedRow = queue.poll(10, TimeUnit.SECONDS);
|
||||
Assert.assertEquals(expectedAnswer, insertedRow);
|
||||
}
|
||||
|
||||
//If a timestamp is missing, we should throw away the event
|
||||
@Test
|
||||
public void missingTimeStampCheck()
|
||||
{
|
||||
queue = new ArrayBlockingQueue<Map<String, Object>>(QUEUE_SIZE);
|
||||
InputSupplier testCaseSupplier = new TestCaseSupplier(
|
||||
"{ \"a\": \"Mozilla\\/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko\\/20100101 Firefox\\/21.0\", \"c\": \"US\", \"nk\": 1, \"tz\": \"America\\/New_York\", \"gr\": \"NY\", \"g\": \"1Chgyj\", \"h\": \"15vMQjX\", \"l\": \"o_d63rn9enb\", \"al\": \"en-US,en;q=0.5\", \"hh\": \"1.usa.gov\", \"r\": \"http:\\/\\/forecast.weather.gov\\/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX\", \"u\": \"http:\\/\\/www.spc.ncep.noaa.gov\\/\", \"hc\": 1368193091, \"cy\": \"New York\", \"ll\": [ 40.862598, -73.921799 ] }"
|
||||
);
|
||||
UpdateStream updateStream = new UpdateStream(
|
||||
testCaseSupplier,
|
||||
queue,
|
||||
mapper,
|
||||
dimensions,
|
||||
dimensions,
|
||||
timeDimension
|
||||
);
|
||||
updateStream.run();
|
||||
Assert.assertEquals(queue.size(), 0);
|
||||
}
|
||||
|
||||
//If any other value is missing, we should still add the event and process it properly
|
||||
@Test
|
||||
public void otherNullValueCheck() throws Exception
|
||||
{
|
||||
Map<String, Object> expectedAnswer = new HashMap<String, Object>();
|
||||
expectedAnswer.put("a", "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko/20100101 Firefox/21.0");
|
||||
expectedAnswer.put("nk", 1);
|
||||
expectedAnswer.put("tz", "America/New_York");
|
||||
expectedAnswer.put("gr", "NY");
|
||||
expectedAnswer.put("g", "1Chgyj");
|
||||
expectedAnswer.put("h", "15vMQjX");
|
||||
expectedAnswer.put("l", "o_d63rn9enb");
|
||||
expectedAnswer.put("al", "en-US,en;q=0.5");
|
||||
expectedAnswer.put("hh", "1.usa.gov");
|
||||
expectedAnswer.put(
|
||||
"r",
|
||||
"http://forecast.weather.gov/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX"
|
||||
);
|
||||
expectedAnswer.put("u", "http://www.spc.ncep.noaa.gov/");
|
||||
expectedAnswer.put("t", 1372121562);
|
||||
expectedAnswer.put("hc", 1368193091);
|
||||
expectedAnswer.put("cy", "New York");
|
||||
expectedAnswer.put("ll", Arrays.asList(40.862598, -73.921799));
|
||||
queue = new ArrayBlockingQueue<Map<String, Object>>(QUEUE_SIZE);
|
||||
testCaseSupplier = new TestCaseSupplier(
|
||||
"{ \"a\": \"Mozilla\\/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko\\/20100101 Firefox\\/21.0\", \"nk\": 1, \"tz\": \"America\\/New_York\", \"gr\": \"NY\", \"g\": \"1Chgyj\", \"h\": \"15vMQjX\", \"l\": \"o_d63rn9enb\", \"al\": \"en-US,en;q=0.5\", \"hh\": \"1.usa.gov\", \"r\": \"http:\\/\\/forecast.weather.gov\\/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX\", \"u\": \"http:\\/\\/www.spc.ncep.noaa.gov\\/\", \"t\": 1372121562, \"hc\": 1368193091, \"cy\": \"New York\", \"ll\": [ 40.862598, -73.921799 ] }"
|
||||
);
|
||||
UpdateStream updateStream = new UpdateStream(
|
||||
testCaseSupplier,
|
||||
queue,
|
||||
mapper,
|
||||
dimensions,
|
||||
dimensions,
|
||||
timeDimension
|
||||
);
|
||||
Thread t = new Thread(updateStream);
|
||||
t.start();
|
||||
Map<String, Object> insertedRow = queue.poll(10, TimeUnit.SECONDS);
|
||||
Assert.assertEquals(expectedAnswer, insertedRow);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void checkRenameKeys() throws Exception
|
||||
{
|
||||
Map<String, Object> expectedAnswer = new HashMap<String, Object>();
|
||||
queue = new ArrayBlockingQueue<Map<String, Object>>(QUEUE_SIZE);
|
||||
ArrayList<String> renamedDimensions = new ArrayList<String>();
|
||||
renamedDimensions.add("bitly_hash");
|
||||
renamedDimensions.add("country");
|
||||
renamedDimensions.add("user");
|
||||
renamedDimensions.add("city");
|
||||
renamedDimensions.add("encoding_user_login");
|
||||
renamedDimensions.add("short_url");
|
||||
renamedDimensions.add("timestamp_hash");
|
||||
renamedDimensions.add("user_bitly_hash");
|
||||
renamedDimensions.add("url");
|
||||
renamedDimensions.add("timezone");
|
||||
renamedDimensions.add("time");
|
||||
renamedDimensions.add("referring_url");
|
||||
renamedDimensions.add("geo_region");
|
||||
renamedDimensions.add("known_users");
|
||||
renamedDimensions.add("accept_language");
|
||||
renamedDimensions.add("latitude_longitude");
|
||||
|
||||
expectedAnswer.put("user", "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko/20100101 Firefox/21.0");
|
||||
expectedAnswer.put("country", "US");
|
||||
expectedAnswer.put("known_users", 1);
|
||||
expectedAnswer.put("timezone", "America/New_York");
|
||||
expectedAnswer.put("geo_region", "NY");
|
||||
expectedAnswer.put("bitly_hash", "1Chgyj");
|
||||
expectedAnswer.put("user_bitly_hash", "15vMQjX");
|
||||
expectedAnswer.put("encoding_user_login", "o_d63rn9enb");
|
||||
expectedAnswer.put("accept_language", "en-US,en;q=0.5");
|
||||
expectedAnswer.put("short_url", "1.usa.gov");
|
||||
expectedAnswer.put(
|
||||
"referring_url",
|
||||
"http://forecast.weather.gov/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX"
|
||||
);
|
||||
expectedAnswer.put("url", "http://www.spc.ncep.noaa.gov/");
|
||||
expectedAnswer.put("time", 1372121562);
|
||||
expectedAnswer.put("timestamp_hash", 1368193091);
|
||||
expectedAnswer.put("city", "New York");
|
||||
expectedAnswer.put("latitude_longitude", Arrays.asList(40.862598, -73.921799));
|
||||
|
||||
UpdateStream updateStream = new UpdateStream(
|
||||
testCaseSupplier,
|
||||
queue,
|
||||
mapper,
|
||||
dimensions,
|
||||
renamedDimensions,
|
||||
timeDimension
|
||||
);
|
||||
updateStream.run();
|
||||
Map<String, Object> inputRow = queue.poll(10, TimeUnit.SECONDS);
|
||||
Assert.assertEquals(expectedAnswer, inputRow);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -37,9 +37,8 @@ public class WebJsonSupplier implements InputSupplier<BufferedReader>
|
|||
private String urlString;
|
||||
private URL url;
|
||||
|
||||
public WebJsonSupplier(List<String> dimensions, String urlString)
|
||||
public WebJsonSupplier(String urlString)
|
||||
{
|
||||
this.dimensions = dimensions;
|
||||
this.urlString = urlString;
|
||||
try {
|
||||
this.url = new URL(urlString);
|
||||
|
@ -56,8 +55,6 @@ public class WebJsonSupplier implements InputSupplier<BufferedReader>
|
|||
URL url = new URL(urlString);
|
||||
URLConnection connection = url.openConnection();
|
||||
connection.setDoInput(true);
|
||||
//connection.setDoOutput(true);
|
||||
|
||||
BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream()));
|
||||
return reader;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue