NIFI-12448 Removed InfluxDB Processors

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #8093.
This commit is contained in:
exceptionfactory 2023-11-30 15:28:21 -06:00 committed by Pierre Villard
parent 429b521cb9
commit 2ac3988881
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
18 changed files with 0 additions and 2338 deletions

View File

@ -2445,29 +2445,6 @@ style license.
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
The binary distribution of this product bundles 'influxdb-java' under an MIT
style license.
Copyright (c) 2014-2017 Stefan Majer
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
This product bundles 'lodash' which is available under an MIT license.
Copyright JS Foundation and other contributors <https://js.foundation/>

View File

@ -1222,11 +1222,6 @@
<artifactId>nifi-iceberg-services-api</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-influxdb-processors</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-iotdb-processors</artifactId>

View File

@ -1,40 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-influxdb-bundle</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-influxdb-nar</artifactId>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<version>2.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-influxdb-processors</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -1,231 +0,0 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
APACHE NIFI SUBCOMPONENTS:
The Apache NiFi project contains subcomponents with separate copyright
notices and license terms. Your use of the source code for the these
subcomponents is subject to the terms and conditions of the following
licenses.
This product bundles 'influxdb-java' which is available under an MIT license.
Copyright (c) 2014-2017 Stefan Majer
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -1,17 +0,0 @@
nifi-influxdb-nar
Copyright 2017-2020 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
===========================================
Apache Software License v2
===========================================
The following binary components are provided under the Apache Software License v2
(ASLv2) Apache Commons Lang
The following NOTICE information applies:
Apache Commons Lang
Copyright 2001-2015 The Apache Software Foundation

View File

@ -1,55 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-influxdb-bundle</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-influxdb-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
<version>2.23</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -1,155 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.influxdb;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import okhttp3.OkHttpClient;
import okhttp3.OkHttpClient.Builder;
/**
* Abstract base class for InfluxDB processors
*/
public abstract class AbstractInfluxDBProcessor extends AbstractProcessor {
public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("influxdb-charset")
.displayName("Character Set")
.description("Specifies the character set of the document data.")
.required(true)
.defaultValue("UTF-8")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.build();
public static final PropertyDescriptor INFLUX_DB_URL = new PropertyDescriptor.Builder()
.name("influxdb-url")
.displayName("InfluxDB connection URL")
.description("InfluxDB URL to connect to. Eg: http://influxdb:8086")
.defaultValue("http://localhost:8086")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.URL_VALIDATOR)
.build();
public static final PropertyDescriptor INFLUX_DB_CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
.name("InfluxDB Max Connection Time Out (seconds)")
.displayName("InfluxDB Max Connection Time Out (seconds)")
.description("The maximum time for establishing connection to the InfluxDB")
.defaultValue("0 seconds")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.sensitive(false)
.build();
public static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder()
.name("influxdb-dbname")
.displayName("Database Name")
.description("InfluxDB database to connect to")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
.name("influxdb-username")
.displayName("Username")
.required(false)
.description("Username for accessing InfluxDB")
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
.name("influxdb-password")
.displayName("Password")
.required(false)
.description("Password for user")
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(true)
.build();
public static final PropertyDescriptor MAX_RECORDS_SIZE = new PropertyDescriptor.Builder()
.name("influxdb-max-records-size")
.displayName("Max size of records")
.description("Maximum size of records allowed to be posted in one batch")
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.defaultValue("1 MB")
.required(true)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();
public static final String INFLUX_DB_ERROR_MESSAGE = "influxdb.error.message";
protected AtomicReference<InfluxDB> influxDB = new AtomicReference<>();
protected long maxRecordsSize;
/**
* Helper method to create InfluxDB instance
* @return InfluxDB instance
*/
protected synchronized InfluxDB getInfluxDB(ProcessContext context) {
if ( influxDB.get() == null ) {
String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
long connectionTimeout = context.getProperty(INFLUX_DB_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.SECONDS);
String influxDbUrl = context.getProperty(INFLUX_DB_URL).evaluateAttributeExpressions().getValue();
try {
influxDB.set(makeConnection(username, password, influxDbUrl, connectionTimeout));
} catch(Exception e) {
getLogger().error("Error while getting connection {}", e.getLocalizedMessage(), e);
throw new RuntimeException("Error while getting connection " + e.getLocalizedMessage(),e);
}
getLogger().info("InfluxDB connection created for host {}",
new Object[] {influxDbUrl});
}
return influxDB.get();
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
}
protected InfluxDB makeConnection(String username, String password, String influxDbUrl, long connectionTimeout) {
Builder builder = new OkHttpClient.Builder().connectTimeout(connectionTimeout, TimeUnit.SECONDS);
if ( StringUtils.isBlank(username) || StringUtils.isBlank(password) ) {
return InfluxDBFactory.connect(influxDbUrl, builder);
} else {
return InfluxDBFactory.connect(influxDbUrl, username, password, builder);
}
}
@OnStopped
public void close() {
if (getLogger().isDebugEnabled()) {
getLogger().info("Closing connection");
}
if ( influxDB.get() != null ) {
influxDB.get().close();
influxDB.set(null);;
}
}
}

View File

