Changes according to Eric's comments on the pull request.

- moved the RabbitMQ ConnectionFactory configuration to a method of its own to clean up the FirehoseFactory.connect() method.
- using Throwables.propagate*() methods in exception handling.
- removed TODOs and instead using the "keyword" FUTURE as well as adding some context to the comments themselves.
- cleaned up the exception handling in hasMore() a little and made it more readable.
This commit is contained in:
Stefán Freyr Stefánsson 2013-07-18 19:30:14 +00:00
parent 217d539e46
commit da658ac69a

View File

@ -2,6 +2,7 @@ package com.metamx.druid.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Throwables;
import com.metamx.common.logger.Logger;
import com.metamx.druid.indexer.data.StringInputRowParser;
import com.metamx.druid.input.InputRow;
@ -75,10 +76,8 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory
this.parser = parser;
}
@Override
public Firehose connect() throws IOException
private void primeConnectionFactory(ConnectionFactory factory) throws IOException
{
final ConnectionFactory factory = new ConnectionFactory();
factory.setHost(consumerProps.getProperty("host", factory.getHost()));
factory.setPort(Integer.parseInt(consumerProps.getProperty("port", Integer.toString(factory.getPort()))));
factory.setUsername(consumerProps.getProperty("username", factory.getUsername()));
@ -91,10 +90,17 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory
factory.setUri(consumerProps.getProperty("uri"));
}
catch(Exception e){
// A little silly to throw an IOException but we'll make do for now with it.
throw new IOException("Bad URI format.", e);
Throwables.propagateIfPossible(e, IOException.class);
throw Throwables.propagate(e);
}
}
}
@Override
public Firehose connect() throws IOException
{
final ConnectionFactory factory = new ConnectionFactory();
primeConnectionFactory(factory);
String queue = consumerProps.getProperty("queue");
String exchange = consumerProps.getProperty("exchange");
@ -111,7 +117,7 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory
public void shutdownCompleted(ShutdownSignalException cause)
{
log.warn(cause, "Connection closed!");
//TODO: should we re-establish the connection here?
//FUTURE: we could try to re-establish the connection here. Not done in this version though.
}
});
@ -124,7 +130,7 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory
public void shutdownCompleted(ShutdownSignalException cause)
{
log.warn(cause, "Channel closed!");
//TODO: should we re-establish the connection here?
//FUTURE: we could try to re-establish the connection here. Not done in this version though.
}
});
@ -152,23 +158,23 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory
{
delivery = null;
try {
// Wait for the next delivery. This will block until something is available.
delivery = consumer.nextDelivery();
lastDeliveryTag = delivery.getEnvelope().getDeliveryTag();
//log.debug("Received new message from RabbitMQ");
if (delivery != null) {
lastDeliveryTag = delivery.getEnvelope().getDeliveryTag();
// If delivery is non-null, we report that there is something more to process.
return true;
}
}
catch (InterruptedException e) {
//TODO: Not exactly sure how we should react to this.
// Does it mean that delivery will be null and we should handle that
// as if there are no more messages (return false)?
// A little unclear on how we should handle this.
// At any rate, we're in an unknown state now so let's log something and return false.
log.wtf(e, "Got interrupted while waiting for next delivery. Doubt this should ever happen.");
}
if (delivery != null) {
// If delivery is non-null, we report that there is something more to process.
return true;
}
// This means that delivery is null so we have nothing more to process.
// This means that delivery is null or we caught the exception above so we report that we have
// nothing more to process.
return false;
}