diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java index 59eaf30d27..ac95c8b985 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java @@ -46,6 +46,8 @@ import static java.sql.Types.VARCHAR; import java.io.IOException; import java.io.OutputStream; +import java.math.BigDecimal; +import java.math.BigInteger; import java.nio.ByteBuffer; import java.sql.ResultSet; import java.sql.ResultSetMetaData; @@ -99,6 +101,10 @@ public class JdbcCommon { // org.apache.avro.AvroRuntimeException: Unknown datum type java.lang.Byte rec.put(i - 1, ((Byte) value).intValue()); + } else if (value instanceof BigDecimal || value instanceof BigInteger) { + // Avro can't handle BigDecimal and BigInteger as numbers - it will throw an AvroRuntimeException such as: "Unknown datum type: java.math.BigDecimal: 38" + rec.put(i - 1, value.toString()); + } else if (value instanceof Number || value instanceof Boolean) { rec.put(i - 1, value); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java index 9c9532f998..b8fcfed8d9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java @@ -25,6 +25,7 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.lang.reflect.Field; +import java.math.BigDecimal; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; @@ -34,6 +35,7 @@ import java.sql.Statement; import java.sql.Types; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.avro.Schema; import org.apache.avro.file.DataFileStream; @@ -44,6 +46,8 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class TestJdbcCommon { @@ -180,6 +184,47 @@ public class TestJdbcCommon { } + @Test + public void testConvertToAvroStreamForBigDecimal() throws SQLException, IOException { + final ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class); + Mockito.when(metadata.getColumnCount()).thenReturn(1); + Mockito.when(metadata.getColumnType(1)).thenReturn(Types.NUMERIC); + Mockito.when(metadata.getColumnName(1)).thenReturn("Chairman"); + Mockito.when(metadata.getTableName(1)).thenReturn("table"); + + final ResultSet rs = Mockito.mock(ResultSet.class); + Mockito.when(rs.getMetaData()).thenReturn(metadata); + + final AtomicInteger counter = new AtomicInteger(1); + Mockito.doAnswer(new Answer() { + @Override + public Boolean answer(InvocationOnMock invocation) throws Throwable { + return counter.getAndDecrement() > 0; + } + }).when(rs).next(); + + final BigDecimal bigDecimal = new BigDecimal(38D); + Mockito.when(rs.getObject(Mockito.anyInt())).thenReturn(bigDecimal); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + JdbcCommon.convertToAvroStream(rs, baos); + + final byte[] serializedBytes = baos.toByteArray(); + + final InputStream instream = new ByteArrayInputStream(serializedBytes); + + final DatumReader datumReader = new GenericDatumReader(); + try (final DataFileStream dataFileReader = new DataFileStream(instream, datumReader)) { + GenericRecord record = null; + while (dataFileReader.hasNext()) { + record = dataFileReader.next(record); + assertEquals(bigDecimal.toString(), record.get("Chairman").toString()); + } + } + } + + // many test use Derby as database, so ensure driver is available @Test public void testDriverLoad() throws ClassNotFoundException {