NIFI-1613 Initial version, try to improve conversion for different SQL types. New test and refactored existing test to reuse DBCP service.

nifi-1613 Adding numeric and Date/time types conversion and test.
This commit is contained in:
Toivo Adams 2016-03-20 21:13:15 +02:00 committed by Matt Burgess
parent b603cb955d
commit 3b2e43b75c
2 changed files with 187 additions and 177 deletions

View File

@ -26,6 +26,7 @@ import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@ -38,11 +39,11 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
@ -478,10 +479,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
final Integer colSize = desc.getColumnSize();
final JsonNode fieldNode = rootNode.get(fieldName);
if (!fieldNode.isNull()) {
String fieldValue = fieldNode.asText();
if (colSize != null && fieldValue.length() > colSize) {
fieldValue = fieldValue.substring(0, colSize);
}
String fieldValue = createSqlStringValue(fieldNode, colSize, sqlType);
attributes.put("sql.args." + fieldCount + ".value", fieldValue);
}
}
@ -505,6 +503,61 @@ public class ConvertJSONToSQL extends AbstractProcessor {
return sqlBuilder.toString();
}
/**
* Try to create correct SQL String representation of value.
*
*/
protected static String createSqlStringValue(final JsonNode fieldNode, final Integer colSize, final int sqlType) {
String fieldValue = fieldNode.asText();
switch (sqlType) {
// only "true" is considered true, everything else is false
case Types.BOOLEAN:
switch (fieldValue==null?"":fieldValue) {
case "true":
fieldValue = "true";
break;
default:
fieldValue = "false";
break;
}
break;
// Don't truncate numeric types.
// Should we check value is indeed number and throw error if not?
case Types.TINYINT:
case Types.SMALLINT:
case Types.INTEGER:
case Types.BIGINT:
case Types.REAL:
case Types.FLOAT:
case Types.DOUBLE:
case Types.DECIMAL:
case Types.NUMERIC:
break;
// Don't truncate date and time types.
// Should we check value is indeed correct date and/or time and throw error if not?
// We assume date and time is already correct, but because ConvertJSONToSQL is often used together with PutSQL
// maybe we should assure PutSQL correctly understands date and time values.
// Currently PutSQL expect Long numeric values. But JSON usually uses ISO 8601, for example: 2012-04-23T18:25:43.511Z for dates.
case Types.DATE:
case Types.TIME:
case Types.TIMESTAMP:
break;
default:
if (colSize != null && fieldValue.length() > colSize) {
fieldValue = fieldValue.substring(0, colSize);
}
break;
}
return fieldValue;
}
private String generateUpdate(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, final String updateKeys,
final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName) {
@ -599,10 +652,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
final JsonNode fieldNode = rootNode.get(fieldName);
if (!fieldNode.isNull()) {
String fieldValue = rootNode.get(fieldName).asText();
if (colSize != null && fieldValue.length() > colSize) {
fieldValue = fieldValue.substring(0, colSize);
}
String fieldValue = createSqlStringValue(fieldNode, colSize, sqlType);
attributes.put("sql.args." + fieldCount + ".value", fieldValue);
}
}

View File

