NIFI-6062: Add support for BLOB, CLOB, NCLOB in record handling

This closes #3329

Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
Matthew Burgess 2019-02-22 11:44:18 -05:00 committed by Mike Thomsen
parent e5fa18d63c
commit 32bd7ed8b4
3 changed files with 173 additions and 6 deletions

View File

@ -31,9 +31,13 @@ import org.apache.nifi.serialization.record.type.RecordDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.InputStream;
import java.io.Reader;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
@ -551,7 +555,28 @@ public class DataTypeUtils {
return list.toArray();
}
throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Object Array for field " + fieldName);
try {
if (value instanceof Blob) {
Blob blob = (Blob) value;
long rawBlobLength = blob.length();
if(rawBlobLength > Integer.MAX_VALUE) {
throw new IllegalTypeConversionException("Value of type " + value.getClass() + " too large to convert to Object Array for field " + fieldName);
}
int blobLength = (int) rawBlobLength;
byte[] src = blob.getBytes(1, blobLength);
Byte[] dest = new Byte[blobLength];
for (int i = 0; i < src.length; i++) {
dest[i] = src[i];
}
return dest;
} else {
throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Object Array for field " + fieldName);
}
} catch (IllegalTypeConversionException itce) {
throw itce;
} catch (Exception e) {
throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Object Array for field " + fieldName, e);
}
}
public static boolean isArrayTypeCompatible(final Object value, final DataType elementDataType) {
@ -746,6 +771,20 @@ public class DataTypeUtils {
return ""; // Empty array = empty string
}
}
if (value instanceof Clob) {
Clob clob = (Clob) value;
StringBuilder sb = new StringBuilder();
char[] buffer = new char[32 * 1024]; // 32K default buffer
try (Reader reader = clob.getCharacterStream()) {
int charsRead;
while ((charsRead = reader.read(buffer)) != -1) {
sb.append(buffer, 0, charsRead);
}
return sb.toString();
} catch (Exception e) {
throw new IllegalTypeConversionException("Cannot convert value " + value + " of type " + value.getClass() + " to a valid String", e);
}
}
return value.toString();
}
@ -788,6 +827,34 @@ public class DataTypeUtils {
if (value instanceof java.util.Date) {
return getDateFormat(format).format((java.util.Date) value);
}
if (value instanceof Blob) {
Blob blob = (Blob) value;
StringBuilder sb = new StringBuilder();
byte[] buffer = new byte[32 * 1024]; // 32K default buffer
try (InputStream inStream = blob.getBinaryStream()) {
int bytesRead;
while ((bytesRead = inStream.read(buffer)) != -1) {
sb.append(new String(buffer, charset), 0, bytesRead);
}
return sb.toString();
} catch (Exception e) {
throw new IllegalTypeConversionException("Cannot convert value " + value + " of type " + value.getClass() + " to a valid String", e);
}
}
if (value instanceof Clob) {
Clob clob = (Clob) value;
StringBuilder sb = new StringBuilder();
char[] buffer = new char[32 * 1024]; // 32K default buffer
try (Reader reader = clob.getCharacterStream()) {
int charsRead;
while ((charsRead = reader.read(buffer)) != -1) {
sb.append(buffer, 0, charsRead);
}
return sb.toString();
} catch (Exception e) {
throw new IllegalTypeConversionException("Cannot convert value " + value + " of type " + value.getClass() + " to a valid String", e);
}
}
if (value instanceof Object[]) {
return Arrays.toString((Object[]) value);

View File

@ -30,6 +30,7 @@ import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.util.Utf8;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.serialization.SimpleRecordSchema;
@ -55,6 +56,7 @@ import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.sql.Blob;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.Duration;
@ -726,8 +728,18 @@ public class AvroTypeUtil {
}
if (rawValue instanceof Object[]) {
return AvroTypeUtil.convertByteArray((Object[]) rawValue);
} else {
throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a ByteBuffer");
}
try {
if (rawValue instanceof Blob) {
Blob blob = (Blob) rawValue;
return ByteBuffer.wrap(IOUtils.toByteArray(blob.getBinaryStream()));
} else {
throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a ByteBuffer");
}
} catch (IllegalTypeConversionException itce) {
throw itce;
} catch (Exception e) {
throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a ByteBuffer", e);
}
case MAP:
if (rawValue instanceof Record) {

View File

@ -16,6 +16,13 @@
*/
package org.apache.nifi.processors.standard;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.nifi.avro.AvroRecordSetWriter;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@ -24,6 +31,7 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.util.TestJdbcHugeStream;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@ -35,8 +43,10 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
@ -47,7 +57,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
@ -102,7 +115,7 @@ public class TestExecuteSQLRecord {
@Before
public void setup() throws InitializationException {
final DBCPService dbcp = new DBCPServiceSimpleImpl();
final DBCPService dbcp = new DBCPServiceSimpleImpl("derby");
final Map<String, String> dbcpProperties = new HashMap<>();
runner = TestRunners.newTestRunner(ExecuteSQLRecord.class);
@ -239,6 +252,69 @@ public class TestExecuteSQLRecord {
runner.getFlowFilesForRelationship(AbstractExecuteSQL.REL_SUCCESS).get(0).assertAttributeEquals(AbstractExecuteSQL.RESULT_ROW_COUNT, "0");
}
@Test
public void testWriteLOBsToAvro() throws Exception {
final DBCPService dbcp = new DBCPServiceSimpleImpl("h2");
final Map<String, String> dbcpProperties = new HashMap<>();
runner = TestRunners.newTestRunner(ExecuteSQLRecord.class);
runner.addControllerService("dbcp", dbcp, dbcpProperties);
runner.enableControllerService(dbcp);
runner.setProperty(AbstractExecuteSQL.DBCP_SERVICE, "dbcp");
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_NULL_INT");
} catch (final SQLException sqle) {
}
stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, image blob(1K), words clob(1K), "
+ "natwords nclob(1K), constraint my_pk primary key (id))");
stmt.execute("insert into TEST_NULL_INT (id, val1, val2, image, words, natwords) VALUES (0, NULL, 1, CAST (X'DEADBEEF' AS BLOB), "
+ "CAST ('Hello World' AS CLOB), CAST ('I am an NCLOB' AS NCLOB))");
runner.setIncomingConnection(false);
runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
AvroRecordSetWriter recordWriter = new AvroRecordSetWriter();
runner.addControllerService("writer", recordWriter);
runner.setProperty(recordWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
runner.enableControllerService(recordWriter);
runner.run();
runner.assertAllFlowFilesTransferred(AbstractExecuteSQL.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(AbstractExecuteSQL.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(AbstractExecuteSQL.RESULT_ROW_COUNT, "1");
ByteArrayInputStream bais = new ByteArrayInputStream(flowFile.toByteArray());
final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(bais, new GenericDatumReader<>());
final Schema avroSchema = dataFileStream.getSchema();
GenericData.setStringType(avroSchema, GenericData.StringType.String);
final GenericRecord avroRecord = dataFileStream.next();
Object imageObj = avroRecord.get("IMAGE");
assertNotNull(imageObj);
assertTrue(imageObj instanceof ByteBuffer);
assertArrayEquals(new byte[]{(byte) 0xDE, (byte) 0xAD, (byte) 0xBE, (byte) 0xEF}, ((ByteBuffer) imageObj).array());
Object wordsObj = avroRecord.get("WORDS");
assertNotNull(wordsObj);
assertTrue(wordsObj instanceof Utf8);
assertEquals("Hello World", wordsObj.toString());
Object natwordsObj = avroRecord.get("NATWORDS");
assertNotNull(natwordsObj);
assertTrue(natwordsObj instanceof Utf8);
assertEquals("I am an NCLOB", natwordsObj.toString());
}
@Test
public void testNoRowsStatementCreatesEmptyFlowFile() throws Exception {
// remove previous test database, if any
@ -531,6 +607,13 @@ public class TestExecuteSQLRecord {
*/
class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
private final String type;
public DBCPServiceSimpleImpl(String type) {
this.type = type;
}
@Override
public String getIdentifier() {
return "dbcp";
@ -539,8 +622,13 @@ public class TestExecuteSQLRecord {
@Override
public Connection getConnection() throws ProcessException {
try {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
final Connection con;
if ("h2".equalsIgnoreCase(type)) {
con = DriverManager.getConnection("jdbc:h2:file:" + "./target/testdb7");
} else {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
}
return con;
} catch (final Exception e) {
throw new ProcessException("getConnection failed: " + e);