@ -1,322 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.influxdb;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.influxdb.InfluxDB;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import com.google.gson.Gson;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@SupportsBatching
@Tags({"influxdb", "measurement", "get", "read", "query", "timeseries"})
@CapabilityDescription("Processor to execute InfluxDB query from the content of a FlowFile (preferred) or a scheduled query. Please check details of the supported queries in InfluxDB documentation (https://www.influxdb.com/).")
@WritesAttributes({
@WritesAttribute(attribute = AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB error message"),
@WritesAttribute(attribute = ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed query"),
})
public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor {
public static final String INFLUX_DB_EXECUTED_QUERY = "influxdb.executed.query";
private static final int DEFAULT_INFLUX_RESPONSE_CHUNK_SIZE = 0;
public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT = new PropertyDescriptor.Builder()
.name("influxdb-query-result-time-unit")
.displayName("Query Result Time Units")
.description("The time unit of query results from the InfluxDB")
.defaultValue(TimeUnit.NANOSECONDS.name())
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.allowableValues(Arrays.stream(TimeUnit.values()).map( v -> v.name()).collect(Collectors.toSet()))
.sensitive(false)
.build();
public static final PropertyDescriptor INFLUX_DB_QUERY = new PropertyDescriptor.Builder()
.name("influxdb-query")
.displayName("InfluxDB Query")
.description("The InfluxDB query to execute. "
+ "Note: If there are incoming connections, then the query is created from incoming FlowFile's content otherwise"
+ " it is created from this property.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final Integer MAX_CHUNK_SIZE = 10000;
public static final PropertyDescriptor INFLUX_DB_QUERY_CHUNK_SIZE = new PropertyDescriptor.Builder()
.name("influxdb-query-chunk-size")
.displayName("Results chunk size")
.description("Chunking can be used to return results in a stream of smaller batches "
+ "(each has a partial results up to a chunk size) rather than as a single response. "
+ "Chunking queries can return an unlimited number of rows. Note: Chunking is enable when result chunk size is greater than 0")
.defaultValue(String.valueOf(DEFAULT_INFLUX_RESPONSE_CHUNK_SIZE))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.createLongValidator(0, MAX_CHUNK_SIZE, true))
.required(true)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("Successful InfluxDB queries are routed to this relationship").build();
static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("Falied InfluxDB queries are routed to this relationship").build();
static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
.description("Failed queries that are retryable exception are routed to this relationship").build();
private static final Set<Relationship> relationships;
private static final List<PropertyDescriptor> propertyDescriptors;
protected Gson gson = new Gson();
static {
final Set<Relationship> tempRelationships = new HashSet<>();
tempRelationships.add(REL_SUCCESS);
tempRelationships.add(REL_FAILURE);
tempRelationships.add(REL_RETRY);
relationships = Collections.unmodifiableSet(tempRelationships);
final List<PropertyDescriptor> tempDescriptors = new ArrayList<>();
tempDescriptors.add(DB_NAME);
tempDescriptors.add(INFLUX_DB_URL);
tempDescriptors.add(INFLUX_DB_CONNECTION_TIMEOUT);
tempDescriptors.add(INFLUX_DB_QUERY_RESULT_TIMEUNIT);
tempDescriptors.add(INFLUX_DB_QUERY);
tempDescriptors.add(INFLUX_DB_QUERY_CHUNK_SIZE);
tempDescriptors.add(USERNAME);
tempDescriptors.add(PASSWORD);
tempDescriptors.add(CHARSET);
propertyDescriptors = Collections.unmodifiableList(tempDescriptors);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
super.onScheduled(context);
// Either input connection or scheduled query is required
if ( ! context.getProperty(INFLUX_DB_QUERY).isSet()
&& ! context.hasIncomingConnection() ) {
String error = "The InfluxDB Query processor requires input connection or scheduled InfluxDB query";
getLogger().error(error);
throw new ProcessException(error);
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
String query = null;
String database = null;
TimeUnit queryResultTimeunit = null;
Charset charset = null;
FlowFile outgoingFlowFile = null;
// If there are incoming connections, prepare query params from flow file
if ( context.hasIncomingConnection() ) {
FlowFile incomingFlowFile = session.get();
if ( incomingFlowFile == null && context.hasNonLoopConnection() ) {
return;
}
charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(incomingFlowFile).getValue());
if ( incomingFlowFile.getSize() == 0 ) {
if ( context.getProperty(INFLUX_DB_QUERY).isSet() ) {
query = context.getProperty(INFLUX_DB_QUERY).evaluateAttributeExpressions(incomingFlowFile).getValue();
} else {
String message = "FlowFile query is empty and no scheduled query is set";
getLogger().error(message);
incomingFlowFile = session.putAttribute(incomingFlowFile, INFLUX_DB_ERROR_MESSAGE, message);
session.transfer(incomingFlowFile, REL_FAILURE);
return;
}
} else {
try {
query = getQuery(session, charset, incomingFlowFile);
} catch(IOException ioe) {
getLogger().error("Exception while reading from FlowFile " + ioe.getLocalizedMessage(), ioe);
throw new ProcessException(ioe);
}
}
outgoingFlowFile = incomingFlowFile;
} else {
outgoingFlowFile = session.create();
charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(outgoingFlowFile).getValue());
query = context.getProperty(INFLUX_DB_QUERY).evaluateAttributeExpressions(outgoingFlowFile).getValue();
}
database = context.getProperty(DB_NAME).evaluateAttributeExpressions(outgoingFlowFile).getValue();
queryResultTimeunit = TimeUnit.valueOf(context.getProperty(INFLUX_DB_QUERY_RESULT_TIMEUNIT).evaluateAttributeExpressions(outgoingFlowFile).getValue());
try {
long startTimeMillis = System.currentTimeMillis();
int chunkSize = context.getProperty(INFLUX_DB_QUERY_CHUNK_SIZE).evaluateAttributeExpressions(outgoingFlowFile).asInteger();
List<QueryResult> result = executeQuery(context, database, query, queryResultTimeunit, chunkSize);
String json = result.size() == 1 ? gson.toJson(result.get(0)) : gson.toJson(result);
if ( getLogger().isDebugEnabled() ) {
getLogger().debug("Query result {} ", new Object[] {result});
}
ByteArrayInputStream bais = new ByteArrayInputStream(json.getBytes(charset));
session.importFrom(bais, outgoingFlowFile);
bais.close();
final long endTimeMillis = System.currentTimeMillis();
if ( ! hasErrors(result) ) {
outgoingFlowFile = session.putAttribute(outgoingFlowFile, INFLUX_DB_EXECUTED_QUERY, String.valueOf(query));
session.getProvenanceReporter().send(outgoingFlowFile, makeProvenanceUrl(context, database),
(endTimeMillis - startTimeMillis));
session.transfer(outgoingFlowFile, REL_SUCCESS);
} else {
outgoingFlowFile = populateErrorAttributes(session, outgoingFlowFile, query, queryErrors(result));
session.transfer(outgoingFlowFile, REL_FAILURE);
}
} catch (Exception exception) {
outgoingFlowFile = populateErrorAttributes(session, outgoingFlowFile, query, exception.getMessage());
if ( exception.getCause() instanceof SocketTimeoutException ) {
getLogger().error("Failed to read from InfluxDB due SocketTimeoutException to {} and retrying",
exception.getCause().getLocalizedMessage(), exception.getCause());
session.transfer(outgoingFlowFile, REL_RETRY);
} else {
getLogger().error("Failed to read from InfluxDB due to {}",
exception.getLocalizedMessage(), exception);
session.transfer(outgoingFlowFile, REL_FAILURE);
}
context.yield();
}
}
protected String getQuery(final ProcessSession session, Charset charset, FlowFile incomingFlowFile)
throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
session.exportTo(incomingFlowFile, baos);
baos.close();
return new String(baos.toByteArray(), charset);
}
protected String makeProvenanceUrl(final ProcessContext context, String database) {
return new StringBuilder("influxdb://")
.append(context.getProperty(INFLUX_DB_URL).evaluateAttributeExpressions().getValue()).append("/")
.append(database).toString();
}
protected List<QueryResult> executeQuery(final ProcessContext context, String database, String query, TimeUnit timeunit,
int chunkSize) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
InfluxDB influx = getInfluxDB(context);
Query influxQuery = new Query(query, database);
if (chunkSize > 0) {
List<QueryResult> results = new LinkedList<>();
influx.query(influxQuery, chunkSize, result -> {
if (isQueryDone(result.getError())) {
latch.countDown();
} else {
results.add(result);
}
});
latch.await();
return results;
} else {
return Collections.singletonList(influx.query(influxQuery, timeunit));
}
}
private boolean isQueryDone(String error) {
return error != null && error.equals("DONE");
}
protected FlowFile populateErrorAttributes(final ProcessSession session, FlowFile flowFile, String query,
String message) {
Map<String,String> attributes = new HashMap<>();
attributes.put(INFLUX_DB_ERROR_MESSAGE, String.valueOf(message));
attributes.put(INFLUX_DB_EXECUTED_QUERY, String.valueOf(query));
flowFile = session.putAllAttributes(flowFile, attributes);
return flowFile;
}
private Boolean hasErrors(List<QueryResult> results) {
for (QueryResult result: results) {
if (result.hasError()) {
return true;
}
}
return false;
}
private String queryErrors(List<QueryResult> results) {
return results.stream()
.filter(QueryResult::hasError)
.map(QueryResult::getError)
.collect(Collectors.joining("\n"));
}
@OnStopped
public void close() {
super.close();
}
}