@ -16,14 +16,21 @@
*/
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
@ -32,8 +39,12 @@ import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@ -42,30 +53,36 @@ import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID
public class TestConvertJSONToSQL {
static String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)";
static String createDifferentTypes = "CREATE TABLE DIFTYPES (id integer primary key, b boolean, f float, dbl double, dcml decimal, d date)";
@Rule
public TemporaryFolder folder = new TemporaryFolder();
@ClassRule
public static TemporaryFolder folder = new TemporaryFolder();
/**
* Setting up Connection pooling is expensive operation.
* So let's do this only once and reuse MockDBCPService in each test.
*/
static protected DBCPService service;
@BeforeClass
public static void setup() {
public static void setupClass() throws ProcessException, SQLException {
System.setProperty("derby.stream.error.file", "target/derby.log");
}
@Test
public void testInsert() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
service = new MockDBCPService(dbDir.getAbsolutePath());
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
}
@Test
public void testInsert() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
@ -163,18 +180,9 @@ public class TestConvertJSONToSQL {
@Test
public void testInsertWithNullValue() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
@ -199,18 +207,9 @@ public class TestConvertJSONToSQL {
@Test
public void testUpdateWithNullValue() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
@ -272,18 +271,9 @@ public class TestConvertJSONToSQL {
@Test
public void testMultipleInserts() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
@ -343,18 +333,9 @@ public class TestConvertJSONToSQL {
@Test
public void testUpdateBasedOnPrimaryKey() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
@ -414,18 +395,9 @@ public class TestConvertJSONToSQL {
@Test
public void testUnmappedFieldBehavior() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
@ -450,18 +422,9 @@ public class TestConvertJSONToSQL {
@Test
public void testUpdateBasedOnUpdateKey() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
@ -523,18 +486,9 @@ public class TestConvertJSONToSQL {
@Test
public void testUpdateBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
@ -561,18 +515,9 @@ public class TestConvertJSONToSQL {
@Test
public void testUpdateWithMissingFieldBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
@ -586,18 +531,9 @@ public class TestConvertJSONToSQL {
@Test
public void testUpdateWithMalformedJson() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
@ -611,18 +547,9 @@ public class TestConvertJSONToSQL {
@Test
public void testInsertWithMissingField() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
@ -636,20 +563,17 @@ public class TestConvertJSONToSQL {
@Test
public void testInsertWithMissingColumnFail() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE PERSONS (id integer, name varchar(100), code integer, generated_key integer primary key)");
stmt.executeUpdate("CREATE TABLE PERSONS3 (id integer, name varchar(100), code integer, generated_key integer primary key)");
}
}
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS3");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, ConvertJSONToSQL.FAIL_UNMATCHED_COLUMN);
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
@ -661,20 +585,17 @@ public class TestConvertJSONToSQL {
@Test
public void testInsertWithMissingColumnWarning() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE PERSONS (id integer, name varchar(100), code integer, generated_key integer primary key)");
stmt.executeUpdate("CREATE TABLE PERSONS2 (id integer, name varchar(100), code integer, generated_key integer primary key)");
}
}
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS2");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, ConvertJSONToSQL.WARNING_UNMATCHED_COLUMN);
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
@ -691,24 +612,15 @@ public class TestConvertJSONToSQL {
out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.3.value", "48");
out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)");
out.assertContentEquals("INSERT INTO PERSONS2 (ID, NAME, CODE) VALUES (?, ?, ?)");
} // End testInsertWithMissingColumnWarning()
@Test
public void testInsertWithMissingColumnIgnore() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE PERSONS (id integer, name varchar(100), code integer, generated_key integer primary key)");
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
@ -733,18 +645,9 @@ public class TestConvertJSONToSQL {
@Test
public void testUpdateWithMissingColumnFail() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
@ -759,18 +662,9 @@ public class TestConvertJSONToSQL {
@Test
public void testUpdateWithMissingColumnWarning() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
@ -797,18 +691,9 @@ public class TestConvertJSONToSQL {
@Test
public void testUpdateWithMissingColumnIgnore() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
@ -832,6 +717,81 @@ public class TestConvertJSONToSQL {
} // End testUpdateWithMissingColumnIgnore()
/**
* Test create correct SQL String representation of value.
* Use PutSQL processor to verify converted value can be used and don't fail.
*/
@Test
public void testCreateSqlStringValue() throws ProcessException, SQLException, JsonGenerationException, JsonMappingException, IOException, InitializationException {
final TestRunner putSqlRunner = TestRunners.newTestRunner(PutSQL.class);
final AtomicInteger id = new AtomicInteger(20);
putSqlRunner.addControllerService("dbcp", service);
putSqlRunner.enableControllerService(service);
putSqlRunner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
putSqlRunner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
String tableName = "DIFTYPES";
ObjectMapper mapper = new ObjectMapper();
ResultSet colrs = null;
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createDifferentTypes);
}
colrs = conn.getMetaData().getColumns(null, null, tableName, "%");
while (colrs.next()) {
final int sqlType = colrs.getInt("DATA_TYPE");
final int colSize = colrs.getInt("COLUMN_SIZE");
switch (sqlType) {
case Types.BOOLEAN:
String json = mapper.writeValueAsString("true");
JsonNode fieldNode = mapper.readTree(json);
String booleanString = ConvertJSONToSQL.createSqlStringValue(fieldNode, colSize, sqlType);
assertEquals("true",booleanString);
Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(sqlType));
attributes.put("sql.args.1.value", booleanString);
byte[] data = ("INSERT INTO DIFTYPES (ID, B) VALUES (" + id.incrementAndGet() + ", ?)").getBytes();
putSqlRunner.enqueue(data, attributes);
putSqlRunner.run();
List<MockFlowFile> failed = putSqlRunner.getFlowFilesForRelationship(PutSQL.REL_FAILURE);
putSqlRunner.assertTransferCount(PutSQL.REL_SUCCESS, 1);
putSqlRunner.assertTransferCount(PutSQL.REL_FAILURE, 0);
putSqlRunner.clearTransferState();
break;
case Types.FLOAT:
case Types.DOUBLE:
json = mapper.writeValueAsString("78895654.6575");
fieldNode = mapper.readTree(json);
String numberString = ConvertJSONToSQL.createSqlStringValue(fieldNode, colSize, sqlType);
assertEquals("78895654.6575",numberString);
attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(sqlType));
attributes.put("sql.args.1.value", numberString);
data = ("INSERT INTO DIFTYPES (ID, dbl) VALUES (" + id.incrementAndGet() + ", ?)").getBytes();
putSqlRunner.enqueue(data, attributes);
putSqlRunner.run();
failed = putSqlRunner.getFlowFilesForRelationship(PutSQL.REL_FAILURE);
putSqlRunner.assertTransferCount(PutSQL.REL_SUCCESS, 1);
putSqlRunner.assertTransferCount(PutSQL.REL_FAILURE, 0);
putSqlRunner.clearTransferState();
break;
default:
break;
}
}
}
}
@Test
public void testDelete() throws InitializationException, ProcessException, SQLException, IOException {