improve kafka intake stability

This commit is contained in:
Hagen Rother 2013-12-17 15:27:17 +01:00
parent 9fc0e18962
commit af202d7576
2 changed files with 18 additions and 2 deletions

View File

@ -107,8 +107,16 @@ public class KafkaEightFirehoseFactory implements FirehoseFactory
return null;
}
try {
return parser.parse(ByteBuffer.wrap(message));
}
catch (Exception e) {
throw new FormattedException.Builder()
.withErrorCode(FormattedException.ErrorCode.UNPARSABLE_ROW)
.withMessage(String.format("Error parsing[%s], got [%s]", ByteBuffer.wrap(message), e.toString()))
.build();
}
}
@Override
public Runnable commit()

View File

@ -120,8 +120,16 @@ public class KafkaSevenFirehoseFactory implements FirehoseFactory
public InputRow parseMessage(Message message) throws FormattedException
{
try {
return parser.parse(message.payload());
}
catch (Exception e) {
throw new FormattedException.Builder()
.withErrorCode(FormattedException.ErrorCode.UNPARSABLE_ROW)
.withMessage(String.format("Error parsing[%s], got [%s]", message.payload(), e.toString()))
.build();
}
}
@Override
public Runnable commit()