diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 2a90a924ac..c07683a50e 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -880,6 +880,12 @@ language governing permissions and limitations under the License. --> 1.18.0-SNAPSHOT nar + + org.apache.nifi + nifi-airtable-nar + 1.18.0-SNAPSHOT + nar + diff --git a/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-nar/pom.xml b/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-nar/pom.xml new file mode 100644 index 0000000000..a6a89c3bdf --- /dev/null +++ b/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-nar/pom.xml @@ -0,0 +1,48 @@ + + + + + + nifi-airtable-bundle + org.apache.nifi + 1.18.0-SNAPSHOT + + 4.0.0 + + nifi-airtable-nar + nar + + true + true + + + + org.apache.nifi + nifi-airtable-processors + 1.18.0-SNAPSHOT + + + org.apache.nifi + nifi-standard-services-api-nar + 1.18.0-SNAPSHOT + nar + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000000..d645695673 --- /dev/null +++ b/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..9aa168b42c --- /dev/null +++ b/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,45 @@ +nifi-airtable-nar +Copyright 2014-2022 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +************************** +Apache Software License v2 +************************** + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2015 The Apache Software Foundation + + This product includes software from the Spring Framework, + under the Apache License 2.0 (see: StringUtils.containsWhitespace()) + + (ASLv2) Apache Commons IO + The following NOTICE information applies: + Apache Commons IO + Copyright 2002-2016 The Apache Software Foundation + + (ASLv2) Jackson JSON processor + The following NOTICE information applies: + # Jackson JSON processor + + Jackson is a high-performance, Free/Open Source JSON processing library. + It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has + been in development since 2007. + It is currently developed by a community of developers, as well as supported + commercially by FasterXML.com. + + ## Licensing + + Jackson core and extension components may licensed under different licenses. + To find the details that apply to this artifact see the accompanying LICENSE file. + For more information, including possible other licensing options, contact + FasterXML.com (http://fasterxml.com). + + ## Credits + + A list of contributors may be found from CREDITS file, which is included + in some artifacts (usually source distributions); but is always available + from the source code management (SCM) system project uses. diff --git a/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/pom.xml b/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/pom.xml new file mode 100644 index 0000000000..207c4cd95c --- /dev/null +++ b/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/pom.xml @@ -0,0 +1,84 @@ + + + + + + nifi-airtable-bundle + org.apache.nifi + 1.18.0-SNAPSHOT + + 4.0.0 + + nifi-airtable-processors + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-utils + 1.18.0-SNAPSHOT + provided + + + org.apache.nifi + nifi-web-client-provider-api + + + org.apache.commons + commons-lang3 + + + commons-io + commons-io + + + com.fasterxml.jackson.core + jackson-databind + + + + org.apache.nifi + nifi-mock + + + org.apache.nifi + nifi-web-client-provider-service + 1.18.0-SNAPSHOT + test + + + com.squareup.okhttp3 + mockwebserver + test + + + org.apache.nifi + nifi-ssl-context-service-api + test + + + org.apache.nifi + nifi-proxy-configuration-api + test + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/QueryAirtableTable.java b/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/QueryAirtableTable.java new file mode 100644 index 0000000000..ca5e877736 --- /dev/null +++ b/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/QueryAirtableTable.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.airtable; + +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX; +import static org.apache.nifi.processors.airtable.service.AirtableRestService.API_V0_BASE_URL; + +import java.io.IOException; +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.airtable.parse.AirtableRetrieveTableResult; +import org.apache.nifi.processors.airtable.parse.AirtableTableRetriever; +import org.apache.nifi.processors.airtable.service.AirtableGetRecordsParameters; +import org.apache.nifi.processors.airtable.service.AirtableRestService; +import org.apache.nifi.processors.airtable.service.RateLimitExceededException; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; + +@PrimaryNodeOnly +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@TriggerSerially +@TriggerWhenEmpty +@Tags({"airtable", "query", "database"}) +@CapabilityDescription("Query records from an Airtable table. Records are incrementally retrieved based on the last modified time of the records." + + " Records can also be further filtered by setting the 'Custom Filter' property which supports the formulas provided by the Airtable API." + + " This processor is intended to be run on the Primary Node only.") +@Stateful(scopes = Scope.CLUSTER, description = "The last successful query's time is stored in order to enable incremental loading." + + " The initial query returns all the records in the table and each subsequent query filters the records by their last modified time." + + " In other words, if a record is updated after the last successful query only the updated records will be returned in the next query." + + " State is stored across the cluster, so this Processor can run only on the Primary Node and if a new Primary Node is selected," + + " the new node can pick up where the previous one left off without duplicating the data.") +@WritesAttributes({ + @WritesAttribute(attribute = "record.count", description = "Sets the number of records in the FlowFile."), + @WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Records Per FlowFile' is set then all FlowFiles from the same query result set " + + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."), + @WritesAttribute(attribute = "fragment.count", description = "If 'Max Records Per FlowFile' is set then this is the total number of " + + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the " + + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet."), + @WritesAttribute(attribute = "fragment.index", description = "If 'Max Records Per FlowFile' is set then the position of this FlowFile in the list of " + + "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be " + + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order " + + "FlowFiles were produced"), +}) +@DefaultSettings(yieldDuration = "15 sec") +public class QueryAirtableTable extends AbstractProcessor { + + static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder() + .name("api-url") + .displayName("API URL") + .description("The URL for the Airtable REST API including the domain and the path to the API (e.g. https://api.airtable.com/v0).") + .defaultValue(API_V0_BASE_URL) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .addValidator(StandardValidators.URL_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .required(true) + .build(); + + static final PropertyDescriptor API_KEY = new PropertyDescriptor.Builder() + .name("api-key") + .displayName("API Key") + .description("The REST API key to use in queries. Should be generated on Airtable's account page.") + .required(true) + .sensitive(true) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor BASE_ID = new PropertyDescriptor.Builder() + .name("base-id") + .displayName("Base ID") + .description("The ID of the Airtable base to be queried.") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor TABLE_ID = new PropertyDescriptor.Builder() + .name("table-id") + .displayName("Table ID") + .description("The name or the ID of the Airtable table to be queried.") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder() + .name("fields") + .displayName("Fields") + .description("Comma-separated list of fields to query from the table. Both the field's name and ID can be used.") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor CUSTOM_FILTER = new PropertyDescriptor.Builder() + .name("custom-filter") + .displayName("Custom Filter") + .description("Filter records by Airtable's formulas.") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor QUERY_TIME_WINDOW_LAG = new PropertyDescriptor.Builder() + .name("query-time-window-lag") + .displayName("Query Time Window Lag") + .description("The amount of lag to be applied to the query time window's end point. Set this property to avoid missing records when the clock of your local machines" + + " and Airtable servers' clock are not in sync. Must be greater than or equal to 1 second.") + .defaultValue("3 s") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new PropertyDescriptor.Builder() + .name("web-client-service-provider") + .displayName("Web Client Service Provider") + .description("Web Client Service Provider to use for Airtable REST API requests") + .identifiesControllerService(WebClientServiceProvider.class) + .required(true) + .build(); + + static final PropertyDescriptor QUERY_PAGE_SIZE = new PropertyDescriptor.Builder() + .name("query-page-size") + .displayName("Query Page Size") + .description("Number of records to be fetched in a page. Should be between 1 and 100 inclusively.") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.createLongValidator(1, 100, true)) + .build(); + + static final PropertyDescriptor MAX_RECORDS_PER_FLOWFILE = new PropertyDescriptor.Builder() + .name("max-records-per-flowfile") + .displayName("Max Records Per FlowFile") + .description("The maximum number of result records that will be included in a single FlowFile. This will allow you to break up very large" + + " result sets into multiple FlowFiles. If no value specified, then all records are returned in a single FlowFile.") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("For FlowFiles created as a result of a successful query.") + .build(); + + private static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList( + API_URL, + API_KEY, + BASE_ID, + TABLE_ID, + FIELDS, + CUSTOM_FILTER, + QUERY_TIME_WINDOW_LAG, + WEB_CLIENT_SERVICE_PROVIDER, + QUERY_PAGE_SIZE, + MAX_RECORDS_PER_FLOWFILE + )); + + private static final Set RELATIONSHIPS = Collections.singleton(REL_SUCCESS); + + private static final String LAST_QUERY_TIME_WINDOW_END = "last_query_time_window_end"; + + private volatile AirtableRestService airtableRestService; + + @Override + protected List getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public Set getRelationships() { + return RELATIONSHIPS; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + final String apiUrl = context.getProperty(API_URL).evaluateAttributeExpressions().getValue(); + final String apiKey = context.getProperty(API_KEY).getValue(); + final String baseId = context.getProperty(BASE_ID).evaluateAttributeExpressions().getValue(); + final String tableId = context.getProperty(TABLE_ID).evaluateAttributeExpressions().getValue(); + final WebClientServiceProvider webClientServiceProvider = context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class); + airtableRestService = new AirtableRestService(webClientServiceProvider, apiUrl, apiKey, baseId, tableId); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final Integer maxRecordsPerFlowFile = context.getProperty(MAX_RECORDS_PER_FLOWFILE).evaluateAttributeExpressions().asInteger(); + final Long queryTimeWindowLagSeconds = context.getProperty(QUERY_TIME_WINDOW_LAG).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS); + + final StateMap state; + try { + state = session.getState(Scope.CLUSTER); + } catch (IOException e) { + throw new ProcessException("Failed to get cluster state", e); + } + + final String lastRecordFetchDateTime = state.get(LAST_QUERY_TIME_WINDOW_END); + final String currentRecordFetchDateTime = OffsetDateTime.now() + .minusSeconds(queryTimeWindowLagSeconds) + .truncatedTo(ChronoUnit.SECONDS) + .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME); + + final AirtableGetRecordsParameters getRecordsParameters = buildGetRecordsParameters(context, lastRecordFetchDateTime, currentRecordFetchDateTime); + final AirtableRetrieveTableResult retrieveTableResult; + try { + final AirtableTableRetriever tableRetriever = new AirtableTableRetriever(airtableRestService, getRecordsParameters, maxRecordsPerFlowFile); + retrieveTableResult = tableRetriever.retrieveAll(session); + } catch (IOException e) { + throw new ProcessException("Failed to read Airtable records", e); + } catch (RateLimitExceededException e) { + context.yield(); + throw new ProcessException("Airtable REST API rate limit exceeded while reading records", e); + } + + final Map newState = new HashMap<>(state.toMap()); + newState.put(LAST_QUERY_TIME_WINDOW_END, currentRecordFetchDateTime); + try { + session.setState(newState, Scope.CLUSTER); + } catch (IOException e) { + throw new ProcessException("Failed to update cluster state", e); + } + + final List flowFiles = retrieveTableResult.getFlowFiles(); + if (flowFiles.isEmpty()) { + context.yield(); + return; + } + + if (maxRecordsPerFlowFile != null) { + addFragmentAttributesToFlowFiles(session, flowFiles); + } + transferFlowFiles(session, flowFiles, retrieveTableResult.getTotalRecordCount()); + } + + private AirtableGetRecordsParameters buildGetRecordsParameters(final ProcessContext context, + final String lastRecordFetchTime, + final String nowDateTimeString) { + Objects.requireNonNull(context); + Objects.requireNonNull(nowDateTimeString); + + final String fieldsProperty = context.getProperty(FIELDS).evaluateAttributeExpressions().getValue(); + final String customFilter = context.getProperty(CUSTOM_FILTER).evaluateAttributeExpressions().getValue(); + final Integer pageSize = context.getProperty(QUERY_PAGE_SIZE).evaluateAttributeExpressions().asInteger(); + + final AirtableGetRecordsParameters.Builder getRecordsParametersBuilder = new AirtableGetRecordsParameters.Builder(); + if (lastRecordFetchTime != null) { + getRecordsParametersBuilder.modifiedAfter(lastRecordFetchTime); + } + getRecordsParametersBuilder.modifiedBefore(nowDateTimeString); + if (fieldsProperty != null) { + getRecordsParametersBuilder.fields(Arrays.stream(fieldsProperty.split(",")).map(String::trim).collect(Collectors.toList())); + } + getRecordsParametersBuilder.customFilter(customFilter); + if (pageSize != null) { + getRecordsParametersBuilder.pageSize(pageSize); + } + + return getRecordsParametersBuilder.build(); + } + + private void addFragmentAttributesToFlowFiles(final ProcessSession session, final List flowFiles) { + final String fragmentIdentifier = UUID.randomUUID().toString(); + for (int i = 0; i < flowFiles.size(); i++) { + final Map fragmentAttributes = new HashMap<>(); + fragmentAttributes.put(FRAGMENT_ID.key(), fragmentIdentifier); + fragmentAttributes.put(FRAGMENT_INDEX.key(), String.valueOf(i)); + fragmentAttributes.put(FRAGMENT_COUNT.key(), String.valueOf(flowFiles.size())); + + flowFiles.set(i, session.putAllAttributes(flowFiles.get(i), fragmentAttributes)); + } + } + + private void transferFlowFiles(final ProcessSession session, final List flowFiles, final int totalRecordCount) { + final String transitUri = airtableRestService.createUriBuilder().build().toString(); + for (final FlowFile flowFile : flowFiles) { + session.getProvenanceReporter().receive(flowFile, transitUri); + session.transfer(flowFile, REL_SUCCESS); + } + session.adjustCounter("Records Processed", totalRecordCount, false); + final String flowFilesAsString = flowFiles.stream().map(FlowFile::toString).collect(Collectors.joining(", ", "[", "]")); + getLogger().debug("Transferred FlowFiles [{}] Records [{}]", flowFilesAsString, totalRecordCount); + } +} diff --git a/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/parse/AirtableRecordSetFlowFileWriter.java b/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/parse/AirtableRecordSetFlowFileWriter.java new file mode 100644 index 0000000000..bb96315e55 --- /dev/null +++ b/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/parse/AirtableRecordSetFlowFileWriter.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.airtable.parse; + +import static org.apache.nifi.processors.airtable.parse.AirtableTableRetriever.JSON_FACTORY; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import java.io.IOException; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessSession; + +public class AirtableRecordSetFlowFileWriter { + + private final FlowFile flowFile; + private final JsonGenerator jsonGenerator; + private int recordCount = 0; + + private AirtableRecordSetFlowFileWriter(final FlowFile flowFile, final JsonGenerator jsonGenerator) { + this.flowFile = flowFile; + this.jsonGenerator = jsonGenerator; + } + + public static AirtableRecordSetFlowFileWriter startRecordSet(final ProcessSession session) throws IOException { + final FlowFile flowFile = session.create(); + final JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(session.write(flowFile)); + jsonGenerator.writeStartArray(); + return new AirtableRecordSetFlowFileWriter(flowFile, jsonGenerator); + } + + public void writeRecord(final JsonParser jsonParser) throws IOException { + recordCount++; + jsonGenerator.copyCurrentStructure(jsonParser); + } + + public FlowFile closeRecordSet(final ProcessSession session) throws IOException { + jsonGenerator.writeEndArray(); + jsonGenerator.close(); + FlowFile flowFileWithAttributes = session.putAttribute(flowFile, "record.count", String.valueOf(recordCount)); + flowFileWithAttributes = session.putAttribute(flowFileWithAttributes, CoreAttributes.MIME_TYPE.key(), "application/json"); + return flowFileWithAttributes; + } + + public int getRecordCount() { + return recordCount; + } +} diff --git a/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/parse/AirtableRetrievePageResult.java b/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/parse/AirtableRetrievePageResult.java new file mode 100644 index 0000000000..e279507348 --- /dev/null +++ b/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/parse/AirtableRetrievePageResult.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.airtable.parse; + +import java.util.List; +import java.util.Optional; +import org.apache.nifi.flowfile.FlowFile; + +public class AirtableRetrievePageResult { + + private final Optional nextOffset; + private final List flowFiles; + private final int parsedRecordCount; + private final Optional ongoingRecordSetFlowFileWriter; + + public AirtableRetrievePageResult(final Optional nextOffset, + final List flowFiles, + final int parsedRecordCount, + final Optional ongoingRecordSetFlowFileWriter) { + this.nextOffset = nextOffset; + this.flowFiles = flowFiles; + this.parsedRecordCount = parsedRecordCount; + this.ongoingRecordSetFlowFileWriter = ongoingRecordSetFlowFileWriter; + } + + public Optional getNextOffset() { + return nextOffset; + } + + public Optional getOngoingRecordSetFlowFileWriter() { + return ongoingRecordSetFlowFileWriter; + } + + public List getFlowFiles() { + return flowFiles; + } + + public int getParsedRecordCount() { + return parsedRecordCount; + } +} diff --git a/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/parse/AirtableRetrieveTableResult.java b/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/parse/AirtableRetrieveTableResult.java new file mode 100644 index 0000000000..427324f909 --- /dev/null +++ b/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/parse/AirtableRetrieveTableResult.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.airtable.parse; + +import java.util.List; +import org.apache.nifi.flowfile.FlowFile; + +public class AirtableRetrieveTableResult { + + private final List flowFiles; + private final int totalRecordCount; + + public AirtableRetrieveTableResult(final List flowFiles, final int totalRecordCount) { + this.flowFiles = flowFiles; + this.totalRecordCount = totalRecordCount; + } + + public List getFlowFiles() { + return flowFiles; + } + + public int getTotalRecordCount() { + return totalRecordCount; + } +} diff --git a/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/parse/AirtableTableRetriever.java b/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/parse/AirtableTableRetriever.java new file mode 100644 index 0000000000..b2bfaa20e9 --- /dev/null +++ b/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/parse/AirtableTableRetriever.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.airtable.parse; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator.Feature; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.airtable.service.AirtableGetRecordsParameters; +import org.apache.nifi.processors.airtable.service.AirtableRestService; + +public class AirtableTableRetriever { + + static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory() + .configure(Feature.AUTO_CLOSE_JSON_CONTENT, false); + + final AirtableRestService airtableRestService; + final AirtableGetRecordsParameters getRecordsParameters; + final Integer maxRecordsPerFlowFile; + + public AirtableTableRetriever(final AirtableRestService airtableRestService, + final AirtableGetRecordsParameters getRecordsParameters, + final Integer maxRecordsPerFlowFile) { + this.airtableRestService = airtableRestService; + this.getRecordsParameters = getRecordsParameters; + this.maxRecordsPerFlowFile = maxRecordsPerFlowFile; + } + + public AirtableRetrieveTableResult retrieveAll(final ProcessSession session) throws IOException { + int totalRecordCount = 0; + final List flowFiles = new ArrayList<>(); + AirtableRetrievePageResult retrievePageResult = null; + do { + retrievePageResult = retrieveNextPage(session, Optional.ofNullable(retrievePageResult)); + totalRecordCount += retrievePageResult.getParsedRecordCount(); + flowFiles.addAll(retrievePageResult.getFlowFiles()); + } while (retrievePageResult.getNextOffset().isPresent()); + + retrievePageResult.getOngoingRecordSetFlowFileWriter() + .map(writer -> { + try { + return writer.closeRecordSet(session); + } catch (IOException e) { + throw new ProcessException("Failed to close Airtable record writer", e); + } + }) + .ifPresent(flowFiles::add); + return new AirtableRetrieveTableResult(flowFiles, totalRecordCount); + } + + private AirtableRetrievePageResult retrieveNextPage(final ProcessSession session, final Optional previousPageResult) { + final AirtableGetRecordsParameters parameters = previousPageResult.flatMap(AirtableRetrievePageResult::getNextOffset) + .map(getRecordsParameters::withOffset) + .orElse(getRecordsParameters); + + return airtableRestService.getRecords(parameters, inputStream -> parsePage(inputStream, session, previousPageResult)); + } + + private AirtableRetrievePageResult parsePage(final InputStream inputStream, final ProcessSession session, final Optional previousPageResult) { + final List flowFiles = new ArrayList<>(); + AirtableRecordSetFlowFileWriter flowFileWriter = previousPageResult.flatMap(AirtableRetrievePageResult::getOngoingRecordSetFlowFileWriter) + .orElse(null); + int parsedRecordCount = 0; + String nextOffset = null; + try (final JsonParser jsonParser = JSON_FACTORY.createParser(inputStream)) { + while (jsonParser.nextToken() != null) { + if (jsonParser.currentToken() != JsonToken.FIELD_NAME) { + continue; + } + switch (jsonParser.currentName()) { + case "records": + jsonParser.nextToken(); + if (jsonParser.currentToken() != JsonToken.START_ARRAY) { + break; + } + while (jsonParser.nextToken() != null && jsonParser.currentToken() != JsonToken.END_ARRAY) { + if (jsonParser.currentToken() != JsonToken.START_OBJECT) { + continue; + } + if (flowFileWriter == null) { + flowFileWriter = AirtableRecordSetFlowFileWriter.startRecordSet(session); + } + ++parsedRecordCount; + flowFileWriter.writeRecord(jsonParser); + if (maxRecordsPerFlowFile != null && maxRecordsPerFlowFile == flowFileWriter.getRecordCount()) { + flowFiles.add(flowFileWriter.closeRecordSet(session)); + flowFileWriter = null; + } + + } + break; + case "offset": + jsonParser.nextToken(); + nextOffset = jsonParser.getValueAsString(); + break; + } + } + } catch (final IOException e) { + throw new ProcessException("Failed to parse Airtable query table response page", e); + } + return new AirtableRetrievePageResult(Optional.ofNullable(nextOffset), flowFiles, parsedRecordCount, Optional.ofNullable(flowFileWriter)); + } +} diff --git a/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/service/AirtableGetRecordsParameters.java b/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/service/AirtableGetRecordsParameters.java new file mode 100644 index 0000000000..42fff1849b --- /dev/null +++ b/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/service/AirtableGetRecordsParameters.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.airtable.service; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalInt; + +public class AirtableGetRecordsParameters { + + private final List fields; + private final Optional modifiedAfter; + private final Optional modifiedBefore; + private final Optional customFilter; + private final Optional offset; + private final OptionalInt pageSize; + + public AirtableGetRecordsParameters(final List fields, + final Optional modifiedAfter, + final Optional modifiedBefore, + final Optional customFilter, + final Optional offset, + final OptionalInt pageSize) { + this.fields = Objects.requireNonNull(fields); + this.modifiedAfter = modifiedAfter; + this.modifiedBefore = modifiedBefore; + this.customFilter = customFilter; + this.offset = offset; + this.pageSize = pageSize; + } + + public List getFields() { + return fields; + } + + public Optional getModifiedAfter() { + return modifiedAfter; + } + + public Optional getModifiedBefore() { + return modifiedBefore; + } + + public Optional getCustomFilter() { + return customFilter; + } + + public Optional getOffset() { + return offset; + } + + public OptionalInt getPageSize() { + return pageSize; + } + + public AirtableGetRecordsParameters withOffset(final String offset) { + return new AirtableGetRecordsParameters(fields, modifiedAfter, modifiedBefore, customFilter, Optional.of(offset), pageSize); + } + + public static class Builder { + private List fields; + private String modifiedAfter; + private String modifiedBefore; + private String customFilter; + private String offset; + private OptionalInt pageSize = OptionalInt.empty(); + + public Builder fields(final List fields) { + this.fields = fields; + return this; + } + + public Builder field(final String field) { + if (fields == null) { + fields = new ArrayList<>(); + } + fields.add(field); + return this; + } + + public Builder modifiedAfter(final String modifiedAfter) { + this.modifiedAfter = modifiedAfter; + return this; + } + + public Builder modifiedBefore(final String modifiedBefore) { + this.modifiedBefore = modifiedBefore; + return this; + } + + public Builder customFilter(final String customFilter) { + this.customFilter = customFilter; + return this; + } + + public Builder offset(final String offset) { + this.offset = offset; + return this; + } + + public Builder pageSize(final int pageSize) { + this.pageSize = OptionalInt.of(pageSize); + return this; + } + + public AirtableGetRecordsParameters build() { + return new AirtableGetRecordsParameters(fields != null ? fields : new ArrayList<>(), + Optional.ofNullable(modifiedAfter), + Optional.ofNullable(modifiedBefore), + Optional.ofNullable(customFilter), + Optional.ofNullable(offset), + pageSize); + } + } +} diff --git a/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/service/AirtableRestService.java b/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/service/AirtableRestService.java new file mode 100644 index 0000000000..b3fb75577d --- /dev/null +++ b/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/service/AirtableRestService.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.airtable.service; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.Range; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.web.client.api.HttpResponseEntity; +import org.apache.nifi.web.client.api.HttpUriBuilder; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; + +public class AirtableRestService { + + public static final String API_V0_BASE_URL = "https://api.airtable.com/v0"; + + private static final int TOO_MANY_REQUESTS = 429; + private static final Range SUCCESSFUL_RESPONSE_RANGE = Range.between(200, 299); + + private final WebClientServiceProvider webClientServiceProvider; + private final String apiUrl; + private final String apiKey; + private final String baseId; + private final String tableId; + + public AirtableRestService(final WebClientServiceProvider webClientServiceProvider, + final String apiUrl, + final String apiKey, + final String baseId, + final String tableId) { + this.webClientServiceProvider = webClientServiceProvider; + this.apiUrl = apiUrl; + this.apiKey = apiKey; + this.baseId = baseId; + this.tableId = tableId; + } + + public R getRecords(final AirtableGetRecordsParameters getRecordsParameters, final Function callback) { + final URI uri = buildUri(getRecordsParameters); + try (final HttpResponseEntity response = webClientServiceProvider.getWebClientService() + .get() + .uri(uri) + .header("Authorization", "Bearer " + apiKey) + .retrieve()) { + + final InputStream bodyInputStream = response.body(); + if (SUCCESSFUL_RESPONSE_RANGE.contains(response.statusCode())) { + return callback.apply(bodyInputStream); + } + if (response.statusCode() == TOO_MANY_REQUESTS) { + throw new RateLimitExceededException(); + } + final StringBuilder exceptionMessageBuilder = new StringBuilder("Error response. Code: " + response.statusCode()); + final String bodyText = IOUtils.toString(bodyInputStream, StandardCharsets.UTF_8); + if (bodyText != null) { + exceptionMessageBuilder.append(" Body: ").append(bodyText); + } + + throw new ProcessException(exceptionMessageBuilder.toString()); + } catch (IOException e) { + throw new ProcessException(String.format("Airtable HTTP request failed [%s]", uri), e); + } + } + + public HttpUriBuilder createUriBuilder() { + final URI uri = URI.create(apiUrl); + final HttpUriBuilder uriBuilder = webClientServiceProvider.getHttpUriBuilder() + .scheme(uri.getScheme()) + .host(uri.getHost()) + .encodedPath(uri.getPath()) + .addPathSegment(baseId) + .addPathSegment(tableId); + if (uri.getPort() != -1) { + uriBuilder.port(uri.getPort()); + } + return uriBuilder; + } + + private URI buildUri(AirtableGetRecordsParameters getRecordsParameters) { + final HttpUriBuilder uriBuilder = createUriBuilder(); + for (final String field : getRecordsParameters.getFields()) { + uriBuilder.addQueryParameter("fields[]", field); + } + + final List filters = new ArrayList<>(); + getRecordsParameters.getCustomFilter() + .ifPresent(filters::add); + getRecordsParameters.getModifiedAfter() + .map(modifiedAfter -> { + final String isSameFormula = "IS_SAME(LAST_MODIFIED_TIME(),DATETIME_PARSE(\"" + modifiedAfter + "\"), 'second')"; + final String isAfterFormula = "IS_AFTER(LAST_MODIFIED_TIME(),DATETIME_PARSE(\"" + modifiedAfter + "\"))"; + return "OR(" + isSameFormula + "," + isAfterFormula + ")"; + }) + .ifPresent(filters::add); + getRecordsParameters.getModifiedBefore() + .map(modifiedBefore -> "IS_BEFORE(LAST_MODIFIED_TIME(),DATETIME_PARSE(\"" + modifiedBefore + "\"))") + .ifPresent(filters::add); + if (!filters.isEmpty()) { + uriBuilder.addQueryParameter("filterByFormula", "AND(" + String.join(",", filters) + ")"); + } + getRecordsParameters.getOffset().ifPresent(offset -> uriBuilder.addQueryParameter("offset", offset)); + getRecordsParameters.getPageSize().ifPresent(pageSize -> uriBuilder.addQueryParameter("pageSize", String.valueOf(pageSize))); + + return uriBuilder.build(); + } +} diff --git a/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/service/RateLimitExceededException.java b/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/service/RateLimitExceededException.java new file mode 100644 index 0000000000..f05e2430a7 --- /dev/null +++ b/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/java/org/apache/nifi/processors/airtable/service/RateLimitExceededException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.airtable.service; + +public class RateLimitExceededException extends RuntimeException { + + public RateLimitExceededException() { + super("Airtable REST API rate limit exceeded"); + } +} diff --git a/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000000..cc78da9252 --- /dev/null +++ b/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.airtable.QueryAirtableTable \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/resources/docs/org.apache.nifi.processors.airtable.QueryAirtableTable/additionalDetails.html b/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/resources/docs/org.apache.nifi.processors.airtable.QueryAirtableTable/additionalDetails.html new file mode 100644 index 0000000000..50086b57c2 --- /dev/null +++ b/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/main/resources/docs/org.apache.nifi.processors.airtable.QueryAirtableTable/additionalDetails.html @@ -0,0 +1,63 @@ + + + + + + + QueryAirtableTable + + + + + + +