View File

@ -1,206 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.influxdb;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBIOException;
import java.io.ByteArrayOutputStream;
import java.net.SocketTimeoutException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@SupportsBatching
@Tags({"influxdb", "measurement","insert", "write", "put", "timeseries"})
@CapabilityDescription("Processor to write the content of a FlowFile in 'line protocol'. Please check details of the 'line protocol' in InfluxDB documentation (https://www.influxdb.com/). "
+ " The flow file can contain single measurement point or multiple measurement points separated by line seperator. The timestamp (last field) should be in nano-seconds resolution.")
@WritesAttributes({
@WritesAttribute(attribute = AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB error message"),
})
public class PutInfluxDB extends AbstractInfluxDBProcessor {
public static AllowableValue CONSISTENCY_LEVEL_ALL = new AllowableValue("ALL", "All", "Return success when all nodes have responded with write success");
public static AllowableValue CONSISTENCY_LEVEL_ANY = new AllowableValue("ANY", "Any", "Return success when any nodes have responded with write success");
public static AllowableValue CONSISTENCY_LEVEL_ONE = new AllowableValue("ONE", "One", "Return success when one node has responded with write success");
public static AllowableValue CONSISTENCY_LEVEL_QUORUM = new AllowableValue("QUORUM", "Quorum", "Return success when a majority of nodes have responded with write success");
public static final PropertyDescriptor CONSISTENCY_LEVEL = new PropertyDescriptor.Builder()
.name("influxdb-consistency-level")
.displayName("Consistency Level")
.description("InfluxDB consistency level")
.required(true)
.defaultValue(CONSISTENCY_LEVEL_ONE.getValue())
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.allowableValues(CONSISTENCY_LEVEL_ONE, CONSISTENCY_LEVEL_ANY, CONSISTENCY_LEVEL_ALL, CONSISTENCY_LEVEL_QUORUM)
.build();
public static final PropertyDescriptor RETENTION_POLICY = new PropertyDescriptor.Builder()
.name("influxdb-retention-policy")
.displayName("Retention Policy")
.description("Retention policy for the saving the records")
.defaultValue("autogen")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("Successful FlowFiles that are saved to InfluxDB are routed to this relationship").build();
static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("FlowFiles were not saved to InfluxDB are routed to this relationship").build();
static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
.description("FlowFiles were not saved to InfluxDB due to retryable exception are routed to this relationship").build();
static final Relationship REL_MAX_SIZE_EXCEEDED = new Relationship.Builder().name("failure-max-size")
.description("FlowFiles exceeding max records size are routed to this relationship").build();
private static final Set<Relationship> relationships;
private static final List<PropertyDescriptor> propertyDescriptors;
static {
final Set<Relationship> tempRelationships = new HashSet<>();
tempRelationships.add(REL_SUCCESS);
tempRelationships.add(REL_FAILURE);
tempRelationships.add(REL_RETRY);
tempRelationships.add(REL_MAX_SIZE_EXCEEDED);
relationships = Collections.unmodifiableSet(tempRelationships);
final List<PropertyDescriptor> tempDescriptors = new ArrayList<>();
tempDescriptors.add(DB_NAME);
tempDescriptors.add(INFLUX_DB_URL);
tempDescriptors.add(INFLUX_DB_CONNECTION_TIMEOUT);
tempDescriptors.add(USERNAME);
tempDescriptors.add(PASSWORD);
tempDescriptors.add(CHARSET);
tempDescriptors.add(CONSISTENCY_LEVEL);
tempDescriptors.add(RETENTION_POLICY);
tempDescriptors.add(MAX_RECORDS_SIZE);
propertyDescriptors = Collections.unmodifiableList(tempDescriptors);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
super.onScheduled(context);
maxRecordsSize = context.getProperty(MAX_RECORDS_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue();
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
if ( flowFile.getSize() == 0) {
getLogger().error("Empty measurements");
flowFile = session.putAttribute(flowFile, INFLUX_DB_ERROR_MESSAGE, "Empty measurement size " + flowFile.getSize());
session.transfer(flowFile, REL_FAILURE);
return;
}
if ( flowFile.getSize() > maxRecordsSize) {
getLogger().error("Message size of records exceeded {} max allowed is {}", new Object[] { flowFile.getSize(), maxRecordsSize});
flowFile = session.putAttribute(flowFile, INFLUX_DB_ERROR_MESSAGE, "Max records size exceeded " + flowFile.getSize());
session.transfer(flowFile, REL_MAX_SIZE_EXCEEDED);
return;
}
Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
String consistencyLevel = context.getProperty(CONSISTENCY_LEVEL).evaluateAttributeExpressions(flowFile).getValue();
String database = context.getProperty(DB_NAME).evaluateAttributeExpressions(flowFile).getValue();
String retentionPolicy = context.getProperty(RETENTION_POLICY).evaluateAttributeExpressions(flowFile).getValue();
try {
long startTimeMillis = System.currentTimeMillis();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
session.exportTo(flowFile, baos);
String records = new String(baos.toByteArray(), charset);
writeToInfluxDB(context, consistencyLevel, database, retentionPolicy, records);
final long endTimeMillis = System.currentTimeMillis();
getLogger().debug("Records {} inserted", new Object[] {records});
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().send(flowFile,
new StringBuilder("influxdb://").append(context.getProperty(INFLUX_DB_URL).evaluateAttributeExpressions().getValue()).append("/").append(database).toString(),
(endTimeMillis - startTimeMillis));
} catch (InfluxDBIOException exception) {
flowFile = session.putAttribute(flowFile, INFLUX_DB_ERROR_MESSAGE, String.valueOf(exception.getMessage()));
if ( exception.getCause() instanceof SocketTimeoutException ) {
getLogger().error("Failed to insert into influxDB due SocketTimeoutException to {} and retrying",
exception.getLocalizedMessage(), exception);
session.transfer(flowFile, REL_RETRY);
} else {
getLogger().error("Failed to insert into influxDB due to {}",
exception.getLocalizedMessage(), exception);
session.transfer(flowFile, REL_FAILURE);
}
context.yield();
} catch (Exception exception) {
getLogger().error("Failed to insert into influxDB due to {}",
exception.getLocalizedMessage(), exception);
flowFile = session.putAttribute(flowFile, INFLUX_DB_ERROR_MESSAGE, String.valueOf(exception.getMessage()));
session.transfer(flowFile, REL_FAILURE);
context.yield();
}
}
protected void writeToInfluxDB(ProcessContext context, String consistencyLevel, String database, String retentionPolicy, String records) {
getInfluxDB(context).write(database, retentionPolicy, InfluxDB.ConsistencyLevel.valueOf(consistencyLevel), records);
}
@OnStopped
public void close() {
super.close();
}
}

View File

@ -1,16 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.influxdb.PutInfluxDB
org.apache.nifi.processors.influxdb.ExecuteInfluxDBQuery

View File

@ -1,92 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.influxdb;
import com.google.gson.reflect.TypeToken;
import org.apache.nifi.util.TestRunner;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.junit.jupiter.api.AfterEach;
import java.lang.reflect.Type;
import java.util.List;
/**
* Base integration test class for InfluxDB processors
*/
public class AbstractITInfluxDB {
protected TestRunner runner;
protected InfluxDB influxDB;
protected String dbName = "test";
protected String dbUrl = "http://localhost:8086";
protected String user = "admin";
protected String password = "admin";
protected static final String DEFAULT_RETENTION_POLICY = "autogen";
protected Type QueryResultListType = new TypeToken<List<QueryResult>>(){}.getType();
protected void initInfluxDB() throws Exception {
influxDB = InfluxDBFactory.connect(dbUrl,user,password);
influxDB.createDatabase(dbName);
int max = 10;
while (!influxDB.databaseExists(dbName) && (max-- < 0)) {
Thread.sleep(5);
}
if ( ! influxDB.databaseExists(dbName) ) {
throw new Exception("unable to create database " + dbName);
}
}
protected void cleanUpDatabase() throws InterruptedException {
if ( influxDB.databaseExists(dbName) ) {
QueryResult result = influxDB.query(new Query("DROP measurement water", dbName));
checkError(result);
result = influxDB.query(new Query("DROP measurement testm", dbName));
checkError(result);
result = influxDB.query(new Query("DROP measurement chunkedQueryTest", dbName));
checkError(result);
result = influxDB.query(new Query("DROP database " + dbName, dbName));
Thread.sleep(1000);
}
}
protected void checkError(QueryResult result) {
if ( result.hasError() ) {
throw new IllegalStateException("Error while dropping measurements " + result.getError());
}
}
@AfterEach
public void tearDown() throws Exception {
runner = null;
if ( influxDB != null ) {
cleanUpDatabase();
influxDB.close();
}
}
protected void initializeRunner() {
runner.setProperty(ExecuteInfluxDBQuery.DB_NAME, dbName);
runner.setProperty(ExecuteInfluxDBQuery.USERNAME, user);
runner.setProperty(ExecuteInfluxDBQuery.PASSWORD, password);
runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_URL, dbUrl);
runner.setProperty(ExecuteInfluxDBQuery.CHARSET, "UTF-8");
runner.assertValid();
}
}

View File

@ -1,362 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.influxdb;
import com.google.gson.Gson;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunners;
import org.influxdb.InfluxDB;
import org.influxdb.dto.QueryResult;
import org.influxdb.dto.QueryResult.Series;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.StringReader;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Integration test for executing InfluxDB queries. Please ensure that the InfluxDB is running
* on local host with default port and has database test with table test. Please set user
* and password if applicable before running the integration tests.
*/
public class ITExecuteInfluxDBQuery extends AbstractITInfluxDB {
protected Gson gson = new Gson();
@BeforeEach
public void setUp() throws Exception {
initInfluxDB();
runner = TestRunners.newTestRunner(ExecuteInfluxDBQuery.class);
initializeRunner();
runner.setEnvironmentVariableValue("influxDBUrl", "http://localhost:8086");
runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "${influxDBUrl}");
}
@Test
public void testValidScheduleQueryWithNoIncoming() {
String message = "water,country=US,city=newark rain=1,humidity=0.6 1501002274856668652";
influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message);
String query = "select * from water";
runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_QUERY, query);
runner.setIncomingConnection(false);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS);
assertEquals(1, flowFiles.size(), "Value should be equal");
assertNull(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE), "Value should be null");
assertEquals(query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY), "Value should be equal");
QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class);
Series series = queryResult.getResults().get(0).getSeries().get(0);
validateSeries(series.getName(), series.getColumns(), series.getValues().get(0),"newark",1.0);
}
@Test
public void testValidSinglePoint() {
String message = "water,country=US,city=newark rain=1,humidity=0.6 1501002274856668652";
influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message);
String query = "select * from water";
byte [] bytes = query.getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS);
assertEquals(1, flowFiles.size(), "Value should be equal");
assertNull(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE), "Value should be null");
assertEquals(query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY), "Value should be equal");
QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class);
Series series = queryResult.getResults().get(0).getSeries().get(0);
validateSeries(series.getName(), series.getColumns(), series.getValues().get(0),"newark",1.0);
}
@Test
public void testShowDatabases() {
String query = "show databases";
byte [] bytes = query.getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS);
assertEquals(1, flowFiles.size(), "Value should be equal");
assertNull(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE), "Value should be null");
assertEquals(query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY), "Value should be equal");
String result = new String(flowFiles.get(0).toByteArray());
QueryResult queryResult = gson.fromJson(new StringReader(result), QueryResult.class);
Series series = queryResult.getResults().get(0).getSeries().get(0);
assertEquals("databases", series.getName(), "series name should be same");
assertEquals("name", series.getColumns().get(0), "series column should be same");
boolean internal = series.getValues().get(0).stream().anyMatch(o -> o.equals("_internal"));
assertTrue(internal, "content should contain _internal " + queryResult);
boolean test = series.getValues().stream().flatMap(i -> ((List<Object>)i).stream()).anyMatch(o -> o.equals("test"));
assertTrue(test, "content should contain test " + queryResult);
}
@Test
public void testCreateDB() {
String query = "create database test1";
byte [] bytes = query.getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS);
assertEquals(1, flowFiles.size(), "Value should be equal");
assertNull(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE), "Value should be null");
assertEquals(query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY), "Value should be equal");
QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class);
assertNotNull(queryResult.getResults(), "QueryResult should not be null");
assertEquals(1, queryResult.getResults().size(), "results array should be same size");
assertNull(queryResult.getResults().get(0).getSeries(), "No series");
}
@Test
public void testEmptyFlowFileQueryWithScheduledQuery() {
String message = "water,country=US,city=newark rain=1,humidity=0.6 1501002274856668652";
influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message);
String query = "select * from water";
runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_QUERY, query);
byte [] bytes = new byte [] {};
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS);
assertEquals(1, flowFiles.size(), "Value should be equal");
assertEquals(null, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE), "Value should be equal");
assertEquals(query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY), "Value should be equal");
QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class);
assertNotNull(queryResult.getResults(), "QueryResult should not be null");
assertEquals(1, queryResult.getResults().size(), "results array should be same size");
Series series = queryResult.getResults().get(0).getSeries().get(0);
validateSeries(series.getName(), series.getColumns(), series.getValues().get(0),"newark",1.0);
}
@Test
public void testEmptyFlowFileQueryWithScheduledQueryEL() {
String message = "water,country=US,city=newark rain=1,humidity=0.6 1501002274856668652";
influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message);
String query = "select * from ${measurement}";
runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_QUERY, query);
byte [] bytes = new byte [] {};
Map<String,String> properties = new HashMap<>();
properties.put("measurement","water");
runner.enqueue(bytes, properties);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS);
assertEquals(1, flowFiles.size(), "Value should be equal");
assertNull(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE), "Value should be null");
assertEquals(query.replace("${measurement}", "water"), flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY), "Value should be equal");
QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class);
assertNotNull(queryResult.getResults(), "QueryResult should not be null");
assertEquals(1, queryResult.getResults().size(), "results array should be same size");
Series series = queryResult.getResults().get(0).getSeries().get(0);
validateSeries(series.getName(), series.getColumns(), series.getValues().get(0),"newark",1.0);
}
protected void validateSeries(String name, List<String> columns, List<Object> values, String city, double rain) {
assertEquals("water", name, "Series name should be same");
assertEquals("time", columns.get(0), "Series columns should be same");
assertEquals("city", columns.get(1), "Series columns should be same");
assertEquals("country", columns.get(2), "Series columns should be same");
assertEquals("humidity", columns.get(3), "Series columns should be same");
assertEquals("rain", columns.get(4), "Series columns should be same");
assertEquals(1.50100227485666867E18, values.get(0), "time value should be same");
assertEquals(city, values.get(1), "city value should be same");
assertEquals("US", values.get(2), "contry value should be same");
assertEquals(0.6, values.get(3), "humidity value should be same");
assertEquals(rain, values.get(4), "rain value should be same");
}
@Test
public void testEmptyFlowFileQuery() {
String query = "";
byte [] bytes = query.getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_FAILURE, 1);
List<MockFlowFile> flowFilesSuccess = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS);
assertEquals(0, flowFilesSuccess.size(), "Value should be equal");
List<MockFlowFile> flowFilesFailure = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_FAILURE);
assertEquals(1, flowFilesFailure.size(), "Value should be equal");
assertEquals("FlowFile query is empty and no scheduled query is set", flowFilesFailure.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE), "Value should be equal");
assertNull(flowFilesFailure.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY), "Value should be null");
}
@Test
public void testNoFlowFileNoScheduledInfluxDBQuery() {
AssertionError error = assertThrows(AssertionError.class, () -> {
runner.setIncomingConnection(false);
runner.run(1, true, true);
});
assertEquals(
"Could not invoke methods annotated with @OnScheduled annotation due to: java.lang.reflect.InvocationTargetException",
error.getLocalizedMessage(), "Message should be same");
}
@Test
public void testValidTwoPoints() {
String message = "water,country=US,city=newark rain=1,humidity=0.6 1501002274856668652" +
System.lineSeparator() +
"water,country=US,city=nyc rain=2,humidity=0.6 1501002274856668652";
influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message);
String query = "select * from water";
byte [] bytes = query.getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS);
assertEquals(1, flowFiles.size(), "Value should be equal");
assertNull(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE), "Value should be null");
assertEquals(query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY), "Value should be equal");
QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class);
assertNotNull(queryResult.getResults(), "QueryResult should not be null");
assertEquals(1, queryResult.getResults().size(), "results array should be same size");
assertEquals(1, queryResult.getResults().get(0).getSeries().size(), "Series size should be same");
Series series1 = queryResult.getResults().get(0).getSeries().get(0);
validateSeries(series1.getName(),series1.getColumns(), series1.getValues().get(0),"newark",1.0);
Series series2 = queryResult.getResults().get(0).getSeries().get(0);
validateSeries(series2.getName(),series2.getColumns(), series2.getValues().get(1),"nyc",2.0);
}
@Test
public void testMalformedQuery() {
String query = "select * from";
byte [] bytes = query.getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_FAILURE, 1);
List<MockFlowFile> flowFilesSuccess = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS);
assertEquals(0, flowFilesSuccess.size(), "Value should be equal");
List<MockFlowFile> flowFilesFailure = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_FAILURE);
assertEquals(1, flowFilesFailure.size(), "Value should be equal");
assertEquals("{\"error\":\"error parsing query: found EOF, expected identifier at line 1, char 15\"}",
flowFilesFailure.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE).trim(), "Value should be equal");
assertEquals(query, flowFilesFailure.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY), "Value should be equal");
}
@Test
public void testQueryResultHasError() throws Throwable {
ExecuteInfluxDBQuery mockExecuteInfluxDBQuery = new ExecuteInfluxDBQuery() {
@Override
protected List<QueryResult> executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit, int chunkSize) throws InterruptedException{
List<QueryResult> result = super.executeQuery(context, database, query, timeunit, chunkSize);
result.get(0).setError("Test Error");
return result;
}
};
runner = TestRunners.newTestRunner(mockExecuteInfluxDBQuery);
initializeRunner();
byte [] bytes = "select * from /.*/".getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_FAILURE);
assertEquals("Test Error",flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE));
}
@Test
public void testValidSameTwoPoints() {
String message = "water,country=US,city=nyc rain=1,humidity=0.6 1501002274856668652" +
System.lineSeparator() +
"water,country=US,city=nyc rain=1,humidity=0.6 1501002274856668652";
influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message);
String query = "select * from water";
byte [] bytes = query.getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS);
assertEquals(1, flowFiles.size(), "Value should be equal");
assertNull(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE), "Value should be null");
assertEquals(query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY), "Value should be equal");
QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class);
assertNotNull(queryResult.getResults(), "QueryResult should not be null");
assertEquals(1, queryResult.getResults().size(), "Result size should be same");
Series series = queryResult.getResults().get(0).getSeries().get(0);
validateSeries(series.getName(), series.getColumns(), series.getValues().get(0),"nyc",1.0);
}
@Test
public void testValidTwoPointsUrlEL() {
runner.setEnvironmentVariableValue("influxDBUrl", "http://localhost:8086");
runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "${influxDBUrl}");
testValidTwoPoints();
}
@Test
public void testChunkedQuery() {
String message =
"chunkedQueryTest,country=GB value=1 1524938495000000000" + System.lineSeparator() +
"chunkedQueryTest,country=PL value=2 1524938505000000000" + System.lineSeparator() +
"chunkedQueryTest,country=US value=3 1524938505800000000";
influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message);
String query = "select * from chunkedQueryTest";
byte [] bytes = query.getBytes();
runner.enqueue(bytes);
runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_QUERY_CHUNK_SIZE, "2");
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS);
assertEquals(1, flowFiles.size(), "Value should be equal");
assertNull(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE), "Value should be null");
List<QueryResult> queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResultListType);
assertNotNull(queryResult, "QueryResult array should not be null");
assertEquals(2, queryResult.size(), "QueryResult array size should be equal 2");
assertEquals(2, chunkSize(queryResult.get(0)), "First chunk should have 2 elements");
assertEquals(1, chunkSize(queryResult.get(1)), "Second chunk should have 1 elements");
}
private int chunkSize(QueryResult queryResult) {
return queryResult.getResults().get(0).getSeries().get(0).getValues().size();
}
}

