NIFI-8442 Add a new test with date, timestamp and time as string & New management of date, time and timestamp

NIFI-8442 Put DateTimeFormatter as static and Add comments to explain why ZoneOffset.UTC is required

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #5014.
This commit is contained in:
Juldrixx 2021-07-19 15:07:09 -05:00 committed by Pierre Villard
parent 0ba9f0dc21
commit 0c96e573a4
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
6 changed files with 336 additions and 4 deletions

View File

@ -126,6 +126,9 @@
<exclude>src/test/resources/mock-gcp-application-default-credentials.json</exclude>
<exclude>src/test/resources/bigquery/streaming-bad-data.json</exclude>
<exclude>src/test/resources/bigquery/streaming-correct-data.json</exclude>
<exclude>src/test/resources/bigquery/schema-correct-data-with-date.avsc</exclude>
<exclude>src/test/resources/bigquery/streaming-correct-data-with-date.json</exclude>
<exclude>src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json</exclude>
</excludes>
</configuration>
</plugin>

View File

@ -18,10 +18,17 @@
package org.apache.nifi.processors.gcp.bigquery;
import java.io.InputStream;
import java.util.ArrayList;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SystemResource;
@ -93,6 +100,9 @@ public class PutBigQueryStreaming extends AbstractBigQueryProcessor {
.defaultValue("false")
.build();
private static final DateTimeFormatter timestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
private static final DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS");
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return ImmutableList.<PropertyDescriptor> builder()
@ -191,6 +201,18 @@ public class PutBigQueryStreaming extends AbstractBigQueryProcessor {
lmapr.add(convertMapRecord(((MapRecord) mapr).toMap()));
}
result.put(key, lmapr);
} else if (obj instanceof Timestamp) {
// ZoneOffset.UTC time zone is necessary due to implicit time zone conversion in Record Readers from
// the local system time zone to the GMT time zone
LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(((Timestamp) obj).getTime()), ZoneOffset.UTC);
result.put(key, dateTime.format(timestampFormatter));
} else if (obj instanceof Time) {
// ZoneOffset.UTC time zone is necessary due to implicit time zone conversion in Record Readers from
// the local system time zone to the GMT time zone
LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(((Time) obj).getTime()), ZoneOffset.UTC);
result.put(key, dateTime.format(timeFormatter) );
} else if (obj instanceof Date) {
result.put(key, obj.toString());
} else {
result.put(key, obj);
}

View File

