NIFI-1056: Do not treat BigDecimal, BigInteger the same way we treat Number's when serializing to Avro because Avro Java Library doesn't support them

Reviewed and Unit Test Amended by Tony Kurc (tkurc@apache.org)
This commit is contained in:
Mark Payne 2015-10-25 11:31:30 -04:00 committed by Tony Kurc
parent 385bfbb2c6
commit 8d2f9bc64b
2 changed files with 51 additions and 0 deletions

View File

@ -46,6 +46,8 @@ import static java.sql.Types.VARCHAR;
import java.io.IOException;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
@ -99,6 +101,10 @@ public class JdbcCommon {
// org.apache.avro.AvroRuntimeException: Unknown datum type java.lang.Byte
rec.put(i - 1, ((Byte) value).intValue());
} else if (value instanceof BigDecimal || value instanceof BigInteger) {
// Avro can't handle BigDecimal and BigInteger as numbers - it will throw an AvroRuntimeException such as: "Unknown datum type: java.math.BigDecimal: 38"
rec.put(i - 1, value.toString());
} else if (value instanceof Number || value instanceof Boolean) {
rec.put(i - 1, value);

View File

@ -25,6 +25,7 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
@ -34,6 +35,7 @@ import java.sql.Statement;
import java.sql.Types;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
@ -44,6 +46,8 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestJdbcCommon {
@ -180,6 +184,47 @@ public class TestJdbcCommon {
}
@Test
public void testConvertToAvroStreamForBigDecimal() throws SQLException, IOException {
final ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class);
Mockito.when(metadata.getColumnCount()).thenReturn(1);
Mockito.when(metadata.getColumnType(1)).thenReturn(Types.NUMERIC);
Mockito.when(metadata.getColumnName(1)).thenReturn("Chairman");
Mockito.when(metadata.getTableName(1)).thenReturn("table");
final ResultSet rs = Mockito.mock(ResultSet.class);
Mockito.when(rs.getMetaData()).thenReturn(metadata);
final AtomicInteger counter = new AtomicInteger(1);
Mockito.doAnswer(new Answer<Boolean>() {
@Override
public Boolean answer(InvocationOnMock invocation) throws Throwable {
return counter.getAndDecrement() > 0;
}
}).when(rs).next();
final BigDecimal bigDecimal = new BigDecimal(38D);
Mockito.when(rs.getObject(Mockito.anyInt())).thenReturn(bigDecimal);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
JdbcCommon.convertToAvroStream(rs, baos);
final byte[] serializedBytes = baos.toByteArray();
final InputStream instream = new ByteArrayInputStream(serializedBytes);
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(instream, datumReader)) {
GenericRecord record = null;
while (dataFileReader.hasNext()) {
record = dataFileReader.next(record);
assertEquals(bigDecimal.toString(), record.get("Chairman").toString());
}
}
}
// many test use Derby as database, so ensure driver is available
@Test
public void testDriverLoad() throws ClassNotFoundException {