NIFI-8237: This closes #4835. Added missing SQL types to getDataTypeFromSQLTypeValue(), added defensive code

Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
Matthew Burgess 2021-02-22 16:10:54 -05:00 committed by Joe Witt
parent 9101160cbf
commit 200c04c6d0
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
3 changed files with 56 additions and 6 deletions

View File

@ -75,6 +75,7 @@ import java.util.function.Predicate;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.regex.Pattern; import java.util.regex.Pattern;
public class DataTypeUtils { public class DataTypeUtils {
private static final Logger logger = LoggerFactory.getLogger(DataTypeUtils.class); private static final Logger logger = LoggerFactory.getLogger(DataTypeUtils.class);
@ -1963,6 +1964,12 @@ public class DataTypeUtils {
case Types.SMALLINT: case Types.SMALLINT:
return RecordFieldType.SHORT.getDataType(); return RecordFieldType.SHORT.getDataType();
case Types.VARCHAR: case Types.VARCHAR:
case Types.LONGNVARCHAR:
case Types.LONGVARCHAR:
case Types.NCHAR:
case Types.NVARCHAR:
case Types.OTHER:
case Types.SQLXML:
return RecordFieldType.STRING.getDataType(); return RecordFieldType.STRING.getDataType();
case Types.TIME: case Types.TIME:
return RecordFieldType.TIME.getDataType(); return RecordFieldType.TIME.getDataType();

View File

@ -704,10 +704,13 @@ public class PutDatabaseRecord extends AbstractProcessor {
// Convert (if necessary) from field data type to column data type // Convert (if necessary) from field data type to column data type
if (fieldSqlType != sqlType) { if (fieldSqlType != sqlType) {
try { try {
currentValue = DataTypeUtils.convertType( DataType targetDataType = DataTypeUtils.getDataTypeFromSQLTypeValue(sqlType);
currentValue, if (targetDataType != null) {
DataTypeUtils.getDataTypeFromSQLTypeValue(sqlType), currentValue = DataTypeUtils.convertType(
currentRecord.getSchema().getField(currentFieldIndex).getFieldName()); currentValue,
targetDataType,
currentRecord.getSchema().getField(currentFieldIndex).getFieldName());
}
} catch (IllegalTypeConversionException itce) { } catch (IllegalTypeConversionException itce) {
// If the field and column types don't match or the value can't otherwise be converted to the column datatype, // If the field and column types don't match or the value can't otherwise be converted to the column datatype,
// try with the original object and field datatype // try with the original object and field datatype

View File

@ -45,7 +45,6 @@ import java.sql.PreparedStatement
import java.sql.ResultSet import java.sql.ResultSet
import java.sql.SQLDataException import java.sql.SQLDataException
import java.sql.SQLException import java.sql.SQLException
import java.sql.SQLFeatureNotSupportedException
import java.sql.SQLNonTransientConnectionException import java.sql.SQLNonTransientConnectionException
import java.sql.Statement import java.sql.Statement
import java.time.LocalDate import java.time.LocalDate
@ -58,7 +57,6 @@ import static org.junit.Assert.assertNotNull
import static org.junit.Assert.assertNull import static org.junit.Assert.assertNull
import static org.junit.Assert.assertTrue import static org.junit.Assert.assertTrue
import static org.junit.Assert.fail import static org.junit.Assert.fail
import static org.mockito.ArgumentMatchers.anyMap import static org.mockito.ArgumentMatchers.anyMap
import static org.mockito.Mockito.doAnswer import static org.mockito.Mockito.doAnswer
import static org.mockito.Mockito.spy import static org.mockito.Mockito.spy
@ -1295,6 +1293,48 @@ class TestPutDatabaseRecord {
// A SQLFeatureNotSupportedException exception is expected from Derby when you try to put the data as an ARRAY // A SQLFeatureNotSupportedException exception is expected from Derby when you try to put the data as an ARRAY
runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0) runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0)
runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1) runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1)
}
@Test
void testLongVarchar() throws InitializationException, ProcessException, SQLException, IOException {
// Manually create and drop the tables and schemas
def conn = dbcp.connection
def stmt = conn.createStatement()
try {
stmt.execute('DROP TABLE TEMP')
} catch(ex) {
// Do nothing, table may not exist
}
stmt.execute('CREATE TABLE TEMP (id integer primary key, name long varchar)')
final MockRecordParser parser = new MockRecordParser()
runner.addControllerService("parser", parser)
runner.enableControllerService(parser)
parser.addSchemaField("id", RecordFieldType.INT)
parser.addSchemaField("name", RecordFieldType.STRING)
parser.addRecord(1, 'rec1')
parser.addRecord(2, 'rec2')
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'TEMP')
runner.enqueue(new byte[0])
runner.run()
runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
ResultSet rs = stmt.executeQuery('SELECT * FROM TEMP')
assertTrue(rs.next())
assertEquals(1, rs.getInt(1))
assertEquals('rec1', rs.getString(2))
assertTrue(rs.next())
assertEquals(2, rs.getInt(1))
assertEquals('rec2', rs.getString(2))
assertFalse(rs.next())
stmt.close()
conn.close()
} }
} }