From 3250c698bba9e1dd88c1db772b39e0c16db46c26 Mon Sep 17 00:00:00 2001 From: Dhruv Parthasarathy Date: Tue, 9 Jul 2013 16:34:55 -0700 Subject: [PATCH] fixed thread stuff and made tests cleaner --- .../webStream/InputSupplierUpdateStream.java | 46 ++++--- .../InputSupplierUpdateStreamTest.java | 6 +- .../webStream/RenamingKeysUpdateStream.java | 10 +- .../RenamingKeysUpdateStreamTest.java | 2 +- .../examples/webStream/StoppableThread.java | 27 ++++ .../examples/webStream/UpdateStream.java | 5 +- .../webStream/WebFirehoseFactory.java | 7 +- .../webStream/WebFirehoseFactoryTest.java | 122 ++++++------------ 8 files changed, 108 insertions(+), 117 deletions(-) create mode 100644 examples/src/main/java/druid/examples/webStream/StoppableThread.java diff --git a/examples/src/main/java/druid/examples/webStream/InputSupplierUpdateStream.java b/examples/src/main/java/druid/examples/webStream/InputSupplierUpdateStream.java index c51121ac9c9..8a1066bcdea 100644 --- a/examples/src/main/java/druid/examples/webStream/InputSupplierUpdateStream.java +++ b/examples/src/main/java/druid/examples/webStream/InputSupplierUpdateStream.java @@ -43,7 +43,7 @@ public class InputSupplierUpdateStream implements UpdateStream private final BlockingQueue> queue = new ArrayBlockingQueue>(QUEUE_SIZE); private final ObjectMapper mapper = new DefaultObjectMapper(); private final String timeDimension; - + private StoppableThread addToQueueThread; public InputSupplierUpdateStream( InputSupplier supplier, String timeDimension @@ -61,30 +61,40 @@ public class InputSupplierUpdateStream implements UpdateStream return !(s.isEmpty()); } - @Override - public void run() + public void start() { - try { - BufferedReader reader = supplier.getInput(); - String line; - while ((line = reader.readLine()) != null) { - if (isValid(line)) { - HashMap map = mapper.readValue(line, typeRef); - if (map.get(timeDimension) != null) { - queue.offer(map, queueWaitTime, TimeUnit.SECONDS); - log.debug("Successfully added to queue"); - } else { - log.error("missing timestamp"); + addToQueueThread = new StoppableThread(){ + public void run(){ + while(!finished) + try { + BufferedReader reader = supplier.getInput(); + String line; + while ((line = reader.readLine()) != null) { + if (isValid(line)) { + HashMap map = mapper.readValue(line, typeRef); + if (map.get(timeDimension) != null) { + queue.offer(map, queueWaitTime, TimeUnit.SECONDS); + log.debug("Successfully added to queue"); + } else { + log.error("missing timestamp"); + } + } } } + catch (Exception e) { + throw Throwables.propagate(e); + } } - } - catch (Exception e) { - throw Throwables.propagate(e); - } + }; + addToQueueThread.setDaemon(true); + addToQueueThread.start(); } + public void stop(){ + addToQueueThread.stopMe(); + } + public Map pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException { diff --git a/examples/src/main/java/druid/examples/webStream/InputSupplierUpdateStreamTest.java b/examples/src/main/java/druid/examples/webStream/InputSupplierUpdateStreamTest.java index 654a8e26a73..8499661de2d 100644 --- a/examples/src/main/java/druid/examples/webStream/InputSupplierUpdateStreamTest.java +++ b/examples/src/main/java/druid/examples/webStream/InputSupplierUpdateStreamTest.java @@ -65,7 +65,7 @@ public class InputSupplierUpdateStreamTest testCaseSupplier, timeDimension ); - updateStream.run(); + updateStream.start(); Map insertedRow = updateStream.pollFromQueue(waitTime, unit); Assert.assertEquals(expectedAnswer, insertedRow); } @@ -83,7 +83,7 @@ public class InputSupplierUpdateStreamTest testCaseSupplier, timeDimension ); - updateStream.run(); + updateStream.start(); Assert.assertEquals(updateStream.getQueueSize(), 0); } @@ -99,7 +99,7 @@ public class InputSupplierUpdateStreamTest testCaseSupplier, timeDimension ); - updateStream.run(); + updateStream.start(); Map insertedRow = updateStream.pollFromQueue(waitTime, unit); Map expectedAnswer = new HashMap(); expectedAnswer.put("item1", "value1"); diff --git a/examples/src/main/java/druid/examples/webStream/RenamingKeysUpdateStream.java b/examples/src/main/java/druid/examples/webStream/RenamingKeysUpdateStream.java index c7ed5c8113a..3b6596e344e 100644 --- a/examples/src/main/java/druid/examples/webStream/RenamingKeysUpdateStream.java +++ b/examples/src/main/java/druid/examples/webStream/RenamingKeysUpdateStream.java @@ -70,10 +70,12 @@ public class RenamingKeysUpdateStream implements UpdateStream } - - @Override - public void run() + public void start() { - updateStream.run(); + updateStream.start(); + } + + public void stop(){ + updateStream.stop(); } } diff --git a/examples/src/main/java/druid/examples/webStream/RenamingKeysUpdateStreamTest.java b/examples/src/main/java/druid/examples/webStream/RenamingKeysUpdateStreamTest.java index 449ad4b4a90..53ed20a238e 100644 --- a/examples/src/main/java/druid/examples/webStream/RenamingKeysUpdateStreamTest.java +++ b/examples/src/main/java/druid/examples/webStream/RenamingKeysUpdateStreamTest.java @@ -56,7 +56,7 @@ public class RenamingKeysUpdateStreamTest renamedKeys.put("time", "t"); RenamingKeysUpdateStream renamer = new RenamingKeysUpdateStream(updateStream, renamedKeys); - renamer.run(); + renamer.start(); Map expectedAnswer = new HashMap(); expectedAnswer.put("i1", "value1"); expectedAnswer.put("i2", 2); diff --git a/examples/src/main/java/druid/examples/webStream/StoppableThread.java b/examples/src/main/java/druid/examples/webStream/StoppableThread.java new file mode 100644 index 00000000000..4b2f054af70 --- /dev/null +++ b/examples/src/main/java/druid/examples/webStream/StoppableThread.java @@ -0,0 +1,27 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package druid.examples.webStream; + +public class StoppableThread extends Thread +{ + volatile boolean finished=false; + public void stopMe(){ + finished=true; + } +} diff --git a/examples/src/main/java/druid/examples/webStream/UpdateStream.java b/examples/src/main/java/druid/examples/webStream/UpdateStream.java index 45818eb64a9..bd73c4e05e4 100644 --- a/examples/src/main/java/druid/examples/webStream/UpdateStream.java +++ b/examples/src/main/java/druid/examples/webStream/UpdateStream.java @@ -21,10 +21,11 @@ package druid.examples.webStream; import java.util.Map; import java.util.concurrent.TimeUnit; -public interface UpdateStream extends Runnable +public interface UpdateStream { public Map pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException; public String getTimeDimension(); - + public void start(); + public void stop(); } diff --git a/examples/src/main/java/druid/examples/webStream/WebFirehoseFactory.java b/examples/src/main/java/druid/examples/webStream/WebFirehoseFactory.java index 24aff866cfe..12c80012f28 100644 --- a/examples/src/main/java/druid/examples/webStream/WebFirehoseFactory.java +++ b/examples/src/main/java/druid/examples/webStream/WebFirehoseFactory.java @@ -35,8 +35,6 @@ import org.joda.time.DateTime; import java.io.IOException; import java.util.ArrayList; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @JsonTypeName("webstream") @@ -78,8 +76,7 @@ public class WebFirehoseFactory implements FirehoseFactory { final UpdateStream updateStream = factory.build(); - final ExecutorService service = Executors.newSingleThreadExecutor(); - service.submit(updateStream); + updateStream.start(); return new Firehose() { @@ -129,7 +126,7 @@ public class WebFirehoseFactory implements FirehoseFactory @Override public void close() throws IOException { - service.shutdown(); + updateStream.stop(); } }; diff --git a/examples/src/main/java/druid/examples/webStream/WebFirehoseFactoryTest.java b/examples/src/main/java/druid/examples/webStream/WebFirehoseFactoryTest.java index 9b63f361380..55018934fdf 100644 --- a/examples/src/main/java/druid/examples/webStream/WebFirehoseFactoryTest.java +++ b/examples/src/main/java/druid/examples/webStream/WebFirehoseFactoryTest.java @@ -51,26 +51,7 @@ public class WebFirehoseFactoryTest @Override public UpdateStream build() { - return new UpdateStream() - { - @Override - public Map pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException - { - return ImmutableMap.of("item1", "value1", "item2", 2, "time", "1372121562"); - } - - @Override - public String getTimeDimension() - { - return "time"; - } - - @Override - public void run() - { - - } - }; + return new MyUpdateStream(ImmutableMap.of("item1", "value1", "item2", 2, "time", "1372121562")); } }, "posix" @@ -82,26 +63,7 @@ public class WebFirehoseFactoryTest @Override public UpdateStream build() { - return new UpdateStream() - { - @Override - public Map pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException - { - return ImmutableMap.of("item1", "value1", "item2", 2, "time", "1373241600000"); - } - - @Override - public String getTimeDimension() - { - return "time"; - } - - @Override - public void run() - { - - } - }; + return new MyUpdateStream(ImmutableMap.of("item1", "value1", "item2", 2, "time", "1373241600000")); } }, "auto" @@ -141,37 +103,18 @@ public class WebFirehoseFactoryTest @Test public void testISOTimeStamp() throws Exception { - WebFirehoseFactory webbie4 = new WebFirehoseFactory( + WebFirehoseFactory webbie3 = new WebFirehoseFactory( new UpdateStreamFactory() { @Override public UpdateStream build() { - return new UpdateStream() - { - @Override - public Map pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException - { - return ImmutableMap.of("item1", "value1", "item2", 2, "time", "2013-07-08"); - } - - @Override - public String getTimeDimension() - { - return "time"; - } - - @Override - public void run() - { - - } - }; + return new MyUpdateStream(ImmutableMap.of("item1", "value1", "item2", 2, "time", "2013-07-08")); } }, "auto" ); - Firehose firehose1 = webbie4.connect(); + Firehose firehose1 = webbie3.connect(); if (firehose1.hasMore()) { long milliSeconds = firehose1.nextRow().getTimestampFromEpoch(); DateTime date = new DateTime("2013-07-08"); @@ -184,37 +127,18 @@ public class WebFirehoseFactoryTest @Test public void testAutoIsoTimeStamp() throws Exception { - WebFirehoseFactory webbie5 = new WebFirehoseFactory( + WebFirehoseFactory webbie2 = new WebFirehoseFactory( new UpdateStreamFactory() { @Override public UpdateStream build() { - return new UpdateStream() - { - @Override - public Map pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException - { - return ImmutableMap.of("item1", "value1", "item2", 2, "time", "2013-07-08"); - } - - @Override - public String getTimeDimension() - { - return "time"; - } - - @Override - public void run() - { - - } - }; + return new MyUpdateStream(ImmutableMap.of("item1", "value1", "item2", 2, "time", "2013-07-08")); } }, null ); - Firehose firehose2 = webbie5.connect(); + Firehose firehose2 = webbie2.connect(); if (firehose2.hasMore()) { long milliSeconds = firehose2.nextRow().getTimestampFromEpoch(); DateTime date = new DateTime("2013-07-08"); @@ -266,4 +190,34 @@ public class WebFirehoseFactoryTest Assert.assertEquals((float) 2.0, inputRow.getFloatMetric("item2")); } + + private static class MyUpdateStream implements UpdateStream + { + private static ImmutableMap map; + public MyUpdateStream(ImmutableMap map){ + this.map=map; + } + + @Override + public Map pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException + { + return map; + } + + @Override + public String getTimeDimension() + { + return "time"; + } + + @Override + public void start() + { + } + + @Override + public void stop() + { + } + } }