NIFI-1841: Fixed support for CLOB/BLOB types

This closes #1035.

Signed-off-by: Bryan Bende <bbende@apache.org>
Matt Burgess 2016-09-20 11:35:53 -04:00 committed by Bryan Bende
parent 52fa50cdeb
commit 68291636cb
2 changed files with 156 additions and 8 deletions

JdbcCommon.java

@@ -45,10 +45,13 @@ import static java.sql.Types.VARBINARY;
import static java.sql.Types.VARCHAR;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
@@ -102,12 +105,55 @@ public class JdbcCommon {
}
for (int i = 1; i <= nrOfColumns; i++) {
final int javaSqlType = meta.getColumnType(i);
// Need to handle CLOB and BLOB before getObject() is called, due to ResultSet's maximum portability statement
if (javaSqlType == CLOB) {
Clob clob = rs.getClob(i);
if (clob != null) {
long numChars = clob.length();
char[] buffer = new char[(int) numChars];
InputStream is = clob.getAsciiStream();
int index = 0;
int c = is.read();
while (c > 0) {
buffer[index++] = (char) c;
c = is.read();
}
rec.put(i - 1, new String(buffer));
clob.free();
} else {
rec.put(i - 1, null);
}
continue;
}
if (javaSqlType == BLOB) {
Blob blob = rs.getBlob(i);
if (blob != null) {
long numChars = blob.length();
byte[] buffer = new byte[(int) numChars];
InputStream is = blob.getBinaryStream();
int index = 0;
int c = is.read();
while (c > 0) {
buffer[index++] = (byte) c;
c = is.read();
}
ByteBuffer bb = ByteBuffer.wrap(buffer);
rec.put(i - 1, bb);
blob.free();
} else {
rec.put(i - 1, null);
}
continue;
}
final Object value = rs.getObject(i);
if (value == null) {
rec.put(i - 1, null);
-} else if (javaSqlType == BINARY || javaSqlType == VARBINARY || javaSqlType == LONGVARBINARY || javaSqlType == ARRAY || javaSqlType == BLOB || javaSqlType == CLOB) {
+} else if (javaSqlType == BINARY || javaSqlType == VARBINARY || javaSqlType == LONGVARBINARY || javaSqlType == ARRAY) {
// bytes require slightly different handling
byte[] bytes = rs.getBytes(i);
ByteBuffer bb = ByteBuffer.wrap(bytes);
@@ -211,6 +257,7 @@ public class JdbcCommon {
case NCHAR:
case NVARCHAR:
case VARCHAR:
case CLOB:
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
break;
@@ -277,7 +324,6 @@ public class JdbcCommon {
case LONGVARBINARY:
case ARRAY:
case BLOB:
case CLOB:
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().bytesType().endUnion().noDefault();
break;

TestJdbcCommon.java

@@ -19,17 +19,22 @@ package org.apache.nifi.processors.standard.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.CharArrayReader;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
@@ -38,12 +43,14 @@ import java.sql.Types;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.commons.io.input.ReaderInputStream;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -160,10 +167,8 @@ public class TestJdbcCommon {
System.out.println("Avro serialized result size in bytes: " + serializedBytes.length);
st.close();
con.close();
// Deserialize bytes to records
final InputStream instream = new ByteArrayInputStream(serializedBytes);
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
@@ -243,8 +248,8 @@ public class TestJdbcCommon {
}
}
-Assert.assertTrue(foundIntSchema);
-Assert.assertTrue(foundNullSchema);
+assertTrue(foundIntSchema);
+assertTrue(foundNullSchema);
}
@Test
@@ -277,8 +282,8 @@ public class TestJdbcCommon {
}
}
-Assert.assertTrue(foundLongSchema);
-Assert.assertTrue(foundNullSchema);
+assertTrue(foundLongSchema);
+assertTrue(foundNullSchema);
}
@@ -323,6 +328,103 @@ public class TestJdbcCommon {
}
}
@Test
public void testClob() throws Exception {
try (final Statement stmt = con.createStatement()) {
stmt.executeUpdate("CREATE TABLE clobtest (id INT, text CLOB(64 K))");
stmt.execute("INSERT INTO blobtest VALUES (41, NULL)");
PreparedStatement ps = con.prepareStatement("INSERT INTO clobtest VALUES (?, ?)");
ps.setInt(1, 42);
final char[] buffer = new char[4002];
IntStream.range(0, 4002).forEach((i) -> buffer[i] = String.valueOf(i % 10).charAt(0));
ReaderInputStream isr = new ReaderInputStream(new CharArrayReader(buffer), Charset.defaultCharset());
// - set the value of the input parameter to the input stream
ps.setAsciiStream(2, isr, 4002);
ps.execute();
isr.close();
final ResultSet resultSet = stmt.executeQuery("select * from clobtest");
final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
JdbcCommon.convertToAvroStream(resultSet, outStream, false);
final byte[] serializedBytes = outStream.toByteArray();
assertNotNull(serializedBytes);
// Deserialize bytes to records
final InputStream instream = new ByteArrayInputStream(serializedBytes);
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) {
GenericRecord record = null;
while (dataFileReader.hasNext()) {
// Reuse record object by passing it to next(). This saves us from
// allocating and garbage collecting many objects for files with
// many items.
record = dataFileReader.next(record);
Integer id = (Integer) record.get("ID");
Object o = record.get("TEXT");
if (id == 41) {
assertNull(o);
} else {
assertNotNull(o);
assertEquals(4002, o.toString().length());
}
}
}
}
}
@Test
public void testBlob() throws Exception {
try (final Statement stmt = con.createStatement()) {
stmt.executeUpdate("CREATE TABLE blobtest (id INT, b BLOB(64 K))");
stmt.execute("INSERT INTO blobtest VALUES (41, NULL)");
PreparedStatement ps = con.prepareStatement("INSERT INTO blobtest VALUES (?, ?)");
ps.setInt(1, 42);
final byte[] buffer = new byte[4002];
IntStream.range(0, 4002).forEach((i) -> buffer[i] = (byte) ((i % 10) + 65));
ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
// - set the value of the input parameter to the input stream
ps.setBlob(2, bais, 4002);
ps.execute();
bais.close();
final ResultSet resultSet = stmt.executeQuery("select * from blobtest");
final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
JdbcCommon.convertToAvroStream(resultSet, outStream, false);
final byte[] serializedBytes = outStream.toByteArray();
assertNotNull(serializedBytes);
// Deserialize bytes to records
final InputStream instream = new ByteArrayInputStream(serializedBytes);
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) {
GenericRecord record = null;
while (dataFileReader.hasNext()) {
// Reuse record object by passing it to next(). This saves us from
// allocating and garbage collecting many objects for files with
// many items.
record = dataFileReader.next(record);
Integer id = (Integer) record.get("ID");
Object o = record.get("B");
if (id == 41) {
assertNull(o);
} else {
assertNotNull(o);
assertTrue(o instanceof ByteBuffer);
assertEquals(4002, ((ByteBuffer) o).array().length);
}
}
}
}
}
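
A consumer-side footnote to the two tests above, not part of the commit: the BLOB assertion uses ((ByteBuffer) o).array(), which only works for array-backed buffers with a zero offset. Below is a more defensive sketch of how a downstream reader might pull the CLOB text and BLOB bytes back out of the same generic records; the helper class and method names are mine, and the code assumes the nullable string/bytes unions built in JdbcCommon above.

    import java.nio.ByteBuffer;

    import org.apache.avro.generic.GenericRecord;

    public final class AvroLobReader {

        private AvroLobReader() {
        }

        // Avro returns string fields as Utf8 instances; toString() yields the CLOB text (or null).
        public static String readClobField(GenericRecord record, String fieldName) {
            final Object value = record.get(fieldName);
            return value == null ? null : value.toString();
        }

        // Copies a nullable bytes field without assuming the ByteBuffer is array-backed
        // or positioned at offset zero.
        public static byte[] readBlobField(GenericRecord record, String fieldName) {
            final Object value = record.get(fieldName);
            if (value == null) {
                return null;
            }
            final ByteBuffer buffer = ((ByteBuffer) value).duplicate();
            final byte[] copy = new byte[buffer.remaining()];
            buffer.get(copy);
            return copy;
        }
    }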
// many tests use Derby as the database, so ensure the driver is available
@Test