NIFI-10635: Fix handling of enums in PutDatabaseRecord

Signed-off-by: Nathan Gough <thenatog@gmail.com>

This closes #6518.
This commit is contained in:
Matthew Burgess 2022-10-12 14:17:50 -04:00 committed by Nathan Gough
parent 298dd2024e
commit a76abef270
4 changed files with 81 additions and 6 deletions

View File

@ -2131,6 +2131,8 @@ public class DataTypeUtils {
return Types.SMALLINT; return Types.SMALLINT;
case STRING: case STRING:
return Types.VARCHAR; return Types.VARCHAR;
case ENUM:
return Types.OTHER;
case TIME: case TIME:
return Types.TIME; return Types.TIME;
case TIMESTAMP: case TIMESTAMP:

View File

@ -863,7 +863,19 @@ public class PutDatabaseRecord extends AbstractProcessor {
} }
} else { } else {
try { try {
ps.setObject(index, value, sqlType); // If the specified field type is OTHER and the SQL type is VARCHAR, the conversion went ok as a string literal but try the OTHER type when setting the parameter. If an error occurs,
// try the normal way of using the sqlType
// This helps with PostgreSQL enums and possibly other scenarios
if (fieldSqlType == Types.OTHER && sqlType == Types.VARCHAR) {
try {
ps.setObject(index, value, fieldSqlType);
} catch (SQLException e) {
// Fall back to default setObject params
ps.setObject(index, value, sqlType);
}
} else {
ps.setObject(index, value, sqlType);
}
} catch (SQLException e) { } catch (SQLException e) {
throw new IOException("Unable to setObject() with value " + value + " at index " + index + " of type " + sqlType , e); throw new IOException("Unable to setObject() with value " + value + " at index " + index + " of type " + sqlType , e);
} }

View File

@ -20,18 +20,27 @@ import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService; import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
/** /**
* Simple implementation only for GenerateTableFetch processor testing. * Simple implementation only for DB processor testing.
*/ */
public class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService { public class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
private String databaseLocation; private String databaseLocation;
private boolean isDerby;
// Default to use Derby connection
public DBCPServiceSimpleImpl(final String databaseLocation) { public DBCPServiceSimpleImpl(final String databaseLocation) {
this(databaseLocation, true);
}
public DBCPServiceSimpleImpl(final String databaseLocation, final boolean isDerby) {
this.databaseLocation = databaseLocation; this.databaseLocation = databaseLocation;
this.isDerby = isDerby;
} }
@Override @Override
@ -42,8 +51,16 @@ public class DBCPServiceSimpleImpl extends AbstractControllerService implements
@Override @Override
public Connection getConnection() throws ProcessException { public Connection getConnection() throws ProcessException {
try { try {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); if (isDerby) {
return DriverManager.getConnection("jdbc:derby:" + databaseLocation + ";create=true"); Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
return DriverManager.getConnection("jdbc:derby:" + databaseLocation + ";create=true");
} else {
// Use H2
Path currentPath = Paths.get("");
String absolutePathPrefix = currentPath.toFile().getAbsolutePath();
String connectionString = "jdbc:h2:file:" + absolutePathPrefix + "/" + databaseLocation + ";DB_CLOSE_ON_EXIT=TRUE";
return DriverManager.getConnection(connectionString, "SA", "");
}
} catch (final Exception e) { } catch (final Exception e) {
throw new ProcessException("getConnection failed: " + e); throw new ProcessException("getConnection failed: " + e);
} }

View File

@ -51,7 +51,6 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLDataException; import java.sql.SQLDataException;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.SQLNonTransientConnectionException;
import java.sql.Statement; import java.sql.Statement;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.ZoneOffset; import java.time.ZoneOffset;
@ -115,7 +114,7 @@ public class PutDatabaseRecordTest {
public static void shutdownDatabase() throws Exception { public static void shutdownDatabase() throws Exception {
try { try {
DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";shutdown=true"); DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";shutdown=true");
} catch (SQLNonTransientConnectionException ignore) { } catch (Exception ignore) {
// Do nothing, this is what happens at Derby shutdown // Do nothing, this is what happens at Derby shutdown
} }
// remove previous test database, if any // remove previous test database, if any
@ -1759,6 +1758,51 @@ public class PutDatabaseRecordTest {
runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1); runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1);
} }
@Test
void testInsertEnum() throws InitializationException, ProcessException, SQLException, IOException {
dbcp = spy(new DBCPServiceSimpleImpl(DB_LOCATION, false)); // Use H2
runner = TestRunners.newTestRunner(processor);
runner.addControllerService("dbcp", dbcp, new HashMap<>());
runner.enableControllerService(dbcp);
runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, "dbcp");
try (Connection conn = dbcp.getConnection()) {
conn.createStatement().executeUpdate("DROP TABLE IF EXISTS ENUM_TEST");
}
recreateTable("CREATE TABLE IF NOT EXISTS ENUM_TEST (id integer primary key, suit ENUM('clubs', 'diamonds', 'hearts', 'spades'))");
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
runner.enableControllerService(parser);
parser.addSchemaField("id", RecordFieldType.INT);
parser.addSchemaField("suit", RecordFieldType.ENUM.getEnumDataType(Arrays.asList("clubs", "diamonds", "hearts", "spades")).getFieldType());
parser.addRecord(1, "diamonds");
parser.addRecord(2, "hearts");
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
runner.setProperty(PutDatabaseRecord.TABLE_NAME, "ENUM_TEST");
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1);
final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement();
final ResultSet rs = stmt.executeQuery("SELECT * FROM ENUM_TEST");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("diamonds", rs.getString(2));
assertTrue(rs.next());
assertEquals(2, rs.getInt(1));
assertEquals("hearts", rs.getString(2));
assertFalse(rs.next());
stmt.close();
conn.close();
}
private void recreateTable() throws ProcessException { private void recreateTable() throws ProcessException {
try (final Connection conn = dbcp.getConnection(); try (final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement()) { final Statement stmt = conn.createStatement()) {