diff --git a/examples/src/main/java/druid/examples/webStream/InputSupplierUpdateStream.java b/examples/src/main/java/druid/examples/webStream/InputSupplierUpdateStream.java index 8a1066bcdea..b26f241414c 100644 --- a/examples/src/main/java/druid/examples/webStream/InputSupplierUpdateStream.java +++ b/examples/src/main/java/druid/examples/webStream/InputSupplierUpdateStream.java @@ -43,12 +43,42 @@ public class InputSupplierUpdateStream implements UpdateStream private final BlockingQueue> queue = new ArrayBlockingQueue>(QUEUE_SIZE); private final ObjectMapper mapper = new DefaultObjectMapper(); private final String timeDimension; - private StoppableThread addToQueueThread; + private final StoppableThread addToQueueThread; + public InputSupplierUpdateStream( - InputSupplier supplier, - String timeDimension + final InputSupplier supplier, + final String timeDimension ) { + addToQueueThread = new StoppableThread() + { + public void run() + { + while (!finished) { + try { + BufferedReader reader = supplier.getInput(); + String line; + while ((line = reader.readLine()) != null) { + if (isValid(line)) { + HashMap map = mapper.readValue(line, typeRef); + if (map.get(timeDimension) != null) { + queue.offer(map, queueWaitTime, TimeUnit.SECONDS); + log.debug("Successfully added to queue"); + } else { + log.error("missing timestamp"); + } + } + } + } + + catch (Exception e) { + throw Throwables.propagate(e); + } + } + } + }; + addToQueueThread.setDaemon(true); + this.supplier = supplier; this.typeRef = new TypeReference>() { @@ -63,35 +93,12 @@ public class InputSupplierUpdateStream implements UpdateStream public void start() { - addToQueueThread = new StoppableThread(){ - public void run(){ - while(!finished) - try { - BufferedReader reader = supplier.getInput(); - String line; - while ((line = reader.readLine()) != null) { - if (isValid(line)) { - HashMap map = mapper.readValue(line, typeRef); - if (map.get(timeDimension) != null) { - queue.offer(map, queueWaitTime, TimeUnit.SECONDS); - log.debug("Successfully added to queue"); - } else { - log.error("missing timestamp"); - } - } - } - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - }; - addToQueueThread.setDaemon(true); addToQueueThread.start(); } - public void stop(){ + public void stop() + { addToQueueThread.stopMe(); } @@ -106,7 +113,8 @@ public class InputSupplierUpdateStream implements UpdateStream return queue.size(); } - public String getTimeDimension(){ + public String getTimeDimension() + { return timeDimension; }