From 1e70e24267351a9c189ea26ee0536fdc8dbd7bd6 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Tue, 12 Sep 2017 11:37:12 +0900 Subject: [PATCH] NIFI-4352: Add CLOB and NCLOB support to PutSQL and ExecuteSQL Signed-off-by: Pierre Villard This closes #2145. --- .../nifi/processors/standard/PutSQL.java | 11 ++++++++++ .../processors/standard/util/JdbcCommon.java | 21 +++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java index a3f19324d8..c35377a7e1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java @@ -54,6 +54,7 @@ import javax.xml.bind.DatatypeConverter; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.StringReader; import java.io.UnsupportedEncodingException; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; @@ -929,6 +930,16 @@ public class PutSQL extends AbstractSessionFactoryProcessor { case Types.LONGVARCHAR: stmt.setString(parameterIndex, parameterValue); break; + case Types.CLOB: + try (final StringReader reader = new StringReader(parameterValue)) { + stmt.setCharacterStream(parameterIndex, reader); + } + break; + case Types.NCLOB: + try (final StringReader reader = new StringReader(parameterValue)) { + stmt.setNCharacterStream(parameterIndex, reader); + } + break; default: stmt.setObject(parameterIndex, parameterValue, jdbcType); break; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java index 97d5cc1bd7..bd9a74c280 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java @@ -33,6 +33,7 @@ import static java.sql.Types.LONGNVARCHAR; import static java.sql.Types.LONGVARBINARY; import static java.sql.Types.LONGVARCHAR; import static java.sql.Types.NCHAR; +import static java.sql.Types.NCLOB; import static java.sql.Types.NUMERIC; import static java.sql.Types.NVARCHAR; import static java.sql.Types.REAL; @@ -47,11 +48,14 @@ import static java.sql.Types.VARCHAR; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.Reader; import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer; +import java.nio.CharBuffer; import java.sql.Blob; import java.sql.Clob; +import java.sql.NClob; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; @@ -278,6 +282,22 @@ public class JdbcCommon { continue; } + if (javaSqlType == NCLOB) { + NClob nClob = rs.getNClob(i); + if (nClob != null) { + final Reader characterStream = nClob.getCharacterStream(); + long numChars = (int) nClob.length(); + final CharBuffer buffer = CharBuffer.allocate((int) numChars); + characterStream.read(buffer); + buffer.flip(); + rec.put(i - 1, buffer.toString()); + nClob.free(); + } else { + rec.put(i - 1, null); + } + continue; + } + if (javaSqlType == BLOB) { Blob blob = rs.getBlob(i); if (blob != null) { @@ -454,6 +474,7 @@ public class JdbcCommon { case NVARCHAR: case VARCHAR: case CLOB: + case NCLOB: builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault(); break;