NIFI-8320: Fix column mismatch in PutDatabaseRecord

This closes #5024

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Matthew Burgess 2021-04-22 16:47:16 -04:00 committed by exceptionfactory
parent e1c99e3a5c
commit 3963f66dff
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
3 changed files with 304 additions and 4 deletions

View File

@ -691,15 +691,30 @@ public class PutDatabaseRecord extends AbstractProcessor {
final Object[] values = currentRecord.getValues(); final Object[] values = currentRecord.getValues();
final List<DataType> dataTypes = currentRecord.getSchema().getDataTypes(); final List<DataType> dataTypes = currentRecord.getSchema().getDataTypes();
List<ColumnDescription> columns = tableSchema.getColumnsAsList(); final RecordSchema recordSchema = currentRecord.getSchema();
final Map<String, ColumnDescription> columns = tableSchema.getColumns();
for (int i = 0; i < fieldIndexes.size(); i++) { for (int i = 0; i < fieldIndexes.size(); i++) {
final int currentFieldIndex = fieldIndexes.get(i); final int currentFieldIndex = fieldIndexes.get(i);
Object currentValue = values[currentFieldIndex]; Object currentValue = values[currentFieldIndex];
final DataType dataType = dataTypes.get(currentFieldIndex); final DataType dataType = dataTypes.get(currentFieldIndex);
final int fieldSqlType = DataTypeUtils.getSQLTypeValue(dataType); final int fieldSqlType = DataTypeUtils.getSQLTypeValue(dataType);
final ColumnDescription column = columns.get(currentFieldIndex); final String fieldName = recordSchema.getField(currentFieldIndex).getFieldName();
int sqlType = column.dataType; String columnName = normalizeColumnName(fieldName, settings.translateFieldNames);
int sqlType;
final ColumnDescription column = columns.get(columnName);
// 'column' should not be null here as the fieldIndexes should correspond to fields that match table columns, but better to handle just in case
if (column == null) {
if (!settings.ignoreUnmappedFields) {
throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database\n"
+ (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", columns.keySet()));
} else {
sqlType = fieldSqlType;
}
} else {
sqlType = column.dataType;
}
// Convert (if necessary) from field data type to column data type // Convert (if necessary) from field data type to column data type
if (fieldSqlType != sqlType) { if (fieldSqlType != sqlType) {
@ -709,7 +724,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
currentValue = DataTypeUtils.convertType( currentValue = DataTypeUtils.convertType(
currentValue, currentValue,
targetDataType, targetDataType,
currentRecord.getSchema().getField(currentFieldIndex).getFieldName()); fieldName);
} }
} catch (IllegalTypeConversionException itce) { } catch (IllegalTypeConversionException itce) {
// If the field and column types don't match or the value can't otherwise be converted to the column datatype, // If the field and column types don't match or the value can't otherwise be converted to the column datatype,

View File

@ -301,6 +301,73 @@ class TestPutDatabaseRecord {
conn.close() conn.close()
} }
@Test
void testInsertNonRequiredColumns() throws InitializationException, ProcessException, SQLException, IOException {
    // Inserts records that carry only id, name and dt into PERSONS and verifies that
    // the rows arrive with the values in the matching columns.
    recreateTable(createPersons)
    final MockRecordParser parser = new MockRecordParser()
    runner.addControllerService("parser", parser)
    runner.enableControllerService(parser)

    parser.addSchemaField("id", RecordFieldType.INT)
    parser.addSchemaField("name", RecordFieldType.STRING)
    parser.addSchemaField("dt", RecordFieldType.DATE)

    LocalDate testDate1 = LocalDate.of(2021, 1, 26)
    Date nifiDate1 = new Date(testDate1.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli()) // in UTC
    Date jdbcDate1 = Date.valueOf(testDate1) // in local TZ
    LocalDate testDate2 = LocalDate.of(2021, 7, 26)
    Date nifiDate2 = new Date(testDate2.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli()) // in UTC
    Date jdbcDate2 = Date.valueOf(testDate2) // in local TZ

    parser.addRecord(1, 'rec1', nifiDate1)
    parser.addRecord(2, 'rec2', nifiDate2)
    parser.addRecord(3, 'rec3', null)
    parser.addRecord(4, 'rec4', null)
    parser.addRecord(5, null, null)

    runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
    runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
    runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')

    runner.enqueue(new byte[0])
    runner.run()

    runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)

    // Release the JDBC handles even when an assertion fails, so a failing run does
    // not leave an open connection on the database shared with the other tests.
    final Connection conn = dbcp.getConnection()
    try {
        final Statement stmt = conn.createStatement()
        try {
            // Closing the statement in the finally block also closes this result set.
            final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')

            assertTrue(rs.next())
            assertEquals(1, rs.getInt(1))
            assertEquals('rec1', rs.getString(2))
            // The records carry no value for the third column; ResultSet.getInt()
            // reports SQL NULL as 0 (presumably why 0 is expected here).
            assertEquals(0, rs.getInt(3))
            assertEquals(jdbcDate1, rs.getDate(4))

            assertTrue(rs.next())
            assertEquals(2, rs.getInt(1))
            assertEquals('rec2', rs.getString(2))
            assertEquals(0, rs.getInt(3))
            assertEquals(jdbcDate2, rs.getDate(4))

            assertTrue(rs.next())
            assertEquals(3, rs.getInt(1))
            assertEquals('rec3', rs.getString(2))
            assertEquals(0, rs.getInt(3))
            assertNull(rs.getDate(4))

            assertTrue(rs.next())
            assertEquals(4, rs.getInt(1))
            assertEquals('rec4', rs.getString(2))
            assertEquals(0, rs.getInt(3))
            assertNull(rs.getDate(4))

            assertTrue(rs.next())
            assertEquals(5, rs.getInt(1))
            assertNull(rs.getString(2))
            assertEquals(0, rs.getInt(3))
            assertNull(rs.getDate(4))

            assertFalse(rs.next())
        } finally {
            stmt.close()
        }
    } finally {
        conn.close()
    }
}
@Test @Test
void testInsertBatchUpdateException() throws InitializationException, ProcessException, SQLException, IOException { void testInsertBatchUpdateException() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable(createPersons) recreateTable(createPersons)
@ -1337,4 +1404,51 @@ class TestPutDatabaseRecord {
stmt.close() stmt.close()
conn.close() conn.close()
} }
@Test
void testInsertWithDifferentColumnOrdering() throws InitializationException, ProcessException, SQLException, IOException {
    // Inserts records whose field order (name, id, code) differs from the table's
    // column order (id, code, name) and verifies each value lands in its own column.
    def conn = dbcp.connection
    try {
        def stmt = conn.createStatement()
        try {
            // Manually drop and re-create the table used by this test
            try {
                stmt.execute('DROP TABLE TEMP')
            } catch (SQLException ignored) {
                // Do nothing, table may not exist
            }
            stmt.execute('CREATE TABLE TEMP (id integer primary key, code integer, name long varchar)')

            final MockRecordParser parser = new MockRecordParser()
            runner.addControllerService("parser", parser)
            runner.enableControllerService(parser)

            // Declare the fields in a different order than the table's columns
            parser.addSchemaField("name", RecordFieldType.STRING)
            parser.addSchemaField("id", RecordFieldType.INT)
            parser.addSchemaField("code", RecordFieldType.INT)

            parser.addRecord('rec1', 1, 101)
            parser.addRecord('rec2', 2, 102)

            runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
            runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
            runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'TEMP')

            runner.enqueue(new byte[0])
            runner.run()

            runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)

            // Closing the statement in the finally block also closes this result set.
            ResultSet rs = stmt.executeQuery('SELECT * FROM TEMP')

            assertTrue(rs.next())
            assertEquals(1, rs.getInt(1))
            assertEquals(101, rs.getInt(2))
            assertEquals('rec1', rs.getString(3))

            assertTrue(rs.next())
            assertEquals(2, rs.getInt(1))
            assertEquals(102, rs.getInt(2))
            assertEquals('rec2', rs.getString(3))

            assertFalse(rs.next())
        } finally {
            // Runs on assertion failure too, so the handles never outlive the test
            stmt.close()
        }
    } finally {
        conn.close()
    }
}
} }

