now take a map for renaming

This commit is contained in:
Dhruv Parthasarathy 2013-07-08 13:35:22 -07:00
parent c8c686c738
commit 439e8ca4ad
4 changed files with 60 additions and 56 deletions

View File

@ -41,16 +41,14 @@ public class UpdateStream implements Runnable
private final InputSupplier<BufferedReader> supplier;
private final BlockingQueue<Map<String, Object>> queue;
private final ObjectMapper mapper;
private final ArrayList<String> dimensions;
private final ArrayList<String> renamedDimensions;
private final Map<String,String> renamedDimensions;
private final String timeDimension;
public UpdateStream(
InputSupplier<BufferedReader> supplier,
BlockingQueue<Map<String, Object>> queue,
ObjectMapper mapper,
ArrayList<String> dimensions,
ArrayList<String> renamedDimensions,
Map<String,String> renamedDimensions,
String timeDimension
)
{
@ -61,7 +59,6 @@ public class UpdateStream implements Runnable
{
};
this.timeDimension = timeDimension;
this.dimensions = dimensions;
this.renamedDimensions = renamedDimensions;
}
@ -84,7 +81,7 @@ public class UpdateStream implements Runnable
queue.offer(renamedMap, queueWaitTime, TimeUnit.SECONDS);
log.debug("Successfully added to queue");
} else {
log.debug("missing timestamp");
log.error("missing timestamp");
}
}
}
@ -97,14 +94,19 @@ public class UpdateStream implements Runnable
private Map<String, Object> renameKeys(Map<String, Object> update)
{
if (renamedDimensions!=null){
Map<String, Object> renamedMap = Maps.newHashMap();
for (int iter = 0; iter < dimensions.size(); iter++) {
if (update.get(dimensions.get(iter)) != null) {
Object obj = update.get(dimensions.get(iter));
renamedMap.put(renamedDimensions.get(iter), obj);
for (String key : renamedDimensions.keySet()) {
if(update.get(key)!=null){
Object obj= update.get(key);
renamedMap.put(renamedDimensions.get(key),obj);
}
}
return renamedMap;
}
else{
return update;
}
}
}

View File

@ -35,11 +35,13 @@ import org.joda.time.DateTime;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@JsonTypeName("webstream")
public class WebFirehoseFactory implements FirehoseFactory
@ -47,27 +49,37 @@ public class WebFirehoseFactory implements FirehoseFactory
private static final EmittingLogger log = new EmittingLogger(WebFirehoseFactory.class);
private static final int QUEUE_SIZE = 2000;
private final String url;
private final ArrayList<String> dimensions;
private final String timeDimension;
private final ArrayList<String> renamedDimensions;
private final String newTimeDimension;
private final Map<String,String> renamedDimensions;
private final String timeFormat;
private final long waitTime = 15L;
@JsonCreator
public WebFirehoseFactory(
@JsonProperty("url") String url,
@JsonProperty("dimensions") ArrayList<String> dimensions,
@JsonProperty("renamedDimensions") ArrayList<String> renamedDimensions,
@JsonProperty("renamedDimensions") Map<String,String> renamedDimensions,
@JsonProperty("timeDimension") String timeDimension,
@JsonProperty("timeFormat") String timeFormat
)
{
this.url = url;
this.dimensions = dimensions;
this.renamedDimensions = renamedDimensions;
this.timeDimension = timeDimension;
if (renamedDimensions!=null){
newTimeDimension=renamedDimensions.get(timeDimension);
}
else{
newTimeDimension=timeDimension;
}
if (timeFormat==null){
this.timeFormat="auto";
}
else{
this.timeFormat = timeFormat;
}
}
@Override
public Firehose connect() throws IOException
@ -78,7 +90,6 @@ public class WebFirehoseFactory implements FirehoseFactory
new WebJsonSupplier(url),
queue,
new DefaultObjectMapper(),
dimensions,
renamedDimensions,
timeDimension
);
@ -92,7 +103,7 @@ public class WebFirehoseFactory implements FirehoseFactory
@Override
public boolean hasMore()
{
return !(service.isTerminated()) && queue.size() > 0;
return !service.isTerminated();
}
@ -102,11 +113,10 @@ public class WebFirehoseFactory implements FirehoseFactory
try {
Map<String, Object> map = queue.take();
DateTime date = TimestampParser.createTimestampParser(timeFormat)
.apply(map.get(timeDimension).toString());
long seconds = (long) date.getMillis();
.apply(map.get(newTimeDimension).toString());
return new MapBasedInputRow(
seconds,
renamedDimensions,
date.getMillis(),
new ArrayList(map.keySet()),
map
);
}

