From d08f02428d6313b4acbe4b1b43b238a74addec0c Mon Sep 17 00:00:00 2001
From: Matthew Burgess
Date: Wed, 10 Feb 2021 17:42:49 -0500
Subject: [PATCH] NIFI-8223: This closes #4819. Use column datatype in PutDatabaseRecord when calling setObject()

Signed-off-by: Joe Witt
---
 .../record/util/DataTypeUtils.java        |  43 +++++++
 .../standard/PutDatabaseRecord.java       |  27 ++++-
 .../standard/TestPutDatabaseRecord.groovy | 113 +++++++++++++++++-
 3 files changed, 178 insertions(+), 5 deletions(-)

diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index 2bfa6cffd2..377a659dd1 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -1934,6 +1934,49 @@ public class DataTypeUtils {
         }
     }
 
+    /**
+     * Converts the specified java.sql.Types constant value (e.g. INTEGER = 4) into its corresponding DataType
+     *
+     * @param sqlType the java.sql.Types constant value to be converted
+     * @return the DataType corresponding to the specified SQL type, or null if there is no applicable mapping
+     */
+    public static DataType getDataTypeFromSQLTypeValue(final int sqlType) {
+        switch (sqlType) {
+            case Types.BIGINT:
+                return RecordFieldType.BIGINT.getDataType();
+            case Types.BOOLEAN:
+                return RecordFieldType.BOOLEAN.getDataType();
+            case Types.TINYINT:
+                return RecordFieldType.BYTE.getDataType();
+            case Types.CHAR:
+                return RecordFieldType.CHAR.getDataType();
+            case Types.DATE:
+                return RecordFieldType.DATE.getDataType();
+            case Types.DOUBLE:
+                return RecordFieldType.DOUBLE.getDataType();
+            case Types.FLOAT:
+                return RecordFieldType.FLOAT.getDataType();
+            case Types.NUMERIC:
+                return RecordFieldType.DECIMAL.getDataType();
+            case Types.INTEGER:
+                return RecordFieldType.INT.getDataType();
+            case Types.SMALLINT:
+                return RecordFieldType.SHORT.getDataType();
+            case Types.VARCHAR:
+                return RecordFieldType.STRING.getDataType();
+            case Types.TIME:
+                return RecordFieldType.TIME.getDataType();
+            case Types.TIMESTAMP:
+                return RecordFieldType.TIMESTAMP.getDataType();
+            case Types.ARRAY:
+                return RecordFieldType.ARRAY.getDataType();
+            case Types.STRUCT:
+                return RecordFieldType.RECORD.getDataType();
+            default:
+                return null;
+        }
+    }
+
     public static boolean isScalarValue(final DataType dataType, final Object value) {
         final RecordFieldType fieldType = dataType.getFieldType();
 
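
A quick illustration of how the new mapping behaves may help: Types.NUMERIC intentionally maps to the DECIMAL record type, and any java.sql.Types constant missing from the switch (BLOB, for example) yields null, so callers must null-check the result. A minimal sketch (the SqlTypeMappingDemo class is hypothetical and not part of the patch; DataTypeUtils and DataType are the real types changed above):

    import java.sql.Types;

    import org.apache.nifi.serialization.record.DataType;
    import org.apache.nifi.serialization.record.util.DataTypeUtils;

    // Hypothetical demo class, not part of the patch
    public class SqlTypeMappingDemo {
        public static void main(String[] args) {
            // NUMERIC columns map to the DECIMAL record type
            final DataType decimal = DataTypeUtils.getDataTypeFromSQLTypeValue(Types.NUMERIC);
            System.out.println(decimal.getFieldType()); // prints DECIMAL

            // SQL types absent from the switch (e.g. BLOB) come back as null,
            // so callers must guard against a null return
            final DataType unmapped = DataTypeUtils.getDataTypeFromSQLTypeValue(Types.BLOB);
            System.out.println(unmapped); // prints null
        }
    }
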
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 5ef883a66c..d007d183a6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -57,6 +57,7 @@ import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -78,6 +79,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.ServiceLoader;
@@ -689,12 +691,29 @@ public class PutDatabaseRecord extends AbstractProcessor {
 
                 final Object[] values = currentRecord.getValues();
                 final List<DataType> dataTypes = currentRecord.getSchema().getDataTypes();
+                List<ColumnDescription> columns = tableSchema.getColumnsAsList();
 
                 for (int i = 0; i < fieldIndexes.size(); i++) {
                     final int currentFieldIndex = fieldIndexes.get(i);
                     Object currentValue = values[currentFieldIndex];
                     final DataType dataType = dataTypes.get(currentFieldIndex);
-                    final int sqlType = DataTypeUtils.getSQLTypeValue(dataType);
+                    final int fieldSqlType = DataTypeUtils.getSQLTypeValue(dataType);
+                    final ColumnDescription column = columns.get(currentFieldIndex);
+                    int sqlType = column.dataType;
+
+                    // Convert (if necessary) from field data type to column data type
+                    if (fieldSqlType != sqlType) {
+                        try {
+                            currentValue = DataTypeUtils.convertType(
+                                    currentValue,
+                                    DataTypeUtils.getDataTypeFromSQLTypeValue(sqlType),
+                                    currentRecord.getSchema().getField(currentFieldIndex).getFieldName());
+                        } catch (IllegalTypeConversionException itce) {
+                            // If the field and column types don't match or the value can't otherwise be converted to the column datatype,
+                            // try with the original object and field datatype
+                            sqlType = DataTypeUtils.getSQLTypeValue(dataType);
+                        }
+                    }
 
                     if (sqlType == Types.DATE && currentValue instanceof Date) {
                         // convert Date from the internal UTC normalized form to local time zone needed by database drivers
@@ -1266,7 +1285,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
 
         private TableSchema(final List<ColumnDescription> columnDescriptions, final boolean translateColumnNames,
                             final Set<String> primaryKeyColumnNames, final String quotedIdentifierString) {
-            this.columns = new HashMap<>();
+            this.columns = new LinkedHashMap<>();
             this.primaryKeyColumnNames = primaryKeyColumnNames;
             this.quotedIdentifierString = quotedIdentifierString;
 
@@ -1283,6 +1302,10 @@ public class PutDatabaseRecord extends AbstractProcessor {
             return columns;
         }
 
+        public List<ColumnDescription> getColumnsAsList() {
+            return new ArrayList<>(columns.values());
+        }
+
         public List<String> getRequiredColumnNames() {
             return requiredColumnNames;
         }
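
The setObject() change above follows a convert-then-fall-back strategy: prefer the column's SQL type, and only when the record value cannot be coerced to it, revert to the field's own type so the driver can attempt the original object. A simplified sketch of that strategy, with a hypothetical coerceToColumn helper standing in for the inline logic above (the null guard is an extra precaution in this sketch; the patch passes the mapped DataType directly to convertType()):

    import org.apache.nifi.serialization.record.DataType;
    import org.apache.nifi.serialization.record.util.DataTypeUtils;
    import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;

    public class ColumnCoercionSketch {
        // Hypothetical helper: returns the value coerced to the column's type when
        // possible, otherwise the original value so the JDBC driver can try it as-is.
        static Object coerceToColumn(final Object value, final DataType fieldDataType,
                                     final int columnSqlType, final String fieldName) {
            if (DataTypeUtils.getSQLTypeValue(fieldDataType) == columnSqlType) {
                return value; // field and column types already agree
            }
            final DataType columnDataType = DataTypeUtils.getDataTypeFromSQLTypeValue(columnSqlType);
            if (columnDataType == null) {
                return value; // no record-type mapping for this SQL type (e.g. BLOB)
            }
            try {
                return DataTypeUtils.convertType(value, columnDataType, fieldName);
            } catch (final IllegalTypeConversionException e) {
                return value; // fall back to the original value and field type
            }
        }
    }
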
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
index d252f5cec8..7e96d0d53a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
@@ -45,6 +45,7 @@ import java.sql.PreparedStatement
 import java.sql.ResultSet
 import java.sql.SQLDataException
 import java.sql.SQLException
+import java.sql.SQLFeatureNotSupportedException
 import java.sql.SQLNonTransientConnectionException
 import java.sql.Statement
 import java.time.LocalDate
@@ -216,21 +217,21 @@
                 generateInsert(schema, 'PERSONS', tableSchema, settings)
                 fail('generateInsert should fail with unmatched fields')
             } catch (SQLDataException e) {
-                assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: code,name,id", e.getMessage())
+                assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: id,name,code", e.getMessage())
             }
 
             try {
                 generateUpdate(schema, 'PERSONS', null, tableSchema, settings)
                 fail('generateUpdate should fail with unmatched fields')
             } catch (SQLDataException e) {
-                assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: code,name,id", e.getMessage())
+                assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: id,name,code", e.getMessage())
             }
 
             try {
                 generateDelete(schema, 'PERSONS', tableSchema, settings)
                 fail('generateDelete should fail with unmatched fields')
             } catch (SQLDataException e) {
-                assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: code,name,id", e.getMessage())
+                assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: id,name,code", e.getMessage())
             }
         }
     }
@@ -1190,4 +1191,110 @@
         }
     }
 
+
+    @Test
+    void testInsertMismatchedCompatibleDataTypes() throws InitializationException, ProcessException, SQLException, IOException {
+        recreateTable(createPersons)
+        final MockRecordParser parser = new MockRecordParser()
+        runner.addControllerService("parser", parser)
+        runner.enableControllerService(parser)
+
+        parser.addSchemaField("id", RecordFieldType.INT)
+        parser.addSchemaField("name", RecordFieldType.STRING)
+        parser.addSchemaField("code", RecordFieldType.INT)
+        parser.addSchemaField("dt", RecordFieldType.BIGINT)
+
+        LocalDate testDate1 = LocalDate.of(2021, 1, 26)
+        BigInteger nifiDate1 = testDate1.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli() // in UTC
+        Date jdbcDate1 = Date.valueOf(testDate1) // in local TZ
+        LocalDate testDate2 = LocalDate.of(2021, 7, 26)
+        BigInteger nifiDate2 = testDate2.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli() // in UTC
+        Date jdbcDate2 = Date.valueOf(testDate2) // in local TZ
+
+        parser.addRecord(1, 'rec1', 101, nifiDate1)
+        parser.addRecord(2, 'rec2', 102, nifiDate2)
+        parser.addRecord(3, 'rec3', 103, null)
+        parser.addRecord(4, 'rec4', 104, null)
+        parser.addRecord(5, null, 105, null)
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+
+        runner.enqueue(new byte[0])
+        runner.run()
+
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
+        final Connection conn = dbcp.getConnection()
+        final Statement stmt = conn.createStatement()
+        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
+        assertTrue(rs.next())
+        assertEquals(1, rs.getInt(1))
+        assertEquals('rec1', rs.getString(2))
+        assertEquals(101, rs.getInt(3))
+        assertEquals(jdbcDate1, rs.getDate(4))
+        assertTrue(rs.next())
+        assertEquals(2, rs.getInt(1))
+        assertEquals('rec2', rs.getString(2))
+        assertEquals(102, rs.getInt(3))
+        assertEquals(jdbcDate2, rs.getDate(4))
+        assertTrue(rs.next())
+        assertEquals(3, rs.getInt(1))
+        assertEquals('rec3', rs.getString(2))
+        assertEquals(103, rs.getInt(3))
+        assertNull(rs.getDate(4))
+        assertTrue(rs.next())
+        assertEquals(4, rs.getInt(1))
+        assertEquals('rec4', rs.getString(2))
+        assertEquals(104, rs.getInt(3))
+        assertNull(rs.getDate(4))
+        assertTrue(rs.next())
+        assertEquals(5, rs.getInt(1))
+        assertNull(rs.getString(2))
+        assertEquals(105, rs.getInt(3))
+        assertNull(rs.getDate(4))
+        assertFalse(rs.next())
+
+        stmt.close()
+        conn.close()
+    }
+
+
+    @Test
+    void testInsertMismatchedNotCompatibleDataTypes() throws InitializationException, ProcessException, SQLException, IOException {
+        recreateTable(createPersons)
+        final MockRecordParser parser = new MockRecordParser()
+        runner.addControllerService("parser", parser)
+        runner.enableControllerService(parser)
+
+        parser.addSchemaField("id", RecordFieldType.STRING)
+        parser.addSchemaField("name", RecordFieldType.STRING)
+        parser.addSchemaField("code", RecordFieldType.INT)
+        parser.addSchemaField("dt", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.FLOAT.getDataType()).getFieldType());
+
+        LocalDate testDate1 = LocalDate.of(2021, 1, 26)
+        BigInteger nifiDate1 = testDate1.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli() // in UTC
+        Date jdbcDate1 = Date.valueOf(testDate1) // in local TZ
+        LocalDate testDate2 = LocalDate.of(2021, 7, 26)
+        BigInteger nifiDate2 = testDate2.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli() // in UTC
+        Date jdbcDate2 = Date.valueOf(testDate2) // in local TZ
+
+        parser.addRecord('1', 'rec1', 101, [1.0,2.0])
+        parser.addRecord('2', 'rec2', 102, [3.0,4.0])
+        parser.addRecord('3', 'rec3', 103, null)
+        parser.addRecord('4', 'rec4', 104, null)
+        parser.addRecord('5', null, 105, null)
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+
+        runner.enqueue(new byte[0])
+        runner.run()
+
+        // A SQLFeatureNotSupportedException is expected from Derby when trying to set the data as an ARRAY
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0)
+        runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1)
+
+    }
 }
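
For context on what testInsertMismatchedCompatibleDataTypes exercises: the record's dt field is a BIGINT holding UTC epoch milliseconds, while the PERSONS table's dt column is a DATE, so the new conversion path turns the number into a date before setObject() is called. A standalone sketch of that compatible conversion using only JDK classes (the EpochMillisToDateDemo class is hypothetical):

    import java.time.LocalDate;
    import java.time.ZoneOffset;

    // Hypothetical demo of the BIGINT-to-DATE case from the test above
    public class EpochMillisToDateDemo {
        public static void main(String[] args) {
            // The test stores 2021-01-26 as UTC epoch milliseconds in a BIGINT field
            final long epochMillis = LocalDate.of(2021, 1, 26)
                    .atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();

            // Once the column is known to be DATE, the value can be handed to the
            // driver as a java.sql.Date rather than a raw number
            final java.sql.Date converted = new java.sql.Date(epochMillis);
            System.out.println(converted); // prints 2021-01-26 when run in UTC
        }
    }
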