fix kafka8 unparsable message halt job issue (#4164)

This commit is contained in:
kaijianding 2017-04-19 02:23:02 +08:00 committed by Fangjin Yang
parent 0bcfd9354c
commit db656c5a88
1 changed files with 10 additions and 1 deletions

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.metamx.common.parsers.ParseException;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.Committer;
@ -182,7 +183,6 @@ public class KafkaEightSimpleConsumerFirehoseFactory implements
@Override
public void start() throws Exception
{
nextMessage();
}
@Override
@ -224,6 +224,15 @@ public class KafkaEightSimpleConsumerFirehoseFactory implements
if (stopped) {
return null;
}
// currRow will be called before the first advance
if (row == null) {
try {
nextMessage();
}
catch (ParseException e) {
return null;
}
}
return row;
}