NIFI-5143: Initial work to support column values for paging results

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2728.
This commit is contained in:
Matthew Burgess 2018-05-21 13:45:16 -04:00 committed by Pierre Villard
parent 729f8aa246
commit 0e09b98b02
15 changed files with 689 additions and 109 deletions

View File

@ -30,6 +30,7 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
@ -98,8 +99,8 @@ import java.util.stream.IntStream;
@WritesAttribute(attribute = "generatetablefetch.offset", description = "Offset to be used to retrieve the corresponding partition.")
})
@DynamicProperty(name = "Initial Max Value", value = "Attribute Expression Language",
expressionLanguageScope = ExpressionLanguageScope.NONE, description = "Specifies an initial "
+ "max value for max value columns. Properties should be added in the format `initial.maxvalue.{max_value_column}`.")
expressionLanguageScope = ExpressionLanguageScope.NONE, description = "Specifies an initial "
+ "max value for max value columns. Properties should be added in the format `initial.maxvalue.{max_value_column}`.")
public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
public static final PropertyDescriptor PARTITION_SIZE = new PropertyDescriptor.Builder()
@ -115,6 +116,19 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
static final PropertyDescriptor COLUMN_FOR_VALUE_PARTITIONING = new PropertyDescriptor.Builder()
.name("gen-table-column-for-val-partitioning")
.displayName("Column for Value Partitioning")
.description("The name of a column whose values will be used for partitioning. The default behavior is to use row numbers on the result set for partitioning into "
+ "'pages' to be fetched from the database, using an offset/limit strategy. However for certain databases, it can be more efficient under the right circumstances to use "
+ "the column values themselves to define the 'pages'. This property should only be used when the default queries are not performing well, when there is no maximum-value "
+ "column or a single maximum-value column whose type can be coerced to a long integer (i.e. not date or timestamp), and the column values are evenly distributed and not "
+ "sparse, for best performance.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("This relationship is only used when SQL query execution (using an incoming FlowFile) failed. The incoming FlowFile will be penalized and routed to this relationship. "
@ -135,6 +149,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
pds.add(MAX_VALUE_COLUMN_NAMES);
pds.add(QUERY_TIMEOUT);
pds.add(PARTITION_SIZE);
pds.add(COLUMN_FOR_VALUE_PARTITIONING);
pds.add(WHERE_CLAUSE);
propDescriptors = Collections.unmodifiableList(pds);
}
@ -151,7 +166,15 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
return super.customValidate(validationContext);
List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
final PropertyValue columnForPartitioning = validationContext.getProperty(COLUMN_FOR_VALUE_PARTITIONING);
// If no EL is present, ensure it's a single column (i.e. no commas in the property value)
if (columnForPartitioning.isSet() && !columnForPartitioning.isExpressionLanguagePresent() && columnForPartitioning.getValue().contains(",")) {
results.add(new ValidationResult.Builder().valid(false).explanation(
COLUMN_FOR_VALUE_PARTITIONING.getDisplayName() + " requires a single column name, but a comma was detected").build());
}
return results;
}
@Override
@ -195,6 +218,8 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
final int partitionSize = context.getProperty(PARTITION_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger();
final String columnForPartitioning = context.getProperty(COLUMN_FOR_VALUE_PARTITIONING).evaluateAttributeExpressions(fileToProcess).getValue();
final boolean useColumnValsForPaging = !StringUtils.isEmpty(columnForPartitioning);
final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions(fileToProcess).getValue();
final StateManager stateManager = context.getStateManager();
@ -241,20 +266,24 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
? new ArrayList<>(0)
: Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
List<String> maxValueClauses = new ArrayList<>(maxValueColumnNameList.size());
final int numMaxValueColumns = maxValueColumnNameList.size();
List<String> maxValueClauses = new ArrayList<>(numMaxValueColumns);
Long maxValueForPartitioning = null;
Long minValueForPartitioning = null;
String columnsClause = null;
List<String> maxValueSelectColumns = new ArrayList<>(maxValueColumnNameList.size() + 1);
List<String> maxValueSelectColumns = new ArrayList<>(numMaxValueColumns + 1);
maxValueSelectColumns.add("COUNT(*)");
// For each maximum-value column, get a WHERE filter and a MAX(column) alias
IntStream.range(0, maxValueColumnNameList.size()).forEach((index) -> {
IntStream.range(0, numMaxValueColumns).forEach((index) -> {
String colName = maxValueColumnNameList.get(index);
maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName, dbAdapter);
if (!StringUtils.isEmpty(maxValue)) {
if(columnTypeMap.isEmpty() || getColumnType(tableName, colName, dbAdapter) == null){
if (columnTypeMap.isEmpty() || getColumnType(tableName, colName, dbAdapter) == null) {
// This means column type cache is clean after instance reboot. We should re-cache column type
super.setup(context, false, finalFileToProcess);
}
@ -263,8 +292,18 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
// Add a condition for the WHERE clause
maxValueClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type, maxValue, dbAdapter.getName()));
}
});
// If we are using a columns' values, get the maximum and minimum values in the context of the aforementioned WHERE clause
if (useColumnValsForPaging) {
if(columnForPartitioning.contains(",")) {
throw new ProcessException(COLUMN_FOR_VALUE_PARTITIONING.getDisplayName() + " requires a single column name, but a comma was detected");
}
maxValueSelectColumns.add("MAX(" + columnForPartitioning + ") " + columnForPartitioning);
maxValueSelectColumns.add("MIN(" + columnForPartitioning + ") MIN_" + columnForPartitioning);
}
if (customWhereClause != null) {
// adding the custom WHERE clause (if defined) to the list of existing clauses.
maxValueClauses.add("(" + customWhereClause + ")");
@ -294,7 +333,8 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
// Update the state map with the newly-observed maximum values
ResultSetMetaData rsmd = resultSet.getMetaData();
for (int i = 2; i <= rsmd.getColumnCount(); i++) {
int i = 2;
for (; i <= numMaxValueColumns + 1; i++) {
//Some JDBC drivers consider the columns name and label to be very different things.
// Since this column has been aliased lets check the label first,
// if there is no label we'll use the column name.
@ -318,11 +358,18 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
if (newMaxValue != null) {
statePropertyMap.put(fullyQualifiedStateKey, newMaxValue);
}
} catch (ParseException | IOException pie) {
} catch (ParseException | IOException | ClassCastException pice) {
// Fail the whole thing here before we start creating flow files and such
throw new ProcessException(pie);
throw new ProcessException(pice);
}
}
// Process the maximum and minimum values for the partitioning column if necessary
// These are currently required to be Long values, will throw a ClassCastException if they are not
if (useColumnValsForPaging) {
Object o = resultSet.getObject(i);
maxValueForPartitioning = o == null ? null : Long.valueOf(o.toString());
o = resultSet.getObject(i + 1);
minValueForPartitioning = o == null ? null : Long.valueOf(o.toString());
}
} else {
// Something is very wrong here, one row (even if count is zero) should be returned
@ -330,13 +377,13 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
}
// for each maximum-value column get a right bounding WHERE condition
IntStream.range(0, maxValueColumnNameList.size()).forEach((index) -> {
IntStream.range(0, numMaxValueColumns).forEach((index) -> {
String colName = maxValueColumnNameList.get(index);
maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName, dbAdapter);
if (!StringUtils.isEmpty(maxValue)) {
if(columnTypeMap.isEmpty() || getColumnType(tableName, colName, dbAdapter) == null){
if (columnTypeMap.isEmpty() || getColumnType(tableName, colName, dbAdapter) == null) {
// This means column type cache is clean after instance reboot. We should re-cache column type
super.setup(context, false, finalFileToProcess);
}
@ -347,17 +394,30 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
}
});
//Update WHERE list to include new right hand boundaries
whereClause = StringUtils.join(maxValueClauses, " AND ");
final long numberOfFetches = (partitionSize == 0) ? 1 : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1);
final long numberOfFetches;
if (useColumnValsForPaging) {
final long valueRangeSize = maxValueForPartitioning == null ? 0 : (maxValueForPartitioning - minValueForPartitioning + 1);
numberOfFetches = (partitionSize == 0) ? 1 : (valueRangeSize / partitionSize) + (valueRangeSize % partitionSize == 0 ? 0 : 1);
} else {
numberOfFetches = (partitionSize == 0) ? 1 : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1);
}
// Generate SQL statements to read "pages" of data
Long limit = partitionSize == 0 ? null : (long) partitionSize;
for (long i = 0; i < numberOfFetches; i++) {
Long limit = partitionSize == 0 ? null : (long) partitionSize;
Long offset = partitionSize == 0 ? null : i * partitionSize;
// Add a right bounding for the partitioning column if necessary (only on last partition, meaning we don't need the limit)
if ((i == numberOfFetches - 1) && useColumnValsForPaging && (maxValueClauses.isEmpty() || customWhereClause != null)) {
maxValueClauses.add(columnForPartitioning + " <= " + maxValueForPartitioning);
limit = null;
}
//Update WHERE list to include new right hand boundaries
whereClause = maxValueClauses.isEmpty() ? "1=1" : StringUtils.join(maxValueClauses, " AND ");
Long offset = partitionSize == 0 ? null : i * partitionSize + (useColumnValsForPaging ? minValueForPartitioning : 0);
final String maxColumnNames = StringUtils.join(maxValueColumnNameList, ", ");
final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, maxColumnNames, limit, offset);
final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, maxColumnNames, limit, offset, columnForPartitioning);
FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess);
sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes()));
sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.tableName", tableName);

