better encapsulation

This commit is contained in:
Dhruv Parthasarathy 2013-07-08 16:46:47 -07:00
parent 01b4728c40
commit 142271aad2
5 changed files with 339 additions and 211 deletions

View File

@ -24,10 +24,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.io.InputSupplier;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.emitter.EmittingLogger;
import java.io.BufferedReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
@ -42,19 +42,18 @@ public class UpdateStream implements Runnable
private final InputSupplier<BufferedReader> supplier;
private final int QUEUE_SIZE = 10000;
private final BlockingQueue<Map<String, Object>> queue = new ArrayBlockingQueue<Map<String, Object>>(QUEUE_SIZE);
private final ObjectMapper mapper;
private final ObjectMapper mapper = new DefaultObjectMapper();
private final Map<String, String> renamedDimensions;
private final String timeDimension;
private final long waitTime = 15L;
public UpdateStream(
InputSupplier<BufferedReader> supplier,
ObjectMapper mapper,
Map<String, String> renamedDimensions,
String timeDimension
)
{
this.supplier = supplier;
this.mapper = mapper;
this.typeRef = new TypeReference<HashMap<String, Object>>()
{
};
@ -103,17 +102,28 @@ public class UpdateStream implements Runnable
}
}
return renamedMap;
}
else{
} else {
return update;
}
}
public Map<String,Object> takeFromQueue() throws InterruptedException{
return queue.take();
public Map<String, Object> pollFromQueue() throws InterruptedException
{
return queue.poll(waitTime, TimeUnit.SECONDS);
}
public int getQueueSize(){
public int getQueueSize()
{
return queue.size();
}
public String getNewTimeDimension()
{
if (renamedDimensions != null) {
return renamedDimensions.get(timeDimension);
} else {
return timeDimension;
}
}
}

View File

@ -0,0 +1,44 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package druid.examples.webStream;
import com.google.common.io.InputSupplier;
import java.util.Map;
public class UpdateStreamFactory
{
private final InputSupplier inputSupplier;
private final Map<String, String> renamedDimensions;
private final String timeDimension;
public UpdateStreamFactory(InputSupplier inputSupplier, Map<String, String> renamedDimensions, String timeDimension)
{
this.inputSupplier = inputSupplier;
this.renamedDimensions = renamedDimensions;
this.timeDimension = timeDimension;
}
public UpdateStream build()
{
return new UpdateStream(inputSupplier, renamedDimensions, timeDimension);
}
}

View File

@ -0,0 +1,144 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package druid.examples.webStream;
import com.google.common.io.InputSupplier;
import com.metamx.druid.jackson.DefaultObjectMapper;
import junit.framework.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
public class UpdateStreamTest
{
private final ArrayList<String> dimensions = new ArrayList<String>();
private InputSupplier testCaseSupplier;
DefaultObjectMapper mapper = new DefaultObjectMapper();
Map<String, Object> expectedAnswer = new HashMap<String, Object>();
String timeDimension;
@BeforeClass
public void setUp()
{
timeDimension = "time";
testCaseSupplier = new TestCaseSupplier(
"{\"item1\": \"value1\","
+ "\"item2\":2,"
+ "\"time\":1372121562 }"
);
dimensions.add("item1");
dimensions.add("item2");
dimensions.add("time");
expectedAnswer.put("item1", "value1");
expectedAnswer.put("item2", 2);
expectedAnswer.put("time", 1372121562);
}
@Test(expectedExceptions = UnknownHostException.class)
public void checkInvalidUrl() throws Exception
{
String invalidURL = "http://invalid.url";
WebJsonSupplier supplier = new WebJsonSupplier(invalidURL);
supplier.getInput();
}
@Test
public void basicIngestionCheck() throws Exception
{
UpdateStream updateStream = new UpdateStream(
testCaseSupplier,
null,
timeDimension
);
updateStream.run();
Map<String, Object> insertedRow = updateStream.pollFromQueue();
Assert.assertEquals(expectedAnswer, insertedRow);
}
//If a timestamp is missing, we should throw away the event
@Test
public void missingTimeStampCheck()
{
testCaseSupplier = new TestCaseSupplier(
"{\"item1\": \"value1\","
+ "\"item2\":2}"
);
UpdateStream updateStream = new UpdateStream(
testCaseSupplier,
null,
timeDimension
);
updateStream.run();
Assert.assertEquals(updateStream.getQueueSize(), 0);
}
//If any other value is missing, we should still add the event and process it properly
@Test
public void otherNullValueCheck() throws Exception
{
testCaseSupplier = new TestCaseSupplier(
"{\"item1\": \"value1\","
+ "\"time\":1372121562 }"
);
Map<String, Object> expectedAnswer = new HashMap<String, Object>();
expectedAnswer.put("item1", "value1");
expectedAnswer.put("time", 1372121562);
UpdateStream updateStream = new UpdateStream(
testCaseSupplier,
null,
timeDimension
);
updateStream.run();
Map<String, Object> insertedRow = updateStream.pollFromQueue();
Assert.assertEquals(expectedAnswer, insertedRow);
}
@Test
public void checkRenameKeys() throws Exception
{
Map<String, Object> expectedAnswer = new HashMap<String, Object>();
Map<String, String> renamedDimensions = new HashMap<String, String>();
renamedDimensions.put("item1", "i1");
renamedDimensions.put("item2", "i2");
renamedDimensions.put("time", "t");
expectedAnswer.put("i1", "value1");
expectedAnswer.put("i2", 2);
expectedAnswer.put("t", 1372121562);
UpdateStream updateStream = new UpdateStream(
testCaseSupplier,
renamedDimensions,
timeDimension
);
updateStream.run();
Map<String, Object> inputRow = updateStream.pollFromQueue();
Assert.assertEquals(expectedAnswer, inputRow);
}
}

View File

@ -27,7 +27,6 @@ import com.metamx.common.parsers.TimestampParser;
import com.metamx.druid.guava.Runnables;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.emitter.EmittingLogger;
@ -36,8 +35,6 @@ import org.joda.time.DateTime;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -45,11 +42,8 @@ import java.util.concurrent.Executors;
public class WebFirehoseFactory implements FirehoseFactory
{
private static final EmittingLogger log = new EmittingLogger(WebFirehoseFactory.class);
private final String url;
private final String timeDimension;
private final String newTimeDimension;
private final Map<String, String> renamedDimensions;
private final String timeFormat;
private final UpdateStreamFactory updateStreamFactory;
@JsonCreator
@ -60,19 +54,15 @@ public class WebFirehoseFactory implements FirehoseFactory
@JsonProperty("timeFormat") String timeFormat
)
{
this.url = url;
this.renamedDimensions = renamedDimensions;
this.timeDimension = timeDimension;
if (renamedDimensions != null) {
newTimeDimension = renamedDimensions.get(timeDimension);
}
else {
newTimeDimension = timeDimension;
this(new UpdateStreamFactory(new WebJsonSupplier(url), renamedDimensions, timeDimension), timeFormat);
}
public WebFirehoseFactory(UpdateStreamFactory updateStreamFactory, String timeFormat)
{
this.updateStreamFactory = updateStreamFactory;
if (timeFormat == null) {
this.timeFormat = "auto";
}
else {
} else {
this.timeFormat = timeFormat;
}
}
@ -81,23 +71,25 @@ public class WebFirehoseFactory implements FirehoseFactory
public Firehose connect() throws IOException
{
final UpdateStream updateStream = new UpdateStream(
new WebJsonSupplier(url),
new DefaultObjectMapper(),
renamedDimensions,
timeDimension
);
final UpdateStream updateStream = updateStreamFactory.build();
final ExecutorService service = Executors.newSingleThreadExecutor();
service.submit(updateStream);
return new Firehose()
{
Map<String, Object> map;
private final Runnable doNothingRunnable = Runnables.getNoopRunnable();
@Override
public boolean hasMore()
{
return !service.isTerminated();
try {
map = updateStream.pollFromQueue();
return map != null;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@ -105,9 +97,8 @@ public class WebFirehoseFactory implements FirehoseFactory
public InputRow nextRow()
{
try {
Map<String, Object> map = updateStream.takeFromQueue();
DateTime date = TimestampParser.createTimestampParser(timeFormat)
.apply(map.get(newTimeDimension).toString());
.apply(map.get(updateStream.getNewTimeDimension()).toString());
return new MapBasedInputRow(
date.getMillis(),
new ArrayList(map.keySet()),

View File

@ -19,200 +19,139 @@
package druid.examples.webStream;
import com.google.common.io.InputSupplier;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.beust.jcommander.internal.Lists;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.realtime.firehose.Firehose;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Collections;
import java.util.List;
public class WebFirehoseFactoryTest
{
private final ArrayList<String> dimensions = new ArrayList<String>();
private InputSupplier testCaseSupplier;
String timeDimension = "t";
DefaultObjectMapper mapper = new DefaultObjectMapper();
Map<String, Object> expectedAnswer = new HashMap<String, Object>();
private List<String> dimensions;
private WebFirehoseFactory webbie;
private Firehose firehose;
private InputRow inputRow;
private TestCaseSupplier testCaseSupplier;
@BeforeClass
public void setUp()
public void setUp() throws Exception
{
dimensions = new ArrayList<String>();
dimensions.add("item1");
dimensions.add("item2");
dimensions.add("time");
testCaseSupplier = new TestCaseSupplier(
"{\"item1\":\"value1\","
+ "\"item2\":2,"
+ "\"time\":1372121562 }"
);
UpdateStreamFactory updateStreamFactory = new UpdateStreamFactory(testCaseSupplier, null, "time");
webbie = new WebFirehoseFactory(updateStreamFactory, "posix");
firehose = webbie.connect();
if (firehose.hasMore()) {
inputRow = firehose.nextRow();
} else {
throw new RuntimeException("queue is empty");
}
}
@Test
public void testDimensions() throws Exception
{
List<String> actualAnswer = inputRow.getDimensions();
Collections.sort(actualAnswer);
Assert.assertEquals(actualAnswer, dimensions);
}
@Test
public void testPosixTimeStamp() throws Exception
{
long expectedTime = 1372121562L * 1000L;
Assert.assertEquals(expectedTime, inputRow.getTimestampFromEpoch());
}
@Test
public void testISOTimeStamp() throws Exception
{
testCaseSupplier = new TestCaseSupplier(
"{ \"a\": \"Mozilla\\/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko\\/20100101 Firefox\\/21.0\", \"c\": \"US\", \"nk\": 1, \"tz\": \"America\\/New_York\", \"gr\": \"NY\", \"g\": \"1Chgyj\", \"h\": \"15vMQjX\", \"l\": \"o_d63rn9enb\", \"al\": \"en-US,en;q=0.5\", \"hh\": \"1.usa.gov\", \"r\": \"http:\\/\\/forecast.weather.gov\\/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX\", \"u\": \"http:\\/\\/www.spc.ncep.noaa.gov\\/\", \"t\": 1372121562, \"hc\": 1368193091, \"cy\": \"New York\", \"ll\": [ 40.862598, -73.921799 ] }"
"{\"item1\": \"value1\","
+ "\"item2\":2,"
+ "\"time\":\"2013-07-08\"}"
);
dimensions.add("g");
dimensions.add("c");
dimensions.add("a");
dimensions.add("cy");
dimensions.add("l");
dimensions.add("hh");
dimensions.add("hc");
dimensions.add("h");
dimensions.add("u");
dimensions.add("tz");
dimensions.add("t");
dimensions.add("r");
dimensions.add("gr");
dimensions.add("nk");
dimensions.add("al");
dimensions.add("ll");
expectedAnswer.put("a", "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko/20100101 Firefox/21.0");
expectedAnswer.put("c", "US");
expectedAnswer.put("nk", 1);
expectedAnswer.put("tz", "America/New_York");
expectedAnswer.put("gr", "NY");
expectedAnswer.put("g", "1Chgyj");
expectedAnswer.put("h", "15vMQjX");
expectedAnswer.put("l", "o_d63rn9enb");
expectedAnswer.put("al", "en-US,en;q=0.5");
expectedAnswer.put("hh", "1.usa.gov");
expectedAnswer.put(
"r",
"http://forecast.weather.gov/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX"
);
expectedAnswer.put("u", "http://www.spc.ncep.noaa.gov/");
expectedAnswer.put("t", 1372121562);
expectedAnswer.put("hc", 1368193091);
expectedAnswer.put("cy", "New York");
expectedAnswer.put("ll", Arrays.asList(40.862598, -73.921799));
UpdateStreamFactory updateStreamFactory = new UpdateStreamFactory(testCaseSupplier, null, "time");
webbie = new WebFirehoseFactory(updateStreamFactory, "iso");
Firehose firehose1 = webbie.connect();
if (firehose1.hasMore()) {
long milliSeconds = firehose1.nextRow().getTimestampFromEpoch();
DateTime date = new DateTime("2013-07-08");
Assert.assertEquals(date.getMillis(), milliSeconds);
} else {
Assert.assertFalse("hasMore returned false", true);
}
@Test(expectedExceptions = UnknownHostException.class)
public void checkInvalidUrl() throws Exception
{
String invalidURL = "http://invalid.url";
WebJsonSupplier supplier = new WebJsonSupplier(invalidURL);
supplier.getInput();
}
@Test
public void basicIngestionCheck() throws Exception
public void testAutoIsoTimeStamp() throws Exception
{
UpdateStream updateStream = new UpdateStream(
testCaseSupplier,
mapper,
null,
timeDimension
);
updateStream.run();
Map<String, Object> insertedRow = updateStream.takeFromQueue();
Assert.assertEquals(expectedAnswer, insertedRow);
}
//If a timestamp is missing, we should throw away the event
@Test
public void missingTimeStampCheck()
{
InputSupplier testCaseSupplier = new TestCaseSupplier(
"{ \"a\": \"Mozilla\\/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko\\/20100101 Firefox\\/21.0\", \"c\": \"US\", \"nk\": 1, \"tz\": \"America\\/New_York\", \"gr\": \"NY\", \"g\": \"1Chgyj\", \"h\": \"15vMQjX\", \"l\": \"o_d63rn9enb\", \"al\": \"en-US,en;q=0.5\", \"hh\": \"1.usa.gov\", \"r\": \"http:\\/\\/forecast.weather.gov\\/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX\", \"u\": \"http:\\/\\/www.spc.ncep.noaa.gov\\/\", \"hc\": 1368193091, \"cy\": \"New York\", \"ll\": [ 40.862598, -73.921799 ] }"
);
UpdateStream updateStream = new UpdateStream(
testCaseSupplier,
mapper,
null,
timeDimension
);
updateStream.run();
Assert.assertEquals(updateStream.getQueueSize(), 0);
}
//If any other value is missing, we should still add the event and process it properly
@Test
public void otherNullValueCheck() throws Exception
{
Map<String, Object> expectedAnswer = new HashMap<String, Object>();
expectedAnswer.put("a", "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko/20100101 Firefox/21.0");
expectedAnswer.put("nk", 1);
expectedAnswer.put("tz", "America/New_York");
expectedAnswer.put("gr", "NY");
expectedAnswer.put("g", "1Chgyj");
expectedAnswer.put("h", "15vMQjX");
expectedAnswer.put("l", "o_d63rn9enb");
expectedAnswer.put("al", "en-US,en;q=0.5");
expectedAnswer.put("hh", "1.usa.gov");
expectedAnswer.put(
"r",
"http://forecast.weather.gov/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX"
);
expectedAnswer.put("u", "http://www.spc.ncep.noaa.gov/");
expectedAnswer.put("t", 1372121562);
expectedAnswer.put("hc", 1368193091);
expectedAnswer.put("cy", "New York");
expectedAnswer.put("ll", Arrays.asList(40.862598, -73.921799));
testCaseSupplier = new TestCaseSupplier(
"{ \"a\": \"Mozilla\\/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko\\/20100101 Firefox\\/21.0\", \"nk\": 1, \"tz\": \"America\\/New_York\", \"gr\": \"NY\", \"g\": \"1Chgyj\", \"h\": \"15vMQjX\", \"l\": \"o_d63rn9enb\", \"al\": \"en-US,en;q=0.5\", \"hh\": \"1.usa.gov\", \"r\": \"http:\\/\\/forecast.weather.gov\\/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX\", \"u\": \"http:\\/\\/www.spc.ncep.noaa.gov\\/\", \"t\": 1372121562, \"hc\": 1368193091, \"cy\": \"New York\", \"ll\": [ 40.862598, -73.921799 ] }"
"{\"item1\": \"value1\","
+ "\"item2\":2,"
+ "\"time\":\"2013-07-08\"}"
);
UpdateStream updateStream = new UpdateStream(
testCaseSupplier,
mapper,
null,
timeDimension
);
updateStream.run();
Map<String, Object> insertedRow = updateStream.takeFromQueue();
Assert.assertEquals(expectedAnswer, insertedRow);
UpdateStreamFactory updateStreamFactory = new UpdateStreamFactory(testCaseSupplier, null, "time");
webbie = new WebFirehoseFactory(updateStreamFactory, null);
Firehose firehose2 = webbie.connect();
if (firehose2.hasMore()) {
long milliSeconds = firehose2.nextRow().getTimestampFromEpoch();
DateTime date = new DateTime("2013-07-08");
Assert.assertEquals(date.getMillis(), milliSeconds);
} else {
Assert.assertFalse("hasMore returned false", true);
}
}
@Test
public void checkRenameKeys() throws Exception
public void testAutoMilliSecondsTimeStamp() throws Exception
{
Map<String, Object> expectedAnswer = new HashMap<String, Object>();
Map<String,String> renamedDimensions = new HashMap<String,String>();
renamedDimensions.put("g","bitly_hash");
renamedDimensions.put("c","country");
renamedDimensions.put("a","user");
renamedDimensions.put("cy","city");
renamedDimensions.put("l","encoding_user_login");
renamedDimensions.put("hh","short_url");
renamedDimensions.put("hc","timestamp_hash");
renamedDimensions.put("h","user_bitly_hash");
renamedDimensions.put("u","url");
renamedDimensions.put("tz","timezone");
renamedDimensions.put("t","time");
renamedDimensions.put("r","referring_url");
renamedDimensions.put("gr","geo_region");
renamedDimensions.put("nk","known_users");
renamedDimensions.put("al","accept_language");
renamedDimensions.put("ll","latitude_longitude");
expectedAnswer.put("user", "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko/20100101 Firefox/21.0");
expectedAnswer.put("country", "US");
expectedAnswer.put("known_users", 1);
expectedAnswer.put("timezone", "America/New_York");
expectedAnswer.put("geo_region", "NY");
expectedAnswer.put("bitly_hash", "1Chgyj");
expectedAnswer.put("user_bitly_hash", "15vMQjX");
expectedAnswer.put("encoding_user_login", "o_d63rn9enb");
expectedAnswer.put("accept_language", "en-US,en;q=0.5");
expectedAnswer.put("short_url", "1.usa.gov");
expectedAnswer.put(
"referring_url",
"http://forecast.weather.gov/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX"
testCaseSupplier = new TestCaseSupplier(
"{\"item1\": \"value1\","
+ "\"item2\":2,"
+ "\"time\":1373241600000}"
);
expectedAnswer.put("url", "http://www.spc.ncep.noaa.gov/");
expectedAnswer.put("time", 1372121562);
expectedAnswer.put("timestamp_hash", 1368193091);
expectedAnswer.put("city", "New York");
expectedAnswer.put("latitude_longitude", Arrays.asList(40.862598, -73.921799));
UpdateStream updateStream = new UpdateStream(
testCaseSupplier,
mapper,
renamedDimensions,
timeDimension
);
updateStream.run();
Map<String, Object> inputRow = updateStream.takeFromQueue();
Assert.assertEquals(expectedAnswer, inputRow);
UpdateStreamFactory updateStreamFactory = new UpdateStreamFactory(testCaseSupplier, null, "time");
webbie = new WebFirehoseFactory(updateStreamFactory, null);
Firehose firehose3 = webbie.connect();
if (firehose3.hasMore()) {
long milliSeconds = firehose3.nextRow().getTimestampFromEpoch();
DateTime date = new DateTime("2013-07-08");
Assert.assertEquals(date.getMillis(), milliSeconds);
} else {
Assert.assertFalse("hasMore returned false", true);
}
}
@Test
public void testGetDimension()
{
List<String> column1 = Lists.newArrayList();
column1.add("value1");
Assert.assertEquals(column1, inputRow.getDimension("item1"));
}
@Test
public void testGetFloatMetric()
{
Assert.assertEquals((float) 2.0, inputRow.getFloatMetric("item2"));
}
}