From 01b4728c40c9d4073c2bd628fecae8f7bce15878 Mon Sep 17 00:00:00 2001 From: Dhruv Parthasarathy Date: Mon, 8 Jul 2013 14:27:35 -0700 Subject: [PATCH] removed shared queue structure. Queue now encapsulated within updateStream --- .../examples/webStream/UpdateStream.java | 13 +++++++++--- .../webStream/WebFirehoseFactory.java | 7 ++----- .../webStream/WebFirehoseFactoryTest.java | 21 ++++--------------- 3 files changed, 16 insertions(+), 25 deletions(-) diff --git a/examples/src/main/java/druid/examples/webStream/UpdateStream.java b/examples/src/main/java/druid/examples/webStream/UpdateStream.java index 3f980762746..08269932520 100644 --- a/examples/src/main/java/druid/examples/webStream/UpdateStream.java +++ b/examples/src/main/java/druid/examples/webStream/UpdateStream.java @@ -30,6 +30,7 @@ import java.io.BufferedReader; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; @@ -39,21 +40,20 @@ public class UpdateStream implements Runnable private static final long queueWaitTime = 15L; private final TypeReference> typeRef; private final InputSupplier supplier; - private final BlockingQueue> queue; + private final int QUEUE_SIZE=10000; + private final BlockingQueue> queue = new ArrayBlockingQueue>(QUEUE_SIZE); private final ObjectMapper mapper; private final Map renamedDimensions; private final String timeDimension; public UpdateStream( InputSupplier supplier, - BlockingQueue> queue, ObjectMapper mapper, Map renamedDimensions, String timeDimension ) { this.supplier = supplier; - this.queue = queue; this.mapper = mapper; this.typeRef = new TypeReference>() { @@ -109,4 +109,11 @@ public class UpdateStream implements Runnable } } + public Map takeFromQueue() throws InterruptedException{ + return queue.take(); + } + + public int getQueueSize(){ + return queue.size(); + } } diff --git a/examples/src/main/java/druid/examples/webStream/WebFirehoseFactory.java b/examples/src/main/java/druid/examples/webStream/WebFirehoseFactory.java index b2f65bb467e..cc58d3cba97 100644 --- a/examples/src/main/java/druid/examples/webStream/WebFirehoseFactory.java +++ b/examples/src/main/java/druid/examples/webStream/WebFirehoseFactory.java @@ -45,7 +45,6 @@ import java.util.concurrent.Executors; public class WebFirehoseFactory implements FirehoseFactory { private static final EmittingLogger log = new EmittingLogger(WebFirehoseFactory.class); - private static final int QUEUE_SIZE = 2000; private final String url; private final String timeDimension; private final String newTimeDimension; @@ -81,11 +80,9 @@ public class WebFirehoseFactory implements FirehoseFactory @Override public Firehose connect() throws IOException { - final BlockingQueue> queue = new ArrayBlockingQueue>(QUEUE_SIZE); - Runnable updateStream = new UpdateStream( + final UpdateStream updateStream = new UpdateStream( new WebJsonSupplier(url), - queue, new DefaultObjectMapper(), renamedDimensions, timeDimension @@ -108,7 +105,7 @@ public class WebFirehoseFactory implements FirehoseFactory public InputRow nextRow() { try { - Map map = queue.take(); + Map map = updateStream.takeFromQueue(); DateTime date = TimestampParser.createTimestampParser(timeFormat) .apply(map.get(newTimeDimension).toString()); return new MapBasedInputRow( diff --git a/examples/src/main/java/druid/examples/webStream/WebFirehoseFactoryTest.java b/examples/src/main/java/druid/examples/webStream/WebFirehoseFactoryTest.java index fb68933f606..cc117dc5034 100644 --- a/examples/src/main/java/druid/examples/webStream/WebFirehoseFactoryTest.java +++ b/examples/src/main/java/druid/examples/webStream/WebFirehoseFactoryTest.java @@ -30,16 +30,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; public class WebFirehoseFactoryTest { private final ArrayList dimensions = new ArrayList(); private InputSupplier testCaseSupplier; - final int QUEUE_SIZE = 2000; - BlockingQueue> queue; String timeDimension = "t"; DefaultObjectMapper mapper = new DefaultObjectMapper(); Map expectedAnswer = new HashMap(); @@ -102,16 +97,14 @@ public class WebFirehoseFactoryTest @Test public void basicIngestionCheck() throws Exception { - queue = new ArrayBlockingQueue>(QUEUE_SIZE); UpdateStream updateStream = new UpdateStream( testCaseSupplier, - queue, mapper, null, timeDimension ); updateStream.run(); - Map insertedRow = queue.poll(10, TimeUnit.SECONDS); + Map insertedRow = updateStream.takeFromQueue(); Assert.assertEquals(expectedAnswer, insertedRow); } @@ -119,19 +112,17 @@ public class WebFirehoseFactoryTest @Test public void missingTimeStampCheck() { - queue = new ArrayBlockingQueue>(QUEUE_SIZE); InputSupplier testCaseSupplier = new TestCaseSupplier( "{ \"a\": \"Mozilla\\/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko\\/20100101 Firefox\\/21.0\", \"c\": \"US\", \"nk\": 1, \"tz\": \"America\\/New_York\", \"gr\": \"NY\", \"g\": \"1Chgyj\", \"h\": \"15vMQjX\", \"l\": \"o_d63rn9enb\", \"al\": \"en-US,en;q=0.5\", \"hh\": \"1.usa.gov\", \"r\": \"http:\\/\\/forecast.weather.gov\\/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX\", \"u\": \"http:\\/\\/www.spc.ncep.noaa.gov\\/\", \"hc\": 1368193091, \"cy\": \"New York\", \"ll\": [ 40.862598, -73.921799 ] }" ); UpdateStream updateStream = new UpdateStream( testCaseSupplier, - queue, mapper, null, timeDimension ); updateStream.run(); - Assert.assertEquals(queue.size(), 0); + Assert.assertEquals(updateStream.getQueueSize(), 0); } //If any other value is missing, we should still add the event and process it properly @@ -157,19 +148,17 @@ public class WebFirehoseFactoryTest expectedAnswer.put("hc", 1368193091); expectedAnswer.put("cy", "New York"); expectedAnswer.put("ll", Arrays.asList(40.862598, -73.921799)); - queue = new ArrayBlockingQueue>(QUEUE_SIZE); testCaseSupplier = new TestCaseSupplier( "{ \"a\": \"Mozilla\\/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko\\/20100101 Firefox\\/21.0\", \"nk\": 1, \"tz\": \"America\\/New_York\", \"gr\": \"NY\", \"g\": \"1Chgyj\", \"h\": \"15vMQjX\", \"l\": \"o_d63rn9enb\", \"al\": \"en-US,en;q=0.5\", \"hh\": \"1.usa.gov\", \"r\": \"http:\\/\\/forecast.weather.gov\\/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX\", \"u\": \"http:\\/\\/www.spc.ncep.noaa.gov\\/\", \"t\": 1372121562, \"hc\": 1368193091, \"cy\": \"New York\", \"ll\": [ 40.862598, -73.921799 ] }" ); UpdateStream updateStream = new UpdateStream( testCaseSupplier, - queue, mapper, null, timeDimension ); updateStream.run(); - Map insertedRow = queue.poll(10, TimeUnit.SECONDS); + Map insertedRow = updateStream.takeFromQueue(); Assert.assertEquals(expectedAnswer, insertedRow); } @@ -177,7 +166,6 @@ public class WebFirehoseFactoryTest public void checkRenameKeys() throws Exception { Map expectedAnswer = new HashMap(); - queue = new ArrayBlockingQueue>(QUEUE_SIZE); Map renamedDimensions = new HashMap(); renamedDimensions.put("g","bitly_hash"); renamedDimensions.put("c","country"); @@ -218,13 +206,12 @@ public class WebFirehoseFactoryTest UpdateStream updateStream = new UpdateStream( testCaseSupplier, - queue, mapper, renamedDimensions, timeDimension ); updateStream.run(); - Map inputRow = queue.poll(10, TimeUnit.SECONDS); + Map inputRow = updateStream.takeFromQueue(); Assert.assertEquals(expectedAnswer, inputRow); }