diff --git a/plugins/river/twitter/src/main/java/org/elasticsearch/river/twitter/TwitterRiver.java b/plugins/river/twitter/src/main/java/org/elasticsearch/river/twitter/TwitterRiver.java index 11befd87c03..97bac6bc75c 100644 --- a/plugins/river/twitter/src/main/java/org/elasticsearch/river/twitter/TwitterRiver.java +++ b/plugins/river/twitter/src/main/java/org/elasticsearch/river/twitter/TwitterRiver.java @@ -28,6 +28,7 @@ import org.elasticsearch.client.action.bulk.BulkRequestBuilder; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.support.XContentMapValues; @@ -36,6 +37,7 @@ import org.elasticsearch.river.AbstractRiverComponent; import org.elasticsearch.river.River; import org.elasticsearch.river.RiverName; import org.elasticsearch.river.RiverSettings; +import org.elasticsearch.threadpool.ThreadPool; import twitter4j.*; import java.util.List; @@ -47,8 +49,14 @@ import java.util.concurrent.atomic.AtomicInteger; */ public class TwitterRiver extends AbstractRiverComponent implements River { + private final ThreadPool threadPool; + private final Client client; + private final String user; + + private final String password; + private final String indexName; private final String typeName; @@ -62,16 +70,19 @@ public class TwitterRiver extends AbstractRiverComponent implements River { private String streamType; - private final TwitterStream stream; + private volatile TwitterStream stream; private final AtomicInteger onGoingBulks = new AtomicInteger(); private volatile BulkRequestBuilder currentRequest; + private volatile boolean closed = false; + @SuppressWarnings({"unchecked"}) - @Inject public TwitterRiver(RiverName riverName, RiverSettings settings, Client client) { + @Inject public TwitterRiver(RiverName riverName, RiverSettings settings, Client client, ThreadPool threadPool) { super(riverName, settings); this.client = client; + this.threadPool = threadPool; String user = null; String password = null; @@ -161,6 +172,8 @@ public class TwitterRiver extends AbstractRiverComponent implements River { logger.info("creating twitter stream river for [{}]", user); + this.user = user; + this.password = password; if (user == null || password == null) { stream = null; indexName = null; @@ -226,7 +239,56 @@ public class TwitterRiver extends AbstractRiverComponent implements River { } } + private void reconnect() { + if (closed) { + return; + } + try { + stream.cleanUp(); + } catch (Exception e) { + logger.debug("failed to cleanup after failure", e); + } + try { + stream.shutdown(); + } catch (Exception e) { + logger.debug("failed to shutdown after failure", e); + } + if (closed) { + return; + } + + try { + stream = new TwitterStreamFactory().getInstance(user, password); + stream.addListener(new StatusHandler()); + + if (streamType.equals("filter") || filterQuery != null) { + try { + stream.filter(filterQuery); + } catch (TwitterException e) { + logger.warn("failed to create filter stream based on query, disabling river...."); + } + } else if (streamType.equals("firehose")) { + stream.firehose(0); + } else { + stream.sample(); + } + } catch (Exception e) { + if (closed) { + close(); + return; + } + // TODO, we can update the status of the river to RECONNECT + logger.warn("failed to connect after failure, throttling", e); + threadPool.schedule(TimeValue.timeValueSeconds(10), ThreadPool.Names.CACHED, new Runnable() { + @Override public void run() { + reconnect(); + } + }); + } + } + @Override public void close() { + this.closed = true; logger.info("closing twitter stream river"); if (stream != null) { stream.cleanUp(); @@ -362,10 +424,16 @@ public class TwitterRiver extends AbstractRiverComponent implements River { } @Override public void onTrackLimitationNotice(int numberOfLimitedStatuses) { + logger.info("received track limitation notice, number_of_limited_statuses {}", numberOfLimitedStatuses); } @Override public void onException(Exception ex) { - logger.warn("stream failure", ex); + logger.warn("stream failure, restarting stream...", ex); + threadPool.cached().execute(new Runnable() { + @Override public void run() { + reconnect(); + } + }); } private void processBulkIfNeeded() {