fix issue with parsing 'date' columns with druid-orc-extensions (#6380)

* fix issue with parsing date columns with druid-orc-extensions

* stuff

* fix forbidden api
Clint Wylie 2018-09-26 13:18:15 -07:00 committed by Jonathan Wei
parent 6a909c85d0
commit 0f2dfe6fe8
3 changed files with 97 additions and 2 deletions

(example ingestion spec JSON)

@@ -29,6 +29,7 @@
       },
       "dimensionsSpec": {
         "dimensions": [
+          "timestamp",
           "col1",
           "col2"
         ],

OrcHadoopInputRowParser.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
 import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
 import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
@@ -81,7 +82,10 @@ public class OrcHadoopInputRowParser implements InputRowParser<OrcStruct>
   {
     this.parseSpec = parseSpec;
     this.typeString = typeString == null ? typeStringFromParseSpec(parseSpec) : typeString;
-    this.mapFieldNameFormat = mapFieldNameFormat == null || mapFieldNameFormat.indexOf(MAP_PARENT_TAG) < 0 || mapFieldNameFormat.indexOf(MAP_CHILD_TAG) < 0 ? DEFAULT_MAP_FIELD_NAME_FORMAT : mapFieldNameFormat;
+    this.mapFieldNameFormat =
+        mapFieldNameFormat == null ||
+        mapFieldNameFormat.indexOf(MAP_PARENT_TAG) < 0 ||
+        mapFieldNameFormat.indexOf(MAP_CHILD_TAG) < 0 ? DEFAULT_MAP_FIELD_NAME_FORMAT : mapFieldNameFormat;
     this.mapParentFieldNameFormat = this.mapFieldNameFormat.replace(MAP_PARENT_TAG, "%s");
     this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames();
     this.oip = makeObjectInspector(this.typeString);
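
Note: the fallback above means a mapFieldNameFormat missing either tag silently reverts to the default. A minimal standalone sketch of that check, assuming the tag constants are "<PARENT>" and "<CHILD>" and the default is "<PARENT>_<CHILD>" (values taken from the extension's documentation, not shown in this diff):

    // Sketch of the fallback logic above; the tag values are assumptions, not part of this diff.
    public class MapFieldNameFormatSketch
    {
      private static final String MAP_PARENT_TAG = "<PARENT>";
      private static final String MAP_CHILD_TAG = "<CHILD>";
      private static final String DEFAULT_MAP_FIELD_NAME_FORMAT = MAP_PARENT_TAG + "_" + MAP_CHILD_TAG;

      static String resolve(String mapFieldNameFormat)
      {
        // any format that is null or missing either tag falls back to the default
        return mapFieldNameFormat == null
               || !mapFieldNameFormat.contains(MAP_PARENT_TAG)
               || !mapFieldNameFormat.contains(MAP_CHILD_TAG)
               ? DEFAULT_MAP_FIELD_NAME_FORMAT : mapFieldNameFormat;
      }

      public static void main(String[] args)
      {
        System.out.println(resolve(null));                // <PARENT>_<CHILD>
        System.out.println(resolve("<PARENT>-<CHILD>"));  // <PARENT>-<CHILD> (kept, both tags present)
        System.out.println(resolve("<PARENT> only"));     // <PARENT>_<CHILD> (child tag missing)
      }
    }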
@@ -226,9 +230,16 @@ public class OrcHadoopInputRowParser implements InputRowParser<OrcStruct>
   {
     StringBuilder builder = new StringBuilder("struct<");
     builder.append(parseSpec.getTimestampSpec().getTimestampColumn()).append(":string");
+    // the typeString seems positionally dependent, so repeated timestamp column causes incorrect mapping
     if (parseSpec.getDimensionsSpec().getDimensionNames().size() > 0) {
       builder.append(",");
-      builder.append(String.join(":string,", parseSpec.getDimensionsSpec().getDimensionNames()));
+      builder.append(String.join(
+          ":string,",
+          parseSpec.getDimensionsSpec()
+                   .getDimensionNames()
+                   .stream()
+                   .filter(s -> !s.equals(parseSpec.getTimestampSpec().getTimestampColumn()))
+                   .collect(Collectors.toList())));
       builder.append(":string");
     }
     builder.append(">");
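">

Note: the new filter keeps a repeated timestamp column out of the generated type string, since a duplicate field shifts the position of every field after it. A self-contained sketch of the resulting behavior, with plain strings standing in for the ParseSpec accessors (the emptiness check is moved to the filtered list here, a slight tightening for illustration):

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    // Standalone sketch of typeStringFromParseSpec after this change.
    public class TypeStringSketch
    {
      static String typeString(String timestampColumn, List<String> dimensions)
      {
        StringBuilder builder = new StringBuilder("struct<");
        builder.append(timestampColumn).append(":string");
        // keep the timestamp column out of the dimension list so it is not emitted twice
        List<String> dims = dimensions.stream()
                                      .filter(s -> !s.equals(timestampColumn))
                                      .collect(Collectors.toList());
        if (!dims.isEmpty()) {
          builder.append(",");
          builder.append(String.join(":string,", dims));
          builder.append(":string");
        }
        return builder.append(">").toString();
      }

      public static void main(String[] args)
      {
        // prints struct<timestamp:string,col1:string,col2:string>
        System.out.println(typeString("timestamp", Arrays.asList("timestamp", "col1", "col2")));
      }
    }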
@@ -241,6 +252,8 @@ public class OrcHadoopInputRowParser implements InputRowParser<OrcStruct>
     if (object instanceof HiveDecimalWritable) {
       // inspector on HiveDecimal rounds off to integer for some reason.
       return ((HiveDecimalWritable) object).getHiveDecimal().doubleValue();
+    } else if (object instanceof DateWritable) {
+      return object.toString();
     } else {
       return inspector.getPrimitiveJavaObject(object);
     }
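
Note: the new branch relies on DateWritable, which wraps a days-since-epoch value and renders as yyyy-MM-dd from toString(), a form Druid's timestamp parsing can handle. A minimal sketch, assuming hive-serde2 is on the classpath:

    import org.apache.hadoop.hive.serde2.io.DateWritable;

    // Why returning object.toString() works for date columns: DateWritable
    // stores days since the epoch and prints the calendar date.
    public class DateWritableSketch
    {
      public static void main(String[] args)
      {
        System.out.println(new DateWritable(0).toString());      // 1970-01-01
        System.out.println(new DateWritable(16801).toString());  // 2016-01-01
      }
    }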

DruidOrcInputFormatTest.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
@@ -43,6 +44,7 @@ import org.apache.orc.CompressionKind;
 import org.apache.orc.OrcFile;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
+import org.joda.time.DateTime;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -52,6 +54,7 @@ import org.junit.rules.TemporaryFolder;
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.concurrent.TimeUnit;

 public class DruidOrcInputFormatTest
 {
@@ -110,6 +113,36 @@ public class DruidOrcInputFormatTest
     reader.close();
   }

+  @Test
+  public void testReadDateColumn() throws IOException, InterruptedException
+  {
+    File testFile2 = makeOrcFileWithDate();
+    Path path = new Path(testFile2.getAbsoluteFile().toURI());
+    FileSplit split = new FileSplit(path, 0, testFile2.length(), null);
+
+    InputFormat inputFormat = ReflectionUtils.newInstance(OrcNewInputFormat.class, job.getConfiguration());
+    TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
+
+    RecordReader reader = inputFormat.createRecordReader(split, context);
+    InputRowParser<OrcStruct> parser = (InputRowParser<OrcStruct>) config.getParser();
+
+    reader.initialize(split, context);
+    reader.nextKeyValue();
+
+    OrcStruct data = (OrcStruct) reader.getCurrentValue();
+
+    MapBasedInputRow row = (MapBasedInputRow) parser.parseBatch(data).get(0);
+
+    Assert.assertTrue(row.getEvent().keySet().size() == 4);
+    Assert.assertEquals(DateTimes.of(timestamp), row.getTimestamp());
+    Assert.assertEquals(parser.getParseSpec().getDimensionsSpec().getDimensionNames(), row.getDimensions());
+    Assert.assertEquals(col1, row.getEvent().get("col1"));
+    Assert.assertEquals(Arrays.asList(col2), row.getDimension("col2"));
+
+    reader.close();
+  }
+
   private File makeOrcFile() throws IOException
   {
     final File dir = temporaryFolder.newFolder();
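
Note: the fixtures timestamp, col1, col2, and val1 referenced by the assertions above are defined elsewhere in this test class and are not part of the diff. A hypothetical sketch of the event shape the four assertions imply (all values invented for illustration):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical event implied by the assertions in testReadDateColumn;
    // the real fixture values live elsewhere in DruidOrcInputFormatTest.
    public class ExpectedEventSketch
    {
      public static void main(String[] args)
      {
        Map<String, Object> event = new HashMap<>();
        event.put("timestamp", "2016-01-01");                     // date column read back via DateWritable.toString()
        event.put("col1", "bar");                                 // hypothetical string fixture
        event.put("col2", Arrays.asList("dat1", "dat2", "dat3")); // hypothetical list fixture
        event.put("val1", 1.1f);                                  // hypothetical float fixture
        System.out.println(event.keySet().size() == 4);           // mirrors the first assertion: true
      }
    }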
@@ -157,4 +190,52 @@ public class DruidOrcInputFormatTest
     return testOrc;
   }

+  private File makeOrcFileWithDate() throws IOException
+  {
+    final File dir = temporaryFolder.newFolder();
+    final File testOrc = new File(dir, "test-2.orc");
+    TypeDescription schema = TypeDescription.createStruct()
+                                            .addField("timestamp", TypeDescription.createDate())
+                                            .addField("col1", TypeDescription.createString())
+                                            .addField("col2", TypeDescription.createList(TypeDescription.createString()))
+                                            .addField("val1", TypeDescription.createFloat());
+    Configuration conf = new Configuration();
+    Writer writer = OrcFile.createWriter(
+        new Path(testOrc.getPath()),
+        OrcFile.writerOptions(conf)
+               .setSchema(schema)
+               .stripeSize(100000)
+               .bufferSize(10000)
+               .compress(CompressionKind.ZLIB)
+               .version(OrcFile.Version.CURRENT)
+    );
+    VectorizedRowBatch batch = schema.createRowBatch();
+    batch.size = 1;
+    DateTime ts = DateTimes.of(timestamp);
+
+    // date is stored as long column vector with number of days since epoch
+    ((LongColumnVector) batch.cols[0]).vector[0] =
+        TimeUnit.MILLISECONDS.toDays(ts.minus(DateTimes.EPOCH.getMillis()).getMillis());
+
+    ((BytesColumnVector) batch.cols[1]).setRef(0, StringUtils.toUtf8(col1), 0, col1.length());
+
+    ListColumnVector listColumnVector = (ListColumnVector) batch.cols[2];
+    listColumnVector.childCount = col2.length;
+    listColumnVector.lengths[0] = 3;
+    for (int idx = 0; idx < col2.length; idx++) {
+      ((BytesColumnVector) listColumnVector.child).setRef(
+          idx,
+          StringUtils.toUtf8(col2[idx]),
+          0,
+          col2[idx].length()
+      );
+    }
+
+    ((DoubleColumnVector) batch.cols[3]).vector[0] = val1;
+
+    writer.addRowBatch(batch);
+    writer.close();
+
+    return testOrc;
+  }
 }
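
Note: as a sanity check on the days-since-epoch comment in makeOrcFileWithDate(), the conversion for a hypothetical fixture timestamp of 2016-01-01T00:00:00Z:

    import java.util.concurrent.TimeUnit;

    // Sanity check for the days-since-epoch conversion written to the
    // LongColumnVector; the 2016-01-01 fixture value is hypothetical.
    public class DaysSinceEpochSketch
    {
      public static void main(String[] args)
      {
        long epochMillis = 0L;          // 1970-01-01T00:00:00Z
        long tsMillis = 1451606400000L; // 2016-01-01T00:00:00Z
        long days = TimeUnit.MILLISECONDS.toDays(tsMillis - epochMillis);
        System.out.println(days);       // 16801, the value DateWritable(16801) renders as 2016-01-01
      }
    }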