introduced a ByteBufferInputRowParser so the old String-based StringInputRowParser keeps working in places outside the KafkaFirehoseFactory

Jan Rudert 2013-07-29 11:59:08 +02:00
parent 527c6c81c5
commit bba83ec532
9 changed files with 44 additions and 18 deletions
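For orientation before the per-file hunks: the change splits the parser abstraction so byte-oriented consumers (Kafka) and line-oriented consumers (local files, S3, Hadoop Text values) each get a natural entry point. Below is a minimal compilable sketch of the resulting shape; the stub types and the exact InputRowParser<T> declaration are assumptions reconstructed from the call sites in the hunks, not the verbatim source.

import java.nio.ByteBuffer;

// Stubs so the sketch compiles standalone; in Druid, InputRow is
// com.metamx.druid.input.InputRow and FormattedException is Druid's own.
interface InputRow {}
class FormattedException extends RuntimeException {}

// Generic parent, reconstructed from its call sites in the hunks below.
interface InputRowParser<T>
{
  InputRow parse(T input) throws FormattedException;
}

// New in this commit: pins T to ByteBuffer and (in the real file, shown
// below) carries the Jackson subtype annotations.
interface ByteBufferInputRowParser extends InputRowParser<ByteBuffer>
{
}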

FlightsFirehoseFactory.java

@@ -36,7 +36,6 @@ import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
-import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.zip.GZIPInputStream;
@@ -119,7 +118,7 @@ public class FlightsFirehoseFactory implements FirehoseFactory
@Override
public InputRow nextRow()
{
-final InputRow retVal = parser.parse(ByteBuffer.wrap(line.getBytes()));
+final InputRow retVal = parser.parse(line);
line = null;
return retVal;
}
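The removed call pattern re-encoded each line with the charset-less String.getBytes() before the parser decoded the buffer again; the same cleanup recurs in the S3, Hadoop, and test files below. Here is a runnable illustration of why that round trip was fragile — the UTF-8 decode stands in for what StringInputRowParser is assumed to use internally.

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class CharsetRoundTripDemo
{
  public static void main(String[] args)
  {
    String line = "Zürich,LX,8";
    // Old pattern: encode with the *platform default* charset...
    byte[] platformBytes = line.getBytes();
    // ...while the ByteBuffer-consuming parser decodes with a fixed
    // charset (UTF-8 assumed here).
    String decoded = StandardCharsets.UTF_8.decode(ByteBuffer.wrap(platformBytes)).toString();
    System.out.println("default charset:     " + Charset.defaultCharset());
    System.out.println("round-trips cleanly: " + line.equals(decoded));
    // On a non-UTF-8 platform (e.g. windows-1252) the second line prints
    // false; passing the String straight to parse(String), as this commit
    // does, skips the lossy encode/decode entirely.
  }
}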

ByteBufferInputRowParser.java (new file)

@@ -0,0 +1,17 @@
+package com.metamx.druid.indexer.data;
+
+import java.nio.ByteBuffer;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+/**
+ * @author jan.rudert
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = StringInputRowParser.class)
+@JsonSubTypes(value = {
+    @JsonSubTypes.Type(name = "protobuf", value = ProtoBufInputRowParser.class),
+    @JsonSubTypes.Type(name = "string", value = StringInputRowParser.class)
+})
+public interface ByteBufferInputRowParser extends InputRowParser<ByteBuffer> {
+}
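The annotations on the new interface let a JSON spec pick the parser through a "type" field, and defaultImpl keeps specs without a "type" field deserializing to StringInputRowParser as before. A self-contained sketch of the mechanism, using hypothetical stand-in types rather than the Druid classes:

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ParserTypeDemo
{
  // Hypothetical stand-in annotated like ByteBufferInputRowParser above.
  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = StringLike.class)
  @JsonSubTypes(value = {
      @JsonSubTypes.Type(name = "protobuf", value = ProtoLike.class),
      @JsonSubTypes.Type(name = "string", value = StringLike.class)
  })
  interface ParserLike {}

  static class ProtoLike implements ParserLike {}

  static class StringLike implements ParserLike {}

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    // "type" present: Jackson resolves the named subtype.
    System.out.println(mapper.readValue("{\"type\":\"protobuf\"}", ParserLike.class).getClass());
    // "type" absent: defaultImpl applies, mirroring how existing
    // string-based specs keep working without modification.
    System.out.println(mapper.readValue("{}", ParserLike.class).getClass());
  }
}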

ProtoBufInputRowParser.java

@@ -22,7 +22,7 @@ import com.metamx.druid.input.InputRow;
/**
* @author jan.rudert
*/
-public class ProtoBufInputRowParser implements InputRowParser<ByteBuffer>
+public class ProtoBufInputRowParser implements ByteBufferInputRowParser
{
private final MapInputRowParser inputRowCreator;

StringInputRowParser.java

@@ -37,7 +37,7 @@ import com.metamx.druid.input.InputRow;
/**
*/
-public class StringInputRowParser implements InputRowParser<ByteBuffer>
+public class StringInputRowParser implements ByteBufferInputRowParser
{
private final InputRowParser<Map<String, Object>> inputRowCreator;
private final Parser<String, Object> parser;
@@ -62,10 +62,7 @@ public class StringInputRowParser implements InputRowParser<ByteBuffer>
@Override
public InputRow parse(ByteBuffer input) throws FormattedException
{
-Map<String, Object> theMap = buildStringKeyMap(input);
-return inputRowCreator.parse(theMap);
+return parseMap(buildStringKeyMap(input));
}
private Map<String, Object> buildStringKeyMap(ByteBuffer input)
@@ -88,7 +85,7 @@ public class StringInputRowParser implements InputRowParser<ByteBuffer>
chars.flip();
try
{
-theMap = parser.parse(chars.toString());
+theMap = parseString(chars.toString());
} finally
{
chars.clear();
@@ -104,6 +101,21 @@ public class StringInputRowParser implements InputRowParser<ByteBuffer>
return theMap;
}
+
+private Map<String, Object> parseString(String inputString)
+{
+  return parser.parse(inputString);
+}
+
+public InputRow parse(String input) throws FormattedException
+{
+  return parseMap(parseString(input));
+}
+
+private InputRow parseMap(Map<String, Object> theMap)
+{
+  return inputRowCreator.parse(theMap);
+}
@JsonValue
public InputRowParser<Map<String, Object>> getInputRowCreator()
{
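Net effect of the hunks above: parse(ByteBuffer) and the new parse(String) overload converge on the same private parseString/parseMap helpers, so the two entry points cannot drift apart. A hypothetical miniature of that shape — trivial key=value parsing stands in for the real parser and inputRowCreator delegation:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class ConvergingOverloadsDemo
{
  public Map<String, Object> parse(ByteBuffer input)
  {
    // Decode the bytes, then join the String path.
    return parseMap(parseString(StandardCharsets.UTF_8.decode(input).toString()));
  }

  public Map<String, Object> parse(String input)
  {
    // Skip the decode step entirely.
    return parseMap(parseString(input));
  }

  private Map<String, Object> parseString(String s)
  {
    Map<String, Object> m = new HashMap<>();
    String[] kv = s.split("=", 2);
    m.put(kv[0], kv.length > 1 ? kv[1] : null);
    return m;
  }

  private Map<String, Object> parseMap(Map<String, Object> m)
  {
    return m; // stand-in for inputRowCreator.parse(theMap)
  }

  public static void main(String[] args)
  {
    ConvergingOverloadsDemo p = new ConvergingOverloadsDemo();
    String line = "metric=42";
    System.out.println(p.parse(line));
    System.out.println(p.parse(ByteBuffer.wrap(line.getBytes(StandardCharsets.UTF_8))));
  }
}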

HadoopDruidIndexerMapper.java

@@ -9,7 +9,6 @@ import org.apache.hadoop.mapreduce.Mapper;
import org.joda.time.DateTime;
import java.io.IOException;
-import java.nio.ByteBuffer;
public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<LongWritable, Text, KEYOUT, VALUEOUT>
{
@@ -42,7 +41,7 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
try {
final InputRow inputRow;
try {
-inputRow = parser.parse(ByteBuffer.wrap(value.getBytes()));
+inputRow = parser.parse(value.toString());
}
catch (IllegalArgumentException e) {
if (config.isIgnoreInvalidRows()) {

IndexGeneratorJob.java

@@ -280,7 +280,7 @@ public class IndexGeneratorJob implements Jobby
for (final Text value : values) {
context.progress();
-final InputRow inputRow = index.getSpatialDimensionRowFormatter().formatRow(parser.parse(ByteBuffer.wrap(value.getBytes())));
+final InputRow inputRow = index.getSpatialDimensionRowFormatter().formatRow(parser.parse(value.toString()));
allDimensionNames.addAll(inputRow.getDimensions());
int numRows = index.add(inputRow);

StaticS3FirehoseFactory.java

@@ -44,7 +44,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
-import java.nio.ByteBuffer;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
@@ -187,7 +186,7 @@ public class StaticS3FirehoseFactory implements FirehoseFactory
throw new NoSuchElementException();
}
-return parser.parse(ByteBuffer.wrap(lineIterator.next().getBytes()));
+return parser.parse(lineIterator.next());
}
@Override

KafkaFirehoseFactory.java

@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
+import com.metamx.druid.indexer.data.ByteBufferInputRowParser;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
@@ -54,13 +55,13 @@ public class KafkaFirehoseFactory implements FirehoseFactory
private final String feed;
@JsonProperty
-private final InputRowParser<ByteBuffer> parser;
+private final ByteBufferInputRowParser parser;
@JsonCreator
public KafkaFirehoseFactory(
@JsonProperty("consumerProps") Properties consumerProps,
@JsonProperty("feed") String feed,
-@JsonProperty("parser") InputRowParser<ByteBuffer> parser)
+@JsonProperty("parser") ByteBufferInputRowParser parser)
{
this.consumerProps = consumerProps;
this.feed = feed;
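Narrowing the field and constructor parameter from InputRowParser<ByteBuffer> to ByteBufferInputRowParser matters for deserialization: the subtype annotations live on the sub-interface, so Jackson can only choose a concrete class when the declared type is the annotated one. A hypothetical stand-alone sketch of the difference:

import java.nio.ByteBuffer;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;

public class DeclaredTypeDemo
{
  // Stand-in for the generic InputRowParser<T>: no Jackson annotations.
  interface Parser<T> {}

  // Stand-in for ByteBufferInputRowParser: carries the annotations.
  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = StringLike.class)
  @JsonSubTypes({@JsonSubTypes.Type(name = "string", value = StringLike.class)})
  interface ByteParser extends Parser<ByteBuffer> {}

  static class StringLike implements ByteParser {}

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    // Declared as the annotated sub-interface: resolves to a subtype.
    System.out.println(mapper.readValue("{}", ByteParser.class).getClass());
    // Declared as the bare generic interface: no subtype info, so
    // Jackson cannot construct the abstract type and throws.
    try {
      mapper.readValue("{}", Parser.class);
    } catch (Exception e) {
      System.out.println("fails as expected: " + e.getClass().getSimpleName());
    }
  }
}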

TestIndex.java

@@ -43,7 +43,6 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
-import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
@@ -177,7 +176,7 @@ public class TestIndex
runOnce = true;
}
-retVal.add(parser.parse(ByteBuffer.wrap(line.getBytes())));
+retVal.add(parser.parse(line));
++lineCount;
return true;