diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java index c5a7a5c925..65897c4e6e 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java @@ -65,6 +65,7 @@ import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLDataException; import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; import java.sql.SQLXML; import java.sql.Time; import java.sql.Timestamp; @@ -297,7 +298,11 @@ public class JdbcCommon { } ByteBuffer bb = ByteBuffer.wrap(buffer); rec.put(i - 1, bb); - blob.free(); + try { + blob.free(); + } catch (SQLFeatureNotSupportedException sfnse) { + // The driver doesn't support free, but allow processing to continue + } } else { rec.put(i - 1, null); } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommon.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommon.java index cc518898ee..b04f988e27 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommon.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommon.java @@ -48,12 +48,15 @@ import java.math.BigInteger; import java.math.MathContext; import java.nio.ByteBuffer; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.sql.Blob; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; import java.sql.Statement; import java.sql.Time; import java.sql.Timestamp; @@ -82,6 +85,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -89,7 +93,6 @@ public class TestJdbcCommon { private static final Logger LOGGER = LoggerFactory.getLogger(TestJdbcCommon.class); static final String createTable = "create table restaurants(id integer, name varchar(20), city varchar(50))"; - static final String dropTable = "drop table restaurants"; @ClassRule public static TemporaryFolder folder = new TemporaryFolder(); @@ -580,6 +583,41 @@ public class TestJdbcCommon { } } + @Test + public void testConvertToAvroStreamForBlob_FreeNotSupported() throws SQLException, IOException { + final ResultSetMetaData metadata = mock(ResultSetMetaData.class); + when(metadata.getColumnCount()).thenReturn(1); + when(metadata.getColumnType(1)).thenReturn(Types.BLOB); + when(metadata.getColumnName(1)).thenReturn("t_blob"); + when(metadata.getTableName(1)).thenReturn("table"); + + final ResultSet rs = JdbcCommonTestUtils.resultSetReturningMetadata(metadata); + + final byte[] byteBuffer = "test blob".getBytes(StandardCharsets.UTF_8); + when(rs.getObject(Mockito.anyInt())).thenReturn(byteBuffer); + + ByteArrayInputStream bais = new ByteArrayInputStream(byteBuffer); + Blob blob = mock(Blob.class); + when(blob.getBinaryStream()).thenReturn(bais); + when(blob.length()).thenReturn((long) byteBuffer.length); + doThrow(SQLFeatureNotSupportedException.class).when(blob).free(); + when(rs.getBlob(Mockito.anyInt())).thenReturn(blob); + + final InputStream instream = JdbcCommonTestUtils.convertResultSetToAvroInputStream(rs); + + final DatumReader datumReader = new GenericDatumReader<>(); + try (final DataFileStream dataFileReader = new DataFileStream<>(instream, datumReader)) { + GenericRecord record = null; + while (dataFileReader.hasNext()) { + record = dataFileReader.next(record); + Object o = record.get("t_blob"); + assertTrue(o instanceof ByteBuffer); + ByteBuffer bb = (ByteBuffer) o; + assertEquals("test blob", new String(bb.array(), StandardCharsets.UTF_8)); + } + } + } + @Test public void testConvertToAvroStreamForShort() throws SQLException, IOException { final ResultSetMetaData metadata = mock(ResultSetMetaData.class);