diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java index 377a659dd1..61e50ff1fe 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java @@ -75,6 +75,7 @@ import java.util.function.Predicate; import java.util.function.Supplier; import java.util.regex.Pattern; + public class DataTypeUtils { private static final Logger logger = LoggerFactory.getLogger(DataTypeUtils.class); @@ -1963,6 +1964,12 @@ public class DataTypeUtils { case Types.SMALLINT: return RecordFieldType.SHORT.getDataType(); case Types.VARCHAR: + case Types.LONGNVARCHAR: + case Types.LONGVARCHAR: + case Types.NCHAR: + case Types.NVARCHAR: + case Types.OTHER: + case Types.SQLXML: return RecordFieldType.STRING.getDataType(); case Types.TIME: return RecordFieldType.TIME.getDataType(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java index d007d183a6..65f0a7e386 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java @@ -704,10 +704,13 @@ public class PutDatabaseRecord extends AbstractProcessor { // Convert (if necessary) from field data type to column data type if (fieldSqlType != sqlType) { try { - currentValue = DataTypeUtils.convertType( - currentValue, - DataTypeUtils.getDataTypeFromSQLTypeValue(sqlType), - currentRecord.getSchema().getField(currentFieldIndex).getFieldName()); + DataType targetDataType = DataTypeUtils.getDataTypeFromSQLTypeValue(sqlType); + if (targetDataType != null) { + currentValue = DataTypeUtils.convertType( + currentValue, + targetDataType, + currentRecord.getSchema().getField(currentFieldIndex).getFieldName()); + } } catch (IllegalTypeConversionException itce) { // If the field and column types don't match or the value can't otherwise be converted to the column datatype, // try with the original object and field datatype diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy index 7e96d0d53a..67e3e67a6d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy @@ -45,7 +45,6 @@ import java.sql.PreparedStatement import java.sql.ResultSet import java.sql.SQLDataException import java.sql.SQLException -import java.sql.SQLFeatureNotSupportedException import java.sql.SQLNonTransientConnectionException import java.sql.Statement import java.time.LocalDate @@ -58,7 +57,6 @@ import static org.junit.Assert.assertNotNull import static org.junit.Assert.assertNull import static org.junit.Assert.assertTrue import static org.junit.Assert.fail - import static org.mockito.ArgumentMatchers.anyMap import static org.mockito.Mockito.doAnswer import static org.mockito.Mockito.spy @@ -1295,6 +1293,48 @@ class TestPutDatabaseRecord { // A SQLFeatureNotSupportedException exception is expected from Derby when you try to put the data as an ARRAY runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0) runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1) + } + @Test + void testLongVarchar() throws InitializationException, ProcessException, SQLException, IOException { + // Manually create and drop the tables and schemas + def conn = dbcp.connection + def stmt = conn.createStatement() + try { + stmt.execute('DROP TABLE TEMP') + } catch(ex) { + // Do nothing, table may not exist + } + stmt.execute('CREATE TABLE TEMP (id integer primary key, name long varchar)') + + final MockRecordParser parser = new MockRecordParser() + runner.addControllerService("parser", parser) + runner.enableControllerService(parser) + + parser.addSchemaField("id", RecordFieldType.INT) + parser.addSchemaField("name", RecordFieldType.STRING) + + parser.addRecord(1, 'rec1') + parser.addRecord(2, 'rec2') + + runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser') + runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE) + runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'TEMP') + + runner.enqueue(new byte[0]) + runner.run() + + runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1) + ResultSet rs = stmt.executeQuery('SELECT * FROM TEMP') + assertTrue(rs.next()) + assertEquals(1, rs.getInt(1)) + assertEquals('rec1', rs.getString(2)) + assertTrue(rs.next()) + assertEquals(2, rs.getInt(1)) + assertEquals('rec2', rs.getString(2)) + assertFalse(rs.next()) + + stmt.close() + conn.close() } }