From 32bd7ed8b42ca76f2fb9ca65046989368c6459ac Mon Sep 17 00:00:00 2001 From: Matthew Burgess Date: Fri, 22 Feb 2019 11:44:18 -0500 Subject: [PATCH] NIFI-6062: Add support for BLOB, CLOB, NCLOB in record handling This closes #3329 Signed-off-by: Mike Thomsen --- .../record/util/DataTypeUtils.java | 69 +++++++++++++- .../org/apache/nifi/avro/AvroTypeUtil.java | 16 +++- .../standard/TestExecuteSQLRecord.java | 94 ++++++++++++++++++- 3 files changed, 173 insertions(+), 6 deletions(-) diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java index d6e4878668..a399f6724a 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java @@ -31,9 +31,13 @@ import org.apache.nifi.serialization.record.type.RecordDataType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.InputStream; +import java.io.Reader; import java.math.BigInteger; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.sql.Blob; +import java.sql.Clob; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; @@ -551,7 +555,28 @@ public class DataTypeUtils { return list.toArray(); } - throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Object Array for field " + fieldName); + try { + if (value instanceof Blob) { + Blob blob = (Blob) value; + long rawBlobLength = blob.length(); + if(rawBlobLength > Integer.MAX_VALUE) { + throw new IllegalTypeConversionException("Value of type " + value.getClass() + " too large to convert to Object Array for field " + fieldName); + } + int blobLength = (int) rawBlobLength; + byte[] src = blob.getBytes(1, blobLength); + Byte[] dest = new Byte[blobLength]; + for (int i = 0; i < src.length; i++) { + dest[i] = src[i]; + } + return dest; + } else { + throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Object Array for field " + fieldName); + } + } catch (IllegalTypeConversionException itce) { + throw itce; + } catch (Exception e) { + throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Object Array for field " + fieldName, e); + } } public static boolean isArrayTypeCompatible(final Object value, final DataType elementDataType) { @@ -746,6 +771,20 @@ public class DataTypeUtils { return ""; // Empty array = empty string } } + if (value instanceof Clob) { + Clob clob = (Clob) value; + StringBuilder sb = new StringBuilder(); + char[] buffer = new char[32 * 1024]; // 32K default buffer + try (Reader reader = clob.getCharacterStream()) { + int charsRead; + while ((charsRead = reader.read(buffer)) != -1) { + sb.append(buffer, 0, charsRead); + } + return sb.toString(); + } catch (Exception e) { + throw new IllegalTypeConversionException("Cannot convert value " + value + " of type " + value.getClass() + " to a valid String", e); + } + } return value.toString(); } @@ -788,6 +827,34 @@ public class DataTypeUtils { if (value instanceof java.util.Date) { return getDateFormat(format).format((java.util.Date) value); } + if (value instanceof Blob) { + Blob blob = (Blob) value; + StringBuilder sb = new StringBuilder(); + byte[] buffer = new byte[32 * 1024]; // 32K default buffer + try (InputStream inStream = blob.getBinaryStream()) { + int bytesRead; + while ((bytesRead = inStream.read(buffer)) != -1) { + sb.append(new String(buffer, charset), 0, bytesRead); + } + return sb.toString(); + } catch (Exception e) { + throw new IllegalTypeConversionException("Cannot convert value " + value + " of type " + value.getClass() + " to a valid String", e); + } + } + if (value instanceof Clob) { + Clob clob = (Clob) value; + StringBuilder sb = new StringBuilder(); + char[] buffer = new char[32 * 1024]; // 32K default buffer + try (Reader reader = clob.getCharacterStream()) { + int charsRead; + while ((charsRead = reader.read(buffer)) != -1) { + sb.append(buffer, 0, charsRead); + } + return sb.toString(); + } catch (Exception e) { + throw new IllegalTypeConversionException("Cannot convert value " + value + " of type " + value.getClass() + " to a valid String", e); + } + } if (value instanceof Object[]) { return Arrays.toString((Object[]) value); diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java index 4b13226027..097844a31c 100755 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java @@ -30,6 +30,7 @@ import org.apache.avro.generic.GenericFixed; import org.apache.avro.generic.GenericRecord; import org.apache.avro.specific.SpecificRecord; import org.apache.avro.util.Utf8; +import org.apache.commons.compress.utils.IOUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.nifi.serialization.SimpleRecordSchema; @@ -55,6 +56,7 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.sql.Blob; import java.sql.Time; import java.sql.Timestamp; import java.time.Duration; @@ -726,8 +728,18 @@ public class AvroTypeUtil { } if (rawValue instanceof Object[]) { return AvroTypeUtil.convertByteArray((Object[]) rawValue); - } else { - throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a ByteBuffer"); + } + try { + if (rawValue instanceof Blob) { + Blob blob = (Blob) rawValue; + return ByteBuffer.wrap(IOUtils.toByteArray(blob.getBinaryStream())); + } else { + throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a ByteBuffer"); + } + } catch (IllegalTypeConversionException itce) { + throw itce; + } catch (Exception e) { + throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a ByteBuffer", e); } case MAP: if (rawValue instanceof Record) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java index b0f5cda217..93997d1969 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java @@ -16,6 +16,13 @@ */ package org.apache.nifi.processors.standard; +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.util.Utf8; +import org.apache.nifi.avro.AvroRecordSetWriter; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.dbcp.DBCPService; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -24,6 +31,7 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processors.standard.util.TestJdbcHugeStream; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaAccessUtils; import org.apache.nifi.serialization.record.MockRecordWriter; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; @@ -35,8 +43,10 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; @@ -47,7 +57,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; @@ -102,7 +115,7 @@ public class TestExecuteSQLRecord { @Before public void setup() throws InitializationException { - final DBCPService dbcp = new DBCPServiceSimpleImpl(); + final DBCPService dbcp = new DBCPServiceSimpleImpl("derby"); final Map dbcpProperties = new HashMap<>(); runner = TestRunners.newTestRunner(ExecuteSQLRecord.class); @@ -239,6 +252,69 @@ public class TestExecuteSQLRecord { runner.getFlowFilesForRelationship(AbstractExecuteSQL.REL_SUCCESS).get(0).assertAttributeEquals(AbstractExecuteSQL.RESULT_ROW_COUNT, "0"); } + @Test + public void testWriteLOBsToAvro() throws Exception { + final DBCPService dbcp = new DBCPServiceSimpleImpl("h2"); + final Map dbcpProperties = new HashMap<>(); + + runner = TestRunners.newTestRunner(ExecuteSQLRecord.class); + runner.addControllerService("dbcp", dbcp, dbcpProperties); + runner.enableControllerService(dbcp); + runner.setProperty(AbstractExecuteSQL.DBCP_SERVICE, "dbcp"); + + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_NULL_INT"); + } catch (final SQLException sqle) { + } + + stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, image blob(1K), words clob(1K), " + + "natwords nclob(1K), constraint my_pk primary key (id))"); + stmt.execute("insert into TEST_NULL_INT (id, val1, val2, image, words, natwords) VALUES (0, NULL, 1, CAST (X'DEADBEEF' AS BLOB), " + + "CAST ('Hello World' AS CLOB), CAST ('I am an NCLOB' AS NCLOB))"); + + runner.setIncomingConnection(false); + runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT"); + AvroRecordSetWriter recordWriter = new AvroRecordSetWriter(); + runner.addControllerService("writer", recordWriter); + runner.setProperty(recordWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA); + runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer"); + runner.enableControllerService(recordWriter); + runner.run(); + + runner.assertAllFlowFilesTransferred(AbstractExecuteSQL.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(AbstractExecuteSQL.REL_SUCCESS).get(0); + flowFile.assertAttributeEquals(AbstractExecuteSQL.RESULT_ROW_COUNT, "1"); + + ByteArrayInputStream bais = new ByteArrayInputStream(flowFile.toByteArray()); + final DataFileStream dataFileStream = new DataFileStream<>(bais, new GenericDatumReader<>()); + final Schema avroSchema = dataFileStream.getSchema(); + GenericData.setStringType(avroSchema, GenericData.StringType.String); + final GenericRecord avroRecord = dataFileStream.next(); + + Object imageObj = avroRecord.get("IMAGE"); + assertNotNull(imageObj); + assertTrue(imageObj instanceof ByteBuffer); + assertArrayEquals(new byte[]{(byte) 0xDE, (byte) 0xAD, (byte) 0xBE, (byte) 0xEF}, ((ByteBuffer) imageObj).array()); + + Object wordsObj = avroRecord.get("WORDS"); + assertNotNull(wordsObj); + assertTrue(wordsObj instanceof Utf8); + assertEquals("Hello World", wordsObj.toString()); + + Object natwordsObj = avroRecord.get("NATWORDS"); + assertNotNull(natwordsObj); + assertTrue(natwordsObj instanceof Utf8); + assertEquals("I am an NCLOB", natwordsObj.toString()); + } + @Test public void testNoRowsStatementCreatesEmptyFlowFile() throws Exception { // remove previous test database, if any @@ -531,6 +607,13 @@ public class TestExecuteSQLRecord { */ class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService { + private final String type; + + public DBCPServiceSimpleImpl(String type) { + this.type = type; + + } + @Override public String getIdentifier() { return "dbcp"; @@ -539,8 +622,13 @@ public class TestExecuteSQLRecord { @Override public Connection getConnection() throws ProcessException { try { - Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); - final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true"); + final Connection con; + if ("h2".equalsIgnoreCase(type)) { + con = DriverManager.getConnection("jdbc:h2:file:" + "./target/testdb7"); + } else { + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true"); + } return con; } catch (final Exception e) { throw new ProcessException("getConnection failed: " + e);