NIFI-5151: Add UPSERT support for Apache Phoenix

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #7263
This commit is contained in:
Lehel Boér 2023-05-18 16:52:51 +02:00 committed by Matthew Burgess
parent e2134e2733
commit 6c70471cc6
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
3 changed files with 153 additions and 1 deletions

View File

@ -23,8 +23,10 @@ import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import java.sql.JDBCType;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import static java.sql.Types.CHAR;
import static java.sql.Types.CLOB;
@ -106,6 +108,37 @@ public final class PhoenixDatabaseAdapter implements DatabaseAdapter {
return query.toString();
}
@Override
public String getUpsertStatement(String table, List<String> columnNames, Collection<String> uniqueKeyColumnNames) {
if (org.apache.nifi.util.StringUtils.isEmpty(table)) {
throw new IllegalArgumentException("Table name cannot be null or blank");
}
if (columnNames == null || columnNames.isEmpty()) {
throw new IllegalArgumentException("Column names cannot be null or empty");
}
if (uniqueKeyColumnNames == null || uniqueKeyColumnNames.isEmpty()) {
throw new IllegalArgumentException("Key column names cannot be null or empty");
}
String columns = String.join(", ", columnNames);
String parameterizedUpsertValues = columnNames.stream()
.map(columnName -> "?")
.collect(Collectors.joining(", "));
StringBuilder statementStringBuilder = new StringBuilder("UPSERT INTO ")
.append(table)
.append("(").append(columns).append(")")
.append(" VALUES ")
.append("(").append(parameterizedUpsertValues).append(")");
return statementStringBuilder.toString();
}
@Override
public boolean supportsUpsert() {
return true;
}
@Override
public boolean supportsCreateTableIfNotExists() {
return true;

View File

@ -18,4 +18,5 @@ org.apache.nifi.processors.standard.db.impl.Oracle12DatabaseAdapter
org.apache.nifi.processors.standard.db.impl.MSSQLDatabaseAdapter
org.apache.nifi.processors.standard.db.impl.MSSQL2008DatabaseAdapter
org.apache.nifi.processors.standard.db.impl.MySQLDatabaseAdapter
org.apache.nifi.processors.standard.db.impl.PostgreSQLDatabaseAdapter
org.apache.nifi.processors.standard.db.impl.PostgreSQLDatabaseAdapter
org.apache.nifi.processors.standard.db.impl.PhoenixDatabaseAdapter

View File

@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.db.impl;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
class TestPhoenixDatabaseAdapter {
private PhoenixDatabaseAdapter testSubject;
@BeforeEach
public void setUp() throws Exception {
testSubject = new PhoenixDatabaseAdapter();
}
@Test
void testSupportsUpsert() {
assertTrue(testSubject.supportsUpsert(), testSubject.getClass().getSimpleName() + " should support upsert");
}
@Test
void testGetUpsertStatementWithNullTableName() {
testGetUpsertStatement(null, Arrays.asList("notEmpty"), Arrays.asList("notEmpty"), new IllegalArgumentException("Table name cannot be null or blank"));
}
@Test
void testGetUpsertStatementWithBlankTableName() {
testGetUpsertStatement("", Arrays.asList("notEmpty"), Arrays.asList("notEmpty"), new IllegalArgumentException("Table name cannot be null or blank"));
}
@Test
void testGetUpsertStatementWithNullColumnNames() {
testGetUpsertStatement("notEmpty", null, Arrays.asList("notEmpty"), new IllegalArgumentException("Column names cannot be null or empty"));
}
@Test
void testGetUpsertStatementWithEmptyColumnNames() {
testGetUpsertStatement("notEmpty", Collections.emptyList(), Arrays.asList("notEmpty"), new IllegalArgumentException("Column names cannot be null or empty"));
}
@Test
void testGetUpsertStatementWithNullKeyColumnNames() {
testGetUpsertStatement("notEmpty", Arrays.asList("notEmpty"), null, new IllegalArgumentException("Key column names cannot be null or empty"));
}
@Test
void testGetUpsertStatementWithEmptyKeyColumnNames() {
testGetUpsertStatement("notEmpty", Arrays.asList("notEmpty"), Collections.emptyList(), new IllegalArgumentException("Key column names cannot be null or empty"));
}
@Test
void testGetUpsertStatement() {
// GIVEN
String tableName = "table";
List<String> columnNames = Arrays.asList("column1", "column2", "column3", "column4");
Collection<String> uniqueKeyColumnNames = Arrays.asList("column2", "column4");
String expected = "UPSERT INTO" +
" table(column1, column2, column3, column4) VALUES (?, ?, ?, ?)";
// WHEN
// THEN
testGetUpsertStatement(tableName, columnNames, uniqueKeyColumnNames, expected);
}
private void testGetUpsertStatement(String tableName, List<String> columnNames, Collection<String> uniqueKeyColumnNames, IllegalArgumentException expected) {
final IllegalArgumentException e = assertThrows(IllegalArgumentException.class, () -> {
testGetUpsertStatement(tableName, columnNames, uniqueKeyColumnNames, (String) null);
});
assertEquals(expected.getMessage(), e.getMessage());
}
private void testGetUpsertStatement(String tableName, List<String> columnNames, Collection<String> uniqueKeyColumnNames, String expected) {
// WHEN
String actual = testSubject.getUpsertStatement(tableName, columnNames, uniqueKeyColumnNames);
// THEN
assertEquals(expected, actual);
}
@Test
public void testGetUpsertStatementQuoted() {
// GIVEN
String tableName = "\"table\"";
List<String> columnNames = Arrays.asList("column1", "\"column2\"", "column3", "column4");
Collection<String> uniqueKeyColumnNames = Arrays.asList("\"column2\"", "column4");
String expected = "UPSERT INTO" +
" \"table\"(column1, \"column2\", column3, column4) VALUES (?, ?, ?, ?)";
// WHEN
// THEN
testGetUpsertStatement(tableName, columnNames, uniqueKeyColumnNames, expected);
}
}