From da658ac69a0588e2588ab21b9f891152e0d387e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stef=C3=A1n=20Freyr=20Stef=C3=A1nsson?= Date: Thu, 18 Jul 2013 19:30:14 +0000 Subject: [PATCH] Changes according to Eric's comments on the pull request. - moved the RabbitMQ ConnectionFactory configuration to a method of its own to clean up the FirehoseFactory.connect() method. - using Throwables.propagate*() methods in exception handling. - removed TODOs and instead using the "keyword" FUTURE as well as adding some context to the comments themselves. - cleaned up the exception handling in hasMore() a little and made it more readable. --- .../firehose/RabbitMQFirehoseFactory.java | 42 +++++++++++-------- 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/RabbitMQFirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/RabbitMQFirehoseFactory.java index 36dbad4d106..ed5856e9679 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/RabbitMQFirehoseFactory.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/RabbitMQFirehoseFactory.java @@ -2,6 +2,7 @@ package com.metamx.druid.realtime.firehose; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Throwables; import com.metamx.common.logger.Logger; import com.metamx.druid.indexer.data.StringInputRowParser; import com.metamx.druid.input.InputRow; @@ -75,10 +76,8 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory this.parser = parser; } - @Override - public Firehose connect() throws IOException + private void primeConnectionFactory(ConnectionFactory factory) throws IOException { - final ConnectionFactory factory = new ConnectionFactory(); factory.setHost(consumerProps.getProperty("host", factory.getHost())); factory.setPort(Integer.parseInt(consumerProps.getProperty("port", Integer.toString(factory.getPort())))); factory.setUsername(consumerProps.getProperty("username", factory.getUsername())); @@ -91,10 +90,17 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory factory.setUri(consumerProps.getProperty("uri")); } catch(Exception e){ - // A little silly to throw an IOException but we'll make do for now with it. - throw new IOException("Bad URI format.", e); + Throwables.propagateIfPossible(e, IOException.class); + throw Throwables.propagate(e); } } + } + + @Override + public Firehose connect() throws IOException + { + final ConnectionFactory factory = new ConnectionFactory(); + primeConnectionFactory(factory); String queue = consumerProps.getProperty("queue"); String exchange = consumerProps.getProperty("exchange"); @@ -111,7 +117,7 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory public void shutdownCompleted(ShutdownSignalException cause) { log.warn(cause, "Connection closed!"); - //TODO: should we re-establish the connection here? + //FUTURE: we could try to re-establish the connection here. Not done in this version though. } }); @@ -124,7 +130,7 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory public void shutdownCompleted(ShutdownSignalException cause) { log.warn(cause, "Channel closed!"); - //TODO: should we re-establish the connection here? + //FUTURE: we could try to re-establish the connection here. Not done in this version though. } }); @@ -152,23 +158,23 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory { delivery = null; try { + // Wait for the next delivery. This will block until something is available. delivery = consumer.nextDelivery(); - lastDeliveryTag = delivery.getEnvelope().getDeliveryTag(); - //log.debug("Received new message from RabbitMQ"); + if (delivery != null) { + lastDeliveryTag = delivery.getEnvelope().getDeliveryTag(); + // If delivery is non-null, we report that there is something more to process. + return true; + } } catch (InterruptedException e) { - //TODO: Not exactly sure how we should react to this. - // Does it mean that delivery will be null and we should handle that - // as if there are no more messages (return false)? + // A little unclear on how we should handle this. + + // At any rate, we're in an unknown state now so let's log something and return false. log.wtf(e, "Got interrupted while waiting for next delivery. Doubt this should ever happen."); } - if (delivery != null) { - // If delivery is non-null, we report that there is something more to process. - return true; - } - - // This means that delivery is null so we have nothing more to process. + // This means that delivery is null or we caught the exception above so we report that we have + // nothing more to process. return false; }