From 142271aad2bfb1befc8a66a3b78894cc370a5050 Mon Sep 17 00:00:00 2001 From: Dhruv Parthasarathy Date: Mon, 8 Jul 2013 16:46:47 -0700 Subject: [PATCH] better encapsulation --- .../examples/webStream/UpdateStream.java | 42 ++- .../webStream/UpdateStreamFactory.java | 44 +++ .../examples/webStream/UpdateStreamTest.java | 144 +++++++++ .../webStream/WebFirehoseFactory.java | 45 ++- .../webStream/WebFirehoseFactoryTest.java | 275 +++++++----------- 5 files changed, 339 insertions(+), 211 deletions(-) create mode 100644 examples/src/main/java/druid/examples/webStream/UpdateStreamFactory.java create mode 100644 examples/src/main/java/druid/examples/webStream/UpdateStreamTest.java diff --git a/examples/src/main/java/druid/examples/webStream/UpdateStream.java b/examples/src/main/java/druid/examples/webStream/UpdateStream.java index 08269932520..87e1a2ffffb 100644 --- a/examples/src/main/java/druid/examples/webStream/UpdateStream.java +++ b/examples/src/main/java/druid/examples/webStream/UpdateStream.java @@ -24,10 +24,10 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; import com.google.common.collect.Maps; import com.google.common.io.InputSupplier; +import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.emitter.EmittingLogger; import java.io.BufferedReader; -import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; @@ -40,21 +40,20 @@ public class UpdateStream implements Runnable private static final long queueWaitTime = 15L; private final TypeReference> typeRef; private final InputSupplier supplier; - private final int QUEUE_SIZE=10000; + private final int QUEUE_SIZE = 10000; private final BlockingQueue> queue = new ArrayBlockingQueue>(QUEUE_SIZE); - private final ObjectMapper mapper; - private final Map renamedDimensions; + private final ObjectMapper mapper = new DefaultObjectMapper(); + private final Map renamedDimensions; private final String timeDimension; + private final long waitTime = 15L; public UpdateStream( InputSupplier supplier, - ObjectMapper mapper, - Map renamedDimensions, + Map renamedDimensions, String timeDimension ) { this.supplier = supplier; - this.mapper = mapper; this.typeRef = new TypeReference>() { }; @@ -94,26 +93,37 @@ public class UpdateStream implements Runnable private Map renameKeys(Map update) { - if (renamedDimensions!=null){ + if (renamedDimensions != null) { Map renamedMap = Maps.newHashMap(); for (String key : renamedDimensions.keySet()) { - if(update.get(key)!=null){ - Object obj= update.get(key); - renamedMap.put(renamedDimensions.get(key),obj); + if (update.get(key) != null) { + Object obj = update.get(key); + renamedMap.put(renamedDimensions.get(key), obj); } } return renamedMap; - } - else{ + } else { return update; } } - public Map takeFromQueue() throws InterruptedException{ - return queue.take(); + public Map pollFromQueue() throws InterruptedException + { + return queue.poll(waitTime, TimeUnit.SECONDS); } - public int getQueueSize(){ + public int getQueueSize() + { return queue.size(); } + + public String getNewTimeDimension() + { + if (renamedDimensions != null) { + return renamedDimensions.get(timeDimension); + } else { + return timeDimension; + } + + } } diff --git a/examples/src/main/java/druid/examples/webStream/UpdateStreamFactory.java b/examples/src/main/java/druid/examples/webStream/UpdateStreamFactory.java new file mode 100644 index 00000000000..2bff55b3ce3 --- /dev/null +++ b/examples/src/main/java/druid/examples/webStream/UpdateStreamFactory.java @@ -0,0 +1,44 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package druid.examples.webStream; + +import com.google.common.io.InputSupplier; + +import java.util.Map; + +public class UpdateStreamFactory +{ + private final InputSupplier inputSupplier; + private final Map renamedDimensions; + private final String timeDimension; + + public UpdateStreamFactory(InputSupplier inputSupplier, Map renamedDimensions, String timeDimension) + { + this.inputSupplier = inputSupplier; + this.renamedDimensions = renamedDimensions; + this.timeDimension = timeDimension; + } + + public UpdateStream build() + { + return new UpdateStream(inputSupplier, renamedDimensions, timeDimension); + } + +} diff --git a/examples/src/main/java/druid/examples/webStream/UpdateStreamTest.java b/examples/src/main/java/druid/examples/webStream/UpdateStreamTest.java new file mode 100644 index 00000000000..1aed4566213 --- /dev/null +++ b/examples/src/main/java/druid/examples/webStream/UpdateStreamTest.java @@ -0,0 +1,144 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package druid.examples.webStream; + +import com.google.common.io.InputSupplier; +import com.metamx.druid.jackson.DefaultObjectMapper; +import junit.framework.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +public class UpdateStreamTest +{ + private final ArrayList dimensions = new ArrayList(); + private InputSupplier testCaseSupplier; + DefaultObjectMapper mapper = new DefaultObjectMapper(); + Map expectedAnswer = new HashMap(); + String timeDimension; + + @BeforeClass + public void setUp() + { + timeDimension = "time"; + testCaseSupplier = new TestCaseSupplier( + "{\"item1\": \"value1\"," + + "\"item2\":2," + + "\"time\":1372121562 }" + ); + + dimensions.add("item1"); + dimensions.add("item2"); + dimensions.add("time"); + + expectedAnswer.put("item1", "value1"); + expectedAnswer.put("item2", 2); + expectedAnswer.put("time", 1372121562); + } + + @Test(expectedExceptions = UnknownHostException.class) + public void checkInvalidUrl() throws Exception + { + + String invalidURL = "http://invalid.url"; + WebJsonSupplier supplier = new WebJsonSupplier(invalidURL); + supplier.getInput(); + } + + @Test + public void basicIngestionCheck() throws Exception + { + UpdateStream updateStream = new UpdateStream( + testCaseSupplier, + null, + timeDimension + ); + updateStream.run(); + Map insertedRow = updateStream.pollFromQueue(); + Assert.assertEquals(expectedAnswer, insertedRow); + } + + //If a timestamp is missing, we should throw away the event + @Test + public void missingTimeStampCheck() + { + testCaseSupplier = new TestCaseSupplier( + "{\"item1\": \"value1\"," + + "\"item2\":2}" + ); + + UpdateStream updateStream = new UpdateStream( + testCaseSupplier, + null, + timeDimension + ); + updateStream.run(); + Assert.assertEquals(updateStream.getQueueSize(), 0); + } + + //If any other value is missing, we should still add the event and process it properly + @Test + public void otherNullValueCheck() throws Exception + { + testCaseSupplier = new TestCaseSupplier( + "{\"item1\": \"value1\"," + + "\"time\":1372121562 }" + ); + Map expectedAnswer = new HashMap(); + expectedAnswer.put("item1", "value1"); + expectedAnswer.put("time", 1372121562); + UpdateStream updateStream = new UpdateStream( + testCaseSupplier, + null, + timeDimension + ); + updateStream.run(); + Map insertedRow = updateStream.pollFromQueue(); + Assert.assertEquals(expectedAnswer, insertedRow); + } + + @Test + public void checkRenameKeys() throws Exception + { + Map expectedAnswer = new HashMap(); + Map renamedDimensions = new HashMap(); + renamedDimensions.put("item1", "i1"); + renamedDimensions.put("item2", "i2"); + renamedDimensions.put("time", "t"); + + expectedAnswer.put("i1", "value1"); + expectedAnswer.put("i2", 2); + expectedAnswer.put("t", 1372121562); + + UpdateStream updateStream = new UpdateStream( + testCaseSupplier, + renamedDimensions, + timeDimension + ); + updateStream.run(); + Map inputRow = updateStream.pollFromQueue(); + Assert.assertEquals(expectedAnswer, inputRow); + } + +} diff --git a/examples/src/main/java/druid/examples/webStream/WebFirehoseFactory.java b/examples/src/main/java/druid/examples/webStream/WebFirehoseFactory.java index cc58d3cba97..a174ba84654 100644 --- a/examples/src/main/java/druid/examples/webStream/WebFirehoseFactory.java +++ b/examples/src/main/java/druid/examples/webStream/WebFirehoseFactory.java @@ -27,7 +27,6 @@ import com.metamx.common.parsers.TimestampParser; import com.metamx.druid.guava.Runnables; import com.metamx.druid.input.InputRow; import com.metamx.druid.input.MapBasedInputRow; -import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.realtime.firehose.Firehose; import com.metamx.druid.realtime.firehose.FirehoseFactory; import com.metamx.emitter.EmittingLogger; @@ -36,8 +35,6 @@ import org.joda.time.DateTime; import java.io.IOException; import java.util.ArrayList; import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -45,11 +42,8 @@ import java.util.concurrent.Executors; public class WebFirehoseFactory implements FirehoseFactory { private static final EmittingLogger log = new EmittingLogger(WebFirehoseFactory.class); - private final String url; - private final String timeDimension; - private final String newTimeDimension; - private final Map renamedDimensions; private final String timeFormat; + private final UpdateStreamFactory updateStreamFactory; @JsonCreator @@ -60,19 +54,15 @@ public class WebFirehoseFactory implements FirehoseFactory @JsonProperty("timeFormat") String timeFormat ) { - this.url = url; - this.renamedDimensions = renamedDimensions; - this.timeDimension = timeDimension; - if (renamedDimensions != null) { - newTimeDimension = renamedDimensions.get(timeDimension); - } - else { - newTimeDimension = timeDimension; - } + this(new UpdateStreamFactory(new WebJsonSupplier(url), renamedDimensions, timeDimension), timeFormat); + } + + public WebFirehoseFactory(UpdateStreamFactory updateStreamFactory, String timeFormat) + { + this.updateStreamFactory = updateStreamFactory; if (timeFormat == null) { this.timeFormat = "auto"; - } - else { + } else { this.timeFormat = timeFormat; } } @@ -81,23 +71,25 @@ public class WebFirehoseFactory implements FirehoseFactory public Firehose connect() throws IOException { - final UpdateStream updateStream = new UpdateStream( - new WebJsonSupplier(url), - new DefaultObjectMapper(), - renamedDimensions, - timeDimension - ); + final UpdateStream updateStream = updateStreamFactory.build(); final ExecutorService service = Executors.newSingleThreadExecutor(); service.submit(updateStream); return new Firehose() { + Map map; private final Runnable doNothingRunnable = Runnables.getNoopRunnable(); @Override public boolean hasMore() { - return !service.isTerminated(); + try { + map = updateStream.pollFromQueue(); + return map != null; + } + catch (Exception e) { + throw Throwables.propagate(e); + } } @@ -105,9 +97,8 @@ public class WebFirehoseFactory implements FirehoseFactory public InputRow nextRow() { try { - Map map = updateStream.takeFromQueue(); DateTime date = TimestampParser.createTimestampParser(timeFormat) - .apply(map.get(newTimeDimension).toString()); + .apply(map.get(updateStream.getNewTimeDimension()).toString()); return new MapBasedInputRow( date.getMillis(), new ArrayList(map.keySet()), diff --git a/examples/src/main/java/druid/examples/webStream/WebFirehoseFactoryTest.java b/examples/src/main/java/druid/examples/webStream/WebFirehoseFactoryTest.java index cc117dc5034..5494a500826 100644 --- a/examples/src/main/java/druid/examples/webStream/WebFirehoseFactoryTest.java +++ b/examples/src/main/java/druid/examples/webStream/WebFirehoseFactoryTest.java @@ -19,200 +19,139 @@ package druid.examples.webStream; -import com.google.common.io.InputSupplier; -import com.metamx.druid.jackson.DefaultObjectMapper; +import com.beust.jcommander.internal.Lists; +import com.metamx.druid.input.InputRow; +import com.metamx.druid.realtime.firehose.Firehose; import junit.framework.Assert; +import org.joda.time.DateTime; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.net.UnknownHostException; import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; +import java.util.Collections; +import java.util.List; public class WebFirehoseFactoryTest { - private final ArrayList dimensions = new ArrayList(); - private InputSupplier testCaseSupplier; - String timeDimension = "t"; - DefaultObjectMapper mapper = new DefaultObjectMapper(); - Map expectedAnswer = new HashMap(); + private List dimensions; + private WebFirehoseFactory webbie; + private Firehose firehose; + private InputRow inputRow; + private TestCaseSupplier testCaseSupplier; @BeforeClass - public void setUp() + public void setUp() throws Exception + { + dimensions = new ArrayList(); + dimensions.add("item1"); + dimensions.add("item2"); + dimensions.add("time"); + testCaseSupplier = new TestCaseSupplier( + "{\"item1\":\"value1\"," + + "\"item2\":2," + + "\"time\":1372121562 }" + ); + + UpdateStreamFactory updateStreamFactory = new UpdateStreamFactory(testCaseSupplier, null, "time"); + webbie = new WebFirehoseFactory(updateStreamFactory, "posix"); + firehose = webbie.connect(); + if (firehose.hasMore()) { + inputRow = firehose.nextRow(); + } else { + throw new RuntimeException("queue is empty"); + } + + } + + @Test + public void testDimensions() throws Exception + { + List actualAnswer = inputRow.getDimensions(); + Collections.sort(actualAnswer); + Assert.assertEquals(actualAnswer, dimensions); + } + + @Test + public void testPosixTimeStamp() throws Exception + { + long expectedTime = 1372121562L * 1000L; + Assert.assertEquals(expectedTime, inputRow.getTimestampFromEpoch()); + } + + @Test + public void testISOTimeStamp() throws Exception { testCaseSupplier = new TestCaseSupplier( - "{ \"a\": \"Mozilla\\/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko\\/20100101 Firefox\\/21.0\", \"c\": \"US\", \"nk\": 1, \"tz\": \"America\\/New_York\", \"gr\": \"NY\", \"g\": \"1Chgyj\", \"h\": \"15vMQjX\", \"l\": \"o_d63rn9enb\", \"al\": \"en-US,en;q=0.5\", \"hh\": \"1.usa.gov\", \"r\": \"http:\\/\\/forecast.weather.gov\\/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX\", \"u\": \"http:\\/\\/www.spc.ncep.noaa.gov\\/\", \"t\": 1372121562, \"hc\": 1368193091, \"cy\": \"New York\", \"ll\": [ 40.862598, -73.921799 ] }" + "{\"item1\": \"value1\"," + + "\"item2\":2," + + "\"time\":\"2013-07-08\"}" ); - dimensions.add("g"); - dimensions.add("c"); - dimensions.add("a"); - dimensions.add("cy"); - dimensions.add("l"); - dimensions.add("hh"); - dimensions.add("hc"); - dimensions.add("h"); - dimensions.add("u"); - dimensions.add("tz"); - dimensions.add("t"); - dimensions.add("r"); - dimensions.add("gr"); - dimensions.add("nk"); - dimensions.add("al"); - dimensions.add("ll"); - - expectedAnswer.put("a", "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko/20100101 Firefox/21.0"); - expectedAnswer.put("c", "US"); - expectedAnswer.put("nk", 1); - expectedAnswer.put("tz", "America/New_York"); - expectedAnswer.put("gr", "NY"); - expectedAnswer.put("g", "1Chgyj"); - expectedAnswer.put("h", "15vMQjX"); - expectedAnswer.put("l", "o_d63rn9enb"); - expectedAnswer.put("al", "en-US,en;q=0.5"); - expectedAnswer.put("hh", "1.usa.gov"); - expectedAnswer.put( - "r", - "http://forecast.weather.gov/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX" - ); - expectedAnswer.put("u", "http://www.spc.ncep.noaa.gov/"); - expectedAnswer.put("t", 1372121562); - expectedAnswer.put("hc", 1368193091); - expectedAnswer.put("cy", "New York"); - expectedAnswer.put("ll", Arrays.asList(40.862598, -73.921799)); - - } - - @Test(expectedExceptions = UnknownHostException.class) - public void checkInvalidUrl() throws Exception - { - - String invalidURL = "http://invalid.url"; - WebJsonSupplier supplier = new WebJsonSupplier(invalidURL); - supplier.getInput(); + UpdateStreamFactory updateStreamFactory = new UpdateStreamFactory(testCaseSupplier, null, "time"); + webbie = new WebFirehoseFactory(updateStreamFactory, "iso"); + Firehose firehose1 = webbie.connect(); + if (firehose1.hasMore()) { + long milliSeconds = firehose1.nextRow().getTimestampFromEpoch(); + DateTime date = new DateTime("2013-07-08"); + Assert.assertEquals(date.getMillis(), milliSeconds); + } else { + Assert.assertFalse("hasMore returned false", true); + } } @Test - public void basicIngestionCheck() throws Exception + public void testAutoIsoTimeStamp() throws Exception { - UpdateStream updateStream = new UpdateStream( - testCaseSupplier, - mapper, - null, - timeDimension - ); - updateStream.run(); - Map insertedRow = updateStream.takeFromQueue(); - Assert.assertEquals(expectedAnswer, insertedRow); - } - - //If a timestamp is missing, we should throw away the event - @Test - public void missingTimeStampCheck() - { - InputSupplier testCaseSupplier = new TestCaseSupplier( - "{ \"a\": \"Mozilla\\/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko\\/20100101 Firefox\\/21.0\", \"c\": \"US\", \"nk\": 1, \"tz\": \"America\\/New_York\", \"gr\": \"NY\", \"g\": \"1Chgyj\", \"h\": \"15vMQjX\", \"l\": \"o_d63rn9enb\", \"al\": \"en-US,en;q=0.5\", \"hh\": \"1.usa.gov\", \"r\": \"http:\\/\\/forecast.weather.gov\\/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX\", \"u\": \"http:\\/\\/www.spc.ncep.noaa.gov\\/\", \"hc\": 1368193091, \"cy\": \"New York\", \"ll\": [ 40.862598, -73.921799 ] }" - ); - UpdateStream updateStream = new UpdateStream( - testCaseSupplier, - mapper, - null, - timeDimension - ); - updateStream.run(); - Assert.assertEquals(updateStream.getQueueSize(), 0); - } - - //If any other value is missing, we should still add the event and process it properly - @Test - public void otherNullValueCheck() throws Exception - { - Map expectedAnswer = new HashMap(); - expectedAnswer.put("a", "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko/20100101 Firefox/21.0"); - expectedAnswer.put("nk", 1); - expectedAnswer.put("tz", "America/New_York"); - expectedAnswer.put("gr", "NY"); - expectedAnswer.put("g", "1Chgyj"); - expectedAnswer.put("h", "15vMQjX"); - expectedAnswer.put("l", "o_d63rn9enb"); - expectedAnswer.put("al", "en-US,en;q=0.5"); - expectedAnswer.put("hh", "1.usa.gov"); - expectedAnswer.put( - "r", - "http://forecast.weather.gov/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX" - ); - expectedAnswer.put("u", "http://www.spc.ncep.noaa.gov/"); - expectedAnswer.put("t", 1372121562); - expectedAnswer.put("hc", 1368193091); - expectedAnswer.put("cy", "New York"); - expectedAnswer.put("ll", Arrays.asList(40.862598, -73.921799)); testCaseSupplier = new TestCaseSupplier( - "{ \"a\": \"Mozilla\\/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko\\/20100101 Firefox\\/21.0\", \"nk\": 1, \"tz\": \"America\\/New_York\", \"gr\": \"NY\", \"g\": \"1Chgyj\", \"h\": \"15vMQjX\", \"l\": \"o_d63rn9enb\", \"al\": \"en-US,en;q=0.5\", \"hh\": \"1.usa.gov\", \"r\": \"http:\\/\\/forecast.weather.gov\\/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX\", \"u\": \"http:\\/\\/www.spc.ncep.noaa.gov\\/\", \"t\": 1372121562, \"hc\": 1368193091, \"cy\": \"New York\", \"ll\": [ 40.862598, -73.921799 ] }" + "{\"item1\": \"value1\"," + + "\"item2\":2," + + "\"time\":\"2013-07-08\"}" ); - UpdateStream updateStream = new UpdateStream( - testCaseSupplier, - mapper, - null, - timeDimension - ); - updateStream.run(); - Map insertedRow = updateStream.takeFromQueue(); - Assert.assertEquals(expectedAnswer, insertedRow); + + UpdateStreamFactory updateStreamFactory = new UpdateStreamFactory(testCaseSupplier, null, "time"); + webbie = new WebFirehoseFactory(updateStreamFactory, null); + Firehose firehose2 = webbie.connect(); + if (firehose2.hasMore()) { + long milliSeconds = firehose2.nextRow().getTimestampFromEpoch(); + DateTime date = new DateTime("2013-07-08"); + Assert.assertEquals(date.getMillis(), milliSeconds); + } else { + Assert.assertFalse("hasMore returned false", true); + } } @Test - public void checkRenameKeys() throws Exception + public void testAutoMilliSecondsTimeStamp() throws Exception { - Map expectedAnswer = new HashMap(); - Map renamedDimensions = new HashMap(); - renamedDimensions.put("g","bitly_hash"); - renamedDimensions.put("c","country"); - renamedDimensions.put("a","user"); - renamedDimensions.put("cy","city"); - renamedDimensions.put("l","encoding_user_login"); - renamedDimensions.put("hh","short_url"); - renamedDimensions.put("hc","timestamp_hash"); - renamedDimensions.put("h","user_bitly_hash"); - renamedDimensions.put("u","url"); - renamedDimensions.put("tz","timezone"); - renamedDimensions.put("t","time"); - renamedDimensions.put("r","referring_url"); - renamedDimensions.put("gr","geo_region"); - renamedDimensions.put("nk","known_users"); - renamedDimensions.put("al","accept_language"); - renamedDimensions.put("ll","latitude_longitude"); - - expectedAnswer.put("user", "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko/20100101 Firefox/21.0"); - expectedAnswer.put("country", "US"); - expectedAnswer.put("known_users", 1); - expectedAnswer.put("timezone", "America/New_York"); - expectedAnswer.put("geo_region", "NY"); - expectedAnswer.put("bitly_hash", "1Chgyj"); - expectedAnswer.put("user_bitly_hash", "15vMQjX"); - expectedAnswer.put("encoding_user_login", "o_d63rn9enb"); - expectedAnswer.put("accept_language", "en-US,en;q=0.5"); - expectedAnswer.put("short_url", "1.usa.gov"); - expectedAnswer.put( - "referring_url", - "http://forecast.weather.gov/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX" + testCaseSupplier = new TestCaseSupplier( + "{\"item1\": \"value1\"," + + "\"item2\":2," + + "\"time\":1373241600000}" ); - expectedAnswer.put("url", "http://www.spc.ncep.noaa.gov/"); - expectedAnswer.put("time", 1372121562); - expectedAnswer.put("timestamp_hash", 1368193091); - expectedAnswer.put("city", "New York"); - expectedAnswer.put("latitude_longitude", Arrays.asList(40.862598, -73.921799)); - UpdateStream updateStream = new UpdateStream( - testCaseSupplier, - mapper, - renamedDimensions, - timeDimension - ); - updateStream.run(); - Map inputRow = updateStream.takeFromQueue(); - Assert.assertEquals(expectedAnswer, inputRow); + UpdateStreamFactory updateStreamFactory = new UpdateStreamFactory(testCaseSupplier, null, "time"); + webbie = new WebFirehoseFactory(updateStreamFactory, null); + Firehose firehose3 = webbie.connect(); + if (firehose3.hasMore()) { + long milliSeconds = firehose3.nextRow().getTimestampFromEpoch(); + DateTime date = new DateTime("2013-07-08"); + Assert.assertEquals(date.getMillis(), milliSeconds); + } else { + Assert.assertFalse("hasMore returned false", true); + } } + @Test + public void testGetDimension() + { + List column1 = Lists.newArrayList(); + column1.add("value1"); + Assert.assertEquals(column1, inputRow.getDimension("item1")); + } + + @Test + public void testGetFloatMetric() + { + Assert.assertEquals((float) 2.0, inputRow.getFloatMetric("item2")); + } }