From 0dd382370bf139e0f8c1b22761e4aa306943dd77 Mon Sep 17 00:00:00 2001 From: Colin Dean Date: Wed, 19 Sep 2018 20:27:47 -0400 Subject: [PATCH] NIFI-5612: Support JDBC drivers that return Long for unsigned ints Refactors tests in order to share code repeated in tests and to enable some parameterized testing. MySQL Connector/J 5.1.x in conjunction with MySQL 5.0.x will return a Long for ResultSet#getObject when the SQL type is an unsigned integer, which causes an error when the record is appended to the Avro data file. This change prevents that error from occurring and adds a more informative exception that reports the failing object's Java class in addition to its string value. Signed-off-by: Matthew Burgess This closes #3032 --- .../processors/standard/util/JdbcCommon.java | 25 ++- .../standard/util/JdbcCommonTestUtils.java | 60 +++++++ .../standard/util/TestJdbcCommon.java | 88 ++++++---- .../util/TestJdbcCommonConvertToAvro.java | 152 ++++++++++++++++++ 4 files changed, 295 insertions(+), 30 deletions(-) create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/JdbcCommonTestUtils.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommonConvertToAvro.java diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java index 03761c6dd5..9681e2fd37 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java @@ -91,11 +91,13 @@ import org.apache.avro.SchemaBuilder.FieldAssembler; import
org.apache.avro.SchemaBuilder.NullDefault; import org.apache.avro.SchemaBuilder.UnionAccumulator; import org.apache.avro.file.CodecFactory; +import org.apache.avro.UnresolvedUnionException; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.components.PropertyDescriptor; @@ -449,8 +451,11 @@ public class JdbcCommon { } else { rec.put(i - 1, value); } + } else if ((value instanceof Long) && meta.getPrecision(i) < MAX_DIGITS_IN_INT) { + int intValue = ((Long)value).intValue(); + rec.put(i-1, intValue); } else { - rec.put(i - 1, value); + rec.put(i-1, value); } } else if (value instanceof Date) { @@ -470,8 +475,22 @@ public class JdbcCommon { rec.put(i - 1, value.toString()); } } - dataFileWriter.append(rec); - nrOfRows += 1; + try { + dataFileWriter.append(rec); + nrOfRows += 1; + } catch (DataFileWriter.AppendWriteException awe) { + Throwable rootCause = ExceptionUtils.getRootCause(awe); + if(rootCause instanceof UnresolvedUnionException) { + UnresolvedUnionException uue = (UnresolvedUnionException) rootCause; + throw new RuntimeException( + "Unable to resolve union for value " + uue.getUnresolvedDatum() + + " with type " + uue.getUnresolvedDatum().getClass().getCanonicalName() + + " while appending record " + rec, + awe); + } else { + throw awe; + } + } if (options.maxRows > 0 && nrOfRows == options.maxRows) break; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/JdbcCommonTestUtils.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/JdbcCommonTestUtils.java new file mode 100644 
index 0000000000..ad571588dc --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/JdbcCommonTestUtils.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard.util; + +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class JdbcCommonTestUtils { + static ResultSet resultSetReturningMetadata(ResultSetMetaData metadata) throws SQLException { + final ResultSet rs = mock(ResultSet.class); + when(rs.getMetaData()).thenReturn(metadata); + + final AtomicInteger counter = new AtomicInteger(1); + Mockito.doAnswer(new Answer() { + @Override + public Boolean answer(InvocationOnMock invocation) throws Throwable { + return counter.getAndDecrement() > 0; + } + }).when(rs).next(); + + return rs; + } + + static InputStream convertResultSetToAvroInputStream(ResultSet rs) throws SQLException, IOException { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + JdbcCommon.convertToAvroStream(rs, baos, false); + + final byte[] serializedBytes = baos.toByteArray(); + + return new ByteArrayInputStream(serializedBytes); + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java index 5eca32af85..9cf4fc1309 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java @@ -16,6 +16,8 @@ */ 
package org.apache.nifi.processors.standard.util; +import static org.apache.nifi.processors.standard.util.JdbcCommonTestUtils.convertResultSetToAvroInputStream; +import static org.apache.nifi.processors.standard.util.JdbcCommonTestUtils.resultSetReturningMetadata; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -425,16 +427,7 @@ public class TestJdbcCommon { when(metadata.getPrecision(1)).thenReturn(dbPrecision); when(metadata.getScale(1)).thenReturn(expectedScale); - final ResultSet rs = mock(ResultSet.class); - when(rs.getMetaData()).thenReturn(metadata); - - final AtomicInteger counter = new AtomicInteger(1); - Mockito.doAnswer(new Answer() { - @Override - public Boolean answer(InvocationOnMock invocation) throws Throwable { - return counter.getAndDecrement() > 0; - } - }).when(rs).next(); + final ResultSet rs = resultSetReturningMetadata(metadata); when(rs.getObject(Mockito.anyInt())).thenReturn(bigDecimal); @@ -587,27 +580,12 @@ public class TestJdbcCommon { when(metadata.getColumnName(1)).thenReturn("t_int"); when(metadata.getTableName(1)).thenReturn("table"); - final ResultSet rs = mock(ResultSet.class); - when(rs.getMetaData()).thenReturn(metadata); - - final AtomicInteger counter = new AtomicInteger(1); - Mockito.doAnswer(new Answer() { - @Override - public Boolean answer(InvocationOnMock invocation) throws Throwable { - return counter.getAndDecrement() > 0; - } - }).when(rs).next(); + final ResultSet rs = resultSetReturningMetadata(metadata); final short s = 25; when(rs.getObject(Mockito.anyInt())).thenReturn(s); - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - - JdbcCommon.convertToAvroStream(rs, baos, false); - - final byte[] serializedBytes = baos.toByteArray(); - - final InputStream instream = new ByteArrayInputStream(serializedBytes); + final InputStream instream = convertResultSetToAvroInputStream(rs); final DatumReader datumReader = new 
GenericDatumReader<>(); try (final DataFileStream dataFileReader = new DataFileStream<>(instream, datumReader)) { @@ -619,6 +597,62 @@ public class TestJdbcCommon { } } + @Test + public void testConvertToAvroStreamForUnsignedIntegerWithPrecision1ReturnedAsLong_NIFI5612() throws SQLException, IOException { + final String mockColumnName = "t_int"; + final ResultSetMetaData metadata = mock(ResultSetMetaData.class); + when(metadata.getColumnCount()).thenReturn(1); + when(metadata.getColumnType(1)).thenReturn(Types.INTEGER); + when(metadata.isSigned(1)).thenReturn(false); + when(metadata.getPrecision(1)).thenReturn(1); + when(metadata.getColumnName(1)).thenReturn(mockColumnName); + when(metadata.getTableName(1)).thenReturn("table"); + + final ResultSet rs = resultSetReturningMetadata(metadata); + + final Long ret = 0L; + when(rs.getObject(Mockito.anyInt())).thenReturn(ret); + + final InputStream instream = convertResultSetToAvroInputStream(rs); + + final DatumReader datumReader = new GenericDatumReader<>(); + try (final DataFileStream dataFileReader = new DataFileStream<>(instream, datumReader)) { + GenericRecord record = null; + while (dataFileReader.hasNext()) { + record = dataFileReader.next(record); + assertEquals(Long.toString(ret), record.get(mockColumnName).toString()); + } + } + } + + @Test + public void testConvertToAvroStreamForUnsignedIntegerWithPrecision10() throws SQLException, IOException { + final String mockColumnName = "t_int"; + final ResultSetMetaData metadata = mock(ResultSetMetaData.class); + when(metadata.getColumnCount()).thenReturn(1); + when(metadata.getColumnType(1)).thenReturn(Types.INTEGER); + when(metadata.isSigned(1)).thenReturn(false); + when(metadata.getPrecision(1)).thenReturn(10); + when(metadata.getColumnName(1)).thenReturn(mockColumnName); + when(metadata.getTableName(1)).thenReturn("table"); + + final ResultSet rs = resultSetReturningMetadata(metadata); + + final Long ret = 0L; + when(rs.getObject(Mockito.anyInt())).thenReturn(ret); 
+ + final InputStream instream = convertResultSetToAvroInputStream(rs); + + final DatumReader datumReader = new GenericDatumReader<>(); + try (final DataFileStream dataFileReader = new DataFileStream<>(instream, datumReader)) { + GenericRecord record = null; + while (dataFileReader.hasNext()) { + record = dataFileReader.next(record); + assertEquals(Long.toString(ret), record.get(mockColumnName).toString()); + } + } + } + @Test public void testConvertToAvroStreamForDateTimeAsString() throws SQLException, IOException, ParseException { final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommonConvertToAvro.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommonConvertToAvro.java new file mode 100644 index 0000000000..eb736e29fc --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommonConvertToAvro.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard.util; + +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.mockito.Mockito; + +import java.io.IOException; +import java.io.InputStream; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.IntStream; + +import static java.sql.Types.INTEGER; +import static java.sql.Types.SMALLINT; +import static java.sql.Types.TINYINT; +import static java.sql.Types.BIGINT; +import static org.apache.nifi.processors.standard.util.JdbcCommonTestUtils.convertResultSetToAvroInputStream; +import static org.apache.nifi.processors.standard.util.JdbcCommonTestUtils.resultSetReturningMetadata; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(Parameterized.class) +public class TestJdbcCommonConvertToAvro { + + private final static boolean SIGNED = true; + private final static boolean UNSIGNED = false; + + private static int[] range(int start, int end) { + return IntStream.rangeClosed(start, end).toArray(); + } + + @Parameterized.Parameters(name = "{index}: {0}") + public static Collection data() { + Map typeWithPrecisionRange = new HashMap<>(); + typeWithPrecisionRange.put(TINYINT, range(1,3)); + typeWithPrecisionRange.put(SMALLINT, range(1,5)); + typeWithPrecisionRange.put(INTEGER, range(1,9)); + + ArrayList params = new ArrayList<>(); + + typeWithPrecisionRange.forEach( (sqlType, precisions) -> { + for (int precision : precisions) { + params.add(new TestParams(sqlType, precision, SIGNED)); + params.add(new TestParams(sqlType, precision, 
UNSIGNED)); + } + }); + // remove cases that we know should fail + params.removeIf(param -> + param.sqlType == INTEGER + && + param.precision == 9 + && + param.signed == UNSIGNED + ); + + return params; + } + + @Parameterized.Parameter + public TestParams testParams; + + static class TestParams { + int sqlType; + int precision; + boolean signed; + + TestParams(int sqlType, int precision, boolean signed) { + this.sqlType = sqlType; + this.precision = precision; + this.signed = signed; + } + private String humanReadableType() { + switch(sqlType){ + case TINYINT: + return "TINYINT"; + case INTEGER: + return "INTEGER"; + case SMALLINT: + return "SMALLINT"; + case BIGINT: + return "BIGINT"; + default: + return "UNKNOWN - ADD TO LIST"; + } + } + private String humanReadableSigned() { + if(signed) return "SIGNED"; + return "UNSIGNED"; + } + public String toString(){ + return String.format( + "TestParams(SqlType=%s, Precision=%s, Signed=%s)", + humanReadableType(), + precision, + humanReadableSigned()); + } + } + + @Test + public void testConvertToAvroStreamForNumbers() throws SQLException, IOException { + final ResultSetMetaData metadata = mock(ResultSetMetaData.class); + when(metadata.getColumnCount()).thenReturn(1); + when(metadata.getColumnType(1)).thenReturn(testParams.sqlType); + when(metadata.isSigned(1)).thenReturn(testParams.signed); + when(metadata.getPrecision(1)).thenReturn(testParams.precision); + when(metadata.getColumnName(1)).thenReturn("t_int"); + when(metadata.getTableName(1)).thenReturn("table"); + + final ResultSet rs = resultSetReturningMetadata(metadata); + + final int ret = 0; + when(rs.getObject(Mockito.anyInt())).thenReturn(ret); + + final InputStream instream = convertResultSetToAvroInputStream(rs); + + final DatumReader datumReader = new GenericDatumReader<>(); + try (final DataFileStream dataFileReader = new DataFileStream<>(instream, datumReader)) { + GenericRecord record = null; + while (dataFileReader.hasNext()) { + record = 
dataFileReader.next(record); + assertEquals(Integer.toString(ret), record.get("t_int").toString()); + } + } + } +}