Support for flowfile attribute in TABLE_NAME

This closes #3472

Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
Michael Karpel 2019-05-12 11:31:02 +03:00 committed by Mike Thomsen
parent f08c2ee43f
commit d1fd1f5092
1 changed files with 6 additions and 9 deletions

View File

@ -74,8 +74,6 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
@EventDriven
@SupportsBatching
@RequiresInstanceClassLoading // Because of calls to UserGroupInformation.setConfiguration
@ -91,7 +89,7 @@ public class PutKudu extends AbstractProcessor {
.description("List all kudu masters's ip with port (e.g. 7051), comma separated")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(VARIABLE_REGISTRY)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
protected static final PropertyDescriptor TABLE_NAME = new Builder()
@ -99,7 +97,7 @@ public class PutKudu extends AbstractProcessor {
.description("The name of the Kudu Table to put data into")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(VARIABLE_REGISTRY)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new Builder()
@ -169,7 +167,7 @@ public class PutKudu extends AbstractProcessor {
.defaultValue("100")
.required(true)
.addValidator(StandardValidators.createLongValidator(1, 100000, true))
.expressionLanguageSupported(VARIABLE_REGISTRY)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
@ -190,7 +188,6 @@ public class PutKudu extends AbstractProcessor {
protected int ffbatch = 1;
protected KuduClient kuduClient;
protected KuduTable kuduTable;
private volatile KerberosUser kerberosUser;
@Override
@ -220,7 +217,6 @@ public class PutKudu extends AbstractProcessor {
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException, LoginException {
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
final String kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
@ -230,7 +226,6 @@ public class PutKudu extends AbstractProcessor {
getLogger().debug("Setting up Kudu connection...");
final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
kuduClient = createClient(kuduMasters, credentialsService);
kuduTable = kuduClient.openTable(tableName);
getLogger().debug("Kudu connection successfully initialized");
}
@ -307,9 +302,11 @@ public class PutKudu extends AbstractProcessor {
final List<RowError> pendingRowErrors = new ArrayList<>();
for (FlowFile flowFile : flowFiles) {
try (final InputStream in = session.read(flowFile);
final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
final List<String> fieldNames = recordReader.getSchema().getFieldNames();
final RecordSet recordSet = recordReader.createRecordSet();
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final KuduTable kuduTable = kuduClient.openTable(tableName);
Record record = recordSet.next();
while (record != null) {