View File

@ -1,178 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.influxdb;
import java.util.List;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunners;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* Integration test for InfluxDB. Please ensure that the InfluxDB is running
* on local host with default port and has database test with table test. Please set user
* and password if applicable before running the integration tests.
*/
public class ITPutInfluxDB extends AbstractITInfluxDB {
@BeforeEach
public void setUp() throws Exception {
runner = TestRunners.newTestRunner(PutInfluxDB.class);
initializeRunner();
runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue());
runner.setProperty(PutInfluxDB.RETENTION_POLICY, DEFAULT_RETENTION_POLICY);
runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB");
runner.assertValid();
initInfluxDB();
}
@Test
public void testValidSinglePoint() {
String message = "water,country=US,city=newark rain=1,humidity=0.6 ";
byte [] bytes = message.getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_SUCCESS);
assertEquals(1, flowFiles.size(), "Value should be equal");
assertEquals(null, flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE), "Value should be equal");
QueryResult result = influxDB.query(new Query("select * from water", dbName));
assertEquals(1, result.getResults().iterator().next().getSeries().size(), "size should be same");
List<List<Object>> values = result.getResults().iterator().next().getSeries().iterator().next().getValues();
assertEquals(1, values.size(), "size should be same");
}
@Test
public void testValidSinglePointWithTime() {
QueryResult result = influxDB.query(new Query("select * from water where time = 1501002274856668652", dbName));
assertEquals(null, result.getResults().iterator().next().getSeries(), "Should have no results");
String message = "water,country=US,city=sf rain=1,humidity=0.6 1501002274856668652";
byte [] bytes = message.getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_SUCCESS);
assertEquals(1, flowFiles.size(), "Value should be equal");
assertEquals(null, flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE), "Value should be equal");
result = influxDB.query(new Query("select * from water where time = 1501002274856668652", dbName));
assertEquals(1, result.getResults().iterator().next().getSeries().size(), "size should be same");
List<List<Object>> values = result.getResults().iterator().next().getSeries().iterator().next().getValues();
assertEquals(1, values.size(), "size should be same");
}
@Test
public void testValidSinglePointWithTimeAndUrlExpression() {
runner.setEnvironmentVariableValue("influxDBUrl", "http://localhost:8086");
runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "${influxDBUrl}");
QueryResult result = influxDB.query(new Query("select * from water where time = 1501002274856668652", dbName));
assertEquals(null, result.getResults().iterator().next().getSeries(), "Should have no results");
String message = "water,country=US,city=sf rain=1,humidity=0.6 1501002274856668652";
byte [] bytes = message.getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_SUCCESS);
assertEquals(1, flowFiles.size(), "Value should be equal");
assertEquals(null, flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE), "Value should be equal");
result = influxDB.query(new Query("select * from water where time = 1501002274856668652", dbName));
assertEquals(1, result.getResults().iterator().next().getSeries().size(), "size should be same");
List<List<Object>> values = result.getResults().iterator().next().getSeries().iterator().next().getValues();
assertEquals(1, values.size(), "size should be same");
}
@Test
public void testValidSinglePointWithUsernameEL() {
runner.setEnvironmentVariableValue("influxdb.username", "admin");
runner.setProperty(PutInfluxDB.USERNAME, "${influxdb.username}");
QueryResult result = influxDB.query(new Query("select * from water where time = 1501002274856668652", dbName));
assertEquals(null, result.getResults().iterator().next().getSeries(), "Should have no results");
String message = "water,country=US,city=sf rain=1,humidity=0.6 1501002274856668652";
byte [] bytes = message.getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_SUCCESS, 1);
}
@Test
public void testValidSinglePointWithPasswordEL() {
runner.setEnvironmentVariableValue("influxdb.password", "admin");
runner.setProperty(PutInfluxDB.PASSWORD, "${influxdb.password}");
QueryResult result = influxDB.query(new Query("select * from water where time = 1501002274856668652", dbName));
assertEquals(null, result.getResults().iterator().next().getSeries(), "Should have no results");
String message = "water,country=US,city=sf rain=1,humidity=0.6 1501002274856668652";
byte [] bytes = message.getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_SUCCESS, 1);
}
@Test
public void testValidTwoPointWithSameMeasurement() {
String message = "water,country=US,city=newark rain=1,humidity=0.6" + System.lineSeparator()
+ "water,country=US,city=nyc rain=2,humidity=0.7" + System.lineSeparator();
byte [] bytes = message.getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_SUCCESS);
assertEquals(1, flowFiles.size(), "Value should be equal");
assertEquals(null, flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE), "Value should be equal");
QueryResult result = influxDB.query(new Query("select * from water", dbName));
assertEquals(1, result.getResults().iterator().next().getSeries().size(), "size should be same");
List<List<Object>> values = result.getResults().iterator().next().getSeries().iterator().next().getValues();
assertEquals(2, values.size(), "size should be same");
}
@Test
public void testValidTwoPointWithSameMeasurementBadFormat() {
String message = "water,country=US,city=newark rain=1,humidity=0.6" + System.lineSeparator()
+ "water,country=US,city=nyc,rain=2,humidity=0.7" + System.lineSeparator();
byte [] bytes = message.getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_FAILURE);
assertEquals(1, flowFiles.size(), "Value should be equal");
assertEquals("{\"error\":\"partial write: unable to parse 'water,country=US,city=nyc,rain=2,humidity=0.7': missing fields dropped=0\"}\n",
flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE), "Value should be equal");
QueryResult result = influxDB.query(new Query("select * from water", dbName));
assertEquals(1, result.getResults().iterator().next().getSeries().size(), "size should be same");
List<List<Object>> values = result.getResults().iterator().next().getSeries().iterator().next().getValues();
assertEquals(1, values.size(), "size should be same");
}
@Test
public void testValidTwoPointWithDifferentMeasurement() {
String message = "water,country=US,city=newark rain=1,humidity=0.6" + System.lineSeparator()
+ "testm,country=US,city=chicago rain=10,humidity=0.9" + System.lineSeparator();
byte [] bytes = message.getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_SUCCESS);
assertEquals(1, flowFiles.size(), "Value should be equal");
assertEquals(null, flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE), "Value should be equal");
QueryResult result = influxDB.query(new Query("select * from water, testm", dbName));
assertEquals(2, result.getResults().iterator().next().getSeries().size(), "size should be same");
List<List<Object>> values = result.getResults().iterator().next().getSeries().iterator().next().getValues();
assertEquals(1, values.size(), "size should be same");
}
}

