NIFI-5049 Fix handling of Phonenix datetime columns

This closes #2625

Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
Gardella Juan Pablo 2018-04-11 01:23:00 -03:00 committed by Mike Thomsen
parent 099bfcdf3a
commit 64356e0014
4 changed files with 133 additions and 8 deletions

View File

@ -30,6 +30,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.db.DatabaseAdapter; import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.processors.standard.db.impl.PhoenixDatabaseAdapter;
import org.apache.nifi.util.StringUtils; import org.apache.nifi.util.StringUtils;
import java.io.IOException; import java.io.IOException;
@ -485,13 +486,26 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
case NVARCHAR: case NVARCHAR:
case VARCHAR: case VARCHAR:
case ROWID: case ROWID:
case DATE:
case TIME:
return "'" + value + "'"; return "'" + value + "'";
case TIME:
if (PhoenixDatabaseAdapter.NAME.equals(databaseType)) {
return "time '" + value + "'";
}
case DATE:
case TIMESTAMP: case TIMESTAMP:
if (!StringUtils.isEmpty(databaseType) && databaseType.contains("Oracle")) { // TODO delegate to database adapter the conversion instead of using if in this
// For backwards compatibility, the type might be TIMESTAMP but the state value is in DATE format. This should be a one-time occurrence as the next maximum value // class.
// should be stored as a full timestamp. Even so, check to see if the value is missing time-of-day information, and use the "date" coercion rather than the // TODO (cont) if a new else is added, please refactor the code.
// Ideally we should probably have a method on the adapter to get a clause that
// coerces a
// column to a Timestamp if need be (the generic one can be a no-op)
if (!StringUtils.isEmpty(databaseType)
&& (databaseType.contains("Oracle") || PhoenixDatabaseAdapter.NAME.equals(databaseType))) {
// For backwards compatibility, the type might be TIMESTAMP but the state value
// is in DATE format. This should be a one-time occurrence as the next maximum
// value
// should be stored as a full timestamp. Even so, check to see if the value is
// missing time-of-day information, and use the "date" coercion rather than the
// "timestamp" coercion in that case // "timestamp" coercion in that case
if (value.matches("\\d{4}-\\d{2}-\\d{2}")) { if (value.matches("\\d{4}-\\d{2}-\\d{2}")) {
return "date '" + value + "'"; return "date '" + value + "'";

View File

@ -310,9 +310,10 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue(); final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
st.setQueryTimeout(queryTimeout); // timeout in seconds st.setQueryTimeout(queryTimeout); // timeout in seconds
try { if (logger.isDebugEnabled()) {
logger.debug("Executing query {}", new Object[]{selectQuery}); logger.debug("Executing query {}", new Object[] { selectQuery });
final ResultSet resultSet = st.executeQuery(selectQuery); }
try (final ResultSet resultSet = st.executeQuery(selectQuery)) {
int fragmentIndex=0; int fragmentIndex=0;
while(true) { while(true) {
final AtomicLong nrOfRows = new AtomicLong(0L); final AtomicLong nrOfRows = new AtomicLong(0L);

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.db.impl;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
/**
* A Apache Phoenix database adapter that generates ANSI SQL.
*/
public final class PhoenixDatabaseAdapter implements DatabaseAdapter {
public static final String NAME = "Phoenix";
@Override
public String getName() {
return NAME;
}
@Override
public String getDescription() {
return "Generates Phoenix compliant SQL";
}
@Override
public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause,
Long limit, Long offset) {
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Table name cannot be null or empty");
}
final StringBuilder query = new StringBuilder("SELECT ");
if (StringUtils.isEmpty(columnNames) || columnNames.trim().equals("*")) {
query.append("*");
} else {
query.append(columnNames);
}
query.append(" FROM ");
query.append(tableName);
if (!StringUtils.isEmpty(whereClause)) {
query.append(" WHERE ");
query.append(whereClause);
}
if (!StringUtils.isEmpty(orderByClause)) {
query.append(" ORDER BY ");
query.append(orderByClause);
}
if (limit != null) {
query.append(" LIMIT ");
query.append(limit);
}
if (offset != null && offset > 0) {
query.append(" OFFSET ");
query.append(offset);
}
return query.toString();
}
}

View File

@ -32,6 +32,7 @@ import org.apache.nifi.processors.standard.db.impl.GenericDatabaseAdapter;
import org.apache.nifi.processors.standard.db.impl.MSSQLDatabaseAdapter; import org.apache.nifi.processors.standard.db.impl.MSSQLDatabaseAdapter;
import org.apache.nifi.processors.standard.db.impl.MySQLDatabaseAdapter; import org.apache.nifi.processors.standard.db.impl.MySQLDatabaseAdapter;
import org.apache.nifi.processors.standard.db.impl.OracleDatabaseAdapter; import org.apache.nifi.processors.standard.db.impl.OracleDatabaseAdapter;
import org.apache.nifi.processors.standard.db.impl.PhoenixDatabaseAdapter;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
@ -180,6 +181,43 @@ public class QueryDatabaseTableTest {
dbAdapter = new OracleDatabaseAdapter(); dbAdapter = new OracleDatabaseAdapter();
query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap()); query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap());
assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= timestamp '2016-03-07 12:34:56' AND (type = \"CUSTOMER\")", query); assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= timestamp '2016-03-07 12:34:56' AND (type = \"CUSTOMER\")", query);
// Test time.
processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "time_created", Types.TIME);
maxValues.clear();
maxValues.put("id", "509");
maxValues.put("time_created", "12:34:57");
maxValues.put("date_created", "2016-03-07 12:34:56");
stateManager = runner.getStateManager();
stateManager.clear(Scope.CLUSTER);
stateManager.setState(maxValues, Scope.CLUSTER);
query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED", "TIME_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap());
assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= timestamp '2016-03-07 12:34:56' AND TIME_CREATED >= timestamp '12:34:57' AND (type = \"CUSTOMER\")", query);
dbAdapter = new GenericDatabaseAdapter();
query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED", "TIME_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap());
assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= '2016-03-07 12:34:56' AND TIME_CREATED >= '12:34:57' AND (type = \"CUSTOMER\")", query);
}
@Test
public void testGetQueryUsingPhoenixAdapter() throws Exception {
Map<String, String> maxValues = new HashMap<>();
StateManager stateManager = runner.getStateManager();
processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "id", Types.INTEGER);
processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "time_created", Types.TIME);
processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "date_created", Types.TIMESTAMP);
maxValues.put("id", "509");
maxValues.put("time_created", "12:34:57");
maxValues.put("date_created", "2016-03-07 12:34:56");
stateManager.setState(maxValues, Scope.CLUSTER);
dbAdapter = new PhoenixDatabaseAdapter();
String query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED", "TIME_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap());
assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= timestamp '2016-03-07 12:34:56' AND TIME_CREATED >= time '12:34:57' AND (type = \"CUSTOMER\")", query);
// Cover the other path
dbAdapter = new GenericDatabaseAdapter();
query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED", "TIME_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap());
assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= '2016-03-07 12:34:56' AND TIME_CREATED >= '12:34:57' AND (type = \"CUSTOMER\")", query);
} }
@Test(expected = IllegalArgumentException.class) @Test(expected = IllegalArgumentException.class)