diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java index a591122bfd..c6f7766896 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java @@ -761,7 +761,7 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor { // complete the SQL statements by adding ?'s for all of the values to be escaped. sqlBuilder.append(") VALUES ("); - sqlBuilder.append(StringUtils.repeat("?", ",", fieldCount)); + sqlBuilder.append(StringUtils.repeat("?", ",", includedColumns.size())); sqlBuilder.append(")"); if (fieldsFound.get() == 0) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy index c31cdd43af..ebf846025c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy @@ -37,6 +37,7 @@ import org.junit.runners.JUnit4 import java.sql.Connection import java.sql.DriverManager import java.sql.ResultSet +import java.sql.SQLDataException import java.sql.SQLException import java.sql.SQLNonTransientConnectionException import java.sql.Statement @@ -109,7 +110,8 @@ class TestPutDatabaseRecord { final List fields = [new RecordField('id', RecordFieldType.INT.dataType), new RecordField('name', RecordFieldType.STRING.dataType), - new RecordField('code', RecordFieldType.INT.dataType)] + new RecordField('code', RecordFieldType.INT.dataType), + new RecordField('non_existing', RecordFieldType.BOOLEAN.dataType)] def schema = [ getFields : {fields}, @@ -133,8 +135,8 @@ class TestPutDatabaseRecord { ] as PutDatabaseRecord.TableSchema runner.setProperty(PutDatabaseRecord.TRANSLATE_FIELD_NAMES, 'false') - runner.setProperty(PutDatabaseRecord.UNMATCHED_FIELD_BEHAVIOR, 'false') - runner.setProperty(PutDatabaseRecord.UNMATCHED_COLUMN_BEHAVIOR, 'false') + runner.setProperty(PutDatabaseRecord.UNMATCHED_FIELD_BEHAVIOR, PutDatabaseRecord.IGNORE_UNMATCHED_FIELD) + runner.setProperty(PutDatabaseRecord.UNMATCHED_COLUMN_BEHAVIOR, PutDatabaseRecord.IGNORE_UNMATCHED_COLUMN) runner.setProperty(PutDatabaseRecord.QUOTED_IDENTIFIERS, 'false') runner.setProperty(PutDatabaseRecord.QUOTED_TABLE_IDENTIFIER, 'false') def settings = new PutDatabaseRecord.DMLSettings(runner.getProcessContext()) @@ -144,11 +146,75 @@ class TestPutDatabaseRecord { assertEquals('INSERT INTO PERSONS (id, name, code) VALUES (?,?,?)', generateInsert(schema, 'PERSONS', tableSchema, settings).sql) + assertEquals('UPDATE PERSONS SET name = ?, code = ? WHERE id = ?', + generateUpdate(schema, 'PERSONS', null, tableSchema, settings).sql) + assertEquals('DELETE FROM PERSONS WHERE (id = ? OR (id is null AND ? is null)) AND (name = ? OR (name is null AND ? is null)) AND (code = ? OR (code is null AND ? is null))', generateDelete(schema, 'PERSONS', tableSchema, settings).sql) } } + @Test + void testGeneratePreparedStatementsFailUnmatchedField() throws Exception { + + final List fields = [new RecordField('id', RecordFieldType.INT.dataType), + new RecordField('name', RecordFieldType.STRING.dataType), + new RecordField('code', RecordFieldType.INT.dataType), + new RecordField('non_existing', RecordFieldType.BOOLEAN.dataType)] + + def schema = [ + getFields : {fields}, + getFieldCount: {fields.size()}, + getField : {int index -> fields[index]}, + getDataTypes : {fields.collect {it.dataType}}, + getFieldNames: {fields.collect {it.fieldName}}, + getDataType : {fieldName -> fields.find {it.fieldName == fieldName}.dataType} + ] as RecordSchema + + def tableSchema = [ + [ + new PutDatabaseRecord.ColumnDescription('id', 4, true, 2), + new PutDatabaseRecord.ColumnDescription('name', 12, true, 255), + new PutDatabaseRecord.ColumnDescription('code', 4, true, 10) + ], + false, + ['id'] as Set, + '' + + ] as PutDatabaseRecord.TableSchema + + runner.setProperty(PutDatabaseRecord.TRANSLATE_FIELD_NAMES, 'false') + runner.setProperty(PutDatabaseRecord.UNMATCHED_FIELD_BEHAVIOR, PutDatabaseRecord.FAIL_UNMATCHED_FIELD) + runner.setProperty(PutDatabaseRecord.UNMATCHED_COLUMN_BEHAVIOR, PutDatabaseRecord.IGNORE_UNMATCHED_COLUMN) + runner.setProperty(PutDatabaseRecord.QUOTED_IDENTIFIERS, 'false') + runner.setProperty(PutDatabaseRecord.QUOTED_TABLE_IDENTIFIER, 'false') + def settings = new PutDatabaseRecord.DMLSettings(runner.getProcessContext()) + + processor.with { + + try { + generateInsert(schema, 'PERSONS', tableSchema, settings) + fail('generateInsert should fail with unmatched fields') + } catch (SQLDataException e) { + assertEquals("Cannot map field 'non_existing' to any column in the database", e.getMessage()) + } + + try { + generateUpdate(schema, 'PERSONS', null, tableSchema, settings) + fail('generateUpdate should fail with unmatched fields') + } catch (SQLDataException e) { + assertEquals("Cannot map field 'non_existing' to any column in the database", e.getMessage()) + } + + try { + generateDelete(schema, 'PERSONS', tableSchema, settings) + fail('generateDelete should fail with unmatched fields') + } catch (SQLDataException e) { + assertEquals("Cannot map field 'non_existing' to any column in the database", e.getMessage()) + } + } + } + @Test void testInsert() throws InitializationException, ProcessException, SQLException, IOException { recreateTable("PERSONS", createPersons)