NIFI-8223: This closes #4819. Use column datatype in PutDatabaseRecord when calling setObject()

Signed-off-by: Joe Witt <joewitt@apache.org>
Author: Matthew Burgess, 2021-02-10 17:42:49 -05:00 (committed by Joe Witt)
parent 8057f8f6c5
commit d08f02428d
3 changed files with 178 additions and 5 deletions
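
The core of the change: before calling setObject(), PutDatabaseRecord now converts each record value to the data type declared by the target column, and only falls back to the record field's own type if that conversion fails. A minimal standalone sketch of that behavior (hypothetical helper name, not the processor code itself):

import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;

// Hypothetical helper, for illustration only: prefer the column's SQL type,
// keep the original value (and field type) when the conversion is not possible.
class ColumnTypeCoercionSketch {
    static Object coerceToColumnType(final Object value, final DataType fieldDataType,
                                     final int columnSqlType, final String fieldName) {
        final int fieldSqlType = DataTypeUtils.getSQLTypeValue(fieldDataType);
        if (fieldSqlType == columnSqlType) {
            return value; // field and column already agree, nothing to convert
        }
        try {
            // Convert to the type the database column actually declares
            return DataTypeUtils.convertType(value,
                    DataTypeUtils.getDataTypeFromSQLTypeValue(columnSqlType), fieldName);
        } catch (final IllegalTypeConversionException e) {
            // Conversion failed: fall back to the original value; the processor then
            // also falls back to the field's SQL type when calling setObject()
            return value;
        }
    }
}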


@@ -1934,6 +1934,49 @@ public class DataTypeUtils {
}
}
/**
* Converts the specified java.sql.Types constant (e.g. INTEGER = 4) into the corresponding record DataType
*
* @param sqlType the java.sql.Types constant to be converted
* @return the DataType corresponding to the specified SQL type, or null if the SQL type has no direct mapping
*/
public static DataType getDataTypeFromSQLTypeValue(final int sqlType) {
switch (sqlType) {
case Types.BIGINT:
return RecordFieldType.BIGINT.getDataType();
case Types.BOOLEAN:
return RecordFieldType.BOOLEAN.getDataType();
case Types.TINYINT:
return RecordFieldType.BYTE.getDataType();
case Types.CHAR:
return RecordFieldType.CHAR.getDataType();
case Types.DATE:
return RecordFieldType.DATE.getDataType();
case Types.DOUBLE:
return RecordFieldType.DOUBLE.getDataType();
case Types.FLOAT:
return RecordFieldType.FLOAT.getDataType();
case Types.NUMERIC:
return RecordFieldType.DECIMAL.getDataType();
case Types.INTEGER:
return RecordFieldType.INT.getDataType();
case Types.SMALLINT:
return RecordFieldType.SHORT.getDataType();
case Types.VARCHAR:
return RecordFieldType.STRING.getDataType();
case Types.TIME:
return RecordFieldType.TIME.getDataType();
case Types.TIMESTAMP:
return RecordFieldType.TIMESTAMP.getDataType();
case Types.ARRAY:
return RecordFieldType.ARRAY.getDataType();
case Types.STRUCT:
return RecordFieldType.RECORD.getDataType();
default:
return null;
}
}
public static boolean isScalarValue(final DataType dataType, final Object value) {
final RecordFieldType fieldType = dataType.getFieldType();

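For reference (not part of the commit), a small usage sketch of the new DataTypeUtils.getDataTypeFromSQLTypeValue() helper; java.sql.Types constants with no entry in the switch come back as null, so callers should be prepared for that:

import java.sql.Types;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;

class SqlTypeMappingSketch {
    public static void main(final String[] args) {
        // VARCHAR maps to the STRING record type
        final DataType stringType = DataTypeUtils.getDataTypeFromSQLTypeValue(Types.VARCHAR);
        System.out.println(RecordFieldType.STRING.getDataType().equals(stringType)); // true

        // Types without a mapping in the switch (e.g. Types.ROWID) return null
        final DataType unmapped = DataTypeUtils.getDataTypeFromSQLTypeValue(Types.ROWID);
        System.out.println(unmapped == null); // true
    }
}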

@@ -57,6 +57,7 @@ import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
import java.io.IOException;
import java.io.InputStream;
@@ -78,6 +79,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
@@ -689,12 +691,29 @@ public class PutDatabaseRecord extends AbstractProcessor {
final Object[] values = currentRecord.getValues();
final List<DataType> dataTypes = currentRecord.getSchema().getDataTypes();
List<ColumnDescription> columns = tableSchema.getColumnsAsList();
for (int i = 0; i < fieldIndexes.size(); i++) {
final int currentFieldIndex = fieldIndexes.get(i);
Object currentValue = values[currentFieldIndex];
final DataType dataType = dataTypes.get(currentFieldIndex);
-final int sqlType = DataTypeUtils.getSQLTypeValue(dataType);
+final int fieldSqlType = DataTypeUtils.getSQLTypeValue(dataType);
final ColumnDescription column = columns.get(currentFieldIndex);
int sqlType = column.dataType;
// Convert (if necessary) from field data type to column data type
if (fieldSqlType != sqlType) {
try {
currentValue = DataTypeUtils.convertType(
currentValue,
DataTypeUtils.getDataTypeFromSQLTypeValue(sqlType),
currentRecord.getSchema().getField(currentFieldIndex).getFieldName());
} catch (IllegalTypeConversionException itce) {
// If the field and column types don't match or the value can't otherwise be converted to the column datatype,
// try with the original object and field datatype
sqlType = DataTypeUtils.getSQLTypeValue(dataType);
}
}
if (sqlType == Types.DATE && currentValue instanceof Date) {
// convert Date from the internal UTC normalized form to local time zone needed by database drivers
@@ -1266,7 +1285,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
private TableSchema(final List<ColumnDescription> columnDescriptions, final boolean translateColumnNames,
final Set<String> primaryKeyColumnNames, final String quotedIdentifierString) {
-this.columns = new HashMap<>();
+this.columns = new LinkedHashMap<>();
this.primaryKeyColumnNames = primaryKeyColumnNames;
this.quotedIdentifierString = quotedIdentifierString;
@@ -1283,6 +1302,10 @@ public class PutDatabaseRecord extends AbstractProcessor {
return columns;
}
public List<ColumnDescription> getColumnsAsList() {
return new ArrayList<>(columns.values());
}
public List<String> getRequiredColumnNames() {
return requiredColumnNames;
}
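
The HashMap-to-LinkedHashMap change above is what gives the new getColumnsAsList() (and the error messages asserted in the updated tests below) a deterministic column order, preserving the order in which the column descriptions were added to the map. A generic illustration of the difference:

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

class ColumnOrderSketch {
    public static void main(final String[] args) {
        // LinkedHashMap iterates in insertion order, so values() preserves the order
        // in which columns were added (id, name, code in the tests below)
        final Map<String, String> ordered = new LinkedHashMap<>();
        ordered.put("id", "INTEGER");
        ordered.put("name", "VARCHAR");
        ordered.put("code", "INTEGER");
        System.out.println(ordered.keySet()); // [id, name, code]

        // HashMap makes no ordering guarantee; iteration order depends on hashing
        final Map<String, String> unordered = new HashMap<>(ordered);
        System.out.println(unordered.keySet()); // implementation-dependent order
    }
}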


@@ -45,6 +45,7 @@ import java.sql.PreparedStatement
import java.sql.ResultSet
import java.sql.SQLDataException
import java.sql.SQLException
import java.sql.SQLFeatureNotSupportedException
import java.sql.SQLNonTransientConnectionException
import java.sql.Statement
import java.time.LocalDate
@@ -216,21 +217,21 @@ class TestPutDatabaseRecord {
generateInsert(schema, 'PERSONS', tableSchema, settings)
fail('generateInsert should fail with unmatched fields')
} catch (SQLDataException e) {
assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: code,name,id", e.getMessage())
assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: id,name,code", e.getMessage())
}
try {
generateUpdate(schema, 'PERSONS', null, tableSchema, settings)
fail('generateUpdate should fail with unmatched fields')
} catch (SQLDataException e) {
assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: code,name,id", e.getMessage())
assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: id,name,code", e.getMessage())
}
try {
generateDelete(schema, 'PERSONS', tableSchema, settings)
fail('generateDelete should fail with unmatched fields')
} catch (SQLDataException e) {
assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: code,name,id", e.getMessage())
assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: id,name,code", e.getMessage())
}
}
}
@@ -1190,4 +1191,110 @@ class TestPutDatabaseRecord {
}
}
@Test
void testInsertMismatchedCompatibleDataTypes() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable(createPersons)
final MockRecordParser parser = new MockRecordParser()
runner.addControllerService("parser", parser)
runner.enableControllerService(parser)
parser.addSchemaField("id", RecordFieldType.INT)
parser.addSchemaField("name", RecordFieldType.STRING)
parser.addSchemaField("code", RecordFieldType.INT)
parser.addSchemaField("dt", RecordFieldType.BIGINT)
LocalDate testDate1 = LocalDate.of(2021, 1, 26)
BigInteger nifiDate1 = testDate1.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli() // in UTC
Date jdbcDate1 = Date.valueOf(testDate1) // in local TZ
LocalDate testDate2 = LocalDate.of(2021, 7, 26)
BigInteger nifiDate2 = testDate2.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli() // in UTC
Date jdbcDate2 = Date.valueOf(testDate2) // in local TZ
parser.addRecord(1, 'rec1', 101, nifiDate1)
parser.addRecord(2, 'rec2', 102, nifiDate2)
parser.addRecord(3, 'rec3', 103, null)
parser.addRecord(4, 'rec4', 104, null)
parser.addRecord(5, null, 105, null)
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
runner.enqueue(new byte[0])
runner.run()
runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
final Connection conn = dbcp.getConnection()
final Statement stmt = conn.createStatement()
final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
assertTrue(rs.next())
assertEquals(1, rs.getInt(1))
assertEquals('rec1', rs.getString(2))
assertEquals(101, rs.getInt(3))
assertEquals(jdbcDate1, rs.getDate(4))
assertTrue(rs.next())
assertEquals(2, rs.getInt(1))
assertEquals('rec2', rs.getString(2))
assertEquals(102, rs.getInt(3))
assertEquals(jdbcDate2, rs.getDate(4))
assertTrue(rs.next())
assertEquals(3, rs.getInt(1))
assertEquals('rec3', rs.getString(2))
assertEquals(103, rs.getInt(3))
assertNull(rs.getDate(4))
assertTrue(rs.next())
assertEquals(4, rs.getInt(1))
assertEquals('rec4', rs.getString(2))
assertEquals(104, rs.getInt(3))
assertNull(rs.getDate(4))
assertTrue(rs.next())
assertEquals(5, rs.getInt(1))
assertNull(rs.getString(2))
assertEquals(105, rs.getInt(3))
assertNull(rs.getDate(4))
assertFalse(rs.next())
stmt.close()
conn.close()
}
@Test
void testInsertMismatchedNotCompatibleDataTypes() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable(createPersons)
final MockRecordParser parser = new MockRecordParser()
runner.addControllerService("parser", parser)
runner.enableControllerService(parser)
parser.addSchemaField("id", RecordFieldType.STRING)
parser.addSchemaField("name", RecordFieldType.STRING)
parser.addSchemaField("code", RecordFieldType.INT)
parser.addSchemaField("dt", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.FLOAT.getDataType()).getFieldType());
LocalDate testDate1 = LocalDate.of(2021, 1, 26)
BigInteger nifiDate1 = testDate1.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli() // in UTC
Date jdbcDate1 = Date.valueOf(testDate1) // in local TZ
LocalDate testDate2 = LocalDate.of(2021, 7, 26)
BigInteger nifiDate2 = testDate2.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli() // in UTC
Date jdbcDate2 = Date.valueOf(testDate2) // in local TZ
parser.addRecord('1', 'rec1', 101, [1.0,2.0])
parser.addRecord('2', 'rec2', 102, [3.0,4.0])
parser.addRecord('3', 'rec3', 103, null)
parser.addRecord('4', 'rec4', 104, null)
parser.addRecord('5', null, 105, null)
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
runner.enqueue(new byte[0])
runner.run()
// A SQLFeatureNotSupportedException is expected from Derby when the data is set as an ARRAY
runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0)
runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1)
}
}
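
As background for the "// in UTC" and "// in local TZ" comments in the compatible-types test: the record carries the date as epoch milliseconds at UTC midnight, while the JDBC driver receives a java.sql.Date for the same calendar day in the local zone (see the "convert Date from the internal UTC normalized form" comment in the processor above). A standalone java.time sketch of that round trip, independent of NiFi:

import java.sql.Date;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

class DateNormalizationSketch {
    public static void main(final String[] args) {
        final LocalDate testDate = LocalDate.of(2021, 1, 26);

        // What the record field carries: epoch millis for midnight UTC on that date
        final long nifiMillis = testDate.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();

        // What the test compares against: a java.sql.Date encoding the same day in the local zone
        final Date jdbcDate = Date.valueOf(testDate);

        // Re-deriving the calendar date from the UTC-normalized millis recovers the same day
        final LocalDate roundTripped = Instant.ofEpochMilli(nifiMillis).atZone(ZoneOffset.UTC).toLocalDate();
        System.out.println(roundTripped.equals(jdbcDate.toLocalDate())); // true
    }
}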