From 2ac39888818521c52a3f02577283bbd0543cfd43 Mon Sep 17 00:00:00 2001 From: exceptionfactory Date: Thu, 30 Nov 2023 15:28:21 -0600 Subject: [PATCH] NIFI-12448 Removed InfluxDB Processors Signed-off-by: Pierre Villard This closes #8093. --- nifi-assembly/LICENSE | 23 -- nifi-code-coverage/pom.xml | 5 - .../nifi-influxdb-nar/pom.xml | 40 -- .../src/main/resources/META-INF/LICENSE | 231 ----------- .../src/main/resources/META-INF/NOTICE | 17 - .../nifi-influxdb-processors/pom.xml | 55 --- .../influxdb/AbstractInfluxDBProcessor.java | 155 -------- .../influxdb/ExecuteInfluxDBQuery.java | 322 ---------------- .../nifi/processors/influxdb/PutInfluxDB.java | 206 ---------- .../org.apache.nifi.processor.Processor | 16 - .../influxdb/AbstractITInfluxDB.java | 92 ----- .../influxdb/ITExecuteInfluxDBQuery.java | 362 ----------------- .../processors/influxdb/ITPutInfluxDB.java | 178 --------- .../influxdb/TestExecutetInfluxDBQuery.java | 227 ----------- .../processors/influxdb/TestPutInfluxDB.java | 364 ------------------ nifi-nar-bundles/nifi-influxdb-bundle/pom.xml | 42 -- nifi-nar-bundles/pom.xml | 1 - pom.xml | 2 - 18 files changed, 2338 deletions(-) delete mode 100644 nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-nar/pom.xml delete mode 100644 nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-nar/src/main/resources/META-INF/LICENSE delete mode 100644 nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-nar/src/main/resources/META-INF/NOTICE delete mode 100644 nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/pom.xml delete mode 100644 nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/AbstractInfluxDBProcessor.java delete mode 100644 nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java delete mode 100644 nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/PutInfluxDB.java delete mode 100644 nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor delete mode 100644 nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/AbstractITInfluxDB.java delete mode 100644 nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITExecuteInfluxDBQuery.java delete mode 100644 nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITPutInfluxDB.java delete mode 100644 nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/TestExecutetInfluxDBQuery.java delete mode 100644 nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/TestPutInfluxDB.java delete mode 100644 nifi-nar-bundles/nifi-influxdb-bundle/pom.xml diff --git a/nifi-assembly/LICENSE b/nifi-assembly/LICENSE index 67c1b91c87..7dcfd4308b 100644 --- a/nifi-assembly/LICENSE +++ b/nifi-assembly/LICENSE @@ -2445,29 +2445,6 @@ style license. NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - The binary distribution of this product bundles 'influxdb-java' under an MIT - style license. - - Copyright (c) 2014-2017 Stefan Majer - - Permission is hereby granted, free of charge, to any person obtaining a copy - of this software and associated documentation files (the "Software"), to deal - in the Software without restriction, including without limitation the rights - to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - copies of the Software, and to permit persons to whom the Software is - furnished to do so, subject to the following conditions: - - The above copyright notice and this permission notice shall be included in all - copies or substantial portions of the Software. - - THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - SOFTWARE. - This product bundles 'lodash' which is available under an MIT license. Copyright JS Foundation and other contributors diff --git a/nifi-code-coverage/pom.xml b/nifi-code-coverage/pom.xml index cd5870ab5b..6f3d72d0ce 100644 --- a/nifi-code-coverage/pom.xml +++ b/nifi-code-coverage/pom.xml @@ -1222,11 +1222,6 @@ nifi-iceberg-services-api 2.0.0-SNAPSHOT - - org.apache.nifi - nifi-influxdb-processors - 2.0.0-SNAPSHOT - org.apache.nifi nifi-iotdb-processors diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-nar/pom.xml b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-nar/pom.xml deleted file mode 100644 index 8baef11836..0000000000 --- a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-nar/pom.xml +++ /dev/null @@ -1,40 +0,0 @@ - - - - 4.0.0 - - - org.apache.nifi - nifi-influxdb-bundle - 2.0.0-SNAPSHOT - - - nifi-influxdb-nar - nar - - - - org.apache.nifi - nifi-standard-services-api-nar - 2.0.0-SNAPSHOT - nar - - - org.apache.nifi - nifi-influxdb-processors - - - diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-nar/src/main/resources/META-INF/LICENSE deleted file mode 100644 index 37b7cf7866..0000000000 --- a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-nar/src/main/resources/META-INF/LICENSE +++ /dev/null @@ -1,231 +0,0 @@ - - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - -APACHE NIFI SUBCOMPONENTS: - -The Apache NiFi project contains subcomponents with separate copyright -notices and license terms. Your use of the source code for the these -subcomponents is subject to the terms and conditions of the following -licenses. - -This product bundles 'influxdb-java' which is available under an MIT license. - - Copyright (c) 2014-2017 Stefan Majer - - Permission is hereby granted, free of charge, to any person obtaining a copy - of this software and associated documentation files (the "Software"), to deal - in the Software without restriction, including without limitation the rights - to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - copies of the Software, and to permit persons to whom the Software is - furnished to do so, subject to the following conditions: - - The above copyright notice and this permission notice shall be included in all - copies or substantial portions of the Software. - - THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - SOFTWARE. diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-nar/src/main/resources/META-INF/NOTICE deleted file mode 100644 index 0ef2c7fdc5..0000000000 --- a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-nar/src/main/resources/META-INF/NOTICE +++ /dev/null @@ -1,17 +0,0 @@ -nifi-influxdb-nar -Copyright 2017-2020 The Apache Software Foundation - -This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). - -=========================================== -Apache Software License v2 -=========================================== - -The following binary components are provided under the Apache Software License v2 - - (ASLv2) Apache Commons Lang - The following NOTICE information applies: - Apache Commons Lang - Copyright 2001-2015 The Apache Software Foundation - \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/pom.xml b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/pom.xml deleted file mode 100644 index e245bca330..0000000000 --- a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/pom.xml +++ /dev/null @@ -1,55 +0,0 @@ - - - - 4.0.0 - - - org.apache.nifi - nifi-influxdb-bundle - 2.0.0-SNAPSHOT - - - nifi-influxdb-processors - jar - - - - org.influxdb - influxdb-java - 2.23 - - - org.apache.commons - commons-lang3 - - - org.apache.nifi - nifi-api - - - org.apache.nifi - nifi-utils - - - org.apache.nifi - nifi-mock - - - com.google.code.gson - gson - - - diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/AbstractInfluxDBProcessor.java b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/AbstractInfluxDBProcessor.java deleted file mode 100644 index 77bedb74f3..0000000000 --- a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/AbstractInfluxDBProcessor.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.influxdb; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.util.StandardValidators; -import org.influxdb.InfluxDB; -import org.influxdb.InfluxDBFactory; -import okhttp3.OkHttpClient; -import okhttp3.OkHttpClient.Builder; - -/** - * Abstract base class for InfluxDB processors - */ -public abstract class AbstractInfluxDBProcessor extends AbstractProcessor { - - public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() - .name("influxdb-charset") - .displayName("Character Set") - .description("Specifies the character set of the document data.") - .required(true) - .defaultValue("UTF-8") - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) - .build(); - - public static final PropertyDescriptor INFLUX_DB_URL = new PropertyDescriptor.Builder() - .name("influxdb-url") - .displayName("InfluxDB connection URL") - .description("InfluxDB URL to connect to. Eg: http://influxdb:8086") - .defaultValue("http://localhost:8086") - .required(true) - .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) - .addValidator(StandardValidators.URL_VALIDATOR) - .build(); - - public static final PropertyDescriptor INFLUX_DB_CONNECTION_TIMEOUT = new PropertyDescriptor.Builder() - .name("InfluxDB Max Connection Time Out (seconds)") - .displayName("InfluxDB Max Connection Time Out (seconds)") - .description("The maximum time for establishing connection to the InfluxDB") - .defaultValue("0 seconds") - .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .sensitive(false) - .build(); - - public static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder() - .name("influxdb-dbname") - .displayName("Database Name") - .description("InfluxDB database to connect to") - .required(true) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() - .name("influxdb-username") - .displayName("Username") - .required(false) - .description("Username for accessing InfluxDB") - .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() - .name("influxdb-password") - .displayName("Password") - .required(false) - .description("Password for user") - .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .sensitive(true) - .build(); - - public static final PropertyDescriptor MAX_RECORDS_SIZE = new PropertyDescriptor.Builder() - .name("influxdb-max-records-size") - .displayName("Max size of records") - .description("Maximum size of records allowed to be posted in one batch") - .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) - .defaultValue("1 MB") - .required(true) - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .build(); - - public static final String INFLUX_DB_ERROR_MESSAGE = "influxdb.error.message"; - protected AtomicReference influxDB = new AtomicReference<>(); - protected long maxRecordsSize; - - /** - * Helper method to create InfluxDB instance - * @return InfluxDB instance - */ - protected synchronized InfluxDB getInfluxDB(ProcessContext context) { - if ( influxDB.get() == null ) { - String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue(); - String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue(); - long connectionTimeout = context.getProperty(INFLUX_DB_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.SECONDS); - String influxDbUrl = context.getProperty(INFLUX_DB_URL).evaluateAttributeExpressions().getValue(); - try { - influxDB.set(makeConnection(username, password, influxDbUrl, connectionTimeout)); - } catch(Exception e) { - getLogger().error("Error while getting connection {}", e.getLocalizedMessage(), e); - throw new RuntimeException("Error while getting connection " + e.getLocalizedMessage(),e); - } - getLogger().info("InfluxDB connection created for host {}", - new Object[] {influxDbUrl}); - } - return influxDB.get(); - } - - @OnScheduled - public void onScheduled(final ProcessContext context) { - } - - protected InfluxDB makeConnection(String username, String password, String influxDbUrl, long connectionTimeout) { - Builder builder = new OkHttpClient.Builder().connectTimeout(connectionTimeout, TimeUnit.SECONDS); - if ( StringUtils.isBlank(username) || StringUtils.isBlank(password) ) { - return InfluxDBFactory.connect(influxDbUrl, builder); - } else { - return InfluxDBFactory.connect(influxDbUrl, username, password, builder); - } - } - - @OnStopped - public void close() { - if (getLogger().isDebugEnabled()) { - getLogger().info("Closing connection"); - } - if ( influxDB.get() != null ) { - influxDB.get().close(); - influxDB.set(null);; - } - } -} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java deleted file mode 100644 index 150380dce4..0000000000 --- a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java +++ /dev/null @@ -1,322 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.influxdb; - -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.behavior.WritesAttribute; -import org.apache.nifi.annotation.behavior.WritesAttributes; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; -import org.influxdb.InfluxDB; -import org.influxdb.dto.Query; -import org.influxdb.dto.QueryResult; -import com.google.gson.Gson; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.net.SocketTimeoutException; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.LinkedList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) -@SupportsBatching -@Tags({"influxdb", "measurement", "get", "read", "query", "timeseries"}) -@CapabilityDescription("Processor to execute InfluxDB query from the content of a FlowFile (preferred) or a scheduled query. Please check details of the supported queries in InfluxDB documentation (https://www.influxdb.com/).") -@WritesAttributes({ - @WritesAttribute(attribute = AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB error message"), - @WritesAttribute(attribute = ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed query"), - }) -public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor { - - public static final String INFLUX_DB_EXECUTED_QUERY = "influxdb.executed.query"; - - private static final int DEFAULT_INFLUX_RESPONSE_CHUNK_SIZE = 0; - - public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT = new PropertyDescriptor.Builder() - .name("influxdb-query-result-time-unit") - .displayName("Query Result Time Units") - .description("The time unit of query results from the InfluxDB") - .defaultValue(TimeUnit.NANOSECONDS.name()) - .required(true) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .allowableValues(Arrays.stream(TimeUnit.values()).map( v -> v.name()).collect(Collectors.toSet())) - .sensitive(false) - .build(); - - public static final PropertyDescriptor INFLUX_DB_QUERY = new PropertyDescriptor.Builder() - .name("influxdb-query") - .displayName("InfluxDB Query") - .description("The InfluxDB query to execute. " - + "Note: If there are incoming connections, then the query is created from incoming FlowFile's content otherwise" - + " it is created from this property.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .build(); - - public static final Integer MAX_CHUNK_SIZE = 10000; - - public static final PropertyDescriptor INFLUX_DB_QUERY_CHUNK_SIZE = new PropertyDescriptor.Builder() - .name("influxdb-query-chunk-size") - .displayName("Results chunk size") - .description("Chunking can be used to return results in a stream of smaller batches " - + "(each has a partial results up to a chunk size) rather than as a single response. " - + "Chunking queries can return an unlimited number of rows. Note: Chunking is enable when result chunk size is greater than 0") - .defaultValue(String.valueOf(DEFAULT_INFLUX_RESPONSE_CHUNK_SIZE)) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(StandardValidators.createLongValidator(0, MAX_CHUNK_SIZE, true)) - .required(true) - .build(); - - static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") - .description("Successful InfluxDB queries are routed to this relationship").build(); - - static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") - .description("Falied InfluxDB queries are routed to this relationship").build(); - - static final Relationship REL_RETRY = new Relationship.Builder().name("retry") - .description("Failed queries that are retryable exception are routed to this relationship").build(); - - private static final Set relationships; - private static final List propertyDescriptors; - protected Gson gson = new Gson(); - - static { - final Set tempRelationships = new HashSet<>(); - tempRelationships.add(REL_SUCCESS); - tempRelationships.add(REL_FAILURE); - tempRelationships.add(REL_RETRY); - relationships = Collections.unmodifiableSet(tempRelationships); - final List tempDescriptors = new ArrayList<>(); - tempDescriptors.add(DB_NAME); - tempDescriptors.add(INFLUX_DB_URL); - tempDescriptors.add(INFLUX_DB_CONNECTION_TIMEOUT); - tempDescriptors.add(INFLUX_DB_QUERY_RESULT_TIMEUNIT); - tempDescriptors.add(INFLUX_DB_QUERY); - tempDescriptors.add(INFLUX_DB_QUERY_CHUNK_SIZE); - tempDescriptors.add(USERNAME); - tempDescriptors.add(PASSWORD); - tempDescriptors.add(CHARSET); - propertyDescriptors = Collections.unmodifiableList(tempDescriptors); - } - - @Override - public Set getRelationships() { - return relationships; - } - - @Override - public final List getSupportedPropertyDescriptors() { - return propertyDescriptors; - } - - @OnScheduled - public void onScheduled(final ProcessContext context) { - super.onScheduled(context); - // Either input connection or scheduled query is required - if ( ! context.getProperty(INFLUX_DB_QUERY).isSet() - && ! context.hasIncomingConnection() ) { - String error = "The InfluxDB Query processor requires input connection or scheduled InfluxDB query"; - getLogger().error(error); - throw new ProcessException(error); - } - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - - String query = null; - String database = null; - TimeUnit queryResultTimeunit = null; - Charset charset = null; - FlowFile outgoingFlowFile = null; - - // If there are incoming connections, prepare query params from flow file - if ( context.hasIncomingConnection() ) { - FlowFile incomingFlowFile = session.get(); - - if ( incomingFlowFile == null && context.hasNonLoopConnection() ) { - return; - } - - charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(incomingFlowFile).getValue()); - if ( incomingFlowFile.getSize() == 0 ) { - if ( context.getProperty(INFLUX_DB_QUERY).isSet() ) { - query = context.getProperty(INFLUX_DB_QUERY).evaluateAttributeExpressions(incomingFlowFile).getValue(); - } else { - String message = "FlowFile query is empty and no scheduled query is set"; - getLogger().error(message); - incomingFlowFile = session.putAttribute(incomingFlowFile, INFLUX_DB_ERROR_MESSAGE, message); - session.transfer(incomingFlowFile, REL_FAILURE); - return; - } - } else { - - try { - query = getQuery(session, charset, incomingFlowFile); - } catch(IOException ioe) { - getLogger().error("Exception while reading from FlowFile " + ioe.getLocalizedMessage(), ioe); - throw new ProcessException(ioe); - } - } - outgoingFlowFile = incomingFlowFile; - - } else { - outgoingFlowFile = session.create(); - charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(outgoingFlowFile).getValue()); - query = context.getProperty(INFLUX_DB_QUERY).evaluateAttributeExpressions(outgoingFlowFile).getValue(); - } - - database = context.getProperty(DB_NAME).evaluateAttributeExpressions(outgoingFlowFile).getValue(); - queryResultTimeunit = TimeUnit.valueOf(context.getProperty(INFLUX_DB_QUERY_RESULT_TIMEUNIT).evaluateAttributeExpressions(outgoingFlowFile).getValue()); - - try { - long startTimeMillis = System.currentTimeMillis(); - int chunkSize = context.getProperty(INFLUX_DB_QUERY_CHUNK_SIZE).evaluateAttributeExpressions(outgoingFlowFile).asInteger(); - List result = executeQuery(context, database, query, queryResultTimeunit, chunkSize); - - String json = result.size() == 1 ? gson.toJson(result.get(0)) : gson.toJson(result); - - if ( getLogger().isDebugEnabled() ) { - getLogger().debug("Query result {} ", new Object[] {result}); - } - - ByteArrayInputStream bais = new ByteArrayInputStream(json.getBytes(charset)); - session.importFrom(bais, outgoingFlowFile); - bais.close(); - - final long endTimeMillis = System.currentTimeMillis(); - - if ( ! hasErrors(result) ) { - outgoingFlowFile = session.putAttribute(outgoingFlowFile, INFLUX_DB_EXECUTED_QUERY, String.valueOf(query)); - session.getProvenanceReporter().send(outgoingFlowFile, makeProvenanceUrl(context, database), - (endTimeMillis - startTimeMillis)); - session.transfer(outgoingFlowFile, REL_SUCCESS); - } else { - outgoingFlowFile = populateErrorAttributes(session, outgoingFlowFile, query, queryErrors(result)); - session.transfer(outgoingFlowFile, REL_FAILURE); - } - - } catch (Exception exception) { - outgoingFlowFile = populateErrorAttributes(session, outgoingFlowFile, query, exception.getMessage()); - if ( exception.getCause() instanceof SocketTimeoutException ) { - getLogger().error("Failed to read from InfluxDB due SocketTimeoutException to {} and retrying", - exception.getCause().getLocalizedMessage(), exception.getCause()); - session.transfer(outgoingFlowFile, REL_RETRY); - } else { - getLogger().error("Failed to read from InfluxDB due to {}", - exception.getLocalizedMessage(), exception); - session.transfer(outgoingFlowFile, REL_FAILURE); - } - context.yield(); - } - } - - protected String getQuery(final ProcessSession session, Charset charset, FlowFile incomingFlowFile) - throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - session.exportTo(incomingFlowFile, baos); - baos.close(); - return new String(baos.toByteArray(), charset); - } - - protected String makeProvenanceUrl(final ProcessContext context, String database) { - return new StringBuilder("influxdb://") - .append(context.getProperty(INFLUX_DB_URL).evaluateAttributeExpressions().getValue()).append("/") - .append(database).toString(); - } - - protected List executeQuery(final ProcessContext context, String database, String query, TimeUnit timeunit, - int chunkSize) throws InterruptedException { - final CountDownLatch latch = new CountDownLatch(1); - InfluxDB influx = getInfluxDB(context); - Query influxQuery = new Query(query, database); - - if (chunkSize > 0) { - List results = new LinkedList<>(); - influx.query(influxQuery, chunkSize, result -> { - if (isQueryDone(result.getError())) { - latch.countDown(); - } else { - results.add(result); - } - }); - latch.await(); - - return results; - } else { - return Collections.singletonList(influx.query(influxQuery, timeunit)); - } - } - - private boolean isQueryDone(String error) { - return error != null && error.equals("DONE"); - } - - protected FlowFile populateErrorAttributes(final ProcessSession session, FlowFile flowFile, String query, - String message) { - Map attributes = new HashMap<>(); - attributes.put(INFLUX_DB_ERROR_MESSAGE, String.valueOf(message)); - attributes.put(INFLUX_DB_EXECUTED_QUERY, String.valueOf(query)); - flowFile = session.putAllAttributes(flowFile, attributes); - return flowFile; - } - - private Boolean hasErrors(List results) { - for (QueryResult result: results) { - if (result.hasError()) { - return true; - } - } - return false; - } - - private String queryErrors(List results) { - return results.stream() - .filter(QueryResult::hasError) - .map(QueryResult::getError) - .collect(Collectors.joining("\n")); - } - - @OnStopped - public void close() { - super.close(); - } -} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/PutInfluxDB.java b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/PutInfluxDB.java deleted file mode 100644 index 8b850bd0ce..0000000000 --- a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/PutInfluxDB.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.influxdb; - -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.behavior.WritesAttribute; -import org.apache.nifi.annotation.behavior.WritesAttributes; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.components.AllowableValue; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.DataUnit; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; -import org.influxdb.InfluxDB; -import org.influxdb.InfluxDBIOException; - -import java.io.ByteArrayOutputStream; -import java.net.SocketTimeoutException; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) -@SupportsBatching -@Tags({"influxdb", "measurement","insert", "write", "put", "timeseries"}) -@CapabilityDescription("Processor to write the content of a FlowFile in 'line protocol'. Please check details of the 'line protocol' in InfluxDB documentation (https://www.influxdb.com/). " - + " The flow file can contain single measurement point or multiple measurement points separated by line seperator. The timestamp (last field) should be in nano-seconds resolution.") -@WritesAttributes({ - @WritesAttribute(attribute = AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB error message"), - }) -public class PutInfluxDB extends AbstractInfluxDBProcessor { - - public static AllowableValue CONSISTENCY_LEVEL_ALL = new AllowableValue("ALL", "All", "Return success when all nodes have responded with write success"); - public static AllowableValue CONSISTENCY_LEVEL_ANY = new AllowableValue("ANY", "Any", "Return success when any nodes have responded with write success"); - public static AllowableValue CONSISTENCY_LEVEL_ONE = new AllowableValue("ONE", "One", "Return success when one node has responded with write success"); - public static AllowableValue CONSISTENCY_LEVEL_QUORUM = new AllowableValue("QUORUM", "Quorum", "Return success when a majority of nodes have responded with write success"); - - public static final PropertyDescriptor CONSISTENCY_LEVEL = new PropertyDescriptor.Builder() - .name("influxdb-consistency-level") - .displayName("Consistency Level") - .description("InfluxDB consistency level") - .required(true) - .defaultValue(CONSISTENCY_LEVEL_ONE.getValue()) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .allowableValues(CONSISTENCY_LEVEL_ONE, CONSISTENCY_LEVEL_ANY, CONSISTENCY_LEVEL_ALL, CONSISTENCY_LEVEL_QUORUM) - .build(); - - public static final PropertyDescriptor RETENTION_POLICY = new PropertyDescriptor.Builder() - .name("influxdb-retention-policy") - .displayName("Retention Policy") - .description("Retention policy for the saving the records") - .defaultValue("autogen") - .required(true) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") - .description("Successful FlowFiles that are saved to InfluxDB are routed to this relationship").build(); - - static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") - .description("FlowFiles were not saved to InfluxDB are routed to this relationship").build(); - - static final Relationship REL_RETRY = new Relationship.Builder().name("retry") - .description("FlowFiles were not saved to InfluxDB due to retryable exception are routed to this relationship").build(); - - static final Relationship REL_MAX_SIZE_EXCEEDED = new Relationship.Builder().name("failure-max-size") - .description("FlowFiles exceeding max records size are routed to this relationship").build(); - - private static final Set relationships; - private static final List propertyDescriptors; - - static { - final Set tempRelationships = new HashSet<>(); - tempRelationships.add(REL_SUCCESS); - tempRelationships.add(REL_FAILURE); - tempRelationships.add(REL_RETRY); - tempRelationships.add(REL_MAX_SIZE_EXCEEDED); - relationships = Collections.unmodifiableSet(tempRelationships); - - final List tempDescriptors = new ArrayList<>(); - tempDescriptors.add(DB_NAME); - tempDescriptors.add(INFLUX_DB_URL); - tempDescriptors.add(INFLUX_DB_CONNECTION_TIMEOUT); - tempDescriptors.add(USERNAME); - tempDescriptors.add(PASSWORD); - tempDescriptors.add(CHARSET); - tempDescriptors.add(CONSISTENCY_LEVEL); - tempDescriptors.add(RETENTION_POLICY); - tempDescriptors.add(MAX_RECORDS_SIZE); - propertyDescriptors = Collections.unmodifiableList(tempDescriptors); - } - - @Override - public Set getRelationships() { - return relationships; - } - - @Override - public final List getSupportedPropertyDescriptors() { - return propertyDescriptors; - } - - @OnScheduled - public void onScheduled(final ProcessContext context) { - super.onScheduled(context); - maxRecordsSize = context.getProperty(MAX_RECORDS_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue(); - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } - - if ( flowFile.getSize() == 0) { - getLogger().error("Empty measurements"); - flowFile = session.putAttribute(flowFile, INFLUX_DB_ERROR_MESSAGE, "Empty measurement size " + flowFile.getSize()); - session.transfer(flowFile, REL_FAILURE); - return; - } - - if ( flowFile.getSize() > maxRecordsSize) { - getLogger().error("Message size of records exceeded {} max allowed is {}", new Object[] { flowFile.getSize(), maxRecordsSize}); - flowFile = session.putAttribute(flowFile, INFLUX_DB_ERROR_MESSAGE, "Max records size exceeded " + flowFile.getSize()); - session.transfer(flowFile, REL_MAX_SIZE_EXCEEDED); - return; - } - - Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue()); - String consistencyLevel = context.getProperty(CONSISTENCY_LEVEL).evaluateAttributeExpressions(flowFile).getValue(); - String database = context.getProperty(DB_NAME).evaluateAttributeExpressions(flowFile).getValue(); - String retentionPolicy = context.getProperty(RETENTION_POLICY).evaluateAttributeExpressions(flowFile).getValue(); - - try { - long startTimeMillis = System.currentTimeMillis(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - session.exportTo(flowFile, baos); - String records = new String(baos.toByteArray(), charset); - - writeToInfluxDB(context, consistencyLevel, database, retentionPolicy, records); - - final long endTimeMillis = System.currentTimeMillis(); - getLogger().debug("Records {} inserted", new Object[] {records}); - - session.transfer(flowFile, REL_SUCCESS); - session.getProvenanceReporter().send(flowFile, - new StringBuilder("influxdb://").append(context.getProperty(INFLUX_DB_URL).evaluateAttributeExpressions().getValue()).append("/").append(database).toString(), - (endTimeMillis - startTimeMillis)); - } catch (InfluxDBIOException exception) { - flowFile = session.putAttribute(flowFile, INFLUX_DB_ERROR_MESSAGE, String.valueOf(exception.getMessage())); - if ( exception.getCause() instanceof SocketTimeoutException ) { - getLogger().error("Failed to insert into influxDB due SocketTimeoutException to {} and retrying", - exception.getLocalizedMessage(), exception); - session.transfer(flowFile, REL_RETRY); - } else { - getLogger().error("Failed to insert into influxDB due to {}", - exception.getLocalizedMessage(), exception); - session.transfer(flowFile, REL_FAILURE); - } - context.yield(); - } catch (Exception exception) { - getLogger().error("Failed to insert into influxDB due to {}", - exception.getLocalizedMessage(), exception); - flowFile = session.putAttribute(flowFile, INFLUX_DB_ERROR_MESSAGE, String.valueOf(exception.getMessage())); - session.transfer(flowFile, REL_FAILURE); - context.yield(); - } - } - - protected void writeToInfluxDB(ProcessContext context, String consistencyLevel, String database, String retentionPolicy, String records) { - getInfluxDB(context).write(database, retentionPolicy, InfluxDB.ConsistencyLevel.valueOf(consistencyLevel), records); - } - - @OnStopped - public void close() { - super.close(); - } -} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor deleted file mode 100644 index 919b7b95d8..0000000000 --- a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -org.apache.nifi.processors.influxdb.PutInfluxDB -org.apache.nifi.processors.influxdb.ExecuteInfluxDBQuery diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/AbstractITInfluxDB.java b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/AbstractITInfluxDB.java deleted file mode 100644 index c6a89ab198..0000000000 --- a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/AbstractITInfluxDB.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.influxdb; - -import com.google.gson.reflect.TypeToken; -import org.apache.nifi.util.TestRunner; -import org.influxdb.InfluxDB; -import org.influxdb.InfluxDBFactory; -import org.influxdb.dto.Query; -import org.influxdb.dto.QueryResult; -import org.junit.jupiter.api.AfterEach; - -import java.lang.reflect.Type; -import java.util.List; - -/** - * Base integration test class for InfluxDB processors - */ -public class AbstractITInfluxDB { - protected TestRunner runner; - protected InfluxDB influxDB; - protected String dbName = "test"; - protected String dbUrl = "http://localhost:8086"; - protected String user = "admin"; - protected String password = "admin"; - protected static final String DEFAULT_RETENTION_POLICY = "autogen"; - - protected Type QueryResultListType = new TypeToken>(){}.getType(); - - protected void initInfluxDB() throws Exception { - influxDB = InfluxDBFactory.connect(dbUrl,user,password); - influxDB.createDatabase(dbName); - int max = 10; - while (!influxDB.databaseExists(dbName) && (max-- < 0)) { - Thread.sleep(5); - } - if ( ! influxDB.databaseExists(dbName) ) { - throw new Exception("unable to create database " + dbName); - } - } - - protected void cleanUpDatabase() throws InterruptedException { - if ( influxDB.databaseExists(dbName) ) { - QueryResult result = influxDB.query(new Query("DROP measurement water", dbName)); - checkError(result); - result = influxDB.query(new Query("DROP measurement testm", dbName)); - checkError(result); - result = influxDB.query(new Query("DROP measurement chunkedQueryTest", dbName)); - checkError(result); - result = influxDB.query(new Query("DROP database " + dbName, dbName)); - Thread.sleep(1000); - } - } - - protected void checkError(QueryResult result) { - if ( result.hasError() ) { - throw new IllegalStateException("Error while dropping measurements " + result.getError()); - } - } - - @AfterEach - public void tearDown() throws Exception { - runner = null; - if ( influxDB != null ) { - cleanUpDatabase(); - influxDB.close(); - } - } - - protected void initializeRunner() { - runner.setProperty(ExecuteInfluxDBQuery.DB_NAME, dbName); - runner.setProperty(ExecuteInfluxDBQuery.USERNAME, user); - runner.setProperty(ExecuteInfluxDBQuery.PASSWORD, password); - runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_URL, dbUrl); - runner.setProperty(ExecuteInfluxDBQuery.CHARSET, "UTF-8"); - runner.assertValid(); - } -} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITExecuteInfluxDBQuery.java b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITExecuteInfluxDBQuery.java deleted file mode 100644 index 31fbbbf8b6..0000000000 --- a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITExecuteInfluxDBQuery.java +++ /dev/null @@ -1,362 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.influxdb; - -import com.google.gson.Gson; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunners; -import org.influxdb.InfluxDB; -import org.influxdb.dto.QueryResult; -import org.influxdb.dto.QueryResult.Series; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.io.StringReader; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - - -/** - * Integration test for executing InfluxDB queries. Please ensure that the InfluxDB is running - * on local host with default port and has database test with table test. Please set user - * and password if applicable before running the integration tests. - */ -public class ITExecuteInfluxDBQuery extends AbstractITInfluxDB { - - protected Gson gson = new Gson(); - @BeforeEach - public void setUp() throws Exception { - initInfluxDB(); - runner = TestRunners.newTestRunner(ExecuteInfluxDBQuery.class); - initializeRunner(); - runner.setEnvironmentVariableValue("influxDBUrl", "http://localhost:8086"); - runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "${influxDBUrl}"); - } - - @Test - public void testValidScheduleQueryWithNoIncoming() { - String message = "water,country=US,city=newark rain=1,humidity=0.6 1501002274856668652"; - influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message); - - String query = "select * from water"; - runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_QUERY, query); - - runner.setIncomingConnection(false); - runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1); - List flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS); - assertEquals(1, flowFiles.size(), "Value should be equal"); - assertNull(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE), "Value should be null"); - assertEquals(query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY), "Value should be equal"); - - QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class); - Series series = queryResult.getResults().get(0).getSeries().get(0); - validateSeries(series.getName(), series.getColumns(), series.getValues().get(0),"newark",1.0); - } - - @Test - public void testValidSinglePoint() { - String message = "water,country=US,city=newark rain=1,humidity=0.6 1501002274856668652"; - influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message); - String query = "select * from water"; - byte [] bytes = query.getBytes(); - runner.enqueue(bytes); - runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1); - List flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS); - assertEquals(1, flowFiles.size(), "Value should be equal"); - assertNull(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE), "Value should be null"); - assertEquals(query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY), "Value should be equal"); - - QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class); - Series series = queryResult.getResults().get(0).getSeries().get(0); - validateSeries(series.getName(), series.getColumns(), series.getValues().get(0),"newark",1.0); - } - - @Test - public void testShowDatabases() { - String query = "show databases"; - byte [] bytes = query.getBytes(); - runner.enqueue(bytes); - runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1); - List flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS); - assertEquals(1, flowFiles.size(), "Value should be equal"); - assertNull(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE), "Value should be null"); - assertEquals(query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY), "Value should be equal"); - - String result = new String(flowFiles.get(0).toByteArray()); - QueryResult queryResult = gson.fromJson(new StringReader(result), QueryResult.class); - Series series = queryResult.getResults().get(0).getSeries().get(0); - assertEquals("databases", series.getName(), "series name should be same"); - assertEquals("name", series.getColumns().get(0), "series column should be same"); - boolean internal = series.getValues().get(0).stream().anyMatch(o -> o.equals("_internal")); - assertTrue(internal, "content should contain _internal " + queryResult); - boolean test = series.getValues().stream().flatMap(i -> ((List)i).stream()).anyMatch(o -> o.equals("test")); - assertTrue(test, "content should contain test " + queryResult); - } - - @Test - public void testCreateDB() { - String query = "create database test1"; - byte [] bytes = query.getBytes(); - runner.enqueue(bytes); - runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1); - List flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS); - assertEquals(1, flowFiles.size(), "Value should be equal"); - assertNull(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE), "Value should be null"); - assertEquals(query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY), "Value should be equal"); - - QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class); - assertNotNull(queryResult.getResults(), "QueryResult should not be null"); - assertEquals(1, queryResult.getResults().size(), "results array should be same size"); - assertNull(queryResult.getResults().get(0).getSeries(), "No series"); - } - - @Test - public void testEmptyFlowFileQueryWithScheduledQuery() { - String message = "water,country=US,city=newark rain=1,humidity=0.6 1501002274856668652"; - influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message); - - String query = "select * from water"; - runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_QUERY, query); - - byte [] bytes = new byte [] {}; - runner.enqueue(bytes); - runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1); - - List flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS); - assertEquals(1, flowFiles.size(), "Value should be equal"); - assertEquals(null, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE), "Value should be equal"); - assertEquals(query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY), "Value should be equal"); - - QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class); - assertNotNull(queryResult.getResults(), "QueryResult should not be null"); - assertEquals(1, queryResult.getResults().size(), "results array should be same size"); - Series series = queryResult.getResults().get(0).getSeries().get(0); - validateSeries(series.getName(), series.getColumns(), series.getValues().get(0),"newark",1.0); - } - - @Test - public void testEmptyFlowFileQueryWithScheduledQueryEL() { - String message = "water,country=US,city=newark rain=1,humidity=0.6 1501002274856668652"; - influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message); - - String query = "select * from ${measurement}"; - runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_QUERY, query); - - byte [] bytes = new byte [] {}; - Map properties = new HashMap<>(); - properties.put("measurement","water"); - runner.enqueue(bytes, properties); - runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1); - - List flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS); - assertEquals(1, flowFiles.size(), "Value should be equal"); - assertNull(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE), "Value should be null"); - assertEquals(query.replace("${measurement}", "water"), flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY), "Value should be equal"); - - QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class); - assertNotNull(queryResult.getResults(), "QueryResult should not be null"); - assertEquals(1, queryResult.getResults().size(), "results array should be same size"); - Series series = queryResult.getResults().get(0).getSeries().get(0); - validateSeries(series.getName(), series.getColumns(), series.getValues().get(0),"newark",1.0); - } - - protected void validateSeries(String name, List columns, List values, String city, double rain) { - assertEquals("water", name, "Series name should be same"); - assertEquals("time", columns.get(0), "Series columns should be same"); - assertEquals("city", columns.get(1), "Series columns should be same"); - assertEquals("country", columns.get(2), "Series columns should be same"); - assertEquals("humidity", columns.get(3), "Series columns should be same"); - assertEquals("rain", columns.get(4), "Series columns should be same"); - - assertEquals(1.50100227485666867E18, values.get(0), "time value should be same"); - assertEquals(city, values.get(1), "city value should be same"); - assertEquals("US", values.get(2), "contry value should be same"); - assertEquals(0.6, values.get(3), "humidity value should be same"); - assertEquals(rain, values.get(4), "rain value should be same"); - } - - @Test - public void testEmptyFlowFileQuery() { - String query = ""; - byte [] bytes = query.getBytes(); - runner.enqueue(bytes); - runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_FAILURE, 1); - List flowFilesSuccess = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS); - assertEquals(0, flowFilesSuccess.size(), "Value should be equal"); - List flowFilesFailure = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_FAILURE); - assertEquals(1, flowFilesFailure.size(), "Value should be equal"); - assertEquals("FlowFile query is empty and no scheduled query is set", flowFilesFailure.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE), "Value should be equal"); - assertNull(flowFilesFailure.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY), "Value should be null"); - } - - @Test - public void testNoFlowFileNoScheduledInfluxDBQuery() { - AssertionError error = assertThrows(AssertionError.class, () -> { - runner.setIncomingConnection(false); - runner.run(1, true, true); - }); - assertEquals( - "Could not invoke methods annotated with @OnScheduled annotation due to: java.lang.reflect.InvocationTargetException", - error.getLocalizedMessage(), "Message should be same"); - } - - @Test - public void testValidTwoPoints() { - String message = "water,country=US,city=newark rain=1,humidity=0.6 1501002274856668652" + - System.lineSeparator() + - "water,country=US,city=nyc rain=2,humidity=0.6 1501002274856668652"; - influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message); - String query = "select * from water"; - byte [] bytes = query.getBytes(); - runner.enqueue(bytes); - runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1); - List flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS); - assertEquals(1, flowFiles.size(), "Value should be equal"); - assertNull(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE), "Value should be null"); - assertEquals(query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY), "Value should be equal"); - QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class); - assertNotNull(queryResult.getResults(), "QueryResult should not be null"); - assertEquals(1, queryResult.getResults().size(), "results array should be same size"); - assertEquals(1, queryResult.getResults().get(0).getSeries().size(), "Series size should be same"); - Series series1 = queryResult.getResults().get(0).getSeries().get(0); - validateSeries(series1.getName(),series1.getColumns(), series1.getValues().get(0),"newark",1.0); - - Series series2 = queryResult.getResults().get(0).getSeries().get(0); - validateSeries(series2.getName(),series2.getColumns(), series2.getValues().get(1),"nyc",2.0); - } - - @Test - public void testMalformedQuery() { - String query = "select * from"; - byte [] bytes = query.getBytes(); - runner.enqueue(bytes); - runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_FAILURE, 1); - List flowFilesSuccess = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS); - assertEquals(0, flowFilesSuccess.size(), "Value should be equal"); - List flowFilesFailure = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_FAILURE); - assertEquals(1, flowFilesFailure.size(), "Value should be equal"); - assertEquals("{\"error\":\"error parsing query: found EOF, expected identifier at line 1, char 15\"}", - flowFilesFailure.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE).trim(), "Value should be equal"); - assertEquals(query, flowFilesFailure.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY), "Value should be equal"); - } - - @Test - public void testQueryResultHasError() throws Throwable { - ExecuteInfluxDBQuery mockExecuteInfluxDBQuery = new ExecuteInfluxDBQuery() { - @Override - protected List executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit, int chunkSize) throws InterruptedException{ - List result = super.executeQuery(context, database, query, timeunit, chunkSize); - result.get(0).setError("Test Error"); - return result; - } - - }; - runner = TestRunners.newTestRunner(mockExecuteInfluxDBQuery); - initializeRunner(); - - byte [] bytes = "select * from /.*/".getBytes(); - runner.enqueue(bytes); - runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_FAILURE, 1); - - List flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_FAILURE); - - assertEquals("Test Error",flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE)); - } - - @Test - public void testValidSameTwoPoints() { - String message = "water,country=US,city=nyc rain=1,humidity=0.6 1501002274856668652" + - System.lineSeparator() + - "water,country=US,city=nyc rain=1,humidity=0.6 1501002274856668652"; - influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message); - String query = "select * from water"; - byte [] bytes = query.getBytes(); - runner.enqueue(bytes); - runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1); - List flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS); - assertEquals(1, flowFiles.size(), "Value should be equal"); - assertNull(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE), "Value should be null"); - assertEquals(query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY), "Value should be equal"); - - QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class); - assertNotNull(queryResult.getResults(), "QueryResult should not be null"); - assertEquals(1, queryResult.getResults().size(), "Result size should be same"); - Series series = queryResult.getResults().get(0).getSeries().get(0); - validateSeries(series.getName(), series.getColumns(), series.getValues().get(0),"nyc",1.0); - } - - @Test - public void testValidTwoPointsUrlEL() { - runner.setEnvironmentVariableValue("influxDBUrl", "http://localhost:8086"); - runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "${influxDBUrl}"); - testValidTwoPoints(); - } - - @Test - public void testChunkedQuery() { - String message = - "chunkedQueryTest,country=GB value=1 1524938495000000000" + System.lineSeparator() + - "chunkedQueryTest,country=PL value=2 1524938505000000000" + System.lineSeparator() + - "chunkedQueryTest,country=US value=3 1524938505800000000"; - - influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message); - - String query = "select * from chunkedQueryTest"; - byte [] bytes = query.getBytes(); - runner.enqueue(bytes); - runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_QUERY_CHUNK_SIZE, "2"); - runner.run(1,true,true); - - runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1); - List flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS); - assertEquals(1, flowFiles.size(), "Value should be equal"); - assertNull(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE), "Value should be null"); - - List queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResultListType); - - assertNotNull(queryResult, "QueryResult array should not be null"); - assertEquals(2, queryResult.size(), "QueryResult array size should be equal 2"); - - assertEquals(2, chunkSize(queryResult.get(0)), "First chunk should have 2 elements"); - assertEquals(1, chunkSize(queryResult.get(1)), "Second chunk should have 1 elements"); - } - - private int chunkSize(QueryResult queryResult) { - return queryResult.getResults().get(0).getSeries().get(0).getValues().size(); - } -} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITPutInfluxDB.java b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITPutInfluxDB.java deleted file mode 100644 index 9f34beb14d..0000000000 --- a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITPutInfluxDB.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.influxdb; - -import java.util.List; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunners; -import org.influxdb.dto.Query; -import org.influxdb.dto.QueryResult; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -/** - * Integration test for InfluxDB. Please ensure that the InfluxDB is running - * on local host with default port and has database test with table test. Please set user - * and password if applicable before running the integration tests. - */ -public class ITPutInfluxDB extends AbstractITInfluxDB { - - @BeforeEach - public void setUp() throws Exception { - runner = TestRunners.newTestRunner(PutInfluxDB.class); - initializeRunner(); - runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue()); - runner.setProperty(PutInfluxDB.RETENTION_POLICY, DEFAULT_RETENTION_POLICY); - runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB"); - runner.assertValid(); - initInfluxDB(); - } - - @Test - public void testValidSinglePoint() { - String message = "water,country=US,city=newark rain=1,humidity=0.6 "; - byte [] bytes = message.getBytes(); - runner.enqueue(bytes); - runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_SUCCESS, 1); - List flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_SUCCESS); - assertEquals(1, flowFiles.size(), "Value should be equal"); - assertEquals(null, flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE), "Value should be equal"); - QueryResult result = influxDB.query(new Query("select * from water", dbName)); - assertEquals(1, result.getResults().iterator().next().getSeries().size(), "size should be same"); - List> values = result.getResults().iterator().next().getSeries().iterator().next().getValues(); - assertEquals(1, values.size(), "size should be same"); - } - - @Test - public void testValidSinglePointWithTime() { - QueryResult result = influxDB.query(new Query("select * from water where time = 1501002274856668652", dbName)); - assertEquals(null, result.getResults().iterator().next().getSeries(), "Should have no results"); - String message = "water,country=US,city=sf rain=1,humidity=0.6 1501002274856668652"; - byte [] bytes = message.getBytes(); - runner.enqueue(bytes); - runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_SUCCESS, 1); - List flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_SUCCESS); - assertEquals(1, flowFiles.size(), "Value should be equal"); - assertEquals(null, flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE), "Value should be equal"); - result = influxDB.query(new Query("select * from water where time = 1501002274856668652", dbName)); - assertEquals(1, result.getResults().iterator().next().getSeries().size(), "size should be same"); - List> values = result.getResults().iterator().next().getSeries().iterator().next().getValues(); - assertEquals(1, values.size(), "size should be same"); - } - - @Test - public void testValidSinglePointWithTimeAndUrlExpression() { - runner.setEnvironmentVariableValue("influxDBUrl", "http://localhost:8086"); - runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "${influxDBUrl}"); - QueryResult result = influxDB.query(new Query("select * from water where time = 1501002274856668652", dbName)); - assertEquals(null, result.getResults().iterator().next().getSeries(), "Should have no results"); - String message = "water,country=US,city=sf rain=1,humidity=0.6 1501002274856668652"; - byte [] bytes = message.getBytes(); - runner.enqueue(bytes); - runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_SUCCESS, 1); - List flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_SUCCESS); - assertEquals(1, flowFiles.size(), "Value should be equal"); - assertEquals(null, flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE), "Value should be equal"); - result = influxDB.query(new Query("select * from water where time = 1501002274856668652", dbName)); - assertEquals(1, result.getResults().iterator().next().getSeries().size(), "size should be same"); - List> values = result.getResults().iterator().next().getSeries().iterator().next().getValues(); - assertEquals(1, values.size(), "size should be same"); - } - - @Test - public void testValidSinglePointWithUsernameEL() { - runner.setEnvironmentVariableValue("influxdb.username", "admin"); - runner.setProperty(PutInfluxDB.USERNAME, "${influxdb.username}"); - QueryResult result = influxDB.query(new Query("select * from water where time = 1501002274856668652", dbName)); - assertEquals(null, result.getResults().iterator().next().getSeries(), "Should have no results"); - String message = "water,country=US,city=sf rain=1,humidity=0.6 1501002274856668652"; - byte [] bytes = message.getBytes(); - runner.enqueue(bytes); - runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_SUCCESS, 1); - } - - @Test - public void testValidSinglePointWithPasswordEL() { - runner.setEnvironmentVariableValue("influxdb.password", "admin"); - runner.setProperty(PutInfluxDB.PASSWORD, "${influxdb.password}"); - QueryResult result = influxDB.query(new Query("select * from water where time = 1501002274856668652", dbName)); - assertEquals(null, result.getResults().iterator().next().getSeries(), "Should have no results"); - String message = "water,country=US,city=sf rain=1,humidity=0.6 1501002274856668652"; - byte [] bytes = message.getBytes(); - runner.enqueue(bytes); - runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_SUCCESS, 1); - } - - @Test - public void testValidTwoPointWithSameMeasurement() { - String message = "water,country=US,city=newark rain=1,humidity=0.6" + System.lineSeparator() - + "water,country=US,city=nyc rain=2,humidity=0.7" + System.lineSeparator(); - byte [] bytes = message.getBytes(); - runner.enqueue(bytes); - runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_SUCCESS, 1); - List flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_SUCCESS); - assertEquals(1, flowFiles.size(), "Value should be equal"); - assertEquals(null, flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE), "Value should be equal"); - QueryResult result = influxDB.query(new Query("select * from water", dbName)); - assertEquals(1, result.getResults().iterator().next().getSeries().size(), "size should be same"); - List> values = result.getResults().iterator().next().getSeries().iterator().next().getValues(); - assertEquals(2, values.size(), "size should be same"); - } - - @Test - public void testValidTwoPointWithSameMeasurementBadFormat() { - String message = "water,country=US,city=newark rain=1,humidity=0.6" + System.lineSeparator() - + "water,country=US,city=nyc,rain=2,humidity=0.7" + System.lineSeparator(); - byte [] bytes = message.getBytes(); - runner.enqueue(bytes); - runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_FAILURE, 1); - List flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_FAILURE); - assertEquals(1, flowFiles.size(), "Value should be equal"); - assertEquals("{\"error\":\"partial write: unable to parse 'water,country=US,city=nyc,rain=2,humidity=0.7': missing fields dropped=0\"}\n", - flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE), "Value should be equal"); - QueryResult result = influxDB.query(new Query("select * from water", dbName)); - assertEquals(1, result.getResults().iterator().next().getSeries().size(), "size should be same"); - List> values = result.getResults().iterator().next().getSeries().iterator().next().getValues(); - assertEquals(1, values.size(), "size should be same"); - } - - @Test - public void testValidTwoPointWithDifferentMeasurement() { - String message = "water,country=US,city=newark rain=1,humidity=0.6" + System.lineSeparator() - + "testm,country=US,city=chicago rain=10,humidity=0.9" + System.lineSeparator(); - byte [] bytes = message.getBytes(); - runner.enqueue(bytes); - runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_SUCCESS, 1); - List flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_SUCCESS); - assertEquals(1, flowFiles.size(), "Value should be equal"); - assertEquals(null, flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE), "Value should be equal"); - QueryResult result = influxDB.query(new Query("select * from water, testm", dbName)); - assertEquals(2, result.getResults().iterator().next().getSeries().size(), "size should be same"); - List> values = result.getResults().iterator().next().getSeries().iterator().next().getValues(); - assertEquals(1, values.size(), "size should be same"); - } -} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/TestExecutetInfluxDBQuery.java b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/TestExecutetInfluxDBQuery.java deleted file mode 100644 index b3c217330e..0000000000 --- a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/TestExecutetInfluxDBQuery.java +++ /dev/null @@ -1,227 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.influxdb; - -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.influxdb.InfluxDB; -import org.influxdb.dto.QueryResult; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.io.IOException; -import java.net.SocketTimeoutException; -import java.nio.charset.Charset; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - - -public class TestExecutetInfluxDBQuery { - private TestRunner runner; - private ExecuteInfluxDBQuery mockExecuteInfluxDBQuery; - - @BeforeEach - public void setUp() throws Exception { - mockExecuteInfluxDBQuery = new ExecuteInfluxDBQuery() { - @Override - protected InfluxDB makeConnection(String username, String password, String influxDbUrl, long connectionTimeout) { - return null; - } - - @Override - protected List executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit, int chunkSize) { - return null; - } - - }; - runner = TestRunners.newTestRunner(mockExecuteInfluxDBQuery); - runner.setProperty(ExecuteInfluxDBQuery.DB_NAME, "test"); - runner.setProperty(ExecuteInfluxDBQuery.USERNAME, "user"); - runner.setProperty(ExecuteInfluxDBQuery.PASSWORD, "password"); - runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_URL, "http://dbUrl"); - runner.setProperty(ExecuteInfluxDBQuery.CHARSET, "UTF-8"); - runner.assertValid(); - } - - @AfterEach - public void tearDown() throws Exception { - runner = null; - } - - @Test - public void testDefaultValid() { - runner.assertValid(); - } - - @Test - public void testQueryThrowsRuntimeException() { - mockExecuteInfluxDBQuery = new ExecuteInfluxDBQuery() { - @Override - protected InfluxDB makeConnection(String username, String password, String influxDbUrl, long connectionTimeout) { - return null; - } - - @Override - protected List executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit, int chunkSize) { - throw new RuntimeException("runtime exception"); - } - - }; - runner = TestRunners.newTestRunner(mockExecuteInfluxDBQuery); - runner.setProperty(ExecuteInfluxDBQuery.DB_NAME, "test"); - runner.setProperty(ExecuteInfluxDBQuery.USERNAME, "u1"); - runner.setProperty(ExecuteInfluxDBQuery.PASSWORD, "p1"); - runner.setProperty(ExecuteInfluxDBQuery.CHARSET, "UTF-8"); - runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_URL, "http://dbUrl"); - runner.assertValid(); - - byte [] bytes = "select * from /.*/".getBytes(); - runner.enqueue(bytes); - runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_FAILURE, 1); - - List flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_FAILURE); - - assertEquals(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE),"runtime exception"); - } - - @Test - public void testQueryThrowsRuntimeExceptionWithSocketTimeoutException() { - mockExecuteInfluxDBQuery = new ExecuteInfluxDBQuery() { - @Override - protected InfluxDB makeConnection(String username, String password, String influxDbUrl, long connectionTimeout) { - return null; - } - - @Override - protected List executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit, int chunkSize) { - throw new RuntimeException("runtime exception", new SocketTimeoutException("timeout")); - } - - }; - runner = TestRunners.newTestRunner(mockExecuteInfluxDBQuery); - runner.setProperty(ExecuteInfluxDBQuery.DB_NAME, "test"); - runner.setProperty(ExecuteInfluxDBQuery.USERNAME, "u1"); - runner.setProperty(ExecuteInfluxDBQuery.PASSWORD, "p1"); - runner.setProperty(ExecuteInfluxDBQuery.CHARSET, "UTF-8"); - runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_URL, "http://dbUrl"); - runner.assertValid(); - - byte [] bytes = "select * from /.*/".getBytes(); - runner.enqueue(bytes); - runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_RETRY, 1); - - List flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_RETRY); - - assertEquals(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE),"runtime exception"); - } - - @Test - public void testMakingQueryThrowsIOException() { - mockExecuteInfluxDBQuery = new ExecuteInfluxDBQuery() { - @Override - protected InfluxDB makeConnection(String username, String password, String influxDbUrl, long connectionTimeout) { - return null; - } - - @Override - protected String getQuery(ProcessSession session, Charset charset, FlowFile incomingFlowFile) - throws IOException { - throw new IOException("Test IOException"); - } - }; - - runner = TestRunners.newTestRunner(mockExecuteInfluxDBQuery); - runner.setProperty(ExecuteInfluxDBQuery.DB_NAME, "test"); - runner.setProperty(ExecuteInfluxDBQuery.USERNAME, "u1"); - runner.setProperty(ExecuteInfluxDBQuery.PASSWORD, "p1"); - runner.setProperty(ExecuteInfluxDBQuery.CHARSET, "UTF-8"); - runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_URL, "http://dbUrl"); - runner.assertValid(); - - byte [] bytes = "select * from /.*/".getBytes(); - runner.enqueue(bytes); - - AssertionError e = assertThrows(AssertionError.class, () -> runner.run(1,true,true)); - assertTrue(e.getCause() instanceof ProcessException); - } - - @Test - public void testMakeConnectionThrowsRuntimeException() { - mockExecuteInfluxDBQuery = new ExecuteInfluxDBQuery() { - @Override - protected InfluxDB makeConnection(String username, String password, String influxDbUrl, long connectionTimeout) { - throw new RuntimeException("testException"); - } - - }; - runner = TestRunners.newTestRunner(mockExecuteInfluxDBQuery); - runner.setProperty(ExecuteInfluxDBQuery.DB_NAME, "test"); - runner.setProperty(ExecuteInfluxDBQuery.USERNAME, "u1"); - runner.setProperty(ExecuteInfluxDBQuery.PASSWORD, "p1"); - runner.setProperty(ExecuteInfluxDBQuery.CHARSET, "UTF-8"); - runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_URL, "http://dbUrl"); - runner.assertValid(); - - byte [] bytes = "select * from /.*/".getBytes(); - runner.enqueue(bytes); - runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_FAILURE, 1); - - List flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_FAILURE); - - assertEquals(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE),"Error while getting connection testException"); - } - - @Test - public void testTriggerThrowsException() { - mockExecuteInfluxDBQuery = new ExecuteInfluxDBQuery() { - @Override - protected InfluxDB getInfluxDB(ProcessContext context) { - throw new RuntimeException("testException"); - } - }; - runner = TestRunners.newTestRunner(mockExecuteInfluxDBQuery); - runner.setProperty(ExecuteInfluxDBQuery.DB_NAME, "test"); - runner.setProperty(ExecuteInfluxDBQuery.USERNAME, "u1"); - runner.setProperty(ExecuteInfluxDBQuery.PASSWORD, "p1"); - runner.setProperty(ExecuteInfluxDBQuery.CHARSET, "UTF-8"); - runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_URL, "http://dbUrl"); - runner.assertValid(); - - byte [] bytes = "select * from".getBytes(); - runner.enqueue(bytes); - runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_FAILURE, 1); - - List flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_FAILURE); - - assertEquals(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE),"testException"); - } - -} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/TestPutInfluxDB.java b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/TestPutInfluxDB.java deleted file mode 100644 index e8a9fad7da..0000000000 --- a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/TestPutInfluxDB.java +++ /dev/null @@ -1,364 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.influxdb; - -import java.io.EOFException; -import java.net.SocketTimeoutException; -import java.util.List; - -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.influxdb.InfluxDB; -import org.influxdb.InfluxDBIOException; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -public class TestPutInfluxDB { - private TestRunner runner; - private PutInfluxDB mockPutInfluxDB; - - @BeforeEach - public void setUp() throws Exception { - mockPutInfluxDB = new PutInfluxDB() { - @Override - protected InfluxDB makeConnection(String username, String password, String influxDbUrl, long connectionTimeout) { - return null; - } - - @Override - protected void writeToInfluxDB(ProcessContext context, String consistencyLevel, String database, String retentionPolicy, - String records) { - } - }; - runner = TestRunners.newTestRunner(mockPutInfluxDB); - runner.setProperty(PutInfluxDB.DB_NAME, "test"); - runner.setProperty(PutInfluxDB.USERNAME, "user"); - runner.setProperty(PutInfluxDB.PASSWORD, "password"); - runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl"); - runner.setProperty(PutInfluxDB.CHARSET, "UTF-8"); - runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue()); - runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen"); - runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB"); - runner.assertValid(); - } - - @AfterEach - public void tearDown() throws Exception { - runner = null; - } - - @Test - public void testDefaultValid() { - runner.assertValid(); - } - - @Test - public void testBlankDBUrl() { - runner.setProperty(PutInfluxDB.INFLUX_DB_URL, ""); - runner.assertNotValid(); - } - - @Test - public void testEmptyDBName() { - runner.setProperty(PutInfluxDB.DB_NAME, ""); - runner.assertNotValid(); - } - - @Test - public void testEmptyConnectionTimeout() { - runner.setProperty(PutInfluxDB.INFLUX_DB_CONNECTION_TIMEOUT, ""); - runner.assertNotValid(); - } - - @Test - public void testEmptyUsername() { - runner = TestRunners.newTestRunner(mockPutInfluxDB); - runner.setProperty(PutInfluxDB.DB_NAME, "test"); - runner.setProperty(PutInfluxDB.PASSWORD, "password"); - runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl"); - runner.setProperty(PutInfluxDB.CHARSET, "UTF-8"); - runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue()); - runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen"); - runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB"); - runner.assertValid(); - runner.setProperty(PutInfluxDB.USERNAME, ""); - runner.assertNotValid(); - } - - @Test - public void testEmptyPassword() { - runner = TestRunners.newTestRunner(mockPutInfluxDB); - runner.setProperty(PutInfluxDB.DB_NAME, "test"); - runner.setProperty(PutInfluxDB.USERNAME, "username"); - runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl"); - runner.setProperty(PutInfluxDB.CHARSET, "UTF-8"); - runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue()); - runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen"); - runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB"); - runner.assertValid(); - runner.setProperty(PutInfluxDB.PASSWORD, ""); - runner.assertNotValid(); - } - - @Test - public void testPasswordEL() { - runner = TestRunners.newTestRunner(mockPutInfluxDB); - runner.setEnvironmentVariableValue("influxdb.password", "password"); - runner.setProperty(PutInfluxDB.PASSWORD, "${influxdb.password}"); - runner.setProperty(PutInfluxDB.DB_NAME, "test"); - runner.setProperty(PutInfluxDB.USERNAME, "username"); - runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl"); - runner.setProperty(PutInfluxDB.CHARSET, "UTF-8"); - runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue()); - runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen"); - runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB"); - runner.assertValid(); - } - - @Test - public void testUsernameEL() { - runner = TestRunners.newTestRunner(mockPutInfluxDB); - runner.setEnvironmentVariableValue("influxdb.username", "username"); - runner.setProperty(PutInfluxDB.PASSWORD, "password"); - runner.setProperty(PutInfluxDB.DB_NAME, "test"); - runner.setProperty(PutInfluxDB.USERNAME, "${influxdb.username}"); - runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl"); - runner.setProperty(PutInfluxDB.CHARSET, "UTF-8"); - runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue()); - runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen"); - runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB"); - runner.assertValid(); - } - - @Test - public void testCharsetUTF8() { - runner.setProperty(PutInfluxDB.CHARSET, "UTF-8"); - runner.assertValid(); - } - - @Test - public void testEmptyConsistencyLevel() { - runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL, ""); - runner.assertNotValid(); - } - - @Test - public void testCharsetBlank() { - runner.setProperty(PutInfluxDB.CHARSET, ""); - runner.assertNotValid(); - } - @Test - public void testZeroMaxDocumentSize() { - runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "0"); - runner.assertNotValid(); - } - - @Test - public void testSizeGreaterThanThresholdUsingEL() { - runner.setEnvironmentVariableValue("max.record.size", "1 B"); - runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "${max.record.size}"); - runner.assertValid(); - byte [] bytes = new byte[2]; - for (int i = 0; i < bytes.length; i++) { - bytes[i] = 'a'; - } - runner.enqueue(bytes); - runner.run(1); - runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_MAX_SIZE_EXCEEDED, 1); - List flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_MAX_SIZE_EXCEEDED); - assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE),"Max records size exceeded " + bytes.length); - } - - @Test - public void testSizeGreaterThanThreshold() { - runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 B"); - runner.assertValid(); - byte [] bytes = new byte[2]; - for (int i = 0; i < bytes.length; i++) { - bytes[i] = 'a'; - } - runner.enqueue(bytes); - runner.run(1); - runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_MAX_SIZE_EXCEEDED, 1); - List flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_MAX_SIZE_EXCEEDED); - assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE),"Max records size exceeded " + bytes.length); - } - - @Test - public void testValidSingleMeasurement() { - runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 MB"); - runner.assertValid(); - byte [] bytes = "test".getBytes(); - - runner.enqueue(bytes); - runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_SUCCESS, 1); - - List flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_SUCCESS); - - assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE), null); - } - - @Test - public void testWriteThrowsException() { - mockPutInfluxDB = new PutInfluxDB() { - @Override - protected void writeToInfluxDB(ProcessContext context, String consistencyLevel, String database, String retentionPolicy, - String records) { - throw new RuntimeException("WriteException"); - } - }; - runner = TestRunners.newTestRunner(mockPutInfluxDB); - runner.setProperty(PutInfluxDB.DB_NAME, "test"); - runner.setProperty(PutInfluxDB.USERNAME, "u1"); - runner.setProperty(PutInfluxDB.PASSWORD, "p1"); - runner.setProperty(PutInfluxDB.CHARSET, "UTF-8"); - runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl"); - runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue()); - runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen"); - runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB"); - runner.assertValid(); - - byte [] bytes = "test".getBytes(); - runner.enqueue(bytes); - runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_FAILURE, 1); - - List flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_FAILURE); - - assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE),"WriteException"); - } - - @Test - public void testWriteThrowsIOException() { - mockPutInfluxDB = new PutInfluxDB() { - @Override - protected void writeToInfluxDB(ProcessContext context, String consistencyLevel, String database, String retentionPolicy, - String records) { - throw new InfluxDBIOException(new EOFException("EOFException")); - } - }; - runner = TestRunners.newTestRunner(mockPutInfluxDB); - runner.setProperty(PutInfluxDB.DB_NAME, "test"); - runner.setProperty(PutInfluxDB.USERNAME, "u1"); - runner.setProperty(PutInfluxDB.PASSWORD, "p1"); - runner.setProperty(PutInfluxDB.CHARSET, "UTF-8"); - runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl"); - runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue()); - runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen"); - runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB"); - runner.assertValid(); - - byte [] bytes = "test".getBytes(); - runner.enqueue(bytes); - runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_FAILURE, 1); - - List flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_FAILURE); - - assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE),"java.io.EOFException: EOFException"); - } - - @Test - public void testWriteThrowsSocketTimeoutException() { - mockPutInfluxDB = new PutInfluxDB() { - @Override - protected void writeToInfluxDB(ProcessContext context, String consistencyLevel, String database, String retentionPolicy, - String records) { - throw new InfluxDBIOException(new SocketTimeoutException("SocketTimeoutException")); - } - }; - runner = TestRunners.newTestRunner(mockPutInfluxDB); - runner.setProperty(PutInfluxDB.DB_NAME, "test"); - runner.setProperty(PutInfluxDB.USERNAME, "u1"); - runner.setProperty(PutInfluxDB.PASSWORD, "p1"); - runner.setProperty(PutInfluxDB.CHARSET, "UTF-8"); - runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl"); - runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue()); - runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen"); - runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB"); - runner.assertValid(); - - byte [] bytes = "test".getBytes(); - runner.enqueue(bytes); - runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_RETRY, 1); - - List flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_RETRY); - - assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE),"java.net.SocketTimeoutException: SocketTimeoutException"); - } - - @Test - public void testTriggerThrowsException() { - mockPutInfluxDB = new PutInfluxDB() { - @Override - protected InfluxDB getInfluxDB(ProcessContext context) { - throw new RuntimeException("testException"); - } - }; - runner = TestRunners.newTestRunner(mockPutInfluxDB); - runner.setProperty(PutInfluxDB.DB_NAME, "test"); - runner.setProperty(PutInfluxDB.USERNAME, "u1"); - runner.setProperty(PutInfluxDB.PASSWORD, "p1"); - runner.setProperty(PutInfluxDB.CHARSET, "UTF-8"); - runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl"); - runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue()); - runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen"); - runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB"); - runner.assertValid(); - - byte [] bytes = "test".getBytes(); - runner.enqueue(bytes); - runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_FAILURE, 1); - - List flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_FAILURE); - - assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE),"testException"); - } - - @Test - public void testValidArrayMeasurement() { - runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 MB"); - runner.assertValid(); - - runner.enqueue("test rain=2\ntest rain=3".getBytes()); - runner.run(1,true,true); - - runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_SUCCESS, 1); - List flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_SUCCESS); - assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE), null); - } - - @Test - public void testInvalidEmptySingleMeasurement() { - byte [] bytes = "".getBytes(); - runner.enqueue(bytes); - runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_FAILURE, 1); - - List flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_FAILURE); - assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE), "Empty measurement size 0"); - } - -} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/pom.xml b/nifi-nar-bundles/nifi-influxdb-bundle/pom.xml deleted file mode 100644 index 916c77dd18..0000000000 --- a/nifi-nar-bundles/nifi-influxdb-bundle/pom.xml +++ /dev/null @@ -1,42 +0,0 @@ - - - - 4.0.0 - - - org.apache.nifi - nifi-nar-bundles - 2.0.0-SNAPSHOT - - - nifi-influxdb-bundle - pom - - - nifi-influxdb-processors - nifi-influxdb-nar - - - - - - org.apache.nifi - nifi-influxdb-processors - 2.0.0-SNAPSHOT - - - - diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index e4ea0e746f..c78809b394 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -71,7 +71,6 @@ nifi-snmp-bundle nifi-datadog-bundle nifi-windows-event-log-bundle - nifi-influxdb-bundle nifi-graph-bundle nifi-email-bundle nifi-groovyx-bundle diff --git a/pom.xml b/pom.xml index bf2bce8c73..8ecef83604 100644 --- a/pom.xml +++ b/pom.xml @@ -1167,8 +1167,6 @@ !ITMoveAzureDataLakeStorage, !AzureGraphUserGroupProviderIT, !JMSPublisherConsumerIT#validateMessageRedeliveryWhenNotAcked, - !ITPutInfluxDB, - !ITExecuteInfluxDBQuery, !GremlinClientServiceYamlSettingsAndBytecodeIT, !GremlinClientServiceControllerSettingsIT, !ITestConsumeEmail#validateUrl,