diff --git a/examples/src/main/java/druid/examples/webStream/UpdateStream.java b/examples/src/main/java/druid/examples/webStream/UpdateStream.java index 57d8fc67311..3f980762746 100644 --- a/examples/src/main/java/druid/examples/webStream/UpdateStream.java +++ b/examples/src/main/java/druid/examples/webStream/UpdateStream.java @@ -41,16 +41,14 @@ public class UpdateStream implements Runnable private final InputSupplier supplier; private final BlockingQueue> queue; private final ObjectMapper mapper; - private final ArrayList dimensions; - private final ArrayList renamedDimensions; + private final Map renamedDimensions; private final String timeDimension; public UpdateStream( InputSupplier supplier, BlockingQueue> queue, ObjectMapper mapper, - ArrayList dimensions, - ArrayList renamedDimensions, + Map renamedDimensions, String timeDimension ) { @@ -61,7 +59,6 @@ public class UpdateStream implements Runnable { }; this.timeDimension = timeDimension; - this.dimensions = dimensions; this.renamedDimensions = renamedDimensions; } @@ -84,7 +81,7 @@ public class UpdateStream implements Runnable queue.offer(renamedMap, queueWaitTime, TimeUnit.SECONDS); log.debug("Successfully added to queue"); } else { - log.debug("missing timestamp"); + log.error("missing timestamp"); } } } @@ -97,14 +94,19 @@ public class UpdateStream implements Runnable private Map renameKeys(Map update) { - Map renamedMap = Maps.newHashMap(); - for (int iter = 0; iter < dimensions.size(); iter++) { - if (update.get(dimensions.get(iter)) != null) { - Object obj = update.get(dimensions.get(iter)); - renamedMap.put(renamedDimensions.get(iter), obj); + if (renamedDimensions!=null){ + Map renamedMap = Maps.newHashMap(); + for (String key : renamedDimensions.keySet()) { + if(update.get(key)!=null){ + Object obj= update.get(key); + renamedMap.put(renamedDimensions.get(key),obj); + } } + return renamedMap; + } + else{ + return update; } - return renamedMap; } } diff --git a/examples/src/main/java/druid/examples/webStream/WebFirehoseFactory.java b/examples/src/main/java/druid/examples/webStream/WebFirehoseFactory.java index 3896fe30b99..239e23b4acd 100644 --- a/examples/src/main/java/druid/examples/webStream/WebFirehoseFactory.java +++ b/examples/src/main/java/druid/examples/webStream/WebFirehoseFactory.java @@ -35,11 +35,13 @@ import org.joda.time.DateTime; import java.io.IOException; import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; @JsonTypeName("webstream") public class WebFirehoseFactory implements FirehoseFactory @@ -47,26 +49,36 @@ public class WebFirehoseFactory implements FirehoseFactory private static final EmittingLogger log = new EmittingLogger(WebFirehoseFactory.class); private static final int QUEUE_SIZE = 2000; private final String url; - private final ArrayList dimensions; private final String timeDimension; - private final ArrayList renamedDimensions; + private final String newTimeDimension; + private final Map renamedDimensions; private final String timeFormat; + private final long waitTime = 15L; @JsonCreator public WebFirehoseFactory( @JsonProperty("url") String url, - @JsonProperty("dimensions") ArrayList dimensions, - @JsonProperty("renamedDimensions") ArrayList renamedDimensions, + @JsonProperty("renamedDimensions") Map renamedDimensions, @JsonProperty("timeDimension") String timeDimension, @JsonProperty("timeFormat") String timeFormat ) { this.url = url; - this.dimensions = dimensions; this.renamedDimensions = renamedDimensions; this.timeDimension = timeDimension; - this.timeFormat = timeFormat; + if (renamedDimensions!=null){ + newTimeDimension=renamedDimensions.get(timeDimension); + } + else{ + newTimeDimension=timeDimension; + } + if (timeFormat==null){ + this.timeFormat="auto"; + } + else{ + this.timeFormat = timeFormat; + } } @Override @@ -78,7 +90,6 @@ public class WebFirehoseFactory implements FirehoseFactory new WebJsonSupplier(url), queue, new DefaultObjectMapper(), - dimensions, renamedDimensions, timeDimension ); @@ -92,7 +103,7 @@ public class WebFirehoseFactory implements FirehoseFactory @Override public boolean hasMore() { - return !(service.isTerminated()) && queue.size() > 0; + return !service.isTerminated(); } @@ -102,11 +113,10 @@ public class WebFirehoseFactory implements FirehoseFactory try { Map map = queue.take(); DateTime date = TimestampParser.createTimestampParser(timeFormat) - .apply(map.get(timeDimension).toString()); - long seconds = (long) date.getMillis(); + .apply(map.get(newTimeDimension).toString()); return new MapBasedInputRow( - seconds, - renamedDimensions, + date.getMillis(), + new ArrayList(map.keySet()), map ); } diff --git a/examples/src/main/java/druid/examples/webStream/WebFirehoseFactoryTest.java b/examples/src/main/java/druid/examples/webStream/WebFirehoseFactoryTest.java index c4669833b39..fb68933f606 100644 --- a/examples/src/main/java/druid/examples/webStream/WebFirehoseFactoryTest.java +++ b/examples/src/main/java/druid/examples/webStream/WebFirehoseFactoryTest.java @@ -107,12 +107,10 @@ public class WebFirehoseFactoryTest testCaseSupplier, queue, mapper, - dimensions, - dimensions, + null, timeDimension ); - Thread t = new Thread(updateStream); - t.start(); + updateStream.run(); Map insertedRow = queue.poll(10, TimeUnit.SECONDS); Assert.assertEquals(expectedAnswer, insertedRow); } @@ -129,8 +127,7 @@ public class WebFirehoseFactoryTest testCaseSupplier, queue, mapper, - dimensions, - dimensions, + null, timeDimension ); updateStream.run(); @@ -168,12 +165,10 @@ public class WebFirehoseFactoryTest testCaseSupplier, queue, mapper, - dimensions, - dimensions, + null, timeDimension ); - Thread t = new Thread(updateStream); - t.start(); + updateStream.run(); Map insertedRow = queue.poll(10, TimeUnit.SECONDS); Assert.assertEquals(expectedAnswer, insertedRow); } @@ -183,23 +178,23 @@ public class WebFirehoseFactoryTest { Map expectedAnswer = new HashMap(); queue = new ArrayBlockingQueue>(QUEUE_SIZE); - ArrayList renamedDimensions = new ArrayList(); - renamedDimensions.add("bitly_hash"); - renamedDimensions.add("country"); - renamedDimensions.add("user"); - renamedDimensions.add("city"); - renamedDimensions.add("encoding_user_login"); - renamedDimensions.add("short_url"); - renamedDimensions.add("timestamp_hash"); - renamedDimensions.add("user_bitly_hash"); - renamedDimensions.add("url"); - renamedDimensions.add("timezone"); - renamedDimensions.add("time"); - renamedDimensions.add("referring_url"); - renamedDimensions.add("geo_region"); - renamedDimensions.add("known_users"); - renamedDimensions.add("accept_language"); - renamedDimensions.add("latitude_longitude"); + Map renamedDimensions = new HashMap(); + renamedDimensions.put("g","bitly_hash"); + renamedDimensions.put("c","country"); + renamedDimensions.put("a","user"); + renamedDimensions.put("cy","city"); + renamedDimensions.put("l","encoding_user_login"); + renamedDimensions.put("hh","short_url"); + renamedDimensions.put("hc","timestamp_hash"); + renamedDimensions.put("h","user_bitly_hash"); + renamedDimensions.put("u","url"); + renamedDimensions.put("tz","timezone"); + renamedDimensions.put("t","time"); + renamedDimensions.put("r","referring_url"); + renamedDimensions.put("gr","geo_region"); + renamedDimensions.put("nk","known_users"); + renamedDimensions.put("al","accept_language"); + renamedDimensions.put("ll","latitude_longitude"); expectedAnswer.put("user", "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko/20100101 Firefox/21.0"); expectedAnswer.put("country", "US"); @@ -225,7 +220,6 @@ public class WebFirehoseFactoryTest testCaseSupplier, queue, mapper, - dimensions, renamedDimensions, timeDimension ); diff --git a/examples/src/main/java/druid/examples/webStream/WebJsonSupplier.java b/examples/src/main/java/druid/examples/webStream/WebJsonSupplier.java index 9e69a1649f2..b8a1cd5f7a0 100644 --- a/examples/src/main/java/druid/examples/webStream/WebJsonSupplier.java +++ b/examples/src/main/java/druid/examples/webStream/WebJsonSupplier.java @@ -44,8 +44,7 @@ public class WebJsonSupplier implements InputSupplier this.url = new URL(urlString); } catch (Exception e) { - e.printStackTrace(); - log.info("Malformed url"); + log.error(e,"Malformed url"); } } @@ -55,7 +54,6 @@ public class WebJsonSupplier implements InputSupplier URL url = new URL(urlString); URLConnection connection = url.openConnection(); connection.setDoInput(true); - BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream())); - return reader; + return new BufferedReader(new InputStreamReader(url.openStream())); } }