Twitter River: Support filter stream, closes #416.

This commit is contained in:
kimchy 2010-10-09 00:39:10 +02:00
parent ee2fabb9dd
commit 8b03b914f9
4 changed files with 103 additions and 5 deletions

View File

@ -45,6 +45,7 @@
<w>encodable</w>
<w>estab</w>
<w>failover</w>
<w>firehose</w>
<w>flushable</w>
<w>formatter</w>
<w>formatters</w>

View File

@ -15,7 +15,7 @@
<orderEntry type="module-library">
<library name="twitter4j">
<CLASSES>
<root url="jar://$GRADLE_REPOSITORY$/org.twitter4j/twitter4j-core/jars/twitter4j-core-2.1.4.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/org.twitter4j/twitter4j-core/jars/twitter4j-core-2.1.6.jar!/" />
</CLASSES>
<JAVADOC />
<SOURCES />

View File

@ -32,8 +32,8 @@ configurations {
dependencies {
compile project(':elasticsearch')
compile('org.twitter4j:twitter4j-core:2.1.4') { transitive = false }
distLib('org.twitter4j:twitter4j-core:2.1.4') { transitive = false }
compile('org.twitter4j:twitter4j-core:2.1.6') { transitive = false }
distLib('org.twitter4j:twitter4j-core:2.1.6') { transitive = false }
testCompile project(':test-testng')

View File

@ -26,6 +26,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
@ -38,6 +39,7 @@ import org.elasticsearch.river.RiverSettings;
import twitter4j.*;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@ -56,6 +58,10 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
private final int dropThreshold;
private FilterQuery filterQuery;
private String streamType;
private final TwitterStream stream;
@ -73,6 +79,84 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
Map<String, Object> twitterSettings = (Map<String, Object>) settings.settings().get("twitter");
user = XContentMapValues.nodeStringValue(twitterSettings.get("user"), null);
password = XContentMapValues.nodeStringValue(twitterSettings.get("password"), null);
streamType = XContentMapValues.nodeStringValue(twitterSettings.get("type"), "sample");
Map<String, Object> filterSettings = (Map<String, Object>) twitterSettings.get("filter");
if (filterSettings != null) {
filterQuery = new FilterQuery();
filterQuery.count(XContentMapValues.nodeIntegerValue(filterSettings.get("count"), 0));
Object tracks = filterSettings.get("tracks");
if (tracks != null) {
if (tracks instanceof List) {
List<String> lTracks = (List<String>) tracks;
filterQuery.track(lTracks.toArray(new String[lTracks.size()]));
} else {
filterQuery.track(Strings.commaDelimitedListToStringArray(tracks.toString()));
}
}
Object follow = filterSettings.get("follow");
if (follow != null) {
if (follow instanceof List) {
List lFollow = (List) follow;
int[] followIds = new int[lFollow.size()];
for (int i = 0; i < lFollow.size(); i++) {
Object o = lFollow.get(i);
if (o instanceof Number) {
followIds[i] = ((Number) o).intValue();
} else {
followIds[i] = Integer.parseInt(o.toString());
}
}
filterQuery.follow(followIds);
} else {
String[] ids = Strings.commaDelimitedListToStringArray(follow.toString());
int[] followIds = new int[ids.length];
for (int i = 0; i < ids.length; i++) {
followIds[i] = Integer.parseInt(ids[i]);
}
}
}
Object locations = filterSettings.get("locations");
if (locations != null) {
if (locations instanceof List) {
List lLocations = (List) locations;
double[][] dLocations = new double[lLocations.size()][];
for (int i = 0; i < lLocations.size(); i++) {
Object loc = lLocations.get(i);
double lat;
double lon;
if (loc instanceof List) {
List lLoc = (List) loc;
if (lLoc.get(0) instanceof Number) {
lat = ((Number) lLoc.get(0)).doubleValue();
} else {
lat = Double.parseDouble(lLoc.get(0).toString());
}
if (lLoc.get(1) instanceof Number) {
lon = ((Number) lLoc.get(1)).doubleValue();
} else {
lon = Double.parseDouble(lLoc.get(1).toString());
}
} else {
String[] sLoc = Strings.commaDelimitedListToStringArray(loc.toString());
lat = Double.parseDouble(sLoc[0]);
lon = Double.parseDouble(sLoc[1]);
}
dLocations[i] = new double[]{lat, lon};
}
filterQuery.locations(dLocations);
} else {
String[] sLocations = Strings.commaDelimitedListToStringArray(locations.toString());
double[][] dLocations = new double[sLocations.length / 2][];
int dCounter = 0;
for (int i = 0; i < sLocations.length; i++) {
double lat = Double.parseDouble(sLocations[i]);
double lon = Double.parseDouble(sLocations[++i]);
dLocations[dCounter++] = new double[]{lat, lon};
}
filterQuery.locations(dLocations);
}
}
}
}
logger.info("creating twitter stream river for [{}]", user);
@ -80,7 +164,7 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
if (user == null || password == null) {
stream = null;
indexName = null;
typeName = null;
typeName = "status";
bulkSize = 100;
dropThreshold = 10;
logger.warn("no user / password specified, disabling river...");
@ -104,6 +188,9 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
}
@Override public void start() {
if (stream == null) {
return;
}
logger.info("starting twitter stream");
try {
String mapping = XContentFactory.jsonBuilder().startObject().startObject(typeName)
@ -122,8 +209,18 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
}
}
currentRequest = client.prepareBulk();
if (streamType.equals("filter") || filterQuery != null) {
try {
stream.filter(filterQuery);
} catch (TwitterException e) {
logger.warn("failed to create filter stream based on query, disabling river....");
}
} else if (streamType.equals("firehose")) {
stream.firehose(0);
} else {
stream.sample();
}
}
@Override public void close() {
logger.info("closing twitter stream river");