NIFI-4773: Moved DB Fetch processors' connection code from setup to onTrigger

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2422.
Matthew Burgess, 2018-01-22 13:42:35 -05:00 (committed by Pierre Villard)
parent 6f282c6843
commit 84848f7cbb
3 changed files with 62 additions and 41 deletions
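
All three diffs below apply the same pattern: the DB-dependent setup work (querying the database for the types of the configured maximum-value columns) is no longer done in the @OnScheduled method, where a DB connection problem would surface at schedule time, but lazily on the first onTrigger call, guarded by the new setupComplete flag. Clearing the flag at schedule time and only setting it after setup() finishes means a failed setup is simply retried on a later trigger. A minimal standalone sketch of that guard follows; the class and method names are illustrative stand-ins, not the NiFi API.

import java.util.concurrent.atomic.AtomicBoolean;

// Minimal sketch of the lazy-setup guard used in these diffs.
public class LazySetupSketch {

    private final AtomicBoolean setupComplete = new AtomicBoolean(false);

    // Schedule time: only clear the flag; do not touch the database yet.
    public void onScheduled() {
        setupComplete.set(false);
    }

    // Trigger time: perform the one-time setup lazily; if it throws, the flag
    // stays false and the next trigger simply tries again.
    public void onTrigger() {
        if (!setupComplete.get()) {
            setup();
        }
        // ... normal per-trigger work ...
    }

    private void setup() {
        synchronized (setupComplete) {
            setupComplete.set(false);
            fetchColumnTypes();       // simulated DB-dependent work
            setupComplete.set(true);  // only reached if no exception was thrown
        }
    }

    private void fetchColumnTypes() {
        // stands in for the JDBC metadata query done in AbstractDatabaseFetchProcessor
    }

    public static void main(String[] args) {
        LazySetupSketch processor = new LazySetupSketch();
        processor.onScheduled();
        processor.onTrigger();  // first trigger performs setup
        processor.onTrigger();  // later triggers skip it
    }
}
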

org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java

@@ -50,6 +50,7 @@ import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.sql.Types.ARRAY;
import static java.sql.Types.BIGINT;
@@ -176,6 +177,10 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
// pre-fetched when the processor is scheduled, rather than having to populate them on-the-fly.
protected volatile boolean isDynamicMaxValues = false;
// This value is cleared when the processor is scheduled, and set to true after setup() is called and completes successfully. This enables
// the setup logic to be performed in onTrigger() versus OnScheduled to avoid any issues with DB connection when first scheduled to run.
protected final AtomicBoolean setupComplete = new AtomicBoolean(false);
private static SimpleDateFormat TIME_TYPE_FORMAT = new SimpleDateFormat("HH:mm:ss.SSS");
// A Map (name to value) of initial maximum-value properties, filled at schedule-time and used at trigger-time
@@ -222,11 +227,21 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
return super.customValidate(validationContext);
}
@Override
public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
// If the max-value columns have changed, we need to re-fetch the column info from the DB
if (MAX_VALUE_COLUMN_NAMES.equals(descriptor) && newValue != null && !newValue.equals(oldValue)) {
setupComplete.set(false);
}
}
public void setup(final ProcessContext context) {
setup(context,true,null);
}
public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFile flowFile) {
synchronized (setupComplete) {
setupComplete.set(false);
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(flowFile).getValue();
// If there are no max-value column names specified, we don't need to perform this processing
@@ -250,7 +265,7 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
int numCols = resultSetMetaData.getColumnCount();
if (numCols > 0) {
if (shouldCleanCache){
if (shouldCleanCache) {
columnTypeMap.clear();
}
for (int i = 1; i <= numCols; i++) {
@@ -266,6 +281,8 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
} catch (SQLException e) {
throw new ProcessException("Unable to communicate with database in order to determine column types", e);
}
setupComplete.set(true);
}
}
protected static String getMaxValueFromRow(ResultSet resultSet,
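
In addition to the guard flag, the hunk above splits setup() into a one-argument and a three-argument overload, and onPropertyModified() drops the flag whenever the Maximum-value Columns property changes so the next trigger redoes the work. The sketch below is a simplified stand-in (not the NiFi API, and the per-FlowFile call site is assumed rather than shown in this diff) for how the shouldCleanCache flag is meant to behave: the one-argument form rebuilds the shared column-type cache from scratch, while passing false preserves entries already cached.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

// Stand-in sketch for the two setup overloads and the property-change
// invalidation shown in the hunk above.
public class ColumnTypeCacheSketch {

    private final AtomicBoolean setupComplete = new AtomicBoolean(false);
    private final Map<String, Integer> columnTypeMap = new HashMap<>();

    // One-argument form: static configuration, safe to rebuild the whole cache.
    public void setup(String tableName) {
        setup(tableName, true);
    }

    // Three-argument form in the real code; here the FlowFile is elided and
    // only the shouldCleanCache distinction is kept.
    public void setup(String tableName, boolean shouldCleanCache) {
        synchronized (setupComplete) {
            setupComplete.set(false);
            if (shouldCleanCache) {
                columnTypeMap.clear();  // wipe entries cached earlier
            }
            // simulated metadata fetch for this table's max-value column
            columnTypeMap.put(tableName + ".id", java.sql.Types.BIGINT);
            setupComplete.set(true);
        }
    }

    // Mirrors onPropertyModified: changing the max-value columns forces a re-setup.
    public void onMaxValueColumnsChanged(String oldValue, String newValue) {
        if (newValue != null && !newValue.equals(oldValue)) {
            setupComplete.set(false);
        }
    }
}
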

org/apache/nifi/processors/standard/GenerateTableFetch.java

@@ -155,9 +155,6 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
@OnScheduled
public void setup(final ProcessContext context) {
maxValueProperties = getDefaultMaxValueProperties(context.getProperties());
if (!isDynamicTableName && !isDynamicMaxValues) {
super.setup(context);
}
if (context.hasIncomingConnection() && !context.hasNonLoopConnection()) {
getLogger().error("The failure relationship can be used only if there is another incoming connection to this processor.");
}
@@ -165,6 +162,10 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
// Fetch the column/table info once (if the table name and max value columns are not dynamic). Otherwise do the setup later
if (!isDynamicTableName && !isDynamicMaxValues && !setupComplete.get()) {
super.setup(context);
}
ProcessSession session = sessionFactory.createSession();
FlowFile fileToProcess = null;

org/apache/nifi/processors/standard/QueryDatabaseTable.java

@@ -18,10 +18,10 @@ package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -73,7 +73,7 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.NORMALIZE_NAME
import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
@EventDriven
@TriggerSerially
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"sql", "select", "jdbc", "query", "database"})
@SeeAlso({GenerateTableFetch.class, ExecuteSQL.class})
@@ -178,11 +178,14 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
@OnScheduled
public void setup(final ProcessContext context) {
maxValueProperties = getDefaultMaxValueProperties(context.getProperties());
super.setup(context);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
// Fetch the column/table info once
if (!setupComplete.get()) {
super.setup(context);
}
ProcessSession session = sessionFactory.createSession();
final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
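
Taken together with the GenerateTableFetch change, the effect is that QueryDatabaseTable can be scheduled while its database is down: each onTrigger retries the setup until a connection succeeds, rather than the column-type lookup being attempted at schedule time when the connection may not yet be available. A small standalone illustration of that retry behaviour, with a simulated database that only becomes reachable on the third attempt (illustrative names, not the NiFi API):

import java.util.concurrent.atomic.AtomicBoolean;

// Illustration of the recovery behaviour enabled by moving setup into onTrigger:
// setup failures at trigger time are retried until the (simulated) database is up.
public class SetupRetrySketch {

    private final AtomicBoolean setupComplete = new AtomicBoolean(false);
    private int attempts = 0;

    public void onTrigger() {
        if (!setupComplete.get()) {
            try {
                setup();
            } catch (RuntimeException e) {
                System.out.println("setup failed, will retry on next trigger: " + e.getMessage());
                return;  // skip this trigger; nothing is marked complete
            }
        }
        System.out.println("doing normal trigger work");
    }

    private void setup() {
        synchronized (setupComplete) {
            setupComplete.set(false);
            attempts++;
            if (attempts < 3) {
                throw new RuntimeException("database unreachable (attempt " + attempts + ")");
            }
            setupComplete.set(true);
        }
    }

    public static void main(String[] args) {
        SetupRetrySketch p = new SetupRetrySketch();
        for (int i = 0; i < 4; i++) {
            p.onTrigger();  // fails twice, succeeds on the third attempt, then skips setup
        }
    }
}
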