mirror of https://github.com/apache/nifi.git
parent
027fbf48b8
commit
46fa6a167a
|
@ -183,13 +183,22 @@ public class ConvertJSONToSQL extends AbstractProcessor {
|
||||||
|
|
||||||
static final PropertyDescriptor QUOTED_IDENTIFIERS = new PropertyDescriptor.Builder()
|
static final PropertyDescriptor QUOTED_IDENTIFIERS = new PropertyDescriptor.Builder()
|
||||||
.name("jts-quoted-identifiers")
|
.name("jts-quoted-identifiers")
|
||||||
.displayName("Quote Identifiers")
|
.displayName("Quote Column Identifiers")
|
||||||
.description("Enabling this option will cause all column names to be quoted, allowing you to "
|
.description("Enabling this option will cause all column names to be quoted, allowing you to "
|
||||||
+ "use reserved words as column names in your tables.")
|
+ "use reserved words as column names in your tables.")
|
||||||
.allowableValues("true", "false")
|
.allowableValues("true", "false")
|
||||||
.defaultValue("false")
|
.defaultValue("false")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
static final PropertyDescriptor QUOTED_TABLE_IDENTIFIER = new PropertyDescriptor.Builder()
|
||||||
|
.name("jts-quoted-table-identifiers")
|
||||||
|
.displayName("Quote Table Identifiers")
|
||||||
|
.description("Enabling this option will cause the table name to be quoted to support the "
|
||||||
|
+ "use of special characters in the table name")
|
||||||
|
.allowableValues("true", "false")
|
||||||
|
.defaultValue("false")
|
||||||
|
.build();
|
||||||
|
|
||||||
static final Relationship REL_ORIGINAL = new Relationship.Builder()
|
static final Relationship REL_ORIGINAL = new Relationship.Builder()
|
||||||
.name("original")
|
.name("original")
|
||||||
.description("When a FlowFile is converted to SQL, the original JSON FlowFile is routed to this relationship")
|
.description("When a FlowFile is converted to SQL, the original JSON FlowFile is routed to this relationship")
|
||||||
|
@ -226,6 +235,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
|
||||||
properties.add(UNMATCHED_COLUMN_BEHAVIOR);
|
properties.add(UNMATCHED_COLUMN_BEHAVIOR);
|
||||||
properties.add(UPDATE_KEY);
|
properties.add(UPDATE_KEY);
|
||||||
properties.add(QUOTED_IDENTIFIERS);
|
properties.add(QUOTED_IDENTIFIERS);
|
||||||
|
properties.add(QUOTED_TABLE_IDENTIFIER);
|
||||||
return properties;
|
return properties;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -272,6 +282,9 @@ public class ConvertJSONToSQL extends AbstractProcessor {
|
||||||
//Escape column names?
|
//Escape column names?
|
||||||
final boolean escapeColumnNames = context.getProperty(QUOTED_IDENTIFIERS).asBoolean();
|
final boolean escapeColumnNames = context.getProperty(QUOTED_IDENTIFIERS).asBoolean();
|
||||||
|
|
||||||
|
// Quote table name?
|
||||||
|
final boolean quoteTableName = context.getProperty(QUOTED_TABLE_IDENTIFIER).asBoolean();
|
||||||
|
|
||||||
// get the database schema from the cache, if one exists. We do this in a synchronized block, rather than
|
// get the database schema from the cache, if one exists. We do this in a synchronized block, rather than
|
||||||
// using a ConcurrentMap because the Map that we are using is a LinkedHashMap with a capacity such that if
|
// using a ConcurrentMap because the Map that we are using is a LinkedHashMap with a capacity such that if
|
||||||
// the Map grows beyond this capacity, old elements are evicted. We do this in order to avoid filling the
|
// the Map grows beyond this capacity, old elements are evicted. We do this in order to avoid filling the
|
||||||
|
@ -349,10 +362,10 @@ public class ConvertJSONToSQL extends AbstractProcessor {
|
||||||
|
|
||||||
if (INSERT_TYPE.equals(statementType)) {
|
if (INSERT_TYPE.equals(statementType)) {
|
||||||
sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields,
|
sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields,
|
||||||
failUnmappedColumns, warningUnmappedColumns, escapeColumnNames);
|
failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName);
|
||||||
} else {
|
} else {
|
||||||
sql = generateUpdate(jsonNode, attributes, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields,
|
sql = generateUpdate(jsonNode, attributes, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields,
|
||||||
failUnmappedColumns, warningUnmappedColumns, escapeColumnNames);
|
failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName);
|
||||||
}
|
}
|
||||||
} catch (final ProcessException pe) {
|
} catch (final ProcessException pe) {
|
||||||
getLogger().error("Failed to convert {} to a SQL {} statement due to {}; routing to failure",
|
getLogger().error("Failed to convert {} to a SQL {} statement due to {}; routing to failure",
|
||||||
|
@ -402,7 +415,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
|
||||||
|
|
||||||
private String generateInsert(final JsonNode rootNode, final Map<String, String> attributes, final String tableName,
|
private String generateInsert(final JsonNode rootNode, final Map<String, String> attributes, final String tableName,
|
||||||
final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
|
final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
|
||||||
final boolean warningUnmappedColumns, boolean escapeColumnNames) {
|
final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName) {
|
||||||
|
|
||||||
final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
|
final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
|
||||||
for (final String requiredColName : schema.getRequiredColumnNames()) {
|
for (final String requiredColName : schema.getRequiredColumnNames()) {
|
||||||
|
@ -420,7 +433,15 @@ public class ConvertJSONToSQL extends AbstractProcessor {
|
||||||
|
|
||||||
final StringBuilder sqlBuilder = new StringBuilder();
|
final StringBuilder sqlBuilder = new StringBuilder();
|
||||||
int fieldCount = 0;
|
int fieldCount = 0;
|
||||||
sqlBuilder.append("INSERT INTO ").append(tableName).append(" (");
|
sqlBuilder.append("INSERT INTO ");
|
||||||
|
if (quoteTableName) {
|
||||||
|
sqlBuilder.append(schema.getQuotedIdentifierString())
|
||||||
|
.append(tableName)
|
||||||
|
.append(schema.getQuotedIdentifierString());
|
||||||
|
} else {
|
||||||
|
sqlBuilder.append(tableName);
|
||||||
|
}
|
||||||
|
sqlBuilder.append(" (");
|
||||||
|
|
||||||
// iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as
|
// iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as
|
||||||
// adding the column value to a "sql.args.N.value" attribute and the type of a "sql.args.N.type" attribute add the
|
// adding the column value to a "sql.args.N.value" attribute and the type of a "sql.args.N.type" attribute add the
|
||||||
|
@ -439,12 +460,12 @@ public class ConvertJSONToSQL extends AbstractProcessor {
|
||||||
sqlBuilder.append(", ");
|
sqlBuilder.append(", ");
|
||||||
}
|
}
|
||||||
|
|
||||||
if(!escapeColumnNames){
|
if(escapeColumnNames){
|
||||||
sqlBuilder.append(desc.getColumnName());
|
sqlBuilder.append(schema.getQuotedIdentifierString())
|
||||||
|
.append(desc.getColumnName())
|
||||||
|
.append(schema.getQuotedIdentifierString());
|
||||||
} else {
|
} else {
|
||||||
sqlBuilder.append(schema.getQuotedIdentifierString());
|
|
||||||
sqlBuilder.append(desc.getColumnName());
|
sqlBuilder.append(desc.getColumnName());
|
||||||
sqlBuilder.append(schema.getQuotedIdentifierString());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
final int sqlType = desc.getDataType();
|
final int sqlType = desc.getDataType();
|
||||||
|
@ -482,7 +503,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
|
||||||
|
|
||||||
private String generateUpdate(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, final String updateKeys,
|
private String generateUpdate(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, final String updateKeys,
|
||||||
final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
|
final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
|
||||||
final boolean warningUnmappedColumns, boolean escapeColumnNames) {
|
final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName) {
|
||||||
|
|
||||||
final Set<String> updateKeyNames;
|
final Set<String> updateKeyNames;
|
||||||
if (updateKeys == null) {
|
if (updateKeys == null) {
|
||||||
|
@ -500,7 +521,16 @@ public class ConvertJSONToSQL extends AbstractProcessor {
|
||||||
|
|
||||||
final StringBuilder sqlBuilder = new StringBuilder();
|
final StringBuilder sqlBuilder = new StringBuilder();
|
||||||
int fieldCount = 0;
|
int fieldCount = 0;
|
||||||
sqlBuilder.append("UPDATE ").append(tableName).append(" SET ");
|
sqlBuilder.append("UPDATE ");
|
||||||
|
if (quoteTableName) {
|
||||||
|
sqlBuilder.append(schema.getQuotedIdentifierString())
|
||||||
|
.append(tableName)
|
||||||
|
.append(schema.getQuotedIdentifierString());
|
||||||
|
} else {
|
||||||
|
sqlBuilder.append(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
sqlBuilder.append(" SET ");
|
||||||
|
|
||||||
|
|
||||||
// Create a Set of all normalized Update Key names, and ensure that there is a field in the JSON
|
// Create a Set of all normalized Update Key names, and ensure that there is a field in the JSON
|
||||||
|
@ -549,12 +579,12 @@ public class ConvertJSONToSQL extends AbstractProcessor {
|
||||||
sqlBuilder.append(", ");
|
sqlBuilder.append(", ");
|
||||||
}
|
}
|
||||||
|
|
||||||
if(!escapeColumnNames){
|
if(escapeColumnNames){
|
||||||
sqlBuilder.append(desc.getColumnName());
|
|
||||||
} else {
|
|
||||||
sqlBuilder.append(schema.getQuotedIdentifierString())
|
sqlBuilder.append(schema.getQuotedIdentifierString())
|
||||||
.append(desc.getColumnName())
|
.append(desc.getColumnName())
|
||||||
.append(schema.getQuotedIdentifierString());
|
.append(schema.getQuotedIdentifierString());
|
||||||
|
} else {
|
||||||
|
sqlBuilder.append(desc.getColumnName());
|
||||||
}
|
}
|
||||||
|
|
||||||
sqlBuilder.append(" = ?");
|
sqlBuilder.append(" = ?");
|
||||||
|
@ -598,12 +628,12 @@ public class ConvertJSONToSQL extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
fieldCount++;
|
fieldCount++;
|
||||||
|
|
||||||
if(!escapeColumnNames){
|
if(escapeColumnNames){
|
||||||
sqlBuilder.append(normalizedColName);
|
|
||||||
} else {
|
|
||||||
sqlBuilder.append(schema.getQuotedIdentifierString())
|
sqlBuilder.append(schema.getQuotedIdentifierString())
|
||||||
.append(normalizedColName)
|
.append(normalizedColName)
|
||||||
.append(schema.getQuotedIdentifierString());
|
.append(schema.getQuotedIdentifierString());
|
||||||
|
} else {
|
||||||
|
sqlBuilder.append(normalizedColName);
|
||||||
}
|
}
|
||||||
sqlBuilder.append(" = ?");
|
sqlBuilder.append(" = ?");
|
||||||
final int sqlType = desc.getDataType();
|
final int sqlType = desc.getDataType();
|
||||||
|
|
|
@ -123,6 +123,43 @@ public class TestConvertJSONToSQL {
|
||||||
out.assertContentEquals("INSERT INTO PERSONS (\"ID\", \"NAME\", \"CODE\") VALUES (?, ?, ?)");
|
out.assertContentEquals("INSERT INTO PERSONS (\"ID\", \"NAME\", \"CODE\") VALUES (?, ?, ?)");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInsertQuotedTableIdentifier() throws InitializationException, ProcessException, SQLException, IOException {
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
|
||||||
|
final File tempDir = folder.getRoot();
|
||||||
|
final File dbDir = new File(tempDir, "db");
|
||||||
|
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
|
||||||
|
runner.addControllerService("dbcp", service);
|
||||||
|
runner.enableControllerService(service);
|
||||||
|
|
||||||
|
try (final Connection conn = service.getConnection()) {
|
||||||
|
try (final Statement stmt = conn.createStatement()) {
|
||||||
|
stmt.executeUpdate(createPersons);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
|
||||||
|
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
|
||||||
|
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
|
||||||
|
runner.setProperty(ConvertJSONToSQL.QUOTED_TABLE_IDENTIFIER, "true");
|
||||||
|
|
||||||
|
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
|
||||||
|
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
|
||||||
|
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
|
||||||
|
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
|
||||||
|
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
|
||||||
|
out.assertAttributeEquals("sql.args.1.value", "1");
|
||||||
|
out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
|
||||||
|
out.assertAttributeEquals("sql.args.2.value", "Mark");
|
||||||
|
out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
|
||||||
|
out.assertAttributeEquals("sql.args.3.value", "48");
|
||||||
|
|
||||||
|
out.assertContentEquals("INSERT INTO \"PERSONS\" (ID, NAME, CODE) VALUES (?, ?, ?)");
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testInsertWithNullValue() throws InitializationException, ProcessException, SQLException, IOException {
|
public void testInsertWithNullValue() throws InitializationException, ProcessException, SQLException, IOException {
|
||||||
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
|
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
|
||||||
|
@ -195,6 +232,43 @@ public class TestConvertJSONToSQL {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateQuotedTableIdentifier() throws InitializationException, ProcessException, SQLException, IOException {
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
|
||||||
|
final File tempDir = folder.getRoot();
|
||||||
|
final File dbDir = new File(tempDir, "db");
|
||||||
|
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
|
||||||
|
runner.addControllerService("dbcp", service);
|
||||||
|
runner.enableControllerService(service);
|
||||||
|
|
||||||
|
try (final Connection conn = service.getConnection()) {
|
||||||
|
try (final Statement stmt = conn.createStatement()) {
|
||||||
|
stmt.executeUpdate(createPersons);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
|
||||||
|
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
|
||||||
|
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
|
||||||
|
runner.setProperty(ConvertJSONToSQL.QUOTED_TABLE_IDENTIFIER, "true");
|
||||||
|
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-null-code.json"));
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
|
||||||
|
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
|
||||||
|
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
|
||||||
|
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
|
||||||
|
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR));
|
||||||
|
out.assertAttributeEquals("sql.args.1.value", "Mark");
|
||||||
|
out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.INTEGER));
|
||||||
|
out.assertAttributeNotExists("sql.args.2.value");
|
||||||
|
out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
|
||||||
|
out.assertAttributeEquals("sql.args.3.value", "1");
|
||||||
|
|
||||||
|
out.assertContentEquals("UPDATE \"PERSONS\" SET NAME = ?, CODE = ? WHERE ID = ?");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMultipleInserts() throws InitializationException, ProcessException, SQLException, IOException {
|
public void testMultipleInserts() throws InitializationException, ProcessException, SQLException, IOException {
|
||||||
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
|
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
|
||||||
|
|
Loading…
Reference in New Issue