From 0f2dfe6fe8e21fca39a73b71c7fbba86c4158086 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Wed, 26 Sep 2018 13:18:15 -0700 Subject: [PATCH] fix issue with parsing 'date' columns with druid-orc-extensions (#6380) * fix issue with parsing date columns with druid-orc-extensions * stuff * fix forbidden api --- .../example/hadoop_orc_job.json | 1 + .../input/orc/OrcHadoopInputRowParser.java | 17 +++- .../input/orc/DruidOrcInputFormatTest.java | 81 +++++++++++++++++++ 3 files changed, 97 insertions(+), 2 deletions(-) diff --git a/extensions-contrib/orc-extensions/example/hadoop_orc_job.json b/extensions-contrib/orc-extensions/example/hadoop_orc_job.json index 870f6077fc6..c85ebb6721a 100755 --- a/extensions-contrib/orc-extensions/example/hadoop_orc_job.json +++ b/extensions-contrib/orc-extensions/example/hadoop_orc_job.json @@ -29,6 +29,7 @@ }, "dimensionsSpec": { "dimensions": [ + "timestamp", "col1", "col2" ], diff --git a/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java b/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java index 67a69faef9d..c90834af612 100644 --- a/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java +++ b/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java @@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.io.orc.OrcSerde; import org.apache.hadoop.hive.ql.io.orc.OrcStruct; import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; @@ -81,7 +82,10 @@ public class OrcHadoopInputRowParser implements InputRowParser { this.parseSpec = parseSpec; this.typeString = typeString == null ? typeStringFromParseSpec(parseSpec) : typeString; - this.mapFieldNameFormat = mapFieldNameFormat == null || mapFieldNameFormat.indexOf(MAP_PARENT_TAG) < 0 || mapFieldNameFormat.indexOf(MAP_CHILD_TAG) < 0 ? DEFAULT_MAP_FIELD_NAME_FORMAT : mapFieldNameFormat; + this.mapFieldNameFormat = + mapFieldNameFormat == null || + mapFieldNameFormat.indexOf(MAP_PARENT_TAG) < 0 || + mapFieldNameFormat.indexOf(MAP_CHILD_TAG) < 0 ? 
DEFAULT_MAP_FIELD_NAME_FORMAT : mapFieldNameFormat; this.mapParentFieldNameFormat = this.mapFieldNameFormat.replace(MAP_PARENT_TAG, "%s"); this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames(); this.oip = makeObjectInspector(this.typeString); @@ -226,9 +230,16 @@ public class OrcHadoopInputRowParser implements InputRowParser { StringBuilder builder = new StringBuilder("struct<"); builder.append(parseSpec.getTimestampSpec().getTimestampColumn()).append(":string"); + // the typeString seems positionally dependent, so repeated timestamp column causes incorrect mapping if (parseSpec.getDimensionsSpec().getDimensionNames().size() > 0) { builder.append(","); - builder.append(String.join(":string,", parseSpec.getDimensionsSpec().getDimensionNames())); + builder.append(String.join( + ":string,", + parseSpec.getDimensionsSpec() + .getDimensionNames() + .stream() + .filter(s -> !s.equals(parseSpec.getTimestampSpec().getTimestampColumn())) + .collect(Collectors.toList()))); builder.append(":string"); } builder.append(">"); @@ -241,6 +252,8 @@ public class OrcHadoopInputRowParser implements InputRowParser if (object instanceof HiveDecimalWritable) { // inspector on HiveDecimal rounds off to integer for some reason. return ((HiveDecimalWritable) object).getHiveDecimal().doubleValue(); + } else if (object instanceof DateWritable) { + return object.toString(); } else { return inspector.getPrimitiveJavaObject(object); } diff --git a/extensions-contrib/orc-extensions/src/test/java/org/apache/druid/data/input/orc/DruidOrcInputFormatTest.java b/extensions-contrib/orc-extensions/src/test/java/org/apache/druid/data/input/orc/DruidOrcInputFormatTest.java index 0b22eb754e3..615efdbbd44 100644 --- a/extensions-contrib/orc-extensions/src/test/java/org/apache/druid/data/input/orc/DruidOrcInputFormatTest.java +++ b/extensions-contrib/orc-extensions/src/test/java/org/apache/druid/data/input/orc/DruidOrcInputFormatTest.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcStruct; @@ -43,6 +44,7 @@ import org.apache.orc.CompressionKind; import org.apache.orc.OrcFile; import org.apache.orc.TypeDescription; import org.apache.orc.Writer; +import org.joda.time.DateTime; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -52,6 +54,7 @@ import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; import java.util.Arrays; +import java.util.concurrent.TimeUnit; public class DruidOrcInputFormatTest { @@ -110,6 +113,36 @@ public class DruidOrcInputFormatTest reader.close(); } + @Test + public void testReadDateColumn() throws IOException, InterruptedException + { + File testFile2 = makeOrcFileWithDate(); + Path path = new Path(testFile2.getAbsoluteFile().toURI()); + FileSplit split = new FileSplit(path, 0, testFile2.length(), null); + + InputFormat inputFormat = ReflectionUtils.newInstance(OrcNewInputFormat.class, job.getConfiguration()); + + TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID()); + RecordReader reader = inputFormat.createRecordReader(split, context); + InputRowParser parser = 
(InputRowParser) config.getParser(); + + reader.initialize(split, context); + + reader.nextKeyValue(); + + OrcStruct data = (OrcStruct) reader.getCurrentValue(); + + MapBasedInputRow row = (MapBasedInputRow) parser.parseBatch(data).get(0); + + Assert.assertTrue(row.getEvent().keySet().size() == 4); + Assert.assertEquals(DateTimes.of(timestamp), row.getTimestamp()); + Assert.assertEquals(parser.getParseSpec().getDimensionsSpec().getDimensionNames(), row.getDimensions()); + Assert.assertEquals(col1, row.getEvent().get("col1")); + Assert.assertEquals(Arrays.asList(col2), row.getDimension("col2")); + + reader.close(); + } + private File makeOrcFile() throws IOException { final File dir = temporaryFolder.newFolder(); @@ -157,4 +190,52 @@ public class DruidOrcInputFormatTest return testOrc; } + + private File makeOrcFileWithDate() throws IOException + { + final File dir = temporaryFolder.newFolder(); + final File testOrc = new File(dir, "test-2.orc"); + TypeDescription schema = TypeDescription.createStruct() + .addField("timestamp", TypeDescription.createDate()) + .addField("col1", TypeDescription.createString()) + .addField("col2", TypeDescription.createList(TypeDescription.createString())) + .addField("val1", TypeDescription.createFloat()); + Configuration conf = new Configuration(); + Writer writer = OrcFile.createWriter( + new Path(testOrc.getPath()), + OrcFile.writerOptions(conf) + .setSchema(schema) + .stripeSize(100000) + .bufferSize(10000) + .compress(CompressionKind.ZLIB) + .version(OrcFile.Version.CURRENT) + ); + VectorizedRowBatch batch = schema.createRowBatch(); + batch.size = 1; + DateTime ts = DateTimes.of(timestamp); + + // date is stored as long column vector with number of days since epoch + ((LongColumnVector) batch.cols[0]).vector[0] = + TimeUnit.MILLISECONDS.toDays(ts.minus(DateTimes.EPOCH.getMillis()).getMillis()); + + ((BytesColumnVector) batch.cols[1]).setRef(0, StringUtils.toUtf8(col1), 0, col1.length()); + + ListColumnVector listColumnVector = (ListColumnVector) batch.cols[2]; + listColumnVector.childCount = col2.length; + listColumnVector.lengths[0] = 3; + for (int idx = 0; idx < col2.length; idx++) { + ((BytesColumnVector) listColumnVector.child).setRef( + idx, + StringUtils.toUtf8(col2[idx]), + 0, + col2[idx].length() + ); + } + + ((DoubleColumnVector) batch.cols[3]).vector[0] = val1; + writer.addRowBatch(batch); + writer.close(); + + return testOrc; + } }
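
Note on the typeStringFromParseSpec() change above: the generated struct type string maps to ORC columns by position, so declaring the timestamp column a second time (as happens when it is also listed as a dimension, like the new "timestamp" entry in the example spec) would shift every later column onto the wrong type. The sketch below shows the type string the patched method would build for such a spec. It is illustrative only: the class and variable names are not part of the patch, and it assumes the spec's timestamp column is named "timestamp", as the example job file suggests.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class TypeStringSketch
{
  public static void main(String[] args)
  {
    // assumed example spec: timestamp column "timestamp", dimensions ["timestamp", "col1", "col2"]
    String timestampColumn = "timestamp";
    List<String> dimensions = Arrays.asList("timestamp", "col1", "col2");

    StringBuilder builder = new StringBuilder("struct<");
    builder.append(timestampColumn).append(":string");

    // drop the timestamp column from the dimension list so it is not declared twice;
    // a duplicate entry would misalign the positional column-to-type mapping
    List<String> rest = dimensions.stream()
                                  .filter(s -> !s.equals(timestampColumn))
                                  .collect(Collectors.toList());
    if (!rest.isEmpty()) {
      builder.append(",").append(String.join(":string,", rest)).append(":string");
    }
    builder.append(">");

    System.out.println(builder); // struct<timestamp:string,col1:string,col2:string>
  }
}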
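
Note on the DateWritable branch and the new test: ORC stores DATE columns as days since the Unix epoch (hence the LongColumnVector written in makeOrcFileWithDate()), and they surface to the parser as DateWritable, whose string form is "yyyy-MM-dd", which Druid's timestamp parsing can consume. A minimal sketch of that round trip is below; the class name and the example instant are illustrative and not taken from the test fixture.

import org.apache.hadoop.hive.serde2.io.DateWritable;
import java.util.concurrent.TimeUnit;

public class DateWritableSketch
{
  public static void main(String[] args)
  {
    // example instant: 2016-01-01T00:00:00.000Z expressed as epoch millis,
    // converted to days since epoch the way the new test populates the column vector
    long millis = 1451606400000L;
    int daysSinceEpoch = (int) TimeUnit.MILLISECONDS.toDays(millis);

    DateWritable date = new DateWritable(daysSinceEpoch);

    // the parser change relies on DateWritable's string form being a plain calendar date
    System.out.println(date); // 2016-01-01
  }
}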