diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java index 2eb9cade3b..ca92ad4827 100755 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java @@ -26,6 +26,7 @@ import java.sql.DatabaseMetaData; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.sql.Types; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -38,11 +39,11 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; -import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; @@ -478,10 +479,7 @@ public class ConvertJSONToSQL extends AbstractProcessor { final Integer colSize = desc.getColumnSize(); final JsonNode fieldNode = rootNode.get(fieldName); if (!fieldNode.isNull()) { - String fieldValue = fieldNode.asText(); - if (colSize != null && fieldValue.length() > colSize) { - fieldValue = fieldValue.substring(0, colSize); - } + String fieldValue = createSqlStringValue(fieldNode, colSize, sqlType); attributes.put("sql.args." + fieldCount + ".value", fieldValue); } } @@ -505,6 +503,61 @@ public class ConvertJSONToSQL extends AbstractProcessor { return sqlBuilder.toString(); } + /** + * Try to create correct SQL String representation of value. + * + */ + protected static String createSqlStringValue(final JsonNode fieldNode, final Integer colSize, final int sqlType) { + String fieldValue = fieldNode.asText(); + + switch (sqlType) { + + // only "true" is considered true, everything else is false + case Types.BOOLEAN: + switch (fieldValue==null?"":fieldValue) { + case "true": + fieldValue = "true"; + break; + + default: + fieldValue = "false"; + break; + } + break; + + // Don't truncate numeric types. + // Should we check value is indeed number and throw error if not? + case Types.TINYINT: + case Types.SMALLINT: + case Types.INTEGER: + case Types.BIGINT: + case Types.REAL: + case Types.FLOAT: + case Types.DOUBLE: + case Types.DECIMAL: + case Types.NUMERIC: + break; + + // Don't truncate date and time types. + // Should we check value is indeed correct date and/or time and throw error if not? + // We assume date and time is already correct, but because ConvertJSONToSQL is often used together with PutSQL + // maybe we should assure PutSQL correctly understands date and time values. + // Currently PutSQL expect Long numeric values. But JSON usually uses ISO 8601, for example: 2012-04-23T18:25:43.511Z for dates. + case Types.DATE: + case Types.TIME: + case Types.TIMESTAMP: + break; + + default: + if (colSize != null && fieldValue.length() > colSize) { + fieldValue = fieldValue.substring(0, colSize); + } + break; + } + + return fieldValue; + } + private String generateUpdate(final JsonNode rootNode, final Map attributes, final String tableName, final String updateKeys, final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns, final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName) { @@ -599,10 +652,7 @@ public class ConvertJSONToSQL extends AbstractProcessor { final JsonNode fieldNode = rootNode.get(fieldName); if (!fieldNode.isNull()) { - String fieldValue = rootNode.get(fieldName).asText(); - if (colSize != null && fieldValue.length() > colSize) { - fieldValue = fieldValue.substring(0, colSize); - } + String fieldValue = createSqlStringValue(fieldNode, colSize, sqlType); attributes.put("sql.args." + fieldCount + ".value", fieldValue); } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java index bc9d7f9cc2..3f9c52cb12 100755 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java @@ -16,14 +16,21 @@ */ package org.apache.nifi.processors.standard; +import static org.junit.Assert.assertEquals; + import java.io.File; import java.io.IOException; import java.nio.file.Paths; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.sql.Types; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.dbcp.DBCPService; @@ -32,8 +39,12 @@ import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; import org.junit.BeforeClass; -import org.junit.Rule; +import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -42,30 +53,36 @@ import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID public class TestConvertJSONToSQL { static String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)"; + static String createDifferentTypes = "CREATE TABLE DIFTYPES (id integer primary key, b boolean, f float, dbl double, dcml decimal, d date)"; - @Rule - public TemporaryFolder folder = new TemporaryFolder(); + @ClassRule + public static TemporaryFolder folder = new TemporaryFolder(); + + /** + * Setting up Connection pooling is expensive operation. + * So let's do this only once and reuse MockDBCPService in each test. + */ + static protected DBCPService service; @BeforeClass - public static void setup() { + public static void setupClass() throws ProcessException, SQLException { System.setProperty("derby.stream.error.file", "target/derby.log"); - } - - @Test - public void testInsert() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); final File tempDir = folder.getRoot(); final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - + service = new MockDBCPService(dbDir.getAbsolutePath()); try (final Connection conn = service.getConnection()) { try (final Statement stmt = conn.createStatement()) { stmt.executeUpdate(createPersons); } } + } + @Test + public void testInsert() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); @@ -163,18 +180,9 @@ public class TestConvertJSONToSQL { @Test public void testInsertWithNullValue() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); @@ -199,18 +207,9 @@ public class TestConvertJSONToSQL { @Test public void testUpdateWithNullValue() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); @@ -272,18 +271,9 @@ public class TestConvertJSONToSQL { @Test public void testMultipleInserts() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); @@ -343,18 +333,9 @@ public class TestConvertJSONToSQL { @Test public void testUpdateBasedOnPrimaryKey() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); @@ -414,18 +395,9 @@ public class TestConvertJSONToSQL { @Test public void testUnmappedFieldBehavior() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); @@ -450,18 +422,9 @@ public class TestConvertJSONToSQL { @Test public void testUpdateBasedOnUpdateKey() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); @@ -523,18 +486,9 @@ public class TestConvertJSONToSQL { @Test public void testUpdateBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); @@ -561,18 +515,9 @@ public class TestConvertJSONToSQL { @Test public void testUpdateWithMissingFieldBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); @@ -586,18 +531,9 @@ public class TestConvertJSONToSQL { @Test public void testUpdateWithMalformedJson() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); @@ -611,18 +547,9 @@ public class TestConvertJSONToSQL { @Test public void testInsertWithMissingField() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); @@ -636,20 +563,17 @@ public class TestConvertJSONToSQL { @Test public void testInsertWithMissingColumnFail() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); try (final Connection conn = service.getConnection()) { try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate("CREATE TABLE PERSONS (id integer, name varchar(100), code integer, generated_key integer primary key)"); + stmt.executeUpdate("CREATE TABLE PERSONS3 (id integer, name varchar(100), code integer, generated_key integer primary key)"); } } + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); - runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS3"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, ConvertJSONToSQL.FAIL_UNMATCHED_COLUMN); runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); @@ -661,20 +585,17 @@ public class TestConvertJSONToSQL { @Test public void testInsertWithMissingColumnWarning() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); try (final Connection conn = service.getConnection()) { try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate("CREATE TABLE PERSONS (id integer, name varchar(100), code integer, generated_key integer primary key)"); + stmt.executeUpdate("CREATE TABLE PERSONS2 (id integer, name varchar(100), code integer, generated_key integer primary key)"); } } + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); - runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS2"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, ConvertJSONToSQL.WARNING_UNMATCHED_COLUMN); runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); @@ -691,24 +612,15 @@ public class TestConvertJSONToSQL { out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); out.assertAttributeEquals("sql.args.3.value", "48"); - out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)"); + out.assertContentEquals("INSERT INTO PERSONS2 (ID, NAME, CODE) VALUES (?, ?, ?)"); } // End testInsertWithMissingColumnWarning() @Test public void testInsertWithMissingColumnIgnore() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate("CREATE TABLE PERSONS (id integer, name varchar(100), code integer, generated_key integer primary key)"); - } - } - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); @@ -733,18 +645,9 @@ public class TestConvertJSONToSQL { @Test public void testUpdateWithMissingColumnFail() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); @@ -759,18 +662,9 @@ public class TestConvertJSONToSQL { @Test public void testUpdateWithMissingColumnWarning() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); @@ -797,18 +691,9 @@ public class TestConvertJSONToSQL { @Test public void testUpdateWithMissingColumnIgnore() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); @@ -832,6 +717,81 @@ public class TestConvertJSONToSQL { } // End testUpdateWithMissingColumnIgnore() + /** + * Test create correct SQL String representation of value. + * Use PutSQL processor to verify converted value can be used and don't fail. + */ + @Test + public void testCreateSqlStringValue() throws ProcessException, SQLException, JsonGenerationException, JsonMappingException, IOException, InitializationException { + final TestRunner putSqlRunner = TestRunners.newTestRunner(PutSQL.class); + + final AtomicInteger id = new AtomicInteger(20); + + putSqlRunner.addControllerService("dbcp", service); + putSqlRunner.enableControllerService(service); + putSqlRunner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false"); + putSqlRunner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + + String tableName = "DIFTYPES"; + ObjectMapper mapper = new ObjectMapper(); + ResultSet colrs = null; + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createDifferentTypes); + } + colrs = conn.getMetaData().getColumns(null, null, tableName, "%"); + while (colrs.next()) { + final int sqlType = colrs.getInt("DATA_TYPE"); + final int colSize = colrs.getInt("COLUMN_SIZE"); + switch (sqlType) { + case Types.BOOLEAN: + String json = mapper.writeValueAsString("true"); + JsonNode fieldNode = mapper.readTree(json); + String booleanString = ConvertJSONToSQL.createSqlStringValue(fieldNode, colSize, sqlType); + assertEquals("true",booleanString); + + Map attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(sqlType)); + attributes.put("sql.args.1.value", booleanString); + + byte[] data = ("INSERT INTO DIFTYPES (ID, B) VALUES (" + id.incrementAndGet() + ", ?)").getBytes(); + putSqlRunner.enqueue(data, attributes); + putSqlRunner.run(); + List failed = putSqlRunner.getFlowFilesForRelationship(PutSQL.REL_FAILURE); + putSqlRunner.assertTransferCount(PutSQL.REL_SUCCESS, 1); + putSqlRunner.assertTransferCount(PutSQL.REL_FAILURE, 0); + putSqlRunner.clearTransferState(); + break; + + case Types.FLOAT: + case Types.DOUBLE: + json = mapper.writeValueAsString("78895654.6575"); + fieldNode = mapper.readTree(json); + String numberString = ConvertJSONToSQL.createSqlStringValue(fieldNode, colSize, sqlType); + assertEquals("78895654.6575",numberString); + + attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(sqlType)); + attributes.put("sql.args.1.value", numberString); + + data = ("INSERT INTO DIFTYPES (ID, dbl) VALUES (" + id.incrementAndGet() + ", ?)").getBytes(); + putSqlRunner.enqueue(data, attributes); + putSqlRunner.run(); + failed = putSqlRunner.getFlowFilesForRelationship(PutSQL.REL_FAILURE); + putSqlRunner.assertTransferCount(PutSQL.REL_SUCCESS, 1); + putSqlRunner.assertTransferCount(PutSQL.REL_FAILURE, 0); + putSqlRunner.clearTransferState(); + break; + + default: + break; + } + + } + + } + + } @Test public void testDelete() throws InitializationException, ProcessException, SQLException, IOException {