Add flag binaryAsString for parquet ingestion (#3381)

Stéphane Derosiaux 2016-08-31 02:30:50 +02:00 committed by Fangjin Yang
parent c4e8440c22
commit 48dce88aab
8 changed files with 141 additions and 43 deletions

View File

@ -16,6 +16,7 @@ This is for batch ingestion using the HadoopDruidIndexer. The inputFormat of inp
|Field | Type | Description | Required|
|----------|-------------|----------------------------------------------------------------------------------------|---------|
| type | String | This should say `parquet` | yes |
| parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Should be a timeAndDims parseSpec. | yes |
| binaryAsString | Boolean | Specifies whether binary (bytes) Parquet columns should be converted to strings. | no (default == false) |
For example:

View File

@ -0,0 +1,65 @@
{
"type": "index_hadoop",
"spec": {
"ioConfig": {
"type": "hadoop",
"inputSpec": {
"type": "static",
"inputFormat": "io.druid.data.input.parquet.DruidParquetInputFormat",
"paths": "example/284a0e001476716b-56d5676f53bd6e85_115466471_data.0.parq"
},
"metadataUpdateSpec": {
"type": "postgresql",
"connectURI": "jdbc:postgresql://localhost/druid",
"user" : "druid",
"password" : "asdf",
"segmentTable": "druid_segments"
},
"segmentOutputPath": "/tmp/segments"
},
"dataSchema": {
"dataSource": "impala",
"parser": {
"type": "parquet",
"binaryAsString": true,
"parseSpec": {
"format": "timeAndDims",
"timestampSpec": {
"column": "ts",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"field"
],
"dimensionExclusions": [],
"spatialDimensions": []
}
}
},
"metricsSpec": [{
"type": "count",
"name": "count"
}],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE",
"intervals": ["2013-08-30/2013-09-02"]
}
},
"tuningConfig": {
"type": "hadoop",
"workingPath": "tmp/working_path",
"partitionsSpec": {
"targetPartitionSize": 5000000
},
"jobProperties" : {
"mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
"mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
"mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
},
"leaveIntermediate": true
}
}
}

View File

@ -37,14 +37,17 @@ import java.util.List;
public class ParquetHadoopInputRowParser implements InputRowParser<GenericRecord>
{
private final ParseSpec parseSpec;
private final boolean binaryAsString;
private final List<String> dimensions;
@JsonCreator
public ParquetHadoopInputRowParser(
@JsonProperty("parseSpec") ParseSpec parseSpec
@JsonProperty("parseSpec") ParseSpec parseSpec,
@JsonProperty("binaryAsString") Boolean binaryAsString
)
{
this.parseSpec = parseSpec;
this.binaryAsString = binaryAsString == null ? false : binaryAsString;
List<DimensionSchema> dimensionSchema = parseSpec.getDimensionsSpec().getDimensions();
this.dimensions = Lists.newArrayList();
@ -54,12 +57,12 @@ public class ParquetHadoopInputRowParser implements InputRowParser<GenericRecord
}
/**
* imitate avro extension {@link AvroStreamInputRowParser#parseGenericRecord(GenericRecord, ParseSpec, List, boolean)}
* imitate avro extension {@link AvroStreamInputRowParser#parseGenericRecord(GenericRecord, ParseSpec, List, boolean, boolean)}
*/
@Override
public InputRow parse(GenericRecord record)
{
GenericRecordAsMap genericRecordAsMap = new GenericRecordAsMap(record, false);
GenericRecordAsMap genericRecordAsMap = new GenericRecordAsMap(record, false, binaryAsString);
TimestampSpec timestampSpec = parseSpec.getTimestampSpec();
DateTime dateTime = timestampSpec.extractTimestamp(genericRecordAsMap);
return new MapBasedInputRow(dateTime, dimensions, genericRecordAsMap);
@ -75,6 +78,6 @@ public class ParquetHadoopInputRowParser implements InputRowParser<GenericRecord
@Override
public InputRowParser withParseSpec(ParseSpec parseSpec)
{
return new ParquetHadoopInputRowParser(parseSpec);
return new ParquetHadoopInputRowParser(parseSpec, binaryAsString);
}
}
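
The flag is a nullable Boolean so that specs written before this change keep their old behaviour. Below is a minimal sketch of using the updated constructor directly; it is not part of this commit, the ParseSpec setup assumes the usual io.druid.data.input.impl constructors of that era (TimestampSpec, DimensionsSpec, TimeAndDimsParseSpec, DimensionsSpec.getDefaultSchemas), and the class name is invented for illustration.

import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.TimeAndDimsParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.data.input.parquet.ParquetHadoopInputRowParser;

import java.util.Collections;

public class ParquetParserFlagSketch
{
  public static void main(String[] args)
  {
    // ParseSpec equivalent to the "timeAndDims" block of the example job spec above
    TimeAndDimsParseSpec parseSpec = new TimeAndDimsParseSpec(
        new TimestampSpec("ts", "auto", null),
        new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Collections.singletonList("field")), null, null)
    );

    // binaryAsString omitted (null) defaults to false: bytes columns keep the old Arrays.toString() rendering
    ParquetHadoopInputRowParser legacyParser = new ParquetHadoopInputRowParser(parseSpec, null);

    // binaryAsString == true: bytes columns are decoded to Strings before they become dimension values
    ParquetHadoopInputRowParser decodingParser = new ParquetHadoopInputRowParser(parseSpec, true);

    // withParseSpec() keeps the flag on the copy it returns
    decodingParser.withParseSpec(parseSpec);
  }
}

In practice the parser is built from the JSON `parser` block by the indexer, as the example spec above and the tests below show.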

View File

@ -18,7 +18,12 @@
*/
package io.druid.data.input.parquet;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.path.StaticPathSpec;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
@ -30,49 +35,68 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.parquet.ParquetRuntimeException;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import static org.apache.parquet.avro.AvroParquetWriter.builder;
import static org.junit.Assert.assertEquals;
public class DruidParquetInputTest
{
@Test
public void test() throws IOException, InterruptedException
{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromFile(new File(
"example/wikipedia_hadoop_parquet_job.json"));
public void test() throws IOException, InterruptedException {
HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromFile(new File("example/wikipedia_hadoop_parquet_job.json"));
Job job = Job.getInstance(new Configuration());
config.intoConfiguration(job);
File testFile = new File("example/wikipedia_list.parquet");
Path path = new Path(testFile.getAbsoluteFile().toURI());
FileSplit split = new FileSplit(path, 0, testFile.length(), null);
DruidParquetInputFormat inputFormat = ReflectionUtils.newInstance(
DruidParquetInputFormat.class,
job.getConfiguration()
);
TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
RecordReader reader = inputFormat.createRecordReader(split, context);
reader.initialize(split, context);
reader.nextKeyValue();
GenericRecord data = (GenericRecord) reader.getCurrentValue();
GenericRecord data = getFirstRecord(job, "example/wikipedia_list.parquet");
// field not read, should return null
assertEquals(data.get("added"), null);
assertEquals(data.get("page"), new Utf8("Gypsy Danger"));
assertEquals(config.getParser().parse(data).getDimension("page").get(0), "Gypsy Danger");
}
@Test
public void testBinaryAsString() throws IOException, InterruptedException
{
HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromFile(new File("example/impala_hadoop_parquet_job.json"));
Job job = Job.getInstance(new Configuration());
config.intoConfiguration(job);
GenericRecord data = getFirstRecord(job, ((StaticPathSpec) config.getPathSpec()).getPaths());
InputRow row = config.getParser().parse(data);
// without binaryAsString: true, the value would be something like "[104, 101, 121, 32, 116, 104, 105, 115, 32, 105, 115, 3.... ]"
assertEquals(row.getDimension("field").get(0), "hey this is &é(-è_çà)=^$ù*! Ω^^");
assertEquals(row.getTimestampFromEpoch(), 1471800234);
}
private GenericRecord getFirstRecord(Job job, String parquetPath) throws IOException, InterruptedException {
File testFile = new File(parquetPath);
Path path = new Path(testFile.getAbsoluteFile().toURI());
FileSplit split = new FileSplit(path, 0, testFile.length(), null);
DruidParquetInputFormat inputFormat = ReflectionUtils.newInstance(DruidParquetInputFormat.class, job.getConfiguration());
TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
RecordReader reader = inputFormat.createRecordReader(split, context);
reader.initialize(split, context);
reader.nextKeyValue();
GenericRecord data = (GenericRecord) reader.getCurrentValue();
reader.close();
return data;
}
}

View File

@ -46,7 +46,7 @@ public class AvroHadoopInputRowParser implements InputRowParser<GenericRecord>
@Override
public InputRow parse(GenericRecord record)
{
return AvroStreamInputRowParser.parseGenericRecord(record, parseSpec, dimensions, fromPigAvroStorage);
return AvroStreamInputRowParser.parseGenericRecord(record, parseSpec, dimensions, fromPigAvroStorage, false);
}
@JsonProperty

View File

@ -50,14 +50,13 @@ public class AvroStreamInputRowParser implements ByteBufferInputRowParser
@Override
public InputRow parse(ByteBuffer input)
{
return parseGenericRecord(avroBytesDecoder.parse(input), parseSpec, dimensions, false);
return parseGenericRecord(avroBytesDecoder.parse(input), parseSpec, dimensions, false, false);
}
protected static InputRow parseGenericRecord(
GenericRecord record, ParseSpec parseSpec, List<String> dimensions, boolean fromPigAvroStorage
)
protected static InputRow parseGenericRecord(GenericRecord record, ParseSpec parseSpec, List<String> dimensions,
boolean fromPigAvroStorage, boolean binaryAsString)
{
GenericRecordAsMap genericRecordAsMap = new GenericRecordAsMap(record, fromPigAvroStorage);
GenericRecordAsMap genericRecordAsMap = new GenericRecordAsMap(record, fromPigAvroStorage, binaryAsString);
TimestampSpec timestampSpec = parseSpec.getTimestampSpec();
DateTime dateTime = timestampSpec.extractTimestamp(genericRecordAsMap);
return new MapBasedInputRow(dateTime, dimensions, genericRecordAsMap);

View File

@ -36,6 +36,7 @@ public class GenericRecordAsMap implements Map<String, Object>
{
private final GenericRecord record;
private final boolean fromPigAvroStorage;
private final boolean binaryAsString;
private static final Function<Object, String> PIG_AVRO_STORAGE_ARRAY_TO_STRING_INCLUDING_NULL = new Function<Object, String>()
{
@ -47,10 +48,11 @@ public class GenericRecordAsMap implements Map<String, Object>
}
};
public GenericRecordAsMap(GenericRecord record, boolean fromPigAvroStorage)
public GenericRecordAsMap(GenericRecord record, boolean fromPigAvroStorage, boolean binaryAsString)
{
this.record = record;
this.fromPigAvroStorage = fromPigAvroStorage;
this.binaryAsString = binaryAsString;
}
@Override
@ -83,7 +85,7 @@ public class GenericRecordAsMap implements Map<String, Object>
* <li> avro schema type -> druid dimension:</li>
* <ul>
* <li>null, boolean, int, long, float, double, string, Records, Enums, Maps, Fixed -> String, using String.valueOf</li>
* <li>bytes -> Arrays.toString() </li>
* <li>bytes -> Arrays.toString() or new String if binaryAsString is true</li>
* <li>Arrays -> List&lt;String&gt;, using Lists.transform(&lt;List&gt;dimValue, TO_STRING_INCLUDING_NULL)</li>
* </ul>
* <li> avro schema type -> druid metric:</li>
@ -103,8 +105,12 @@ public class GenericRecordAsMap implements Map<String, Object>
return Lists.transform((List) field, PIG_AVRO_STORAGE_ARRAY_TO_STRING_INCLUDING_NULL);
}
if (field instanceof ByteBuffer) {
if (binaryAsString) {
return new String(((ByteBuffer) field).array());
} else {
return Arrays.toString(((ByteBuffer) field).array());
}
}
if (field instanceof Utf8) {
return field.toString();
}