From 8d46041a134fa134fec59ef384d32d19fd1bfb96 Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Mon, 21 Dec 2015 16:55:32 -0500 Subject: [PATCH] NIFI-1319 Updating JdbcCommon to check meta.isSigned(i) to determine if Avro schema should use a long or int Signed-off-by: Bryan Bende --- .../processors/standard/util/JdbcCommon.java | 7 ++ .../standard/util/TestJdbcCommon.java | 68 +++++++++++++++++++ 2 files changed, 75 insertions(+) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java index ac95c8b985..1a81b4bc04 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java @@ -154,6 +154,13 @@ public class JdbcCommon { break; case INTEGER: + if (meta.isSigned(i)) { + builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault(); + } else { + builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().longType().endUnion().noDefault(); + } + break; + case SMALLINT: case TINYINT: builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java index b8fcfed8d9..266d4265e9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java @@ -183,6 +183,74 @@ public class TestJdbcCommon { } } + @Test + public void testSignedIntShouldBeInt() throws SQLException, IllegalArgumentException, IllegalAccessException { + final ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class); + Mockito.when(metadata.getColumnCount()).thenReturn(1); + Mockito.when(metadata.getColumnType(1)).thenReturn(Types.INTEGER); + Mockito.when(metadata.isSigned(1)).thenReturn(true); + Mockito.when(metadata.getColumnName(1)).thenReturn("Col1"); + Mockito.when(metadata.getTableName(1)).thenReturn("Table1"); + + final ResultSet rs = Mockito.mock(ResultSet.class); + Mockito.when(rs.getMetaData()).thenReturn(metadata); + + Schema schema = JdbcCommon.createSchema(rs); + Assert.assertNotNull(schema); + + Schema.Field field = schema.getField("Col1"); + Schema fieldSchema = field.schema(); + Assert.assertEquals(2, fieldSchema.getTypes().size()); + + boolean foundIntSchema = false; + boolean foundNullSchema = false; + + for (Schema type : fieldSchema.getTypes()) { + if (type.getType().equals(Schema.Type.INT)) { + foundIntSchema = true; + } else if (type.getType().equals(Schema.Type.NULL)) { + foundNullSchema = true; + } + } + + Assert.assertTrue(foundIntSchema); + Assert.assertTrue(foundNullSchema); + } + + @Test + public void testUnsignedIntShouldBeLong() throws SQLException, IllegalArgumentException, IllegalAccessException { + final ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class); + Mockito.when(metadata.getColumnCount()).thenReturn(1); + Mockito.when(metadata.getColumnType(1)).thenReturn(Types.INTEGER); + Mockito.when(metadata.isSigned(1)).thenReturn(false); + Mockito.when(metadata.getColumnName(1)).thenReturn("Col1"); + Mockito.when(metadata.getTableName(1)).thenReturn("Table1"); + + final ResultSet rs = Mockito.mock(ResultSet.class); + Mockito.when(rs.getMetaData()).thenReturn(metadata); + + Schema schema = JdbcCommon.createSchema(rs); + Assert.assertNotNull(schema); + + Schema.Field field = schema.getField("Col1"); + Schema fieldSchema = field.schema(); + Assert.assertEquals(2, fieldSchema.getTypes().size()); + + boolean foundLongSchema = false; + boolean foundNullSchema = false; + + for (Schema type : fieldSchema.getTypes()) { + if (type.getType().equals(Schema.Type.LONG)) { + foundLongSchema = true; + } else if (type.getType().equals(Schema.Type.NULL)) { + foundNullSchema = true; + } + } + + Assert.assertTrue(foundLongSchema); + Assert.assertTrue(foundNullSchema); + } + @Test public void testConvertToAvroStreamForBigDecimal() throws SQLException, IOException {