diff --git a/api/src/main/java/io/druid/data/input/impl/TimestampSpec.java b/api/src/main/java/io/druid/data/input/impl/TimestampSpec.java
index 5ebf8504e63..2144e674e6d 100644
--- a/api/src/main/java/io/druid/data/input/impl/TimestampSpec.java
+++ b/api/src/main/java/io/druid/data/input/impl/TimestampSpec.java
@@ -98,6 +98,7 @@ public class TimestampSpec
   {
     DateTime extracted = missingValue;
     if (input != null) {
+      // Check if the input is equal to the last input, so we don't need to parse it again
       if (input.equals(parseCtx.lastTimeObject)) {
         extracted = parseCtx.lastDateTime;
       } else {
diff --git a/docs/content/development/extensions-contrib/parquet.md b/docs/content/development/extensions-contrib/parquet.md
index b3732f52d6e..a0856c89000 100644
--- a/docs/content/development/extensions-contrib/parquet.md
+++ b/docs/content/development/extensions-contrib/parquet.md
@@ -18,6 +18,8 @@ This is for batch ingestion using the HadoopDruidIndexer. The inputFormat of `in
 | parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Should be a timeAndDims parseSpec. | yes |
 | binaryAsString | Boolean | Specifies if the bytes parquet column should be converted to strings. | no(default == false) |
 
+When the time dimension is a [DateType column](https://github.com/Parquet/parquet-format/blob/master/LogicalTypes.md), a format should not be supplied. When the column is UTF8 (String), either `auto` or an explicitly defined [format](http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html) is required.
+
 ### Example json for overlord
 
 When posting the index job to the overlord, setting the correct `inputFormat` is required to switch to parquet ingestion. Make sure to set `jobProperties` to make hdfs path timezone unrelated:
diff --git a/extensions-contrib/parquet-extensions/example/parquet_no_metric_job.json b/extensions-contrib/parquet-extensions/example/date_test_data_job_date.json
similarity index 80%
rename from extensions-contrib/parquet-extensions/example/parquet_no_metric_job.json
rename to extensions-contrib/parquet-extensions/example/date_test_data_job_date.json
index ead44698879..e88935aa206 100755
--- a/extensions-contrib/parquet-extensions/example/parquet_no_metric_job.json
+++ b/extensions-contrib/parquet-extensions/example/date_test_data_job_date.json
@@ -6,7 +6,7 @@
       "inputSpec": {
         "type": "static",
         "inputFormat": "io.druid.data.input.parquet.DruidParquetInputFormat",
-        "paths": "no_metrics"
+        "paths": "example/test_date_data.snappy.parquet"
       },
       "metadataUpdateSpec": {
         "type": "postgresql",
@@ -15,24 +15,21 @@
         "password" : "asdf",
         "segmentTable": "druid_segments"
       },
-      "segmentOutputPath": "tmp/segments"
+      "segmentOutputPath": "/tmp/segments"
     },
     "dataSchema": {
-      "dataSource": "no_metrics",
+      "dataSource": "date_dataset_date",
      "parser": {
        "type": "parquet",
        "parseSpec": {
          "format": "timeAndDims",
          "timestampSpec": {
-            "column": "time",
-            "format": "auto"
+            "column": "date_as_date"
          },
          "dimensionsSpec": {
            "dimensions": [
-              "name"
-            ],
-            "dimensionExclusions": [],
-            "spatialDimensions": []
+              "idx"
+            ]
          }
        }
      },
@@ -43,8 +40,8 @@
       "granularitySpec": {
         "type": "uniform",
         "segmentGranularity": "DAY",
-        "queryGranularity": "ALL",
-        "intervals": ["2015-12-31/2016-01-02"]
+        "queryGranularity": "NONE",
+        "intervals": ["2017-06-17/2017-09-24"]
       }
     },
     "tuningConfig": {
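The renamed job above ingests the DATE-typed column, while the new job added below ingests the same data through a UTF8 string column. As a minimal side-by-side sketch of the two `timestampSpec` variants described in the parquet.md change, using the column names from the bundled example data (this snippet is illustrative, not an additional file in this patch):

    DATE logical-type column (no format supplied):
        "timestampSpec": { "column": "date_as_date" }

    UTF8 (String) column ("auto" or an explicit Joda-Time pattern):
        "timestampSpec": { "column": "date_as_string", "format": "Y-M-d" }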
diff --git a/extensions-contrib/parquet-extensions/example/date_test_data_job_string.json b/extensions-contrib/parquet-extensions/example/date_test_data_job_string.json
new file mode 100755
index 00000000000..d6e355f4844
--- /dev/null
+++ b/extensions-contrib/parquet-extensions/example/date_test_data_job_string.json
@@ -0,0 +1,62 @@
+{
+  "type": "index_hadoop",
+  "spec": {
+    "ioConfig": {
+      "type": "hadoop",
+      "inputSpec": {
+        "type": "static",
+        "inputFormat": "io.druid.data.input.parquet.DruidParquetInputFormat",
+        "paths": "example/test_date_data.snappy.parquet"
+      },
+      "metadataUpdateSpec": {
+        "type": "postgresql",
+        "connectURI": "jdbc:postgresql://localhost/druid",
+        "user" : "druid",
+        "password" : "asdf",
+        "segmentTable": "druid_segments"
+      },
+      "segmentOutputPath": "/tmp/segments"
+    },
+    "dataSchema": {
+      "dataSource": "date_dataset_string",
+      "parser": {
+        "type": "parquet",
+        "parseSpec": {
+          "format": "timeAndDims",
+          "timestampSpec": {
+            "column": "date_as_string",
+            "format": "Y-M-d"
+          },
+          "dimensionsSpec": {
+            "dimensions": [
+              "idx"
+            ]
+          }
+        }
+      },
+      "metricsSpec": [{
+        "type": "count",
+        "name": "count"
+      }],
+      "granularitySpec": {
+        "type": "uniform",
+        "segmentGranularity": "DAY",
+        "queryGranularity": "NONE",
+        "intervals": ["2017-06-17/2017-09-24"]
+      }
+    },
+    "tuningConfig": {
+      "type": "hadoop",
+      "workingPath": "tmp/working_path",
+      "partitionsSpec": {
+        "targetPartitionSize": 5000000
+      },
+      "jobProperties" : {
+        "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
+      },
+      "leaveIntermediate": true
+    }
+  }
+}
diff --git a/extensions-contrib/parquet-extensions/example/test_date_data.snappy.parquet b/extensions-contrib/parquet-extensions/example/test_date_data.snappy.parquet
new file mode 100644
index 00000000000..4900c09edc1
Binary files /dev/null and b/extensions-contrib/parquet-extensions/example/test_date_data.snappy.parquet differ
diff --git a/extensions-contrib/parquet-extensions/example/wikipedia_hadoop_parquet_job.json b/extensions-contrib/parquet-extensions/example/wikipedia_hadoop_parquet_job.json
index 2d5947899b3..aab4126517c 100755
--- a/extensions-contrib/parquet-extensions/example/wikipedia_hadoop_parquet_job.json
+++ b/extensions-contrib/parquet-extensions/example/wikipedia_hadoop_parquet_job.json
@@ -6,7 +6,7 @@
       "inputSpec": {
         "type": "static",
         "inputFormat": "io.druid.data.input.parquet.DruidParquetInputFormat",
-        "paths": "wikipedia.gz.parquet"
+        "paths": "example/wikipedia_list.parquet"
       },
       "metadataUpdateSpec": {
         "type": "postgresql",
diff --git a/extensions-contrib/parquet-extensions/example/wikipedia_hadoop_parquet_task.json b/extensions-contrib/parquet-extensions/example/wikipedia_hadoop_parquet_task.json
deleted file mode 100755
index 858636b7ff3..00000000000
--- a/extensions-contrib/parquet-extensions/example/wikipedia_hadoop_parquet_task.json
+++ /dev/null
@@ -1,58 +0,0 @@
-{
-  "type" : "index_hadoop",
-  "spec" : {
-    "ioConfig": {
-      "type": "hadoop",
-      "inputSpec": {
-        "type": "static",
-        "inputFormat": "io.druid.data.input.parquet.DruidParquetInputFormat",
-        "paths": "wikipedia_list.parquet"
-      }
-    },
-    "dataSchema": {
-      "dataSource": "wikipedia",
-      "parser": {
-        "type": "parquet",
-        "parseSpec": {
-          "format": "timeAndDims",
-          "timestampSpec": {
-            "column": "timestamp",
-            "format": "auto"
-          },
-          "dimensionsSpec": {
-            "dimensions": [
-              "page",
-              "language",
-              "user"
-            ],
-            "dimensionExclusions": [],
-            "spatialDimensions": []
-          }
-        }
-      },
-      "metricsSpec": [
-        {
- "type": "count", - "name": "count" - }, - { - "type": "doubleSum", - "name": "added", - "fieldName": "added" - } - ], - "granularitySpec": { - "type": "uniform", - "segmentGranularity": "DAY", - "queryGranularity": "NONE", - "intervals": ["2013-08-31/2013-09-01"] - } - }, - "tuningConfig": { - "type": "hadoop", - "partitionsSpec": { - "targetPartitionSize": 5000000 - } - } - } -} diff --git a/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/ParquetHadoopInputRowParser.java b/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/ParquetHadoopInputRowParser.java index e2663100080..2b5b705ca29 100755 --- a/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/ParquetHadoopInputRowParser.java +++ b/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/ParquetHadoopInputRowParser.java @@ -29,16 +29,22 @@ import io.druid.data.input.impl.DimensionSchema; import io.druid.data.input.impl.InputRowParser; import io.druid.data.input.impl.ParseSpec; import io.druid.data.input.impl.TimestampSpec; +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.joda.time.DateTime; +import javax.annotation.Nullable; import java.util.List; +import java.util.concurrent.TimeUnit; public class ParquetHadoopInputRowParser implements InputRowParser { private final ParseSpec parseSpec; private final boolean binaryAsString; private final List dimensions; + private final TimestampSpec timestampSpec; @JsonCreator public ParquetHadoopInputRowParser( @@ -47,6 +53,7 @@ public class ParquetHadoopInputRowParser implements InputRowParser dimensionSchema = parseSpec.getDimensionsSpec().getDimensions(); @@ -56,15 +63,41 @@ public class ParquetHadoopInputRowParser implements InputRowParser { + + /** + * Select the columns from the parquet schema that are used in the schema of the ingestion job + * + * @param context The context of the file to be read + * + * @return the partial schema that only contains the columns that are being used in the schema + */ private MessageType getPartialReadSchema(InitContext context) { MessageType fullSchema = context.getFileSchema(); @@ -86,7 +94,6 @@ public class DruidParquetReadSupport extends AvroReadSupport MessageType fileSchema, ReadContext readContext ) { - MessageType parquetSchema = readContext.getRequestedSchema(); Schema avroSchema = new AvroSchemaConverter(configuration).convert(parquetSchema); diff --git a/extensions-contrib/parquet-extensions/src/test/java/io/druid/data/input/parquet/DruidParquetInputTest.java b/extensions-contrib/parquet-extensions/src/test/java/io/druid/data/input/parquet/DruidParquetInputTest.java index 55052735ad1..c470b6a0eb1 100644 --- a/extensions-contrib/parquet-extensions/src/test/java/io/druid/data/input/parquet/DruidParquetInputTest.java +++ b/extensions-contrib/parquet-extensions/src/test/java/io/druid/data/input/parquet/DruidParquetInputTest.java @@ -18,7 +18,9 @@ */ package io.druid.data.input.parquet; +import com.google.common.collect.Lists; import io.druid.data.input.InputRow; +import io.druid.data.input.impl.InputRowParser; import io.druid.indexer.HadoopDruidIndexerConfig; import io.druid.indexer.path.StaticPathSpec; import org.apache.avro.generic.GenericRecord; @@ -36,17 +38,21 @@ import org.junit.Test; import java.io.File; import java.io.IOException; +import java.util.List; import static org.junit.Assert.assertEquals; public class 
diff --git a/extensions-contrib/parquet-extensions/src/test/java/io/druid/data/input/parquet/DruidParquetInputTest.java b/extensions-contrib/parquet-extensions/src/test/java/io/druid/data/input/parquet/DruidParquetInputTest.java
index 55052735ad1..c470b6a0eb1 100644
--- a/extensions-contrib/parquet-extensions/src/test/java/io/druid/data/input/parquet/DruidParquetInputTest.java
+++ b/extensions-contrib/parquet-extensions/src/test/java/io/druid/data/input/parquet/DruidParquetInputTest.java
@@ -18,7 +18,9 @@
  */
 package io.druid.data.input.parquet;
 
+import com.google.common.collect.Lists;
 import io.druid.data.input.InputRow;
+import io.druid.data.input.impl.InputRowParser;
 import io.druid.indexer.HadoopDruidIndexerConfig;
 import io.druid.indexer.path.StaticPathSpec;
 import org.apache.avro.generic.GenericRecord;
@@ -36,17 +38,21 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 
 public class DruidParquetInputTest
 {
   @Test
-  public void test() throws IOException, InterruptedException {
-    HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromFile(new File("example/wikipedia_hadoop_parquet_job.json"));
+  public void testReadParquetFile() throws IOException, InterruptedException
+  {
+    HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromFile(new File(
+        "example/wikipedia_hadoop_parquet_job.json")
+    );
     Job job = Job.getInstance(new Configuration());
     config.intoConfiguration(job);
-    GenericRecord data = getFirstRecord(job, "example/wikipedia_list.parquet");
+    GenericRecord data = getFirstRecord(job, ((StaticPathSpec) config.getPathSpec()).getPaths());
 
     // field not read, should return null
     assertEquals(data.get("added"), null);
@@ -57,31 +63,80 @@ public class DruidParquetInputTest
   @Test
   public void testBinaryAsString() throws IOException, InterruptedException
   {
-    HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromFile(new File("example/impala_hadoop_parquet_job.json"));
+    HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromFile(new File(
+        "example/impala_hadoop_parquet_job.json")
+    );
     Job job = Job.getInstance(new Configuration());
     config.intoConfiguration(job);
     GenericRecord data = getFirstRecord(job, ((StaticPathSpec) config.getPathSpec()).getPaths());
 
     InputRow row = config.getParser().parse(data);
+    // without binaryAsString: true, the value would be something like "[104, 101, 121, 32, 116, 104, 105, 115, 32, 105, 115, 3.... ]"
     assertEquals(row.getDimension("field").get(0), "hey this is &é(-è_çà)=^$ù*! Ω^^");
     assertEquals(row.getTimestampFromEpoch(), 1471800234);
   }
 
-  private GenericRecord getFirstRecord(Job job, String parquetPath) throws IOException, InterruptedException {
+  @Test
+  public void testDateHandling() throws IOException, InterruptedException
+  {
+    List<InputRow> rowsWithString = getAllRows("example/date_test_data_job_string.json");
+    List<InputRow> rowsWithDate = getAllRows("example/date_test_data_job_date.json");
+    assertEquals(rowsWithDate.size(), rowsWithString.size());
+
+    for (int i = 0; i < rowsWithDate.size(); i++) {
+      assertEquals(rowsWithString.get(i).getTimestamp(), rowsWithDate.get(i).getTimestamp());
+    }
+  }
+
+  private GenericRecord getFirstRecord(Job job, String parquetPath) throws IOException, InterruptedException
+  {
     File testFile = new File(parquetPath);
     Path path = new Path(testFile.getAbsoluteFile().toURI());
     FileSplit split = new FileSplit(path, 0, testFile.length(), null);
 
-    DruidParquetInputFormat inputFormat = ReflectionUtils.newInstance(DruidParquetInputFormat.class, job.getConfiguration());
+    DruidParquetInputFormat inputFormat = ReflectionUtils.newInstance(
+        DruidParquetInputFormat.class,
+        job.getConfiguration()
+    );
     TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
 
-    RecordReader reader = inputFormat.createRecordReader(split, context);
-    reader.initialize(split, context);
-    reader.nextKeyValue();
-    GenericRecord data = (GenericRecord) reader.getCurrentValue();
-    reader.close();
-    return data;
+    try (RecordReader reader = inputFormat.createRecordReader(split, context)) {
+
+      reader.initialize(split, context);
+      reader.nextKeyValue();
+      return (GenericRecord) reader.getCurrentValue();
+    }
   }
 
+  private List<InputRow> getAllRows(String configPath) throws IOException, InterruptedException
+  {
+    HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromFile(new File(configPath));
+    Job job = Job.getInstance(new Configuration());
+    config.intoConfiguration(job);
+
+    File testFile = new File(((StaticPathSpec) config.getPathSpec()).getPaths());
+    Path path = new Path(testFile.getAbsoluteFile().toURI());
+    FileSplit split = new FileSplit(path, 0, testFile.length(), null);
+
+    DruidParquetInputFormat inputFormat = ReflectionUtils.newInstance(
+        DruidParquetInputFormat.class,
+        job.getConfiguration()
+    );
+    TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
+
+    try (RecordReader reader = inputFormat.createRecordReader(split, context)) {
+      List<InputRow> records = Lists.newArrayList();
+      InputRowParser parser = config.getParser();
+
+      reader.initialize(split, context);
+      while (reader.nextKeyValue()) {
+        GenericRecord data = (GenericRecord) reader.getCurrentValue();
+        records.add(parser.parse(data));
+      }
+
+      return records;
+    }
+  }
 }
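The new `LogicalType`/`LogicalTypes`/`TimeUnit` imports in `ParquetHadoopInputRowParser` back the DATE handling that `testDateHandling` exercises: both example jobs must produce identical timestamps whether the column is stored as a DATE or as a string. A minimal sketch of that conversion under the standard Avro/Parquet convention (a DATE value is a day count since the Unix epoch); the class and method names here are illustrative, not the patch's actual code:

import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

import java.util.concurrent.TimeUnit;

class DateLogicalTypeSketch
{
  // Returns the timestamp for a DATE-typed field value, or null for non-DATE fields.
  static DateTime fromDateColumn(Schema fieldSchema, int daysSinceEpoch)
  {
    LogicalType logicalType = fieldSchema.getLogicalType();
    if (logicalType instanceof LogicalTypes.Date) {
      // The Avro/Parquet DATE logical type stores the number of days since 1970-01-01 (UTC)
      return new DateTime(TimeUnit.DAYS.toMillis(daysSinceEpoch), DateTimeZone.UTC);
    }
    return null;
  }
}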