No ByteBuffer in the middle

This commit is contained in:
Jan Rudert 2013-07-12 16:02:15 +02:00
parent 89b0c84f3b
commit a8039107d8
3 changed files with 9 additions and 24 deletions

View File

@@ -12,6 +12,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -20,7 +21,7 @@ import com.metamx.druid.input.InputRow;
/**
* @author jan.rudert
*/
public class ProtoBufInputRowParser implements InputRowParser<byte[]>
public class ProtoBufInputRowParser implements InputRowParser<ByteString>
{
private final MapInputRowParser inputRowCreator;
@@ -41,7 +42,7 @@ public class ProtoBufInputRowParser implements InputRowParser<byte[]>
}
@Override
public InputRow parse(byte[] input)
public InputRow parse(ByteString input)
{
Map<String, Object> theMap = buildStringKeyMap(input);
@@ -49,7 +50,7 @@ public class ProtoBufInputRowParser implements InputRowParser<byte[]>
return inputRowCreator.parse(theMap);
}
private Map<String, Object> buildStringKeyMap(byte[] input)
private Map<String, Object> buildStringKeyMap(ByteString input)
{
Map<String, Object> theMap = Maps.newHashMap();
@@ -60,7 +61,7 @@ public class ProtoBufInputRowParser implements InputRowParser<byte[]>
for (Map.Entry<Descriptors.FieldDescriptor, Object> entry : allFields.entrySet())
{
String name = entry.getKey().getName();
String name = entry.getKey().getName().toLowerCase();
if (theMap.containsKey(name))
{
continue;

View File

@@ -7,6 +7,7 @@ import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.List;
import com.google.protobuf.ByteString;
import org.joda.time.DateTime;
import org.junit.Test;
@@ -59,7 +60,7 @@ public class ProtoBufInputRowParserTest {
ByteArrayOutputStream out = new ByteArrayOutputStream();
event.writeTo(out);
InputRow row = parser.parse(out.toByteArray());
InputRow row = parser.parse(ByteString.copyFrom(out.toByteArray()));
System.out.println(row);
assertEquals(Arrays.asList(DIMENSIONS), row.getDimensions());
assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch());

View File

@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.google.protobuf.ByteString;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
@@ -212,8 +213,6 @@ public class KafkaFirehoseFactory implements FirehoseFactory
private class ProtoBufMessageFirehose extends AbstractKafkaFirehose
{
private ByteBuffer bytes = null;
public ProtoBufMessageFirehose(KafkaStream<Message> stream, ConsumerConnector connector)
{
super(connector, stream);
@@ -222,23 +221,7 @@ public class KafkaFirehoseFactory implements FirehoseFactory
@Override
public InputRow parseMessage(Message message) throws FormattedException
{
int payloadSize = message.payloadSize();
if (bytes == null || bytes.remaining() < payloadSize)
{
bytes = ByteBuffer.allocate(payloadSize);
}
bytes.put(message.payload());
bytes.flip();
try
{
return parser.parse(bytes);
} finally
{
bytes.clear();
}
return parser.parse(ByteString.copyFrom(message.payload()));
}
}