NIFI-4974: Add try/catch around blob.free() to support JDBC spec

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4861.
This commit is contained in:
Matthew Burgess 2021-03-01 16:15:23 -05:00 committed by Pierre Villard
parent ff93ec42c3
commit 29ebec071e
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
2 changed files with 45 additions and 2 deletions

View File

@ -65,6 +65,7 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData; import java.sql.ResultSetMetaData;
import java.sql.SQLDataException; import java.sql.SQLDataException;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLXML; import java.sql.SQLXML;
import java.sql.Time; import java.sql.Time;
import java.sql.Timestamp; import java.sql.Timestamp;
@ -297,7 +298,11 @@ public class JdbcCommon {
} }
ByteBuffer bb = ByteBuffer.wrap(buffer); ByteBuffer bb = ByteBuffer.wrap(buffer);
rec.put(i - 1, bb); rec.put(i - 1, bb);
blob.free(); try {
blob.free();
} catch (SQLFeatureNotSupportedException sfnse) {
// The driver doesn't support free, but allow processing to continue
}
} else { } else {
rec.put(i - 1, null); rec.put(i - 1, null);
} }

View File

@ -48,12 +48,15 @@ import java.math.BigInteger;
import java.math.MathContext; import java.math.MathContext;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.sql.Blob;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.ResultSetMetaData; import java.sql.ResultSetMetaData;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement; import java.sql.Statement;
import java.sql.Time; import java.sql.Time;
import java.sql.Timestamp; import java.sql.Timestamp;
@ -82,6 +85,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -89,7 +93,6 @@ public class TestJdbcCommon {
private static final Logger LOGGER = LoggerFactory.getLogger(TestJdbcCommon.class); private static final Logger LOGGER = LoggerFactory.getLogger(TestJdbcCommon.class);
static final String createTable = "create table restaurants(id integer, name varchar(20), city varchar(50))"; static final String createTable = "create table restaurants(id integer, name varchar(20), city varchar(50))";
static final String dropTable = "drop table restaurants";
@ClassRule @ClassRule
public static TemporaryFolder folder = new TemporaryFolder(); public static TemporaryFolder folder = new TemporaryFolder();
@ -580,6 +583,41 @@ public class TestJdbcCommon {
} }
} }
@Test
public void testConvertToAvroStreamForBlob_FreeNotSupported() throws SQLException, IOException {
final ResultSetMetaData metadata = mock(ResultSetMetaData.class);
when(metadata.getColumnCount()).thenReturn(1);
when(metadata.getColumnType(1)).thenReturn(Types.BLOB);
when(metadata.getColumnName(1)).thenReturn("t_blob");
when(metadata.getTableName(1)).thenReturn("table");
final ResultSet rs = JdbcCommonTestUtils.resultSetReturningMetadata(metadata);
final byte[] byteBuffer = "test blob".getBytes(StandardCharsets.UTF_8);
when(rs.getObject(Mockito.anyInt())).thenReturn(byteBuffer);
ByteArrayInputStream bais = new ByteArrayInputStream(byteBuffer);
Blob blob = mock(Blob.class);
when(blob.getBinaryStream()).thenReturn(bais);
when(blob.length()).thenReturn((long) byteBuffer.length);
doThrow(SQLFeatureNotSupportedException.class).when(blob).free();
when(rs.getBlob(Mockito.anyInt())).thenReturn(blob);
final InputStream instream = JdbcCommonTestUtils.convertResultSetToAvroInputStream(rs);
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) {
GenericRecord record = null;
while (dataFileReader.hasNext()) {
record = dataFileReader.next(record);
Object o = record.get("t_blob");
assertTrue(o instanceof ByteBuffer);
ByteBuffer bb = (ByteBuffer) o;
assertEquals("test blob", new String(bb.array(), StandardCharsets.UTF_8));
}
}
}
@Test @Test
public void testConvertToAvroStreamForShort() throws SQLException, IOException { public void testConvertToAvroStreamForShort() throws SQLException, IOException {
final ResultSetMetaData metadata = mock(ResultSetMetaData.class); final ResultSetMetaData metadata = mock(ResultSetMetaData.class);