View File

@ -38,6 +38,23 @@ public interface DatabaseAdapter {
*/
String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset);
/**
* Returns a SQL SELECT statement with the given clauses applied. Note that if this method is overridden, the other overloaded methods
* need to be overridden as well, to call this method with columnForPartitioning = false
*
* @param tableName The name of the table to fetch rows from
* @param columnNames The names of the columns to fetch from the table
* @param whereClause The filter to apply to the statement. This should not include the WHERE keyword
* @param orderByClause The columns/clause used for ordering the result rows. This should not include the ORDER BY keywords
* @param limit The value for the LIMIT clause (i.e. the number of rows to return)
* @param offset The value for the OFFSET clause (i.e. the number of rows to skip)
* @param columnForPartitioning The (optional) column name that, if provided, the limit and offset values are based on values from the column itself (rather than the row number)
* @return A String containing a SQL SELECT statement with the given clauses applied
*/
default String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset, String columnForPartitioning) {
return getSelectStatement(tableName, columnNames, whereClause, orderByClause, limit, offset);
}
/**
* <p>Returns a bare identifier string by removing wrapping escape characters
* from identifier strings such as table and column names.</p>

View File

@ -35,6 +35,11 @@ public class GenericDatabaseAdapter implements DatabaseAdapter {
@Override
public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) {
return getSelectStatement(tableName, columnNames, whereClause, orderByClause, limit, offset, null);
}
@Override
public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset, String columnForPartitioning) {
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Table name cannot be null or empty");
}
@ -50,20 +55,33 @@ public class GenericDatabaseAdapter implements DatabaseAdapter {
if (!StringUtils.isEmpty(whereClause)) {
query.append(" WHERE ");
query.append(whereClause);
if (!StringUtils.isEmpty(columnForPartitioning)) {
query.append(" AND ");
query.append(columnForPartitioning);
query.append(" >= ");
query.append(offset != null ? offset : "0");
if (limit != null) {
query.append(" AND ");
query.append(columnForPartitioning);
query.append(" < ");
query.append((offset == null ? 0 : offset) + limit);
}
}
}
if (!StringUtils.isEmpty(orderByClause)) {
if (!StringUtils.isEmpty(orderByClause) && StringUtils.isEmpty(columnForPartitioning)) {
query.append(" ORDER BY ");
query.append(orderByClause);
}
if (limit != null) {
query.append(" LIMIT ");
query.append(limit);
if (StringUtils.isEmpty(columnForPartitioning)) {
if (limit != null) {
query.append(" LIMIT ");
query.append(limit);
}
if (offset != null && offset > 0) {
query.append(" OFFSET ");
query.append(offset);
}
}
if (offset != null && offset > 0) {
query.append(" OFFSET ");
query.append(offset);
}
return query.toString();
}
}

View File

@ -34,14 +34,19 @@ public class MSSQL2008DatabaseAdapter extends MSSQLDatabaseAdapter {
@Override
public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) {
return getSelectStatement(tableName, columnNames, whereClause, orderByClause, limit, offset, null);
}
@Override
public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset, String columnForPartitioning) {
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Table name cannot be null or empty");
}
final StringBuilder query = new StringBuilder("SELECT ");
boolean useColumnForPartitioning = !StringUtils.isEmpty(columnForPartitioning);
// If this is a limit query and not a paging query then use TOP in MS SQL
if (limit != null) {
if (limit != null && !useColumnForPartitioning) {
if (offset != null) {
query.append("* FROM (SELECT ");
@ -60,7 +65,7 @@ public class MSSQL2008DatabaseAdapter extends MSSQLDatabaseAdapter {
query.append(columnNames);
}
if (limit != null && offset != null && orderByClause != null) {
if (limit != null && offset != null && orderByClause != null && !useColumnForPartitioning) {
query.append(", ROW_NUMBER() OVER(ORDER BY ");
query.append(orderByClause);
query.append(" asc) rnum");
@ -71,14 +76,26 @@ public class MSSQL2008DatabaseAdapter extends MSSQLDatabaseAdapter {
if (!StringUtils.isEmpty(whereClause)) {
query.append(" WHERE ");
query.append(whereClause);
if (useColumnForPartitioning) {
query.append(" AND ");
query.append(columnForPartitioning);
query.append(" >= ");
query.append(offset != null ? offset : "0");
if (limit != null) {
query.append(" AND ");
query.append(columnForPartitioning);
query.append(" < ");
query.append((offset == null ? 0 : offset) + limit);
}
}
}
if (!StringUtils.isEmpty(orderByClause)) {
if (!StringUtils.isEmpty(orderByClause) && !useColumnForPartitioning) {
query.append(" ORDER BY ");
query.append(orderByClause);
}
if (limit != null && offset != null) {
if (limit != null && offset != null && !useColumnForPartitioning) {
query.append(") A WHERE rnum > ");
query.append(offset);
query.append(" AND rnum <= ");

View File

@ -35,13 +35,18 @@ public class MSSQLDatabaseAdapter implements DatabaseAdapter {
@Override
public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) {
return getSelectStatement(tableName, columnNames, whereClause, orderByClause, limit, offset, null);
}
@Override
public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset, String columnForPartitioning) {
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Table name cannot be null or empty");
}
final StringBuilder query = new StringBuilder("SELECT ");
//If this is a limit query and not a paging query then use TOP in MS SQL
if (limit != null && offset == null){
if (limit != null && offset == null && StringUtils.isEmpty(columnForPartitioning)){
query.append("TOP ");
query.append(limit);
query.append(" ");
@ -58,23 +63,37 @@ public class MSSQLDatabaseAdapter implements DatabaseAdapter {
if (!StringUtils.isEmpty(whereClause)) {
query.append(" WHERE ");
query.append(whereClause);
if (!StringUtils.isEmpty(columnForPartitioning)) {
query.append(" AND ");
query.append(columnForPartitioning);
query.append(" >= ");
query.append(offset != null ? offset : "0");
if (limit != null) {
query.append(" AND ");
query.append(columnForPartitioning);
query.append(" < ");
query.append((offset == null ? 0 : offset) + limit);
}
}
}
if (!StringUtils.isEmpty(orderByClause)) {
if (!StringUtils.isEmpty(orderByClause) && StringUtils.isEmpty(columnForPartitioning)) {
query.append(" ORDER BY ");
query.append(orderByClause);
}
if (offset != null && limit != null && limit > 0) {
if (StringUtils.isEmpty(orderByClause)) {
throw new IllegalArgumentException("Order by clause cannot be null or empty when using row paging");
if (StringUtils.isEmpty(columnForPartitioning)) {
if (offset != null && limit != null && limit > 0) {
if (StringUtils.isEmpty(orderByClause)) {
throw new IllegalArgumentException("Order by clause cannot be null or empty when using row paging");
}
query.append(" OFFSET ");
query.append(offset);
query.append(" ROWS");
query.append(" FETCH NEXT ");
query.append(limit);
query.append(" ROWS ONLY");
}
query.append(" OFFSET ");
query.append(offset);
query.append(" ROWS");
query.append(" FETCH NEXT ");
query.append(limit);
query.append(" ROWS ONLY");
}
return query.toString();

View File

@ -35,6 +35,11 @@ public class Oracle12DatabaseAdapter implements DatabaseAdapter {
@Override
public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) {
return getSelectStatement(tableName, columnNames, whereClause, orderByClause, limit, offset, null);
}
@Override
public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset, String columnForPartitioning) {
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Table name cannot be null or empty");
}
@ -51,20 +56,34 @@ public class Oracle12DatabaseAdapter implements DatabaseAdapter {
if (!StringUtils.isEmpty(whereClause)) {
query.append(" WHERE ");
query.append(whereClause);
if (!StringUtils.isEmpty(columnForPartitioning)) {
query.append(" AND ");
query.append(columnForPartitioning);
query.append(" >= ");
query.append(offset != null ? offset : "0");
if (limit != null) {
query.append(" AND ");
query.append(columnForPartitioning);
query.append(" < ");
query.append((offset == null ? 0 : offset) + limit);
}
}
}
if (!StringUtils.isEmpty(orderByClause)) {
if (!StringUtils.isEmpty(orderByClause) && StringUtils.isEmpty(columnForPartitioning)) {
query.append(" ORDER BY ");
query.append(orderByClause);
}
if (offset != null && offset > 0) {
query.append(" OFFSET ");
query.append(offset);
query.append(" ROWS");
}
if (limit != null) {
query.append(" FETCH NEXT ");
query.append(limit);
query.append(" ROWS ONLY");
if (StringUtils.isEmpty(columnForPartitioning)) {
if (offset != null && offset > 0) {
query.append(" OFFSET ");
query.append(offset);
query.append(" ROWS");
}
if (limit != null) {
query.append(" FETCH NEXT ");
query.append(limit);
query.append(" ROWS ONLY");
}
}
return query.toString();

View File

@ -35,12 +35,17 @@ public class OracleDatabaseAdapter implements DatabaseAdapter {
@Override
public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) {
return getSelectStatement(tableName, columnNames, whereClause, orderByClause, limit, offset, null);
}
@Override
public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset, String columnForPartitioning) {
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Table name cannot be null or empty");
}
final StringBuilder query = new StringBuilder();
boolean nestedSelect = (limit != null || offset != null);
boolean nestedSelect = (limit != null || offset != null) && StringUtils.isEmpty(columnForPartitioning);
if (nestedSelect) {
// Need a nested SELECT query here in order to use ROWNUM to limit the results
query.append("SELECT ");
@ -64,8 +69,20 @@ public class OracleDatabaseAdapter implements DatabaseAdapter {
if (!StringUtils.isEmpty(whereClause)) {
query.append(" WHERE ");
query.append(whereClause);
if (!StringUtils.isEmpty(columnForPartitioning)) {
query.append(" AND ");
query.append(columnForPartitioning);
query.append(" >= ");
query.append(offset != null ? offset : "0");
if (limit != null) {
query.append(" AND ");
query.append(columnForPartitioning);
query.append(" < ");
query.append((offset == null ? 0 : offset) + limit);
}
}
}
if (!StringUtils.isEmpty(orderByClause)) {
if (!StringUtils.isEmpty(orderByClause) && StringUtils.isEmpty(columnForPartitioning)) {
query.append(" ORDER BY ");
query.append(orderByClause);
}
@ -82,6 +99,7 @@ public class OracleDatabaseAdapter implements DatabaseAdapter {
query.append(") WHERE rnum > ");
query.append(offsetVal);
}
return query.toString();
}
}

View File

@ -36,8 +36,12 @@ public final class PhoenixDatabaseAdapter implements DatabaseAdapter {
}
@Override
public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause,
Long limit, Long offset) {
public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) {
return getSelectStatement(tableName, columnNames, whereClause, orderByClause, limit, offset, null);
}
@Override
public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset, String columnForPartitioning) {
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Table name cannot be null or empty");
}
@ -53,20 +57,34 @@ public final class PhoenixDatabaseAdapter implements DatabaseAdapter {
if (!StringUtils.isEmpty(whereClause)) {
query.append(" WHERE ");
query.append(whereClause);
if (!StringUtils.isEmpty(columnForPartitioning)) {
query.append(" AND ");
query.append(columnForPartitioning);
query.append(" >= ");
query.append(offset != null ? offset : "0");
if (limit != null) {
query.append(" AND ");
query.append(columnForPartitioning);
query.append(" < ");
query.append((offset == null ? 0 : offset) + limit);
}
}
}
if (!StringUtils.isEmpty(orderByClause)) {
if (!StringUtils.isEmpty(orderByClause) && StringUtils.isEmpty(columnForPartitioning)) {
query.append(" ORDER BY ");
query.append(orderByClause);
}
if (limit != null) {
query.append(" LIMIT ");
query.append(limit);
}
if (offset != null && offset > 0) {
query.append(" OFFSET ");
query.append(offset);
}
if (StringUtils.isEmpty(columnForPartitioning)) {
if (limit != null) {
query.append(" LIMIT ");
query.append(limit);
}
if (offset != null && offset > 0) {
query.append(" OFFSET ");
query.append(offset);
}
}
return query.toString();
}
}

View File

@ -0,0 +1,69 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>GenerateTableFetch</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<p>
GenerateTableFetch uses its properties and the specified database connection to generate flow files
containing SQL statements that can be used to fetch "pages" (aka "partitions") of data from a table.
GenerateTableFetch executes a query to the database to determine the current row count and maximum value,
and if Maximum Value Columns are specified, will collect the count of rows whose values for the Maximum
Value Columns are larger than those last observed by GenerateTableFetch. This allows for incremental
fetching of "new" rows, rather than generating SQL to fetch the entire table each time. If no Maximum
Value Columns are set, then the processor will generate SQL to fetch the entire table each time.
</p>
<p>
In order to generate SQL that will fetch pages/partitions of data, by default GenerateTableFetch will
generate SQL that orders the data based on the Maximum Value Columns (if present) and utilize the row
numbers of the result set to determine each page. For example if the Maximum Value Column is an integer "id"
and the partition size is 10, then the SQL for the first page might be "SELECT * FROM myTable LIMIT 10" and
the second page might be "SELECT * FROM myTable OFFSET 10 LIMIT 10", and so on.
</p>
<p>
Ordering the data can be an expensive operation depending on the database, the number of rows, etc.
Alternatively, it is possible to specify a column whose values will be used to determine the pages, using
the Column for Value Partitioning property. If set, GenerateTableFetch will determine the minimum and
maximum values for the column, and uses the minimum value as the initial offset. The SQL to fetch a page is
then based on this initial offset and the total difference in values (i.e. maximum - minimum) divided by
the page size. For example, if the column "id" is used for value partitioning, and the column contains
values 100 to 200, then with a page size of 10 the SQL to fetch the first page might be "SELECT * FROM
myTable WHERE id >= 100 AND id < 110" and the second page might be "SELECT * FROM myTable WHERE id >= 110
AND id < 120", and so on.
</p>
<p>
It is important that the Column for Value Partitioning be set to a column whose type can be coerced to a
long integer (i.e. not date or timestamp), and that the column values are evenly distributed and not
sparse, for best performance. As a counterexample to the above, consider a column "id" whose values are 100,
2000, and 30000. If the Partition Size is 100, then the column values are relatively sparse, so the SQL
for the "second page" (see above example) will return zero rows, and so will every page until the value in
the query becomes "id >= 2000". Another counterexample is when the values are not uniformly distributed.
Consider a column "id" with values 100, 200, 201, 202, ... 299. Then the SQL for the first page (see above
example) will return one row with value id = 100, and the second page will return 100 rows with values 200
... 299. This can cause inconsistent processing times downstream, as the pages may contain a very different
number of rows. For these reasons it is recommended to use a Column for Value Partitioning that is
sufficiently dense (not sparse) and fairly evenly distributed.
</p>
</body>
</html>

View File

@ -855,9 +855,9 @@ public class TestGenerateTableFetch {
@Test
public void testRidiculousRowCount() throws ClassNotFoundException, SQLException, InitializationException, IOException {
long rowCount= Long.parseLong(Integer.toString(Integer.MAX_VALUE)) + 100;
long rowCount = Long.parseLong(Integer.toString(Integer.MAX_VALUE)) + 100;
int partitionSize = 1000000;
int expectedFileCount = (int)(rowCount/partitionSize) + 1;
int expectedFileCount = (int) (rowCount / partitionSize) + 1;
Connection conn = mock(Connection.class);
when(dbcp.getConnection()).thenReturn(conn);
@ -867,7 +867,7 @@ public class TestGenerateTableFetch {
ResultSet rs = mock(ResultSet.class);
when(st.executeQuery(anyString())).thenReturn(rs);
when(rs.next()).thenReturn(true);
when(rs.getInt(1)).thenReturn((int)rowCount);
when(rs.getInt(1)).thenReturn((int) rowCount);
when(rs.getLong(1)).thenReturn(rowCount);
final ResultSetMetaData resultSetMetaData = mock(ResultSetMetaData.class);
@ -890,7 +890,7 @@ public class TestGenerateTableFetch {
runner.assertAllFlowFilesTransferred(REL_SUCCESS, expectedFileCount);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
String query = new String(flowFile.toByteArray());
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY ID FETCH NEXT 1000000 ROWS ONLY", query);
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE 1=1 ORDER BY ID FETCH NEXT 1000000 ROWS ONLY", query);
runner.clearTransferState();
}
@ -997,7 +997,7 @@ public class TestGenerateTableFetch {
ResultSet resultSet = stmt.executeQuery(query);
int numberRecordsFirstExecution = 0; // Should be three records
while(resultSet.next()) {
while (resultSet.next()) {
numberRecordsFirstExecution++;
}
runner.clearTransferState();
@ -1010,7 +1010,7 @@ public class TestGenerateTableFetch {
resultSet = stmt.executeQuery(query);
int numberRecordsSecondExecution = 0; // Should be three records
while(resultSet.next()) {
while (resultSet.next()) {
numberRecordsSecondExecution++;
}
@ -1227,12 +1227,12 @@ public class TestGenerateTableFetch {
runner.run(2);
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
assertEquals(2,processor.columnTypeMap.size());
assertEquals(2, processor.columnTypeMap.size());
runner.clearTransferState();
// Remove one element from columnTypeMap to simulate it's re-cache partial state
Map.Entry<String,Integer> entry = processor.columnTypeMap.entrySet().iterator().next();
Map.Entry<String, Integer> entry = processor.columnTypeMap.entrySet().iterator().next();
String key = entry.getKey();
processor.columnTypeMap.remove(key);
@ -1248,7 +1248,143 @@ public class TestGenerateTableFetch {
// It should re-cache column type
runner.run();
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
assertEquals(2,processor.columnTypeMap.size());
assertEquals(2, processor.columnTypeMap.size());
runner.clearTransferState();
}
@Test
public void testUseColumnValuesForPartitioning() throws ClassNotFoundException, SQLException, InitializationException, IOException {
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
}
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (10, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (11, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (12, NULL, 2.0, '2010-01-01 00:00:00')");
runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE");
runner.setIncomingConnection(false);
runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID");
runner.setProperty(GenerateTableFetch.COLUMN_FOR_VALUE_PARTITIONING, "ID");
runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "2");
runner.run();
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
// First flow file
MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
String query = new String(flowFile.toByteArray());
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID <= 12 AND ID >= 10 AND ID < 12", query);
ResultSet resultSet = stmt.executeQuery(query);
// Should be two records
assertTrue(resultSet.next());
assertTrue(resultSet.next());
assertFalse(resultSet.next());
// Second flow file
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
query = new String(flowFile.toByteArray());
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID <= 12 AND ID >= 12 AND ID < 14", query);
resultSet = stmt.executeQuery(query);
// Should be one record
assertTrue(resultSet.next());
assertFalse(resultSet.next());
runner.clearTransferState();
// Run again, this time no flowfiles/rows should be transferred
runner.run();
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 0);
runner.clearTransferState();
// Add 3 new rows with a higher ID and run with a partition size of 2. Three flow files should be transferred
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (20, 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (21, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (24, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
runner.run();
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 3);
// Verify first flow file's contents
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
query = new String(flowFile.toByteArray());
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 12 AND ID <= 24 AND ID >= 20 AND ID < 22", query);
resultSet = stmt.executeQuery(query);
// Should be two records
assertTrue(resultSet.next());
assertTrue(resultSet.next());
assertFalse(resultSet.next());
// Verify second flow file's contents
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
query = new String(flowFile.toByteArray());
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 12 AND ID <= 24 AND ID >= 22 AND ID < 24", query);
resultSet = stmt.executeQuery(query);
// Should be no records
assertFalse(resultSet.next());
// Verify third flow file's contents
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(2);
query = new String(flowFile.toByteArray());
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 12 AND ID <= 24 AND ID >= 24 AND ID < 26", query);
resultSet = stmt.executeQuery(query);
// Should be one record
assertTrue(resultSet.next());
assertFalse(resultSet.next());
runner.clearTransferState();
}
@Test
public void testUseColumnValuesForPartitioningNoMaxValueColumn() throws ClassNotFoundException, SQLException, InitializationException, IOException {
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
}
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (10, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (11, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (12, NULL, 2.0, '2010-01-01 00:00:00')");
runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE");
runner.setIncomingConnection(false);
runner.setProperty(GenerateTableFetch.COLUMN_FOR_VALUE_PARTITIONING, "ID");
runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "2");
runner.run();
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
// First flow file
MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
String query = new String(flowFile.toByteArray());
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE 1=1 AND ID >= 10 AND ID < 12", query);
ResultSet resultSet = stmt.executeQuery(query);
// Should be two records
assertTrue(resultSet.next());
assertTrue(resultSet.next());
assertFalse(resultSet.next());
// Second flow file
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
query = new String(flowFile.toByteArray());
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID <= 12 AND ID >= 12", query);
resultSet = stmt.executeQuery(query);
// Should be one record
assertTrue(resultSet.next());
assertFalse(resultSet.next());
runner.clearTransferState();
// Run again, the same flowfiles should be transferred as we have no maximum-value column
runner.run();
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
runner.clearTransferState();
}

View File

@ -36,6 +36,11 @@ public class DerbyDatabaseAdapter implements DatabaseAdapter {
@Override
public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) {
return getSelectStatement(tableName, columnNames, whereClause, orderByClause, limit, offset, null);
}
@Override
public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset, String columnForPartitioning) {
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Table name cannot be null or empty");
}
@ -51,21 +56,37 @@ public class DerbyDatabaseAdapter implements DatabaseAdapter {
if (!StringUtils.isEmpty(whereClause)) {
query.append(" WHERE ");
query.append(whereClause);
}
if (!StringUtils.isEmpty(orderByClause)) {
query.append(" ORDER BY ");
query.append(orderByClause);
}
if (offset != null && offset > 0) {
query.append(" OFFSET ");
query.append(offset);
query.append(" ROWS");
if (!StringUtils.isEmpty(columnForPartitioning)) {
query.append(" AND ");
query.append(columnForPartitioning);
query.append(" >= ");
query.append(offset != null ? offset : "0");
if (limit != null) {
query.append(" AND ");
query.append(columnForPartitioning);
query.append(" < ");
query.append((offset == null ? 0 : offset) + limit);
}
}
}
if (limit != null) {
query.append(" FETCH NEXT ");
query.append(limit);
query.append(" ROWS ONLY");
if (!StringUtils.isEmpty(orderByClause) && StringUtils.isEmpty(columnForPartitioning)) {
query.append(" ORDER BY ");
query.append(orderByClause);
}
if (StringUtils.isEmpty(columnForPartitioning)) {
if (offset != null && offset > 0) {
query.append(" OFFSET ");
query.append(offset);
query.append(" ROWS");
}
if (limit != null) {
query.append(" FETCH NEXT ");
query.append(limit);
query.append(" ROWS ONLY");
}
}
return query.toString();

View File

@ -79,4 +79,28 @@ public class TestMSSQL2008DatabaseAdapter {
+ "WHERE methods='strange' ORDER BY contain) A WHERE rnum > 123456 AND rnum <= 133456";
Assert.assertEquals(expected3, sql);
}
@Test
public void testPagingQueryUsingColumnValuesForPartitioning() {
String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "1=1", "contain",
100L, 0L, "contain");
String expected1 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE 1=1 AND contain >= 0 AND contain < 100";
Assert.assertEquals(expected1, sql1);
String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "1=1", "contain",
10000L, 123456L, "contain");
String expected2 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE 1=1 AND contain >= 123456 AND contain < 133456";
Assert.assertEquals(expected2, sql2);
String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "methods='strange'",
"contain", 10000L, 123456L, "contain");
String expected3 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE methods='strange' AND contain >= 123456 AND contain < 133456";
Assert.assertEquals(expected3, sql3);
// Paging (limit/offset) is only supported when an orderByClause is supplied, note that it is not honored here
String sql4 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "",
100L, null, "contain");
String expected4 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename";
Assert.assertEquals(expected4, sql4);
}
}

View File

@ -81,4 +81,28 @@ public class TestMSSQLDatabaseAdapter {
String expected3 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE methods='strange' ORDER BY contain OFFSET 123456 ROWS FETCH NEXT 10000 ROWS ONLY";
Assert.assertEquals(sql3,expected3);
}
@Test
public void testPagingQueryUsingColumnValuesForPartitioning() {
String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "1=1", "contain",
100L, 0L, "contain");
String expected1 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE 1=1 AND contain >= 0 AND contain < 100";
Assert.assertEquals(expected1, sql1);
String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "1=1", "contain",
10000L, 123456L, "contain");
String expected2 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE 1=1 AND contain >= 123456 AND contain < 133456";
Assert.assertEquals(expected2, sql2);
String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "methods='strange'",
"contain", 10000L, 123456L, "contain");
String expected3 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE methods='strange' AND contain >= 123456 AND contain < 133456";
Assert.assertEquals(expected3, sql3);
// Paging (limit/offset) is only supported when an orderByClause is supplied, note that it is not honored here
String sql4 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "",
100L, null, "contain");
String expected4 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename";
Assert.assertEquals(expected4, sql4);
}
}

View File

@ -26,45 +26,64 @@ public class TestOracle12DatabaseAdapter {
@Test
public void testGeneration() throws Exception {
String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*","","",null,null);
String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "", null, null);
String expected1 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename";
Assert.assertEquals(sql1,expected1);
Assert.assertEquals(sql1, expected1);
String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*","that=\'some\"\' value\'","",null,null);
String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "that=\'some\"\' value\'", "", null, null);
String expected2 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE that=\'some\"\' value\'";
Assert.assertEquals(sql2,expected2);
Assert.assertEquals(sql2, expected2);
String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*","that=\'some\"\' value\'","might DESC",null,null);
String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "that=\'some\"\' value\'", "might DESC", null, null);
String expected3 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE that=\'some\"\' value\' ORDER BY might DESC";
Assert.assertEquals(sql3,expected3);
Assert.assertEquals(sql3, expected3);
String sql4 = db.getSelectStatement("database.tablename", "","that=\'some\"\' value\'","might DESC",null,null);
String sql4 = db.getSelectStatement("database.tablename", "", "that=\'some\"\' value\'", "might DESC", null, null);
String expected4 = "SELECT * FROM database.tablename WHERE that=\'some\"\' value\' ORDER BY might DESC";
Assert.assertEquals(sql4,expected4);
Assert.assertEquals(sql4, expected4);
}
@Test(expected = IllegalArgumentException.class)
public void testNoTableName() throws Exception {
db.getSelectStatement("", "some(set),of(columns),that,might,contain,methods,a.*","","",null,null);
db.getSelectStatement("", "some(set),of(columns),that,might,contain,methods,a.*", "", "", null, null);
}
@Test
public void testPagingQuery() throws Exception {
String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*","","contain",100L,0L);
String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "contain", 100L, 0L);
String expected1 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename ORDER BY contain FETCH NEXT 100 ROWS ONLY";
Assert.assertEquals(sql1,expected1);
Assert.assertEquals(sql1, expected1);
String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*","","contain",10000L,123456L);
String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "contain", 10000L, 123456L);
String expected2 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename ORDER BY contain OFFSET 123456 ROWS FETCH NEXT 10000 ROWS ONLY";
Assert.assertEquals(sql2,expected2);
Assert.assertEquals(sql2, expected2);
String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*","methods='strange'","contain",10000L,123456L);
String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "methods='strange'", "contain", 10000L, 123456L);
String expected3 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE methods='strange' ORDER BY contain OFFSET 123456 ROWS FETCH NEXT 10000 ROWS ONLY";
Assert.assertEquals(sql3,expected3);
Assert.assertEquals(sql3, expected3);
String sql4 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*","","",100L,null);
String sql4 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "", 100L, null);
String expected4 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename FETCH NEXT 100 ROWS ONLY";
Assert.assertEquals(sql4,expected4);
Assert.assertEquals(sql4, expected4);
}
@Test
public void testPagingQueryUsingColumnValuesForPartitioning() throws Exception {
String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "1=1", "contain", 100L, 0L, "contain");
String expected1 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE 1=1 AND contain >= 0 AND contain < 100";
Assert.assertEquals(expected1, sql1);
String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "1=1", "contain", 10000L, 123456L, "contain");
String expected2 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE 1=1 AND contain >= 123456 AND contain < 133456";
Assert.assertEquals(expected2, sql2);
String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "methods='strange'", "contain", 10000L, 123456L, "contain");
String expected3 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE methods='strange' AND contain >= 123456 AND contain < 133456";
Assert.assertEquals(expected3, sql3);
// Paging (limit/offset) is only supported when an orderByClause is supplied, note that it is not honored here
String sql4 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "", 100L, null, "contain");
String expected4 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename";
Assert.assertEquals(expected4, sql4);
}
}

View File

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.db.impl;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.junit.Assert;
import org.junit.Test;
public class TestOracleDatabaseAdapter {
private final DatabaseAdapter db = new OracleDatabaseAdapter();
@Test
public void testGeneration() {
String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "", null, null);
String expected1 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename";
Assert.assertEquals(expected1, sql1);
String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "that=\'some\"\' value\'", "", null, null);
String expected2 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE that=\'some\"\' value\'";
Assert.assertEquals(expected2, sql2);
String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "that=\'some\"\' value\'", "might DESC", null, null);
String expected3 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE that=\'some\"\' value\' ORDER BY might DESC";
Assert.assertEquals(expected3, sql3);
String sql4 = db.getSelectStatement("database.tablename", "", "that=\'some\"\' value\'", "might DESC", null, null);
String expected4 = "SELECT * FROM database.tablename WHERE that=\'some\"\' value\' ORDER BY might DESC";
Assert.assertEquals(expected4, sql4);
}
@Test(expected = IllegalArgumentException.class)
public void testNoTableName() throws IllegalArgumentException {
db.getSelectStatement("", "some(set),of(columns),that,might,contain,methods,a.*", "", "", null, null);
}
@Test
public void testPagingQuery() {
String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "contain",
100L, 0L);
String expected1 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM (SELECT a.*, ROWNUM rnum FROM (SELECT some(set),of(columns),that,might,contain,methods,a.* "
+ "FROM database.tablename ORDER BY contain) a WHERE ROWNUM <= 100) WHERE rnum > 0";
Assert.assertEquals(expected1, sql1);
String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "contain",
10000L, 123456L);
String expected2 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM (SELECT a.*, ROWNUM rnum FROM (SELECT some(set),of(columns),that,might,contain,methods,a.* "
+ "FROM database.tablename ORDER BY contain) a WHERE ROWNUM <= 133456) WHERE rnum > 123456";
Assert.assertEquals(expected2, sql2);
String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "methods='strange'",
"contain", 10000L, 123456L);
String expected3 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM (SELECT a.*, ROWNUM rnum FROM (SELECT some(set),of(columns),that,might,contain,methods,a.* "
+ "FROM database.tablename WHERE methods='strange' ORDER BY contain) a WHERE ROWNUM <= 133456) WHERE rnum > 123456";
Assert.assertEquals(expected3, sql3);
String sql4 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "",
100L, null);
String expected4 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM (SELECT a.*, ROWNUM rnum FROM (SELECT some(set),of(columns),that,might,contain,methods,a.* "
+ "FROM database.tablename) a WHERE ROWNUM <= 100) WHERE rnum > 0";
Assert.assertEquals(expected4, sql4);
}
@Test
public void testPagingQueryUsingColumnValuesForPartitioning() {
String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "1=1", "contain",
100L, 0L, "contain");
String expected1 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE 1=1 AND contain >= 0 AND contain < 100";
Assert.assertEquals(expected1, sql1);
String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "1=1", "contain",
10000L, 123456L, "contain");
String expected2 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE 1=1 AND contain >= 123456 AND contain < 133456";
Assert.assertEquals(expected2, sql2);
String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "methods='strange'",
"contain", 10000L, 123456L, "contain");
String expected3 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE methods='strange' AND contain >= 123456 AND contain < 133456";
Assert.assertEquals(expected3, sql3);
// Paging (limit/offset) is only supported when an orderByClause is supplied, note that it is not honored here
String sql4 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "",
100L, null, "contain");
String expected4 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename";
Assert.assertEquals(expected4, sql4);
}
}