View File

@ -1,227 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.influxdb;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.influxdb.InfluxDB;
import org.influxdb.dto.QueryResult;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestExecutetInfluxDBQuery {
private TestRunner runner;
private ExecuteInfluxDBQuery mockExecuteInfluxDBQuery;
@BeforeEach
public void setUp() throws Exception {
mockExecuteInfluxDBQuery = new ExecuteInfluxDBQuery() {
@Override
protected InfluxDB makeConnection(String username, String password, String influxDbUrl, long connectionTimeout) {
return null;
}
@Override
protected List<QueryResult> executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit, int chunkSize) {
return null;
}
};
runner = TestRunners.newTestRunner(mockExecuteInfluxDBQuery);
runner.setProperty(ExecuteInfluxDBQuery.DB_NAME, "test");
runner.setProperty(ExecuteInfluxDBQuery.USERNAME, "user");
runner.setProperty(ExecuteInfluxDBQuery.PASSWORD, "password");
runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_URL, "http://dbUrl");
runner.setProperty(ExecuteInfluxDBQuery.CHARSET, "UTF-8");
runner.assertValid();
}
@AfterEach
public void tearDown() throws Exception {
runner = null;
}
@Test
public void testDefaultValid() {
runner.assertValid();
}
@Test
public void testQueryThrowsRuntimeException() {
mockExecuteInfluxDBQuery = new ExecuteInfluxDBQuery() {
@Override
protected InfluxDB makeConnection(String username, String password, String influxDbUrl, long connectionTimeout) {
return null;
}
@Override
protected List<QueryResult> executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit, int chunkSize) {
throw new RuntimeException("runtime exception");
}
};
runner = TestRunners.newTestRunner(mockExecuteInfluxDBQuery);
runner.setProperty(ExecuteInfluxDBQuery.DB_NAME, "test");
runner.setProperty(ExecuteInfluxDBQuery.USERNAME, "u1");
runner.setProperty(ExecuteInfluxDBQuery.PASSWORD, "p1");
runner.setProperty(ExecuteInfluxDBQuery.CHARSET, "UTF-8");
runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_URL, "http://dbUrl");
runner.assertValid();
byte [] bytes = "select * from /.*/".getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_FAILURE);
assertEquals(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE),"runtime exception");
}
@Test
public void testQueryThrowsRuntimeExceptionWithSocketTimeoutException() {
mockExecuteInfluxDBQuery = new ExecuteInfluxDBQuery() {
@Override
protected InfluxDB makeConnection(String username, String password, String influxDbUrl, long connectionTimeout) {
return null;
}
@Override
protected List<QueryResult> executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit, int chunkSize) {
throw new RuntimeException("runtime exception", new SocketTimeoutException("timeout"));
}
};
runner = TestRunners.newTestRunner(mockExecuteInfluxDBQuery);
runner.setProperty(ExecuteInfluxDBQuery.DB_NAME, "test");
runner.setProperty(ExecuteInfluxDBQuery.USERNAME, "u1");
runner.setProperty(ExecuteInfluxDBQuery.PASSWORD, "p1");
runner.setProperty(ExecuteInfluxDBQuery.CHARSET, "UTF-8");
runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_URL, "http://dbUrl");
runner.assertValid();
byte [] bytes = "select * from /.*/".getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_RETRY, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_RETRY);
assertEquals(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE),"runtime exception");
}
@Test
public void testMakingQueryThrowsIOException() {
mockExecuteInfluxDBQuery = new ExecuteInfluxDBQuery() {
@Override
protected InfluxDB makeConnection(String username, String password, String influxDbUrl, long connectionTimeout) {
return null;
}
@Override
protected String getQuery(ProcessSession session, Charset charset, FlowFile incomingFlowFile)
throws IOException {
throw new IOException("Test IOException");
}
};
runner = TestRunners.newTestRunner(mockExecuteInfluxDBQuery);
runner.setProperty(ExecuteInfluxDBQuery.DB_NAME, "test");
runner.setProperty(ExecuteInfluxDBQuery.USERNAME, "u1");
runner.setProperty(ExecuteInfluxDBQuery.PASSWORD, "p1");
runner.setProperty(ExecuteInfluxDBQuery.CHARSET, "UTF-8");
runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_URL, "http://dbUrl");
runner.assertValid();
byte [] bytes = "select * from /.*/".getBytes();
runner.enqueue(bytes);
AssertionError e = assertThrows(AssertionError.class, () -> runner.run(1,true,true));
assertTrue(e.getCause() instanceof ProcessException);
}
@Test
public void testMakeConnectionThrowsRuntimeException() {
mockExecuteInfluxDBQuery = new ExecuteInfluxDBQuery() {
@Override
protected InfluxDB makeConnection(String username, String password, String influxDbUrl, long connectionTimeout) {
throw new RuntimeException("testException");
}
};
runner = TestRunners.newTestRunner(mockExecuteInfluxDBQuery);
runner.setProperty(ExecuteInfluxDBQuery.DB_NAME, "test");
runner.setProperty(ExecuteInfluxDBQuery.USERNAME, "u1");
runner.setProperty(ExecuteInfluxDBQuery.PASSWORD, "p1");
runner.setProperty(ExecuteInfluxDBQuery.CHARSET, "UTF-8");
runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_URL, "http://dbUrl");
runner.assertValid();
byte [] bytes = "select * from /.*/".getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_FAILURE);
assertEquals(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE),"Error while getting connection testException");
}
@Test
public void testTriggerThrowsException() {
mockExecuteInfluxDBQuery = new ExecuteInfluxDBQuery() {
@Override
protected InfluxDB getInfluxDB(ProcessContext context) {
throw new RuntimeException("testException");
}
};
runner = TestRunners.newTestRunner(mockExecuteInfluxDBQuery);
runner.setProperty(ExecuteInfluxDBQuery.DB_NAME, "test");
runner.setProperty(ExecuteInfluxDBQuery.USERNAME, "u1");
runner.setProperty(ExecuteInfluxDBQuery.PASSWORD, "p1");
runner.setProperty(ExecuteInfluxDBQuery.CHARSET, "UTF-8");
runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_URL, "http://dbUrl");
runner.assertValid();
byte [] bytes = "select * from".getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_FAILURE);
assertEquals(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE),"testException");
}
}

