diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java index f19f7e0bcd..bb058d4e58 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java @@ -166,7 +166,9 @@ public class JdbcCommon { // direct put to avro record results: // org.apache.avro.AvroRuntimeException: Unknown datum type java.lang.Byte rec.put(i - 1, ((Byte) value).intValue()); - + } else if(value instanceof Short) { + //MS SQL returns TINYINT as a Java Short, which Avro doesn't understand. + rec.put(i - 1, ((Short) value).intValue()); } else if (value instanceof BigDecimal) { // Avro can't handle BigDecimal as a number - it will throw an AvroRuntimeException such as: "Unknown datum type: java.math.BigDecimal: 38" rec.put(i - 1, value.toString()); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java index 1e0fe2fd54..e05844f452 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java @@ -450,6 +450,45 @@ public class TestJdbcCommon { } } + @Test + public void testConvertToAvroStreamForShort() throws SQLException, IOException { + final ResultSetMetaData metadata = mock(ResultSetMetaData.class); + when(metadata.getColumnCount()).thenReturn(1); + when(metadata.getColumnType(1)).thenReturn(Types.TINYINT); + when(metadata.getColumnName(1)).thenReturn("t_int"); + when(metadata.getTableName(1)).thenReturn("table"); + + final ResultSet rs = mock(ResultSet.class); + when(rs.getMetaData()).thenReturn(metadata); + + final AtomicInteger counter = new AtomicInteger(1); + Mockito.doAnswer(new Answer() { + @Override + public Boolean answer(InvocationOnMock invocation) throws Throwable { + return counter.getAndDecrement() > 0; + } + }).when(rs).next(); + + final short s = 25; + when(rs.getObject(Mockito.anyInt())).thenReturn(s); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + JdbcCommon.convertToAvroStream(rs, baos, false); + + final byte[] serializedBytes = baos.toByteArray(); + + final InputStream instream = new ByteArrayInputStream(serializedBytes); + + final DatumReader datumReader = new GenericDatumReader<>(); + try (final DataFileStream dataFileReader = new DataFileStream<>(instream, datumReader)) { + GenericRecord record = null; + while (dataFileReader.hasNext()) { + record = dataFileReader.next(record); + assertEquals(Short.toString(s), record.get("t_int").toString()); + } + } + } // many test use Derby as database, so ensure driver is available @Test