View File

@ -107,12 +107,10 @@ public class WebFirehoseFactoryTest
testCaseSupplier,
queue,
mapper,
dimensions,
dimensions,
null,
timeDimension
);
Thread t = new Thread(updateStream);
t.start();
updateStream.run();
Map<String, Object> insertedRow = queue.poll(10, TimeUnit.SECONDS);
Assert.assertEquals(expectedAnswer, insertedRow);
}
@ -129,8 +127,7 @@ public class WebFirehoseFactoryTest
testCaseSupplier,
queue,
mapper,
dimensions,
dimensions,
null,
timeDimension
);
updateStream.run();
@ -168,12 +165,10 @@ public class WebFirehoseFactoryTest
testCaseSupplier,
queue,
mapper,
dimensions,
dimensions,
null,
timeDimension
);
Thread t = new Thread(updateStream);
t.start();
updateStream.run();
Map<String, Object> insertedRow = queue.poll(10, TimeUnit.SECONDS);
Assert.assertEquals(expectedAnswer, insertedRow);
}
@ -183,23 +178,23 @@ public class WebFirehoseFactoryTest
{
Map<String, Object> expectedAnswer = new HashMap<String, Object>();
queue = new ArrayBlockingQueue<Map<String, Object>>(QUEUE_SIZE);
ArrayList<String> renamedDimensions = new ArrayList<String>();
renamedDimensions.add("bitly_hash");
renamedDimensions.add("country");
renamedDimensions.add("user");
renamedDimensions.add("city");
renamedDimensions.add("encoding_user_login");
renamedDimensions.add("short_url");
renamedDimensions.add("timestamp_hash");
renamedDimensions.add("user_bitly_hash");
renamedDimensions.add("url");
renamedDimensions.add("timezone");
renamedDimensions.add("time");
renamedDimensions.add("referring_url");
renamedDimensions.add("geo_region");
renamedDimensions.add("known_users");
renamedDimensions.add("accept_language");
renamedDimensions.add("latitude_longitude");
Map<String,String> renamedDimensions = new HashMap<String,String>();
renamedDimensions.put("g","bitly_hash");
renamedDimensions.put("c","country");
renamedDimensions.put("a","user");
renamedDimensions.put("cy","city");
renamedDimensions.put("l","encoding_user_login");
renamedDimensions.put("hh","short_url");
renamedDimensions.put("hc","timestamp_hash");
renamedDimensions.put("h","user_bitly_hash");
renamedDimensions.put("u","url");
renamedDimensions.put("tz","timezone");
renamedDimensions.put("t","time");
renamedDimensions.put("r","referring_url");
renamedDimensions.put("gr","geo_region");
renamedDimensions.put("nk","known_users");
renamedDimensions.put("al","accept_language");
renamedDimensions.put("ll","latitude_longitude");
expectedAnswer.put("user", "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko/20100101 Firefox/21.0");
expectedAnswer.put("country", "US");
@ -225,7 +220,6 @@ public class WebFirehoseFactoryTest
testCaseSupplier,
queue,
mapper,
dimensions,
renamedDimensions,
timeDimension
);

View File

@ -44,8 +44,7 @@ public class WebJsonSupplier implements InputSupplier<BufferedReader>
this.url = new URL(urlString);
}
catch (Exception e) {
e.printStackTrace();
log.info("Malformed url");
log.error(e,"Malformed url");
}
}
@ -55,7 +54,6 @@ public class WebJsonSupplier implements InputSupplier<BufferedReader>
URL url = new URL(urlString);
URLConnection connection = url.openConnection();
connection.setDoInput(true);
BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream()));
return reader;
return new BufferedReader(new InputStreamReader(url.openStream()));
}
}