diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PhoenixDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PhoenixDatabaseAdapter.java index c19efe51ca..86d51e7f38 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PhoenixDatabaseAdapter.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PhoenixDatabaseAdapter.java @@ -23,8 +23,10 @@ import org.apache.nifi.processors.standard.db.DatabaseAdapter; import java.sql.JDBCType; import java.sql.Types; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; import static java.sql.Types.CHAR; import static java.sql.Types.CLOB; @@ -106,6 +108,37 @@ public final class PhoenixDatabaseAdapter implements DatabaseAdapter { return query.toString(); } + @Override + public String getUpsertStatement(String table, List columnNames, Collection uniqueKeyColumnNames) { + if (org.apache.nifi.util.StringUtils.isEmpty(table)) { + throw new IllegalArgumentException("Table name cannot be null or blank"); + } + if (columnNames == null || columnNames.isEmpty()) { + throw new IllegalArgumentException("Column names cannot be null or empty"); + } + if (uniqueKeyColumnNames == null || uniqueKeyColumnNames.isEmpty()) { + throw new IllegalArgumentException("Key column names cannot be null or empty"); + } + + String columns = String.join(", ", columnNames); + + String parameterizedUpsertValues = columnNames.stream() + .map(columnName -> "?") + .collect(Collectors.joining(", ")); + + StringBuilder statementStringBuilder = new StringBuilder("UPSERT INTO ") + .append(table) + .append("(").append(columns).append(")") + .append(" VALUES ") + .append("(").append(parameterizedUpsertValues).append(")"); + return statementStringBuilder.toString(); + } + + @Override + public boolean supportsUpsert() { + return true; + } + @Override public boolean supportsCreateTableIfNotExists() { return true; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.standard.db.DatabaseAdapter b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.standard.db.DatabaseAdapter index f104782c5b..641223d21b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.standard.db.DatabaseAdapter +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.standard.db.DatabaseAdapter @@ -18,4 +18,5 @@ org.apache.nifi.processors.standard.db.impl.Oracle12DatabaseAdapter org.apache.nifi.processors.standard.db.impl.MSSQLDatabaseAdapter org.apache.nifi.processors.standard.db.impl.MSSQL2008DatabaseAdapter org.apache.nifi.processors.standard.db.impl.MySQLDatabaseAdapter -org.apache.nifi.processors.standard.db.impl.PostgreSQLDatabaseAdapter \ No newline at end of file +org.apache.nifi.processors.standard.db.impl.PostgreSQLDatabaseAdapter +org.apache.nifi.processors.standard.db.impl.PhoenixDatabaseAdapter \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestPhoenixDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestPhoenixDatabaseAdapter.java new file mode 100644 index 0000000000..7234fe57e9 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestPhoenixDatabaseAdapter.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.db.impl; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class TestPhoenixDatabaseAdapter { + private PhoenixDatabaseAdapter testSubject; + + @BeforeEach + public void setUp() throws Exception { + testSubject = new PhoenixDatabaseAdapter(); + } + + @Test + void testSupportsUpsert() { + assertTrue(testSubject.supportsUpsert(), testSubject.getClass().getSimpleName() + " should support upsert"); + } + + @Test + void testGetUpsertStatementWithNullTableName() { + testGetUpsertStatement(null, Arrays.asList("notEmpty"), Arrays.asList("notEmpty"), new IllegalArgumentException("Table name cannot be null or blank")); + } + + @Test + void testGetUpsertStatementWithBlankTableName() { + testGetUpsertStatement("", Arrays.asList("notEmpty"), Arrays.asList("notEmpty"), new IllegalArgumentException("Table name cannot be null or blank")); + } + + @Test + void testGetUpsertStatementWithNullColumnNames() { + testGetUpsertStatement("notEmpty", null, Arrays.asList("notEmpty"), new IllegalArgumentException("Column names cannot be null or empty")); + } + + @Test + void testGetUpsertStatementWithEmptyColumnNames() { + testGetUpsertStatement("notEmpty", Collections.emptyList(), Arrays.asList("notEmpty"), new IllegalArgumentException("Column names cannot be null or empty")); + } + + @Test + void testGetUpsertStatementWithNullKeyColumnNames() { + testGetUpsertStatement("notEmpty", Arrays.asList("notEmpty"), null, new IllegalArgumentException("Key column names cannot be null or empty")); + } + + @Test + void testGetUpsertStatementWithEmptyKeyColumnNames() { + testGetUpsertStatement("notEmpty", Arrays.asList("notEmpty"), Collections.emptyList(), new IllegalArgumentException("Key column names cannot be null or empty")); + } + + @Test + void testGetUpsertStatement() { + // GIVEN + String tableName = "table"; + List columnNames = Arrays.asList("column1", "column2", "column3", "column4"); + Collection uniqueKeyColumnNames = Arrays.asList("column2", "column4"); + + String expected = "UPSERT INTO" + + " table(column1, column2, column3, column4) VALUES (?, ?, ?, ?)"; + + // WHEN + // THEN + testGetUpsertStatement(tableName, columnNames, uniqueKeyColumnNames, expected); + } + + private void testGetUpsertStatement(String tableName, List columnNames, Collection uniqueKeyColumnNames, IllegalArgumentException expected) { + final IllegalArgumentException e = assertThrows(IllegalArgumentException.class, () -> { + testGetUpsertStatement(tableName, columnNames, uniqueKeyColumnNames, (String) null); + }); + assertEquals(expected.getMessage(), e.getMessage()); + } + + private void testGetUpsertStatement(String tableName, List columnNames, Collection uniqueKeyColumnNames, String expected) { + // WHEN + String actual = testSubject.getUpsertStatement(tableName, columnNames, uniqueKeyColumnNames); + + // THEN + assertEquals(expected, actual); + } + + @Test + public void testGetUpsertStatementQuoted() { + // GIVEN + String tableName = "\"table\""; + List columnNames = Arrays.asList("column1", "\"column2\"", "column3", "column4"); + Collection uniqueKeyColumnNames = Arrays.asList("\"column2\"", "column4"); + + String expected = "UPSERT INTO" + + " \"table\"(column1, \"column2\", column3, column4) VALUES (?, ?, ?, ?)"; + + // WHEN + // THEN + testGetUpsertStatement(tableName, columnNames, uniqueKeyColumnNames, expected); + } +}