NIFI-2750

Signed-off-by: Matt Burgess <mattyb149@apache.org>

NIFI-2750

Signed-off-by: Matt Burgess <mattyb149@apache.org>

NIFI-2750 Tweaking Property Retrieval

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #1001
This commit is contained in:
Peter Wicks 2016-09-09 21:10:26 -06:00 committed by Matt Burgess
parent 2ee66de1a6
commit d65f46f7a5
2 changed files with 197 additions and 12 deletions

View File

@ -22,6 +22,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
@ -175,6 +176,14 @@ public class ConvertJSONToSQL extends AbstractProcessor {
.expressionLanguageSupported(true)
.build();
static final PropertyDescriptor QUOTED_IDENTIFIERS = new PropertyDescriptor.Builder()
.name("jts-quoted-identifiers")
.displayName("Quote Identifiers")
.description("Enabling this option will cause all column names to be quoted, allowing you to "
+ "use reserved words as column names in your tables.")
.allowableValues("true", "false")
.defaultValue("false")
.build();
static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
@ -211,6 +220,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
properties.add(UNMATCHED_FIELD_BEHAVIOR);
properties.add(UNMATCHED_COLUMN_BEHAVIOR);
properties.add(UPDATE_KEY);
properties.add(QUOTED_IDENTIFIERS);
return properties;
}
@ -254,6 +264,9 @@ public class ConvertJSONToSQL extends AbstractProcessor {
final boolean failUnmappedColumns = FAIL_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
final boolean warningUnmappedColumns = WARNING_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
//Escape column names?
final boolean escapeColumnNames = context.getProperty(QUOTED_IDENTIFIERS).asBoolean();
// get the database schema from the cache, if one exists. We do this in a synchronized block, rather than
// using a ConcurrentMap because the Map that we are using is a LinkedHashMap with a capacity such that if
// the Map grows beyond this capacity, old elements are evicted. We do this in order to avoid filling the
@ -330,9 +343,11 @@ public class ConvertJSONToSQL extends AbstractProcessor {
final String fqTableName = tableNameBuilder.toString();
if (INSERT_TYPE.equals(statementType)) {
sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields, failUnmappedColumns, warningUnmappedColumns);
sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields,
failUnmappedColumns, warningUnmappedColumns, escapeColumnNames);
} else {
sql = generateUpdate(jsonNode, attributes, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields, failUnmappedColumns, warningUnmappedColumns);
sql = generateUpdate(jsonNode, attributes, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields,
failUnmappedColumns, warningUnmappedColumns, escapeColumnNames);
}
} catch (final ProcessException pe) {
getLogger().error("Failed to convert {} to a SQL {} statement due to {}; routing to failure",
@ -381,7 +396,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
private String generateInsert(final JsonNode rootNode, final Map<String, String> attributes, final String tableName,
final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
final boolean warningUnmappedColumns) {
final boolean warningUnmappedColumns, boolean escapeColumnNames) {
final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
for (final String requiredColName : schema.getRequiredColumnNames()) {
@ -418,7 +433,13 @@ public class ConvertJSONToSQL extends AbstractProcessor {
sqlBuilder.append(", ");
}
if(!escapeColumnNames){
sqlBuilder.append(desc.getColumnName());
} else {
sqlBuilder.append(schema.getQuotedIdentifierString());
sqlBuilder.append(desc.getColumnName());
sqlBuilder.append(schema.getQuotedIdentifierString());
}
final int sqlType = desc.getDataType();
attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
@ -455,7 +476,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
private String generateUpdate(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, final String updateKeys,
final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
final boolean warningUnmappedColumns) {
final boolean warningUnmappedColumns, boolean escapeColumnNames) {
final Set<String> updateKeyNames;
if (updateKeys == null) {
@ -522,7 +543,15 @@ public class ConvertJSONToSQL extends AbstractProcessor {
sqlBuilder.append(", ");
}
sqlBuilder.append(desc.getColumnName()).append(" = ?");
if(!escapeColumnNames){
sqlBuilder.append(desc.getColumnName());
} else {
sqlBuilder.append(schema.getQuotedIdentifierString())
.append(desc.getColumnName())
.append(schema.getQuotedIdentifierString());
}
sqlBuilder.append(" = ?");
final int sqlType = desc.getDataType();
attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
@ -563,7 +592,14 @@ public class ConvertJSONToSQL extends AbstractProcessor {
}
fieldCount++;
sqlBuilder.append(normalizedColName).append(" = ?");
if(!escapeColumnNames){
sqlBuilder.append(normalizedColName);
} else {
sqlBuilder.append(schema.getQuotedIdentifierString())
.append(normalizedColName)
.append(schema.getQuotedIdentifierString());
}
sqlBuilder.append(" = ?");
final int sqlType = desc.getDataType();
attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
@ -586,11 +622,13 @@ public class ConvertJSONToSQL extends AbstractProcessor {
private List<String> requiredColumnNames;
private Set<String> primaryKeyColumnNames;
private Map<String, ColumnDescription> columns;
private String quotedIdentifierString;
private TableSchema(final List<ColumnDescription> columnDescriptions, final boolean translateColumnNames,
final Set<String> primaryKeyColumnNames) {
final Set<String> primaryKeyColumnNames, final String quotedIdentifierString) {
this.columns = new HashMap<>();
this.primaryKeyColumnNames = primaryKeyColumnNames;
this.quotedIdentifierString = quotedIdentifierString;
this.requiredColumnNames = new ArrayList<>();
for (final ColumnDescription desc : columnDescriptions) {
@ -613,10 +651,15 @@ public class ConvertJSONToSQL extends AbstractProcessor {
return primaryKeyColumnNames;
}
public String getQuotedIdentifierString() {
return quotedIdentifierString;
}
public static TableSchema from(final Connection conn, final String catalog, final String schema, final String tableName,
final boolean translateColumnNames, final boolean includePrimaryKeys) throws SQLException {
try (final ResultSet colrs = conn.getMetaData().getColumns(catalog, schema, tableName, "%")) {
final DatabaseMetaData dmd = conn.getMetaData();
try (final ResultSet colrs = dmd.getColumns(catalog, schema, tableName, "%")) {
final List<ColumnDescription> cols = new ArrayList<>();
while (colrs.next()) {
final ColumnDescription col = ColumnDescription.from(colrs);
@ -634,7 +677,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
}
}
return new TableSchema(cols, translateColumnNames, primaryKeyColumns);
return new TableSchema(cols, translateColumnNames, primaryKeyColumns, dmd.getIdentifierQuoteString());
}
}
}

View File

@ -82,6 +82,42 @@ public class TestConvertJSONToSQL {
out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)");
}
@Test
public void testInsertQuotedIdentifiers() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
runner.setProperty(ConvertJSONToSQL.QUOTED_IDENTIFIERS, "true");
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.1.value", "1");
out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
out.assertAttributeEquals("sql.args.2.value", "Mark");
out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.3.value", "48");
out.assertContentEquals("INSERT INTO PERSONS (\"ID\", \"NAME\", \"CODE\") VALUES (?, ?, ?)");
}
@Test
public void testInsertWithNullValue() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
@ -186,6 +222,41 @@ public class TestConvertJSONToSQL {
}
}
@Test
public void testMultipleInsertsQuotedIdentifiers() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
runner.setProperty(ConvertJSONToSQL.QUOTED_IDENTIFIERS, "true");
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/persons.json"));
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 5);
final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL);
for (final MockFlowFile mff : mffs) {
mff.assertContentEquals("INSERT INTO PERSONS (\"ID\", \"NAME\", \"CODE\") VALUES (?, ?, ?)");
for (int i=1; i <= 3; i++) {
mff.assertAttributeExists("sql.args." + i + ".type");
mff.assertAttributeExists("sql.args." + i + ".value");
}
}
}
@Test
public void testUpdateBasedOnPrimaryKey() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
@ -220,6 +291,41 @@ public class TestConvertJSONToSQL {
out.assertContentEquals("UPDATE PERSONS SET NAME = ?, CODE = ? WHERE ID = ?");
}
@Test
public void testUpdateBasedOnPrimaryKeyQuotedIdentifier() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
runner.setProperty(ConvertJSONToSQL.QUOTED_IDENTIFIERS, "true");
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR));
out.assertAttributeEquals("sql.args.1.value", "Mark");
out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.2.value", "48");
out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.3.value", "1");
out.assertContentEquals("UPDATE PERSONS SET \"NAME\" = ?, \"CODE\" = ? WHERE \"ID\" = ?");
}
@Test
public void testUnmappedFieldBehavior() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
@ -290,6 +396,42 @@ public class TestConvertJSONToSQL {
out.assertContentEquals("UPDATE PERSONS SET ID = ?, NAME = ? WHERE CODE = ?");
}
@Test
public void testUpdateBasedOnUpdateKeyQuotedIdentifier() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createPersons);
}
}
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "code");
runner.setProperty(ConvertJSONToSQL.QUOTED_IDENTIFIERS, "true");
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.1.value", "1");
out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
out.assertAttributeEquals("sql.args.2.value", "Mark");
out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
out.assertAttributeEquals("sql.args.3.value", "48");
out.assertContentEquals("UPDATE PERSONS SET \"ID\" = ?, \"NAME\" = ? WHERE \"CODE\" = ?");
}
@Test
public void testUpdateBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);