NIFI-3076: Fixed handling of 'medium' unsigned integers in JdbcCommon

This closes #1254.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Matt Burgess 2016-11-21 16:33:12 -05:00 committed by Bryan Bende
parent bc223fa197
commit ba513447d7
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
2 changed files with 38 additions and 1 deletions

View File

@ -72,6 +72,7 @@ import org.apache.commons.lang3.StringUtils;
public class JdbcCommon { public class JdbcCommon {
private static final int MAX_DIGITS_IN_BIGINT = 19; private static final int MAX_DIGITS_IN_BIGINT = 19;
private static final int MAX_DIGITS_IN_INT = 9;
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, boolean convertNames) throws SQLException, IOException { public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, boolean convertNames) throws SQLException, IOException {
return convertToAvroStream(rs, outStream, null, null, convertNames); return convertToAvroStream(rs, outStream, null, null, convertNames);
@ -284,7 +285,7 @@ public class JdbcCommon {
break; break;
case INTEGER: case INTEGER:
if (meta.isSigned(i)) { if (meta.isSigned(i) || (meta.getPrecision(i) > 0 && meta.getPrecision(i) <= MAX_DIGITS_IN_INT)) {
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault(); builder.name(columnName).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault();
} else { } else {
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().longType().endUnion().noDefault(); builder.name(columnName).type().unionOf().nullBuilder().endNull().and().longType().endUnion().noDefault();

View File

@ -282,6 +282,7 @@ public class TestJdbcCommon {
final ResultSetMetaData metadata = mock(ResultSetMetaData.class); final ResultSetMetaData metadata = mock(ResultSetMetaData.class);
when(metadata.getColumnCount()).thenReturn(1); when(metadata.getColumnCount()).thenReturn(1);
when(metadata.getColumnType(1)).thenReturn(Types.INTEGER); when(metadata.getColumnType(1)).thenReturn(Types.INTEGER);
when(metadata.getPrecision(1)).thenReturn(10);
when(metadata.isSigned(1)).thenReturn(false); when(metadata.isSigned(1)).thenReturn(false);
when(metadata.getColumnName(1)).thenReturn("Col1"); when(metadata.getColumnName(1)).thenReturn("Col1");
when(metadata.getTableName(1)).thenReturn("Table1"); when(metadata.getTableName(1)).thenReturn("Table1");
@ -311,6 +312,41 @@ public class TestJdbcCommon {
assertTrue(foundNullSchema); assertTrue(foundNullSchema);
} }
@Test
public void testMediumUnsignedIntShouldBeInt() throws SQLException, IllegalArgumentException, IllegalAccessException {
final ResultSetMetaData metadata = mock(ResultSetMetaData.class);
when(metadata.getColumnCount()).thenReturn(1);
when(metadata.getColumnType(1)).thenReturn(Types.INTEGER);
when(metadata.getPrecision(1)).thenReturn(8);
when(metadata.isSigned(1)).thenReturn(false);
when(metadata.getColumnName(1)).thenReturn("Col1");
when(metadata.getTableName(1)).thenReturn("Table1");
final ResultSet rs = mock(ResultSet.class);
when(rs.getMetaData()).thenReturn(metadata);
Schema schema = JdbcCommon.createSchema(rs);
Assert.assertNotNull(schema);
Schema.Field field = schema.getField("Col1");
Schema fieldSchema = field.schema();
Assert.assertEquals(2, fieldSchema.getTypes().size());
boolean foundIntSchema = false;
boolean foundNullSchema = false;
for (Schema type : fieldSchema.getTypes()) {
if (type.getType().equals(Schema.Type.INT)) {
foundIntSchema = true;
} else if (type.getType().equals(Schema.Type.NULL)) {
foundNullSchema = true;
}
}
assertTrue(foundIntSchema);
assertTrue(foundNullSchema);
}
@Test @Test
public void testConvertToAvroStreamForBigDecimal() throws SQLException, IOException { public void testConvertToAvroStreamForBigDecimal() throws SQLException, IOException {