Add Date support to the parquet reader (#4423)

* Add Date support to the parquet reader

Add support for the Date logical type, which is currently not supported. Since the Parquet
date is the number of days since the epoch but gets interpreted as seconds since the epoch,
indexing fails because the data does not map to the appropriate bucket.
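
For reference, a minimal sketch of the intended conversion, mirroring the parser change in the
diff below (the class name and the day count 17334 are illustrative only; the real parser reads
the value from the record, and joda-time is assumed on the classpath):

import org.joda.time.DateTime;
import java.util.concurrent.TimeUnit;

class DateConversionSketch
{
  public static void main(String[] args)
  {
    // A Parquet DATE is stored as an int32 counting whole days since 1970-01-01.
    // Interpreting that value as seconds would place every row within the first
    // day of the epoch, so convert days to milliseconds before building the DateTime.
    int daysSinceEpoch = 17334; // corresponds to 2017-06-17
    DateTime dateTime = new DateTime(TimeUnit.DAYS.toMillis(daysSinceEpoch));
    System.out.println(dateTime); // 2017-06-17T00:00:00.000Z when user.timezone=UTC
  }
}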

* Cleaned up code and tests

Removed unused JSON files from the examples and cleaned up the tests by
using try-with-resources. The filenames are now taken from the JSON job
specs instead of being hard-coded, and general improvements from the
feedback provided by leventov have been integrated.

* Got rid of the caching

Removed the caching of the logical type of the time dimension column
and cleaned up the code a bit.
Fokko Driesprong 2017-06-22 22:56:08 +02:00 committed by Roman Leventov
parent 3e60c9125d
commit ff501e8f13
10 changed files with 184 additions and 85 deletions

View File

@@ -98,6 +98,7 @@ public class TimestampSpec
{
DateTime extracted = missingValue;
if (input != null) {
// Check if the input is equal to the last input, so we don't need to parse it again
if (input.equals(parseCtx.lastTimeObject)) {
extracted = parseCtx.lastDateTime;
} else {

View File

@@ -18,6 +18,8 @@ This is for batch ingestion using the HadoopDruidIndexer. The inputFormat of `in
| parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Should be a timeAndDims parseSpec. | yes |
| binaryAsString | Boolean | Specifies if the bytes parquet column should be converted to strings. | no(default == false) |
When the time dimension is a [DateType column](https://github.com/Parquet/parquet-format/blob/master/LogicalTypes.md), a format should not be supplied. When the format is UTF8 (String), either `auto` or an explicitly defined [format](http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html) is required.
### Example json for overlord
When posting the index job to the overlord, setting the correct `inputFormat` is required to switch to parquet ingestion. Make sure to set `jobProperties` to make hdfs path timezone unrelated:

View File

@@ -6,7 +6,7 @@
"inputSpec": {
"type": "static",
"inputFormat": "io.druid.data.input.parquet.DruidParquetInputFormat",
"paths": "no_metrics"
"paths": "example/test_date_data.snappy.parquet"
},
"metadataUpdateSpec": {
"type": "postgresql",
@@ -15,24 +15,21 @@
"password" : "asdf",
"segmentTable": "druid_segments"
},
"segmentOutputPath": "tmp/segments"
"segmentOutputPath": "/tmp/segments"
},
"dataSchema": {
"dataSource": "no_metrics",
"dataSource": "date_dataset_date",
"parser": {
"type": "parquet",
"parseSpec": {
"format": "timeAndDims",
"timestampSpec": {
"column": "time",
"format": "auto"
"column": "date_as_date"
},
"dimensionsSpec": {
"dimensions": [
"name"
],
"dimensionExclusions": [],
"spatialDimensions": []
"idx"
]
}
}
},
@@ -43,8 +40,8 @@
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "ALL",
"intervals": ["2015-12-31/2016-01-02"]
"queryGranularity": "NONE",
"intervals": ["2017-06-17/2017-09-24"]
}
},
"tuningConfig": {

View File

@@ -0,0 +1,62 @@
{
"type": "index_hadoop",
"spec": {
"ioConfig": {
"type": "hadoop",
"inputSpec": {
"type": "static",
"inputFormat": "io.druid.data.input.parquet.DruidParquetInputFormat",
"paths": "example/test_date_data.snappy.parquet"
},
"metadataUpdateSpec": {
"type": "postgresql",
"connectURI": "jdbc:postgresql://localhost/druid",
"user" : "druid",
"password" : "asdf",
"segmentTable": "druid_segments"
},
"segmentOutputPath": "/tmp/segments"
},
"dataSchema": {
"dataSource": "date_dataset_string",
"parser": {
"type": "parquet",
"parseSpec": {
"format": "timeAndDims",
"timestampSpec": {
"column": "date_as_string",
"format": "Y-M-d"
},
"dimensionsSpec": {
"dimensions": [
"idx"
]
}
}
},
"metricsSpec": [{
"type": "count",
"name": "count"
}],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE",
"intervals": ["2017-06-17/2017-09-24"]
}
},
"tuningConfig": {
"type": "hadoop",
"workingPath": "tmp/working_path",
"partitionsSpec": {
"targetPartitionSize": 5000000
},
"jobProperties" : {
"mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
"mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
"mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
},
"leaveIntermediate": true
}
}
}

View File

@@ -6,7 +6,7 @@
"inputSpec": {
"type": "static",
"inputFormat": "io.druid.data.input.parquet.DruidParquetInputFormat",
"paths": "wikipedia.gz.parquet"
"paths": "example/wikipedia_list.parquet"
},
"metadataUpdateSpec": {
"type": "postgresql",

View File

@@ -1,58 +0,0 @@
{
"type" : "index_hadoop",
"spec" : {
"ioConfig": {
"type": "hadoop",
"inputSpec": {
"type": "static",
"inputFormat": "io.druid.data.input.parquet.DruidParquetInputFormat",
"paths": "wikipedia_list.parquet"
}
},
"dataSchema": {
"dataSource": "wikipedia",
"parser": {
"type": "parquet",
"parseSpec": {
"format": "timeAndDims",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"page",
"language",
"user"
],
"dimensionExclusions": [],
"spatialDimensions": []
}
}
},
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE",
"intervals": ["2013-08-31/2013-09-01"]
}
},
"tuningConfig": {
"type": "hadoop",
"partitionsSpec": {
"targetPartitionSize": 5000000
}
}
}
}

View File

@@ -29,16 +29,22 @@ import io.druid.data.input.impl.DimensionSchema;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class ParquetHadoopInputRowParser implements InputRowParser<GenericRecord>
{
private final ParseSpec parseSpec;
private final boolean binaryAsString;
private final List<String> dimensions;
private final TimestampSpec timestampSpec;
@JsonCreator
public ParquetHadoopInputRowParser(
@@ -47,6 +53,7 @@ public class ParquetHadoopInputRowParser implements InputRowParser<GenericRecord
)
{
this.parseSpec = parseSpec;
this.timestampSpec = parseSpec.getTimestampSpec();
this.binaryAsString = binaryAsString == null ? false : binaryAsString;
List<DimensionSchema> dimensionSchema = parseSpec.getDimensionsSpec().getDimensions();
@@ -56,15 +63,41 @@ public class ParquetHadoopInputRowParser implements InputRowParser<GenericRecord
}
}
@Nullable
private LogicalType determineTimestampSpecLogicalType(Schema schema, String timestampSpecField)
{
for (Schema.Field field : schema.getFields()) {
if (field.name().equals(timestampSpecField)) {
return field.schema().getLogicalType();
}
}
return null;
}
/**
* imitate avro extension {@link AvroStreamInputRowParser#parseGenericRecord(GenericRecord, ParseSpec, List, boolean, boolean)}
*/
@Override
public InputRow parse(GenericRecord record)
{
// Expose the Avro record as a Map so fields can be looked up by name
GenericRecordAsMap genericRecordAsMap = new GenericRecordAsMap(record, false, binaryAsString);
TimestampSpec timestampSpec = parseSpec.getTimestampSpec();
DateTime dateTime = timestampSpec.extractTimestamp(genericRecordAsMap);
// Determine logical type of the timestamp column
LogicalType logicalType = determineTimestampSpecLogicalType(record.getSchema(), timestampSpec.getTimestampColumn());
// Parse the timestamp based on the Parquet schema.
// https://github.com/Parquet/parquet-format/blob/1afe8d9ae7e38acfc4ea273338a3c0c35feca115/LogicalTypes.md#date
DateTime dateTime;
if (logicalType instanceof LogicalTypes.Date) {
int daysSinceEpoch = (Integer) genericRecordAsMap.get(timestampSpec.getTimestampColumn());
dateTime = new DateTime(TimeUnit.DAYS.toMillis(daysSinceEpoch));
} else {
// Fall back to a binary format that will be parsed using joda-time
dateTime = timestampSpec.extractTimestamp(genericRecordAsMap);
}
return new MapBasedInputRow(dateTime, dimensions, genericRecordAsMap);
}

View File

@@ -39,6 +39,14 @@ import java.util.Set;
public class DruidParquetReadSupport extends AvroReadSupport<GenericRecord>
{
/**
* Selects the columns from the Parquet schema that are used in the schema of the ingestion job.
*
* @param context the context of the file to be read
*
* @return the partial schema containing only the columns used by the ingestion job
*/
private MessageType getPartialReadSchema(InitContext context)
{
MessageType fullSchema = context.getFileSchema();
@@ -86,7 +94,6 @@ public class DruidParquetReadSupport extends AvroReadSupport<GenericRecord>
MessageType fileSchema, ReadContext readContext
)
{
MessageType parquetSchema = readContext.getRequestedSchema();
Schema avroSchema = new AvroSchemaConverter(configuration).convert(parquetSchema);

View File

@@ -18,7 +18,9 @@
*/
package io.druid.data.input.parquet;
import com.google.common.collect.Lists;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.path.StaticPathSpec;
import org.apache.avro.generic.GenericRecord;
@@ -36,17 +38,21 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class DruidParquetInputTest
{
@Test
public void test() throws IOException, InterruptedException {
HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromFile(new File("example/wikipedia_hadoop_parquet_job.json"));
public void testReadParquetFile() throws IOException, InterruptedException
{
HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromFile(new File(
"example/wikipedia_hadoop_parquet_job.json")
);
Job job = Job.getInstance(new Configuration());
config.intoConfiguration(job);
GenericRecord data = getFirstRecord(job, "example/wikipedia_list.parquet");
GenericRecord data = getFirstRecord(job, ((StaticPathSpec) config.getPathSpec()).getPaths());
// field not read, should return null
assertEquals(data.get("added"), null);
@@ -57,31 +63,80 @@ public class DruidParquetInputTest
@Test
public void testBinaryAsString() throws IOException, InterruptedException
{
HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromFile(new File("example/impala_hadoop_parquet_job.json"));
HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromFile(new File(
"example/impala_hadoop_parquet_job.json")
);
Job job = Job.getInstance(new Configuration());
config.intoConfiguration(job);
GenericRecord data = getFirstRecord(job, ((StaticPathSpec) config.getPathSpec()).getPaths());
InputRow row = config.getParser().parse(data);
// without binaryAsString: true, the value would be something like "[104, 101, 121, 32, 116, 104, 105, 115, 32, 105, 115, 3.... ]"
assertEquals(row.getDimension("field").get(0), "hey this is &é(-è_çà)=^$ù*! Ω^^");
assertEquals(row.getTimestampFromEpoch(), 1471800234);
}
private GenericRecord getFirstRecord(Job job, String parquetPath) throws IOException, InterruptedException {
@Test
public void testDateHandling() throws IOException, InterruptedException
{
List<InputRow> rowsWithString = getAllRows("example/date_test_data_job_string.json");
List<InputRow> rowsWithDate = getAllRows("example/date_test_data_job_date.json");
assertEquals(rowsWithDate.size(), rowsWithString.size());
for (int i = 0; i < rowsWithDate.size(); i++) {
assertEquals(rowsWithString.get(i).getTimestamp(), rowsWithDate.get(i).getTimestamp());
}
}
private GenericRecord getFirstRecord(Job job, String parquetPath) throws IOException, InterruptedException
{
File testFile = new File(parquetPath);
Path path = new Path(testFile.getAbsoluteFile().toURI());
FileSplit split = new FileSplit(path, 0, testFile.length(), null);
DruidParquetInputFormat inputFormat = ReflectionUtils.newInstance(DruidParquetInputFormat.class, job.getConfiguration());
DruidParquetInputFormat inputFormat = ReflectionUtils.newInstance(
DruidParquetInputFormat.class,
job.getConfiguration()
);
TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
RecordReader reader = inputFormat.createRecordReader(split, context);
reader.initialize(split, context);
reader.nextKeyValue();
GenericRecord data = (GenericRecord) reader.getCurrentValue();
reader.close();
return data;
try (RecordReader reader = inputFormat.createRecordReader(split, context)) {
reader.initialize(split, context);
reader.nextKeyValue();
return (GenericRecord) reader.getCurrentValue();
}
}
private List<InputRow> getAllRows(String configPath) throws IOException, InterruptedException
{
HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromFile(new File(configPath));
Job job = Job.getInstance(new Configuration());
config.intoConfiguration(job);
File testFile = new File(((StaticPathSpec) config.getPathSpec()).getPaths());
Path path = new Path(testFile.getAbsoluteFile().toURI());
FileSplit split = new FileSplit(path, 0, testFile.length(), null);
DruidParquetInputFormat inputFormat = ReflectionUtils.newInstance(
DruidParquetInputFormat.class,
job.getConfiguration()
);
TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
try (RecordReader reader = inputFormat.createRecordReader(split, context)) {
List<InputRow> records = Lists.newArrayList();
InputRowParser parser = config.getParser();
reader.initialize(split, context);
while (reader.nextKeyValue()) {
GenericRecord data = (GenericRecord) reader.getCurrentValue();
records.add(parser.parse(data));
}
return records;
}
}
}