View File

@ -1,364 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.influxdb;
import java.io.EOFException;
import java.net.SocketTimeoutException;
import java.util.List;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBIOException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestPutInfluxDB {
private TestRunner runner;
private PutInfluxDB mockPutInfluxDB;
@BeforeEach
public void setUp() throws Exception {
mockPutInfluxDB = new PutInfluxDB() {
@Override
protected InfluxDB makeConnection(String username, String password, String influxDbUrl, long connectionTimeout) {
return null;
}
@Override
protected void writeToInfluxDB(ProcessContext context, String consistencyLevel, String database, String retentionPolicy,
String records) {
}
};
runner = TestRunners.newTestRunner(mockPutInfluxDB);
runner.setProperty(PutInfluxDB.DB_NAME, "test");
runner.setProperty(PutInfluxDB.USERNAME, "user");
runner.setProperty(PutInfluxDB.PASSWORD, "password");
runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl");
runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue());
runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen");
runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB");
runner.assertValid();
}
@AfterEach
public void tearDown() throws Exception {
runner = null;
}
@Test
public void testDefaultValid() {
runner.assertValid();
}
@Test
public void testBlankDBUrl() {
runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "");
runner.assertNotValid();
}
@Test
public void testEmptyDBName() {
runner.setProperty(PutInfluxDB.DB_NAME, "");
runner.assertNotValid();
}
@Test
public void testEmptyConnectionTimeout() {
runner.setProperty(PutInfluxDB.INFLUX_DB_CONNECTION_TIMEOUT, "");
runner.assertNotValid();
}
@Test
public void testEmptyUsername() {
runner = TestRunners.newTestRunner(mockPutInfluxDB);
runner.setProperty(PutInfluxDB.DB_NAME, "test");
runner.setProperty(PutInfluxDB.PASSWORD, "password");
runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl");
runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue());
runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen");
runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB");
runner.assertValid();
runner.setProperty(PutInfluxDB.USERNAME, "");
runner.assertNotValid();
}
@Test
public void testEmptyPassword() {
runner = TestRunners.newTestRunner(mockPutInfluxDB);
runner.setProperty(PutInfluxDB.DB_NAME, "test");
runner.setProperty(PutInfluxDB.USERNAME, "username");
runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl");
runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue());
runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen");
runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB");
runner.assertValid();
runner.setProperty(PutInfluxDB.PASSWORD, "");
runner.assertNotValid();
}
@Test
public void testPasswordEL() {
runner = TestRunners.newTestRunner(mockPutInfluxDB);
runner.setEnvironmentVariableValue("influxdb.password", "password");
runner.setProperty(PutInfluxDB.PASSWORD, "${influxdb.password}");
runner.setProperty(PutInfluxDB.DB_NAME, "test");
runner.setProperty(PutInfluxDB.USERNAME, "username");
runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl");
runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue());
runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen");
runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB");
runner.assertValid();
}
@Test
public void testUsernameEL() {
runner = TestRunners.newTestRunner(mockPutInfluxDB);
runner.setEnvironmentVariableValue("influxdb.username", "username");
runner.setProperty(PutInfluxDB.PASSWORD, "password");
runner.setProperty(PutInfluxDB.DB_NAME, "test");
runner.setProperty(PutInfluxDB.USERNAME, "${influxdb.username}");
runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl");
runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue());
runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen");
runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB");
runner.assertValid();
}
@Test
public void testCharsetUTF8() {
runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
runner.assertValid();
}
@Test
public void testEmptyConsistencyLevel() {
runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL, "");
runner.assertNotValid();
}
@Test
public void testCharsetBlank() {
runner.setProperty(PutInfluxDB.CHARSET, "");
runner.assertNotValid();
}
@Test
public void testZeroMaxDocumentSize() {
runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "0");
runner.assertNotValid();
}
@Test
public void testSizeGreaterThanThresholdUsingEL() {
runner.setEnvironmentVariableValue("max.record.size", "1 B");
runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "${max.record.size}");
runner.assertValid();
byte [] bytes = new byte[2];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = 'a';
}
runner.enqueue(bytes);
runner.run(1);
runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_MAX_SIZE_EXCEEDED, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_MAX_SIZE_EXCEEDED);
assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE),"Max records size exceeded " + bytes.length);
}
@Test
public void testSizeGreaterThanThreshold() {
runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 B");
runner.assertValid();
byte [] bytes = new byte[2];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = 'a';
}
runner.enqueue(bytes);
runner.run(1);
runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_MAX_SIZE_EXCEEDED, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_MAX_SIZE_EXCEEDED);
assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE),"Max records size exceeded " + bytes.length);
}
@Test
public void testValidSingleMeasurement() {
runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 MB");
runner.assertValid();
byte [] bytes = "test".getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_SUCCESS);
assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE), null);
}
@Test
public void testWriteThrowsException() {
mockPutInfluxDB = new PutInfluxDB() {
@Override
protected void writeToInfluxDB(ProcessContext context, String consistencyLevel, String database, String retentionPolicy,
String records) {
throw new RuntimeException("WriteException");
}
};
runner = TestRunners.newTestRunner(mockPutInfluxDB);
runner.setProperty(PutInfluxDB.DB_NAME, "test");
runner.setProperty(PutInfluxDB.USERNAME, "u1");
runner.setProperty(PutInfluxDB.PASSWORD, "p1");
runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl");
runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue());
runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen");
runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB");
runner.assertValid();
byte [] bytes = "test".getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_FAILURE);
assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE),"WriteException");
}
@Test
public void testWriteThrowsIOException() {
mockPutInfluxDB = new PutInfluxDB() {
@Override
protected void writeToInfluxDB(ProcessContext context, String consistencyLevel, String database, String retentionPolicy,
String records) {
throw new InfluxDBIOException(new EOFException("EOFException"));
}
};
runner = TestRunners.newTestRunner(mockPutInfluxDB);
runner.setProperty(PutInfluxDB.DB_NAME, "test");
runner.setProperty(PutInfluxDB.USERNAME, "u1");
runner.setProperty(PutInfluxDB.PASSWORD, "p1");
runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl");
runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue());
runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen");
runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB");
runner.assertValid();
byte [] bytes = "test".getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_FAILURE);
assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE),"java.io.EOFException: EOFException");
}
@Test
public void testWriteThrowsSocketTimeoutException() {
mockPutInfluxDB = new PutInfluxDB() {
@Override
protected void writeToInfluxDB(ProcessContext context, String consistencyLevel, String database, String retentionPolicy,
String records) {
throw new InfluxDBIOException(new SocketTimeoutException("SocketTimeoutException"));
}
};
runner = TestRunners.newTestRunner(mockPutInfluxDB);
runner.setProperty(PutInfluxDB.DB_NAME, "test");
runner.setProperty(PutInfluxDB.USERNAME, "u1");
runner.setProperty(PutInfluxDB.PASSWORD, "p1");
runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl");
runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue());
runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen");
runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB");
runner.assertValid();
byte [] bytes = "test".getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_RETRY, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_RETRY);
assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE),"java.net.SocketTimeoutException: SocketTimeoutException");
}
@Test
public void testTriggerThrowsException() {
mockPutInfluxDB = new PutInfluxDB() {
@Override
protected InfluxDB getInfluxDB(ProcessContext context) {
throw new RuntimeException("testException");
}
};
runner = TestRunners.newTestRunner(mockPutInfluxDB);
runner.setProperty(PutInfluxDB.DB_NAME, "test");
runner.setProperty(PutInfluxDB.USERNAME, "u1");
runner.setProperty(PutInfluxDB.PASSWORD, "p1");
runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl");
runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue());
runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen");
runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB");
runner.assertValid();
byte [] bytes = "test".getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_FAILURE);
assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE),"testException");
}
@Test
public void testValidArrayMeasurement() {
runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 MB");
runner.assertValid();
runner.enqueue("test rain=2\ntest rain=3".getBytes());
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_SUCCESS);
assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE), null);
}
@Test
public void testInvalidEmptySingleMeasurement() {
byte [] bytes = "".getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_FAILURE);
assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE), "Empty measurement size 0");
}
}

View File

@ -1,42 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-influxdb-bundle</artifactId>
<packaging>pom</packaging>
<modules>
<module>nifi-influxdb-processors</module>
<module>nifi-influxdb-nar</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-influxdb-processors</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>

View File

@ -71,7 +71,6 @@
<module>nifi-snmp-bundle</module>
<module>nifi-datadog-bundle</module>
<module>nifi-windows-event-log-bundle</module>
<module>nifi-influxdb-bundle</module>
<module>nifi-graph-bundle</module>
<module>nifi-email-bundle</module>
<module>nifi-groovyx-bundle</module>

View File

@ -1167,8 +1167,6 @@
!ITMoveAzureDataLakeStorage,
!AzureGraphUserGroupProviderIT,
!JMSPublisherConsumerIT#validateMessageRedeliveryWhenNotAcked,
!ITPutInfluxDB,
!ITExecuteInfluxDBQuery,
!GremlinClientServiceYamlSettingsAndBytecodeIT,
!GremlinClientServiceControllerSettingsIT,
!ITestConsumeEmail#validateUrl,