@ -21,12 +21,18 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Iterator;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.serialization.DateTimeUtils;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
@ -70,8 +76,13 @@ public class PutBigQueryStreamingIT extends AbstractBigQueryIT {
Field company = Field.newBuilder("company", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
Field job = Field.newBuilder("job", LegacySQLTypeName.RECORD, position, company).setMode(Mode.NULLABLE).build();
Field date = Field.newBuilder("date", LegacySQLTypeName.DATE).setMode(Mode.NULLABLE).build();
Field time = Field.newBuilder("time", LegacySQLTypeName.TIME).setMode(Mode.NULLABLE).build();
Field full = Field.newBuilder("full", LegacySQLTypeName.TIMESTAMP).setMode(Mode.NULLABLE).build();
Field birth = Field.newBuilder("birth", LegacySQLTypeName.RECORD, date, time, full).setMode(Mode.NULLABLE).build();
// Table schema definition
schema = Schema.of(id, name, alias, addresses, job);
schema = Schema.of(id, name, alias, addresses, job, birth);
TableDefinition tableDefinition = StandardTableDefinition.of(schema);
TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();
@ -181,4 +192,116 @@ public class PutBigQueryStreamingIT extends AbstractBigQueryIT {
deleteTable(tableName);
}
@Test
public void PutBigQueryStreamingNoErrorWithDate() throws Exception {
    // Method name kept as-is (despite non-camelCase) because the BigQuery table name is derived from it.
    String tableName = Thread.currentThread().getStackTrace()[1].getMethodName();
    createTable(tableName);

    runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
    runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);

    // JSON record reader driven by an explicit Avro schema that declares date / time-millis / timestamp-millis fields.
    final JsonTreeReader jsonReader = new JsonTreeReader();
    runner.addControllerService("reader", jsonReader);
    final String recordSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/bigquery/schema-correct-data-with-date.avsc")));
    runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
    runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, recordSchema);
    runner.enableControllerService(jsonReader);
    runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");

    runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-correct-data-with-date.json"));
    runner.run();

    runner.assertAllFlowFilesTransferred(PutBigQueryStreaming.REL_SUCCESS, 1);
    runner.getFlowFilesForRelationship(PutBigQueryStreaming.REL_SUCCESS).get(0)
            .assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "2");

    TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
    Iterator<FieldValueList> iterator = result.getValues().iterator();
    FieldValueList firstElt = iterator.next();
    FieldValueList sndElt = iterator.next();
    assertTrue(firstElt.get("name").getStringValue().endsWith("Doe"));
    assertTrue(sndElt.get("name").getStringValue().endsWith("Doe"));

    // Row order returned by BigQuery is not guaranteed; identify each record by its name field.
    FieldValueList john = firstElt.get("name").getStringValue().equals("John Doe") ? firstElt : sndElt;
    FieldValueList jane = firstElt.get("name").getStringValue().equals("Jane Doe") ? firstElt : sndElt;

    // JUnit convention is assertEquals(expected, actual) so failure messages read correctly.
    assertEquals("Director", jane.get("job").getRecordValue().get(0).getStringValue());
    assertEquals(2, john.get("alias").getRepeatedValue().size());
    assertTrue(john.get("addresses").getRepeatedValue().get(0).getRecordValue().get(0).getStringValue().endsWith("000"));

    final DateTimeFormatter timestampFormat = DateTimeFormatter.ofPattern("MM-dd-yyyy HH:mm:ss");

    // BigQuery TIMESTAMP values come back in microseconds; divide by 1000 to compare with epoch millis (UTC).
    long timestampRecordJohn = LocalDateTime.parse("07-18-2021 12:35:24", timestampFormat)
            .atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
    assertEquals("2021-07-18", john.get("birth").getRecordValue().get(0).getStringValue());
    assertEquals("12:35:24", john.get("birth").getRecordValue().get(1).getStringValue());
    assertEquals(timestampRecordJohn, john.get("birth").getRecordValue().get(2).getTimestampValue() / 1000);

    long timestampRecordJane = LocalDateTime.parse("01-01-1992 00:00:00", timestampFormat)
            .atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
    assertEquals("1992-01-01", jane.get("birth").getRecordValue().get(0).getStringValue());
    assertEquals("00:00:00", jane.get("birth").getRecordValue().get(1).getStringValue());
    assertEquals(timestampRecordJane, jane.get("birth").getRecordValue().get(2).getTimestampValue() / 1000);

    deleteTable(tableName);
}
@Test
public void PutBigQueryStreamingNoErrorWithDateFormat() throws Exception {
    // Method name kept as-is (despite non-camelCase) because the BigQuery table name is derived from it.
    String tableName = Thread.currentThread().getStackTrace()[1].getMethodName();
    createTable(tableName);

    runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
    runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);

    // JSON record reader with explicit date/time/timestamp parse patterns, so the temporal fields
    // arrive as formatted strings instead of epoch values.
    final JsonTreeReader jsonReader = new JsonTreeReader();
    runner.addControllerService("reader", jsonReader);
    final String recordSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/bigquery/schema-correct-data-with-date.avsc")));
    runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
    runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, recordSchema);
    runner.setProperty(jsonReader, DateTimeUtils.DATE_FORMAT, "MM/dd/yyyy");
    runner.setProperty(jsonReader, DateTimeUtils.TIME_FORMAT, "HH:mm:ss");
    runner.setProperty(jsonReader, DateTimeUtils.TIMESTAMP_FORMAT, "MM-dd-yyyy HH:mm:ss");
    runner.enableControllerService(jsonReader);
    runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");

    runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json"));
    runner.run();

    runner.assertAllFlowFilesTransferred(PutBigQueryStreaming.REL_SUCCESS, 1);
    runner.getFlowFilesForRelationship(PutBigQueryStreaming.REL_SUCCESS).get(0)
            .assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "2");

    TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
    Iterator<FieldValueList> iterator = result.getValues().iterator();
    FieldValueList firstElt = iterator.next();
    FieldValueList sndElt = iterator.next();
    assertTrue(firstElt.get("name").getStringValue().endsWith("Doe"));
    assertTrue(sndElt.get("name").getStringValue().endsWith("Doe"));

    // Row order returned by BigQuery is not guaranteed; identify each record by its name field.
    FieldValueList john = firstElt.get("name").getStringValue().equals("John Doe") ? firstElt : sndElt;
    FieldValueList jane = firstElt.get("name").getStringValue().equals("Jane Doe") ? firstElt : sndElt;

    // JUnit convention is assertEquals(expected, actual) so failure messages read correctly.
    assertEquals("Director", jane.get("job").getRecordValue().get(0).getStringValue());
    assertEquals(2, john.get("alias").getRepeatedValue().size());
    assertTrue(john.get("addresses").getRepeatedValue().get(0).getRecordValue().get(0).getStringValue().endsWith("000"));

    final DateTimeFormatter timestampFormat = DateTimeFormatter.ofPattern("MM-dd-yyyy HH:mm:ss");

    // BigQuery TIMESTAMP values come back in microseconds; divide by 1000 to compare with epoch millis (UTC).
    long timestampRecordJohn = LocalDateTime.parse("07-18-2021 12:35:24", timestampFormat)
            .atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
    assertEquals("2021-07-18", john.get("birth").getRecordValue().get(0).getStringValue());
    assertEquals("12:35:24", john.get("birth").getRecordValue().get(1).getStringValue());
    assertEquals(timestampRecordJohn, john.get("birth").getRecordValue().get(2).getTimestampValue() / 1000);

    long timestampRecordJane = LocalDateTime.parse("01-01-1992 00:00:00", timestampFormat)
            .atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
    assertEquals("1992-01-01", jane.get("birth").getRecordValue().get(0).getStringValue());
    assertEquals("00:00:00", jane.get("birth").getRecordValue().get(1).getStringValue());
    assertEquals(timestampRecordJane, jane.get("birth").getRecordValue().get(2).getTimestampValue() / 1000);

    deleteTable(tableName);
}
}

