diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 0daf8209d0..abcbd91366 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -606,12 +606,6 @@ language governing permissions and limitations under the License. --> 2.0.0-SNAPSHOT nar - - org.apache.nifi - nifi-spring-nar - 2.0.0-SNAPSHOT - nar - org.apache.nifi nifi-registry-nar diff --git a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-nar/pom.xml b/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-nar/pom.xml deleted file mode 100644 index faf3f76afa..0000000000 --- a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-nar/pom.xml +++ /dev/null @@ -1,35 +0,0 @@ - - - 4.0.0 - - org.apache.nifi - nifi-spring-bundle - 2.0.0-SNAPSHOT - - nifi-spring-nar - nar - NiFi NAR for interacting with Spring - - true - true - - - - org.apache.nifi - nifi-spring-processors - - - diff --git a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-nar/src/main/resources/META-INF/LICENSE deleted file mode 100644 index f3c8ecef26..0000000000 --- a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-nar/src/main/resources/META-INF/LICENSE +++ /dev/null @@ -1,209 +0,0 @@ - - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - -APACHE NIFI SUBCOMPONENTS: - -The Apache NiFi project contains subcomponents with separate copyright -notices and license terms. Your use of the source code for the these -subcomponents is subject to the terms and conditions of the following -licenses. diff --git a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-nar/src/main/resources/META-INF/NOTICE deleted file mode 100644 index 785f07accc..0000000000 --- a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-nar/src/main/resources/META-INF/NOTICE +++ /dev/null @@ -1,19 +0,0 @@ -nifi-spring-nar -Copyright 2014-2023 The Apache Software Foundation - -This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). - -****************** -Apache Software License v2 -****************** - -The following binary components are provided under the Apache Software License v2 - - (ASLv2) Apache Commons Lang - The following NOTICE information applies: - Apache Commons Lang - Copyright 2001-2014 The Apache Software Foundation - - This product includes software from the Spring Framework, - under the Apache License 2.0 (see: StringUtils.containsWhitespace()) diff --git a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/pom.xml b/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/pom.xml deleted file mode 100644 index 9f92c1bed3..0000000000 --- a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/pom.xml +++ /dev/null @@ -1,70 +0,0 @@ - - - - org.apache.nifi - nifi-spring-bundle - 2.0.0-SNAPSHOT - - 4.0.0 - nifi-spring-processors - jar - - 5.5.18 - - - - - org.springframework - spring-messaging - provided - - - org.springframework - spring-context - - - org.apache.commons - commons-lang3 - - - commons-io - commons-io - - - org.apache.nifi - nifi-api - - - org.apache.nifi - nifi-utils - 2.0.0-SNAPSHOT - - - org.apache.nifi - nifi-mock - 2.0.0-SNAPSHOT - test - - - org.springframework.integration - spring-integration-core - ${spring.integration.version} - test - - - org.springframework.integration - spring-integration-event - ${spring.integration.version} - test - - - diff --git a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/SpringContextFactory.java b/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/SpringContextFactory.java deleted file mode 100644 index b1a474d0aa..0000000000 --- a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/SpringContextFactory.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.spring; - -import java.io.File; -import java.io.InputStream; -import java.lang.reflect.Constructor; -import java.net.URL; -import java.net.URLClassLoader; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import org.apache.commons.io.IOUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Helper class which provides factory method to create and initialize Spring - * Application Context while scoping it within the dedicated Class Loader. - */ -final class SpringContextFactory { - - private static final Logger logger = LoggerFactory.getLogger(SpringContextFactory.class); - - private static final String SC_DELEGATE_NAME = "org.apache.nifi.spring.bootstrap.SpringContextDelegate"; - - /** - * Creates and instance of Spring Application Context scoped within a - * dedicated Class Loader. - *
- * The core task of this factory is to load delegate that supports message - * exchange with Spring Application Context ({@link SpringDataExchanger}) - * using the same class loader used to load the Application Context. Such - * class loader isolation is required to ensure that multiple instances of - * Application Context (representing different applications) and the - * corresponding delegate can exist per single instance of Spring NAR. - *
- * The mechanism used here is relatively simple. While - * {@link SpringDataExchanger} is available to the current class loader and - * would normally be loaded once per instance of NAR, the below factory - * method first obtains class bytes for {@link SpringDataExchanger} and then - * loads it from these bytes via ClassLoader.defineClass(..) method, thus - * ensuring that multiple instances of {@link SpringDataExchanger} class can - * exist and everything that is loaded within its scope is using its class - * loader. Upon exit, the class loader is destroyed via close method - * essentially with everything that it loaded. - *
- * Also, during the initialization of {@link SpringDataExchanger} the new - * class loader is set as Thread.contextClassLoader ensuring that if there - * are any libraries used by Spring beans that rely on loading resources via - * Thread.contextClassLoader can find such resources. - */ - static SpringDataExchanger createSpringContextDelegate(String classpath, String config) { - List urls = gatherAdditionalClassPathUrls(classpath); - SpringContextClassLoader contextCl = new SpringContextClassLoader(urls.toArray(new URL[] {}), - SpringContextFactory.class.getClassLoader()); - ClassLoader tContextCl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(contextCl); - try { - InputStream delegateStream = contextCl.getResourceAsStream(SC_DELEGATE_NAME.replace('.', '/') + ".class"); - byte[] delegateBytes = IOUtils.toByteArray(delegateStream); - Class clazz = contextCl.doDefineClass(SC_DELEGATE_NAME, delegateBytes, 0, delegateBytes.length); - Constructor ctr = clazz.getDeclaredConstructor(String.class); - ctr.setAccessible(true); - SpringDataExchanger springDelegate = (SpringDataExchanger) ctr.newInstance(config); - if (logger.isInfoEnabled()) { - logger.info("Successfully instantiated Spring Application Context from '" + config + "'"); - } - return springDelegate; - } catch (Exception e) { - try { - contextCl.close(); - } catch (Exception e2) { - // ignore - } - throw new IllegalStateException("Failed to instantiate Spring Application Context. Config path: '" + config - + "'; Classpath: " + Arrays.asList(urls), e); - } finally { - Thread.currentThread().setContextClassLoader(tContextCl); - } - } - - /** - * - */ - static List gatherAdditionalClassPathUrls(String classPathRoot) { - if (logger.isDebugEnabled()) { - logger.debug("Adding additional resources from '" + classPathRoot + "' to the classpath."); - } - File classPathRootFile = new File(classPathRoot); - if (classPathRootFile.exists() && classPathRootFile.isDirectory()) { - String[] cpResourceNames = classPathRootFile.list(); - try { - List urls = new ArrayList<>(); - for (String resourceName : cpResourceNames) { - File r = new File(classPathRootFile, resourceName); - if (r.getName().toLowerCase().endsWith(".jar") || r.isDirectory()) { - URL url = r.toURI().toURL(); - urls.add(url); - if (logger.isDebugEnabled()) { - logger.debug("Identifying additional resource to the classpath: " + url); - } - } - } - urls.add(classPathRootFile.toURI().toURL()); - - return urls; - } catch (Exception e) { - throw new IllegalStateException( - "Failed to parse user libraries from '" + classPathRootFile.getAbsolutePath() + "'", e); - } - } else { - throw new IllegalArgumentException("Path '" + classPathRootFile.getAbsolutePath() - + "' is not valid because it doesn't exist or does not point to a directory."); - } - } - - /** - * - */ - private static class SpringContextClassLoader extends URLClassLoader { - /** - * - */ - public SpringContextClassLoader(URL[] urls, ClassLoader parent) { - super(urls, parent); - } - - /** - * - */ - public final Class doDefineClass(String name, byte[] b, int off, int len) { - return this.defineClass(name, b, off, len); - } - } -} diff --git a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/SpringContextProcessor.java b/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/SpringContextProcessor.java deleted file mode 100644 index c6497db850..0000000000 --- a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/SpringContextProcessor.java +++ /dev/null @@ -1,442 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.spring; - -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; -import org.apache.nifi.components.resource.ResourceCardinality; -import org.apache.nifi.components.resource.ResourceType; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Processor; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; -import org.apache.nifi.processor.io.OutputStreamCallback; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.spring.SpringDataExchanger.SpringResponse; -import org.apache.nifi.stream.io.StreamUtils; -import org.apache.nifi.util.FormatUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.PollableChannel; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.URL; -import java.net.URLClassLoader; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -/** - * Implementation of {@link Processor} capable of sending and receiving data - * from application defined in Spring Application context. It does so via - * predefined in/out {@link MessageChannel}s (see spring-messaging module of - * Spring). Once such channels are defined user is free to implement the rest of - * the application any way they wish (e.g., custom code and/or using frameworks - * such as Spring Integration or Camel). - *

- * The requirement and expectations for channel types are: - *

    - *
  • Input channel must be of type {@link MessageChannel} and named "fromNiFi" - * (see {@link SpringNiFiConstants#FROM_NIFI})
  • - *
  • Output channel must be of type {@link PollableChannel} and named "toNiFi" - * (see {@link SpringNiFiConstants#TO_NIFI})
  • - *
- *

- * Below is the example of sample configuration: - * - *
- * <?xml version="1.0" encoding="UTF-8"?>
- * <beans xmlns="http://www.springframework.org/schema/beans"
- *   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- *   xmlns:int="http://www.springframework.org/schema/integration"
- *  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
- *      http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.2.xsd">
- *
- *  <int:channel id="fromNiFi"/>
- *
- *  . . . . .
- *
- *  <int:channel id="toNiFi">
- *      <int:queue/>
- *  </int:channel>
- *
- * </beans>
- * 
- *

- * Defining {@link MessageChannel} is optional. That's why this processor - * supports 3 modes of interaction with Spring Application Context: - *

    - *
  • Headless – no channels are defined therefore nothing is sent to or - * received from such Application Contexts (i.e., some monitoring app).
  • - *
  • One way (NiFi -> Spring or Spring -> NiFi) - depends on existence - * of one of "fromNiFi" or "toNiFi" channel in the Spring Application Context. - *
  • - *
  • Bi-directional (NiFi -> Spring -> Nifi or Spring -> NiFi -> - * Spring) - depends on existence of both "fromNiFi" and "toNiFi" channels in - * the Spring Application Context
  • - *
- * - *

- *

- * To create an instance of the ApplicationConetxt this processor requires user - * to provide configuration file path and the path to the resources that needs - * to be added to the classpath of ApplicationContext. This essentially allows - * user to package their Spring Application any way they want as long as - * everything it requires is available on the classpath. - *

- *

- * Data exchange between Spring and NiFi relies on simple mechanism which is - * exposed via {@link SpringDataExchanger}; {@link FlowFile}s's content is - * converted to primitive representation that can be easily wrapped in Spring - * {@link Message}. The requirement imposed by this Processor is to send/receive - * {@link Message} with payload of type byte[] and headers of type - * Map<String, Object>. This is primarily for simplicity and type - * safety. Converters and Transformers could be used by either side to change - * representation of the content that is being exchanged between NiFi and - * Spring. - */ -@TriggerWhenEmpty -@Tags({ "Spring", "Message", "Get", "Put", "Integration" }) -@CapabilityDescription("A Processor that supports sending and receiving data from application defined in " - + "Spring Application Context via predefined in/out MessageChannels.") -public class SpringContextProcessor extends AbstractProcessor { - private final Logger logger = LoggerFactory.getLogger(SpringContextProcessor.class); - - public static final PropertyDescriptor CTX_CONFIG_PATH = new PropertyDescriptor.Builder() - .name("Application Context config path") - .description("The path to the Spring Application Context configuration file relative to the classpath") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - public static final PropertyDescriptor CTX_LIB_PATH = new PropertyDescriptor.Builder() - .name("Application Context class path") - .description("Path to the directory with resources (i.e., JARs, configuration files etc.) required to be on " - + "the classpath of the ApplicationContext.") - .identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.DIRECTORY) - .required(true) - .build(); - public static final PropertyDescriptor SEND_TIMEOUT = new PropertyDescriptor.Builder() - .name("Send Timeout") - .description("Timeout for sending data to Spring Application Context. Defaults to 0.") - .required(false) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .build(); - public static final PropertyDescriptor RECEIVE_TIMEOUT = new PropertyDescriptor.Builder() - .name("Receive Timeout") - .description("Timeout for receiving date from Spring context. Defaults to 0.") - .required(false) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .build(); - - // ==== - - public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") - .description( - "All FlowFiles that are successfully received from Spring Application Context are routed to this relationship") - .build(); - public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") - .description( - "All FlowFiles that cannot be sent to Spring Application Context are routed to this relationship") - .build(); - - private final static Set relationships; - - private final static List propertyDescriptors; - - // ======= - - private volatile String applicationContextConfigFileName; - - private volatile String applicationContextLibPath; - - private volatile long sendTimeout; - - private volatile long receiveTimeout; - - private volatile SpringDataExchanger exchanger; - - static { - List _propertyDescriptors = new ArrayList<>(); - _propertyDescriptors.add(CTX_CONFIG_PATH); - _propertyDescriptors.add(CTX_LIB_PATH); - _propertyDescriptors.add(SEND_TIMEOUT); - _propertyDescriptors.add(RECEIVE_TIMEOUT); - propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); - - Set _relationships = new HashSet<>(); - _relationships.add(REL_SUCCESS); - _relationships.add(REL_FAILURE); - relationships = Collections.unmodifiableSet(_relationships); - } - - /** - * - */ - @Override - public Set getRelationships() { - return relationships; - } - - /** - * - */ - @OnScheduled - public void initializeSpringContext(ProcessContext processContext) { - this.applicationContextConfigFileName = processContext.getProperty(CTX_CONFIG_PATH).getValue(); - this.applicationContextLibPath = processContext.getProperty(CTX_LIB_PATH).getValue(); - - String stStr = processContext.getProperty(SEND_TIMEOUT).getValue(); - this.sendTimeout = stStr == null ? 0 : FormatUtils.getTimeDuration(stStr, TimeUnit.MILLISECONDS); - - String rtStr = processContext.getProperty(RECEIVE_TIMEOUT).getValue(); - this.receiveTimeout = rtStr == null ? 0 : FormatUtils.getTimeDuration(rtStr, TimeUnit.MILLISECONDS); - - try { - if (logger.isDebugEnabled()) { - logger.debug( - "Initializing Spring Application Context defined in " + this.applicationContextConfigFileName); - } - this.exchanger = SpringContextFactory.createSpringContextDelegate(this.applicationContextLibPath, - this.applicationContextConfigFileName); - } catch (Exception e) { - throw new IllegalStateException("Failed while initializing Spring Application Context", e); - } - if (logger.isInfoEnabled()) { - logger.info("Successfully initialized Spring Application Context defined in " - + this.applicationContextConfigFileName); - } - } - - /** - * Will close the 'exchanger' which in turn will close both Spring - * Application Context and the ClassLoader that loaded it allowing new - * instance of Spring Application Context to be created upon the next start - * (which may have an updated classpath and functionality) without - * restarting NiFi. - */ - @OnStopped - public void closeSpringContext(ProcessContext processContext) { - if (this.exchanger != null) { - try { - if (logger.isDebugEnabled()) { - logger.debug("Closing Spring Application Context defined in " + this.applicationContextConfigFileName); - } - this.exchanger.close(); - if (logger.isInfoEnabled()) { - logger.info("Successfully closed Spring Application Context defined in " - + this.applicationContextConfigFileName); - } - } catch (IOException e) { - getLogger().warn("Failed while closing Spring Application Context", e); - } - } - } - - /** - * - */ - @Override - public void onTrigger(ProcessContext context, ProcessSession processSession) throws ProcessException { - FlowFile flowFileToProcess = processSession.get(); - if (flowFileToProcess != null) { - this.sendToSpring(flowFileToProcess, context, processSession); - } - this.receiveFromSpring(processSession); - } - - @Override - protected Collection customValidate(final ValidationContext validationContext) { - SpringContextConfigValidator v = new SpringContextConfigValidator(); - return Collections.singletonList(v.validate(CTX_CONFIG_PATH.getName(), null, validationContext)); - } - - /** - * - */ - @Override - protected List getSupportedPropertyDescriptors() { - return propertyDescriptors; - } - - /** - * - */ - private void sendToSpring(FlowFile flowFileToProcess, ProcessContext context, ProcessSession processSession) { - byte[] payload = this.extractMessage(flowFileToProcess, processSession); - boolean sent = false; - - try { - sent = this.exchanger.send(payload, flowFileToProcess.getAttributes(), this.sendTimeout); - if (sent) { - processSession.getProvenanceReporter().send(flowFileToProcess, this.applicationContextConfigFileName); - processSession.remove(flowFileToProcess); - } else { - processSession.transfer(processSession.penalize(flowFileToProcess), REL_FAILURE); - this.getLogger().error("Timed out while sending FlowFile to Spring Application Context " - + this.applicationContextConfigFileName); - context.yield(); - } - } catch (Exception e) { - processSession.transfer(flowFileToProcess, REL_FAILURE); - this.getLogger().error("Failed while sending FlowFile to Spring Application Context " - + this.applicationContextConfigFileName + "; " + e.getMessage(), e); - context.yield(); - } - } - - /** - * - */ - private void receiveFromSpring(ProcessSession processSession) { - final SpringResponse msgFromSpring = this.exchanger.receive(this.receiveTimeout); - if (msgFromSpring != null) { - FlowFile flowFileToProcess = processSession.create(); - flowFileToProcess = processSession.write(flowFileToProcess, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - Object payload = msgFromSpring.getPayload(); - byte[] payloadBytes = payload instanceof String ? ((String) payload).getBytes() : (byte[]) payload; - out.write(payloadBytes); - } - }); - flowFileToProcess = processSession.putAllAttributes(flowFileToProcess, - this.extractFlowFileAttributesFromMessageHeaders(msgFromSpring.getHeaders())); - processSession.transfer(flowFileToProcess, REL_SUCCESS); - processSession.getProvenanceReporter().receive(flowFileToProcess, this.applicationContextConfigFileName); - } - } - - /** - * - */ - private Map extractFlowFileAttributesFromMessageHeaders(Map messageHeaders) { - Map attributes = new HashMap<>(); - for (Entry entry : messageHeaders.entrySet()) { - if (entry.getValue() instanceof String) { - attributes.put(entry.getKey(), (String) entry.getValue()); - } - } - return attributes; - } - - /** - * Extracts contents of the {@link FlowFile} to byte array. - */ - private byte[] extractMessage(FlowFile flowFile, ProcessSession processSession) { - final byte[] messageContent = new byte[(int) flowFile.getSize()]; - processSession.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - StreamUtils.fillBuffer(in, messageContent, true); - } - }); - return messageContent; - } - - /** - * - */ - static class SpringContextConfigValidator implements Validator { - @Override - public ValidationResult validate(String subject, String input, ValidationContext context) { - String configPath = context.getProperty(CTX_CONFIG_PATH).getValue(); - String libDirPath = context.getProperty(CTX_LIB_PATH).getValue(); - - StringBuilder invalidationMessageBuilder = new StringBuilder(); - if (configPath != null && libDirPath != null) { - validateClassPath(libDirPath, invalidationMessageBuilder); - - if (invalidationMessageBuilder.length() == 0 && !isConfigResolvable(configPath, new File(libDirPath))) { - invalidationMessageBuilder.append("'Application Context config path' can not be located " - + "in the provided classpath."); - } - } else if (StringUtils.isEmpty(configPath)) { - invalidationMessageBuilder.append("'Application Context config path' must not be empty."); - } else { - if (StringUtils.isEmpty(libDirPath)) { - invalidationMessageBuilder.append("'Application Context class path' must not be empty."); - } else { - validateClassPath(libDirPath, invalidationMessageBuilder); - } - } - - String invalidationMessage = invalidationMessageBuilder.toString(); - ValidationResult vResult = invalidationMessage.length() == 0 - ? new ValidationResult.Builder().subject(subject).input(input) - .explanation("Spring configuration '" + configPath + "' is resolvable " - + "against provided classpath '" + libDirPath + "'.").valid(true).build() - : new ValidationResult.Builder().subject(subject).input(input) - .explanation("Spring configuration '" + configPath + "' is NOT resolvable " - + "against provided classpath '" + libDirPath + "'. Validation message: " + invalidationMessage).valid(false).build(); - - return vResult; - } - } - - /** - * - */ - private static void validateClassPath(String libDirPath, StringBuilder invalidationMessageBuilder) { - File libDirPathFile = new File(libDirPath); - if (!libDirPathFile.exists()) { - invalidationMessageBuilder.append( - "'Application Context class path' does not exist. Was '" + libDirPathFile.getAbsolutePath() + "'."); - } else if (!libDirPathFile.isDirectory()) { - invalidationMessageBuilder.append("'Application Context class path' must point to a directory. Was '" - + libDirPathFile.getAbsolutePath() + "'."); - } - } - - /** - * - */ - private static boolean isConfigResolvable(String configPath, File libDirPathFile) { - ClassLoader parentLoader = SpringContextProcessor.class.getClassLoader(); - boolean resolvable = false; - URL[] urls = SpringContextFactory.gatherAdditionalClassPathUrls(libDirPathFile.getAbsolutePath()).toArray(new URL[]{}); - try (URLClassLoader throwawayCl = new URLClassLoader(urls, parentLoader)) { - resolvable = throwawayCl.getResource(configPath) != null; - } catch (IOException e) { - // ignore since it can only happen on CL.close() - } - return resolvable; - } -} diff --git a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/SpringDataExchanger.java b/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/SpringDataExchanger.java deleted file mode 100644 index d05ba9f660..0000000000 --- a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/SpringDataExchanger.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.spring; - -import java.io.Closeable; -import java.util.Map; - -import org.apache.nifi.flowfile.FlowFile; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.PollableChannel; - -/** - * Strategy to support type-safe data exchange between NiFi and Spring. It - * exposes send/receive operations with arguments that are independent of both - * frameworks simplifying conversion from {@link Message} to {@link FlowFile} - * and vice versa. - */ -public interface SpringDataExchanger extends Closeable { - - /** - * Sends data to Spring - * - * @param payload - * data that will be used as a payload or {@link Message} - * @param headers - * map that will be used to construct {@link MessageHeaders} - * @param timeout - * value to pass to the - * {@link MessageChannel#send(Message, long)} operation. - * @return 'true' if message was sent and 'false'otherwise. - */ - boolean send(T payload, Map headers, long timeout); - - /** - * Receives data from Spring - * - * @param timeout - * value to pass to {@link PollableChannel#receive(long)} - * operation - * @return {@link SpringResponse} representing content (key) and - * attributes of the FlowFile to be constructed. - */ - SpringResponse receive(long timeout); - - public static class SpringResponse { - private final T payload; - private final Map headers; - - public SpringResponse(T payload, Map headers) { - this.payload = payload; - this.headers = headers; - } - - public T getPayload() { - return payload; - } - - public Map getHeaders() { - return headers; - } - } -} diff --git a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/SpringNiFiConstants.java b/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/SpringNiFiConstants.java deleted file mode 100644 index 93afc2f5a4..0000000000 --- a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/SpringNiFiConstants.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.spring; - -/** - * Constants used by this module - */ -public interface SpringNiFiConstants { - - final String FROM_NIFI = "fromNiFi"; - - final String TO_NIFI = "toNiFi"; -} diff --git a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/bootstrap/SpringContextDelegate.java b/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/bootstrap/SpringContextDelegate.java deleted file mode 100644 index 2507da3d86..0000000000 --- a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/bootstrap/SpringContextDelegate.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.spring.bootstrap; - -import java.io.Closeable; -import java.io.IOException; -import java.net.URLClassLoader; -import java.util.Map; - -import org.apache.nifi.spring.SpringDataExchanger; -import org.apache.nifi.spring.SpringNiFiConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.context.support.ClassPathXmlApplicationContext; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.PollableChannel; -import org.springframework.messaging.support.MessageBuilder; - -/** - * Scopes instance of itself to a dedicated {@link ClassLoader}, thus allowing - * Spring Application Context and its class path to be modified and refreshed by - * simply re-starting SpringContextProcessor. Also ensures that there are no - * class path collisions between multiple instances of Spring Context Processor - * which are loaded by the same NAR Class Loader. - */ -/* - * This class is for internal use only and must never be instantiated by the NAR - * Class Loader (hence in a isolated package with nothing referencing it). It is - * loaded by a dedicated CL via byte array that represents it ensuring that this - * class can be loaded multiple times by multiple Class Loaders within a single - * instance of NAR. - */ -final class SpringContextDelegate implements Closeable, SpringDataExchanger { - - private final Logger logger = LoggerFactory.getLogger(SpringContextDelegate.class); - - private final ClassPathXmlApplicationContext applicationContext; - - private final MessageChannel toSpringChannel; - - private final PollableChannel fromSpringChannel; - - private final String configName; - - /** - * - */ - private SpringContextDelegate(String configName) { - this.configName = configName; - ClassLoader orig = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader()); - if (logger.isDebugEnabled()) { - logger.debug("Using " + Thread.currentThread().getContextClassLoader() - + " as context class loader while loading Spring Context '" + configName + "'."); - } - try { - this.applicationContext = new ClassPathXmlApplicationContext(configName); - if (this.applicationContext.containsBean(SpringNiFiConstants.FROM_NIFI)){ - this.toSpringChannel = this.applicationContext.getBean(SpringNiFiConstants.FROM_NIFI, MessageChannel.class); - if (logger.isDebugEnabled()) { - logger.debug("Spring Application Context defined in '" + configName - + "' is capable of receiving messages from NiFi since 'fromNiFi' channel was discovered."); - } - } else { - this.toSpringChannel = null; - } - if (this.applicationContext.containsBean(SpringNiFiConstants.TO_NIFI)){ - this.fromSpringChannel = this.applicationContext.getBean(SpringNiFiConstants.TO_NIFI, PollableChannel.class); - if (logger.isDebugEnabled()) { - logger.debug("Spring Application Context defined in '" + configName - + "' is capable of sending messages to " + "NiFi since 'toNiFi' channel was discovered."); - } - } else { - this.fromSpringChannel = null; - } - if (logger.isInfoEnabled() && this.toSpringChannel == null && this.fromSpringChannel == null){ - logger.info("Spring Application Context is headless since neither 'fromNiFi' nor 'toNiFi' channels were defined. " - + "No data will be exchanged."); - } - } finally { - Thread.currentThread().setContextClassLoader(orig); - } - } - - /** - * - */ - @Override - public boolean send(T payload, Map messageHeaders, long timeout) { - if (this.toSpringChannel != null){ - return this.toSpringChannel.send(MessageBuilder.withPayload(payload).copyHeaders(messageHeaders).build(), timeout); - } else { - throw new IllegalStateException("Failed to send message to '" + this.configName - + "'. There are no 'fromNiFi' channels configured which means the Application Conetxt is not set up to receive messages from NiFi"); - } - } - - /** - * - */ - @SuppressWarnings("unchecked") - @Override - public SpringResponse receive(long timeout) { - if (this.fromSpringChannel != null) { - final Message message = (Message) this.fromSpringChannel.receive(timeout); - if (message != null) { - if (!(message.getPayload() instanceof byte[]) && !(message.getPayload() instanceof String)) { - throw new IllegalStateException("Failed while receiving message from Spring due to the " - + "payload type being other then byte[] or String which are the only types that are supported. Please " - + "apply transformation/conversion on Spring side when sending message back to NiFi"); - } - return new SpringResponse(message.getPayload(), message.getHeaders()); - } - } - return null; - } - - /** - * - */ - @Override - public void close() throws IOException { - logger.info("Closing Spring Application Context"); - this.applicationContext.close(); - if (logger.isInfoEnabled()) { - logger.info("Closing " + this.getClass().getClassLoader()); - } - ((URLClassLoader) this.getClass().getClassLoader()).close(); - logger.info("Successfully closed Spring Application Context and its ClassLoader."); - } -} diff --git a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor deleted file mode 100644 index 55e6a73b15..0000000000 --- a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ /dev/null @@ -1,15 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -org.apache.nifi.spring.SpringContextProcessor \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/resources/docs/org.apache.nifi.spring.SpringContextProcessor/additionalDetails.html b/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/resources/docs/org.apache.nifi.spring.SpringContextProcessor/additionalDetails.html deleted file mode 100644 index 96c524e133..0000000000 --- a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/resources/docs/org.apache.nifi.spring.SpringContextProcessor/additionalDetails.html +++ /dev/null @@ -1,94 +0,0 @@ - - - - - - SpringContextProcessor - - - - - -

Description:

-

- SpringContextProcessor – allows integration of processes encapsulated in Spring Application Context to run as NiFi - processor by becoming a runtime host for an instance of Spring Application Context. -

-

- Communication between NiFi and process encapsulated within Spring Application Context is accomplished via Spring Messaging - (one of the core modules of Spring Framework) and supports 3 usage modes: -

    -
  • Headless - no interaction with NiFi, meaning nothing is sent to it and nothing is received from it (i.e., some monitoring app). - In this case NiFi simply plays the role of the runtime host.
  • -
  • One way (NiFi -> Spring or Spring -> NiFi). - This depends on existence of pre-defined message channel in Spring - Application Context. The name of the channel should be “fromNiFi” and the type org.springframework.messaging.MessageChannel.
  • -
  • By-directional (NiFi -> Spring -> NiFi or Spring -> NiFi -> Spring) - This depends on existence of two channels - in Spring Application Context. One channel receives messages from NiFi with name “fromNiFi” and type org.springframework.messaging.MessageChanneli> - and another is to receive messages from Spring with name “toNiFi” and type org.springframework.messaging.PollableChannel.
  • -
- The example below demonstrates template configuration for bi-directional Spring Application Context configuration: - -
-    <int:channel id=”fromNiFi”/>
-
-    <!—
-    your custom app configuration to receive messages from ‘fromNiFi’ channel and optionally send back to NiFi via ‘toNiFi’ channel.
-    It could contain any Spring-based application (i.e., Spring Integration, Apache Camel and/or custom code). All you need to do is inject
-    channels into your beans and send/receive messages from it.
-    -->
-
-    <int:channel id="toNiFi">
-        <int:queue/>
-    </int:channel>
-            
-
-

-

- The component is based on assumption that user has an existing Spring Application encapsulated in Spring Context that exposes optional in/out - MessagingChannels to allow data to flow to/from ApplicationContext and into/out-of. NiFi. - Such application is realized by having a directory on the file system, which contains contains all required resources for such application to run. - Such resources usually are JAR files to satisfy application's class-path as well as JAR representing the application and its configuration. - Below is the example of what such directory may contain. In this case the 'SI_DEMO-0.0.1-SNAPSHOT.jar' represents the actual application and the rest - of the JARs represent class-path dependency required by an application. -

-        deps
-         ├── SI_DEMO-0.0.1-SNAPSHOT.jar
-         ├── aopalliance-1.0.jar
-         ├── commons-logging-1.2.jar
-         ├── spring-aop-VERSION.jar
-         ├── spring-beans-VERSION.jar
-         ├── spring-context-VERSION.jar
-         ├── spring-core-VERSION.jar
-         ├── spring-expression-VERSION.jar
-         ├── spring-integration-core-VERSION.jar
-         ├── spring-messaging-VERSION.jar
-        
-

-

- You introduce the processor the usual way and then configure its properties: -

    -
  • Application Context config path [REQUIRED] - a path to the Application Context configuration. - The path is relative to the class-path of the application defined by the Application Context class path property
  • -
  • Application Context class path [REQUIRED] - a path to a directory on the file system where application dependencies are.
  • -
  • Send Timeout [OPTIONAL] - the wait time for sending messages to Spring Application Context. Only required if NiFi plans to send data to Spring. - Defaults to 0 (don't wait). FlowFiles that were successfully sent to Spring will be removed from session while FlowFiles that could not be sent to - Spring will be routed to failure relationship.
  • -
  • Receive Timeout - [OPTIONAL] - the wait time for receiving messages from Spring Application Context. Only required if NiFi plans to receive data from Spring. - Defaults to 0 (don't wait). FlowFile is created if and only if a message is successfully received from Spring. It is then transferred to success relationship.
  • -
-

- - diff --git a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/test/java/org/apache/nifi/spring/SpringContextFactoryTests.java b/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/test/java/org/apache/nifi/spring/SpringContextFactoryTests.java deleted file mode 100644 index c9883906e1..0000000000 --- a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/test/java/org/apache/nifi/spring/SpringContextFactoryTests.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.spring; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.assertThrows; - -import java.util.HashMap; - -import org.apache.nifi.spring.SpringDataExchanger.SpringResponse; -import org.junit.jupiter.api.Test; - -public class SpringContextFactoryTests { - - @Test - public void validateACFailureDueToNotFound() { - assertThrows(IllegalStateException.class, () -> SpringContextFactory.createSpringContextDelegate(".", "foo.xml")); - } - - @Test - public void validateMessageNotSentNoFromNiFiChannel() throws Exception { - SpringDataExchanger delegate = SpringContextFactory.createSpringContextDelegate(".", "context.xml"); - try { - assertThrows(IllegalStateException.class, () -> delegate.send("hello", new HashMap(), 1000L)); - } finally { - delegate.close(); - } - } - - @Test - public void validateMessageNotReceivedNoToNiFiChannel() throws Exception { - SpringDataExchanger delegate = SpringContextFactory.createSpringContextDelegate(".", "context.xml"); - SpringResponse fromSpring = delegate.receive(1000L); - assertNull(fromSpring); - delegate.close(); - } - - @Test - public void validateOneWaySent() throws Exception { - SpringDataExchanger delegate = SpringContextFactory.createSpringContextDelegate(".", "toSpringOnly.xml"); - boolean sent = delegate.send("hello", new HashMap(), 1000L); - assertTrue(sent); - SpringResponse fromSpring = delegate.receive(1000L); - assertNull(fromSpring); - delegate.close(); - } - - @Test - public void validateOneWayReceive() throws Exception { - SpringDataExchanger delegate = SpringContextFactory.createSpringContextDelegate(".", "fromSpringOnly.xml"); - try { - delegate.send("hello", new HashMap(), 1000L); - } catch (IllegalStateException e) { - // ignore since its expected - } - SpringResponse fromSpring = delegate.receive(1000L); - assertNotNull(fromSpring); - delegate.close(); - } - - @Test - public void validateRequestReply() throws Exception { - SpringDataExchanger delegate = SpringContextFactory.createSpringContextDelegate(".", "requestReply.xml"); - boolean sent = delegate.send("hello", new HashMap(), 1000L); - assertTrue(sent); - SpringResponse fromSpring = delegate.receive(1000L); - assertNotNull(fromSpring); - assertEquals("hello-hello", fromSpring.getPayload()); - assertEquals("foo", fromSpring.getHeaders().get("foo")); - delegate.close(); - } - - @Test - public void validateMultipleSendsWithAggregatedReply() throws Exception { - SpringDataExchanger delegate = SpringContextFactory.createSpringContextDelegate(".", "aggregated.xml"); - // 1 - boolean sent = delegate.send("hello", new HashMap(), 1000L); - assertTrue(sent); - SpringResponse fromSpring = delegate.receive(100L); - assertNull(fromSpring); - // 2 - sent = delegate.send("hello", new HashMap(), 1000L); - assertTrue(sent); - fromSpring = delegate.receive(100L); - assertNull(fromSpring); - // 3 - sent = delegate.send("hello", new HashMap(), 1000L); - assertTrue(sent); - fromSpring = delegate.receive(100L); - assertNull(fromSpring); - // 4 - sent = delegate.send("hello", new HashMap(), 1000L); - assertTrue(sent); - fromSpring = delegate.receive(100L); - assertNotNull(fromSpring); - assertEquals("4", fromSpring.getPayload()); - delegate.close(); - } -} diff --git a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/test/java/org/apache/nifi/spring/SpringContextProcessorTest.java b/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/test/java/org/apache/nifi/spring/SpringContextProcessorTest.java deleted file mode 100644 index c3e60cb11b..0000000000 --- a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/test/java/org/apache/nifi/spring/SpringContextProcessorTest.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.spring; - -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.spring.SpringDataExchanger.SpringResponse; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.jupiter.api.Test; -import org.mockito.Mockito; - -import java.lang.reflect.Field; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class SpringContextProcessorTest { - - @Test - public void notValid() { - TestRunner runner = TestRunners.newTestRunner(SpringContextProcessor.class); - runner.assertNotValid(); - - runner = TestRunners.newTestRunner(SpringContextProcessor.class); - runner.setProperty(SpringContextProcessor.CTX_CONFIG_PATH, "context.xml"); - runner.assertNotValid(); - - runner = TestRunners.newTestRunner(SpringContextProcessor.class); - runner.setProperty(SpringContextProcessor.CTX_LIB_PATH, "fool"); - runner.assertNotValid(); - } - - @SuppressWarnings("unchecked") - @Test - public void validateOneWayFromNiFi() { - TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); - runner.setProperty(SpringContextProcessor.CTX_CONFIG_PATH, "toSpringOnly.xml"); - runner.setProperty(SpringContextProcessor.CTX_LIB_PATH, "."); - runner.setProperty(SpringContextProcessor.RECEIVE_TIMEOUT, "100 millis"); - runner.assertValid(); - runner.enqueue("Hello".getBytes()); - - TestProcessor processor = (TestProcessor) runner.getProcessor(); - SpringDataExchanger delegate = processor.getMockedDelegate(); - when(delegate.receive(Mockito.anyLong())).thenReturn(null); - when(delegate.send(Mockito.any(), Mockito.any(Map.class), Mockito.anyLong())).thenReturn(true); - - runner.run(1, false); - verify(delegate, times(1)).send(Mockito.any(), Mockito.any(Map.class), Mockito.anyLong()); - verify(delegate, times(1)).receive(100); - assertTrue(runner.getFlowFilesForRelationship(TestProcessor.REL_SUCCESS).isEmpty()); - runner.shutdown(); - } - - @SuppressWarnings("unchecked") - @Test - public void validateOneWayFromSpring() { - TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); - runner.setProperty(SpringContextProcessor.CTX_CONFIG_PATH, "fromSpringOnly.xml"); - runner.setProperty(SpringContextProcessor.CTX_LIB_PATH, "."); - runner.assertValid(); - - TestProcessor processor = (TestProcessor) runner.getProcessor(); - SpringDataExchanger delegate = processor.getMockedDelegate(); - SpringResponse r = new SpringResponse("hello".getBytes(), - Collections. emptyMap()); - when(delegate.receive(Mockito.anyLong())).thenReturn(r); - when(delegate.send(Mockito.any(), Mockito.any(Map.class), Mockito.anyLong())).thenReturn(true); - - runner.run(1, false); - - verify(delegate, never()).send(Mockito.any(), Mockito.any(Map.class), Mockito.anyLong()); - verify(delegate, times(1)).receive(0); - assertTrue(runner.getFlowFilesForRelationship(TestProcessor.REL_SUCCESS).size() == 1); - runner.shutdown(); - } - - @SuppressWarnings("unchecked") - @Test - public void validateBiDirectional() { - TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); - runner.setProperty(SpringContextProcessor.CTX_CONFIG_PATH, "requestReply.xml"); - runner.setProperty(SpringContextProcessor.CTX_LIB_PATH, "."); - runner.setProperty(SpringContextProcessor.RECEIVE_TIMEOUT, "100 millis"); - runner.assertValid(); - runner.enqueue("Hello".getBytes()); - - TestProcessor processor = (TestProcessor) runner.getProcessor(); - SpringDataExchanger delegate = processor.getMockedDelegate(); - Map headers = new HashMap<>(); - headers.put("foo", "foo"); - headers.put("bar", new Object()); - SpringResponse r = new SpringResponse("hello".getBytes(), headers); - when(delegate.receive(Mockito.anyLong())).thenReturn(r); - when(delegate.send(Mockito.any(), Mockito.any(Map.class), Mockito.anyLong())).thenReturn(true); - - runner.run(1, false); - verify(delegate, times(1)).send(Mockito.any(), Mockito.any(Map.class), Mockito.anyLong()); - verify(delegate, times(1)).receive(100); - List ffList = runner.getFlowFilesForRelationship(TestProcessor.REL_SUCCESS); - assertTrue(ffList.size() == 1); - assertEquals("foo", ffList.get(0).getAttribute("foo")); - assertNull(ffList.get(0).getAttribute("bar")); - runner.shutdown(); - } - - public static class TestProcessor extends SpringContextProcessor { - private final SpringDataExchanger mockedDelegate = mock(SpringDataExchanger.class); - - public SpringDataExchanger getMockedDelegate() { - return mockedDelegate; - } - - @Override - public void onTrigger(ProcessContext context, ProcessSession processSession) throws ProcessException { - try { - Field ef = SpringContextProcessor.class.getDeclaredField("exchanger"); - ef.setAccessible(true); - ef.set(this, this.mockedDelegate); - super.onTrigger(context, processSession); - } catch (Exception e) { - throw new IllegalStateException(e); - } - } - } -} diff --git a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/test/resources/aggregated.xml b/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/test/resources/aggregated.xml deleted file mode 100644 index f425528522..0000000000 --- a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/test/resources/aggregated.xml +++ /dev/null @@ -1,26 +0,0 @@ - - - - - - - - - - - - diff --git a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/test/resources/context.xml b/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/test/resources/context.xml deleted file mode 100644 index 4c619b42e9..0000000000 --- a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/test/resources/context.xml +++ /dev/null @@ -1,14 +0,0 @@ - - - - - diff --git a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/test/resources/fromSpringOnly.xml b/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/test/resources/fromSpringOnly.xml deleted file mode 100644 index a29f38a38a..0000000000 --- a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/test/resources/fromSpringOnly.xml +++ /dev/null @@ -1,27 +0,0 @@ - - - - - - - - - - - - - \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/test/resources/log4j.properties b/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/test/resources/log4j.properties deleted file mode 100644 index 9397595a98..0000000000 --- a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/test/resources/log4j.properties +++ /dev/null @@ -1,21 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -log4j.rootCategory=WARN, stdout - -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %t %c{2}:%L - %m%n - -#log4j.category.org.apache.nifi.startup=INFO diff --git a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/test/resources/requestReply.xml b/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/test/resources/requestReply.xml deleted file mode 100644 index 50bc95e13d..0000000000 --- a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/test/resources/requestReply.xml +++ /dev/null @@ -1,27 +0,0 @@ - - - - - - - - - - - - - - - diff --git a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/test/resources/toSpringOnly.xml b/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/test/resources/toSpringOnly.xml deleted file mode 100644 index 904191b08c..0000000000 --- a/nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/test/resources/toSpringOnly.xml +++ /dev/null @@ -1,17 +0,0 @@ - - - - - - diff --git a/nifi-nar-bundles/nifi-spring-bundle/pom.xml b/nifi-nar-bundles/nifi-spring-bundle/pom.xml deleted file mode 100644 index 845c4b1bc7..0000000000 --- a/nifi-nar-bundles/nifi-spring-bundle/pom.xml +++ /dev/null @@ -1,37 +0,0 @@ - - - 4.0.0 - - org.apache.nifi - nifi-nar-bundles - 2.0.0-SNAPSHOT - - nifi-spring-bundle - pom - - nifi-spring-processors - nifi-spring-nar - - - - - org.apache.nifi - nifi-spring-processors - 2.0.0-SNAPSHOT - - - - diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 4e3e4049a3..abdbd7f4ab 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -63,7 +63,6 @@ nifi-jms-bundle nifi-beats-bundle nifi-cassandra-bundle - nifi-spring-bundle nifi-hive-bundle nifi-site-to-site-reporting-bundle nifi-mqtt-bundle