Twitter River: Automatically reconnect when disconnected from twitter stream, closes #735.

This commit is contained in:
kimchy 2011-03-02 04:00:12 +02:00
parent bb1668c2b7
commit 578b752425
1 changed files with 71 additions and 3 deletions

View File

@ -28,6 +28,7 @@ import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.common.xcontent.support.XContentMapValues;
@ -36,6 +37,7 @@ import org.elasticsearch.river.AbstractRiverComponent;
import org.elasticsearch.river.River; import org.elasticsearch.river.River;
import org.elasticsearch.river.RiverName; import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.RiverSettings; import org.elasticsearch.river.RiverSettings;
import org.elasticsearch.threadpool.ThreadPool;
import twitter4j.*; import twitter4j.*;
import java.util.List; import java.util.List;
@ -47,8 +49,14 @@ import java.util.concurrent.atomic.AtomicInteger;
*/ */
public class TwitterRiver extends AbstractRiverComponent implements River { public class TwitterRiver extends AbstractRiverComponent implements River {
private final ThreadPool threadPool;
private final Client client; private final Client client;
private final String user;
private final String password;
private final String indexName; private final String indexName;
private final String typeName; private final String typeName;
@ -62,16 +70,19 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
private String streamType; private String streamType;
private final TwitterStream stream; private volatile TwitterStream stream;
private final AtomicInteger onGoingBulks = new AtomicInteger(); private final AtomicInteger onGoingBulks = new AtomicInteger();
private volatile BulkRequestBuilder currentRequest; private volatile BulkRequestBuilder currentRequest;
private volatile boolean closed = false;
@SuppressWarnings({"unchecked"}) @SuppressWarnings({"unchecked"})
@Inject public TwitterRiver(RiverName riverName, RiverSettings settings, Client client) { @Inject public TwitterRiver(RiverName riverName, RiverSettings settings, Client client, ThreadPool threadPool) {
super(riverName, settings); super(riverName, settings);
this.client = client; this.client = client;
this.threadPool = threadPool;
String user = null; String user = null;
String password = null; String password = null;
@ -161,6 +172,8 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
logger.info("creating twitter stream river for [{}]", user); logger.info("creating twitter stream river for [{}]", user);
this.user = user;
this.password = password;
if (user == null || password == null) { if (user == null || password == null) {
stream = null; stream = null;
indexName = null; indexName = null;
@ -226,7 +239,56 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
} }
} }
private void reconnect() {
if (closed) {
return;
}
try {
stream.cleanUp();
} catch (Exception e) {
logger.debug("failed to cleanup after failure", e);
}
try {
stream.shutdown();
} catch (Exception e) {
logger.debug("failed to shutdown after failure", e);
}
if (closed) {
return;
}
try {
stream = new TwitterStreamFactory().getInstance(user, password);
stream.addListener(new StatusHandler());
if (streamType.equals("filter") || filterQuery != null) {
try {
stream.filter(filterQuery);
} catch (TwitterException e) {
logger.warn("failed to create filter stream based on query, disabling river....");
}
} else if (streamType.equals("firehose")) {
stream.firehose(0);
} else {
stream.sample();
}
} catch (Exception e) {
if (closed) {
close();
return;
}
// TODO, we can update the status of the river to RECONNECT
logger.warn("failed to connect after failure, throttling", e);
threadPool.schedule(TimeValue.timeValueSeconds(10), ThreadPool.Names.CACHED, new Runnable() {
@Override public void run() {
reconnect();
}
});
}
}
@Override public void close() { @Override public void close() {
this.closed = true;
logger.info("closing twitter stream river"); logger.info("closing twitter stream river");
if (stream != null) { if (stream != null) {
stream.cleanUp(); stream.cleanUp();
@ -362,10 +424,16 @@ public class TwitterRiver extends AbstractRiverComponent implements River {
} }
@Override public void onTrackLimitationNotice(int numberOfLimitedStatuses) { @Override public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
logger.info("received track limitation notice, number_of_limited_statuses {}", numberOfLimitedStatuses);
} }
@Override public void onException(Exception ex) { @Override public void onException(Exception ex) {
logger.warn("stream failure", ex); logger.warn("stream failure, restarting stream...", ex);
threadPool.cached().execute(new Runnable() {
@Override public void run() {
reconnect();
}
});
} }
private void processBulkIfNeeded() { private void processBulkIfNeeded() {