diff --git a/merger/src/main/java/com/metamx/druid/merger/common/index/EventReceiverFirehoseFactory.java b/merger/src/main/java/com/metamx/druid/merger/common/index/EventReceiverFirehoseFactory.java index bc7c65e9897..d2376e5d0c2 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/index/EventReceiverFirehoseFactory.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/index/EventReceiverFirehoseFactory.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; +import com.google.common.collect.Lists; import com.metamx.druid.indexer.data.MapInputRowParser; import com.metamx.druid.input.InputRow; import com.metamx.druid.realtime.firehose.Firehose; @@ -15,6 +16,7 @@ import com.metamx.emitter.EmittingLogger; import java.io.IOException; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.concurrent.ArrayBlockingQueue; @@ -84,14 +86,14 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory public class EventReceiverFirehose implements EventReceiver, Firehose { - private final BlockingQueue> buffer; + private final BlockingQueue buffer; private final Object readLock = new Object(); - private volatile Map nextEvent = null; + private volatile InputRow nextRow = null; private volatile boolean closed = false; public EventReceiverFirehose() { - this.buffer = new ArrayBlockingQueue>(bufferSize); + this.buffer = new ArrayBlockingQueue(bufferSize); } @Override @@ -99,11 +101,17 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory { log.debug("Adding %,d events to firehose: %s", events.size(), firehoseId); + final List rows = Lists.newArrayList(); + for (final Map event : events) { + // Might throw an exception. We'd like that to happen now, instead of while adding to the row buffer. + rows.add(parser.parse(event)); + } + try { - for (final Map event : events) { + for (final InputRow row : rows) { boolean added = false; while (!closed && !added) { - added = buffer.offer(event, 500, TimeUnit.MILLISECONDS); + added = buffer.offer(row, 500, TimeUnit.MILLISECONDS); } if (!added) { @@ -121,8 +129,8 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory { synchronized (readLock) { try { - while (!closed && nextEvent == null) { - nextEvent = buffer.poll(500, TimeUnit.MILLISECONDS); + while (!closed && nextRow == null) { + nextRow = buffer.poll(500, TimeUnit.MILLISECONDS); } } catch (InterruptedException e) { @@ -130,7 +138,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory throw Throwables.propagate(e); } - return nextEvent != null; + return nextRow != null; } } @@ -138,14 +146,13 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory public InputRow nextRow() { synchronized (readLock) { - final Map event = nextEvent; + final InputRow row = nextRow; - if (event == null) { + if (row == null) { throw new NoSuchElementException(); } else { - // If nextEvent is unparseable, don't return it again - nextEvent = null; - return parser.parse(event); + nextRow = null; + return row; } } }