fix oauth support to also use it when reconnecting, and have the config as part of an oauth element

This commit is contained in:
kimchy 2011-03-09 10:12:12 +02:00
parent d4e89faf6b
commit c7b36f113d

@ -54,9 +54,13 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
private final Client client;
private final String user;
private String user;
private String password;
private final String password;
private String oauthConsumerKey = null;
private String oauthConsumerSecret = null;
private String oauthAccessToken = null;
private String oauthAccessTokenSecret = null;
private final String indexName;
@ -85,29 +89,37 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
this.client = client;
this.threadPool = threadPool;
String user = null;
String password = null;
String oauthConsumerKey = null;
String oauthConsumerSecret = null;
String oauthAccessToken = null;
String oauthAccessTokenSecret = null;
if (settings.settings().containsKey("twitter")) {
Map<String, Object> twitterSettings = (Map<String, Object>) settings.settings().get("twitter");
user = XContentMapValues.nodeStringValue(twitterSettings.get("user"), null);
password = XContentMapValues.nodeStringValue(twitterSettings.get("password"), null);
if(twitterSettings.containsKey("oauthConsumerKey")){
oauthConsumerKey = XContentMapValues.nodeStringValue(twitterSettings.get("oauthConsumerKey"), null);
if (twitterSettings.containsKey("oauth")) {
Map<String, Object> oauth = (Map<String, Object>) twitterSettings.get("oauth");
if (oauth.containsKey("consumerKey")) {
oauthConsumerKey = XContentMapValues.nodeStringValue(oauth.get("consumerKey"), null);
}
if (oauth.containsKey("consumer_key")) {
oauthConsumerKey = XContentMapValues.nodeStringValue(oauth.get("consumer_key"), null);
}
if (oauth.containsKey("consumerSecret")) {
oauthConsumerSecret = XContentMapValues.nodeStringValue(oauth.get("consumerSecret"), null);
}
if (oauth.containsKey("consumer_secret")) {
oauthConsumerSecret = XContentMapValues.nodeStringValue(oauth.get("consumer_secret"), null);
}
if (oauth.containsKey("accessToken")) {
oauthAccessToken = XContentMapValues.nodeStringValue(oauth.get("accessToken"), null);
}
if (oauth.containsKey("access_token")) {
oauthAccessToken = XContentMapValues.nodeStringValue(oauth.get("access_token"), null);
}
if (oauth.containsKey("accessTokenSecret")) {
oauthAccessTokenSecret = XContentMapValues.nodeStringValue(oauth.get("accessTokenSecret"), null);
}
if (oauth.containsKey("access_token_secret")) {
oauthAccessTokenSecret = XContentMapValues.nodeStringValue(oauth.get("access_token_secret"), null);
}
}
if(twitterSettings.containsKey("oauthConsumerSecret")){
oauthConsumerSecret = XContentMapValues.nodeStringValue(twitterSettings.get("oauthConsumerSecret"), null);
}
if(twitterSettings.containsKey("oauthAccessToken")){
oauthAccessToken = XContentMapValues.nodeStringValue(twitterSettings.get("oauthAccessToken"), null);
}
if(twitterSettings.containsKey("oauthAccessTokenSecret")){
oauthAccessTokenSecret = XContentMapValues.nodeStringValue(twitterSettings.get("oauthAccessTokenSecret"), null);
}
streamType = XContentMapValues.nodeStringValue(twitterSettings.get("type"), "sample");
Map<String, Object> filterSettings = (Map<String, Object>) twitterSettings.get("filter");
if (filterSettings != null) {
@ -142,6 +154,7 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
for (int i = 0; i < ids.length; i++) {
followIds[i] = Integer.parseInt(ids[i]);
}
filterQuery.follow(followIds);
}
}
Object locations = filterSettings.get("locations");
@ -190,15 +203,13 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
logger.info("creating twitter stream river for [{}]", user);
this.user = user;
this.password = password;
if (user == null || password == null) {
if (user == null && password == null && oauthAccessToken == null && oauthConsumerKey == null && oauthConsumerSecret == null && oauthAccessTokenSecret == null) {
stream = null;
indexName = null;
typeName = "status";
bulkSize = 100;
dropThreshold = 10;
logger.warn("no user / password specified, disabling river...");
logger.warn("no user/password or oauth specified, disabling river...");
return;
}
@ -215,17 +226,16 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
dropThreshold = 10;
}
ConfigurationBuilder cb = new ConfigurationBuilder();
if (oauthAccessToken != null && oauthConsumerKey != null && oauthConsumerSecret != null && oauthAccessTokenSecret != null) {
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setDebugEnabled(true)
.setOAuthConsumerKey(oauthConsumerKey)
cb.setOAuthConsumerKey(oauthConsumerKey)
.setOAuthConsumerSecret(oauthConsumerSecret)
.setOAuthAccessToken(oauthAccessToken)
.setOAuthAccessTokenSecret(oauthAccessTokenSecret);
stream = new TwitterStreamFactory(cb.build()).getInstance();
} else {
stream = new TwitterStreamFactory().getInstance(user,password);
cb.setUser(user).setPassword(password);
}
stream = new TwitterStreamFactory(cb.build()).getInstance();
stream.addListener(new StatusHandler());
}
@ -286,7 +296,16 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
}
try {
stream = new TwitterStreamFactory().getInstance(user, password);
ConfigurationBuilder cb = new ConfigurationBuilder();
if (oauthAccessToken != null && oauthConsumerKey != null && oauthConsumerSecret != null && oauthAccessTokenSecret != null) {
cb.setOAuthConsumerKey(oauthConsumerKey)
.setOAuthConsumerSecret(oauthConsumerSecret)
.setOAuthAccessToken(oauthAccessToken)
.setOAuthAccessTokenSecret(oauthAccessTokenSecret);
} else {
cb.setUser(user).setPassword(password);
}
stream = new TwitterStreamFactory(cb.build()).getInstance();
stream.addListener(new StatusHandler());
if (streamType.equals("filter") || filterQuery != null) {