diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 0baa9445d5..09da408a2e 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -706,6 +706,12 @@ language governing permissions and limitations under the License. --> 1.18.0-SNAPSHOT nar + + org.apache.nifi + nifi-workday-processors-nar + 1.18.0-SNAPSHOT + nar + org.apache.nifi nifi-tcp-nar diff --git a/nifi-nar-bundles/nifi-workday-bundle/nifi-workday-processors-nar/pom.xml b/nifi-nar-bundles/nifi-workday-bundle/nifi-workday-processors-nar/pom.xml new file mode 100644 index 0000000000..054386f1c7 --- /dev/null +++ b/nifi-nar-bundles/nifi-workday-bundle/nifi-workday-processors-nar/pom.xml @@ -0,0 +1,49 @@ + + + + + + nifi-workday-bundle + org.apache.nifi + 1.18.0-SNAPSHOT + + 4.0.0 + + nifi-workday-processors-nar + nar + + true + true + + + + + org.apache.nifi + nifi-standard-services-api-nar + 1.18.0-SNAPSHOT + nar + + + org.apache.nifi + nifi-workday-processors + 1.18.0-SNAPSHOT + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-workday-bundle/nifi-workday-processors-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-workday-bundle/nifi-workday-processors-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000000..5a5a013b59 --- /dev/null +++ b/nifi-nar-bundles/nifi-workday-bundle/nifi-workday-processors-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,286 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +APACHE NIFI SUBCOMPONENTS: + +The Apache NiFi project contains subcomponents with separate copyright +notices and license terms. Your use of the source code for the these +subcomponents is subject to the terms and conditions of the following +licenses. + + The binary distribution of this product bundles 'Antlr 3' which is available + under a "3-clause BSD" license. For details see http://www.antlr3.org/license.html + + Copyright (c) 2010 Terence Parr + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + + Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + Neither the name of the author nor the names of its contributors may be used + to endorse or promote products derived from this software without specific + prior written permission. + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + THE POSSIBILITY OF SUCH DAMAGE. + + The binary distribution of this product bundles 'Bouncy Castle PKIX, CMS, EAC, TSP, PKCS, OCSP, CMP, and CRMF APIs' + under an MIT style license. + + Copyright (c) 2000 - 2016 The Legion of the Bouncy Castle Inc. (https://www.bouncycastle.org) + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. + + The binary distribution of this product bundles Checker Framework + under an MIT style license. + + Copyright 2004-present by the Checker Framework developers + + MIT License: + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-workday-bundle/nifi-workday-processors/pom.xml b/nifi-nar-bundles/nifi-workday-bundle/nifi-workday-processors/pom.xml new file mode 100644 index 0000000000..e39d08ea85 --- /dev/null +++ b/nifi-nar-bundles/nifi-workday-bundle/nifi-workday-processors/pom.xml @@ -0,0 +1,104 @@ + + + + + + org.apache.nifi + nifi-workday-bundle + 1.18.0-SNAPSHOT + + 4.0.0 + + nifi-workday-processors + jar + + + + org.apache.nifi + nifi-api + provided + + + org.apache.nifi + nifi-web-client-provider-api + 1.18.0-SNAPSHOT + provided + + + org.apache.nifi + nifi-utils + 1.18.0-SNAPSHOT + + + org.apache.nifi + nifi-record + 1.18.0-SNAPSHOT + + + org.apache.nifi + nifi-record-serialization-service-api + 1.18.0-SNAPSHOT + + + + org.apache.nifi + nifi-mock + 1.18.0-SNAPSHOT + test + + + com.squareup.okhttp3 + mockwebserver + test + + + + org.apache.nifi + nifi-web-client-provider-service + 1.18.0-SNAPSHOT + test + + + org.apache.nifi + nifi-ssl-context-service-api + 1.18.0-SNAPSHOT + test + + + org.apache.nifi + nifi-proxy-configuration-api + 1.18.0-SNAPSHOT + test + + + org.apache.nifi + nifi-record-serialization-services + 1.18.0-SNAPSHOT + test + + + org.apache.nifi + nifi-schema-registry-service-api + 1.18.0-SNAPSHOT + test + + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-workday-bundle/nifi-workday-processors/src/main/java/org/apache/nifi/processors/workday/GetWorkdayReport.java b/nifi-nar-bundles/nifi-workday-bundle/nifi-workday-processors/src/main/java/org/apache/nifi/processors/workday/GetWorkdayReport.java new file mode 100644 index 0000000000..9fe7ee1355 --- /dev/null +++ b/nifi-nar-bundles/nifi-workday-bundle/nifi-workday-processors/src/main/java/org/apache/nifi/processors/workday/GetWorkdayReport.java @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.workday; + +import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES; +import static org.apache.nifi.processor.util.StandardValidators.URL_VALIDATOR; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Base64; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.web.client.api.HttpResponseEntity; +import org.apache.nifi.web.client.api.WebClientService; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; + +@Tags({"Workday", "report"}) +@InputRequirement(Requirement.INPUT_ALLOWED) +@CapabilityDescription("A processor which can interact with a configurable Workday Report. The processor can forward the content without modification, or you can transform it by" + + " providing the specific Record Reader and Record Writer services based on your needs. You can also remove fields by defining schema in the Record Writer. " + + "Supported Workday report formats are: csv, simplexml, json") +@SideEffectFree +@SupportsBatching +@WritesAttributes({ + @WritesAttribute(attribute = GetWorkdayReport.GET_WORKDAY_REPORT_JAVA_EXCEPTION_CLASS, description = "The Java exception class raised when the processor fails"), + @WritesAttribute(attribute = GetWorkdayReport.GET_WORKDAY_REPORT_JAVA_EXCEPTION_MESSAGE, description = "The Java exception message raised when the processor fails"), + @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Source / Record Writer"), + @WritesAttribute(attribute= GetWorkdayReport.RECORD_COUNT, description = "The number of records in an outgoing FlowFile. This is only populated on the 'success' relationship " + + "when Record Reader and Writer is set.")}) +public class GetWorkdayReport extends AbstractProcessor { + + protected static final String STATUS_CODE = "getworkdayreport.status.code"; + protected static final String REQUEST_URL = "getworkdayreport.request.url"; + protected static final String REQUEST_DURATION = "getworkdayreport.request.duration"; + protected static final String TRANSACTION_ID = "getworkdayreport.tx.id"; + protected static final String GET_WORKDAY_REPORT_JAVA_EXCEPTION_CLASS = "getworkdayreport.java.exception.class"; + protected static final String GET_WORKDAY_REPORT_JAVA_EXCEPTION_MESSAGE = "getworkdayreport.java.exception.message"; + protected static final String RECORD_COUNT = "record.count"; + protected static final String BASIC_PREFIX = "Basic "; + protected static final String HEADER_AUTHORIZATION = "Authorization"; + protected static final String HEADER_CONTENT_TYPE = "Content-Type"; + protected static final String USERNAME_PASSWORD_SEPARATOR = ":"; + + protected static final PropertyDescriptor REPORT_URL = new PropertyDescriptor.Builder() + .name("Workday Report URL") + .displayName("Workday Report URL") + .description("HTTP remote URL of Workday report including a scheme of http or https, as well as a hostname or IP address with optional port and path elements.") + .required(true) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .addValidator(URL_VALIDATOR) + .build(); + + protected static final PropertyDescriptor WORKDAY_USERNAME = new PropertyDescriptor.Builder() + .name("Workday Username") + .displayName("Workday Username") + .description("The username provided for authentication of Workday requests. Encoded using Base64 for HTTP Basic Authentication as described in RFC 7617.") + .required(true) + .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x39\\x3b-\\x7e\\x80-\\xff]+$"))) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .build(); + + protected static final PropertyDescriptor WORKDAY_PASSWORD = new PropertyDescriptor.Builder() + .name("Workday Password") + .displayName("Workday Password") + .description("The password provided for authentication of Workday requests. Encoded using Base64 for HTTP Basic Authentication as described in RFC 7617.") + .required(true) + .sensitive(true) + .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x7e\\x80-\\xff]+$"))) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .build(); + + protected static final PropertyDescriptor WEB_CLIENT_SERVICE = new PropertyDescriptor.Builder() + .name("Web Client Service Provider") + .description("Web client which is used to communicate with the Workday API.") + .required(true) + .identifiesControllerService(WebClientServiceProvider.class) + .build(); + + protected static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder() + .name("record-reader") + .displayName("Record Reader") + .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.") + .identifiesControllerService(RecordReaderFactory.class) + .required(false) + .build(); + + protected static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder() + .name("record-writer") + .displayName("Record Writer") + .description("The Record Writer to use for serializing Records to an output FlowFile.") + .identifiesControllerService(RecordSetWriterFactory.class) + .dependsOn(RECORD_READER_FACTORY) + .required(true) + .build(); + + protected static final Relationship ORIGINAL = new Relationship.Builder() + .name("original") + .description("Request FlowFiles transferred when receiving HTTP responses with a status code between 200 and 299.") + .autoTerminateDefault(true) + .build(); + + protected static final Relationship FAILURE = new Relationship.Builder() + .name("failure") + .description("Request FlowFiles transferred when receiving socket communication errors.") + .build(); + + protected static final Relationship SUCCESS = new Relationship.Builder() + .name("success") + .description("Response FlowFiles transferred when receiving HTTP responses with a status code between 200 and 299.") + .build(); + + protected static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(ORIGINAL, SUCCESS, FAILURE))); + protected static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList( + REPORT_URL, + WORKDAY_USERNAME, + WORKDAY_PASSWORD, + WEB_CLIENT_SERVICE, + RECORD_READER_FACTORY, + RECORD_WRITER_FACTORY + )); + + private final AtomicReference webClientReference = new AtomicReference<>(); + private final AtomicReference recordReaderFactoryReference = new AtomicReference<>(); + private final AtomicReference recordSetWriterFactoryReference = new AtomicReference<>(); + + @Override + protected List getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public Set getRelationships() { + return RELATIONSHIPS; + } + + @OnScheduled + public void setUpClient(final ProcessContext context) { + WebClientServiceProvider standardWebClientServiceProvider = context.getProperty(WEB_CLIENT_SERVICE).asControllerService(WebClientServiceProvider.class); + RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class); + RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class); + WebClientService webClientService = standardWebClientServiceProvider.getWebClientService(); + webClientReference.set(webClientService); + recordReaderFactoryReference.set(recordReaderFactory); + recordSetWriterFactoryReference.set(recordSetWriterFactory); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + + if (skipExecution(context, flowFile)) { + return; + } + + FlowFile responseFlowFile = null; + + try { + WebClientService webClientService = webClientReference.get(); + URI uri = new URI(context.getProperty(REPORT_URL).evaluateAttributeExpressions(flowFile).getValue().trim()); + long startNanos = System.nanoTime(); + String authorization = createAuthorizationHeader(context, flowFile); + + try(HttpResponseEntity httpResponseEntity = webClientService.get() + .uri(uri) + .header(HEADER_AUTHORIZATION, authorization) + .retrieve()) { + responseFlowFile = createResponseFlowFile(flowFile, session, httpResponseEntity); + long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + Map commonAttributes = createCommonAttributes(uri, httpResponseEntity, elapsedTime); + + if (flowFile != null) { + flowFile = session.putAllAttributes(flowFile, setMimeType(commonAttributes, httpResponseEntity)); + } + if (responseFlowFile != null) { + responseFlowFile = session.putAllAttributes(responseFlowFile, commonAttributes); + if (flowFile == null) { + session.getProvenanceReporter().receive(responseFlowFile, uri.toString(), elapsedTime); + } else { + session.getProvenanceReporter().fetch(responseFlowFile, uri.toString(), elapsedTime); + } + } + + route(flowFile, responseFlowFile, session, context, httpResponseEntity.statusCode()); + } + } catch (Exception e) { + if (flowFile == null) { + getLogger().error("Request Processing failed", e); + context.yield(); + } else { + getLogger().error("Request Processing failed: {}", flowFile, e); + session.penalize(flowFile); + flowFile = session.putAttribute(flowFile, GET_WORKDAY_REPORT_JAVA_EXCEPTION_CLASS, e.getClass().getSimpleName()); + flowFile = session.putAttribute(flowFile, GET_WORKDAY_REPORT_JAVA_EXCEPTION_MESSAGE, e.getMessage()); + session.transfer(flowFile, FAILURE); + } + + if (responseFlowFile != null) { + session.remove(responseFlowFile); + } + } + } + + /* + * If we have no FlowFile, and all incoming connections are self-loops then we can continue on. + * However, if we have no FlowFile and we have connections coming from other Processors, then + * we know that we should run only if we have a FlowFile. + */ + private boolean skipExecution(ProcessContext context, FlowFile flowfile) { + return context.hasIncomingConnection() && flowfile == null && context.hasNonLoopConnection(); + } + + private FlowFile createResponseFlowFile(FlowFile flowfile, ProcessSession session, HttpResponseEntity httpResponseEntity) + throws IOException, SchemaNotFoundException, MalformedRecordException { + FlowFile responseFlowFile = null; + try { + if (isSuccess(httpResponseEntity.statusCode())) { + responseFlowFile = flowfile == null ? session.create() : session.create(flowfile); + InputStream responseBodyStream = httpResponseEntity.body(); + if (recordReaderFactoryReference.get() != null) { + TransformResult transformResult = transformRecords(session, flowfile, responseFlowFile, responseBodyStream); + Map attributes = new HashMap<>(); + attributes.put(RECORD_COUNT, String.valueOf(transformResult.getNumberOfRecords())); + attributes.put(CoreAttributes.MIME_TYPE.key(), transformResult.getMimeType()); + responseFlowFile = session.putAllAttributes(responseFlowFile, attributes); + } else { + responseFlowFile = session.importFrom(responseBodyStream, responseFlowFile); + Optional mimeType = httpResponseEntity.headers().getFirstHeader(HEADER_CONTENT_TYPE); + if (mimeType.isPresent()) { + responseFlowFile = session.putAttribute(responseFlowFile, CoreAttributes.MIME_TYPE.key(), mimeType.get()); + } + } + } + } catch (Exception e) { + session.remove(responseFlowFile); + throw e; + } + return responseFlowFile; + } + + private String createAuthorizationHeader(ProcessContext context, FlowFile flowfile) { + String userName = context.getProperty(WORKDAY_USERNAME).evaluateAttributeExpressions(flowfile).getValue(); + String password = context.getProperty(WORKDAY_PASSWORD).evaluateAttributeExpressions(flowfile).getValue(); + String base64Credential = Base64.getEncoder().encodeToString((userName + USERNAME_PASSWORD_SEPARATOR + password).getBytes(StandardCharsets.UTF_8)); + return BASIC_PREFIX + base64Credential; + } + + private TransformResult transformRecords(ProcessSession session, FlowFile flowfile, FlowFile responseFlowFile, InputStream responseBodyStream) + throws IOException, SchemaNotFoundException, MalformedRecordException { + int numberOfRecords = 0; + String mimeType; + try (RecordReader reader = recordReaderFactoryReference.get().createRecordReader(flowfile, new BufferedInputStream(responseBodyStream), getLogger())) { + RecordSchema schema = recordSetWriterFactoryReference.get().getSchema(flowfile == null ? Collections.emptyMap() : flowfile.getAttributes(), reader.getSchema()); + try (OutputStream responseStream = session.write(responseFlowFile); + RecordSetWriter recordSetWriter = recordSetWriterFactoryReference.get().createWriter(getLogger(), schema, responseStream, responseFlowFile)) { + mimeType = recordSetWriter.getMimeType(); + recordSetWriter.beginRecordSet(); + Record currentRecord; + // as the report can be changed independently from the flow, it's safer to ignore field types and unknown fields in the Record Reading process + while ((currentRecord = reader.nextRecord(false, true)) != null) { + recordSetWriter.write(currentRecord); + numberOfRecords++; + } + } + } + return new TransformResult(numberOfRecords, mimeType); + } + + private void route(FlowFile request, FlowFile response, ProcessSession session, ProcessContext context, int statusCode) { + if (!isSuccess(statusCode) && request == null) { + context.yield(); + } + + if (isSuccess(statusCode)) { + if (request != null) { + session.transfer(request, ORIGINAL); + } + if (response != null) { + session.transfer(response, SUCCESS); + } + } else { + if (request != null) { + session.transfer(request, FAILURE); + } + } + } + + private boolean isSuccess(int statusCode) { + return statusCode >= 200 && statusCode < 300; + } + + private Map createCommonAttributes(URI uri, HttpResponseEntity httpResponseEntity, long elapsedTime) { + Map attributes = new HashMap<>(); + attributes.put(STATUS_CODE, String.valueOf(httpResponseEntity.statusCode())); + attributes.put(REQUEST_URL, uri.toString()); + attributes.put(REQUEST_DURATION, Long.toString(elapsedTime)); + attributes.put(TRANSACTION_ID, UUID.randomUUID().toString()); + return attributes; + } + + private Map setMimeType(Map commonAttributes, HttpResponseEntity httpResponseEntity) { + Map attributes = commonAttributes; + Optional contentType = httpResponseEntity.headers().getFirstHeader(HEADER_CONTENT_TYPE); + if (contentType.isPresent()) { + attributes = new HashMap<>(commonAttributes); + attributes.put(CoreAttributes.MIME_TYPE.key(), contentType.get()); + } + return attributes; + } + + private static class TransformResult { + private final int numberOfRecords; + private final String mimeType; + + private TransformResult(int numberOfRecords, String mimeType) { + this.numberOfRecords = numberOfRecords; + this.mimeType = mimeType; + } + + private int getNumberOfRecords() { + return numberOfRecords; + } + + private String getMimeType() { + return mimeType; + } + } +} diff --git a/nifi-nar-bundles/nifi-workday-bundle/nifi-workday-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-workday-bundle/nifi-workday-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000000..d7ea802fbe --- /dev/null +++ b/nifi-nar-bundles/nifi-workday-bundle/nifi-workday-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.nifi.processors.workday.GetWorkdayReport \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-workday-bundle/nifi-workday-processors/src/main/resources/docs/org.apache.nifi.processors.workday.GetWorkdayReport/additionalDetails.html b/nifi-nar-bundles/nifi-workday-bundle/nifi-workday-processors/src/main/resources/docs/org.apache.nifi.processors.workday.GetWorkdayReport/additionalDetails.html new file mode 100644 index 0000000000..4e79cb0420 --- /dev/null +++ b/nifi-nar-bundles/nifi-workday-bundle/nifi-workday-processors/src/main/resources/docs/org.apache.nifi.processors.workday.GetWorkdayReport/additionalDetails.html @@ -0,0 +1,90 @@ + + + + + + GetWorkdayReport + + + + +

