NIFI-7996 Conversion with ConvertRecord to avro results in invalid date

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4666.
This commit is contained in:
Denes Arvay 2020-11-11 12:32:20 +01:00 committed by Pierre Villard
parent e66362194d
commit 83948db989
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
2 changed files with 42 additions and 3 deletions

View File

@ -61,7 +61,7 @@ import java.sql.Blob;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.LocalDate;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.AbstractMap;
import java.util.ArrayList;
@ -666,8 +666,7 @@ public class AvroTypeUtil {
if (LOGICAL_TYPE_DATE.equals(logicalType.getName())) {
final String format = AvroTypeUtil.determineDataType(fieldSchema).getFormat();
final java.sql.Date date = DataTypeUtils.toDate(rawValue, () -> DataTypeUtils.getDateFormat(format), fieldName);
final long days = ChronoUnit.DAYS.between(LocalDate.ofEpochDay(0), date.toLocalDate());
return (int) days;
return (int) ChronoUnit.DAYS.between(Instant.EPOCH, Instant.ofEpochMilli(date.getTime()));
} else if (LOGICAL_TYPE_TIME_MILLIS.equals(logicalType.getName())) {
final String format = AvroTypeUtil.determineDataType(fieldSchema).getFormat();
final Time time = DataTypeUtils.toTime(rawValue, () -> DataTypeUtils.getDateFormat(format), fieldName);

View File

@ -18,6 +18,7 @@
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@ -29,8 +30,11 @@ import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.SystemUtils;
import org.apache.nifi.avro.AvroRecordSetWriter;
import org.apache.nifi.avro.NonCachingDatumReader;
import org.apache.nifi.csv.CSVReader;
import org.apache.nifi.csv.CSVRecordSetWriter;
import org.apache.nifi.csv.CSVUtils;
@ -38,6 +42,7 @@ import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.serialization.DateTimeUtils;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.RecordFieldType;
@ -368,4 +373,39 @@ public class TestConvertRecord {
runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1);
}
@Test
public void testDateConversionWithUTCMinusTimezone() throws Exception {
final String timezone = System.getProperty("user.timezone");
System.setProperty("user.timezone", "EST");
try {
TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class);
JsonTreeReader jsonTreeReader = new JsonTreeReader();
runner.addControllerService("json-reader", jsonTreeReader);
runner.setProperty(jsonTreeReader, DateTimeUtils.DATE_FORMAT, "yyyy-MM-dd");
runner.enableControllerService(jsonTreeReader);
AvroRecordSetWriter avroWriter = new AvroRecordSetWriter();
runner.addControllerService("avro-writer", avroWriter);
runner.enableControllerService(avroWriter);
runner.setProperty(ConvertRecord.RECORD_READER, "json-reader");
runner.setProperty(ConvertRecord.RECORD_WRITER, "avro-writer");
runner.enqueue("{ \"date\": \"1970-01-02\" }");
runner.run();
runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0);
DataFileStream<GenericRecord> avroStream = new DataFileStream<>(flowFile.getContentStream(), new NonCachingDatumReader<>());
assertTrue(avroStream.hasNext());
assertEquals(1, avroStream.next().get("date")); // see https://avro.apache.org/docs/1.10.0/spec.html#Date
} finally {
System.setProperty("user.timezone", timezone);
}
}
}