Applied codestyle and optimized imports

This commit is contained in:
Dhruv Parthasarathy 2013-06-25 14:48:02 -07:00
parent 8750ee88f2
commit 0e1857f046
5 changed files with 115 additions and 91 deletions

View File

@ -10,9 +10,12 @@ import java.util.ArrayList;
public class TestCaseSupplier implements InputSupplier<BufferedReader>
{
private final ArrayList<String> inputList = new ArrayList<String>();
public TestCaseSupplier(String s){
public TestCaseSupplier(String s)
{
inputList.add(s);
}
@Override
public BufferedReader getInput() throws IOException
{

View File

@ -19,39 +19,44 @@ public class UpdateStream implements Runnable
{
private static final EmittingLogger log = new EmittingLogger(UpdateStream.class);
private InputSupplier supplier;
private BlockingQueue<Map<String,Object>> queue;
public UpdateStream(InputSupplier supplier,BlockingQueue<Map<String,Object>> queue, String s){
this.supplier=supplier;
this.queue=queue;
private BlockingQueue<Map<String, Object>> queue;
public UpdateStream(InputSupplier supplier, BlockingQueue<Map<String, Object>> queue)
{
this.supplier = supplier;
this.queue = queue;
}
@Override
public void run() throws RuntimeException
{
try{
try {
BufferedReader reader = (BufferedReader) supplier.getInput();
String line;
ObjectMapper mapper = new ObjectMapper();
TypeReference<HashMap<String,Object>> typeRef = new TypeReference<HashMap<String,Object>> () {};
while ((line = reader.readLine())!= null){
if(!line.equals("")){
try{
HashMap<String,Object> map=mapper.readValue(line, typeRef);;
TypeReference<HashMap<String, Object>> typeRef = new TypeReference<HashMap<String, Object>>()
{
};
while ((line = reader.readLine()) != null) {
if (!line.equals("")) {
try {
HashMap<String, Object> map = mapper.readValue(line, typeRef);
;
queue.offer(map, 15L, TimeUnit.SECONDS);
log.info("Successfully added to queue");
}
catch (JsonParseException e){
catch (JsonParseException e) {
System.out.println("Invalid JSON Stream. Please check if the url returns a proper JSON stream.");
throw new RuntimeException("Invalid JSON Stream");
}
catch (Exception e){
catch (Exception e) {
System.out.println(e);
return;
}
}
}
}
catch (MalformedURLException e){
catch (MalformedURLException e) {
throw new RuntimeException("Malformed url");
}
catch (ProtocolException e) {

View File

@ -29,23 +29,25 @@ public class WebFirehoseFactory implements FirehoseFactory
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("newDimensionNames") List<String> newDimensionNames,
@JsonProperty("timeDimension") String s
)
)
{
this.url=url;
this.dimensions=dimensions;
this.url = url;
this.dimensions = dimensions;
this.timeDimension = s;
this.newDimensionNames=newDimensionNames;
this.newDimensionNames = newDimensionNames;
}
@Override
public Firehose connect() throws IOException
{
final int QUEUE_SIZE=2000;
final BlockingQueue<Map<String,Object>> queue= new ArrayBlockingQueue<Map<String,Object>>(QUEUE_SIZE);
final int QUEUE_SIZE = 2000;
final BlockingQueue<Map<String, Object>> queue = new ArrayBlockingQueue<Map<String, Object>>(QUEUE_SIZE);
Runnable updateStream = new UpdateStream(new WebJsonSupplier(dimensions,url),queue,timeDimension);
Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread th, Throwable ex) {
Runnable updateStream = new UpdateStream(new WebJsonSupplier(dimensions, url), queue);
Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler()
{
public void uncaughtException(Thread th, Throwable ex)
{
System.out.println("Uncaught exception: " + ex);
}
};
@ -53,19 +55,22 @@ public class WebFirehoseFactory implements FirehoseFactory
t.setUncaughtExceptionHandler(h);
t.start();
return new Firehose() {
private final Runnable doNothingRunnable = new Runnable() {
public void run(){
return new Firehose()
{
private final Runnable doNothingRunnable = new Runnable()
{
public void run()
{
}
};
@Override
public boolean hasMore(){
if (t.isAlive()){
public boolean hasMore()
{
if (t.isAlive()) {
return true;
}
else{
} else {
return false;
}
}
@ -77,50 +82,54 @@ public class WebFirehoseFactory implements FirehoseFactory
if (Thread.currentThread().isInterrupted()) {
throw new RuntimeException("Interrupted, time to stop");
}
Map<String,Object> update;
try{
update=queue.take();
Map<String, Object> update;
try {
update = queue.take();
}
catch (InterruptedException e) {
throw new RuntimeException("InterrutpedException", e);
}
Map<String,Object> processedMap = processMap(update);
return new MapBasedInputRow(((Integer) processedMap.get(timeDimension)).longValue()*1000,newDimensionNames,processedMap);
Map<String, Object> processedMap = processMap(update);
return new MapBasedInputRow(
((Integer) processedMap.get(timeDimension)).longValue() * 1000,
newDimensionNames,
processedMap
);
}
private Map<String,Object> renameKeys (Map <String,Object> update){
Map<String,Object> renamedMap = new HashMap<String,Object>();
int iter=0;
while (iter<dimensions.size()){
private Map<String, Object> renameKeys(Map<String, Object> update)
{
Map<String, Object> renamedMap = new HashMap<String, Object>();
int iter = 0;
while (iter < dimensions.size()) {
Object obj = update.get(dimensions.get(iter));
renamedMap.put(newDimensionNames.get(iter),obj);
renamedMap.put(newDimensionNames.get(iter), obj);
iter++;
}
return renamedMap;
}
private void processNullDimensions(Map<String,Object> map)
private void processNullDimensions(Map<String, Object> map)
{
for (String key:newDimensionNames){
if (map.get(key)==null){
if (key.equals(timeDimension)){
map.put(key,new Integer((int) System.currentTimeMillis()/1000));
}
else{
map.put(key, null);
for (String key : newDimensionNames) {
if (map.get(key) == null) {
if (key.equals(timeDimension)) {
map.put(key, new Integer((int) System.currentTimeMillis() / 1000));
} else {
map.put(key, null);
}
}
}
}
private Map<String,Object> processMap(Map<String,Object> map){
Map<String,Object> renamedMap = renameKeys(map);
private Map<String, Object> processMap(Map<String, Object> map)
{
Map<String, Object> renamedMap = renameKeys(map);
processNullDimensions(renamedMap);
return renamedMap;
}
@Override
public Runnable commit()
{

View File

@ -39,7 +39,7 @@ public class WebFirehoseFactoryTest
dimensions.add("al");
String invalidURL = "http://invalid.url";
FirehoseFactory test = new WebFirehoseFactory(invalidURL,dimensions,dimensions,"t");
FirehoseFactory test = new WebFirehoseFactory(invalidURL, dimensions, dimensions, "t");
Firehose returnedFirehose = test.connect();
Thread.sleep(3000);
assert returnedFirehose.hasMore() == false;
@ -67,10 +67,10 @@ public class WebFirehoseFactoryTest
dimensions.add("al");
String nonJsonUrl = "http://google.com";
FirehoseFactory test = new WebFirehoseFactory(nonJsonUrl,dimensions,dimensions,"t");
FirehoseFactory test = new WebFirehoseFactory(nonJsonUrl, dimensions, dimensions, "t");
Firehose returnedFirehose = test.connect();
Thread.sleep(3000);
assert returnedFirehose.hasMore()== false;
assert returnedFirehose.hasMore() == false;
}
@Test
@ -94,47 +94,49 @@ public class WebFirehoseFactoryTest
dimensions.add("al");
String url = "http://developer.usa.gov/1usagov";
FirehoseFactory test = new WebFirehoseFactory(url,dimensions,dimensions,"t");
FirehoseFactory test = new WebFirehoseFactory(url, dimensions, dimensions, "t");
Firehose returnedFirehose = test.connect();
Thread.sleep(3000);
assert returnedFirehose.hasMore()== true;
assert returnedFirehose.hasMore() == true;
}
@Test
public void basicIngestionCheck() throws Exception
{
final int QUEUE_SIZE=2000;
BlockingQueue<Map<String,Object>> queue = new ArrayBlockingQueue<Map<String, Object>>(QUEUE_SIZE);
InputSupplier testCaseSupplier = new TestCaseSupplier("{ \"a\": \"Mozilla\\/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko\\/20100101 Firefox\\/21.0\", \"c\": \"US\", \"nk\": 1, \"tz\": \"America\\/New_York\", \"gr\": \"NY\", \"g\": \"1Chgyj\", \"h\": \"15vMQjX\", \"l\": \"o_d63rn9enb\", \"al\": \"en-US,en;q=0.5\", \"hh\": \"1.usa.gov\", \"r\": \"http:\\/\\/forecast.weather.gov\\/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX\", \"u\": \"http:\\/\\/www.spc.ncep.noaa.gov\\/\", \"t\": 1372121562, \"hc\": 1368193091, \"cy\": \"New York\", \"ll\": [ 40.862598, -73.921799 ] }");
UpdateStream updateStream = new UpdateStream (testCaseSupplier, queue,"t");
final int QUEUE_SIZE = 2000;
BlockingQueue<Map<String, Object>> queue = new ArrayBlockingQueue<Map<String, Object>>(QUEUE_SIZE);
InputSupplier testCaseSupplier = new TestCaseSupplier(
"{ \"a\": \"Mozilla\\/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko\\/20100101 Firefox\\/21.0\", \"c\": \"US\", \"nk\": 1, \"tz\": \"America\\/New_York\", \"gr\": \"NY\", \"g\": \"1Chgyj\", \"h\": \"15vMQjX\", \"l\": \"o_d63rn9enb\", \"al\": \"en-US,en;q=0.5\", \"hh\": \"1.usa.gov\", \"r\": \"http:\\/\\/forecast.weather.gov\\/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX\", \"u\": \"http:\\/\\/www.spc.ncep.noaa.gov\\/\", \"t\": 1372121562, \"hc\": 1368193091, \"cy\": \"New York\", \"ll\": [ 40.862598, -73.921799 ] }"
);
UpdateStream updateStream = new UpdateStream(testCaseSupplier, queue);
Thread t = new Thread(updateStream);
t.start();
Map<String,Object> expectedAnswer = new HashMap<String,Object>();
expectedAnswer.put("a","Mozilla/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko/20100101 Firefox/21.0");
expectedAnswer.put("c","US");
expectedAnswer.put("nk",1);
expectedAnswer.put ("tz", "America/New_York");
Map<String, Object> expectedAnswer = new HashMap<String, Object>();
expectedAnswer.put("a", "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko/20100101 Firefox/21.0");
expectedAnswer.put("c", "US");
expectedAnswer.put("nk", 1);
expectedAnswer.put("tz", "America/New_York");
expectedAnswer.put("gr", "NY");
expectedAnswer.put("g","1Chgyj");
expectedAnswer.put("h","15vMQjX");
expectedAnswer.put("l","o_d63rn9enb");
expectedAnswer.put("al","en-US,en;q=0.5");
expectedAnswer.put("hh","1.usa.gov");
expectedAnswer.put("r","http://forecast.weather.gov/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX");
expectedAnswer.put("u","http://www.spc.ncep.noaa.gov/");
expectedAnswer.put("t",1372121562);
expectedAnswer.put("hc",1368193091);
expectedAnswer.put("cy","New York");
expectedAnswer.put("g", "1Chgyj");
expectedAnswer.put("h", "15vMQjX");
expectedAnswer.put("l", "o_d63rn9enb");
expectedAnswer.put("al", "en-US,en;q=0.5");
expectedAnswer.put("hh", "1.usa.gov");
expectedAnswer.put(
"r",
"http://forecast.weather.gov/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX"
);
expectedAnswer.put("u", "http://www.spc.ncep.noaa.gov/");
expectedAnswer.put("t", 1372121562);
expectedAnswer.put("hc", 1368193091);
expectedAnswer.put("cy", "New York");
expectedAnswer.put("ll", Arrays.asList(40.862598, -73.921799));
Map<String,Object> insertedRow=queue.poll(10, TimeUnit.SECONDS);
Map<String, Object> insertedRow = queue.poll(10, TimeUnit.SECONDS);
assert expectedAnswer.equals(insertedRow);
}
@Test
public void renameDimensionsCheck() throws Exception
{
@ -155,12 +157,19 @@ public class WebFirehoseFactoryTest
dimensions.add("known_users");
dimensions.add("accept_language");
WebFirehoseFactory webbie = new WebFirehoseFactory("http://developer.usa.gov/1usagov",dimensions,dimensions,"time");
WebFirehoseFactory webbie = new WebFirehoseFactory(
"http://developer.usa.gov/1usagov",
dimensions,
dimensions,
"time"
);
Firehose webbieHose = webbie.connect();
final int QUEUE_SIZE=2000;
BlockingQueue<Map<String,Object>> queue = new ArrayBlockingQueue<Map<String, Object>>(QUEUE_SIZE);
InputSupplier testCaseSupplier = new TestCaseSupplier("{ \"a\": \"Mozilla\\/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko\\/20100101 Firefox\\/21.0\", \"nk\": 1, \"tz\": \"America\\/New_York\", \"g\": \"1Chgyj\", \"h\": \"15vMQjX\", \"l\": \"o_d63rn9enb\", \"al\": \"en-US,en;q=0.5\", \"hh\": \"1.usa.gov\", \"r\": \"http:\\/\\/forecast.weather.gov\\/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX\", \"u\": \"http:\\/\\/www.spc.ncep.noaa.gov\\/\", \"t\": 1372121562, \"hc\": 1368193091, \"kw\": \"spcnws\", \"cy\": \"New York\", \"ll\": [ 40.862598, -73.921799 ] }");
UpdateStream updateStream = new UpdateStream (testCaseSupplier, queue,"time");
final int QUEUE_SIZE = 2000;
BlockingQueue<Map<String, Object>> queue = new ArrayBlockingQueue<Map<String, Object>>(QUEUE_SIZE);
InputSupplier testCaseSupplier = new TestCaseSupplier(
"{ \"a\": \"Mozilla\\/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko\\/20100101 Firefox\\/21.0\", \"nk\": 1, \"tz\": \"America\\/New_York\", \"g\": \"1Chgyj\", \"h\": \"15vMQjX\", \"l\": \"o_d63rn9enb\", \"al\": \"en-US,en;q=0.5\", \"hh\": \"1.usa.gov\", \"r\": \"http:\\/\\/forecast.weather.gov\\/MapClick.php?site=okx&FcstType=text&zmx=1&zmy=1&map.x=98&map.y=200&site=OKX\", \"u\": \"http:\\/\\/www.spc.ncep.noaa.gov\\/\", \"t\": 1372121562, \"hc\": 1368193091, \"kw\": \"spcnws\", \"cy\": \"New York\", \"ll\": [ 40.862598, -73.921799 ] }"
);
UpdateStream updateStream = new UpdateStream(testCaseSupplier, queue);
Thread t = new Thread(updateStream);
t.start();
InputRow row = webbieHose.nextRow();

View File

@ -1,7 +1,6 @@
package druid.examples.webStream;
import com.google.common.io.InputSupplier;
import com.metamx.common.Pair;
import java.io.BufferedReader;
import java.io.IOException;
@ -9,19 +8,18 @@ import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
public class WebJsonSupplier implements InputSupplier<BufferedReader>
{
private BlockingQueue<Pair<Map<String,Object>,Long>> queue;
private List<String> dimensions;
private String urlString;
public WebJsonSupplier(List<String> dimensions, String urlString){
this.dimensions=dimensions;
this.urlString=urlString;
public WebJsonSupplier(List<String> dimensions, String urlString)
{
this.dimensions = dimensions;
this.urlString = urlString;
}
@Override
public BufferedReader getInput() throws IOException
{