From 68291636cb3e823013c430ccbffbc449d178b89d Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Tue, 20 Sep 2016 11:35:53 -0400 Subject: [PATCH] NIFI-1841: Fixed support for CLOB/BLOB types This closes #1035. Signed-off-by: Bryan Bende --- .../processors/standard/util/JdbcCommon.java | 50 +++++++- .../standard/util/TestJdbcCommon.java | 114 +++++++++++++++++- 2 files changed, 156 insertions(+), 8 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java index 11ba141b1e..b591d01f21 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java @@ -45,10 +45,13 @@ import static java.sql.Types.VARBINARY; import static java.sql.Types.VARCHAR; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer; +import java.sql.Blob; +import java.sql.Clob; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; @@ -102,12 +105,55 @@ public class JdbcCommon { } for (int i = 1; i <= nrOfColumns; i++) { final int javaSqlType = meta.getColumnType(i); + + // Need to handle CLOB and BLOB before getObject() is called, due to ResultSet's maximum portability statement + if (javaSqlType == CLOB) { + Clob clob = rs.getClob(i); + if (clob != null) { + long numChars = clob.length(); + char[] buffer = new char[(int) numChars]; + InputStream is = clob.getAsciiStream(); + int index = 0; + int c = is.read(); + while (c > 0) { + buffer[index++] = (char) c; + c = is.read(); + } + rec.put(i - 1, new String(buffer)); + clob.free(); + } else { + rec.put(i - 1, null); + } + continue; + } + + if (javaSqlType == BLOB) { + Blob blob = rs.getBlob(i); + if (blob != null) { + long numChars = blob.length(); + byte[] buffer = new byte[(int) numChars]; + InputStream is = blob.getBinaryStream(); + int index = 0; + int c = is.read(); + while (c > 0) { + buffer[index++] = (byte) c; + c = is.read(); + } + ByteBuffer bb = ByteBuffer.wrap(buffer); + rec.put(i - 1, bb); + blob.free(); + } else { + rec.put(i - 1, null); + } + continue; + } + final Object value = rs.getObject(i); if (value == null) { rec.put(i - 1, null); - } else if (javaSqlType == BINARY || javaSqlType == VARBINARY || javaSqlType == LONGVARBINARY || javaSqlType == ARRAY || javaSqlType == BLOB || javaSqlType == CLOB) { + } else if (javaSqlType == BINARY || javaSqlType == VARBINARY || javaSqlType == LONGVARBINARY || javaSqlType == ARRAY) { // bytes requires little bit different handling byte[] bytes = rs.getBytes(i); ByteBuffer bb = ByteBuffer.wrap(bytes); @@ -211,6 +257,7 @@ public class JdbcCommon { case NCHAR: case NVARCHAR: case VARCHAR: + case CLOB: builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault(); break; @@ -277,7 +324,6 @@ public class JdbcCommon { case LONGVARBINARY: case ARRAY: case BLOB: - case CLOB: builder.name(columnName).type().unionOf().nullBuilder().endNull().and().bytesType().endUnion().noDefault(); break; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java index b66c1782a8..dd375aa7f7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java @@ -19,17 +19,22 @@ package org.apache.nifi.processors.standard.util; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.CharArrayReader; import java.io.IOException; import java.io.InputStream; import java.lang.reflect.Field; import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; @@ -38,12 +43,14 @@ import java.sql.Types; import java.util.HashSet; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; import org.apache.avro.Schema; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; +import org.apache.commons.io.input.ReaderInputStream; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -160,10 +167,8 @@ public class TestJdbcCommon { System.out.println("Avro serialized result size in bytes: " + serializedBytes.length); st.close(); - con.close(); // Deserialize bytes to records - final InputStream instream = new ByteArrayInputStream(serializedBytes); final DatumReader datumReader = new GenericDatumReader<>(); @@ -243,8 +248,8 @@ public class TestJdbcCommon { } } - Assert.assertTrue(foundIntSchema); - Assert.assertTrue(foundNullSchema); + assertTrue(foundIntSchema); + assertTrue(foundNullSchema); } @Test @@ -277,8 +282,8 @@ public class TestJdbcCommon { } } - Assert.assertTrue(foundLongSchema); - Assert.assertTrue(foundNullSchema); + assertTrue(foundLongSchema); + assertTrue(foundNullSchema); } @@ -323,6 +328,103 @@ public class TestJdbcCommon { } } + @Test + public void testClob() throws Exception { + try (final Statement stmt = con.createStatement()) { + stmt.executeUpdate("CREATE TABLE clobtest (id INT, text CLOB(64 K))"); + stmt.execute("INSERT INTO blobtest VALUES (41, NULL)"); + PreparedStatement ps = con.prepareStatement("INSERT INTO clobtest VALUES (?, ?)"); + ps.setInt(1, 42); + final char[] buffer = new char[4002]; + IntStream.range(0, 4002).forEach((i) -> buffer[i] = String.valueOf(i % 10).charAt(0)); + ReaderInputStream isr = new ReaderInputStream(new CharArrayReader(buffer), Charset.defaultCharset()); + + // - set the value of the input parameter to the input stream + ps.setAsciiStream(2, isr, 4002); + ps.execute(); + isr.close(); + + final ResultSet resultSet = stmt.executeQuery("select * from clobtest"); + + final ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + JdbcCommon.convertToAvroStream(resultSet, outStream, false); + + final byte[] serializedBytes = outStream.toByteArray(); + assertNotNull(serializedBytes); + + // Deserialize bytes to records + final InputStream instream = new ByteArrayInputStream(serializedBytes); + + final DatumReader datumReader = new GenericDatumReader<>(); + try (final DataFileStream dataFileReader = new DataFileStream<>(instream, datumReader)) { + GenericRecord record = null; + while (dataFileReader.hasNext()) { + // Reuse record object by passing it to next(). This saves us from + // allocating and garbage collecting many objects for files with + // many items. + record = dataFileReader.next(record); + Integer id = (Integer) record.get("ID"); + Object o = record.get("TEXT"); + if (id == 41) { + assertNull(o); + } else { + assertNotNull(o); + assertEquals(4002, o.toString().length()); + } + } + } + } + } + + @Test + public void testBlob() throws Exception { + try (final Statement stmt = con.createStatement()) { + stmt.executeUpdate("CREATE TABLE blobtest (id INT, b BLOB(64 K))"); + stmt.execute("INSERT INTO blobtest VALUES (41, NULL)"); + PreparedStatement ps = con.prepareStatement("INSERT INTO blobtest VALUES (?, ?)"); + ps.setInt(1, 42); + final byte[] buffer = new byte[4002]; + IntStream.range(0, 4002).forEach((i) -> buffer[i] = (byte) ((i % 10) + 65)); + ByteArrayInputStream bais = new ByteArrayInputStream(buffer); + + // - set the value of the input parameter to the input stream + ps.setBlob(2, bais, 4002); + ps.execute(); + bais.close(); + + final ResultSet resultSet = stmt.executeQuery("select * from blobtest"); + + final ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + JdbcCommon.convertToAvroStream(resultSet, outStream, false); + + final byte[] serializedBytes = outStream.toByteArray(); + assertNotNull(serializedBytes); + + // Deserialize bytes to records + final InputStream instream = new ByteArrayInputStream(serializedBytes); + + final DatumReader datumReader = new GenericDatumReader<>(); + try (final DataFileStream dataFileReader = new DataFileStream<>(instream, datumReader)) { + GenericRecord record = null; + while (dataFileReader.hasNext()) { + // Reuse record object by passing it to next(). This saves us from + // allocating and garbage collecting many objects for files with + // many items. + record = dataFileReader.next(record); + Integer id = (Integer) record.get("ID"); + Object o = record.get("B"); + if (id == 41) { + assertNull(o); + } else { + assertNotNull(o); + assertTrue(o instanceof ByteBuffer); + assertEquals(4002, ((ByteBuffer) o).array().length); + } + } + } + } + } + // many test use Derby as database, so ensure driver is available @Test