removed shared queue structure. Queue now encapsulated within updateStream

This commit is contained in:
Dhruv Parthasarathy 2013-07-08 14:27:35 -07:00
parent e7da31e42d
commit 01b4728c40
3 changed files with 16 additions and 25 deletions

View File

@ -30,6 +30,7 @@ import java.io.BufferedReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
@ -39,21 +40,20 @@ public class UpdateStream implements Runnable
private static final long queueWaitTime = 15L;
private final TypeReference<HashMap<String, Object>> typeRef;
private final InputSupplier<BufferedReader> supplier;
private final BlockingQueue<Map<String, Object>> queue;
private final int QUEUE_SIZE=10000;
private final BlockingQueue<Map<String, Object>> queue = new ArrayBlockingQueue<Map<String, Object>>(QUEUE_SIZE);
private final ObjectMapper mapper;
private final Map<String,String> renamedDimensions;
private final String timeDimension;
public UpdateStream(
InputSupplier<BufferedReader> supplier,
BlockingQueue<Map<String, Object>> queue,
ObjectMapper mapper,
Map<String,String> renamedDimensions,
String timeDimension
)
{
this.supplier = supplier;
this.queue = queue;
this.mapper = mapper;
this.typeRef = new TypeReference<HashMap<String, Object>>()
{
@ -109,4 +109,11 @@ public class UpdateStream implements Runnable
}
}
public Map<String,Object> takeFromQueue() throws InterruptedException{
return queue.take();
}
public int getQueueSize(){
return queue.size();
}
}

View File

@ -45,7 +45,6 @@ import java.util.concurrent.Executors;
public class WebFirehoseFactory implements FirehoseFactory
{
private static final EmittingLogger log = new EmittingLogger(WebFirehoseFactory.class);
private static final int QUEUE_SIZE = 2000;
private final String url;
private final String timeDimension;
private final String newTimeDimension;
@ -81,11 +80,9 @@ public class WebFirehoseFactory implements FirehoseFactory
@Override
public Firehose connect() throws IOException
{
final BlockingQueue<Map<String, Object>> queue = new ArrayBlockingQueue<Map<String, Object>>(QUEUE_SIZE);
Runnable updateStream = new UpdateStream(
final UpdateStream updateStream = new UpdateStream(
new WebJsonSupplier(url),
queue,
new DefaultObjectMapper(),
renamedDimensions,
timeDimension
@ -108,7 +105,7 @@ public class WebFirehoseFactory implements FirehoseFactory
public InputRow nextRow()
{
try {
Map<String, Object> map = queue.take();
Map<String, Object> map = updateStream.takeFromQueue();
DateTime date = TimestampParser.createTimestampParser(timeFormat)
.apply(map.get(newTimeDimension).toString());
return new MapBasedInputRow(

View File

@ -30,16 +30,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class WebFirehoseFactoryTest
{
private final ArrayList<String> dimensions = new ArrayList<String>();
private InputSupplier testCaseSupplier;
final int QUEUE_SIZE = 2000;
BlockingQueue<Map<String, Object>> queue;
String timeDimension = "t";
DefaultObjectMapper mapper = new DefaultObjectMapper();
Map<String, Object> expectedAnswer = new HashMap<String, Object>();
@ -102,16 +97,14 @@ public class WebFirehoseFactoryTest
@Test
public void basicIngestionCheck() throws Exception
{
queue = new ArrayBlockingQueue<Map<String, Object>>(QUEUE_SIZE);
UpdateStream updateStream = new UpdateStream(
testCaseSupplier,
queue,
mapper,
null,
timeDimension
);
updateStream.run();
Map<String, Object> insertedRow = queue.poll(10, TimeUnit.SECONDS);
Map<String, Object> insertedRow = updateStream.takeFromQueue();
Assert.assertEquals(expectedAnswer, insertedRow);
}
@ -119,19 +112,17 @@ public class WebFirehoseFactoryTest
@Test
public void missingTimeStampCheck()
{
queue = new ArrayBlockingQueue<Map<String, Object>>(QUEUE_SIZE);
InputSupplier testCaseSupplier = new TestCaseSupplier(
"{ \"a\": \"Mozilla\\/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko\\/20100101 Firefox\\/21.0\", \"c\": \"US\", \"nk\": 1, \"tz\": \"America\\/New_York\", \"gr\": \"NY\", \"g\": \"1Chgyj\", \"h\": \"15vMQjX\", \"l\": \"o_d63rn9enb\", \"al\": \"en-US,en;q=0.5\", \"hh\": \"1.usa.gov\", \"r\": \"http:\\/\\/forecast.weather.gov\\/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX\", \"u\": \"http:\\/\\/www.spc.ncep.noaa.gov\\/\", \"hc\": 1368193091, \"cy\": \"New York\", \"ll\": [ 40.862598, -73.921799 ] }"
);
UpdateStream updateStream = new UpdateStream(
testCaseSupplier,
queue,
mapper,
null,
timeDimension
);
updateStream.run();
Assert.assertEquals(queue.size(), 0);
Assert.assertEquals(updateStream.getQueueSize(), 0);
}
//If any other value is missing, we should still add the event and process it properly
@ -157,19 +148,17 @@ public class WebFirehoseFactoryTest
expectedAnswer.put("hc", 1368193091);
expectedAnswer.put("cy", "New York");
expectedAnswer.put("ll", Arrays.asList(40.862598, -73.921799));
queue = new ArrayBlockingQueue<Map<String, Object>>(QUEUE_SIZE);
testCaseSupplier = new TestCaseSupplier(
"{ \"a\": \"Mozilla\\/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko\\/20100101 Firefox\\/21.0\", \"nk\": 1, \"tz\": \"America\\/New_York\", \"gr\": \"NY\", \"g\": \"1Chgyj\", \"h\": \"15vMQjX\", \"l\": \"o_d63rn9enb\", \"al\": \"en-US,en;q=0.5\", \"hh\": \"1.usa.gov\", \"r\": \"http:\\/\\/forecast.weather.gov\\/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX\", \"u\": \"http:\\/\\/www.spc.ncep.noaa.gov\\/\", \"t\": 1372121562, \"hc\": 1368193091, \"cy\": \"New York\", \"ll\": [ 40.862598, -73.921799 ] }"
);
UpdateStream updateStream = new UpdateStream(
testCaseSupplier,
queue,
mapper,
null,
timeDimension
);
updateStream.run();
Map<String, Object> insertedRow = queue.poll(10, TimeUnit.SECONDS);
Map<String, Object> insertedRow = updateStream.takeFromQueue();
Assert.assertEquals(expectedAnswer, insertedRow);
}
@ -177,7 +166,6 @@ public class WebFirehoseFactoryTest
public void checkRenameKeys() throws Exception
{
Map<String, Object> expectedAnswer = new HashMap<String, Object>();
queue = new ArrayBlockingQueue<Map<String, Object>>(QUEUE_SIZE);
Map<String,String> renamedDimensions = new HashMap<String,String>();
renamedDimensions.put("g","bitly_hash");
renamedDimensions.put("c","country");
@ -218,13 +206,12 @@ public class WebFirehoseFactoryTest
UpdateStream updateStream = new UpdateStream(
testCaseSupplier,
queue,
mapper,
renamedDimensions,
timeDimension
);
updateStream.run();
Map<String, Object> inputRow = queue.poll(10, TimeUnit.SECONDS);
Map<String, Object> inputRow = updateStream.takeFromQueue();
Assert.assertEquals(expectedAnswer, inputRow);
}