mirror of https://github.com/apache/nifi.git
NIFI-4228: Fix PutDatabaseRecord to ignore unmatched fields
- Unmatched fields were ignored, but the number of prepared statement place holders were not correct. - Added unit test code for generateUpdate. - Added unit test code with "Ignore Unmatched Columns". Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #2165.
This commit is contained in:
parent
90ed08ec33
commit
7b07eb0577
|
@ -761,7 +761,7 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
|
||||||
|
|
||||||
// complete the SQL statements by adding ?'s for all of the values to be escaped.
|
// complete the SQL statements by adding ?'s for all of the values to be escaped.
|
||||||
sqlBuilder.append(") VALUES (");
|
sqlBuilder.append(") VALUES (");
|
||||||
sqlBuilder.append(StringUtils.repeat("?", ",", fieldCount));
|
sqlBuilder.append(StringUtils.repeat("?", ",", includedColumns.size()));
|
||||||
sqlBuilder.append(")");
|
sqlBuilder.append(")");
|
||||||
|
|
||||||
if (fieldsFound.get() == 0) {
|
if (fieldsFound.get() == 0) {
|
||||||
|
|
|
@ -37,6 +37,7 @@ import org.junit.runners.JUnit4
|
||||||
import java.sql.Connection
|
import java.sql.Connection
|
||||||
import java.sql.DriverManager
|
import java.sql.DriverManager
|
||||||
import java.sql.ResultSet
|
import java.sql.ResultSet
|
||||||
|
import java.sql.SQLDataException
|
||||||
import java.sql.SQLException
|
import java.sql.SQLException
|
||||||
import java.sql.SQLNonTransientConnectionException
|
import java.sql.SQLNonTransientConnectionException
|
||||||
import java.sql.Statement
|
import java.sql.Statement
|
||||||
|
@ -109,7 +110,8 @@ class TestPutDatabaseRecord {
|
||||||
|
|
||||||
final List<RecordField> fields = [new RecordField('id', RecordFieldType.INT.dataType),
|
final List<RecordField> fields = [new RecordField('id', RecordFieldType.INT.dataType),
|
||||||
new RecordField('name', RecordFieldType.STRING.dataType),
|
new RecordField('name', RecordFieldType.STRING.dataType),
|
||||||
new RecordField('code', RecordFieldType.INT.dataType)]
|
new RecordField('code', RecordFieldType.INT.dataType),
|
||||||
|
new RecordField('non_existing', RecordFieldType.BOOLEAN.dataType)]
|
||||||
|
|
||||||
def schema = [
|
def schema = [
|
||||||
getFields : {fields},
|
getFields : {fields},
|
||||||
|
@ -133,8 +135,8 @@ class TestPutDatabaseRecord {
|
||||||
] as PutDatabaseRecord.TableSchema
|
] as PutDatabaseRecord.TableSchema
|
||||||
|
|
||||||
runner.setProperty(PutDatabaseRecord.TRANSLATE_FIELD_NAMES, 'false')
|
runner.setProperty(PutDatabaseRecord.TRANSLATE_FIELD_NAMES, 'false')
|
||||||
runner.setProperty(PutDatabaseRecord.UNMATCHED_FIELD_BEHAVIOR, 'false')
|
runner.setProperty(PutDatabaseRecord.UNMATCHED_FIELD_BEHAVIOR, PutDatabaseRecord.IGNORE_UNMATCHED_FIELD)
|
||||||
runner.setProperty(PutDatabaseRecord.UNMATCHED_COLUMN_BEHAVIOR, 'false')
|
runner.setProperty(PutDatabaseRecord.UNMATCHED_COLUMN_BEHAVIOR, PutDatabaseRecord.IGNORE_UNMATCHED_COLUMN)
|
||||||
runner.setProperty(PutDatabaseRecord.QUOTED_IDENTIFIERS, 'false')
|
runner.setProperty(PutDatabaseRecord.QUOTED_IDENTIFIERS, 'false')
|
||||||
runner.setProperty(PutDatabaseRecord.QUOTED_TABLE_IDENTIFIER, 'false')
|
runner.setProperty(PutDatabaseRecord.QUOTED_TABLE_IDENTIFIER, 'false')
|
||||||
def settings = new PutDatabaseRecord.DMLSettings(runner.getProcessContext())
|
def settings = new PutDatabaseRecord.DMLSettings(runner.getProcessContext())
|
||||||
|
@ -144,11 +146,75 @@ class TestPutDatabaseRecord {
|
||||||
assertEquals('INSERT INTO PERSONS (id, name, code) VALUES (?,?,?)',
|
assertEquals('INSERT INTO PERSONS (id, name, code) VALUES (?,?,?)',
|
||||||
generateInsert(schema, 'PERSONS', tableSchema, settings).sql)
|
generateInsert(schema, 'PERSONS', tableSchema, settings).sql)
|
||||||
|
|
||||||
|
assertEquals('UPDATE PERSONS SET name = ?, code = ? WHERE id = ?',
|
||||||
|
generateUpdate(schema, 'PERSONS', null, tableSchema, settings).sql)
|
||||||
|
|
||||||
assertEquals('DELETE FROM PERSONS WHERE (id = ? OR (id is null AND ? is null)) AND (name = ? OR (name is null AND ? is null)) AND (code = ? OR (code is null AND ? is null))',
|
assertEquals('DELETE FROM PERSONS WHERE (id = ? OR (id is null AND ? is null)) AND (name = ? OR (name is null AND ? is null)) AND (code = ? OR (code is null AND ? is null))',
|
||||||
generateDelete(schema, 'PERSONS', tableSchema, settings).sql)
|
generateDelete(schema, 'PERSONS', tableSchema, settings).sql)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testGeneratePreparedStatementsFailUnmatchedField() throws Exception {
|
||||||
|
|
||||||
|
final List<RecordField> fields = [new RecordField('id', RecordFieldType.INT.dataType),
|
||||||
|
new RecordField('name', RecordFieldType.STRING.dataType),
|
||||||
|
new RecordField('code', RecordFieldType.INT.dataType),
|
||||||
|
new RecordField('non_existing', RecordFieldType.BOOLEAN.dataType)]
|
||||||
|
|
||||||
|
def schema = [
|
||||||
|
getFields : {fields},
|
||||||
|
getFieldCount: {fields.size()},
|
||||||
|
getField : {int index -> fields[index]},
|
||||||
|
getDataTypes : {fields.collect {it.dataType}},
|
||||||
|
getFieldNames: {fields.collect {it.fieldName}},
|
||||||
|
getDataType : {fieldName -> fields.find {it.fieldName == fieldName}.dataType}
|
||||||
|
] as RecordSchema
|
||||||
|
|
||||||
|
def tableSchema = [
|
||||||
|
[
|
||||||
|
new PutDatabaseRecord.ColumnDescription('id', 4, true, 2),
|
||||||
|
new PutDatabaseRecord.ColumnDescription('name', 12, true, 255),
|
||||||
|
new PutDatabaseRecord.ColumnDescription('code', 4, true, 10)
|
||||||
|
],
|
||||||
|
false,
|
||||||
|
['id'] as Set<String>,
|
||||||
|
''
|
||||||
|
|
||||||
|
] as PutDatabaseRecord.TableSchema
|
||||||
|
|
||||||
|
runner.setProperty(PutDatabaseRecord.TRANSLATE_FIELD_NAMES, 'false')
|
||||||
|
runner.setProperty(PutDatabaseRecord.UNMATCHED_FIELD_BEHAVIOR, PutDatabaseRecord.FAIL_UNMATCHED_FIELD)
|
||||||
|
runner.setProperty(PutDatabaseRecord.UNMATCHED_COLUMN_BEHAVIOR, PutDatabaseRecord.IGNORE_UNMATCHED_COLUMN)
|
||||||
|
runner.setProperty(PutDatabaseRecord.QUOTED_IDENTIFIERS, 'false')
|
||||||
|
runner.setProperty(PutDatabaseRecord.QUOTED_TABLE_IDENTIFIER, 'false')
|
||||||
|
def settings = new PutDatabaseRecord.DMLSettings(runner.getProcessContext())
|
||||||
|
|
||||||
|
processor.with {
|
||||||
|
|
||||||
|
try {
|
||||||
|
generateInsert(schema, 'PERSONS', tableSchema, settings)
|
||||||
|
fail('generateInsert should fail with unmatched fields')
|
||||||
|
} catch (SQLDataException e) {
|
||||||
|
assertEquals("Cannot map field 'non_existing' to any column in the database", e.getMessage())
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
generateUpdate(schema, 'PERSONS', null, tableSchema, settings)
|
||||||
|
fail('generateUpdate should fail with unmatched fields')
|
||||||
|
} catch (SQLDataException e) {
|
||||||
|
assertEquals("Cannot map field 'non_existing' to any column in the database", e.getMessage())
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
generateDelete(schema, 'PERSONS', tableSchema, settings)
|
||||||
|
fail('generateDelete should fail with unmatched fields')
|
||||||
|
} catch (SQLDataException e) {
|
||||||
|
assertEquals("Cannot map field 'non_existing' to any column in the database", e.getMessage())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testInsert() throws InitializationException, ProcessException, SQLException, IOException {
|
void testInsert() throws InitializationException, ProcessException, SQLException, IOException {
|
||||||
recreateTable("PERSONS", createPersons)
|
recreateTable("PERSONS", createPersons)
|
||||||
|
|
Loading…
Reference in New Issue