diff --git a/examples/src/main/java/druid/examples/webStream/RenamingKeysUpdateStream.java b/examples/src/main/java/druid/examples/webStream/RenamingKeysUpdateStream.java new file mode 100644 index 00000000000..922839f493e --- /dev/null +++ b/examples/src/main/java/druid/examples/webStream/RenamingKeysUpdateStream.java @@ -0,0 +1,79 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package druid.examples.webStream; + +import com.google.common.collect.Maps; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class RenamingKeysUpdateStream implements Runnable +{ + + private final UpdateStream updateStream; + private Map renamedDimensions; + + public RenamingKeysUpdateStream( + UpdateStream updateStream, + Map renamedDimensions + ) + { + this.renamedDimensions = renamedDimensions; + this.updateStream = updateStream; + } + + public Map pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException + { + return renameKeys(updateStream.pollFromQueue(waitTime, unit)); + } + + + private Map renameKeys(Map update) + { + if (renamedDimensions != null) { + Map renamedMap = Maps.newHashMap(); + for (String key : renamedDimensions.keySet()) { + if (update.get(key) != null) { + Object obj = update.get(key); + renamedMap.put(renamedDimensions.get(key), obj); + } + } + return renamedMap; + } else { + return update; + } + } + + public String getTimeDimension() + { + if (renamedDimensions != null && renamedDimensions.get(updateStream.getTimeDimension()) != null) { + return renamedDimensions.get(updateStream.getTimeDimension()); + } + return updateStream.getTimeDimension(); + + } + + + @Override + public void run() + { + updateStream.run(); + } +} diff --git a/examples/src/main/java/druid/examples/webStream/RenamingKeysUpdateStreamFactory.java b/examples/src/main/java/druid/examples/webStream/RenamingKeysUpdateStreamFactory.java new file mode 100644 index 00000000000..4dafc3becd3 --- /dev/null +++ b/examples/src/main/java/druid/examples/webStream/RenamingKeysUpdateStreamFactory.java @@ -0,0 +1,37 @@ +package druid.examples.webStream;/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +import java.util.Map; + +public class RenamingKeysUpdateStreamFactory +{ + private UpdateStreamFactory updateStreamFactory; + private Map renamedDimensions; + + public RenamingKeysUpdateStreamFactory(UpdateStreamFactory updateStreamFactory, Map renamedDimensions) + { + this.updateStreamFactory = updateStreamFactory; + this.renamedDimensions = renamedDimensions; + } + + public RenamingKeysUpdateStream build() + { + return new RenamingKeysUpdateStream(updateStreamFactory.build(), renamedDimensions); + } +} diff --git a/examples/src/main/java/druid/examples/webStream/RenamingKeysUpdateStreamTest.java b/examples/src/main/java/druid/examples/webStream/RenamingKeysUpdateStreamTest.java new file mode 100644 index 00000000000..d4acd52fabc --- /dev/null +++ b/examples/src/main/java/druid/examples/webStream/RenamingKeysUpdateStreamTest.java @@ -0,0 +1,92 @@ +package druid.examples.webStream;/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +import com.google.common.io.InputSupplier; +import junit.framework.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class RenamingKeysUpdateStreamTest +{ + private final long waitTime = 15L; + private final TimeUnit unit = TimeUnit.SECONDS; + private InputSupplier testCaseSupplier; + String timeDimension; + + @BeforeClass + public void setUp() + { + timeDimension = "time"; + testCaseSupplier = new TestCaseSupplier( + "{\"item1\": \"value1\"," + + "\"item2\":2," + + "\"time\":1372121562 }" + ); + } + + @Test + public void testPollFromQueue() throws Exception + { + UpdateStream updateStream = new UpdateStream(testCaseSupplier, timeDimension); + Map renamedKeys = new HashMap(); + renamedKeys.put("item1", "i1"); + renamedKeys.put("item2", "i2"); + renamedKeys.put("time", "t"); + + RenamingKeysUpdateStream renamer = new RenamingKeysUpdateStream(updateStream, renamedKeys); + renamer.run(); + Map expectedAnswer = new HashMap(); + expectedAnswer.put("i1", "value1"); + expectedAnswer.put("i2", 2); + expectedAnswer.put("t", 1372121562); + + + Assert.assertEquals(expectedAnswer, renamer.pollFromQueue(waitTime, unit)); + } + + @Test + public void testGetTimeDimension() throws Exception + { + UpdateStream updateStream = new UpdateStream(testCaseSupplier, timeDimension); + Map renamedKeys = new HashMap(); + renamedKeys.put("item1", "i1"); + renamedKeys.put("item2", "i2"); + renamedKeys.put("time", "t"); + + RenamingKeysUpdateStream renamer = new RenamingKeysUpdateStream(updateStream, renamedKeys); + Assert.assertEquals("t", renamer.getTimeDimension()); + } + + @Test + public void testMissingTimeRename() throws Exception + { + UpdateStream updateStream = new UpdateStream(testCaseSupplier, timeDimension); + Map renamedKeys = new HashMap(); + renamedKeys.put("item1", "i1"); + renamedKeys.put("item2", "i2"); + RenamingKeysUpdateStream renamer = new RenamingKeysUpdateStream(updateStream, renamedKeys); + Assert.assertEquals("time", renamer.getTimeDimension()); + + } + +} diff --git a/examples/src/main/java/druid/examples/webStream/UpdateStream.java b/examples/src/main/java/druid/examples/webStream/UpdateStream.java index 87e1a2ffffb..3a19e11b6b5 100644 --- a/examples/src/main/java/druid/examples/webStream/UpdateStream.java +++ b/examples/src/main/java/druid/examples/webStream/UpdateStream.java @@ -43,13 +43,10 @@ public class UpdateStream implements Runnable private final int QUEUE_SIZE = 10000; private final BlockingQueue> queue = new ArrayBlockingQueue>(QUEUE_SIZE); private final ObjectMapper mapper = new DefaultObjectMapper(); - private final Map renamedDimensions; private final String timeDimension; - private final long waitTime = 15L; public UpdateStream( InputSupplier supplier, - Map renamedDimensions, String timeDimension ) { @@ -58,7 +55,6 @@ public class UpdateStream implements Runnable { }; this.timeDimension = timeDimension; - this.renamedDimensions = renamedDimensions; } private boolean isValid(String s) @@ -76,8 +72,7 @@ public class UpdateStream implements Runnable if (isValid(line)) { HashMap map = mapper.readValue(line, typeRef); if (map.get(timeDimension) != null) { - Map renamedMap = renameKeys(map); - queue.offer(renamedMap, queueWaitTime, TimeUnit.SECONDS); + queue.offer(map, queueWaitTime, TimeUnit.SECONDS); log.debug("Successfully added to queue"); } else { log.error("missing timestamp"); @@ -91,25 +86,10 @@ public class UpdateStream implements Runnable } - private Map renameKeys(Map update) - { - if (renamedDimensions != null) { - Map renamedMap = Maps.newHashMap(); - for (String key : renamedDimensions.keySet()) { - if (update.get(key) != null) { - Object obj = update.get(key); - renamedMap.put(renamedDimensions.get(key), obj); - } - } - return renamedMap; - } else { - return update; - } - } - public Map pollFromQueue() throws InterruptedException + public Map pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException { - return queue.poll(waitTime, TimeUnit.SECONDS); + return queue.poll(waitTime, unit); } public int getQueueSize() @@ -117,13 +97,8 @@ public class UpdateStream implements Runnable return queue.size(); } - public String getNewTimeDimension() - { - if (renamedDimensions != null) { - return renamedDimensions.get(timeDimension); - } else { - return timeDimension; - } - + public String getTimeDimension(){ + return timeDimension; } + } diff --git a/examples/src/main/java/druid/examples/webStream/UpdateStreamFactory.java b/examples/src/main/java/druid/examples/webStream/UpdateStreamFactory.java index 2bff55b3ce3..2a69eb42c1d 100644 --- a/examples/src/main/java/druid/examples/webStream/UpdateStreamFactory.java +++ b/examples/src/main/java/druid/examples/webStream/UpdateStreamFactory.java @@ -21,24 +21,20 @@ package druid.examples.webStream; import com.google.common.io.InputSupplier; -import java.util.Map; - public class UpdateStreamFactory { private final InputSupplier inputSupplier; - private final Map renamedDimensions; private final String timeDimension; - public UpdateStreamFactory(InputSupplier inputSupplier, Map renamedDimensions, String timeDimension) + public UpdateStreamFactory(InputSupplier inputSupplier, String timeDimension) { this.inputSupplier = inputSupplier; - this.renamedDimensions = renamedDimensions; this.timeDimension = timeDimension; } public UpdateStream build() { - return new UpdateStream(inputSupplier, renamedDimensions, timeDimension); + return new UpdateStream(inputSupplier, timeDimension); } } diff --git a/examples/src/main/java/druid/examples/webStream/UpdateStreamTest.java b/examples/src/main/java/druid/examples/webStream/UpdateStreamTest.java index 1aed4566213..a14993ff9a7 100644 --- a/examples/src/main/java/druid/examples/webStream/UpdateStreamTest.java +++ b/examples/src/main/java/druid/examples/webStream/UpdateStreamTest.java @@ -20,21 +20,21 @@ package druid.examples.webStream; import com.google.common.io.InputSupplier; -import com.metamx.druid.jackson.DefaultObjectMapper; import junit.framework.Assert; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.net.UnknownHostException; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; public class UpdateStreamTest { + private final long waitTime = 15L; + private final TimeUnit unit = TimeUnit.SECONDS; private final ArrayList dimensions = new ArrayList(); private InputSupplier testCaseSupplier; - DefaultObjectMapper mapper = new DefaultObjectMapper(); Map expectedAnswer = new HashMap(); String timeDimension; @@ -57,25 +57,16 @@ public class UpdateStreamTest expectedAnswer.put("time", 1372121562); } - @Test(expectedExceptions = UnknownHostException.class) - public void checkInvalidUrl() throws Exception - { - - String invalidURL = "http://invalid.url"; - WebJsonSupplier supplier = new WebJsonSupplier(invalidURL); - supplier.getInput(); - } @Test public void basicIngestionCheck() throws Exception { UpdateStream updateStream = new UpdateStream( testCaseSupplier, - null, timeDimension ); updateStream.run(); - Map insertedRow = updateStream.pollFromQueue(); + Map insertedRow = updateStream.pollFromQueue(waitTime, unit); Assert.assertEquals(expectedAnswer, insertedRow); } @@ -90,7 +81,6 @@ public class UpdateStreamTest UpdateStream updateStream = new UpdateStream( testCaseSupplier, - null, timeDimension ); updateStream.run(); @@ -105,40 +95,17 @@ public class UpdateStreamTest "{\"item1\": \"value1\"," + "\"time\":1372121562 }" ); + UpdateStream updateStream = new UpdateStream( + testCaseSupplier, + timeDimension + ); + updateStream.run(); + Map insertedRow = updateStream.pollFromQueue(waitTime, unit); Map expectedAnswer = new HashMap(); expectedAnswer.put("item1", "value1"); expectedAnswer.put("time", 1372121562); - UpdateStream updateStream = new UpdateStream( - testCaseSupplier, - null, - timeDimension - ); - updateStream.run(); - Map insertedRow = updateStream.pollFromQueue(); Assert.assertEquals(expectedAnswer, insertedRow); } - @Test - public void checkRenameKeys() throws Exception - { - Map expectedAnswer = new HashMap(); - Map renamedDimensions = new HashMap(); - renamedDimensions.put("item1", "i1"); - renamedDimensions.put("item2", "i2"); - renamedDimensions.put("time", "t"); - - expectedAnswer.put("i1", "value1"); - expectedAnswer.put("i2", 2); - expectedAnswer.put("t", 1372121562); - - UpdateStream updateStream = new UpdateStream( - testCaseSupplier, - renamedDimensions, - timeDimension - ); - updateStream.run(); - Map inputRow = updateStream.pollFromQueue(); - Assert.assertEquals(expectedAnswer, inputRow); - } } diff --git a/examples/src/main/java/druid/examples/webStream/WebFirehoseFactory.java b/examples/src/main/java/druid/examples/webStream/WebFirehoseFactory.java index a174ba84654..a1285ee18e4 100644 --- a/examples/src/main/java/druid/examples/webStream/WebFirehoseFactory.java +++ b/examples/src/main/java/druid/examples/webStream/WebFirehoseFactory.java @@ -34,17 +34,19 @@ import org.joda.time.DateTime; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; @JsonTypeName("webstream") public class WebFirehoseFactory implements FirehoseFactory { private static final EmittingLogger log = new EmittingLogger(WebFirehoseFactory.class); private final String timeFormat; - private final UpdateStreamFactory updateStreamFactory; - + private final RenamingKeysUpdateStreamFactory factory; + private final long queueWaitTime = 15L; @JsonCreator public WebFirehoseFactory( @@ -54,12 +56,17 @@ public class WebFirehoseFactory implements FirehoseFactory @JsonProperty("timeFormat") String timeFormat ) { - this(new UpdateStreamFactory(new WebJsonSupplier(url), renamedDimensions, timeDimension), timeFormat); + this( + new RenamingKeysUpdateStreamFactory( + new UpdateStreamFactory(new WebJsonSupplier(url), timeDimension), + renamedDimensions + ), timeFormat + ); } - public WebFirehoseFactory(UpdateStreamFactory updateStreamFactory, String timeFormat) + public WebFirehoseFactory(RenamingKeysUpdateStreamFactory factory, String timeFormat) { - this.updateStreamFactory = updateStreamFactory; + this.factory = factory; if (timeFormat == null) { this.timeFormat = "auto"; } else { @@ -71,9 +78,9 @@ public class WebFirehoseFactory implements FirehoseFactory public Firehose connect() throws IOException { - final UpdateStream updateStream = updateStreamFactory.build(); + final RenamingKeysUpdateStream renamingUpdateStream = factory.build(); final ExecutorService service = Executors.newSingleThreadExecutor(); - service.submit(updateStream); + service.submit(renamingUpdateStream); return new Firehose() { @@ -84,10 +91,10 @@ public class WebFirehoseFactory implements FirehoseFactory public boolean hasMore() { try { - map = updateStream.pollFromQueue(); + map = renamingUpdateStream.pollFromQueue(queueWaitTime, TimeUnit.SECONDS); return map != null; } - catch (Exception e) { + catch (InterruptedException e) { throw Throwables.propagate(e); } } @@ -98,7 +105,7 @@ public class WebFirehoseFactory implements FirehoseFactory { try { DateTime date = TimestampParser.createTimestampParser(timeFormat) - .apply(map.get(updateStream.getNewTimeDimension()).toString()); + .apply(map.get(renamingUpdateStream.getTimeDimension()).toString()); return new MapBasedInputRow( date.getMillis(), new ArrayList(map.keySet()), @@ -108,6 +115,9 @@ public class WebFirehoseFactory implements FirehoseFactory catch (Exception e) { throw Throwables.propagate(e); } + finally { + map = null; + } } @Override diff --git a/examples/src/main/java/druid/examples/webStream/WebFirehoseFactoryTest.java b/examples/src/main/java/druid/examples/webStream/WebFirehoseFactoryTest.java index 5494a500826..e51ef9e79ff 100644 --- a/examples/src/main/java/druid/examples/webStream/WebFirehoseFactoryTest.java +++ b/examples/src/main/java/druid/examples/webStream/WebFirehoseFactoryTest.java @@ -35,8 +35,6 @@ public class WebFirehoseFactoryTest { private List dimensions; private WebFirehoseFactory webbie; - private Firehose firehose; - private InputRow inputRow; private TestCaseSupplier testCaseSupplier; @BeforeClass @@ -51,21 +49,20 @@ public class WebFirehoseFactoryTest + "\"item2\":2," + "\"time\":1372121562 }" ); - - UpdateStreamFactory updateStreamFactory = new UpdateStreamFactory(testCaseSupplier, null, "time"); - webbie = new WebFirehoseFactory(updateStreamFactory, "posix"); - firehose = webbie.connect(); - if (firehose.hasMore()) { - inputRow = firehose.nextRow(); - } else { - throw new RuntimeException("queue is empty"); - } - } @Test public void testDimensions() throws Exception { + InputRow inputRow; + UpdateStreamFactory updateStreamFactory = new UpdateStreamFactory(testCaseSupplier, "time"); + webbie = new WebFirehoseFactory(new RenamingKeysUpdateStreamFactory(updateStreamFactory, null), "posix"); + Firehose firehose = webbie.connect(); + if (firehose.hasMore()) { + inputRow = firehose.nextRow(); + } else { + throw new RuntimeException("queue is empty"); + } List actualAnswer = inputRow.getDimensions(); Collections.sort(actualAnswer); Assert.assertEquals(actualAnswer, dimensions); @@ -74,6 +71,15 @@ public class WebFirehoseFactoryTest @Test public void testPosixTimeStamp() throws Exception { + InputRow inputRow; + UpdateStreamFactory updateStreamFactory = new UpdateStreamFactory(testCaseSupplier, "time"); + webbie = new WebFirehoseFactory(new RenamingKeysUpdateStreamFactory(updateStreamFactory, null), "posix"); + Firehose firehose = webbie.connect(); + if (firehose.hasMore()) { + inputRow = firehose.nextRow(); + } else { + throw new RuntimeException("queue is empty"); + } long expectedTime = 1372121562L * 1000L; Assert.assertEquals(expectedTime, inputRow.getTimestampFromEpoch()); } @@ -87,8 +93,8 @@ public class WebFirehoseFactoryTest + "\"time\":\"2013-07-08\"}" ); - UpdateStreamFactory updateStreamFactory = new UpdateStreamFactory(testCaseSupplier, null, "time"); - webbie = new WebFirehoseFactory(updateStreamFactory, "iso"); + UpdateStreamFactory updateStreamFactory = new UpdateStreamFactory(testCaseSupplier, "time"); + webbie = new WebFirehoseFactory(new RenamingKeysUpdateStreamFactory(updateStreamFactory, null), "iso"); Firehose firehose1 = webbie.connect(); if (firehose1.hasMore()) { long milliSeconds = firehose1.nextRow().getTimestampFromEpoch(); @@ -108,8 +114,8 @@ public class WebFirehoseFactoryTest + "\"time\":\"2013-07-08\"}" ); - UpdateStreamFactory updateStreamFactory = new UpdateStreamFactory(testCaseSupplier, null, "time"); - webbie = new WebFirehoseFactory(updateStreamFactory, null); + UpdateStreamFactory updateStreamFactory = new UpdateStreamFactory(testCaseSupplier, "time"); + webbie = new WebFirehoseFactory(new RenamingKeysUpdateStreamFactory(updateStreamFactory, null), null); Firehose firehose2 = webbie.connect(); if (firehose2.hasMore()) { long milliSeconds = firehose2.nextRow().getTimestampFromEpoch(); @@ -129,8 +135,8 @@ public class WebFirehoseFactoryTest + "\"time\":1373241600000}" ); - UpdateStreamFactory updateStreamFactory = new UpdateStreamFactory(testCaseSupplier, null, "time"); - webbie = new WebFirehoseFactory(updateStreamFactory, null); + UpdateStreamFactory updateStreamFactory = new UpdateStreamFactory(testCaseSupplier, "time"); + webbie = new WebFirehoseFactory(new RenamingKeysUpdateStreamFactory(updateStreamFactory, null), null); Firehose firehose3 = webbie.connect(); if (firehose3.hasMore()) { long milliSeconds = firehose3.nextRow().getTimestampFromEpoch(); @@ -142,16 +148,36 @@ public class WebFirehoseFactoryTest } @Test - public void testGetDimension() + public void testGetDimension() throws Exception { + InputRow inputRow; + UpdateStreamFactory updateStreamFactory = new UpdateStreamFactory(testCaseSupplier, "time"); + webbie = new WebFirehoseFactory(new RenamingKeysUpdateStreamFactory(updateStreamFactory, null), "posix"); + Firehose firehose = webbie.connect(); + if (firehose.hasMore()) { + inputRow = firehose.nextRow(); + } else { + throw new RuntimeException("queue is empty"); + } + List column1 = Lists.newArrayList(); column1.add("value1"); Assert.assertEquals(column1, inputRow.getDimension("item1")); } @Test - public void testGetFloatMetric() + public void testGetFloatMetric() throws Exception { + InputRow inputRow; + UpdateStreamFactory updateStreamFactory = new UpdateStreamFactory(testCaseSupplier, "time"); + webbie = new WebFirehoseFactory(new RenamingKeysUpdateStreamFactory(updateStreamFactory, null), "posix"); + Firehose firehose = webbie.connect(); + if (firehose.hasMore()) { + inputRow = firehose.nextRow(); + } else { + throw new RuntimeException("queue is empty"); + } + Assert.assertEquals((float) 2.0, inputRow.getFloatMetric("item2")); } } diff --git a/examples/src/main/java/druid/examples/webStream/WebJsonSupplier.java b/examples/src/main/java/druid/examples/webStream/WebJsonSupplier.java index b8a1cd5f7a0..105a51204b8 100644 --- a/examples/src/main/java/druid/examples/webStream/WebJsonSupplier.java +++ b/examples/src/main/java/druid/examples/webStream/WebJsonSupplier.java @@ -31,7 +31,6 @@ import java.util.List; public class WebJsonSupplier implements InputSupplier { - private List dimensions; private static final EmittingLogger log = new EmittingLogger(WebJsonSupplier.class); private String urlString; diff --git a/examples/src/main/java/druid/examples/webStream/WebJsonSupplierTest.java b/examples/src/main/java/druid/examples/webStream/WebJsonSupplierTest.java new file mode 100644 index 00000000000..c5b880b5f0a --- /dev/null +++ b/examples/src/main/java/druid/examples/webStream/WebJsonSupplierTest.java @@ -0,0 +1,36 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package druid.examples.webStream; + +import org.testng.annotations.Test; + +import java.net.UnknownHostException; + +public class WebJsonSupplierTest +{ + @Test(expectedExceptions = UnknownHostException.class) + public void checkInvalidUrl() throws Exception + { + + String invalidURL = "http://invalid.url"; + WebJsonSupplier supplier = new WebJsonSupplier(invalidURL); + supplier.getInput(); + } +}