EventReceiverFirehoseFactory: Move parse exceptions from hasMore to addAll

This commit is contained in:
Gian Merlino 2013-04-30 15:05:26 +03:00
parent f463b95256
commit 862365e1fa
1 changed files with 20 additions and 13 deletions

View File

@ -7,6 +7,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.metamx.druid.indexer.data.MapInputRowParser;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.realtime.firehose.Firehose;
@ -15,6 +16,7 @@ import com.metamx.emitter.EmittingLogger;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
@ -84,14 +86,14 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
public class EventReceiverFirehose implements EventReceiver, Firehose
{
private final BlockingQueue<Map<String, Object>> buffer;
private final BlockingQueue<InputRow> buffer;
private final Object readLock = new Object();
private volatile Map<String, Object> nextEvent = null;
private volatile InputRow nextRow = null;
private volatile boolean closed = false;
public EventReceiverFirehose()
{
this.buffer = new ArrayBlockingQueue<Map<String, Object>>(bufferSize);
this.buffer = new ArrayBlockingQueue<InputRow>(bufferSize);
}
@Override
@ -99,11 +101,17 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
{
log.debug("Adding %,d events to firehose: %s", events.size(), firehoseId);
final List<InputRow> rows = Lists.newArrayList();
for (final Map<String, Object> event : events) {
// Might throw an exception. We'd like that to happen now, instead of while adding to the row buffer.
rows.add(parser.parse(event));
}
try {
for (final Map<String, Object> event : events) {
for (final InputRow row : rows) {
boolean added = false;
while (!closed && !added) {
added = buffer.offer(event, 500, TimeUnit.MILLISECONDS);
added = buffer.offer(row, 500, TimeUnit.MILLISECONDS);
}
if (!added) {
@ -121,8 +129,8 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
{
synchronized (readLock) {
try {
while (!closed && nextEvent == null) {
nextEvent = buffer.poll(500, TimeUnit.MILLISECONDS);
while (!closed && nextRow == null) {
nextRow = buffer.poll(500, TimeUnit.MILLISECONDS);
}
}
catch (InterruptedException e) {
@ -130,7 +138,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
throw Throwables.propagate(e);
}
return nextEvent != null;
return nextRow != null;
}
}
@ -138,14 +146,13 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
public InputRow nextRow()
{
synchronized (readLock) {
final Map<String, Object> event = nextEvent;
final InputRow row = nextRow;
if (event == null) {
if (row == null) {
throw new NoSuchElementException();
} else {
// If nextEvent is unparseable, don't return it again
nextEvent = null;
return parser.parse(event);
nextRow = null;
return row;
}
}
}