View File

@ -0,0 +1,92 @@
{
"namespace": "nifi.example",
"name": "streaming_correct_data_with_schema",
"type": "record",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": ["null", "string"]
},
{
"name": "alias",
"type": {
"type": "array",
"items": "string"
}
},
{
"name": "addresses",
"type": {
"type": "array",
"items": {
"namespace": "nifi.example.address",
"name": "address",
"type": "record",
"fields": [
{
"name": "zip",
"type": ["null", "string"]
},
{
"name": "city",
"type": ["null", "string"]
}
]
}
}
},
{
"name": "job",
"type": ["null", {
"namespace": "nifi.example.job",
"name": "job",
"type": "record",
"fields": [
{
"name": "position",
"type": ["null", "string"]
},
{
"name": "company",
"type": ["null", "string"]
}
]
} ]
},
{
"name": "birth",
"type": {
"namespace": "nifi.example.birth",
"name": "birth",
"type": "record",
"fields": [
{
"name": "date",
"type": ["null", {
"type": "int",
"logicalType": "date"
} ]
},
{
"name": "time",
"type": ["null", {
"type": "int",
"logicalType": "time-millis"
} ]
},
{
"name": "full",
"type": ["null", {
"type": "long",
"logicalType": "timestamp-millis"
} ]
}
]
}
}
]
}

View File

@ -0,0 +1,46 @@
[
{
"id": 1,
"name": "John Doe",
"alias": ["john", "jd"],
"addresses": [
{
"zip": "1000",
"city": "NiFi"
},
{
"zip": "2000",
"city": "Bar"
}
],
"job": {
"position": "Manager",
"company": "ASF"
},
"birth": {
"date": "07/18/2021",
"time": "12:35:24",
"full": "07-18-2021 12:35:24"
}
},
{
"id": 2,
"name": "Jane Doe",
"alias": [],
"addresses": [
{
"zip": "1000",
"city": "NiFi"
}
],
"job": {
"position": "Director",
"company": "ASF"
},
"birth": {
"date": "01/01/1992",
"time": "00:00:00",
"full": "01-01-1992 00:00:00"
}
}
]

View File

@ -0,0 +1,46 @@
[
{
"id": 1,
"name": "John Doe",
"alias": ["john", "jd"],
"addresses": [
{
"zip": "1000",
"city": "NiFi"
},
{
"zip": "2000",
"city": "Bar"
}
],
"job": {
"position": "Manager",
"company": "ASF"
},
"birth": {
"date": 1626611724000,
"time": 1626611724000,
"full": 1626611724000
}
},
{
"id": 2,
"name": "Jane Doe",
"alias": [],
"addresses": [
{
"zip": "1000",
"city": "NiFi"
}
],
"job": {
"position": "Director",
"company": "ASF"
},
"birth": {
"date": 694224000000,
"time": 694224000000,
"full": 694224000000
}
}
]