mirror of https://github.com/apache/nifi.git
Free Clob after reading from ResultSet
NIFI-10657 Added debug logging when unable to free clob or blob because it is unsupported Signed-off-by: Joe Gresock <jgresock@gmail.com> This closes #8654.
This commit is contained in:
parent
f42c2eb6bc
commit
b5943941ba
|
@ -36,6 +36,8 @@ import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||||
import org.apache.nifi.avro.AvroTypeUtil;
|
import org.apache.nifi.avro.AvroTypeUtil;
|
||||||
import org.apache.nifi.serialization.record.util.DataTypeUtils;
|
import org.apache.nifi.serialization.record.util.DataTypeUtils;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -125,6 +127,8 @@ public class JdbcCommon {
|
||||||
public static final String MIME_TYPE_AVRO_BINARY = "application/avro-binary";
|
public static final String MIME_TYPE_AVRO_BINARY = "application/avro-binary";
|
||||||
public static final String MASKED_LOG_VALUE = "MASKED VALUE";
|
public static final String MASKED_LOG_VALUE = "MASKED VALUE";
|
||||||
|
|
||||||
|
private final static Logger logger = LoggerFactory.getLogger(JdbcCommon.class);
|
||||||
|
|
||||||
private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
|
private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
|
||||||
private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
|
private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
|
||||||
|
|
||||||
|
@ -274,7 +278,7 @@ public class JdbcCommon {
|
||||||
final int javaSqlType = meta.getColumnType(i);
|
final int javaSqlType = meta.getColumnType(i);
|
||||||
final Schema fieldSchema = schema.getFields().get(i - 1).schema();
|
final Schema fieldSchema = schema.getFields().get(i - 1).schema();
|
||||||
|
|
||||||
// Need to handle CLOB and BLOB before getObject() is called, due to ResultSet's maximum portability statement
|
// Need to handle CLOB and NCLOB before getObject() is called, due to ResultSet's maximum portability statement
|
||||||
if (javaSqlType == CLOB || javaSqlType == NCLOB) {
|
if (javaSqlType == CLOB || javaSqlType == NCLOB) {
|
||||||
Clob clob = rs.getClob(i);
|
Clob clob = rs.getClob(i);
|
||||||
if (clob != null) {
|
if (clob != null) {
|
||||||
|
@ -287,6 +291,12 @@ public class JdbcCommon {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
rec.put(i - 1, sb.toString());
|
rec.put(i - 1, sb.toString());
|
||||||
|
try {
|
||||||
|
clob.free();
|
||||||
|
} catch (SQLFeatureNotSupportedException sfnse) {
|
||||||
|
// The driver doesn't support free, but allow processing to continue
|
||||||
|
logger.debug("Database Driver does not support freeing clob objects");
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
rec.put(i - 1, null);
|
rec.put(i - 1, null);
|
||||||
}
|
}
|
||||||
|
@ -312,6 +322,7 @@ public class JdbcCommon {
|
||||||
blob.free();
|
blob.free();
|
||||||
} catch (SQLFeatureNotSupportedException sfnse) {
|
} catch (SQLFeatureNotSupportedException sfnse) {
|
||||||
// The driver doesn't support free, but allow processing to continue
|
// The driver doesn't support free, but allow processing to continue
|
||||||
|
logger.debug("Database Driver does not support freeing blob objects");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
rec.put(i - 1, null);
|
rec.put(i - 1, null);
|
||||||
|
|
|
@ -22,6 +22,8 @@ import java.io.CharArrayReader;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
import java.io.InputStreamReader;
|
||||||
|
import java.io.Reader;
|
||||||
import java.lang.reflect.Field;
|
import java.lang.reflect.Field;
|
||||||
import java.math.BigDecimal;
|
import java.math.BigDecimal;
|
||||||
import java.math.BigInteger;
|
import java.math.BigInteger;
|
||||||
|
@ -30,6 +32,7 @@ import java.nio.ByteBuffer;
|
||||||
import java.nio.charset.Charset;
|
import java.nio.charset.Charset;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.sql.Blob;
|
import java.sql.Blob;
|
||||||
|
import java.sql.Clob;
|
||||||
import java.sql.Connection;
|
import java.sql.Connection;
|
||||||
import java.sql.DriverManager;
|
import java.sql.DriverManager;
|
||||||
import java.sql.PreparedStatement;
|
import java.sql.PreparedStatement;
|
||||||
|
@ -633,6 +636,39 @@ public class TestJdbcCommon {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConvertToAvroStreamForClob_FreeNotSupported() throws SQLException, IOException {
|
||||||
|
final ResultSetMetaData metadata = mock(ResultSetMetaData.class);
|
||||||
|
when(metadata.getColumnCount()).thenReturn(1);
|
||||||
|
when(metadata.getColumnType(1)).thenReturn(Types.CLOB);
|
||||||
|
when(metadata.getColumnName(1)).thenReturn("t_clob");
|
||||||
|
when(metadata.getTableName(1)).thenReturn("table");
|
||||||
|
|
||||||
|
final ResultSet rs = JdbcCommonTestUtils.resultSetReturningMetadata(metadata);
|
||||||
|
|
||||||
|
final byte[] byteBuffer = "test clob".getBytes(StandardCharsets.UTF_8);
|
||||||
|
final Reader reader = new InputStreamReader(new ByteArrayInputStream(byteBuffer));
|
||||||
|
|
||||||
|
Clob clob = mock(Clob.class);
|
||||||
|
when(clob.getCharacterStream()).thenReturn(reader);
|
||||||
|
when(clob.length()).thenReturn((long) byteBuffer.length);
|
||||||
|
doThrow(SQLFeatureNotSupportedException.class).when(clob).free();
|
||||||
|
when(rs.getClob(Mockito.anyInt())).thenReturn(clob);
|
||||||
|
|
||||||
|
final InputStream instream = JdbcCommonTestUtils.convertResultSetToAvroInputStream(rs);
|
||||||
|
|
||||||
|
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
|
||||||
|
try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) {
|
||||||
|
GenericRecord record = null;
|
||||||
|
while (dataFileReader.hasNext()) {
|
||||||
|
record = dataFileReader.next(record);
|
||||||
|
Object o = record.get("t_clob");
|
||||||
|
assertTrue(o instanceof Utf8);
|
||||||
|
assertEquals("test clob", o.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConvertToAvroStreamForBlob_FreeNotSupported() throws SQLException, IOException {
|
public void testConvertToAvroStreamForBlob_FreeNotSupported() throws SQLException, IOException {
|
||||||
final ResultSetMetaData metadata = mock(ResultSetMetaData.class);
|
final ResultSetMetaData metadata = mock(ResultSetMetaData.class);
|
||||||
|
|
Loading…
Reference in New Issue