diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java index 5000c78fe1..d6db0b78fa 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java @@ -2131,6 +2131,8 @@ public class DataTypeUtils { return Types.SMALLINT; case STRING: return Types.VARCHAR; + case ENUM: + return Types.OTHER; case TIME: return Types.TIME; case TIMESTAMP: diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java index 3df3c89388..bcb26ceca5 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java @@ -863,7 +863,19 @@ public class PutDatabaseRecord extends AbstractProcessor { } } else { try { - ps.setObject(index, value, sqlType); + // If the specified field type is OTHER and the SQL type is VARCHAR, the conversion went ok as a string literal but try the OTHER type when setting the parameter. If an error occurs, + // try the normal way of using the sqlType + // This helps with PostgreSQL enums and possibly other scenarios + if (fieldSqlType == Types.OTHER && sqlType == Types.VARCHAR) { + try { + ps.setObject(index, value, fieldSqlType); + } catch (SQLException e) { + // Fall back to default setObject params + ps.setObject(index, value, sqlType); + } + } else { + ps.setObject(index, value, sqlType); + } } catch (SQLException e) { throw new IOException("Unable to setObject() with value " + value + " at index " + index + " of type " + sqlType , e); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/DBCPServiceSimpleImpl.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/DBCPServiceSimpleImpl.java index aa1e761097..d1337a7726 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/DBCPServiceSimpleImpl.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/DBCPServiceSimpleImpl.java @@ -20,18 +20,27 @@ import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.dbcp.DBCPService; import org.apache.nifi.processor.exception.ProcessException; +import java.nio.file.Path; +import java.nio.file.Paths; import java.sql.Connection; import java.sql.DriverManager; /** - * Simple implementation only for GenerateTableFetch processor testing. + * Simple implementation only for DB processor testing. */ public class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService { private String databaseLocation; + private boolean isDerby; + // Default to use Derby connection public DBCPServiceSimpleImpl(final String databaseLocation) { + this(databaseLocation, true); + } + + public DBCPServiceSimpleImpl(final String databaseLocation, final boolean isDerby) { this.databaseLocation = databaseLocation; + this.isDerby = isDerby; } @Override @@ -42,8 +51,16 @@ public class DBCPServiceSimpleImpl extends AbstractControllerService implements @Override public Connection getConnection() throws ProcessException { try { - Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); - return DriverManager.getConnection("jdbc:derby:" + databaseLocation + ";create=true"); + if (isDerby) { + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + return DriverManager.getConnection("jdbc:derby:" + databaseLocation + ";create=true"); + } else { + // Use H2 + Path currentPath = Paths.get(""); + String absolutePathPrefix = currentPath.toFile().getAbsolutePath(); + String connectionString = "jdbc:h2:file:" + absolutePathPrefix + "/" + databaseLocation + ";DB_CLOSE_ON_EXIT=TRUE"; + return DriverManager.getConnection(connectionString, "SA", ""); + } } catch (final Exception e) { throw new ProcessException("getConnection failed: " + e); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java index dfef1eacdb..bc88542531 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java @@ -51,7 +51,6 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLDataException; import java.sql.SQLException; -import java.sql.SQLNonTransientConnectionException; import java.sql.Statement; import java.time.LocalDate; import java.time.ZoneOffset; @@ -115,7 +114,7 @@ public class PutDatabaseRecordTest { public static void shutdownDatabase() throws Exception { try { DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";shutdown=true"); - } catch (SQLNonTransientConnectionException ignore) { + } catch (Exception ignore) { // Do nothing, this is what happens at Derby shutdown } // remove previous test database, if any @@ -1759,6 +1758,51 @@ public class PutDatabaseRecordTest { runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1); } + @Test + void testInsertEnum() throws InitializationException, ProcessException, SQLException, IOException { + dbcp = spy(new DBCPServiceSimpleImpl(DB_LOCATION, false)); // Use H2 + runner = TestRunners.newTestRunner(processor); + runner.addControllerService("dbcp", dbcp, new HashMap<>()); + runner.enableControllerService(dbcp); + runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, "dbcp"); + try (Connection conn = dbcp.getConnection()) { + conn.createStatement().executeUpdate("DROP TABLE IF EXISTS ENUM_TEST"); + } + recreateTable("CREATE TABLE IF NOT EXISTS ENUM_TEST (id integer primary key, suit ENUM('clubs', 'diamonds', 'hearts', 'spades'))"); + final MockRecordParser parser = new MockRecordParser(); + runner.addControllerService("parser", parser); + runner.enableControllerService(parser); + + parser.addSchemaField("id", RecordFieldType.INT); + parser.addSchemaField("suit", RecordFieldType.ENUM.getEnumDataType(Arrays.asList("clubs", "diamonds", "hearts", "spades")).getFieldType()); + + parser.addRecord(1, "diamonds"); + parser.addRecord(2, "hearts"); + + + runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser"); + runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE); + runner.setProperty(PutDatabaseRecord.TABLE_NAME, "ENUM_TEST"); + + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1); + final Connection conn = dbcp.getConnection(); + final Statement stmt = conn.createStatement(); + final ResultSet rs = stmt.executeQuery("SELECT * FROM ENUM_TEST"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("diamonds", rs.getString(2)); + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + assertEquals("hearts", rs.getString(2)); + assertFalse(rs.next()); + + stmt.close(); + conn.close(); + } + private void recreateTable() throws ProcessException { try (final Connection conn = dbcp.getConnection(); final Statement stmt = conn.createStatement()) {