From d1fd1f50922cf658dcbe1cbbef1e583ab63b95a8 Mon Sep 17 00:00:00 2001 From: Michael Karpel Date: Sun, 12 May 2019 11:31:02 +0300 Subject: [PATCH] Support for flowfile attribute in TABLE_NAME This closes #3472 Signed-off-by: Mike Thomsen --- .../org/apache/nifi/processors/kudu/PutKudu.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java index 9c0c503dfd..a2889c7ea7 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java @@ -74,8 +74,6 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY; - @EventDriven @SupportsBatching @RequiresInstanceClassLoading // Because of calls to UserGroupInformation.setConfiguration @@ -91,7 +89,7 @@ public class PutKudu extends AbstractProcessor { .description("List all kudu masters's ip with port (e.g. 7051), comma separated") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(VARIABLE_REGISTRY) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); protected static final PropertyDescriptor TABLE_NAME = new Builder() @@ -99,7 +97,7 @@ public class PutKudu extends AbstractProcessor { .description("The name of the Kudu Table to put data into") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(VARIABLE_REGISTRY) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new Builder() @@ -169,7 +167,7 @@ public class PutKudu extends AbstractProcessor { .defaultValue("100") .required(true) .addValidator(StandardValidators.createLongValidator(1, 100000, true)) - .expressionLanguageSupported(VARIABLE_REGISTRY) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); @@ -190,7 +188,6 @@ public class PutKudu extends AbstractProcessor { protected int ffbatch = 1; protected KuduClient kuduClient; - protected KuduTable kuduTable; private volatile KerberosUser kerberosUser; @Override @@ -220,7 +217,6 @@ public class PutKudu extends AbstractProcessor { @OnScheduled public void onScheduled(final ProcessContext context) throws IOException, LoginException { - final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue(); final String kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue(); operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue()); batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger(); @@ -230,7 +226,6 @@ public class PutKudu extends AbstractProcessor { getLogger().debug("Setting up Kudu connection..."); final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); kuduClient = createClient(kuduMasters, credentialsService); - kuduTable = kuduClient.openTable(tableName); getLogger().debug("Kudu connection successfully initialized"); } @@ -307,9 +302,11 @@ public class PutKudu extends AbstractProcessor { final List pendingRowErrors = new ArrayList<>(); for (FlowFile flowFile : flowFiles) { try (final InputStream in = session.read(flowFile); - final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) { + final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) { final List fieldNames = recordReader.getSchema().getFieldNames(); final RecordSet recordSet = recordReader.createRecordSet(); + final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final KuduTable kuduTable = kuduClient.openTable(tableName); Record record = recordSet.next(); while (record != null) {