mirror of https://github.com/apache/nifi.git
NIFI-3481 - Add DB Adapter for MS SQL 2012+
Signed-off-by: Matt Burgess <mattyb149@apache.org> NIFI-3481 Added support for Database Adapter Descriptions Signed-off-by: Matt Burgess <mattyb149@apache.org> NIFI-3481 Updated Adapter Name Signed-off-by: Matt Burgess <mattyb149@apache.org> This closes #1510
This commit is contained in:
parent
cd8eb775e6
commit
18f05856ff
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
|
@ -39,6 +40,7 @@ import java.sql.Timestamp;
|
|||
import java.text.DecimalFormat;
|
||||
import java.text.ParseException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
|
@ -175,16 +177,20 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
|
|||
|
||||
static {
|
||||
// Load the DatabaseAdapters
|
||||
ArrayList<AllowableValue> dbAdapterValues = new ArrayList<>();
|
||||
ServiceLoader<DatabaseAdapter> dbAdapterLoader = ServiceLoader.load(DatabaseAdapter.class);
|
||||
dbAdapterLoader.forEach(it -> dbAdapters.put(it.getName(), it));
|
||||
dbAdapterLoader.forEach(it -> {
|
||||
dbAdapters.put(it.getName(), it);
|
||||
dbAdapterValues.add(new AllowableValue(it.getName(),it.getName(), it.getDescription()));
|
||||
});
|
||||
|
||||
DB_TYPE = new PropertyDescriptor.Builder()
|
||||
.name("db-fetch-db-type")
|
||||
.displayName("Database Type")
|
||||
.description("The type/flavor of database, used for generating database-specific code. In many cases the Generic type "
|
||||
+ "should suffice, but some databases (such as Oracle) require custom SQL clauses. ")
|
||||
.allowableValues(dbAdapters.keySet())
|
||||
.defaultValue(dbAdapters.values().stream().findFirst().get().getName())
|
||||
.allowableValues(dbAdapterValues.toArray(new AllowableValue[dbAdapterValues.size()]))
|
||||
.defaultValue("Generic")
|
||||
.required(true)
|
||||
.build();
|
||||
}
|
||||
|
|
|
@ -23,6 +23,8 @@ public interface DatabaseAdapter {
|
|||
|
||||
String getName();
|
||||
|
||||
String getDescription();
|
||||
|
||||
/**
|
||||
* Returns a SQL SELECT statement with the given clauses applied.
|
||||
*
|
||||
|
|
|
@ -28,6 +28,11 @@ public class GenericDatabaseAdapter implements DatabaseAdapter {
|
|||
return "Generic";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDescription() {
|
||||
return "Generates ANSI SQL";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) {
|
||||
if (StringUtils.isEmpty(tableName)) {
|
||||
|
|
|
@ -0,0 +1,82 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard.db.impl;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
|
||||
|
||||
/**
|
||||
* A database adapter that generates MS SQL Compatible SQL.
|
||||
*/
|
||||
public class MSSQLDatabaseAdapter implements DatabaseAdapter {
|
||||
@Override
|
||||
public String getName() {
|
||||
return "MS SQL 2012+";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDescription() {
|
||||
return "Generates MS SQL Compatible SQL, for version 2012 or greater";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) {
|
||||
if (StringUtils.isEmpty(tableName)) {
|
||||
throw new IllegalArgumentException("Table name cannot be null or empty");
|
||||
}
|
||||
final StringBuilder query = new StringBuilder("SELECT ");
|
||||
|
||||
//If this is a limit query and not a paging query then use TOP in MS SQL
|
||||
if (limit != null && offset == null){
|
||||
query.append("TOP ");
|
||||
query.append(limit);
|
||||
query.append(" ");
|
||||
}
|
||||
|
||||
if (StringUtils.isEmpty(columnNames) || columnNames.trim().equals("*")) {
|
||||
query.append("*");
|
||||
} else {
|
||||
query.append(columnNames);
|
||||
}
|
||||
query.append(" FROM ");
|
||||
query.append(tableName);
|
||||
|
||||
if (!StringUtils.isEmpty(whereClause)) {
|
||||
query.append(" WHERE ");
|
||||
query.append(whereClause);
|
||||
}
|
||||
if (!StringUtils.isEmpty(orderByClause)) {
|
||||
query.append(" ORDER BY ");
|
||||
query.append(orderByClause);
|
||||
}
|
||||
if (offset != null && limit != null && limit > 0) {
|
||||
if (StringUtils.isEmpty(orderByClause)) {
|
||||
throw new IllegalArgumentException("Order by clause cannot be null or empty when using row paging");
|
||||
}
|
||||
|
||||
query.append(" OFFSET ");
|
||||
query.append(offset);
|
||||
query.append(" ROWS");
|
||||
|
||||
query.append(" FETCH NEXT ");
|
||||
query.append(limit);
|
||||
query.append(" ROWS ONLY");
|
||||
}
|
||||
|
||||
return query.toString();
|
||||
}
|
||||
}
|
|
@ -28,6 +28,11 @@ public class OracleDatabaseAdapter implements DatabaseAdapter {
|
|||
return "Oracle";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDescription() {
|
||||
return "Generates Oracle compliant SQL";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) {
|
||||
if (StringUtils.isEmpty(tableName)) {
|
||||
|
|
|
@ -13,4 +13,5 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
org.apache.nifi.processors.standard.db.impl.GenericDatabaseAdapter
|
||||
org.apache.nifi.processors.standard.db.impl.OracleDatabaseAdapter
|
||||
org.apache.nifi.processors.standard.db.impl.OracleDatabaseAdapter
|
||||
org.apache.nifi.processors.standard.db.impl.MSSQLDatabaseAdapter
|
|
@ -29,6 +29,11 @@ public class DerbyDatabaseAdapter implements DatabaseAdapter {
|
|||
return "Derby";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDescription() {
|
||||
return "Generates Derby compatible SQL (used for testing)";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) {
|
||||
if (StringUtils.isEmpty(tableName)) {
|
||||
|
|
|
@ -0,0 +1,84 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard.db.impl;
|
||||
|
||||
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestMSSQLDatabaseAdapter {
|
||||
final DatabaseAdapter db = new MSSQLDatabaseAdapter();
|
||||
|
||||
@Test
|
||||
public void testGeneration() throws Exception {
|
||||
String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*","","",null,null);
|
||||
String expected1 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename";
|
||||
Assert.assertEquals(sql1,expected1);
|
||||
|
||||
String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*","that=\'some\"\' value\'","",null,null);
|
||||
String expected2 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE that=\'some\"\' value\'";
|
||||
Assert.assertEquals(sql2,expected2);
|
||||
|
||||
String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*","that=\'some\"\' value\'","might DESC",null,null);
|
||||
String expected3 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE that=\'some\"\' value\' ORDER BY might DESC";
|
||||
Assert.assertEquals(sql3,expected3);
|
||||
|
||||
String sql4 = db.getSelectStatement("database.tablename", "","that=\'some\"\' value\'","might DESC",null,null);
|
||||
String expected4 = "SELECT * FROM database.tablename WHERE that=\'some\"\' value\' ORDER BY might DESC";
|
||||
Assert.assertEquals(sql4,expected4);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testNoTableName() throws Exception {
|
||||
db.getSelectStatement("", "some(set),of(columns),that,might,contain,methods,a.*","","",null,null);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testPagingNoOrderBy() throws Exception {
|
||||
db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*","","",10L,0L);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTOPQuery() throws Exception {
|
||||
String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*","","",100L,null);
|
||||
String expected1 = "SELECT TOP 100 some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename";
|
||||
Assert.assertEquals(sql1,expected1);
|
||||
|
||||
String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*","","contain",100L,null);
|
||||
String expected2 = "SELECT TOP 100 some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename ORDER BY contain";
|
||||
Assert.assertEquals(sql2,expected2);
|
||||
|
||||
String sql4 = db.getSelectStatement("database.tablename", "","that=\'some\"\' value\'","might DESC",123456L,null);
|
||||
String expected4 = "SELECT TOP 123456 * FROM database.tablename WHERE that=\'some\"\' value\' ORDER BY might DESC";
|
||||
Assert.assertEquals(sql4,expected4);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPagingQuery() throws Exception {
|
||||
String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*","","contain",100L,0L);
|
||||
String expected1 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename ORDER BY contain OFFSET 0 ROWS FETCH NEXT 100 ROWS ONLY";
|
||||
Assert.assertEquals(sql1,expected1);
|
||||
|
||||
String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*","","contain",10000L,123456L);
|
||||
String expected2 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename ORDER BY contain OFFSET 123456 ROWS FETCH NEXT 10000 ROWS ONLY";
|
||||
Assert.assertEquals(sql2,expected2);
|
||||
|
||||
String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*","methods='strange'","contain",10000L,123456L);
|
||||
String expected3 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE methods='strange' ORDER BY contain OFFSET 123456 ROWS FETCH NEXT 10000 ROWS ONLY";
|
||||
Assert.assertEquals(sql3,expected3);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue