diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index d923e3ae75..78f67698e8 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -892,6 +892,12 @@ language governing permissions and limitations under the License. --> 1.18.0-SNAPSHOT nar + + org.apache.nifi + nifi-zendesk-nar + 1.18.0-SNAPSHOT + nar + org.apache.nifi nifi-dropbox-processors-nar diff --git a/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-nar/pom.xml b/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-nar/pom.xml new file mode 100644 index 0000000000..b84a6fcdfc --- /dev/null +++ b/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-nar/pom.xml @@ -0,0 +1,46 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-zendesk-bundle + 1.18.0-SNAPSHOT + + + nifi-zendesk-nar + nar + + true + true + + + + + org.apache.nifi + nifi-zendesk-processors + 1.18.0-SNAPSHOT + + + org.apache.nifi + nifi-standard-services-api-nar + 1.18.0-SNAPSHOT + nar + + + + diff --git a/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000000..de4b130f35 --- /dev/null +++ b/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,204 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + diff --git a/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..24842de097 --- /dev/null +++ b/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,39 @@ +nifi-zendesk-nar +Copyright 2015-2022 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +=========================================== +Apache Software License v2 +=========================================== + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache Commons IO + The following NOTICE information applies: + Apache Commons IO + Copyright 2002-2017 The Apache Software Foundation + + (ASLv2) Jackson JSON processor + The following NOTICE information applies: + # Jackson JSON processor + + Jackson is a high-performance, Free/Open Source JSON processing library. + It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has + been in development since 2007. + It is currently developed by a community of developers, as well as supported + commercially by FasterXML.com. + + ## Licensing + + Jackson core and extension components may licensed under different licenses. + To find the details that apply to this artifact see the accompanying LICENSE file. + For more information, including possible other licensing options, contact + FasterXML.com (http://fasterxml.com). + + ## Credits + + A list of contributors may be found from CREDITS file, which is included + in some artifacts (usually source distributions); but is always available + from the source code management (SCM) system project uses. \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/pom.xml b/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/pom.xml new file mode 100644 index 0000000000..ed6b77303b --- /dev/null +++ b/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/pom.xml @@ -0,0 +1,91 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-zendesk-bundle + 1.18.0-SNAPSHOT + + + nifi-zendesk-processors + jar + + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-utils + 1.18.0-SNAPSHOT + + + org.apache.nifi + nifi-web-client-provider-api + 1.18.0-SNAPSHOT + provided + + + com.fasterxml.jackson.core + jackson-databind + + + commons-io + commons-io + + + + + org.apache.nifi + nifi-mock + test + + + org.apache.nifi + nifi-web-client-provider-service + 1.18.0-SNAPSHOT + test + + + org.apache.nifi + nifi-ssl-context-service-api + test + + + org.apache.nifi + nifi-proxy-configuration-api + test + + + org.junit.jupiter + junit-jupiter-api + test + + + org.junit.jupiter + junit-jupiter-engine + test + + + com.squareup.okhttp3 + mockwebserver + test + + + diff --git a/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/java/org/apache/nifi/processors/zendesk/GetZendesk.java b/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/java/org/apache/nifi/processors/zendesk/GetZendesk.java new file mode 100644 index 0000000000..9c0229d09f --- /dev/null +++ b/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/java/org/apache/nifi/processors/zendesk/GetZendesk.java @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.zendesk; + +import static com.fasterxml.jackson.core.JsonEncoding.UTF8; +import static com.fasterxml.jackson.core.JsonToken.FIELD_NAME; +import static com.fasterxml.jackson.core.JsonToken.VALUE_NULL; +import static java.lang.String.format; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Base64.getEncoder; +import static java.util.Collections.singleton; +import static java.util.Collections.singletonMap; +import static java.util.stream.Collectors.collectingAndThen; +import static java.util.stream.Collectors.toList; +import static org.apache.nifi.annotation.behavior.InputRequirement.Requirement.INPUT_FORBIDDEN; +import static org.apache.nifi.components.state.Scope.CLUSTER; +import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.POSITIVE_LONG_VALIDATOR; +import static org.apache.nifi.processors.zendesk.GetZendesk.RECORD_COUNT_ATTRIBUTE_NAME; +import static org.apache.nifi.web.client.api.HttpResponseStatus.OK; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.web.client.api.HttpResponseEntity; +import org.apache.nifi.web.client.api.HttpUriBuilder; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; + +@PrimaryNodeOnly +@TriggerSerially +@InputRequirement(INPUT_FORBIDDEN) +@DefaultSettings(yieldDuration = "20 sec") +@Tags({"zendesk"}) +@CapabilityDescription("Incrementally fetches data from Zendesk API.") +@Stateful(scopes = CLUSTER, description = "Paging cursor for Zendesk API is stored. Cursor is updated after each successful request.") +@WritesAttributes({ + @WritesAttribute(attribute = RECORD_COUNT_ATTRIBUTE_NAME, description = "The number of records fetched by the processor.")}) +public class GetZendesk extends AbstractProcessor { + + static final int HTTP_TOO_MANY_REQUESTS = 429; + static final String RECORD_COUNT_ATTRIBUTE_NAME = "record.count"; + + static final String REL_SUCCESS_NAME = "success"; + static final String WEB_CLIENT_SERVICE_PROVIDER_NAME = "web-client-service-provider"; + static final String ZENDESK_SUBDOMAIN_NAME = "zendesk-subdomain"; + static final String ZENDESK_USER_NAME = "zendesk-user"; + static final String ZENDESK_AUTHENTICATION_TYPE_NAME = "zendesk-authentication-type-name"; + static final String ZENDESK_AUTHENTICATION_CREDENTIAL_NAME = "zendesk-authentication-value-name"; + static final String ZENDESK_EXPORT_METHOD_NAME = "zendesk-export-method"; + static final String ZENDESK_RESOURCE_NAME = "zendesk-resource"; + static final String ZENDESK_QUERY_START_TIMESTAMP_NAME = "zendesk-query-start-timestamp"; + + private static final String HTTPS = "https"; + private static final String AUTHORIZATION_HEADER_NAME = "Authorization"; + private static final String BASIC_AUTH_PREFIX = "Basic "; + private static final String ZENDESK_HOST_TEMPLATE = "%s.zendesk.com"; + + private static final Relationship REL_SUCCESS = new Relationship.Builder() + .name(REL_SUCCESS_NAME) + .description("For FlowFiles created as a result of a successful HTTP request.") + .build(); + + private static final Set RELATIONSHIPS = singleton(REL_SUCCESS); + + private static final PropertyDescriptor WEB_CLIENT_SERVICE_PROVIDER = new PropertyDescriptor.Builder() + .name(WEB_CLIENT_SERVICE_PROVIDER_NAME) + .displayName("Web Client Service Provider") + .description("Controller service for HTTP client operations.") + .identifiesControllerService(WebClientServiceProvider.class) + .required(true) + .build(); + + private static final PropertyDescriptor ZENDESK_SUBDOMAIN = new PropertyDescriptor.Builder() + .name(ZENDESK_SUBDOMAIN_NAME) + .displayName("Zendesk Subdomain Name") + .description("Name of the Zendesk subdomain.") + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .required(true) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + + private static final PropertyDescriptor ZENDESK_USER = new PropertyDescriptor.Builder() + .name(ZENDESK_USER_NAME) + .displayName("Zendesk User Name") + .description("Login user to Zendesk subdomain.") + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .required(true) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + + private static final PropertyDescriptor ZENDESK_AUTHENTICATION_TYPE = new PropertyDescriptor.Builder() + .name(ZENDESK_AUTHENTICATION_TYPE_NAME) + .displayName("Zendesk Authentication Type") + .description("Type of authentication to Zendesk API.") + .required(true) + .allowableValues(ZendeskAuthenticationType.class) + .build(); + + private static final PropertyDescriptor ZENDESK_AUTHENTICATION_CREDENTIAL = new PropertyDescriptor.Builder() + .name(ZENDESK_AUTHENTICATION_CREDENTIAL_NAME) + .displayName("Zendesk Authentication Credential") + .description("Password or authentication token for Zendesk login user.") + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .sensitive(true) + .required(true) + .addValidator(NON_EMPTY_VALIDATOR) + .build(); + + private static final PropertyDescriptor ZENDESK_EXPORT_METHOD = new PropertyDescriptor.Builder() + .name(ZENDESK_EXPORT_METHOD_NAME) + .displayName("Zendesk Export Method") + .description("Method for incremental export.") + .required(true) + .allowableValues(ZendeskExportMethod.class) + .build(); + + private static final PropertyDescriptor ZENDESK_RESOURCE = new PropertyDescriptor.Builder() + .name(ZENDESK_RESOURCE_NAME) + .displayName("Zendesk Resource") + .description("The particular Zendesk resource which is meant to be exported.") + .required(true) + .allowableValues(ZendeskResource.class) + .build(); + + private static final PropertyDescriptor ZENDESK_QUERY_START_TIMESTAMP = new PropertyDescriptor.Builder() + .name(ZENDESK_QUERY_START_TIMESTAMP_NAME) + .displayName("Zendesk Query Start Timestamp") + .description("Initial timestamp to query Zendesk API from in Unix timestamp seconds format.") + .addValidator(POSITIVE_LONG_VALIDATOR) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .required(true) + .build(); + + private static final List DESCRIPTORS = Stream.of( + WEB_CLIENT_SERVICE_PROVIDER, + ZENDESK_SUBDOMAIN, + ZENDESK_USER, + ZENDESK_AUTHENTICATION_TYPE, + ZENDESK_AUTHENTICATION_CREDENTIAL, + ZENDESK_EXPORT_METHOD, + ZENDESK_RESOURCE, + ZENDESK_QUERY_START_TIMESTAMP + ).collect(collectingAndThen(toList(), Collections::unmodifiableList)); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory(); + + private volatile WebClientServiceProvider webClientServiceProvider; + + @Override + public Set getRelationships() { + return RELATIONSHIPS; + } + + @Override + public List getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + + @Override + protected Collection customValidate(ValidationContext validationContext) { + List results = new ArrayList<>(1); + + ZendeskExportMethod exportMethod = ZendeskExportMethod.forName(validationContext.getProperty(ZENDESK_EXPORT_METHOD).getValue()); + ZendeskResource zendeskResource = ZendeskResource.forName(validationContext.getProperty(ZENDESK_RESOURCE).getValue()); + if (!zendeskResource.supportsExportMethod(exportMethod)) { + results.add(new ValidationResult.Builder() + .subject(ZENDESK_EXPORT_METHOD_NAME) + .valid(false) + .explanation("Not supported export method for resource.") + .build()); + } + + return results; + } + + @OnScheduled + public void onScheduled(ProcessContext context) { + webClientServiceProvider = context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) { + ZendeskResource zendeskResource = ZendeskResource.forName(context.getProperty(ZENDESK_RESOURCE).getValue()); + ZendeskExportMethod exportMethod = ZendeskExportMethod.forName(context.getProperty(ZENDESK_EXPORT_METHOD).getValue()); + + URI uri = createUri(context, zendeskResource, exportMethod); + HttpResponseEntity response = performQuery(context, uri); + + if (response.statusCode() == OK.getCode()) { + AtomicInteger resultCount = new AtomicInteger(0); + FlowFile createdFlowFile = session.write( + session.create(), + httpResponseParser(context, response, zendeskResource, exportMethod, resultCount)); + int recordCount = resultCount.get(); + if (recordCount > 0) { + FlowFile updatedFlowFile = session.putAttribute(createdFlowFile, RECORD_COUNT_ATTRIBUTE_NAME, Integer.toString(recordCount)); + session.getProvenanceReporter().receive(updatedFlowFile, uri.toString()); + session.transfer(updatedFlowFile, REL_SUCCESS); + } else { + session.remove(createdFlowFile); + } + } else if (response.statusCode() == HTTP_TOO_MANY_REQUESTS) { + getLogger().error("Rate limit exceeded for uri={}, yielding before retrying request.", uri); + context.yield(); + } else { + getLogger().error("HTTP {} error for uri={} with response={}, yielding before retrying request.", response.statusCode(), uri, responseBodyToString(context, response)); + context.yield(); + } + } + + private URI createUri(ProcessContext context, ZendeskResource zendeskResource, ZendeskExportMethod exportMethod) { + String subDomain = context.getProperty(ZENDESK_SUBDOMAIN).evaluateAttributeExpressions().getValue(); + String resourcePath = zendeskResource.apiPath(exportMethod); + HttpUriBuilder uriBuilder = uriBuilder(subDomain, resourcePath); + + String cursor = getCursorState(context, zendeskResource, exportMethod); + if (cursor == null) { + String queryStartTimestamp = context.getProperty(ZENDESK_QUERY_START_TIMESTAMP).evaluateAttributeExpressions().getValue(); + uriBuilder.addQueryParameter(exportMethod.getInitialCursorQueryParameterName(), queryStartTimestamp); + } else { + uriBuilder.addQueryParameter(exportMethod.getCursorQueryParameterName(), cursor); + } + return uriBuilder.build(); + } + + HttpUriBuilder uriBuilder(String subDomain, String resourcePath) { + return webClientServiceProvider.getHttpUriBuilder() + .scheme(HTTPS) + .host(format(ZENDESK_HOST_TEMPLATE, subDomain)) + .encodedPath(resourcePath); + } + + private String getCursorState(ProcessContext context, ZendeskResource zendeskResource, ZendeskExportMethod exportMethod) { + try { + return context.getStateManager().getState(CLUSTER).get(zendeskResource.getValue() + exportMethod.getValue()); + } catch (IOException e) { + throw new ProcessException("Failed to retrieve cursor state", e); + } + } + + private HttpResponseEntity performQuery(ProcessContext context, URI uri) { + String userName = context.getProperty(ZENDESK_USER).evaluateAttributeExpressions().getValue(); + ZendeskAuthenticationType authenticationType = ZendeskAuthenticationType.forName(context.getProperty(ZENDESK_AUTHENTICATION_TYPE).getValue()); + String authenticationCredential = context.getProperty(ZENDESK_AUTHENTICATION_CREDENTIAL).evaluateAttributeExpressions().getValue(); + + return webClientServiceProvider.getWebClientService() + .get() + .uri(uri) + .header(AUTHORIZATION_HEADER_NAME, basicAuthHeaderValue(authenticationType.enrichUserName(userName), authenticationCredential)) + .retrieve(); + } + + private String basicAuthHeaderValue(String user, String credential) { + String userWithPassword = user + ":" + credential; + return BASIC_AUTH_PREFIX + getEncoder().encodeToString(userWithPassword.getBytes()); + } + + private OutputStreamCallback httpResponseParser(ProcessContext context, HttpResponseEntity response, + ZendeskResource zendeskResource, ZendeskExportMethod exportMethod, + AtomicInteger resultCount) { + return out -> { + try (JsonParser parser = JSON_FACTORY.createParser(response.body()); + JsonGenerator generator = JSON_FACTORY.createGenerator(out, UTF8)) { + while (parser.nextToken() != null) { + if (parser.getCurrentToken() == FIELD_NAME) { + String fieldName = parser.getCurrentName(); + parser.nextToken(); + if (zendeskResource.getResponseFieldName().equals(fieldName)) { + int numberOfExtractedRecords = extractZendeskResourceData(parser, generator); + resultCount.addAndGet(numberOfExtractedRecords); + } + if (exportMethod.getCursorJsonFieldName().equals(fieldName) && parser.currentToken() != VALUE_NULL) { + updateCursorState(context, zendeskResource, exportMethod, parser.getText()); + } + } + } + } + }; + } + + private int extractZendeskResourceData(JsonParser parser, JsonGenerator generator) throws IOException { + ArrayNode zendeskItems = OBJECT_MAPPER.readTree(parser); + if (zendeskItems.size() > 0) { + generator.writeStartArray(); + for (JsonNode zendeskItem : zendeskItems) { + generator.writeTree(zendeskItem); + } + generator.writeEndArray(); + } + return zendeskItems.size(); + } + + private void updateCursorState(ProcessContext context, ZendeskResource zendeskResource, ZendeskExportMethod exportMethod, String cursor) { + try { + context.getStateManager().setState(singletonMap(zendeskResource.getValue() + exportMethod.getValue(), cursor), CLUSTER); + } catch (IOException e) { + throw new ProcessException("Failed to update cursor state", e); + } + } + + private String responseBodyToString(ProcessContext context, HttpResponseEntity response) { + try { + return IOUtils.toString(response.body(), UTF_8); + } catch (IOException e) { + context.yield(); + throw new UncheckedIOException("Reading response body has failed", e); + } + } +} diff --git a/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/java/org/apache/nifi/processors/zendesk/ZendeskAuthenticationType.java b/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/java/org/apache/nifi/processors/zendesk/ZendeskAuthenticationType.java new file mode 100644 index 0000000000..5a16a84dae --- /dev/null +++ b/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/java/org/apache/nifi/processors/zendesk/ZendeskAuthenticationType.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.zendesk; + +import static java.lang.String.format; + +import java.util.stream.Stream; +import org.apache.nifi.components.DescribedValue; + +public enum ZendeskAuthenticationType implements DescribedValue { + PASSWORD("password", "Password", + "Password of Zendesk login user.") { + @Override + public String enrichUserName(String userName) { + return userName; + } + }, + TOKEN("token", "Token", + "Authentication token generated in Zendesk Admin menu for API access.") { + @Override + public String enrichUserName(String userName) { + return format(ZENDESK_USERNAME_WITH_TOKEN_TEMPLATE, userName); + } + }; + + private static final String ZENDESK_USERNAME_WITH_TOKEN_TEMPLATE = "%s/token"; + + private final String value; + private final String displayName; + private final String description; + + ZendeskAuthenticationType(String value, String displayName, String description) { + this.value = value; + this.displayName = displayName; + this.description = description; + } + + public static ZendeskAuthenticationType forName(String authenticationType) { + return Stream.of(values()).filter(authType -> authType.getValue().equalsIgnoreCase(authenticationType)).findFirst() + .orElseThrow(() -> new IllegalArgumentException("Invalid Zendesk authentication type: " + authenticationType)); + } + + @Override + public String getValue() { + return value; + } + + @Override + public String getDisplayName() { + return displayName; + } + + @Override + public String getDescription() { + return description; + } + + public abstract String enrichUserName(String userName); +} diff --git a/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/java/org/apache/nifi/processors/zendesk/ZendeskExportMethod.java b/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/java/org/apache/nifi/processors/zendesk/ZendeskExportMethod.java new file mode 100644 index 0000000000..128ccd24cf --- /dev/null +++ b/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/java/org/apache/nifi/processors/zendesk/ZendeskExportMethod.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.zendesk; + +import java.util.stream.Stream; +import org.apache.nifi.components.DescribedValue; + +public enum ZendeskExportMethod implements DescribedValue { + CURSOR("cursor", "Cursor Based", "%s/cursor.json", + "start_time", "cursor", "after_cursor", + "In cursor-based incremental exports, each page of results includes an \"after\" cursor pointer to use as the starting cursor for the next page of results."), + TIME("time", "Time Based", "%s.json", + "start_time", "start_time", "end_time", + "In time-based incremental exports, each page of results includes an end time to use as the start time for the next page of results."); + + private final String value; + private final String displayName; + private final String exportApiPathTemplate; + private final String initialCursorQueryParameterName; + private final String cursorQueryParameterName; + private final String cursorJsonFieldName; + private final String description; + + ZendeskExportMethod(String value, String displayName, String exportApiPathTemplate, String initialCursorQueryParameterName, + String cursorQueryParameterName, String cursorJsonFieldName, String description) { + this.value = value; + this.displayName = displayName; + this.description = description; + this.exportApiPathTemplate = exportApiPathTemplate; + this.initialCursorQueryParameterName = initialCursorQueryParameterName; + this.cursorQueryParameterName = cursorQueryParameterName; + this.cursorJsonFieldName = cursorJsonFieldName; + } + + public static ZendeskExportMethod forName(String methodName) { + return Stream.of(values()).filter(m -> m.getValue().equalsIgnoreCase(methodName)).findFirst() + .orElseThrow(() -> new IllegalArgumentException("Invalid Zendesk incremental export method: " + methodName)); + } + + @Override + public String getValue() { + return value; + } + + @Override + public String getDisplayName() { + return displayName; + } + + @Override + public String getDescription() { + return description; + } + + public String getExportApiPathTemplate() { + return exportApiPathTemplate; + } + + public String getInitialCursorQueryParameterName() { + return initialCursorQueryParameterName; + } + + public String getCursorQueryParameterName() { + return cursorQueryParameterName; + } + + public String getCursorJsonFieldName() { + return cursorJsonFieldName; + } +} diff --git a/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/java/org/apache/nifi/processors/zendesk/ZendeskResource.java b/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/java/org/apache/nifi/processors/zendesk/ZendeskResource.java new file mode 100644 index 0000000000..6d1d33b3c9 --- /dev/null +++ b/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/java/org/apache/nifi/processors/zendesk/ZendeskResource.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.zendesk; + +import static java.lang.String.format; +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; +import static org.apache.nifi.processors.zendesk.ZendeskExportMethod.CURSOR; +import static org.apache.nifi.processors.zendesk.ZendeskExportMethod.TIME; + +import java.util.List; +import java.util.stream.Stream; +import org.apache.nifi.components.DescribedValue; + +public enum ZendeskResource implements DescribedValue { + TICKETS("/api/v2/incremental/tickets", "Tickets", "tickets", unmodifiableList(asList(TIME, CURSOR)), + "Tickets are the means through which end users (customers) communicate with agents in Zendesk Support."), + TICKET_EVENTS("/api/v2/incremental/ticket_events", "Ticket Events", "ticket_events", unmodifiableList(asList(TIME)), + "Stream of changes that occurred on tickets. Each event is tied to an update on a ticket and contains all the fields that were updated in that change."), + TICKET_METRIC_EVENTS("/api/v2/incremental/ticket_metric_events", "Ticket Metric Events", "ticket_metric_events", unmodifiableList(asList(TIME)), + "Ticket metric events API can be used to track reply times, agent work times, and requester wait times."), + USERS("/api/v2/incremental/users", "Users", "users", unmodifiableList(asList(TIME, CURSOR)), + "Zendesk Support has three types of users: end users (customers), agents, and administrators."), + ORGANIZATIONS("/api/v2/incremental/organizations", "Organizations", "organizations", unmodifiableList(asList(TIME)), + "Just as agents can be segmented into groups in Zendesk Support, customers (end-users) can be segmented into organizations."), + ARTICLES("/api/v2/help_center/incremental/articles", "Articles", "articles", unmodifiableList(asList(TIME)), + "Articles are content items such as help topics or tech notes contained in sections."), + NPS_RESPONSES("/api/v2/nps/incremental/responses", "NPS - Responses", "responses", unmodifiableList(asList(TIME)), + "When a recipient responds to an NPS survey, their rating, comment, and last survey date are captured."), + NPS_RECIPIENTS("/api/v2/nps/incremental/recipients", "NPS - Recipients", "recipients", unmodifiableList(asList(TIME)), + "Every NPS survey is delivered to one or multiple recipients. For most businesses that use Zendesk Support, the recipients are customers. Agents and admins will never receive surveys."); + + private final String value; + private final String displayName; + private final String responseFieldName; + private final List supportedExportMethods; + private final String description; + + ZendeskResource(String value, String displayName, String responseFieldName, List supportedExportMethods, String description) { + this.value = value; + this.displayName = displayName; + this.responseFieldName = responseFieldName; + this.supportedExportMethods = supportedExportMethods; + this.description = description; + } + + public static ZendeskResource forName(String resourceName) { + return Stream.of(values()).filter(r -> r.getValue().equalsIgnoreCase(resourceName)).findFirst() + .orElseThrow(() -> new IllegalArgumentException("Invalid Zendesk resource: " + resourceName)); + } + + @Override + public String getValue() { + return value; + } + + @Override + public String getDisplayName() { + return displayName; + } + + @Override + public String getDescription() { + return description; + } + + public String getResponseFieldName() { + return responseFieldName; + } + + public List getSupportedExportMethods() { + return supportedExportMethods; + } + + public boolean supportsExportMethod(ZendeskExportMethod exportMethod) { + return supportedExportMethods.contains(exportMethod); + } + + public String apiPath(ZendeskExportMethod exportMethod) { + return format(exportMethod.getExportApiPathTemplate(), value); + } +} diff --git a/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000000..4129ab6136 --- /dev/null +++ b/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.zendesk.GetZendesk \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/resources/org.apache.nifi.processors.zendesk.GetZendesk/additionalDetails.html b/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/resources/org.apache.nifi.processors.zendesk.GetZendesk/additionalDetails.html new file mode 100644 index 0000000000..e97af5914b --- /dev/null +++ b/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/resources/org.apache.nifi.processors.zendesk.GetZendesk/additionalDetails.html @@ -0,0 +1,71 @@ + + + + + + + GetZendesk + + + + + + +

GetZendesk

+ +

Description

+ +

+ The processor uses the Zendesk Incremental Exports API to initially export a complete list of items from some arbitrary milestone, + and then periodically poll the API to incrementally export items that have been added or changed since the previous poll. + The processor extracts data from the response and emits a flow file having an array of objects as content if the response was not empty, + also placing an attribute on the flow file having the value of the number of records fetched. + If the response was empty, no flow file is emitted. + SplitJson processor can be used the split the array of records into distinct flow files where each flow file will contain exactly one record. +

+ +

Authentication

+ +

+ Zendesk Incremental Exports API uses basic authentication. Either a password or an authentication token have to be provided. + Authentication token can be created in Zendesk API Settings, so the users don't have to expose their passwords, + and also auth tokens can be revoked quickly if necessary. +

+ +

Export methods

+ +

+ Zendesk Incremental Exports API supports cursor and time based export methods. + Cursor based method is the preferred way and should be used where available. + Due to the limitations of time based export the result set may contain duplicated records. + For more details on export methods please visit this guide +

+ +

Excluding duplicate items

+ +

+ Because of limitations with time-based pagination, the exported data may contain duplicate items. + The processor won't do the deduplication, instead DetectDuplicate or DeduplicateRecord processors can be used with UpdateAttribute processor to extract the necessary attributes from the flow file content. + Please see the following guide + for details and the list of attributes to use in the deduplication process. +

+ + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/test/java/org/apache/nifi/processors/zendesk/GetZendeskTest.java b/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/test/java/org/apache/nifi/processors/zendesk/GetZendeskTest.java new file mode 100644 index 0000000000..e5bcf7a750 --- /dev/null +++ b/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/test/java/org/apache/nifi/processors/zendesk/GetZendeskTest.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.zendesk; + +import static java.lang.String.format; +import static java.util.Arrays.asList; +import static java.util.Collections.singletonMap; +import static org.apache.nifi.components.state.Scope.CLUSTER; +import static org.apache.nifi.processors.zendesk.GetZendesk.HTTP_TOO_MANY_REQUESTS; +import static org.apache.nifi.processors.zendesk.GetZendesk.RECORD_COUNT_ATTRIBUTE_NAME; +import static org.apache.nifi.processors.zendesk.GetZendesk.REL_SUCCESS_NAME; +import static org.apache.nifi.processors.zendesk.GetZendesk.WEB_CLIENT_SERVICE_PROVIDER_NAME; +import static org.apache.nifi.processors.zendesk.GetZendesk.ZENDESK_AUTHENTICATION_CREDENTIAL_NAME; +import static org.apache.nifi.processors.zendesk.GetZendesk.ZENDESK_AUTHENTICATION_TYPE_NAME; +import static org.apache.nifi.processors.zendesk.GetZendesk.ZENDESK_EXPORT_METHOD_NAME; +import static org.apache.nifi.processors.zendesk.GetZendesk.ZENDESK_QUERY_START_TIMESTAMP_NAME; +import static org.apache.nifi.processors.zendesk.GetZendesk.ZENDESK_RESOURCE_NAME; +import static org.apache.nifi.processors.zendesk.GetZendesk.ZENDESK_SUBDOMAIN_NAME; +import static org.apache.nifi.processors.zendesk.GetZendesk.ZENDESK_USER_NAME; +import static org.apache.nifi.processors.zendesk.ZendeskExportMethod.CURSOR; +import static org.apache.nifi.processors.zendesk.ZendeskResource.TICKETS; +import static org.apache.nifi.util.TestRunners.newTestRunner; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Stream; +import okhttp3.HttpUrl; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.web.client.StandardHttpUriBuilder; +import org.apache.nifi.web.client.api.HttpUriBuilder; +import org.apache.nifi.web.client.provider.service.StandardWebClientServiceProvider; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.opentest4j.AssertionFailedError; + +public class GetZendeskTest { + + private static final int HTTP_OK = 200; + private static final int HTTP_BAD_REQUEST = 400; + + private static final String DEFAULT_QUERY_START_TIMESTAMP = "1640995200"; + private static final String DEFAULT_CURSOR_VALUE = "123456789"; + private static final String EMPTY_RESPONSE = "{}"; + private static final String RESPONSE_WITH_CURSOR_FIELD_TEMPLATE = "{ \"%s\": " + DEFAULT_CURSOR_VALUE + " }"; + private static final String THREE_RECORDS = "[{\"id\":1},{\"id\":2},{\"id\":3}]"; + private static final String RESPONSE_WITH_THREE_RECORDS_TEMPLATE = "{ \"%s\": " + THREE_RECORDS + " }"; + private static final String RESPONSE_WITH_ZERO_RECORDS_TEMPLATE = "{ \"%s\": [] }"; + + private MockWebServer server; + private TestRunner testRunner; + + private static Stream supportedZendeskResourcesExportMethodCombinations() { + return Stream.of(ZendeskResource.values()) + .flatMap(zendeskResource -> zendeskResource.getSupportedExportMethods() + .stream() + .map(exportMethod -> Arguments.of(zendeskResource, exportMethod))); + } + + private static Stream unsupportedZendeskResourcesExportMethodCombinations() { + return Stream.of(ZendeskResource.values()) + .flatMap(zendeskResource -> { + List zendeskExportMethods = new ArrayList<>(asList(ZendeskExportMethod.values())); + zendeskExportMethods.removeAll(zendeskResource.getSupportedExportMethods()); + return zendeskExportMethods.stream() + .map(exportMethod -> Arguments.of(zendeskResource, exportMethod)); + }); + } + + @BeforeEach + public void init() throws IOException, InitializationException { + server = new MockWebServer(); + server.start(); + + testRunner = newTestRunner(new TestGetZendesk()); + + StandardWebClientServiceProvider standardWebClientServiceProvider = new StandardWebClientServiceProvider(); + String standardWebClientServiceProviderId = "standardWebClientServiceProvider"; + testRunner.addControllerService(standardWebClientServiceProviderId, standardWebClientServiceProvider); + testRunner.enableControllerService(standardWebClientServiceProvider); + + testRunner.setProperty(WEB_CLIENT_SERVICE_PROVIDER_NAME, standardWebClientServiceProviderId); + testRunner.setProperty(ZENDESK_SUBDOMAIN_NAME, "default-zendesk-subdomain"); + testRunner.setProperty(ZENDESK_USER_NAME, "default-zendesk-user-name"); + testRunner.setProperty(ZENDESK_AUTHENTICATION_TYPE_NAME, ZendeskAuthenticationType.PASSWORD.getValue()); + testRunner.setProperty(ZENDESK_AUTHENTICATION_CREDENTIAL_NAME, "default-zendesk-password"); + testRunner.setProperty(ZENDESK_QUERY_START_TIMESTAMP_NAME, DEFAULT_QUERY_START_TIMESTAMP); + } + + @AfterEach + void tearDown() throws IOException { + server.shutdown(); + } + + @ParameterizedTest + @MethodSource("supportedZendeskResourcesExportMethodCombinations") + public void testQueryStartTimestampIsUsedWhenNoStateIsAvailable(ZendeskResource zendeskResource, ZendeskExportMethod exportMethod) throws InterruptedException { + // given + server.enqueue(new MockResponse().setResponseCode(HTTP_OK).setBody(EMPTY_RESPONSE)); + testRunner.setProperty(ZENDESK_RESOURCE_NAME, zendeskResource.getValue()); + testRunner.setProperty(ZENDESK_EXPORT_METHOD_NAME, exportMethod.getValue()); + + // when + testRunner.run(1); + + // then + RecordedRequest request = server.takeRequest(); + assertEquals( + zendeskResource.apiPath(exportMethod) + "?" + exportMethod.getInitialCursorQueryParameterName() + "=" + DEFAULT_QUERY_START_TIMESTAMP, + request.getPath()); + } + + @ParameterizedTest + @MethodSource("supportedZendeskResourcesExportMethodCombinations") + public void testCursorFromStateIsUsedWhenStateIsAvailable(ZendeskResource zendeskResource, ZendeskExportMethod exportMethod) throws InterruptedException, IOException { + // given + server.enqueue(new MockResponse().setResponseCode(HTTP_OK).setBody(EMPTY_RESPONSE)); + testRunner.setProperty(ZENDESK_RESOURCE_NAME, zendeskResource.getValue()); + testRunner.setProperty(ZENDESK_EXPORT_METHOD_NAME, exportMethod.getValue()); + testRunner.getStateManager().setState(singletonMap(zendeskResource.getValue() + exportMethod.getValue(), DEFAULT_CURSOR_VALUE), CLUSTER); + + // when + testRunner.run(1); + + // then + RecordedRequest request = server.takeRequest(); + assertEquals( + zendeskResource.apiPath(exportMethod) + "?" + exportMethod.getCursorQueryParameterName() + "=" + DEFAULT_CURSOR_VALUE, + request.getPath()); + } + + @ParameterizedTest + @MethodSource("supportedZendeskResourcesExportMethodCombinations") + public void testCursorPositionIsStoredInState(ZendeskResource zendeskResource, ZendeskExportMethod exportMethod) throws IOException { + // given + server.enqueue(new MockResponse().setResponseCode(HTTP_OK) + .setBody(format(RESPONSE_WITH_CURSOR_FIELD_TEMPLATE, exportMethod.getCursorJsonFieldName()))); + testRunner.setProperty(ZENDESK_RESOURCE_NAME, zendeskResource.getValue()); + testRunner.setProperty(ZENDESK_EXPORT_METHOD_NAME, exportMethod.getValue()); + String stateKey = zendeskResource.getValue() + exportMethod.getValue(); + assertNull(testRunner.getStateManager().getState(CLUSTER).get(stateKey)); + + // when + testRunner.run(1); + + // then + assertEquals(DEFAULT_CURSOR_VALUE, testRunner.getStateManager().getState(CLUSTER).get(stateKey)); + } + + @ParameterizedTest + @MethodSource("supportedZendeskResourcesExportMethodCombinations") + public void testFlowFileIsCreatedAndContentIsAddedAndFlowFileAttributeIsSet(ZendeskResource zendeskResource, ZendeskExportMethod exportMethod) throws InterruptedException { + // given + server.enqueue(new MockResponse().setResponseCode(HTTP_OK) + .setBody(format(RESPONSE_WITH_THREE_RECORDS_TEMPLATE, zendeskResource.getResponseFieldName()))); + testRunner.setProperty(ZENDESK_RESOURCE_NAME, zendeskResource.getValue()); + testRunner.setProperty(ZENDESK_EXPORT_METHOD_NAME, exportMethod.getValue()); + + // when + testRunner.run(1); + + // then + List flowFiles = testRunner.getFlowFilesForRelationship(REL_SUCCESS_NAME); + assertEquals(1, flowFiles.size()); + MockFlowFile resultFlowFile = flowFiles.get(0); + assertEquals("3", resultFlowFile.getAttribute(RECORD_COUNT_ATTRIBUTE_NAME)); + assertEquals(THREE_RECORDS, resultFlowFile.getContent()); + } + + @ParameterizedTest + @MethodSource("supportedZendeskResourcesExportMethodCombinations") + public void testNoFlowFileIsEmittedWhenZeroRecordsAreSent(ZendeskResource zendeskResource, ZendeskExportMethod exportMethod) throws InterruptedException { + // given + server.enqueue(new MockResponse().setResponseCode(HTTP_OK) + .setBody(format(RESPONSE_WITH_ZERO_RECORDS_TEMPLATE, zendeskResource.getResponseFieldName()))); + testRunner.setProperty(ZENDESK_RESOURCE_NAME, zendeskResource.getValue()); + testRunner.setProperty(ZENDESK_EXPORT_METHOD_NAME, exportMethod.getValue()); + + // when + testRunner.run(1); + + // then + List flowFiles = testRunner.getFlowFilesForRelationship(REL_SUCCESS_NAME); + assertEquals(0, flowFiles.size()); + } + + @Test + public void testNoFlowFileIsEmittedWhenTooManyRequestResponseCodeReceived() { + // given + server.enqueue(new MockResponse().setResponseCode(HTTP_TOO_MANY_REQUESTS)); + testRunner.setProperty(ZENDESK_RESOURCE_NAME, TICKETS.getValue()); + testRunner.setProperty(ZENDESK_EXPORT_METHOD_NAME, CURSOR.getValue()); + + // when + testRunner.run(1); + + // then + List flowFiles = testRunner.getFlowFilesForRelationship(REL_SUCCESS_NAME); + assertEquals(0, flowFiles.size()); + } + + @Test + public void testNoFlowFileIsEmittedWhenNonOkHttpResponseIsSent() { + // given + server.enqueue(new MockResponse().setResponseCode(HTTP_BAD_REQUEST) + .setBody(format(RESPONSE_WITH_ZERO_RECORDS_TEMPLATE, TICKETS.getResponseFieldName()))); + testRunner.setProperty(ZENDESK_RESOURCE_NAME, TICKETS.getValue()); + testRunner.setProperty(ZENDESK_EXPORT_METHOD_NAME, CURSOR.getValue()); + + // when + testRunner.run(1); + + // then + List flowFiles = testRunner.getFlowFilesForRelationship(REL_SUCCESS_NAME); + assertEquals(0, flowFiles.size()); + } + + @ParameterizedTest + @MethodSource("unsupportedZendeskResourcesExportMethodCombinations") + public void testUnsupportedZendeskResourceAndExportMethodsPairsShouldFailOnValidation(ZendeskResource zendeskResource, ZendeskExportMethod exportMethod) { + // given + testRunner.setProperty(ZENDESK_RESOURCE_NAME, zendeskResource.getValue()); + testRunner.setProperty(ZENDESK_EXPORT_METHOD_NAME, exportMethod.getValue()); + + // when + then + assertThrows(AssertionFailedError.class, () -> testRunner.run(1)); + } + + class TestGetZendesk extends GetZendesk { + @Override + HttpUriBuilder uriBuilder(String subDomain, String resourcePath) { + HttpUrl url = server.url(resourcePath); + return new StandardHttpUriBuilder() + .scheme(url.scheme()) + .host(url.host()) + .port(url.port()) + .encodedPath(url.encodedPath()); + } + } +} diff --git a/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/test/java/org/apache/nifi/processors/zendesk/ZendeskAuthenticationTypeTest.java b/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/test/java/org/apache/nifi/processors/zendesk/ZendeskAuthenticationTypeTest.java new file mode 100644 index 0000000000..2aedab3a60 --- /dev/null +++ b/nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/test/java/org/apache/nifi/processors/zendesk/ZendeskAuthenticationTypeTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.zendesk; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.stream.Stream; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +public class ZendeskAuthenticationTypeTest { + + private static Stream userNameAuthenticationTypeAndExpectedUserNameArguments() { + return Stream.of( + Arguments.of("user_1", ZendeskAuthenticationType.PASSWORD, "user_1"), + Arguments.of("user_2", ZendeskAuthenticationType.TOKEN, "user_2/token") + ); + } + + @ParameterizedTest + @MethodSource("userNameAuthenticationTypeAndExpectedUserNameArguments") + public void testUserNameIsEnrichedAccordingToAuthenticationType(String userName, ZendeskAuthenticationType authenticationType, String expectedUserName) { + assertEquals(expectedUserName, authenticationType.enrichUserName(userName)); + } +} diff --git a/nifi-nar-bundles/nifi-zendesk-bundle/pom.xml b/nifi-nar-bundles/nifi-zendesk-bundle/pom.xml new file mode 100644 index 0000000000..b0be724153 --- /dev/null +++ b/nifi-nar-bundles/nifi-zendesk-bundle/pom.xml @@ -0,0 +1,33 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-nar-bundles + 1.18.0-SNAPSHOT + + + nifi-zendesk-bundle + pom + + + nifi-zendesk-processors + nifi-zendesk-nar + + + diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index d690a1a781..2760930c20 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -110,6 +110,7 @@ nifi-snowflake-bundle nifi-salesforce-bundle nifi-rocksdb-bundle + nifi-zendesk-bundle nifi-hubspot-bundle nifi-dropbox-bundle nifi-airtable-bundle