QueryAirtableTable

+ +

Description

+

+ Airtable is a spreadsheet-database hybrid. In Airtable an application is called base and each base can have multiple tables. + A table consists of records (rows) and each record can have multiple fields (columns). + The QueryAirtableTable processor can query records from a single base and table via Airtable's REST API. + The processor utilizes streams to be able to handle a large number of records. + It can also split large record sets to multiple FlowFiles just like a database processor. +

+ +

API Key

+

+ Airtable REST API calls requires an API Key that needs to be passed in a request. An Airtable account is required to generate an API Key. +

+ +

API rate limit

+

+ The Airtable REST API limits the number of requests that can be sent on a per-base basis to avoid bottlenecks. Currently, this limit is 5 requests per second per base. + If this limit is exceeded you can't make another request for 30 seconds. + It's your responsibility to handle this rate limit via configuring Yield Duration and Run Schedule properly. + It is recommended to start off with the default settings and to increase both parameters when rate limit issues occur. +

+ +

Metadata API

+

+ Currently the Metadata API of Airtable is unstable, and we don't provide a way to use it. + Until it becomes stable you can set up a ConvertRecord or MergeRecord processor with a JsonTreeReader to read the content and convert it into a Record with schema. +

+ + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/test/java/org/apache/nifi/processors/airtable/TestQueryAirtableTable.java b/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/test/java/org/apache/nifi/processors/airtable/TestQueryAirtableTable.java new file mode 100644 index 0000000000..dadd0cccb2 --- /dev/null +++ b/nifi-nar-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/test/java/org/apache/nifi/processors/airtable/TestQueryAirtableTable.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.airtable; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import okhttp3.HttpUrl; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; +import org.apache.nifi.web.client.provider.service.StandardWebClientServiceProvider; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class TestQueryAirtableTable { + + public static final String RECORDS_JSON_BODY = "{\"records\":[{" + + "\"id\":\"recabcdefghijklmn\"," + + "\"createdTime\":\"1970-00-01T00:00:00.000Z\"," + + "\"fields\":{\"foo\":\"bar\"}}]}"; + public static final String RECORDS_WITH_OFFSET_JSON_BODY = "{\"records\":[{" + + "\"id\":\"recabcdefghijklmn\"," + + "\"createdTime\":\"1970-00-01T00:00:00.000Z\"," + + "\"fields\":{\"foo\":\"bar\"}}]," + + "\"offset\":\"ofsabcdefghijklmn\"}"; + public static final String EXPECTED_RECORD_CONTENT = + "{\"id\":\"recabcdefghijklmn\",\"createdTime\":\"1970-00-01T00:00:00.000Z\",\"fields\":{\"foo\":\"bar\"}}"; + public static final String API_URL_PATH = "/v0/airtable"; + + private TestRunner runner; + + @BeforeEach + void setUp() throws Exception { + final Processor queryAirtableTable = new QueryAirtableTable(); + + runner = TestRunners.newTestRunner(queryAirtableTable); + + final WebClientServiceProvider webClientServiceProvider = new StandardWebClientServiceProvider(); + runner.addControllerService("webClientService", webClientServiceProvider); + runner.enableControllerService(webClientServiceProvider); + + runner.setProperty(QueryAirtableTable.API_KEY, "???"); + runner.setProperty(QueryAirtableTable.BASE_ID, "baseid"); + runner.setProperty(QueryAirtableTable.TABLE_ID, "tableid"); + runner.setProperty(QueryAirtableTable.WEB_CLIENT_SERVICE_PROVIDER, webClientServiceProvider.getIdentifier()); + } + + @AfterEach + void tearDown() { + runner.shutdown(); + } + + @Test + void retrievesAndWritesRecords() throws Exception { + try (final MockWebServer server = new MockWebServer()) { + server.enqueue(new MockResponse().setBody(RECORDS_JSON_BODY)); + + server.start(); + final HttpUrl httpUrl = server.url(API_URL_PATH); + + runner.setProperty(QueryAirtableTable.API_URL, httpUrl.toString()); + runner.run(); + + final List results = runner.getFlowFilesForRelationship(QueryAirtableTable.REL_SUCCESS); + assertEquals(1, results.size()); + final MockFlowFile flowFile = results.get(0); + assertEquals("1", flowFile.getAttribute("record.count")); + final String content = flowFile.getContent(); + assertEquals("[" + EXPECTED_RECORD_CONTENT + "]", content); + } + } + + @Test + void retrievesAndWritesPagedRecords() throws Exception { + try (final MockWebServer server = new MockWebServer()) { + server.enqueue(new MockResponse().setBody(RECORDS_WITH_OFFSET_JSON_BODY)); + server.enqueue(new MockResponse().setBody(RECORDS_JSON_BODY)); + + server.start(); + final HttpUrl httpUrl = server.url(API_URL_PATH); + + runner.setProperty(QueryAirtableTable.API_URL, httpUrl.toString()); + runner.run(); + + final List results = runner.getFlowFilesForRelationship(QueryAirtableTable.REL_SUCCESS); + assertEquals(1, results.size()); + final MockFlowFile flowFile = results.get(0); + assertEquals("2", flowFile.getAttribute("record.count")); + final String content = flowFile.getContent(); + assertEquals("[" + EXPECTED_RECORD_CONTENT + "," + EXPECTED_RECORD_CONTENT + "]", content); + } + } + + @Test + void retrievesAndWritesPagedRecordsInMultipleFlowFiles() throws Exception { + try (final MockWebServer server = new MockWebServer()) { + server.enqueue(new MockResponse().setBody(RECORDS_WITH_OFFSET_JSON_BODY)); + server.enqueue(new MockResponse().setBody(RECORDS_JSON_BODY)); + + server.start(); + final HttpUrl httpUrl = server.url(API_URL_PATH); + + runner.setProperty(QueryAirtableTable.MAX_RECORDS_PER_FLOWFILE, "1"); + runner.setProperty(QueryAirtableTable.API_URL, httpUrl.toString()); + runner.run(); + + final List results = runner.getFlowFilesForRelationship(QueryAirtableTable.REL_SUCCESS); + assertEquals(2, results.size()); + final MockFlowFile firstFlowFile = results.get(0); + assertEquals("1", firstFlowFile.getAttribute("record.count")); + final String firstContent = firstFlowFile.getContent(); + assertEquals("[" + EXPECTED_RECORD_CONTENT + "]", firstContent); + + final MockFlowFile secondFlowFile = results.get(1); + assertEquals("1", secondFlowFile.getAttribute("record.count")); + final String secondContent = secondFlowFile.getContent(); + assertEquals("[" + EXPECTED_RECORD_CONTENT + "]", secondContent); + } + } + + @Test + void doesNotWriteEmptyRecords() throws Exception { + try (final MockWebServer server = new MockWebServer()) { + server.enqueue(new MockResponse().setBody("{\"records\":[]}")); + + server.start(); + final HttpUrl httpUrl = server.url(API_URL_PATH); + + runner.setProperty(QueryAirtableTable.API_URL, httpUrl.toString()); + runner.run(); + + final List results = runner.getFlowFilesForRelationship(QueryAirtableTable.REL_SUCCESS); + assertTrue(results.isEmpty()); + } + } +} diff --git a/nifi-nar-bundles/nifi-airtable-bundle/pom.xml b/nifi-nar-bundles/nifi-airtable-bundle/pom.xml new file mode 100644 index 0000000000..df6f17a1fe --- /dev/null +++ b/nifi-nar-bundles/nifi-airtable-bundle/pom.xml @@ -0,0 +1,35 @@ + + + + + + nifi-nar-bundles + org.apache.nifi + 1.18.0-SNAPSHOT + + 4.0.0 + + nifi-airtable-bundle + pom + + nifi-airtable-processors + nifi-airtable-nar + + \ No newline at end of file diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 718be12d4c..37bc3640aa 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -112,6 +112,7 @@ nifi-rocksdb-bundle nifi-hubspot-bundle nifi-dropbox-bundle + nifi-airtable-bundle @@ -380,6 +381,12 @@ 1.18.0-SNAPSHOT provided
+ + org.apache.nifi + nifi-web-client-provider-api + 1.18.0-SNAPSHOT + provided + org.apache.nifi