diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/IrcFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/IrcFirehoseFactory.java index c3890c56470..0f05ec012cf 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/IrcFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/IrcFirehoseFactory.java @@ -39,11 +39,12 @@ import java.io.IOException; import java.util.List; import java.util.UUID; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; /** *

Example code:

*
{@code
- *
+ * 

* IrcFirehoseFactory factory = new IrcFirehoseFactory( * "wiki123", * "irc.wikimedia.org", @@ -63,6 +64,7 @@ public class IrcFirehoseFactory implements FirehoseFactory private final String nick; private final String host; private final List channels; + private volatile boolean closed = false; @JsonCreator public IrcFirehoseFactory( @@ -100,17 +102,21 @@ public class IrcFirehoseFactory implements FirehoseFactory final IRCApi irc = new IRCApiImpl(false); final LinkedBlockingQueue> queue = new LinkedBlockingQueue>(); - irc.addListener(new VariousMessageListenerAdapter() { - @Override - public void onChannelMessage(ChannelPrivMsg aMsg) - { - try { - queue.put(Pair.of(DateTime.now(), aMsg)); - } catch(InterruptedException e) { - throw new RuntimeException("interrupted adding message to queue", e); + irc.addListener( + new VariousMessageListenerAdapter() + { + @Override + public void onChannelMessage(ChannelPrivMsg aMsg) + { + try { + queue.put(Pair.of(DateTime.now(), aMsg)); + } + catch (InterruptedException e) { + throw new RuntimeException("interrupted adding message to queue", e); + } + } } - } - }); + ); log.info("connecting to irc server [%s]", host); irc.connect( @@ -152,7 +158,7 @@ public class IrcFirehoseFactory implements FirehoseFactory public void onSuccess(IIRCState aObject) { log.info("irc connection to server [%s] established", host); - for(String chan : channels) { + for (String chan : channels) { log.info("Joining channel %s", chan); irc.joinChannel(chan); } @@ -164,8 +170,10 @@ public class IrcFirehoseFactory implements FirehoseFactory log.error(e, "Unable to connect to irc server [%s]", host); throw new RuntimeException("Unable to connect to server", e); } - }); + } + ); + closed = false; return new Firehose() { @@ -175,18 +183,26 @@ public class IrcFirehoseFactory implements FirehoseFactory public boolean hasMore() { try { - while(true) { - Pair nextMsg = queue.take(); + while (true) { + Pair nextMsg = queue.poll(100, TimeUnit.MILLISECONDS); + if (closed) { + return false; + } + if (nextMsg == null) { + continue; + } try { nextRow = firehoseParser.parse(nextMsg); - if(nextRow != null) return true; + if (nextRow != null) { + return true; + } } catch (IllegalArgumentException iae) { log.debug("ignoring invalid message in channel [%s]", nextMsg.rhs.getChannelName()); } } } - catch(InterruptedException e) { + catch (InterruptedException e) { Thread.interrupted(); throw new RuntimeException("interrupted retrieving elements from queue", e); } @@ -214,8 +230,13 @@ public class IrcFirehoseFactory implements FirehoseFactory @Override public void close() throws IOException { - log.info("disconnecting from irc server [%s]", host); - irc.disconnect(""); + try { + log.info("disconnecting from irc server [%s]", host); + irc.disconnect(""); + } + finally { + closed = true; + } } }; }