NIFI-5612: Support JDBC drivers that return Long for unsigned ints

Refactors tests in order to share code repeated in tests and to enable
some parameterized testing.

MySQL Connector/J 5.1.x in conjunction with MySQL 5.0.x will return
a Long for ResultSet#getObject when the SQL type is an unsigned integer.
This change prevents that error from occurring while implementing a more
informational exception describing what the failing object's POJO type
is in addition to its string value.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #3032
This commit is contained in:
Colin Dean 2018-09-19 20:27:47 -04:00 committed by Matthew Burgess
parent e24388aa7f
commit 0dd382370b
4 changed files with 295 additions and 30 deletions

View File

@ -91,11 +91,13 @@ import org.apache.avro.SchemaBuilder.FieldAssembler;
import org.apache.avro.SchemaBuilder.NullDefault;
import org.apache.avro.SchemaBuilder.UnionAccumulator;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.UnresolvedUnionException;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.PropertyDescriptor;
@ -449,6 +451,9 @@ public class JdbcCommon {
} else {
rec.put(i - 1, value);
}
} else if ((value instanceof Long) && meta.getPrecision(i) < MAX_DIGITS_IN_INT) {
int intValue = ((Long)value).intValue();
rec.put(i-1, intValue);
} else {
rec.put(i-1, value);
}
@ -470,8 +475,22 @@ public class JdbcCommon {
rec.put(i - 1, value.toString());
}
}
try {
dataFileWriter.append(rec);
nrOfRows += 1;
} catch (DataFileWriter.AppendWriteException awe) {
Throwable rootCause = ExceptionUtils.getRootCause(awe);
if(rootCause instanceof UnresolvedUnionException) {
UnresolvedUnionException uue = (UnresolvedUnionException) rootCause;
throw new RuntimeException(
"Unable to resolve union for value " + uue.getUnresolvedDatum() +
" with type " + uue.getUnresolvedDatum().getClass().getCanonicalName() +
" while appending record " + rec,
awe);
} else {
throw awe;
}
}
if (options.maxRows > 0 && nrOfRows == options.maxRows)
break;

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicInteger;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
class JdbcCommonTestUtils {
static ResultSet resultSetReturningMetadata(ResultSetMetaData metadata) throws SQLException {
final ResultSet rs = mock(ResultSet.class);
when(rs.getMetaData()).thenReturn(metadata);
final AtomicInteger counter = new AtomicInteger(1);
Mockito.doAnswer(new Answer<Boolean>() {
@Override
public Boolean answer(InvocationOnMock invocation) throws Throwable {
return counter.getAndDecrement() > 0;
}
}).when(rs).next();
return rs;
}
static InputStream convertResultSetToAvroInputStream(ResultSet rs) throws SQLException, IOException {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
JdbcCommon.convertToAvroStream(rs, baos, false);
final byte[] serializedBytes = baos.toByteArray();
return new ByteArrayInputStream(serializedBytes);
}
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.nifi.processors.standard.util;
import static org.apache.nifi.processors.standard.util.JdbcCommonTestUtils.convertResultSetToAvroInputStream;
import static org.apache.nifi.processors.standard.util.JdbcCommonTestUtils.resultSetReturningMetadata;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@ -425,16 +427,7 @@ public class TestJdbcCommon {
when(metadata.getPrecision(1)).thenReturn(dbPrecision);
when(metadata.getScale(1)).thenReturn(expectedScale);
final ResultSet rs = mock(ResultSet.class);
when(rs.getMetaData()).thenReturn(metadata);
final AtomicInteger counter = new AtomicInteger(1);
Mockito.doAnswer(new Answer<Boolean>() {
@Override
public Boolean answer(InvocationOnMock invocation) throws Throwable {
return counter.getAndDecrement() > 0;
}
}).when(rs).next();
final ResultSet rs = resultSetReturningMetadata(metadata);
when(rs.getObject(Mockito.anyInt())).thenReturn(bigDecimal);
@ -587,27 +580,12 @@ public class TestJdbcCommon {
when(metadata.getColumnName(1)).thenReturn("t_int");
when(metadata.getTableName(1)).thenReturn("table");
final ResultSet rs = mock(ResultSet.class);
when(rs.getMetaData()).thenReturn(metadata);
final AtomicInteger counter = new AtomicInteger(1);
Mockito.doAnswer(new Answer<Boolean>() {
@Override
public Boolean answer(InvocationOnMock invocation) throws Throwable {
return counter.getAndDecrement() > 0;
}
}).when(rs).next();
final ResultSet rs = resultSetReturningMetadata(metadata);
final short s = 25;
when(rs.getObject(Mockito.anyInt())).thenReturn(s);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
JdbcCommon.convertToAvroStream(rs, baos, false);
final byte[] serializedBytes = baos.toByteArray();
final InputStream instream = new ByteArrayInputStream(serializedBytes);
final InputStream instream = convertResultSetToAvroInputStream(rs);
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) {
@ -619,6 +597,62 @@ public class TestJdbcCommon {
}
}
@Test
public void testConvertToAvroStreamForUnsignedIntegerWithPrecision1ReturnedAsLong_NIFI5612() throws SQLException, IOException {
final String mockColumnName = "t_int";
final ResultSetMetaData metadata = mock(ResultSetMetaData.class);
when(metadata.getColumnCount()).thenReturn(1);
when(metadata.getColumnType(1)).thenReturn(Types.INTEGER);
when(metadata.isSigned(1)).thenReturn(false);
when(metadata.getPrecision(1)).thenReturn(1);
when(metadata.getColumnName(1)).thenReturn(mockColumnName);
when(metadata.getTableName(1)).thenReturn("table");
final ResultSet rs = resultSetReturningMetadata(metadata);
final Long ret = 0L;
when(rs.getObject(Mockito.anyInt())).thenReturn(ret);
final InputStream instream = convertResultSetToAvroInputStream(rs);
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) {
GenericRecord record = null;
while (dataFileReader.hasNext()) {
record = dataFileReader.next(record);
assertEquals(Long.toString(ret), record.get(mockColumnName).toString());
}
}
}
@Test
public void testConvertToAvroStreamForUnsignedIntegerWithPrecision10() throws SQLException, IOException {
final String mockColumnName = "t_int";
final ResultSetMetaData metadata = mock(ResultSetMetaData.class);
when(metadata.getColumnCount()).thenReturn(1);
when(metadata.getColumnType(1)).thenReturn(Types.INTEGER);
when(metadata.isSigned(1)).thenReturn(false);
when(metadata.getPrecision(1)).thenReturn(10);
when(metadata.getColumnName(1)).thenReturn(mockColumnName);
when(metadata.getTableName(1)).thenReturn("table");
final ResultSet rs = resultSetReturningMetadata(metadata);
final Long ret = 0L;
when(rs.getObject(Mockito.anyInt())).thenReturn(ret);
final InputStream instream = convertResultSetToAvroInputStream(rs);
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) {
GenericRecord record = null;
while (dataFileReader.hasNext()) {
record = dataFileReader.next(record);
assertEquals(Long.toString(ret), record.get(mockColumnName).toString());
}
}
}
@Test
public void testConvertToAvroStreamForDateTimeAsString() throws SQLException, IOException, ParseException {
final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions

View File

@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import java.io.IOException;
import java.io.InputStream;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.IntStream;
import static java.sql.Types.INTEGER;
import static java.sql.Types.SMALLINT;
import static java.sql.Types.TINYINT;
import static java.sql.Types.BIGINT;
import static org.apache.nifi.processors.standard.util.JdbcCommonTestUtils.convertResultSetToAvroInputStream;
import static org.apache.nifi.processors.standard.util.JdbcCommonTestUtils.resultSetReturningMetadata;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(Parameterized.class)
public class TestJdbcCommonConvertToAvro {
private final static boolean SIGNED = true;
private final static boolean UNSIGNED = false;
private static int[] range(int start, int end) {
return IntStream.rangeClosed(start, end).toArray();
}
@Parameterized.Parameters(name = "{index}: {0}")
public static Collection<TestParams> data() {
Map<Integer, int[]> typeWithPrecisionRange = new HashMap<>();
typeWithPrecisionRange.put(TINYINT, range(1,3));
typeWithPrecisionRange.put(SMALLINT, range(1,5));
typeWithPrecisionRange.put(INTEGER, range(1,9));
ArrayList<TestParams> params = new ArrayList<>();
typeWithPrecisionRange.forEach( (sqlType, precisions) -> {
for (int precision : precisions) {
params.add(new TestParams(sqlType, precision, SIGNED));
params.add(new TestParams(sqlType, precision, UNSIGNED));
}
});
// remove cases that we know should fail
params.removeIf(param ->
param.sqlType == INTEGER
&&
param.precision == 9
&&
param.signed == UNSIGNED
);
return params;
}
@Parameterized.Parameter
public TestParams testParams;
static class TestParams {
int sqlType;
int precision;
boolean signed;
TestParams(int sqlType, int precision, boolean signed) {
this.sqlType = sqlType;
this.precision = precision;
this.signed = signed;
}
private String humanReadableType() {
switch(sqlType){
case TINYINT:
return "TINYINT";
case INTEGER:
return "INTEGER";
case SMALLINT:
return "SMALLINT";
case BIGINT:
return "BIGINT";
default:
return "UNKNOWN - ADD TO LIST";
}
}
private String humanReadableSigned() {
if(signed) return "SIGNED";
return "UNSIGNED";
}
public String toString(){
return String.format(
"TestParams(SqlType=%s, Precision=%s, Signed=%s)",
humanReadableType(),
precision,
humanReadableSigned());
}
}
@Test
public void testConvertToAvroStreamForNumbers() throws SQLException, IOException {
final ResultSetMetaData metadata = mock(ResultSetMetaData.class);
when(metadata.getColumnCount()).thenReturn(1);
when(metadata.getColumnType(1)).thenReturn(testParams.sqlType);
when(metadata.isSigned(1)).thenReturn(testParams.signed);
when(metadata.getPrecision(1)).thenReturn(testParams.precision);
when(metadata.getColumnName(1)).thenReturn("t_int");
when(metadata.getTableName(1)).thenReturn("table");
final ResultSet rs = resultSetReturningMetadata(metadata);
final int ret = 0;
when(rs.getObject(Mockito.anyInt())).thenReturn(ret);
final InputStream instream = convertResultSetToAvroInputStream(rs);
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) {
GenericRecord record = null;
while (dataFileReader.hasNext()) {
record = dataFileReader.next(record);
assertEquals(Integer.toString(ret), record.get("t_int").toString());
}
}
}
}