Summary

+

+ This processor acts as a client endpoint to interact with the Workday API. + It is capable of reading reports from Workday RaaS and transferring the content directly to the output, or you can define + the required Record Reader and RecordSet Writer, so you can transform the report to the required format. +

+ +

Supported report formats

+ +
    +
  • csv
  • +
  • simplexml
  • +
  • json
  • +
+ +

+ In case of json source you need to set the following parameters in the JsonTreeReader: +

    +
  • Starting Field Strategy: Nested Field
  • +
  • Starting Field Name: Report_Entry
  • +
+

+ +

+ It is possible to hide specific columns from the response if you define the Writer scheme explicitly in the configuration of the RecordSet Writer. +

+ +

+ Example: Remove name2 column from the response +

+

+ Let's say we have the following record structure: +

+ +
+                RecordSet (
+                  Record (
+                    Field "name1" = "value1",
+                    Field "name2" = 42
+                  ),
+                  Record (
+                    Field "name1" = "value2",
+                    Field "name2" = 84
+                  )
+                )
+            
+
+ +

+ If you would like to remove the "name2" column from the response, then you need to define the following writer schema: +

+ + +
+                {
+                  "name": "test",
+                  "namespace": "nifi",
+                  "type": "record",
+                  "fields": [
+                    { "name": "name1", "type": "string" }
+                ]
+                }
+            
+
+ + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-workday-bundle/nifi-workday-processors/src/test/java/org/apache/nifi/processors/workday/GetWorkdayReportTest.java b/nifi-nar-bundles/nifi-workday-bundle/nifi-workday-processors/src/test/java/org/apache/nifi/processors/workday/GetWorkdayReportTest.java new file mode 100644 index 0000000000..6324d3fa40 --- /dev/null +++ b/nifi-nar-bundles/nifi-workday-bundle/nifi-workday-processors/src/test/java/org/apache/nifi/processors/workday/GetWorkdayReportTest.java @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.workday; + +import static org.apache.nifi.processors.workday.GetWorkdayReport.FAILURE; +import static org.apache.nifi.processors.workday.GetWorkdayReport.GET_WORKDAY_REPORT_JAVA_EXCEPTION_CLASS; +import static org.apache.nifi.processors.workday.GetWorkdayReport.GET_WORKDAY_REPORT_JAVA_EXCEPTION_MESSAGE; +import static org.apache.nifi.processors.workday.GetWorkdayReport.HEADER_AUTHORIZATION; +import static org.apache.nifi.processors.workday.GetWorkdayReport.ORIGINAL; +import static org.apache.nifi.processors.workday.GetWorkdayReport.RECORD_COUNT; +import static org.apache.nifi.processors.workday.GetWorkdayReport.RECORD_READER_FACTORY; +import static org.apache.nifi.processors.workday.GetWorkdayReport.RECORD_WRITER_FACTORY; +import static org.apache.nifi.processors.workday.GetWorkdayReport.STATUS_CODE; +import static org.apache.nifi.processors.workday.GetWorkdayReport.SUCCESS; +import static org.apache.nifi.processors.workday.GetWorkdayReport.WEB_CLIENT_SERVICE; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import org.apache.nifi.csv.CSVRecordSetWriter; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.json.JsonTreeReader; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.MockProcessContext; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; +import org.apache.nifi.web.client.provider.service.StandardWebClientServiceProvider; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class GetWorkdayReportTest { + + private static final String LOCALHOST = "localhost"; + private static final String REPORT_URL = "http://" + LOCALHOST; + private static final String INVALID_URL = "invalid"; + private static final String INVALID_URL_PARAM = ":invalid_url"; + private static final String APPLICATION_JSON = "application/json"; + private static final String OK_STATUS_CODE = "200"; + private static final String CONTENT_TYPE = "Content-Type"; + private static final String TEXT_CSV = "text/csv"; + private static final String USER_NAME = "userName"; + private static final String PASSWORD = "password"; + + private TestRunner runner; + private MockWebServer mockWebServer; + + @BeforeEach + public void setRunner() { + runner = TestRunners.newTestRunner(new GetWorkdayReport()); + mockWebServer = new MockWebServer(); + } + + @AfterEach + public void shutdownServer() throws IOException { + mockWebServer.shutdown(); + } + + @Test + public void testNotValidWithoutReportUrlProperty() throws InitializationException { + withWebClientService(); + runner.setProperty(GetWorkdayReport.WORKDAY_USERNAME, USER_NAME); + runner.setProperty(GetWorkdayReport.WORKDAY_PASSWORD, PASSWORD); + + runner.assertNotValid(); + } + + @Test + public void testNotValidWithInvalidReportUrlProperty() throws InitializationException { + withWebClientService(); + runner.setProperty(GetWorkdayReport.WORKDAY_USERNAME, USER_NAME); + runner.setProperty(GetWorkdayReport.WORKDAY_PASSWORD, PASSWORD); + runner.setProperty(GetWorkdayReport.REPORT_URL, INVALID_URL); + runner.assertNotValid(); + } + + @Test + public void testNotValidWithoutUserName() throws InitializationException { + withWebClientService(); + runner.setProperty(GetWorkdayReport.WORKDAY_PASSWORD, PASSWORD); + runner.setProperty(GetWorkdayReport.REPORT_URL, REPORT_URL); + + runner.assertNotValid(); + } + + @Test + public void testNotValidWithoutPassword() throws InitializationException { + withWebClientService(); + runner.setProperty(GetWorkdayReport.WORKDAY_USERNAME, USER_NAME); + runner.setProperty(GetWorkdayReport.REPORT_URL, REPORT_URL); + + runner.assertNotValid(); + } + + @Test + public void testRunIncomingConnectionsWithNonLoopConnections() throws InitializationException { + withWebClientService(); + runner.setProperty(GetWorkdayReport.WORKDAY_USERNAME, USER_NAME); + runner.setProperty(GetWorkdayReport.WORKDAY_PASSWORD, PASSWORD); + runner.setProperty(GetWorkdayReport.REPORT_URL, REPORT_URL); + runner.setIncomingConnection(true); + runner.setNonLoopConnection(true); + + runner.run(); + runner.assertQueueEmpty(); + } + + @Test + public void testRunThrowsURISyntaxExceptionFailure() throws InitializationException { + withWebClientService(); + runner.setProperty(GetWorkdayReport.WORKDAY_USERNAME, USER_NAME); + runner.setProperty(GetWorkdayReport.WORKDAY_PASSWORD, PASSWORD); + + String urlAttributeKey = "request.url"; + runner.setProperty(GetWorkdayReport.REPORT_URL, String.format("${%s}", urlAttributeKey)); + + Map attributes = new HashMap<>(); + attributes.put(urlAttributeKey, INVALID_URL_PARAM); + + runner.enqueue("", attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(FAILURE); + runner.assertPenalizeCount(1); + + MockFlowFile flowFile = getFlowFile(FAILURE); + flowFile.assertAttributeEquals(GET_WORKDAY_REPORT_JAVA_EXCEPTION_CLASS, URISyntaxException.class.getSimpleName()); + flowFile.assertAttributeExists(GET_WORKDAY_REPORT_JAVA_EXCEPTION_MESSAGE); + } + + @Test + void testContextYieldIfHttpStatusIsNot2xxAndThereIsNoIncomingConnection() throws InitializationException { + runner.setIncomingConnection(false); + runner.setProperty(GetWorkdayReport.WORKDAY_USERNAME, USER_NAME); + runner.setProperty(GetWorkdayReport.WORKDAY_PASSWORD, PASSWORD); + withWebClientService(); + runner.setProperty(GetWorkdayReport.REPORT_URL, getMockWebServerUrl()); + + mockWebServer.enqueue(new MockResponse().setResponseCode(500)); + + runner.run(); + + assertTrue(((MockProcessContext) runner.getProcessContext()).isYieldCalled()); + runner.assertTransferCount(ORIGINAL, 0); + runner.assertTransferCount(SUCCESS, 0); + runner.assertTransferCount(FAILURE, 0); + } + + @Test + void testContextYieldAndForwardFlowFileToFailureIfHttpStatusIsNot2xxAndThereIsIncomingConnection() throws InitializationException { + runner.setIncomingConnection(true); + runner.setProperty(GetWorkdayReport.WORKDAY_USERNAME, USER_NAME); + runner.setProperty(GetWorkdayReport.WORKDAY_PASSWORD, PASSWORD); + withWebClientService(); + runner.setProperty(GetWorkdayReport.REPORT_URL, getMockWebServerUrl()); + + mockWebServer.enqueue(new MockResponse().setResponseCode(500)); + + runner.enqueue("test"); + runner.run(); + + assertFalse(((MockProcessContext) runner.getProcessContext()).isYieldCalled()); + runner.assertTransferCount(ORIGINAL, 0); + runner.assertTransferCount(SUCCESS, 0); + runner.assertTransferCount(FAILURE, 1); + + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FAILURE).iterator().next(); + flowFile.assertAttributeEquals("getworkdayreport.status.code", "500"); + } + + @Test + void testYieldShouldBeCalledWhenExceptionHappensAndThereIsNoRequestFlowFile() throws InitializationException { + runner.setIncomingConnection(false); + runner.setProperty(GetWorkdayReport.WORKDAY_USERNAME, USER_NAME); + runner.setProperty(GetWorkdayReport.WORKDAY_PASSWORD, PASSWORD); + withWebClientService(); + String urlAttributeKey = "request.url"; + runner.setProperty(GetWorkdayReport.REPORT_URL, String.format("${%s}", urlAttributeKey)); + + runner.run(); + + assertTrue(((MockProcessContext) runner.getProcessContext()).isYieldCalled()); + runner.assertTransferCount(ORIGINAL, 0); + runner.assertTransferCount(SUCCESS, 0); + runner.assertTransferCount(FAILURE, 0); + } + + @Test + void testPassThroughContentWithoutModificationIfNoRecordReaderAndWriterDefined() throws InitializationException { + withWebClientService(); + runner.setProperty(GetWorkdayReport.WORKDAY_USERNAME, USER_NAME); + runner.setProperty(GetWorkdayReport.WORKDAY_PASSWORD, PASSWORD); + runner.setIncomingConnection(false); + runner.setProperty(GetWorkdayReport.REPORT_URL, getMockWebServerUrl()); + + String content = "id,name\n1,2"; + mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(content).setHeader(CONTENT_TYPE, TEXT_CSV)); + + runner.run(); + + assertFalse(((MockProcessContext) runner.getProcessContext()).isYieldCalled()); + runner.assertTransferCount(ORIGINAL, 0); + runner.assertTransferCount(SUCCESS, 1); + runner.assertTransferCount(FAILURE, 0); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(SUCCESS).iterator().next(); + flowFile.assertAttributeEquals(STATUS_CODE, OK_STATUS_CODE); + flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), TEXT_CSV); + flowFile.assertAttributeNotExists(RECORD_COUNT); + flowFile.assertContentEquals(content); + } + + @Test + void testRequestFlowFileIsTransferredToOriginalRelationship() throws InitializationException { + withWebClientService(); + runner.setProperty(GetWorkdayReport.WORKDAY_USERNAME, USER_NAME); + runner.setProperty(GetWorkdayReport.WORKDAY_PASSWORD, PASSWORD); + runner.setIncomingConnection(true); + runner.setProperty(GetWorkdayReport.REPORT_URL, getMockWebServerUrl()); + + String content = "id,name\n1,2"; + mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(content).setHeader(CONTENT_TYPE, TEXT_CSV)); + runner.enqueue(""); + + runner.run(); + + assertFalse(((MockProcessContext) runner.getProcessContext()).isYieldCalled()); + runner.assertTransferCount(ORIGINAL, 1); + runner.assertTransferCount(SUCCESS, 1); + runner.assertTransferCount(FAILURE, 0); + + MockFlowFile originalFlowFile = runner.getFlowFilesForRelationship(ORIGINAL).iterator().next(); + MockFlowFile responseFlowFile = runner.getFlowFilesForRelationship(SUCCESS).iterator().next(); + originalFlowFile.assertAttributeEquals(STATUS_CODE, OK_STATUS_CODE); + originalFlowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), TEXT_CSV); + responseFlowFile.assertAttributeEquals(STATUS_CODE, OK_STATUS_CODE); + responseFlowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), TEXT_CSV); + responseFlowFile.assertAttributeNotExists(RECORD_COUNT); + responseFlowFile.assertContentEquals(content); + } + + @Test + void testContentIsTransformedIfRecordReaderAndWriterIsDefined() throws InitializationException { + withWebClientService(); + withJsonRecordReader(); + withCsvRecordSetWriter(); + runner.setProperty(GetWorkdayReport.WORKDAY_USERNAME, USER_NAME); + runner.setProperty(GetWorkdayReport.WORKDAY_PASSWORD, PASSWORD); + runner.setIncomingConnection(false); + runner.setProperty(GetWorkdayReport.REPORT_URL, getMockWebServerUrl()); + + String jsonContent = "{\"id\": 1, \"name\": \"test\"}"; + String csvContent = "id,name\n1,test\n"; + mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(jsonContent).setHeader(CONTENT_TYPE, APPLICATION_JSON)); + + runner.run(); + + assertFalse(((MockProcessContext) runner.getProcessContext()).isYieldCalled()); + runner.assertTransferCount(ORIGINAL, 0); + runner.assertTransferCount(SUCCESS, 1); + runner.assertTransferCount(FAILURE, 0); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(SUCCESS).iterator().next(); + flowFile.assertAttributeEquals(STATUS_CODE, OK_STATUS_CODE); + flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), TEXT_CSV); + + flowFile.assertAttributeEquals(RECORD_COUNT, "1"); + flowFile.assertContentEquals(csvContent); + } + + @Test + void testBasicAuthentication() throws InitializationException, InterruptedException { + runner.setIncomingConnection(false); + runner.setProperty(GetWorkdayReport.WORKDAY_USERNAME, USER_NAME); + runner.setProperty(GetWorkdayReport.WORKDAY_PASSWORD, PASSWORD); + withWebClientService(); + runner.setProperty(GetWorkdayReport.REPORT_URL, getMockWebServerUrl()); + + mockWebServer.enqueue(new MockResponse().setResponseCode(200).setHeader(CONTENT_TYPE, APPLICATION_JSON)); + + runner.run(); + + RecordedRequest recordedRequest = mockWebServer.takeRequest(1, TimeUnit.SECONDS); + String authorization = recordedRequest.getHeader(HEADER_AUTHORIZATION); + assertNotNull(authorization, "Authorization Header not found"); + + Pattern basicAuthPattern = Pattern.compile("^Basic \\S+$"); + assertTrue(basicAuthPattern.matcher(authorization).matches(), "Basic Authentication not matched"); + } + + private String getMockWebServerUrl() { + return mockWebServer.url("workdayReport").newBuilder().host(LOCALHOST).build().toString(); + } + + private MockFlowFile getFlowFile(Relationship relationship) { + return runner.getFlowFilesForRelationship(relationship).iterator().next(); + } + + private void withMockRecordReaderFactory() throws InitializationException { + String serviceIdentifier = RecordReaderFactory.class.getName(); + RecordReaderFactory recordReaderFactory = mock(RecordReaderFactory.class); + when(recordReaderFactory.getIdentifier()).thenReturn(serviceIdentifier); + + runner.addControllerService(serviceIdentifier, recordReaderFactory); + runner.enableControllerService(recordReaderFactory); + runner.setProperty(RECORD_READER_FACTORY, serviceIdentifier); + } + + private void withMockRecordSetWriterFactory() throws InitializationException { + String serviceIdentifier = RecordSetWriterFactory.class.getName(); + RecordSetWriterFactory recordSetWriterFactory = mock(RecordSetWriterFactory.class); + when(recordSetWriterFactory.getIdentifier()).thenReturn(serviceIdentifier); + + runner.addControllerService(serviceIdentifier, recordSetWriterFactory); + runner.enableControllerService(recordSetWriterFactory); + runner.setProperty(RECORD_WRITER_FACTORY, serviceIdentifier); + } + + private void withWebClientService() throws InitializationException { + String serviceIdentifier = StandardWebClientServiceProvider.class.getName(); + WebClientServiceProvider webClientServiceProvider = new StandardWebClientServiceProvider(); + + runner.addControllerService(serviceIdentifier, webClientServiceProvider); + runner.enableControllerService(webClientServiceProvider); + runner.setProperty(WEB_CLIENT_SERVICE, serviceIdentifier); + } + + private void withJsonRecordReader() throws InitializationException { + String serviceIdentifier = JsonTreeReader.class.getName(); + + RecordReaderFactory recordReaderFactory = new JsonTreeReader(); + + runner.addControllerService(serviceIdentifier, recordReaderFactory); + + runner.enableControllerService(recordReaderFactory); + runner.setProperty(RECORD_READER_FACTORY, serviceIdentifier); + } + + private void withCsvRecordSetWriter() throws InitializationException { + String serviceIdentifier = RecordSetWriterFactory.class.getName(); + + RecordSetWriterFactory recordSetWriterFactory = new CSVRecordSetWriter(); + + runner.addControllerService(serviceIdentifier, recordSetWriterFactory); + + runner.enableControllerService(recordSetWriterFactory); + runner.setProperty(RECORD_WRITER_FACTORY, serviceIdentifier); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-workday-bundle/pom.xml b/nifi-nar-bundles/nifi-workday-bundle/pom.xml new file mode 100644 index 0000000000..fe193cfe33 --- /dev/null +++ b/nifi-nar-bundles/nifi-workday-bundle/pom.xml @@ -0,0 +1,36 @@ + + + + + 4.0.0 + + nifi-nar-bundles + org.apache.nifi + 1.18.0-SNAPSHOT + + + nifi-workday-bundle + pom + + + nifi-workday-processors + nifi-workday-processors-nar + + \ No newline at end of file diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 37bc3640aa..41cfc449b4 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -113,6 +113,7 @@ nifi-hubspot-bundle nifi-dropbox-bundle nifi-airtable-bundle + nifi-workday-bundle