added oauth support

This commit is contained in:
unknown 2011-03-08 10:07:09 -05:00 committed by kimchy
parent ad0d681b6d
commit d4e89faf6b

@ -39,6 +39,7 @@ import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.RiverSettings;
import org.elasticsearch.threadpool.ThreadPool;
import twitter4j.*;
import twitter4j.conf.ConfigurationBuilder;
import java.util.List;
import java.util.Map;
@ -86,10 +87,27 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
String user = null;
String password = null;
String oauthConsumerKey = null;
String oauthConsumerSecret = null;
String oauthAccessToken = null;
String oauthAccessTokenSecret = null;
if (settings.settings().containsKey("twitter")) {
Map<String, Object> twitterSettings = (Map<String, Object>) settings.settings().get("twitter");
user = XContentMapValues.nodeStringValue(twitterSettings.get("user"), null);
password = XContentMapValues.nodeStringValue(twitterSettings.get("password"), null);
if(twitterSettings.containsKey("oauthConsumerKey")){
oauthConsumerKey = XContentMapValues.nodeStringValue(twitterSettings.get("oauthConsumerKey"), null);
}
if(twitterSettings.containsKey("oauthConsumerSecret")){
oauthConsumerSecret = XContentMapValues.nodeStringValue(twitterSettings.get("oauthConsumerSecret"), null);
}
if(twitterSettings.containsKey("oauthAccessToken")){
oauthAccessToken = XContentMapValues.nodeStringValue(twitterSettings.get("oauthAccessToken"), null);
}
if(twitterSettings.containsKey("oauthAccessTokenSecret")){
oauthAccessTokenSecret = XContentMapValues.nodeStringValue(twitterSettings.get("oauthAccessTokenSecret"), null);
}
streamType = XContentMapValues.nodeStringValue(twitterSettings.get("type"), "sample");
Map<String, Object> filterSettings = (Map<String, Object>) twitterSettings.get("filter");
if (filterSettings != null) {
@ -197,7 +215,17 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
dropThreshold = 10;
}
stream = new TwitterStreamFactory().getInstance(user, password);
if (oauthAccessToken != null && oauthConsumerKey != null && oauthConsumerSecret != null && oauthAccessTokenSecret != null) {
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setDebugEnabled(true)
.setOAuthConsumerKey(oauthConsumerKey)
.setOAuthConsumerSecret(oauthConsumerSecret)
.setOAuthAccessToken(oauthAccessToken)
.setOAuthAccessTokenSecret(oauthAccessTokenSecret);
stream = new TwitterStreamFactory(cb.build()).getInstance();
} else {
stream = new TwitterStreamFactory().getInstance(user,password);
}
stream.addListener(new StatusHandler());
}