mirror of https://github.com/apache/druid.git
Minor sanity checks when checking for and processing a new message.
This commit is contained in:
parent
13328a6b36
commit
82d2623cc9
|
@ -89,10 +89,6 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory{
|
|||
|
||||
return new Firehose(){
|
||||
|
||||
//private final Connection connection = conn;
|
||||
//private final Channel channel = ch;
|
||||
//private final QueueingConsumer consumer = qc;
|
||||
|
||||
/**
|
||||
* Storing the latest delivery as a member variable should be safe since this will only be run
|
||||
* by a single thread.
|
||||
|
@ -107,28 +103,35 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory{
|
|||
|
||||
@Override
|
||||
public boolean hasMore() {
|
||||
delivery = null;
|
||||
try {
|
||||
delivery = consumer.nextDelivery();
|
||||
lastDeliveryTag = delivery.getEnvelope().getDeliveryTag();
|
||||
|
||||
log.debug("got new message");
|
||||
} catch (InterruptedException e) {
|
||||
//TODO: I'm not exactly sure how we should react to this.
|
||||
// Does it mean that delivery will be null and we should handle that
|
||||
// as if there are no more messages (return false)?
|
||||
log.wtf(e, "Don't know if this is supposed to ever happen.");
|
||||
return false;
|
||||
}
|
||||
|
||||
if(delivery != null){
|
||||
// If delivery is non-null, we report that there is something more to process.
|
||||
return true;
|
||||
}
|
||||
|
||||
// Shouldn't ever get here but in case we'll assume there is no more stuff.
|
||||
log.wtf("We shouldn't be here!");
|
||||
// This means that delivery is null so we have nothing more to process.
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputRow nextRow() {
|
||||
log.debug("consuming new message");
|
||||
if(delivery == null){
|
||||
//Just making sure.
|
||||
log.wtf("I have nothing in delivery. Method hasMore() should have returned false.");
|
||||
return null;
|
||||
}
|
||||
|
||||
return parser.parse(new String(delivery.getBody()));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue