NIFI-6878: Added 'Use statement.type Attribute' to ConvertJSONToSQL

This closes #3893

Signed-off-by: Mike Thomsen <mthomsen@apache.org>
This commit is contained in:
Matthew Burgess 2019-11-18 23:33:42 -05:00 committed by Mike Thomsen
parent 29ea872f2c
commit 71d5162965
No known key found for this signature in database
GPG Key ID: 88511C3D4CAD246F
2 changed files with 39 additions and 4 deletions

View File

@ -109,6 +109,9 @@ public class ConvertJSONToSQL extends AbstractProcessor {
private static final String UPDATE_TYPE = "UPDATE";
private static final String INSERT_TYPE = "INSERT";
private static final String DELETE_TYPE = "DELETE";
static final String USE_ATTR_TYPE = "Use statement.type Attribute";
static final String STATEMENT_TYPE_ATTRIBUTE = "statement.type";
static final AllowableValue IGNORE_UNMATCHED_FIELD = new AllowableValue("Ignore Unmatched Fields", "Ignore Unmatched Fields",
"Any field in the JSON document that cannot be mapped to a column in the database is ignored");
@ -135,7 +138,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
.name("Statement Type")
.description("Specifies the type of SQL Statement to generate")
.required(true)
.allowableValues(UPDATE_TYPE, INSERT_TYPE, DELETE_TYPE)
.allowableValues(UPDATE_TYPE, INSERT_TYPE, DELETE_TYPE, USE_ATTR_TYPE)
.build();
static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
.name("Table Name")
@ -288,7 +291,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
final boolean translateFieldNames = context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();
final boolean ignoreUnmappedFields = IGNORE_UNMATCHED_FIELD.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_FIELD_BEHAVIOR).getValue());
final String statementType = context.getProperty(STATEMENT_TYPE).getValue();
String statementType = context.getProperty(STATEMENT_TYPE).getValue();
final String updateKeys = context.getProperty(UPDATE_KEY).evaluateAttributeExpressions(flowFile).getValue();
final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue();
@ -380,6 +383,10 @@ public class ConvertJSONToSQL extends AbstractProcessor {
tableNameBuilder.append(tableName);
final String fqTableName = tableNameBuilder.toString();
if (USE_ATTR_TYPE.equals(statementType)) {
statementType = flowFile.getAttribute(STATEMENT_TYPE_ATTRIBUTE);
}
if (INSERT_TYPE.equals(statementType)) {
sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields,
failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName, attributePrefix);

View File

@ -52,8 +52,6 @@ import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_CO
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
public class TestConvertJSONToSQL {
static String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)";
static String createDifferentTypes = "CREATE TABLE DIFTYPES (id integer primary key, b boolean, f float, dbl double, dcml decimal, d date)";
@ClassRule
public static TemporaryFolder folder = new TemporaryFolder();
@ -70,6 +68,7 @@ public class TestConvertJSONToSQL {
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
service = new MockDBCPService(dbDir.getAbsolutePath());
final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)";
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
@ -103,6 +102,34 @@ public class TestConvertJSONToSQL {
out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)");
}
@Test
public void testInsertStatementType() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, ConvertJSONToSQL.USE_ATTR_TYPE);
Map<String, String> attrs = new HashMap<>();
attrs.put(ConvertJSONToSQL.STATEMENT_TYPE_ATTRIBUTE, "INSERT");
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"), attrs);
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.1.value", "1");
out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
out.assertAttributeEquals("sql.args.2.value", "Mark");
out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.3.value", "48");
out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)");
}
@Test
public void testInsertQuotedIdentifiers() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
@ -709,6 +736,7 @@ public class TestConvertJSONToSQL {
ResultSet colrs = null;
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
String createDifferentTypes = "CREATE TABLE DIFTYPES (id integer primary key, b boolean, f float, dbl double, dcml decimal, d date)";
stmt.executeUpdate(createDifferentTypes);
}
colrs = conn.getMetaData().getColumns(null, null, tableName, "%");