View File

@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.file.FileUtils;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.SQLNonTransientConnectionException;
import java.sql.Statement;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import static org.mockito.Mockito.spy;
/**
 * Tests for {@link PutDatabaseRecord} that need a processor subclass with overridden
 * SQL generation. The tests run against an embedded Derby database stored under
 * {@code target/db_pdr}.
 */
public class PutDatabaseRecordTest {

    private static final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100)," +
            " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)";
    private static final String createPersonsSchema1 = "CREATE TABLE SCHEMA1.PERSONS (id integer primary key, name varchar(100)," +
            " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)";
    private static final String createPersonsSchema2 = "CREATE TABLE SCHEMA2.PERSONS (id2 integer primary key, name varchar(100)," +
            " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)";

    private final static String DB_LOCATION = "target/db_pdr";

    TestRunner runner;
    PutDatabaseRecord processor;
    DBCPServiceSimpleImpl dbcp;

    /**
     * Redirects the Derby error log into {@code target/} and removes any database
     * left behind by a previous run.
     */
    @BeforeClass
    public static void setupBeforeClass() throws IOException {
        System.setProperty("derby.stream.error.file", "target/derby.log");
        deleteDatabaseDirectory();
    }

    /**
     * Shuts the embedded Derby database down and removes its files.
     */
    @AfterClass
    public static void cleanUpAfterClass() throws Exception {
        try {
            DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";shutdown=true");
        } catch (SQLNonTransientConnectionException ignore) {
            // Do nothing, this is what happens at Derby shutdown
        }
        deleteDatabaseDirectory();
    }

    @Before
    public void setUp() throws Exception {
        initializeRunner(new PutDatabaseRecord());
    }

    /**
     * Verifies that a record field with no matching table column routes the FlowFile
     * to failure when the processor is configured to fail on unmatched fields, even
     * though the generated statement still lists that field's index.
     */
    @Test
    public void testInsertNonRequiredColumnsUnmatchedField() throws InitializationException, ProcessException, SQLException, IOException {
        // Replace the fixture built by setUp() with a processor whose generated INSERT
        // includes the index of a field that has no matching column
        initializeRunner(new PutDatabaseRecordUnmatchedField());

        recreateTable(createPersons);
        final MockRecordParser parser = new MockRecordParser();
        runner.addControllerService("parser", parser);
        runner.enableControllerService(parser);

        parser.addSchemaField("id", RecordFieldType.INT);
        parser.addSchemaField("name", RecordFieldType.STRING);
        // "extra" has no counterpart among the PERSONS columns
        parser.addSchemaField("extra", RecordFieldType.STRING);
        parser.addSchemaField("dt", RecordFieldType.DATE);

        final LocalDate testDate1 = LocalDate.of(2021, 1, 26);
        final Date nifiDate1 = new Date(testDate1.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli()); // in UTC
        final LocalDate testDate2 = LocalDate.of(2021, 7, 26);
        final Date nifiDate2 = new Date(testDate2.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli()); // in UTC

        parser.addRecord(1, "rec1", "test", nifiDate1);
        parser.addRecord(2, "rec2", "test", nifiDate2);
        parser.addRecord(3, "rec3", "test", null);
        parser.addRecord(4, "rec4", "test", null);
        parser.addRecord(5, null, null, null);

        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
        runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
        runner.setProperty(PutDatabaseRecord.UNMATCHED_FIELD_BEHAVIOR, PutDatabaseRecord.FAIL_UNMATCHED_FIELD);

        runner.enqueue(new byte[0]);
        runner.run();

        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0);
        runner.assertTransferCount(PutDatabaseRecord.REL_RETRY, 0);
        runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1);
    }

    /**
     * Builds the test fixture around the given processor: a spied DBCP service backed
     * by the database at {@link #DB_LOCATION} and a runner wired to that service.
     *
     * @param testProcessor the processor instance under test
     */
    private void initializeRunner(final PutDatabaseRecord testProcessor) throws InitializationException {
        processor = testProcessor;
        // Mock the DBCP Controller Service so we can control the Results
        dbcp = spy(new DBCPServiceSimpleImpl(DB_LOCATION));
        final Map<String, String> dbcpProperties = new HashMap<>();

        runner = TestRunners.newTestRunner(processor);
        runner.addControllerService("dbcp", dbcp, dbcpProperties);
        runner.enableControllerService(dbcp);
        runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, "dbcp");
    }

    /**
     * Drops the PERSONS table if it exists and then creates it with the given DDL.
     *
     * @param createSQL the CREATE TABLE statement to execute
     * @throws SQLException if the table cannot be created
     */
    private void recreateTable(String createSQL) throws ProcessException, SQLException {
        try (final Connection conn = dbcp.getConnection();
             final Statement stmt = conn.createStatement()) {
            // Only the DROP is best-effort. Sharing one try/catch with the CREATE would
            // skip the CREATE whenever the table does not exist yet, leaving the test
            // without a table and failing for the wrong reason.
            try {
                stmt.execute("drop table PERSONS");
            } catch (SQLException ignored) {
                // Do nothing, may not have existed
            }
            stmt.execute(createSQL);
        }
    }

    /**
     * Removes the on-disk database directory, ignoring a failure to delete it.
     */
    private static void deleteDatabaseDirectory() {
        final File dbLocation = new File(DB_LOCATION);
        try {
            FileUtils.deleteFile(dbLocation, true);
        } catch (IOException ignore) {
            // Do nothing, may not have existed
        }
    }

    /**
     * Processor variant whose INSERT generation always includes field indexes 0-3,
     * regardless of whether the corresponding fields match a table column.
     */
    static class PutDatabaseRecordUnmatchedField extends PutDatabaseRecord {
        @Override
        SqlAndIncludedColumns generateInsert(RecordSchema recordSchema, String tableName, TableSchema tableSchema, DMLSettings settings) throws IllegalArgumentException, SQLException {
            return new SqlAndIncludedColumns("INSERT INTO PERSONS VALUES (?,?,?,?)", Arrays.asList(0, 1, 2, 3));
        }
    }
}