Fix various InputRowParser serde issues

This commit is contained in:
Gian Merlino 2013-08-14 16:51:02 -07:00
parent 36d02ab943
commit dacee9ed20
4 changed files with 112 additions and 38 deletions

View File

@ -1,15 +1,8 @@
package com.metamx.druid.indexer.data;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.metamx.common.exception.FormattedException;
import com.metamx.druid.input.InputRow;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = StringInputRowParser.class)
@JsonSubTypes({
@JsonSubTypes.Type(name = "string", value = StringInputRowParser.class),
@JsonSubTypes.Type(name = "protobuf", value = ProtoBufInputRowParser.class)
})
public interface InputRowParser<T>
{
public InputRow parse(T input) throws FormattedException;

View File

@ -19,6 +19,15 @@
package com.metamx.druid.indexer.data;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.parsers.Parser;
import com.metamx.common.parsers.ToLowerCaseParser;
import com.metamx.druid.input.InputRow;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CoderResult;
@ -26,21 +35,13 @@ import java.nio.charset.CodingErrorAction;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Charsets;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.parsers.Parser;
import com.metamx.common.parsers.ToLowerCaseParser;
import com.metamx.druid.input.InputRow;
/**
*/
public class StringInputRowParser implements ByteBufferInputRowParser
{
private final InputRowParser<Map<String, Object>> inputRowCreator;
private final MapInputRowParser inputRowCreator;
private final Parser<String, Object> parser;
private final DataSpec dataSpec;
private CharBuffer chars = null;
@ -50,6 +51,7 @@ public class StringInputRowParser implements ByteBufferInputRowParser
@JsonProperty("data") DataSpec dataSpec,
@JsonProperty("dimensionExclusions") List<String> dimensionExclusions)
{
this.dataSpec = dataSpec;
this.inputRowCreator = new MapInputRowParser(timestampSpec, dataSpec.getDimensions(), dimensionExclusions);
this.parser = new ToLowerCaseParser(dataSpec.getParser());
}
@ -116,9 +118,21 @@ public class StringInputRowParser implements ByteBufferInputRowParser
return inputRowCreator.parse(theMap);
}
@JsonValue
public InputRowParser<Map<String, Object>> getInputRowCreator()
{
return inputRowCreator;
}
@JsonProperty
public TimestampSpec getTimestampSpec()
{
return inputRowCreator.getTimestampSpec();
}
@JsonProperty("data")
public DataSpec getDataSpec()
{
return dataSpec;
}
@JsonProperty
public List<String> getDimensionExclusions()
{
return ImmutableList.copyOf(inputRowCreator.getDimensionExclusions());
}
}

View File

@ -0,0 +1,70 @@
package com.metamx.druid.indexer.data;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.metamx.druid.index.v1.SpatialDimensionSchema;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.jackson.DefaultObjectMapper;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.junit.Test;
import java.nio.ByteBuffer;
public class InputRowParserSerdeTest
{
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Test
public void testStringInputRowParserSerde() throws Exception
{
final StringInputRowParser parser = new StringInputRowParser(
new TimestampSpec("timestamp", "iso"),
new JSONDataSpec(
ImmutableList.of("foo", "bar"), ImmutableList.<SpatialDimensionSchema>of()
),
ImmutableList.of("baz")
);
final ByteBufferInputRowParser parser2 = jsonMapper.readValue(
jsonMapper.writeValueAsBytes(parser),
ByteBufferInputRowParser.class
);
final InputRow parsed = parser2.parse(
ByteBuffer.wrap(
"{\"foo\":\"x\",\"bar\":\"y\",\"qux\":\"z\",\"timestamp\":\"2000\"}".getBytes(Charsets.UTF_8)
)
);
Assert.assertEquals(ImmutableList.of("foo", "bar"), parsed.getDimensions());
Assert.assertEquals(ImmutableList.of("x"), parsed.getDimension("foo"));
Assert.assertEquals(ImmutableList.of("y"), parsed.getDimension("bar"));
Assert.assertEquals(new DateTime("2000").getMillis(), parsed.getTimestampFromEpoch());
}
@Test
public void testMapInputRowParserSerde() throws Exception
{
final MapInputRowParser parser = new MapInputRowParser(
new TimestampSpec("timestamp", "iso"),
ImmutableList.of("foo", "bar"),
ImmutableList.of("baz")
);
final MapInputRowParser parser2 = jsonMapper.readValue(
jsonMapper.writeValueAsBytes(parser),
MapInputRowParser.class
);
final InputRow parsed = parser2.parse(
ImmutableMap.<String, Object>of(
"foo", "x",
"bar", "y",
"qux", "z",
"timestamp", "2000"
)
);
Assert.assertEquals(ImmutableList.of("foo", "bar"), parsed.getDimensions());
Assert.assertEquals(ImmutableList.of("x"), parsed.getDimension("foo"));
Assert.assertEquals(ImmutableList.of("y"), parsed.getDimension("bar"));
Assert.assertEquals(new DateTime("2000").getMillis(), parsed.getTimestampFromEpoch());
}
}

View File

@ -19,14 +19,13 @@
package com.metamx.druid.realtime.firehose;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.logger.Logger;
import com.metamx.druid.indexer.data.ByteBufferInputRowParser;
import com.metamx.druid.input.InputRow;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
@ -34,13 +33,11 @@ import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.logger.Logger;
import com.metamx.druid.indexer.data.InputRowParser;
import com.metamx.druid.input.InputRow;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
*/
@ -92,9 +89,9 @@ public class KafkaFirehoseFactory implements FirehoseFactory
{
private final ConsumerConnector connector;
private final Iterator<MessageAndMetadata<Message>> iter;
private final InputRowParser<ByteBuffer> parser;
private final ByteBufferInputRowParser parser;
public DefaultFirehose(ConsumerConnector connector, KafkaStream<Message> stream, InputRowParser<ByteBuffer> parser)
public DefaultFirehose(ConsumerConnector connector, KafkaStream<Message> stream, ByteBufferInputRowParser parser)
{
iter = stream.iterator();
this.connector = connector;